#!/usr/bin/env python3 """ Example: Using the Data Sync Service OrbiQuant IA Trading Platform This example demonstrates how to use the sync service programmatically. """ import asyncio import asyncpg from datetime import datetime, timedelta from providers.polygon_client import PolygonClient, AssetType, Timeframe from services.sync_service import DataSyncService async def main(): """Main example function.""" # 1. Initialize database connection print("Connecting to database...") db_pool = await asyncpg.create_pool( host="localhost", port=5432, database="orbiquant_trading", user="orbiquant_user", password="orbiquant_dev_2025", min_size=2, max_size=10 ) print("Connected!") # 2. Initialize Polygon/Massive client print("\nInitializing Polygon/Massive client...") polygon_client = PolygonClient( api_key="YOUR_API_KEY_HERE", # Replace with your actual API key rate_limit_per_min=5, # Free tier limit use_massive_url=False # Set True to use api.massive.com ) print(f"Client initialized with base URL: {polygon_client.base_url}") # 3. Create sync service print("\nCreating sync service...") sync_service = DataSyncService( polygon_client=polygon_client, db_pool=db_pool, batch_size=10000 ) print("Sync service ready!") # 4. Get list of supported symbols print("\n" + "="*60) print("SUPPORTED SYMBOLS") print("="*60) symbols = await sync_service.get_supported_symbols() print(f"\nTotal symbols: {len(symbols)}") # Group by asset type forex_symbols = [s for s in symbols if s["asset_type"] == "forex"] crypto_symbols = [s for s in symbols if s["asset_type"] == "crypto"] index_symbols = [s for s in symbols if s["asset_type"] == "index"] print(f" - Forex: {len(forex_symbols)}") print(f" - Crypto: {len(crypto_symbols)}") print(f" - Indices: {len(index_symbols)}") # Show first 5 forex symbols print("\nFirst 5 forex symbols:") for sym in forex_symbols[:5]: print(f" {sym['symbol']:10} -> {sym['polygon_symbol']}") # 5. Sync a specific symbol print("\n" + "="*60) print("SYNCING EURUSD - 5 MINUTE DATA") print("="*60) result = await sync_service.sync_ticker_data( symbol="EURUSD", asset_type=AssetType.FOREX, timeframe=Timeframe.MINUTE_5, backfill_days=7 # Last 7 days ) print(f"\nSync completed!") print(f" Status: {result['status']}") print(f" Rows inserted: {result['rows_inserted']}") if result.get('start_date'): print(f" Date range: {result['start_date']} to {result['end_date']}") if result.get('error'): print(f" Error: {result['error']}") # 6. Sync multiple timeframes for same symbol print("\n" + "="*60) print("SYNCING MULTIPLE TIMEFRAMES FOR GBPUSD") print("="*60) timeframes = [ Timeframe.MINUTE_5, Timeframe.MINUTE_15, Timeframe.HOUR_1, ] for tf in timeframes: print(f"\nSyncing {tf.value}...") result = await sync_service.sync_ticker_data( symbol="GBPUSD", asset_type=AssetType.FOREX, timeframe=tf, backfill_days=3 ) print(f" {result['status']}: {result['rows_inserted']} rows") # 7. Get sync status print("\n" + "="*60) print("SYNC STATUS") print("="*60) status = await sync_service.get_sync_status() print(f"\nTotal sync records: {len(status)}") # Show recent syncs print("\nRecent syncs:") for s in status[:10]: last_sync = s['last_sync'] or "Never" print(f" {s['symbol']:10} {s['timeframe']:10} -> {s['status']:10} ({s['rows_synced']} rows)") # 8. Sync status for specific symbol print("\n" + "="*60) print("EURUSD SYNC STATUS (ALL TIMEFRAMES)") print("="*60) eurusd_status = await sync_service.get_sync_status(symbol="EURUSD") if eurusd_status: print(f"\nFound {len(eurusd_status)} timeframes:") for s in eurusd_status: print(f" {s['timeframe']:10} - Last sync: {s['last_sync'] or 'Never'}") print(f" Status: {s['status']}, Rows: {s['rows_synced']}") if s['error']: print(f" Error: {s['error']}") else: print("\nNo sync status found for EURUSD") # 9. Example: Sync all active tickers (commented out - can take a while) # print("\n" + "="*60) # print("SYNCING ALL ACTIVE TICKERS") # print("="*60) # # result = await sync_service.sync_all_active_tickers( # timeframe=Timeframe.MINUTE_5, # backfill_days=1 # ) # # print(f"\nSync completed!") # print(f" Total tickers: {result['total_tickers']}") # print(f" Successful: {result['successful']}") # print(f" Failed: {result['failed']}") # print(f" Total rows: {result['total_rows_inserted']}") # Cleanup print("\n" + "="*60) print("CLEANUP") print("="*60) await db_pool.close() if polygon_client._session: await polygon_client._session.close() print("\nDone!") if __name__ == "__main__": # Run the example asyncio.run(main())