Changes include: - Updated architecture documentation - Enhanced module definitions (OQI-001 to OQI-008) - ML integration documentation updates - Trading strategies documentation - Orchestration and inventory updates - Docker configuration updates 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
25 KiB
25 KiB
| id | title | type | status | priority | epic | project | version | created_date | updated_date |
|---|---|---|---|---|---|---|---|---|---|
| ET-INV-004 | Integración con Agentes ML | Technical Specification | Done | Alta | OQI-004 | trading-platform | 1.0.0 | 2025-12-05 | 2026-01-04 |
ET-INV-004: Integración con Agentes ML
Epic: OQI-004 Cuentas de Inversión | Versión: 1.0 | Fecha: 2025-12-05 | Responsable: Requirements-Analyst
1. Descripción
Define la integración entre el sistema de cuentas de inversión y el ML Engine (Python FastAPI) que ejecuta los agentes de trading:
- Comunicación bidireccional con ML Engine
- Notificación de depósitos y retiros
- Recepción de trades ejecutados
- Sincronización de balances
- Distribución de utilidades
2. Arquitectura de Integración
┌─────────────────────────────────────────────────────────────────┐
│               Investment ↔ ML Engine Integration                │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│  Investment Backend         ML Engine (Python FastAPI)          │
│  ┌──────────────────┐       ┌──────────────────┐                │
│  │                  │       │                  │                │
│  │  Deposit Event   │──────►│  Update Capital  │                │
│  │                  │       │                  │                │
│  └──────────────────┘       └──────────────────┘                │
│                                      │                          │
│                                      ▼                          │
│                             ┌──────────────────┐                │
│                             │  Trading Agent   │                │
│                             │   (Swing/Day)    │                │
│                             └──────────────────┘                │
│                                      │                          │
│                                      ▼                          │
│  ┌──────────────────┐       ┌──────────────────┐                │
│  │                  │◄──────│  Trade Executed  │                │
│  │  Update Balance  │       │    (Webhook)     │                │
│  │                  │       │                  │                │
│  └──────────────────┘       └──────────────────┘                │
│           │                                                     │
│           ▼                                                     │
│  ┌──────────────────┐                                           │
│  │ Daily Performance│                                           │
│  │   Calculation    │                                           │
│  └──────────────────┘                                           │
│           │                                                     │
│           ▼                                                     │
│  ┌──────────────────┐                                           │
│  │  Monthly Profit  │                                           │
│  │   Distribution   │                                           │
│  └──────────────────┘                                           │
│                                                                 │
└─────────────────────────────────────────────────────────────────┘
3. ML Engine API Specification
3.1 Endpoints del ML Engine
// Base URL of the ML Engine HTTP API (version prefix /api/v1 included).
// NOTE(review): `ml-engine` looks like an internal service hostname; MLEngineService reads the
// real value from process.env.ML_ENGINE_URL — confirm the value for each environment.
const ML_ENGINE_URL = 'http://ml-engine:8000/api/v1';
/**
 * Catalogue of the ML Engine routes used by the investment backend.
 *
 * Each member's type is the literal route template; `:id` and `:agent_id`
 * are placeholders that callers substitute before issuing the request.
 */
interface MLEngineEndpoints {
  // --- Account management ---
  createAccount: '/trading/accounts';
  updateCapital: '/trading/accounts/:id/capital';
  pauseAccount: '/trading/accounts/:id/pause';
  resumeAccount: '/trading/accounts/:id/resume';
  closeAccount: '/trading/accounts/:id/close';

  // --- Data queries ---
  getAccountStatus: '/trading/accounts/:id/status';
  getTrades: '/trading/accounts/:id/trades';
  getPerformance: '/trading/accounts/:id/performance';

  // --- Agent configuration (read and update share one route) ---
  getAgentConfig: '/agents/:agent_id/config';
  updateAgentConfig: '/agents/:agent_id/config';
}
4. Implementación del ML Engine Service
4.1 ML Engine Service Class
// src/services/ml-engine/ml-engine.service.ts
import axios, { AxiosInstance } from 'axios';
import { logger } from '../../utils/logger';
import { AppError } from '../../utils/errors';
/**
 * Payload for registering an investment account with the ML Engine.
 * `MLEngineService.createAccount` forwards these fields as the body of
 * `POST /trading/accounts`.
 */
export interface CreateMLAccountDto {
/** Identifier of the investment account being registered. */
account_id: string;
/** Identifier of the investment product. NOTE(review): presumably the product the account belongs to — confirm. */
product_id: string;
/** Kind of trading agent to attach. NOTE(review): the allowed values are not defined in this spec. */
agent_type: string;
/** Capital the account starts with. NOTE(review): currency/unit is not specified here. */
initial_capital: number;
/** Optional agent settings; `createAccount` sends `{}` when this is omitted. */
agent_config?: Record<string, any>;
}
/**
 * Shape of a capital change (deposit or withdrawal) for an account.
 * NOTE(review): the `notifyDeposit` / `notifyWithdrawal` methods in this spec declare
 * inline parameter types and put the account id in the URL instead of using this DTO —
 * confirm whether it is meant to replace them.
 */
export interface UpdateCapitalDto {
/** Identifier of the account whose capital changes. */
account_id: string;
/** Amount moved by the operation. NOTE(review): presumably a positive magnitude — confirm. */
amount: number;
/** Direction of the change. */
operation: 'deposit' | 'withdrawal';
/** Account capital after the operation has been applied. */
new_total_capital: number;
}
/**
 * A trade as reported by the ML Engine, both in `getTrades` responses and in
 * `trade.executed` webhook events.
 */
export interface MLTradeDto {
/** Identifier of the trade. */
trade_id: string;
/** Identifier of the investment account the trade belongs to. */
account_id: string;
/** Traded instrument. NOTE(review): symbol format is not defined in this spec. */
symbol: string;
/** Trade direction. */
side: 'buy' | 'sell';
/** Traded quantity. NOTE(review): unit (lots/contracts/base currency) is not specified. */
quantity: number;
/** Price at which the position was entered. */
entry_price: number;
/** Exit price; optional. NOTE(review): presumably only present once the trade is closed — confirm. */
exit_price?: number;
/** Profit or loss; the webhook handler adds it to the account balance when `status` is 'closed'. */
profit_loss?: number;
/** Lifecycle state of the trade. */
status: 'open' | 'closed';
/** Execution timestamp as a string. NOTE(review): presumably ISO-8601 — confirm format and time zone. */
executed_at: string;
}
/**
 * One day of performance figures for an account, as returned by `getPerformance`
 * and delivered in `daily.performance` webhook events.
 */
export interface MLPerformanceDto {
/** Identifier of the investment account. */
account_id: string;
/** Day the figures refer to; the sync job queries it as a `YYYY-MM-DD` string. */
date: string;
/** Account balance at the start of the day. */
opening_balance: number;
/** Account balance at the end of the day; consumers copy it into the account's current balance. */
closing_balance: number;
/** Return for the day as an amount; the webhook handler divides it by `opening_balance` to derive a percentage. */
daily_return: number;
/** Number of trades executed during the day. */
trades_executed: number;
/** Number of trades counted as winning. */
winning_trades: number;
/** Number of trades counted as losing. */
losing_trades: number;
}
/**
 * HTTP client for the ML Engine trading API.
 *
 * Configuration is read from the environment:
 * - `ML_ENGINE_URL`        base URL (default `http://localhost:8000/api/v1`)
 * - `ML_ENGINE_API_KEY`    value sent in the `X-API-Key` header
 * - `ML_ENGINE_TIMEOUT_MS` request timeout in milliseconds (default 30000)
 *
 * Error policy per method:
 * - account lifecycle calls and `getAccountStatus` throw `AppError` (500) on failure;
 * - `notifyDeposit` / `notifyWithdrawal` only log failures, because the movement has
 *   already been committed locally;
 * - `getTrades` / `getPerformance` log failures and return an empty array.
 */
export class MLEngineService {
  private client: AxiosInstance;
  private baseURL: string;
  private apiKey: string;

  constructor() {
    this.baseURL = process.env.ML_ENGINE_URL || 'http://localhost:8000/api/v1';
    this.apiKey = process.env.ML_ENGINE_API_KEY || '';

    // Honour ML_ENGINE_TIMEOUT_MS when it is a positive number; unset, empty or
    // malformed values fall back to the previous hard-coded 30 s.
    const configuredTimeout = Number(process.env.ML_ENGINE_TIMEOUT_MS);
    const timeout =
      Number.isFinite(configuredTimeout) && configuredTimeout > 0 ? configuredTimeout : 30000;

    this.client = axios.create({
      baseURL: this.baseURL,
      timeout,
      headers: {
        'Content-Type': 'application/json',
        'X-API-Key': this.apiKey,
      },
    });

    // Log every outgoing request.
    this.client.interceptors.request.use((config) => {
      logger.info('ML Engine request', {
        method: config.method,
        url: config.url,
        data: config.data,
      });
      return config;
    });

    // Log failed responses, then let the rejection propagate to the caller.
    this.client.interceptors.response.use(
      (response) => response,
      (error) => {
        logger.error('ML Engine error', {
          url: error.config?.url,
          status: error.response?.status,
          data: error.response?.data,
        });
        return Promise.reject(error);
      }
    );
  }

  /**
   * Builds `/trading/accounts/{id}/{action}`. The id is percent-encoded so that
   * characters such as `/`, `?` or `#` cannot change which route is hit.
   */
  private accountPath(accountId: string, action: string): string {
    return `/trading/accounts/${encodeURIComponent(accountId)}/${action}`;
  }

  /** Extracts a loggable message from whatever value was thrown. */
  private errorMessage(error: unknown): string {
    return error instanceof Error ? error.message : String(error);
  }

  /**
   * Creates a new trading account in the ML Engine.
   *
   * @param data account, product, agent type, initial capital and optional agent config
   * @throws AppError (500) when the ML Engine call fails
   */
  async createAccount(data: CreateMLAccountDto): Promise<void> {
    try {
      await this.client.post('/trading/accounts', {
        account_id: data.account_id,
        product_id: data.product_id,
        agent_type: data.agent_type,
        initial_capital: data.initial_capital,
        agent_config: data.agent_config || {},
      });
      logger.info('ML account created', { account_id: data.account_id });
    } catch (error: unknown) {
      logger.error('Failed to create ML account', {
        error: this.errorMessage(error),
        account_id: data.account_id,
      });
      throw new AppError('Failed to initialize trading account', 500);
    }
  }

  /**
   * Notifies the ML Engine of a deposit.
   *
   * Never throws: the deposit has already been processed in the local database,
   * so a failed notification is only logged.
   */
  async notifyDeposit(data: {
    account_id: string;
    product_id: string;
    amount: number;
    new_balance: number;
  }): Promise<void> {
    try {
      await this.client.post(this.accountPath(data.account_id, 'capital'), {
        operation: 'deposit',
        amount: data.amount,
        new_total_capital: data.new_balance,
      });
      logger.info('Deposit notified to ML Engine', {
        account_id: data.account_id,
        amount: data.amount,
      });
    } catch (error: unknown) {
      logger.error('Failed to notify deposit', {
        error: this.errorMessage(error),
        account_id: data.account_id,
      });
      // Deliberately not rethrown: the deposit is already committed locally.
    }
  }

  /**
   * Notifies the ML Engine of a withdrawal.
   *
   * Never throws; a failed notification is only logged.
   */
  async notifyWithdrawal(data: {
    account_id: string;
    amount: number;
    new_balance: number;
  }): Promise<void> {
    try {
      await this.client.post(this.accountPath(data.account_id, 'capital'), {
        operation: 'withdrawal',
        amount: data.amount,
        new_total_capital: data.new_balance,
      });
      logger.info('Withdrawal notified to ML Engine', {
        account_id: data.account_id,
        amount: data.amount,
      });
    } catch (error: unknown) {
      logger.error('Failed to notify withdrawal', {
        error: this.errorMessage(error),
        account_id: data.account_id,
      });
    }
  }

  /**
   * Pauses trading for an account.
   *
   * @throws AppError (500) when the ML Engine call fails
   */
  async pauseAccount(accountId: string): Promise<void> {
    try {
      await this.client.post(this.accountPath(accountId, 'pause'));
      logger.info('Account paused in ML Engine', { account_id: accountId });
    } catch (error: unknown) {
      logger.error('Failed to pause account', {
        error: this.errorMessage(error),
        account_id: accountId,
      });
      throw new AppError('Failed to pause trading', 500);
    }
  }

  /**
   * Resumes trading for an account.
   *
   * @throws AppError (500) when the ML Engine call fails
   */
  async resumeAccount(accountId: string): Promise<void> {
    try {
      await this.client.post(this.accountPath(accountId, 'resume'));
      logger.info('Account resumed in ML Engine', { account_id: accountId });
    } catch (error: unknown) {
      logger.error('Failed to resume account', {
        error: this.errorMessage(error),
        account_id: accountId,
      });
      throw new AppError('Failed to resume trading', 500);
    }
  }

  /**
   * Closes an account in the ML Engine.
   *
   * @throws AppError (500) when the ML Engine call fails
   */
  async closeAccount(accountId: string): Promise<void> {
    try {
      await this.client.post(this.accountPath(accountId, 'close'));
      logger.info('Account closed in ML Engine', { account_id: accountId });
    } catch (error: unknown) {
      logger.error('Failed to close account', {
        error: this.errorMessage(error),
        account_id: accountId,
      });
      throw new AppError('Failed to close trading account', 500);
    }
  }

  /**
   * Fetches the current status of an account.
   *
   * @returns the response body as sent by the ML Engine (shape not defined in this spec)
   * @throws AppError (500) when the ML Engine call fails
   */
  async getAccountStatus(accountId: string): Promise<any> {
    try {
      const response = await this.client.get(this.accountPath(accountId, 'status'));
      return response.data;
    } catch (error: unknown) {
      logger.error('Failed to get account status', {
        error: this.errorMessage(error),
        account_id: accountId,
      });
      throw new AppError('Failed to get trading status', 500);
    }
  }

  /**
   * Fetches trades for an account, optionally filtered.
   *
   * @returns the `trades` array of the response; an empty array when the field is
   *          missing or the request fails (the failure is logged)
   */
  async getTrades(accountId: string, filters?: {
    start_date?: string;
    end_date?: string;
    status?: string;
  }): Promise<MLTradeDto[]> {
    try {
      const response = await this.client.get(this.accountPath(accountId, 'trades'), {
        params: filters,
      });
      // Keep the declared return type honest if the body lacks `trades`.
      return response.data?.trades ?? [];
    } catch (error: unknown) {
      logger.error('Failed to get trades', {
        error: this.errorMessage(error),
        account_id: accountId,
      });
      return [];
    }
  }

  /**
   * Fetches historical performance for an account, optionally filtered by date.
   *
   * @returns the `performance` array of the response; an empty array when the field
   *          is missing or the request fails (the failure is logged)
   */
  async getPerformance(accountId: string, filters?: {
    start_date?: string;
    end_date?: string;
  }): Promise<MLPerformanceDto[]> {
    try {
      const response = await this.client.get(this.accountPath(accountId, 'performance'), {
        params: filters,
      });
      return response.data?.performance ?? [];
    } catch (error: unknown) {
      logger.error('Failed to get performance', {
        error: this.errorMessage(error),
        account_id: accountId,
      });
      return [];
    }
  }

  /**
   * Checks whether the ML Engine answers `GET /health` with HTTP 200.
   *
   * @returns `true` on a 200 response, `false` on any other status or error
   */
  async healthCheck(): Promise<boolean> {
    try {
      const response = await this.client.get('/health');
      return response.status === 200;
    } catch {
      return false;
    }
  }
}
5. Webhook Handler para Trades
5.1 ML Engine Webhook Service
// src/services/ml-engine/ml-webhook.service.ts
import crypto from 'crypto';
import { Request } from 'express';
import { InvestmentRepository } from '../../modules/investment/investment.repository';
import { logger } from '../../utils/logger';
import { MLPerformanceDto, MLTradeDto } from './ml-engine.service';
/**
 * Handles signed webhook callbacks sent by the ML Engine.
 *
 * Every request must carry an `x-ml-signature` header containing the hex HMAC-SHA256
 * of the JSON body, keyed with `ML_ENGINE_WEBHOOK_SECRET`. Signature problems throw;
 * failures inside an individual event handler are logged and swallowed, so the caller
 * still sees the webhook as processed.
 */
export class MLWebhookService {
  private investmentRepo: InvestmentRepository;
  private webhookSecret: string;

  constructor() {
    this.investmentRepo = new InvestmentRepository();
    this.webhookSecret = process.env.ML_ENGINE_WEBHOOK_SECRET || '';
  }

  /**
   * Verifies the request signature and dispatches on `event_type`.
   *
   * @param req Express request whose body is `{ event_type, data }`
   * @throws Error when the signature is missing or invalid, or no secret is configured
   */
  async handleWebhook(req: Request): Promise<void> {
    this.verifySignature(req);

    const { event_type, data } = req.body;
    switch (event_type) {
      case 'trade.executed':
        await this.handleTradeExecuted(data);
        break;
      case 'daily.performance':
        await this.handleDailyPerformance(data);
        break;
      case 'account.balance_updated':
        await this.handleBalanceUpdated(data);
        break;
      case 'account.error':
        await this.handleAccountError(data);
        break;
      default:
        logger.warn('Unhandled ML webhook event', { event_type });
    }
  }

  /**
   * Checks the `x-ml-signature` header against the HMAC-SHA256 of the body.
   *
   * NOTE(review): the HMAC is computed over `JSON.stringify(req.body)`, i.e. a
   * re-serialisation of the parsed body. That only matches the sender's signature if
   * both sides serialise identically; signing and verifying the raw request bytes
   * would be more robust — confirm with the ML Engine implementation.
   *
   * @throws Error('Missing webhook signature') when the header is absent
   * @throws Error('Invalid webhook signature') when it does not match or no secret is set
   */
  private verifySignature(req: Request): void {
    const header = req.headers['x-ml-signature'];
    const signature = Array.isArray(header) ? header[0] : header;
    if (!signature) {
      throw new Error('Missing webhook signature');
    }

    // With an empty secret anyone could forge a valid HMAC, so fail closed.
    if (!this.webhookSecret) {
      logger.error('ML webhook secret is not configured; rejecting webhook');
      throw new Error('Invalid webhook signature');
    }

    const payload = JSON.stringify(req.body);
    const expectedSignature = crypto
      .createHmac('sha256', this.webhookSecret)
      .update(payload)
      .digest('hex');

    // Constant-time comparison; timingSafeEqual needs equal-length buffers, so the
    // length is checked first (the length of a hex digest is not secret).
    const received = Buffer.from(signature, 'utf8');
    const expected = Buffer.from(expectedSignature, 'utf8');
    if (received.length !== expected.length || !crypto.timingSafeEqual(received, expected)) {
      throw new Error('Invalid webhook signature');
    }
  }

  /** Extracts a loggable message from whatever value was thrown. */
  private errorMessage(error: unknown): string {
    return error instanceof Error ? error.message : String(error);
  }

  /**
   * Handles an executed trade: when the trade is closed and carries a finite P&L,
   * adds that P&L to the account's current balance.
   *
   * NOTE(review): this is a read-modify-write without deduplication, so a redelivered
   * webhook would apply the same P&L twice — consider persisting processed trade ids.
   */
  private async handleTradeExecuted(trade: MLTradeDto): Promise<void> {
    logger.info('Trade executed webhook received', {
      trade_id: trade.trade_id,
      account_id: trade.account_id,
      symbol: trade.symbol,
      profit_loss: trade.profit_loss,
    });

    try {
      const account = await this.investmentRepo.getAccountById(trade.account_id);
      if (!account) {
        logger.error('Account not found for trade', {
          account_id: trade.account_id,
        });
        return;
      }

      // The payload is external JSON: `null`, strings or non-finite numbers must
      // never reach the balance arithmetic.
      const hasRealizedPnl =
        typeof trade.profit_loss === 'number' && Number.isFinite(trade.profit_loss);

      if (trade.status === 'closed' && hasRealizedPnl) {
        const newBalance = account.current_balance + (trade.profit_loss as number);
        await this.investmentRepo.updateAccount(account.id, {
          current_balance: newBalance,
        });
        logger.info('Account balance updated from trade', {
          account_id: account.id,
          old_balance: account.current_balance,
          new_balance: newBalance,
          profit_loss: trade.profit_loss,
        });
      }

      // Persisting the trade itself is optional (trade history):
      // await this.investmentRepo.saveTrade(trade);
    } catch (error: unknown) {
      logger.error('Error handling trade executed', {
        error: this.errorMessage(error),
        trade_id: trade.trade_id,
      });
    }
  }

  /**
   * Stores a daily performance record and copies the closing balance onto the account.
   */
  private async handleDailyPerformance(data: MLPerformanceDto): Promise<void> {
    logger.info('Daily performance webhook received', {
      account_id: data.account_id,
      date: data.date,
      daily_return: data.daily_return,
    });

    try {
      // A zero opening balance would give NaN/Infinity; report 0 % instead.
      const dailyReturnPercentage =
        data.opening_balance !== 0 ? (data.daily_return / data.opening_balance) * 100 : 0;

      await this.investmentRepo.createDailyPerformance({
        account_id: data.account_id,
        date: data.date,
        opening_balance: data.opening_balance,
        closing_balance: data.closing_balance,
        daily_return: data.daily_return,
        daily_return_percentage: dailyReturnPercentage,
        trades_executed: data.trades_executed,
        winning_trades: data.winning_trades,
        losing_trades: data.losing_trades,
      });

      await this.investmentRepo.updateAccount(data.account_id, {
        current_balance: data.closing_balance,
      });
      logger.info('Daily performance saved', { account_id: data.account_id });
    } catch (error: unknown) {
      logger.error('Error handling daily performance', {
        error: this.errorMessage(error),
        account_id: data.account_id,
      });
    }
  }

  /**
   * Overwrites the account's current balance with the value reported by the ML Engine.
   */
  private async handleBalanceUpdated(data: {
    account_id: string;
    new_balance: number;
    reason: string;
  }): Promise<void> {
    logger.info('Balance updated webhook received', {
      account_id: data.account_id,
      new_balance: data.new_balance,
      reason: data.reason,
    });

    try {
      await this.investmentRepo.updateAccount(data.account_id, {
        current_balance: data.new_balance,
      });
    } catch (error: unknown) {
      logger.error('Error updating balance', {
        error: this.errorMessage(error),
        account_id: data.account_id,
      });
    }
  }

  /**
   * Logs an account error reported by the ML Engine and pauses the account when the
   * error type is `critical`.
   */
  private async handleAccountError(data: {
    account_id: string;
    error_type: string;
    error_message: string;
  }): Promise<void> {
    logger.error('ML Engine account error', {
      account_id: data.account_id,
      error_type: data.error_type,
      error_message: data.error_message,
    });

    try {
      if (data.error_type === 'critical') {
        await this.investmentRepo.updateAccount(data.account_id, {
          status: 'paused',
        });
        // User notification is not wired up yet:
        // await notificationService.sendAccountError(data.account_id, data.error_message);
      }
    } catch (error: unknown) {
      logger.error('Error handling account error', {
        error: this.errorMessage(error),
        account_id: data.account_id,
      });
    }
  }
}
5.2 Webhook Route
// src/routes/ml-webhooks.routes.ts
import { Router, Request, Response } from 'express';
import { MLWebhookService } from '../services/ml-engine/ml-webhook.service';
import { logger } from '../utils/logger';
const router = Router();
const mlWebhookService = new MLWebhookService();

/**
 * Express handler for `POST /ml-engine`.
 *
 * Delegates to `MLWebhookService.handleWebhook`; answers 200 `{ received: true }` when
 * it resolves, and 400 with the error message when it (or sending the reply) throws.
 */
const handleMlEngineWebhook = async (req: Request, res: Response) => {
  try {
    await mlWebhookService.handleWebhook(req);
    res.status(200).json({ received: true });
  } catch (error: any) {
    const reason = error.message;
    logger.error('ML webhook processing error', { error: reason });
    res.status(400).send(`Webhook Error: ${reason}`);
  }
};

router.post('/ml-engine', handleMlEngineWebhook);

export default router;
6. Sincronización de Datos
6.1 Sincronización Diaria
// src/jobs/sync-ml-performance.job.ts
import { CronJob } from 'cron';
import { MLEngineService } from '../services/ml-engine/ml-engine.service';
import { InvestmentRepository } from '../modules/investment/investment.repository';
import { logger } from '../utils/logger';
/**
 * Daily job that copies the previous UTC day's performance from the ML Engine into the
 * investment database for every active account.
 *
 * Scheduled for 00:30 UTC. A failure for one account is logged and does not stop the
 * remaining accounts from being processed.
 */
export class SyncMLPerformanceJob {
  private mlEngineService: MLEngineService;
  private investmentRepo: InvestmentRepository;
  private job: CronJob;

  constructor() {
    this.mlEngineService = new MLEngineService();
    this.investmentRepo = new InvestmentRepository();

    // Run daily at 00:30 UTC. The time zone is passed explicitly; without it the
    // expression would be evaluated in the server's local time zone.
    this.job = new CronJob(
      '30 0 * * *',
      () => {
        // run() handles its own errors, so the promise is intentionally not awaited.
        void this.run();
      },
      null,
      false,
      'UTC'
    );
  }

  /**
   * Returns the calendar day before `now`, in UTC, as `YYYY-MM-DD`.
   * Uses UTC arithmetic throughout so the result does not depend on the server's
   * local time zone or DST transitions.
   */
  private previousUtcDate(now: Date = new Date()): string {
    const previous = new Date(
      Date.UTC(now.getUTCFullYear(), now.getUTCMonth(), now.getUTCDate() - 1)
    );
    return previous.toISOString().slice(0, 10);
  }

  /**
   * Syncs a single account for the given day: upserts the first performance record
   * returned by the ML Engine and copies its closing balance onto the account.
   * Does nothing when the ML Engine returns no record for that day.
   */
  private async syncAccount(accountId: string, dateStr: string): Promise<void> {
    const performance = await this.mlEngineService.getPerformance(accountId, {
      start_date: dateStr,
      end_date: dateStr,
    });
    if (performance.length === 0) {
      return;
    }

    const dailyPerf = performance[0];
    await this.investmentRepo.upsertDailyPerformance({
      account_id: accountId,
      date: dailyPerf.date,
      opening_balance: dailyPerf.opening_balance,
      closing_balance: dailyPerf.closing_balance,
      daily_return: dailyPerf.daily_return,
      trades_executed: dailyPerf.trades_executed,
      winning_trades: dailyPerf.winning_trades,
      losing_trades: dailyPerf.losing_trades,
    });

    await this.investmentRepo.updateAccount(accountId, {
      current_balance: dailyPerf.closing_balance,
    });
  }

  /**
   * Executes one sync pass over all active accounts. Never rejects: every failure is
   * logged.
   */
  async run(): Promise<void> {
    logger.info('Starting ML performance sync job');
    try {
      const accounts = await this.investmentRepo.getActiveAccounts();
      // The target day is the same for every account, so compute it once.
      const dateStr = this.previousUtcDate();

      for (const account of accounts) {
        try {
          await this.syncAccount(account.id, dateStr);
        } catch (error: unknown) {
          logger.error('Error syncing account performance', {
            account_id: account.id,
            error: error instanceof Error ? error.message : String(error),
          });
        }
      }
      logger.info('ML performance sync job completed');
    } catch (error: unknown) {
      logger.error('ML performance sync job failed', {
        error: error instanceof Error ? error.message : String(error),
      });
    }
  }

  /** Starts the cron schedule. */
  start(): void {
    this.job.start();
    logger.info('ML performance sync job scheduled');
  }

  /** Stops the cron schedule. */
  stop(): void {
    this.job.stop();
  }
}
7. Configuración
7.1 Variables de Entorno
# ML Engine
# ML_ENGINE_URL and ML_ENGINE_API_KEY are read by MLEngineService; ML_ENGINE_WEBHOOK_SECRET
# is read by MLWebhookService to verify the HMAC signature of incoming webhooks.
# NOTE(review): the key and secret below look like placeholders — provide real values through
# the deployment's secret management and never commit them.
ML_ENGINE_URL=http://ml-engine:8000/api/v1
ML_ENGINE_API_KEY=ml_engine_secret_key_abc123
ML_ENGINE_WEBHOOK_SECRET=ml_webhook_secret_xyz789
# ML Engine Timeouts
# NOTE(review): presumably the request timeout in milliseconds, the number of retry attempts and
# the base delay between attempts in milliseconds — confirm where each value is consumed.
ML_ENGINE_TIMEOUT_MS=30000
ML_ENGINE_RETRY_ATTEMPTS=3
ML_ENGINE_RETRY_DELAY_MS=1000
8. Manejo de Errores
8.1 Retry Logic
/**
 * Runs `fn`, retrying on rejection with a linearly growing delay.
 *
 * @param fn         operation to attempt
 * @param maxRetries total number of attempts (not additional retries); values below 1
 *                   or NaN are treated as 1 so `fn` always runs at least once
 * @param delayMs    base delay; the wait after the n-th failed attempt is `delayMs * n`
 * @returns the value of the first successful attempt
 * @throws the error of the last failed attempt
 */
async function withRetry<T>(
  fn: () => Promise<T>,
  maxRetries: number = 3,
  delayMs: number = 1000
): Promise<T> {
  // With zero attempts there would be no error to rethrow, so guarantee one attempt.
  const attempts = Number.isNaN(maxRetries) ? 1 : Math.max(1, Math.ceil(maxRetries));

  let lastError: unknown;
  for (let attempt = 1; attempt <= attempts; attempt++) {
    try {
      return await fn();
    } catch (error: unknown) {
      lastError = error;
      // Back off before the next attempt; no wait after the final one.
      if (attempt < attempts) {
        await new Promise<void>((resolve) => setTimeout(resolve, delayMs * attempt));
      }
    }
  }
  throw lastError;
}
// Usage
// NOTE(review): notifyDeposit, as specified in section 4.1, catches and logs its own failures
// without rethrowing, so withRetry will only ever invoke it once unless it is changed to
// propagate errors.
await withRetry(() => mlEngineService.notifyDeposit(data));
8.2 Circuit Breaker
/**
 * Minimal circuit breaker.
 *
 * - closed:    calls pass through; consecutive failures are counted.
 * - open:      calls are rejected immediately with `Circuit breaker is open`.
 * - half-open: entered on the first call made at least `resetTimeout` ms after opening;
 *              a success closes the circuit, a failure re-opens it.
 *
 * The open → half-open transition is derived from a timestamp rather than `setTimeout`,
 * so no timer keeps the process alive and a stale timer cannot half-open a circuit
 * that was re-opened in the meantime.
 */
class CircuitBreaker {
  private failures = 0;
  private state: 'closed' | 'open' | 'half-open' = 'closed';
  private openedAt = 0;

  /**
   * @param maxFailures  consecutive failures that open the circuit (default 5)
   * @param resetTimeout milliseconds the circuit stays open (default 60000, i.e. 1 minute)
   * @param now          clock returning milliseconds; injectable for tests
   */
  constructor(
    private readonly maxFailures: number = 5,
    private readonly resetTimeout: number = 60000,
    private readonly now: () => number = Date.now
  ) {}

  /**
   * Runs `fn` through the breaker.
   *
   * @returns the value `fn` resolves to
   * @throws Error('Circuit breaker is open') while open, without calling `fn`
   * @throws whatever `fn` rejects with (the failure is counted first)
   */
  async execute<T>(fn: () => Promise<T>): Promise<T> {
    if (this.state === 'open') {
      if (this.now() - this.openedAt < this.resetTimeout) {
        throw new Error('Circuit breaker is open');
      }
      // The open period has elapsed: allow a trial call.
      this.state = 'half-open';
    }

    try {
      const result = await fn();
      this.onSuccess();
      return result;
    } catch (error) {
      this.onFailure();
      throw error;
    }
  }

  /** Any success resets the failure count and closes the circuit. */
  private onSuccess(): void {
    this.failures = 0;
    this.state = 'closed';
  }

  /** Counts a failure and opens the circuit at the threshold or after a failed trial call. */
  private onFailure(): void {
    this.failures++;
    // A failure arriving while already open (a call started before the circuit
    // opened) must not extend the open period.
    const shouldOpen = this.state === 'half-open' || this.failures >= this.maxFailures;
    if (this.state !== 'open' && shouldOpen) {
      this.state = 'open';
      this.openedAt = this.now();
    }
  }
}
9. Testing
9.1 Mocking ML Engine
// tests/mocks/ml-engine.mock.ts
import nock from 'nock';
/**
 * nock-based test double for the ML Engine HTTP API.
 *
 * Interceptors are registered on the origin of `ML_ENGINE_URL` (default
 * `http://localhost:8000`). When that URL already carries a path such as `/api/v1`
 * (as in the documented configuration), the path is used as the route prefix instead
 * of being duplicated; otherwise `/api/v1` is assumed.
 */
export class MLEngineMock {
  private baseURL: string;
  private apiPrefix: string;

  constructor() {
    const configured = new URL(process.env.ML_ENGINE_URL || 'http://localhost:8000');
    this.baseURL = configured.origin;
    // Strip trailing slashes; a bare origin ("/") falls back to the default prefix.
    this.apiPrefix = configured.pathname.replace(/\/+$/, '') || '/api/v1';
  }

  /**
   * Mocks `POST /trading/accounts`.
   *
   * @param accountId id echoed back in the success body
   * @param success   `true` replies 201 with `{ account_id }`; `false` replies 500 with `{ error: 'Failed' }`
   */
  mockCreateAccount(accountId: string, success: boolean = true): void {
    nock(this.baseURL)
      .post(`${this.apiPrefix}/trading/accounts`)
      .reply(success ? 201 : 500, success ? { account_id: accountId } : { error: 'Failed' });
  }

  /**
   * Mocks `POST /trading/accounts/{accountId}/capital`.
   *
   * @param success `true` replies 200, `false` replies 500
   */
  mockNotifyDeposit(accountId: string, success: boolean = true): void {
    nock(this.baseURL)
      .post(`${this.apiPrefix}/trading/accounts/${accountId}/capital`)
      .reply(success ? 200 : 500);
  }

  /**
   * Mocks `GET /trading/accounts/{accountId}/performance`, replying 200 with
   * `{ performance: data }`.
   */
  mockGetPerformance(accountId: string, data: any[]): void {
    nock(this.baseURL)
      .get(`${this.apiPrefix}/trading/accounts/${accountId}/performance`)
      // The service sends start_date/end_date as query parameters; without this the
      // interceptor only matches requests that have no query string at all.
      .query(true)
      .reply(200, { performance: data });
  }

  /** Removes every registered interceptor. */
  clear(): void {
    nock.cleanAll();
  }
}
10. Monitoreo
10.1 Health Check
// src/jobs/ml-engine-health.job.ts
import { CronJob } from 'cron';
import { MLEngineService } from '../services/ml-engine/ml-engine.service';
import { logger } from '../utils/logger';
/**
 * Periodic job that probes the ML Engine health endpoint every five minutes and logs
 * the outcome.
 */
export class MLEngineHealthJob {
  private mlEngineService: MLEngineService;
  private job: CronJob;

  constructor() {
    this.mlEngineService = new MLEngineService();

    // Every 5 minutes.
    this.job = new CronJob('*/5 * * * *', () => {
      // The result is only logged, so the promise is intentionally not awaited.
      void this.run();
    });
  }

  /**
   * Performs one health probe: logs an error when the ML Engine is unhealthy and a
   * debug message otherwise.
   */
  async run(): Promise<void> {
    const isHealthy = await this.mlEngineService.healthCheck();
    if (!isHealthy) {
      logger.error('ML Engine health check failed');
      // TODO: raise an alert here.
    } else {
      logger.debug('ML Engine health check passed');
    }
  }

  /** Starts the cron schedule. */
  start(): void {
    this.job.start();
  }

  /** Stops the cron schedule so the job can be shut down cleanly. */
  stop(): void {
    this.job.stop();
  }
}
11. Referencias
- FastAPI Webhooks Documentation
- Axios Interceptors
- Circuit Breaker Pattern
- Event-Driven Architecture