Data aggregation and distribution service: - Market data collection - OHLCV aggregation - Real-time data feeds - Data API endpoints Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
185 lines
6.3 KiB
Python
185 lines
6.3 KiB
Python
"""
|
|
WebSocket Route Handlers
|
|
OrbiQuant IA Trading Platform - Data Service
|
|
"""
|
|
|
|
import asyncio
|
|
import logging
|
|
import uuid
|
|
from datetime import datetime
|
|
from typing import Optional
|
|
|
|
from fastapi import APIRouter, WebSocket, WebSocketDisconnect, Query
|
|
from fastapi.websockets import WebSocketState
|
|
|
|
from .manager import WebSocketManager, ConnectionManager
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
# Global WebSocket manager instance
|
|
_ws_manager: Optional[WebSocketManager] = None
|
|
|
|
|
|
def get_ws_manager() -> WebSocketManager:
|
|
"""Get or create WebSocket manager."""
|
|
global _ws_manager
|
|
if _ws_manager is None:
|
|
_ws_manager = WebSocketManager()
|
|
return _ws_manager
|
|
|
|
|
|
def set_ws_manager(manager: WebSocketManager) -> None:
|
|
"""Set the WebSocket manager instance."""
|
|
global _ws_manager
|
|
_ws_manager = manager
|
|
|
|
|
|
class WSRouter:
|
|
"""WebSocket router with handlers."""
|
|
|
|
def __init__(self, ws_manager: Optional[WebSocketManager] = None):
|
|
self.router = APIRouter()
|
|
self.ws_manager = ws_manager or get_ws_manager()
|
|
self._setup_routes()
|
|
|
|
def _setup_routes(self):
|
|
"""Setup WebSocket routes."""
|
|
|
|
@self.router.websocket("/ws/stream")
|
|
async def websocket_stream(
|
|
websocket: WebSocket,
|
|
client_id: Optional[str] = Query(None)
|
|
):
|
|
"""
|
|
Main WebSocket endpoint for real-time data streaming.
|
|
|
|
Connect and subscribe to channels:
|
|
- ticker: Real-time price updates
|
|
- candles: OHLCV candle updates
|
|
- orderbook: Order book snapshots
|
|
- trades: Recent trades
|
|
- signals: Trading signals from ML models
|
|
|
|
Example message format:
|
|
```json
|
|
{
|
|
"action": "subscribe",
|
|
"channel": "ticker",
|
|
"symbols": ["EURUSD", "BTCUSD"]
|
|
}
|
|
```
|
|
"""
|
|
# Generate client ID if not provided
|
|
if not client_id:
|
|
client_id = f"client_{uuid.uuid4().hex[:12]}"
|
|
|
|
# Accept connection
|
|
client = await self.ws_manager.connections.connect(websocket, client_id)
|
|
|
|
# Send welcome message
|
|
await websocket.send_json({
|
|
"type": "connected",
|
|
"client_id": client_id,
|
|
"message": "Connected to OrbiQuant Data Service",
|
|
"timestamp": datetime.utcnow().isoformat(),
|
|
"available_channels": ["ticker", "candles", "orderbook", "trades", "signals"]
|
|
})
|
|
|
|
try:
|
|
while True:
|
|
# Receive message
|
|
try:
|
|
data = await asyncio.wait_for(
|
|
websocket.receive_json(),
|
|
timeout=60.0 # Heartbeat timeout
|
|
)
|
|
except asyncio.TimeoutError:
|
|
# Send ping to keep connection alive
|
|
if websocket.client_state == WebSocketState.CONNECTED:
|
|
await websocket.send_json({
|
|
"type": "ping",
|
|
"timestamp": datetime.utcnow().isoformat()
|
|
})
|
|
continue
|
|
|
|
# Handle message
|
|
response = await self.ws_manager.handle_message(client_id, data)
|
|
await websocket.send_json(response)
|
|
|
|
except WebSocketDisconnect:
|
|
logger.info(f"Client {client_id} disconnected normally")
|
|
except Exception as e:
|
|
logger.error(f"WebSocket error for {client_id}: {e}")
|
|
finally:
|
|
await self.ws_manager.connections.disconnect(client_id)
|
|
|
|
@self.router.websocket("/ws/ticker/{symbol}")
|
|
async def websocket_ticker(
|
|
websocket: WebSocket,
|
|
symbol: str
|
|
):
|
|
"""
|
|
Simplified ticker WebSocket for a single symbol.
|
|
|
|
Automatically subscribes to the ticker channel for the specified symbol.
|
|
"""
|
|
client_id = f"ticker_{uuid.uuid4().hex[:8]}"
|
|
|
|
client = await self.ws_manager.connections.connect(websocket, client_id)
|
|
await self.ws_manager.connections.subscribe(
|
|
client_id=client_id,
|
|
channel=self.ws_manager.connections.__class__.__bases__[0].__subclasses__()[0], # Channel.TICKER workaround
|
|
symbol=symbol
|
|
)
|
|
|
|
# Import here to avoid circular
|
|
from .manager import Channel
|
|
|
|
await self.ws_manager.connections.subscribe(
|
|
client_id=client_id,
|
|
channel=Channel.TICKER,
|
|
symbol=symbol
|
|
)
|
|
|
|
await websocket.send_json({
|
|
"type": "subscribed",
|
|
"channel": "ticker",
|
|
"symbol": symbol.upper(),
|
|
"timestamp": datetime.utcnow().isoformat()
|
|
})
|
|
|
|
try:
|
|
while True:
|
|
# Keep connection alive, data comes via broadcasts
|
|
try:
|
|
data = await asyncio.wait_for(
|
|
websocket.receive_json(),
|
|
timeout=30.0
|
|
)
|
|
# Handle ping/pong
|
|
if data.get("type") == "ping":
|
|
await websocket.send_json({
|
|
"type": "pong",
|
|
"timestamp": datetime.utcnow().isoformat()
|
|
})
|
|
except asyncio.TimeoutError:
|
|
# Send heartbeat
|
|
await websocket.send_json({
|
|
"type": "heartbeat",
|
|
"timestamp": datetime.utcnow().isoformat()
|
|
})
|
|
|
|
except WebSocketDisconnect:
|
|
pass
|
|
finally:
|
|
await self.ws_manager.connections.disconnect(client_id)
|
|
|
|
@self.router.get("/ws/stats")
|
|
async def websocket_stats():
|
|
"""Get WebSocket connection statistics."""
|
|
return {
|
|
"status": "ok",
|
|
"stats": self.ws_manager.connections.stats,
|
|
"timestamp": datetime.utcnow().isoformat()
|
|
}
|