diff --git a/src/api/main.py b/src/api/main.py index adb574c..e547e00 100644 --- a/src/api/main.py +++ b/src/api/main.py @@ -1770,6 +1770,214 @@ async def websocket_status(): } +# ============================================================================= +# Historical Signals Query Endpoints +# ============================================================================= + +class SignalQueryParams(BaseModel): + """Parameters for signal query""" + symbol: Optional[str] = None + direction: Optional[str] = None + start_date: Optional[datetime] = None + end_date: Optional[datetime] = None + min_confidence: Optional[float] = Field(default=None, ge=0.0, le=1.0) + amd_phase: Optional[str] = None + timeframe: Optional[str] = None + is_valid_only: bool = False + page: int = Field(default=1, ge=1) + page_size: int = Field(default=50, ge=1, le=200) + + +class StoredSignalResponse(BaseModel): + """Single stored signal response""" + signal_id: str + symbol: str + direction: str + entry_price: float + stop_loss: float + take_profit: float + risk_reward_ratio: float + prob_tp_first: float + confidence_score: float + amd_phase: str + volatility_regime: str + timeframe: str + timestamp: datetime + valid_until: datetime + attention_score: Optional[float] = None + flow_class: Optional[int] = None + is_high_flow: Optional[bool] = None + + +class SignalQueryResponse(BaseModel): + """Signal query response with pagination""" + signals: List[Dict[str, Any]] + total_count: int + page: int + page_size: int + has_more: bool + query_params: Optional[Dict[str, Any]] = None + + +@app.get("/api/signals/history", response_model=SignalQueryResponse, tags=["Historical Signals"]) +async def query_historical_signals( + symbol: Optional[str] = Query(default=None, description="Filter by symbol"), + direction: Optional[str] = Query(default=None, description="Filter by direction: long, short, neutral"), + start_date: Optional[datetime] = Query(default=None, description="Start date filter"), + 
end_date: Optional[datetime] = Query(default=None, description="End date filter"), + min_confidence: Optional[float] = Query(default=None, ge=0.0, le=1.0, description="Minimum confidence"), + amd_phase: Optional[str] = Query(default=None, description="Filter by AMD phase"), + timeframe: Optional[str] = Query(default=None, description="Filter by timeframe"), + is_valid_only: bool = Query(default=False, description="Only return still-valid signals"), + page: int = Query(default=1, ge=1, description="Page number"), + page_size: int = Query(default=50, ge=1, le=200, description="Results per page") +): + """ + Query historical signals with filters. + + Supports filtering by symbol, direction, date range, confidence, AMD phase, etc. + Returns paginated results sorted by timestamp (newest first). + + Example queries: + - Get all XAUUSD signals: /api/signals/history?symbol=XAUUSD + - Get high confidence longs: /api/signals/history?direction=long&min_confidence=0.7 + - Get signals from last 24h: /api/signals/history?start_date=2026-01-24T00:00:00 + """ + global prediction_service + + if prediction_service is None: + prediction_service = get_prediction_service() + + if not prediction_service.signal_store_available: + raise HTTPException( + status_code=503, + detail="Signal store not available" + ) + + result = prediction_service.query_signals( + symbol=symbol.upper() if symbol else None, + direction=direction, + start_date=start_date, + end_date=end_date, + min_confidence=min_confidence, + amd_phase=amd_phase, + timeframe=timeframe, + is_valid_only=is_valid_only, + page=page, + page_size=page_size + ) + + return SignalQueryResponse(**result) + + +@app.get("/api/signals/latest", tags=["Historical Signals"]) +async def get_latest_signals( + symbol: Optional[str] = Query(default=None, description="Filter by symbol"), + limit: int = Query(default=10, ge=1, le=100, description="Number of signals") +): + """ + Get the most recent signals. 
+
+    Returns the latest signals, optionally filtered by symbol.
+    """
+    global prediction_service
+
+    if prediction_service is None:
+        prediction_service = get_prediction_service()
+
+    if not prediction_service.signal_store_available:
+        raise HTTPException(
+            status_code=503,
+            detail="Signal store not available"
+        )
+
+    signals = prediction_service.get_latest_signals(
+        symbol=symbol.upper() if symbol else None,
+        limit=limit
+    )
+
+    return {
+        "count": len(signals),
+        "signals": signals
+    }
+
+
+@app.get("/api/signals/valid", tags=["Historical Signals"])
+async def get_valid_signals(
+    symbol: Optional[str] = Query(default=None, description="Filter by symbol")
+):
+    """
+    Get all currently valid signals.
+
+    Returns signals that have not yet expired (valid_until > now).
+    """
+    global prediction_service
+
+    if prediction_service is None:
+        prediction_service = get_prediction_service()
+
+    if not prediction_service.signal_store_available:
+        raise HTTPException(
+            status_code=503,
+            detail="Signal store not available"
+        )
+
+    signals = prediction_service.get_valid_signals(
+        symbol=symbol.upper() if symbol else None
+    )
+
+    return {
+        "count": len(signals),
+        "valid_signals": signals
+    }
+
+
+@app.get("/api/signals/stats", tags=["Historical Signals"])
+async def get_signal_store_stats():
+    """
+    Get signal store statistics.
+
+    Returns counts by symbol/direction, capacity, and retention settings.
+    Registered before /api/signals/{signal_id} so "stats" isn't read as an ID.
+    """
+    global prediction_service
+
+    if prediction_service is None:
+        prediction_service = get_prediction_service()
+
+    stats = prediction_service.get_signal_store_stats()
+    return stats
+
+
+@app.get("/api/signals/{signal_id}", tags=["Historical Signals"])
+async def get_signal_by_id(signal_id: str):
+    """
+    Get a specific signal by its ID.
+
+    Signal IDs are in the format: SIG-XXXXXXXX
+    """
+    global prediction_service
+
+    if prediction_service is None:
+        prediction_service = get_prediction_service()
+
+    if not prediction_service.signal_store_available:
+        raise HTTPException(
+            status_code=503,
+            detail="Signal store not available"
+        )
+
+    signal = prediction_service.get_signal_by_id(signal_id)
+
+    if signal is None:
+        raise HTTPException(
+            status_code=404,
+            detail=f"Signal {signal_id} not found"
+        )
+
+    return signal
+
+
 # Main entry point
 if __name__ == "__main__":
     import uvicorn
diff --git a/src/services/prediction_service.py b/src/services/prediction_service.py
index 08e2529..71f2eb0 100644
--- a/src/services/prediction_service.py
+++ b/src/services/prediction_service.py
@@ -45,6 +45,9 @@ from ..pipelines.hierarchical_pipeline import (
 from ..backtesting.rr_backtester import RRBacktester, BacktestConfig, BacktestResult
 from ..backtesting.metrics import TradingMetrics, TradeRecord
 
+# Signal store for historical queries
+from .signal_store import SignalStore, StoredSignal, get_signal_store
+
 
 class Direction(Enum):
     LONG = "long"
@@ -213,6 +216,7 @@ class PredictionService:
         self._attention_provider = None  # Level 0 attention models
         self._hierarchical_pipeline = None  # L0→L1→L2 pipeline
         self._backtester = None  # Backtesting engine
+        self._signal_store = None  # Historical signal store
         self._models_loaded = False
 
         # Symbol-specific trainers (nuevos modelos por símbolo/timeframe)
@@ -292,6 +296,19 @@ class PredictionService:
             logger.warning(f"RRBacktester initialization failed: {e}")
             self._backtester = None
 
+        # Initialize Signal Store for historical queries
+        try:
+            persistence_dir = os.path.join(self.models_dir, "..", "logs", "signals")
+            self._signal_store = get_signal_store(
+                max_signals=10000,
+                retention_hours=168,  # 7 days
+                persistence_dir=persistence_dir
+            )
+            logger.info("✅ SignalStore initialized")
+        except Exception as e:
+            logger.warning(f"SignalStore initialization failed: {e}")
+            self._signal_store = None
+
         self._models_loaded = True
 
         # Cargar modelos por símbolo si el feature flag está activo
@@ -981,7 +998,7 @@ class PredictionService:
             adjusted_confidence = 
min(1.0, confidence * 1.15) # 15% boost logger.debug(f"High flow detected, boosted confidence: {confidence:.3f} -> {adjusted_confidence:.3f}") - return TradingSignal( + signal = TradingSignal( signal_id=f"SIG-{uuid.uuid4().hex[:8].upper()}", symbol=symbol, direction=direction, @@ -1005,6 +1022,142 @@ class PredictionService: } ) + # Store signal for historical queries + self._store_signal(signal) + + return signal + + def _store_signal(self, signal: TradingSignal): + """Store a generated signal in the signal store""" + if not self._signal_store: + return + + try: + stored = StoredSignal( + signal_id=signal.signal_id, + symbol=signal.symbol, + direction=signal.direction.value, + entry_price=signal.entry_price, + stop_loss=signal.stop_loss, + take_profit=signal.take_profit, + risk_reward_ratio=signal.risk_reward_ratio, + prob_tp_first=signal.prob_tp_first, + confidence_score=signal.confidence_score, + amd_phase=signal.amd_phase.value, + volatility_regime=signal.volatility_regime.value, + timeframe=signal.metadata.get("timeframe", "15m"), + timestamp=signal.timestamp, + valid_until=signal.valid_until, + attention_score=signal.attention.attention_score if signal.attention else None, + flow_class=signal.attention.flow_class if signal.attention else None, + is_high_flow=signal.attention.is_high_flow if signal.attention else None, + metadata=signal.metadata + ) + self._signal_store.store(stored) + except Exception as e: + logger.debug(f"Failed to store signal: {e}") + + @property + def signal_store_available(self) -> bool: + """Check if signal store is available""" + return self._signal_store is not None + + def query_signals( + self, + symbol: Optional[str] = None, + direction: Optional[str] = None, + start_date: Optional[datetime] = None, + end_date: Optional[datetime] = None, + min_confidence: Optional[float] = None, + amd_phase: Optional[str] = None, + timeframe: Optional[str] = None, + is_valid_only: bool = False, + page: int = 1, + page_size: int = 50 + ) -> 
Dict[str, Any]: + """ + Query historical signals. + + Args: + symbol: Filter by symbol + direction: Filter by direction ('long', 'short', 'neutral') + start_date: Filter by start date + end_date: Filter by end date + min_confidence: Minimum confidence score + amd_phase: Filter by AMD phase + timeframe: Filter by timeframe + is_valid_only: Only return signals still valid + page: Page number + page_size: Results per page + + Returns: + Dict with signals and pagination info + """ + if not self._signal_store: + return { + "signals": [], + "total_count": 0, + "page": page, + "page_size": page_size, + "has_more": False, + "error": "Signal store not available" + } + + result = self._signal_store.query( + symbol=symbol, + direction=direction, + start_date=start_date, + end_date=end_date, + min_confidence=min_confidence, + amd_phase=amd_phase, + timeframe=timeframe, + is_valid_only=is_valid_only, + page=page, + page_size=page_size + ) + + return { + "signals": [s.to_dict() for s in result.signals], + "total_count": result.total_count, + "page": result.page, + "page_size": result.page_size, + "has_more": result.has_more, + "query_params": result.query_params + } + + def get_latest_signals(self, symbol: Optional[str] = None, limit: int = 10) -> List[Dict]: + """Get the most recent signals""" + if not self._signal_store: + return [] + + signals = self._signal_store.get_latest(symbol=symbol, limit=limit) + return [s.to_dict() for s in signals] + + def get_valid_signals(self, symbol: Optional[str] = None) -> List[Dict]: + """Get all currently valid signals""" + if not self._signal_store: + return [] + + signals = self._signal_store.get_valid(symbol=symbol) + return [s.to_dict() for s in signals] + + def get_signal_by_id(self, signal_id: str) -> Optional[Dict]: + """Get a specific signal by ID""" + if not self._signal_store: + return None + + signal = self._signal_store.get(signal_id) + return signal.to_dict() if signal else None + + def get_signal_store_stats(self) -> Dict[str, 
Any]: + """Get signal store statistics""" + if not self._signal_store: + return {"available": False} + + stats = self._signal_store.get_statistics() + stats["available"] = True + return stats + def _determine_direction( self, amd: AMDDetection, diff --git a/src/services/signal_store.py b/src/services/signal_store.py new file mode 100644 index 0000000..c3f88cb --- /dev/null +++ b/src/services/signal_store.py @@ -0,0 +1,462 @@ +""" +Signal Store Service +==================== + +Stores and queries historical trading signals. +Provides in-memory storage with optional persistence to database. + +Author: ML Pipeline +Version: 1.0.0 +Created: 2026-01-25 +""" + +import asyncio +from collections import deque +from dataclasses import dataclass, asdict +from datetime import datetime, timedelta +from typing import Dict, List, Optional, Any +from pathlib import Path +import json +from loguru import logger + + +@dataclass +class StoredSignal: + """A stored trading signal""" + signal_id: str + symbol: str + direction: str # 'long', 'short', 'neutral' + entry_price: float + stop_loss: float + take_profit: float + risk_reward_ratio: float + prob_tp_first: float + confidence_score: float + amd_phase: str + volatility_regime: str + timeframe: str + timestamp: datetime + valid_until: datetime + # Optional attention info + attention_score: Optional[float] = None + flow_class: Optional[int] = None + is_high_flow: Optional[bool] = None + # Optional hierarchical info + hierarchical_available: bool = False + delta_high_final: Optional[float] = None + delta_low_final: Optional[float] = None + trade_quality: Optional[str] = None + # Metadata + metadata: Optional[Dict[str, Any]] = None + + def to_dict(self) -> Dict[str, Any]: + """Convert to dictionary""" + d = asdict(self) + d['timestamp'] = self.timestamp.isoformat() + d['valid_until'] = self.valid_until.isoformat() + return d + + @classmethod + def from_dict(cls, data: Dict[str, Any]) -> 'StoredSignal': + """Create from dictionary""" + 
data = data.copy() + if isinstance(data.get('timestamp'), str): + data['timestamp'] = datetime.fromisoformat(data['timestamp']) + if isinstance(data.get('valid_until'), str): + data['valid_until'] = datetime.fromisoformat(data['valid_until']) + return cls(**data) + + +@dataclass +class SignalQueryResult: + """Result of a signal query""" + signals: List[StoredSignal] + total_count: int + page: int + page_size: int + has_more: bool + query_params: Dict[str, Any] + + +class SignalStore: + """ + In-memory store for historical signals with query capabilities. + + Features: + - Stores signals with automatic expiration + - Query by symbol, direction, date range, etc. + - Pagination support + - Optional persistence to JSON files + """ + + def __init__( + self, + max_signals: int = 10000, + retention_hours: int = 168, # 7 days + persistence_dir: Optional[str] = None + ): + """ + Initialize signal store. + + Args: + max_signals: Maximum signals to keep in memory + retention_hours: Hours to retain signals (default 7 days) + persistence_dir: Optional directory for JSON persistence + """ + self.max_signals = max_signals + self.retention_hours = retention_hours + self.persistence_dir = Path(persistence_dir) if persistence_dir else None + + # Storage: deque for efficient FIFO, dict for fast lookup + self._signals: deque = deque(maxlen=max_signals) + self._index_by_id: Dict[str, StoredSignal] = {} + self._index_by_symbol: Dict[str, List[str]] = {} # symbol -> list of signal_ids + + # Statistics + self._stats = { + 'total_stored': 0, + 'total_queries': 0, + 'signals_by_symbol': {}, + 'signals_by_direction': {'long': 0, 'short': 0, 'neutral': 0} + } + + # Load persisted signals if directory exists + if self.persistence_dir: + self.persistence_dir.mkdir(parents=True, exist_ok=True) + self._load_persisted() + + logger.info(f"SignalStore initialized (max={max_signals}, retention={retention_hours}h)") + + def store(self, signal: StoredSignal) -> bool: + """ + Store a new signal. 
+
+        Args:
+            signal: Signal to store
+
+        Returns:
+            True if stored successfully
+        """
+        try:
+            # Replace duplicates; at capacity, evict the oldest first so the
+            # id/symbol indexes stay in sync (deque would drop it silently).
+            if signal.signal_id in self._index_by_id:
+                self._remove_signal(signal.signal_id)
+            elif len(self._signals) >= self.max_signals:
+                self._remove_signal(self._signals[0].signal_id)
+            self._signals.append(signal)
+            self._index_by_id[signal.signal_id] = signal
+
+            # Update symbol index
+            if signal.symbol not in self._index_by_symbol:
+                self._index_by_symbol[signal.symbol] = []
+            self._index_by_symbol[signal.symbol].append(signal.signal_id)
+
+            # Update stats
+            self._stats['total_stored'] += 1
+            self._stats['signals_by_symbol'][signal.symbol] = \
+                self._stats['signals_by_symbol'].get(signal.symbol, 0) + 1
+            self._stats['signals_by_direction'][signal.direction] = \
+                self._stats['signals_by_direction'].get(signal.direction, 0) + 1
+
+            logger.debug(f"Stored signal {signal.signal_id} for {signal.symbol}")
+            return True
+
+        except Exception as e:
+            logger.error(f"Failed to store signal: {e}")
+            return False
+
+    def get(self, signal_id: str) -> Optional[StoredSignal]:
+        """Get a signal by ID"""
+        return self._index_by_id.get(signal_id)
+
+    def query(
+        self,
+        symbol: Optional[str] = None,
+        direction: Optional[str] = None,
+        start_date: Optional[datetime] = None,
+        end_date: Optional[datetime] = None,
+        min_confidence: Optional[float] = None,
+        max_confidence: Optional[float] = None,
+        amd_phase: Optional[str] = None,
+        timeframe: Optional[str] = None,
+        is_valid_only: bool = False,
+        page: int = 1,
+        page_size: int = 50,
+        sort_by: str = 'timestamp',
+        sort_desc: bool = True
+    ) -> SignalQueryResult:
+        """
+        Query signals with filters.
+ + Args: + symbol: Filter by symbol (e.g., 'XAUUSD') + direction: Filter by direction ('long', 'short', 'neutral') + start_date: Filter by start date + end_date: Filter by end date + min_confidence: Minimum confidence score + max_confidence: Maximum confidence score + amd_phase: Filter by AMD phase + timeframe: Filter by timeframe + is_valid_only: Only return signals that are still valid + page: Page number (1-indexed) + page_size: Number of results per page + sort_by: Field to sort by + sort_desc: Sort descending + + Returns: + SignalQueryResult with matching signals + """ + self._stats['total_queries'] += 1 + + # Start with all signals or symbol-filtered + if symbol and symbol.upper() in self._index_by_symbol: + signal_ids = self._index_by_symbol[symbol.upper()] + signals = [self._index_by_id[sid] for sid in signal_ids if sid in self._index_by_id] + else: + signals = list(self._signals) + + now = datetime.utcnow() + + # Apply filters + filtered = [] + for s in signals: + # Symbol filter (if not already applied) + if symbol and s.symbol.upper() != symbol.upper(): + continue + + # Direction filter + if direction and s.direction != direction: + continue + + # Date range filter + if start_date and s.timestamp < start_date: + continue + if end_date and s.timestamp > end_date: + continue + + # Confidence filter + if min_confidence is not None and s.confidence_score < min_confidence: + continue + if max_confidence is not None and s.confidence_score > max_confidence: + continue + + # AMD phase filter + if amd_phase and s.amd_phase != amd_phase: + continue + + # Timeframe filter + if timeframe and s.timeframe != timeframe: + continue + + # Valid only filter + if is_valid_only and s.valid_until < now: + continue + + filtered.append(s) + + # Sort + if sort_by == 'timestamp': + filtered.sort(key=lambda x: x.timestamp, reverse=sort_desc) + elif sort_by == 'confidence': + filtered.sort(key=lambda x: x.confidence_score, reverse=sort_desc) + elif sort_by == 'symbol': + 
filtered.sort(key=lambda x: x.symbol, reverse=sort_desc) + + # Pagination + total_count = len(filtered) + start_idx = (page - 1) * page_size + end_idx = start_idx + page_size + page_signals = filtered[start_idx:end_idx] + has_more = end_idx < total_count + + return SignalQueryResult( + signals=page_signals, + total_count=total_count, + page=page, + page_size=page_size, + has_more=has_more, + query_params={ + 'symbol': symbol, + 'direction': direction, + 'start_date': start_date.isoformat() if start_date else None, + 'end_date': end_date.isoformat() if end_date else None, + 'min_confidence': min_confidence, + 'amd_phase': amd_phase, + 'timeframe': timeframe + } + ) + + def get_latest(self, symbol: Optional[str] = None, limit: int = 10) -> List[StoredSignal]: + """Get the latest signals""" + if symbol: + symbol = symbol.upper() + if symbol in self._index_by_symbol: + signal_ids = self._index_by_symbol[symbol][-limit:] + signals = [self._index_by_id[sid] for sid in reversed(signal_ids) if sid in self._index_by_id] + return signals[:limit] + return [] + + signals = list(self._signals)[-limit:] + return list(reversed(signals)) + + def get_valid(self, symbol: Optional[str] = None) -> List[StoredSignal]: + """Get all currently valid signals""" + now = datetime.utcnow() + result = self.query( + symbol=symbol, + is_valid_only=True, + page_size=1000 + ) + return result.signals + + def get_statistics(self) -> Dict[str, Any]: + """Get store statistics""" + now = datetime.utcnow() + valid_count = len([s for s in self._signals if s.valid_until > now]) + + return { + 'total_signals': len(self._signals), + 'valid_signals': valid_count, + 'expired_signals': len(self._signals) - valid_count, + 'unique_symbols': len(self._index_by_symbol), + 'symbols': list(self._index_by_symbol.keys()), + 'total_stored_all_time': self._stats['total_stored'], + 'total_queries': self._stats['total_queries'], + 'signals_by_symbol': self._stats['signals_by_symbol'], + 'signals_by_direction': 
self._stats['signals_by_direction'], + 'max_capacity': self.max_signals, + 'retention_hours': self.retention_hours + } + + def cleanup_expired(self) -> int: + """Remove expired signals based on retention policy""" + cutoff = datetime.utcnow() - timedelta(hours=self.retention_hours) + removed = 0 + + # Find signals to remove + to_remove = [] + for s in self._signals: + if s.timestamp < cutoff: + to_remove.append(s.signal_id) + + # Remove them + for signal_id in to_remove: + if self._remove_signal(signal_id): + removed += 1 + + if removed > 0: + logger.info(f"Cleaned up {removed} expired signals") + + return removed + + def _remove_signal(self, signal_id: str) -> bool: + """Remove a signal by ID""" + if signal_id not in self._index_by_id: + return False + + signal = self._index_by_id[signal_id] + + # Remove from deque + try: + self._signals.remove(signal) + except ValueError: + pass + + # Remove from indices + del self._index_by_id[signal_id] + + if signal.symbol in self._index_by_symbol: + try: + self._index_by_symbol[signal.symbol].remove(signal_id) + except ValueError: + pass + + return True + + def persist(self, filename: Optional[str] = None) -> bool: + """Persist signals to JSON file""" + if not self.persistence_dir: + logger.warning("No persistence directory configured") + return False + + try: + if filename is None: + filename = f"signals_{datetime.utcnow().strftime('%Y%m%d_%H%M%S')}.json" + + filepath = self.persistence_dir / filename + data = { + 'saved_at': datetime.utcnow().isoformat(), + 'count': len(self._signals), + 'signals': [s.to_dict() for s in self._signals] + } + + with open(filepath, 'w') as f: + json.dump(data, f, indent=2, default=str) + + logger.info(f"Persisted {len(self._signals)} signals to {filepath}") + return True + + except Exception as e: + logger.error(f"Failed to persist signals: {e}") + return False + + def _load_persisted(self) -> int: + """Load signals from persistence directory""" + if not self.persistence_dir or not 
self.persistence_dir.exists(): + return 0 + + loaded = 0 + try: + # Find most recent file + files = sorted(self.persistence_dir.glob("signals_*.json"), reverse=True) + if not files: + return 0 + + latest_file = files[0] + with open(latest_file, 'r') as f: + data = json.load(f) + + for signal_data in data.get('signals', []): + try: + signal = StoredSignal.from_dict(signal_data) + if self.store(signal): + loaded += 1 + except Exception as e: + logger.debug(f"Failed to load signal: {e}") + + logger.info(f"Loaded {loaded} persisted signals from {latest_file}") + + except Exception as e: + logger.error(f"Failed to load persisted signals: {e}") + + return loaded + + def clear(self): + """Clear all signals""" + self._signals.clear() + self._index_by_id.clear() + self._index_by_symbol.clear() + logger.info("Signal store cleared") + + +# Global singleton instance +_signal_store: Optional[SignalStore] = None + + +def get_signal_store( + max_signals: int = 10000, + retention_hours: int = 168, + persistence_dir: Optional[str] = None +) -> SignalStore: + """Get or create the global signal store instance""" + global _signal_store + + if _signal_store is None: + _signal_store = SignalStore( + max_signals=max_signals, + retention_hours=retention_hours, + persistence_dir=persistence_dir + ) + + return _signal_store