stock/tests/test_portfolio.py
ZhangPeng 9aecdd036c Initial commit: OpenClaw Trading - AI多智能体量化交易系统
- 添加项目核心代码和配置
- 添加前端界面 (Next.js)
- 添加单元测试
- 更新 .gitignore 排除缓存和依赖
2026-02-27 03:47:40 +08:00

661 lines
21 KiB
Python

"""Unit tests for strategy portfolio management module.
Tests cover weight allocation algorithms, signal aggregation,
rebalancing logic, and StrategyPortfolio class functionality.
"""
import sys
from datetime import datetime, timedelta
from typing import Any, Dict
import numpy as np
import pandas as pd
import pytest
# Add src to path for imports
sys.path.insert(0, "src")
from openclaw.portfolio.weights import (
WeightMethod,
apply_weight_constraints,
calculate_equal_weights,
calculate_inverse_volatility_weights,
calculate_momentum_weights,
calculate_risk_parity_weights,
normalize_weights,
validate_weights,
)
from openclaw.portfolio.signal_aggregator import (
AggregationMethod,
AggregatedSignal,
SignalAggregator,
StrategySignal,
)
from openclaw.portfolio.rebalancer import (
RebalanceResult,
RebalanceTrigger,
Rebalancer,
TransactionCostModel,
)
from openclaw.portfolio.strategy_portfolio import (
StrategyConfig,
StrategyPerformance,
StrategyPortfolio,
StrategyStatus,
)
# ==================== Test Weight Allocation ====================
class TestWeightAllocation:
"""Test suite for weight allocation algorithms."""
def test_equal_weights(self) -> None:
"""Test equal weight allocation."""
strategies = ["s1", "s2", "s3", "s4"]
weights = calculate_equal_weights(strategies)
assert len(weights) == 4
assert all(w == 0.25 for w in weights.values())
assert validate_weights(weights)
def test_equal_weights_empty(self) -> None:
"""Test equal weights with empty list."""
weights = calculate_equal_weights([])
assert weights == {}
def test_risk_parity_weights(self) -> None:
"""Test risk parity weight allocation."""
strategies = ["low_vol", "high_vol"]
# Create returns with different volatilities
np.random.seed(42)
low_vol_returns = np.random.normal(0.001, 0.01, 100)
high_vol_returns = np.random.normal(0.001, 0.05, 100)
returns_data = pd.DataFrame({
"low_vol": low_vol_returns,
"high_vol": high_vol_returns,
})
weights = calculate_risk_parity_weights(strategies, returns_data)
assert len(weights) == 2
assert weights["low_vol"] > weights["high_vol"]
assert validate_weights(weights)
def test_risk_parity_no_data(self) -> None:
"""Test risk parity fallback to equal weights."""
strategies = ["s1", "s2"]
weights = calculate_risk_parity_weights(strategies, None)
assert weights["s1"] == weights["s2"] == 0.5
def test_momentum_weights(self) -> None:
"""Test momentum-based weight allocation."""
strategies = ["winner", "loser"]
# Create returns with different momentum
np.random.seed(42)
winner_returns = np.random.normal(0.005, 0.02, 60)
loser_returns = np.random.normal(-0.002, 0.02, 60)
returns_data = pd.DataFrame({
"winner": winner_returns,
"loser": loser_returns,
})
weights = calculate_momentum_weights(strategies, returns_data)
assert len(weights) == 2
assert weights["winner"] > weights["loser"]
assert validate_weights(weights)
def test_inverse_volatility_weights(self) -> None:
"""Test inverse volatility weight allocation."""
strategies = ["stable", "volatile"]
np.random.seed(42)
stable_returns = np.random.normal(0.001, 0.01, 60)
volatile_returns = np.random.normal(0.001, 0.08, 60)
returns_data = pd.DataFrame({
"stable": stable_returns,
"volatile": volatile_returns,
})
weights = calculate_inverse_volatility_weights(strategies, returns_data)
assert len(weights) == 2
assert weights["stable"] > weights["volatile"]
assert validate_weights(weights)
def test_normalize_weights(self) -> None:
"""Test weight normalization."""
weights = {"s1": 0.3, "s2": 0.3, "s3": 0.3}
normalized = normalize_weights(weights)
assert abs(sum(normalized.values()) - 1.0) < 0.001
def test_normalize_weights_zero_sum(self) -> None:
"""Test normalization with zero sum."""
weights = {"s1": 0.0, "s2": 0.0}
normalized = normalize_weights(weights)
assert normalized["s1"] == normalized["s2"] == 0.5
def test_apply_weight_constraints(self) -> None:
"""Test weight constraint application."""
weights = {"s1": 0.8, "s2": 0.1, "s3": 0.1}
constrained = apply_weight_constraints(weights, min_weight=0.15, max_weight=0.5)
# s1 should be capped at max_weight
assert constrained["s1"] == 0.5
# s2 and s3 should receive redistributed weight
assert constrained["s2"] >= 0.15
assert constrained["s3"] >= 0.15
# Sum should be 1.0
assert abs(sum(constrained.values()) - 1.0) < 0.001
# ==================== Test Signal Aggregation ====================
class TestSignalAggregation:
"""Test suite for signal aggregation."""
def test_voting_aggregation(self) -> None:
"""Test simple voting aggregation."""
signals = [
StrategySignal("s1", "buy", 0.8),
StrategySignal("s2", "buy", 0.7),
StrategySignal("s3", "sell", 0.6),
]
aggregator = SignalAggregator(method=AggregationMethod.VOTING)
result = aggregator.aggregate(signals)
assert result.aggregated_signal == "buy"
assert result.confidence > 0.5
assert result.method == AggregationMethod.VOTING
def test_weighted_aggregation(self) -> None:
"""Test weighted signal aggregation."""
signals = [
StrategySignal("s1", "buy", 0.8),
StrategySignal("s2", "sell", 0.6),
]
weights = {"s1": 0.7, "s2": 0.3}
aggregator = SignalAggregator(method=AggregationMethod.WEIGHTED)
result = aggregator.aggregate(signals, weights=weights)
assert result.aggregated_signal == "buy" # s1 has higher weight
assert result.method == AggregationMethod.WEIGHTED
def test_confidence_threshold_filtering(self) -> None:
"""Test confidence threshold aggregation."""
signals = [
StrategySignal("s1", "buy", 0.9),
StrategySignal("s2", "sell", 0.4), # Below threshold
StrategySignal("s3", "buy", 0.8),
]
aggregator = SignalAggregator(
method=AggregationMethod.CONFIDENCE_THRESHOLD,
confidence_threshold=0.5,
)
result = aggregator.aggregate(signals)
assert result.aggregated_signal == "buy"
# Should only consider s1 and s3 (both above threshold)
def test_majority_vote(self) -> None:
"""Test majority vote aggregation."""
signals = [
StrategySignal("s1", "buy", 0.8),
StrategySignal("s2", "buy", 0.7),
StrategySignal("s3", "sell", 0.6),
]
aggregator = SignalAggregator(method=AggregationMethod.MAJORITY_VOTE)
result = aggregator.aggregate(signals)
assert result.aggregated_signal == "buy"
def test_unanimous_agreement(self) -> None:
"""Test unanimous vote aggregation."""
signals = [
StrategySignal("s1", "buy", 0.8),
StrategySignal("s2", "buy", 0.7),
StrategySignal("s3", "buy", 0.9),
]
aggregator = SignalAggregator(method=AggregationMethod.UNANIMOUS)
result = aggregator.aggregate(signals)
assert result.aggregated_signal == "buy"
def test_unanimous_disagreement(self) -> None:
"""Test unanimous when strategies disagree."""
signals = [
StrategySignal("s1", "buy", 0.8),
StrategySignal("s2", "sell", 0.7),
]
aggregator = SignalAggregator(method=AggregationMethod.UNANIMOUS)
result = aggregator.aggregate(signals)
assert result.aggregated_signal == "hold"
def test_empty_signals(self) -> None:
"""Test aggregation with no signals."""
aggregator = SignalAggregator()
result = aggregator.aggregate([])
assert result.aggregated_signal == "hold"
assert result.confidence == 0.0
def test_aggregated_signal_properties(self) -> None:
"""Test AggregatedSignal helper properties."""
signal = AggregatedSignal(
aggregated_signal="buy",
confidence=0.8,
)
assert signal.is_bullish
assert not signal.is_bearish
assert not signal.is_neutral
# ==================== Test Rebalancer ====================
class TestRebalancer:
"""Test suite for rebalancing logic."""
def test_drift_calculation(self) -> None:
"""Test portfolio drift calculation."""
rebalancer = Rebalancer()
current = {"s1": 0.4, "s2": 0.6}
target = {"s1": 0.5, "s2": 0.5}
drift = rebalancer.calculate_drift(current, target)
assert abs(drift - 0.1) < 0.0001
def test_rebalance_needed_periodic(self) -> None:
"""Test periodic rebalance trigger."""
rebalancer = Rebalancer(
trigger_type=RebalanceTrigger.PERIODIC,
rebalance_frequency=30,
)
current = {"s1": 0.5, "s2": 0.5}
target = {"s1": 0.5, "s2": 0.5}
# Initial rebalance needed
needed, trigger = rebalancer.check_rebalance_needed(current, target)
assert needed
assert trigger == RebalanceTrigger.PERIODIC
def test_rebalance_needed_threshold(self) -> None:
"""Test threshold-based rebalance trigger."""
rebalancer = Rebalancer(
trigger_type=RebalanceTrigger.THRESHOLD,
drift_threshold=0.05,
)
current = {"s1": 0.6, "s2": 0.4}
target = {"s1": 0.5, "s2": 0.5}
needed, trigger = rebalancer.check_rebalance_needed(current, target)
assert needed
assert trigger == RebalanceTrigger.THRESHOLD
def test_rebalance_not_needed(self) -> None:
"""Test when rebalance is not needed."""
rebalancer = Rebalancer(
trigger_type=RebalanceTrigger.THRESHOLD,
drift_threshold=0.1,
)
current = {"s1": 0.52, "s2": 0.48}
target = {"s1": 0.5, "s2": 0.5}
needed, _ = rebalancer.check_rebalance_needed(current, target)
assert not needed
def test_transaction_cost_calculation(self) -> None:
"""Test transaction cost model."""
model = TransactionCostModel(
fixed_cost=5.0,
percentage_cost=0.001,
)
cost = model.calculate_cost(trade_value=10000.0)
expected = 5.0 + 10000.0 * 0.001 # 15.0
assert cost == expected
def test_rebalance_execution(self) -> None:
"""Test rebalance execution."""
rebalancer = Rebalancer()
current = {"s1": 0.6, "s2": 0.4}
target = {"s1": 0.5, "s2": 0.5}
result = rebalancer.rebalance(
current_weights=current,
target_weights=target,
portfolio_value=100000.0,
force=True,
)
assert result is not None
assert result.trades_executed == 2
assert result.old_weights == current
assert result.new_weights == target
def test_rebalance_turnover(self) -> None:
"""Test turnover calculation."""
result = RebalanceResult(
timestamp=datetime.now(),
old_weights={"s1": 0.6, "s2": 0.4},
new_weights={"s1": 0.5, "s2": 0.5},
)
# Turnover = (|0.5-0.6| + |0.5-0.4|) / 2 = 0.1
assert abs(result.total_turnover - 0.1) < 0.0001
# ==================== Test StrategyPortfolio ====================
class TestStrategyPortfolio:
"""Test suite for StrategyPortfolio class."""
def test_portfolio_initialization(self) -> None:
"""Test portfolio initialization."""
portfolio = StrategyPortfolio(
portfolio_id="test_portfolio",
initial_capital=100000.0,
)
assert portfolio.portfolio_id == "test_portfolio"
assert portfolio.initial_capital == 100000.0
assert portfolio.list_strategies() == []
def test_add_strategy(self) -> None:
"""Test adding strategies to portfolio."""
portfolio = StrategyPortfolio("test")
result = portfolio.add_strategy("s1", weight=0.5)
assert result
assert "s1" in portfolio.list_strategies()
def test_add_duplicate_strategy(self) -> None:
"""Test adding duplicate strategy fails."""
portfolio = StrategyPortfolio("test")
portfolio.add_strategy("s1")
result = portfolio.add_strategy("s1")
assert not result
def test_remove_strategy(self) -> None:
"""Test removing strategies."""
portfolio = StrategyPortfolio("test")
portfolio.add_strategy("s1")
result = portfolio.remove_strategy("s1")
assert result
assert "s1" not in portfolio.list_strategies()
def test_strategy_lifecycle(self) -> None:
"""Test strategy enable/disable/pause."""
portfolio = StrategyPortfolio("test")
portfolio.add_strategy("s1", status=StrategyStatus.ACTIVE)
assert portfolio.get_strategy_status("s1") == StrategyStatus.ACTIVE
portfolio.disable_strategy("s1")
assert portfolio.get_strategy_status("s1") == StrategyStatus.DISABLED
portfolio.enable_strategy("s1")
assert portfolio.get_strategy_status("s1") == StrategyStatus.ACTIVE
portfolio.pause_strategy("s1")
assert portfolio.get_strategy_status("s1") == StrategyStatus.PAUSED
def test_list_active_strategies(self) -> None:
"""Test listing only active strategies."""
portfolio = StrategyPortfolio("test")
portfolio.add_strategy("s1", status=StrategyStatus.ACTIVE)
portfolio.add_strategy("s2", status=StrategyStatus.DISABLED)
portfolio.add_strategy("s3", status=StrategyStatus.ACTIVE)
active = portfolio.list_strategies(active_only=True)
assert "s1" in active
assert "s2" not in active
assert "s3" in active
def test_weight_management(self) -> None:
"""Test weight management."""
portfolio = StrategyPortfolio("test")
portfolio.add_strategy("s1")
portfolio.add_strategy("s2")
# With equal weight method, should auto-calculate
weights = portfolio.get_target_weights()
assert weights["s1"] == weights["s2"] == 0.5
def test_set_custom_weights(self) -> None:
"""Test setting custom weights."""
portfolio = StrategyPortfolio("test")
portfolio.add_strategy("s1")
portfolio.add_strategy("s2")
portfolio.add_strategy("s3")
custom_weights = {"s1": 0.5, "s2": 0.3, "s3": 0.2}
portfolio.set_weights(custom_weights)
weights = portfolio.get_target_weights()
assert weights["s1"] == 0.5
assert weights["s2"] == 0.3
assert weights["s3"] == 0.2
def test_signal_aggregation(self) -> None:
"""Test signal aggregation through portfolio."""
portfolio = StrategyPortfolio("test")
portfolio.add_strategy("s1", status=StrategyStatus.ACTIVE)
portfolio.add_strategy("s2", status=StrategyStatus.ACTIVE)
signals = {
"s1": {"signal": "buy", "confidence": 0.8},
"s2": {"signal": "buy", "confidence": 0.7},
}
result = portfolio.aggregate_signals(signals)
assert result.aggregated_signal == "buy"
def test_signal_aggregation_disabled_skipped(self) -> None:
"""Test that disabled strategies are skipped in aggregation."""
portfolio = StrategyPortfolio("test")
portfolio.add_strategy("s1", status=StrategyStatus.ACTIVE)
portfolio.add_strategy("s2", status=StrategyStatus.DISABLED)
signals = {
"s1": {"signal": "buy", "confidence": 0.8},
"s2": {"signal": "sell", "confidence": 0.9},
}
result = portfolio.aggregate_signals(signals)
# Should only consider s1 since s2 is disabled
assert result.aggregated_signal == "buy"
def test_performance_tracking(self) -> None:
"""Test performance tracking."""
portfolio = StrategyPortfolio("test")
portfolio.add_strategy("s1")
portfolio.update_performance("s1", {
"total_return": 10.0,
"sharpe_ratio": 1.5,
"max_drawdown": -5.0,
})
perf = portfolio.get_performance("s1")
assert perf["total_return"] == 10.0
assert perf["sharpe_ratio"] == 1.5
def test_rebalance_check(self) -> None:
"""Test rebalance checking."""
portfolio = StrategyPortfolio(
"test",
rebalance_trigger=RebalanceTrigger.THRESHOLD,
)
portfolio.add_strategy("s1")
portfolio.add_strategy("s2")
# After adding strategies, weights should be equal
# Set up a scenario where drift exceeds threshold
portfolio.set_rebalance_config(drift_threshold=0.05)
# Force weight mismatch
portfolio._current_weights = {"s1": 0.7, "s2": 0.3}
portfolio._target_weights = {"s1": 0.5, "s2": 0.5}
assert portfolio.check_rebalance()
def test_portfolio_state(self) -> None:
"""Test portfolio state management."""
portfolio = StrategyPortfolio("test")
portfolio.add_strategy("s1")
state = portfolio.get_state()
assert state.portfolio_id == "test"
assert "s1" in state.strategies
def test_portfolio_reset(self) -> None:
"""Test portfolio reset."""
portfolio = StrategyPortfolio("test")
portfolio.add_strategy("s1")
portfolio.add_strategy("s2")
portfolio.reset()
assert portfolio.list_strategies() == []
assert portfolio.get_weights() == {}
def test_portfolio_to_dict(self) -> None:
"""Test portfolio serialization."""
portfolio = StrategyPortfolio(
"test",
weight_method=WeightMethod.EQUAL,
aggregation_method=AggregationMethod.WEIGHTED,
)
portfolio.add_strategy("s1", status=StrategyStatus.ACTIVE)
portfolio.update_performance("s1", {"total_return": 5.0})
data = portfolio.to_dict()
assert data["portfolio_id"] == "test"
assert data["weight_method"] == "equal"
assert data["aggregation_method"] == "weighted"
assert "s1" in data["strategies"]
assert "performance" in data
def test_update_returns(self) -> None:
"""Test updating returns history."""
portfolio = StrategyPortfolio("test")
returns = pd.DataFrame({
"s1": np.random.normal(0.001, 0.02, 100),
"s2": np.random.normal(0.001, 0.02, 100),
})
portfolio.update_returns(returns)
assert not portfolio._returns_history.empty
# ==================== Test Integration ====================
class TestPortfolioIntegration:
"""Integration tests for portfolio components."""
def test_full_portfolio_workflow(self) -> None:
"""Test complete portfolio workflow."""
# Create portfolio
portfolio = StrategyPortfolio(
"integration_test",
weight_method=WeightMethod.EQUAL,
aggregation_method=AggregationMethod.WEIGHTED,
)
# Add strategies
portfolio.add_strategy("trend_following", status=StrategyStatus.ACTIVE)
portfolio.add_strategy("mean_reversion", status=StrategyStatus.ACTIVE)
portfolio.add_strategy("breakout", status=StrategyStatus.DISABLED)
# Verify strategies
assert len(portfolio.list_strategies()) == 3
assert len(portfolio.list_strategies(active_only=True)) == 2
# Check weights
weights = portfolio.get_target_weights()
assert len(weights) == 2 # Only active strategies
assert abs(sum(weights.values()) - 1.0) < 0.001
# Aggregate signals
signals = {
"trend_following": {"signal": "buy", "confidence": 0.8},
"mean_reversion": {"signal": "hold", "confidence": 0.6},
"breakout": {"signal": "sell", "confidence": 0.9}, # Disabled, should be ignored
}
result = portfolio.aggregate_signals(signals)
assert result.aggregated_signal == "buy"
# Update performance
portfolio.update_performance("trend_following", {
"total_return": 15.0,
"sharpe_ratio": 1.8,
})
perf = portfolio.get_performance()
assert "trend_following" in perf
def test_risk_parity_weight_update(self) -> None:
"""Test risk parity with returns data."""
portfolio = StrategyPortfolio(
"test",
weight_method=WeightMethod.RISK_PARITY,
)
portfolio.add_strategy("low_risk")
portfolio.add_strategy("high_risk")
# Add returns data
np.random.seed(42)
returns_data = pd.DataFrame({
"low_risk": np.random.normal(0.001, 0.01, 100),
"high_risk": np.random.normal(0.001, 0.08, 100),
})
portfolio.update_returns(returns_data)
portfolio.update_weight_method(WeightMethod.RISK_PARITY)
weights = portfolio.get_target_weights()
# Low risk strategy should have higher weight
assert weights["low_risk"] > weights["high_risk"]
if __name__ == "__main__":
pytest.main([__file__, "-v"])