workspace-v1/projects/trading-platform/apps/data-service/examples/sync_example.py
rckrdmrd 66161b1566 feat: Workspace-v1 complete migration with NEXUS v3.4
Sistema NEXUS v3.4 migrado con:

Estructura principal:
- core/orchestration: Sistema SIMCO + CAPVED (27 directivas, 28 perfiles)
- core/catalog: Catalogo de funcionalidades reutilizables
- shared/knowledge-base: Base de conocimiento compartida
- devtools/scripts: Herramientas de desarrollo
- control-plane/registries: Control de servicios y CI/CD
- orchestration/: Configuracion de orquestacion de agentes

Proyectos incluidos (11):
- gamilit (submodule -> GitHub)
- trading-platform (OrbiquanTIA)
- erp-suite con 5 verticales:
  - erp-core, construccion, vidrio-templado
  - mecanicas-diesel, retail, clinicas
- betting-analytics
- inmobiliaria-analytics
- platform_marketing_content
- pos-micro, erp-basico

Configuracion:
- .gitignore completo para Node.js/Python/Docker
- gamilit como submodule (git@github.com:rckrdmrd/gamilit-workspace.git)
- Sistema de puertos estandarizado (3005-3199)

Generated with NEXUS v3.4 Migration System
EPIC-010: Configuracion Git y Repositorios
2026-01-04 03:37:42 -06:00

177 lines
5.2 KiB
Python

#!/usr/bin/env python3
"""
Example: Using the Data Sync Service
OrbiQuant IA Trading Platform
This example demonstrates how to use the sync service programmatically.
"""
import asyncio
import asyncpg
from datetime import datetime, timedelta
from providers.polygon_client import PolygonClient, AssetType, Timeframe
from services.sync_service import DataSyncService
async def main():
"""Main example function."""
# 1. Initialize database connection
print("Connecting to database...")
db_pool = await asyncpg.create_pool(
host="localhost",
port=5432,
database="orbiquant_trading",
user="orbiquant_user",
password="orbiquant_dev_2025",
min_size=2,
max_size=10
)
print("Connected!")
# 2. Initialize Polygon/Massive client
print("\nInitializing Polygon/Massive client...")
polygon_client = PolygonClient(
api_key="YOUR_API_KEY_HERE", # Replace with your actual API key
rate_limit_per_min=5, # Free tier limit
use_massive_url=False # Set True to use api.massive.com
)
print(f"Client initialized with base URL: {polygon_client.base_url}")
# 3. Create sync service
print("\nCreating sync service...")
sync_service = DataSyncService(
polygon_client=polygon_client,
db_pool=db_pool,
batch_size=10000
)
print("Sync service ready!")
# 4. Get list of supported symbols
print("\n" + "="*60)
print("SUPPORTED SYMBOLS")
print("="*60)
symbols = await sync_service.get_supported_symbols()
print(f"\nTotal symbols: {len(symbols)}")
# Group by asset type
forex_symbols = [s for s in symbols if s["asset_type"] == "forex"]
crypto_symbols = [s for s in symbols if s["asset_type"] == "crypto"]
index_symbols = [s for s in symbols if s["asset_type"] == "index"]
print(f" - Forex: {len(forex_symbols)}")
print(f" - Crypto: {len(crypto_symbols)}")
print(f" - Indices: {len(index_symbols)}")
# Show first 5 forex symbols
print("\nFirst 5 forex symbols:")
for sym in forex_symbols[:5]:
print(f" {sym['symbol']:10} -> {sym['polygon_symbol']}")
# 5. Sync a specific symbol
print("\n" + "="*60)
print("SYNCING EURUSD - 5 MINUTE DATA")
print("="*60)
result = await sync_service.sync_ticker_data(
symbol="EURUSD",
asset_type=AssetType.FOREX,
timeframe=Timeframe.MINUTE_5,
backfill_days=7 # Last 7 days
)
print(f"\nSync completed!")
print(f" Status: {result['status']}")
print(f" Rows inserted: {result['rows_inserted']}")
if result.get('start_date'):
print(f" Date range: {result['start_date']} to {result['end_date']}")
if result.get('error'):
print(f" Error: {result['error']}")
# 6. Sync multiple timeframes for same symbol
print("\n" + "="*60)
print("SYNCING MULTIPLE TIMEFRAMES FOR GBPUSD")
print("="*60)
timeframes = [
Timeframe.MINUTE_5,
Timeframe.MINUTE_15,
Timeframe.HOUR_1,
]
for tf in timeframes:
print(f"\nSyncing {tf.value}...")
result = await sync_service.sync_ticker_data(
symbol="GBPUSD",
asset_type=AssetType.FOREX,
timeframe=tf,
backfill_days=3
)
print(f" {result['status']}: {result['rows_inserted']} rows")
# 7. Get sync status
print("\n" + "="*60)
print("SYNC STATUS")
print("="*60)
status = await sync_service.get_sync_status()
print(f"\nTotal sync records: {len(status)}")
# Show recent syncs
print("\nRecent syncs:")
for s in status[:10]:
last_sync = s['last_sync'] or "Never"
print(f" {s['symbol']:10} {s['timeframe']:10} -> {s['status']:10} ({s['rows_synced']} rows)")
# 8. Sync status for specific symbol
print("\n" + "="*60)
print("EURUSD SYNC STATUS (ALL TIMEFRAMES)")
print("="*60)
eurusd_status = await sync_service.get_sync_status(symbol="EURUSD")
if eurusd_status:
print(f"\nFound {len(eurusd_status)} timeframes:")
for s in eurusd_status:
print(f" {s['timeframe']:10} - Last sync: {s['last_sync'] or 'Never'}")
print(f" Status: {s['status']}, Rows: {s['rows_synced']}")
if s['error']:
print(f" Error: {s['error']}")
else:
print("\nNo sync status found for EURUSD")
# 9. Example: Sync all active tickers (commented out - can take a while)
# print("\n" + "="*60)
# print("SYNCING ALL ACTIVE TICKERS")
# print("="*60)
#
# result = await sync_service.sync_all_active_tickers(
# timeframe=Timeframe.MINUTE_5,
# backfill_days=1
# )
#
# print(f"\nSync completed!")
# print(f" Total tickers: {result['total_tickers']}")
# print(f" Successful: {result['successful']}")
# print(f" Failed: {result['failed']}")
# print(f" Total rows: {result['total_rows_inserted']}")
# Cleanup
print("\n" + "="*60)
print("CLEANUP")
print("="*60)
await db_pool.close()
if polygon_client._session:
await polygon_client._session.close()
print("\nDone!")
if __name__ == "__main__":
# Run the example
asyncio.run(main())