stock/tests/test_evolution.py
ZhangPeng 9aecdd036c Initial commit: OpenClaw Trading - AI多智能体量化交易系统
- 添加项目核心代码和配置
- 添加前端界面 (Next.js)
- 添加单元测试
- 更新 .gitignore 排除缓存和依赖
2026-02-27 03:47:40 +08:00

1022 lines
31 KiB
Python

"""Unit tests for evolution algorithms module.
This module tests the genetic algorithm, genetic programming,
NSGA-II, and fitness evaluation components.
"""
import numpy as np
import pytest
from datetime import datetime
from openclaw.evolution.engine import (
EvolutionConfig,
EvolutionEngine,
EvolutionAlgorithm,
EvolutionStatus,
)
from openclaw.evolution.genetic_algorithm import (
GeneticAlgorithm,
GAConfig,
Chromosome,
SelectionOperator,
CrossoverOperator,
MutationOperator,
)
from openclaw.evolution.genetic_programming import (
GeneticProgramming,
GPConfig,
Node,
NodeType,
TreeChromosome,
)
from openclaw.evolution.nsga2 import (
NSGA2,
NSGA2Config,
Individual,
ObjectiveValue,
ParetoFront,
)
from openclaw.evolution.fitness import (
FitnessEvaluator,
FitnessMetrics,
)
class TestEvolutionConfig:
    """Checks for ``EvolutionConfig`` defaults, overrides, and validation errors."""

    def test_default_config(self):
        """A config built with no arguments carries the expected defaults."""
        defaults = EvolutionConfig()
        expected = {
            "population_size": 100,
            "max_generations": 500,
            "crossover_rate": 0.8,
            "mutation_rate": 0.1,
            "elite_size": 5,
        }
        for field_name, value in expected.items():
            assert getattr(defaults, field_name) == value

    def test_custom_config(self):
        """Values passed to the constructor are stored unchanged."""
        overrides = {
            "population_size": 50,
            "max_generations": 200,
            "crossover_rate": 0.7,
            "mutation_rate": 0.2,
        }
        custom = EvolutionConfig(**overrides)
        for field_name, value in overrides.items():
            assert getattr(custom, field_name) == value

    def test_invalid_population_size(self):
        """A population size of 5 is rejected with a ``ValueError``."""
        expected_message = "Population size must be at least"
        with pytest.raises(ValueError, match=expected_message):
            EvolutionConfig(population_size=5)

    def test_invalid_crossover_rate(self):
        """A crossover rate of 1.5 is rejected with a ``ValueError``."""
        expected_message = "Crossover rate must be between"
        with pytest.raises(ValueError, match=expected_message):
            EvolutionConfig(crossover_rate=1.5)

    def test_invalid_mutation_rate(self):
        """A negative mutation rate is rejected with a ``ValueError``."""
        expected_message = "Mutation rate must be between"
        with pytest.raises(ValueError, match=expected_message):
            EvolutionConfig(mutation_rate=-0.1)

    def test_invalid_elite_size(self):
        """An elite size equal to the population size is rejected."""
        expected_message = "Elite size must be less than"
        with pytest.raises(ValueError, match=expected_message):
            EvolutionConfig(population_size=50, elite_size=50)
class TestEvolutionEngine:
    """Lifecycle tests for ``EvolutionEngine`` configured with the GA algorithm."""

    def test_initialization(self):
        """A freshly built engine keeps its settings and starts idle and empty."""
        cfg = EvolutionConfig(population_size=20)
        engine = EvolutionEngine(
            config=cfg,
            algorithm=EvolutionAlgorithm.GA,
            fitness_func=lambda genes: np.sum(genes ** 2),
        )

        assert engine.config == cfg
        assert engine.algorithm == EvolutionAlgorithm.GA
        assert engine.status == EvolutionStatus.IDLE
        assert len(engine.population) == 0

    def test_population_initialization(self):
        """``initialize_population`` fills the population and sets a best individual."""
        cfg = EvolutionConfig(population_size=20)
        engine = EvolutionEngine(
            config=cfg,
            algorithm=EvolutionAlgorithm.GA,
            fitness_func=lambda genes: -np.sum((genes - 5) ** 2),
        )

        engine.initialize_population(lambda: Chromosome(genes=np.random.randn(5)))

        assert len(engine.population) == 20
        assert engine.best_individual is not None

    def test_run_without_initialization_raises(self):
        """Calling ``run`` before any population exists raises ``ValueError``."""
        engine = EvolutionEngine(
            config=EvolutionConfig(population_size=20),
            algorithm=EvolutionAlgorithm.GA,
            fitness_func=lambda x: 1.0,
        )

        expected_message = "Population not initialized"
        with pytest.raises(ValueError, match=expected_message):
            engine.run()

    def test_simple_optimization(self):
        """Maximize f(x) = -(x - 3)^2 and expect the best gene within 2.0 of 3."""
        cfg = EvolutionConfig(
            population_size=30,
            max_generations=50,
            elite_size=2,
        )

        def fitness_func(genes):
            # Single-gene problem: the peak of the parabola is at x = 3.
            return -(genes[0] - 3) ** 2

        def init_func():
            return Chromosome(genes=np.random.uniform(-10, 10, 1))

        engine = EvolutionEngine(
            config=cfg,
            algorithm=EvolutionAlgorithm.GA,
            fitness_func=fitness_func,
        )
        engine.initialize_population(init_func)
        monitor = engine.run()

        winner = engine.get_best_individual()
        assert winner is not None
        # ``winner`` is a Chromosome; its ``genes`` attribute is the numpy array.
        distance_from_optimum = abs(winner.genes[0] - 3)
        assert distance_from_optimum < 2.0
        assert len(monitor.best_fitness_history) > 0

    def test_callback_registration(self):
        """A registered callback is invoked ``max_generations`` times by ``run``."""
        cfg = EvolutionConfig(population_size=20, max_generations=10)
        engine = EvolutionEngine(
            config=cfg,
            algorithm=EvolutionAlgorithm.GA,
            fitness_func=lambda x: 1.0,
        )
        seen_generations = []

        def callback(generation, population, best_fitness, avg_fitness):
            # Only the generation argument is recorded.
            seen_generations.append(generation)

        engine.register_callback(callback)
        engine.initialize_population(lambda: Chromosome(genes=np.random.randn(3)))
        engine.run()

        assert len(seen_generations) == cfg.max_generations

    def test_population_stats(self):
        """``get_population_stats`` exposes best/worst/mean/std entries."""
        engine = EvolutionEngine(
            config=EvolutionConfig(population_size=20),
            algorithm=EvolutionAlgorithm.GA,
            # The fitness function receives the gene array directly.
            fitness_func=lambda genes: float(np.sum(genes)),
        )
        engine.initialize_population(lambda: Chromosome(genes=np.random.randn(5)))

        stats = engine.get_population_stats()
        for key in ("best", "worst", "mean", "std"):
            assert key in stats

    def test_reset(self):
        """``reset`` after a run returns the engine to an idle, empty state."""
        engine = EvolutionEngine(
            config=EvolutionConfig(population_size=20),
            algorithm=EvolutionAlgorithm.GA,
            fitness_func=lambda x: 1.0,
        )
        engine.initialize_population(lambda: Chromosome(genes=np.random.randn(3)))
        engine.run()

        engine.reset()

        assert engine.status == EvolutionStatus.IDLE
        assert len(engine.population) == 0
        assert engine.best_individual is None
class TestGeneticAlgorithm:
    """Tests for ``GeneticAlgorithm``: setup, selection, crossover, mutation, runs.

    Several tests draw random genes through ``np.random`` and one asserts
    numeric convergence, so the global RNGs are re-seeded before every test
    to keep outcomes reproducible from run to run.
    """

    def setup_method(self, method):
        """Seed the global RNGs before each test (pytest xunit-style hook).

        NOTE(review): this only pins the GA's internal randomness if the
        implementation draws from the global ``random`` / ``np.random``
        state — confirm against ``openclaw.evolution.genetic_algorithm``.
        """
        import random  # local import: ``random`` is not imported at file level

        random.seed(42)
        np.random.seed(42)

    def test_ga_initialization(self):
        """The constructor stores the config and the fitness function as given."""
        config = GAConfig(population_size=20)

        def fitness_func(genes):
            return np.sum(genes)

        def gene_init_func():
            return np.random.randn(5)

        ga = GeneticAlgorithm(config, fitness_func, gene_init_func)

        assert ga.config == config
        assert ga.fitness_func == fitness_func

    def test_ga_population_init(self):
        """``initialize`` creates ``population_size`` ``Chromosome`` objects."""
        config = GAConfig(population_size=20)
        ga = GeneticAlgorithm(
            config=config,
            fitness_func=lambda g: np.sum(g),
            gene_init_func=lambda: np.random.randn(5),
        )

        ga.initialize()

        assert len(ga.population) == 20
        assert all(isinstance(c, Chromosome) for c in ga.population)

    def test_roulette_selection(self):
        """Roulette selection returns genes that belong to the population."""
        config = GAConfig(population_size=20, selection=SelectionOperator.ROULETTE)
        ga = GeneticAlgorithm(
            config=config,
            fitness_func=lambda g: np.sum(g),
            gene_init_func=lambda: np.random.randn(5),
        )
        ga.initialize()

        selected = ga._select_roulette()

        # Compare by gene values rather than identity.
        assert any(np.array_equal(selected.genes, c.genes) for c in ga.population)

    def test_tournament_selection(self):
        """Tournament selection returns genes that belong to the population."""
        config = GAConfig(population_size=20, selection=SelectionOperator.TOURNAMENT)
        ga = GeneticAlgorithm(
            config=config,
            fitness_func=lambda g: np.sum(g),
            gene_init_func=lambda: np.random.randn(5),
        )
        ga.initialize()

        selected = ga._select_tournament(tournament_size=3)

        # Compare by gene values rather than identity.
        assert any(np.array_equal(selected.genes, c.genes) for c in ga.population)

    def test_crossover_operators(self):
        """Every crossover operator yields a first child of the parents' length."""
        parent1 = Chromosome(genes=np.array([1.0, 2.0, 3.0, 4.0]))
        parent2 = Chromosome(genes=np.array([5.0, 6.0, 7.0, 8.0]))
        config = GAConfig(crossover_rate=1.0)
        ga = GeneticAlgorithm(config, lambda g: 1.0, lambda: np.zeros(4))

        # Single point crossover
        config.crossover = CrossoverOperator.SINGLE_POINT
        c1, _ = ga._crossover_single_point(parent1, parent2)
        assert len(c1.genes) == len(parent1.genes)

        # Two point crossover
        c1, _ = ga._crossover_two_point(parent1, parent2)
        assert len(c1.genes) == len(parent1.genes)

        # Uniform crossover
        c1, _ = ga._crossover_uniform(parent1, parent2)
        assert len(c1.genes) == len(parent1.genes)

        # Arithmetic crossover
        c1, _ = ga._crossover_arithmetic(parent1, parent2)
        assert len(c1.genes) == len(parent1.genes)

    def test_mutation_operators(self):
        """Every mutation operator preserves gene length; Gaussian changes a gene."""
        config = GAConfig(mutation_rate=1.0, bounds=(-10, 10))
        ga = GeneticAlgorithm(config, lambda g: 1.0, lambda: np.zeros(5))
        ga.generation = 1
        chromosome = Chromosome(genes=np.zeros(5))

        # Gaussian mutation
        config.mutation = MutationOperator.GAUSSIAN
        mutated = ga._mutate_gaussian(chromosome)
        assert len(mutated.genes) == len(chromosome.genes)
        assert np.any(mutated.genes != chromosome.genes)

        # Uniform mutation
        config.mutation = MutationOperator.UNIFORM
        mutated = ga._mutate_uniform(chromosome)
        assert len(mutated.genes) == len(chromosome.genes)

        # Boundary mutation
        config.mutation = MutationOperator.BOUNDARY
        mutated = ga._mutate_boundary(chromosome)
        assert len(mutated.genes) == len(chromosome.genes)

    def test_ga_optimization(self):
        """Run the GA on a 2-D quadratic and expect the optimum within 1.0."""
        config = GAConfig(
            population_size=30,
            max_generations=50,
            elite_size=2,
            bounds=(-10, 10),
        )

        def fitness_func(genes):
            # Negated so that maximizing fitness minimizes
            # (x - 5)^2 + (y + 3)^2.
            x, y = genes[0], genes[1]
            return -((x - 5) ** 2 + (y + 3) ** 2)

        def gene_init_func():
            return np.random.uniform(-10, 10, 2)

        ga = GeneticAlgorithm(config, fitness_func, gene_init_func)
        best = ga.run()

        # Should be close to [5, -3]
        assert abs(best.genes[0] - 5) < 1.0
        assert abs(best.genes[1] + 3) < 1.0

    def test_ga_statistics(self):
        """``get_statistics`` exposes generation/best/mean/std entries."""
        config = GAConfig(population_size=20)
        ga = GeneticAlgorithm(
            config=config,
            fitness_func=lambda g: np.sum(g),
            gene_init_func=lambda: np.random.randn(5),
        )
        ga.initialize()

        stats = ga.get_statistics()

        assert "generation" in stats
        assert "best" in stats
        assert "mean" in stats
        assert "std" in stats
class TestGeneticProgramming:
    """Tests for ``GeneticProgramming`` and its expression-tree ``Node`` type."""

    def test_gp_initialization(self):
        """The constructor stores the config and starts with no population."""
        config = GPConfig(population_size=20)

        def fitness_func(chromosome):
            return 1.0

        gp = GeneticProgramming(config, fitness_func)

        assert gp.config == config
        assert len(gp.population) == 0

    def test_node_creation(self):
        """Terminal and function nodes report the expected kind, size and depth."""
        # Terminal node
        terminal = Node(node_type=NodeType.CONSTANT, value=5.0)
        assert terminal.is_terminal()
        assert terminal.get_size() == 1
        assert terminal.get_depth() == 0

        # Function node with two constant children
        func = Node(
            node_type=NodeType.ADD,
            children=[
                Node(node_type=NodeType.CONSTANT, value=1.0),
                Node(node_type=NodeType.CONSTANT, value=2.0),
            ],
        )
        assert func.is_function()
        assert func.get_size() == 3
        assert func.get_depth() == 1

    def test_node_evaluation(self):
        """Evaluating ``2 + 3`` with an empty context returns 5.0."""
        tree = Node(
            node_type=NodeType.ADD,
            children=[
                Node(node_type=NodeType.CONSTANT, value=2.0),
                Node(node_type=NodeType.CONSTANT, value=3.0),
            ],
        )

        result = tree.evaluate({})

        assert result == 5.0

    def test_tree_copy(self):
        """``Node.copy`` produces a tree whose children are independent objects."""
        original = Node(
            node_type=NodeType.MUL,
            children=[
                Node(node_type=NodeType.CONSTANT, value=2.0),
                Node(
                    node_type=NodeType.ADD,
                    children=[
                        Node(node_type=NodeType.CONSTANT, value=1.0),
                        Node(node_type=NodeType.CONSTANT, value=3.0),
                    ],
                ),
            ],
        )

        copy_node = original.copy()

        assert copy_node.node_type == original.node_type
        assert len(copy_node.children) == len(original.children)
        # Mutating the copy must not leak into the original.
        copy_node.children[0].value = 10.0
        assert original.children[0].value == 2.0

    def test_gp_population_init(self):
        """``initialize`` builds ``population_size`` trees within ``max_depth``."""
        config = GPConfig(population_size=20, max_depth=5)

        def fitness_func(chromosome):
            return 1.0

        gp = GeneticProgramming(config, fitness_func)
        gp.initialize()

        assert len(gp.population) == 20
        assert all(isinstance(c, TreeChromosome) for c in gp.population)
        assert all(c.get_depth() <= config.max_depth for c in gp.population)

    def test_subtree_crossover(self):
        """Subtree crossover of two small trees yields two non-empty children."""
        config = GPConfig(crossover_rate=1.0)
        gp = GeneticProgramming(config, lambda c: 1.0)
        parent1 = TreeChromosome(
            root=Node(
                node_type=NodeType.ADD,
                children=[
                    Node(node_type=NodeType.CONSTANT, value=1.0),
                    Node(node_type=NodeType.CONSTANT, value=2.0),
                ],
            ),
        )
        parent2 = TreeChromosome(
            root=Node(
                node_type=NodeType.MUL,
                children=[
                    Node(node_type=NodeType.CONSTANT, value=3.0),
                    Node(node_type=NodeType.CONSTANT, value=4.0),
                ],
            ),
        )

        child1, child2 = gp._subtree_crossover(parent1, parent2)

        assert child1.get_size() > 0
        assert child2.get_size() > 0

    def test_point_mutation(self):
        """Point mutation returns a tree of the same size as its input."""
        config = GPConfig(mutation_rate=1.0)
        gp = GeneticProgramming(config, lambda c: 1.0)
        gp.initialize()
        chromosome = TreeChromosome(
            root=Node(node_type=NodeType.CONSTANT, value=5.0),
        )

        mutated = gp._point_mutation(chromosome)

        assert mutated is not None
        assert mutated.get_size() == chromosome.get_size()

    def test_tree_simplification(self):
        """``simplify_tree`` never returns a tree larger than its input."""
        config = GPConfig()
        gp = GeneticProgramming(config, lambda c: 1.0)
        # Expression 2 + 3: both operands are constants, so it could fold to a
        # single constant; the test only requires that the size does not grow.
        tree = TreeChromosome(
            root=Node(
                node_type=NodeType.ADD,
                children=[
                    Node(node_type=NodeType.CONSTANT, value=2.0),
                    Node(node_type=NodeType.CONSTANT, value=3.0),
                ],
            ),
        )

        simplified = gp.simplify_tree(tree)

        assert simplified.get_size() <= tree.get_size()

    def test_gp_optimization(self):
        """Run GP on symbolic regression of f(x) = 2x + 1 and expect a result."""
        config = GPConfig(
            population_size=30,
            max_generations=50,
            max_depth=5,
        )

        def target_func(x):
            return 2 * x + 1

        x_vals = np.linspace(-5, 5, 20)
        y_vals = target_func(x_vals)

        def fitness_func(chromosome):
            # Negative sum of squared errors: larger (closer to 0) is better.
            total_error = 0
            for x, y in zip(x_vals, y_vals):
                context = {"price_close": x, "param_x": x}
                try:
                    pred = chromosome.evaluate(context)
                    total_error += (pred - y) ** 2
                except Exception:
                    # Trees that fail to evaluate get a flat penalty.
                    # ``Exception`` (not a bare ``except``) so Ctrl-C and
                    # SystemExit still stop the test run.
                    return -10000
            return -total_error

        gp = GeneticProgramming(
            config,
            fitness_func,
            terminal_set=[NodeType.CONSTANT, NodeType.PARAMETER],
            function_set=[NodeType.ADD, NodeType.MUL, NodeType.SUB],
        )
        best = gp.run()

        assert best is not None
        # Only the type is checked; the exact fitness depends on random evolution.
        assert isinstance(best.fitness, (int, float))
class TestNSGA2:
    """Tests for the ``NSGA2`` multi-objective optimizer and its helper types."""

    def test_nsga2_initialization(self):
        """The constructor stores the config and starts with no population."""
        cfg = NSGA2Config(population_size=20)
        optimizer = NSGA2(
            config=cfg,
            objective_funcs={
                "profit": lambda g: np.sum(g),
                "risk": lambda g: np.std(g),
            },
            gene_init_func=lambda: np.random.randn(5),
        )

        assert optimizer.config == cfg
        assert len(optimizer.population) == 0

    def test_nsga2_population_init(self):
        """``initialize`` creates ``population_size`` ``Individual`` objects."""
        optimizer = NSGA2(
            config=NSGA2Config(population_size=20),
            objective_funcs={"f1": lambda g: g[0]},
            gene_init_func=lambda: np.random.randn(3),
        )

        optimizer.initialize()

        members = optimizer.population
        assert len(members) == 20
        assert all(isinstance(member, Individual) for member in members)

    def test_dominance(self):
        """An individual that is better on both objectives dominates the other."""

        def make_individual(gene, obj1_value, obj2_value):
            # obj1 is flagged maximize (minimize=False), obj2 minimize.
            return Individual(
                genes=np.array([gene]),
                objectives={
                    "obj1": ObjectiveValue("obj1", obj1_value, minimize=False),
                    "obj2": ObjectiveValue("obj2", obj2_value, minimize=True),
                },
            )

        stronger = make_individual(1.0, 10.0, 5.0)
        weaker = make_individual(2.0, 5.0, 10.0)  # worse on both objectives

        assert stronger.dominates(weaker)
        assert not weaker.dominates(stronger)

    def test_non_dominated_sorting(self):
        """Fast non-dominated sorting places every individual in exactly one front."""
        optimizer = NSGA2(
            config=NSGA2Config(population_size=10),
            objective_funcs={
                "f1": lambda g: g[0],
                "f2": lambda g: -g[0],  # conflicts with f1
            },
            gene_init_func=lambda: np.random.randn(2),
        )
        optimizer.initialize()

        fronts = optimizer._fast_non_dominated_sort()

        assert len(fronts) > 0
        assert len(fronts[0].individuals) > 0
        # The front sizes must add up to the population size.
        members_in_fronts = sum(len(front.individuals) for front in fronts)
        assert members_in_fronts == len(optimizer.population)

    def test_crowding_distance(self):
        """Boundary individuals get infinite distance; interior ones a positive one."""

        def make_point(i):
            # f1 grows with i while f2 shrinks, so the five points trade off.
            return Individual(
                genes=np.array([float(i)]),
                objectives={
                    "f1": ObjectiveValue("f1", float(i), minimize=False),
                    "f2": ObjectiveValue("f2", float(10 - i), minimize=False),
                },
            )

        front = ParetoFront(individuals=[make_point(i) for i in range(5)])
        optimizer = NSGA2(NSGA2Config(), {}, lambda: np.array([0.0]))

        optimizer._calculate_crowding_distance(front)

        infinity = float("inf")
        assert front.individuals[0].crowding_distance == infinity
        assert front.individuals[-1].crowding_distance == infinity
        # Interior individuals must have a positive crowding distance.
        assert front.individuals[1].crowding_distance > 0
        assert front.individuals[2].crowding_distance > 0

    def test_nsga2_optimization(self):
        """Run NSGA2 on Schaffer N.1 and expect a non-empty first front."""
        cfg = NSGA2Config(
            population_size=30,
            max_generations=50,
        )
        # Schaffer N.1: f1 = x^2, f2 = (x - 2)^2.
        schaffer = {
            "f1": lambda g: g[0] ** 2,
            "f2": lambda g: (g[0] - 2) ** 2,
        }
        lower, upper = np.array([-5]), np.array([5])
        optimizer = NSGA2(
            config=cfg,
            objective_funcs=schaffer,
            gene_init_func=lambda: np.random.uniform(-5, 5, 1),
            bounds=(lower, upper),
        )

        fronts = optimizer.run()

        assert len(fronts) > 0
        assert len(fronts[0].individuals) > 0
        # The Pareto-front accessor must return at least one solution.
        pareto_solutions = optimizer.get_pareto_front_solutions()
        assert len(pareto_solutions) > 0

    def test_nsga2_statistics(self):
        """``get_statistics`` exposes generation/population_size/num_fronts."""
        optimizer = NSGA2(
            config=NSGA2Config(population_size=20),
            objective_funcs={"f1": lambda g: g[0]},
            gene_init_func=lambda: np.random.randn(2),
        )
        optimizer.initialize()

        stats = optimizer.get_statistics()

        for key in ("generation", "population_size", "num_fronts"):
            assert key in stats
class TestFitnessEvaluator:
    """Tests for ``FitnessMetrics`` and the ``FitnessEvaluator`` scoring methods."""

    def test_fitness_metrics_creation(self):
        """Constructor arguments are readable back as attributes."""
        supplied = {
            "total_return": 15.5,
            "sharpe_ratio": 1.2,
            "max_drawdown": 10.0,
            "win_rate": 60.0,
        }
        metrics = FitnessMetrics(**supplied)
        for field_name, value in supplied.items():
            assert getattr(metrics, field_name) == value

    def test_metrics_to_dict(self):
        """``to_dict`` carries the supplied values under matching keys."""
        as_dict = FitnessMetrics(total_return=10.0, sharpe_ratio=1.0).to_dict()
        assert as_dict["total_return"] == 10.0
        assert as_dict["sharpe_ratio"] == 1.0

    def test_sharpe_fitness(self):
        """Sharpe-based fitness of a mock backtest result is a ``float``."""
        scorer = FitnessEvaluator()

        # Stand-in for a backtest result; attributes live on the class.
        class MockResult:
            total_trades = 20
            sharpe_ratio = 1.5
            max_drawdown = 15.0
            total_return = 20.0
            volatility = 10.0
            win_rate = 60.0
            winning_trades = 12
            losing_trades = 8
            avg_win = 150.0
            avg_loss = -100.0
            profit_factor = 2.0
            calmar_ratio = 1.5

        score = scorer.calculate_fitness_sharpe(MockResult())
        # Only the type is checked; the value itself is not pinned.
        assert isinstance(score, float)

    def test_profit_risk_fitness(self):
        """Profit/risk fitness of a mock backtest result is numeric."""
        scorer = FitnessEvaluator()

        # Stand-in for a backtest result; attributes live on the class.
        class MockResult:
            total_trades = 20
            total_return = 30.0
            max_drawdown = 20.0
            sharpe_ratio = 1.5
            volatility = 15.0
            win_rate = 60.0
            winning_trades = 12
            losing_trades = 8
            avg_win = 150.0
            avg_loss = -100.0
            profit_factor = 2.0
            calmar_ratio = 1.5

        score = scorer.calculate_fitness_profit_risk(MockResult(), risk_weight=0.5)
        assert isinstance(score, (int, float))

    def test_multi_objective_fitness(self):
        """Weighted multi-objective fitness returns a non-``None`` value."""
        scorer = FitnessEvaluator()
        sample_metrics = FitnessMetrics(
            total_return=20.0,
            sharpe_ratio=1.5,
            win_rate=65.0,
            num_trades=30,
        )

        score = scorer.calculate_fitness_multi_objective(
            metrics=sample_metrics,
            objectives=["total_return", "sharpe_ratio", "win_rate"],
            weights=[0.4, 0.4, 0.2],
        )

        assert score is not None

    def test_fitness_with_insufficient_trades(self):
        """With 5 trades and ``min_trades=20`` the Sharpe fitness is -1.0."""
        scorer = FitnessEvaluator(min_trades=20)

        # Only the two attributes below are provided on this mock.
        class MockResult:
            total_trades = 5
            sharpe_ratio = 2.0

        score = scorer.calculate_fitness_sharpe(MockResult())
        assert score == -1.0

    def test_sortino_fitness(self):
        """Sortino fitness of a seeded return series is a ``float``."""
        scorer = FitnessEvaluator()
        # Fixed seed so the sample returns are identical on every run.
        np.random.seed(42)
        sample_returns = np.random.normal(0.001, 0.02, 100)

        score = scorer.calculate_fitness_sortino(returns=sample_returns)

        assert isinstance(score, float)

    def test_fitness_function_factory(self):
        """``create_fitness_function`` returns callables for both named kinds."""
        scorer = FitnessEvaluator()

        sharpe_func = scorer.create_fitness_function("sharpe")
        profit_risk_func = scorer.create_fitness_function(
            "profit_risk", risk_weight=0.6
        )

        assert callable(sharpe_func)
        assert callable(profit_risk_func)

    def test_convergence_metrics(self):
        """A nearly flat fitness history yields a stability above 0.5."""
        scorer = FitnessEvaluator()
        # History that oscillates within +/-0.001 of 1.0.
        flat_history = [1.0 + 0.001 * np.sin(i) for i in range(100)]

        metrics = scorer.get_convergence_metrics(flat_history, window=20)

        for key in ("converged", "improvement_rate", "stability"):
            assert key in metrics
        assert metrics["stability"] > 0.5

    def test_population_evaluation(self):
        """``evaluate_population`` returns one ``float`` score per result."""
        scorer = FitnessEvaluator()

        def create_mock(i):
            # Attributes are set on the instance of an otherwise empty class.
            mock = type("Mock", (), {})()
            attributes = {
                "total_trades": 20,
                "sharpe_ratio": 1.0 + i * 0.1,
                "max_drawdown": 10.0,
                "total_return": 15.0,
                "volatility": 12.0,
                "winning_trades": 12,
                "losing_trades": 8,
            }
            for attr_name, attr_value in attributes.items():
                setattr(mock, attr_name, attr_value)
            return mock

        mock_results = [create_mock(i) for i in range(5)]
        scores = scorer.evaluate_population(mock_results)

        assert len(scores) == 5
        assert all(isinstance(score, float) for score in scores)
class TestIntegration:
    """End-to-end smoke tests that run the GA, GP, and NSGA2 loops."""

    def test_ga_with_fitness_evaluator(self):
        """Run the GA on "maximize x for x in [0, 10]" and expect x > 8.

        Despite its name, this test does not pass a ``FitnessEvaluator`` to the
        GA; the fitness is the plain function defined below.
        """
        config = GAConfig(population_size=30, max_generations=30)

        def fitness_func(genes):
            # Candidates outside [0, 10] receive a flat penalty.
            x = genes[0]
            return x if 0 <= x <= 10 else -100

        def gene_init_func():
            return np.random.uniform(0, 10, 1)

        ga = GeneticAlgorithm(config, fitness_func, gene_init_func)
        best = ga.run()

        assert best.genes[0] > 8  # Should be close to 10

    def test_gp_complex_expression(self):
        """Step GP ten times against x^2 + 2x + 1 and expect a best chromosome."""
        config = GPConfig(population_size=20, max_depth=4)

        def fitness_func(chromosome):
            # Negative mean squared error against x^2 + 2x + 1 on 10 points.
            errors = []
            for x in np.linspace(-2, 2, 10):
                context = {"param_x": x, "price_close": x}
                try:
                    result = chromosome.evaluate(context)
                    expected = x ** 2 + 2 * x + 1
                    errors.append((result - expected) ** 2)
                except Exception:
                    # Trees that fail to evaluate are charged a fixed error.
                    # ``Exception`` (not a bare ``except``) so Ctrl-C and
                    # SystemExit still stop the test run.
                    errors.append(1000)
            return -np.mean(errors)

        gp = GeneticProgramming(config, fitness_func)
        gp.initialize()
        # Run a few generations
        for _ in range(10):
            gp.step()

        assert gp.best_chromosome is not None

    def test_nsga2_with_multiple_objectives(self):
        """Run NSGA2 with a profit-like and a risk-like objective."""
        config = NSGA2Config(population_size=20, max_generations=20)
        # NOTE(review): the original comments label "profit" as maximize and
        # "risk" as minimize, but no direction is passed to NSGA2 here —
        # confirm how the implementation treats each objective.
        objectives = {
            "profit": lambda g: np.sum(g ** 2),
            "risk": lambda g: np.std(g),
        }
        nsga2 = NSGA2(
            config=config,
            objective_funcs=objectives,
            gene_init_func=lambda: np.random.randn(5),
        )

        fronts = nsga2.run()

        assert len(fronts) > 0
        assert len(fronts[0].individuals) > 0

    def test_evolution_callback(self):
        """Check manual stepping runs and ``run`` calls its callback 10 times."""
        config = GAConfig(population_size=20, max_generations=10)

        def fitness_func(genes):
            return -np.sum((genes - 5) ** 2)

        def gene_init_func():
            return np.random.randn(1)

        # ``step`` takes no callback, so this part is a smoke check that ten
        # consecutive manual steps complete without raising.
        ga = GeneticAlgorithm(config, fitness_func, gene_init_func)
        ga.initialize()
        for _ in range(10):
            ga.step()

        # A callback handed to ``run`` is expected once per generation
        # (max_generations=10).
        progress = []
        ga2 = GeneticAlgorithm(config, fitness_func, gene_init_func)
        ga2.run(callback=lambda gen, fit: progress.append((gen, fit)))

        assert len(progress) == 10