Comunicação (Feromônios)
Estratégias de mensageria, chamadas síncronas e assíncronas para garantir o fluxo perfeito do Pólen entre as Operárias e Zangões da sua Colmeia.
O Conceito de Feromônios na Colmeia
Na natureza, os feromônios são sinais químicos que as abelhas usam para comunicação descentralizada. Na bee.arq, os Feromônios representam o sistema de mensagens que permite:
- Coordenação sem acoplamento: Componentes se comunicam sem conhecer detalhes internos uns dos outros
- Resiliência: Mensagens podem ser reprocessadas em caso de falha
- Escalabilidade: Padrões diferentes para diferentes necessidades de volume e velocidade
Assim como na colmeia, onde diferentes feromônios transmitem diferentes informações (alerta, localização de comida, etc.), na bee.arq temos diferentes padrões de comunicação para diferentes contextos.
O Problema
Num sistema distribuído como a `bee.arq`, a comunicação entre os componentes (Operárias e Zangões) é um ponto crítico. Uma escolha errada do mecanismo de comunicação ("Feromônio") pode levar a gargalos de performance, baixa resiliência ou complexidade desnecessária. A questão fundamental é: Como os meus agentes devem comunicar entre si de forma eficiente e escalável?
Estudo de Caso: Processamento de Documentos
Imagine um sistema que processa documentos PDF enviados por usuários. Cada documento precisa passar por OCR, classificação, extração de dados e análise pela LLM. Como coordenar esse fluxo de forma eficiente?
- Se usar apenas chamadas síncronas, uma falha no OCR bloquearia todo o sistema
- Se usar apenas filas, a latência total seria muito alta
- Como garantir que o documento chegue à Rainha (LLM) com todos os dados necessários?
Solução: Padrões de Feromônios
A `bee.arq` não prescreve um único mecanismo de comunicação, mas sim o uso do "feromônio" correto para cada situação. Existem múltiplos padrões de comunicação que podem ser utilizados, sozinhos ou em conjunto, dentro de uma mesma Colmeia.
Padrão de Feromônio: Metadados e Schema
Feromônios são mensagens estruturadas que carregam o estado do 'Pólen' através da colmeia. Para garantir resiliência, rastreabilidade e segurança, recomenda-se o uso de metadados padronizados e validação formal via schema.
Metadados Recomendados
{
"event_version": "1.0",
"timestamp": "2025-06-29T18:00:00Z",
"trace_id": "abc-123-xyz", // Para correlacionar todo o fluxo
"span_id": "def-456", // Para esta operação específica
"attempt": 1, // Tentativa atual
"source": "OperariaDeOCR@ColmeiaDocumentos", // Origem do evento
"destination": ["OperariaClassificacao", "OperariaLog"], // Destinos esperados (opcional)
"pollen_id": "uuid-123", // ID do dado sendo processado
"type": "document.ocr.completed", // Tipo semântico do evento
"status": "success|partial|failed", // Estado do processamento
"error_details": { // Em caso de falha
"code": "ERR_OCR_FAILED",
"message": "Low confidence in text extraction"
},
"payload": { // Dados específicos do domínio
"source_uri": "s3://bucket/doc.pdf",
"pages_processed": 10,
"confidence_score": 0.87
},
"metadata": { // Dados adicionais
"colmeia": "DocumentProcessing",
"priority": "high",
"expires_at": "2025-06-30T18:00:00Z" // Para auto-expurgo
}
}
Schema de Validação
Todos os Feromônios devem ser validados contra um contrato formal. A seguir, exemplos em Pydantic (Python) e TypeBox (TypeScript) ilustram como garantir integridade e consistência:
from pydantic import BaseModel, Field
from typing import Any, Optional, List
from datetime import datetime
class ErrorDetails(BaseModel):
code: str
message: str
stack_trace: Optional[str] = None
class Feromonio(BaseModel):
event_version: str = Field("1.0", pattern=r"^\d+\.\d+$")
timestamp: datetime
trace_id: str = Field(..., min_length=1)
span_id: str = Field(..., min_length=1)
attempt: int = Field(1, ge=1)
source: str
destination: Optional[List[str]] = None
pollen_id: str = Field(..., min_length=1)
type: str = Field(..., pattern=r"^[a-z]+\.[a-z]+(\.[a-z]+)*$")
status: str = Field(..., pattern=r"^(success|partial|failed)$")
error_details: Optional[ErrorDetails] = None
payload: dict[str, Any]
metadata: Optional[dict[str, Any]] = None
import { Type } from "@sinclair/typebox";
const ErrorDetails = Type.Object({
code: Type.String(),
message: Type.String(),
stack_trace: Type.Optional(Type.String())
});
export const Feromonio = Type.Object({
event_version: Type.String({ pattern: "^\\d+\\.\\d+$", default: "1.0" }),
timestamp: Type.String({ format: "date-time" }),
trace_id: Type.String({ minLength: 1 }),
span_id: Type.String({ minLength: 1 }),
attempt: Type.Number({ minimum: 1, default: 1 }),
source: Type.String(),
destination: Type.Optional(Type.Array(Type.String())),
pollen_id: Type.String({ minLength: 1 }),
type: Type.String({ pattern: "^[a-z]+\\.[a-z]+(\\.[a-z]+)*$" }),
status: Type.String({ pattern: "^(success|partial|failed)$" }),
error_details: Type.Optional(ErrorDetails),
payload: Type.Record(Type.String(), Type.Any()),
metadata: Type.Optional(Type.Record(Type.String(), Type.Any()))
});
Padrão: Requisição-Resposta (Síncrono)
Descrição
Este é o padrão de comunicação mais simples e direto. Uma Operária faz uma chamada direta a um Zangão (geralmente via API REST ou gRPC) e aguarda ativamente pela resposta antes de continuar o seu processamento. A comunicação é um-para-um e acontece em tempo real.
Quando Usar
- Para tarefas de baixa latência onde a resposta é necessária imediatamente para o próximo passo.
- Quando o processamento do Zangão é rápido (tipicamente menos de alguns segundos).
- Em fluxos de trabalho mais simples ou quando o acoplamento temporal não é um problema.
Exemplo Prático: Validação de Formulário
Uma Operária recebe dados de um formulário web e precisa validar imediatamente com um Zangão especialista em regras de negócio antes de prosseguir.
Implementação
A implementação mais comum é uma chamada HTTP a um endpoint de API. A Operária age como cliente e o Zangão como servidor.
# Exemplo de chamada síncrona numa Operária
import requests
ZANGAO_URL = "http://zangao-classifier.internal/classify"
def chamar_zangao_sincrono(pollen):
print(f"Enviando Pólen {pollen['pollen_id']} para o Zangão...")
try:
response = requests.post(ZANGAO_URL, json=pollen, timeout=10)
response.raise_for_status() # Lança erro se status for 4xx ou 5xx
# Operária aguarda e processa a resposta imediatamente
resultado_zangao = response.json()
print("Resposta do Zangão recebida com sucesso.")
return resultado_zangao
except requests.RequestException as e:
print(f"Falha na comunicação síncrona: {e}")
raise
// Exemplo de chamada síncrona numa Operária
const ZANGAO_URL = "http://zangao-classifier.internal/classify";
async function chamarZangaoSincrono(pollen: any): Promise<any> {
console.log(`Enviando Pólen ${pollen.pollen_id} para o Zangão...`);
try {
const response = await fetch(ZANGAO_URL, {
method: 'POST',
body: JSON.stringify(pollen),
headers: { 'Content-Type': 'application/json' },
});
if (!response.ok) {
throw new Error(`HTTP error! status: ${response.status}`);
}
const resultadoZangao = await response.json();
console.log("Resposta do Zangão recebida com sucesso.");
return resultadoZangao;
} catch (error) {
console.error(`Falha na comunicação síncrona: ${error}`);
throw error;
}
}
Padrão: Fila de Eventos (Assíncrono)
Descrição
Neste padrão, a comunicação é desacoplada através de um intermediário, como uma fila de mensagens. A Operária "A" publica uma mensagem (o "Feromônio" contendo o "Pólen") numa fila e termina a sua execução. A Operária "B", que está a escutar essa fila, é acionada quando a mensagem chega, processa-a e pode publicar o resultado noutra fila.
Quando Usar
- Para processos de longa duração (ex: transcrição de áudio, processamento de vídeo).
- Para processamento em lote (batch processing) de grandes volumes de dados.
- Quando a resiliência é crítica. Se a Operária "B" falhar, a mensagem pode ser re-processada automaticamente a partir da fila.
- Para nivelar a carga (load leveling), evitando sobrecarregar os Zangões com picos de requisições.
Exemplo Prático: Processamento de Vídeos
Uma Operária recebe uploads de vídeos e os coloca em uma fila. Várias instâncias de Operárias especializadas em processamento de vídeo consomem da fila conforme sua capacidade.
Implementação
Utiliza serviços de mensageria como AWS SQS, Google Pub/Sub, Azure Service Bus ou RabbitMQ. O exemplo abaixo usa AWS SQS.
# operaria_A.py (A que envia o feromônio)
import boto3
import json
sqs = boto3.client('sqs')
QUEUE_URL = 'arn:aws:sqs:us-east-1:123:MinhaFilaDeTarefas'
def enviar_tarefa_para_processamento(pollen):
print(f"Publicando Pólen {pollen['pollen_id']} na fila...")
sqs.send_message(
QueueUrl=QUEUE_URL,
MessageBody=json.dumps(pollen)
)
print("Feromônio enviado. Execução da Operária A concluída.")
# ---------------------------------------------------------------------
# operaria_B.py (A que consome da fila - ex: uma função Lambda)
def handler_da_fila(event, context):
for record in event['Records']:
pollen = json.loads(record['body'])
print(f"Feromônio recebido para o Pólen {pollen['pollen_id']}.")
# Aqui, a Operária B executaria sua lógica...
# ...
print("Processamento concluído.")
return {"status": "success"}
// operaria_A.ts (A que envia o feromônio)
import { SQSClient, SendMessageCommand } from "@aws-sdk/client-sqs";
const sqsClient = new SQSClient({});
const QUEUE_URL = 'arn:aws:sqs:us-east-1:123:MinhaFilaDeTarefas';
async function enviarTarefaParaProcessamento(pollen: object): Promise<void> {
console.log(`Publicando Pólen ${(pollen as any).pollen_id} na fila...`);
const command = new SendMessageCommand({
QueueUrl: QUEUE_URL,
MessageBody: JSON.stringify(pollen),
});
await sqsClient.send(command);
console.log("Feromônio enviado. Execução da Operária A concluída.");
}
// ---------------------------------------------------------------------
// operaria_B.ts (A que consome da fila - ex: uma função Lambda)
import { SQSEvent } from "aws-lambda";
export const handler = async (event: SQSEvent): Promise<void> => {
for (const record of event.Records) {
const pollen = JSON.parse(record.body);
console.log(`Feromônio recebido para o Pólen ${pollen.pollen_id}.`);
// Aqui, a Operária B executaria sua lógica...
// ...
console.log("Processamento concluído.");
}
};
Padrão: Paralelismo (Fan-Out/Fan-In)
Descrição
Neste padrão avançado, uma única Operária atua como um coordenador, disparando múltiplas chamadas síncronas para diferentes Zangões em paralelo (Fan-Out). Ela então aguarda a conclusão de todas as tarefas para agregar os resultados (Fan-In) e consolidar o "Pólen" enriquecido. Este padrão é extremamente poderoso para o enriquecimento massivo de um mesmo dado sob diferentes perspetivas.
Quando Usar
- Quando precisar de múltiplas análises independentes sobre o mesmo dado (ex: análise de sentimento, extração de entidades e sumarização de um mesmo texto).
- Para maximizar o throughput e minimizar a latência, aproveitando a capacidade da "Floresta" (nuvem) de escalar os Zangões horizontalmente.
- É importante notar que a latência total do processo será ditada pelo Zangão mais lento daquele lote.
Exemplo Prático: Análise de Texto Completa
Uma Operária recebe um texto e envia paralelamente para Zangões especializados em: análise de sentimento, detecção de entidades nomeadas, sumarização e classificação de tópicos.
Implementação
Utiliza programação assíncrona moderna, como a biblioteca `asyncio` em Python ou `Promise.all` em TypeScript, para executar as chamadas de API em simultâneo.
# Exemplo de Fan-Out/Fan-In com asyncio
import asyncio
import httpx
ZANGAO_SENTIMENTO_URL = "..."
ZANGAO_ENTIDADES_URL = "..."
ZANGAO_SUMARIO_URL = "..."
async def call_zangao_async(client, url, pollen):
response = await client.post(url, json=pollen, timeout=30)
response.raise_for_status()
return response.json()
async def operaria_paralela(pollen):
async with httpx.AsyncClient() as client:
tarefas = [
call_zangao_async(client, ZANGAO_SENTIMENTO_URL, pollen),
call_zangao_async(client, ZANGAO_ENTIDADES_URL, pollen),
call_zangao_async(client, ZANGAO_SUMARIO_URL, pollen),
]
# Fan-Out: executa todas as chamadas em paralelo
# Fan-In: aguarda a conclusão de todas
resultados = await asyncio.gather(*tarefas, return_exceptions=True)
# Trata resultados (sucessos ou falhas)
for i, resultado in enumerate(resultados):
if isinstance(resultado, Exception):
print(f"Erro no Zangão {i}: {resultado}")
continue
# Consolida os resultados bem-sucedidos
if i == 0: # Sentimento
pollen['sentiment'] = resultado
elif i == 1: # Entidades
pollen['entities'] = resultado
elif i == 2: # Sumário
pollen['summary'] = resultado
return pollen
// Exemplo de Fan-Out/Fan-In com Promise.all
const ZANGAO_SENTIMENTO_URL = "...";
const ZANGAO_ENTIDADES_URL = "...";
const ZANGAO_SUMARIO_URL = "...";
async function callZangaoAsync(url: string, pollen: any): Promise<any> {
const response = await fetch(url, {
method: 'POST',
body: JSON.stringify(pollen),
headers: { 'Content-Type': 'application/json' },
});
if (!response.ok) throw new Error(`Erro no Zangão ${url}`);
return response.json();
}
async function operariaParalela(pollen: any): Promise<any> {
const tarefas = [
callZangaoAsync(ZANGAO_SENTIMENTO_URL, pollen),
callZangaoAsync(ZANGAO_ENTIDADES_URL, pollen),
callZangaoAsync(ZANGAO_SUMARIO_URL, pollen),
];
// Fan-Out/Fan-In: executa todas as promessas em paralelo
const resultados = await Promise.allSettled(tarefas);
// Processa resultados
resultados.forEach((resultado, i) => {
if (resultado.status === 'fulfilled') {
if (i === 0) pollen.sentiment = resultado.value;
else if (i === 1) pollen.entities = resultado.value;
else if (i === 2) pollen.summary = resultado.value;
} else {
console.error(`Falha no Zangão ${i}:`, resultado.reason);
}
});
return pollen;
}
Padrão: Circuit Breaker para Comunicação
Descrição
Padrão que monitora falhas em chamadas a Zangões e, após um limiar, "abre o circuito" (para de fazer chamadas) por um tempo determinado, evitando sobrecarga em serviços com problemas.
Quando Usar
- Para Zangões que acessam serviços externos instáveis
- Quando a falha de um Zangão pode causar efeito cascata
- Para evitar gastar recursos com chamadas que provavelmente falharão
Exemplo Prático: API de Pagamentos Externos
Uma Operária que chama um Zangão que integra com um gateway de pagamentos externo. Se o gateway começar a falhar, o Circuit Breaker evita chamadas desnecessárias até que o serviço se normalize.
Implementação
from pybreaker import CircuitBreaker
# Configuração do Circuit Breaker
ocr_breaker = CircuitBreaker(
fail_max=3, # 3 falhas consecutivas abrem o circuito
reset_timeout=60 # 60 segundos para tentar novamente
)
@ocr_breaker
def call_zangao_ocr(pollen):
# Lógica de chamada ao Zangão OCR
response = requests.post(ZANGAO_OCR_URL, json=pollen, timeout=10)
response.raise_for_status()
return response.json()
# Na Operária:
try:
result = call_zangao_ocr(pollen)
except CircuitBreakerError:
# Circuito aberto - não chamar o serviço
pollen['ocr_status'] = 'circuit_open'
enviar_para_fila_de_retentativa(pollen)
import { CircuitBreaker } from 'opossum';
const options = {
timeout: 10000, // 10 segundos
errorThresholdPercentage: 50, // Abre após 50% de falhas
resetTimeout: 30000 // 30 segundos para resetar
};
const breaker = new CircuitBreaker(async (pollen) => {
const response = await fetch(ZANGAO_OCR_URL, {
method: 'POST',
body: JSON.stringify(pollen)
});
if (!response.ok) throw new Error(response.statusText);
return response.json();
}, options);
// Na Operária:
breaker.fire(pollen)
.then(result => processResult(result))
.catch(err => {
if (breaker.opened) {
// Circuito aberto - tratar adequadamente
pollen.ocr_status = 'circuit_open';
enviarParaFilaDeRetentativa(pollen);
}
});
Padrão: Comunicação da Rainha (Fallback e Histórico)
Descrição
A comunicação com a Rainha (a LLM de ponta) é a etapa mais crítica e cara do pipeline. Este padrão garante a sua **resiliência** e **eficiência**, tratando a interação com a LLM não como uma simples chamada de API, mas como uma conversa com estado que pode sobreviver a falhas.
Quando Usar
- Sempre que a decisão final depende de uma LLM, para evitar dependência de um único provedor (vendor lock-in).
- Em sistemas críticos onde uma falha na API da LLM não pode interromper o processo de negócio.
- Para otimização de custos, permitindo escolher dinamicamente o provedor mais barato ou mais rápido.
- Para análises complexas que podem ser iniciadas num provedor e validadas ou continuadas noutro.
Exemplo Prático: Análise de Contratos
Uma Operária prepara um contrato jurídico e envia para análise pela Rainha. Se a OpenAI estiver instável, o sistema automaticamente tenta com o Gemini da Google, mantendo todo o histórico da conversa.
Implementação
A Operária de Consolidação utiliza uma classe ou módulo `Queen` que abstrai a lógica de comunicação. Esta classe mantém um `chat_history` e itera sobre uma lista de provedores de LLM pré-configurados. Se o primeiro falhar, ela tenta o segundo, enviando não apenas a última pergunta, mas todo o histórico da conversa, garantindo que o contexto seja totalmente preservado.
# Exemplo de Rainha com fallback e histórico de chat
import requests
import os
import json
class Queen:
def __init__(self, system_prompt, providers_config):
self.providers = providers_config
# O histórico é mantido para conversas multi-turn ou fallbacks
self.chat_history = [{"role": "system", "content": system_prompt}]
def make_decision(self, honey_data):
self.chat_history.append({"role": "user", "content": f"CONTEXTO FACTUAL (MEL PURO):\n{json.dumps(honey_data, indent=2)}"})
errors = {}
for provider in self.providers:
try:
body = self._adapt_body(provider, self.chat_history)
response = requests.post(provider['url'], json=body, headers=provider['headers'], timeout=45)
response.raise_for_status()
assistant_response = self._extract_response(provider['name'], response.json())
self.chat_history.append({"role": "assistant", "content": assistant_response})
return assistant_response
except requests.RequestException as e:
print(f"Erro ao chamar {provider['name']}: {e}")
errors[provider['name']] = str(e)
# Remove a última mensagem do utilizador se todos falharem
self.chat_history.pop()
raise Exception(f"Falha em todos os provedores de LLM: {errors}")
def _adapt_body(self, provider, history):
if provider['name'] == 'OpenAI':
return {"model": provider.get('model', 'gpt-4'), "messages": history}
# Adicionar adaptadores para outros provedores (Gemini, Claude, etc.)
return {}
def _extract_response(self, provider_name, data):
if provider_name == 'OpenAI':
return data['choices'][0]['message']['content']
return ""
// Exemplo de Rainha com fallback e histórico de chat
interface ChatMessage {
role: 'system' | 'user' | 'assistant';
content: string;
}
interface LLMProvider {
name: string;
url: string;
headers: Record<string, string>;
adaptBody: (history: ChatMessage[]) => Record<string, any>;
extractResponse: (responseData: any) => string;
}
export class Queen {
private chatHistory: ChatMessage[];
private providers: LLMProvider[];
constructor(systemPrompt: string, providers: LLMProvider[]) {
this.providers = providers;
this.chatHistory = [{ role: 'system', content: systemPrompt }];
}
public async makeDecision(honey: any): Promise<string> {
const userMessage = {
role: 'user' as const,
content: `CONTEXTO FACTUAL (MEL PURO):\n ${JSON.stringify(honey, null, 2)}`
};
this.chatHistory.push(userMessage);
const errors: Record<string, string> = {};
for (const provider of this.providers) {
try {
const body = provider.adaptBody(this.chatHistory);
const response = await fetch(provider.url, {
method: 'POST',
headers: { ...provider.headers, 'Content-Type': 'application/json' },
body: JSON.stringify(body),
});
if (!response.ok) throw new Error(`API error: ${response.statusText}`);
const responseData = await response.json();
const assistantResponse = provider.extractResponse(responseData);
this.chat_history.push({ role: 'assistant', content: assistantResponse });
return assistant_response;
} catch (error: any) {
console.error(`Erro ao chamar ${provider.name}:`, error.message);
errors[provider.name] = error.message;
}
}
this.chatHistory.pop(); // Remove user message if all failed
throw new Error(`Falha em todos os provedores de LLM: ${JSON.stringify(errors)}`);
}
}
Padrão: Triangulação de Dados via Armazenamento
Descrição
Quando se lida com dados de grande volume (ex: ficheiros de vídeo, áudio de alta qualidade, grandes datasets), passá-los diretamente no corpo de uma requisição HTTP ou numa mensagem de fila é impraticável e ineficiente. Este padrão resolve o problema ao evitar a transferência do dado bruto, trocando apenas as suas referências (ponteiros).
Quando Usar
- Ao processar ficheiros de multimédia (vídeos, áudios, imagens de alta resolução).
- Com datasets de grande volume que excedem os limites de payload de APIs e serviços de mensageria.
- Para desacoplar o processamento do armazenamento, permitindo que cada Zangão aceda aos dados de forma independente e paralela.
Exemplo Prático: Processamento de Vídeos Longos
Uma Operária recebe um vídeo de 2GB, faz upload para o storage cloud, e envia apenas a referência para Zangões de transcrição, análise de cenas e detecção de objetos.
Implementação
O fluxo envolve um serviço de armazenamento de objetos (como AWS S3, Google Cloud Storage ou Azure Blob Storage) como intermediário. A comunicação entre os agentes contém apenas a URI (o "endereço") do dado, não o dado em si.
# operaria.py
import boto3
s3_client = boto3.client('s3')
BUCKET_NAME = "minha-colmeia-bucket"
def operaria_de_video(pollen):
input_uri = pollen['source_uri'] # ex: "s3://.../video.mp4"
# 1. Operária baixa o ficheiro, processa (ex: corta o vídeo) e faz upload do resultado
# ... (lógica de download e processamento) ...
processed_video_key = f"processed/{pollen['pollen_id']}.mp4"
# s3_client.upload_file("video_processado.mp4", BUCKET_NAME, processed_video_key)
# 2. Chama o Zangão enviando apenas a referência
pollen_para_zangao = { "audio_path": f"s3://{BUCKET_NAME}/{processed_video_key}" }
# resultado_zangao = chamar_zangao_sincrono(pollen_para_zangao)
# 5. Recebe de volta a URI do resultado do zangão e continua
# pollen['transcribed_uri'] = resultado_zangao['transcribed_uri']
# zangao.py
def zangao_de_transcricao(pollen_com_referencia):
audio_uri = pollen_com_referencia['audio_path']
# 3. Zangão baixa o ficheiro a partir da URI
# local_path = s3_client.download_file(...)
# ... (processa o áudio e gera um ficheiro de texto) ...
# 4. Zangão faz upload do seu resultado para o Storage
transcription_key = f"transcripts/{pollen_com_referencia['pollen_id']}.txt"
# s3_client.upload_file("transcricao.txt", BUCKET_NAME, transcription_key)
# 5. Retorna a referência ao resultado
return { "transcribed_uri": f"s3://{BUCKET_NAME}/{transcription_key}" }
// operaria.ts
import { S3Client, GetObjectCommand, PutObjectCommand } from "@aws-sdk/client-s3";
const s3Client = new S3Client({});
const BUCKET_NAME = "minha-colmeia-bucket";
async function operariaDeVideo(pollen: any): Promise<any> {
const inputUri = pollen.source_uri;
// 1. Operária processa o ficheiro e faz upload
// ... (lógica para processar o vídeo) ...
const processedVideoKey = `processed/${pollen.pollen_id}.mp4`;
// await s3Client.send(new PutObjectCommand({ Bucket: BUCKET_NAME, Key: processedVideoKey, Body: ... }));
// 2. Chama o Zangão com a referência
const pollenParaZangao = { audio_path: `s3://${BUCKET_NAME}/${processedVideoKey}` };
// const resultadoZangao = await chamarZangao(pollenParaZangao);
// 5. Recebe de volta a URI do resultado
// pollen.transcribed_uri = resultadoZangao.transcribed_uri;
return pollen;
}
// zangao.ts
async function zangaoDeTranscricao(pollenComReferencia: any): Promise<any> {
const audioUri = pollenComReferencia.audio_path;
// 3. Zangão baixa o ficheiro a partir da URI
// const { Body } = await s3Client.send(new GetObjectCommand({ Bucket: ..., Key: ... }));
// ... (processa o áudio) ...
const transcriptionText = "Texto transcrito aqui...";
// 4. Zangão faz upload do seu resultado
const transcriptionKey = `transcripts/${pollenComReferencia.pollen_id}.txt`;
// await s3Client.send(new PutObjectCommand({ Bucket: BUCKET_NAME, Key: transcriptionKey, Body: transcriptionText }));
// 5. Retorna a referência
return { transcribed_uri: `s3://${BUCKET_NAME}/${transcriptionKey}` };
}
Padrão: Broadcast com Seleção Contextual
Neste padrão, um mesmo Feromônio é enviado a múltiplas filas. Cada Operária escuta seletivamente apenas os eventos que fazem sentido para seu contexto, baseando-se em filtros por tipo ou domínio. Isso favorece extensibilidade e reuso entre Colmeias.
Descrição
Neste padrão, uma Operária publica um único Feromônio em múltiplas filas, e diferentes Operárias consomem a mensagem de forma seletiva, conforme o tipo ou domínio do dado. Isso facilita reuso e expansão modular da colmeia.
Quando Usar
- Quando diferentes domínios precisam reagir ao mesmo evento (ex: OCR e Classificação).
- Para desacoplar totalmente o produtor do consumidor.
- Quando o dado publicado pode ter múltiplas interpretações.
Exemplo Prático: Evento de Novo Usuário
Quando um novo usuário se cadastra, múltiplos sistemas precisam ser notificados: envio de e-mail de boas-vindas, criação de perfil analítico, verificação de fraude, etc.
Implementação
A publicação é feita em todas as filas de interesse. O filtro é feito pelas próprias Operárias, com base em campos como `type`, `doc_type`, `source` ou qualquer outra heurística.
# Publicadora (Python)
import boto3, json
sqs = boto3.client("sqs")
filas = [
"https://sqs.us-east-1.amazonaws.com/123456789/ocr-fila",
"https://sqs.us-east-1.amazonaws.com/123456789/classificacao-fila"
]
feromonio = {
"pollen_id": "123",
"type": "document.received",
"payload": { "source_uri": "s3://bucket/doc.pdf" }
}
for fila in filas:
sqs.send_message(QueueUrl=fila, MessageBody=json.dumps(feromonio))
# Consumidora (ex: OCR)
def handler(event, context):
for record in event['Records']:
f = json.loads(record['body'])
if f['type'] == 'document.received':
print("Executando OCR...")
// Publicadora (TypeScript)
import { SQSClient, SendMessageCommand } from "@aws-sdk/client-sqs";
const sqs = new SQSClient({});
const filas = [
"https://sqs.us-east-1.amazonaws.com/123456789/ocr-fila",
"https://sqs.us-east-1.amazonaws.com/123456789/classificacao-fila"
];
const feromonio = {
pollen_id: "123",
type: "document.received",
payload: { source_uri: "s3://bucket/doc.pdf" }
};
for (const fila of filas) {
await sqs.send(new SendMessageCommand({
QueueUrl: fila,
MessageBody: JSON.stringify(feromonio)
}));
}
// Consumidora
export const handler = async (event: any) => {
for (const record of event.Records) {
const f = JSON.parse(record.body);
if (f.type === "document.received") {
console.log("Executando OCR...");
}
}
};
Resumo e Recomendações
| Padrão | Ideal Para | Tecnologias | Latência | Custo | Complexidade | Resiliência |
|---|---|---|---|---|---|---|
| Síncrono | Tarefas rápidas (<1s) | REST, gRPC | Baixa | Baixo | Baixa | Média |
| Assíncrono | Processos longos ou batch | SQS, Pub/Sub, Kafka | Alta | Médio | Média | Alta |
| Paralelo | Múltiplas análises independentes | AsyncIO, Promise.all | Média* | Médio | Alta | Média |
| Circuit Breaker | Serviços externos instáveis | PyBreaker, Opossum | - | Baixo | Média | Alta |
| Rainha | Chamadas a LLMs | Multi-provedores | Alta | Alto | Alta | Alta |
| Triangulação | Grandes volumes de dados | S3, GCS, Blob | Alta | Baixo* | Média | Alta |
| Broadcast | Eventos para múltiplos sistemas | SNS, Pub/Sub | Variável | Médio | Média | Alta |
Não existe um padrão "melhor" que o outro; eles servem a propósitos diferentes e muitas vezes coexistem na mesma Colmeia. A escolha correta depende da análise dos requisitos de latência, resiliência e volume de cada etapa do seu pipeline.
Fluxo de Decisão para Escolha de Padrão
Use este diagrama como guia para selecionar o padrão de comunicação mais adequado para sua necessidade: