Some checks failed
CI Pipeline / changes (push) Has been cancelled
CI Pipeline / core (push) Has been cancelled
CI Pipeline / trading-backend (push) Has been cancelled
CI Pipeline / trading-data-service (push) Has been cancelled
CI Pipeline / trading-frontend (push) Has been cancelled
CI Pipeline / erp-core (push) Has been cancelled
CI Pipeline / erp-mecanicas (push) Has been cancelled
CI Pipeline / gamilit-backend (push) Has been cancelled
CI Pipeline / gamilit-frontend (push) Has been cancelled
177 lines
5.2 KiB
Python
177 lines
5.2 KiB
Python
#!/usr/bin/env python3
|
|
"""
Example: Using the Data Sync Service
OrbiQuant IA Trading Platform

This example demonstrates how to use the sync service programmatically.
"""
|
|
|
|
import asyncio
|
|
import asyncpg
|
|
from datetime import datetime, timedelta
|
|
|
|
from providers.polygon_client import PolygonClient, AssetType, Timeframe
|
|
from services.sync_service import DataSyncService
|
|
|
|
|
|
async def main():
    """Run the Data Sync Service example from start to finish.

    The steps, in order:

    1. Open an asyncpg connection pool to the local ``orbiquant_trading``
       database.
    2. Build a ``PolygonClient`` and a ``DataSyncService`` on top of it.
    3. List the supported symbols and count them per asset type.
    4. Sync 7 days of 5-minute EURUSD data and print the result.
    5. Sync 3 days of GBPUSD data for three timeframes.
    6. Print the overall sync status and the EURUSD-specific status.

    The pool and the client's ``_session`` (when set) are released in a
    ``finally`` block, so they are closed even if one of the steps raises.

    Environment overrides (both optional; the defaults are the development
    values this example shipped with):

    * ``ORBIQUANT_DB_PASSWORD`` -- database password.
    * ``POLYGON_API_KEY`` -- API key passed to ``PolygonClient``.

    NOTE(review): the environment variable names are this example's own
    choice -- align them with the project's configuration if it defines
    different ones.

    Raises:
        Any exception raised by asyncpg, the client or the sync service is
        propagated to the caller after cleanup has run.
    """
    import os  # local import: only this example function needs it

    def banner(title):
        """Print a section title framed by two 60-character rules."""
        rule = "=" * 60
        print("\n" + rule)
        print(title)
        print(rule)

    # 1. Initialize database connection
    print("Connecting to database...")
    db_pool = await asyncpg.create_pool(
        host="localhost",
        port=5432,
        database="orbiquant_trading",
        user="orbiquant_user",
        password=os.environ.get("ORBIQUANT_DB_PASSWORD", "orbiquant_dev_2025"),
        min_size=2,
        max_size=10,
    )
    print("Connected!")

    # Bound before the try so the finally clause can reference it even when
    # the client constructor itself fails.
    polygon_client = None
    try:
        # 2. Initialize Polygon/Massive client
        print("\nInitializing Polygon/Massive client...")
        polygon_client = PolygonClient(
            # Replace with your actual API key, or export POLYGON_API_KEY
            api_key=os.environ.get("POLYGON_API_KEY", "YOUR_API_KEY_HERE"),
            rate_limit_per_min=5,  # Free tier limit
            use_massive_url=False,  # Set True to use api.massive.com
        )
        print(f"Client initialized with base URL: {polygon_client.base_url}")

        # 3. Create sync service
        print("\nCreating sync service...")
        sync_service = DataSyncService(
            polygon_client=polygon_client,
            db_pool=db_pool,
            batch_size=10000,
        )
        print("Sync service ready!")

        # 4. Get list of supported symbols
        banner("SUPPORTED SYMBOLS")

        symbols = await sync_service.get_supported_symbols()
        print(f"\nTotal symbols: {len(symbols)}")

        # Group by asset type
        forex_symbols = [s for s in symbols if s["asset_type"] == "forex"]
        crypto_symbols = [s for s in symbols if s["asset_type"] == "crypto"]
        index_symbols = [s for s in symbols if s["asset_type"] == "index"]

        print(f" - Forex: {len(forex_symbols)}")
        print(f" - Crypto: {len(crypto_symbols)}")
        print(f" - Indices: {len(index_symbols)}")

        # Show first 5 forex symbols
        print("\nFirst 5 forex symbols:")
        for sym in forex_symbols[:5]:
            print(f" {sym['symbol']:10} -> {sym['polygon_symbol']}")

        # 5. Sync a specific symbol
        banner("SYNCING EURUSD - 5 MINUTE DATA")

        result = await sync_service.sync_ticker_data(
            symbol="EURUSD",
            asset_type=AssetType.FOREX,
            timeframe=Timeframe.MINUTE_5,
            backfill_days=7,  # Last 7 days
        )

        print("\nSync completed!")
        print(f" Status: {result['status']}")
        print(f" Rows inserted: {result['rows_inserted']}")
        # The date range and error entries are only printed when truthy.
        if result.get('start_date'):
            print(f" Date range: {result['start_date']} to {result['end_date']}")
        if result.get('error'):
            print(f" Error: {result['error']}")

        # 6. Sync multiple timeframes for same symbol
        banner("SYNCING MULTIPLE TIMEFRAMES FOR GBPUSD")

        timeframes = [
            Timeframe.MINUTE_5,
            Timeframe.MINUTE_15,
            Timeframe.HOUR_1,
        ]

        for tf in timeframes:
            print(f"\nSyncing {tf.value}...")
            result = await sync_service.sync_ticker_data(
                symbol="GBPUSD",
                asset_type=AssetType.FOREX,
                timeframe=tf,
                backfill_days=3,
            )
            print(f" {result['status']}: {result['rows_inserted']} rows")

        # 7. Get sync status
        banner("SYNC STATUS")

        status = await sync_service.get_sync_status()

        print(f"\nTotal sync records: {len(status)}")

        # Show recent syncs (first 10 records as returned by the service)
        print("\nRecent syncs:")
        for s in status[:10]:
            print(f" {s['symbol']:10} {s['timeframe']:10} -> {s['status']:10} ({s['rows_synced']} rows)")

        # 8. Sync status for specific symbol
        banner("EURUSD SYNC STATUS (ALL TIMEFRAMES)")

        eurusd_status = await sync_service.get_sync_status(symbol="EURUSD")

        if eurusd_status:
            print(f"\nFound {len(eurusd_status)} timeframes:")
            for s in eurusd_status:
                print(f" {s['timeframe']:10} - Last sync: {s['last_sync'] or 'Never'}")
                print(f" Status: {s['status']}, Rows: {s['rows_synced']}")
                if s['error']:
                    print(f" Error: {s['error']}")
        else:
            print("\nNo sync status found for EURUSD")

        # 9. Example: Sync all active tickers (commented out - can take a while)
        # print("\n" + "="*60)
        # print("SYNCING ALL ACTIVE TICKERS")
        # print("="*60)
        #
        # result = await sync_service.sync_all_active_tickers(
        #     timeframe=Timeframe.MINUTE_5,
        #     backfill_days=1
        # )
        #
        # print(f"\nSync completed!")
        # print(f" Total tickers: {result['total_tickers']}")
        # print(f" Successful: {result['successful']}")
        # print(f" Failed: {result['failed']}")
        # print(f" Total rows: {result['total_rows_inserted']}")
    finally:
        # Cleanup runs on success and on failure so neither the pool nor the
        # client session is left open when a step above raises.
        banner("CLEANUP")
        try:
            await db_pool.close()
        finally:
            # NOTE(review): `_session` is a private attribute of the client;
            # switch to a public close method if PolygonClient offers one.
            session = getattr(polygon_client, "_session", None)
            if session:
                await session.close()

    print("\nDone!")
|
|
|
|
|
|
if __name__ == "__main__":
    # Script entry point: asyncio.run() drives the main() coroutine to
    # completion when this file is executed directly.
    asyncio.run(main())
|