stock/tests/unit/test_monitoring.py
2026-02-27 03:17:12 +08:00

489 lines
17 KiB
Python

"""Unit tests for the monitoring module."""
from openclaw.core.economy import SurvivalStatus, TradingEconomicTracker
from openclaw.monitoring.status import (
AgentStatusSnapshot,
StatusChange,
StatusMonitor,
StatusReport,
)
class TestAgentStatusSnapshot:
"""Tests for AgentStatusSnapshot dataclass."""
def test_snapshot_creation(self) -> None:
"""Test creating an AgentStatusSnapshot."""
snapshot = AgentStatusSnapshot(
agent_id="test_agent",
timestamp="2024-01-01T00:00:00",
balance=1000.0,
initial_capital=1000.0,
status=SurvivalStatus.STABLE,
total_costs=0.0,
realized_pnl=0.0,
net_profit=0.0,
)
assert snapshot.agent_id == "test_agent"
assert snapshot.balance == 1000.0
assert snapshot.status == SurvivalStatus.STABLE
def test_snapshot_to_dict(self) -> None:
"""Test converting snapshot to dictionary."""
snapshot = AgentStatusSnapshot(
agent_id="test_agent",
timestamp="2024-01-01T00:00:00",
balance=1500.0,
initial_capital=1000.0,
status=SurvivalStatus.THRIVING,
total_costs=50.0,
realized_pnl=550.0,
net_profit=500.0,
)
data = snapshot.to_dict()
assert data["agent_id"] == "test_agent"
assert data["balance"] == 1500.0
assert data["status"] == "🚀 thriving"
assert data["net_profit"] == 500.0
class TestStatusChange:
"""Tests for StatusChange dataclass."""
def test_status_change_creation(self) -> None:
"""Test creating a StatusChange."""
change = StatusChange(
agent_id="test_agent",
timestamp="2024-01-01T00:00:00",
old_status=SurvivalStatus.STABLE,
new_status=SurvivalStatus.THRIVING,
balance=1500.0,
)
assert change.agent_id == "test_agent"
assert change.old_status == SurvivalStatus.STABLE
assert change.new_status == SurvivalStatus.THRIVING
def test_status_change_str(self) -> None:
"""Test StatusChange string representation."""
change = StatusChange(
agent_id="test_agent",
timestamp="2024-01-01T00:00:00",
old_status=SurvivalStatus.STABLE,
new_status=SurvivalStatus.THRIVING,
balance=1500.0,
)
result = str(change)
assert "test_agent" in result
assert "💪 stable" in result
assert "🚀 thriving" in result
assert "$1500.00" in result
class TestStatusReport:
"""Tests for StatusReport dataclass."""
def test_report_creation(self) -> None:
"""Test creating a StatusReport."""
snapshot = AgentStatusSnapshot(
agent_id="test_agent",
timestamp="2024-01-01T00:00:00",
balance=1000.0,
initial_capital=1000.0,
status=SurvivalStatus.STABLE,
total_costs=0.0,
realized_pnl=0.0,
net_profit=0.0,
)
report = StatusReport(
timestamp="2024-01-01T00:00:00",
total_agents=1,
status_counts={SurvivalStatus.STABLE: 1},
agents=[snapshot],
changes=[],
summary="Test summary",
)
assert report.total_agents == 1
assert report.status_counts[SurvivalStatus.STABLE] == 1
assert report.summary == "Test summary"
def test_report_to_dict(self) -> None:
"""Test converting report to dictionary."""
snapshot = AgentStatusSnapshot(
agent_id="test_agent",
timestamp="2024-01-01T00:00:00",
balance=1000.0,
initial_capital=1000.0,
status=SurvivalStatus.STABLE,
total_costs=0.0,
realized_pnl=0.0,
net_profit=0.0,
)
report = StatusReport(
timestamp="2024-01-01T00:00:00",
total_agents=1,
status_counts={SurvivalStatus.STABLE: 1},
agents=[snapshot],
changes=[],
summary="Test summary",
)
data = report.to_dict()
assert data["total_agents"] == 1
assert data["status_counts"]["💪 stable"] == 1
assert data["summary"] == "Test summary"
def test_report_to_json(self) -> None:
"""Test converting report to JSON."""
report = StatusReport(
timestamp="2024-01-01T00:00:00",
total_agents=0,
status_counts={},
agents=[],
changes=[],
)
json_str = report.to_json()
assert "total_agents" in json_str
assert "timestamp" in json_str
def test_report_to_text(self) -> None:
"""Test generating text report."""
snapshot = AgentStatusSnapshot(
agent_id="test_agent",
timestamp="2024-01-01T00:00:00",
balance=1000.0,
initial_capital=1000.0,
status=SurvivalStatus.STABLE,
total_costs=0.0,
realized_pnl=0.0,
net_profit=0.0,
)
report = StatusReport(
timestamp="2024-01-01T00:00:00",
total_agents=1,
status_counts={SurvivalStatus.STABLE: 1},
agents=[snapshot],
changes=[],
)
text = report.to_text()
assert "OpenClaw Agent Status Report" in text
assert "test_agent" in text
assert "Total Agents: 1" in text
class TestStatusMonitor:
"""Tests for StatusMonitor class."""
def test_monitor_creation(self) -> None:
"""Test creating a StatusMonitor."""
monitor = StatusMonitor()
assert monitor.agent_count == 0
assert monitor.bankrupt_count == 0
assert monitor.thriving_count == 0
def test_register_agent(self) -> None:
"""Test registering an agent."""
monitor = StatusMonitor()
tracker = TradingEconomicTracker("test_agent", initial_capital=10000.0)
monitor.register_agent("test_agent", tracker)
assert monitor.agent_count == 1
assert "test_agent" in monitor._agents
def test_unregister_agent(self) -> None:
"""Test unregistering an agent."""
monitor = StatusMonitor()
tracker = TradingEconomicTracker("test_agent", initial_capital=10000.0)
monitor.register_agent("test_agent", tracker)
assert monitor.agent_count == 1
monitor.unregister_agent("test_agent")
assert monitor.agent_count == 0
def test_get_snapshot_existing_agent(self) -> None:
"""Test getting snapshot for existing agent."""
monitor = StatusMonitor()
# Use 5000 initial so 10000 balance is THRIVING (>= 150% of 5000 = 7500)
tracker = TradingEconomicTracker("test_agent", initial_capital=5000.0)
# Manually set balance to 10000 to be in THRIVING state
tracker._update_balance(5000.0, "Extra capital")
monitor.register_agent("test_agent", tracker)
snapshot = monitor.get_snapshot("test_agent")
assert snapshot is not None
assert snapshot.agent_id == "test_agent"
assert snapshot.balance == 10000.0
assert snapshot.status == SurvivalStatus.THRIVING
def test_get_snapshot_nonexistent_agent(self) -> None:
"""Test getting snapshot for non-existent agent."""
monitor = StatusMonitor()
snapshot = monitor.get_snapshot("nonexistent")
assert snapshot is None
def test_get_all_snapshots(self) -> None:
"""Test getting all agent snapshots."""
monitor = StatusMonitor()
tracker1 = TradingEconomicTracker("agent_1", initial_capital=10000.0)
tracker2 = TradingEconomicTracker("agent_2", initial_capital=5000.0)
monitor.register_agent("agent_1", tracker1)
monitor.register_agent("agent_2", tracker2)
snapshots = monitor.get_all_snapshots()
assert len(snapshots) == 2
agent_ids = {s.agent_id for s in snapshots}
assert agent_ids == {"agent_1", "agent_2"}
def test_update_detects_status_change(self) -> None:
"""Test that update detects status changes."""
# Use 8000 initial so starting balance (8000) is STABLE (>= 8800 threshold? No, 8000 < 8800)
# Actually 8000 is STRUGGLING. Let me use 5000 so 10000 is THRIVING
monitor = StatusMonitor()
tracker = TradingEconomicTracker("test_agent", initial_capital=5000.0)
monitor.register_agent("test_agent", tracker)
# 5000 is below 5500 (stable threshold), so it's STRUGGLING
assert monitor._last_status["test_agent"] == SurvivalStatus.STRUGGLING
# Win enough to go to THRIVING (need >= 7500 = 150% of 5000)
tracker.calculate_trade_cost(
trade_value=1000.0,
is_win=True,
win_amount=3000.0, # Balance will be ~8000 after costs
loss_amount=0.0,
)
changes = monitor.update()
assert len(changes) == 1
assert changes[0].agent_id == "test_agent"
assert changes[0].old_status == SurvivalStatus.STRUGGLING
assert changes[0].new_status == SurvivalStatus.THRIVING
def test_update_no_change(self) -> None:
"""Test update when no status change occurs."""
monitor = StatusMonitor()
tracker = TradingEconomicTracker("test_agent", initial_capital=10000.0)
monitor.register_agent("test_agent", tracker)
# Small trade that doesn't change status
tracker.calculate_trade_cost(
trade_value=100.0,
is_win=True,
win_amount=10.0,
loss_amount=0.0,
)
changes = monitor.update()
assert len(changes) == 0
def test_get_status_changes_single_agent(self) -> None:
"""Test getting status changes for a single agent."""
monitor = StatusMonitor()
tracker = TradingEconomicTracker("test_agent", initial_capital=10000.0)
monitor.register_agent("test_agent", tracker)
# Trigger status change to thriving
tracker.calculate_trade_cost(
trade_value=1000.0,
is_win=True,
win_amount=6000.0,
loss_amount=0.0,
)
monitor.update()
changes = monitor.get_status_changes("test_agent")
assert len(changes) == 1
assert changes[0].new_status == SurvivalStatus.THRIVING
def test_get_status_changes_all_agents(self) -> None:
"""Test getting status changes for all agents."""
monitor = StatusMonitor()
tracker1 = TradingEconomicTracker("agent_1", initial_capital=10000.0)
tracker2 = TradingEconomicTracker("agent_2", initial_capital=10000.0)
monitor.register_agent("agent_1", tracker1)
monitor.register_agent("agent_2", tracker2)
# Trigger status change for agent_1
tracker1.calculate_trade_cost(
trade_value=1000.0,
is_win=True,
win_amount=6000.0,
loss_amount=0.0,
)
monitor.update()
all_changes = monitor.get_status_changes()
assert len(all_changes) == 1
assert all_changes[0].agent_id == "agent_1"
def test_generate_report(self) -> None:
"""Test generating a status report."""
monitor = StatusMonitor()
tracker = TradingEconomicTracker("test_agent", initial_capital=10000.0)
monitor.register_agent("test_agent", tracker)
report = monitor.generate_report()
assert report.total_agents == 1
assert len(report.agents) == 1
assert report.agents[0].agent_id == "test_agent"
assert SurvivalStatus.STABLE in report.status_counts
def test_generate_report_with_bankrupt_agent(self) -> None:
"""Test report generation includes bankrupt alert."""
monitor = StatusMonitor()
tracker = TradingEconomicTracker("test_agent", initial_capital=10000.0)
monitor.register_agent("test_agent", tracker)
# Lose enough to go bankrupt (< 30%)
tracker.calculate_trade_cost(
trade_value=1000.0,
is_win=False,
win_amount=0.0,
loss_amount=8000.0,
)
monitor.update()
report = monitor.generate_report()
assert "bankrupt" in report.summary.lower()
assert report.status_counts.get(SurvivalStatus.BANKRUPT, 0) == 1
def test_generate_report_all_thriving(self) -> None:
"""Test report when all agents are thriving."""
monitor = StatusMonitor()
tracker = TradingEconomicTracker("test_agent", initial_capital=10000.0)
monitor.register_agent("test_agent", tracker)
# Win enough to go thriving (> 150%)
tracker.calculate_trade_cost(
trade_value=1000.0,
is_win=True,
win_amount=6000.0,
loss_amount=0.0,
)
monitor.update()
report = monitor.generate_report()
assert "thriving" in report.summary.lower()
def test_bankrupt_count_property(self) -> None:
"""Test bankrupt_count property."""
monitor = StatusMonitor()
tracker = TradingEconomicTracker("test_agent", initial_capital=10000.0)
monitor.register_agent("test_agent", tracker)
assert monitor.bankrupt_count == 0
# Lose enough to go bankrupt
tracker.calculate_trade_cost(
trade_value=1000.0,
is_win=False,
win_amount=0.0,
loss_amount=8000.0,
)
assert monitor.bankrupt_count == 1
def test_thriving_count_property(self) -> None:
"""Test thriving_count property."""
monitor = StatusMonitor()
tracker = TradingEconomicTracker("test_agent", initial_capital=10000.0)
monitor.register_agent("test_agent", tracker)
assert monitor.thriving_count == 0
# Win enough to go thriving
tracker.calculate_trade_cost(
trade_value=1000.0,
is_win=True,
win_amount=6000.0,
loss_amount=0.0,
)
assert monitor.thriving_count == 1
def test_multiple_status_transitions(self) -> None:
"""Test tracking multiple status transitions."""
# Use 8000 initial so we can test STRUGGLING -> THRIVING -> STABLE
# STRUGGLING: < 6400 bankrupt, < 6400-7040 critical, 7040-8800 struggling
# STABLE: >= 8800 (1.1 * 8000)
# THRIVING: >= 12000 (1.5 * 8000)
monitor = StatusMonitor()
tracker = TradingEconomicTracker("test_agent", initial_capital=8000.0)
monitor.register_agent("test_agent", tracker)
# STRUGGLING -> THRIVING (win 6000, balance ~14000)
tracker.calculate_trade_cost(
trade_value=1000.0,
is_win=True,
win_amount=6000.0,
loss_amount=0.0,
)
monitor.update()
# THRIVING -> STABLE (lose 3000, balance ~11000 which is stable)
tracker._update_balance(-3000.0, "Partial loss")
changes = monitor.update()
assert len(changes) == 1
assert changes[0].old_status == SurvivalStatus.THRIVING
assert changes[0].new_status == SurvivalStatus.STABLE
# Check history has both transitions
history = monitor.get_status_changes("test_agent")
assert len(history) == 2
def test_save_report_text(self, tmp_path) -> None:
"""Test saving report in text format."""
monitor = StatusMonitor()
tracker = TradingEconomicTracker("test_agent", initial_capital=10000.0)
monitor.register_agent("test_agent", tracker)
filepath = tmp_path / "report.txt"
monitor.save_report(filepath, format="text")
assert filepath.exists()
content = filepath.read_text()
assert "OpenClaw Agent Status Report" in content
def test_save_report_json(self, tmp_path) -> None:
"""Test saving report in JSON format."""
monitor = StatusMonitor()
tracker = TradingEconomicTracker("test_agent", initial_capital=10000.0)
monitor.register_agent("test_agent", tracker)
filepath = tmp_path / "report.json"
monitor.save_report(filepath, format="json")
assert filepath.exists()
content = filepath.read_text()
assert "total_agents" in content
assert "timestamp" in content
def test_save_report_creates_directories(self, tmp_path) -> None:
"""Test that save_report creates parent directories."""
monitor = StatusMonitor()
tracker = TradingEconomicTracker("test_agent", initial_capital=10000.0)
monitor.register_agent("test_agent", tracker)
nested_path = tmp_path / "subdir" / "nested" / "report.txt"
monitor.save_report(nested_path, format="text")
assert nested_path.exists()