"""
Technical indicators module

Implements the 14 essential indicators identified in the analysis.
"""
import pandas as pd
import numpy as np
from typing import Optional, Dict, Any
import pandas_ta as ta
from loguru import logger


class TechnicalIndicators:
    """Calculate technical indicators for trading data."""

    def __init__(self):
        """Initialize technical indicators calculator."""
        # Column names of the 14 essential indicators produced by
        # calculate_all_indicators(minimal=True).
        self.minimal_indicators = [
            'macd_signal', 'macd_histogram', 'rsi',
            'sma_10', 'sma_20', 'sar', 'atr',
            'obv', 'ad', 'cmf', 'mfi',
            'volume_zscore', 'fractals_high', 'fractals_low'
        ]

    def calculate_all_indicators(
        self,
        df: pd.DataFrame,
        minimal: bool = True
    ) -> pd.DataFrame:
        """
        Calculate all technical indicators.

        Args:
            df: DataFrame with OHLCV data (columns: open, high, low,
                close, volume)
            minimal: If True, only calculate minimal set (14 indicators)

        Returns:
            Copy of ``df`` with indicator columns added; NaN values are
            forward-filled, then remaining leading NaNs set to 0.

        Raises:
            ValueError: If ``df`` is missing any required OHLCV column.
        """
        df_ind = df.copy()

        # Ensure we have required columns
        required = ['open', 'high', 'low', 'close', 'volume']
        if not all(col in df_ind.columns for col in required):
            raise ValueError(f"DataFrame must contain columns: {required}")

        # MACD — pandas_ta returns None when the series is too short.
        macd = ta.macd(df_ind['close'], fast=12, slow=26, signal=9)
        if macd is not None:
            df_ind['macd'] = macd['MACD_12_26_9']
            df_ind['macd_signal'] = macd['MACDs_12_26_9']
            df_ind['macd_histogram'] = macd['MACDh_12_26_9']

        # RSI
        df_ind['rsi'] = ta.rsi(df_ind['close'], length=14)

        # Simple Moving Averages
        df_ind['sma_10'] = ta.sma(df_ind['close'], length=10)
        df_ind['sma_20'] = ta.sma(df_ind['close'], length=20)

        # Parabolic SAR — first column of the psar frame holds the SAR values.
        sar = ta.psar(df_ind['high'], df_ind['low'], df_ind['close'])
        if sar is not None:
            df_ind['sar'] = sar.iloc[:, 0]

        # ATR (Average True Range)
        df_ind['atr'] = ta.atr(df_ind['high'], df_ind['low'], df_ind['close'], length=14)

        # Volume indicators
        df_ind['obv'] = ta.obv(df_ind['close'], df_ind['volume'])
        df_ind['ad'] = ta.ad(df_ind['high'], df_ind['low'], df_ind['close'], df_ind['volume'])
        df_ind['cmf'] = ta.cmf(df_ind['high'], df_ind['low'], df_ind['close'], df_ind['volume'])
        df_ind['mfi'] = ta.mfi(df_ind['high'], df_ind['low'], df_ind['close'], df_ind['volume'])

        # Volume Z-Score
        df_ind['volume_zscore'] = self._calculate_volume_zscore(df_ind['volume'])

        # Fractals
        df_ind['fractals_high'], df_ind['fractals_low'] = self._calculate_fractals(
            df_ind['high'], df_ind['low']
        )

        if not minimal:
            # Add extended indicators
            df_ind = self._add_extended_indicators(df_ind)

        # Fill NaN values.
        # FIX: fillna(method='ffill') is deprecated/removed in pandas 2.x+;
        # use the dedicated .ffill() method instead.
        df_ind = df_ind.ffill().fillna(0)

        logger.info(f"Calculated {len(df_ind.columns) - len(df.columns)} indicators")

        return df_ind

    def _calculate_volume_zscore(
        self,
        volume: pd.Series,
        window: int = 20
    ) -> pd.Series:
        """
        Calculate volume Z-score for anomaly detection.

        Args:
            volume: Volume series
            window: Rolling window size

        Returns:
            Volume Z-score series (NaN for the first ``window - 1`` bars).
        """
        vol_mean = volume.rolling(window=window).mean()
        vol_std = volume.rolling(window=window).std()

        # Avoid division by zero when volume is constant over the window.
        vol_std = vol_std.replace(0, 1)

        zscore = (volume - vol_mean) / vol_std
        return zscore

    def _calculate_fractals(
        self,
        high: pd.Series,
        low: pd.Series,
        n: int = 2
    ) -> tuple[pd.Series, pd.Series]:
        """
        Calculate Williams Fractals.

        A bar is a bearish (high) fractal when its high equals the maximum
        of the surrounding ``2n + 1`` bars; bullish (low) fractals are the
        mirror image on the lows. The first and last ``n`` bars can never
        be fractals since they lack a full neighborhood.

        Args:
            high: High price series
            low: Low price series
            n: Number of bars on each side

        Returns:
            Tuple of (bearish/high fractals, bullish/low fractals) as
            0/1 integer series aligned with the inputs.
        """
        fractals_high = pd.Series(0, index=high.index)
        fractals_low = pd.Series(0, index=low.index)

        for i in range(n, len(high) - n):
            # Bearish fractal (high point)
            if high.iloc[i] == high.iloc[i - n:i + n + 1].max():
                fractals_high.iloc[i] = 1
            # Bullish fractal (low point)
            if low.iloc[i] == low.iloc[i - n:i + n + 1].min():
                fractals_low.iloc[i] = 1

        return fractals_high, fractals_low

    def _add_extended_indicators(self, df: pd.DataFrame) -> pd.DataFrame:
        """Add extended set of indicators for experimentation.

        Mutates and returns ``df`` with stochastic, CCI, EMA, ADX,
        Bollinger Band and Keltner Channel columns added.
        """
        # Stochastic oscillator (%K, %D)
        stoch = ta.stoch(df['high'], df['low'], df['close'])
        if stoch is not None:
            df['stoch_k'] = stoch.iloc[:, 0]
            df['stoch_d'] = stoch.iloc[:, 1]

        # CCI
        df['cci'] = ta.cci(df['high'], df['low'], df['close'])

        # EMA
        df['ema_12'] = ta.ema(df['close'], length=12)
        df['ema_26'] = ta.ema(df['close'], length=26)

        # ADX
        adx = ta.adx(df['high'], df['low'], df['close'])
        if adx is not None:
            df['adx'] = adx['ADX_14']

        # Bollinger Bands
        bbands = ta.bbands(df['close'], length=20)
        if bbands is not None:
            df['bb_upper'] = bbands['BBU_20_2.0']
            df['bb_middle'] = bbands['BBM_20_2.0']
            df['bb_lower'] = bbands['BBL_20_2.0']

        # Keltner Channels
        kc = ta.kc(df['high'], df['low'], df['close'])
        if kc is not None:
            df['kc_upper'] = kc.iloc[:, 0]
            df['kc_middle'] = kc.iloc[:, 1]
            df['kc_lower'] = kc.iloc[:, 2]

        return df

    def calculate_partial_hour_features(
        self,
        df: pd.DataFrame,
        timeframe: int = 5
    ) -> pd.DataFrame:
        """
        Calculate partial hour features to prevent look-ahead bias.

        Based on trading_bot_meta_model implementation: within each
        clock hour, OHLCV is accumulated bar-by-bar (cummax/cummin/cumsum),
        so indicators computed on these columns only ever see data up to
        the current bar.

        Args:
            df: DataFrame with OHLCV data and a DatetimeIndex
            timeframe: Timeframe in minutes

        Returns:
            DataFrame with ``*_hr_partial`` feature columns added.

        Raises:
            ValueError: If ``df`` does not have a DatetimeIndex.
        """
        df_partial = df.copy()

        # Ensure datetime index
        if not isinstance(df_partial.index, pd.DatetimeIndex):
            raise ValueError("DataFrame must have datetime index")

        # Calculate hour truncation.
        # FIX: 'H' frequency alias is deprecated in pandas 2.2+; use 'h'.
        df_partial['hour_trunc'] = df_partial.index.floor('h')

        # Partial hour OHLCV — cumulative within each hour bucket.
        df_partial['open_hr_partial'] = df_partial.groupby('hour_trunc')['open'].transform('first')
        df_partial['close_hr_partial'] = df_partial['close']  # Current close
        df_partial['high_hr_partial'] = df_partial.groupby('hour_trunc')['high'].transform('cummax')
        df_partial['low_hr_partial'] = df_partial.groupby('hour_trunc')['low'].transform('cummin')
        df_partial['volume_hr_partial'] = df_partial.groupby('hour_trunc')['volume'].transform('cumsum')

        # Calculate indicators on partial hour data
        partial_cols = ['open_hr_partial', 'close_hr_partial', 'high_hr_partial',
                        'low_hr_partial', 'volume_hr_partial']
        df_temp = df_partial[partial_cols].copy()
        # Rename positionally back to plain OHLCV so the indicator
        # pipeline can run on the partial-hour series.
        df_temp.columns = ['open', 'close', 'high', 'low', 'volume']

        # Calculate indicators on partial data
        df_ind_partial = self.calculate_all_indicators(df_temp, minimal=True)

        # Rename columns to indicate partial
        for col in df_ind_partial.columns:
            if col not in ['open', 'close', 'high', 'low', 'volume']:
                df_partial[f"{col}_hr_partial"] = df_ind_partial[col]

        # Drop temporary column
        df_partial.drop('hour_trunc', axis=1, inplace=True)

        logger.info(f"Added {len([c for c in df_partial.columns if '_hr_partial' in c])} partial hour features")

        return df_partial

    def calculate_rolling_features(
        self,
        df: pd.DataFrame,
        windows: Optional[list] = None
    ) -> pd.DataFrame:
        """
        Calculate rolling window features.

        Args:
            df: DataFrame with OHLCV data
            windows: List of window sizes in minutes (assuming 5-min bars);
                defaults to [15, 60, 120].

        Returns:
            DataFrame with rolling OHLCV, return and volatility features
            added per window.
        """
        # FIX: avoid a mutable default argument; None means the standard
        # [15, 60, 120] windows.
        if windows is None:
            windows = [15, 60, 120]

        df_roll = df.copy()

        for window_min in windows:
            # Convert minutes to number of bars (5-min timeframe).
            # Guard against a zero-length window for sub-5-minute inputs.
            window_bars = max(1, window_min // 5)

            # Rolling aggregations
            df_roll[f'open_{window_min}m'] = df_roll['open'].shift(window_bars - 1)
            df_roll[f'high_{window_min}m'] = df_roll['high'].rolling(window_bars).max()
            df_roll[f'low_{window_min}m'] = df_roll['low'].rolling(window_bars).min()
            df_roll[f'close_{window_min}m'] = df_roll['close']  # Current close
            df_roll[f'volume_{window_min}m'] = df_roll['volume'].rolling(window_bars).sum()

            # Price changes
            df_roll[f'return_{window_min}m'] = df_roll['close'].pct_change(window_bars)

            # Volatility
            df_roll[f'volatility_{window_min}m'] = df_roll['close'].pct_change().rolling(window_bars).std()

        logger.info(f"Added rolling features for windows: {windows}")

        return df_roll

    def transform_to_ratios(
        self,
        df: pd.DataFrame,
        reference_col: str = 'close'
    ) -> pd.DataFrame:
        """
        Transform price columns to ratios for better model stability.

        Args:
            df: DataFrame with price data
            reference_col: Column to use as reference for ratios

        Returns:
            DataFrame with ``*_ratio`` columns added (price / reference - 1,
            and volume relative to its 20-bar rolling mean).
        """
        df_ratio = df.copy()

        price_cols = ['open', 'high', 'low', 'close']
        for col in price_cols:
            if col in df_ratio.columns and col != reference_col:
                df_ratio[f'{col}_ratio'] = (df_ratio[col] / df_ratio[reference_col]) - 1

        # Volume ratio to mean (NaN mean during warm-up treated as 1)
        if 'volume' in df_ratio.columns:
            vol_mean = df_ratio['volume'].rolling(20).mean()
            df_ratio['volume_ratio'] = df_ratio['volume'] / vol_mean.fillna(1)

        logger.info("Transformed prices to ratios")

        return df_ratio


if __name__ == "__main__":
    # Test indicators calculation

    # Create sample data
    dates = pd.date_range(start='2024-01-01', periods=1000, freq='5min')
    np.random.seed(42)
    df_test = pd.DataFrame({
        'open': 100 + np.random.randn(1000).cumsum(),
        'high': 102 + np.random.randn(1000).cumsum(),
        'low': 98 + np.random.randn(1000).cumsum(),
        'close': 100 + np.random.randn(1000).cumsum(),
        'volume': np.random.randint(1000, 10000, 1000)
    }, index=dates)

    # Ensure high > low
    df_test['high'] = df_test[['open', 'high', 'close']].max(axis=1)
    df_test['low'] = df_test[['open', 'low', 'close']].min(axis=1)

    # Calculate indicators
    indicators = TechnicalIndicators()

    # Test minimal indicators
    df_with_ind = indicators.calculate_all_indicators(df_test, minimal=True)
    print(f"Calculated indicators: {[c for c in df_with_ind.columns if c not in df_test.columns]}")

    # Test partial hour features
    df_partial = indicators.calculate_partial_hour_features(df_with_ind)
    partial_cols = [c for c in df_partial.columns if '_hr_partial' in c]
    print(f"\nPartial hour features ({len(partial_cols)}): {partial_cols[:5]}...")

    # Test rolling features
    df_roll = indicators.calculate_rolling_features(df_test, windows=[15, 60])
    roll_cols = [c for c in df_roll.columns if 'm' in c and c not in df_test.columns]
    print(f"\nRolling features: {roll_cols}")

    # Test ratio transformation
    df_ratio = indicators.transform_to_ratios(df_test)
    ratio_cols = [c for c in df_ratio.columns if 'ratio' in c]
    print(f"\nRatio features: {ratio_cols}")