/** * Trading Stream Service * Real-time market data streaming via WebSocket * Now with direct Binance WebSocket integration for true real-time updates */ import { wsManager, WSClient, WSMessage } from './websocket.server'; import { mlIntegrationService } from '../../modules/ml/services/ml-integration.service'; import { mlOverlayService } from '../../modules/ml/services/ml-overlay.service'; import { binanceService, Kline } from '../../modules/trading/services/binance.service'; import { logger } from '../../shared/utils/logger'; import { EventEmitter } from 'events'; // ============================================================================ // Types // ============================================================================ export interface QuoteData { symbol: string; price: number; bid: number; ask: number; volume: number; change: number; changePercent: number; high: number; low: number; open: number; previousClose: number; timestamp: Date; } export interface TradeData { symbol: string; price: number; quantity: number; side: 'buy' | 'sell'; timestamp: Date; } export interface DepthData { symbol: string; bids: [number, number][]; // [price, quantity] asks: [number, number][]; timestamp: Date; } export interface SignalData { symbol: string; signalType: 'buy' | 'sell' | 'hold'; confidence: number; amdPhase: string; targetPrice: number; stopLoss: number; timestamp: Date; } export interface KlineData { symbol: string; interval: string; time: number; open: number; high: number; low: number; close: number; volume: number; isFinal: boolean; timestamp: Date; } // Channel prefixes const CHANNELS = { QUOTES: 'quotes', PRICE: 'price', KLINES: 'klines', TICKER: 'ticker', TRADES: 'trades', DEPTH: 'depth', SIGNALS: 'signals', OVERLAYS: 'overlays', PORTFOLIO: 'portfolio', ORDERS: 'orders', ALERTS: 'alerts', } as const; // ============================================================================ // Trading Stream Service // ============================================================================ class TradingStreamService extends EventEmitter { private quoteIntervals: Map = new Map(); private signalIntervals: Map = new Map(); private binanceStreamRefs: Map = new Map(); private priceCache: Map = new Map(); private initialized: boolean = false; private readonly QUOTE_UPDATE_INTERVAL = 1000; // 1 second (fallback only) private readonly SIGNAL_UPDATE_INTERVAL = 30000; // 30 seconds private readonly MAX_SYMBOLS_PER_CLIENT = 50; private readonly PRICE_CACHE_TTL = 5000; // 5 seconds /** * Initialize streaming service */ initialize(): void { if (this.initialized) return; // Register message handlers wsManager.registerHandler('requestQuote', this.handleQuoteRequest.bind(this)); wsManager.registerHandler('requestSignal', this.handleSignalRequest.bind(this)); wsManager.registerHandler('requestOverlay', this.handleOverlayRequest.bind(this)); // Listen for subscription events wsManager.on('subscribe', this.handleSubscribe.bind(this)); wsManager.on('unsubscribe', this.handleUnsubscribe.bind(this)); wsManager.on('disconnect', this.handleDisconnect.bind(this)); // Setup Binance WebSocket event listeners this.setupBinanceListeners(); this.initialized = true; logger.info('[TradingStream] Service initialized with Binance WebSocket integration'); } /** * Setup Binance WebSocket event listeners */ private setupBinanceListeners(): void { // Listen for ticker updates (24h statistics) binanceService.on('ticker', (data: Record) => { const quote = this.transformTickerToQuote(data); this.priceCache.set(quote.symbol, quote); // Broadcast to subscribed clients wsManager.broadcast(`${CHANNELS.TICKER}:${quote.symbol}`, { type: 'ticker', data: quote, }); // Also broadcast on price and quotes channels wsManager.broadcast(`${CHANNELS.PRICE}:${quote.symbol}`, { type: 'price', data: { symbol: quote.symbol, price: quote.price, change24h: quote.change, changePercent24h: quote.changePercent, high24h: quote.high, low24h: quote.low, volume24h: quote.volume, timestamp: quote.timestamp.getTime(), }, }); wsManager.broadcast(`${CHANNELS.QUOTES}:${quote.symbol}`, { type: 'quote', data: quote, }); }); // Listen for kline updates (candlestick data) binanceService.on('kline', (data: { symbol: string; interval: string; kline: Kline; isFinal: boolean }) => { const klineData: KlineData = { symbol: data.symbol, interval: data.interval, time: data.kline.openTime, open: parseFloat(data.kline.open), high: parseFloat(data.kline.high), low: parseFloat(data.kline.low), close: parseFloat(data.kline.close), volume: parseFloat(data.kline.volume), isFinal: data.isFinal, timestamp: new Date(), }; // Broadcast to subscribed clients wsManager.broadcast(`${CHANNELS.KLINES}:${data.symbol}:${data.interval}`, { type: 'kline', data: klineData, }); }); // Listen for trade updates binanceService.on('trade', (data: Record) => { const tradeData: TradeData = { symbol: data.symbol as string, price: parseFloat(data.price as string), quantity: parseFloat(data.quantity as string), side: data.isBuyerMaker ? 'sell' : 'buy', timestamp: new Date(data.time as number), }; wsManager.broadcast(`${CHANNELS.TRADES}:${data.symbol as string}`, { type: 'trade', data: tradeData, }); }); // Listen for depth updates binanceService.on('depth', (data: Record) => { const depthData: DepthData = { symbol: data.symbol as string, bids: (data.bids as [string, string][]).map((b: [string, string]) => [parseFloat(b[0]), parseFloat(b[1])]), asks: (data.asks as [string, string][]).map((a: [string, string]) => [parseFloat(a[0]), parseFloat(a[1])]), timestamp: new Date(), }; wsManager.broadcast(`${CHANNELS.DEPTH}:${data.symbol as string}`, { type: 'depth', data: depthData, }); }); logger.info('[TradingStream] Binance WebSocket listeners configured'); } /** * Transform Binance ticker to QuoteData */ private transformTickerToQuote(ticker: Record): QuoteData { const price = parseFloat((ticker.c || ticker.lastPrice || '0') as string); const change = parseFloat((ticker.p || ticker.priceChange || '0') as string); const changePercent = parseFloat((ticker.P || ticker.priceChangePercent || '0') as string); return { symbol: (ticker.s || ticker.symbol) as string, price, bid: parseFloat((ticker.b || ticker.bidPrice || '0') as string), ask: parseFloat((ticker.a || ticker.askPrice || '0') as string), volume: parseFloat((ticker.v || ticker.volume || '0') as string), change, changePercent, high: parseFloat((ticker.h || ticker.highPrice || '0') as string), low: parseFloat((ticker.l || ticker.lowPrice || '0') as string), open: parseFloat((ticker.o || ticker.openPrice || '0') as string), previousClose: parseFloat((ticker.x || ticker.prevClosePrice || '0') as string), timestamp: new Date(), }; } /** * Handle subscription to a channel */ private handleSubscribe(client: WSClient, channel: string): void { const parts = channel.split(':'); const type = parts[0]; const symbol = parts[1]?.toUpperCase(); const interval = parts[2]; // For klines if (!symbol) return; // Handle different channel types if (type === CHANNELS.PRICE || type === CHANNELS.TICKER || type === CHANNELS.QUOTES) { this.startTickerStream(symbol); } else if (type === CHANNELS.KLINES && interval) { this.startKlineStream(symbol, interval as '1m' | '3m' | '5m' | '15m' | '30m' | '1h' | '2h' | '4h' | '6h' | '8h' | '12h' | '1d' | '3d' | '1w' | '1M'); } else if (type === CHANNELS.TRADES) { this.startTradeStream(symbol); } else if (type === CHANNELS.DEPTH) { this.startDepthStream(symbol); } else if (type === CHANNELS.SIGNALS) { this.startSignalStream(symbol); } } /** * Handle unsubscription from a channel */ private handleUnsubscribe(_client: WSClient, channel: string): void { const parts = channel.split(':'); const type = parts[0]; const symbol = parts[1]?.toUpperCase(); const interval = parts[2]; // Check if anyone is still subscribed to this channel if (wsManager.getChannelSubscriberCount(channel) === 0) { if (type === CHANNELS.PRICE || type === CHANNELS.TICKER || type === CHANNELS.QUOTES) { this.stopTickerStream(symbol); } else if (type === CHANNELS.KLINES && interval) { this.stopKlineStream(symbol, interval as '1m' | '3m' | '5m' | '15m' | '30m' | '1h' | '2h' | '4h' | '6h' | '8h' | '12h' | '1d' | '3d' | '1w' | '1M'); } else if (type === CHANNELS.TRADES) { this.stopTradeStream(symbol); } else if (type === CHANNELS.DEPTH) { this.stopDepthStream(symbol); } else if (type === CHANNELS.SIGNALS) { this.stopSignalStream(symbol); } } } /** * Handle client disconnect */ private handleDisconnect(_client: WSClient): void { // Clean up empty channels wsManager.getActiveChannels().forEach((channel) => { if (wsManager.getChannelSubscriberCount(channel) === 0) { const parts = channel.split(':'); const type = parts[0]; const symbol = parts[1]; const interval = parts[2]; if (type === CHANNELS.PRICE || type === CHANNELS.TICKER || type === CHANNELS.QUOTES) { this.stopTickerStream(symbol); } else if (type === CHANNELS.KLINES && interval) { this.stopKlineStream(symbol, interval as '1m' | '3m' | '5m' | '15m' | '30m' | '1h' | '2h' | '4h' | '6h' | '8h' | '12h' | '1d' | '3d' | '1w' | '1M'); } else if (type === CHANNELS.TRADES) { this.stopTradeStream(symbol); } else if (type === CHANNELS.DEPTH) { this.stopDepthStream(symbol); } else if (type === CHANNELS.SIGNALS) { this.stopSignalStream(symbol); } } }); } /** * Handle quote request message */ private async handleQuoteRequest(client: WSClient, message: WSMessage): Promise { const { symbol } = message.data as { symbol: string }; if (!symbol) return; try { const quote = await this.fetchQuote(symbol.toUpperCase()); wsManager.send(client, { type: 'quote', channel: `${CHANNELS.QUOTES}:${symbol.toUpperCase()}`, data: quote, }); } catch { wsManager.send(client, { type: 'error', data: { message: `Failed to fetch quote for ${symbol}` }, }); } } /** * Handle signal request message */ private async handleSignalRequest(client: WSClient, message: WSMessage): Promise { const { symbol } = message.data as { symbol: string }; if (!symbol) return; try { const signal = await mlIntegrationService.getSignal(symbol.toUpperCase()); wsManager.send(client, { type: 'signal', channel: `${CHANNELS.SIGNALS}:${symbol.toUpperCase()}`, data: this.transformSignal(signal), }); } catch { wsManager.send(client, { type: 'error', data: { message: `Failed to fetch signal for ${symbol}` }, }); } } /** * Handle overlay request message */ private async handleOverlayRequest(client: WSClient, message: WSMessage): Promise { const { symbol, config } = message.data as { symbol: string; config?: Record }; if (!symbol) return; try { const overlay = await mlOverlayService.getChartOverlay(symbol.toUpperCase(), config); wsManager.send(client, { type: 'overlay', channel: `${CHANNELS.OVERLAYS}:${symbol.toUpperCase()}`, data: overlay, }); } catch { wsManager.send(client, { type: 'error', data: { message: `Failed to fetch overlay for ${symbol}` }, }); } } // ========================================================================== // Binance WebSocket Streaming Methods // ========================================================================== /** * Start ticker stream (24h stats) via Binance WebSocket */ private startTickerStream(symbol: string): void { const streamKey = `ticker:${symbol}`; // Check if already subscribed if (this.binanceStreamRefs.has(streamKey)) { logger.debug('[TradingStream] Ticker stream already active:', { symbol }); return; } try { // Subscribe to Binance WebSocket ticker stream binanceService.subscribeTicker(symbol); this.binanceStreamRefs.set(streamKey, { type: 'ticker', symbol }); logger.info('[TradingStream] Started Binance ticker stream:', { symbol }); // Send initial data from cache if available const cached = this.priceCache.get(symbol); if (cached) { wsManager.broadcast(`${CHANNELS.TICKER}:${symbol}`, { type: 'ticker', data: cached, }); } } catch (error) { logger.error('[TradingStream] Failed to start ticker stream:', { symbol, error: (error as Error).message }); } } /** * Stop ticker stream */ private stopTickerStream(symbol: string): void { const streamKey = `ticker:${symbol}`; if (this.binanceStreamRefs.has(streamKey)) { const streamName = `${symbol.toLowerCase()}@ticker`; binanceService.unsubscribe(streamName); this.binanceStreamRefs.delete(streamKey); this.priceCache.delete(symbol); logger.info('[TradingStream] Stopped Binance ticker stream:', { symbol }); } } /** * Start kline/candlestick stream via Binance WebSocket */ private startKlineStream(symbol: string, interval: string): void { const streamKey = `klines:${symbol}:${interval}`; if (this.binanceStreamRefs.has(streamKey)) { logger.debug('[TradingStream] Kline stream already active:', { symbol, interval }); return; } try { binanceService.subscribeKlines(symbol, interval as '1m' | '3m' | '5m' | '15m' | '30m' | '1h' | '2h' | '4h' | '6h' | '8h' | '12h' | '1d' | '3d' | '1w' | '1M'); this.binanceStreamRefs.set(streamKey, { type: 'klines', symbol, interval }); logger.info('[TradingStream] Started Binance kline stream:', { symbol, interval }); } catch (error) { logger.error('[TradingStream] Failed to start kline stream:', { symbol, interval, error: (error as Error).message }); } } /** * Stop kline stream */ private stopKlineStream(symbol: string, interval: string): void { const streamKey = `klines:${symbol}:${interval}`; if (this.binanceStreamRefs.has(streamKey)) { const streamName = `${symbol.toLowerCase()}@kline_${interval}`; binanceService.unsubscribe(streamName); this.binanceStreamRefs.delete(streamKey); logger.info('[TradingStream] Stopped Binance kline stream:', { symbol, interval }); } } /** * Start trade stream via Binance WebSocket */ private startTradeStream(symbol: string): void { const streamKey = `trades:${symbol}`; if (this.binanceStreamRefs.has(streamKey)) { logger.debug('[TradingStream] Trade stream already active:', { symbol }); return; } try { binanceService.subscribeTrades(symbol); this.binanceStreamRefs.set(streamKey, { type: 'trades', symbol }); logger.info('[TradingStream] Started Binance trade stream:', { symbol }); } catch (error) { logger.error('[TradingStream] Failed to start trade stream:', { symbol, error: (error as Error).message }); } } /** * Stop trade stream */ private stopTradeStream(symbol: string): void { const streamKey = `trades:${symbol}`; if (this.binanceStreamRefs.has(streamKey)) { const streamName = `${symbol.toLowerCase()}@trade`; binanceService.unsubscribe(streamName); this.binanceStreamRefs.delete(streamKey); logger.info('[TradingStream] Stopped Binance trade stream:', { symbol }); } } /** * Start depth/order book stream via Binance WebSocket */ private startDepthStream(symbol: string, levels: 5 | 10 | 20 = 10): void { const streamKey = `depth:${symbol}`; if (this.binanceStreamRefs.has(streamKey)) { logger.debug('[TradingStream] Depth stream already active:', { symbol }); return; } try { binanceService.subscribeDepth(symbol, levels); this.binanceStreamRefs.set(streamKey, { type: 'depth', symbol }); logger.info('[TradingStream] Started Binance depth stream:', { symbol, levels }); } catch (error) { logger.error('[TradingStream] Failed to start depth stream:', { symbol, error: (error as Error).message }); } } /** * Stop depth stream */ private stopDepthStream(symbol: string): void { const streamKey = `depth:${symbol}`; if (this.binanceStreamRefs.has(streamKey)) { const streamName = `${symbol.toLowerCase()}@depth10@100ms`; binanceService.unsubscribe(streamName); this.binanceStreamRefs.delete(streamKey); logger.info('[TradingStream] Stopped Binance depth stream:', { symbol }); } } // ========================================================================== // Legacy Quote Streaming (Fallback) // ========================================================================== /** * Start streaming quotes for a symbol (LEGACY - uses polling as fallback) */ private startQuoteStream(symbol: string): void { const key = `quotes:${symbol}`; if (this.quoteIntervals.has(key)) return; const interval = setInterval(async () => { try { const quote = await this.fetchQuote(symbol); wsManager.broadcast(`${CHANNELS.QUOTES}:${symbol}`, { type: 'quote', data: quote, }); } catch (_error) { logger.error('[TradingStream] Quote fetch error:', { symbol, error: (_error as Error).message }); } }, this.QUOTE_UPDATE_INTERVAL); this.quoteIntervals.set(key, interval); logger.debug('[TradingStream] Started quote stream (polling fallback):', { symbol }); } /** * Stop streaming quotes for a symbol (LEGACY) */ private stopQuoteStream(symbol: string): void { const key = `quotes:${symbol}`; const interval = this.quoteIntervals.get(key); if (interval) { clearInterval(interval); this.quoteIntervals.delete(key); logger.debug('[TradingStream] Stopped quote stream:', { symbol }); } } /** * Fetch quote data from Binance */ private async fetchQuote(symbol: string): Promise { try { // Get 24hr ticker from Binance const result = await binanceService.get24hrTicker(symbol); const ticker = Array.isArray(result) ? result[0] : result; if (!ticker) { throw new Error('No ticker data'); } const price = parseFloat(ticker.lastPrice); const change = parseFloat(ticker.priceChange); const changePercent = parseFloat(ticker.priceChangePercent); return { symbol: ticker.symbol, price, bid: parseFloat(ticker.bidPrice), ask: parseFloat(ticker.askPrice), volume: parseFloat(ticker.volume), change, changePercent, high: parseFloat(ticker.highPrice), low: parseFloat(ticker.lowPrice), open: parseFloat(ticker.openPrice), previousClose: parseFloat(ticker.prevClosePrice), timestamp: new Date(), }; } catch { // Fallback to simulated data if Binance fails logger.warn('[TradingStream] Binance fetch failed, using mock data:', { symbol }); return this.getMockQuote(symbol); } } /** * Get mock quote data (fallback) */ private getMockQuote(symbol: string): QuoteData { const basePrice = this.getBasePrice(symbol); const change = (Math.random() - 0.5) * basePrice * 0.02; const price = basePrice + change; return { symbol, price: parseFloat(price.toFixed(2)), bid: parseFloat((price - 0.01).toFixed(2)), ask: parseFloat((price + 0.01).toFixed(2)), volume: Math.floor(Math.random() * 1000000), change: parseFloat(change.toFixed(2)), changePercent: parseFloat(((change / basePrice) * 100).toFixed(2)), high: parseFloat((price + Math.random() * 2).toFixed(2)), low: parseFloat((price - Math.random() * 2).toFixed(2)), open: parseFloat((price - change * 0.5).toFixed(2)), previousClose: parseFloat(basePrice.toFixed(2)), timestamp: new Date(), }; } /** * Get base price for a symbol (mock fallback) */ private getBasePrice(symbol: string): number { const prices: Record = { BTCUSDT: 97500.00, ETHUSDT: 3650.00, BNBUSDT: 720.00, SOLUSDT: 235.00, XRPUSDT: 2.45, DOGEUSDT: 0.42, ADAUSDT: 1.10, AVAXUSDT: 48.50, DOTUSDT: 9.25, MATICUSDT: 0.58, }; return prices[symbol.toUpperCase()] || 100 + Math.random() * 100; } // ========================================================================== // Signal Streaming // ========================================================================== /** * Start streaming signals for a symbol */ private startSignalStream(symbol: string): void { const key = `signals:${symbol}`; if (this.signalIntervals.has(key)) return; // Initial signal fetch this.broadcastSignal(symbol); const interval = setInterval(async () => { await this.broadcastSignal(symbol); }, this.SIGNAL_UPDATE_INTERVAL); this.signalIntervals.set(key, interval); logger.debug('[TradingStream] Started signal stream:', { symbol }); } /** * Stop streaming signals for a symbol */ private stopSignalStream(symbol: string): void { const key = `signals:${symbol}`; const interval = this.signalIntervals.get(key); if (interval) { clearInterval(interval); this.signalIntervals.delete(key); logger.debug('[TradingStream] Stopped signal stream:', { symbol }); } } /** * Broadcast signal update */ private async broadcastSignal(symbol: string): Promise { try { const signal = await mlIntegrationService.getSignal(symbol); wsManager.broadcast(`${CHANNELS.SIGNALS}:${symbol}`, { type: 'signal', data: this.transformSignal(signal), }); } catch (_error) { logger.error('[TradingStream] Signal fetch error:', { symbol, error: (_error as Error).message }); } } /** * Transform ML signal to stream format */ private transformSignal(signal: unknown): SignalData { const s = signal as Record; const prediction = s.prediction as Record | undefined; return { symbol: s.symbol as string, signalType: s.signalType as 'buy' | 'sell' | 'hold', confidence: s.confidence as number, amdPhase: s.amdPhase as string, targetPrice: (prediction?.targetPrice as number) || 0, stopLoss: (prediction?.stopLoss as number) || 0, timestamp: new Date(s.timestamp as string | number), }; } // ========================================================================== // Public Methods // ========================================================================== /** * Broadcast trade execution to user */ broadcastTradeExecution(userId: string, trade: TradeData): void { wsManager.sendToUser(userId, { type: 'trade', channel: `${CHANNELS.TRADES}:${trade.symbol}`, data: trade, }); } /** * Broadcast order update to user */ broadcastOrderUpdate(userId: string, order: unknown): void { wsManager.sendToUser(userId, { type: 'orderUpdate', channel: CHANNELS.ORDERS, data: order, }); } /** * Broadcast portfolio update to user */ broadcastPortfolioUpdate(userId: string, portfolio: unknown): void { wsManager.sendToUser(userId, { type: 'portfolioUpdate', channel: CHANNELS.PORTFOLIO, data: portfolio, }); } /** * Broadcast alert to user */ broadcastAlert(userId: string, alert: unknown): void { wsManager.sendToUser(userId, { type: 'alert', channel: CHANNELS.ALERTS, data: alert, }); } /** * Broadcast system announcement to all */ broadcastAnnouncement(message: string): void { wsManager.broadcastAll({ type: 'announcement', data: { message }, }); } /** * Get streaming stats */ getStats(): { connectedClients: number; activeChannels: string[]; quoteStreams: number; signalStreams: number; binanceStreams: number; binanceActiveStreams: string[]; priceCache: number; } { return { connectedClients: wsManager.getClientCount(), activeChannels: wsManager.getActiveChannels(), quoteStreams: this.quoteIntervals.size, signalStreams: this.signalIntervals.size, binanceStreams: this.binanceStreamRefs.size, binanceActiveStreams: binanceService.getActiveStreams(), priceCache: this.priceCache.size, }; } /** * Shutdown service */ shutdown(): void { // Clear polling intervals this.quoteIntervals.forEach((interval) => clearInterval(interval)); this.signalIntervals.forEach((interval) => clearInterval(interval)); this.quoteIntervals.clear(); this.signalIntervals.clear(); // Unsubscribe from all Binance streams binanceService.unsubscribeAll(); this.binanceStreamRefs.clear(); this.priceCache.clear(); logger.info('[TradingStream] Service shut down'); } } // Export singleton instance export const tradingStreamService = new TradingStreamService();