feat: Integrate Level 0 attention models into prediction pipeline

- Add AttentionProvider service to load and serve attention scores
- Integrate attention scoring into PredictionService.generate_signal()
- Add AttentionInfo dataclass to TradingSignal
- Boost confidence by 15% when high flow detected
- Add /api/attention/{symbol} endpoint for direct attention queries
- Add /api/attention/models endpoint to list loaded models
- Add attention field to SignalResponse API model

This completes the L0→L1 integration of the hierarchical ML architecture.
Attention models identify high-flow market moments to improve signal quality.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
Adrian Flores Cortes 2026-01-25 06:57:59 -06:00
parent 475e913e3c
commit b0b4a712eb
3 changed files with 520 additions and 2 deletions

View File

@ -87,6 +87,15 @@ class TPSLPredictionResponse(BaseModel):
calibrated: bool
class AttentionInfoResponse(BaseModel):
    """
    Attention score info from Level 0 models.

    API-layer mirror of the service-layer AttentionInfo dataclass; attached
    to SignalResponse and returned by the /api/attention/{symbol} endpoint.
    """
    attention_score: float = Field(..., description="Market flow score (1.0=avg, >2.0=high)")
    flow_class: int = Field(..., description="Flow class: 0=low, 1=medium, 2=high")
    flow_label: str = Field(..., description="Flow label: low_flow, medium_flow, high_flow")
    is_high_flow: bool = Field(..., description="True if flow_class == 2")
    is_tradeable: bool = Field(..., description="True if attention_score >= 1.0")
class SignalResponse(BaseModel):
"""Trading signal response"""
signal_id: str
@ -101,6 +110,9 @@ class SignalResponse(BaseModel):
amd_phase: AMDPhaseEnum
volatility_regime: VolatilityRegimeEnum
range_prediction: RangePredictionResponse
attention: Optional[AttentionInfoResponse] = Field(
None, description="Level 0 attention score info"
)
timestamp: datetime
valid_until: datetime
metadata: Optional[Dict[str, Any]] = None
@ -339,6 +351,17 @@ async def generate_signal(
ServiceVolatilityRegime.EXTREME: VolatilityRegimeEnum.extreme
}
# Convert attention info if present
attention_response = None
if signal.attention:
attention_response = AttentionInfoResponse(
attention_score=signal.attention.attention_score,
flow_class=signal.attention.flow_class,
flow_label=signal.attention.flow_label,
is_high_flow=signal.attention.is_high_flow,
is_tradeable=signal.attention.is_tradeable
)
return SignalResponse(
signal_id=signal.signal_id,
symbol=signal.symbol,
@ -360,6 +383,7 @@ async def generate_signal(
confidence_high=signal.range_prediction.confidence_high,
confidence_low=signal.range_prediction.confidence_low
),
attention=attention_response,
timestamp=signal.timestamp,
valid_until=signal.valid_until,
metadata=signal.metadata
@ -376,6 +400,77 @@ async def list_symbols():
return ["XAUUSD", "EURUSD", "GBPUSD", "USDJPY", "BTCUSD", "ETHUSD"]
# Attention Score endpoints
#
# NOTE: route order matters. FastAPI matches path operations in registration
# order, so the static "/api/attention/models" route MUST be registered
# before the dynamic "/api/attention/{symbol}" route; otherwise a request to
# /api/attention/models is captured by the dynamic route with symbol="models"
# and the model-listing handler is unreachable.
@app.get("/api/attention/models", tags=["Attention"])
async def list_attention_models():
    """List available attention models"""
    global prediction_service
    if prediction_service is None:
        prediction_service = get_prediction_service()

    # NOTE(review): reads the service's private attribute; consider exposing
    # a public accessor on PredictionService instead.
    provider = prediction_service._attention_provider
    if provider and provider.is_loaded:
        return {
            "loaded": True,
            "models": provider.available_models,
            "count": len(provider.models)
        }
    return {
        "loaded": False,
        "models": [],
        "count": 0
    }


@app.get("/api/attention/{symbol}", response_model=AttentionInfoResponse, tags=["Attention"])
async def get_attention_score(
    symbol: str,
    timeframe: TimeframeEnum = Query(default=TimeframeEnum.m5)
):
    """
    Get attention score for a symbol.

    Attention scores indicate market flow intensity:
    - < 1.0: Low flow (below average movement expected)
    - 1.0 - 2.0: Medium flow (average conditions)
    - > 2.0: High flow (above average movement expected)

    Use this to filter trades: only trade when attention_score >= 1.0
    """
    global prediction_service
    if prediction_service is None:
        prediction_service = get_prediction_service()

    try:
        attention = await prediction_service.get_attention_info(
            symbol=symbol.upper(),
            timeframe=timeframe.value
        )
        if attention is None:
            # No attention model available: return neutral defaults so
            # callers always get a well-formed response.
            return AttentionInfoResponse(
                attention_score=1.0,
                flow_class=1,
                flow_label="medium_flow",
                is_high_flow=False,
                is_tradeable=True
            )
        return AttentionInfoResponse(
            attention_score=attention.attention_score,
            flow_class=attention.flow_class,
            flow_label=attention.flow_label,
            is_high_flow=attention.is_high_flow,
            is_tradeable=attention.is_tradeable
        )
    except Exception as e:
        logger.error(f"Attention score failed for {symbol}: {e}")
        raise HTTPException(status_code=500, detail=f"Attention score failed: {str(e)}")
# Active signals endpoint - GET version for easy consumption
class ActiveSignalsResponse(BaseModel):
"""Response with active signals for all symbols"""

View File

@ -0,0 +1,317 @@
"""
Attention Score Provider
========================
Loads and serves attention scores from trained Level 0 models.
These scores are used as features (F51, F52) in Level 1 predictions.
Author: ML Pipeline
Version: 1.0.0
Created: 2026-01-25
"""
import sys
from pathlib import Path
from typing import Dict, Optional, Tuple
import numpy as np
import pandas as pd
from loguru import logger
import joblib
# Import attention model classes.
#
# The model module is loaded by file path (not via a normal package import)
# and registered in sys.modules under a stable dotted name so that
# joblib/pickle can resolve the class when deserializing saved models that
# reference "models.attention_score_model".
import importlib.util
_models_dir = Path(__file__).parent.parent / 'models'
_attention_module_path = _models_dir / 'attention_score_model.py'

# Use consistent module name for joblib pickle compatibility
_module_name = "models.attention_score_model"

if _module_name not in sys.modules and _attention_module_path.exists():
    # First load: import the module from its file path and register it
    # BEFORE exec so any self-referencing imports inside it resolve.
    spec = importlib.util.spec_from_file_location(_module_name, _attention_module_path)
    attention_module = importlib.util.module_from_spec(spec)
    sys.modules[_module_name] = attention_module
    spec.loader.exec_module(attention_module)
    AttentionScoreModel = attention_module.AttentionScoreModel
    HAS_ATTENTION_MODEL = True
elif _module_name in sys.modules:
    # Already imported (e.g. by another consumer): reuse the cached module.
    attention_module = sys.modules[_module_name]
    AttentionScoreModel = attention_module.AttentionScoreModel
    HAS_ATTENTION_MODEL = True
else:
    # Module file missing: degrade gracefully; AttentionProvider.load()
    # checks HAS_ATTENTION_MODEL and reports the absence.
    AttentionScoreModel = None
    HAS_ATTENTION_MODEL = False
    logger.warning("AttentionScoreModel not available")
class AttentionProvider:
    """
    Provides attention scores for predictions.

    Loads trained attention models and computes scores
    to be used as features in Level 1+ models.

    All public accessors degrade gracefully: when a model is missing or a
    prediction fails, neutral values (score=1.0, class=1/medium) are
    returned instead of raising, so downstream callers never break.

    Usage:
        provider = AttentionProvider('models/attention')
        provider.load()

        # Get scores for prediction
        score, flow_class = provider.get_attention_features(df, 'XAUUSD', '5m')
    """

    def __init__(self, models_dir: str = 'models/attention'):
        """
        Initialize attention provider.

        Args:
            models_dir: Directory containing trained attention models
        """
        self.models_dir = Path(models_dir)
        # Keyed by '<SYMBOL>_<timeframe>_attention' (see get_model_key).
        self.models: Dict[str, 'AttentionScoreModel'] = {}
        self.metadata: Dict = {}
        self._loaded = False

    def load(self) -> bool:
        """
        Load all attention models from disk.

        Returns:
            True if at least one model was loaded
        """
        if not HAS_ATTENTION_MODEL:
            logger.warning("AttentionScoreModel class not available")
            return False

        if not self.models_dir.exists():
            logger.warning(f"Attention models directory not found: {self.models_dir}")
            return False

        # Load optional trainer metadata.
        metadata_path = self.models_dir / 'trainer_metadata.joblib'
        if metadata_path.exists():
            self.metadata = joblib.load(metadata_path)
            logger.info("Loaded attention trainer metadata")

        # Load individual models; each lives in a '*_attention' directory.
        loaded = 0
        for model_dir in self.models_dir.iterdir():
            if not model_dir.is_dir():
                continue
            # Skip non-model directories
            if not model_dir.name.endswith('_attention'):
                continue
            try:
                model = AttentionScoreModel.load(str(model_dir))
                key = model_dir.name  # e.g., 'XAUUSD_5m_attention'
                self.models[key] = model
                loaded += 1
                logger.debug(f"Loaded attention model: {key}")
            except Exception as e:
                # One corrupt model must not prevent the others from loading.
                logger.warning(f"Failed to load {model_dir.name}: {e}")

        self._loaded = loaded > 0
        if self._loaded:
            logger.info(f"✅ Loaded {loaded} attention models from {self.models_dir}")
        else:
            logger.warning(f"No attention models loaded from {self.models_dir}")
        return self._loaded

    @property
    def is_loaded(self) -> bool:
        """Check if models are loaded."""
        return self._loaded

    @property
    def available_models(self) -> list:
        """List of available model keys."""
        return list(self.models.keys())

    def get_model_key(self, symbol: str, timeframe: str) -> str:
        """Generate model key from symbol and timeframe."""
        return f"{symbol}_{timeframe}_attention"

    def has_model(self, symbol: str, timeframe: str) -> bool:
        """Check if model exists for symbol/timeframe."""
        return self.get_model_key(symbol, timeframe) in self.models

    def _predict(self, df: pd.DataFrame, symbol: str, timeframe: str):
        """
        Run the attention model for symbol/timeframe on df.

        Single point of model lookup and error handling, shared by all
        public accessors (deduplicates the predict/except logic).

        Returns:
            The model's prediction object, or None when no model exists for
            the symbol/timeframe or prediction raises.
        """
        key = self.get_model_key(symbol, timeframe)
        model = self.models.get(key)
        if model is None:
            logger.debug(f"No attention model for {key}, returning neutral values")
            return None
        try:
            return model.predict(df)
        except Exception as e:
            logger.warning(f"Attention prediction failed for {key}: {e}")
            return None

    def get_attention_score(
        self,
        df: pd.DataFrame,
        symbol: str,
        timeframe: str
    ) -> np.ndarray:
        """
        Get attention scores for a DataFrame.

        Args:
            df: OHLCV DataFrame with datetime index
            symbol: Trading symbol (e.g., 'XAUUSD')
            timeframe: Timeframe (e.g., '5m')

        Returns:
            Array of attention scores (1.0 = average, >2.0 = high flow);
            neutral 1.0s when the model is unavailable or fails.
        """
        prediction = self._predict(df, symbol, timeframe)
        if prediction is None:
            return np.ones(len(df))
        return prediction.attention_score

    def get_attention_class(
        self,
        df: pd.DataFrame,
        symbol: str,
        timeframe: str
    ) -> np.ndarray:
        """
        Get attention flow class for a DataFrame.

        Args:
            df: OHLCV DataFrame
            symbol: Trading symbol
            timeframe: Timeframe

        Returns:
            Array of flow classes (0=low, 1=medium, 2=high); neutral medium
            (1) when the model is unavailable or fails.
        """
        prediction = self._predict(df, symbol, timeframe)
        if prediction is None:
            return np.ones(len(df), dtype=int)
        return prediction.flow_class

    def get_attention_features(
        self,
        df: pd.DataFrame,
        symbol: str,
        timeframe: str
    ) -> Tuple[np.ndarray, np.ndarray]:
        """
        Get both attention score and class as features.

        This is the main method for integrating attention into Level 1+ models.

        Args:
            df: OHLCV DataFrame
            symbol: Trading symbol
            timeframe: Timeframe

        Returns:
            Tuple of (attention_score, flow_class) arrays; neutral values
            when the model is unavailable or fails.
        """
        prediction = self._predict(df, symbol, timeframe)
        if prediction is None:
            n = len(df)
            return np.ones(n), np.ones(n, dtype=int)
        return prediction.attention_score, prediction.flow_class

    def get_latest_attention(
        self,
        df: pd.DataFrame,
        symbol: str,
        timeframe: str
    ) -> Dict:
        """
        Get attention info for the latest bar.

        Useful for real-time signal generation.

        Args:
            df: OHLCV DataFrame (at least 200 bars for feature calculation
                — assumed from the model's needs; TODO confirm)
            symbol: Trading symbol
            timeframe: Timeframe

        Returns:
            Dict with attention_score, flow_class, flow_label, is_high_flow,
            is_tradeable
        """
        score, flow_class = self.get_attention_features(df, symbol, timeframe)

        latest_score = float(score[-1]) if len(score) > 0 else 1.0
        latest_class = int(flow_class[-1]) if len(flow_class) > 0 else 1

        flow_labels = {0: 'low_flow', 1: 'medium_flow', 2: 'high_flow'}
        return {
            'attention_score': latest_score,
            'flow_class': latest_class,
            'flow_label': flow_labels.get(latest_class, 'medium_flow'),
            'is_high_flow': latest_class == 2,
            'is_tradeable': latest_score >= 1.0  # Only trade when attention >= 1
        }

    def should_trade(
        self,
        df: pd.DataFrame,
        symbol: str,
        timeframe: str,
        min_score: float = 1.0
    ) -> bool:
        """
        Check if current market conditions are favorable for trading.

        Args:
            df: OHLCV DataFrame
            symbol: Trading symbol
            timeframe: Timeframe
            min_score: Minimum attention score to trade

        Returns:
            True if attention score >= min_score
        """
        attention = self.get_latest_attention(df, symbol, timeframe)
        return attention['attention_score'] >= min_score
# Global singleton instance
_attention_provider: Optional[AttentionProvider] = None


def get_attention_provider(models_dir: Optional[str] = None) -> AttentionProvider:
    """
    Get or create the global attention provider instance.

    Args:
        models_dir: Directory containing attention models. Only honored on
            the first call; later calls return the existing singleton and
            silently ignore this argument.

    Returns:
        AttentionProvider instance
    """
    global _attention_provider
    if _attention_provider is None:
        # Default path relative to ml-engine
        if models_dir is None:
            models_dir = Path(__file__).parent.parent.parent / 'models' / 'attention'
        _attention_provider = AttentionProvider(str(models_dir))
        _attention_provider.load()
    return _attention_provider

View File

@ -31,6 +31,9 @@ from ..data.data_service_client import (
from ..data.features import FeatureEngineer
from ..data.indicators import TechnicalIndicators
# Attention provider for Level 0 features
from .attention_provider import AttentionProvider, get_attention_provider
class Direction(Enum):
LONG = "long"
@ -73,6 +76,16 @@ class TPSLPrediction:
calibrated: bool
@dataclass
class AttentionInfo:
    """
    Attention score information from Level 0 models.

    Attached to TradingSignal and mirrored by the API's
    AttentionInfoResponse model.
    """
    attention_score: float  # 1.0 = average, >2.0 = high flow
    flow_class: int  # 0=low, 1=medium, 2=high
    flow_label: str  # 'low_flow', 'medium_flow', 'high_flow'
    is_high_flow: bool  # True if flow_class == 2
    is_tradeable: bool  # True if attention_score >= 1.0
@dataclass
class TradingSignal:
"""Complete trading signal"""
@ -90,6 +103,7 @@ class TradingSignal:
range_prediction: RangePrediction
timestamp: datetime
valid_until: datetime
attention: Optional[AttentionInfo] = None # Level 0 attention info
metadata: Optional[Dict[str, Any]] = None
@ -139,6 +153,7 @@ class PredictionService:
self._range_predictor = None
self._tpsl_classifier = None
self._amd_detector = None
self._attention_provider = None # Level 0 attention models
self._models_loaded = False
# Symbol-specific trainers (nuevos modelos por símbolo/timeframe)
@ -184,6 +199,18 @@ class PredictionService:
self._amd_detector = AMDDetector()
logger.info("✅ AMDDetector initialized")
# Load Attention Provider (Level 0 models)
attention_path = os.path.join(self.models_dir, "attention")
if os.path.exists(attention_path):
self._attention_provider = AttentionProvider(attention_path)
if self._attention_provider.load():
logger.info(f"✅ AttentionProvider loaded ({len(self._attention_provider.models)} models)")
else:
logger.warning("AttentionProvider: No models loaded")
else:
logger.info("No attention models directory found, creating provider anyway")
self._attention_provider = AttentionProvider(attention_path)
self._models_loaded = True
# Cargar modelos por símbolo si el feature flag está activo
@ -494,6 +521,74 @@ class PredictionService:
# Heuristic AMD detection
return self._heuristic_amd_detection(df)
async def get_attention_info(
self,
symbol: str,
timeframe: str = "5m"
) -> Optional[AttentionInfo]:
"""
Get attention score info from Level 0 models.
Args:
symbol: Trading symbol
timeframe: Timeframe (5m or 15m)
Returns:
AttentionInfo or None if not available
"""
if not self._attention_provider or not self._attention_provider.is_loaded:
return None
# Get data for attention calculation
df = await self.get_market_data(symbol, timeframe, lookback_periods=250)
if df.empty:
return None
try:
attention_dict = self._attention_provider.get_latest_attention(
df, symbol, timeframe
)
return AttentionInfo(
attention_score=attention_dict['attention_score'],
flow_class=attention_dict['flow_class'],
flow_label=attention_dict['flow_label'],
is_high_flow=attention_dict['is_high_flow'],
is_tradeable=attention_dict['is_tradeable']
)
except Exception as e:
logger.warning(f"Failed to get attention info for {symbol}/{timeframe}: {e}")
return None
async def should_trade(
self,
symbol: str,
timeframe: str = "5m",
min_attention: float = 1.0
) -> Tuple[bool, Optional[AttentionInfo]]:
"""
Check if current market conditions are favorable for trading.
Uses Level 0 attention models to determine if market has enough flow.
Args:
symbol: Trading symbol
timeframe: Timeframe
min_attention: Minimum attention score to trade
Returns:
Tuple of (should_trade, attention_info)
"""
attention = await self.get_attention_info(symbol, timeframe)
if attention is None:
# If no attention model, allow trading
return True, None
should_trade = attention.attention_score >= min_attention
return should_trade, attention
async def generate_signal(
self,
symbol: str,
@ -550,6 +645,15 @@ class PredictionService:
now = datetime.utcnow()
validity_minutes = {"15m": 15, "1h": 60, "4h": 240}.get(timeframe, 15)
# Get attention info from Level 0 models
attention_info = await self.get_attention_info(symbol, timeframe)
# Boost confidence if high flow detected
adjusted_confidence = confidence
if attention_info and attention_info.is_high_flow:
adjusted_confidence = min(1.0, confidence * 1.15) # 15% boost
logger.debug(f"High flow detected, boosted confidence: {confidence:.3f} -> {adjusted_confidence:.3f}")
return TradingSignal(
signal_id=f"SIG-{uuid.uuid4().hex[:8].upper()}",
symbol=symbol,
@ -559,16 +663,18 @@ class PredictionService:
take_profit=tp,
risk_reward_ratio=float(rr_config.split("_")[1]),
prob_tp_first=tpsl_pred.prob_tp_first,
confidence_score=confidence,
confidence_score=adjusted_confidence,
amd_phase=amd_detection.phase,
volatility_regime=volatility,
range_prediction=range_pred,
timestamp=now,
valid_until=now + timedelta(minutes=validity_minutes),
attention=attention_info,
metadata={
"timeframe": timeframe,
"rr_config": rr_config,
"amd_signals": amd_detection.signals
"amd_signals": amd_detection.signals,
"attention_score": attention_info.attention_score if attention_info else 1.0
}
)