Implementação de Operária
A espinha dorsal da coreografia. Aprenda a estruturar Operárias para executar lógica determinística, orquestrar fluxos e delegar inteligentemente as tarefas cognitivas aos Zangões.
O Problema
Em sistemas de I.A. complexos, a orquestração do fluxo de dados é um desafio central. A coordenação de múltiplas tarefas — como pré-processamento, chamadas a modelos especialistas e lógica de negócio — pode levar a arquiteturas frágeis e de difícil manutenção.
Cenário Problemático
Imagine um sistema de análise de documentos que precisa:
- Extrair texto (OCR)
- Classificar o tipo de documento
- Extrair entidades relevantes
- Validar informações com sistemas externos
- Gerar um resumo executivo
Numa abordagem tradicional, isso seria implementado como um serviço monolítico ou um orquestrador centralizado, levando a: acoplamento forte entre etapas, dificuldade de reutilização de componentes, um ponto único de falha e complexidade para escalar partes específicas do fluxo de forma independente.
A Solução: A Operária Coreografadora
A Operária é a espinha dorsal da `bee.arq`. Em vez de uma orquestração centralizada, a arquitetura propõe uma coreografia de eventos, onde Operárias autônomas e especialistas reagem a "Feromônios" (eventos) para conduzir o "Pólen" (dados) através do pipeline. Elas podem se comunicar entre si, seja diretamente ou através de eventos, passando o controle de uma etapa para a outra.
O seu papel fundamental é executar lógica de negócio determinística, atuando como um filtro de qualidade e um agente de transformação. Elas garantem que o "Pólen" seja validado, enriquecido e roteado corretamente. É através delas que os Zangões são invocados para as tarefas cognitivas, assegurando que estes especialistas recebam dados no formato e contexto exatos que necessitam.
Diagrama Comparativo
Inicial"] -->|Evento| O2["Operária
OCR"] O2 -->|Invoca| Z1["Zangão
OCR"] O2 -->|Evento| O3["Operária
Classificação"] O3 -->|Invoca| Z2["Zangão
Classificador"] O3 -->|Evento| O4["Operária
Consolidação"] O4 --> R["Rainha"] end
Diretriz: A Fronteira entre Operária e Zangão
A distinção entre as responsabilidades de uma Operária e de um Zangão é o princípio mais importante para o sucesso da `bee.arq`. A falha em respeitar esta fronteira leva a componentes acoplados, difíceis de testar e ineficientes.
Regra Fundamental
- 🐝 Operárias executam lógica DETERMINÍSTICA. Suas tarefas são validação, transformação, enriquecimento com dados de fontes externas (como um banco de dados), roteamento e, crucialmente, a invocação de Zangões. Para a mesma entrada, uma Operária sempre produzirá a mesma saída.
- 🧠 Zangões executam lógica COGNITIVA. Qualquer tarefa que exija inferência, classificação, extração de entidades, sumarização ou qualquer outra forma de inteligência artificial é, por definição, responsabilidade de um Zangão.
Uma Operária nunca implementa lógica de I.A. Ela sempre delega essa tarefa para um Zangão especialista.
Padrão: Papéis Flexíveis
Uma Operária não possui um tipo fixo; seu papel é definido pelo contexto e pela sua posição no fluxo de trabalho. Esta flexibilidade é o que permite a composição de pipelines complexos a partir de unidades simples e reutilizáveis, que podem chamar outras Operárias ou invocar Zangões conforme a necessidade.
Exemplo de Fluxo Completo (Diagrama de Sequência)
Este diagrama ilustra a interação temporal entre os componentes num fluxo típico de processamento de documentos.
Papel: Operária Inicial (Ponto de Entrada)
Descrição
A Operária Inicial faz a fronteira entre a "Floresta" e a "Colmeia". Sua função é ser acionada por um evento externo, encapsular o dado bruto numa estrutura "Pólen" com um trace_id e emitir o primeiro "Feromônio" interno, iniciando o pipeline de forma resiliente.
Na Prática: Recebimento de Transações Financeiras
Um sistema de pagamentos publica um evento de "nova transação" num tópico do Google Pub/Sub. Uma Google Cloud Function atua como Operária Inicial, consumindo o evento, validando o schema e publicando-o num tópico interno com política de retry para garantir a entrega.
# gcp_operaria_inicial_pubsub.py (com retry)
import functions_framework, json, uuid, base64, time
from google.cloud import pubsub_v1
publisher = pubsub_v1.PublisherClient()
TOPIC_PATH = publisher.topic_path('gcp-project-id', 'transacoes-para-analise')
@functions_framework.cloud_event
def handler(cloud_event):
try:
raw_data = base64.b64decode(cloud_event.data["message"]["data"]).decode('utf-8')
pollen = {"pollen_id": str(uuid.uuid4()), "source_data": json.loads(raw_data)}
feromonio = {"trace_id": cloud_event["id"], "payload": pollen}
# Publica com retry exponencial para robustez
for attempt in range(3):
try:
future = publisher.publish(TOPIC_PATH, json.dumps(feromonio).encode("utf-8"))
future.result(timeout=10) # Aguarda a conclusão
print(f"Pipeline iniciado para pollen {pollen['pollen_id']}.")
return
except Exception as e:
print(f"Falha na tentativa {attempt+1} de publicar: {e}")
if attempt == 2:
raise # Lança a exceção final após a última tentativa
time.sleep((2 ** attempt) * 0.5) # 0.5s, 1s, 2s
except Exception as e:
# Idealmente, logar para um sistema de monitoramento
print(f"INITIAL_WORKER_FAILED: Falha crítica ao processar evento: {str(e)}")
raise
// gcpOperariaInicialPubSub.ts (com retry da biblioteca)
import { CloudEvent } from 'firebase-functions/lib/v2/core';
import { PubSub } from '@google-cloud/pubsub';
const pubsub = new PubSub({
// A biblioteca do GCP já implementa retry com backoff exponencial por padrão.
// As configurações podem ser ajustadas se necessário.
retryOptions: {
maxRetries: 3,
backoffSettings: {
initialRetryDelayMillis: 200,
maxRetryDelayMillis: 60000,
multiplier: 2.0,
}
}
});
// ... (código da operária para criar o feromônio) ...
// await pubsub.topic(TOPIC_NAME).publishMessage({ json: feromonio });
Papel: Operária de Enriquecimento
Descrição
Esta Operária é o motor do pipeline, adicionando valor ao "Pólen". Sua responsabilidade crucial é delegar tarefas cognitivas. Ela prepara os dados e invoca um Zangão especialista, recebendo de volta o resultado e incorporando-o ao "Pólen" antes de passá-lo para a próxima Operária ou estágio. A comunicação com Zangões deve ser resiliente.
Na Prática: Extração de Entidades com Circuit Breaker
Uma Azure Function consome um artigo de um Azure Service Bus. Para extrair entidades, ela invoca um Zangão especialista. Para evitar sobrecarregar um Zangão que possa estar instável, a chamada é encapsulada num padrão de Circuit Breaker, que suspende as chamadas temporariamente após uma série de falhas.
# azure_operaria_ner.py (com Circuit Breaker via 'pybreaker')
from pybreaker import CircuitBreaker, CircuitBreakerError
import requests
# O breaker é instanciado a nível de módulo para manter o estado entre invocações
zangao_breaker = CircuitBreaker(fail_max=3, reset_timeout=60)
@zangao_breaker
def call_zangao(text_content):
# ... (lógica da chamada com requests)
pass
# Dentro da função principal da Operária:
# try:
# ner_result = call_zangao(pollen["article_text"])
# pollen["entities"] = ner_result["entities"]
# except CircuitBreakerError:
# # Lógica de fallback: enfileirar para tentativa posterior ou marcar como falha
# print("Circuit Breaker está aberto. A chamada ao Zangão foi pulada.")
# pollen["status"] = "ZANGAO_UNAVAILABLE"
// azureOperariaNer.ts (com Circuit Breaker manual)
import { AzureFunction, Context } from "@azure/functions";
import fetch from "node-fetch";
// Implementação simples de Circuit Breaker
class ZangaoCaller {
private failures = 0;
private lastFailureTime = 0;
private readonly failureThreshold = 3;
private readonly resetTimeout = 60000; // 1 minuto
async call(url: string, payload: any): Promise<any> {
if (this.failures >= this.failureThreshold && Date.now() - this.lastFailureTime < this.resetTimeout) {
throw new Error("Circuit breaker aberto - chamadas ao Zangão suspensas.");
}
try {
const response = await fetch(url, { /* ... opções com timeout ... */ });
if (!response.ok) throw new Error(`HTTP Error: ${response.status}`);
this.failures = 0; // Sucesso, reseta o contador
return await response.json();
} catch (error) {
this.failures++;
this.lastFailureTime = Date.now();
throw error;
}
}
}
const zangaoCaller = new ZangaoCaller(); // Singleton a nível de instância
// Na Azure Function:
// const nerResult = await zangaoCaller.call(ZANGAO_NER_URL, { text: pollen.articleText });
Papel: Operária de Roteamento (Gateway)
Descrição
Uma Operária de Roteamento aplica lógica de negócio determinística para direcionar o "Pólen". Baseando-se no seu conteúdo, ela pode enviá-lo para um único destino (desvio condicional) ou para múltiplos destinos simultaneamente (desvio paralelo), permitindo a criação de fluxos de trabalho complexos e dinâmicos.
Na Prática: Triagem de Alertas de Segurança
Um sistema de monitoramento gera eventos de segurança. Uma Operária de Roteamento em AWS Lambda analisa cada evento. Se a prioridade for "CRITICAL", o evento é enviado para um tópico SNS que notifica a equipe de plantão. Se a categoria for "Firewall", é enviado para uma fila SQS para análise de logs. Independentemente da prioridade, todos os eventos são enviados para um stream do Kinesis Firehose para arquivamento em S3.
(Alerta Crítico)"] Q_SQS["Fila SQS
(Análise de Firewall)"] S_Kinesis["Kinesis Firehose
(Arquivamento)"] end O_Router -- "Prioridade CRITICAL" --> T_SNS O_Router -- "Categoria Firewall" --> Q_SQS O_Router -- "Sempre" --> S_Kinesis
# aws_operaria_roteamento_seguranca.py (AWS Lambda)
import json, boto3
sns = boto3.client("sns")
sqs = boto3.client("sqs")
firehose = boto3.client("firehose")
SNS_TOPIC_ARN = "arn:aws:sns:us-east-1:123:TopicoCritico"
SQS_QUEUE_URL = "arn:aws:sqs:us-east-1:123:FilaFirewall"
FIREHOSE_STREAM_NAME = "stream-de-arquivamento"
def handler(event, context):
pollen = json.loads(event['Records'][0]['body'])['payload']
message_body = json.dumps(pollen)
# Roteamento paralelo e condicional
if pollen.get('priority') == 'CRITICAL':
sns.publish(TopicArn=SNS_TOPIC_ARN, Message=message_body)
if pollen.get('category') == 'Firewall':
sqs.send_message(QueueUrl=SQS_QUEUE_URL, MessageBody=message_body)
# Roteamento obrigatório
firehose.put_record(
DeliveryStreamName=FIREHOSE_STREAM_NAME,
Record={'Data': message_body.encode('utf-8')}
)
// awsOperariaRoteamentoSeguranca.ts (AWS Lambda)
import { SQSHandler, SQSEvent } from 'aws-lambda';
import { SNSClient, PublishCommand } from '@aws-sdk/client-sns';
import { SQSClient, SendMessageCommand } from '@aws-sdk/client-sqs';
import { FirehoseClient, PutRecordCommand } from '@aws-sdk/client-firehose';
const sns = new SNSClient({});
const sqs = new SQSClient({});
const firehose = new FirehoseClient({});
export const handler: SQSHandler = async (event: SQSEvent): Promise<void> => {
const pollen = JSON.parse(event.Records[0].body).payload;
const messageBody = JSON.stringify(pollen);
const promises = [];
if (pollen.priority === 'CRITICAL') {
promises.push(sns.send(new PublishCommand({TopicArn: "...", Message: messageBody})));
}
if (pollen.category === 'Firewall') {
promises.push(sqs.send(new SendMessageCommand({QueueUrl: "...", MessageBody: messageBody})));
}
promises.push(firehose.send(new PutRecordCommand({
DeliveryStreamName: "...",
Record: { Data: Buffer.from(messageBody) }
})));
await Promise.all(promises);
};
Papel: Operária de Consolidação (Preparadora do Mel)
Descrição
A Operária de Consolidação é a guardiã da Rainha. Sua missão é preparar o "Mel Puro": um payload de dados final, coeso, factual e perfeitamente estruturado. Ela filtra o ruído e seleciona apenas as informações essenciais do "Pólen" para a tarefa de síntese da Rainha, otimizando a precisão e o custo da chamada final à LLM.
Para otimizar ainda mais, esta Operária pode invocar um último Zangão (ex: um sumarizador) para pré-processar um dado complexo, garantindo que a Rainha receba a informação mais concisa e relevante possível.
Na Prática: Geração de Relatório de Análise de Risco
Ao final de um pipeline de análise de crédito, uma Operária de Consolidação recebe um "Pólen" contendo o score de crédito do cliente, o histórico de transações e o resultado de uma verificação de identidade. Ela monta o "Mel Puro", um objeto JSON limpo contendo apenas os campos essenciais, e o envia para a Rainha com a instrução: "Gere um parágrafo de relatório de risco com base nestes dados".
# gcp_operaria_consolidacao.py
# (Assume a existência de uma classe 'Queen' que abstrai a chamada à LLM no Vertex AI)
# from rainha_vertex import Queen
class OperariaConsolidacao:
def __init__(self, pollen): self.pollen = pollen
def preparar_mel_e_chamar_rainha(self):
# Seleciona apenas os dados factuais e essenciais do Pólen
mel_puro = {
"score_credito": self.pollen.get("credit_score"),
"identidade_verificada": self.pollen.get("identity_verified"),
"principais_transacoes": self.pollen.get("transaction_highlights", [])[:5]
}
# rainha = Queen(model_name="gemini-pro")
# instrucao = "Gere um parágrafo de análise de risco para um novo cliente."
# relatorio = rainha.make_decision(instrucao, mel_puro)
return {"mel_puro": mel_puro, "relatorio_simulado": "Risco moderado."}
// gcpOperariaConsolidacao.ts
// import { Queen } from './rainhaVertex';
class OperariaConsolidacao {
constructor(private pollen: any) {}
public async prepararMelEChamarRainha(): Promise<any> {
const melPuro = {
scoreCredito: this.pollen.creditScore,
identidadeVerificada: this.pollen.identityVerified,
principaisTransacoes: (this.pollen.transactionHighlights || []).slice(0, 5),
};
// const rainha = new Queen({ modelId: 'gemini-pro' });
// const instrucao = "Gere um parágrafo de análise de risco...";
// const relatorio = await rainha.makeDecision(instrucao, melPuro);
return { melPuro, relatorioSimulado: "Risco moderado." };
}
}
Padrão Avançado: Operária de Compensação (Saga)
Descrição
Na `bee.arq`, o padrão Saga transcende a simples reversão de transações. Ele é adaptado para gerenciar a consistência de processos de análise de longa duração, onde a "compensação" pode significar reagir à falta de dados dentro de um prazo, em vez de reverter uma ação.
Na Prática: Análise de Processo Jurídico
Um processo jurídico é iniciado e a colmeia precisa consolidar a petição inicial, as provas e a contestação do réu para uma análise final da Rainha. A contestação tem um prazo legal para ser juntada. Uma Operária Coordenadora gerencia o estado. Se o prazo expira e a contestação não chega, ela aciona uma Operária de Compensação.
A compensação aqui não é um "delete", mas uma ação de negócio: a Operária de Compensação atualiza o status do "Pólen" para "INCOMPLETO_PRAZO_EXPIRADO", invoca um Zangão para gerar uma análise preliminar com os dados existentes e notifica a equipe jurídica sobre a situação para que tomem as medidas cabíveis.
# operaria_compensacao_juridica.py
class OperariaCompensacaoJuridica:
def __init__(self, feromonio_de_falha):
self.pollen = feromonio_de_falha['payload']
self.reason = feromonio_de_falha['metadata']['reason']
def compensate(self):
if self.reason == "PRAZO_EXPIRADO":
print(f"Compensando o processo {self.pollen['case_id']}: prazo expirado.")
self._gerar_analise_preliminar()
self._notificar_equipe_juridica()
self.pollen['status'] = 'ANALISE_PARCIAL_CONCLUIDA'
return self.pollen
def _gerar_analise_preliminar(self):
# Lógica para chamar um Zangão com os dados parciais
print("-> Gerando análise preliminar...")
def _notificar_equipe_juridica(self):
# Lógica para enviar e-mail ou notificação via SNS/PubSub
print("-> Notificando equipe sobre a incompletude dos dados.")
Padrão Avançado: Operária de Batch
Descrição
Para cenários de alta volumetria, como processamento de logs ou telemetria de IoT, processar cada evento individualmente é ineficiente. Uma Operária de Batch é projetada para agregar múltiplos "Pólens" (eventos) e processá-los em lote, otimizando o uso de recursos e reduzindo a latência total.
Características
- Agregação: Usa gatilhos baseados em janela de tempo (ex: a cada 5 minutos) ou quantidade (ex: a cada 1000 eventos).
- Processamento Paralelo: Pode usar paralelismo interno (ex: `ThreadPoolExecutor` em Python) para processar os itens do lote simultaneamente.
- Isolamento (Bulkhead): O processamento em lote ajuda a isolar os recursos, evitando que um pico de eventos sobrecarregue os sistemas a jusante.
Agregadora"] subgraph Processamento OBatch["Operária de Batch"] subgraph Processamento Interno Paralelo T1["Thread 1"] T2["Thread 2"] T3["..."] end end P1 & P2 & P3 --> Q Q -- "Gatilho com Lote de 'Pólens'" --> OBatch OBatch --> T1 & T2 & T3
# Exemplo de Operária de Batch em AWS Lambda com gatilho SQS
from concurrent.futures import ThreadPoolExecutor
import json
def process_record(record):
# Lógica para processar um único "Pólen" do lote
pollen = json.loads(record['body'])['payload']
# ... faz o trabalho ...
return {"pollen_id": pollen['pollen_id'], "status": "processed"}
def handler(event, context):
records = event['Records']
# Processa o lote em paralelo para otimizar o tempo de execução da Lambda
with ThreadPoolExecutor(max_workers=10) as executor:
results = list(executor.map(process_record, records))
print(f"Lote de {len(results)} itens processado com sucesso.")
return {"status": "success", "processed_count": len(results)}
Padrão: Produção de Geleia Real (Observabilidade)
Descrição
Na `bee.arq`, "Geleia Real" é o termo para logs estruturados e telemetria. É uma diretriz fundamental que cada Operária produza "Geleia Real" para garantir a observabilidade do sistema. Isso permite rastreabilidade (tracing), monitoramento, auditoria e a criação de datasets para re-treinamento de Zangões.
Implementação
A interface de log deve ser enriquecida para suportar o tracing distribuído e a coleta de métricas, alinhando-se com as práticas de OpenTelemetry. Os logs são enviados para o serviço de observabilidade da nuvem escolhida (Amazon CloudWatch, Google Cloud Logging, Azure Monitor).
(CloudWatch, GCP Logging, Azure Monitor)"] CloudLog -->|Permite| D["Dashboard"] CloudLog -->|Permite| A["Alertas"] CloudLog -->|Permite| T["Rastreabilidade"]
// Interface expandida para Geleia Real com Tracing e Métricas
interface GeleiaRealLog {
trace_id: string; // ID global da transação
pollen_id: string; // ID do dado sendo processado
parent_span_id?: string; // ID da operação que chamou esta
span_id: string; // ID único para esta operação específica
start_time: string; // ISO 8601
duration_ms?: number;
operation: string; // ex: "OperariaEnriquecimento:callZangaoNER"
status: 'started' | 'success' | 'failed';
details: string; // Mensagem descritiva
metrics?: {
zangao_call_duration_ms?: number;
records_in_batch?: number;
};
error?: {
message: string;
stack?: string;
code?: string;
};
}