# DATA-PIPELINE-SPEC: ML Data Pipeline Specification
**Version:** 1.0.0
**Date:** 2026-01-25
**Status:** IMPLEMENTED
**Task:** TASK-2026-01-25-ML-TRAINING-ENHANCEMENT (Phase 1.1)
---
## 1. SUMMARY
The Data Pipeline module provides efficient data loading, PyTorch datasets, and data-quality validation for ML model training.
---
## 2. IMPLEMENTED COMPONENTS
### 2.1 TrainingDataLoader (`src/data/training_loader.py`)
```python
from data import TrainingDataLoader

loader = TrainingDataLoader()

# Load data by symbol and date range
df = loader.get_training_data('XAUUSD', '2023-01-01', '2024-12-31', '5m')

# Streaming for large datasets
for batch in loader.stream_training_data('XAUUSD', '5m', batch_size=10000):
    process(batch)

# Get ML-ready features and targets
X, y = loader.get_features_and_targets('XAUUSD', '5m', target_horizon=12)
```
**Main methods:**
| Method | Description |
|--------|-------------|
| `get_training_data()` | Loads data in batches |
| `stream_training_data()` | Memory-efficient streaming |
| `get_features_and_targets()` | Features X and targets y |
| `get_multi_symbol_data()` | Multiple symbols |
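The streaming pattern behind `stream_training_data()` can be sketched with a plain generator (a hypothetical standalone version for illustration; the real loader reads rows from PostgreSQL):

```python
from typing import Iterator, List, Sequence

def stream_in_batches(rows: Sequence[dict], batch_size: int = 10000) -> Iterator[List[dict]]:
    """Yield fixed-size batches so the full dataset never sits in memory at once."""
    batch: List[dict] = []
    for row in rows:
        batch.append(row)
        if len(batch) == batch_size:
            yield batch
            batch = []
    if batch:  # flush the final partial batch
        yield batch
```

Consumers iterate over the generator and process one batch at a time, which keeps peak memory proportional to `batch_size` rather than to the dataset.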
### 2.2 TradingDataset (`src/data/dataset.py`)
```python
from data import TradingDataset, DatasetConfig

config = DatasetConfig(
    sequence_length=60,
    target_horizon=12,
    features=['returns', 'volatility', 'volume_ratio'],
    normalize=True
)
dataset = TradingDataset(df, config)
dataloader = dataset.create_dataloader(batch_size=32, shuffle=True)

for features, targets in dataloader:
    # features: (batch, seq_len, n_features)
    # targets: (batch, target_dim)
    pass
```
**Capabilities:**
- Configurable sequence length
- Automatic normalization (z-score, min-max, robust)
- Return-based feature generation
- Compatible with PyTorch DataLoader
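The three normalization methods follow their standard formulas; a minimal standalone sketch (the real implementation operates on DataFrame columns, not Python lists):

```python
import statistics
from typing import List

def zscore(xs: List[float]) -> List[float]:
    """(x - mean) / std: centers at 0 with unit variance."""
    mu = statistics.fmean(xs)
    sd = statistics.pstdev(xs)
    return [(x - mu) / sd for x in xs]

def minmax(xs: List[float]) -> List[float]:
    """Rescale linearly into [0, 1]."""
    lo, hi = min(xs), max(xs)
    return [(x - lo) / (hi - lo) for x in xs]

def robust(xs: List[float]) -> List[float]:
    """(x - median) / IQR: less sensitive to outliers than z-score."""
    med = statistics.median(xs)
    q1, _, q3 = statistics.quantiles(xs, n=4)
    return [(x - med) / (q3 - q1) for x in xs]
```

Robust scaling matters for price/volume series, where a single spike can otherwise dominate the mean and standard deviation.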
### 2.3 DataValidator (`src/data/validators.py`)
```python
from data import DataValidator, validate_trading_data

validator = DataValidator()

# Individual validations
gaps = validator.validate_gaps(df)
outliers = validator.validate_outliers(df, ['close', 'volume'])
consistency = validator.validate_consistency(df)

# Full report
report = validate_trading_data(df, 'XAUUSD', '5min')
if not report.is_valid:
    for issue in report.issues:
        print(f"[{issue.severity}] {issue.message}")
```
**Validations:**
| Validation | Description |
|------------|-------------|
| `validate_gaps()` | Detects time gaps |
| `validate_outliers()` | Statistical outliers |
| `validate_consistency()` | OHLC integrity |
| `validate_missing_values()` | Missing values |
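The gap and OHLC-consistency checks reduce to simple invariants; a hypothetical standalone sketch (the real validator returns structured issue objects rather than raw tuples):

```python
from datetime import datetime, timedelta
from typing import List, Tuple

def find_gaps(timestamps: List[datetime], expected: timedelta) -> List[Tuple[datetime, datetime]]:
    """Return (previous, current) pairs whose spacing exceeds the bar interval."""
    return [
        (prev, cur)
        for prev, cur in zip(timestamps, timestamps[1:])
        if cur - prev > expected
    ]

def ohlc_consistent(o: float, h: float, l: float, c: float) -> bool:
    """High must bound open/close from above, low from below."""
    return h >= max(o, c) and l <= min(o, c) and h >= l
```

Any bar violating `ohlc_consistent` (e.g. a high below the close) indicates corrupted or misaligned source data and should be flagged before training.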
---
## 3. CONFIGURATION
### 3.1 PostgreSQL
```yaml
# config/database.yaml
host: localhost
port: 5432
database: trading_platform
user: trading_user
password: trading_dev_2026
schema: market_data
tables:
  - ohlcv_5m
  - ohlcv_15m
  - ohlcv_historical  # Migrated data
```
### 3.2 DatasetConfig
```python
@dataclass
class DatasetConfig:
    sequence_length: int = 60
    target_horizon: int = 12
    features: List[str] = field(default_factory=lambda: [
        'returns_1', 'returns_5', 'returns_10', 'returns_20',
        'volatility', 'volume_ratio', 'range_pct'
    ])
    normalize: bool = True
    normalization_method: str = 'zscore'  # 'zscore', 'minmax', 'robust'
```
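How `sequence_length` and `target_horizon` slice a series into training pairs can be sketched in isolation (a hypothetical helper; the real dataset yields tensors, not Python lists):

```python
from typing import List, Tuple

def make_windows(series: List[float], seq_len: int, horizon: int) -> List[Tuple[List[float], float]]:
    """Each sample is seq_len consecutive values; the target is the value
    horizon steps after the window's last element."""
    samples = []
    for start in range(len(series) - seq_len - horizon + 1):
        window = series[start : start + seq_len]
        target = series[start + seq_len + horizon - 1]
        samples.append((window, target))
    return samples
```

With the defaults above (`sequence_length=60`, `target_horizon=12` on 5m bars), each sample sees 5 hours of history and predicts a value one hour ahead.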
---
## 4. HISTORICAL DATA MIGRATION
### 4.1 Migration Script
```bash
# Location
apps/data-service/scripts/migrate_historical_data.py

# Usage
python migrate_historical_data.py --dry-run --limit 100   # Test run
python migrate_historical_data.py --file db.sql           # Migrate
python migrate_historical_data.py --batch-size 5000       # Custom batch size
```
### 4.2 Available Data
| Source | Records | Period | Tickers |
|--------|---------|--------|---------|
| db.sql | ~12.6M | 2015-2023 | 17 Forex + XAUUSD |
| db_res.sql | ~12.6M | 2015-2023 | Same, plus indicators |
### 4.3 Target Schema
```sql
CREATE TABLE market_data.ohlcv_historical (
    id SERIAL PRIMARY KEY,
    ticker VARCHAR(20) NOT NULL,
    timestamp TIMESTAMPTZ NOT NULL,
    open DOUBLE PRECISION,
    high DOUBLE PRECISION,
    low DOUBLE PRECISION,
    close DOUBLE PRECISION,
    volume DOUBLE PRECISION,
    vwap DOUBLE PRECISION,
    -- Technical indicators
    macd DOUBLE PRECISION,
    macd_signal DOUBLE PRECISION,
    macd_hist DOUBLE PRECISION,
    sma_10 DOUBLE PRECISION,
    sma_20 DOUBLE PRECISION,
    atr DOUBLE PRECISION,
    rsi DOUBLE PRECISION,
    -- ... more indicators
    source VARCHAR(20) DEFAULT 'legacy_mysql'
);
```
---
## 5. ML ENGINE INTEGRATION
### 5.1 Data Flow
```
PostgreSQL (market_data)
        ↓
TrainingDataLoader.get_training_data()
        ↓
DataValidator.validate()
        ↓
TradingDataset.__init__()
        ↓
DataLoader (PyTorch)
        ↓
Model Training
```
### 5.2 Usage in Pipelines
```python
# In pipelines/phase2_pipeline.py
from data import TrainingDataLoader, TradingDataset, DatasetConfig

loader = TrainingDataLoader()
df = loader.get_training_data(symbol, start_date, end_date, timeframe)
config = DatasetConfig(sequence_length=100)
dataset = TradingDataset(df, config)

# Train the model
for batch in dataset.create_dataloader(batch_size=256):
    features, targets = batch
    predictions = model(features)
    loss = criterion(predictions, targets)
```
---
## 6. FILES CREATED
| File | Lines | Purpose |
|------|-------|---------|
| `src/data/training_loader.py` | ~300 | Data loading |
| `src/data/dataset.py` | ~250 | PyTorch datasets |
| `src/data/validators.py` | ~200 | Validation |
| `src/data/__init__.py` | ~50 | Exports |
| `scripts/migrate_historical_data.py` | ~400 | Migration |
---
## 7. DEPENDENCIES
```
# requirements.txt (added)
torch>=2.0.0
pandas>=2.0.0
psycopg2-binary>=2.9.9
tqdm>=4.66.0
```
---
## 8. NEXT STEPS
1. ✅ Implement TrainingDataLoader
2. ✅ Implement TradingDataset
3. ✅ Implement DataValidator
4. ✅ Create migration script
5. ⏳ Run historical data migration
6. ⏳ Validate migrated data
---
**Status:** IMPLEMENTED (Phase 1.1 complete)