"""Unit tests for the monitoring module.""" from openclaw.core.economy import SurvivalStatus, TradingEconomicTracker from openclaw.monitoring.status import ( AgentStatusSnapshot, StatusChange, StatusMonitor, StatusReport, ) class TestAgentStatusSnapshot: """Tests for AgentStatusSnapshot dataclass.""" def test_snapshot_creation(self) -> None: """Test creating an AgentStatusSnapshot.""" snapshot = AgentStatusSnapshot( agent_id="test_agent", timestamp="2024-01-01T00:00:00", balance=1000.0, initial_capital=1000.0, status=SurvivalStatus.STABLE, total_costs=0.0, realized_pnl=0.0, net_profit=0.0, ) assert snapshot.agent_id == "test_agent" assert snapshot.balance == 1000.0 assert snapshot.status == SurvivalStatus.STABLE def test_snapshot_to_dict(self) -> None: """Test converting snapshot to dictionary.""" snapshot = AgentStatusSnapshot( agent_id="test_agent", timestamp="2024-01-01T00:00:00", balance=1500.0, initial_capital=1000.0, status=SurvivalStatus.THRIVING, total_costs=50.0, realized_pnl=550.0, net_profit=500.0, ) data = snapshot.to_dict() assert data["agent_id"] == "test_agent" assert data["balance"] == 1500.0 assert data["status"] == "🚀 thriving" assert data["net_profit"] == 500.0 class TestStatusChange: """Tests for StatusChange dataclass.""" def test_status_change_creation(self) -> None: """Test creating a StatusChange.""" change = StatusChange( agent_id="test_agent", timestamp="2024-01-01T00:00:00", old_status=SurvivalStatus.STABLE, new_status=SurvivalStatus.THRIVING, balance=1500.0, ) assert change.agent_id == "test_agent" assert change.old_status == SurvivalStatus.STABLE assert change.new_status == SurvivalStatus.THRIVING def test_status_change_str(self) -> None: """Test StatusChange string representation.""" change = StatusChange( agent_id="test_agent", timestamp="2024-01-01T00:00:00", old_status=SurvivalStatus.STABLE, new_status=SurvivalStatus.THRIVING, balance=1500.0, ) result = str(change) assert "test_agent" in result assert "💪 stable" in result assert "🚀 thriving" in result assert "$1500.00" in result class TestStatusReport: """Tests for StatusReport dataclass.""" def test_report_creation(self) -> None: """Test creating a StatusReport.""" snapshot = AgentStatusSnapshot( agent_id="test_agent", timestamp="2024-01-01T00:00:00", balance=1000.0, initial_capital=1000.0, status=SurvivalStatus.STABLE, total_costs=0.0, realized_pnl=0.0, net_profit=0.0, ) report = StatusReport( timestamp="2024-01-01T00:00:00", total_agents=1, status_counts={SurvivalStatus.STABLE: 1}, agents=[snapshot], changes=[], summary="Test summary", ) assert report.total_agents == 1 assert report.status_counts[SurvivalStatus.STABLE] == 1 assert report.summary == "Test summary" def test_report_to_dict(self) -> None: """Test converting report to dictionary.""" snapshot = AgentStatusSnapshot( agent_id="test_agent", timestamp="2024-01-01T00:00:00", balance=1000.0, initial_capital=1000.0, status=SurvivalStatus.STABLE, total_costs=0.0, realized_pnl=0.0, net_profit=0.0, ) report = StatusReport( timestamp="2024-01-01T00:00:00", total_agents=1, status_counts={SurvivalStatus.STABLE: 1}, agents=[snapshot], changes=[], summary="Test summary", ) data = report.to_dict() assert data["total_agents"] == 1 assert data["status_counts"]["💪 stable"] == 1 assert data["summary"] == "Test summary" def test_report_to_json(self) -> None: """Test converting report to JSON.""" report = StatusReport( timestamp="2024-01-01T00:00:00", total_agents=0, status_counts={}, agents=[], changes=[], ) json_str = report.to_json() assert "total_agents" in json_str assert "timestamp" in json_str def test_report_to_text(self) -> None: """Test generating text report.""" snapshot = AgentStatusSnapshot( agent_id="test_agent", timestamp="2024-01-01T00:00:00", balance=1000.0, initial_capital=1000.0, status=SurvivalStatus.STABLE, total_costs=0.0, realized_pnl=0.0, net_profit=0.0, ) report = StatusReport( timestamp="2024-01-01T00:00:00", total_agents=1, status_counts={SurvivalStatus.STABLE: 1}, agents=[snapshot], changes=[], ) text = report.to_text() assert "OpenClaw Agent Status Report" in text assert "test_agent" in text assert "Total Agents: 1" in text class TestStatusMonitor: """Tests for StatusMonitor class.""" def test_monitor_creation(self) -> None: """Test creating a StatusMonitor.""" monitor = StatusMonitor() assert monitor.agent_count == 0 assert monitor.bankrupt_count == 0 assert monitor.thriving_count == 0 def test_register_agent(self) -> None: """Test registering an agent.""" monitor = StatusMonitor() tracker = TradingEconomicTracker("test_agent", initial_capital=10000.0) monitor.register_agent("test_agent", tracker) assert monitor.agent_count == 1 assert "test_agent" in monitor._agents def test_unregister_agent(self) -> None: """Test unregistering an agent.""" monitor = StatusMonitor() tracker = TradingEconomicTracker("test_agent", initial_capital=10000.0) monitor.register_agent("test_agent", tracker) assert monitor.agent_count == 1 monitor.unregister_agent("test_agent") assert monitor.agent_count == 0 def test_get_snapshot_existing_agent(self) -> None: """Test getting snapshot for existing agent.""" monitor = StatusMonitor() # Use 5000 initial so 10000 balance is THRIVING (>= 150% of 5000 = 7500) tracker = TradingEconomicTracker("test_agent", initial_capital=5000.0) # Manually set balance to 10000 to be in THRIVING state tracker._update_balance(5000.0, "Extra capital") monitor.register_agent("test_agent", tracker) snapshot = monitor.get_snapshot("test_agent") assert snapshot is not None assert snapshot.agent_id == "test_agent" assert snapshot.balance == 10000.0 assert snapshot.status == SurvivalStatus.THRIVING def test_get_snapshot_nonexistent_agent(self) -> None: """Test getting snapshot for non-existent agent.""" monitor = StatusMonitor() snapshot = monitor.get_snapshot("nonexistent") assert snapshot is None def test_get_all_snapshots(self) -> None: """Test getting all agent snapshots.""" monitor = StatusMonitor() tracker1 = TradingEconomicTracker("agent_1", initial_capital=10000.0) tracker2 = TradingEconomicTracker("agent_2", initial_capital=5000.0) monitor.register_agent("agent_1", tracker1) monitor.register_agent("agent_2", tracker2) snapshots = monitor.get_all_snapshots() assert len(snapshots) == 2 agent_ids = {s.agent_id for s in snapshots} assert agent_ids == {"agent_1", "agent_2"} def test_update_detects_status_change(self) -> None: """Test that update detects status changes.""" # Use 8000 initial so starting balance (8000) is STABLE (>= 8800 threshold? No, 8000 < 8800) # Actually 8000 is STRUGGLING. Let me use 5000 so 10000 is THRIVING monitor = StatusMonitor() tracker = TradingEconomicTracker("test_agent", initial_capital=5000.0) monitor.register_agent("test_agent", tracker) # 5000 is below 5500 (stable threshold), so it's STRUGGLING assert monitor._last_status["test_agent"] == SurvivalStatus.STRUGGLING # Win enough to go to THRIVING (need >= 7500 = 150% of 5000) tracker.calculate_trade_cost( trade_value=1000.0, is_win=True, win_amount=3000.0, # Balance will be ~8000 after costs loss_amount=0.0, ) changes = monitor.update() assert len(changes) == 1 assert changes[0].agent_id == "test_agent" assert changes[0].old_status == SurvivalStatus.STRUGGLING assert changes[0].new_status == SurvivalStatus.THRIVING def test_update_no_change(self) -> None: """Test update when no status change occurs.""" monitor = StatusMonitor() tracker = TradingEconomicTracker("test_agent", initial_capital=10000.0) monitor.register_agent("test_agent", tracker) # Small trade that doesn't change status tracker.calculate_trade_cost( trade_value=100.0, is_win=True, win_amount=10.0, loss_amount=0.0, ) changes = monitor.update() assert len(changes) == 0 def test_get_status_changes_single_agent(self) -> None: """Test getting status changes for a single agent.""" monitor = StatusMonitor() tracker = TradingEconomicTracker("test_agent", initial_capital=10000.0) monitor.register_agent("test_agent", tracker) # Trigger status change to thriving tracker.calculate_trade_cost( trade_value=1000.0, is_win=True, win_amount=6000.0, loss_amount=0.0, ) monitor.update() changes = monitor.get_status_changes("test_agent") assert len(changes) == 1 assert changes[0].new_status == SurvivalStatus.THRIVING def test_get_status_changes_all_agents(self) -> None: """Test getting status changes for all agents.""" monitor = StatusMonitor() tracker1 = TradingEconomicTracker("agent_1", initial_capital=10000.0) tracker2 = TradingEconomicTracker("agent_2", initial_capital=10000.0) monitor.register_agent("agent_1", tracker1) monitor.register_agent("agent_2", tracker2) # Trigger status change for agent_1 tracker1.calculate_trade_cost( trade_value=1000.0, is_win=True, win_amount=6000.0, loss_amount=0.0, ) monitor.update() all_changes = monitor.get_status_changes() assert len(all_changes) == 1 assert all_changes[0].agent_id == "agent_1" def test_generate_report(self) -> None: """Test generating a status report.""" monitor = StatusMonitor() tracker = TradingEconomicTracker("test_agent", initial_capital=10000.0) monitor.register_agent("test_agent", tracker) report = monitor.generate_report() assert report.total_agents == 1 assert len(report.agents) == 1 assert report.agents[0].agent_id == "test_agent" assert SurvivalStatus.STABLE in report.status_counts def test_generate_report_with_bankrupt_agent(self) -> None: """Test report generation includes bankrupt alert.""" monitor = StatusMonitor() tracker = TradingEconomicTracker("test_agent", initial_capital=10000.0) monitor.register_agent("test_agent", tracker) # Lose enough to go bankrupt (< 30%) tracker.calculate_trade_cost( trade_value=1000.0, is_win=False, win_amount=0.0, loss_amount=8000.0, ) monitor.update() report = monitor.generate_report() assert "bankrupt" in report.summary.lower() assert report.status_counts.get(SurvivalStatus.BANKRUPT, 0) == 1 def test_generate_report_all_thriving(self) -> None: """Test report when all agents are thriving.""" monitor = StatusMonitor() tracker = TradingEconomicTracker("test_agent", initial_capital=10000.0) monitor.register_agent("test_agent", tracker) # Win enough to go thriving (> 150%) tracker.calculate_trade_cost( trade_value=1000.0, is_win=True, win_amount=6000.0, loss_amount=0.0, ) monitor.update() report = monitor.generate_report() assert "thriving" in report.summary.lower() def test_bankrupt_count_property(self) -> None: """Test bankrupt_count property.""" monitor = StatusMonitor() tracker = TradingEconomicTracker("test_agent", initial_capital=10000.0) monitor.register_agent("test_agent", tracker) assert monitor.bankrupt_count == 0 # Lose enough to go bankrupt tracker.calculate_trade_cost( trade_value=1000.0, is_win=False, win_amount=0.0, loss_amount=8000.0, ) assert monitor.bankrupt_count == 1 def test_thriving_count_property(self) -> None: """Test thriving_count property.""" monitor = StatusMonitor() tracker = TradingEconomicTracker("test_agent", initial_capital=10000.0) monitor.register_agent("test_agent", tracker) assert monitor.thriving_count == 0 # Win enough to go thriving tracker.calculate_trade_cost( trade_value=1000.0, is_win=True, win_amount=6000.0, loss_amount=0.0, ) assert monitor.thriving_count == 1 def test_multiple_status_transitions(self) -> None: """Test tracking multiple status transitions.""" # Use 8000 initial so we can test STRUGGLING -> THRIVING -> STABLE # STRUGGLING: < 6400 bankrupt, < 6400-7040 critical, 7040-8800 struggling # STABLE: >= 8800 (1.1 * 8000) # THRIVING: >= 12000 (1.5 * 8000) monitor = StatusMonitor() tracker = TradingEconomicTracker("test_agent", initial_capital=8000.0) monitor.register_agent("test_agent", tracker) # STRUGGLING -> THRIVING (win 6000, balance ~14000) tracker.calculate_trade_cost( trade_value=1000.0, is_win=True, win_amount=6000.0, loss_amount=0.0, ) monitor.update() # THRIVING -> STABLE (lose 3000, balance ~11000 which is stable) tracker._update_balance(-3000.0, "Partial loss") changes = monitor.update() assert len(changes) == 1 assert changes[0].old_status == SurvivalStatus.THRIVING assert changes[0].new_status == SurvivalStatus.STABLE # Check history has both transitions history = monitor.get_status_changes("test_agent") assert len(history) == 2 def test_save_report_text(self, tmp_path) -> None: """Test saving report in text format.""" monitor = StatusMonitor() tracker = TradingEconomicTracker("test_agent", initial_capital=10000.0) monitor.register_agent("test_agent", tracker) filepath = tmp_path / "report.txt" monitor.save_report(filepath, format="text") assert filepath.exists() content = filepath.read_text() assert "OpenClaw Agent Status Report" in content def test_save_report_json(self, tmp_path) -> None: """Test saving report in JSON format.""" monitor = StatusMonitor() tracker = TradingEconomicTracker("test_agent", initial_capital=10000.0) monitor.register_agent("test_agent", tracker) filepath = tmp_path / "report.json" monitor.save_report(filepath, format="json") assert filepath.exists() content = filepath.read_text() assert "total_agents" in content assert "timestamp" in content def test_save_report_creates_directories(self, tmp_path) -> None: """Test that save_report creates parent directories.""" monitor = StatusMonitor() tracker = TradingEconomicTracker("test_agent", initial_capital=10000.0) monitor.register_agent("test_agent", tracker) nested_path = tmp_path / "subdir" / "nested" / "report.txt" monitor.save_report(nested_path, format="text") assert nested_path.exists()