---
id: "INTEGRACION-API-MASSIVE"
title: "API Massive Integration - Data Pipeline"
type: "Documentation"
project: "trading-platform"
version: "1.0.0"
updated_date: "2026-01-04"
---

# API Massive Integration - Data Pipeline

**Version:** 1.0.0
**Date:** 2025-12-08
**Module:** Data Services
**Author:** Trading Strategist - Trading Platform

---

## Table of Contents

1. [Overview](#overview)
2. [API Massive Overview](#api-massive-overview)
3. [Data Architecture](#data-architecture)
4. [Ingestion Pipeline](#ingestion-pipeline)
5. [Gap Detection and Filling](#gap-detection-and-filling)
6. [MT4 Synchronization](#mt4-synchronization)
7. [API Endpoints](#api-endpoints)
8. [Scheduling](#scheduling)
9. [Implementation](#implementation)

---

## Overview

### Objective

Implement a robust data pipeline that:

1. **Downloads historical data** from API Massive
2. **Updates data** incrementally
3. **Detects and fills gaps** in the data
4. **Synchronizes** with MT4 broker prices
5. **Maintains data quality** for ML

### Supported Symbols

| Symbol | Description | Data Available |
|--------|-------------|----------------|
| XAUUSD | Gold vs USD | 10+ years |
| EURUSD | Euro vs USD | 10+ years |
| GBPUSD | Pound vs USD | 10+ years |
| USDJPY | USD vs Yen | 10+ years |

### Timeframes

| Timeframe | Code | Bars/Day |
|-----------|------|----------|
| 1 minute | M1 | 1,440 |
| 5 minutes | M5 | 288 |
| 15 minutes | M15 | 96 |
| 1 hour | H1 | 24 |
| 4 hours | H4 | 6 |
| Daily | D1 | 1 |

---

## API Massive Overview

### Authentication

```python
# API Massive uses API-key (Bearer token) authentication
headers = {
    "Authorization": f"Bearer {API_MASSIVE_KEY}",
    "Content-Type": "application/json"
}
```

### Main Endpoints

| Endpoint | Method | Description |
|----------|--------|-------------|
| `/api/v1/symbols` | GET | Lists available symbols |
| `/api/v1/candles/{symbol}` | GET | Fetches OHLCV data |
| `/api/v1/candles/batch` | POST | Batch data download |
| `/api/v1/tick/{symbol}` | GET | Tick-by-tick data |

### Rate Limits

| Plan | Requests/min | Candles/request | Max Historical |
|------|--------------|-----------------|----------------|
| Free | 10 | 1,000 | 1 year |
| Basic | 60 | 5,000 | 5 years |
| Pro | 300 | 50,000 | Unlimited |

---

## Data Architecture

### Flow Diagram

```
┌─────────────────────────────────────────────────────────────────┐
│                  DATA PIPELINE ARCHITECTURE                     │
└─────────────────────────────────────────────────────────────────┘

 DATA SOURCES
 ┌──────────────┐     ┌──────────────┐     ┌──────────────┐
 │  API Massive │     │ MetaTrader4  │     │    Backup    │
 │ (Historical) │     │    (Live)    │     │     (S3)     │
 └──────┬───────┘     └──────┬───────┘     └──────┬───────┘
        └────────────────────┼────────────────────┘
                             ▼
 DATA PROCESSOR
 ┌───────────────┐    ┌───────────────┐    ┌───────────────┐
 │    Fetcher    │    │   Validator   │    │  Normalizer   │
 │ - Download    │───▶│ - Schema      │───▶│ - Timezone    │
 │ - Retry       │    │ - Range       │    │ - Decimals    │
 │ - Rate limit  │    │ - Outliers    │    │ - Format      │
 └───────────────┘    └───────────────┘    └───────┬───────┘
                                                   ▼
 GAP HANDLER
 ┌─────────────┐      ┌─────────────┐      ┌─────────────┐
 │  Detector   │─────▶│   Filler    │─────▶│   Merger    │
 └─────────────┘      └─────────────┘      └──────┬──────┘
                                                  ▼
 STORAGE (PostgreSQL)
   Tables:
   - market_data (partitioned by symbol, month)
   - data_gaps   (gap tracking)
   - sync_status (last sync per symbol)
   Indices:
   - (symbol, timeframe, timestamp)  - unique
   - (symbol, timestamp)             - for range queries
```

### Database Schema

```sql
-- Market data table (partitioned)
CREATE TABLE market_data (
    id          BIGSERIAL,
    symbol      VARCHAR(20) NOT NULL,
    timeframe   VARCHAR(5) NOT NULL,
    timestamp   TIMESTAMPTZ NOT NULL,
    open        DECIMAL(18, 8) NOT NULL,
    high        DECIMAL(18, 8) NOT NULL,
    low         DECIMAL(18, 8) NOT NULL,
    close       DECIMAL(18, 8) NOT NULL,
    volume      DECIMAL(18, 8),
    source      VARCHAR(20) DEFAULT 'api_massive',
    created_at  TIMESTAMPTZ DEFAULT NOW(),
    PRIMARY KEY (symbol, timeframe, timestamp)
) PARTITION BY LIST (symbol);

-- Create one partition per symbol
CREATE TABLE market_data_xauusd PARTITION OF market_data FOR VALUES IN ('XAUUSD');
CREATE TABLE market_data_eurusd PARTITION OF market_data FOR VALUES IN ('EURUSD');
CREATE TABLE market_data_gbpusd PARTITION OF market_data FOR VALUES IN ('GBPUSD');
CREATE TABLE market_data_usdjpy PARTITION OF market_data FOR VALUES IN ('USDJPY');

-- Index for time-range queries
CREATE INDEX idx_market_data_symbol_time
    ON market_data (symbol, timestamp DESC);

-- Sync status table
CREATE TABLE sync_status (
    id               SERIAL PRIMARY KEY,
    symbol           VARCHAR(20) NOT NULL,
    timeframe        VARCHAR(5) NOT NULL,
    last_sync_time   TIMESTAMPTZ,
    last_candle_time TIMESTAMPTZ,
    total_candles    BIGINT DEFAULT 0,
    status           VARCHAR(20) DEFAULT 'pending',
    updated_at       TIMESTAMPTZ DEFAULT NOW(),
    UNIQUE (symbol, timeframe)
);

-- Data gap tracking
CREATE TABLE data_gaps (
    id              SERIAL PRIMARY KEY,
    symbol          VARCHAR(20) NOT NULL,
    timeframe       VARCHAR(5) NOT NULL,
    gap_start       TIMESTAMPTZ NOT NULL,
    gap_end         TIMESTAMPTZ NOT NULL,
    candles_missing INT,
    status          VARCHAR(20) DEFAULT 'detected',  -- detected, filling, filled, unfillable
    created_at      TIMESTAMPTZ DEFAULT NOW(),
    filled_at       TIMESTAMPTZ,
    UNIQUE (symbol, timeframe, gap_start)
);
```

---

## Ingestion Pipeline

### API Massive Client

```python
# services/api_massive_client.py
import asyncio
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Dict, List, Optional

import httpx


@dataclass
class Candle:
    timestamp: datetime
    open: float
    high: float
    low: float
    close: float
    volume: float


class APIMassiveClient:
    """Client for API Massive."""

    def __init__(self, config: Dict):
        self.base_url = config['base_url']
        self.api_key = config['api_key']
        self.rate_limit = config.get('rate_limit', 60)  # requests/min
        self.batch_size = config.get('batch_size', 5000)
        self._request_count = 0
        self._last_reset = datetime.now()

    async def get_candles(
        self,
        symbol: str,
        timeframe: str,
        start: datetime,
        end: datetime
    ) -> List[Candle]:
        """
        Fetches candles for a time range.

        Args:
            symbol: Trading pair (XAUUSD, etc.)
            timeframe: Timeframe (M5, H1, etc.)
            start: Start date
            end: End date

        Returns:
            List of Candle objects
        """
        await self._check_rate_limit()

        async with httpx.AsyncClient() as client:
            response = await client.get(
                f"{self.base_url}/api/v1/candles/{symbol}",
                headers={"Authorization": f"Bearer {self.api_key}"},
                params={
                    "timeframe": timeframe,
                    "start": start.isoformat(),
                    "end": end.isoformat(),
                    "limit": self.batch_size
                },
                timeout=60.0
            )

            if response.status_code != 200:
                raise Exception(f"API Error: {response.status_code} - {response.text}")

            data = response.json()

        return [
            Candle(
                timestamp=datetime.fromisoformat(c['timestamp']),
                open=float(c['open']),
                high=float(c['high']),
                low=float(c['low']),
                close=float(c['close']),
                volume=float(c.get('volume', 0))
            )
            for c in data['candles']
        ]

    async def get_candles_batch(
        self,
        symbol: str,
        timeframe: str,
        start: datetime,
        end: datetime
    ) -> List[Candle]:
        """Downloads candles in batches for long ranges."""
        all_candles = []
        current_start = start

        while current_start < end:
            # Calculate the batch end (at least one day, so the loop always advances)
            candles_per_day = self._get_candles_per_day(timeframe)
            days_per_batch = max(1, self.batch_size // candles_per_day)
            batch_end = min(current_start + timedelta(days=days_per_batch), end)

            # Fetch the batch
            candles = await self.get_candles(symbol, timeframe, current_start, batch_end)
            all_candles.extend(candles)

            # Move to the next batch
            if candles:
                current_start = candles[-1].timestamp + self._get_timeframe_delta(timeframe)
            else:
                current_start = batch_end

            # Progress logging
            print(f"  Downloaded {len(all_candles)} candles up to {current_start}")

        return all_candles

    async def _check_rate_limit(self):
        """Checks the rate limit and sleeps if necessary."""
        now = datetime.now()

        # Reset the counter every minute
        if (now - self._last_reset).seconds >= 60:
            self._request_count = 0
            self._last_reset = now

        # Check the limit
        if self._request_count >= self.rate_limit:
            sleep_time = 60 - (now - self._last_reset).seconds
            print(f"Rate limit reached. Sleeping {sleep_time}s...")
            await asyncio.sleep(sleep_time)
            self._request_count = 0
            self._last_reset = datetime.now()

        self._request_count += 1

    def _get_candles_per_day(self, timeframe: str) -> int:
        """Candles per day for a given timeframe."""
        mapping = {
            'M1': 1440, 'M5': 288, 'M15': 96, 'M30': 48,
            'H1': 24, 'H4': 6, 'D1': 1
        }
        return mapping.get(timeframe, 288)

    def _get_timeframe_delta(self, timeframe: str) -> timedelta:
        """Time delta for a timeframe."""
        mapping = {
            'M1': timedelta(minutes=1),
            'M5': timedelta(minutes=5),
            'M15': timedelta(minutes=15),
            'M30': timedelta(minutes=30),
            'H1': timedelta(hours=1),
            'H4': timedelta(hours=4),
            'D1': timedelta(days=1)
        }
        return mapping.get(timeframe, timedelta(minutes=5))
```

### Data Fetcher Service

```python
# services/data_fetcher.py
from datetime import datetime, timedelta
from typing import Dict, List, Optional

import asyncpg

from services.api_massive_client import APIMassiveClient, Candle


class DataFetcher:
    """Data download and storage service."""

    def __init__(self, api_client: APIMassiveClient, db_pool: asyncpg.Pool, config: Dict):
        self.api = api_client
        self.db = db_pool
        self.config = config

    async def full_sync(
        self,
        symbol: str,
        timeframe: str = 'M5',
        years: int = 10
    ) -> Dict:
        """
        Full synchronization of historical data.

        Args:
            symbol: Trading pair
            timeframe: Timeframe to download
            years: Years of history

        Returns:
            Sync summary
        """
        print(f"\n=== Full Sync: {symbol} {timeframe} ===")

        end = datetime.utcnow()
        start = end - timedelta(days=years * 365)

        # Download all candles
        candles = await self.api.get_candles_batch(symbol, timeframe, start, end)
        print(f"Downloaded {len(candles)} candles")

        # Validate and clean
        valid_candles = self._validate_candles(candles)
        print(f"Valid candles: {len(valid_candles)}")

        # Store in the database
        inserted = await self._store_candles(symbol, timeframe, valid_candles)
        print(f"Inserted {inserted} candles")

        # Update the sync status
        await self._update_sync_status(symbol, timeframe, len(valid_candles))

        return {
            'symbol': symbol,
            'timeframe': timeframe,
            'downloaded': len(candles),
            'valid': len(valid_candles),
            'inserted': inserted,
            'start': start.isoformat(),
            'end': end.isoformat()
        }

    async def incremental_sync(
        self,
        symbol: str,
        timeframe: str = 'M5'
    ) -> Dict:
        """Incremental synchronization from the last stored candle."""
        # Get the last candle time
        last_time = await self._get_last_candle_time(symbol, timeframe)

        if not last_time:
            # No data yet, do a full sync
            return await self.full_sync(symbol, timeframe)

        # Download from the last candle to now
        end = datetime.utcnow()
        start = last_time + self.api._get_timeframe_delta(timeframe)

        if start >= end:
            return {'symbol': symbol, 'message': 'Already up to date'}

        candles = await self.api.get_candles_batch(symbol, timeframe, start, end)

        if candles:
            valid_candles = self._validate_candles(candles)
            inserted = await self._store_candles(symbol, timeframe, valid_candles)
            await self._update_sync_status(symbol, timeframe, inserted)

            return {
                'symbol': symbol,
                'timeframe': timeframe,
                'new_candles': inserted,
                'latest': candles[-1].timestamp.isoformat() if candles else None
            }

        return {'symbol': symbol, 'message': 'No new data'}

    def _validate_candles(self, candles: List[Candle]) -> List[Candle]:
        """Validates and filters candles."""
        valid = []
        for c in candles:
            # Basic OHLC consistency checks
            if c.high < c.low:
                continue
            if c.open <= 0 or c.close <= 0:
                continue
            if c.high < max(c.open, c.close):
                continue
            if c.low > min(c.open, c.close):
                continue
            valid.append(c)
        return valid

    async def _store_candles(
        self,
        symbol: str,
        timeframe: str,
        candles: List[Candle]
    ) -> int:
        """Stores candles in PostgreSQL."""
        if not candles:
            return 0

        # Prepare data for bulk insert
        records = [
            (symbol, timeframe, c.timestamp, c.open, c.high, c.low,
             c.close, c.volume, 'api_massive')
            for c in candles
        ]

        # Bulk upsert
        async with self.db.acquire() as conn:
            await conn.executemany('''
                INSERT INTO market_data
                    (symbol, timeframe, timestamp, open, high, low, close, volume, source)
                VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)
                ON CONFLICT (symbol, timeframe, timestamp)
                DO UPDATE SET
                    open = EXCLUDED.open,
                    high = EXCLUDED.high,
                    low = EXCLUDED.low,
                    close = EXCLUDED.close,
                    volume = EXCLUDED.volume
            ''', records)

        return len(records)

    async def _get_last_candle_time(self, symbol: str, timeframe: str) -> Optional[datetime]:
        """Returns the timestamp of the most recent candle."""
        async with self.db.acquire() as conn:
            return await conn.fetchval('''
                SELECT MAX(timestamp) FROM market_data
                WHERE symbol = $1 AND timeframe = $2
            ''', symbol, timeframe)

    async def _update_sync_status(self, symbol: str, timeframe: str, candles_added: int):
        """Updates the sync status."""
        async with self.db.acquire() as conn:
            await conn.execute('''
                INSERT INTO sync_status (symbol, timeframe, last_sync_time, total_candles, status)
                VALUES ($1, $2, NOW(), $3, 'synced')
                ON CONFLICT (symbol, timeframe)
                DO UPDATE SET
                    last_sync_time = NOW(),
                    total_candles = sync_status.total_candles + $3,
                    status = 'synced',
                    updated_at = NOW()
            ''', symbol, timeframe, candles_added)
```

---

## Gap Detection and Filling

### Gap Detector

```python
# services/gap_handler.py
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Tuple

from services.api_massive_client import APIMassiveClient, Candle


@dataclass
class DataGap:
    start: datetime
    end: datetime
    candles_missing: int
    duration_minutes: int


class GapHandler:
    """Detects and fills gaps in the data."""

    def __init__(self, db_pool, api_client: APIMassiveClient, config: Dict):
        self.db = db_pool
        self.api = api_client
        self.config = config

    async def detect_gaps(
        self,
        symbol: str,
        timeframe: str,
        start: Optional[datetime] = None,
        end: Optional[datetime] = None
    ) -> List[DataGap]:
        """
        Detects gaps in the data.

        A gap is defined as more than N consecutive missing candles,
        where N = config['min_gap_candles'] (default 3).
        """
        min_gap_candles = self.config.get('min_gap_candles', 3)
        timeframe_delta = self._get_timeframe_delta(timeframe)

        # Get all timestamps
        async with self.db.acquire() as conn:
            query = '''
                SELECT timestamp FROM market_data
                WHERE symbol = $1 AND timeframe = $2
            '''
            params = [symbol, timeframe]

            if start:
                query += ' AND timestamp >= $3'
                params.append(start)
            if end:
                query += f' AND timestamp <= ${len(params) + 1}'
                params.append(end)

            query += ' ORDER BY timestamp'
            rows = await conn.fetch(query, *params)

        if len(rows) < 2:
            return []

        # Detect gaps
        gaps = []
        timestamps = [row['timestamp'] for row in rows]

        for i in range(1, len(timestamps)):
            actual_delta = timestamps[i] - timestamps[i - 1]

            if actual_delta > timeframe_delta * min_gap_candles:
                candles_missing = int(actual_delta / timeframe_delta) - 1
                gaps.append(DataGap(
                    start=timestamps[i - 1] + timeframe_delta,
                    end=timestamps[i] - timeframe_delta,
                    candles_missing=candles_missing,
                    duration_minutes=int(actual_delta.total_seconds() / 60)
                ))

        # Log gaps to the database
        for gap in gaps:
            await self._log_gap(symbol, timeframe, gap)

        return gaps

    async def fill_gaps(
        self,
        symbol: str,
        timeframe: str,
        max_gaps: int = 100
    ) -> Dict:
        """Attempts to fill detected gaps."""
        # Get pending gaps
        async with self.db.acquire() as conn:
            rows = await conn.fetch('''
                SELECT * FROM data_gaps
                WHERE symbol = $1 AND timeframe = $2 AND status = 'detected'
                ORDER BY gap_start
                LIMIT $3
            ''', symbol, timeframe, max_gaps)

        if not rows:
            return {'message': 'No gaps to fill', 'filled': 0}

        filled_count = 0
        unfillable_count = 0

        for row in rows:
            gap_start = row['gap_start']
            gap_end = row['gap_end']

            try:
                # Try to fetch the missing data
                candles = await self.api.get_candles(symbol, timeframe, gap_start, gap_end)

                if candles:
                    # Store the candles
                    await self._store_gap_candles(symbol, timeframe, candles)
                    # Mark the gap as filled
                    await self._update_gap_status(row['id'], 'filled', len(candles))
                    filled_count += 1
                else:
                    # No data available (the market was likely closed)
                    await self._update_gap_status(row['id'], 'unfillable', 0)
                    unfillable_count += 1

            except Exception as e:
                print(f"Error filling gap {row['id']}: {e}")
                continue

        return {
            'gaps_processed': len(rows),
            'filled': filled_count,
            'unfillable': unfillable_count
        }

    async def _log_gap(self, symbol: str, timeframe: str, gap: DataGap):
        """Records a gap in the database."""
        async with self.db.acquire() as conn:
            await conn.execute('''
                INSERT INTO data_gaps
                    (symbol, timeframe, gap_start, gap_end, candles_missing, status)
                VALUES ($1, $2, $3, $4, $5, 'detected')
                ON CONFLICT (symbol, timeframe, gap_start)
                DO UPDATE SET
                    gap_end = EXCLUDED.gap_end,
                    candles_missing = EXCLUDED.candles_missing
            ''', symbol, timeframe, gap.start, gap.end, gap.candles_missing)

    async def _update_gap_status(self, gap_id: int, status: str, candles_filled: int):
        """Updates the status of a gap."""
        async with self.db.acquire() as conn:
            await conn.execute('''
                UPDATE data_gaps
                SET status = $2, filled_at = NOW()
                WHERE id = $1
            ''', gap_id, status)

    async def _store_gap_candles(self, symbol: str, timeframe: str, candles: List[Candle]):
        """Stores gap-fill candles."""
        records = [
            (symbol, timeframe, c.timestamp, c.open, c.high, c.low,
             c.close, c.volume, 'gap_fill')
            for c in candles
        ]

        async with self.db.acquire() as conn:
            await conn.executemany('''
                INSERT INTO market_data
                    (symbol, timeframe, timestamp, open, high, low, close, volume, source)
                VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)
                ON CONFLICT (symbol, timeframe, timestamp) DO NOTHING
            ''', records)

    def _get_timeframe_delta(self, timeframe: str) -> timedelta:
        mapping = {
            'M1': timedelta(minutes=1),
            'M5': timedelta(minutes=5),
            'M15': timedelta(minutes=15),
            'M30': timedelta(minutes=30),
            'H1': timedelta(hours=1),
            'H4': timedelta(hours=4),
            'D1': timedelta(days=1)
        }
        return mapping.get(timeframe, timedelta(minutes=5))
```

---

## MT4 Synchronization

### Price Comparison Service

```python
# services/price_sync.py
import asyncio
from datetime import datetime, timedelta
from typing import Dict, List, Optional

import numpy as np


class PriceSync:
    """Synchronizes prices between API Massive and MT4 brokers."""

    def __init__(self, db_pool, metaapi_client, config: Dict):
        self.db = db_pool
        self.mt4 = metaapi_client
        self.config = config

    async def compare_prices(
        self,
        symbol: str,
        account_id: str,
        window_minutes: int = 60
    ) -> Dict:
        """
        Compares API Massive prices against MT4.

        Returns:
            {
                'avg_difference': float,
                'max_difference': float,
                'std_difference': float,
                'samples': int
            }
        """
        end = datetime.utcnow()
        start = end - timedelta(minutes=window_minutes)

        # Get API Massive data
        async with self.db.acquire() as conn:
            api_data = await conn.fetch('''
                SELECT timestamp, close FROM market_data
                WHERE symbol = $1 AND timeframe = 'M5'
                  AND timestamp BETWEEN $2 AND $3
                ORDER BY timestamp
            ''', symbol, start, end)

        if not api_data:
            return {'error': 'No API data available'}

        # Get the current MT4 price
        mt4_price = await self.mt4.get_symbol_price(account_id, symbol)

        # Calculate differences
        differences = [abs(row['close'] - mt4_price['bid']) for row in api_data]

        return {
            'avg_difference': np.mean(differences),
            'max_difference': np.max(differences),
            'std_difference': np.std(differences),
            'samples': len(differences),
            'api_latest': api_data[-1]['close'],
            'mt4_bid': mt4_price['bid'],
            'mt4_ask': mt4_price['ask']
        }

    async def calculate_broker_offset(
        self,
        symbol: str,
        account_id: str,
        samples: int = 100
    ) -> Dict:
        """
        Calculates the average offset between the API and a broker.

        Useful for adjusting prices at execution time.
        """
        offsets = []

        for _ in range(samples):
            # Get current broker prices
            mt4_price = await self.mt4.get_symbol_price(account_id, symbol)

            # Get the latest API candle
            async with self.db.acquire() as conn:
                api_close = await conn.fetchval('''
                    SELECT close FROM market_data
                    WHERE symbol = $1 AND timeframe = 'M5'
                    ORDER BY timestamp DESC
                    LIMIT 1
                ''', symbol)

            if api_close:
                mid_price = (mt4_price['bid'] + mt4_price['ask']) / 2
                offset = mid_price - float(api_close)
                offsets.append(offset)

            await asyncio.sleep(1)  # 1 sample per second

        return {
            'symbol': symbol,
            'broker': account_id,
            'avg_offset': np.mean(offsets),
            'std_offset': np.std(offsets),
            'samples': len(offsets),
            'recommendation': {
                'use_offset': abs(np.mean(offsets)) > 0.5,  # > 0.5 pips
                'offset_value': np.mean(offsets)
            }
        }
```

---

## API Endpoints

### Data Service API

```python
# api/data_api.py
from datetime import datetime
from typing import List, Optional

from fastapi import BackgroundTasks, FastAPI, HTTPException
from pydantic import BaseModel

app = FastAPI(title="Trading Platform Data Service")


class SyncRequest(BaseModel):
    symbol: str
    timeframe: str = "M5"
    full_sync: bool = False


class DataQuery(BaseModel):
    symbol: str
    timeframe: str = "M5"
    start: datetime
    end: datetime
    limit: int = 10000


@app.post("/api/data/sync")
async def sync_data(request: SyncRequest, background_tasks: BackgroundTasks):
    """Starts a data synchronization."""
    if request.full_sync:
        background_tasks.add_task(
            data_fetcher.full_sync, request.symbol, request.timeframe
        )
    else:
        background_tasks.add_task(
            data_fetcher.incremental_sync, request.symbol, request.timeframe
        )

    return {"message": f"Sync started for {request.symbol}", "full_sync": request.full_sync}


@app.get("/api/data/candles")
async def get_candles(query: DataQuery):
    """Fetches candles from the database."""
    async with db_pool.acquire() as conn:
        rows = await conn.fetch('''
            SELECT timestamp, open, high, low, close, volume
            FROM market_data
            WHERE symbol = $1 AND timeframe = $2
              AND timestamp BETWEEN $3 AND $4
            ORDER BY timestamp
            LIMIT $5
        ''', query.symbol, query.timeframe, query.start, query.end, query.limit)

    return {
        "symbol": query.symbol,
        "timeframe": query.timeframe,
        "count": len(rows),
        "candles": [dict(row) for row in rows]
    }


@app.get("/api/data/status")
async def get_sync_status():
    """Sync status for all symbols."""
    async with db_pool.acquire() as conn:
        rows = await conn.fetch('SELECT * FROM sync_status ORDER BY symbol')
    return {"status": [dict(row) for row in rows]}


@app.get("/api/data/gaps/{symbol}")
async def get_gaps(symbol: str, timeframe: str = "M5"):
    """Lists detected gaps."""
    async with db_pool.acquire() as conn:
        rows = await conn.fetch('''
            SELECT * FROM data_gaps
            WHERE symbol = $1 AND timeframe = $2
            ORDER BY gap_start
        ''', symbol, timeframe)
    return {"gaps": [dict(row) for row in rows]}


@app.post("/api/data/gaps/detect")
async def detect_gaps(symbol: str, timeframe: str = "M5"):
    """Detects gaps in the data."""
    gaps = await gap_handler.detect_gaps(symbol, timeframe)
    return {
        "symbol": symbol,
        "timeframe": timeframe,
        "gaps_found": len(gaps),
        "gaps": [
            {
                "start": g.start.isoformat(),
                "end": g.end.isoformat(),
                "missing": g.candles_missing
            }
            for g in gaps
        ]
    }


@app.post("/api/data/gaps/fill")
async def fill_gaps(background_tasks: BackgroundTasks, symbol: str, timeframe: str = "M5"):
    """Attempts to fill gaps."""
    background_tasks.add_task(gap_handler.fill_gaps, symbol, timeframe)
    return {"message": f"Gap filling started for {symbol}"}


@app.get("/api/data/health")
async def health():
    """Health check."""
    return {"status": "healthy"}
```

---

## Scheduling

### Celery Tasks

```python
# tasks/data_tasks.py
import asyncio

from celery import Celery
from celery.schedules import crontab

celery_app = Celery('trading_data')

celery_app.conf.beat_schedule = {
    # Full sync once a day at 4 AM UTC
    'full-sync-daily': {
        'task': 'tasks.full_sync_all',
        'schedule': crontab(hour=4, minute=0),
    },
    # Incremental sync every 5 minutes
    'incremental-sync': {
        'task': 'tasks.incremental_sync_all',
        'schedule': crontab(minute='*/5'),
    },
    # Gap detection daily
    'detect-gaps': {
        'task': 'tasks.detect_gaps_all',
        'schedule': crontab(hour=5, minute=0),
    },
    # Gap filling daily
    'fill-gaps': {
        'task': 'tasks.fill_gaps_all',
        'schedule': crontab(hour=6, minute=0),
    },
}


@celery_app.task
def full_sync_all():
    """Full sync of all symbols."""
    symbols = ['XAUUSD', 'EURUSD', 'GBPUSD', 'USDJPY']
    for symbol in symbols:
        full_sync_symbol.delay(symbol)


@celery_app.task
def full_sync_symbol(symbol: str, timeframe: str = 'M5'):
    """Full sync of one symbol."""
    asyncio.run(data_fetcher.full_sync(symbol, timeframe))


@celery_app.task
def incremental_sync_all():
    """Incremental sync of all symbols."""
    symbols = ['XAUUSD', 'EURUSD', 'GBPUSD', 'USDJPY']
    for symbol in symbols:
        incremental_sync_symbol.delay(symbol)


@celery_app.task
def incremental_sync_symbol(symbol: str, timeframe: str = 'M5'):
    """Incremental sync of one symbol."""
    asyncio.run(data_fetcher.incremental_sync(symbol, timeframe))


@celery_app.task
def detect_gaps_all():
    """Detects gaps for all symbols."""
    symbols = ['XAUUSD', 'EURUSD', 'GBPUSD', 'USDJPY']
    for symbol in symbols:
        asyncio.run(gap_handler.detect_gaps(symbol, 'M5'))


@celery_app.task
def fill_gaps_all():
    """Fills gaps for all symbols."""
    symbols = ['XAUUSD', 'EURUSD', 'GBPUSD', 'USDJPY']
    for symbol in symbols:
        asyncio.run(gap_handler.fill_gaps(symbol, 'M5'))
```

---

## Implementation

### Docker Compose

```yaml
# docker-compose.data.yaml
version: '3.8'

services:
  # =========================================================
  # NOTE: This service uses the workspace's shared PostgreSQL
  # instance (port 5438). Do NOT create a dedicated instance.
  # See: /home/isem/workspace/core/devtools/environment/DEVENV-PORTS.md
  # =========================================================
  data-service:
    build:
      context: .
      dockerfile: Dockerfile.data
    container_name: trading-data
    ports:
      - "3604:3604"  # Data service (base 3600 + 4)
    environment:
      - API_MASSIVE_KEY=${API_MASSIVE_KEY}
      - DATABASE_URL=postgresql://trading_user:trading_dev_2025@postgres:5438/trading_data
      - REDIS_URL=redis://redis:6385
      - SERVICE_PORT=3604
    depends_on:
      - redis
    networks:
      - trading-network
    restart: unless-stopped

  celery-worker:
    build:
      context: .
      dockerfile: Dockerfile.data
    container_name: trading-celery
    command: celery -A tasks worker -l info
    environment:
      - API_MASSIVE_KEY=${API_MASSIVE_KEY}
      - DATABASE_URL=postgresql://trading_user:trading_dev_2025@postgres:5438/trading_data
      - REDIS_URL=redis://redis:6385
    networks:
      - trading-network
    restart: unless-stopped

  celery-beat:
    build:
      context: .
      dockerfile: Dockerfile.data
    container_name: trading-beat
    command: celery -A tasks beat -l info
    environment:
      - REDIS_URL=redis://redis:6385
    depends_on:
      - redis
    networks:
      - trading-network
    restart: unless-stopped

  # Dedicated Redis for trading-platform (port 6385)
  redis:
    image: redis:7-alpine
    container_name: trading-redis
    ports:
      - "6385:6379"
    volumes:
      - redis_data:/data
    restart: unless-stopped

networks:
  trading-network:
    external: true  # Shared workspace network

volumes:
  redis_data:
```

---

**Document Generated:** 2025-12-08
**Trading Strategist - Trading Platform**
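---

### Example: Gap Detection Threshold

To make the threshold rule in `GapHandler.detect_gaps` concrete, here is a minimal standalone sketch: a span between consecutive candles only counts as a gap when it exceeds `min_gap_candles` times the timeframe delta. The helper `find_gaps` and the sample series are illustrative only, not part of the service code.

```python
from datetime import datetime, timedelta


def find_gaps(timestamps, delta, min_gap_candles=3):
    """Return (gap_start, gap_end, candles_missing) tuples, mirroring the
    rule in GapHandler.detect_gaps: only spans wider than
    min_gap_candles * delta are recorded as gaps."""
    gaps = []
    for prev, curr in zip(timestamps, timestamps[1:]):
        actual = curr - prev
        if actual > delta * min_gap_candles:
            missing = int(actual / delta) - 1
            gaps.append((prev + delta, curr - delta, missing))
    return gaps


# An M5 series with a 35-minute hole between 10:10 and 10:45 (6 candles missing)
base = datetime(2025, 12, 8, 10, 0)
m5 = timedelta(minutes=5)
ts = [base + m5 * i for i in (0, 1, 2, 9, 10)]
gaps = find_gaps(ts, m5)
```

With the default threshold of 3, a weekend-sized hole is reported while a single missing candle is ignored, which keeps the `data_gaps` table from filling up with noise.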
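---

### Example: Batch Window Sizing

`get_candles_batch` sizes each request window from the plan's candles-per-request budget: `days_per_batch = batch_size // candles_per_day`, clamped to at least one day so the loop always advances. The function below is an illustrative standalone version of that arithmetic, not part of the client.

```python
CANDLES_PER_DAY = {'M1': 1440, 'M5': 288, 'M15': 96, 'M30': 48,
                   'H1': 24, 'H4': 6, 'D1': 1}


def days_per_batch(timeframe: str, batch_size: int = 5000) -> int:
    """Days covered by one request window, as in get_candles_batch.
    Clamped to at least 1 so M1 on a small plan still makes progress."""
    return max(1, batch_size // CANDLES_PER_DAY[timeframe])
```

For example, on the Basic plan (5,000 candles/request) an M5 download advances 17 days per request, while M1 advances only 3, which is why full syncs of minute data dominate the request budget.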
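---

### Example: Broker Offset Aggregation

The aggregation step of `calculate_broker_offset` (mean and standard deviation of mid-price minus API close, plus the use-offset recommendation) can be reproduced with the standard library; the service itself uses NumPy, whose `np.std` is the population standard deviation, matching `statistics.pstdev`. `summarize_offsets` and the sample values are illustrative only.

```python
from statistics import mean, pstdev


def summarize_offsets(offsets, threshold=0.5):
    """Aggregate offset samples the way calculate_broker_offset does:
    recommend applying an offset only when |avg| exceeds the threshold."""
    avg = mean(offsets)
    return {
        'avg_offset': avg,
        'std_offset': pstdev(offsets),  # population std, like np.std
        'use_offset': abs(avg) > threshold,
    }


# Synthetic samples of (mid_price - api_close)
samples = [0.8, 0.7, 0.9, 0.6, 1.0]
summary = summarize_offsets(samples)
```

A consistently positive offset with a small standard deviation suggests a stable broker markup worth compensating for; a large standard deviation suggests the difference is mostly latency noise.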
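---

### Example: Fixed-Window Rate Counter

`APIMassiveClient._check_rate_limit` implements a fixed 60-second window: a per-window request counter that resets each minute and sleeps out the remainder of the window when the plan limit is hit. The sleep-free sketch below isolates that bookkeeping with an injectable clock so it can be verified without waiting; `WindowCounter` is a hypothetical name for illustration.

```python
class WindowCounter:
    """Fixed 60-second window counter mirroring _check_rate_limit.
    now_fn is injected so tests can use a fake clock."""

    def __init__(self, limit, now_fn):
        self.limit = limit
        self.now = now_fn
        self.count = 0
        self.window_start = now_fn()

    def seconds_to_wait(self):
        """Register one request; return how long the caller should sleep."""
        t = self.now()
        if t - self.window_start >= 60:      # new window: reset the counter
            self.count = 0
            self.window_start = t
        wait = 0
        if self.count >= self.limit:         # limit hit: wait out the window
            wait = 60 - (t - self.window_start)
            self.count = 0
            self.window_start = t            # treat the window as restarted
        self.count += 1
        return wait


# Demo with a fake clock: a limit of 2 requests per window
clock = {'t': 0}
counter = WindowCounter(limit=2, now_fn=lambda: clock['t'])
waits = [counter.seconds_to_wait() for _ in range(3)]  # third call must wait
clock['t'] = 120
waits.append(counter.seconds_to_wait())                # new window, no wait
```

One design caveat the sketch shares with the client: a fixed window allows short bursts of up to twice the limit across a window boundary; a sliding window or token bucket avoids that if the API enforces limits more strictly.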