Padrões de Arquitetura

Comunicação (Feromônios)

Estratégias de mensageria, chamadas síncronas e assíncronas para garantir o fluxo perfeito do Pólen entre as Operárias e Zangões da sua Colmeia.

O Conceito de Feromônios na Colmeia

Na natureza, os feromônios são sinais químicos que as abelhas usam para comunicação descentralizada. Na bee.arq, os Feromônios representam o sistema de mensagens que permite:

  • Coordenação sem acoplamento: Componentes se comunicam sem conhecer detalhes internos uns dos outros
  • Resiliência: Mensagens podem ser reprocessadas em caso de falha
  • Escalabilidade: Padrões diferentes para diferentes necessidades de volume e velocidade

Assim como na colmeia, onde diferentes feromônios transmitem diferentes informações (alerta, localização de comida, etc.), na bee.arq temos diferentes padrões de comunicação para diferentes contextos.

graph LR A[Operária] -->|Feromônio| B[Zangão] B -->|Feromônio| C[Operária] C -->|Feromônio| D[Rainha] style A,B,C fill:#FFE0B2,stroke:#FB8C00 style D fill:#C8E6C9,stroke:#43A047

O Problema

Num sistema distribuído como a `bee.arq`, a comunicação entre os componentes (Operárias e Zangões) é um ponto crítico. Uma escolha errada do mecanismo de comunicação ("Feromônio") pode levar a gargalos de performance, baixa resiliência ou complexidade desnecessária. A questão fundamental é: Como os meus agentes devem comunicar entre si de forma eficiente e escalável?

Estudo de Caso: Processamento de Documentos

Imagine um sistema que processa documentos PDF enviados por usuários. Cada documento precisa passar por OCR, classificação, extração de dados e análise pela LLM. Como coordenar esse fluxo de forma eficiente?

  • Se usar apenas chamadas síncronas, uma falha no OCR bloquearia todo o sistema
  • Se usar apenas filas, a latência total seria muito alta
  • Como garantir que o documento chegue à Rainha (LLM) com todos os dados necessários?

Solução: Padrões de Feromônios

A `bee.arq` não prescreve um único mecanismo de comunicação, mas sim o uso do "feromônio" correto para cada situação. Existem múltiplos padrões de comunicação que podem ser utilizados, sozinhos ou em conjunto, dentro de uma mesma Colmeia.

Princípio Chave: Escolha o padrão mais simples que atenda seus requisitos de latência, resiliência e volume.

Padrão de Feromônio: Metadados e Schema

Feromônios são mensagens estruturadas que carregam o estado do 'Pólen' através da colmeia. Para garantir resiliência, rastreabilidade e segurança, recomenda-se o uso de metadados padronizados e validação formal via schema.

Metadados Recomendados

{
  "event_version": "1.0",
  "timestamp": "2025-06-29T18:00:00Z",
  "trace_id": "abc-123-xyz", // Para correlacionar todo o fluxo
  "span_id": "def-456", // Para esta operação específica
  "attempt": 1, // Tentativa atual
  "source": "OperariaDeOCR@ColmeiaDocumentos", // Origem do evento
  "destination": ["OperariaClassificacao", "OperariaLog"], // Destinos esperados (opcional)
  "pollen_id": "uuid-123", // ID do dado sendo processado
  "type": "document.ocr.completed", // Tipo semântico do evento
  "status": "success|partial|failed", // Estado do processamento
  "error_details": { // Em caso de falha
    "code": "ERR_OCR_FAILED",
    "message": "Low confidence in text extraction"
  },
  "payload": { // Dados específicos do domínio
    "source_uri": "s3://bucket/doc.pdf",
    "pages_processed": 10,
    "confidence_score": 0.87
  },
  "metadata": { // Dados adicionais
    "colmeia": "DocumentProcessing",
    "priority": "high",
    "expires_at": "2025-06-30T18:00:00Z" // Para auto-expurgo
  }
}

Schema de Validação

Todos os Feromônios devem ser validados contra um contrato formal. A seguir, exemplos em Pydantic (Python) e TypeBox (TypeScript) ilustram como garantir integridade e consistência:


from pydantic import BaseModel, Field
from typing import Any, Optional, List
from datetime import datetime

class ErrorDetails(BaseModel):
    code: str
    message: str
    stack_trace: Optional[str] = None

class Feromonio(BaseModel):
    event_version: str = Field("1.0", pattern=r"^\d+\.\d+$")
    timestamp: datetime
    trace_id: str = Field(..., min_length=1)
    span_id: str = Field(..., min_length=1)
    attempt: int = Field(1, ge=1)
    source: str
    destination: Optional[List[str]] = None
    pollen_id: str = Field(..., min_length=1)
    type: str = Field(..., pattern=r"^[a-z]+\.[a-z]+(\.[a-z]+)*$")
    status: str = Field(..., pattern=r"^(success|partial|failed)$")
    error_details: Optional[ErrorDetails] = None
    payload: dict[str, Any]
    metadata: Optional[dict[str, Any]] = None

import { Type } from "@sinclair/typebox";

const ErrorDetails = Type.Object({
    code: Type.String(),
    message: Type.String(),
    stack_trace: Type.Optional(Type.String())
});

export const Feromonio = Type.Object({
    event_version: Type.String({ pattern: "^\\d+\\.\\d+$", default: "1.0" }),
    timestamp: Type.String({ format: "date-time" }),
    trace_id: Type.String({ minLength: 1 }),
    span_id: Type.String({ minLength: 1 }),
    attempt: Type.Number({ minimum: 1, default: 1 }),
    source: Type.String(),
    destination: Type.Optional(Type.Array(Type.String())),
    pollen_id: Type.String({ minLength: 1 }),
    type: Type.String({ pattern: "^[a-z]+\\.[a-z]+(\\.[a-z]+)*$" }),
    status: Type.String({ pattern: "^(success|partial|failed)$" }),
    error_details: Type.Optional(ErrorDetails),
    payload: Type.Record(Type.String(), Type.Any()),
    metadata: Type.Optional(Type.Record(Type.String(), Type.Any()))
});
flowchart TD Gerador["Operária"] -->|Cria Feromônio| F["Feromônio"] F -->|Validado por| Schema["Schema de Validação"] Schema -->|Aprovado| Prox["Próximo Agente"] F -.-|Erro| DLQ["Fila de Erros"] classDef fclass fill:#FFF3CD,stroke:#FFA000 class F fclass

Padrão: Requisição-Resposta (Síncrono)

Descrição

Este é o padrão de comunicação mais simples e direto. Uma Operária faz uma chamada direta a um Zangão (geralmente via API REST ou gRPC) e aguarda ativamente pela resposta antes de continuar o seu processamento. A comunicação é um-para-um e acontece em tempo real.

Quando Usar

  • Para tarefas de baixa latência onde a resposta é necessária imediatamente para o próximo passo.
  • Quando o processamento do Zangão é rápido (tipicamente menos de alguns segundos).
  • Em fluxos de trabalho mais simples ou quando o acoplamento temporal não é um problema.
Exemplo Prático: Validação de Formulário

Uma Operária recebe dados de um formulário web e precisa validar imediatamente com um Zangão especialista em regras de negócio antes de prosseguir.

Implementação

A implementação mais comum é uma chamada HTTP a um endpoint de API. A Operária age como cliente e o Zangão como servidor.


# Exemplo de chamada síncrona numa Operária
import requests

ZANGAO_URL = "http://zangao-classifier.internal/classify"

def chamar_zangao_sincrono(pollen):
    print(f"Enviando Pólen {pollen['pollen_id']} para o Zangão...")
    try:
        response = requests.post(ZANGAO_URL, json=pollen, timeout=10)
        response.raise_for_status() # Lança erro se status for 4xx ou 5xx
        
        # Operária aguarda e processa a resposta imediatamente
        resultado_zangao = response.json()
        print("Resposta do Zangão recebida com sucesso.")
        return resultado_zangao
    except requests.RequestException as e:
        print(f"Falha na comunicação síncrona: {e}")
        raise

// Exemplo de chamada síncrona numa Operária
const ZANGAO_URL = "http://zangao-classifier.internal/classify";

async function chamarZangaoSincrono(pollen: any): Promise<any> {
    console.log(`Enviando Pólen ${pollen.pollen_id} para o Zangão...`);
    try {
        const response = await fetch(ZANGAO_URL, {
            method: 'POST',
            body: JSON.stringify(pollen),
            headers: { 'Content-Type': 'application/json' },
        });
        if (!response.ok) {
            throw new Error(`HTTP error! status: ${response.status}`);
        }
        const resultadoZangao = await response.json();
        console.log("Resposta do Zangão recebida com sucesso.");
        return resultadoZangao;
    } catch (error) {
        console.error(`Falha na comunicação síncrona: ${error}`);
        throw error;
    }
}
Atenção: Para chamadas síncronas, sempre defina um timeout adequado para evitar que a Operária fique indefinidamente bloqueada.

Padrão: Fila de Eventos (Assíncrono)

Descrição

Neste padrão, a comunicação é desacoplada através de um intermediário, como uma fila de mensagens. A Operária "A" publica uma mensagem (o "Feromônio" contendo o "Pólen") numa fila e termina a sua execução. A Operária "B", que está a escutar essa fila, é acionada quando a mensagem chega, processa-a e pode publicar o resultado noutra fila.

Quando Usar

  • Para processos de longa duração (ex: transcrição de áudio, processamento de vídeo).
  • Para processamento em lote (batch processing) de grandes volumes de dados.
  • Quando a resiliência é crítica. Se a Operária "B" falhar, a mensagem pode ser re-processada automaticamente a partir da fila.
  • Para nivelar a carga (load leveling), evitando sobrecarregar os Zangões com picos de requisições.
Exemplo Prático: Processamento de Vídeos

Uma Operária recebe uploads de vídeos e os coloca em uma fila. Várias instâncias de Operárias especializadas em processamento de vídeo consomem da fila conforme sua capacidade.

Implementação

Utiliza serviços de mensageria como AWS SQS, Google Pub/Sub, Azure Service Bus ou RabbitMQ. O exemplo abaixo usa AWS SQS.


# operaria_A.py (A que envia o feromônio)
import boto3
import json

sqs = boto3.client('sqs')
QUEUE_URL = 'arn:aws:sqs:us-east-1:123:MinhaFilaDeTarefas'

def enviar_tarefa_para_processamento(pollen):
    print(f"Publicando Pólen {pollen['pollen_id']} na fila...")
    sqs.send_message(
        QueueUrl=QUEUE_URL,
        MessageBody=json.dumps(pollen)
    )
    print("Feromônio enviado. Execução da Operária A concluída.")

# ---------------------------------------------------------------------

# operaria_B.py (A que consome da fila - ex: uma função Lambda)
def handler_da_fila(event, context):
    for record in event['Records']:
        pollen = json.loads(record['body'])
        print(f"Feromônio recebido para o Pólen {pollen['pollen_id']}.")
        # Aqui, a Operária B executaria sua lógica...
        # ...
        print("Processamento concluído.")
    return {"status": "success"}

// operaria_A.ts (A que envia o feromônio)
import { SQSClient, SendMessageCommand } from "@aws-sdk/client-sqs";

const sqsClient = new SQSClient({});
const QUEUE_URL = 'arn:aws:sqs:us-east-1:123:MinhaFilaDeTarefas';

async function enviarTarefaParaProcessamento(pollen: object): Promise<void> {
    console.log(`Publicando Pólen ${(pollen as any).pollen_id} na fila...`);
    const command = new SendMessageCommand({
        QueueUrl: QUEUE_URL,
        MessageBody: JSON.stringify(pollen),
    });
    await sqsClient.send(command);
    console.log("Feromônio enviado. Execução da Operária A concluída.");
}

// ---------------------------------------------------------------------

// operaria_B.ts (A que consome da fila - ex: uma função Lambda)
import { SQSEvent } from "aws-lambda";
export const handler = async (event: SQSEvent): Promise<void> => {
    for (const record of event.Records) {
        const pollen = JSON.parse(record.body);
        console.log(`Feromônio recebido para o Pólen ${pollen.pollen_id}.`);
        // Aqui, a Operária B executaria sua lógica...
        // ...
        console.log("Processamento concluído.");
    }
};
Dica: Configure uma Dead Letter Queue (DLQ) para lidar com mensagens que falham repetidamente, facilitando a depuração.

Padrão: Paralelismo (Fan-Out/Fan-In)

Descrição

Neste padrão avançado, uma única Operária atua como um coordenador, disparando múltiplas chamadas síncronas para diferentes Zangões em paralelo (Fan-Out). Ela então aguarda a conclusão de todas as tarefas para agregar os resultados (Fan-In) e consolidar o "Pólen" enriquecido. Este padrão é extremamente poderoso para o enriquecimento massivo de um mesmo dado sob diferentes perspetivas.

Quando Usar

  • Quando precisar de múltiplas análises independentes sobre o mesmo dado (ex: análise de sentimento, extração de entidades e sumarização de um mesmo texto).
  • Para maximizar o throughput e minimizar a latência, aproveitando a capacidade da "Floresta" (nuvem) de escalar os Zangões horizontalmente.
  • É importante notar que a latência total do processo será ditada pelo Zangão mais lento daquele lote.
Exemplo Prático: Análise de Texto Completa

Uma Operária recebe um texto e envia paralelamente para Zangões especializados em: análise de sentimento, detecção de entidades nomeadas, sumarização e classificação de tópicos.

Implementação

Utiliza programação assíncrona moderna, como a biblioteca `asyncio` em Python ou `Promise.all` em TypeScript, para executar as chamadas de API em simultâneo.

graph TD O[Operária] -->|Chama em Paralelo| Z1[Zangão A] O -->|Chama em Paralelo| Z2[Zangão B] O -->|Chama em Paralelo| Z3[Zangão C] Z1 -->|Retorna Resultado A| O Z2 -->|Retorna Resultado B| O Z3 -->|Retorna Resultado C| O O -->|Consolida Resultados| Prox(Próximo Passo) style O fill:#90CAF9 style Z1,Z2,Z3 fill:#FFE0B2 style Prox fill:#e0e0e0

# Exemplo de Fan-Out/Fan-In com asyncio
import asyncio
import httpx

ZANGAO_SENTIMENTO_URL = "..."
ZANGAO_ENTIDADES_URL = "..."
ZANGAO_SUMARIO_URL = "..."

async def call_zangao_async(client, url, pollen):
    response = await client.post(url, json=pollen, timeout=30)
    response.raise_for_status()
    return response.json()

async def operaria_paralela(pollen):
    async with httpx.AsyncClient() as client:
        tarefas = [
            call_zangao_async(client, ZANGAO_SENTIMENTO_URL, pollen),
            call_zangao_async(client, ZANGAO_ENTIDADES_URL, pollen),
            call_zangao_async(client, ZANGAO_SUMARIO_URL, pollen),
        ]
        
        # Fan-Out: executa todas as chamadas em paralelo
        # Fan-In: aguarda a conclusão de todas
        resultados = await asyncio.gather(*tarefas, return_exceptions=True)
        
        # Trata resultados (sucessos ou falhas)
        for i, resultado in enumerate(resultados):
            if isinstance(resultado, Exception):
                print(f"Erro no Zangão {i}: {resultado}")
                continue
            
            # Consolida os resultados bem-sucedidos
            if i == 0:  # Sentimento
                pollen['sentiment'] = resultado
            elif i == 1:  # Entidades
                pollen['entities'] = resultado
            elif i == 2:  # Sumário
                pollen['summary'] = resultado
        
        return pollen

// Exemplo de Fan-Out/Fan-In com Promise.all
const ZANGAO_SENTIMENTO_URL = "...";
const ZANGAO_ENTIDADES_URL = "...";
const ZANGAO_SUMARIO_URL = "...";

async function callZangaoAsync(url: string, pollen: any): Promise<any> {
    const response = await fetch(url, {
        method: 'POST',
        body: JSON.stringify(pollen),
        headers: { 'Content-Type': 'application/json' },
    });
    if (!response.ok) throw new Error(`Erro no Zangão ${url}`);
    return response.json();
}

async function operariaParalela(pollen: any): Promise<any> {
    const tarefas = [
        callZangaoAsync(ZANGAO_SENTIMENTO_URL, pollen),
        callZangaoAsync(ZANGAO_ENTIDADES_URL, pollen),
        callZangaoAsync(ZANGAO_SUMARIO_URL, pollen),
    ];

    // Fan-Out/Fan-In: executa todas as promessas em paralelo
    const resultados = await Promise.allSettled(tarefas);

    // Processa resultados
    resultados.forEach((resultado, i) => {
        if (resultado.status === 'fulfilled') {
            if (i === 0) pollen.sentiment = resultado.value;
            else if (i === 1) pollen.entities = resultado.value;
            else if (i === 2) pollen.summary = resultado.value;
        } else {
            console.error(`Falha no Zangão ${i}:`, resultado.reason);
        }
    });
    
    return pollen;
}
⚠️ Dica: Em caso de Fan-Out, considere tratar falhas parciais com `asyncio.gather(..., return_exceptions=True)` ou `Promise.allSettled` para evitar que falhas em apenas um Zangão interrompam toda a consolidação.

Padrão: Circuit Breaker para Comunicação

Descrição

Padrão que monitora falhas em chamadas a Zangões e, após um limiar, "abre o circuito" (para de fazer chamadas) por um tempo determinado, evitando sobrecarga em serviços com problemas.

Quando Usar

  • Para Zangões que acessam serviços externos instáveis
  • Quando a falha de um Zangão pode causar efeito cascata
  • Para evitar gastar recursos com chamadas que provavelmente falharão
Exemplo Prático: API de Pagamentos Externos

Uma Operária que chama um Zangão que integra com um gateway de pagamentos externo. Se o gateway começar a falhar, o Circuit Breaker evita chamadas desnecessárias até que o serviço se normalize.

Implementação


from pybreaker import CircuitBreaker

# Configuração do Circuit Breaker
ocr_breaker = CircuitBreaker(
    fail_max=3,  # 3 falhas consecutivas abrem o circuito
    reset_timeout=60  # 60 segundos para tentar novamente
)

@ocr_breaker
def call_zangao_ocr(pollen):
    # Lógica de chamada ao Zangão OCR
    response = requests.post(ZANGAO_OCR_URL, json=pollen, timeout=10)
    response.raise_for_status()
    return response.json()

# Na Operária:
try:
    result = call_zangao_ocr(pollen)
except CircuitBreakerError:
    # Circuito aberto - não chamar o serviço
    pollen['ocr_status'] = 'circuit_open'
    enviar_para_fila_de_retentativa(pollen)
                        

import { CircuitBreaker } from 'opossum';

const options = {
  timeout: 10000, // 10 segundos
  errorThresholdPercentage: 50, // Abre após 50% de falhas
  resetTimeout: 30000 // 30 segundos para resetar
};

const breaker = new CircuitBreaker(async (pollen) => {
  const response = await fetch(ZANGAO_OCR_URL, {
    method: 'POST',
    body: JSON.stringify(pollen)
  });
  if (!response.ok) throw new Error(response.statusText);
  return response.json();
}, options);

// Na Operária:
breaker.fire(pollen)
  .then(result => processResult(result))
  .catch(err => {
    if (breaker.opened) {
      // Circuito aberto - tratar adequadamente
      pollen.ocr_status = 'circuit_open';
      enviarParaFilaDeRetentativa(pollen);
    }
  });
                        
Monitoramento: Exponha métricas do Circuit Breaker (estado, taxa de falhas) para seu sistema de monitoramento.

Padrão: Comunicação da Rainha (Fallback e Histórico)

Descrição

A comunicação com a Rainha (a LLM de ponta) é a etapa mais crítica e cara do pipeline. Este padrão garante a sua **resiliência** e **eficiência**, tratando a interação com a LLM não como uma simples chamada de API, mas como uma conversa com estado que pode sobreviver a falhas.

Quando Usar

  • Sempre que a decisão final depende de uma LLM, para evitar dependência de um único provedor (vendor lock-in).
  • Em sistemas críticos onde uma falha na API da LLM não pode interromper o processo de negócio.
  • Para otimização de custos, permitindo escolher dinamicamente o provedor mais barato ou mais rápido.
  • Para análises complexas que podem ser iniciadas num provedor e validadas ou continuadas noutro.
Exemplo Prático: Análise de Contratos

Uma Operária prepara um contrato jurídico e envia para análise pela Rainha. Se a OpenAI estiver instável, o sistema automaticamente tenta com o Gemini da Google, mantendo todo o histórico da conversa.

Implementação

A Operária de Consolidação utiliza uma classe ou módulo `Queen` que abstrai a lógica de comunicação. Esta classe mantém um `chat_history` e itera sobre uma lista de provedores de LLM pré-configurados. Se o primeiro falhar, ela tenta o segundo, enviando não apenas a última pergunta, mas todo o histórico da conversa, garantindo que o contexto seja totalmente preservado.


# Exemplo de Rainha com fallback e histórico de chat
import requests
import os
import json

class Queen:
    def __init__(self, system_prompt, providers_config):
        self.providers = providers_config
        # O histórico é mantido para conversas multi-turn ou fallbacks
        self.chat_history = [{"role": "system", "content": system_prompt}]

    def make_decision(self, honey_data):
        self.chat_history.append({"role": "user", "content": f"CONTEXTO FACTUAL (MEL PURO):\n{json.dumps(honey_data, indent=2)}"})
        
        errors = {}
        for provider in self.providers:
            try:
                body = self._adapt_body(provider, self.chat_history)
                response = requests.post(provider['url'], json=body, headers=provider['headers'], timeout=45)
                response.raise_for_status()
                
                assistant_response = self._extract_response(provider['name'], response.json())
                self.chat_history.append({"role": "assistant", "content": assistant_response})
                return assistant_response

            except requests.RequestException as e:
                print(f"Erro ao chamar {provider['name']}: {e}")
                errors[provider['name']] = str(e)
        
        # Remove a última mensagem do utilizador se todos falharem
        self.chat_history.pop()
        raise Exception(f"Falha em todos os provedores de LLM: {errors}")

    def _adapt_body(self, provider, history):
        if provider['name'] == 'OpenAI':
            return {"model": provider.get('model', 'gpt-4'), "messages": history}
        # Adicionar adaptadores para outros provedores (Gemini, Claude, etc.)
        return {}

    def _extract_response(self, provider_name, data):
        if provider_name == 'OpenAI':
            return data['choices'][0]['message']['content']
        return ""

// Exemplo de Rainha com fallback e histórico de chat
interface ChatMessage {
    role: 'system' | 'user' | 'assistant';
    content: string;
}

interface LLMProvider {
    name: string;
    url: string;
    headers: Record<string, string>;
    adaptBody: (history: ChatMessage[]) => Record<string, any>;
    extractResponse: (responseData: any) => string;
}

export class Queen {
    private chatHistory: ChatMessage[];
    private providers: LLMProvider[];

    constructor(systemPrompt: string, providers: LLMProvider[]) {
        this.providers = providers;
        this.chatHistory = [{ role: 'system', content: systemPrompt }];
    }

    public async makeDecision(honey: any): Promise<string> {
        const userMessage = { 
            role: 'user' as const, 
            content: `CONTEXTO FACTUAL (MEL PURO):\n ${JSON.stringify(honey, null, 2)}`
        };
        this.chatHistory.push(userMessage);

        const errors: Record<string, string> = {};

        for (const provider of this.providers) {
            try {
                const body = provider.adaptBody(this.chatHistory);
                const response = await fetch(provider.url, {
                    method: 'POST',
                    headers: { ...provider.headers, 'Content-Type': 'application/json' },
                    body: JSON.stringify(body),
                });
                if (!response.ok) throw new Error(`API error: ${response.statusText}`);
                
                const responseData = await response.json();
                const assistantResponse = provider.extractResponse(responseData);

                this.chat_history.push({ role: 'assistant', content: assistantResponse });
                return assistant_response;

            } catch (error: any) {
                console.error(`Erro ao chamar ${provider.name}:`, error.message);
                errors[provider.name] = error.message;
            }
        }

        this.chatHistory.pop(); // Remove user message if all failed
        throw new Error(`Falha em todos os provedores de LLM: ${JSON.stringify(errors)}`);
    }
}

Padrão: Triangulação de Dados via Armazenamento

Descrição

Quando se lida com dados de grande volume (ex: ficheiros de vídeo, áudio de alta qualidade, grandes datasets), passá-los diretamente no corpo de uma requisição HTTP ou numa mensagem de fila é impraticável e ineficiente. Este padrão resolve o problema ao evitar a transferência do dado bruto, trocando apenas as suas referências (ponteiros).

Quando Usar

  • Ao processar ficheiros de multimédia (vídeos, áudios, imagens de alta resolução).
  • Com datasets de grande volume que excedem os limites de payload de APIs e serviços de mensageria.
  • Para desacoplar o processamento do armazenamento, permitindo que cada Zangão aceda aos dados de forma independente e paralela.
Exemplo Prático: Processamento de Vídeos Longos

Uma Operária recebe um vídeo de 2GB, faz upload para o storage cloud, e envia apenas a referência para Zangões de transcrição, análise de cenas e detecção de objetos.

Implementação

O fluxo envolve um serviço de armazenamento de objetos (como AWS S3, Google Cloud Storage ou Azure Blob Storage) como intermediário. A comunicação entre os agentes contém apenas a URI (o "endereço") do dado, não o dado em si.

graph TD O1["Operária A"]:::operaria_class S["☁️ Storage (S3, GCS, Blob)"]:::storage_class Z["Zangão"]:::zangao_class O2["Operária B"]:::operaria_class O1 -->|1. Upload do ficheiro processado| S O1 -->|2. Envia Feromônio com URI do ficheiro| Z Z -->|3. Lê ficheiro do Storage| S Z -->|4. Upload do resultado| S Z -->|5. Retorna URI do resultado| O1 O1 -->|Chama próximo| O2 classDef operaria_class fill:#90CAF9,stroke:#1565C0 classDef zangao_class fill:#FFE0B2,stroke:#EF6C00 classDef storage_class fill:#e0e0e0,stroke:#6c757d

# operaria.py
import boto3

s3_client = boto3.client('s3')
BUCKET_NAME = "minha-colmeia-bucket"

def operaria_de_video(pollen):
    input_uri = pollen['source_uri'] # ex: "s3://.../video.mp4"
    
    # 1. Operária baixa o ficheiro, processa (ex: corta o vídeo) e faz upload do resultado
    # ... (lógica de download e processamento) ...
    processed_video_key = f"processed/{pollen['pollen_id']}.mp4"
    # s3_client.upload_file("video_processado.mp4", BUCKET_NAME, processed_video_key)
    
    # 2. Chama o Zangão enviando apenas a referência
    pollen_para_zangao = { "audio_path": f"s3://{BUCKET_NAME}/{processed_video_key}" }
    # resultado_zangao = chamar_zangao_sincrono(pollen_para_zangao)
    
    # 5. Recebe de volta a URI do resultado do zangão e continua
    # pollen['transcribed_uri'] = resultado_zangao['transcribed_uri']

# zangao.py
def zangao_de_transcricao(pollen_com_referencia):
    audio_uri = pollen_com_referencia['audio_path']
    
    # 3. Zangão baixa o ficheiro a partir da URI
    # local_path = s3_client.download_file(...)
    
    # ... (processa o áudio e gera um ficheiro de texto) ...
    
    # 4. Zangão faz upload do seu resultado para o Storage
    transcription_key = f"transcripts/{pollen_com_referencia['pollen_id']}.txt"
    # s3_client.upload_file("transcricao.txt", BUCKET_NAME, transcription_key)
    
    # 5. Retorna a referência ao resultado
    return { "transcribed_uri": f"s3://{BUCKET_NAME}/{transcription_key}" }

// operaria.ts
import { S3Client, GetObjectCommand, PutObjectCommand } from "@aws-sdk/client-s3";

const s3Client = new S3Client({});
const BUCKET_NAME = "minha-colmeia-bucket";

async function operariaDeVideo(pollen: any): Promise<any> {
    const inputUri = pollen.source_uri;

    // 1. Operária processa o ficheiro e faz upload
    // ... (lógica para processar o vídeo) ...
    const processedVideoKey = `processed/${pollen.pollen_id}.mp4`;
    // await s3Client.send(new PutObjectCommand({ Bucket: BUCKET_NAME, Key: processedVideoKey, Body: ... }));

    // 2. Chama o Zangão com a referência
    const pollenParaZangao = { audio_path: `s3://${BUCKET_NAME}/${processedVideoKey}` };
    // const resultadoZangao = await chamarZangao(pollenParaZangao);
    
    // 5. Recebe de volta a URI do resultado
    // pollen.transcribed_uri = resultadoZangao.transcribed_uri;
    return pollen;
}

// zangao.ts
async function zangaoDeTranscricao(pollenComReferencia: any): Promise<any> {
    const audioUri = pollenComReferencia.audio_path;

    // 3. Zangão baixa o ficheiro a partir da URI
    // const { Body } = await s3Client.send(new GetObjectCommand({ Bucket: ..., Key: ... }));
    
    // ... (processa o áudio) ...
    const transcriptionText = "Texto transcrito aqui...";

    // 4. Zangão faz upload do seu resultado
    const transcriptionKey = `transcripts/${pollenComReferencia.pollen_id}.txt`;
    // await s3Client.send(new PutObjectCommand({ Bucket: BUCKET_NAME, Key: transcriptionKey, Body: transcriptionText }));

    // 5. Retorna a referência
    return { transcribed_uri: `s3://${BUCKET_NAME}/${transcriptionKey}` };
}
🔐 Recomenda-se o uso de URIs assinadas (presigned URLs) com expiração limitada para proteger o acesso a dados em repouso durante o padrão de triangulação.

Padrão: Broadcast com Seleção Contextual

Neste padrão, um mesmo Feromônio é enviado a múltiplas filas. Cada Operária escuta seletivamente apenas os eventos que fazem sentido para seu contexto, baseando-se em filtros por tipo ou domínio. Isso favorece extensibilidade e reuso entre Colmeias.

Descrição

Neste padrão, uma Operária publica um único Feromônio em múltiplas filas, e diferentes Operárias consomem a mensagem de forma seletiva, conforme o tipo ou domínio do dado. Isso facilita reuso e expansão modular da colmeia.

Quando Usar

  • Quando diferentes domínios precisam reagir ao mesmo evento (ex: OCR e Classificação).
  • Para desacoplar totalmente o produtor do consumidor.
  • Quando o dado publicado pode ter múltiplas interpretações.
Exemplo Prático: Evento de Novo Usuário

Quando um novo usuário se cadastra, múltiplos sistemas precisam ser notificados: envio de e-mail de boas-vindas, criação de perfil analítico, verificação de fraude, etc.

Implementação

A publicação é feita em todas as filas de interesse. O filtro é feito pelas próprias Operárias, com base em campos como `type`, `doc_type`, `source` ou qualquer outra heurística.

graph TD Publicador["Operária Publicadora"]:::operaria Q1["Fila OCR"]:::fila Q2["Fila Classificação"]:::fila O1["Operária OCR"]:::operaria O2["Operária Classificação"]:::operaria O3["Operária Log"]:::operaria O4["Operária Análise de Fraude"]:::operaria Publicador --> Q1 Publicador --> Q2 Q1 --> O1 Q1 --> O3 Q2 --> O2 Q2 --> O4 classDef operaria fill:#90CAF9,stroke:#1565C0 classDef fila fill:#e0e0e0,stroke:#6c757d

# Publicadora (Python)
import boto3, json

sqs = boto3.client("sqs")
filas = [
    "https://sqs.us-east-1.amazonaws.com/123456789/ocr-fila",
    "https://sqs.us-east-1.amazonaws.com/123456789/classificacao-fila"
]

feromonio = {
    "pollen_id": "123",
    "type": "document.received",
    "payload": { "source_uri": "s3://bucket/doc.pdf" }
}

for fila in filas:
    sqs.send_message(QueueUrl=fila, MessageBody=json.dumps(feromonio))

# Consumidora (ex: OCR)
def handler(event, context):
    for record in event['Records']:
        f = json.loads(record['body'])
        if f['type'] == 'document.received':
            print("Executando OCR...")

// Publicadora (TypeScript)
import { SQSClient, SendMessageCommand } from "@aws-sdk/client-sqs";

const sqs = new SQSClient({});
const filas = [
  "https://sqs.us-east-1.amazonaws.com/123456789/ocr-fila",
  "https://sqs.us-east-1.amazonaws.com/123456789/classificacao-fila"
];

const feromonio = {
  pollen_id: "123",
  type: "document.received",
  payload: { source_uri: "s3://bucket/doc.pdf" }
};

for (const fila of filas) {
  await sqs.send(new SendMessageCommand({
    QueueUrl: fila,
    MessageBody: JSON.stringify(feromonio)
  }));
}

// Consumidora
export const handler = async (event: any) => {
  for (const record of event.Records) {
    const f = JSON.parse(record.body);
    if (f.type === "document.received") {
      console.log("Executando OCR...");
    }
  }
};

Resumo e Recomendações

Padrão Ideal Para Tecnologias Latência Custo Complexidade Resiliência
Síncrono Tarefas rápidas (<1s) REST, gRPC Baixa Baixo Baixa Média
Assíncrono Processos longos ou batch SQS, Pub/Sub, Kafka Alta Médio Média Alta
Paralelo Múltiplas análises independentes AsyncIO, Promise.all Média* Médio Alta Média
Circuit Breaker Serviços externos instáveis PyBreaker, Opossum - Baixo Média Alta
Rainha Chamadas a LLMs Multi-provedores Alta Alto Alta Alta
Triangulação Grandes volumes de dados S3, GCS, Blob Alta Baixo* Média Alta
Broadcast Eventos para múltiplos sistemas SNS, Pub/Sub Variável Médio Média Alta
* Latência ditada pelo serviço mais lento / * Custo apenas de storage

Não existe um padrão "melhor" que o outro; eles servem a propósitos diferentes e muitas vezes coexistem na mesma Colmeia. A escolha correta depende da análise dos requisitos de latência, resiliência e volume de cada etapa do seu pipeline.

Fluxo de Decisão para Escolha de Padrão

Use este diagrama como guia para selecionar o padrão de comunicação mais adequado para sua necessidade:

graph TD Start[Precisa comunicar agentes?] --> Tempo{Resposta <1s?} Tempo -->|Sim| Conhecido{Receptor conhecido?} Tempo -->|Não| Volume{Alto volume?} Conhecido -->|Sim| Sincrono[REST/gRPC] Conhecido -->|Não| Assincrono[Filas] Volume -->|Sim| Paralelo[Fan-Out/Fan-In] Volume -->|Não| Critico{Sistema crítico?} Critico -->|Sim| Rainha[Padrão Rainha] Critico -->|Não| Assincrono Sincorno -.->|Serviço instável| CB[Adicione Circuit Breaker] Assincrono -->|Grandes arquivos| Triang[Padrão Triangulação] Assincrono -->|Múltiplos consumidores| Broad[Padrão Broadcast] style Start fill:#e3f2fd,stroke:#2196F3 style Sincrono fill:#C8E6C9,stroke:#4CAF50 style Assincrono fill:#FFE0B2,stroke:#FFA000 style Paralelo fill:#D1C4E9,stroke:#7E57C2 style Rainha fill:#F8BBD0,stroke:#E91E63 style CB fill:#B3E5FC,stroke:#03A9F4 style Triang fill:#B2DFDB,stroke:#009688 style Broad fill:#FFCCBC,stroke:#FF5722
Dica: Muitas vezes você usará combinações desses padrões. Por exemplo, pode ter um fluxo principal assíncrono com filas, mas chamadas síncronas para Zangões dentro de uma Operária, protegidas por Circuit Breaker.