Padrões de Arquitetura

Implementação de Zangão

O especialista cognitivo puro. Aprenda a isolar a lógica de Inteligência Artificial, garantir máxima reutilização e proteger a sua Colmeia contra alucinações.

O Problema

Em sistemas que dependem de Grandes Modelos de Linguagem (LLMs), como podemos garantir que estes recursos, que são caros e complexos, sejam usados de forma eficiente? O problema central é duplo:

  1. Risco de Alucinação e Perda de Acurácia: Enviar dados brutos ou mal contextualizados para uma LLM (a Rainha) aumenta a probabilidade de respostas incorretas (alucinações). Forçar o modelo principal a realizar tarefas de pré-processamento, classificação ou extração — para as quais não é otimizado — diminui a qualidade da sua análise de síntese.
  2. Ineficiência e Custo: Como podemos garantir que a LLM foque apenas na sua tarefa principal (raciocínio complexo), em vez de desperdiçar ciclos e tokens em tarefas que podem ser executadas por modelos mais simples, mais rápidos e mais baratos?

O Custo da Generalização Excessiva

LLMs são ferramentas poderosas, mas caras e generalistas. Quando usadas para tarefas especializadas:

  • Custo computacional: Cada token processado tem um custo monetário e temporal
  • Ineficiência: Modelos grandes consomem mais recursos para tarefas simples
  • Complexidade: Manter a lógica em prompts torna o sistema frágil e difícil de evoluir

Os Zangões resolvem isso oferecendo:

  • Especialização: Modelos otimizados para tarefas específicas
  • Eficiência: Menor custo por operação
  • Manutenibilidade: Lógica encapsulada e testável

Adicionalmente, como assegurar que estes modelos especialistas (os Zangões) sejam integrados de forma a não criar um acoplamento forte que dificulte a manutenção e a reutilização noutros contextos?

Solução: O Padrão do Zangão Puro

A `bee.arq` resolve este problema tratando cada Zangão como um serviço puro, agnóstico e sem estado. A sua função principal é ser uma barreira contra a alucinação das LLMs. Ao garantir decisões especialistas sobre os dados, o Zangão diminui drasticamente a necessidade de a Rainha (e a sua LLM) trabalhar em processos desnecessários. Ele protege a Rainha de tarefas paralelas, permitindo que a LLM se foque numa análise pura, o que aumenta exponencialmente a acurácia (accuracy) sobre o dado.

Um Zangão bem projetado segue um conjunto de diretrizes estritas que garantem a sua máxima reutilização e desacoplamento. Ele não sabe nada sobre o fluxo de negócio; a sua única responsabilidade é executar a sua tarefa cognitiva com perfeição.

Padrão: O Contrato de Dados

Descrição

A interação com um Zangão deve ser sempre mediada por um contrato de dados formal e bem definido. Este contrato, especificado no "Pólen" que ele recebe, define exatamente o que o Zangão espera como entrada e o que ele promete devolver como saída. O output do Zangão torna-se o `payload` do próximo "Feromônio" que a Operária irá emitir.

Versionamento e Evolução

Contratos de dados devem seguir um versionamento semântico (SemVer) e incluir:

  • Documentação clara dos campos
  • Exemplos de uso
  • Política de compatibilidade

Implementação

Use ferramentas de validação de schema para definir os modelos de entrada e saída. Isto serve como a documentação viva do Zangão. O exemplo mostra o contrato de um Zangão e como o seu output se encaixa num Feromônio completo.


# contrato_zangao.py
from pydantic import BaseModel, Field
from typing import Optional, Dict, Any
from datetime import date, datetime

# 1. Definição do contrato do Zangão
class InvoicePollenInput(BaseModel):
    pollen_id: str = Field(..., description="ID único do pólen")
    text_content: str = Field(..., description="Conteúdo textual para análise")

class ExtractedData(BaseModel):
    valor_total: Optional[float] = Field(None, description="Valor total da fatura")
    data_vencimento: Optional[date] = Field(None, description="Data de vencimento")

class InvoicePollenOutput(BaseModel):
    pollen_id: str = Field(..., description="ID do pólen processado")
    extracted_data: ExtractedData = Field(..., description="Dados extraídos")
    model_version: str = Field("1.2.0", description="Versão do modelo usado")

# 2. Definição do Feromônio que transportará o resultado
class Feromonio(BaseModel):
    event_version: str = Field("1.0", description="Versão do evento")
    timestamp: datetime = Field(..., description="Momento do processamento")
    trace_id: str = Field(..., description="ID para rastreamento distribuído")
    source: str = Field(..., description="Origem do evento") 
    pollen_id: str = Field(..., description="ID do pólen relacionado")
    type: str = Field(..., description="Tipo do evento")
    payload: Dict[str, Any] = Field(..., description="Dados do evento")

# 3. Versão estendida do contrato (compatível com versão anterior)
class InvoicePollenInputV2(InvoicePollenInput):
    metadata: Optional[Dict[str, Any]] = Field(
        None,
        description="Metadados adicionais para processamento",
        example={"priority": "high", "origin": "scanned_document"}
    )
    
    class Config:
        schema_extra = {
            "description": "Contrato para processamento de faturas",
            "examples": [
                {
                    "pollen_id": "uuid-123",
                    "text_content": "Fatura XYZ...",
                    "metadata": {"priority": "high"}
                }
            ]
        }

# Como a Operária cria o Feromônio após receber a resposta do Zangão
# zangao_response = InvoicePollenOutput(...)
# feromonio_para_proxima_etapa = Feromonio(
#     ...,
#     payload=zangao_response.model_dump() 
# )

// contrato_zangao.ts
import { Type, Static } from "@sinclair/typebox";
import { TypeCompiler } from "@sinclair/typebox/compiler";

// 1. Definição do contrato do Zangão
export const ExtractedDataSchema = Type.Object({
    valor_total: Type.Optional(Type.Number()),
    data_vencimento: Type.Optional(Type.String({ format: "date" })),
});

export const InvoicePollenInputSchema = Type.Object({
    pollen_id: Type.String(),
    text_content: Type.String(),
});

export const InvoicePollenOutputSchema = Type.Object({
    pollen_id: Type.String(),
    extracted_data: ExtractedDataSchema,
    model_version: Type.String(),
});

// 2. Definição do Feromônio que transportará o resultado
export const FeromonioSchema = Type.Object({
    event_version: Type.String(),
    timestamp: Type.String({ format: "date-time" }),
    trace_id: Type.String(),
    source: Type.String(),
    pollen_id: Type.String(),
    type: Type.String(),
    payload: Type.Record(Type.String(), Type.Any())
});

// 3. Versão estendida do contrato
export const InvoicePollenInputV2Schema = Type.Composite([
    InvoicePollenInputSchema,
    Type.Object({
        metadata: Type.Optional(Type.Record(Type.String(), Type.Any()))
    })
]);

// Compiladores para validação eficiente
export const InvoicePollenInputValidator = TypeCompiler.Compile(InvoicePollenInputSchema);
export const FeromonioValidator = TypeCompiler.Compile(FeromonioSchema);

export type InvoicePollenOutput = Static;
// const zangaoResponse: InvoicePollenOutput = ...;
// const feromonioParaProximaEtapa: Static = {
//      ...,
//      payload: zangaoResponse
// };

Padrão: Exposição como Unidade Computacional

Descrição

A essência de um Zangão não está em ser uma API, mas sim em ser uma unidade computacional independente e desacoplada. A sua forma de implementação é uma escolha técnica baseada em requisitos, não uma regra da arquitetura. O crucial é que a Operária que o chama não tem conhecimento da sua implementação interna; ela apenas conhece o contrato de dados e o "Feromônio" (o mecanismo de comunicação) acordado.

Padrões de Comunicação

Método Melhor Para Vantagens Desvantagens
API Gateway + Lambda/Container Exposição externa ou entre serviços Segurança, monitoração, versionamento Overhead de latência
Invocção Direta (Lambda-to-Lambda) Comunicação interna na mesma nuvem Baixa latência, baixo custo Sem controle de tráfego avançado
Fila de Mensagens Processamento assíncrono Escalabilidade, resiliência Latência maior, complexidade

Implementação

A escolha da implementação deve ser guiada pelos requisitos de performance, custo e complexidade.

  • API (via API Gateway + Função/Contentor): Ótimo para expor o Zangão a múltiplos clientes (internos ou externos) de forma segura e gerida. Padrão ideal para comunicação síncrona.
  • Função Serverless com Invocação Direta: Para comunicação síncrona de baixíssima latência entre duas funções na mesma nuvem (ex: Lambda invocando outro Lambda). Elimina a sobrecarga de um API Gateway, sendo mais rápido e barato para comunicação interna.
  • Função Serverless acionada por Fila: Ideal para Zangões que executam tarefas assíncronas e de longa duração, consumindo de uma fila de mensagens.
graph TD O(Operária):::processo subgraph "Comunicação (Feromônios)" API("API Gateway") Fila("Fila de Mensagens") Invocacao["Invocação Direta (SDK)"] end subgraph "Implementação do Zangão" Z_API([Zangão via API]):::zangao Z_Fila([Zangão via Fila]):::zangao Z_Direto([Zangão Invocado Diretamente]):::zangao end O -- "Chamada HTTP" --> API API --> Z_API O -- "Publica Mensagem" --> Fila Fila --> Z_Fila O -- "Invocação Nativa" --> Invocacao Invocacao --> Z_Direto classDef processo fill:#90CAF9,stroke:#1565C0,color:#222 classDef zangao fill:#FFE0B2,stroke:#EF6C00,color:#222

Exemplo: Invocação Direta Lambda-para-Lambda


# operaria-lambda.py (A que chama)
import boto3
import json
from tenacity import retry, stop_after_attempt, wait_exponential

lambda_client = boto3.client('lambda')

@retry(
    stop=stop_after_attempt(3),
    wait=wait_exponential(multiplier=1, min=4, max=10)
)
def chamar_zangao_diretamente(pollen):
    """Chama um Zangão com política de retry"""
    try:
        response = lambda_client.invoke(
            FunctionName='arn:aws:lambda:us-east-1:123:function:zangao-classifier',
            InvocationType='RequestResponse', # Invocação síncrona
            Payload=json.dumps(pollen)
        )
        response_payload = json.loads(response['Payload'].read())
        
        # Verifica se houve erro na execução
        if 'errorMessage' in response_payload:
            raise Exception(response_payload['errorMessage'])
            
        return response_payload
    except Exception as e:
        print(f"Falha ao chamar Zangão: {str(e)}")
        raise

# zangao-lambda.py (A que é chamada)
def handler(event, context):
    # 'event' é o payload (pollen) enviado pela Operária
    pollen = event 
    
    # Health check endpoint
    if pollen.get('action') == 'health_check':
        return {
            'status': 'healthy',
            'version': '1.2.0',
            'timestamp': datetime.utcnow().isoformat()
        }
    
    # ... executa a lógica do Zangão ...
    try:
        resultado = {"doc_type": "contract", "confidence": 0.99}
        return resultado
    except Exception as e:
        return {
            'errorMessage': str(e),
            'errorType': type(e).__name__,
            'stackTrace': traceback.format_exc().splitlines()
        }

// operaria-lambda.ts (A que chama)
import { LambdaClient, InvokeCommand } from "@aws-sdk/client-lambda";
import { retry } from "@aws-sdk/util-retry";

const lambdaClient = new LambdaClient({
    retryStrategy: retry({
        maxRetries: 3,
        delay: 1000 // 1 segundo inicial
    })
});

async function chamarZangaoDiretamente(pollen: any): Promise {
    const command = new InvokeCommand({
        FunctionName: 'arn:aws:lambda:us-east-1:123:function:zangao-classifier',
        InvocationType: 'RequestResponse',
        Payload: JSON.stringify(pollen),
    });

    try {
        const response = await lambdaClient.send(command);
        const responsePayload = JSON.parse(new TextDecoder().decode(response.Payload));
        
        if (responsePayload.errorMessage) {
            throw new Error(responsePayload.errorMessage);
        }
        
        return responsePayload;
    } catch (error: any) {
        console.error("Falha ao chamar Zangão:", error.message);
        throw error;
    }
}

// zangao-lambda.ts (A que é chamada)
interface HealthCheckResponse {
    status: string;
    version: string;
    timestamp: string;
}

export const handler = async (event: any, context: any): Promise => {
    // Health check endpoint
    if (event.action === 'health_check') {
        return {
            status: 'healthy',
            version: '1.2.0',
            timestamp: new Date().toISOString()
        } as HealthCheckResponse;
    }
    
    const pollen = event;
    try {
        // ... executa a lógica do Zangão ...
        const resultado = { doc_type: "contract", confidence: 0.99 };
        return resultado;
    } catch (error: any) {
        return {
            errorMessage: error.message,
            errorType: error.constructor.name,
            stackTrace: error.stack?.split('\n')
        };
    }
};

Padrão: Inovação e Desenvolvimento de Zangões

Descrição

O Zangão é o coração da especialização cognitiva na `bee.arq` e representa o principal esforço de inovação do projeto. A escolha de como construir um Zangão é uma decisão estratégica que depende do problema, do cronograma, do orçamento e da experiência da equipa. A arquitetura é flexível para acomodar diversas abordagens.

Abordagens Comparativas

Abordagem Melhor Para Vantagens Desvantagens
Modelos Comunitários Prototipagem rápida Baixo custo, fácil implantação Pode não atender necessidades específicas
Modelos Proprietários Tarefas especializadas Otimizado para domínio Requer dados e expertise
Proxy para LLM Tarefas complexas Reutiliza conhecimento geral Custo mais alto por operação

Na Prática

Um Zangão pode ser implementado de várias formas, cada uma com as suas vantagens:

  • Modelos de Comunidade (ex: Hugging Face): A forma mais rápida de validar uma ideia. Uma Operária pode chamar um Zangão que, internamente, descarrega e utiliza um modelo pré-treinado de uma plataforma como a Hugging Face. É excelente para prototipagem e para tarefas comuns com modelos bem estabelecidos.
  • Modelos Proprietários: Quando a tarefa é muito específica ou os dados são sensíveis, a equipa pode desenvolver e treinar o seu próprio modelo. Este modelo é o ativo intelectual do projeto, e o Zangão é o serviço que o expõe para a Colmeia.
  • Zangão como Proxy de LLM: Para tarefas complexas mas bem delimitadas (ex: tradução de alta qualidade ou sumarização de um tipo específico de texto), um Zangão pode atuar como um proxy para uma LLM de ponta. Em vez de a Operária construir um prompt complexo, ela envia apenas os dados necessários, e o Zangão tem um "prompt template" interno, otimizado para aquela tarefa. Embora possa ter um custo maior por chamada, abstrai a complexidade e cria um especialista reutilizável.
graph TD subgraph "Origens do Modelo do Zangão" HuggingFace["🤗
Hugging Face"] ModeloProprio["🧠
Modelo Proprietário"] LLM["🤖
Proxy para LLM"] end Z[Zangão] HuggingFace -->|Usa modelo| Z ModeloProprio -->|Usa modelo| Z LLM -->|Usa prompt template| Z style Z fill:#FFE0B2,stroke:#EF6C00

Exemplo: Zangão com Modelo Proprietário (PyTorch ResNet-18)


# zangao_resnet.py (FastAPI com PyTorch e TorchVision)
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
import torch
import torchvision.transforms as T
from torchvision.models import resnet18, ResNet18_Weights
from PIL import Image
import requests
from io import BytesIO
import logging

# Configuração de logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# 1. Carregar modelo pré-treinado (ou seu modelo proprietário)
weights = ResNet18_Weights.IMAGENET1K_V1
model = resnet18(weights=weights)
model.eval() # Colocar em modo de avaliação

# 2. Definir as transformações de imagem necessárias
preprocess = weights.transforms()

app = FastAPI(
    title="Zangão ResNet18",
    description="Serviço especializado em classificação de imagens",
    version="1.2.0"
)

class ImageInput(BaseModel):
    image_url: str
    metadata: Optional[dict] = None

@app.post("/classify-image")
def classify_image(data: ImageInput):
    """Endpoint principal para classificação de imagens"""
    try:
        logger.info(f"Processando imagem: {data.image_url}")
        
        # Em um caso real, esta URL poderia ser uma URI de S3/GCS
        response = requests.get(data.image_url, stream=True)
        response.raise_for_status()
        
        img = Image.open(BytesIO(response.content)).convert("RGB")
        
        # Aplicar pré-processamento
        batch = preprocess(img).unsqueeze(0)
        
        # Realizar predição
        with torch.no_grad():
            prediction = model(batch).squeeze(0).softmax(0)
            class_id = prediction.argmax().item()
            score = prediction[class_id].item()
            category_name = weights.meta["categories"][class_id]

        return {
            "predicted_category": category_name,
            "confidence": score,
            "model_version": "resnet18_v1",
            "metadata": data.metadata
        }
    except Exception as e:
        logger.error(f"Erro ao processar imagem: {str(e)}")
        raise HTTPException(status_code=500, detail=str(e))

@app.get("/health")
def health_check():
    """Endpoint para verificação de saúde"""
    return {
        "status": "healthy",
        "model": "resnet18",
        "version": "1.2.0"
    }

// operaria_chama_resnet.ts (Cliente do Zangão ResNet)
const ZANGAO_RESNET_URL = "http://localhost:8000/classify-image";

interface ImageClassificationInput {
    image_url: string;
    metadata?: Record;
}

interface ImageClassificationOutput {
    predicted_category: string;
    confidence: number;
    model_version: string;
    metadata?: Record;
}

async function analisarImagemComZangao(
    imageUrl: string,
    metadata?: Record
): Promise {
    const payload: ImageClassificationInput = { 
        image_url: imageUrl,
        metadata 
    };

    try {
        const response = await fetch(ZANGAO_RESNET_URL, {
            method: 'POST',
            body: JSON.stringify(payload),
            headers: { 'Content-Type': 'application/json' },
        });

        if (!response.ok) {
            const error = await response.json();
            throw new Error(error.detail || "Falha ao classificar imagem");
        }

        return response.json();
    } catch (error: any) {
        console.error("Erro ao chamar Zangão ResNet:", error.message);
        throw error;
    }
}

// Exemplo de uso com fallback
async function processarImagemComResiliencia(imageUrl: string) {
    try {
        return await analisarImagemComZangao(imageUrl);
    } catch (error) {
        console.warn("Falha com Zangão principal, tentando fallback...");
        return await analisarImagemComZangaoFallback(imageUrl);
    }
}

Padrão: Resiliência e Tolerância a Falhas

Descrição

Zangões especializados podem se tornar pontos únicos de falha no sistema. Para garantir a resiliência da colmeia, cada Zangão deve implementar padrões que permitam lidar com falhas de forma elegante.

Implementação

Os principais padrões de resiliência para Zangões incluem:

1. Circuit Breaker

Evita chamadas repetidas a um Zangão que está falhando:


# Python usando tenacity
from tenacity import (
    retry,
    stop_after_attempt,
    wait_exponential,
    retry_if_exception_type
)

@retry(
    stop=stop_after_attempt(3),
    wait=wait_exponential(multiplier=1, min=4, max=10),
    retry=retry_if_exception_type((TimeoutError, ConnectionError))
)
def call_zangao_with_circuit(pollen):
    """Chama um Zangão com política de retry inteligente"""
    # Implementação da chamada aqui
    pass

// TypeScript usando axios-retry
import axios from 'axios';
import axiosRetry from 'axios-retry';

axiosRetry(axios, {
    retries: 3,
    retryDelay: (retryCount) => {
        return retryCount * 1000; // 1s, 2s, 3s
    },
    retryCondition: (error) => {
        return axiosRetry.isNetworkError(error) || 
               axiosRetry.isRetryableError(error) ||
               error.response?.status === 429;
    }
});

async function callZangaoWithRetry(pollen: any) {
    const response = await axios.post(ZANGAO_URL, pollen);
    return response.data;
}

2. Fallback Strategies

Fornece alternativas quando o Zangão principal falha:


def process_with_fallback(pollen):
    """Tenta o Zangão principal, depois fallbacks"""
    strategies = [
        primary_zangao,
        fallback_zangao_1,
        fallback_zangao_2,
        ultimate_fallback
    ]
    
    last_error = None
    for strategy in strategies:
        try:
            return strategy(pollen)
        except Exception as e:
            print(f"Falha com estratégia {strategy.__name__}: {e}")
            last_error = e
    
    raise Exception("Todas estratégias falharam") from last_error

async function processWithFallback(pollen: any) {
    const strategies = [
        primaryZangao,
        fallbackZangao1,
        fallbackZangao2
    ];
    
    let lastError: Error | null = null;
    for (const strategy of strategies) {
        try {
            return await strategy(pollen);
        } catch (error: any) {
            console.error(`Falha com estratégia ${strategy.name}:`, error.message);
            lastError = error;
        }
    }
    
    throw new Error("Todas estratégias falharam", { cause: lastError });
}

3. Health Checks

Permite monitorar a saúde do Zangão:


@app.get("/health")
def health_check():
    """Endpoint de health check"""
    try:
        # Verifica se o modelo está carregado
        if not model:
            raise Exception("Modelo não carregado")
            
        # Teste de predição simples
        test_input = torch.rand(1, 3, 224, 224)
        with torch.no_grad():
            model(test_input)
            
        return {
            "status": "healthy",
            "version": "1.2.0",
            "timestamp": datetime.utcnow().isoformat()
        }
    except Exception as e:
        raise HTTPException(
            status_code=503,
            detail=f"Service Unhealthy: {str(e)}"
        )

app.get('/health', (req, res) => {
    try {
        // Verificações de saúde
        if (!modelLoaded) {
            throw new Error('Model not loaded');
        }
        
        // Teste simples do modelo
        const testInput = new Tensor(new Float32Array(3 * 224 * 224), [1, 3, 224, 224]);
        model.predict(testInput);
        
        res.json({
            status: 'healthy',
            version: '1.2.0',
            timestamp: new Date().toISOString()
        });
    } catch (error: any) {
        res.status(503).json({
            status: 'unhealthy',
            error: error.message
        });
    }
});
graph TD O[Operária] -->|Chamada| Z1[Zangão Principal] Z1 -->|Falha| O O -->|Tenta| Z2[Zangão Fallback 1] Z2 -->|Falha| O O -->|Tenta| Z3[Zangão Fallback 2] Z3 -->|Sucesso| O classDef operaria fill:#90CAF9,stroke:#1565C0 classDef zangao fill:#FFE0B2,stroke:#EF6C00 classDef fallback fill:#C8E6C9,stroke:#2E7D32 class O operaria class Z1,Z2,Z3 zangao class Z2,Z3 fallback

Padrão: Zangão com RAG (Retrieval-Augmented Generation)

Descrição

Zangões RAG combinam recuperação de informação com geração de respostas, ideal para sistemas que precisam acessar conhecimentos específicos durante o processamento.

Implementação

Um Zangão RAG típico possui três componentes principais:

  1. Base de Conhecimento Vetorial: Armazena informações especializadas em formato vetorial
  2. Mecanismo de Recuperação: Encontra informações relevantes para a consulta
  3. Modelo Generativo: Sintetiza a resposta final com base nas informações recuperadas
graph LR O[Operária] -->|Consulta| Z[Zangão RAG] Z -->|1. Recupera| KB[Base de Conhecimento] KB -->|Documentos| Z Z -->|2. Gera| LLM[Modelo Generativo] LLM -->|Resposta| Z Z -->|Resposta Final| O classDef operaria fill:#90CAF9,stroke:#1565C0 classDef zangao fill:#FFE0B2,stroke:#EF6C00 classDef kb fill:#D1C4E9,stroke:#5E35B1 classDef llm fill:#BBDEFB,stroke:#1976D2 class O operaria class Z zangao class KB kb class LLM llm

Exemplo de Implementação


# zangao_rag.py
from langchain.vectorstores import FAISS
from langchain.embeddings import HuggingFaceEmbeddings
from langchain.chains import RetrievalQA
from langchain.llms import OpenAI
from pydantic import BaseModel
from fastapi import FastAPI

app = FastAPI(title="Zangão RAG")

class QueryInput(BaseModel):
    question: str
    context: Optional[dict] = None

class RAGZangao:
    def __init__(self, knowledge_base_path):
        # 1. Carrega base de conhecimento vetorial
        self.embeddings = HuggingFaceEmbeddings()
        self.db = FAISS.load_local(knowledge_base_path, self.embeddings)
        
        # 2. Configura cadeia RAG
        self.qa_chain = RetrievalQA.from_chain_type(
            llm=OpenAI(temperature=0),
            chain_type="stuff",
            retriever=self.db.as_retriever(search_kwargs={"k": 3}),
            return_source_documents=True
        )
    
    def query(self, question: str, context: dict = None) -> dict:
        """Processa uma consulta usando RAG"""
        augmented_question = f"Contexto: {context}\n\nPergunta: {question}" if context else question
        
        result = self.qa_chain({"query": augmented_question})
        
        return {
            "answer": result["result"],
            "sources": [doc.metadata["source"] for doc in result["source_documents"]],
            "confidence": self._calculate_confidence(result)
        }
    
    def _calculate_confidence(self, result) -> float:
        """Calcula confiança baseada na similaridade dos documentos"""
        if not result["source_documents"]:
            return 0.0
        return sum([doc.metadata.get("score", 0.5) for doc in result["source_documents"]]) / len(result["source_documents"])

# Inicialização (em produção, seria um singleton)
rag_zangao = RAGZangao("caminho/para/base_de_conhecimento")

@app.post("/query")
async def handle_query(query: QueryInput):
    return rag_zangao.query(query.question, query.context)

@app.get("/health")
async def health_check():
    return {"status": "healthy", "docs_loaded": rag_zangao.db.index.ntotal}

// zangao_rag.ts
import { HNSWLib } from "langchain/vectorstores/hnswlib";
import { OpenAIEmbeddings } from "langchain/embeddings/openai";
import { RetrievalQAChain } from "langchain/chains";
import { OpenAI } from "langchain/llms/openai";
import express from 'express';

const app = express();
app.use(express.json());

interface QueryInput {
    question: string;
    context?: Record;
}

interface RAGResult {
    answer: string;
    sources: string[];
    confidence: number;
}

class RAGZangao {
    private vectorStore: HNSWLib;
    private qaChain: RetrievalQAChain;

    constructor(knowledgeBasePath: string) {
        this.initialize(knowledgeBasePath);
    }

    private async initialize(kbPath: string) {
        // 1. Carrega base de conhecimento vetorial
        this.vectorStore = await HNSWLib.load(
            kbPath,
            new OpenAIEmbeddings()
        );

        // 2. Configura cadeia RAG
        const model = new OpenAI({ temperature: 0 });
        this.qaChain = RetrievalQAChain.fromLLM(
            model,
            this.vectorStore.asRetriever(3)
        );
    }

    public async query(input: QueryInput): Promise {
        const augmentedQuestion = input.context 
            ? `Contexto: ${JSON.stringify(input.context)}\n\nPergunta: ${input.question}`
            : input.question;

        const result = await this.qaChain.call({
            query: augmentedQuestion
        });

        return {
            answer: result.text,
            sources: result.sourceDocuments?.map((doc: any) => doc.metadata.source) || [],
            confidence: this.calculateConfidence(result.sourceDocuments)
        };
    }

    private calculateConfidence(docs: any[]): number {
        if (!docs || docs.length === 0) return 0;
        return docs.reduce((sum, doc) => sum + (doc.metadata.score || 0.5), 0) / docs.length;
    }
}

// Uso (em produção, seria um singleton)
const ragZangao = new RAGZangao("caminho/para/base_de_conhecimento");

app.post('/query', async (req, res) => {
    try {
        const result = await ragZangao.query(req.body);
        res.json(result);
    } catch (error) {
        res.status(500).json({ error: error.message });
    }
});

app.get('/health', async (req, res) => {
    res.json({ 
        status: 'healthy',
        docsLoaded: ragZangao.vectorStore?.index?.getCurrentCount() || 0
    });
});

app.listen(3000, () => console.log('Zangão RAG ouvindo na porta 3000'));

Padrão: Preparação para Re-treinamento

Descrição

Um Zangão inteligente é um Zangão que aprende. Embora a lógica de re-treinamento em si pertença ao ciclo da "Geleia Real", o Zangão deve ser construído de forma a facilitar este processo. Isto significa que os dados que ele processa (pares de input/output) são um ativo valioso que deve ser facilmente aproveitável.

Ciclo de Vida do Aprendizado

  1. Coleta de Dados: Operárias registram inputs e outputs válidos
  2. Validação: Verificar qualidade e balanceamento dos dados
  3. Treinamento: Pipeline reprodutível com ferramentas como MLFlow
  4. Implantação: Deployment canário e A/B testing
  5. Monitoramento: Métricas de performance em produção

Implementação

O próprio Zangão não armazena os dados, mas a Operária que o chama é responsável por registar o par `(input, output)` na "Geleia Real" (um sistema de logging ou base de dados). O Zangão contribui para este processo ao incluir metadados importantes na sua resposta, como a `model_version` utilizada na predição. Isso permite, no futuro, treinar novas versões do modelo e comparar a sua performance com as anteriores.

graph TD subgraph "Pipeline de Execução" O1(Operária):::processo Z(Zangão v1.2):::zangao end subgraph "Ciclo de Feedback" GR[Geleia Real]:::feedbackdb FB[Feedback do Utilizador] RT[Processo de Re-treinamento]:::feedback Z_v2(Zangão v1.3):::zangao end O1 -- "1. Envia Input" --> Z Z -- "2. Retorna Output + Versão" --> O1 O1 -- "3. Log (Input, Output, Versão)" --> GR FB --> GR GR -- "4. Dataset" --> RT RT -- "5. Publica" --> Z_v2 classDef processo fill:#90CAF9,stroke:#1565C0,color:#222 classDef zangao fill:#FFE0B2,stroke:#EF6C00,color:#222 classDef feedbackdb fill:#D1FAE5,stroke:#065F46 classDef feedback fill:#A7FFEB,stroke:#00897B

Exemplo: Log para Re-treinamento


# operaria_com_geleia.py
import json
from datetime import datetime
from pydantic import BaseModel

class TrainingLogEntry(BaseModel):
    timestamp: datetime
    pollen_id: str
    model_version: str
    input_data: dict
    output_data: dict
    user_feedback: Optional[dict] = None
    metrics: Optional[dict] = None

def log_for_retraining(pollen_id: str, input_data: dict, 
                      output_data: dict, model_version: str):
    """Registra dados para re-treinamento futuro"""
    log_entry = TrainingLogEntry(
        timestamp=datetime.utcnow(),
        pollen_id=pollen_id,
        model_version=model_version,
        input_data=input_data,
        output_data=output_data
    )
    
    # Em produção, enviaria para um serviço de logging ou S3
    with open("training_logs.jsonl", "a") as f:
        f.write(log_entry.json() + "\n")
        
    return log_entry

# Após chamar um Zangão:
# log_for_retraining(
#     pollen_id="123",
#     input_data={"text": "..."},
#     output_data=zangao_response,
#     model_version=zangao_response["model_version"]
# )

// operaria_com_geleia.ts
interface TrainingLogEntry {
    timestamp: string;
    pollen_id: string;
    model_version: string;
    input_data: Record;
    output_data: Record;
    user_feedback?: Record;
    metrics?: Record;
}

async function logForRetraining(
    pollenId: string,
    inputData: Record,
    outputData: Record,
    modelVersion: string
): Promise {
    const logEntry: TrainingLogEntry = {
        timestamp: new Date().toISOString(),
        pollen_id: pollenId,
        model_version: modelVersion,
        input_data: inputData,
        output_data: outputData
    };
    
    // Em produção, enviaria para um serviço de logging ou S3
    await appendToLogFile(logEntry);
    
    return logEntry;
}

// Exemplo de uso:
// const zangaoResponse = await callZangao(pollen);
// await logForRetraining(
//     pollen.pollen_id,
//     { text: pollen.text_content },
//     zangaoResponse,
//     zangaoResponse.model_version
// );