Changes include: - Updated architecture documentation - Enhanced module definitions (OQI-001 to OQI-008) - ML integration documentation updates - Trading strategies documentation - Orchestration and inventory updates - Docker configuration updates 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
983 lines
29 KiB
Markdown
983 lines
29 KiB
Markdown
---
|
|
id: "ET-TRD-002"
|
|
title: "Especificación Técnica - WebSocket Connections"
|
|
type: "Technical Specification"
|
|
status: "Done"
|
|
priority: "Alta"
|
|
epic: "OQI-003"
|
|
project: "trading-platform"
|
|
version: "1.0.0"
|
|
created_date: "2025-12-05"
|
|
updated_date: "2026-01-04"
|
|
---
|
|
|
|
# ET-TRD-002: Especificación Técnica - WebSocket Connections
|
|
|
|
**Version:** 1.0.0
|
|
**Fecha:** 2025-12-05
|
|
**Estado:** Completado
|
|
**Épica:** [OQI-003](../_MAP.md)
|
|
**Requerimiento:** RF-TRD-002
|
|
|
|
---
|
|
|
|
## Resumen
|
|
|
|
Esta especificación detalla la implementación técnica del sistema de conexiones WebSocket bidireccionales para streaming de datos de mercado en tiempo real, incluyendo reconexión automática, gestión de suscripciones y sincronización de estado.
|
|
|
|
---
|
|
|
|
## Arquitectura
|
|
|
|
```
|
|
┌─────────────────────────────────────────────────────────────────────────┐
|
|
│ FRONTEND CLIENTS │
|
|
│ ┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐ │
|
|
│ │ Client A │ │ Client B │ │ Client C │ │
|
|
│ │ (Browser) │ │ (Browser) │ │ (Browser) │ │
|
|
│ └────────┬────────┘ └────────┬────────┘ └────────┬────────┘ │
|
|
│ │ WSS │ WSS │ WSS │
|
|
└───────────┼─────────────────────┼────────────────────┼───────────────────┘
|
|
│ │ │
|
|
▼ ▼ ▼
|
|
┌─────────────────────────────────────────────────────────────────────────┐
|
|
│ BACKEND WS SERVER │
|
|
│ ┌──────────────────────────────────────────────────────────────────┐ │
|
|
│ │ WebSocket Server (ws) │ │
|
|
│ │ ┌─────────────────────────────────────────────────────────┐ │ │
|
|
│ │ │ Connection Manager │ │ │
|
|
│ │ │ - Client connections map │ │ │
|
|
│ │ │ - Subscription registry │ │ │
|
|
│ │ │ - Heartbeat monitor │ │ │
|
|
│ │ └─────────────────────────────────────────────────────────┘ │ │
|
|
│ └──────────────────────────────────────────────────────────────────┘ │
|
|
│ │ │ │ │
|
|
│ ▼ ▼ ▼ │
|
|
│ ┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐ │
|
|
│ │ Subscription │ │ Message │ │ Reconnection │ │
|
|
│ │ Manager │ │ Broker │ │ Handler │ │
|
|
│ └─────────────────┘ └─────────────────┘ └─────────────────┘ │
|
|
└────────────────────────────────┬────────────────────────────────────────┘
|
|
│
|
|
│ WSS (Multiplexed)
|
|
▼
|
|
┌─────────────────────────────────────────────────────────────────────────┐
|
|
│ BINANCE WEBSOCKET API │
|
|
│ ┌──────────────────────────────────────────────────────────────────┐ │
|
|
│ │ wss://stream.binance.com:9443/ws │ │
|
|
│ │ - <symbol>@kline_<interval> │ │
|
|
│ │ - <symbol>@ticker │ │
|
|
│ │ - <symbol>@depth │ │
|
|
│ │ - <symbol>@trade │ │
|
|
│ └──────────────────────────────────────────────────────────────────┘ │
|
|
└─────────────────────────────────────────────────────────────────────────┘
|
|
```
|
|
|
|
---
|
|
|
|
## Componentes
|
|
|
|
### 1. WebSocket Server (`websocket.server.ts`)
|
|
|
|
**Ubicación:** `apps/backend/src/modules/trading/websocket/websocket.server.ts`
|
|
|
|
```typescript
|
|
import { WebSocketServer, WebSocket } from 'ws';
|
|
import { Server as HTTPServer } from 'http';
|
|
import { IncomingMessage } from 'http';
|
|
import { parse } from 'url';
|
|
import jwt from 'jsonwebtoken';
|
|
|
|
/**
 * Server-side record describing one connected frontend client.
 *
 * Identity fields are `readonly` because they are fixed when the connection is
 * accepted; only the heartbeat bookkeeping changes afterwards.
 */
export interface ClientConnection {
  /** Server-generated id, used as the key of the connection map. */
  readonly id: string;
  /** Underlying socket for this client. */
  readonly ws: WebSocket;
  /** User id taken from the JWT verified during the handshake. */
  readonly userId: string;
  /** Channels this client is currently subscribed to (mutated in place). */
  readonly subscriptions: Set<string>;
  /** Epoch milliseconds of the last ping or pong received from the client. */
  lastHeartbeat: number;
  /** Cleared before each server ping and set again when the client answers. */
  isAlive: boolean;
}
|
|
|
|
/**
 * Message a client may send to the trading WebSocket server.
 */
export interface WSMessage {
  /** Operation requested by the client. */
  type: 'subscribe' | 'unsubscribe' | 'ping' | 'pong';
  /** Target channel; expected for `subscribe` and `unsubscribe`. */
  channel?: string;
  /**
   * Optional payload. It comes from the network, so it is typed `unknown`
   * and must be narrowed before use.
   */
  data?: unknown;
}
|
|
|
|
/**
 * WebSocket endpoint mounted at `/ws/trading`.
 *
 * Each upgrade request is authenticated with a JWT passed in the `token` query
 * parameter. The server tracks which channels every client is subscribed to,
 * fans channel data out to the subscribers, and runs a periodic heartbeat
 * sweep that terminates clients that have been silent for too long.
 */
export class TradingWebSocketServer {
  /** Time between heartbeat sweeps, in milliseconds. */
  private static readonly HEARTBEAT_INTERVAL_MS = 30000;
  /** A client with no ping/pong for this long is terminated, in milliseconds. */
  private static readonly CLIENT_TIMEOUT_MS = 60000;

  private readonly wss: WebSocketServer;
  private readonly clients = new Map<string, ClientConnection>();
  /** channel -> ids of the clients subscribed to it */
  private readonly subscriptions = new Map<string, Set<string>>();
  /** User id resolved in `verifyClient`, keyed by the upgrade request. */
  private readonly authenticatedUsers = new WeakMap<IncomingMessage, string>();
  private readonly heartbeatInterval: ReturnType<typeof setInterval>;

  /**
   * @param server HTTP server whose upgrade requests on `/ws/trading` are handled.
   */
  constructor(server: HTTPServer) {
    this.wss = new WebSocketServer({
      server,
      path: '/ws/trading',
      verifyClient: this.verifyClient.bind(this),
    });

    this.wss.on('connection', this.handleConnection.bind(this));

    // Heartbeat sweep every 30 seconds.
    this.heartbeatInterval = setInterval(
      () => this.checkHeartbeats(),
      TradingWebSocketServer.HEARTBEAT_INTERVAL_MS
    );

    console.log('Trading WebSocket Server initialized');
  }

  /**
   * Handshake hook: accepts the upgrade only when the `token` query parameter
   * holds a JWT that verifies against `JWT_SECRET` and carries a `userId`.
   *
   * @param info Handshake details supplied by the WebSocket server.
   * @param callback Called exactly once with the verdict (and HTTP status on rejection).
   */
  private verifyClient(
    info: { origin: string; secure: boolean; req: IncomingMessage },
    callback: (verified: boolean, code?: number, message?: string) => void
  ): void {
    try {
      const secret = process.env.JWT_SECRET;
      if (!secret) {
        // Without a secret no token can be trusted; fail closed.
        console.error('JWT_SECRET is not configured; rejecting WebSocket connection');
        callback(false, 500, 'Server misconfigured');
        return;
      }

      const { query } = parse(info.req.url ?? '', true);
      // A repeated `token` parameter is parsed as an array; use the first value.
      const token = Array.isArray(query.token) ? query.token[0] : query.token;
      if (!token) {
        callback(false, 401, 'Missing token');
        return;
      }

      // Verify the JWT; throws when the signature or expiry is invalid.
      const decoded: unknown = jwt.verify(token, secret);
      const userId =
        typeof decoded === 'object' && decoded !== null
          ? (decoded as Record<string, unknown>).userId
          : undefined;

      if ((typeof userId !== 'string' && typeof userId !== 'number') || !userId) {
        callback(false, 401, 'Invalid token');
        return;
      }

      // Remember the user for the 'connection' handler, which receives the same request.
      this.authenticatedUsers.set(info.req, String(userId));
      callback(true);
    } catch {
      callback(false, 401, 'Authentication failed');
    }
  }

  /** Registers a freshly accepted socket and wires its event handlers. */
  private handleConnection(ws: WebSocket, req: IncomingMessage): void {
    const userId = this.authenticatedUsers.get(req);
    if (userId === undefined) {
      // Not expected: verifyClient records the user before accepting the upgrade.
      ws.close(1008, 'Unauthenticated');
      return;
    }

    const clientId = this.generateClientId();
    const client: ClientConnection = {
      id: clientId,
      ws,
      userId,
      subscriptions: new Set(),
      lastHeartbeat: Date.now(),
      isAlive: true,
    };
    this.clients.set(clientId, client);

    console.log(`Client connected: ${clientId} (User: ${userId})`);

    ws.on('message', (data: { toString(): string }) => this.handleMessage(clientId, data));
    ws.on('close', () => this.handleDisconnection(clientId));
    ws.on('error', (error: Error) => this.handleError(clientId, error));
    ws.on('pong', () => this.handlePong(clientId));

    // Welcome message so the client learns its id.
    this.sendToClient(clientId, {
      type: 'connected',
      clientId,
      timestamp: Date.now(),
    });
  }

  /**
   * Parses one inbound frame and dispatches it by `type`. Anything that is not
   * a JSON object, or that makes a handler throw, is answered with an error frame.
   */
  private handleMessage(clientId: string, data: { toString(): string }): void {
    try {
      const parsed: unknown = JSON.parse(data.toString());
      if (typeof parsed !== 'object' || parsed === null) {
        throw new Error('Message must be a JSON object');
      }
      const message = parsed as Partial<WSMessage>;

      switch (message.type) {
        case 'subscribe':
          this.handleSubscribe(clientId, message.channel);
          break;

        case 'unsubscribe':
          this.handleUnsubscribe(clientId, message.channel);
          break;

        case 'ping':
          this.handlePing(clientId);
          break;

        case 'pong':
          // Application-level pong counts as a heartbeat, like a protocol pong.
          this.handlePong(clientId);
          break;

        default:
          console.warn(`Unknown message type: ${String(message.type)}`);
      }
    } catch (error) {
      console.error('Error parsing message:', error);
      this.sendError(clientId, 'Invalid message format');
    }
  }

  /** Returns true when `channel` is a usable channel name. */
  private isValidChannel(channel: unknown): channel is string {
    return typeof channel === 'string' && channel.trim().length > 0;
  }

  /** Adds the client to `channel` and confirms with a `subscribed` frame. */
  private handleSubscribe(clientId: string, channel: string | undefined): void {
    const client = this.clients.get(clientId);
    if (!client) return;

    if (!this.isValidChannel(channel)) {
      this.sendError(clientId, 'Missing or invalid channel');
      return;
    }

    // Record on the client itself...
    client.subscriptions.add(channel);

    // ...and in the global channel index.
    let subscribers = this.subscriptions.get(channel);
    if (!subscribers) {
      subscribers = new Set();
      this.subscriptions.set(channel, subscribers);
    }
    subscribers.add(clientId);

    console.log(`Client ${clientId} subscribed to ${channel}`);

    this.sendToClient(clientId, {
      type: 'subscribed',
      channel,
      timestamp: Date.now(),
    });
  }

  /** Removes the client from `channel` and confirms with an `unsubscribed` frame. */
  private handleUnsubscribe(clientId: string, channel: string | undefined): void {
    const client = this.clients.get(clientId);
    if (!client) return;

    if (!this.isValidChannel(channel)) {
      this.sendError(clientId, 'Missing or invalid channel');
      return;
    }

    client.subscriptions.delete(channel);
    this.removeSubscriber(channel, clientId);

    console.log(`Client ${clientId} unsubscribed from ${channel}`);

    this.sendToClient(clientId, {
      type: 'unsubscribed',
      channel,
      timestamp: Date.now(),
    });
  }

  /** Drops `clientId` from the channel index, deleting the channel once it is empty. */
  private removeSubscriber(channel: string, clientId: string): void {
    const subscribers = this.subscriptions.get(channel);
    if (!subscribers) return;

    subscribers.delete(clientId);
    if (subscribers.size === 0) {
      this.subscriptions.delete(channel);
    }
  }

  /** Application-level ping: refreshes the heartbeat and answers with `pong`. */
  private handlePing(clientId: string): void {
    const client = this.clients.get(clientId);
    if (!client) return;

    client.lastHeartbeat = Date.now();
    client.isAlive = true;

    this.sendToClient(clientId, {
      type: 'pong',
      timestamp: Date.now(),
    });
  }

  /** Pong received from the client: refreshes the heartbeat. */
  private handlePong(clientId: string): void {
    const client = this.clients.get(clientId);
    if (!client) return;

    client.lastHeartbeat = Date.now();
    client.isAlive = true;
  }

  /**
   * Forgets a client and all of its subscriptions. Safe to call more than once
   * for the same id (the heartbeat sweep and the socket's `close` event both do).
   */
  private handleDisconnection(clientId: string): void {
    const client = this.clients.get(clientId);
    if (!client) return;

    client.subscriptions.forEach((channel) => this.removeSubscriber(channel, clientId));

    this.clients.delete(clientId);
    console.log(`Client disconnected: ${clientId}`);
  }

  /** Logs a socket-level error for the given client. */
  private handleError(clientId: string, error: Error): void {
    console.error(`WebSocket error for client ${clientId}:`, error);
  }

  /**
   * Heartbeat sweep: terminates clients silent for longer than the timeout and
   * pings every other open socket.
   */
  private checkHeartbeats(): void {
    const now = Date.now();

    // Iterate over a snapshot because timed-out clients are removed while sweeping.
    for (const [clientId, client] of Array.from(this.clients)) {
      if (now - client.lastHeartbeat > TradingWebSocketServer.CLIENT_TIMEOUT_MS) {
        console.log(`Client ${clientId} timeout, terminating connection`);
        client.ws.terminate();
        this.handleDisconnection(clientId);
      } else if (client.ws.readyState === WebSocket.OPEN) {
        client.isAlive = false;
        client.ws.ping();
      }
    }
  }

  /**
   * Sends `data` to every open client subscribed to `channel`.
   *
   * @param channel Internal channel name, e.g. `ticker:BTCUSDT`.
   * @param data JSON-serialisable payload placed in the frame's `data` field.
   */
  public broadcastToChannel(channel: string, data: unknown): void {
    const subscribers = this.subscriptions.get(channel);
    if (!subscribers || subscribers.size === 0) return;

    // Serialise once and reuse the string for every subscriber.
    const message = JSON.stringify({
      type: 'data',
      channel,
      data,
      timestamp: Date.now(),
    });

    subscribers.forEach((clientId) => {
      const client = this.clients.get(clientId);
      if (client && client.ws.readyState === WebSocket.OPEN) {
        client.ws.send(message);
      }
    });
  }

  /** Sends a JSON frame to one client if its socket is open. */
  private sendToClient(clientId: string, data: unknown): void {
    const client = this.clients.get(clientId);
    if (client && client.ws.readyState === WebSocket.OPEN) {
      client.ws.send(JSON.stringify(data));
    }
  }

  /** Sends an `error` frame carrying `message` to one client. */
  private sendError(clientId: string, message: string): void {
    this.sendToClient(clientId, {
      type: 'error',
      message,
      timestamp: Date.now(),
    });
  }

  /** Builds an id of the form `client_<epoch ms>_<random base-36 suffix>`. */
  private generateClientId(): string {
    return `client_${Date.now()}_${Math.random().toString(36).slice(2, 11)}`;
  }

  /** Returns connection and subscription counts for monitoring. */
  public getStats(): {
    totalClients: number;
    totalChannels: number;
    channels: { channel: string; subscribers: number }[];
  } {
    return {
      totalClients: this.clients.size,
      totalChannels: this.subscriptions.size,
      channels: Array.from(this.subscriptions.entries()).map(([channel, subs]) => ({
        channel,
        subscribers: subs.size,
      })),
    };
  }

  /** Stops the heartbeat, closes every client with code 1000 and closes the server. */
  public shutdown(): void {
    clearInterval(this.heartbeatInterval);

    this.clients.forEach((client) => {
      client.ws.close(1000, 'Server shutting down');
    });

    this.wss.close();
    console.log('WebSocket Server shutdown complete');
  }
}
|
|
```
|
|
|
|
### 2. Binance WebSocket Client (`binance-ws.client.ts`)
|
|
|
|
**Ubicación:** `apps/backend/src/modules/trading/websocket/binance-ws.client.ts`
|
|
|
|
```typescript
|
|
import WebSocket from 'ws';
|
|
import { EventEmitter } from 'events';
|
|
|
|
/** Candlestick fields carried in the `k` member of a kline event. */
interface BinanceKlineDetail {
  /** Kline start time */
  t: number;
  /** Kline close time */
  T: number;
  /** Symbol */
  s: string;
  /** Interval */
  i: string;
  /** First trade ID */
  f: number;
  /** Last trade ID */
  L: number;
  /** Open price */
  o: string;
  /** Close price */
  c: string;
  /** High price */
  h: string;
  /** Low price */
  l: string;
  /** Base asset volume */
  v: string;
  /** Number of trades */
  n: number;
  /** Is kline closed */
  x: boolean;
  /** Quote asset volume */
  q: string;
  /** Taker buy base asset volume */
  V: string;
  /** Taker buy quote asset volume */
  Q: string;
}

/** Payload of a Binance `<symbol>@kline_<interval>` stream event. */
export interface BinanceKlineStream {
  /** Event type discriminator */
  e: 'kline';
  /** Event time */
  E: number;
  /** Symbol */
  s: string;
  /** Candlestick data */
  k: BinanceKlineDetail;
}
|
|
|
|
/** Fields of a Binance `<symbol>@ticker` stream event that this module uses. */
export interface BinanceTickerStream {
  /** Event type discriminator */
  e: '24hrTicker';
  /** Event time */
  E: number;
  /** Symbol */
  s: string;
  /** Price change */
  p: string;
  /** Price change percent */
  P: string;
  /** Weighted average price */
  w: string;
  /** Last price */
  c: string;
  /** Last quantity */
  Q: string;
  /** Open price */
  o: string;
  /** High price */
  h: string;
  /** Low price */
  l: string;
  /** Total traded base asset volume */
  v: string;
  /** Total traded quote asset volume */
  q: string;
}
|
|
|
|
/**
 * Client for the Binance market-data WebSocket.
 *
 * Keeps the set of requested streams, re-subscribes to them after every
 * (re)connection and reconnects with capped exponential backoff when the
 * socket drops unexpectedly.
 *
 * Events emitted:
 * - `connected` — the socket is open
 * - `disconnected` (code, reason) — a socket closed
 * - `stream` (streamName, payload) — any market-data event
 * - `kline` / `ticker` (streamName, payload) — type-specific market data
 * - `reconnect-failed` — the maximum number of attempts was reached
 * - `error` (error) — socket error, emitted only when a listener is attached
 */
export class BinanceWebSocketClient extends EventEmitter {
  private ws: WebSocket | null = null;
  /** Streams that should be active; replayed after each reconnection. */
  private readonly streams = new Set<string>();
  private reconnectAttempts = 0;
  private readonly maxReconnectAttempts = 10;
  /** Base backoff delay in milliseconds. */
  private readonly reconnectDelay = 1000;
  /** Upper bound for the backoff delay in milliseconds. */
  private readonly maxReconnectDelay = 30000;
  private reconnectTimer: ReturnType<typeof setTimeout> | null = null;
  private pingInterval: ReturnType<typeof setInterval> | null = null;
  private readonly baseUrl = 'wss://stream.binance.com:9443';
  /** Monotonic id for SUBSCRIBE/UNSUBSCRIBE requests; unique per client instance. */
  private requestId = 0;
  /** Sockets closed through `disconnect()`; their `close` event must not reconnect. */
  private readonly closedByClient = new WeakSet<WebSocket>();

  /** Opens the connection unless one is already open or being established. */
  public connect(): void {
    const current = this.ws;
    if (current && current.readyState === WebSocket.OPEN) {
      console.log('WebSocket already connected');
      return;
    }
    if (current && current.readyState === WebSocket.CONNECTING) {
      // A handshake is already in flight; opening a second socket would leak it.
      return;
    }

    // An explicit connect supersedes any pending automatic retry.
    this.clearReconnectTimer();

    const socket = new WebSocket(`${this.baseUrl}/ws`);
    this.ws = socket;

    // Handlers capture `socket` so events from a superseded socket can be told apart.
    socket.on('open', () => this.handleOpen(socket));
    socket.on('message', (data: { toString(): string }) => this.handleMessage(data));
    socket.on('close', (code: number, reason: { toString(): string }) =>
      this.handleClose(socket, code, reason)
    );
    socket.on('error', (error: Error) => this.handleError(error));
    socket.on('pong', () => this.handlePong());
  }

  /** Socket opened: reset backoff, replay subscriptions and start the keep-alive ping. */
  private handleOpen(socket: WebSocket): void {
    if (this.ws !== socket) return; // superseded while connecting

    console.log('Connected to Binance WebSocket');
    this.reconnectAttempts = 0;

    // Re-subscribe to the streams requested so far.
    if (this.streams.size > 0) {
      this.subscribeMultiple(Array.from(this.streams));
    }

    // Keep-alive ping every 3 minutes.
    this.stopPing();
    this.pingInterval = setInterval(() => {
      if (this.ws && this.ws.readyState === WebSocket.OPEN) {
        this.ws.ping();
      }
    }, 180000);

    this.emit('connected');
  }

  /**
   * Handles one inbound frame. Accepts both the combined-stream envelope
   * (`{ stream, data }`) and the raw payload delivered on the `/ws` endpoint,
   * for which the stream name is derived from the payload itself.
   */
  private handleMessage(data: { toString(): string }): void {
    try {
      const parsed: unknown = JSON.parse(data.toString());
      if (typeof parsed !== 'object' || parsed === null) return;
      const message = parsed as Record<string, unknown>;

      // Acknowledgement of a subscribe/unsubscribe request.
      if (message.result === null && message.id) {
        console.log(`Stream operation successful: ${String(message.id)}`);
        return;
      }

      let stream: string | null = null;
      let payload: Record<string, unknown> | null = null;

      if (typeof message.stream === 'string' && typeof message.data === 'object' && message.data !== null) {
        stream = message.stream;
        payload = message.data as Record<string, unknown>;
      } else if (typeof message.e === 'string') {
        stream = BinanceWebSocketClient.rawStreamName(message);
        payload = message;
      }

      if (stream === null || payload === null) return;

      this.emit('stream', stream, payload);

      // Type-specific events.
      if (payload.e === 'kline') {
        this.emit('kline', stream, payload);
      } else if (payload.e === '24hrTicker') {
        this.emit('ticker', stream, payload);
      }
    } catch (error) {
      console.error('Error parsing message:', error);
    }
  }

  /**
   * Derives the stream name for a raw (unwrapped) event, or `null` when the
   * event type is not one this client knows how to name.
   */
  private static rawStreamName(event: Record<string, unknown>): string | null {
    if (typeof event.s !== 'string' || event.s.length === 0) return null;
    const symbol = event.s.toLowerCase();

    if (event.e === 'kline') {
      const detail = event.k;
      const interval =
        typeof detail === 'object' && detail !== null
          ? (detail as Record<string, unknown>).i
          : undefined;
      return typeof interval === 'string' ? `${symbol}@kline_${interval}` : null;
    }
    if (event.e === '24hrTicker') {
      return `${symbol}@ticker`;
    }
    return null;
  }

  /**
   * A socket closed. Emits `disconnected`, then schedules a reconnect unless
   * the close was requested via `disconnect()` or a newer socket already exists.
   */
  private handleClose(socket: WebSocket, code: number, reason: { toString(): string }): void {
    const reasonText = reason.toString();
    console.log(`WebSocket closed: ${code} - ${reasonText}`);

    if (this.ws === socket) {
      this.ws = null;
      this.stopPing();
    }

    this.emit('disconnected', code, reasonText);

    if (this.closedByClient.has(socket) || this.ws !== null) {
      return;
    }
    this.scheduleReconnect();
  }

  /** Schedules the next reconnection attempt, or gives up after the maximum. */
  private scheduleReconnect(): void {
    if (this.reconnectAttempts >= this.maxReconnectAttempts) {
      console.error('Max reconnection attempts reached');
      this.emit('reconnect-failed');
      return;
    }

    // Exponential backoff, capped so late attempts do not wait minutes.
    const delay = Math.min(
      this.reconnectDelay * Math.pow(2, this.reconnectAttempts),
      this.maxReconnectDelay
    );
    console.log(`Reconnecting in ${delay}ms (attempt ${this.reconnectAttempts + 1})`);

    this.clearReconnectTimer();
    this.reconnectTimer = setTimeout(() => {
      this.reconnectTimer = null;
      this.reconnectAttempts++;
      this.connect();
    }, delay);
  }

  /** Logs a socket error and forwards it to `error` listeners, if any. */
  private handleError(error: Error): void {
    console.error('WebSocket error:', error);
    // EventEmitter throws on an unhandled 'error' event; only emit when someone listens.
    if (this.listenerCount('error') > 0) {
      this.emit('error', error);
    }
  }

  /** Pong received; intentionally a no-op. */
  private handlePong(): void {
    // console.log('Received pong from Binance');
  }

  private stopPing(): void {
    if (this.pingInterval) {
      clearInterval(this.pingInterval);
      this.pingInterval = null;
    }
  }

  private clearReconnectTimer(): void {
    if (this.reconnectTimer) {
      clearTimeout(this.reconnectTimer);
      this.reconnectTimer = null;
    }
  }

  /**
   * Subscribes to one stream. When the socket is not open the stream is only
   * queued and is sent once the connection opens.
   */
  public subscribe(stream: string): void {
    if (!this.ws || this.ws.readyState !== WebSocket.OPEN) {
      console.warn('WebSocket not connected, queuing subscription');
      this.streams.add(stream);
      return;
    }

    this.ws.send(
      JSON.stringify({
        method: 'SUBSCRIBE',
        params: [stream],
        id: ++this.requestId,
      })
    );
    this.streams.add(stream);

    console.log(`Subscribed to ${stream}`);
  }

  /** Subscribes to several streams with a single request (queued when offline). */
  public subscribeMultiple(streams: string[]): void {
    if (streams.length === 0) return;

    if (!this.ws || this.ws.readyState !== WebSocket.OPEN) {
      console.warn('WebSocket not connected, queuing subscriptions');
      streams.forEach((s) => this.streams.add(s));
      return;
    }

    this.ws.send(
      JSON.stringify({
        method: 'SUBSCRIBE',
        params: streams,
        id: ++this.requestId,
      })
    );
    streams.forEach((s) => this.streams.add(s));

    console.log(`Subscribed to ${streams.length} streams`);
  }

  /** Unsubscribes from one stream and forgets it for future reconnections. */
  public unsubscribe(stream: string): void {
    if (!this.ws || this.ws.readyState !== WebSocket.OPEN) {
      this.streams.delete(stream);
      return;
    }

    this.ws.send(
      JSON.stringify({
        method: 'UNSUBSCRIBE',
        params: [stream],
        id: ++this.requestId,
      })
    );
    this.streams.delete(stream);

    console.log(`Unsubscribed from ${stream}`);
  }

  /**
   * Closes the connection for good: cancels any pending retry, stops the
   * keep-alive ping and clears the stream list. No reconnection is attempted.
   */
  public disconnect(): void {
    this.clearReconnectTimer();
    this.stopPing();

    const socket = this.ws;
    if (socket) {
      this.closedByClient.add(socket);
      this.ws = null;
      socket.close(1000, 'Client disconnect');
    }

    this.streams.clear();
  }

  /** True while the socket is open. */
  public isConnected(): boolean {
    return this.ws !== null && this.ws.readyState === WebSocket.OPEN;
  }

  /** Streams currently requested, including ones queued while offline. */
  public getActiveStreams(): string[] {
    return Array.from(this.streams);
  }
}
|
|
```
|
|
|
|
### 3. Stream Manager (`stream-manager.ts`)
|
|
|
|
**Ubicación:** `apps/backend/src/modules/trading/websocket/stream-manager.ts`
|
|
|
|
```typescript
|
|
import { BinanceWebSocketClient } from './binance-ws.client';
|
|
import { TradingWebSocketServer } from './websocket.server';
|
|
|
|
/**
 * Bridges the Binance stream client and the client-facing WebSocket server:
 * translates internal channel names to Binance stream names and forwards
 * normalised kline/ticker data to the channel's subscribers.
 *
 * NOTE(review): nothing visible here calls `subscribeToChannel` when a frontend
 * client subscribes on the server — confirm that wiring exists elsewhere.
 */
export class StreamManager {
  private readonly binanceClient: BinanceWebSocketClient;
  private readonly wsServer: TradingWebSocketServer;
  /** internal channel -> Binance stream name */
  private readonly streamByChannel = new Map<string, string>();
  /**
   * Binance stream name -> internal channel. Reverse index so each incoming
   * event is resolved with one lookup; if two channels map to the same stream
   * the most recently subscribed one wins.
   */
  private readonly channelByStream = new Map<string, string>();

  /**
   * @param wsServer Server used to broadcast market data to subscribed clients.
   */
  constructor(wsServer: TradingWebSocketServer) {
    this.wsServer = wsServer;
    this.binanceClient = new BinanceWebSocketClient();

    this.initialize();
  }

  /** Registers the event forwarding and then opens the Binance connection. */
  private initialize(): void {
    // Forward kline data
    this.binanceClient.on('kline', (stream: string, data: any) => {
      this.wsServer.broadcastToChannel(this.streamToChannel(stream), {
        type: 'kline',
        data: this.formatKline(data),
      });
    });

    // Forward ticker data
    this.binanceClient.on('ticker', (stream: string, data: any) => {
      this.wsServer.broadcastToChannel(this.streamToChannel(stream), {
        type: 'ticker',
        data: this.formatTicker(data),
      });
    });

    // Handle reconnection
    this.binanceClient.on('connected', () => {
      console.log('Binance WebSocket reconnected, resubscribing...');
    });

    this.binanceClient.on('error', (error: unknown) => {
      console.error('Binance WebSocket error:', error);
    });

    // Connect last so every listener is attached before any event can fire.
    this.binanceClient.connect();
  }

  /**
   * Converts an internal channel to a Binance stream name.
   *
   * - `kline:BTCUSDT:1m` -> `btcusdt@kline_1m`
   * - `ticker:BTCUSDT`   -> `btcusdt@ticker`
   *
   * @returns The stream name, or `''` when the channel is malformed or of an
   *          unsupported type.
   */
  private channelToStream(channel: string): string {
    const [type, rawSymbol, interval] = channel.split(':');
    const symbol = rawSymbol?.toLowerCase();
    if (!symbol) return '';

    if (type === 'kline') {
      return interval ? `${symbol}@kline_${interval}` : '';
    }
    if (type === 'ticker') {
      return `${symbol}@ticker`;
    }
    return '';
  }

  /**
   * Converts a Binance stream name back to the internal channel.
   *
   * - `btcusdt@kline_1m` -> `kline:BTCUSDT:1m`
   * - `btcusdt@ticker`   -> `ticker:BTCUSDT`
   *
   * Streams registered through `subscribeToChannel` resolve to the exact
   * channel that was subscribed; anything else is parsed, and unrecognised
   * names are returned unchanged.
   */
  private streamToChannel(stream: string): string {
    const registered = this.channelByStream.get(stream);
    if (registered !== undefined) {
      return registered;
    }

    // Fallback parsing
    const separator = stream.indexOf('@');
    if (separator === -1) {
      return stream;
    }
    const upperSymbol = stream.slice(0, separator).toUpperCase();
    const streamType = stream.slice(separator + 1);

    if (streamType.startsWith('kline_')) {
      return `kline:${upperSymbol}:${streamType.slice('kline_'.length)}`;
    }
    if (streamType === 'ticker') {
      return `ticker:${upperSymbol}`;
    }
    return stream;
  }

  /** Subscribes to the Binance stream backing `channel`; logs and ignores invalid channels. */
  public subscribeToChannel(channel: string): void {
    const binanceStream = this.channelToStream(channel);

    if (!binanceStream) {
      console.error(`Invalid channel format: ${channel}`);
      return;
    }

    this.streamByChannel.set(channel, binanceStream);
    this.channelByStream.set(binanceStream, channel);
    this.binanceClient.subscribe(binanceStream);
  }

  /** Unsubscribes from the Binance stream backing `channel`; no-op if it was never subscribed. */
  public unsubscribeFromChannel(channel: string): void {
    const binanceStream = this.streamByChannel.get(channel);

    if (!binanceStream) {
      return;
    }

    this.binanceClient.unsubscribe(binanceStream);
    this.streamByChannel.delete(channel);
    // Only drop the reverse entry if it still points at this channel.
    if (this.channelByStream.get(binanceStream) === channel) {
      this.channelByStream.delete(binanceStream);
    }
  }

  /** Maps a raw kline event to the shape sent to clients (prices parsed to numbers). */
  private formatKline(data: {
    s: string;
    k: {
      i: string;
      t: number;
      T: number;
      o: string;
      h: string;
      l: string;
      c: string;
      v: string;
      x: boolean;
      n: number;
    };
  }) {
    const candle = data.k;
    return {
      symbol: data.s,
      interval: candle.i,
      startTime: candle.t,
      endTime: candle.T,
      open: parseFloat(candle.o),
      high: parseFloat(candle.h),
      low: parseFloat(candle.l),
      close: parseFloat(candle.c),
      volume: parseFloat(candle.v),
      closed: candle.x,
      trades: candle.n,
    };
  }

  /** Maps a raw 24h ticker event to the shape sent to clients (values parsed to numbers). */
  private formatTicker(data: {
    s: string;
    c: string;
    p: string;
    P: string;
    h: string;
    l: string;
    v: string;
    q: string;
  }) {
    return {
      symbol: data.s,
      price: parseFloat(data.c),
      priceChange: parseFloat(data.p),
      priceChangePercent: parseFloat(data.P),
      high: parseFloat(data.h),
      low: parseFloat(data.l),
      volume: parseFloat(data.v),
      quoteVolume: parseFloat(data.q),
    };
  }

  /** Disconnects from Binance. */
  public shutdown(): void {
    this.binanceClient.disconnect();
  }
}
|
|
```
|
|
|
|
---
|
|
|
|
## Componentes Frontend
|
|
|
|
### WebSocket Client Hook
|
|
|
|
**Ubicación:** `apps/frontend/src/modules/trading/hooks/useWebSocket.ts`
|
|
|
|
```typescript
|
|
import { useCallback, useEffect, useRef, useState } from 'react';
import { useAuthStore } from '@/stores/auth.store';
|
|
|
|
/** Frame exchanged with the trading WebSocket server, as seen by the frontend. */
export interface WSMessage {
  /** Frame kind, e.g. `data`, `subscribed`, `error`. */
  type: string;
  /** Channel the frame refers to, when applicable. */
  channel?: string;
  /** Frame payload; its shape depends on `type`. */
  data?: any;
  /** Timestamp set by the sender, in epoch milliseconds. */
  timestamp?: number;
}
|
|
|
|
/**
 * React hook that maintains the authenticated connection to `/ws/trading`.
 *
 * - Connects whenever a token is available and reconnects with capped
 *   exponential backoff (up to 5 attempts) after an unexpected close.
 * - Remembers requested channels and re-subscribes after each (re)connection.
 * - Keeps the message callback across reconnections.
 *
 * @returns `subscribe` / `unsubscribe` for channels, `onMessage` to register
 *          the message callback, and the reactive `isConnected` flag.
 */
export function useWebSocket() {
  const { token } = useAuthStore();
  const wsRef = useRef<WebSocket | null>(null);
  const reconnectTimeoutRef = useRef<ReturnType<typeof setTimeout> | null>(null);
  const reconnectAttempts = useRef(0);
  // Refs (not state) so changing them never re-creates the socket.
  const messageHandlerRef = useRef<((message: WSMessage) => void) | null>(null);
  const channelsRef = useRef<Set<string>>(new Set());
  // State (not a ref) so consumers re-render when the connection status changes.
  const [isConnected, setIsConnected] = useState(false);
  const maxReconnectAttempts = 5;

  const connect = useCallback(() => {
    if (!token) return;

    const wsUrl = `${process.env.REACT_APP_WS_URL}/ws/trading?token=${encodeURIComponent(token)}`;
    const ws = new WebSocket(wsUrl);
    wsRef.current = ws;

    ws.onopen = () => {
      console.log('WebSocket connected');
      reconnectAttempts.current = 0;
      setIsConnected(true);

      // Restore (or send for the first time) every requested subscription.
      channelsRef.current.forEach((channel) => {
        ws.send(JSON.stringify({ type: 'subscribe', channel }));
      });
    };

    ws.onmessage = (event) => {
      const handler = messageHandlerRef.current;
      if (!handler) return;

      try {
        handler(JSON.parse(event.data));
      } catch (error) {
        console.error('Error parsing WebSocket message:', error);
      }
    };

    ws.onclose = (event) => {
      console.log('WebSocket disconnected:', event.code);

      // A socket released by disconnect() (or replaced) must not trigger a retry.
      if (wsRef.current !== ws) return;

      wsRef.current = null;
      setIsConnected(false);

      // Attempt reconnection
      if (reconnectAttempts.current < maxReconnectAttempts) {
        const delay = Math.min(1000 * Math.pow(2, reconnectAttempts.current), 30000);
        console.log(`Reconnecting in ${delay}ms...`);

        reconnectTimeoutRef.current = setTimeout(() => {
          reconnectTimeoutRef.current = null;
          reconnectAttempts.current++;
          connect();
        }, delay);
      }
    };

    ws.onerror = (error) => {
      console.error('WebSocket error:', error);
    };
  }, [token]);

  const disconnect = useCallback(() => {
    if (reconnectTimeoutRef.current) {
      clearTimeout(reconnectTimeoutRef.current);
      reconnectTimeoutRef.current = null;
    }

    const ws = wsRef.current;
    if (ws) {
      // Release the ref first so the close handler treats this as intentional.
      wsRef.current = null;
      ws.close(1000, 'Client disconnect');
    }
    setIsConnected(false);
  }, []);

  const subscribe = useCallback((channel: string) => {
    // Remember the channel even while offline; it is sent when the socket opens.
    channelsRef.current.add(channel);

    if (wsRef.current?.readyState === WebSocket.OPEN) {
      wsRef.current.send(JSON.stringify({
        type: 'subscribe',
        channel,
      }));
    }
  }, []);

  const unsubscribe = useCallback((channel: string) => {
    channelsRef.current.delete(channel);

    if (wsRef.current?.readyState === WebSocket.OPEN) {
      wsRef.current.send(JSON.stringify({
        type: 'unsubscribe',
        channel,
      }));
    }
  }, []);

  const onMessage = useCallback((callback: (message: WSMessage) => void) => {
    // Stored in a ref so it applies to the current socket and to future ones.
    messageHandlerRef.current = callback;
  }, []);

  useEffect(() => {
    connect();
    return () => disconnect();
  }, [connect, disconnect]);

  return {
    subscribe,
    unsubscribe,
    onMessage,
    isConnected,
  };
}
|
|
```
|
|
|
|
---
|
|
|
|
## Configuración
|
|
|
|
```typescript
|
|
// config/websocket.config.ts
|
|
// Settings for the client-facing WebSocket endpoint (times in milliseconds).
const serverSettings = {
  path: '/ws/trading',
  heartbeatInterval: 30 * 1000,
  clientTimeout: 60 * 1000,
};

// Settings for the upstream Binance connection (times in milliseconds).
const binanceSettings = {
  baseUrl: 'wss://stream.binance.com:9443',
  pingInterval: 3 * 60 * 1000, // 3 minutes
  reconnect: {
    maxAttempts: 10,
    initialDelay: 1000,
    maxDelay: 30 * 1000,
  },
};

// Channel name templates; `{symbol}` and `{interval}` are placeholders.
const channelTemplates = {
  kline: 'kline:{symbol}:{interval}',
  ticker: 'ticker:{symbol}',
  depth: 'depth:{symbol}',
  trade: 'trade:{symbol}',
};

/** WebSocket configuration grouped by concern. */
export const websocketConfig = {
  server: serverSettings,
  binance: binanceSettings,
  channels: channelTemplates,
};
|
|
```
|
|
|
|
---
|
|
|
|
## Testing
|
|
|
|
```typescript
|
|
describe('TradingWebSocketServer', () => {
  let server: TradingWebSocketServer;
  let httpServer: any;
  let port: number;

  beforeEach(async () => {
    httpServer = createServer();
    server = new TradingWebSocketServer(httpServer);

    // The HTTP server must be listening before a client can connect. Port 0
    // asks the OS for a free port, so runs cannot collide on a fixed one.
    await new Promise<void>((resolve) => httpServer.listen(0, () => resolve()));
    port = httpServer.address().port;
  });

  afterEach(async () => {
    server.shutdown();
    // Release the port so the next test starts from a clean state.
    await new Promise<void>((resolve) => httpServer.close(() => resolve()));
  });

  it('should accept authenticated connections', async () => {
    const token = generateTestToken();
    const ws = new WebSocket(`ws://localhost:${port}/ws/trading?token=${token}`);

    try {
      // Reject on 'error' so a refused handshake fails the test instead of hanging it.
      await new Promise<void>((resolve, reject) => {
        ws.on('open', () => resolve());
        ws.on('error', reject);
      });

      expect(ws.readyState).toBe(WebSocket.OPEN);
    } finally {
      ws.close();
    }
  });

  it('should handle subscription', async () => {
    // Test implementation
  });
});
|
|
```
|
|
|
|
---
|
|
|
|
## Referencias
|
|
|
|
- [WebSocket Protocol RFC 6455](https://tools.ietf.org/html/rfc6455)
|
|
- [Binance WebSocket Streams](https://binance-docs.github.io/apidocs/spot/en/#websocket-market-streams)
|
|
- [ws Library Documentation](https://github.com/websockets/ws)
|