Data aggregation and distribution service: - Market data collection - OHLCV aggregation - Real-time data feeds - Data API endpoints Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
169 lines
4.8 KiB
Python
169 lines
4.8 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Batch Service Runner
|
|
OrbiQuant IA Trading Platform
|
|
|
|
Runs the BatchOrchestrator for continuous asset updates.
|
|
"""
|
|
|
|
import asyncio
|
|
import os
|
|
import sys
|
|
import signal
|
|
import logging
|
|
from datetime import datetime
|
|
from dotenv import load_dotenv
|
|
|
|
# Add src to path
|
|
sys.path.insert(0, os.path.join(os.path.dirname(__file__), 'src'))
|
|
|
|
import asyncpg
|
|
|
|
from providers.polygon_client import PolygonClient
|
|
from providers.rate_limiter import RateLimiter
|
|
from services.priority_queue import PriorityQueue
|
|
from services.asset_updater import AssetUpdater
|
|
from services.batch_orchestrator import BatchOrchestrator
|
|
|
|
# Load environment variables (python-dotenv) before anything below reads
# them: BatchService.start() and main() rely on os.getenv() lookups.
load_dotenv()

# Configure logging once for the whole process: INFO level, with records
# laid out as "timestamp | LEVEL | logger name | message".
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s | %(levelname)-8s | %(name)s | %(message)s',
    datefmt='%Y-%m-%d %H:%M:%S'
)
# Module-level logger used by BatchService and main() in this script.
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class BatchService:
    """Wire together and run the batch-update pipeline.

    The service owns three long-lived resources -- the asyncpg connection
    pool, the Polygon client and the BatchOrchestrator -- and is responsible
    for creating them in ``start()`` and releasing them in ``stop()``.
    ``run()`` is the usual entry point: it starts the service, waits for a
    shutdown request (or an optional time limit) and always cleans up.
    """

    # Polygon request budget, shared by the client and the rate limiter so
    # the two cannot drift apart.
    RATE_LIMIT_PER_MIN = 5

    def __init__(self):
        self.orchestrator = None
        self.polygon = None
        self.db_pool = None
        self._shutdown = False
        # Created in run() so the Event is bound to the loop that is
        # actually running; stays None until then.
        self._shutdown_event = None

    async def start(self):
        """Create every dependency and start the batch orchestrator.

        Resources are stored on ``self`` as soon as they are usable so that
        ``stop()`` can release whatever was created if a later step fails.

        Raises:
            ValueError: If ``DB_PORT`` is set but is not an integer.
            Exception: Whatever the pool, the Polygon client or the
                orchestrator raise while starting.
        """
        logger.info("=" * 60)
        logger.info("OrbiQuant IA - Batch Service Starting")
        logger.info(f"Time: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
        logger.info("=" * 60)

        # Database connection. Credentials are passed as keyword arguments
        # rather than interpolated into a DSN: a password containing "@",
        # "/" or "#" would corrupt the URL, and unset variables would end up
        # as the literal text "None". Unset values are passed as None, for
        # which asyncpg applies its own documented defaults.
        db_port = os.getenv("DB_PORT")
        self.db_pool = await asyncpg.create_pool(
            user=os.getenv("DB_USER"),
            password=os.getenv("DB_PASSWORD"),
            host=os.getenv("DB_HOST"),
            port=int(db_port) if db_port else None,
            database=os.getenv("DB_NAME"),
            min_size=2,
            max_size=10,
        )
        logger.info("✓ Database pool created")

        # Polygon client. It is only published on self once __aenter__ has
        # succeeded, so stop() never calls __aexit__ on a client that was
        # not entered.
        polygon = PolygonClient(
            api_key=os.getenv("POLYGON_API_KEY"),
            rate_limit_per_min=self.RATE_LIMIT_PER_MIN,
        )
        await polygon.__aenter__()
        self.polygon = polygon
        logger.info("✓ Polygon client initialized")

        # Rate limiter
        rate_limiter = RateLimiter(calls_per_minute=self.RATE_LIMIT_PER_MIN)
        logger.info(
            f"✓ Rate limiter configured ({self.RATE_LIMIT_PER_MIN} calls/min)"
        )

        # Priority queue
        priority_queue = PriorityQueue()
        logger.info("✓ Priority queue initialized")

        # Asset updater (no Redis client is supplied by this runner)
        asset_updater = AssetUpdater(
            polygon_client=self.polygon,
            rate_limiter=rate_limiter,
            db_pool=self.db_pool,
            redis_client=None,
        )
        logger.info("✓ Asset updater ready")

        # Batch orchestrator
        batch_interval = int(os.getenv("BATCH_INTERVAL_MINUTES", "5"))
        self.orchestrator = BatchOrchestrator(
            asset_updater=asset_updater,
            priority_queue=priority_queue,
            batch_interval_minutes=batch_interval,
        )

        # Start orchestrator
        await self.orchestrator.start()
        logger.info("✓ Batch orchestrator started")

        logger.info("-" * 60)
        logger.info("Service is running. Press Ctrl+C to stop.")
        logger.info("-" * 60)

    async def stop(self):
        """Release every resource that ``start()`` managed to create.

        Teardown runs in reverse dependency order (orchestrator, Polygon
        client, database pool). Each step is best-effort: a failure is
        logged and the remaining resources are still released instead of
        being leaked.
        """
        logger.info("Shutting down batch service...")

        if self.orchestrator is not None:
            try:
                await self.orchestrator.stop()
            except Exception:
                logger.exception("Error while stopping batch orchestrator")

        if self.polygon is not None:
            try:
                await self.polygon.__aexit__(None, None, None)
            except Exception:
                logger.exception("Error while closing Polygon client")

        if self.db_pool is not None:
            try:
                await self.db_pool.close()
            except Exception:
                logger.exception("Error while closing database pool")

        logger.info("Batch service stopped.")

    async def run(self, duration_seconds: int = None):
        """Start the service, wait for shutdown, then clean up.

        Args:
            duration_seconds: If set, run for at most this many seconds and
                then stop. If None, run until ``request_shutdown()`` is
                called. In both modes a shutdown request ends the wait
                immediately.

        Raises:
            Exception: Anything raised by ``start()``; ``stop()`` is still
                executed first so partially created resources are released.
        """
        # Built here, inside the running loop. Honour a shutdown that was
        # requested before run() was called.
        self._shutdown_event = asyncio.Event()
        if self._shutdown:
            self._shutdown_event.set()

        try:
            await self.start()

            try:
                if duration_seconds is not None:
                    logger.info(f"Running for {duration_seconds} seconds...")
                    try:
                        await asyncio.wait_for(
                            self._shutdown_event.wait(),
                            timeout=duration_seconds,
                        )
                    except asyncio.TimeoutError:
                        # Time limit reached without a shutdown request.
                        pass
                else:
                    # Run indefinitely
                    await self._shutdown_event.wait()
            except asyncio.CancelledError:
                # Cancellation while waiting is treated as a normal
                # shutdown request.
                pass
        finally:
            await self.stop()

    def request_shutdown(self):
        """Request graceful shutdown.

        Intended to be called from the event-loop thread (for example from
        a handler installed with ``loop.add_signal_handler``); it wakes up
        ``run()`` if it is currently waiting.
        """
        self._shutdown = True
        if self._shutdown_event is not None:
            self._shutdown_event.set()
|
|
|
|
|
|
async def main():
    """Main entry point.

    Creates the BatchService, routes SIGTERM/SIGINT to its
    ``request_shutdown()`` method and runs it. When the ``TEST_MODE``
    environment variable is "true" (the default, compared
    case-insensitively) the service is run with a 120 second time limit;
    otherwise it runs until shutdown is requested.
    """
    service = BatchService()

    # We are inside a coroutine, so the running loop is the one to use.
    loop = asyncio.get_running_loop()

    def signal_handler():
        logger.info("Received shutdown signal")
        service.request_shutdown()

    # Handle signals
    for sig in (signal.SIGTERM, signal.SIGINT):
        try:
            loop.add_signal_handler(sig, signal_handler)
        except NotImplementedError:
            # Event loops without Unix signal support (e.g. on Windows)
            # cannot install handlers; keep running rather than crash.
            logger.warning(
                "Signal handlers are not supported on this platform; "
                "graceful shutdown on %s is unavailable",
                sig,
            )

    # Run for 2 minutes for testing, or indefinitely in production
    test_mode = os.getenv("TEST_MODE", "true").lower() == "true"

    if test_mode:
        # Run for 120 seconds to see at least one batch cycle
        await service.run(duration_seconds=120)
    else:
        await service.run()
|
|
|
|
|
|
# Script entry point: run main() on a fresh event loop only when this file
# is executed directly, not when it is imported.
if __name__ == "__main__":
    asyncio.run(main())
|