#!/usr/bin/env python3
"""
Batch Service Runner
OrbiQuant IA Trading Platform

Runs the BatchOrchestrator for continuous asset updates.
"""
import asyncio
import os
import sys
import signal
import logging
from datetime import datetime
from typing import Optional

from dotenv import load_dotenv

# Add src to path so the project-local packages below resolve when run as a script
sys.path.insert(0, os.path.join(os.path.dirname(__file__), 'src'))

import asyncpg

from providers.polygon_client import PolygonClient
from providers.rate_limiter import RateLimiter
from services.priority_queue import PriorityQueue
from services.asset_updater import AssetUpdater
from services.batch_orchestrator import BatchOrchestrator

# Load environment variables
load_dotenv()

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s | %(levelname)-8s | %(name)s | %(message)s',
    datefmt='%Y-%m-%d %H:%M:%S'
)
logger = logging.getLogger(__name__)


class BatchService:
    """Main batch service wrapper.

    Owns the lifetime of the DB pool, the Polygon HTTP client, and the
    BatchOrchestrator; `run()` drives start -> wait -> graceful stop.
    """

    def __init__(self):
        self.orchestrator = None
        self.polygon = None
        self.db_pool = None
        # Event instead of a polled bool: a shutdown request wakes `run()`
        # immediately, even during a timed run.
        self._shutdown_event = asyncio.Event()

    async def start(self):
        """Initialize all dependencies and start the batch orchestrator.

        Builds the asyncpg pool, the Polygon client, the rate limiter,
        the priority queue and the AssetUpdater, then starts the
        BatchOrchestrator. Raises whatever the underlying constructors
        raise (e.g. connection errors from asyncpg).
        """
        logger.info("=" * 60)
        logger.info("OrbiQuant IA - Batch Service Starting")
        logger.info(f"Time: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
        logger.info("=" * 60)

        # Database connection (credentials come from the environment / .env)
        dsn = (
            f"postgresql://{os.getenv('DB_USER')}:{os.getenv('DB_PASSWORD')}"
            f"@{os.getenv('DB_HOST')}:{os.getenv('DB_PORT')}/{os.getenv('DB_NAME')}"
        )
        self.db_pool = await asyncpg.create_pool(dsn, min_size=2, max_size=10)
        logger.info("✓ Database pool created")

        # Polygon client
        api_key = os.getenv("POLYGON_API_KEY")
        if not api_key:
            # Fail loudly in the log rather than with opaque HTTP 401s later.
            logger.warning("POLYGON_API_KEY is not set; Polygon requests may fail")
        self.polygon = PolygonClient(api_key=api_key, rate_limit_per_min=5)
        # Entered manually (not via `async with`) because the client must
        # outlive this method; `stop()` performs the matching __aexit__.
        await self.polygon.__aenter__()
        logger.info("✓ Polygon client initialized")

        # Rate limiter
        rate_limiter = RateLimiter(calls_per_minute=5)
        logger.info("✓ Rate limiter configured (5 calls/min)")

        # Priority queue
        priority_queue = PriorityQueue()
        logger.info("✓ Priority queue initialized")

        # Asset updater
        asset_updater = AssetUpdater(
            polygon_client=self.polygon,
            rate_limiter=rate_limiter,
            db_pool=self.db_pool,
            redis_client=None
        )
        logger.info("✓ Asset updater ready")

        # Batch orchestrator (getenv default as a str, consistent with the API)
        batch_interval = int(os.getenv("BATCH_INTERVAL_MINUTES", "5"))
        self.orchestrator = BatchOrchestrator(
            asset_updater=asset_updater,
            priority_queue=priority_queue,
            batch_interval_minutes=batch_interval
        )

        # Start orchestrator
        await self.orchestrator.start()
        logger.info("✓ Batch orchestrator started")

        logger.info("-" * 60)
        logger.info("Service is running. Press Ctrl+C to stop.")
        logger.info("-" * 60)

    async def stop(self):
        """Gracefully stop the service.

        Each resource is closed independently so that a failure in one
        shutdown step does not leak the remaining resources.
        """
        logger.info("Shutting down batch service...")

        if self.orchestrator:
            try:
                await self.orchestrator.stop()
            except Exception:
                logger.exception("Error while stopping orchestrator")

        if self.polygon:
            try:
                await self.polygon.__aexit__(None, None, None)
            except Exception:
                logger.exception("Error while closing Polygon client")

        if self.db_pool:
            try:
                await self.db_pool.close()
            except Exception:
                logger.exception("Error while closing database pool")

        logger.info("Batch service stopped.")

    async def run(self, duration_seconds: Optional[int] = None):
        """
        Run the service.

        Args:
            duration_seconds: If set, run for at most this duration then stop.
                              If None, run until a shutdown is requested.

        A shutdown request (signal or `request_shutdown()`) ends either
        mode immediately; resources are always released via `stop()`.
        """
        await self.start()

        try:
            if duration_seconds:
                logger.info(f"Running for {duration_seconds} seconds...")
                # wait_for on the shutdown event (rather than a bare sleep)
                # lets a shutdown signal end a timed run early.
                try:
                    await asyncio.wait_for(
                        self._shutdown_event.wait(), timeout=duration_seconds
                    )
                except asyncio.TimeoutError:
                    pass  # duration elapsed normally
            else:
                # Run indefinitely until a shutdown is requested — no polling.
                await self._shutdown_event.wait()
        except asyncio.CancelledError:
            pass
        finally:
            await self.stop()

    def request_shutdown(self):
        """Request graceful shutdown (safe to call from a loop signal handler)."""
        self._shutdown_event.set()


async def main():
    """Main entry point"""
    service = BatchService()

    # Handle signals. get_running_loop() is the correct API inside a
    # coroutine (get_event_loop() here is deprecated since Python 3.10).
    loop = asyncio.get_running_loop()

    def signal_handler():
        logger.info("Received shutdown signal")
        service.request_shutdown()

    for sig in (signal.SIGTERM, signal.SIGINT):
        try:
            loop.add_signal_handler(sig, signal_handler)
        except NotImplementedError:
            # add_signal_handler is unavailable on Windows event loops;
            # fall back to default signal behavior rather than crashing.
            logger.warning("Loop signal handlers not supported on this platform")
            break

    # Run for 2 minutes for testing, or indefinitely in production
    test_mode = os.getenv("TEST_MODE", "true").lower() == "true"

    if test_mode:
        # Run for 120 seconds to see at least one batch cycle
        await service.run(duration_seconds=120)
    else:
        await service.run()


if __name__ == "__main__":
    asyncio.run(main())