""" WebSocket Route Handlers OrbiQuant IA Trading Platform - Data Service """ import asyncio import logging import uuid from datetime import datetime from typing import Optional from fastapi import APIRouter, WebSocket, WebSocketDisconnect, Query from fastapi.websockets import WebSocketState from .manager import WebSocketManager, ConnectionManager logger = logging.getLogger(__name__) # Global WebSocket manager instance _ws_manager: Optional[WebSocketManager] = None def get_ws_manager() -> WebSocketManager: """Get or create WebSocket manager.""" global _ws_manager if _ws_manager is None: _ws_manager = WebSocketManager() return _ws_manager def set_ws_manager(manager: WebSocketManager) -> None: """Set the WebSocket manager instance.""" global _ws_manager _ws_manager = manager class WSRouter: """WebSocket router with handlers.""" def __init__(self, ws_manager: Optional[WebSocketManager] = None): self.router = APIRouter() self.ws_manager = ws_manager or get_ws_manager() self._setup_routes() def _setup_routes(self): """Setup WebSocket routes.""" @self.router.websocket("/ws/stream") async def websocket_stream( websocket: WebSocket, client_id: Optional[str] = Query(None) ): """ Main WebSocket endpoint for real-time data streaming. Connect and subscribe to channels: - ticker: Real-time price updates - candles: OHLCV candle updates - orderbook: Order book snapshots - trades: Recent trades - signals: Trading signals from ML models Example message format: ```json { "action": "subscribe", "channel": "ticker", "symbols": ["EURUSD", "BTCUSD"] } ``` """ # Generate client ID if not provided if not client_id: client_id = f"client_{uuid.uuid4().hex[:12]}" # Accept connection client = await self.ws_manager.connections.connect(websocket, client_id) # Send welcome message await websocket.send_json({ "type": "connected", "client_id": client_id, "message": "Connected to OrbiQuant Data Service", "timestamp": datetime.utcnow().isoformat(), "available_channels": ["ticker", "candles", "orderbook", "trades", "signals"] }) try: while True: # Receive message try: data = await asyncio.wait_for( websocket.receive_json(), timeout=60.0 # Heartbeat timeout ) except asyncio.TimeoutError: # Send ping to keep connection alive if websocket.client_state == WebSocketState.CONNECTED: await websocket.send_json({ "type": "ping", "timestamp": datetime.utcnow().isoformat() }) continue # Handle message response = await self.ws_manager.handle_message(client_id, data) await websocket.send_json(response) except WebSocketDisconnect: logger.info(f"Client {client_id} disconnected normally") except Exception as e: logger.error(f"WebSocket error for {client_id}: {e}") finally: await self.ws_manager.connections.disconnect(client_id) @self.router.websocket("/ws/ticker/{symbol}") async def websocket_ticker( websocket: WebSocket, symbol: str ): """ Simplified ticker WebSocket for a single symbol. Automatically subscribes to the ticker channel for the specified symbol. """ client_id = f"ticker_{uuid.uuid4().hex[:8]}" client = await self.ws_manager.connections.connect(websocket, client_id) await self.ws_manager.connections.subscribe( client_id=client_id, channel=self.ws_manager.connections.__class__.__bases__[0].__subclasses__()[0], # Channel.TICKER workaround symbol=symbol ) # Import here to avoid circular from .manager import Channel await self.ws_manager.connections.subscribe( client_id=client_id, channel=Channel.TICKER, symbol=symbol ) await websocket.send_json({ "type": "subscribed", "channel": "ticker", "symbol": symbol.upper(), "timestamp": datetime.utcnow().isoformat() }) try: while True: # Keep connection alive, data comes via broadcasts try: data = await asyncio.wait_for( websocket.receive_json(), timeout=30.0 ) # Handle ping/pong if data.get("type") == "ping": await websocket.send_json({ "type": "pong", "timestamp": datetime.utcnow().isoformat() }) except asyncio.TimeoutError: # Send heartbeat await websocket.send_json({ "type": "heartbeat", "timestamp": datetime.utcnow().isoformat() }) except WebSocketDisconnect: pass finally: await self.ws_manager.connections.disconnect(client_id) @self.router.get("/ws/stats") async def websocket_stats(): """Get WebSocket connection statistics.""" return { "status": "ok", "stats": self.ws_manager.connections.stats, "timestamp": datetime.utcnow().isoformat() }