trading-platform-data-servi.../test_batch_update.py
rckrdmrd 62a9f3e1d9 feat: Initial commit - Data Service
Data aggregation and distribution service:
- Market data collection
- OHLCV aggregation
- Real-time data feeds
- Data API endpoints

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-18 04:30:42 -06:00

242 lines
7.5 KiB
Python

#!/usr/bin/env python3
"""
Test script for Batch Asset Update
OrbiQuant IA Trading Platform
Tests:
1. Connection to Polygon API
2. Update of priority assets (XAU, EURUSD, BTC)
3. PostgreSQL database update
"""
import asyncio
import os
import sys
from datetime import datetime
from dotenv import load_dotenv
# Add src to path
sys.path.insert(0, os.path.join(os.path.dirname(__file__), 'src'))
import asyncpg
# Load environment variables
load_dotenv()
async def test_polygon_connection():
"""Test Polygon API connection"""
print("\n" + "="*60)
print("TEST 1: Polygon API Connection")
print("="*60)
from providers.polygon_client import PolygonClient
api_key = os.getenv("POLYGON_API_KEY")
if not api_key:
print("ERROR: POLYGON_API_KEY not set")
return False
print(f"API Key: {api_key[:10]}...{api_key[-4:]}")
try:
async with PolygonClient(api_key=api_key, rate_limit_per_min=5) as client:
# Test forex snapshot (EURUSD)
print("\nFetching EUR/USD snapshot...")
snapshot = await client.get_snapshot_forex("EURUSD")
if snapshot:
print(f" Symbol: EUR/USD")
print(f" Bid: {snapshot.bid:.5f}")
print(f" Ask: {snapshot.ask:.5f}")
print(f" Last: {snapshot.last_price:.5f}")
print(f" Time: {snapshot.timestamp}")
print(" [OK] Forex API working")
else:
print(" [WARN] No data returned for EUR/USD")
# Test crypto snapshot (BTC)
print("\nFetching BTC/USD snapshot...")
snapshot = await client.get_snapshot_crypto("BTCUSD")
if snapshot:
print(f" Symbol: BTC/USD")
print(f" Last: ${snapshot.last_price:,.2f}")
print(f" Time: {snapshot.timestamp}")
print(" [OK] Crypto API working")
else:
print(" [WARN] No data returned for BTC/USD")
# Test gold (XAUUSD)
print("\nFetching XAU/USD (Gold) snapshot...")
snapshot = await client.get_snapshot_forex("XAUUSD")
if snapshot:
print(f" Symbol: XAU/USD")
print(f" Bid: ${snapshot.bid:,.2f}")
print(f" Ask: ${snapshot.ask:,.2f}")
print(f" Time: {snapshot.timestamp}")
print(" [OK] Gold API working")
else:
print(" [WARN] No data returned for XAU/USD")
return True
except Exception as e:
print(f"ERROR: {e}")
return False
async def test_database_connection():
"""Test PostgreSQL connection"""
print("\n" + "="*60)
print("TEST 2: PostgreSQL Database Connection")
print("="*60)
dsn = f"postgresql://{os.getenv('DB_USER')}:{os.getenv('DB_PASSWORD')}@{os.getenv('DB_HOST')}:{os.getenv('DB_PORT')}/{os.getenv('DB_NAME')}"
try:
pool = await asyncpg.create_pool(dsn, min_size=1, max_size=5)
async with pool.acquire() as conn:
# Test connection
version = await conn.fetchval("SELECT version()")
print(f"PostgreSQL: {version[:50]}...")
# Check tickers
count = await conn.fetchval(
"SELECT COUNT(*) FROM market_data.tickers WHERE is_active = true"
)
print(f"Active tickers: {count}")
# Check OHLCV data
ohlcv_count = await conn.fetchval(
"SELECT COUNT(*) FROM market_data.ohlcv_5m"
)
print(f"OHLCV records: {ohlcv_count:,}")
# Check priority tickers
priority = await conn.fetch("""
SELECT id, symbol FROM market_data.tickers
WHERE symbol IN ('XAUUSD', 'EURUSD', 'BTCUSD')
ORDER BY symbol
""")
print("\nPriority tickers:")
for t in priority:
print(f" ID {t['id']}: {t['symbol']}")
await pool.close()
print("\n[OK] Database connection working")
return True
except Exception as e:
print(f"ERROR: {e}")
return False
async def test_batch_update():
"""Test batch update process"""
print("\n" + "="*60)
print("TEST 3: Batch Update Process")
print("="*60)
from providers.polygon_client import PolygonClient
from providers.rate_limiter import RateLimiter
from services.priority_queue import PriorityQueue
from services.asset_updater import AssetUpdater
from services.batch_orchestrator import BatchOrchestrator
from config.priority_assets import PRIORITY_ASSETS
api_key = os.getenv("POLYGON_API_KEY")
dsn = f"postgresql://{os.getenv('DB_USER')}:{os.getenv('DB_PASSWORD')}@{os.getenv('DB_HOST')}:{os.getenv('DB_PORT')}/{os.getenv('DB_NAME')}"
try:
# Create components
polygon = PolygonClient(api_key=api_key, rate_limit_per_min=5)
rate_limiter = RateLimiter(calls_per_minute=5)
priority_queue = PriorityQueue()
db_pool = await asyncpg.create_pool(dsn, min_size=1, max_size=5)
# Enter async context for polygon client
await polygon.__aenter__()
# Create updater
updater = AssetUpdater(
polygon_client=polygon,
rate_limiter=rate_limiter,
db_pool=db_pool,
redis_client=None
)
print(f"\nPriority assets to update: {[a['symbol'] for a in PRIORITY_ASSETS]}")
print(f"Rate limit: {rate_limiter.limit} calls/min")
print(f"Rate limit remaining: {rate_limiter.get_remaining()}")
# Test update of each priority asset
print("\nUpdating priority assets...")
results = await updater.update_priority_assets()
print(f"\nResults:")
print(f" Updated: {results['updated']}")
print(f" Failed: {results['failed']}")
print(f" API calls used: {results['api_calls_used']}")
print(f" Rate limit remaining: {rate_limiter.get_remaining()}")
# Cleanup
await polygon.__aexit__(None, None, None)
await db_pool.close()
if len(results['updated']) == len(PRIORITY_ASSETS):
print("\n[OK] All priority assets updated successfully!")
return True
else:
print("\n[WARN] Some assets failed to update")
return len(results['updated']) > 0
except Exception as e:
print(f"ERROR: {e}")
import traceback
traceback.print_exc()
return False
async def main():
"""Run all tests"""
print("\n" + "#"*60)
print("# OrbiQuant IA - Batch Update Test Suite")
print("# " + datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
print("#"*60)
results = {}
# Test 1: Polygon API
results['polygon'] = await test_polygon_connection()
# Test 2: Database
results['database'] = await test_database_connection()
# Test 3: Batch Update (only if previous tests pass)
if results['polygon'] and results['database']:
results['batch'] = await test_batch_update()
else:
print("\n[SKIP] Batch update test skipped due to previous failures")
results['batch'] = False
# Summary
print("\n" + "="*60)
print("TEST SUMMARY")
print("="*60)
for test, passed in results.items():
status = "[PASS]" if passed else "[FAIL]"
print(f" {test}: {status}")
all_passed = all(results.values())
print("\n" + ("All tests passed!" if all_passed else "Some tests failed."))
return 0 if all_passed else 1
if __name__ == "__main__":
exit_code = asyncio.run(main())
sys.exit(exit_code)