trading-platform/docs/02-definicion-modulos/OQI-006-ml-signals/implementacion/DATA-PIPELINE-SPEC.md
Adrian Flores Cortes 7bfcbb978e docs: Add OQI-006 DATA-PIPELINE-SPEC.md and ML-TRAINING-ENHANCEMENT task docs
- Added DATA-PIPELINE-SPEC.md for ML signals module
- Added TASK-2026-01-25-ML-TRAINING-ENHANCEMENT documentation

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-25 14:32:37 -06:00

6.0 KiB

DATA-PIPELINE-SPEC: Especificación del Pipeline de Datos ML

Versión: 1.0.0 Fecha: 2026-01-25 Estado: IMPLEMENTADO Tarea: TASK-2026-01-25-ML-TRAINING-ENHANCEMENT (Fase 1.1)


1. RESUMEN

El módulo de Data Pipeline proporciona carga eficiente de datos, datasets de PyTorch y validación de calidad para el entrenamiento de modelos ML.


2. COMPONENTES IMPLEMENTADOS

2.1 TrainingDataLoader (src/data/training_loader.py)

from data import TrainingDataLoader

loader = TrainingDataLoader()

# Cargar datos por símbolo y rango de fechas
df = loader.get_training_data('XAUUSD', '2023-01-01', '2024-12-31', '5m')

# Streaming para datasets grandes
for batch in loader.stream_training_data('XAUUSD', '5m', batch_size=10000):
    process(batch)

# Obtener features y targets listos para ML
X, y = loader.get_features_and_targets('XAUUSD', '5m', target_horizon=12)

Métodos principales:

Método Descripción
get_training_data() Carga datos en batches
stream_training_data() Streaming memory-efficient
get_features_and_targets() Features X y targets y
get_multi_symbol_data() Múltiples símbolos

2.2 TradingDataset (src/data/dataset.py)

from data import TradingDataset, DatasetConfig

config = DatasetConfig(
    sequence_length=60,
    target_horizon=12,
    features=['returns', 'volatility', 'volume_ratio'],
    normalize=True
)

dataset = TradingDataset(df, config)
dataloader = dataset.create_dataloader(batch_size=32, shuffle=True)

for features, targets in dataloader:
    # features: (batch, seq_len, n_features)
    # targets: (batch, target_dim)
    pass

Características:

  • Sequence length configurable
  • Normalización automática (z-score, min-max, robust)
  • Generación de features de retornos
  • Compatible con PyTorch DataLoader

2.3 DataValidator (src/data/validators.py)

from data import DataValidator, validate_trading_data

validator = DataValidator()

# Validaciones individuales
gaps = validator.validate_gaps(df)
outliers = validator.validate_outliers(df, ['close', 'volume'])
consistency = validator.validate_consistency(df)

# Reporte completo
report = validate_trading_data(df, 'XAUUSD', '5min')
if not report.is_valid:
    for issue in report.issues:
        print(f"[{issue.severity}] {issue.message}")

Validaciones:

Validación Descripción
validate_gaps() Detecta gaps temporales
validate_outliers() Outliers estadísticos
validate_consistency() Integridad OHLC
validate_missing_values() Valores faltantes

3. CONFIGURACIÓN

3.1 PostgreSQL

# config/database.yaml
host: localhost
port: 5432
database: trading_platform
user: trading_user
password: trading_dev_2026
schema: market_data
tables:
  - ohlcv_5m
  - ohlcv_15m
  - ohlcv_historical  # Datos migrados

3.2 DatasetConfig

@dataclass
class DatasetConfig:
    sequence_length: int = 60
    target_horizon: int = 12
    features: List[str] = field(default_factory=lambda: [
        'returns_1', 'returns_5', 'returns_10', 'returns_20',
        'volatility', 'volume_ratio', 'range_pct'
    ])
    normalize: bool = True
    normalization_method: str = 'zscore'  # 'zscore', 'minmax', 'robust'

4. MIGRACIÓN DE DATOS HISTÓRICOS

4.1 Script de Migración

# Ubicación
apps/data-service/scripts/migrate_historical_data.py

# Uso
python migrate_historical_data.py --dry-run --limit 100  # Test
python migrate_historical_data.py --file db.sql          # Migrar
python migrate_historical_data.py --batch-size 5000      # Custom batch

4.2 Datos Disponibles

Fuente Registros Período Tickers
db.sql ~12.6M 2015-2023 17 Forex + XAUUSD
db_res.sql ~12.6M 2015-2023 + Indicadores

4.3 Schema Destino

CREATE TABLE market_data.ohlcv_historical (
    id SERIAL PRIMARY KEY,
    ticker VARCHAR(20) NOT NULL,
    timestamp TIMESTAMPTZ NOT NULL,
    open DOUBLE PRECISION,
    high DOUBLE PRECISION,
    low DOUBLE PRECISION,
    close DOUBLE PRECISION,
    volume DOUBLE PRECISION,
    vwap DOUBLE PRECISION,
    -- Indicadores técnicos
    macd DOUBLE PRECISION,
    macd_signal DOUBLE PRECISION,
    macd_hist DOUBLE PRECISION,
    sma_10 DOUBLE PRECISION,
    sma_20 DOUBLE PRECISION,
    atr DOUBLE PRECISION,
    rsi DOUBLE PRECISION,
    -- ... más indicadores
    source VARCHAR(20) DEFAULT 'legacy_mysql'
);

5. INTEGRACIÓN CON ML ENGINE

5.1 Flujo de Datos

PostgreSQL (market_data)
       ↓
TrainingDataLoader.get_training_data()
       ↓
DataValidator.validate()
       ↓
TradingDataset.__init__()
       ↓
DataLoader (PyTorch)
       ↓
Model Training

5.2 Uso en Pipelines

# En pipelines/phase2_pipeline.py
from data import TrainingDataLoader, TradingDataset, DatasetConfig

loader = TrainingDataLoader()
df = loader.get_training_data(symbol, start_date, end_date, timeframe)

config = DatasetConfig(sequence_length=100)
dataset = TradingDataset(df, config)

# Train model
for batch in dataset.create_dataloader(batch_size=256):
    features, targets = batch
    predictions = model(features)
    loss = criterion(predictions, targets)

6. ARCHIVOS CREADOS

Archivo Líneas Propósito
src/data/training_loader.py ~300 Carga de datos
src/data/dataset.py ~250 Datasets PyTorch
src/data/validators.py ~200 Validación
src/data/__init__.py ~50 Exports
scripts/migrate_historical_data.py ~400 Migración

7. DEPENDENCIAS

# requirements.txt (agregadas)
torch>=2.0.0
pandas>=2.0.0
psycopg2-binary>=2.9.9
tqdm>=4.66.0

8. PRÓXIMOS PASOS

  1. Implementar TrainingDataLoader
  2. Implementar TradingDataset
  3. Implementar DataValidator
  4. Crear script de migración
  5. Ejecutar migración de datos históricos
  6. Validar datos migrados

Estado: IMPLEMENTADO (Fase 1.1 completada)