# NOTE(review): the text below is a commit message plus file metadata that was
# accidentally pasted above the shebang; kept as a comment so the module
# remains valid Python. It should be removed from this file entirely.
#
# - Fix database.py: Add DatabaseConnection alias for backward compat
# - Fix train_symbol_timeframe_models.py: Use PostgreSQLConnection + native queries
# - Fix run_oos_backtest.py: Fix broken import + add dynamic OOS support
# - Update data_splitter.py: split_dynamic_oos() method (from previous session)
# - Update validation_oos.yaml: Dynamic OOS config + all 6 symbols enabled
# - Create ingest_ohlcv_polygon.py: Standalone Polygon->PostgreSQL ingestion script
# - Fix .gitignore: /data/ instead of data/ to not ignore src/data/
# - Add untracked src/ modules: backtesting, data, llm, models (attention/metamodel/strategies)
# - Add aiohttp, sqlalchemy, psycopg2-binary to requirements.txt
# Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
#!/usr/bin/env python3
"""
Tests for Attention Architecture Module
========================================

Comprehensive unit tests for the attention module components:
- MultiHeadAttention: Core multi-head attention mechanism
- LearnablePositionalEncoding: Time-agnostic position embeddings
- PriceFocusedAttention: Main transformer encoder model
- AttentionExtractor: Utilities for attention analysis

Uses pytest and torch.testing for assertions.

Author: ML-Specialist (NEXUS v4.0)
Version: 1.0.0
Created: 2026-01-25
"""
|
|
|
|
import sys
from pathlib import Path

import numpy as np
import pytest
import torch
import torch.nn as nn

# Make src/ importable when this test module is run from the tests/ directory.
sys.path.insert(0, str(Path(__file__).parent.parent / 'src'))

from models.attention import (
    MultiHeadAttention,
    LearnablePositionalEncoding,
    PriceFocusedAttention,
    PriceAttentionConfig,
    AttentionExtractor,
    AttentionScores,
    create_causal_mask,
    compute_return_features,
)
|
|
|
|
|
|
# ==============================================================================
|
|
# Test Fixtures
|
|
# ==============================================================================
|
|
|
|
@pytest.fixture
def device():
    """Pick CUDA when available, otherwise fall back to CPU."""
    use_cuda = torch.cuda.is_available()
    return torch.device('cuda' if use_cuda else 'cpu')
|
|
|
|
|
|
@pytest.fixture
def batch_size():
    """Batch size shared by most tests in this module."""
    return 4
|
|
|
|
|
|
@pytest.fixture
def seq_len():
    """Sequence length shared by most tests in this module."""
    return 32
|
|
|
|
|
|
@pytest.fixture
def d_model():
    """Model (embedding) dimension shared by most tests in this module."""
    return 64
|
|
|
|
|
|
@pytest.fixture
def n_heads():
    """Number of attention heads shared by most tests in this module."""
    return 8
|
|
|
|
|
|
@pytest.fixture
def input_features():
    """Number of raw input features (e.g. OHLC) fed to the price model."""
    return 4
|
|
|
|
|
|
@pytest.fixture
def sample_input(batch_size, seq_len, d_model, device):
    """Deterministic random tensor of shape (batch, seq, d_model) for attention tests."""
    torch.manual_seed(42)  # fixed seed keeps tests reproducible
    shape = (batch_size, seq_len, d_model)
    return torch.randn(*shape, device=device)
|
|
|
|
|
|
@pytest.fixture
def sample_price_input(batch_size, seq_len, input_features, device):
    """Deterministic random tensor of shape (batch, seq, features) for price-model tests."""
    torch.manual_seed(42)  # fixed seed keeps tests reproducible
    shape = (batch_size, seq_len, input_features)
    return torch.randn(*shape, device=device)
|
|
|
|
|
|
@pytest.fixture
def multi_head_attention(d_model, n_heads, device):
    """MultiHeadAttention instance with dropout disabled for determinism."""
    module = MultiHeadAttention(d_model=d_model, n_heads=n_heads, dropout=0.0)
    module = module.to(device)
    return module
|
|
|
|
|
|
@pytest.fixture
def positional_encoding(d_model, device):
    """LearnablePositionalEncoding instance (max 512 positions, no dropout)."""
    encoder = LearnablePositionalEncoding(
        d_model=d_model,
        max_seq_len=512,
        dropout=0.0,
    )
    return encoder.to(device)
|
|
|
|
|
|
@pytest.fixture
def price_attention_config(d_model, n_heads, input_features):
    """Small PriceAttentionConfig sized for fast unit tests (2 layers, no dropout)."""
    head_dim = d_model // n_heads  # per-head key/value dimension
    return PriceAttentionConfig(
        d_model=d_model,
        n_heads=n_heads,
        d_k=head_dim,
        d_v=head_dim,
        n_layers=2,
        d_ff=4 * d_model,
        max_seq_len=512,
        dropout=0.0,
        attention_dropout=0.0,
        input_features=input_features,
        pre_norm=True,
    )
|
|
|
|
|
|
@pytest.fixture
def price_focused_attention(price_attention_config, input_features, device):
    """PriceFocusedAttention model built from the test config, moved to device."""
    net = PriceFocusedAttention(
        config=price_attention_config,
        input_features=input_features,
    )
    net = net.to(device)
    return net
|
|
|
|
|
|
# ==============================================================================
|
|
# Tests for MultiHeadAttention
|
|
# ==============================================================================
|
|
|
|
class TestMultiHeadAttention:
    """Tests for MultiHeadAttention module."""

    def test_output_shape(self, multi_head_attention, sample_input, batch_size, seq_len, d_model):
        """Verify that output has correct shape (batch, seq_len, d_model)."""
        output, attn_weights = multi_head_attention(sample_input, sample_input, sample_input)

        assert output.shape == (batch_size, seq_len, d_model), \
            f"Expected output shape ({batch_size}, {seq_len}, {d_model}), got {output.shape}"

    def test_attention_weights_shape(self, multi_head_attention, sample_input, batch_size, seq_len, n_heads):
        """Verify attention weights have correct shape (batch, n_heads, seq_len, seq_len)."""
        output, attn_weights = multi_head_attention(sample_input, sample_input, sample_input)

        assert attn_weights is not None, "Attention weights should not be None"
        assert attn_weights.shape == (batch_size, n_heads, seq_len, seq_len), \
            f"Expected attention weights shape ({batch_size}, {n_heads}, {seq_len}, {seq_len}), got {attn_weights.shape}"

    def test_attention_weights_sum_to_one(self, multi_head_attention, sample_input):
        """Verify that attention weights sum to 1 along the key dimension (softmax)."""
        output, attn_weights = multi_head_attention(sample_input, sample_input, sample_input)

        # Softmax is applied over the key axis, so each row must sum to 1.
        weight_sums = attn_weights.sum(dim=-1)
        torch.testing.assert_close(
            weight_sums, torch.ones_like(weight_sums),
            atol=1e-5, rtol=1e-5,
            msg="Attention weights should sum to 1 along key dimension",
        )

    def test_causal_mask(self, multi_head_attention, sample_input, batch_size, seq_len, n_heads, device):
        """Verify that causal mask prevents attending to future positions."""
        causal_mask = create_causal_mask(seq_len, device=device)

        output, attn_weights = multi_head_attention(
            sample_input, sample_input, sample_input,
            mask=causal_mask,
        )

        # Vectorized check (replaces a quadruple Python loop): every strictly
        # upper-triangular entry — attention to a future position — must be ~0
        # across all batches and heads simultaneously.
        future = torch.triu(
            torch.ones(seq_len, seq_len, dtype=torch.bool, device=attn_weights.device),
            diagonal=1,
        )
        future_attn = attn_weights[..., future]
        assert (future_attn < 1e-6).all(), \
            f"Future positions should be masked (near zero), max={future_attn.max().item()}"

    def test_no_attention_returned_when_disabled(self, d_model, n_heads, device, sample_input):
        """Verify attention weights are None when return_attention is False."""
        mha = MultiHeadAttention(d_model=d_model, n_heads=n_heads).to(device)
        output, attn_weights = mha(
            sample_input, sample_input, sample_input,
            return_attention=False,
        )

        assert attn_weights is None, "Attention weights should be None when return_attention=False"

    def test_different_q_k_v_shapes(self, d_model, n_heads, device):
        """Verify MHA works with different query and key/value sequence lengths."""
        torch.manual_seed(42)
        mha = MultiHeadAttention(d_model=d_model, n_heads=n_heads, dropout=0.0).to(device)

        batch_size = 2
        seq_len_q = 16
        seq_len_kv = 24

        query = torch.randn(batch_size, seq_len_q, d_model, device=device)
        key = torch.randn(batch_size, seq_len_kv, d_model, device=device)
        value = torch.randn(batch_size, seq_len_kv, d_model, device=device)

        output, attn_weights = mha(query, key, value)

        # Output follows the query length; weights are (query x key/value).
        assert output.shape == (batch_size, seq_len_q, d_model)
        assert attn_weights.shape == (batch_size, n_heads, seq_len_q, seq_len_kv)

    def test_gradients_flow_correctly(self, multi_head_attention, sample_input):
        """Verify that gradients flow through the attention mechanism."""
        sample_input.requires_grad_(True)
        output, attn_weights = multi_head_attention(sample_input, sample_input, sample_input)

        # A scalar loss is enough to drive backprop through the module.
        loss = output.sum()
        loss.backward()

        assert sample_input.grad is not None, "Input should have gradients"
        assert not torch.all(sample_input.grad == 0), "Gradients should not be all zeros"
|
|
|
|
|
|
# ==============================================================================
|
|
# Tests for LearnablePositionalEncoding
|
|
# ==============================================================================
|
|
|
|
class TestLearnablePositionalEncoding:
    """Tests for LearnablePositionalEncoding module."""

    def test_encoding_shape(self, positional_encoding, sample_input, batch_size, seq_len, d_model):
        """Verify that output shape matches input shape."""
        output = positional_encoding(sample_input)

        assert output.shape == sample_input.shape, \
            f"Output shape {output.shape} should match input shape {sample_input.shape}"

    def test_no_temporal_dependency(self, d_model, device):
        """Verify encoding does not depend on input values, only sequence position."""
        pe = LearnablePositionalEncoding(d_model=d_model, max_seq_len=512, dropout=0.0).to(device)

        torch.manual_seed(42)
        batch_size = 2
        seq_len = 16

        # Two inputs with different values/scales at the same positions.
        x1 = torch.randn(batch_size, seq_len, d_model, device=device)
        x2 = torch.randn(batch_size, seq_len, d_model, device=device) * 2.0

        output1 = pe(x1)
        output2 = pe(x2)

        # (output - input) isolates the additive position embeddings, which
        # must be identical for both inputs.
        pe1 = output1 - x1
        pe2 = output2 - x2

        torch.testing.assert_close(
            pe1, pe2,
            atol=1e-5, rtol=1e-5,
            msg="Position embeddings should be identical regardless of input values",
        )

    def test_learnable_parameters(self, positional_encoding, d_model):
        """Verify that position embeddings are learnable parameters."""
        assert hasattr(positional_encoding, 'position_embeddings'), \
            "Should have position_embeddings attribute"
        assert isinstance(positional_encoding.position_embeddings, nn.Parameter), \
            "position_embeddings should be nn.Parameter"

        assert positional_encoding.position_embeddings.requires_grad, \
            "position_embeddings should require gradients"

        max_seq_len = 512  # must match the positional_encoding fixture
        assert positional_encoding.position_embeddings.shape == (max_seq_len, d_model), \
            f"Expected shape ({max_seq_len}, {d_model}), got {positional_encoding.position_embeddings.shape}"

    def test_offset_parameter_works(self, d_model, device):
        """Verify that offset parameter shifts position indices correctly."""
        pe = LearnablePositionalEncoding(d_model=d_model, max_seq_len=512, dropout=0.0).to(device)

        torch.manual_seed(42)
        batch_size = 2
        seq_len = 10

        # Zero input so the outputs contain only the position embeddings.
        x = torch.zeros(batch_size, seq_len, d_model, device=device)

        output_no_offset = pe(x, offset=0)
        output_with_offset = pe(x, offset=5)

        # With offset=5, shifted positions 0..4 use embeddings 5..9, which
        # must equal positions 5..9 of the un-shifted output. (Only the
        # 5 overlapping positions are comparable since seq_len is 10.)
        torch.testing.assert_close(
            output_no_offset[:, 5:, :],
            output_with_offset[:, :5, :],
            atol=1e-5, rtol=1e-5,
            msg="Offset should shift position embeddings correctly",
        )

    def test_exceeds_max_seq_len_raises_error(self, positional_encoding, d_model, device):
        """Verify that exceeding max_seq_len raises ValueError."""
        x = torch.randn(1, 600, d_model, device=device)  # 600 > 512 (max_seq_len)

        with pytest.raises(ValueError, match="exceeds maximum sequence length"):
            positional_encoding(x)

    def test_get_position_embedding(self, positional_encoding, d_model):
        """Verify get_position_embedding returns correct embedding."""
        position = 10
        embedding = positional_encoding.get_position_embedding(position)

        assert embedding.shape == (d_model,), \
            f"Expected shape ({d_model},), got {embedding.shape}"

        # Must match the underlying parameter row at that position.
        torch.testing.assert_close(
            embedding,
            positional_encoding.position_embeddings[position],
            msg="get_position_embedding should return correct position embedding",
        )
|
|
|
|
|
|
# ==============================================================================
|
|
# Tests for PriceFocusedAttention
|
|
# ==============================================================================
|
|
|
|
class TestPriceFocusedAttention:
    """Tests for the PriceFocusedAttention encoder model."""

    def test_forward_pass(self, price_focused_attention, sample_price_input):
        """Verify forward pass completes without error."""
        encoded, attn_maps = price_focused_attention(sample_price_input)

        assert encoded is not None, "Output should not be None"
        assert attn_maps is not None, "Attentions should not be None"
        assert len(attn_maps) > 0, "Should have at least one attention tensor"

    def test_output_shape(self, price_focused_attention, sample_price_input, batch_size, seq_len, d_model):
        """Verify output has correct shape (batch, seq_len, d_model)."""
        encoded, _ = price_focused_attention(sample_price_input)

        expected = (batch_size, seq_len, d_model)
        assert encoded.shape == expected, \
            f"Expected output shape ({batch_size}, {seq_len}, {d_model}), got {encoded.shape}"

    def test_no_nan_gradients(self, price_focused_attention, sample_price_input):
        """Verify gradients are stable (no NaN/Inf values)."""
        sample_price_input.requires_grad_(True)

        encoded, _ = price_focused_attention(sample_price_input)
        encoded.sum().backward()

        # Input gradient must exist and be finite everywhere.
        grad = sample_price_input.grad
        assert grad is not None, "Input should have gradients"
        assert not torch.isnan(grad).any(), "Gradients should not contain NaN values"
        assert not torch.isinf(grad).any(), "Gradients should not contain Inf values"

        # Same finiteness requirement for every parameter gradient.
        for name, param in price_focused_attention.named_parameters():
            if param.grad is None:
                continue
            assert not torch.isnan(param.grad).any(), f"Parameter {name} has NaN gradients"
            assert not torch.isinf(param.grad).any(), f"Parameter {name} has Inf gradients"

    def test_compute_return_features(self, device):
        """Verify compute_return_features produces valid features."""
        torch.manual_seed(42)
        n_batch, n_steps = 4, 50

        # Synthesize plausible OHLC bars around a base price of 100.
        base = torch.full((n_batch, n_steps, 1), 100.0, device=device)
        noise = torch.randn(n_batch, n_steps, 1, device=device) * 2.0

        open_px = base + noise
        high_px = base + torch.abs(torch.randn(n_batch, n_steps, 1, device=device)) * 3.0
        low_px = base - torch.abs(torch.randn(n_batch, n_steps, 1, device=device)) * 3.0
        close_px = base + noise * 0.5

        ohlc = torch.cat([open_px, high_px, low_px, close_px], dim=-1)
        features = compute_return_features(ohlc)

        assert features.shape == (n_batch, n_steps, 4), \
            f"Expected features shape ({n_batch}, {n_steps}, 4), got {features.shape}"

        # Rows after the first must be NaN-free (the first row may be NaN
        # because returns need a previous bar).
        assert not torch.isnan(features[:, 1:, :]).any(), \
            "Features should not have NaN values (after first row)"

        # Returns (feature 0) should stay well under 100% for this data.
        returns = features[:, 1:, 0]
        assert torch.abs(returns).max() < 1.0, "Returns should be reasonable (< 100%)"

    def test_return_all_attentions(self, price_focused_attention, sample_price_input, price_attention_config):
        """Verify return_all_attentions returns attention from all layers."""
        _, attn_maps = price_focused_attention(
            sample_price_input,
            return_all_attentions=True,
        )

        n_layers = price_attention_config.n_layers
        assert len(attn_maps) == n_layers, \
            f"Expected {n_layers} attention tensors, got {len(attn_maps)}"

    def test_encode_sequence_pooling(self, price_focused_attention, sample_price_input, batch_size, d_model):
        """Verify encode_sequence with different pooling methods."""
        pooling_modes = ("last", "first", "mean", "max")
        for mode in pooling_modes:
            pooled = price_focused_attention.encode_sequence(sample_price_input, pooling=mode)
            assert pooled.shape == (batch_size, d_model), \
                f"Pooling '{mode}' should produce shape ({batch_size}, {d_model}), got {pooled.shape}"

    def test_get_attention_scores(self, price_focused_attention, sample_price_input, batch_size, n_heads, seq_len):
        """Verify get_attention_scores returns correct shape."""
        scores = price_focused_attention.get_attention_scores(sample_price_input, layer_idx=-1)

        expected = (batch_size, n_heads, seq_len, seq_len)
        assert scores.shape == expected, \
            f"Expected shape ({batch_size}, {n_heads}, {seq_len}, {seq_len}), got {scores.shape}"

    def test_deterministic_with_eval_mode(self, price_focused_attention, sample_price_input):
        """Verify model produces deterministic outputs in eval mode."""
        price_focused_attention.eval()

        with torch.no_grad():
            first_run, _ = price_focused_attention(sample_price_input)
            second_run, _ = price_focused_attention(sample_price_input)

        torch.testing.assert_close(
            first_run, second_run,
            msg="Model should produce identical outputs in eval mode",
        )
|
|
|
|
|
|
# ==============================================================================
|
|
# Tests for AttentionExtractor
|
|
# ==============================================================================
|
|
|
|
class TestAttentionExtractor:
    """Tests for the AttentionExtractor utility class."""

    @pytest.fixture
    def extractor(self):
        """Fresh AttentionExtractor instance per test."""
        return AttentionExtractor()

    def test_extract_scores(self, extractor, price_focused_attention, sample_price_input):
        """Verify attention score extraction works correctly."""
        result = extractor.get_attention_scores(
            price_focused_attention,
            sample_price_input,
            layer_idx=-1,
        )

        assert isinstance(result, AttentionScores), "Should return AttentionScores object"
        assert result.scores is not None, "Scores array should not be None"
        # Expected layout: (batch, heads, seq, seq).
        assert len(result.scores.shape) == 4, \
            "Scores should have 4 dimensions (batch, heads, seq, seq)"

    def test_extract_scores_specific_layer(self, extractor, price_focused_attention, sample_price_input):
        """Verify extraction from specific layer."""
        first_layer = extractor.get_attention_scores(
            price_focused_attention, sample_price_input, layer_idx=0,
        )
        second_layer = extractor.get_attention_scores(
            price_focused_attention, sample_price_input, layer_idx=1,
        )

        assert first_layer.layer_idx == 0
        assert second_layer.layer_idx == 1

        # Distinct layers must yield distinct attention patterns.
        assert not np.allclose(first_layer.scores, second_layer.scores), \
            "Different layers should produce different attention patterns"

    def test_extract_scores_specific_head(self, extractor, price_focused_attention, sample_price_input, batch_size, seq_len):
        """Verify extraction for specific attention head."""
        single_head = extractor.get_attention_scores(
            price_focused_attention,
            sample_price_input,
            layer_idx=-1,
            head_idx=0,
        )

        assert single_head.head_idx == 0
        # Selecting one head keeps the head axis with size 1.
        assert single_head.scores.shape[1] == 1, \
            f"Expected 1 head, got {single_head.scores.shape[1]}"
        assert single_head.scores.shape == (batch_size, 1, seq_len, seq_len)

    def test_compute_statistics(self, extractor, price_focused_attention, sample_price_input):
        """Verify attention statistics computation."""
        result = extractor.get_attention_scores(
            price_focused_attention, sample_price_input, layer_idx=-1,
        )
        stats = extractor.compute_attention_statistics(result)

        # Required top-level keys.
        for key in ('global', 'per_head', 'diagonal_attention_mean', 'sparsity'):
            assert key in stats, f"Stats should have '{key}' key"

        global_stats = stats['global']
        for key in ('mean', 'std', 'max', 'min'):
            assert key in global_stats

        # Softmax outputs constrain every statistic to [0, 1].
        assert 0.0 <= global_stats['mean'] <= 1.0, "Mean attention should be between 0 and 1"
        assert global_stats['min'] >= 0.0, "Min attention should be >= 0"
        assert global_stats['max'] <= 1.0, "Max attention should be <= 1"

    def test_compute_statistics_per_head(self, extractor, price_focused_attention, sample_price_input, n_heads):
        """Verify per-head statistics computation."""
        result = extractor.get_attention_scores(
            price_focused_attention, sample_price_input, layer_idx=-1,
        )
        stats = extractor.compute_attention_statistics(result)

        per_head = stats['per_head']
        assert len(per_head) == n_heads, \
            f"Should have stats for {n_heads} heads, got {len(per_head)}"

        for entry in per_head:
            for key in ('head', 'mean', 'std', 'max', 'entropy'):
                assert key in entry

    def test_attention_scores_mean_attention(self, extractor, price_focused_attention, sample_price_input, batch_size, seq_len):
        """Verify AttentionScores.mean_attention method."""
        result = extractor.get_attention_scores(
            price_focused_attention, sample_price_input, layer_idx=-1,
        )

        averaged = result.mean_attention()
        assert averaged.shape == (batch_size, seq_len, seq_len), \
            f"Mean attention should have shape ({batch_size}, {seq_len}, {seq_len}), got {averaged.shape}"

    def test_attention_scores_head_attention(self, extractor, price_focused_attention, sample_price_input, batch_size, seq_len):
        """Verify AttentionScores.head_attention method."""
        result = extractor.get_attention_scores(
            price_focused_attention, sample_price_input, layer_idx=-1,
        )

        single = result.head_attention(0)
        assert single.shape == (batch_size, seq_len, seq_len), \
            f"Head attention should have shape ({batch_size}, {seq_len}, {seq_len}), got {single.shape}"

    def test_attention_scores_to_dict(self, extractor, price_focused_attention, sample_price_input):
        """Verify AttentionScores serialization to dict."""
        metadata = {'symbol': 'XAUUSD', 'timeframe': '15m'}
        result = extractor.get_attention_scores(
            price_focused_attention,
            sample_price_input,
            layer_idx=-1,
            metadata=metadata,
        )

        serialized = result.to_dict()
        for key in ('scores', 'layer_idx', 'sequence_len', 'n_heads', 'metadata'):
            assert key in serialized
        assert serialized['metadata'] == metadata

    def test_sparsity_computation(self, extractor, price_focused_attention, sample_price_input):
        """Verify sparsity metric is computed correctly."""
        result = extractor.get_attention_scores(
            price_focused_attention, sample_price_input, layer_idx=-1,
        )
        stats = extractor.compute_attention_statistics(result)

        assert 0.0 <= stats['sparsity'] <= 1.0, \
            f"Sparsity should be between 0 and 1, got {stats['sparsity']}"

    def test_diagonal_attention(self, extractor, price_focused_attention, sample_price_input):
        """Verify diagonal attention mean is computed correctly."""
        result = extractor.get_attention_scores(
            price_focused_attention, sample_price_input, layer_idx=-1,
        )
        stats = extractor.compute_attention_statistics(result)

        assert 0.0 <= stats['diagonal_attention_mean'] <= 1.0, \
            f"Diagonal attention mean should be between 0 and 1, got {stats['diagonal_attention_mean']}"
|
|
|
|
|
|
# ==============================================================================
|
|
# Integration Tests
|
|
# ==============================================================================
|
|
|
|
class TestAttentionIntegration:
    """Integration tests for the attention module."""

    def test_full_pipeline(self, device):
        """Test complete pipeline from input to attention extraction."""
        torch.manual_seed(42)

        # Small config so the whole pipeline runs in well under a second.
        config = PriceAttentionConfig(
            d_model=64,
            n_heads=4,
            d_k=16,
            d_v=16,
            n_layers=2,
            d_ff=128,
            dropout=0.0,
            attention_dropout=0.0,
            input_features=4,
        )

        model = PriceFocusedAttention(config, input_features=4).to(device)
        model.eval()

        batch_size = 2
        seq_len = 20
        x = torch.randn(batch_size, seq_len, 4, device=device)

        # Forward pass with attention from every layer.
        output, attentions = model(x, return_all_attentions=True)

        assert output.shape == (batch_size, seq_len, config.d_model)
        assert len(attentions) == config.n_layers

        # Extract and analyze attention from the last layer.
        extractor = AttentionExtractor()
        scores = extractor.get_attention_scores(model, x, layer_idx=-1)
        stats = extractor.compute_attention_statistics(scores)

        # Softmax outputs: mean strictly positive, max bounded by 1.
        assert stats['global']['mean'] > 0
        assert stats['global']['max'] <= 1.0
        assert len(stats['per_head']) == config.n_heads

    def test_training_step_simulation(self, price_focused_attention, sample_price_input, d_model):
        """Simulate a training step to verify gradients work correctly."""
        price_focused_attention.train()

        # Minimal regression head on top of the encoder output.
        output_head = nn.Linear(d_model, 1).to(sample_price_input.device)

        output, _ = price_focused_attention(sample_price_input)
        prediction = output_head(output[:, -1, :])  # predict from last position

        # Fake target + MSE loss drives the backward pass.
        target = torch.randn_like(prediction)
        loss = nn.MSELoss()(prediction, target)
        loss.backward()

        # Every trainable parameter must receive a gradient.
        for name, param in price_focused_attention.named_parameters():
            if param.requires_grad:
                assert param.grad is not None, f"Parameter {name} should have gradient"

    def test_causal_attention_pattern(self, price_focused_attention, sample_price_input, batch_size, n_heads, seq_len, device):
        """Verify causal attention produces lower-triangular attention pattern."""
        price_focused_attention.eval()

        causal_mask = create_causal_mask(seq_len, device=device)

        with torch.no_grad():
            output, attentions = price_focused_attention(
                sample_price_input,
                mask=causal_mask,
                return_all_attentions=True,
            )

        last_attention = attentions[-1].cpu().numpy()

        # Vectorized check (replaces a quadruple Python loop): all strictly
        # upper-triangular entries — attention to future positions — must be
        # near zero for every batch element and head.
        rows, cols = np.triu_indices(seq_len, k=1)
        future_attn = last_attention[:, :, rows, cols]
        assert (future_attn < 1e-5).all(), \
            f"Future positions should be masked, max={future_attn.max()}"
|
|
|
|
|
|
# ==============================================================================
|
|
# Edge Case Tests
|
|
# ==============================================================================
|
|
|
|
class TestEdgeCases:
    """Tests for edge cases and boundary conditions."""

    def test_single_sequence_batch(self, device):
        """Model must handle a batch of size 1."""
        cfg = PriceAttentionConfig(d_model=32, n_heads=4, input_features=4)
        net = PriceFocusedAttention(cfg, input_features=4).to(device)

        single = torch.randn(1, 16, 4, device=device)
        encoded, _ = net(single)

        assert encoded.shape == (1, 16, 32)

    def test_short_sequence(self, device):
        """Model must handle a very short sequence (length 2)."""
        cfg = PriceAttentionConfig(d_model=32, n_heads=4, input_features=4)
        net = PriceFocusedAttention(cfg, input_features=4).to(device)

        short = torch.randn(2, 2, 4, device=device)  # sequence length 2
        encoded, _ = net(short)

        assert encoded.shape == (2, 2, 32)

    def test_attention_scores_property(self):
        """AttentionScores dataclass exposes shape and sequence_len properties."""
        raw = np.random.rand(2, 4, 10, 10)
        wrapped = AttentionScores(
            scores=raw,
            layer_idx=1,
            head_idx=None,
            n_heads=4,
        )

        assert wrapped.shape == (2, 4, 10, 10)
        assert wrapped.sequence_len == 10

    def test_multi_head_attention_dimension_validation(self, device):
        """Inconsistent head dimensions must raise ValueError."""
        # d_k * n_heads (10 * 8 = 80) does not equal d_model (64).
        with pytest.raises(ValueError):
            MultiHeadAttention(d_model=64, n_heads=8, d_k=10)
|
|
|
|
|
|
if __name__ == '__main__':
    # Allow running this module directly instead of via the pytest CLI.
    pytest.main([__file__, '-v', '--tb=short'])
|