--- id: "ET-INV-004" title: "Integración con Agentes ML" type: "Technical Specification" status: "Done" priority: "Alta" epic: "OQI-004" project: "trading-platform" version: "1.0.0" created_date: "2025-12-05" updated_date: "2026-01-04" --- # ET-INV-004: Integración con Agentes ML **Epic:** OQI-004 Cuentas de Inversión **Versión:** 1.0 **Fecha:** 2025-12-05 **Responsable:** Requirements-Analyst --- ## 1. Descripción Define la integración entre el sistema de cuentas de inversión y el ML Engine (Python FastAPI) que ejecuta los agentes de trading: - Comunicación bidireccional con ML Engine - Notificación de depósitos y retiros - Recepción de trades ejecutados - Sincronización de balances - Distribución de utilidades --- ## 2. Arquitectura de Integración ``` ┌─────────────────────────────────────────────────────────────────┐ │ Investment ↔ ML Engine Integration │ ├─────────────────────────────────────────────────────────────────┤ │ │ │ Investment Backend ML Engine (Python FastAPI) │ │ ┌──────────────────┐ ┌──────────────────┐ │ │ │ │ │ │ │ │ │ Deposit Event │──────►│ Update Capital │ │ │ │ │ │ │ │ │ └──────────────────┘ └──────────────────┘ │ │ │ │ │ ▼ │ │ ┌──────────────────┐ │ │ │ Trading Agent │ │ │ │ (Swing/Day) │ │ │ └──────────────────┘ │ │ │ │ │ ▼ │ │ ┌──────────────────┐ ┌──────────────────┐ │ │ │ │◄──────│ Trade Executed │ │ │ │ Update Balance │ │ (Webhook) │ │ │ │ │ │ │ │ │ └──────────────────┘ └──────────────────┘ │ │ │ │ │ ▼ │ │ ┌──────────────────┐ │ │ │ Daily Performance│ │ │ │ Calculation │ │ │ └──────────────────┘ │ │ │ │ │ ▼ │ │ ┌──────────────────┐ │ │ │ Monthly Profit │ │ │ │ Distribution │ │ │ └──────────────────┘ │ │ │ └─────────────────────────────────────────────────────────────────┘ ``` --- ## 3. ML Engine API Specification ### 3.1 Endpoints del ML Engine ```typescript // ML Engine base URL const ML_ENGINE_URL = 'http://ml-engine:8000/api/v1'; // Endpoints disponibles interface MLEngineEndpoints { // Gestión de cuentas createAccount: '/trading/accounts', updateCapital: '/trading/accounts/:id/capital', pauseAccount: '/trading/accounts/:id/pause', resumeAccount: '/trading/accounts/:id/resume', closeAccount: '/trading/accounts/:id/close', // Consulta de datos getAccountStatus: '/trading/accounts/:id/status', getTrades: '/trading/accounts/:id/trades', getPerformance: '/trading/accounts/:id/performance', // Configuración de agentes getAgentConfig: '/agents/:agent_id/config', updateAgentConfig: '/agents/:agent_id/config', } ``` --- ## 4. Implementación del ML Engine Service ### 4.1 ML Engine Service Class ```typescript // src/services/ml-engine/ml-engine.service.ts import axios, { AxiosInstance } from 'axios'; import { logger } from '../../utils/logger'; import { AppError } from '../../utils/errors'; export interface CreateMLAccountDto { account_id: string; product_id: string; agent_type: string; initial_capital: number; agent_config?: Record; } export interface UpdateCapitalDto { account_id: string; amount: number; operation: 'deposit' | 'withdrawal'; new_total_capital: number; } export interface MLTradeDto { trade_id: string; account_id: string; symbol: string; side: 'buy' | 'sell'; quantity: number; entry_price: number; exit_price?: number; profit_loss?: number; status: 'open' | 'closed'; executed_at: string; } export interface MLPerformanceDto { account_id: string; date: string; opening_balance: number; closing_balance: number; daily_return: number; trades_executed: number; winning_trades: number; losing_trades: number; } export class MLEngineService { private client: AxiosInstance; private baseURL: string; private apiKey: string; constructor() { this.baseURL = process.env.ML_ENGINE_URL || 'http://localhost:8000/api/v1'; this.apiKey = process.env.ML_ENGINE_API_KEY || ''; this.client = axios.create({ baseURL: this.baseURL, timeout: 30000, headers: { 'Content-Type': 'application/json', 'X-API-Key': this.apiKey, }, }); // Interceptor para logging this.client.interceptors.request.use((config) => { logger.info('ML Engine request', { method: config.method, url: config.url, data: config.data, }); return config; }); this.client.interceptors.response.use( (response) => response, (error) => { logger.error('ML Engine error', { url: error.config?.url, status: error.response?.status, data: error.response?.data, }); return Promise.reject(error); } ); } /** * Crea una nueva cuenta en ML Engine */ async createAccount(data: CreateMLAccountDto): Promise { try { await this.client.post('/trading/accounts', { account_id: data.account_id, product_id: data.product_id, agent_type: data.agent_type, initial_capital: data.initial_capital, agent_config: data.agent_config || {}, }); logger.info('ML account created', { account_id: data.account_id }); } catch (error: any) { logger.error('Failed to create ML account', { error: error.message, account_id: data.account_id, }); throw new AppError('Failed to initialize trading account', 500); } } /** * Notifica depósito al ML Engine */ async notifyDeposit(data: { account_id: string; product_id: string; amount: number; new_balance: number; }): Promise { try { await this.client.post(`/trading/accounts/${data.account_id}/capital`, { operation: 'deposit', amount: data.amount, new_total_capital: data.new_balance, }); logger.info('Deposit notified to ML Engine', { account_id: data.account_id, amount: data.amount, }); } catch (error: any) { logger.error('Failed to notify deposit', { error: error.message, account_id: data.account_id, }); // No lanzar error, el depósito ya se procesó en nuestra DB } } /** * Notifica retiro al ML Engine */ async notifyWithdrawal(data: { account_id: string; amount: number; new_balance: number; }): Promise { try { await this.client.post(`/trading/accounts/${data.account_id}/capital`, { operation: 'withdrawal', amount: data.amount, new_total_capital: data.new_balance, }); logger.info('Withdrawal notified to ML Engine', { account_id: data.account_id, amount: data.amount, }); } catch (error: any) { logger.error('Failed to notify withdrawal', { error: error.message, account_id: data.account_id, }); } } /** * Pausa trading en una cuenta */ async pauseAccount(accountId: string): Promise { try { await this.client.post(`/trading/accounts/${accountId}/pause`); logger.info('Account paused in ML Engine', { account_id: accountId }); } catch (error: any) { logger.error('Failed to pause account', { error: error.message, account_id: accountId, }); throw new AppError('Failed to pause trading', 500); } } /** * Reanuda trading en una cuenta */ async resumeAccount(accountId: string): Promise { try { await this.client.post(`/trading/accounts/${accountId}/resume`); logger.info('Account resumed in ML Engine', { account_id: accountId }); } catch (error: any) { logger.error('Failed to resume account', { error: error.message, account_id: accountId, }); throw new AppError('Failed to resume trading', 500); } } /** * Cierra una cuenta en ML Engine */ async closeAccount(accountId: string): Promise { try { await this.client.post(`/trading/accounts/${accountId}/close`); logger.info('Account closed in ML Engine', { account_id: accountId }); } catch (error: any) { logger.error('Failed to close account', { error: error.message, account_id: accountId, }); throw new AppError('Failed to close trading account', 500); } } /** * Obtiene el estado actual de una cuenta */ async getAccountStatus(accountId: string): Promise { try { const response = await this.client.get( `/trading/accounts/${accountId}/status` ); return response.data; } catch (error: any) { logger.error('Failed to get account status', { error: error.message, account_id: accountId, }); throw new AppError('Failed to get trading status', 500); } } /** * Obtiene trades ejecutados */ async getTrades(accountId: string, filters?: { start_date?: string; end_date?: string; status?: string; }): Promise { try { const response = await this.client.get( `/trading/accounts/${accountId}/trades`, { params: filters } ); return response.data.trades; } catch (error: any) { logger.error('Failed to get trades', { error: error.message, account_id: accountId, }); return []; } } /** * Obtiene performance histórica */ async getPerformance(accountId: string, filters?: { start_date?: string; end_date?: string; }): Promise { try { const response = await this.client.get( `/trading/accounts/${accountId}/performance`, { params: filters } ); return response.data.performance; } catch (error: any) { logger.error('Failed to get performance', { error: error.message, account_id: accountId, }); return []; } } /** * Verifica health del ML Engine */ async healthCheck(): Promise { try { const response = await this.client.get('/health'); return response.status === 200; } catch (error) { return false; } } } ``` --- ## 5. Webhook Handler para Trades ### 5.1 ML Engine Webhook Service ```typescript // src/services/ml-engine/ml-webhook.service.ts import { Request } from 'express'; import crypto from 'crypto'; import { InvestmentRepository } from '../../modules/investment/investment.repository'; import { logger } from '../../utils/logger'; import { MLTradeDto } from './ml-engine.service'; export class MLWebhookService { private investmentRepo: InvestmentRepository; private webhookSecret: string; constructor() { this.investmentRepo = new InvestmentRepository(); this.webhookSecret = process.env.ML_ENGINE_WEBHOOK_SECRET || ''; } /** * Procesa webhook del ML Engine */ async handleWebhook(req: Request): Promise { // Verificar firma this.verifySignature(req); const { event_type, data } = req.body; switch (event_type) { case 'trade.executed': await this.handleTradeExecuted(data); break; case 'daily.performance': await this.handleDailyPerformance(data); break; case 'account.balance_updated': await this.handleBalanceUpdated(data); break; case 'account.error': await this.handleAccountError(data); break; default: logger.warn('Unhandled ML webhook event', { event_type }); } } /** * Verifica firma HMAC del webhook */ private verifySignature(req: Request): void { const signature = req.headers['x-ml-signature'] as string; if (!signature) { throw new Error('Missing webhook signature'); } const payload = JSON.stringify(req.body); const expectedSignature = crypto .createHmac('sha256', this.webhookSecret) .update(payload) .digest('hex'); if (signature !== expectedSignature) { throw new Error('Invalid webhook signature'); } } /** * Maneja trade ejecutado */ private async handleTradeExecuted(trade: MLTradeDto): Promise { logger.info('Trade executed webhook received', { trade_id: trade.trade_id, account_id: trade.account_id, symbol: trade.symbol, profit_loss: trade.profit_loss, }); try { const account = await this.investmentRepo.getAccountById(trade.account_id); if (!account) { logger.error('Account not found for trade', { account_id: trade.account_id, }); return; } // Si el trade está cerrado y tiene P&L, actualizar balance if (trade.status === 'closed' && trade.profit_loss !== undefined) { const newBalance = account.current_balance + trade.profit_loss; await this.investmentRepo.updateAccount(account.id, { current_balance: newBalance, }); logger.info('Account balance updated from trade', { account_id: account.id, old_balance: account.current_balance, new_balance: newBalance, profit_loss: trade.profit_loss, }); } // Guardar información del trade (opcional, si quieres histórico) // await this.investmentRepo.saveTrade(trade); } catch (error: any) { logger.error('Error handling trade executed', { error: error.message, trade_id: trade.trade_id, }); } } /** * Maneja performance diaria */ private async handleDailyPerformance(data: MLPerformanceDto): Promise { logger.info('Daily performance webhook received', { account_id: data.account_id, date: data.date, daily_return: data.daily_return, }); try { // Guardar en tabla daily_performance await this.investmentRepo.createDailyPerformance({ account_id: data.account_id, date: data.date, opening_balance: data.opening_balance, closing_balance: data.closing_balance, daily_return: data.daily_return, daily_return_percentage: (data.daily_return / data.opening_balance) * 100, trades_executed: data.trades_executed, winning_trades: data.winning_trades, losing_trades: data.losing_trades, }); // Actualizar balance actual de la cuenta await this.investmentRepo.updateAccount(data.account_id, { current_balance: data.closing_balance, }); logger.info('Daily performance saved', { account_id: data.account_id }); } catch (error: any) { logger.error('Error handling daily performance', { error: error.message, account_id: data.account_id, }); } } /** * Maneja actualización de balance */ private async handleBalanceUpdated(data: { account_id: string; new_balance: number; reason: string; }): Promise { logger.info('Balance updated webhook received', { account_id: data.account_id, new_balance: data.new_balance, reason: data.reason, }); try { await this.investmentRepo.updateAccount(data.account_id, { current_balance: data.new_balance, }); } catch (error: any) { logger.error('Error updating balance', { error: error.message, account_id: data.account_id, }); } } /** * Maneja error en cuenta */ private async handleAccountError(data: { account_id: string; error_type: string; error_message: string; }): Promise { logger.error('ML Engine account error', { account_id: data.account_id, error_type: data.error_type, error_message: data.error_message, }); try { // Pausar cuenta si hay error crítico if (data.error_type === 'critical') { await this.investmentRepo.updateAccount(data.account_id, { status: 'paused', }); // Notificar al usuario // await notificationService.sendAccountError(data.account_id, data.error_message); } } catch (error: any) { logger.error('Error handling account error', { error: error.message, account_id: data.account_id, }); } } } ``` ### 5.2 Webhook Route ```typescript // src/routes/ml-webhooks.routes.ts import { Router, Request, Response } from 'express'; import { MLWebhookService } from '../services/ml-engine/ml-webhook.service'; import { logger } from '../utils/logger'; const router = Router(); const mlWebhookService = new MLWebhookService(); router.post('/ml-engine', async (req: Request, res: Response) => { try { await mlWebhookService.handleWebhook(req); res.status(200).json({ received: true }); } catch (error: any) { logger.error('ML webhook processing error', { error: error.message }); res.status(400).send(`Webhook Error: ${error.message}`); } }); export default router; ``` --- ## 6. Sincronización de Datos ### 6.1 Sincronización Diaria ```typescript // src/jobs/sync-ml-performance.job.ts import { CronJob } from 'cron'; import { MLEngineService } from '../services/ml-engine/ml-engine.service'; import { InvestmentRepository } from '../modules/investment/investment.repository'; import { logger } from '../utils/logger'; export class SyncMLPerformanceJob { private mlEngineService: MLEngineService; private investmentRepo: InvestmentRepository; private job: CronJob; constructor() { this.mlEngineService = new MLEngineService(); this.investmentRepo = new InvestmentRepository(); // Ejecutar diariamente a las 00:30 UTC this.job = new CronJob('30 0 * * *', () => this.run()); } async run(): Promise { logger.info('Starting ML performance sync job'); try { // Obtener todas las cuentas activas const accounts = await this.investmentRepo.getActiveAccounts(); for (const account of accounts) { try { // Obtener performance del día anterior desde ML Engine const yesterday = new Date(); yesterday.setDate(yesterday.getDate() - 1); const dateStr = yesterday.toISOString().split('T')[0]; const performance = await this.mlEngineService.getPerformance( account.id, { start_date: dateStr, end_date: dateStr, } ); if (performance.length > 0) { const dailyPerf = performance[0]; // Guardar en DB si no existe await this.investmentRepo.upsertDailyPerformance({ account_id: account.id, date: dailyPerf.date, opening_balance: dailyPerf.opening_balance, closing_balance: dailyPerf.closing_balance, daily_return: dailyPerf.daily_return, trades_executed: dailyPerf.trades_executed, winning_trades: dailyPerf.winning_trades, losing_trades: dailyPerf.losing_trades, }); // Actualizar balance actual await this.investmentRepo.updateAccount(account.id, { current_balance: dailyPerf.closing_balance, }); } } catch (error: any) { logger.error('Error syncing account performance', { account_id: account.id, error: error.message, }); } } logger.info('ML performance sync job completed'); } catch (error: any) { logger.error('ML performance sync job failed', { error: error.message }); } } start(): void { this.job.start(); logger.info('ML performance sync job scheduled'); } stop(): void { this.job.stop(); } } ``` --- ## 7. Configuración ### 7.1 Variables de Entorno ```bash # ML Engine ML_ENGINE_URL=http://ml-engine:8000/api/v1 ML_ENGINE_API_KEY=ml_engine_secret_key_abc123 ML_ENGINE_WEBHOOK_SECRET=ml_webhook_secret_xyz789 # ML Engine Timeouts ML_ENGINE_TIMEOUT_MS=30000 ML_ENGINE_RETRY_ATTEMPTS=3 ML_ENGINE_RETRY_DELAY_MS=1000 ``` --- ## 8. Manejo de Errores ### 8.1 Retry Logic ```typescript async function withRetry( fn: () => Promise, maxRetries: number = 3, delayMs: number = 1000 ): Promise { let lastError: Error; for (let i = 0; i < maxRetries; i++) { try { return await fn(); } catch (error: any) { lastError = error; if (i < maxRetries - 1) { await new Promise((resolve) => setTimeout(resolve, delayMs * (i + 1))); } } } throw lastError!; } // Uso await withRetry(() => mlEngineService.notifyDeposit(data)); ``` ### 8.2 Circuit Breaker ```typescript class CircuitBreaker { private failures = 0; private maxFailures = 5; private resetTimeout = 60000; // 1 minuto private state: 'closed' | 'open' | 'half-open' = 'closed'; async execute(fn: () => Promise): Promise { if (this.state === 'open') { throw new Error('Circuit breaker is open'); } try { const result = await fn(); this.onSuccess(); return result; } catch (error) { this.onFailure(); throw error; } } private onSuccess(): void { this.failures = 0; this.state = 'closed'; } private onFailure(): void { this.failures++; if (this.failures >= this.maxFailures) { this.state = 'open'; setTimeout(() => { this.state = 'half-open'; }, this.resetTimeout); } } } ``` --- ## 9. Testing ### 9.1 Mocking ML Engine ```typescript // tests/mocks/ml-engine.mock.ts import nock from 'nock'; export class MLEngineMock { private baseURL: string; constructor() { this.baseURL = process.env.ML_ENGINE_URL || 'http://localhost:8000'; } mockCreateAccount(accountId: string, success: boolean = true): void { const scope = nock(this.baseURL) .post('/api/v1/trading/accounts') .reply(success ? 201 : 500, success ? { account_id: accountId } : { error: 'Failed' }); } mockNotifyDeposit(accountId: string, success: boolean = true): void { nock(this.baseURL) .post(`/api/v1/trading/accounts/${accountId}/capital`) .reply(success ? 200 : 500); } mockGetPerformance(accountId: string, data: any[]): void { nock(this.baseURL) .get(`/api/v1/trading/accounts/${accountId}/performance`) .reply(200, { performance: data }); } clear(): void { nock.cleanAll(); } } ``` --- ## 10. Monitoreo ### 10.1 Health Check ```typescript // src/jobs/ml-engine-health.job.ts import { CronJob } from 'cron'; import { MLEngineService } from '../services/ml-engine/ml-engine.service'; import { logger } from '../utils/logger'; export class MLEngineHealthJob { private mlEngineService: MLEngineService; private job: CronJob; constructor() { this.mlEngineService = new MLEngineService(); // Cada 5 minutos this.job = new CronJob('*/5 * * * *', () => this.run()); } async run(): Promise { const isHealthy = await this.mlEngineService.healthCheck(); if (!isHealthy) { logger.error('ML Engine health check failed'); // Enviar alerta } else { logger.debug('ML Engine health check passed'); } } start(): void { this.job.start(); } } ``` --- ## 11. Referencias - FastAPI Webhooks Documentation - Axios Interceptors - Circuit Breaker Pattern - Event-Driven Architecture