#!/usr/bin/env python3
"""
Tests for Attention Architecture Module
========================================

Comprehensive unit tests for the attention module components:

- MultiHeadAttention: Core multi-head attention mechanism
- LearnablePositionalEncoding: Time-agnostic position embeddings
- PriceFocusedAttention: Main transformer encoder model
- AttentionExtractor: Utilities for attention analysis

Uses pytest and torch.testing for assertions.

Author: ML-Specialist (NEXUS v4.0)
Version: 1.0.0
Created: 2026-01-25
"""

import pytest
import numpy as np
from pathlib import Path

import torch
import torch.nn as nn

# Import the modules under test
import sys
sys.path.insert(0, str(Path(__file__).parent.parent / 'src'))

from models.attention import (
    MultiHeadAttention,
    LearnablePositionalEncoding,
    PriceFocusedAttention,
    PriceAttentionConfig,
    AttentionExtractor,
    AttentionScores,
    create_causal_mask,
    compute_return_features,
)


# ==============================================================================
# Test Fixtures
# ==============================================================================

@pytest.fixture
def device():
    """Return the appropriate device for testing."""
    return torch.device('cuda' if torch.cuda.is_available() else 'cpu')


@pytest.fixture
def batch_size():
    """Default batch size for tests."""
    return 4


@pytest.fixture
def seq_len():
    """Default sequence length for tests."""
    return 32


@pytest.fixture
def d_model():
    """Default model dimension for tests."""
    return 64


@pytest.fixture
def n_heads():
    """Default number of attention heads for tests."""
    return 8


@pytest.fixture
def input_features():
    """Default number of input features for tests."""
    return 4


@pytest.fixture
def sample_input(batch_size, seq_len, d_model, device):
    """Create sample input tensor for attention tests."""
    torch.manual_seed(42)
    return torch.randn(batch_size, seq_len, d_model, device=device)


@pytest.fixture
def sample_price_input(batch_size, seq_len, input_features, device):
    """Create sample price-based input tensor."""
    torch.manual_seed(42)
    return torch.randn(batch_size, seq_len, input_features, device=device)


@pytest.fixture
def multi_head_attention(d_model, n_heads, device):
    """Create MultiHeadAttention instance for testing."""
    # dropout=0.0 keeps forward passes deterministic for the tests below.
    mha = MultiHeadAttention(d_model=d_model, n_heads=n_heads, dropout=0.0)
    return mha.to(device)


@pytest.fixture
def positional_encoding(d_model, device):
    """Create LearnablePositionalEncoding instance for testing."""
    pe = LearnablePositionalEncoding(d_model=d_model, max_seq_len=512, dropout=0.0)
    return pe.to(device)


@pytest.fixture
def price_attention_config(d_model, n_heads, input_features):
    """Create PriceAttentionConfig for testing."""
    return PriceAttentionConfig(
        d_model=d_model,
        n_heads=n_heads,
        d_k=d_model // n_heads,
        d_v=d_model // n_heads,
        n_layers=2,
        d_ff=d_model * 4,
        max_seq_len=512,
        dropout=0.0,
        attention_dropout=0.0,
        input_features=input_features,
        pre_norm=True,
    )


@pytest.fixture
def price_focused_attention(price_attention_config, input_features, device):
    """Create PriceFocusedAttention instance for testing."""
    model = PriceFocusedAttention(
        config=price_attention_config,
        input_features=input_features
    )
    return model.to(device)


# ==============================================================================
# Tests for MultiHeadAttention
# ==============================================================================

class TestMultiHeadAttention:
    """Tests for MultiHeadAttention module."""

    def test_output_shape(self, multi_head_attention, sample_input,
                          batch_size, seq_len, d_model):
        """Verify that output has correct shape (batch, seq_len, d_model)."""
        output, attn_weights = multi_head_attention(sample_input, sample_input, sample_input)

        assert output.shape == (batch_size, seq_len, d_model), \
            f"Expected output shape ({batch_size}, {seq_len}, {d_model}), got {output.shape}"

    def test_attention_weights_shape(self, multi_head_attention, sample_input,
                                     batch_size, seq_len, n_heads):
        """Verify attention weights have correct shape (batch, n_heads, seq_len, seq_len)."""
        output, attn_weights = multi_head_attention(sample_input, sample_input, sample_input)

        assert attn_weights is not None, "Attention weights should not be None"
        assert attn_weights.shape == (batch_size, n_heads, seq_len, seq_len), \
            f"Expected attention weights shape ({batch_size}, {n_heads}, {seq_len}, {seq_len}), got {attn_weights.shape}"

    def test_attention_weights_sum_to_one(self, multi_head_attention, sample_input):
        """Verify that attention weights sum to 1 along the key dimension (softmax)."""
        output, attn_weights = multi_head_attention(sample_input, sample_input, sample_input)

        # Sum along the last dimension (keys)
        weight_sums = attn_weights.sum(dim=-1)

        # All sums should be approximately 1.0
        expected_ones = torch.ones_like(weight_sums)
        torch.testing.assert_close(
            weight_sums, expected_ones,
            atol=1e-5, rtol=1e-5,
            msg="Attention weights should sum to 1 along key dimension"
        )

    def test_causal_mask(self, multi_head_attention, sample_input,
                         batch_size, seq_len, n_heads, device):
        """Verify that causal mask prevents attending to future positions."""
        # Create causal mask
        causal_mask = create_causal_mask(seq_len, device=device)

        # Forward with causal mask
        output, attn_weights = multi_head_attention(
            sample_input, sample_input, sample_input, mask=causal_mask
        )

        # Attention to future positions (strict upper triangle, excluding the
        # diagonal) must be ~0.  Vectorized check instead of an
        # O(batch * heads * seq^2) Python loop over individual elements.
        future_mask = torch.triu(
            torch.ones(seq_len, seq_len, dtype=torch.bool), diagonal=1
        )
        future_weights = attn_weights.detach().cpu()[..., future_mask]
        assert (future_weights < 1e-6).all(), \
            f"Future positions should be masked (near zero), max={future_weights.max().item()}"

    def test_no_attention_returned_when_disabled(self, d_model, n_heads, device, sample_input):
        """Verify attention weights are None when return_attention is False."""
        mha = MultiHeadAttention(d_model=d_model, n_heads=n_heads).to(device)
        output, attn_weights = mha(
            sample_input, sample_input, sample_input, return_attention=False
        )

        assert attn_weights is None, "Attention weights should be None when return_attention=False"

    def test_different_q_k_v_shapes(self, d_model, n_heads, device):
        """Verify MHA works with different query and key/value sequence lengths."""
        torch.manual_seed(42)
        mha = MultiHeadAttention(d_model=d_model, n_heads=n_heads, dropout=0.0).to(device)

        batch_size = 2
        seq_len_q = 16
        seq_len_kv = 24

        query = torch.randn(batch_size, seq_len_q, d_model, device=device)
        key = torch.randn(batch_size, seq_len_kv, d_model, device=device)
        value = torch.randn(batch_size, seq_len_kv, d_model, device=device)

        output, attn_weights = mha(query, key, value)

        assert output.shape == (batch_size, seq_len_q, d_model)
        assert attn_weights.shape == (batch_size, n_heads, seq_len_q, seq_len_kv)

    def test_gradients_flow_correctly(self, multi_head_attention, sample_input):
        """Verify that gradients flow through the attention mechanism."""
        sample_input.requires_grad_(True)

        output, attn_weights = multi_head_attention(sample_input, sample_input, sample_input)

        # Compute loss and backward
        loss = output.sum()
        loss.backward()

        assert sample_input.grad is not None, "Input should have gradients"
        assert not torch.all(sample_input.grad == 0), "Gradients should not be all zeros"


# ==============================================================================
# Tests for LearnablePositionalEncoding
# ==============================================================================

class TestLearnablePositionalEncoding:
    """Tests for LearnablePositionalEncoding module."""

    def test_encoding_shape(self, positional_encoding, sample_input,
                            batch_size, seq_len, d_model):
        """Verify that output shape matches input shape."""
        output = positional_encoding(sample_input)

        assert output.shape == sample_input.shape, \
            f"Output shape {output.shape} should match input shape {sample_input.shape}"

    def test_no_temporal_dependency(self, d_model, device):
        """Verify encoding does not depend on actual timestamps, only sequence position."""
        pe = LearnablePositionalEncoding(d_model=d_model, max_seq_len=512, dropout=0.0).to(device)

        torch.manual_seed(42)
        batch_size = 2
        seq_len = 16

        # Create two different input tensors
        x1 = torch.randn(batch_size, seq_len, d_model, device=device)
        x2 = torch.randn(batch_size, seq_len, d_model, device=device) * 2.0  # Different scale

        # Get position embeddings (the added positions should be the same)
        output1 = pe(x1)
        output2 = pe(x2)

        # The position embeddings added should be the same for both
        # (output - input) gives us the position embeddings
        pe1 = output1 - x1
        pe2 = output2 - x2

        torch.testing.assert_close(
            pe1, pe2,
            atol=1e-5, rtol=1e-5,
            msg="Position embeddings should be identical regardless of input values"
        )

    def test_learnable_parameters(self, positional_encoding, d_model):
        """Verify that position embeddings are learnable parameters."""
        # Check that position_embeddings is a Parameter
        assert hasattr(positional_encoding, 'position_embeddings'), \
            "Should have position_embeddings attribute"
        assert isinstance(positional_encoding.position_embeddings, nn.Parameter), \
            "position_embeddings should be nn.Parameter"

        # Check it requires gradients
        assert positional_encoding.position_embeddings.requires_grad, \
            "position_embeddings should require gradients"

        # Check shape
        max_seq_len = 512
        assert positional_encoding.position_embeddings.shape == (max_seq_len, d_model), \
            f"Expected shape ({max_seq_len}, {d_model}), got {positional_encoding.position_embeddings.shape}"

    def test_offset_parameter_works(self, d_model, device):
        """Verify that offset parameter shifts position indices correctly."""
        pe = LearnablePositionalEncoding(d_model=d_model, max_seq_len=512, dropout=0.0).to(device)

        torch.manual_seed(42)
        batch_size = 2
        seq_len = 10

        # Use zeros as input so we can see only the position embeddings
        x = torch.zeros(batch_size, seq_len, d_model, device=device)

        output_no_offset = pe(x, offset=0)
        output_with_offset = pe(x, offset=5)

        # With offset=5 the embeddings added at positions 0:5 are pe[5:10],
        # so output_no_offset positions 5:10 should equal output_with_offset
        # positions 0:5.
        torch.testing.assert_close(
            output_no_offset[:, 5:, :],
            output_with_offset[:, :5, :],
            atol=1e-5, rtol=1e-5,
            msg="Offset should shift position embeddings correctly"
        )

    def test_exceeds_max_seq_len_raises_error(self, positional_encoding, d_model, device):
        """Verify that exceeding max_seq_len raises ValueError."""
        x = torch.randn(1, 600, d_model, device=device)  # 600 > 512 (max_seq_len)

        with pytest.raises(ValueError, match="exceeds maximum sequence length"):
            positional_encoding(x)

    def test_get_position_embedding(self, positional_encoding, d_model):
        """Verify get_position_embedding returns correct embedding."""
        position = 10
        embedding = positional_encoding.get_position_embedding(position)

        assert embedding.shape == (d_model,), \
            f"Expected shape ({d_model},), got {embedding.shape}"

        # Should match the parameter at that position
        torch.testing.assert_close(
            embedding,
            positional_encoding.position_embeddings[position],
            msg="get_position_embedding should return correct position embedding"
        )


# ==============================================================================
# Tests for PriceFocusedAttention
# ==============================================================================

class TestPriceFocusedAttention:
    """Tests for PriceFocusedAttention model."""

    def test_forward_pass(self, price_focused_attention, sample_price_input):
        """Verify forward pass completes without error."""
        output, attentions = price_focused_attention(sample_price_input)

        assert output is not None, "Output should not be None"
        assert attentions is not None, "Attentions should not be None"
        assert len(attentions) > 0, "Should have at least one attention tensor"

    def test_output_shape(self, price_focused_attention, sample_price_input,
                          batch_size, seq_len, d_model):
        """Verify output has correct shape (batch, seq_len, d_model)."""
        output, attentions = price_focused_attention(sample_price_input)

        assert output.shape == (batch_size, seq_len, d_model), \
            f"Expected output shape ({batch_size}, {seq_len}, {d_model}), got {output.shape}"

    def test_no_nan_gradients(self, price_focused_attention, sample_price_input):
        """Verify gradients are stable (no NaN values)."""
        sample_price_input.requires_grad_(True)

        output, attentions = price_focused_attention(sample_price_input)

        # Compute loss and backward
        loss = output.sum()
        loss.backward()

        # Check for NaN in gradients
        assert sample_price_input.grad is not None, "Input should have gradients"
        assert not torch.isnan(sample_price_input.grad).any(), \
            "Gradients should not contain NaN values"
        assert not torch.isinf(sample_price_input.grad).any(), \
            "Gradients should not contain Inf values"

        # Check model parameters for NaN gradients
        for name, param in price_focused_attention.named_parameters():
            if param.grad is not None:
                assert not torch.isnan(param.grad).any(), \
                    f"Parameter {name} has NaN gradients"
                assert not torch.isinf(param.grad).any(), \
                    f"Parameter {name} has Inf gradients"

    def test_compute_return_features(self, device):
        """Verify compute_return_features produces valid features."""
        torch.manual_seed(42)
        batch_size = 4
        seq_len = 50

        # Create OHLC data (open, high, low, close)
        base_price = torch.ones(batch_size, seq_len, 1, device=device) * 100.0
        noise = torch.randn(batch_size, seq_len, 1, device=device) * 2.0

        open_price = base_price + noise
        high = base_price + torch.abs(torch.randn(batch_size, seq_len, 1, device=device)) * 3.0
        low = base_price - torch.abs(torch.randn(batch_size, seq_len, 1, device=device)) * 3.0
        close = base_price + noise * 0.5

        prices = torch.cat([open_price, high, low, close], dim=-1)

        features = compute_return_features(prices)

        # Check shape
        assert features.shape == (batch_size, seq_len, 4), \
            f"Expected features shape ({batch_size}, {seq_len}, 4), got {features.shape}"

        # Check no NaN (except possibly first row due to returns)
        assert not torch.isnan(features[:, 1:, :]).any(), \
            "Features should not have NaN values (after first row)"

        # Check returns are reasonable (not extreme values)
        returns = features[:, 1:, 0]  # First feature is returns
        assert torch.abs(returns).max() < 1.0, \
            "Returns should be reasonable (< 100%)"

    def test_return_all_attentions(self, price_focused_attention, sample_price_input,
                                   price_attention_config):
        """Verify return_all_attentions returns attention from all layers."""
        output, attentions = price_focused_attention(
            sample_price_input, return_all_attentions=True
        )

        expected_n_layers = price_attention_config.n_layers
        assert len(attentions) == expected_n_layers, \
            f"Expected {expected_n_layers} attention tensors, got {len(attentions)}"

    def test_encode_sequence_pooling(self, price_focused_attention, sample_price_input,
                                     batch_size, d_model):
        """Verify encode_sequence with different pooling methods."""
        for pooling in ["last", "first", "mean", "max"]:
            encoded = price_focused_attention.encode_sequence(sample_price_input, pooling=pooling)

            assert encoded.shape == (batch_size, d_model), \
                f"Pooling '{pooling}' should produce shape ({batch_size}, {d_model}), got {encoded.shape}"

    def test_get_attention_scores(self, price_focused_attention, sample_price_input,
                                  batch_size, n_heads, seq_len):
        """Verify get_attention_scores returns correct shape."""
        attention_scores = price_focused_attention.get_attention_scores(sample_price_input, layer_idx=-1)

        assert attention_scores.shape == (batch_size, n_heads, seq_len, seq_len), \
            f"Expected shape ({batch_size}, {n_heads}, {seq_len}, {seq_len}), got {attention_scores.shape}"

    def test_deterministic_with_eval_mode(self, price_focused_attention, sample_price_input):
        """Verify model produces deterministic outputs in eval mode."""
        price_focused_attention.eval()

        with torch.no_grad():
            output1, _ = price_focused_attention(sample_price_input)
            output2, _ = price_focused_attention(sample_price_input)

        torch.testing.assert_close(
            output1, output2,
            msg="Model should produce identical outputs in eval mode"
        )


# ==============================================================================
# Tests for AttentionExtractor
# ==============================================================================

class TestAttentionExtractor:
    """Tests for AttentionExtractor utility class."""

    @pytest.fixture
    def extractor(self):
        """Create AttentionExtractor instance for testing."""
        return AttentionExtractor()

    def test_extract_scores(self, extractor, price_focused_attention, sample_price_input):
        """Verify attention score extraction works correctly."""
        scores = extractor.get_attention_scores(
            price_focused_attention, sample_price_input, layer_idx=-1
        )

        assert isinstance(scores, AttentionScores), \
            "Should return AttentionScores object"
        assert scores.scores is not None, \
            "Scores array should not be None"
        assert len(scores.scores.shape) == 4, \
            "Scores should have 4 dimensions (batch, heads, seq, seq)"

    def test_extract_scores_specific_layer(self, extractor, price_focused_attention,
                                           sample_price_input):
        """Verify extraction from specific layer."""
        scores_layer0 = extractor.get_attention_scores(
            price_focused_attention, sample_price_input, layer_idx=0
        )
        scores_layer1 = extractor.get_attention_scores(
            price_focused_attention, sample_price_input, layer_idx=1
        )

        assert scores_layer0.layer_idx == 0
        assert scores_layer1.layer_idx == 1

        # Scores from different layers should be different
        assert not np.allclose(scores_layer0.scores, scores_layer1.scores), \
            "Different layers should produce different attention patterns"

    def test_extract_scores_specific_head(self, extractor, price_focused_attention,
                                          sample_price_input, batch_size, seq_len):
        """Verify extraction for specific attention head."""
        scores = extractor.get_attention_scores(
            price_focused_attention, sample_price_input, layer_idx=-1, head_idx=0
        )

        assert scores.head_idx == 0
        # When extracting single head, second dimension should be 1
        assert scores.scores.shape[1] == 1, \
            f"Expected 1 head, got {scores.scores.shape[1]}"
        assert scores.scores.shape == (batch_size, 1, seq_len, seq_len)

    def test_compute_statistics(self, extractor, price_focused_attention, sample_price_input):
        """Verify attention statistics computation."""
        scores = extractor.get_attention_scores(
            price_focused_attention, sample_price_input, layer_idx=-1
        )
        stats = extractor.compute_attention_statistics(scores)

        # Check required keys exist
        assert 'global' in stats, "Stats should have 'global' key"
        assert 'per_head' in stats, "Stats should have 'per_head' key"
        assert 'diagonal_attention_mean' in stats, "Stats should have 'diagonal_attention_mean' key"
        assert 'sparsity' in stats, "Stats should have 'sparsity' key"

        # Check global stats
        global_stats = stats['global']
        assert 'mean' in global_stats
        assert 'std' in global_stats
        assert 'max' in global_stats
        assert 'min' in global_stats

        # Mean should be reasonable for softmax outputs
        assert 0.0 <= global_stats['mean'] <= 1.0, \
            "Mean attention should be between 0 and 1"

        # Min should be >= 0 (softmax outputs)
        assert global_stats['min'] >= 0.0, \
            "Min attention should be >= 0"

        # Max should be <= 1 (softmax outputs)
        assert global_stats['max'] <= 1.0, \
            "Max attention should be <= 1"

    def test_compute_statistics_per_head(self, extractor, price_focused_attention,
                                         sample_price_input, n_heads):
        """Verify per-head statistics computation."""
        scores = extractor.get_attention_scores(
            price_focused_attention, sample_price_input, layer_idx=-1
        )
        stats = extractor.compute_attention_statistics(scores)

        per_head = stats['per_head']
        assert len(per_head) == n_heads, \
            f"Should have stats for {n_heads} heads, got {len(per_head)}"

        for head_stat in per_head:
            assert 'head' in head_stat
            assert 'mean' in head_stat
            assert 'std' in head_stat
            assert 'max' in head_stat
            assert 'entropy' in head_stat

    def test_attention_scores_mean_attention(self, extractor, price_focused_attention,
                                             sample_price_input, batch_size, seq_len):
        """Verify AttentionScores.mean_attention method."""
        scores = extractor.get_attention_scores(
            price_focused_attention, sample_price_input, layer_idx=-1
        )

        mean_attn = scores.mean_attention()
        assert mean_attn.shape == (batch_size, seq_len, seq_len), \
            f"Mean attention should have shape ({batch_size}, {seq_len}, {seq_len}), got {mean_attn.shape}"

    def test_attention_scores_head_attention(self, extractor, price_focused_attention,
                                             sample_price_input, batch_size, seq_len):
        """Verify AttentionScores.head_attention method."""
        scores = extractor.get_attention_scores(
            price_focused_attention, sample_price_input, layer_idx=-1
        )

        head_attn = scores.head_attention(0)
        assert head_attn.shape == (batch_size, seq_len, seq_len), \
            f"Head attention should have shape ({batch_size}, {seq_len}, {seq_len}), got {head_attn.shape}"

    def test_attention_scores_to_dict(self, extractor, price_focused_attention,
                                      sample_price_input):
        """Verify AttentionScores serialization to dict."""
        metadata = {'symbol': 'XAUUSD', 'timeframe': '15m'}
        scores = extractor.get_attention_scores(
            price_focused_attention, sample_price_input, layer_idx=-1, metadata=metadata
        )

        scores_dict = scores.to_dict()

        assert 'scores' in scores_dict
        assert 'layer_idx' in scores_dict
        assert 'sequence_len' in scores_dict
        assert 'n_heads' in scores_dict
        assert 'metadata' in scores_dict
        assert scores_dict['metadata'] == metadata

    def test_sparsity_computation(self, extractor, price_focused_attention, sample_price_input):
        """Verify sparsity metric is computed correctly."""
        scores = extractor.get_attention_scores(
            price_focused_attention, sample_price_input, layer_idx=-1
        )
        stats = extractor.compute_attention_statistics(scores)

        # Sparsity should be between 0 and 1
        assert 0.0 <= stats['sparsity'] <= 1.0, \
            f"Sparsity should be between 0 and 1, got {stats['sparsity']}"

    def test_diagonal_attention(self, extractor, price_focused_attention, sample_price_input):
        """Verify diagonal attention mean is computed correctly."""
        scores = extractor.get_attention_scores(
            price_focused_attention, sample_price_input, layer_idx=-1
        )
        stats = extractor.compute_attention_statistics(scores)

        # Diagonal attention mean should be between 0 and 1
        assert 0.0 <= stats['diagonal_attention_mean'] <= 1.0, \
            f"Diagonal attention mean should be between 0 and 1, got {stats['diagonal_attention_mean']}"


# ==============================================================================
# Integration Tests
# ==============================================================================

class TestAttentionIntegration:
    """Integration tests for the attention module."""

    def test_full_pipeline(self, device):
        """Test complete pipeline from input to attention extraction."""
        torch.manual_seed(42)

        # Create config
        config = PriceAttentionConfig(
            d_model=64,
            n_heads=4,
            d_k=16,
            d_v=16,
            n_layers=2,
            d_ff=128,
            dropout=0.0,
            attention_dropout=0.0,
            input_features=4,
        )

        # Create model
        model = PriceFocusedAttention(config, input_features=4).to(device)
        model.eval()

        # Create input
        batch_size = 2
        seq_len = 20
        x = torch.randn(batch_size, seq_len, 4, device=device)

        # Forward pass
        output, attentions = model(x, return_all_attentions=True)

        # Verify output
        assert output.shape == (batch_size, seq_len, config.d_model)
        assert len(attentions) == config.n_layers

        # Extract and analyze attention
        extractor = AttentionExtractor()
        scores = extractor.get_attention_scores(model, x, layer_idx=-1)

        # Compute statistics
        stats = extractor.compute_attention_statistics(scores)

        # Verify stats are reasonable
        assert stats['global']['mean'] > 0
        assert stats['global']['max'] <= 1.0
        assert len(stats['per_head']) == config.n_heads

    def test_training_step_simulation(self, price_focused_attention, sample_price_input, d_model):
        """Simulate a training step to verify gradients work correctly."""
        price_focused_attention.train()

        # Create a simple output head
        output_head = nn.Linear(d_model, 1).to(sample_price_input.device)

        # Forward pass
        output, _ = price_focused_attention(sample_price_input)
        prediction = output_head(output[:, -1, :])  # Use last position

        # Create fake target
        target = torch.randn_like(prediction)

        # Compute loss
        loss = nn.MSELoss()(prediction, target)

        # Backward
        loss.backward()

        # Verify gradients exist for all parameters
        for name, param in price_focused_attention.named_parameters():
            if param.requires_grad:
                assert param.grad is not None, f"Parameter {name} should have gradient"

    def test_causal_attention_pattern(self, price_focused_attention, sample_price_input,
                                      batch_size, n_heads, seq_len, device):
        """Verify causal attention produces lower-triangular attention pattern."""
        price_focused_attention.eval()

        # Create causal mask
        causal_mask = create_causal_mask(seq_len, device=device)

        # Forward with causal mask
        with torch.no_grad():
            output, attentions = price_focused_attention(
                sample_price_input, mask=causal_mask, return_all_attentions=True
            )

        # Check last layer attention pattern
        last_attention = attentions[-1].cpu().numpy()

        # Upper triangular (excluding diagonal) should be near zero.
        # Vectorized via a strict-upper-triangle boolean mask instead of a
        # quadruple-nested Python loop over every element.
        future_mask = np.triu(np.ones((seq_len, seq_len), dtype=bool), k=1)
        future_vals = last_attention[..., future_mask]
        assert (future_vals < 1e-5).all(), \
            f"Future positions should be masked, max={future_vals.max()}"


# ==============================================================================
# Edge Case Tests
# ==============================================================================

class TestEdgeCases:
    """Tests for edge cases and boundary conditions."""

    def test_single_sequence_batch(self, device):
        """Test with batch size of 1."""
        config = PriceAttentionConfig(d_model=32, n_heads=4, input_features=4)
        model = PriceFocusedAttention(config, input_features=4).to(device)

        x = torch.randn(1, 16, 4, device=device)
        output, attentions = model(x)

        assert output.shape == (1, 16, 32)

    def test_short_sequence(self, device):
        """Test with very short sequence (length 2)."""
        config = PriceAttentionConfig(d_model=32, n_heads=4, input_features=4)
        model = PriceFocusedAttention(config, input_features=4).to(device)

        x = torch.randn(2, 2, 4, device=device)  # Sequence length 2
        output, attentions = model(x)

        assert output.shape == (2, 2, 32)

    def test_attention_scores_property(self):
        """Test AttentionScores dataclass properties."""
        scores_array = np.random.rand(2, 4, 10, 10)
        scores = AttentionScores(
            scores=scores_array,
            layer_idx=1,
            head_idx=None,
            n_heads=4
        )

        assert scores.shape == (2, 4, 10, 10)
        assert scores.sequence_len == 10

    def test_multi_head_attention_dimension_validation(self, device):
        """Test that invalid dimensions raise appropriate errors."""
        with pytest.raises(ValueError):
            # d_k * n_heads != d_model should raise error
            MultiHeadAttention(d_model=64, n_heads=8, d_k=10)  # 10 * 8 = 80 != 64


if __name__ == '__main__':
    pytest.main([__file__, '-v', '--tb=short'])