Padrões de Arquitetura

Implementação de Operária

A espinha dorsal da coreografia. Aprenda a estruturar Operárias para executar lógica determinística, orquestrar fluxos e delegar inteligentemente as tarefas cognitivas aos Zangões.

O Problema

Em sistemas de I.A. complexos, a orquestração do fluxo de dados é um desafio central. A coordenação de múltiplas tarefas — como pré-processamento, chamadas a modelos especialistas e lógica de negócio — pode levar a arquiteturas frágeis e de difícil manutenção.

Cenário Problemático

Imagine um sistema de análise de documentos que precisa:

  1. Extrair texto (OCR)
  2. Classificar o tipo de documento
  3. Extrair entidades relevantes
  4. Validar informações com sistemas externos
  5. Gerar um resumo executivo

Numa abordagem tradicional, isso seria implementado como um serviço monolítico ou um orquestrador centralizado, levando a: acoplamento forte entre etapas, dificuldade de reutilização de componentes, um ponto único de falha e complexidade para escalar partes específicas do fluxo de forma independente.

A Solução: A Operária Coreografadora

A Operária é a espinha dorsal da `bee.arq`. Em vez de uma orquestração centralizada, a arquitetura propõe uma coreografia de eventos, onde Operárias autônomas e especialistas reagem a "Feromônios" (eventos) para conduzir o "Pólen" (dados) através do pipeline. Elas podem se comunicar entre si, seja diretamente ou através de eventos, passando o controle de uma etapa para a outra.

O seu papel fundamental é executar lógica de negócio determinística, atuando como um filtro de qualidade e um agente de transformação. Elas garantem que o "Pólen" seja validado, enriquecido e roteado corretamente. É através delas que os Zangões são invocados para as tarefas cognitivas, assegurando que estes especialistas recebam dados no formato e contexto exatos que necessitam.

Diagrama Comparativo

graph TD subgraph Tradicional A["Serviço Monolítico"] --> B["1. OCR"] B --> C["2. Classificação"] C --> D["3. Extração"] D --> E["4. Validação"] E --> F["5. Resumo"] end subgraph bee.arq O1["Operária
Inicial"] -->|Evento| O2["Operária
OCR"] O2 -->|Invoca| Z1["Zangão
OCR"] O2 -->|Evento| O3["Operária
Classificação"] O3 -->|Invoca| Z2["Zangão
Classificador"] O3 -->|Evento| O4["Operária
Consolidação"] O4 --> R["Rainha"] end

Diretriz: A Fronteira entre Operária e Zangão

A distinção entre as responsabilidades de uma Operária e de um Zangão é o princípio mais importante para o sucesso da `bee.arq`. A falha em respeitar esta fronteira leva a componentes acoplados, difíceis de testar e ineficientes.

Regra Fundamental

  • 🐝 Operárias executam lógica DETERMINÍSTICA. Suas tarefas são validação, transformação, enriquecimento com dados de fontes externas (como um banco de dados), roteamento e, crucialmente, a invocação de Zangões. Para a mesma entrada, uma Operária sempre produzirá a mesma saída.
  • 🧠 Zangões executam lógica COGNITIVA. Qualquer tarefa que exija inferência, classificação, extração de entidades, sumarização ou qualquer outra forma de inteligência artificial é, por definição, responsabilidade de um Zangão.

Uma Operária nunca implementa lógica de I.A. Ela sempre delega essa tarefa para um Zangão especialista.

Padrão: Papéis Flexíveis

Uma Operária não possui um tipo fixo; seu papel é definido pelo contexto e pela sua posição no fluxo de trabalho. Esta flexibilidade é o que permite a composição de pipelines complexos a partir de unidades simples e reutilizáveis, que podem chamar outras Operárias ou invocar Zangões conforme a necessidade.

Exemplo de Fluxo Completo (Diagrama de Sequência)

Este diagrama ilustra a interação temporal entre os componentes num fluxo típico de processamento de documentos.

sequenceDiagram participant F as Flor (Fonte de Dados) participant OI as Operária Inicial participant OE as Operária Enriquecimento participant Z as Zangão participant OR as Operária Roteamento participant R as Rainha F->>OI: Evento (ex: novo arquivo) OI->>OE: Feromônio (Pólen inicial) OE->>Z: Invoca com dados do Pólen Z-->>OE: Retorna resultado cognitivo OE->>OR: Feromônio (Pólen enriquecido) OR->>R: Invoca com "Mel Puro" R-->>OR: Retorna decisão final

Papel: Operária Inicial (Ponto de Entrada)

Descrição

A Operária Inicial faz a fronteira entre a "Floresta" e a "Colmeia". Sua função é ser acionada por um evento externo, encapsular o dado bruto numa estrutura "Pólen" com um trace_id e emitir o primeiro "Feromônio" interno, iniciando o pipeline de forma resiliente.

Na Prática: Recebimento de Transações Financeiras

Um sistema de pagamentos publica um evento de "nova transação" num tópico do Google Pub/Sub. Uma Google Cloud Function atua como Operária Inicial, consumindo o evento, validando o schema e publicando-o num tópico interno com política de retry para garantir a entrega.


# gcp_operaria_inicial_pubsub.py (com retry)
import functions_framework, json, uuid, base64, time
from google.cloud import pubsub_v1

publisher = pubsub_v1.PublisherClient()
TOPIC_PATH = publisher.topic_path('gcp-project-id', 'transacoes-para-analise')

@functions_framework.cloud_event
def handler(cloud_event):
    try:
        raw_data = base64.b64decode(cloud_event.data["message"]["data"]).decode('utf-8')
        pollen = {"pollen_id": str(uuid.uuid4()), "source_data": json.loads(raw_data)}
        feromonio = {"trace_id": cloud_event["id"], "payload": pollen}
        
        # Publica com retry exponencial para robustez
        for attempt in range(3):
            try:
                future = publisher.publish(TOPIC_PATH, json.dumps(feromonio).encode("utf-8"))
                future.result(timeout=10)  # Aguarda a conclusão
                print(f"Pipeline iniciado para pollen {pollen['pollen_id']}.")
                return
            except Exception as e:
                print(f"Falha na tentativa {attempt+1} de publicar: {e}")
                if attempt == 2:
                    raise  # Lança a exceção final após a última tentativa
                time.sleep((2 ** attempt) * 0.5) # 0.5s, 1s, 2s

    except Exception as e:
        # Idealmente, logar para um sistema de monitoramento
        print(f"INITIAL_WORKER_FAILED: Falha crítica ao processar evento: {str(e)}")
        raise

// gcpOperariaInicialPubSub.ts (com retry da biblioteca)
import { CloudEvent } from 'firebase-functions/lib/v2/core';
import { PubSub } from '@google-cloud/pubsub';

const pubsub = new PubSub({
    // A biblioteca do GCP já implementa retry com backoff exponencial por padrão.
    // As configurações podem ser ajustadas se necessário.
    retryOptions: {
        maxRetries: 3, 
        backoffSettings: {
            initialRetryDelayMillis: 200,
            maxRetryDelayMillis: 60000,
            multiplier: 2.0,
        }
    }
});
// ... (código da operária para criar o feromônio) ...
// await pubsub.topic(TOPIC_NAME).publishMessage({ json: feromonio });

Papel: Operária de Enriquecimento

Descrição

Esta Operária é o motor do pipeline, adicionando valor ao "Pólen". Sua responsabilidade crucial é delegar tarefas cognitivas. Ela prepara os dados e invoca um Zangão especialista, recebendo de volta o resultado e incorporando-o ao "Pólen" antes de passá-lo para a próxima Operária ou estágio. A comunicação com Zangões deve ser resiliente.

Na Prática: Extração de Entidades com Circuit Breaker

Uma Azure Function consome um artigo de um Azure Service Bus. Para extrair entidades, ela invoca um Zangão especialista. Para evitar sobrecarregar um Zangão que possa estar instável, a chamada é encapsulada num padrão de Circuit Breaker, que suspende as chamadas temporariamente após uma série de falhas.


# azure_operaria_ner.py (com Circuit Breaker via 'pybreaker')
from pybreaker import CircuitBreaker, CircuitBreakerError
import requests

# O breaker é instanciado a nível de módulo para manter o estado entre invocações
zangao_breaker = CircuitBreaker(fail_max=3, reset_timeout=60)

@zangao_breaker
def call_zangao(text_content):
    # ... (lógica da chamada com requests)
    pass
    
# Dentro da função principal da Operária:
# try:
#    ner_result = call_zangao(pollen["article_text"])
#    pollen["entities"] = ner_result["entities"]
# except CircuitBreakerError:
#    # Lógica de fallback: enfileirar para tentativa posterior ou marcar como falha
#    print("Circuit Breaker está aberto. A chamada ao Zangão foi pulada.")
#    pollen["status"] = "ZANGAO_UNAVAILABLE"

// azureOperariaNer.ts (com Circuit Breaker manual)
import { AzureFunction, Context } from "@azure/functions";
import fetch from "node-fetch";

// Implementação simples de Circuit Breaker
class ZangaoCaller {
    private failures = 0;
    private lastFailureTime = 0;
    private readonly failureThreshold = 3;
    private readonly resetTimeout = 60000; // 1 minuto

    async call(url: string, payload: any): Promise<any> {
        if (this.failures >= this.failureThreshold && Date.now() - this.lastFailureTime < this.resetTimeout) {
            throw new Error("Circuit breaker aberto - chamadas ao Zangão suspensas.");
        }
        try {
            const response = await fetch(url, { /* ... opções com timeout ... */ });
            if (!response.ok) throw new Error(`HTTP Error: ${response.status}`);
            this.failures = 0; // Sucesso, reseta o contador
            return await response.json();
        } catch (error) {
            this.failures++;
            this.lastFailureTime = Date.now();
            throw error;
        }
    }
}
const zangaoCaller = new ZangaoCaller(); // Singleton a nível de instância

// Na Azure Function:
// const nerResult = await zangaoCaller.call(ZANGAO_NER_URL, { text: pollen.articleText });

Papel: Operária de Roteamento (Gateway)

Descrição

Uma Operária de Roteamento aplica lógica de negócio determinística para direcionar o "Pólen". Baseando-se no seu conteúdo, ela pode enviá-lo para um único destino (desvio condicional) ou para múltiplos destinos simultaneamente (desvio paralelo), permitindo a criação de fluxos de trabalho complexos e dinâmicos.

Na Prática: Triagem de Alertas de Segurança

Um sistema de monitoramento gera eventos de segurança. Uma Operária de Roteamento em AWS Lambda analisa cada evento. Se a prioridade for "CRITICAL", o evento é enviado para um tópico SNS que notifica a equipe de plantão. Se a categoria for "Firewall", é enviado para uma fila SQS para análise de logs. Independentemente da prioridade, todos os eventos são enviados para um stream do Kinesis Firehose para arquivamento em S3.

graph TD O_Router{"Operária de Roteamento"} subgraph "Destinos (AWS)" T_SNS["Tópico SNS
(Alerta Crítico)"] Q_SQS["Fila SQS
(Análise de Firewall)"] S_Kinesis["Kinesis Firehose
(Arquivamento)"] end O_Router -- "Prioridade CRITICAL" --> T_SNS O_Router -- "Categoria Firewall" --> Q_SQS O_Router -- "Sempre" --> S_Kinesis

# aws_operaria_roteamento_seguranca.py (AWS Lambda)
import json, boto3

sns = boto3.client("sns")
sqs = boto3.client("sqs")
firehose = boto3.client("firehose")

SNS_TOPIC_ARN = "arn:aws:sns:us-east-1:123:TopicoCritico"
SQS_QUEUE_URL = "arn:aws:sqs:us-east-1:123:FilaFirewall"
FIREHOSE_STREAM_NAME = "stream-de-arquivamento"

def handler(event, context):
    pollen = json.loads(event['Records'][0]['body'])['payload']
    message_body = json.dumps(pollen)
    
    # Roteamento paralelo e condicional
    if pollen.get('priority') == 'CRITICAL':
        sns.publish(TopicArn=SNS_TOPIC_ARN, Message=message_body)
    
    if pollen.get('category') == 'Firewall':
        sqs.send_message(QueueUrl=SQS_QUEUE_URL, MessageBody=message_body)
        
    # Roteamento obrigatório
    firehose.put_record(
        DeliveryStreamName=FIREHOSE_STREAM_NAME,
        Record={'Data': message_body.encode('utf-8')}
    )

// awsOperariaRoteamentoSeguranca.ts (AWS Lambda)
import { SQSHandler, SQSEvent } from 'aws-lambda';
import { SNSClient, PublishCommand } from '@aws-sdk/client-sns';
import { SQSClient, SendMessageCommand } from '@aws-sdk/client-sqs';
import { FirehoseClient, PutRecordCommand } from '@aws-sdk/client-firehose';

const sns = new SNSClient({});
const sqs = new SQSClient({});
const firehose = new FirehoseClient({});

export const handler: SQSHandler = async (event: SQSEvent): Promise<void> => {
    const pollen = JSON.parse(event.Records[0].body).payload;
    const messageBody = JSON.stringify(pollen);
    const promises = [];

    if (pollen.priority === 'CRITICAL') {
        promises.push(sns.send(new PublishCommand({TopicArn: "...", Message: messageBody})));
    }
    if (pollen.category === 'Firewall') {
        promises.push(sqs.send(new SendMessageCommand({QueueUrl: "...", MessageBody: messageBody})));
    }
    
    promises.push(firehose.send(new PutRecordCommand({
        DeliveryStreamName: "...",
        Record: { Data: Buffer.from(messageBody) }
    })));

    await Promise.all(promises);
};

Papel: Operária de Consolidação (Preparadora do Mel)

Descrição

A Operária de Consolidação é a guardiã da Rainha. Sua missão é preparar o "Mel Puro": um payload de dados final, coeso, factual e perfeitamente estruturado. Ela filtra o ruído e seleciona apenas as informações essenciais do "Pólen" para a tarefa de síntese da Rainha, otimizando a precisão e o custo da chamada final à LLM.

Para otimizar ainda mais, esta Operária pode invocar um último Zangão (ex: um sumarizador) para pré-processar um dado complexo, garantindo que a Rainha receba a informação mais concisa e relevante possível.

Na Prática: Geração de Relatório de Análise de Risco

Ao final de um pipeline de análise de crédito, uma Operária de Consolidação recebe um "Pólen" contendo o score de crédito do cliente, o histórico de transações e o resultado de uma verificação de identidade. Ela monta o "Mel Puro", um objeto JSON limpo contendo apenas os campos essenciais, e o envia para a Rainha com a instrução: "Gere um parágrafo de relatório de risco com base nestes dados".

graph TD subgraph Pipeline Fila["Fila: DadosParaConsolidar"] --> O_Cons["Operária de Consolidação"] O_Cons --> R["👑 Rainha"] end

# gcp_operaria_consolidacao.py
# (Assume a existência de uma classe 'Queen' que abstrai a chamada à LLM no Vertex AI)
# from rainha_vertex import Queen

class OperariaConsolidacao:
    def __init__(self, pollen): self.pollen = pollen
    def preparar_mel_e_chamar_rainha(self):
        # Seleciona apenas os dados factuais e essenciais do Pólen
        mel_puro = {
            "score_credito": self.pollen.get("credit_score"),
            "identidade_verificada": self.pollen.get("identity_verified"),
            "principais_transacoes": self.pollen.get("transaction_highlights", [])[:5]
        }
        # rainha = Queen(model_name="gemini-pro")
        # instrucao = "Gere um parágrafo de análise de risco para um novo cliente."
        # relatorio = rainha.make_decision(instrucao, mel_puro)
        return {"mel_puro": mel_puro, "relatorio_simulado": "Risco moderado."}

// gcpOperariaConsolidacao.ts
// import { Queen } from './rainhaVertex';

class OperariaConsolidacao {
    constructor(private pollen: any) {}
    public async prepararMelEChamarRainha(): Promise<any> {
        const melPuro = {
            scoreCredito: this.pollen.creditScore,
            identidadeVerificada: this.pollen.identityVerified,
            principaisTransacoes: (this.pollen.transactionHighlights || []).slice(0, 5),
        };
        // const rainha = new Queen({ modelId: 'gemini-pro' });
        // const instrucao = "Gere um parágrafo de análise de risco...";
        // const relatorio = await rainha.makeDecision(instrucao, melPuro);
        return { melPuro, relatorioSimulado: "Risco moderado." };
    }
}

Padrão Avançado: Operária de Compensação (Saga)

Descrição

Na `bee.arq`, o padrão Saga transcende a simples reversão de transações. Ele é adaptado para gerenciar a consistência de processos de análise de longa duração, onde a "compensação" pode significar reagir à falta de dados dentro de um prazo, em vez de reverter uma ação.

Na Prática: Análise de Processo Jurídico

Um processo jurídico é iniciado e a colmeia precisa consolidar a petição inicial, as provas e a contestação do réu para uma análise final da Rainha. A contestação tem um prazo legal para ser juntada. Uma Operária Coordenadora gerencia o estado. Se o prazo expira e a contestação não chega, ela aciona uma Operária de Compensação.

A compensação aqui não é um "delete", mas uma ação de negócio: a Operária de Compensação atualiza o status do "Pólen" para "INCOMPLETO_PRAZO_EXPIRADO", invoca um Zangão para gerar uma análise preliminar com os dados existentes e notifica a equipe jurídica sobre a situação para que tomem as medidas cabíveis.

sequenceDiagram participant User as Usuário participant OC as Op. Coordenadora participant Timer as Scheduler (EventBridge) participant OComp as Op. Compensação participant R as Rainha (Análise Final) User->>OC: Inicia Análise de Processo (Pólen v1) OC->>OC: Armazena estado: {peticao: ok, provas: ok, contestacao: pendente} OC->>Timer: Agenda evento de "Prazo Expirado" para D+15 alt Documento da contestação chega a tempo User->>OC: Envia Contestação OC->>OC: Atualiza estado: {contestacao: ok} OC->>Timer: Cancela evento "Prazo Expirado" OC->>R: Invoca com "Mel Puro" completo else Prazo expira Timer->>OC: Evento "Prazo Expirado" OC->>OComp: Feromônio de Compensação (Pólen v1) OComp->>OComp: Gera análise preliminar OComp->>User: Notifica "Análise gerada com dados parciais" end

# operaria_compensacao_juridica.py
class OperariaCompensacaoJuridica:
    def __init__(self, feromonio_de_falha):
        self.pollen = feromonio_de_falha['payload']
        self.reason = feromonio_de_falha['metadata']['reason']

    def compensate(self):
        if self.reason == "PRAZO_EXPIRADO":
            print(f"Compensando o processo {self.pollen['case_id']}: prazo expirado.")
            self._gerar_analise_preliminar()
            self._notificar_equipe_juridica()
            self.pollen['status'] = 'ANALISE_PARCIAL_CONCLUIDA'
        return self.pollen

    def _gerar_analise_preliminar(self):
        # Lógica para chamar um Zangão com os dados parciais
        print("-> Gerando análise preliminar...")

    def _notificar_equipe_juridica(self):
        # Lógica para enviar e-mail ou notificação via SNS/PubSub
        print("-> Notificando equipe sobre a incompletude dos dados.")

Padrão Avançado: Operária de Batch

Descrição

Para cenários de alta volumetria, como processamento de logs ou telemetria de IoT, processar cada evento individualmente é ineficiente. Uma Operária de Batch é projetada para agregar múltiplos "Pólens" (eventos) e processá-los em lote, otimizando o uso de recursos e reduzindo a latência total.

Características

  • Agregação: Usa gatilhos baseados em janela de tempo (ex: a cada 5 minutos) ou quantidade (ex: a cada 1000 eventos).
  • Processamento Paralelo: Pode usar paralelismo interno (ex: `ThreadPoolExecutor` em Python) para processar os itens do lote simultaneamente.
  • Isolamento (Bulkhead): O processamento em lote ajuda a isolar os recursos, evitando que um pico de eventos sobrecarregue os sistemas a jusante.
graph TD subgraph Fontes Individuais P1["Pólen 1"] P2["Pólen 2"] P3["..."] end Q["Fila / Stream
Agregadora"] subgraph Processamento OBatch["Operária de Batch"] subgraph Processamento Interno Paralelo T1["Thread 1"] T2["Thread 2"] T3["..."] end end P1 & P2 & P3 --> Q Q -- "Gatilho com Lote de 'Pólens'" --> OBatch OBatch --> T1 & T2 & T3

# Exemplo de Operária de Batch em AWS Lambda com gatilho SQS
from concurrent.futures import ThreadPoolExecutor
import json

def process_record(record):
    # Lógica para processar um único "Pólen" do lote
    pollen = json.loads(record['body'])['payload']
    # ... faz o trabalho ...
    return {"pollen_id": pollen['pollen_id'], "status": "processed"}

def handler(event, context):
    records = event['Records']
    
    # Processa o lote em paralelo para otimizar o tempo de execução da Lambda
    with ThreadPoolExecutor(max_workers=10) as executor:
        results = list(executor.map(process_record, records))
    
    print(f"Lote de {len(results)} itens processado com sucesso.")
    return {"status": "success", "processed_count": len(results)}

Padrão: Produção de Geleia Real (Observabilidade)

Descrição

Na `bee.arq`, "Geleia Real" é o termo para logs estruturados e telemetria. É uma diretriz fundamental que cada Operária produza "Geleia Real" para garantir a observabilidade do sistema. Isso permite rastreabilidade (tracing), monitoramento, auditoria e a criação de datasets para re-treinamento de Zangões.

Implementação

A interface de log deve ser enriquecida para suportar o tracing distribuído e a coleta de métricas, alinhando-se com as práticas de OpenTelemetry. Os logs são enviados para o serviço de observabilidade da nuvem escolhida (Amazon CloudWatch, Google Cloud Logging, Azure Monitor).

graph LR O["Operária"] -->|Produz log| Log["Log Estruturado (JSON)"] Log -->|Envia para| CloudLog["☁️ Serviço de Logging
(CloudWatch, GCP Logging, Azure Monitor)"] CloudLog -->|Permite| D["Dashboard"] CloudLog -->|Permite| A["Alertas"] CloudLog -->|Permite| T["Rastreabilidade"]

// Interface expandida para Geleia Real com Tracing e Métricas
interface GeleiaRealLog {
    trace_id: string;      // ID global da transação
    pollen_id: string;     // ID do dado sendo processado
    parent_span_id?: string; // ID da operação que chamou esta
    span_id: string;         // ID único para esta operação específica
    
    start_time: string;      // ISO 8601
    duration_ms?: number;
    
    operation: string;     // ex: "OperariaEnriquecimento:callZangaoNER"
    status: 'started' | 'success' | 'failed';
    details: string;       // Mensagem descritiva

    metrics?: {
        zangao_call_duration_ms?: number;
        records_in_batch?: number;
    };

    error?: {
        message: string;
        stack?: string;
        code?: string;
    };
}