#!/usr/bin/env python3 """ Backtest for Movement Magnitude Predictor ========================================== Tests the asymmetric movement strategy using predicted high/low magnitudes. Strategy: - When predicted high >> predicted low: LONG with good RR - When predicted low >> predicted high: SHORT with good RR - Uses predicted magnitudes for TP/SL levels Author: ML-Specialist (NEXUS v4.0) Date: 2026-01-04 """ import sys sys.path.insert(0, 'src') import numpy as np import pandas as pd from pathlib import Path from datetime import datetime import json from loguru import logger import argparse from data.database import MySQLConnection from training.data_splitter import TemporalDataSplitter from models.movement_magnitude_predictor import MovementMagnitudePredictor def resample_to_timeframe(df: pd.DataFrame, timeframe: str) -> pd.DataFrame: """Resample minute data to desired timeframe""" if timeframe == '5m': rule = '5min' elif timeframe == '15m': rule = '15min' else: raise ValueError(f"Unknown timeframe: {timeframe}") if not isinstance(df.index, pd.DatetimeIndex): df.index = pd.to_datetime(df.index) ohlcv = df.resample(rule).agg({ 'open': 'first', 'high': 'max', 'low': 'min', 'close': 'last', 'volume': 'sum' }).dropna() return ohlcv def run_movement_backtest( symbol: str = "XAUUSD", horizon: str = "15m_60min", asymmetry_threshold: float = 1.3, # Lower threshold for more signals min_move_usd: float = 2.0, tp_factor: float = 0.7, # TP at 70% of predicted move sl_factor: float = 1.5, # SL at 150% of predicted adverse move signal_every_n: int = 4, # Every N bars min_confidence: float = 0.3 ): """ Run backtest using MovementMagnitudePredictor. Args: symbol: Trading symbol horizon: Prediction horizon asymmetry_threshold: Min ratio for signal min_move_usd: Min predicted move to trade tp_factor: TP as fraction of predicted favorable move sl_factor: SL as fraction of predicted adverse move signal_every_n: Signal frequency min_confidence: Minimum model confidence """ logger.info("=" * 60) logger.info("MOVEMENT MAGNITUDE BACKTEST") logger.info(f"Symbol: {symbol}, Horizon: {horizon}") logger.info(f"Asymmetry >= {asymmetry_threshold}, Min Move >= ${min_move_usd}") logger.info(f"TP Factor: {tp_factor}, SL Factor: {sl_factor}") logger.info("=" * 60) # Determine timeframe from horizon timeframe = '5m' if horizon.startswith('5m') else '15m' horizon_minutes = int(horizon.split('_')[1].replace('min', '')) bars_ahead = 3 if horizon == '5m_15min' else 4 # Load model model_path = f"models/ml_first/{symbol}/movement_predictor/{horizon}" if not Path(model_path).exists(): logger.error(f"Model not found at {model_path}") return None logger.info(f"Loading model from {model_path}") predictor = MovementMagnitudePredictor( horizons=[horizon], asymmetry_threshold=asymmetry_threshold, min_move_usd=min_move_usd ) predictor.load(model_path) # Load data logger.info("Loading data from database...") db = MySQLConnection('config/database.yaml') df_raw = db.get_ticker_data(symbol, limit=150000) if df_raw.empty: logger.error("No data loaded") return None # Split data - use only OOS splitter = TemporalDataSplitter() split = splitter.split_temporal(df_raw) df_test = split.test_data # Resample to correct timeframe df = resample_to_timeframe(df_test, timeframe) logger.info(f"Test data: {len(df)} bars ({df.index.min()} to {df.index.max()})") # Get predictions logger.info("Generating predictions...") predictions = predictor.predict(df) if not predictions: logger.error("No predictions generated") return None logger.info(f"Generated {len(predictions)} predictions") # Create predictions DataFrame aligned with price data pred_df = pd.DataFrame([p.to_dict() for p in predictions]) pred_df.index = pd.to_datetime(pred_df['timestamp']) pred_df = pred_df.reindex(df.index) # Run backtest trades = [] capital = 10000.0 risk_per_trade = 0.01 equity_curve = [capital] close = df['close'].values high = df['high'].values low = df['low'].values n_signals = 0 n_long = 0 n_short = 0 n_skipped = 0 for i in range(len(df) - bars_ahead - 10): # Signal every N bars if i % signal_every_n != 0: continue # Skip if no prediction idx = df.index[i] if idx not in pred_df.index or pd.isna(pred_df.loc[idx, 'asymmetry_ratio']): n_skipped += 1 continue pred = pred_df.loc[idx] # Check for opportunity asymmetry = pred['asymmetry_ratio'] pred_high = pred['predicted_high_usd'] pred_low = pred['predicted_low_usd'] direction = pred['suggested_direction'] # Apply filters if direction == 'NEUTRAL': n_skipped += 1 continue if asymmetry < asymmetry_threshold and asymmetry > (1 / asymmetry_threshold): n_skipped += 1 continue if pred_high < min_move_usd and pred_low < min_move_usd: n_skipped += 1 continue current_price = close[i] # Calculate TP/SL based on predictions if direction == 'LONG': tp_distance = pred_high * tp_factor sl_distance = pred_low * sl_factor tp_price = current_price + tp_distance sl_price = current_price - sl_distance n_long += 1 else: # SHORT tp_distance = pred_low * tp_factor sl_distance = pred_high * sl_factor tp_price = current_price - tp_distance sl_price = current_price + sl_distance n_short += 1 # Simulate trade exit_price = current_price result = 'timeout' bars_held = 0 for j in range(i + 1, min(i + bars_ahead * 2, len(df))): bars_held += 1 if direction == 'LONG': if high[j] >= tp_price: exit_price = tp_price result = 'tp' break elif low[j] <= sl_price: exit_price = sl_price result = 'sl' break else: # SHORT if low[j] <= tp_price: exit_price = tp_price result = 'tp' break elif high[j] >= sl_price: exit_price = sl_price result = 'sl' break # Timeout if j >= i + bars_ahead * 2 - 1: exit_price = close[j] break # Calculate P&L if direction == 'LONG': pnl_pct = (exit_price - current_price) / current_price else: pnl_pct = (current_price - exit_price) / current_price position_size = capital * risk_per_trade / (sl_distance / current_price) pnl = position_size * pnl_pct capital += pnl equity_curve.append(capital) trades.append({ 'bar': i, 'time': idx, 'direction': direction, 'entry': current_price, 'tp': tp_price, 'sl': sl_price, 'exit': exit_price, 'result': result, 'pnl': pnl, 'bars_held': bars_held, 'pred_high': pred_high, 'pred_low': pred_low, 'asymmetry': asymmetry }) n_signals += 1 # Calculate metrics if not trades: logger.warning("No trades executed") return None trades_df = pd.DataFrame(trades) n_wins = (trades_df['result'] == 'tp').sum() n_losses = (trades_df['result'] == 'sl').sum() n_timeouts = (trades_df['result'] == 'timeout').sum() total_trades = len(trades_df) win_rate = n_wins / total_trades if total_trades > 0 else 0 total_pnl = trades_df['pnl'].sum() avg_win = trades_df[trades_df['pnl'] > 0]['pnl'].mean() if n_wins > 0 else 0 avg_loss = trades_df[trades_df['pnl'] < 0]['pnl'].mean() if n_losses > 0 else 0 equity_curve = np.array(equity_curve) max_equity = np.maximum.accumulate(equity_curve) drawdown = (max_equity - equity_curve) / max_equity max_drawdown = drawdown.max() # Print results print("\n" + "=" * 60) print("MOVEMENT MAGNITUDE BACKTEST RESULTS") print("=" * 60) print(f"Strategy: Asymmetry >= {asymmetry_threshold}, TP={tp_factor*100:.0f}%, SL={sl_factor*100:.0f}%") print(f"Horizon: {horizon} ({horizon_minutes} min ahead)") print("-" * 60) print(f"Total Signals Analyzed: {n_signals + n_skipped}") print(f" Long Signals: {n_long}") print(f" Short Signals: {n_short}") print(f" Skipped: {n_skipped}") print("-" * 60) print(f"Trades Executed: {total_trades}") print(f" Wins (TP hit): {n_wins} ({100*n_wins/total_trades:.1f}%)") print(f" Losses (SL hit): {n_losses} ({100*n_losses/total_trades:.1f}%)") print(f" Timeouts: {n_timeouts} ({100*n_timeouts/total_trades:.1f}%)") print("-" * 60) print(f"WIN RATE: {win_rate:.2%}") print(f"Net P&L: ${total_pnl:,.2f}") print(f"Avg Win: ${avg_win:,.2f}") print(f"Avg Loss: ${avg_loss:,.2f}") print(f"Final Capital: ${capital:,.2f}") print(f"Max Drawdown: {max_drawdown:.2%}") if win_rate >= 0.80: print("\n*** 80% WIN RATE TARGET ACHIEVED! ***") elif win_rate >= 0.75: print("\n*** Close to target: 75%+ achieved ***") else: print("\n*** Below target. Need to adjust parameters ***") # Save results output_dir = Path("reports/movement_backtest") output_dir.mkdir(parents=True, exist_ok=True) timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") results = { 'timestamp': timestamp, 'symbol': symbol, 'horizon': horizon, 'config': { 'asymmetry_threshold': asymmetry_threshold, 'min_move_usd': min_move_usd, 'tp_factor': tp_factor, 'sl_factor': sl_factor, 'signal_every_n': signal_every_n }, 'metrics': { 'total_trades': total_trades, 'win_rate': win_rate, 'net_pnl': total_pnl, 'avg_win': avg_win, 'avg_loss': avg_loss, 'max_drawdown': max_drawdown, 'final_capital': capital } } result_file = output_dir / f"{symbol}_{horizon}_{timestamp}.json" with open(result_file, 'w') as f: json.dump(results, f, indent=2, default=str) logger.info(f"\nResults saved to {result_file}") return results def main(): parser = argparse.ArgumentParser(description='Backtest Movement Magnitude Predictor') parser.add_argument('--symbol', default='XAUUSD', help='Trading symbol') parser.add_argument('--horizon', default='15m_60min', help='Prediction horizon') parser.add_argument('--asymmetry', type=float, default=1.3, help='Min asymmetry ratio') parser.add_argument('--min-move', type=float, default=2.0, help='Min move in USD') parser.add_argument('--tp-factor', type=float, default=0.7, help='TP factor') parser.add_argument('--sl-factor', type=float, default=1.5, help='SL factor') parser.add_argument('--signal-freq', type=int, default=4, help='Signal every N bars') args = parser.parse_args() results = run_movement_backtest( symbol=args.symbol, horizon=args.horizon, asymmetry_threshold=args.asymmetry, min_move_usd=args.min_move, tp_factor=args.tp_factor, sl_factor=args.sl_factor, signal_every_n=args.signal_freq ) return results if __name__ == "__main__": main()