# DATA-PIPELINE-SPEC: ML Data Pipeline Specification

**Version:** 1.0.0
**Date:** 2026-01-25
**Status:** IMPLEMENTED
**Task:** TASK-2026-01-25-ML-TRAINING-ENHANCEMENT (Phase 1.1)

---

## 1. SUMMARY

The Data Pipeline module provides efficient data loading, PyTorch datasets, and data-quality validation for ML model training.

---

## 2. IMPLEMENTED COMPONENTS

### 2.1 TrainingDataLoader (`src/data/training_loader.py`)

```python
from data import TrainingDataLoader

loader = TrainingDataLoader()

# Load data by symbol and date range
df = loader.get_training_data('XAUUSD', '2023-01-01', '2024-12-31', '5m')

# Stream large datasets
for batch in loader.stream_training_data('XAUUSD', '5m', batch_size=10000):
    process(batch)

# Get ML-ready features and targets
X, y = loader.get_features_and_targets('XAUUSD', '5m', target_horizon=12)
```

**Main methods:**

| Method | Description |
|--------|-------------|
| `get_training_data()` | Loads data in batches |
| `stream_training_data()` | Memory-efficient streaming |
| `get_features_and_targets()` | Features X and targets y |
| `get_multi_symbol_data()` | Multiple symbols |

### 2.2 TradingDataset (`src/data/dataset.py`)

```python
from data import TradingDataset, DatasetConfig

config = DatasetConfig(
    sequence_length=60,
    target_horizon=12,
    features=['returns', 'volatility', 'volume_ratio'],
    normalize=True
)

dataset = TradingDataset(df, config)
dataloader = dataset.create_dataloader(batch_size=32, shuffle=True)

for features, targets in dataloader:
    # features: (batch, seq_len, n_features)
    # targets: (batch, target_dim)
    pass
```

**Features:**

- Configurable sequence length
- Automatic normalization (z-score, min-max, robust)
- Return-based feature generation
- Compatible with the PyTorch DataLoader

### 2.3 DataValidator (`src/data/validators.py`)

```python
from data import DataValidator, validate_trading_data

validator = DataValidator()

# Individual validations
gaps = validator.validate_gaps(df)
outliers = validator.validate_outliers(df, ['close', 'volume'])
consistency = validator.validate_consistency(df)

# Full report
report = validate_trading_data(df, 'XAUUSD', '5min')
if not report.is_valid:
    for issue in report.issues:
        print(f"[{issue.severity}] {issue.message}")
```

**Validations:**

| Validation | Description |
|------------|-------------|
| `validate_gaps()` | Detects time gaps |
| `validate_outliers()` | Statistical outliers |
| `validate_consistency()` | OHLC integrity |
| `validate_missing_values()` | Missing values |

---

## 3. CONFIGURATION

### 3.1 PostgreSQL

```yaml
# config/database.yaml
host: localhost
port: 5432
database: trading_platform
user: trading_user
password: trading_dev_2026
schema: market_data
tables:
  - ohlcv_5m
  - ohlcv_15m
  - ohlcv_historical  # Migrated data
```

### 3.2 DatasetConfig

```python
from dataclasses import dataclass, field
from typing import List

@dataclass
class DatasetConfig:
    sequence_length: int = 60
    target_horizon: int = 12
    features: List[str] = field(default_factory=lambda: [
        'returns_1', 'returns_5', 'returns_10', 'returns_20',
        'volatility', 'volume_ratio', 'range_pct'
    ])
    normalize: bool = True
    normalization_method: str = 'zscore'  # 'zscore', 'minmax', 'robust'
```
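The spec does not show how `TradingDataset` derives the default feature columns. As an illustration only, a minimal sketch of how the default feature names could map onto pandas computations over OHLCV data; the column names (`close`, `high`, `low`, `volume`) match the schema in section 4.3, while the function name and the 20-bar rolling windows are assumptions, not part of the implemented module:

```python
import pandas as pd

def compute_default_features(df: pd.DataFrame) -> pd.DataFrame:
    """Hypothetical feature engineering matching the DatasetConfig defaults.

    Assumes `df` has OHLCV columns (open, high, low, close, volume).
    The 20-bar rolling windows are illustrative, not taken from the spec.
    """
    out = pd.DataFrame(index=df.index)
    # Percentage returns over 1, 5, 10, and 20 bars
    for n in (1, 5, 10, 20):
        out[f'returns_{n}'] = df['close'].pct_change(n)
    # Rolling volatility of 1-bar returns (assumed 20-bar window)
    out['volatility'] = out['returns_1'].rolling(20).std()
    # Volume relative to its recent average (assumed 20-bar window)
    out['volume_ratio'] = df['volume'] / df['volume'].rolling(20).mean()
    # Bar range as a fraction of the close
    out['range_pct'] = (df['high'] - df['low']) / df['close']
    # Drop the warm-up rows where rolling windows are incomplete
    return out.dropna()
```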
---

## 4. HISTORICAL DATA MIGRATION

### 4.1 Migration Script

```bash
# Location
apps/data-service/scripts/migrate_historical_data.py

# Usage
python migrate_historical_data.py --dry-run --limit 100   # Test
python migrate_historical_data.py --file db.sql           # Migrate
python migrate_historical_data.py --batch-size 5000       # Custom batch size
```

### 4.2 Available Data

| Source | Records | Period | Tickers |
|--------|---------|--------|---------|
| db.sql | ~12.6M | 2015-2023 | 17 Forex + XAUUSD |
| db_res.sql | ~12.6M | 2015-2023 | + Indicators |

### 4.3 Target Schema

```sql
CREATE TABLE market_data.ohlcv_historical (
    id SERIAL PRIMARY KEY,
    ticker VARCHAR(20) NOT NULL,
    timestamp TIMESTAMPTZ NOT NULL,
    open DOUBLE PRECISION,
    high DOUBLE PRECISION,
    low DOUBLE PRECISION,
    close DOUBLE PRECISION,
    volume DOUBLE PRECISION,
    vwap DOUBLE PRECISION,
    -- Technical indicators
    macd DOUBLE PRECISION,
    macd_signal DOUBLE PRECISION,
    macd_hist DOUBLE PRECISION,
    sma_10 DOUBLE PRECISION,
    sma_20 DOUBLE PRECISION,
    atr DOUBLE PRECISION,
    rsi DOUBLE PRECISION,
    -- ... more indicators
    source VARCHAR(20) DEFAULT 'legacy_mysql'
);
```

---

## 5. ML ENGINE INTEGRATION

### 5.1 Data Flow

```
PostgreSQL (market_data)
    ↓
TrainingDataLoader.get_training_data()
    ↓
DataValidator.validate()
    ↓
TradingDataset.__init__()
    ↓
DataLoader (PyTorch)
    ↓
Model Training
```

### 5.2 Usage in Pipelines

```python
# In pipelines/phase2_pipeline.py
from data import TrainingDataLoader, TradingDataset, DatasetConfig

loader = TrainingDataLoader()
df = loader.get_training_data(symbol, start_date, end_date, timeframe)

config = DatasetConfig(sequence_length=100)
dataset = TradingDataset(df, config)

# Train model
for batch in dataset.create_dataloader(batch_size=256):
    features, targets = batch
    predictions = model(features)
    loss = criterion(predictions, targets)
```

---

## 6. FILES CREATED

| File | Lines | Purpose |
|------|-------|---------|
| `src/data/training_loader.py` | ~300 | Data loading |
| `src/data/dataset.py` | ~250 | PyTorch datasets |
| `src/data/validators.py` | ~200 | Validation |
| `src/data/__init__.py` | ~50 | Exports |
| `scripts/migrate_historical_data.py` | ~400 | Migration |

---

## 7. DEPENDENCIES

```
# requirements.txt (additions)
torch>=2.0.0
pandas>=2.0.0
psycopg2-binary>=2.9.9
tqdm>=4.66.0
```

---

## 8. NEXT STEPS

1. ✅ Implement TrainingDataLoader
2. ✅ Implement TradingDataset
3. ✅ Implement DataValidator
4. ✅ Create migration script
5. ⏳ Run the historical data migration
6. ⏳ Validate the migrated data (see the sketch below)

---

**Status:** IMPLEMENTED (Phase 1.1 complete)
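---

As a starting point for pending step 6, a minimal sketch of how migrated rows could be pulled from `market_data.ohlcv_historical` and run through the validators from section 2.3. The connection parameters come from `config/database.yaml` (section 3.1); the query shape and the choice of `XAUUSD` as the spot-check ticker are assumptions for illustration:

```python
import pandas as pd
import psycopg2

from data import validate_trading_data

# Connection parameters taken from config/database.yaml (section 3.1)
conn = psycopg2.connect(
    host='localhost', port=5432, dbname='trading_platform',
    user='trading_user', password='trading_dev_2026',
)

# Illustrative spot check: load one migrated ticker, ordered by time
df = pd.read_sql(
    """
    SELECT timestamp, open, high, low, close, volume
    FROM market_data.ohlcv_historical
    WHERE ticker = %(ticker)s
    ORDER BY timestamp
    """,
    conn,
    params={'ticker': 'XAUUSD'},
    index_col='timestamp',
)

# Reuse the full validation report from section 2.3
report = validate_trading_data(df, 'XAUUSD', '5min')
print(f"XAUUSD: {len(df)} rows, valid={report.is_valid}")
for issue in report.issues:
    print(f"[{issue.severity}] {issue.message}")
```

Running this per ticker after the migration would surface time gaps, OHLC inconsistencies, and outliers introduced by the legacy MySQL dump before the data is used for training.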