933 lines
30 KiB
Markdown
933 lines
30 KiB
Markdown
# ET-ML-005: Integración con Backend
|
|
|
|
## Metadata
|
|
|
|
| Campo | Valor |
|-------|-------|
| **ID** | ET-ML-005 |
| **Épica** | OQI-006 - Señales ML |
| **Tipo** | Especificación Técnica |
| **Versión** | 1.0.0 |
| **Estado** | Pendiente |
| **Última actualización** | 2025-12-05 |
|
|
|
|
---
|
|
|
|
## Propósito
|
|
|
|
Especificar la integración entre el ML Engine (Python/FastAPI) y el Backend principal (Express.js), incluyendo comunicación HTTP, eventos en tiempo real, y sincronización de datos.
|
|
|
|
---
|
|
|
|
## Arquitectura de Integración
|
|
|
|
```
┌─────────────────────────────────────────────────────────────────────────────┐
│                              FRONTEND (React)                               │
│                                                                             │
│   ┌──────────────┐     ┌──────────────┐     ┌──────────────────────────┐    │
│   │    Charts    │     │   Signals    │     │        Dashboard         │    │
│   │  Component   │     │   Display    │     │        Component         │    │
│   └──────┬───────┘     └──────┬───────┘     └───────────┬──────────────┘    │
│          │                    │                         │                   │
└──────────┼────────────────────┼─────────────────────────┼───────────────────┘
           │                    │                         │
           ▼                    ▼                         ▼
┌─────────────────────────────────────────────────────────────────────────────┐
│                            BACKEND (Express.js)                             │
│                                                                             │
│   ┌─────────────────────────────────────────────────────────────────────┐   │
│   │                          API Gateway Layer                          │   │
│   │   ┌──────────────┐   ┌──────────────┐   ┌────────────────────────┐  │   │
│   │   │ /ml/signals  │   │ /ml/predict  │   │     /ml/indicators     │  │   │
│   │   └──────┬───────┘   └──────┬───────┘   └───────────┬────────────┘  │   │
│   └──────────┼──────────────────┼───────────────────────┼───────────────┘   │
│              │                  │                       │                   │
│   ┌──────────▼──────────────────▼───────────────────────▼───────────────┐   │
│   │                       ML Integration Service                        │   │
│   │   ┌─────────────┐   ┌──────────────┐    ┌──────────────────────────┐│   │
│   │   │  ML Client  │   │ Rate Limiter │    │   Response Transformer   ││   │
│   │   │   (HTTP)    │   │  (per user)  │    │                          ││   │
│   │   └──────┬──────┘   └──────────────┘    └──────────────────────────┘│   │
│   └──────────┼──────────────────────────────────────────────────────────┘   │
│              │                                                              │
│   ┌──────────▼──────────────────────────────────────────────────────────┐   │
│   │                      Event Bus (Redis Pub/Sub)                      │   │
│   │                                                                     │   │
│   │  Channels: signals:BTCUSDT, signals:ETHUSDT, predictions:*          │   │
│   └─────────────────────────────────────────────────────────────────────┘   │
│                                                                             │
└─────────────────────────────────────────────────────────────────────────────┘
               │                                          │
               ▼                                          ▼
┌─────────────────────────────────────────────────────────────────────────────┐
│                             ML ENGINE (FastAPI)                             │
│                                                                             │
│   ┌─────────────────────────────────────────────────────────────────────┐   │
│   │                              REST API                               │   │
│   │        POST /predictions    POST /signals    GET /indicators        │   │
│   └─────────────────────────────────────────────────────────────────────┘   │
│                                                                             │
│   ┌─────────────────────────────────────────────────────────────────────┐   │
│   │                    Signal Publisher (Background)                    │   │
│   │                     Publishes to Redis Pub/Sub                      │   │
│   └─────────────────────────────────────────────────────────────────────┘   │
│                                                                             │
└─────────────────────────────────────────────────────────────────────────────┘
```
|
|
|
|
---
|
|
|
|
## ML Client (Backend)
|
|
|
|
### HTTP Client Service
|
|
|
|
```typescript
|
|
// src/services/ml/ml-client.service.ts
|
|
import axios, { AxiosInstance, AxiosError } from 'axios';
|
|
import { Injectable } from '@nestjs/common';
|
|
import { ConfigService } from '@nestjs/config';
|
|
import { RedisService } from '../redis/redis.service';
|
|
|
|
/** Body of a price-prediction request sent to the ML Engine. */
interface PredictionRequest {
  symbol: string;
  horizon: number;
}

/** Body of a trading-signal request; both flags are sent as `true` when omitted. */
interface SignalRequest {
  symbol: string;
  horizon: number;
  includeRange?: boolean;
  includeTpsl?: boolean;
}

/** Envelope returned by this client for every ML Engine call. */
interface MLResponse<T> {
  success: boolean;
  data: T;
  metadata: {
    requestId: string;
    latencyMs: number;
    cached: boolean;
  };
}

/**
 * HTTP client for the ML Engine.
 *
 * Predictions and indicators are cached in Redis. The cache is best-effort:
 * a Redis failure or an unreadable entry is logged and the request falls
 * through to the ML Engine instead of failing.
 */
@Injectable()
export class MLClientService {
  private readonly client: AxiosInstance;
  private readonly cachePrefix = 'ml:cache:';
  private readonly defaultCacheTTL = 30; // seconds
  private readonly indicatorsCacheTTL = 10; // seconds

  constructor(
    private config: ConfigService,
    private redis: RedisService,
  ) {
    this.client = axios.create({
      baseURL: this.config.get('ML_ENGINE_URL'),
      timeout: 30000,
      headers: {
        'X-API-Key': this.config.get('ML_API_KEY'),
        'Content-Type': 'application/json',
      },
    });

    // Log every response and every failure; failures are re-thrown unchanged.
    this.client.interceptors.response.use(
      (response) => {
        console.log(`ML API: ${response.config.url} - ${response.status} - ${response.data?.metadata?.latencyMs}ms`);
        return response;
      },
      (error: AxiosError) => {
        console.error(`ML API Error: ${error.config?.url} - ${error.response?.status}`);
        throw error;
      }
    );
  }

  /**
   * Get a price prediction, served from the cache when available.
   *
   * @param request Symbol and horizon to predict.
   * @returns The ML Engine envelope; `metadata.cached` is `true` on a cache hit.
   * @throws AxiosError when the ML Engine call fails.
   */
  async getPrediction(request: PredictionRequest): Promise<MLResponse<PredictionData>> {
    const cacheKey = `${this.cachePrefix}prediction:${request.symbol}:${request.horizon}`;

    const cached = await this.readCache<PredictionData>(cacheKey);
    if (cached !== undefined) {
      return this.cachedResponse(cached);
    }

    const response = await this.client.post<MLResponse<PredictionData>>(
      '/predictions',
      {
        symbol: request.symbol,
        horizon: request.horizon,
      }
    );

    await this.writeCache(cacheKey, this.defaultCacheTTL, response.data.data);

    return response.data;
  }

  /**
   * Get a trading signal (never cached).
   *
   * @param request Symbol, horizon and optional range/TP-SL flags.
   * @throws AxiosError when the ML Engine call fails.
   */
  async getSignal(request: SignalRequest): Promise<MLResponse<SignalData>> {
    const response = await this.client.post<MLResponse<SignalData>>(
      '/signals',
      {
        symbol: request.symbol,
        horizon: request.horizon,
        // The ML Engine API uses snake_case field names.
        include_range: request.includeRange ?? true,
        include_tpsl: request.includeTpsl ?? true,
      }
    );

    return response.data;
  }

  /**
   * Get technical indicators for a symbol, cached for a short time.
   *
   * @param symbol Market symbol; passed as a query parameter.
   * @throws AxiosError when the ML Engine call fails.
   */
  async getIndicators(symbol: string): Promise<MLResponse<IndicatorData>> {
    const cacheKey = `${this.cachePrefix}indicators:${symbol}`;

    const cached = await this.readCache<IndicatorData>(cacheKey);
    if (cached !== undefined) {
      return this.cachedResponse(cached);
    }

    // `params` lets axios serialize and URL-encode the value; interpolating
    // the symbol into the URL would let characters such as `&` or `#`
    // alter the query string.
    const response = await this.client.get<MLResponse<IndicatorData>>(
      '/indicators',
      { params: { symbol } }
    );

    await this.writeCache(cacheKey, this.indicatorsCacheTTL, response.data.data);

    return response.data;
  }

  /**
   * Get signal history.
   *
   * @param params Filter and paging options; `limit` defaults to 50 and
   *   `offset` to 0.
   * @throws AxiosError when the ML Engine call fails.
   */
  async getSignalHistory(params: SignalHistoryParams): Promise<SignalHistoryResponse> {
    const queryParams = new URLSearchParams({ symbol: params.symbol });
    if (params.horizon) {
      queryParams.set('horizon', params.horizon.toString());
    }
    if (params.type) {
      queryParams.set('type', params.type);
    }
    // `||` (not `??`) is deliberate: it also replaces NaN and 0 with the default.
    queryParams.set('limit', (params.limit || 50).toString());
    queryParams.set('offset', (params.offset || 0).toString());

    const response = await this.client.get<SignalHistoryResponse>(
      `/signals/history?${queryParams}`
    );

    return response.data;
  }

  /**
   * Health check.
   *
   * @returns `true` only when `/health` answers within 5 s with
   *   `status === 'healthy'`; any error yields `false`.
   */
  async healthCheck(): Promise<boolean> {
    try {
      const response = await this.client.get<{ status?: string }>('/health', { timeout: 5000 });
      return response.data.status === 'healthy';
    } catch {
      return false;
    }
  }

  /** Wrap cached data in the envelope shape used for cache hits. */
  private cachedResponse<T>(data: T): MLResponse<T> {
    return {
      success: true,
      data,
      metadata: { requestId: 'cached', latencyMs: 0, cached: true },
    };
  }

  /** Read and parse a cache entry; returns `undefined` on miss or on any cache error. */
  private async readCache<T>(key: string): Promise<T | undefined> {
    try {
      const raw = await this.redis.get(key);
      return raw ? (JSON.parse(raw) as T) : undefined;
    } catch (error: unknown) {
      console.warn(`ML cache read failed for ${key}:`, error);
      return undefined;
    }
  }

  /** Store a value in the cache; failures are logged and otherwise ignored. */
  private async writeCache(key: string, ttlSeconds: number, value: unknown): Promise<void> {
    if (value === undefined) {
      return; // nothing serializable to store
    }
    try {
      await this.redis.setex(key, ttlSeconds, JSON.stringify(value));
    } catch (error: unknown) {
      console.warn(`ML cache write failed for ${key}:`, error);
    }
  }
}
|
|
```
|
|
|
|
---
|
|
|
|
## Rate Limiting por Usuario
|
|
|
|
```typescript
|
|
// src/services/ml/ml-rate-limiter.service.ts
|
|
import { Injectable, ForbiddenException } from '@nestjs/common';
|
|
import { RedisService } from '../redis/redis.service';
|
|
|
|
/** Requests allowed per window for each subscription plan. */
interface RateLimitConfig {
  free: number;
  basic: number;
  pro: number;
  premium: number;
}

/** ML endpoints that have their own rate-limit counters. */
type MLEndpoint = 'predictions' | 'signals' | 'indicators';

/**
 * Per-user, per-endpoint rate limiter backed by Redis counters.
 *
 * Each counter lives under `ratelimit:ml:<endpoint>:<userId>` and expires
 * `windowSeconds` after it is created, so the window starts at the user's
 * first request rather than at a fixed time of day.
 */
@Injectable()
export class MLRateLimiterService {
  private readonly limits: Record<MLEndpoint, RateLimitConfig> = {
    predictions: { free: 3, basic: 10, pro: 100, premium: 1000 },
    signals: { free: 3, basic: 10, pro: 100, premium: 1000 },
    indicators: { free: 10, basic: 50, pro: 500, premium: 5000 },
  };

  private readonly windowSeconds = 86400; // 24 hours

  constructor(private redis: RedisService) {}

  /**
   * Count one request and report whether it is within the user's limit.
   *
   * @param userId User whose counter is incremented.
   * @param userPlan Plan name; unknown plans get the `free` limit.
   * @param endpoint Endpoint being called.
   * @returns Whether the request is allowed, how many requests remain, and
   *   when the counter expires.
   */
  async checkLimit(
    userId: string,
    userPlan: string,
    endpoint: 'predictions' | 'signals' | 'indicators'
  ): Promise<{ allowed: boolean; remaining: number; resetAt: Date }> {
    const key = `ratelimit:ml:${endpoint}:${userId}`;
    const limit = this.resolveLimit(endpoint, userPlan);

    const current = await this.redis.incr(key);

    // INCR and EXPIRE are separate commands. If a previous call died between
    // them the counter has no TTL (TTL reports a negative value) and would
    // never reset, so the expiry is (re)applied whenever it is missing.
    let ttl = await this.redis.ttl(key);
    if (current === 1 || ttl < 0) {
      await this.redis.expire(key, this.windowSeconds);
      ttl = this.windowSeconds;
    }

    const resetAt = new Date(Date.now() + ttl * 1000);

    return {
      allowed: current <= limit,
      remaining: Math.max(0, limit - current),
      resetAt,
    };
  }

  /**
   * Enforce the rate limit.
   *
   * @throws ForbiddenException when the limit is exceeded.
   */
  async enforce(
    userId: string,
    userPlan: string,
    endpoint: 'predictions' | 'signals' | 'indicators'
  ): Promise<void> {
    const { allowed, resetAt } = await this.checkLimit(userId, userPlan, endpoint);

    if (!allowed) {
      // NOTE(review): HTTP 429 is the conventional status for rate limiting;
      // ForbiddenException (403) is kept so existing callers are unaffected.
      throw new ForbiddenException({
        message: 'Rate limit exceeded',
        limit: this.resolveLimit(endpoint, userPlan),
        remaining: 0,
        resetAt: resetAt.toISOString(),
        upgradeUrl: '/subscription/upgrade',
      });
    }
  }

  /**
   * Get current usage for every ML endpoint without counting a request.
   *
   * @param userId User whose counters are read.
   * @param userPlan Plan name; unknown plans get the `free` limit.
   */
  async getUsage(userId: string, userPlan: string): Promise<MLUsageStats> {
    const endpoints = ['predictions', 'signals', 'indicators'] as const;
    const usage: Record<string, { used: number; limit: number; remaining: number }> = {};

    // The three counters are independent, so read them in parallel.
    const counts = await Promise.all(
      endpoints.map((endpoint) => this.redis.get(`ratelimit:ml:${endpoint}:${userId}`))
    );

    endpoints.forEach((endpoint, index) => {
      const parsed = Number.parseInt(counts[index] || '0', 10);
      const current = Number.isNaN(parsed) ? 0 : parsed;
      const limit = this.resolveLimit(endpoint, userPlan);

      usage[endpoint] = {
        used: current,
        limit,
        remaining: Math.max(0, limit - current),
      };
    });

    return usage;
  }

  /**
   * Look up the limit for a plan, falling back to the `free` tier.
   * Only own properties are considered, so names such as `constructor`
   * cannot resolve to inherited members of the config object.
   */
  private resolveLimit(endpoint: MLEndpoint, userPlan: string): number {
    const tiers = this.limits[endpoint];
    if (Object.prototype.hasOwnProperty.call(tiers, userPlan)) {
      return tiers[userPlan as keyof RateLimitConfig] || tiers.free;
    }
    return tiers.free;
  }
}
|
|
```
|
|
|
|
---
|
|
|
|
## API Gateway (Express Routes)
|
|
|
|
```typescript
|
|
// src/routes/ml.routes.ts
|
|
import { Router } from 'express';
|
|
import { authenticate } from '../middleware/auth';
|
|
import { MLController } from '../controllers/ml.controller';
|
|
|
|
// Route table for the ML API; the JSDoc on each route gives the full
// path (/api/ml/...).
const controller = new MLController();
const router = Router();

// Every route below requires an authenticated user.
router.use(authenticate);

/** POST /api/ml/predictions: price prediction. */
router.route('/predictions').post(controller.getPrediction);

/** POST /api/ml/signals: trading signal. */
router.route('/signals').post(controller.getSignal);

/** GET /api/ml/signals/history: signal history. */
router.route('/signals/history').get(controller.getSignalHistory);

/** GET /api/ml/indicators/:symbol: technical indicators. */
router.route('/indicators/:symbol').get(controller.getIndicators);

/** GET /api/ml/usage: the caller's ML API usage stats. */
router.route('/usage').get(controller.getUsage);

export default router;
|
|
```
|
|
|
|
```typescript
|
|
// src/controllers/ml.controller.ts
|
|
import { Request, Response, NextFunction } from 'express';
|
|
import { MLClientService } from '../services/ml/ml-client.service';
|
|
import { MLRateLimiterService } from '../services/ml/ml-rate-limiter.service';
|
|
|
|
/**
 * Express controller for the ML API.
 *
 * Handlers are arrow-function properties so they keep `this` when passed
 * directly to the router. Each handler forwards errors to `next`.
 */
export class MLController {
  private mlClient: MLClientService;
  private rateLimiter: MLRateLimiterService;

  /**
   * @param mlClient Optional ML client to use instead of a default instance.
   * @param rateLimiter Optional rate limiter to use instead of a default instance.
   */
  constructor(mlClient?: MLClientService, rateLimiter?: MLRateLimiterService) {
    // NOTE(review): the service classes documented alongside this controller
    // declare required constructor dependencies, so the no-argument fallbacks
    // below still need proper wiring; prefer passing instances in.
    this.mlClient = mlClient ?? new MLClientService();
    this.rateLimiter = rateLimiter ?? new MLRateLimiterService();
  }

  /**
   * Get price prediction.
   * Responds 400 when `symbol` or `horizon` is missing from the body.
   */
  getPrediction = async (req: Request, res: Response, next: NextFunction) => {
    try {
      const { userId, plan } = req.user;
      const { symbol, horizon } = req.body;

      // Validate before counting the request against the user's quota.
      if (!symbol || !horizon) {
        return res.status(400).json({
          success: false,
          error: 'symbol and horizon are required',
        });
      }

      await this.rateLimiter.enforce(userId, plan, 'predictions');

      const prediction = await this.mlClient.getPrediction({ symbol, horizon });

      res.json({
        success: true,
        data: this.transformPrediction(prediction.data),
        metadata: prediction.metadata,
      });
    } catch (error) {
      next(error);
    }
  };

  /**
   * Get trading signal.
   * Responds 400 when `symbol` or `horizon` is missing from the body.
   */
  getSignal = async (req: Request, res: Response, next: NextFunction) => {
    try {
      const { userId, plan } = req.user;
      const { symbol, horizon, includeRange, includeTpsl } = req.body;

      // Same validation as getPrediction, so an invalid request neither
      // consumes quota nor reaches the ML Engine.
      if (!symbol || !horizon) {
        return res.status(400).json({
          success: false,
          error: 'symbol and horizon are required',
        });
      }

      await this.rateLimiter.enforce(userId, plan, 'signals');

      const signal = await this.mlClient.getSignal({
        symbol,
        horizon,
        includeRange,
        includeTpsl,
      });

      // Log signal for analytics
      await this.logSignal(userId, signal.data);

      res.json({
        success: true,
        data: this.transformSignal(signal.data),
        metadata: signal.metadata,
      });
    } catch (error) {
      next(error);
    }
  };

  /**
   * Get technical indicators for the `:symbol` route parameter.
   */
  getIndicators = async (req: Request, res: Response, next: NextFunction) => {
    try {
      const { userId, plan } = req.user;
      const { symbol } = req.params;

      await this.rateLimiter.enforce(userId, plan, 'indicators');

      const indicators = await this.mlClient.getIndicators(symbol);

      res.json({
        success: true,
        data: indicators.data,
        metadata: indicators.metadata,
      });
    } catch (error) {
      next(error);
    }
  };

  /**
   * Get signal history.
   * Responds 400 when the `symbol` query parameter is missing or not a
   * single string. Non-numeric `limit`/`offset` fall back to 50 and 0;
   * a non-numeric `horizon` is ignored.
   */
  getSignalHistory = async (req: Request, res: Response, next: NextFunction) => {
    try {
      const { symbol, horizon, type, limit, offset } = req.query;

      if (typeof symbol !== 'string' || symbol.length === 0) {
        return res.status(400).json({
          success: false,
          error: 'symbol is required',
        });
      }

      const history = await this.mlClient.getSignalHistory({
        symbol,
        horizon: this.parseQueryInt(horizon),
        type: typeof type === 'string' ? type : undefined,
        limit: this.parseQueryInt(limit) ?? 50,
        offset: this.parseQueryInt(offset) ?? 0,
      });

      res.json(history);
    } catch (error) {
      next(error);
    }
  };

  /**
   * Get ML API usage stats for the authenticated user.
   */
  getUsage = async (req: Request, res: Response, next: NextFunction) => {
    try {
      const { userId, plan } = req.user;
      const usage = await this.rateLimiter.getUsage(userId, plan);

      res.json({
        success: true,
        data: {
          plan,
          usage,
          resetAt: this.getNextResetTime(),
        },
      });
    } catch (error) {
      next(error);
    }
  };

  /** Parse a query-string value as a base-10 integer; `undefined` when absent or not numeric. */
  private parseQueryInt(value: unknown): number | undefined {
    if (typeof value !== 'string') {
      return undefined;
    }
    const parsed = Number.parseInt(value, 10);
    return Number.isNaN(parsed) ? undefined : parsed;
  }

  /** Add display-ready price strings to a prediction payload. */
  private transformPrediction(data: any) {
    // The locale is pinned so the output does not depend on the server's
    // default locale.
    return {
      ...data,
      formattedPredictedHigh: `$${data.predicted_high.toLocaleString('en-US')}`,
      formattedPredictedLow: `$${data.predicted_low.toLocaleString('en-US')}`,
    };
  }

  /** Add an emoji and a confidence label to a signal payload. */
  private transformSignal(data: any) {
    return {
      ...data,
      signalEmoji: this.getSignalEmoji(data.signal.type),
      confidenceLabel: this.getConfidenceLabel(data.signal.confidence),
    };
  }

  /** Map a signal type to its emoji; unknown types map to '❓'. */
  private getSignalEmoji(type: string): string {
    const emojis: Record<string, string> = { buy: '📈', sell: '📉', hold: '⏸️' };
    // Own-property check: names such as "constructor" must not resolve to
    // members inherited from Object.prototype.
    if (!Object.prototype.hasOwnProperty.call(emojis, type)) {
      return '❓';
    }
    return emojis[type] ?? '❓';
  }

  /** Bucket a confidence value into a label using the 0.8 / 0.65 / 0.5 thresholds. */
  private getConfidenceLabel(confidence: number): string {
    if (confidence >= 0.8) return 'Very High';
    if (confidence >= 0.65) return 'High';
    if (confidence >= 0.5) return 'Medium';
    return 'Low';
  }

  /** Placeholder: analytics logging is not implemented yet. */
  private async logSignal(userId: string, signal: any) {
    // Log to database for analytics
  }

  /**
   * ISO timestamp of the next UTC midnight.
   *
   * NOTE(review): the rate limiter expires counters a fixed number of
   * seconds after the first request, not at UTC midnight, so this value may
   * not match the actual reset moment — confirm which one is intended.
   */
  private getNextResetTime(): string {
    const nextMidnight = new Date();
    nextMidnight.setUTCHours(0, 0, 0, 0);
    // Use the UTC date accessors throughout; mixing in the local-time
    // setDate/getDate shifts the result on daylight-saving transitions.
    nextMidnight.setUTCDate(nextMidnight.getUTCDate() + 1);
    return nextMidnight.toISOString();
  }
}
|
|
```
|
|
|
|
---
|
|
|
|
## Eventos en Tiempo Real (Redis Pub/Sub)
|
|
|
|
### Publisher (ML Engine)
|
|
|
|
```python
|
|
# app/tasks/signal_publisher.py
|
|
import asyncio
|
|
import json
|
|
from redis import asyncio as aioredis
|
|
from datetime import datetime
|
|
from app.services.signal_generator import SignalGeneratorService
|
|
from app.config.settings import settings
|
|
|
|
class SignalPublisher:
    """Publishes signals to Redis for real-time distribution.

    ``start()`` runs a loop that generates a signal for every configured
    symbol and horizon and publishes each one on the ``signals:<symbol>``
    channel, then waits ``interval`` seconds. ``stop()`` ends that loop and
    closes the Redis connection.
    """

    def __init__(self):
        self.redis = None
        self.signal_gen = SignalGeneratorService()
        self.symbols = settings.SUPPORTED_SYMBOLS
        self.horizons = [6, 18]  # Only short-term for real-time
        self.interval = 60  # seconds between signal generation
        self._stop_event = None  # created in start(), set by stop()

    async def start(self):
        """Start the signal publisher background task.

        Runs until ``stop()`` is called. An error in one publishing round is
        reported and the loop continues with the next round.
        """
        self._stop_event = asyncio.Event()
        self.redis = await aioredis.from_url(settings.REDIS_URL)

        while not self._stop_event.is_set():
            try:
                await self.publish_signals()
            except Exception as e:
                print(f"Error publishing signals: {e}")

            # Wait for the next round, but wake up immediately if stop() is
            # called so shutdown does not have to wait a full interval.
            try:
                await asyncio.wait_for(self._stop_event.wait(), timeout=self.interval)
            except asyncio.TimeoutError:
                pass

    async def publish_signals(self):
        """Generate and publish signals for all symbols.

        A failure for one symbol/horizon pair is reported and does not stop
        the remaining pairs from being published.
        """
        from datetime import timezone

        for symbol in self.symbols:
            for horizon in self.horizons:
                try:
                    signal = await self.signal_gen.generate(
                        symbol=symbol,
                        horizon=horizon,
                        include_range=True,
                        include_tpsl=True
                    )

                    # Publish to Redis channel
                    channel = f"signals:{symbol}"
                    message = json.dumps(
                        {
                            'type': 'signal',
                            'data': signal.dict(),
                            # Timezone-aware UTC: a timestamp without an
                            # offset is read as local time by JavaScript's
                            # Date parser.
                            'timestamp': datetime.now(timezone.utc).isoformat()
                        },
                        # Fall back to str() for values json cannot encode
                        # natively (e.g. datetime or Decimal fields).
                        default=str,
                    )

                    await self.redis.publish(channel, message)

                except Exception as e:
                    print(f"Error generating signal for {symbol}: {e}")

    async def stop(self):
        """Stop the publishing loop and close the Redis connection."""
        stop_event = getattr(self, "_stop_event", None)
        if stop_event is not None:
            stop_event.set()
        if self.redis:
            await self.redis.close()
|
|
```
|
|
|
|
### Subscriber (Backend)
|
|
|
|
```typescript
|
|
// src/services/ml/signal-subscriber.service.ts
|
|
import Redis from 'ioredis';
|
|
import { Server as SocketServer } from 'socket.io';
|
|
import { Injectable, OnModuleInit, OnModuleDestroy } from '@nestjs/common';
|
|
|
|
/**
 * Subscribes to the ML signal channels on Redis and relays every message to
 * the Socket.IO room named after the channel (`signals:<symbol>`).
 */
@Injectable()
export class SignalSubscriberService implements OnModuleInit, OnModuleDestroy {
  private subscriber: Redis;
  // Not available until setSocketServer() is called.
  private io?: SocketServer;
  private channels: string[] = ['signals:BTCUSDT', 'signals:ETHUSDT'];

  constructor() {
    const redisUrl = process.env.REDIS_URL;
    // Without REDIS_URL the client is created with the library defaults.
    this.subscriber = redisUrl ? new Redis(redisUrl) : new Redis();

    // Report connection errors through this service's own log line.
    this.subscriber.on('error', (error: Error) => {
      console.error('ML signal subscriber Redis error:', error);
    });
  }

  /** Subscribe to the signal channels when the module starts. */
  async onModuleInit() {
    await this.subscribe();
  }

  /** Close the Redis connection when the module shuts down. */
  async onModuleDestroy() {
    await this.subscriber.quit();
  }

  /** Provide the Socket.IO server used to broadcast signals. */
  setSocketServer(io: SocketServer) {
    this.io = io;
  }

  /** Attach the message handler, then subscribe to every configured channel. */
  private async subscribe() {
    // The listener is registered first so a message arriving right after
    // the subscription is confirmed cannot be missed.
    this.subscriber.on('message', (channel: string, message: string) => {
      this.handleMessage(channel, message);
    });

    await this.subscriber.subscribe(...this.channels);

    console.log(`Subscribed to ML signal channels: ${this.channels.join(', ')}`);
  }

  /**
   * Parse one published message and broadcast it to the symbol's room.
   * Malformed messages are logged and dropped.
   */
  private handleMessage(channel: string, message: string) {
    try {
      const data = JSON.parse(message);
      const symbol = channel.split(':')[1];

      // Broadcast to connected clients subscribed to this symbol.
      // `symbol` is spread last so the channel-derived value always wins
      // over any `symbol` field inside the payload. Messages are dropped
      // silently while no socket server has been set.
      this.io?.to(`signals:${symbol}`).emit('signal', {
        ...data,
        symbol,
      });

      // Log for monitoring
      console.log(`Signal received: ${symbol} - ${data.data?.signal?.type}`);
    } catch (error) {
      console.error('Error handling ML signal:', error);
    }
  }
}
|
|
```
|
|
|
|
### WebSocket Integration
|
|
|
|
```typescript
|
|
// src/websocket/signals.gateway.ts
|
|
import {
|
|
WebSocketGateway,
|
|
WebSocketServer,
|
|
SubscribeMessage,
|
|
OnGatewayConnection,
|
|
OnGatewayDisconnect,
|
|
} from '@nestjs/websockets';
|
|
import { Server, Socket } from 'socket.io';
|
|
import { SignalSubscriberService } from '../services/ml/signal-subscriber.service';
|
|
|
|
/**
 * WebSocket gateway for the `/signals` namespace.
 *
 * Clients send `subscribe` / `unsubscribe` with a symbol to join or leave
 * the room `signals:<symbol>`; the signal subscriber service broadcasts to
 * those rooms.
 */
@WebSocketGateway({ namespace: '/signals' })
export class SignalsGateway implements OnGatewayConnection, OnGatewayDisconnect {
  // Accepted symbol format. Symbols come from clients and end up in room
  // names, so they are restricted to a short, plain token.
  // NOTE(review): adjust the character set if real symbols use other characters.
  private static readonly SYMBOL_PATTERN = /^[A-Za-z0-9._-]{1,32}$/;

  @WebSocketServer()
  server: Server;

  constructor(private signalSubscriber: SignalSubscriberService) {}

  /** Hand the socket server to the subscriber so it can broadcast signals. */
  afterInit(server: Server) {
    this.signalSubscriber.setSocketServer(server);
  }

  handleConnection(client: Socket) {
    console.log(`Client connected to signals: ${client.id}`);
  }

  handleDisconnect(client: Socket) {
    console.log(`Client disconnected from signals: ${client.id}`);
  }

  /** Join the room for `symbol` and acknowledge with `subscribed`. */
  @SubscribeMessage('subscribe')
  handleSubscribe(client: Socket, symbol: string) {
    if (!this.isValidSymbol(symbol)) {
      client.emit('exception', { status: 'error', message: 'Invalid symbol' });
      return;
    }

    client.join(`signals:${symbol}`);
    client.emit('subscribed', { symbol });
    console.log(`Client ${client.id} subscribed to ${symbol}`);
  }

  /** Leave the room for `symbol` and acknowledge with `unsubscribed`. */
  @SubscribeMessage('unsubscribe')
  handleUnsubscribe(client: Socket, symbol: string) {
    if (!this.isValidSymbol(symbol)) {
      client.emit('exception', { status: 'error', message: 'Invalid symbol' });
      return;
    }

    client.leave(`signals:${symbol}`);
    client.emit('unsubscribed', { symbol });
  }

  /** True when the client-supplied payload is a string in the accepted format. */
  private isValidSymbol(symbol: unknown): symbol is string {
    return typeof symbol === 'string' && SignalsGateway.SYMBOL_PATTERN.test(symbol);
  }
}
|
|
```
|
|
|
|
---
|
|
|
|
## Frontend Integration
|
|
|
|
```typescript
|
|
// src/hooks/useMLSignals.ts
|
|
import { useEffect, useState, useCallback } from 'react';
|
|
import { io, Socket } from 'socket.io-client';
|
|
import { useAuth } from './useAuth';
|
|
|
|
/** Signal as consumed by UI components. */
interface MLSignal {
  symbol: string;
  horizon: number;
  signal: {
    type: 'buy' | 'sell' | 'hold';
    confidence: number;
  };
  priceRange?: {
    current: number;
    predictedHigh: number;
    predictedLow: number;
  };
  timestamp: string;
}

/**
 * Convert an incoming `signal` event into an `MLSignal`.
 *
 * The backend relays the publisher's envelope
 * (`{ symbol, type, data: {...}, timestamp }`), while components expect the
 * signal fields at the top level. Both shapes are accepted: an envelope is
 * unwrapped, anything else is passed through unchanged.
 *
 * @returns The flattened signal, or `null` when the message is not an object.
 */
function normalizeSignal(message: unknown): MLSignal | null {
  if (typeof message !== 'object' || message === null) {
    return null;
  }

  const envelope = message as {
    symbol?: string;
    timestamp?: string;
    data?: Partial<MLSignal>;
  };

  if (typeof envelope.data === 'object' && envelope.data !== null) {
    // Runtime shape is not validated beyond this point; the assertion
    // reflects the backend contract.
    return {
      ...envelope.data,
      symbol: envelope.symbol ?? envelope.data.symbol,
      timestamp: envelope.timestamp ?? envelope.data.timestamp,
    } as MLSignal;
  }

  return message as MLSignal;
}

/**
 * Subscribe to real-time ML signals for one symbol.
 *
 * Opens a socket to the `/signals` namespace, subscribes to `symbol` on
 * connect, and tears the connection down when `symbol` or the auth token
 * changes or the component unmounts.
 *
 * @param symbol Market symbol to follow.
 * @returns The latest signal for `symbol` (or `null`) and the connection state.
 */
export function useMLSignals(symbol: string) {
  const { token } = useAuth();
  const [signal, setSignal] = useState<MLSignal | null>(null);
  const [connected, setConnected] = useState(false);

  useEffect(() => {
    // A signal belonging to the previously selected symbol must not be
    // shown for the new one.
    setSignal(null);

    const socket = io('/signals', {
      auth: { token },
      transports: ['websocket'],
    });

    socket.on('connect', () => {
      setConnected(true);
      socket.emit('subscribe', symbol);
    });

    socket.on('disconnect', () => {
      setConnected(false);
    });

    socket.on('signal', (message: unknown) => {
      const next = normalizeSignal(message);
      if (next && next.symbol === symbol) {
        setSignal(next);
      }
    });

    return () => {
      socket.emit('unsubscribe', symbol);
      socket.disconnect();
    };
  }, [symbol, token]);

  return { signal, connected };
}
|
|
|
|
// Usage in component
|
|
/**
 * Example component: shows the connection status, the latest ML signal
 * (once one has arrived) and the chart for `symbol`.
 */
function TradingChart({ symbol }: { symbol: string }) {
  const { signal, connected } = useMLSignals(symbol);

  return (
    <div>
      <ConnectionStatus connected={connected} />
      {signal && <SignalIndicator signal={signal} />}
      <Chart symbol={symbol} />
    </div>
  );
}
|
|
```
|
|
|
|
---
|
|
|
|
## Error Handling
|
|
|
|
```typescript
|
|
// src/middleware/ml-error-handler.ts
|
|
import { Request, Response, NextFunction } from 'express';
|
|
import { AxiosError } from 'axios';
|
|
|
|
// Network-level failures meaning the ML Engine could not be reached.
const ML_UNAVAILABLE_CODES = new Set([
  'ECONNREFUSED',
  'ECONNRESET',
  'ENOTFOUND',
  'EAI_AGAIN',
  'EHOSTUNREACH',
]);

// Codes reported when the request to the ML Engine timed out.
const ML_TIMEOUT_CODES = new Set(['ECONNABORTED', 'ETIMEDOUT']);

/**
 * Detect axios errors via the `isAxiosError` marker they carry. Unlike
 * `instanceof`, this also works when more than one copy of axios is loaded.
 */
function isMlAxiosError(error: unknown): error is AxiosError<{ error?: unknown }> {
  return (
    typeof error === 'object' &&
    error !== null &&
    (error as { isAxiosError?: unknown }).isAxiosError === true
  );
}

/**
 * Express error middleware that turns ML Engine (axios) failures into JSON
 * error responses. Any other error is passed on to the next handler.
 *
 * - ML Engine unreachable       -> 503 ML_ENGINE_UNAVAILABLE
 * - request to ML Engine timed out -> 504 ML_ENGINE_TIMEOUT
 * - ML Engine answered 401      -> 502 ML_ENGINE_AUTH_ERROR
 * - any other ML Engine response -> same status, upstream `error` body when present
 */
export function mlErrorHandler(
  error: Error,
  req: Request,
  res: Response,
  next: NextFunction
) {
  // Once a response has started it cannot be replaced; delegate instead.
  if (res.headersSent || !isMlAxiosError(error)) {
    return next(error);
  }

  const code = error.code ?? '';

  if (ML_UNAVAILABLE_CODES.has(code)) {
    return res.status(503).json({
      success: false,
      error: {
        code: 'ML_ENGINE_UNAVAILABLE',
        message: 'ML Engine is temporarily unavailable',
      },
    });
  }

  if (!error.response && ML_TIMEOUT_CODES.has(code)) {
    return res.status(504).json({
      success: false,
      error: {
        code: 'ML_ENGINE_TIMEOUT',
        message: 'ML Engine did not respond in time',
      },
    });
  }

  if (error.response) {
    const { status, data } = error.response;

    // A 401 from the ML Engine concerns the backend's own API key, not the
    // end user's session, so it must not reach the client as a 401.
    if (status === 401) {
      return res.status(502).json({
        success: false,
        error: {
          code: 'ML_ENGINE_AUTH_ERROR',
          message: 'ML Engine rejected the backend credentials',
        },
      });
    }

    return res.status(status).json({
      success: false,
      error: data?.error || {
        code: 'ML_ENGINE_ERROR',
        message: error.message,
      },
    });
  }

  next(error);
}
|
|
```
|
|
|
|
---
|
|
|
|
## Health Check Integration
|
|
|
|
```typescript
|
|
// src/services/health/ml-health.service.ts
|
|
import { Injectable } from '@nestjs/common';
|
|
import { MLClientService } from '../ml/ml-client.service';
|
|
|
|
/** Reports the ML Engine's health, including the time the check took. */
@Injectable()
export class MLHealthService {
  constructor(private mlClient: MLClientService) {}

  /**
   * Run the ML Engine health check.
   *
   * @returns A result named `ml-engine` with status `healthy` or
   *   `unhealthy` and the elapsed time in milliseconds; `error` is set when
   *   the check itself threw.
   */
  async check(): Promise<HealthCheckResult> {
    const startTime = Date.now();

    try {
      const healthy = await this.mlClient.healthCheck();
      const latency = Date.now() - startTime;

      return {
        name: 'ml-engine',
        status: healthy ? 'healthy' : 'unhealthy',
        latencyMs: latency,
      };
    } catch (error: unknown) {
      // A thrown value is not guaranteed to be an Error, so narrow it
      // before reading `.message`.
      const message = error instanceof Error ? error.message : String(error);

      return {
        name: 'ml-engine',
        status: 'unhealthy',
        error: message,
        latencyMs: Date.now() - startTime,
      };
    }
  }
}
|
|
```
|
|
|
|
---
|
|
|
|
## Referencias
|
|
|
|
- [ET-ML-001: Arquitectura](./ET-ML-001-arquitectura.md)
|
|
- [ET-ML-004: FastAPI Endpoints](./ET-ML-004-api.md)
|
|
- [Socket.io Documentation](https://socket.io/docs/)
|
|
|
|
---
|
|
|
|
**Autor:** Requirements-Analyst
|
|
**Fecha:** 2025-12-05
|