trading-platform-data-servi.../src/websocket/handlers.py
rckrdmrd 62a9f3e1d9 feat: Initial commit - Data Service
Data aggregation and distribution service:
- Market data collection
- OHLCV aggregation
- Real-time data feeds
- Data API endpoints

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-18 04:30:42 -06:00

185 lines
6.3 KiB
Python

"""
WebSocket Route Handlers
OrbiQuant IA Trading Platform - Data Service
"""
import asyncio
import logging
import uuid
from datetime import datetime
from typing import Optional
from fastapi import APIRouter, WebSocket, WebSocketDisconnect, Query
from fastapi.websockets import WebSocketState
from .manager import WebSocketManager, ConnectionManager
# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)
# Global WebSocket manager instance shared by this module. It stays None until
# get_ws_manager() lazily creates one or set_ws_manager() installs one.
_ws_manager: Optional[WebSocketManager] = None
def get_ws_manager() -> WebSocketManager:
    """Return the module-wide WebSocket manager, creating it on first use.

    Returns:
        The instance currently stored in the module-level ``_ws_manager``
        slot. If the slot is still ``None``, a ``WebSocketManager()`` is
        constructed with no arguments, stored there, and returned.
    """
    global _ws_manager

    # Fast path: an instance was already created here or installed by
    # set_ws_manager().
    if _ws_manager is not None:
        return _ws_manager

    _ws_manager = WebSocketManager()
    return _ws_manager
def set_ws_manager(manager: WebSocketManager) -> None:
    """Install *manager* as the module-wide WebSocket manager.

    Args:
        manager: Instance to store in the module-level ``_ws_manager`` slot,
            replacing whatever was there before. Subsequent calls to
            ``get_ws_manager()`` return this instance.
    """
    global _ws_manager
    _ws_manager = manager
class WSRouter:
    """WebSocket router with handlers.

    Instantiating the class creates ``self.router`` (a FastAPI ``APIRouter``)
    and registers three routes on it:

    - ``/ws/stream`` (WebSocket): general endpoint; every JSON message received
      is passed to ``ws_manager.handle_message`` and the result is sent back.
    - ``/ws/ticker/{symbol}`` (WebSocket): subscribes the connection to the
      ticker channel for one symbol as soon as it connects.
    - ``/ws/stats`` (HTTP GET): returns ``ws_manager.connections.stats``.
    """

    # Seconds of client silence before a keep-alive frame is sent.
    STREAM_IDLE_TIMEOUT = 60.0
    TICKER_IDLE_TIMEOUT = 30.0

    def __init__(self, ws_manager: Optional[WebSocketManager] = None):
        """Create the router and register its routes.

        Args:
            ws_manager: Manager to use for connections and message handling.
                When ``None``, the module-wide instance returned by
                ``get_ws_manager()`` is used.
        """
        self.router = APIRouter()
        # Explicit None check: a caller-supplied manager is always honoured,
        # even if its class happens to define a falsy __bool__/__len__.
        self.ws_manager = ws_manager if ws_manager is not None else get_ws_manager()
        self._setup_routes()

    @staticmethod
    def _utc_timestamp() -> str:
        """Return the current naive-UTC time as an ISO 8601 string."""
        # NOTE(review): datetime.utcnow() is deprecated since Python 3.12. It is
        # kept so the emitted timestamp format (no UTC offset suffix) does not
        # change for existing clients.
        return datetime.utcnow().isoformat()

    def _setup_routes(self):
        """Setup WebSocket routes.

        The handlers are closures over ``self`` so they can reach
        ``self.ws_manager`` while still being registered through the router's
        decorators.
        """

        @self.router.websocket("/ws/stream")
        async def websocket_stream(
            websocket: WebSocket,
            client_id: Optional[str] = Query(None),
        ):
            """
            Main WebSocket endpoint for real-time data streaming.

            Connect and subscribe to channels:
            - ticker: Real-time price updates
            - candles: OHLCV candle updates
            - orderbook: Order book snapshots
            - trades: Recent trades
            - signals: Trading signals from ML models

            Example message format:
            ```json
            {
                "action": "subscribe",
                "channel": "ticker",
                "symbols": ["EURUSD", "BTCUSD"]
            }
            ```
            """
            # Generate client ID if not provided
            if not client_id:
                client_id = f"client_{uuid.uuid4().hex[:12]}"

            # Register the connection with the manager (presumably this also
            # accepts the socket -- confirm in ConnectionManager.connect).
            await self.ws_manager.connections.connect(websocket, client_id)

            # Everything after a successful connect() runs under try/finally so
            # the client is always unregistered, even if the welcome message
            # itself fails to send.
            try:
                # Send welcome message
                await websocket.send_json({
                    "type": "connected",
                    "client_id": client_id,
                    "message": "Connected to OrbiQuant Data Service",
                    "timestamp": self._utc_timestamp(),
                    "available_channels": ["ticker", "candles", "orderbook", "trades", "signals"],
                })

                while True:
                    # Receive message
                    try:
                        data = await asyncio.wait_for(
                            websocket.receive_json(),
                            timeout=self.STREAM_IDLE_TIMEOUT,  # Heartbeat timeout
                        )
                    except asyncio.TimeoutError:
                        # Nothing to keep alive once the socket has closed.
                        if websocket.client_state != WebSocketState.CONNECTED:
                            break
                        # Send ping to keep connection alive
                        await websocket.send_json({
                            "type": "ping",
                            "timestamp": self._utc_timestamp(),
                        })
                        continue

                    # Handle message
                    response = await self.ws_manager.handle_message(client_id, data)
                    await websocket.send_json(response)
            except WebSocketDisconnect:
                logger.info("Client %s disconnected normally", client_id)
            except Exception as e:
                # Top-level boundary for this connection: log with traceback
                # and fall through to cleanup.
                logger.exception("WebSocket error for %s: %s", client_id, e)
            finally:
                await self.ws_manager.connections.disconnect(client_id)

        @self.router.websocket("/ws/ticker/{symbol}")
        async def websocket_ticker(
            websocket: WebSocket,
            symbol: str,
        ):
            """
            Simplified ticker WebSocket for a single symbol.

            Automatically subscribes to the ticker channel for the specified symbol.
            Client messages of the form ``{"type": "ping"}`` are answered with a
            ``pong``; after ``TICKER_IDLE_TIMEOUT`` seconds of silence a
            ``heartbeat`` frame is sent.
            """
            # Import here to avoid circular
            from .manager import Channel

            client_id = f"ticker_{uuid.uuid4().hex[:8]}"
            await self.ws_manager.connections.connect(websocket, client_id)

            # As in the stream handler, all post-connect work is covered by
            # try/finally so a failed subscribe or send cannot leak the client.
            try:
                # Subscribe exactly once, with the real enum member.
                await self.ws_manager.connections.subscribe(
                    client_id=client_id,
                    channel=Channel.TICKER,
                    symbol=symbol,
                )
                # NOTE(review): the subscription uses `symbol` as given while
                # the confirmation reports it upper-cased -- confirm that
                # subscribe() normalises case.
                await websocket.send_json({
                    "type": "subscribed",
                    "channel": "ticker",
                    "symbol": symbol.upper(),
                    "timestamp": self._utc_timestamp(),
                })

                while True:
                    # Keep connection alive, data comes via broadcasts
                    try:
                        data = await asyncio.wait_for(
                            websocket.receive_json(),
                            timeout=self.TICKER_IDLE_TIMEOUT,
                        )
                    except asyncio.TimeoutError:
                        if websocket.client_state != WebSocketState.CONNECTED:
                            break
                        # Send heartbeat
                        await websocket.send_json({
                            "type": "heartbeat",
                            "timestamp": self._utc_timestamp(),
                        })
                        continue

                    # Handle ping/pong. Valid JSON need not be an object, so
                    # guard before calling .get().
                    if isinstance(data, dict) and data.get("type") == "ping":
                        await websocket.send_json({
                            "type": "pong",
                            "timestamp": self._utc_timestamp(),
                        })
            except WebSocketDisconnect:
                logger.info("Client %s disconnected normally", client_id)
            except Exception as e:
                logger.exception("WebSocket error for %s: %s", client_id, e)
            finally:
                await self.ws_manager.connections.disconnect(client_id)

        @self.router.get("/ws/stats")
        async def websocket_stats():
            """Get WebSocket connection statistics."""
            return {
                "status": "ok",
                "stats": self.ws_manager.connections.stats,
                "timestamp": self._utc_timestamp(),
            }