# RF-MGN-018-003: Procesamiento de Mensajes **Módulo:** MGN-018 - AI Agents & Chatbots **Prioridad:** P1 **Story Points:** 8 **Estado:** Definido **Fecha:** 2025-12-05 ## Descripción El sistema debe procesar mensajes entrantes de diferentes canales (WhatsApp, Web Chat, Email), construir el contexto apropiado, consultar bases de conocimiento, invocar el modelo de IA, y entregar la respuesta al usuario. El procesamiento incluye manejo de historial de conversación, detección de intenciones, y decisiones de escalación. ## Actores - **Actor Principal:** Sistema (Message Processor) - **Actores Secundarios:** - Cliente Final (envía mensajes) - AI Agent (genera respuestas) - LLM Provider (procesa prompts) - Vector Store (búsqueda RAG) ## Precondiciones 1. Agente activo y configurado 2. Canal de comunicación configurado 3. Créditos de IA disponibles ## Flujo Principal - Procesar Mensaje 1. Sistema recibe mensaje de canal (webhook) 2. Sistema identifica conversación existente o crea nueva 3. Sistema obtiene configuración del agente 4. Sistema construye contexto: - Historial de conversación - Variables de sesión - Datos del contacto 5. Sistema realiza búsqueda RAG si hay KBs asignadas 6. Sistema construye prompt completo: - System prompt del agente - Contexto RAG - Historial - Mensaje actual 7. Sistema envía prompt a LLM 8. Sistema recibe respuesta 9. Sistema evalúa si hay tool calls 10. Si hay tools: ejecuta y reenvía a LLM 11. Sistema evalúa condiciones de escalación 12. Sistema guarda mensaje y respuesta 13. Sistema envía respuesta al canal 14. Sistema registra consumo de tokens ## Flujo Alternativo - Escalación 1. Sistema detecta trigger de escalación: - Palabra clave ("humano", "agente") - Sentimiento negativo detectado - Intent de escalación - Timeout de respuesta - Usuario lo solicita 2. Sistema envía mensaje de escalación 3. Sistema transfiere conversación a cola de agentes 4. Sistema notifica al equipo humano 5. Agente humano toma la conversación ## Pipeline de Procesamiento ``` ┌──────────────────────────────────────────────────────────────────────────┐ │ MESSAGE PROCESSOR │ ├──────────────────────────────────────────────────────────────────────────┤ │ │ │ ┌─────────┐ ┌─────────┐ ┌─────────┐ ┌─────────┐ ┌─────────┐ │ │ │ Receive │──▶│ Context │──▶│ RAG │──▶│ LLM │──▶│ Execute │ │ │ │ Message │ │ Builder │ │ Search │ │ Request │ │ Tools │ │ │ └─────────┘ └─────────┘ └─────────┘ └─────────┘ └─────────┘ │ │ │ │ │ │ │ │ ▼ │ │ │ │ ┌─────────┐ │ │ │ │ │Escalate?│─────────┤ │ │ │ └─────────┘ │ │ │ │ │ │ │ │ │ ▼ ▼ │ │ │ ┌─────────┐ ┌─────────┐ ┌─────────┐ │ │ └───────────▶│ Store │◀───────────│ Format │◀──│ Send │ │ │ │ Message │ │Response │ │Response │ │ │ └─────────┘ └─────────┘ └─────────┘ │ │ │ └──────────────────────────────────────────────────────────────────────────┘ ``` ## Construcción de Contexto ### Context Builder ```typescript interface ConversationContext { // Información del contacto contact: { id: string; name: string; phone?: string; email?: string; custom_attributes: Record; }; // Historial de conversación history: Array<{ role: 'user' | 'assistant'; content: string; timestamp: Date; }>; // Sesión actual session: { id: string; started_at: Date; variables: Record; previous_intents: string[]; }; // Contexto RAG rag_context?: { chunks: Array<{ content: string; source: string; score: number; }>; formatted: string; }; // Datos del sistema system: { current_datetime: string; agent_name: string; company_name: string; }; } ``` ### Prompt Construction ```typescript function buildPrompt(agent: Agent, context: ConversationContext, userMessage: string): string { // 1. System prompt base let systemPrompt = agent.system_prompt; // 2. Reemplazar variables systemPrompt = systemPrompt .replace('{{company_name}}', context.system.company_name) .replace('{{agent_name}}', context.system.agent_name) .replace('{{customer_name}}', context.contact.name) .replace('{{current_date}}', context.system.current_datetime); // 3. Agregar contexto RAG si existe if (context.rag_context) { systemPrompt += `\n\n## Información Relevante:\n${context.rag_context.formatted}`; } // 4. Agregar instrucciones de herramientas if (agent.capabilities.tools_enabled.length > 0) { systemPrompt += `\n\n## Herramientas Disponibles:\n${formatToolDescriptions(agent.capabilities.tools_enabled)}`; } return systemPrompt; } ``` ## Manejo de Historial ```typescript interface HistoryConfig { // Límites max_messages: number; // Máximo de mensajes en historial (default: 20) max_tokens: number; // Máximo de tokens (default: 4000) // Estrategias de truncado truncation_strategy: 'oldest_first' | 'summarize' | 'smart'; // Resumen automático summarize_after: number; // Resumir después de N mensajes summary_prompt: string; } // Estrategia smart: mantiene primer mensaje, últimos N, y resumen del medio function smartTruncate(history: Message[], config: HistoryConfig): Message[] { if (history.length <= config.max_messages) return history; const first = history[0]; const last = history.slice(-config.max_messages + 2); // Resumir mensajes del medio const middle = history.slice(1, -config.max_messages + 2); const summary = summarizeMessages(middle); return [ first, { role: 'system', content: `[Resumen de conversación anterior: ${summary}]` }, ...last ]; } ``` ## Detección de Intención ```typescript interface IntentDetection { // Método de detección method: 'keyword' | 'llm' | 'hybrid'; // Keywords por intent keyword_intents: Record; // Intents conocidos known_intents: Array<{ intent: string; description: string; examples: string[]; action?: string; }>; } const defaultIntents: IntentDetection = { method: 'hybrid', keyword_intents: { 'escalate': ['humano', 'agente', 'persona', 'hablar con alguien'], 'order_status': ['pedido', 'orden', 'envío', 'dónde está'], 'pricing': ['precio', 'costo', 'cuánto cuesta', 'cotización'], 'support': ['ayuda', 'problema', 'no funciona', 'error'] }, known_intents: [ { intent: 'escalate', description: 'Usuario quiere hablar con persona', examples: ['Quiero hablar con un humano', 'Pásame con un agente'], action: 'trigger_escalation' } ] }; ``` ## Evaluación de Escalación ```typescript interface EscalationEvaluator { triggers: EscalationTrigger[]; evaluate(context: ConversationContext, response: string): EscalationDecision; } interface EscalationTrigger { type: 'keyword' | 'sentiment' | 'intent' | 'confidence' | 'loop_detection' | 'user_request'; condition: any; priority: 'low' | 'medium' | 'high' | 'urgent'; } interface EscalationDecision { should_escalate: boolean; reason?: string; priority?: string; context_to_pass?: object; } // Ejemplo de evaluación function evaluateEscalation( triggers: EscalationTrigger[], message: string, context: ConversationContext ): EscalationDecision { for (const trigger of triggers) { switch (trigger.type) { case 'keyword': if (matchesKeywords(message, trigger.condition)) { return { should_escalate: true, reason: 'keyword_match', priority: trigger.priority }; } break; case 'sentiment': const sentiment = analyzeSentiment(message); if (sentiment.negative > trigger.condition.threshold) { return { should_escalate: true, reason: 'negative_sentiment', priority: trigger.priority }; } break; case 'loop_detection': if (detectConversationLoop(context.history, trigger.condition.max_similar)) { return { should_escalate: true, reason: 'conversation_loop', priority: 'medium' }; } break; } } return { should_escalate: false }; } ``` ## Reglas de Negocio - **RN-1:** Timeout máximo de respuesta LLM: 30 segundos - **RN-2:** Historial máximo por defecto: 20 mensajes - **RN-3:** Reintentos a LLM: 3 con exponential backoff - **RN-4:** Rate limit por conversación: 20 mensajes/minuto - **RN-5:** Si LLM falla, usar mensaje de fallback configurado - **RN-6:** Siempre loguear consumo de tokens - **RN-7:** Conversaciones inactivas expiran en 24h ## Criterios de Aceptación - [ ] Mensajes de WhatsApp se procesan correctamente - [ ] Contexto incluye historial de conversación - [ ] Búsqueda RAG funciona cuando hay KBs - [ ] Tool calls se ejecutan correctamente - [ ] Escalación se dispara según configuración - [ ] Respuestas llegan en menos de 5 segundos (P95) - [ ] Tokens consumidos se registran - [ ] Fallback funciona cuando LLM falla - [ ] Rate limiting previene abuso - [ ] Streaming de respuestas soportado ## Entidades Involucradas ### ai_agents.conversations ```sql CREATE TABLE ai_agents.conversations ( id UUID PRIMARY KEY DEFAULT gen_random_uuid(), tenant_id UUID NOT NULL REFERENCES core_tenants.tenants(id), agent_id UUID NOT NULL REFERENCES ai_agents.agents(id), -- Canal y contacto channel VARCHAR(50) NOT NULL, -- whatsapp, webchat, email channel_conversation_id VARCHAR(100), -- ID en el canal (ej: WA conversation ID) contact_id UUID, -- REFERENCES catalogs.contacts(id) contact_identifier VARCHAR(100), -- Phone/email si no hay contacto -- Estado status VARCHAR(20) NOT NULL DEFAULT 'active', -- active, escalated, closed, expired -- Escalación escalated_at TIMESTAMPTZ, escalated_to UUID, -- Agent/Team ID escalation_reason VARCHAR(100), -- Variables de sesión session_variables JSONB DEFAULT '{}', -- Estadísticas message_count INT DEFAULT 0, tokens_used INT DEFAULT 0, -- Feedback satisfaction_rating INT, -- 1-5 feedback_text TEXT, -- Timestamps started_at TIMESTAMPTZ DEFAULT CURRENT_TIMESTAMP, last_message_at TIMESTAMPTZ DEFAULT CURRENT_TIMESTAMP, closed_at TIMESTAMPTZ, CONSTRAINT chk_conv_status CHECK (status IN ('active', 'escalated', 'closed', 'expired')) ); CREATE INDEX idx_conv_tenant ON ai_agents.conversations(tenant_id); CREATE INDEX idx_conv_agent ON ai_agents.conversations(agent_id); CREATE INDEX idx_conv_channel ON ai_agents.conversations(channel, channel_conversation_id); CREATE INDEX idx_conv_status ON ai_agents.conversations(status); ``` ### ai_agents.messages ```sql CREATE TABLE ai_agents.messages ( id UUID PRIMARY KEY DEFAULT gen_random_uuid(), conversation_id UUID NOT NULL REFERENCES ai_agents.conversations(id) ON DELETE CASCADE, -- Mensaje role VARCHAR(20) NOT NULL, -- user, assistant, system content TEXT NOT NULL, -- Tokens input_tokens INT, output_tokens INT, -- RAG rag_chunks_used JSONB, -- IDs de chunks usados rag_search_time_ms INT, -- Tools tool_calls JSONB, -- [{tool, args, result}] -- Metadata metadata JSONB DEFAULT '{}', -- { -- "model_used": "gpt-4o-mini", -- "response_time_ms": 1234, -- "intent_detected": "order_status" -- } -- External channel_message_id VARCHAR(100), -- ID del mensaje en el canal created_at TIMESTAMPTZ DEFAULT CURRENT_TIMESTAMP ); CREATE INDEX idx_messages_conversation ON ai_agents.messages(conversation_id); CREATE INDEX idx_messages_created ON ai_agents.messages(created_at); ``` ### ai_agents.usage_logs ```sql CREATE TABLE ai_agents.usage_logs ( id UUID PRIMARY KEY DEFAULT gen_random_uuid(), tenant_id UUID NOT NULL, agent_id UUID NOT NULL, conversation_id UUID, message_id UUID, -- Tipo de operación operation VARCHAR(50) NOT NULL, -- llm_request, embedding, tool_execution -- Proveedor y modelo provider VARCHAR(50), model VARCHAR(100), -- Consumo input_tokens INT DEFAULT 0, output_tokens INT DEFAULT 0, total_tokens INT DEFAULT 0, -- Costo estimado cost_usd DECIMAL(10,6), -- Timing latency_ms INT, -- Resultado success BOOLEAN DEFAULT true, error_message TEXT, created_at TIMESTAMPTZ DEFAULT CURRENT_TIMESTAMP ); CREATE INDEX idx_usage_tenant ON ai_agents.usage_logs(tenant_id); CREATE INDEX idx_usage_created ON ai_agents.usage_logs(created_at); -- Partition by month para mejor rendimiento -- CREATE TABLE ai_agents.usage_logs_2025_01 PARTITION OF ai_agents.usage_logs -- FOR VALUES FROM ('2025-01-01') TO ('2025-02-01'); ``` ## Integración con WhatsApp (MGN-017) ```typescript // Webhook handler para mensajes de WhatsApp async function handleWhatsAppMessage(webhook: WhatsAppWebhook): Promise { const { phone_number_id, message } = webhook; // 1. Encontrar agente asignado al número const agent = await findAgentForWhatsAppNumber(phone_number_id); if (!agent) { // No hay agente, dejar para inbox manual return; } // 2. Encontrar o crear conversación const conversation = await findOrCreateConversation({ agent_id: agent.id, channel: 'whatsapp', channel_conversation_id: message.from, contact_identifier: message.from }); // 3. Procesar con el pipeline const response = await processMessage({ conversation_id: conversation.id, agent_id: agent.id, message: message.text?.body || '', channel: 'whatsapp' }); // 4. Enviar respuesta via WhatsApp API await sendWhatsAppMessage(phone_number_id, message.from, response.content); } ``` ## Configuración de Timeouts y Retries ```typescript interface ProcessingConfig { // Timeouts llm_timeout_ms: number; // 30000 rag_timeout_ms: number; // 5000 tool_timeout_ms: number; // 10000 total_timeout_ms: number; // 45000 // Retries llm_max_retries: number; // 3 llm_retry_delay_ms: number; // 1000 llm_retry_backoff: number; // 2 (exponential) // Rate limits messages_per_minute: number; // 20 tokens_per_minute: number; // 50000 // Streaming streaming_enabled: boolean; streaming_chunk_delay_ms: number; } ``` ## Referencias - [OpenAI Chat Completions](https://platform.openai.com/docs/api-reference/chat) - [Anthropic Messages](https://docs.anthropic.com/en/api/messages) - [LangChain Conversation Memory](https://python.langchain.com/docs/modules/memory/) ## Dependencias - **RF Requeridos:** RF-018-001 (Agentes), RF-018-002 (Knowledge Bases) - **Bloqueante para:** RF-018-004 (Acciones y Herramientas)