#!/usr/bin/env python3
"""
VBP Feature Engineering - Volatility Breakout Predictor Features
=================================================================

Comprehensive feature engineering for volatility-based breakout prediction.

Features include:
- ATR (Average True Range) at multiple periods
- Bollinger Bands (width, squeeze detection)
- Keltner Channels (for squeeze indicator)
- Compression Score (range contraction detection)
- Historical Volatility metrics
- Breakout labeling for training

Key Concept:
    Breakouts often occur after periods of low volatility (squeeze/compression).
    This module extracts features that capture volatility compression and
    expansion patterns to predict upcoming breakouts.

Author: ML-Specialist (NEXUS v4.0)
Version: 1.0.0
Created: 2026-01-25
"""

import numpy as np
import pandas as pd
from typing import Dict, List, Optional, Tuple, Union
from dataclasses import dataclass, field

try:
    from loguru import logger
except ImportError:
    # loguru is only used for a handful of info messages; fall back to the
    # standard library so the module stays importable without it.
    import logging

    logger = logging.getLogger(__name__)

# Small constant added to denominators to avoid division by zero.
_EPS = 1e-8


@dataclass
class VBPFeatureConfig:
    """Configuration for VBP feature engineering."""

    # ATR periods
    atr_periods: List[int] = field(default_factory=lambda: [5, 10, 20, 50])

    # Bollinger Bands
    bb_period: int = 20
    bb_std: float = 2.0

    # Keltner Channels
    keltner_period: int = 20
    keltner_mult: float = 1.5

    # Compression detection
    compression_lookback: int = 50

    # Historical volatility
    hv_windows: List[int] = field(default_factory=lambda: [10, 20, 50])

    # Breakout labeling
    breakout_atr_mult: float = 2.0
    forward_periods: int = 5

    # Minimum periods for rolling calculations
    min_periods_ratio: float = 0.5

    # Number of bars per year used to annualize historical volatility.
    # The default (252 trading days * 24) corresponds to hourly bars.
    annualization_periods: int = 252 * 24


class VBPFeatureEngineer:
    """
    Feature engineering for Volatility Breakout Prediction.

    Extracts volatility-based features designed to predict breakouts:
    - ATR-based volatility at multiple timeframes
    - Bollinger Band squeeze indicators
    - Keltner Channel squeeze (BB inside KC)
    - Range compression scores
    - Historical volatility metrics

    Usage:
        engineer = VBPFeatureEngineer(VBPFeatureConfig())
        features = engineer.compute_all_features(df)
        labels = engineer.label_breakouts(df)
    """

    def __init__(self, config: Optional[VBPFeatureConfig] = None):
        """
        Initialize VBP Feature Engineer.

        Args:
            config: Feature engineering configuration; a default
                VBPFeatureConfig is created when omitted.
        """
        self.config = config or VBPFeatureConfig()

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------

    def _get_price_columns(self, df: pd.DataFrame) -> Tuple[str, str, str, str]:
        """
        Get the OHLC column names used by ``df``.

        Capitalized names ('Open', 'High', ...) are preferred when present;
        otherwise the lowercase name is returned without checking that it
        exists, so a missing column surfaces as a KeyError at lookup time.

        Returns:
            Tuple of (open, high, low, close) column names.
        """
        open_col = 'Open' if 'Open' in df.columns else 'open'
        high_col = 'High' if 'High' in df.columns else 'high'
        low_col = 'Low' if 'Low' in df.columns else 'low'
        close_col = 'Close' if 'Close' in df.columns else 'close'
        return open_col, high_col, low_col, close_col

    def _min_periods(self, window: int) -> int:
        """Minimum observations required for a rolling/EW window (at least 1)."""
        return max(1, int(window * self.config.min_periods_ratio))

    @staticmethod
    def _run_length(active: pd.Series) -> pd.Series:
        """
        Count consecutive truthy values, resetting to 0 on each falsy value.

        Example: [1, 1, 0, 1, 1, 1] -> [1, 2, 0, 1, 2, 3]

        Args:
            active: Boolean or 0/1 series without NaN.

        Returns:
            Integer series of run lengths on the same index.
        """
        flags = active.astype(int)
        # Every falsy value opens a new group; the cumulative sum of the
        # flags inside a group is the length of the current run.
        run_id = (flags == 0).cumsum().to_numpy()
        return flags.groupby(run_id).cumsum()

    @staticmethod
    def _rolling_rank_fraction(series: pd.Series, window: int) -> pd.Series:
        """
        Fraction of the previous ``window`` values strictly below the current one.

        The current value is excluded from its own history. NaN never
        compares as smaller, but the divisor is always ``window``, so NaN
        in the history lowers the fraction. The first ``window`` rows are
        NaN.

        Args:
            series: Input values.
            window: Number of preceding observations to compare against.

        Returns:
            Float series on the same index as ``series``.
        """
        result = pd.Series(np.nan, index=series.index, dtype=float)
        values = series.to_numpy(dtype=float)
        if window < 1 or len(values) <= window:
            return result

        # Each row of ``windows`` is [history (window values) | current value].
        windows = np.lib.stride_tricks.sliding_window_view(values, window + 1)
        with np.errstate(invalid='ignore'):
            below = (windows[:, :-1] < windows[:, -1:]).sum(axis=1)
        result.iloc[window:] = below / window
        return result

    # ------------------------------------------------------------------
    # Feature groups
    # ------------------------------------------------------------------

    def compute_true_range(self, df: pd.DataFrame) -> pd.Series:
        """
        Compute True Range.

        TR = max(H-L, |H-C_prev|, |L-C_prev|)

        The first row has no previous close; the row-wise max skips NaN, so
        it falls back to H-L there.

        Args:
            df: DataFrame with OHLC data

        Returns:
            Series with True Range values
        """
        _, high_col, low_col, close_col = self._get_price_columns(df)
        high = df[high_col]
        low = df[low_col]
        close_prev = df[close_col].shift(1)

        candidates = pd.concat(
            [high - low, (high - close_prev).abs(), (low - close_prev).abs()],
            axis=1,
        )
        return candidates.max(axis=1)

    def compute_atr(
        self,
        df: pd.DataFrame,
        periods: Optional[List[int]] = None
    ) -> pd.DataFrame:
        """
        Compute ATR (Average True Range) at multiple periods.

        ATR is an exponentially smoothed True Range. Multiple periods capture
        short-term vs long-term volatility.

        Args:
            df: DataFrame with OHLC data
            periods: List of ATR periods (default: config.atr_periods).
                The first and last entries are used as the "short" and
                "long" periods of the ratio feature.

        Returns:
            DataFrame with, per period, ``atr_<p>`` and ``atr_<p>_pct``,
            plus ``atr_ratio_short_long`` (when two or more periods are
            given) and ``atr_change``.
        """
        periods = periods or self.config.atr_periods
        _, _, _, close_col = self._get_price_columns(df)
        close = df[close_col]
        true_range = self.compute_true_range(df)

        atr_features = pd.DataFrame(index=df.index)

        for period in periods:
            # Span-based EMA (alpha = 2 / (period + 1)). Note that this is
            # not Wilder's smoothing, which would use alpha = 1 / period.
            atr = true_range.ewm(
                span=period, min_periods=self._min_periods(period), adjust=False
            ).mean()
            atr_features[f'atr_{period}'] = atr

            # ATR as percentage of close
            atr_features[f'atr_{period}_pct'] = atr / (close + _EPS) * 100

        # ATR ratio (short vs long term)
        if len(periods) >= 2:
            atr_features['atr_ratio_short_long'] = (
                atr_features[f'atr_{periods[0]}'] /
                (atr_features[f'atr_{periods[-1]}'] + _EPS)
            )

        # ATR expansion/contraction over 5 bars; prefers the 20-period ATR
        # and falls back to the first configured period.
        base_col = 'atr_20' if 'atr_20' in atr_features.columns else f'atr_{periods[0]}'
        atr_features['atr_change'] = atr_features[base_col].pct_change(5)

        return atr_features

    def compute_bollinger_bands(
        self,
        df: pd.DataFrame,
        period: Optional[int] = None,
        std: Optional[float] = None
    ) -> pd.DataFrame:
        """
        Compute Bollinger Bands and derived features.

        BB = SMA +/- (std_mult * rolling_std)

        Key features:
        - bb_upper, bb_lower, bb_middle: Band boundaries and the SMA
        - bb_width: Band width normalized by the middle band
        - bb_squeeze: Width relative to its 50-bar mean (low = compression)
        - bb_position / bb_percent_b: Price position within the bands
          (0 = lower band, 1 = upper band; not clipped, so values outside
          [0, 1] occur when price is outside the bands)
        - bb_dist_upper, bb_dist_lower: Distance to each band relative to close

        Args:
            df: DataFrame with OHLC data
            period: SMA period (default: config.bb_period)
            std: Standard deviation multiplier (default: config.bb_std).
                An explicit 0.0 is honored.

        Returns:
            DataFrame with Bollinger Band features
        """
        period = period or self.config.bb_period
        # ``is None`` rather than ``or`` so that std=0.0 is not replaced.
        std_mult = self.config.bb_std if std is None else std

        _, _, _, close_col = self._get_price_columns(df)
        close = df[close_col]
        min_periods = self._min_periods(period)

        rolling = close.rolling(window=period, min_periods=min_periods)
        sma = rolling.mean()
        offset = std_mult * rolling.std()

        upper = sma + offset
        lower = sma - offset
        band_span = upper - lower

        bb_features = pd.DataFrame(index=df.index)
        bb_features['bb_upper'] = upper
        bb_features['bb_lower'] = lower
        bb_features['bb_middle'] = sma

        # Band width (normalized)
        bb_features['bb_width'] = band_span / (sma + _EPS)

        # Width relative to historical (squeeze indicator)
        # Low values indicate compression/squeeze
        width_ma = bb_features['bb_width'].rolling(window=50, min_periods=10).mean()
        bb_features['bb_squeeze'] = bb_features['bb_width'] / (width_ma + _EPS)

        # Price position within bands (0 = at lower, 1 = at upper)
        position = (close - lower) / (band_span + _EPS)
        bb_features['bb_position'] = position

        # Distance from bands (useful for breakout detection)
        bb_features['bb_dist_upper'] = (upper - close) / (close + _EPS)
        bb_features['bb_dist_lower'] = (close - lower) / (close + _EPS)

        # Percent B uses the same formula as bb_position; both columns are
        # kept so existing consumers of either name keep working.
        bb_features['bb_percent_b'] = position

        return bb_features

    def compute_keltner_channels(
        self,
        df: pd.DataFrame,
        period: Optional[int] = None,
        mult: Optional[float] = None
    ) -> pd.DataFrame:
        """
        Compute Keltner Channels.

        KC = EMA +/- (mult * ATR)

        Used with Bollinger Bands to detect squeeze:
        When BB is inside KC, volatility is compressed (squeeze).

        Args:
            df: DataFrame with OHLC data
            period: EMA/ATR period (default: config.keltner_period)
            mult: ATR multiplier (default: config.keltner_mult).
                An explicit 0.0 is honored.

        Returns:
            DataFrame with kc_upper, kc_lower, kc_middle, kc_width and
            kc_position
        """
        period = period or self.config.keltner_period
        # ``is None`` rather than ``or`` so that mult=0.0 is not replaced.
        mult = self.config.keltner_mult if mult is None else mult

        _, _, _, close_col = self._get_price_columns(df)
        close = df[close_col]
        min_periods = self._min_periods(period)

        # Middle line and ATR share the same span-based EMA settings.
        ema = close.ewm(span=period, min_periods=min_periods, adjust=False).mean()
        atr = self.compute_true_range(df).ewm(
            span=period, min_periods=min_periods, adjust=False
        ).mean()

        upper = ema + (mult * atr)
        lower = ema - (mult * atr)
        channel_span = upper - lower

        kc_features = pd.DataFrame(index=df.index)
        kc_features['kc_upper'] = upper
        kc_features['kc_lower'] = lower
        kc_features['kc_middle'] = ema

        # Channel width (normalized by the middle line)
        kc_features['kc_width'] = channel_span / (ema + _EPS)

        # Price position within channels
        kc_features['kc_position'] = (close - lower) / (channel_span + _EPS)

        return kc_features

    def compute_squeeze_indicator(
        self,
        df: pd.DataFrame,
        bb: Optional[pd.DataFrame] = None,
        kc: Optional[pd.DataFrame] = None
    ) -> pd.DataFrame:
        """
        Compute Squeeze Indicator (TTM Squeeze concept).

        Squeeze occurs when Bollinger Bands are inside Keltner Channels.
        This indicates low volatility and potential upcoming breakout.

        Returns:
        - squeeze_on: Binary indicator (1 = squeeze active)
        - squeeze_strength: 1 - BB width / KC width, clipped to [0, 1]
        - squeeze_duration: Consecutive bars in squeeze
        - squeeze_release: 1 on the first bar after a squeeze ends
        - squeeze_momentum: Close minus its 12-bar mean, relative to close
        - squeeze_momentum_direction: Sign of that momentum

        Args:
            df: DataFrame with OHLC data
            bb: Optional precomputed output of compute_bollinger_bands(df);
                computed with default settings when omitted.
            kc: Optional precomputed output of compute_keltner_channels(df);
                computed with default settings when omitted.

        Returns:
            DataFrame with squeeze indicator features
        """
        if bb is None:
            bb = self.compute_bollinger_bands(df)
        if kc is None:
            kc = self.compute_keltner_channels(df)

        squeeze_features = pd.DataFrame(index=df.index)

        # Squeeze is ON when BB is inside KC:
        # BB lower > KC lower AND BB upper < KC upper.
        # Comparisons against NaN (warm-up rows) evaluate to False.
        squeeze_on = (
            (bb['bb_lower'] > kc['kc_lower']) &
            (bb['bb_upper'] < kc['kc_upper'])
        ).astype(float)
        squeeze_features['squeeze_on'] = squeeze_on

        # Squeeze strength: how much BB is inside KC (higher = tighter)
        bb_width = bb['bb_upper'] - bb['bb_lower']
        kc_width = kc['kc_upper'] - kc['kc_lower']
        squeeze_features['squeeze_strength'] = (
            1 - (bb_width / (kc_width + _EPS))
        ).clip(0, 1)

        # Squeeze duration (consecutive bars in squeeze)
        squeeze_features['squeeze_duration'] = self._run_length(squeeze_on).astype(float)

        # Squeeze release (transition from squeeze to no squeeze)
        squeeze_features['squeeze_release'] = (
            (squeeze_on.shift(1) == 1) & (squeeze_on == 0)
        ).astype(float)

        # Momentum indicator during squeeze (using close momentum)
        _, _, _, close_col = self._get_price_columns(df)
        close = df[close_col]
        momentum = close - close.rolling(12).mean()
        squeeze_features['squeeze_momentum'] = momentum / (close + _EPS)

        # Momentum direction (positive = bullish breakout likely)
        squeeze_features['squeeze_momentum_direction'] = np.sign(momentum)

        return squeeze_features

    def compute_compression_score(
        self,
        df: pd.DataFrame,
        lookback: Optional[int] = None
    ) -> pd.DataFrame:
        """
        Compute range compression score.

        Measures how compressed the current 5-bar range is vs its history.
        Low score = high compression = potential breakout setup.

        Formula: rolling_range / max_rolling_range_in_lookback

        Args:
            df: DataFrame with OHLC data
            lookback: Lookback period for historical range
                (default: config.compression_lookback)

        Returns:
            DataFrame with compression_score, compression_normalized,
            range_percentile, range_pct, range_change_5, range_change_10
            and inside_bar_count
        """
        lookback = lookback or self.config.compression_lookback

        _, high_col, low_col, close_col = self._get_price_columns(df)
        high = df[high_col]
        low = df[low_col]
        close = df[close_col]

        compression_features = pd.DataFrame(index=df.index)

        # Current range (single bar)
        current_range = high - low

        # Rolling range (5-bar high minus 5-bar low)
        rolling_range = high.rolling(5).max() - low.rolling(5).min()

        # Historical extremes of the rolling range
        max_range = rolling_range.rolling(lookback).max()
        min_range = rolling_range.rolling(lookback).min()

        # Compression score: current / historical max (lower = more compressed)
        compression_features['compression_score'] = rolling_range / (max_range + _EPS)

        # Normalized compression (0-1 scale between historical min and max)
        compression_features['compression_normalized'] = (
            (rolling_range - min_range) / (max_range - min_range + _EPS)
        )

        # Range percentile (share of the previous `lookback` ranges that are
        # strictly smaller than the current one)
        compression_features['range_percentile'] = self._rolling_rank_fraction(
            rolling_range, lookback
        )

        # Range as percentage of price
        compression_features['range_pct'] = current_range / (close + _EPS) * 100

        # Range change (expansion/contraction trend)
        compression_features['range_change_5'] = rolling_range.pct_change(5)
        compression_features['range_change_10'] = rolling_range.pct_change(10)

        # Count of consecutive bars whose range is smaller than the previous
        # bar's range (a proxy for inside bars, based on range size only).
        narrower = current_range < current_range.shift(1)
        compression_features['inside_bar_count'] = self._run_length(narrower)

        return compression_features

    def compute_historical_volatility(
        self,
        df: pd.DataFrame,
        windows: Optional[List[int]] = None
    ) -> pd.DataFrame:
        """
        Compute historical volatility metrics.

        Uses the rolling standard deviation of log returns.

        Args:
            df: DataFrame with OHLC data
            windows: List of volatility windows (default: config.hv_windows).
                The first and last entries form the short/long ratio.

        Returns:
            DataFrame with, per window, ``hv_<w>`` and ``hv_<w>_annual``
            (scaled by sqrt(config.annualization_periods)), plus hv_ratio
            (two or more windows), hv_percentile (min-max position of the
            base HV inside its 100-bar range, not a rank percentile),
            hv_regime (0/1/2 for low/medium/high), hv_trend and
            parkinson_vol.
        """
        windows = windows or self.config.hv_windows

        _, high_col, low_col, close_col = self._get_price_columns(df)
        close = df[close_col]

        log_returns = np.log(close / close.shift(1))
        annual_factor = np.sqrt(self.config.annualization_periods)

        hv_features = pd.DataFrame(index=df.index)

        for window in windows:
            hv = log_returns.rolling(
                window=window, min_periods=self._min_periods(window)
            ).std()
            hv_features[f'hv_{window}'] = hv
            hv_features[f'hv_{window}_annual'] = hv * annual_factor

        # Volatility ratio (short vs long window)
        if len(windows) >= 2:
            hv_features['hv_ratio'] = (
                hv_features[f'hv_{windows[0]}'] /
                (hv_features[f'hv_{windows[-1]}'] + _EPS)
            )

        # Base series: the 20-bar HV when available, else the first window.
        base_col = 'hv_20' if 'hv_20' in hv_features.columns else f'hv_{windows[0]}'
        hv_base = hv_features[base_col]

        # Position of current HV inside its 100-bar min/max range
        hv_rolling_max = hv_base.rolling(100).max()
        hv_rolling_min = hv_base.rolling(100).min()
        hv_features['hv_percentile'] = (
            (hv_base - hv_rolling_min) / (hv_rolling_max - hv_rolling_min + _EPS)
        )

        # Volatility regime (low/medium/high); NaN percentile stays NaN
        hv_features['hv_regime'] = pd.cut(
            hv_features['hv_percentile'],
            bins=[-np.inf, 0.33, 0.66, np.inf],
            labels=[0, 1, 2]
        ).astype(float)

        # Volatility trend
        hv_features['hv_trend'] = hv_base - hv_base.rolling(10).mean()

        # Parkinson volatility (using high-low range)
        log_hl = np.log(df[high_col] / df[low_col])
        parkinson = log_hl.pow(2) / (4 * np.log(2))
        hv_features['parkinson_vol'] = parkinson.rolling(20).mean().pow(0.5)

        return hv_features

    # ------------------------------------------------------------------
    # Labels
    # ------------------------------------------------------------------

    def label_breakouts(
        self,
        df: pd.DataFrame,
        atr_mult: Optional[float] = None,
        forward_periods: Optional[int] = None
    ) -> pd.DataFrame:
        """
        Label breakouts for training.

        A breakout is defined as a move exceeding atr_mult * ATR
        within the next forward_periods bars (current bar excluded).
        ATR here is the 20-bar simple mean of True Range.

        Labels:
        - 0: No breakout
        - 1: Bullish breakout (upward)
        - 2: Bearish breakout (downward)

        When both directions trigger, the larger move wins (ties go to
        bullish). Rows where the ATR or the forward window is not yet
        available (the first 19 and the last forward_periods rows) compare
        as False and are therefore labeled 0; callers may want to drop them
        before training.

        Also returns direction and magnitude for regression targets.

        Args:
            df: DataFrame with OHLC data
            atr_mult: ATR multiplier threshold
                (default: config.breakout_atr_mult). An explicit 0.0 is
                honored.
            forward_periods: Forward look period
                (default: config.forward_periods)

        Returns:
            DataFrame with breakout_label, breakout_binary,
            breakout_direction, breakout_magnitude,
            breakout_signed_magnitude and forward_return
        """
        # ``is None`` rather than ``or`` so that atr_mult=0.0 is not replaced.
        atr_mult = self.config.breakout_atr_mult if atr_mult is None else atr_mult
        forward_periods = forward_periods or self.config.forward_periods

        _, high_col, low_col, close_col = self._get_price_columns(df)
        close = df[close_col]
        high = df[high_col]
        low = df[low_col]

        # ATR for the threshold
        atr = self.compute_true_range(df).rolling(20).mean()

        # Forward high/low: a trailing window shifted back by its own length
        # covers bars t+1 .. t+forward_periods.
        forward_high = high.rolling(forward_periods).max().shift(-forward_periods)
        forward_low = low.rolling(forward_periods).min().shift(-forward_periods)

        # Forward moves measured from the current close
        upward_move = forward_high - close
        downward_move = close - forward_low

        threshold = atr_mult * atr
        bullish = (upward_move > threshold).to_numpy()
        bearish = (downward_move > threshold).to_numpy()
        up_dominates = (upward_move >= downward_move).to_numpy()

        # First matching condition wins: both-sided with the upward move at
        # least as large -> bullish; any remaining bearish -> bearish;
        # remaining bullish -> bullish.
        label_values = np.select(
            [bullish & bearish & up_dominates, bearish, bullish],
            [1, 2, 1],
            default=0,
        )

        labels = pd.DataFrame(index=df.index)
        labels['breakout_label'] = label_values

        # Binary breakout (any direction)
        labels['breakout_binary'] = (labels['breakout_label'] > 0).astype(int)

        # Direction: 1 = bullish, -1 = bearish, 0 = no breakout
        labels['breakout_direction'] = np.select(
            [label_values == 1, label_values == 2], [1, -1], default=0
        )

        # Magnitude in ATR units (for regression)
        labels['breakout_magnitude'] = (
            np.maximum(upward_move, downward_move) / (atr + _EPS)
        )

        # Signed magnitude (positive for bullish, negative for bearish)
        labels['breakout_signed_magnitude'] = (
            labels['breakout_magnitude'] * labels['breakout_direction']
        )

        # Forward return (for additional target)
        labels['forward_return'] = close.shift(-forward_periods) / close - 1

        # Log breakout statistics (guarding against an empty frame)
        total = len(labels)
        logger.info("Breakout labeling complete:")
        for title, code in (("No breakout", 0), ("Bullish", 1), ("Bearish", 2)):
            count = (labels['breakout_label'] == code).sum()
            share = count / total * 100 if total else 0.0
            logger.info(f"  {title}: {count} ({share:.1f}%)")

        return labels

    # ------------------------------------------------------------------
    # Aggregation
    # ------------------------------------------------------------------

    def compute_all_features(self, df: pd.DataFrame) -> pd.DataFrame:
        """
        Compute all VBP features.

        Combines all feature types into a single DataFrame. Infinite values
        (e.g. a percentage change from a zero base) are treated as missing;
        missing values are forward-filled, then backward-filled, then set
        to 0.

        NOTE: the backward fill copies the first valid value of each column
        into its warm-up rows, i.e. those rows contain information from
        later bars. Drop the warm-up rows if that matters for training.

        Args:
            df: DataFrame with OHLC data

        Returns:
            DataFrame with all VBP features, free of NaN and inf
        """
        logger.info(f"Computing VBP features for {len(df)} samples...")

        # Compute all feature groups; BB and KC are reused by the squeeze
        # indicator instead of being computed twice.
        atr_features = self.compute_atr(df)
        bb_features = self.compute_bollinger_bands(df)
        kc_features = self.compute_keltner_channels(df)
        squeeze_features = self.compute_squeeze_indicator(
            df, bb=bb_features, kc=kc_features
        )
        compression_features = self.compute_compression_score(df)
        hv_features = self.compute_historical_volatility(df)

        # Combine all features
        all_features = pd.concat([
            atr_features,
            bb_features,
            kc_features,
            squeeze_features,
            compression_features,
            hv_features
        ], axis=1)

        # Remove duplicate columns (if any)
        all_features = all_features.loc[:, ~all_features.columns.duplicated()]

        # ffill()/bfill() replace the deprecated fillna(method=...) form.
        all_features = (
            all_features
            .replace([np.inf, -np.inf], np.nan)
            .ffill()
            .bfill()
            .fillna(0)
        )

        logger.info(f"Computed {len(all_features.columns)} features")

        return all_features

    def get_feature_names(self) -> List[str]:
        """
        Get list of all feature names.

        Runs the full pipeline on a small synthetic frame and returns the
        resulting column names. A private random generator is used so the
        global NumPy random state is left untouched.
        """
        rng = np.random.default_rng(0)
        size = 200
        dummy_dates = pd.date_range(
            '2020-01-01', periods=size, freq=pd.Timedelta(hours=1)
        )
        dummy_df = pd.DataFrame({
            'open': rng.standard_normal(size).cumsum() + 100,
            'high': rng.standard_normal(size).cumsum() + 101,
            'low': rng.standard_normal(size).cumsum() + 99,
            'close': rng.standard_normal(size).cumsum() + 100
        }, index=dummy_dates)
        # Enforce OHLC consistency
        dummy_df['high'] = dummy_df[['open', 'high', 'close']].max(axis=1)
        dummy_df['low'] = dummy_df[['open', 'low', 'close']].min(axis=1)

        features = self.compute_all_features(dummy_df)
        return features.columns.tolist()


def main() -> None:
    """Smoke-test every feature group on synthetic hourly data."""
    print("Testing VBP Feature Engineering")
    print("=" * 60)

    # Create sample OHLCV data with some volatility patterns
    np.random.seed(42)
    n = 500
    dates = pd.date_range('2025-01-01', periods=n, freq=pd.Timedelta(hours=1))

    # Simulate price with varying volatility
    volatility = np.where(
        (np.arange(n) % 100 > 70),  # High vol periods
        0.02,
        0.005  # Low vol (compression) periods
    )
    returns = np.random.randn(n) * volatility
    price = 2000 * (1 + returns).cumprod()

    df = pd.DataFrame({
        'open': price,
        'high': price * (1 + np.abs(np.random.randn(n)) * volatility),
        'low': price * (1 - np.abs(np.random.randn(n)) * volatility),
        'close': price * (1 + np.random.randn(n) * volatility * 0.5),
        'volume': np.random.randint(100, 1000, n)
    }, index=dates)

    # Ensure OHLC consistency
    df['high'] = df[['open', 'high', 'close']].max(axis=1)
    df['low'] = df[['open', 'low', 'close']].min(axis=1)

    # Initialize engineer
    config = VBPFeatureConfig()
    engineer = VBPFeatureEngineer(config)

    # Test individual feature groups
    print("\n1. Testing ATR features...")
    atr_features = engineer.compute_atr(df)
    print(f"   ATR features: {len(atr_features.columns)}")
    print(f"   Columns: {list(atr_features.columns)}")

    print("\n2. Testing Bollinger Bands features...")
    bb_features = engineer.compute_bollinger_bands(df)
    print(f"   BB features: {len(bb_features.columns)}")
    print(f"   Columns: {list(bb_features.columns)}")

    print("\n3. Testing Keltner Channels features...")
    kc_features = engineer.compute_keltner_channels(df)
    print(f"   KC features: {len(kc_features.columns)}")

    print("\n4. Testing Squeeze Indicator...")
    squeeze_features = engineer.compute_squeeze_indicator(df)
    print(f"   Squeeze features: {len(squeeze_features.columns)}")
    print(f"   Squeeze ON periods: {squeeze_features['squeeze_on'].sum()}")

    print("\n5. Testing Compression Score...")
    compression_features = engineer.compute_compression_score(df)
    print(f"   Compression features: {len(compression_features.columns)}")
    print(f"   Avg compression score: {compression_features['compression_score'].mean():.4f}")

    print("\n6. Testing Historical Volatility...")
    hv_features = engineer.compute_historical_volatility(df)
    print(f"   HV features: {len(hv_features.columns)}")

    print("\n7. Testing Breakout Labels...")
    labels = engineer.label_breakouts(df)
    print(f"   Label columns: {list(labels.columns)}")

    print("\n8. Testing All Features Combined...")
    all_features = engineer.compute_all_features(df)
    print(f"   Total features: {len(all_features.columns)}")
    print(f"   Sample shape: {all_features.shape}")
    print(f"   NaN count: {all_features.isna().sum().sum()}")

    print("\n" + "=" * 60)
    print("All feature engineering tests passed!")


if __name__ == "__main__":
    main()