/** * WebSocket Server * Real-time streaming for trading data */ import { WebSocketServer, WebSocket, RawData } from 'ws'; import { Server as HttpServer } from 'http'; import { parse as parseUrl } from 'url'; import jwt from 'jsonwebtoken'; import { config } from '../../config'; import { logger } from '../../shared/utils/logger'; import { EventEmitter } from 'events'; // ============================================================================ // Types // ============================================================================ export interface WSClient { id: string; ws: WebSocket; userId?: string; subscriptions: Set; isAlive: boolean; connectedAt: Date; lastPing: Date; } export interface WSMessage { type: string; channel?: string; data?: unknown; timestamp?: string; } export interface SubscriptionMessage { type: 'subscribe' | 'unsubscribe'; channels: string[]; } export type MessageHandler = (client: WSClient, message: WSMessage) => void | Promise; // ============================================================================ // WebSocket Manager // ============================================================================ class WebSocketManager extends EventEmitter { private wss: WebSocketServer | null = null; private clients: Map = new Map(); private channelSubscribers: Map> = new Map(); private messageHandlers: Map = new Map(); private heartbeatInterval: NodeJS.Timeout | null = null; private readonly HEARTBEAT_INTERVAL = 30000; private readonly CLIENT_TIMEOUT = 60000; /** * Initialize WebSocket server */ initialize(httpServer: HttpServer): void { this.wss = new WebSocketServer({ server: httpServer, path: '/ws', verifyClient: this.verifyClient.bind(this), }); this.wss.on('connection', this.handleConnection.bind(this)); this.wss.on('error', (error) => { logger.error('[WS] Server error:', { error: error.message }); }); this.startHeartbeat(); logger.info('[WS] WebSocket server initialized'); } /** * Verify client connection (authentication) */ private verifyClient( info: { origin: string; req: { url?: string } }, callback: (result: boolean, code?: number, message?: string) => void ): void { try { const url = parseUrl(info.req.url || '', true); const token = url.query.token as string; // Allow connection without token (public channels only) if (!token) { callback(true); return; } // Verify JWT token jwt.verify(token, config.jwt.accessSecret); callback(true); } catch (error) { logger.warn('[WS] Client verification failed:', { error: (error as Error).message }); callback(false, 401, 'Unauthorized'); } } /** * Handle new WebSocket connection */ private handleConnection(ws: WebSocket, req: { url?: string }): void { const clientId = this.generateClientId(); const url = parseUrl(req.url || '', true); const token = url.query.token as string; let userId: string | undefined; if (token) { try { const decoded = jwt.verify(token, config.jwt.accessSecret) as { sub: string }; userId = decoded.sub; } catch { // Token invalid, continue as anonymous } } const client: WSClient = { id: clientId, ws, userId, subscriptions: new Set(), isAlive: true, connectedAt: new Date(), lastPing: new Date(), }; this.clients.set(clientId, client); // Setup event handlers ws.on('message', (data) => this.handleMessage(client, data)); ws.on('pong', () => { client.isAlive = true; client.lastPing = new Date(); }); ws.on('close', () => this.handleDisconnect(client)); ws.on('error', (error) => { logger.error('[WS] Client error:', { clientId, error: error.message }); }); // Send welcome message this.send(client, { type: 'connected', data: { clientId, authenticated: !!userId, timestamp: new Date().toISOString(), }, }); this.emit('connection', client); logger.info('[WS] Client connected:', { clientId, userId, authenticated: !!userId }); } /** * Handle incoming message */ private handleMessage(client: WSClient, rawData: RawData): void { try { const message = JSON.parse(rawData.toString()) as WSMessage; // Handle subscription messages if (message.type === 'subscribe' || message.type === 'unsubscribe') { this.handleSubscription(client, message as SubscriptionMessage); return; } // Handle ping if (message.type === 'ping') { this.send(client, { type: 'pong', timestamp: new Date().toISOString() }); return; } // Call registered message handler const handler = this.messageHandlers.get(message.type); if (handler) { handler(client, message); } else { logger.debug('[WS] Unknown message type:', { type: message.type, clientId: client.id }); } this.emit('message', client, message); } catch (error) { logger.error('[WS] Message parse error:', { error: (error as Error).message }); this.send(client, { type: 'error', data: { message: 'Invalid message format' }, }); } } /** * Handle subscription requests */ private handleSubscription(client: WSClient, message: SubscriptionMessage): void { const channels = Array.isArray(message.channels) ? message.channels : []; channels.forEach((channel) => { // Check if channel requires authentication if (this.isPrivateChannel(channel) && !client.userId) { this.send(client, { type: 'error', channel, data: { message: 'Authentication required for this channel' }, }); return; } if (message.type === 'subscribe') { // Subscribe client.subscriptions.add(channel); if (!this.channelSubscribers.has(channel)) { this.channelSubscribers.set(channel, new Set()); } this.channelSubscribers.get(channel)!.add(client.id); this.send(client, { type: 'subscribed', channel, timestamp: new Date().toISOString(), }); this.emit('subscribe', client, channel); logger.debug('[WS] Client subscribed:', { clientId: client.id, channel }); } else { // Unsubscribe client.subscriptions.delete(channel); this.channelSubscribers.get(channel)?.delete(client.id); this.send(client, { type: 'unsubscribed', channel, timestamp: new Date().toISOString(), }); this.emit('unsubscribe', client, channel); logger.debug('[WS] Client unsubscribed:', { clientId: client.id, channel }); } }); } /** * Handle client disconnect */ private handleDisconnect(client: WSClient): void { // Remove from all channel subscriptions client.subscriptions.forEach((channel) => { this.channelSubscribers.get(channel)?.delete(client.id); }); this.clients.delete(client.id); this.emit('disconnect', client); logger.info('[WS] Client disconnected:', { clientId: client.id, userId: client.userId }); } /** * Start heartbeat to detect dead connections */ private startHeartbeat(): void { this.heartbeatInterval = setInterval(() => { const now = Date.now(); this.clients.forEach((client) => { if (!client.isAlive) { // Client didn't respond to last ping logger.warn('[WS] Client timed out:', { clientId: client.id }); client.ws.terminate(); return; } // Check for stale connections if (now - client.lastPing.getTime() > this.CLIENT_TIMEOUT) { logger.warn('[WS] Client connection stale:', { clientId: client.id }); client.ws.terminate(); return; } client.isAlive = false; client.ws.ping(); }); }, this.HEARTBEAT_INTERVAL); } /** * Send message to a client */ send(client: WSClient, message: WSMessage): void { if (client.ws.readyState === WebSocket.OPEN) { client.ws.send(JSON.stringify({ ...message, timestamp: message.timestamp || new Date().toISOString(), })); } } /** * Broadcast to a channel */ broadcast(channel: string, message: WSMessage): void { const subscribers = this.channelSubscribers.get(channel); if (!subscribers) return; const payload = JSON.stringify({ ...message, channel, timestamp: message.timestamp || new Date().toISOString(), }); subscribers.forEach((clientId) => { const client = this.clients.get(clientId); if (client && client.ws.readyState === WebSocket.OPEN) { client.ws.send(payload); } }); } /** * Broadcast to all clients */ broadcastAll(message: WSMessage): void { const payload = JSON.stringify({ ...message, timestamp: message.timestamp || new Date().toISOString(), }); this.clients.forEach((client) => { if (client.ws.readyState === WebSocket.OPEN) { client.ws.send(payload); } }); } /** * Send to specific user (all their connections) */ sendToUser(userId: string, message: WSMessage): void { const payload = JSON.stringify({ ...message, timestamp: message.timestamp || new Date().toISOString(), }); this.clients.forEach((client) => { if (client.userId === userId && client.ws.readyState === WebSocket.OPEN) { client.ws.send(payload); } }); } /** * Register message handler */ registerHandler(type: string, handler: MessageHandler): void { this.messageHandlers.set(type, handler); } /** * Get channel subscriber count */ getChannelSubscriberCount(channel: string): number { return this.channelSubscribers.get(channel)?.size || 0; } /** * Get connected clients count */ getClientCount(): number { return this.clients.size; } /** * Get all subscribed channels */ getActiveChannels(): string[] { return Array.from(this.channelSubscribers.keys()).filter( (channel) => (this.channelSubscribers.get(channel)?.size || 0) > 0 ); } /** * Check if channel is private (requires auth) */ private isPrivateChannel(channel: string): boolean { return ( channel.startsWith('user:') || channel.startsWith('portfolio:') || channel.startsWith('account:') || channel.startsWith('orders:') ); } /** * Generate unique client ID */ private generateClientId(): string { return `ws_${Date.now()}_${Math.random().toString(36).substr(2, 9)}`; } /** * Shutdown WebSocket server */ shutdown(): void { if (this.heartbeatInterval) { clearInterval(this.heartbeatInterval); } this.clients.forEach((client) => { this.send(client, { type: 'shutdown', data: { message: 'Server shutting down' } }); client.ws.close(); }); this.wss?.close(); logger.info('[WS] WebSocket server shut down'); } } // Export singleton instance export const wsManager = new WebSocketManager();