trading-platform-ml-engine-v2/tests/test_attention_architecture.py
Adrian Flores Cortes d015e2b0f3 feat(ml-engine): Phase 4 - PostgreSQL migration, dynamic OOS, data pipeline
- Fix database.py: Add DatabaseConnection alias for backward compat
- Fix train_symbol_timeframe_models.py: Use PostgreSQLConnection + native queries
- Fix run_oos_backtest.py: Fix broken import + add dynamic OOS support
- Update data_splitter.py: split_dynamic_oos() method (from previous session)
- Update validation_oos.yaml: Dynamic OOS config + all 6 symbols enabled
- Create ingest_ohlcv_polygon.py: Standalone Polygon→PostgreSQL ingestion script
- Fix .gitignore: /data/ instead of data/ to not ignore src/data/
- Add untracked src/ modules: backtesting, data, llm, models (attention/metamodel/strategies)
- Add aiohttp, sqlalchemy, psycopg2-binary to requirements.txt

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-27 04:39:05 -06:00

803 lines
31 KiB
Python

#!/usr/bin/env python3
"""
Tests for Attention Architecture Module
========================================
Comprehensive unit tests for the attention module components:
- MultiHeadAttention: Core multi-head attention mechanism
- LearnablePositionalEncoding: Time-agnostic position embeddings
- PriceFocusedAttention: Main transformer encoder model
- AttentionExtractor: Utilities for attention analysis
Uses pytest and torch.testing for assertions.
Author: ML-Specialist (NEXUS v4.0)
Version: 1.0.0
Created: 2026-01-25
"""
import pytest
import numpy as np
from pathlib import Path
import torch
import torch.nn as nn
# Import the modules under test
import sys
sys.path.insert(0, str(Path(__file__).parent.parent / 'src'))
from models.attention import (
MultiHeadAttention,
LearnablePositionalEncoding,
PriceFocusedAttention,
PriceAttentionConfig,
AttentionExtractor,
AttentionScores,
create_causal_mask,
compute_return_features,
)
# ==============================================================================
# Test Fixtures
# ==============================================================================
@pytest.fixture
def device():
    """Pick CUDA when available, otherwise fall back to CPU."""
    if torch.cuda.is_available():
        return torch.device('cuda')
    return torch.device('cpu')


@pytest.fixture
def batch_size():
    """Batch dimension shared by the tensor fixtures below."""
    return 4


@pytest.fixture
def seq_len():
    """Sequence length shared by the tensor fixtures below."""
    return 32


@pytest.fixture
def d_model():
    """Embedding width shared across all attention fixtures."""
    return 64


@pytest.fixture
def n_heads():
    """Attention-head count shared across all attention fixtures."""
    return 8


@pytest.fixture
def input_features():
    """Width of the raw (OHLC-style) input feature vector."""
    return 4
@pytest.fixture
def sample_input(batch_size, seq_len, d_model, device):
    """Seeded random activations shaped (batch, seq_len, d_model)."""
    torch.manual_seed(42)
    return torch.randn(batch_size, seq_len, d_model, device=device)


@pytest.fixture
def sample_price_input(batch_size, seq_len, input_features, device):
    """Seeded random price-like inputs shaped (batch, seq_len, features)."""
    torch.manual_seed(42)
    return torch.randn(batch_size, seq_len, input_features, device=device)


@pytest.fixture
def multi_head_attention(d_model, n_heads, device):
    """MultiHeadAttention with dropout disabled for deterministic tests."""
    return MultiHeadAttention(d_model=d_model, n_heads=n_heads, dropout=0.0).to(device)


@pytest.fixture
def positional_encoding(d_model, device):
    """LearnablePositionalEncoding (512 positions max, no dropout)."""
    return LearnablePositionalEncoding(d_model=d_model, max_seq_len=512, dropout=0.0).to(device)
@pytest.fixture
def price_attention_config(d_model, n_heads, input_features):
    """Two-layer, dropout-free PriceAttentionConfig sized for fast tests."""
    head_dim = d_model // n_heads
    return PriceAttentionConfig(
        d_model=d_model,
        n_heads=n_heads,
        d_k=head_dim,
        d_v=head_dim,
        n_layers=2,
        d_ff=4 * d_model,
        max_seq_len=512,
        dropout=0.0,
        attention_dropout=0.0,
        input_features=input_features,
        pre_norm=True,
    )


@pytest.fixture
def price_focused_attention(price_attention_config, input_features, device):
    """PriceFocusedAttention model built from the shared test config."""
    return PriceFocusedAttention(
        config=price_attention_config,
        input_features=input_features,
    ).to(device)
# ==============================================================================
# Tests for MultiHeadAttention
# ==============================================================================
class TestMultiHeadAttention:
    """Unit tests for the MultiHeadAttention module.

    Covers output/weight shapes, softmax normalization, causal masking,
    cross-attention with differing query/key lengths, and gradient flow.
    """

    def test_output_shape(self, multi_head_attention, sample_input, batch_size, seq_len, d_model):
        """Verify that output has correct shape (batch, seq_len, d_model)."""
        output, attn_weights = multi_head_attention(sample_input, sample_input, sample_input)
        assert output.shape == (batch_size, seq_len, d_model), \
            f"Expected output shape ({batch_size}, {seq_len}, {d_model}), got {output.shape}"

    def test_attention_weights_shape(self, multi_head_attention, sample_input, batch_size, seq_len, n_heads):
        """Verify attention weights have correct shape (batch, n_heads, seq_len, seq_len)."""
        output, attn_weights = multi_head_attention(sample_input, sample_input, sample_input)
        assert attn_weights is not None, "Attention weights should not be None"
        assert attn_weights.shape == (batch_size, n_heads, seq_len, seq_len), \
            f"Expected attention weights shape ({batch_size}, {n_heads}, {seq_len}, {seq_len}), got {attn_weights.shape}"

    def test_attention_weights_sum_to_one(self, multi_head_attention, sample_input):
        """Verify that attention weights sum to 1 along the key dimension (softmax)."""
        output, attn_weights = multi_head_attention(sample_input, sample_input, sample_input)
        # Softmax normalizes over the key axis (last dim), so rows sum to 1.
        weight_sums = attn_weights.sum(dim=-1)
        expected_ones = torch.ones_like(weight_sums)
        torch.testing.assert_close(
            weight_sums, expected_ones,
            atol=1e-5, rtol=1e-5,
            msg="Attention weights should sum to 1 along key dimension"
        )

    def test_causal_mask(self, multi_head_attention, sample_input, batch_size, seq_len, n_heads, device):
        """Verify that causal mask prevents attending to future positions."""
        causal_mask = create_causal_mask(seq_len, device=device)
        output, attn_weights = multi_head_attention(
            sample_input, sample_input, sample_input,
            mask=causal_mask
        )
        # Vectorized replacement for the previous O(batch*heads*seq^2) Python
        # loop with per-element tensor indexing: gather every strictly
        # upper-triangular weight across all batches/heads at once.
        future_positions = torch.triu(
            torch.ones(seq_len, seq_len, dtype=torch.bool, device=attn_weights.device),
            diagonal=1,
        )
        masked_weights = attn_weights[..., future_positions]
        # Sanity check: one entry per (batch, head, future (i, j) pair).
        assert masked_weights.numel() == batch_size * n_heads * seq_len * (seq_len - 1) // 2
        assert bool((masked_weights < 1e-6).all()), \
            f"Future positions should be masked (near zero), max was {masked_weights.max().item()}"

    def test_no_attention_returned_when_disabled(self, d_model, n_heads, device, sample_input):
        """Verify attention weights are None when return_attention is False."""
        mha = MultiHeadAttention(d_model=d_model, n_heads=n_heads).to(device)
        output, attn_weights = mha(
            sample_input, sample_input, sample_input,
            return_attention=False
        )
        assert attn_weights is None, "Attention weights should be None when return_attention=False"

    def test_different_q_k_v_shapes(self, d_model, n_heads, device):
        """Verify MHA works with different query and key/value sequence lengths."""
        torch.manual_seed(42)
        mha = MultiHeadAttention(d_model=d_model, n_heads=n_heads, dropout=0.0).to(device)
        batch_size = 2
        seq_len_q = 16
        seq_len_kv = 24
        query = torch.randn(batch_size, seq_len_q, d_model, device=device)
        key = torch.randn(batch_size, seq_len_kv, d_model, device=device)
        value = torch.randn(batch_size, seq_len_kv, d_model, device=device)
        output, attn_weights = mha(query, key, value)
        # Output follows the query length; weights are (query x key) per head.
        assert output.shape == (batch_size, seq_len_q, d_model)
        assert attn_weights.shape == (batch_size, n_heads, seq_len_q, seq_len_kv)

    def test_gradients_flow_correctly(self, multi_head_attention, sample_input):
        """Verify that gradients flow through the attention mechanism."""
        sample_input.requires_grad_(True)
        output, attn_weights = multi_head_attention(sample_input, sample_input, sample_input)
        # Scalar loss so backward() needs no explicit gradient argument.
        loss = output.sum()
        loss.backward()
        assert sample_input.grad is not None, "Input should have gradients"
        assert not torch.all(sample_input.grad == 0), "Gradients should not be all zeros"
# ==============================================================================
# Tests for LearnablePositionalEncoding
# ==============================================================================
class TestLearnablePositionalEncoding:
    """Behavioral tests for the LearnablePositionalEncoding module."""

    def test_encoding_shape(self, positional_encoding, sample_input, batch_size, seq_len, d_model):
        """The encoder must return a tensor shaped exactly like its input."""
        output = positional_encoding(sample_input)
        assert output.shape == sample_input.shape, \
            f"Output shape {output.shape} should match input shape {sample_input.shape}"

    def test_no_temporal_dependency(self, d_model, device):
        """Added embeddings depend only on position, never on input values."""
        encoder = LearnablePositionalEncoding(d_model=d_model, max_seq_len=512, dropout=0.0).to(device)
        torch.manual_seed(42)
        n_batches, n_steps = 2, 16
        x1 = torch.randn(n_batches, n_steps, d_model, device=device)
        x2 = torch.randn(n_batches, n_steps, d_model, device=device) * 2.0  # differently scaled input
        # Subtracting the raw input from the output isolates the additive
        # position embedding; it must be identical for both inputs.
        added1 = encoder(x1) - x1
        added2 = encoder(x2) - x2
        torch.testing.assert_close(
            added1, added2,
            atol=1e-5, rtol=1e-5,
            msg="Position embeddings should be identical regardless of input values"
        )

    def test_learnable_parameters(self, positional_encoding, d_model):
        """Position embeddings must be trainable nn.Parameter objects."""
        assert hasattr(positional_encoding, 'position_embeddings'), \
            "Should have position_embeddings attribute"
        assert isinstance(positional_encoding.position_embeddings, nn.Parameter), \
            "position_embeddings should be nn.Parameter"
        assert positional_encoding.position_embeddings.requires_grad, \
            "position_embeddings should require gradients"
        # The fixture constructs the encoder with max_seq_len=512.
        max_seq_len = 512
        assert positional_encoding.position_embeddings.shape == (max_seq_len, d_model), \
            f"Expected shape ({max_seq_len}, {d_model}), got {positional_encoding.position_embeddings.shape}"

    def test_offset_parameter_works(self, d_model, device):
        """`offset` must shift which rows of the embedding table are added."""
        encoder = LearnablePositionalEncoding(d_model=d_model, max_seq_len=512, dropout=0.0).to(device)
        torch.manual_seed(42)
        n_batches, n_steps = 2, 10
        # A zero input exposes the raw position embeddings in the output.
        x = torch.zeros(n_batches, n_steps, d_model, device=device)
        unshifted = encoder(x, offset=0)
        shifted = encoder(x, offset=5)
        # Rows 5..9 of the unshifted output are rows 0..4 of the shifted one.
        torch.testing.assert_close(
            unshifted[:, 5:, :],
            shifted[:, :5, :],
            atol=1e-5, rtol=1e-5,
            msg="Offset should shift position embeddings correctly"
        )

    def test_exceeds_max_seq_len_raises_error(self, positional_encoding, d_model, device):
        """Sequences longer than max_seq_len must raise ValueError."""
        too_long = torch.randn(1, 600, d_model, device=device)  # 600 > max_seq_len of 512
        with pytest.raises(ValueError, match="exceeds maximum sequence length"):
            positional_encoding(too_long)

    def test_get_position_embedding(self, positional_encoding, d_model):
        """get_position_embedding(i) must return row i of the embedding table."""
        pos = 10
        embedding = positional_encoding.get_position_embedding(pos)
        assert embedding.shape == (d_model,), \
            f"Expected shape ({d_model},), got {embedding.shape}"
        torch.testing.assert_close(
            embedding,
            positional_encoding.position_embeddings[pos],
            msg="get_position_embedding should return correct position embedding"
        )
# ==============================================================================
# Tests for PriceFocusedAttention
# ==============================================================================
class TestPriceFocusedAttention:
    """Tests for PriceFocusedAttention model."""

    def test_forward_pass(self, price_focused_attention, sample_price_input):
        """Verify forward pass completes without error."""
        # Model returns (encoded sequence, attention tensors).
        output, attentions = price_focused_attention(sample_price_input)
        assert output is not None, "Output should not be None"
        assert attentions is not None, "Attentions should not be None"
        assert len(attentions) > 0, "Should have at least one attention tensor"

    def test_output_shape(self, price_focused_attention, sample_price_input, batch_size, seq_len, d_model):
        """Verify output has correct shape (batch, seq_len, d_model)."""
        output, attentions = price_focused_attention(sample_price_input)
        assert output.shape == (batch_size, seq_len, d_model), \
            f"Expected output shape ({batch_size}, {seq_len}, {d_model}), got {output.shape}"

    def test_no_nan_gradients(self, price_focused_attention, sample_price_input):
        """Verify gradients are stable (no NaN values)."""
        sample_price_input.requires_grad_(True)
        output, attentions = price_focused_attention(sample_price_input)
        # Compute loss and backward
        loss = output.sum()
        loss.backward()
        # Check for NaN in gradients
        assert sample_price_input.grad is not None, "Input should have gradients"
        assert not torch.isnan(sample_price_input.grad).any(), \
            "Gradients should not contain NaN values"
        assert not torch.isinf(sample_price_input.grad).any(), \
            "Gradients should not contain Inf values"
        # Check model parameters for NaN gradients
        for name, param in price_focused_attention.named_parameters():
            if param.grad is not None:
                assert not torch.isnan(param.grad).any(), \
                    f"Parameter {name} has NaN gradients"
                assert not torch.isinf(param.grad).any(), \
                    f"Parameter {name} has Inf gradients"

    def test_compute_return_features(self, device):
        """Verify compute_return_features produces valid features."""
        torch.manual_seed(42)
        batch_size = 4
        seq_len = 50
        # Create OHLC data (open, high, low, close)
        # NOTE(review): synthetic bars around a constant base of 100; strict
        # OHLC consistency (low <= open/close <= high) is not enforced here.
        base_price = torch.ones(batch_size, seq_len, 1, device=device) * 100.0
        noise = torch.randn(batch_size, seq_len, 1, device=device) * 2.0
        open_price = base_price + noise
        high = base_price + torch.abs(torch.randn(batch_size, seq_len, 1, device=device)) * 3.0
        low = base_price - torch.abs(torch.randn(batch_size, seq_len, 1, device=device)) * 3.0
        close = base_price + noise * 0.5
        prices = torch.cat([open_price, high, low, close], dim=-1)
        features = compute_return_features(prices)
        # Check shape
        assert features.shape == (batch_size, seq_len, 4), \
            f"Expected features shape ({batch_size}, {seq_len}, 4), got {features.shape}"
        # Check no NaN (except possibly first row due to returns)
        assert not torch.isnan(features[:, 1:, :]).any(), \
            "Features should not have NaN values (after first row)"
        # Check returns are reasonable (not extreme values)
        returns = features[:, 1:, 0]  # First feature is returns
        assert torch.abs(returns).max() < 1.0, \
            "Returns should be reasonable (< 100%)"

    def test_return_all_attentions(self, price_focused_attention, sample_price_input, price_attention_config):
        """Verify return_all_attentions returns attention from all layers."""
        output, attentions = price_focused_attention(
            sample_price_input,
            return_all_attentions=True
        )
        # One attention tensor per encoder layer (fixture config uses 2).
        expected_n_layers = price_attention_config.n_layers
        assert len(attentions) == expected_n_layers, \
            f"Expected {expected_n_layers} attention tensors, got {len(attentions)}"

    def test_encode_sequence_pooling(self, price_focused_attention, sample_price_input, batch_size, d_model):
        """Verify encode_sequence with different pooling methods."""
        # Every pooling strategy must collapse the sequence axis.
        for pooling in ["last", "first", "mean", "max"]:
            encoded = price_focused_attention.encode_sequence(sample_price_input, pooling=pooling)
            assert encoded.shape == (batch_size, d_model), \
                f"Pooling '{pooling}' should produce shape ({batch_size}, {d_model}), got {encoded.shape}"

    def test_get_attention_scores(self, price_focused_attention, sample_price_input, batch_size, n_heads, seq_len):
        """Verify get_attention_scores returns correct shape."""
        # layer_idx=-1 selects the last encoder layer.
        attention_scores = price_focused_attention.get_attention_scores(sample_price_input, layer_idx=-1)
        assert attention_scores.shape == (batch_size, n_heads, seq_len, seq_len), \
            f"Expected shape ({batch_size}, {n_heads}, {seq_len}, {seq_len}), got {attention_scores.shape}"

    def test_deterministic_with_eval_mode(self, price_focused_attention, sample_price_input):
        """Verify model produces deterministic outputs in eval mode."""
        # eval() disables dropout, so two passes must match exactly.
        price_focused_attention.eval()
        with torch.no_grad():
            output1, _ = price_focused_attention(sample_price_input)
            output2, _ = price_focused_attention(sample_price_input)
        torch.testing.assert_close(
            output1, output2,
            msg="Model should produce identical outputs in eval mode"
        )
# ==============================================================================
# Tests for AttentionExtractor
# ==============================================================================
class TestAttentionExtractor:
    """Tests for AttentionExtractor utility class."""

    @pytest.fixture
    def extractor(self):
        """Create AttentionExtractor instance for testing."""
        return AttentionExtractor()

    def test_extract_scores(self, extractor, price_focused_attention, sample_price_input):
        """Verify attention score extraction works correctly."""
        scores = extractor.get_attention_scores(
            price_focused_attention,
            sample_price_input,
            layer_idx=-1
        )
        assert isinstance(scores, AttentionScores), \
            "Should return AttentionScores object"
        assert scores.scores is not None, \
            "Scores array should not be None"
        assert len(scores.scores.shape) == 4, \
            "Scores should have 4 dimensions (batch, heads, seq, seq)"

    def test_extract_scores_specific_layer(self, extractor, price_focused_attention, sample_price_input):
        """Verify extraction from specific layer."""
        scores_layer0 = extractor.get_attention_scores(
            price_focused_attention,
            sample_price_input,
            layer_idx=0
        )
        scores_layer1 = extractor.get_attention_scores(
            price_focused_attention,
            sample_price_input,
            layer_idx=1
        )
        assert scores_layer0.layer_idx == 0
        assert scores_layer1.layer_idx == 1
        # Scores from different layers should be different
        assert not np.allclose(scores_layer0.scores, scores_layer1.scores), \
            "Different layers should produce different attention patterns"

    def test_extract_scores_specific_head(self, extractor, price_focused_attention, sample_price_input, batch_size, seq_len):
        """Verify extraction for specific attention head."""
        scores = extractor.get_attention_scores(
            price_focused_attention,
            sample_price_input,
            layer_idx=-1,
            head_idx=0
        )
        assert scores.head_idx == 0
        # When extracting single head, second dimension should be 1
        assert scores.scores.shape[1] == 1, \
            f"Expected 1 head, got {scores.scores.shape[1]}"
        assert scores.scores.shape == (batch_size, 1, seq_len, seq_len)

    def test_compute_statistics(self, extractor, price_focused_attention, sample_price_input):
        """Verify attention statistics computation."""
        scores = extractor.get_attention_scores(
            price_focused_attention,
            sample_price_input,
            layer_idx=-1
        )
        stats = extractor.compute_attention_statistics(scores)
        # Check required keys exist
        assert 'global' in stats, "Stats should have 'global' key"
        assert 'per_head' in stats, "Stats should have 'per_head' key"
        assert 'diagonal_attention_mean' in stats, "Stats should have 'diagonal_attention_mean' key"
        assert 'sparsity' in stats, "Stats should have 'sparsity' key"
        # Check global stats
        global_stats = stats['global']
        assert 'mean' in global_stats
        assert 'std' in global_stats
        assert 'max' in global_stats
        assert 'min' in global_stats
        # Mean should be reasonable for softmax outputs
        assert 0.0 <= global_stats['mean'] <= 1.0, \
            "Mean attention should be between 0 and 1"
        # Min should be >= 0 (softmax outputs)
        assert global_stats['min'] >= 0.0, \
            "Min attention should be >= 0"
        # Max should be <= 1 (softmax outputs)
        assert global_stats['max'] <= 1.0, \
            "Max attention should be <= 1"

    def test_compute_statistics_per_head(self, extractor, price_focused_attention, sample_price_input, n_heads):
        """Verify per-head statistics computation."""
        scores = extractor.get_attention_scores(
            price_focused_attention,
            sample_price_input,
            layer_idx=-1
        )
        stats = extractor.compute_attention_statistics(scores)
        # One stats dict per attention head, each with the keys below.
        per_head = stats['per_head']
        assert len(per_head) == n_heads, \
            f"Should have stats for {n_heads} heads, got {len(per_head)}"
        for head_stat in per_head:
            assert 'head' in head_stat
            assert 'mean' in head_stat
            assert 'std' in head_stat
            assert 'max' in head_stat
            assert 'entropy' in head_stat

    def test_attention_scores_mean_attention(self, extractor, price_focused_attention, sample_price_input, batch_size, seq_len):
        """Verify AttentionScores.mean_attention method."""
        scores = extractor.get_attention_scores(
            price_focused_attention,
            sample_price_input,
            layer_idx=-1
        )
        # mean_attention averages over the head axis.
        mean_attn = scores.mean_attention()
        assert mean_attn.shape == (batch_size, seq_len, seq_len), \
            f"Mean attention should have shape ({batch_size}, {seq_len}, {seq_len}), got {mean_attn.shape}"

    def test_attention_scores_head_attention(self, extractor, price_focused_attention, sample_price_input, batch_size, seq_len):
        """Verify AttentionScores.head_attention method."""
        scores = extractor.get_attention_scores(
            price_focused_attention,
            sample_price_input,
            layer_idx=-1
        )
        # head_attention(i) selects one head, dropping the head axis.
        head_attn = scores.head_attention(0)
        assert head_attn.shape == (batch_size, seq_len, seq_len), \
            f"Head attention should have shape ({batch_size}, {seq_len}, {seq_len}), got {head_attn.shape}"

    def test_attention_scores_to_dict(self, extractor, price_focused_attention, sample_price_input):
        """Verify AttentionScores serialization to dict."""
        metadata = {'symbol': 'XAUUSD', 'timeframe': '15m'}
        scores = extractor.get_attention_scores(
            price_focused_attention,
            sample_price_input,
            layer_idx=-1,
            metadata=metadata
        )
        scores_dict = scores.to_dict()
        assert 'scores' in scores_dict
        assert 'layer_idx' in scores_dict
        assert 'sequence_len' in scores_dict
        assert 'n_heads' in scores_dict
        assert 'metadata' in scores_dict
        # Caller-supplied metadata must round-trip unchanged.
        assert scores_dict['metadata'] == metadata

    def test_sparsity_computation(self, extractor, price_focused_attention, sample_price_input):
        """Verify sparsity metric is computed correctly."""
        scores = extractor.get_attention_scores(
            price_focused_attention,
            sample_price_input,
            layer_idx=-1
        )
        stats = extractor.compute_attention_statistics(scores)
        # Sparsity should be between 0 and 1
        assert 0.0 <= stats['sparsity'] <= 1.0, \
            f"Sparsity should be between 0 and 1, got {stats['sparsity']}"

    def test_diagonal_attention(self, extractor, price_focused_attention, sample_price_input):
        """Verify diagonal attention mean is computed correctly."""
        scores = extractor.get_attention_scores(
            price_focused_attention,
            sample_price_input,
            layer_idx=-1
        )
        stats = extractor.compute_attention_statistics(scores)
        # Diagonal attention mean should be between 0 and 1
        assert 0.0 <= stats['diagonal_attention_mean'] <= 1.0, \
            f"Diagonal attention mean should be between 0 and 1, got {stats['diagonal_attention_mean']}"
# ==============================================================================
# Integration Tests
# ==============================================================================
class TestAttentionIntegration:
    """Integration tests for the attention module."""

    def test_full_pipeline(self, device):
        """Test complete pipeline from input to attention extraction."""
        torch.manual_seed(42)
        # Small two-layer config keeps the test fast.
        config = PriceAttentionConfig(
            d_model=64,
            n_heads=4,
            d_k=16,
            d_v=16,
            n_layers=2,
            d_ff=128,
            dropout=0.0,
            attention_dropout=0.0,
            input_features=4,
        )
        model = PriceFocusedAttention(config, input_features=4).to(device)
        model.eval()
        batch_size = 2
        seq_len = 20
        x = torch.randn(batch_size, seq_len, 4, device=device)
        # Forward pass collecting attention from every layer.
        output, attentions = model(x, return_all_attentions=True)
        assert output.shape == (batch_size, seq_len, config.d_model)
        assert len(attentions) == config.n_layers
        # Extract and analyze attention from the final layer.
        extractor = AttentionExtractor()
        scores = extractor.get_attention_scores(model, x, layer_idx=-1)
        stats = extractor.compute_attention_statistics(scores)
        # Softmax outputs: mean strictly positive, max bounded by 1.
        assert stats['global']['mean'] > 0
        assert stats['global']['max'] <= 1.0
        assert len(stats['per_head']) == config.n_heads

    def test_training_step_simulation(self, price_focused_attention, sample_price_input, d_model):
        """Simulate a training step to verify gradients work correctly."""
        price_focused_attention.train()
        # Simple regression head on top of the final position's embedding.
        output_head = nn.Linear(d_model, 1).to(sample_price_input.device)
        output, _ = price_focused_attention(sample_price_input)
        prediction = output_head(output[:, -1, :])  # Use last position
        # Fake regression target for an MSE training step.
        target = torch.randn_like(prediction)
        loss = nn.MSELoss()(prediction, target)
        loss.backward()
        # Every trainable parameter must have received a gradient.
        for name, param in price_focused_attention.named_parameters():
            if param.requires_grad:
                assert param.grad is not None, f"Parameter {name} should have gradient"

    def test_causal_attention_pattern(self, price_focused_attention, sample_price_input, batch_size, n_heads, seq_len, device):
        """Verify causal attention produces lower-triangular attention pattern."""
        price_focused_attention.eval()
        causal_mask = create_causal_mask(seq_len, device=device)
        with torch.no_grad():
            output, attentions = price_focused_attention(
                sample_price_input,
                mask=causal_mask,
                return_all_attentions=True
            )
        last_attention = attentions[-1].cpu().numpy()
        # Vectorized replacement for the previous O(batch*heads*seq^2) Python
        # loop: select every strictly-upper-triangular entry at once and
        # assert they are all ~0 (i.e. no attention to future positions).
        future_positions = np.triu(np.ones((seq_len, seq_len), dtype=bool), k=1)
        future_weights = last_attention[:, :, future_positions]
        assert (future_weights < 1e-5).all(), \
            "Future positions should be masked in every batch and head"
# ==============================================================================
# Edge Case Tests
# ==============================================================================
class TestEdgeCases:
    """Boundary-condition tests for the attention components."""

    def test_single_sequence_batch(self, device):
        """A batch containing exactly one sequence must work."""
        cfg = PriceAttentionConfig(d_model=32, n_heads=4, input_features=4)
        net = PriceFocusedAttention(cfg, input_features=4).to(device)
        single = torch.randn(1, 16, 4, device=device)
        out, attn = net(single)
        assert out.shape == (1, 16, 32)

    def test_short_sequence(self, device):
        """A minimal two-timestep sequence must work."""
        cfg = PriceAttentionConfig(d_model=32, n_heads=4, input_features=4)
        net = PriceFocusedAttention(cfg, input_features=4).to(device)
        tiny = torch.randn(2, 2, 4, device=device)  # sequences of length 2
        out, attn = net(tiny)
        assert out.shape == (2, 2, 32)

    def test_attention_scores_property(self):
        """AttentionScores must expose shape and sequence_len of its array."""
        raw = np.random.rand(2, 4, 10, 10)
        wrapped = AttentionScores(
            scores=raw,
            layer_idx=1,
            head_idx=None,
            n_heads=4
        )
        assert wrapped.shape == (2, 4, 10, 10)
        assert wrapped.sequence_len == 10

    def test_multi_head_attention_dimension_validation(self, device):
        """A d_k * n_heads mismatch with d_model must raise ValueError."""
        with pytest.raises(ValueError):
            MultiHeadAttention(d_model=64, n_heads=8, d_k=10)  # 10 * 8 = 80 != 64
if __name__ == '__main__':
    # Allow running this file directly instead of via the pytest CLI.
    pytest.main([__file__, '-v', '--tb=short'])