"""Tests for monitoring system components. This module contains tests for StatusMonitor, MetricsCollector, and SystemMonitor. """ import time from datetime import datetime from unittest.mock import MagicMock, patch import pytest from openclaw.core.economy import SurvivalStatus, TradingEconomicTracker from openclaw.monitoring.metrics import ( Counter, Gauge, Histogram, MetricLabel, MetricValue, MetricsCollector, ) from openclaw.monitoring.status import ( AgentStatusSnapshot, StatusChange, StatusMonitor, StatusReport, ) from openclaw.monitoring.system import ( AgentPerformanceMetrics, AlertThresholds, HealthStatus, SystemMetrics, SystemMonitor, ) class TestStatusMonitor: """Tests for StatusMonitor class.""" def test_monitor_initialization(self): """Test StatusMonitor initialization.""" monitor = StatusMonitor() assert monitor.agent_count == 0 assert monitor.bankrupt_count == 0 assert monitor.thriving_count == 0 def test_register_agent(self): """Test registering an agent.""" monitor = StatusMonitor() tracker = TradingEconomicTracker( agent_id="test_agent", initial_capital=10000.0, ) monitor.register_agent("test_agent", tracker) assert monitor.agent_count == 1 def test_unregister_agent(self): """Test unregistering an agent.""" monitor = StatusMonitor() tracker = TradingEconomicTracker( agent_id="test_agent", initial_capital=10000.0, ) monitor.register_agent("test_agent", tracker) assert monitor.agent_count == 1 monitor.unregister_agent("test_agent") assert monitor.agent_count == 0 def test_get_snapshot(self): """Test getting agent status snapshot.""" monitor = StatusMonitor() tracker = TradingEconomicTracker( agent_id="test_agent", initial_capital=10000.0, ) monitor.register_agent("test_agent", tracker) snapshot = monitor.get_snapshot("test_agent") assert snapshot is not None assert snapshot.agent_id == "test_agent" # Initial balance is 10000, but some costs are deducted during initialization # So we just verify the status is valid, not a specific value assert snapshot.balance > 0 assert snapshot.initial_capital == 10000.0 assert isinstance(snapshot.status, SurvivalStatus) def test_get_snapshot_unregistered(self): """Test getting snapshot for unregistered agent returns None.""" monitor = StatusMonitor() snapshot = monitor.get_snapshot("nonexistent") assert snapshot is None def test_get_all_snapshots(self): """Test getting all agent snapshots.""" monitor = StatusMonitor() tracker1 = TradingEconomicTracker(agent_id="agent1", initial_capital=10000.0) tracker2 = TradingEconomicTracker(agent_id="agent2", initial_capital=5000.0) monitor.register_agent("agent1", tracker1) monitor.register_agent("agent2", tracker2) snapshots = monitor.get_all_snapshots() assert len(snapshots) == 2 assert {s.agent_id for s in snapshots} == {"agent1", "agent2"} def test_update_detects_status_change(self): """Test update detects status changes.""" monitor = StatusMonitor() tracker = TradingEconomicTracker( agent_id="test_agent", initial_capital=10000.0, ) monitor.register_agent("test_agent", tracker) # Simulate balance drop to critical level tracker.balance = 3500.0 # Below 50% but above 30% changes = monitor.update() # First update sets initial status, no change recorded yet assert len(changes) == 1 assert changes[0].agent_id == "test_agent" assert changes[0].new_status == SurvivalStatus.CRITICAL def test_get_status_changes(self): """Test getting status change history.""" monitor = StatusMonitor() tracker = TradingEconomicTracker( agent_id="test_agent", initial_capital=10000.0, ) monitor.register_agent("test_agent", tracker) tracker.balance = 3500.0 monitor.update() changes = monitor.get_status_changes("test_agent") assert len(changes) == 1 assert isinstance(changes[0], StatusChange) def test_get_status_changes_all_agents(self): """Test getting status changes for all agents.""" monitor = StatusMonitor() tracker1 = TradingEconomicTracker(agent_id="agent1", initial_capital=10000.0) tracker2 = TradingEconomicTracker(agent_id="agent2", initial_capital=10000.0) monitor.register_agent("agent1", tracker1) monitor.register_agent("agent2", tracker2) tracker1.balance = 3500.0 tracker2.balance = 3500.0 monitor.update() all_changes = monitor.get_status_changes() assert len(all_changes) == 2 def test_generate_report(self): """Test generating status report.""" monitor = StatusMonitor() tracker = TradingEconomicTracker( agent_id="test_agent", initial_capital=10000.0, ) monitor.register_agent("test_agent", tracker) report = monitor.generate_report() assert isinstance(report, StatusReport) assert report.total_agents == 1 assert SurvivalStatus.STABLE in report.status_counts def test_report_summary_all_thriving(self): """Test report summary when all agents thriving.""" monitor = StatusMonitor() tracker = TradingEconomicTracker( agent_id="test_agent", initial_capital=10000.0, ) tracker.balance = 20000.0 # 200% of initial monitor.register_agent("test_agent", tracker) report = monitor.generate_report() assert "thriving" in report.summary.lower() def test_report_summary_bankrupt(self): """Test report summary with bankrupt agents.""" monitor = StatusMonitor() tracker = TradingEconomicTracker( agent_id="test_agent", initial_capital=10000.0, ) tracker.balance = 1000.0 # Below 30% monitor.register_agent("test_agent", tracker) report = monitor.generate_report() assert "bankrupt" in report.summary.lower() assert "ALERT" in report.summary def test_bankrupt_and_thriving_counts(self): """Test bankrupt and thriving count properties.""" monitor = StatusMonitor() # Thriving agent tracker1 = TradingEconomicTracker(agent_id="agent1", initial_capital=10000.0) tracker1.balance = 20000.0 # Bankrupt agent tracker2 = TradingEconomicTracker(agent_id="agent2", initial_capital=10000.0) tracker2.balance = 1000.0 monitor.register_agent("agent1", tracker1) monitor.register_agent("agent2", tracker2) assert monitor.thriving_count == 1 assert monitor.bankrupt_count == 1 class TestMetricsCollector: """Tests for MetricsCollector class.""" def test_collector_initialization(self): """Test MetricsCollector initialization.""" collector = MetricsCollector() assert collector.get_all_names() == [] def test_counter_creation(self): """Test creating a counter metric.""" collector = MetricsCollector() counter = collector.counter("requests_total", "Total requests") assert isinstance(counter, Counter) assert counter.name == "requests_total" def test_gauge_creation(self): """Test creating a gauge metric.""" collector = MetricsCollector() gauge = collector.gauge("memory_usage", "Memory usage") assert isinstance(gauge, Gauge) assert gauge.name == "memory_usage" def test_histogram_creation(self): """Test creating a histogram metric.""" collector = MetricsCollector() histogram = collector.histogram("latency_seconds", "Request latency") assert isinstance(histogram, Histogram) assert histogram.name == "latency_seconds" def test_get_existing_metric(self): """Test getting an existing metric.""" collector = MetricsCollector() counter = collector.counter("requests_total", "Total requests") retrieved = collector.get_metric("requests_total") assert retrieved is counter def test_get_nonexistent_metric(self): """Test getting a nonexistent metric returns None.""" collector = MetricsCollector() retrieved = collector.get_metric("nonexistent") assert retrieved is None def test_remove_metric(self): """Test removing a metric.""" collector = MetricsCollector() collector.counter("requests_total", "Total requests") removed = collector.remove_metric("requests_total") assert removed is True assert collector.get_metric("requests_total") is None def test_remove_nonexistent_metric(self): """Test removing a nonexistent metric returns False.""" collector = MetricsCollector() removed = collector.remove_metric("nonexistent") assert removed is False def test_clear_all_metrics(self): """Test clearing all metrics.""" collector = MetricsCollector() collector.counter("counter1", "Counter 1") collector.gauge("gauge1", "Gauge 1") collector.clear() assert collector.get_all_names() == [] def test_get_all_names(self): """Test getting all metric names.""" collector = MetricsCollector() collector.counter("counter1", "Counter 1") collector.gauge("gauge1", "Gauge 1") names = collector.get_all_names() assert set(names) == {"counter1", "gauge1"} def test_to_prometheus_empty(self): """Test Prometheus export with no metrics.""" collector = MetricsCollector() output = collector.to_prometheus() assert output == "" def test_to_prometheus_with_metrics(self): """Test Prometheus export with metrics.""" collector = MetricsCollector() counter = collector.counter("requests_total", "Total requests") counter.inc(5) output = collector.to_prometheus() assert "# HELP requests_total Total requests" in output assert "# TYPE requests_total counter" in output assert "requests_total 5" in output class TestCounter: """Tests for Counter class.""" def test_counter_initialization(self): """Test Counter initialization.""" counter = Counter("test_counter", "Test counter") assert counter.name == "test_counter" assert counter.get() == 0 def test_counter_increment(self): """Test counter increment.""" counter = Counter("test_counter", "Test counter") counter.inc() assert counter.get() == 1 def test_counter_increment_by_amount(self): """Test counter increment by specific amount.""" counter = Counter("test_counter", "Test counter") counter.inc(5) assert counter.get() == 5 def test_counter_increment_with_labels(self): """Test counter increment with labels.""" counter = Counter("requests_total", "Total requests") counter.inc(1, {"method": "GET", "status": "200"}) counter.inc(1, {"method": "POST", "status": "201"}) assert counter.get({"method": "GET", "status": "200"}) == 1 assert counter.get({"method": "POST", "status": "201"}) == 1 def test_counter_cannot_decrement(self): """Test counter cannot be decremented.""" counter = Counter("test_counter", "Test counter") with pytest.raises(ValueError, match="Counter cannot be decremented"): counter.inc(-1) def test_counter_to_prometheus(self): """Test counter Prometheus export.""" counter = Counter("test_counter", "Test counter") counter.inc(10) output = counter.to_prometheus() assert "# HELP test_counter Test counter" in output assert "# TYPE test_counter counter" in output assert "test_counter 10" in output class TestGauge: """Tests for Gauge class.""" def test_gauge_initialization(self): """Test Gauge initialization.""" gauge = Gauge("test_gauge", "Test gauge") assert gauge.name == "test_gauge" assert gauge.get() == 0 def test_gauge_set(self): """Test gauge set value.""" gauge = Gauge("test_gauge", "Test gauge") gauge.set(100.0) assert gauge.get() == 100.0 def test_gauge_increment(self): """Test gauge increment.""" gauge = Gauge("test_gauge", "Test gauge") gauge.set(100.0) gauge.inc(10.0) assert gauge.get() == 110.0 def test_gauge_decrement(self): """Test gauge decrement.""" gauge = Gauge("test_gauge", "Test gauge") gauge.set(100.0) gauge.dec(10.0) assert gauge.get() == 90.0 def test_gauge_with_labels(self): """Test gauge with labels.""" gauge = Gauge("memory_usage", "Memory usage") gauge.set(100.0, {"region": "us-east"}) gauge.set(200.0, {"region": "us-west"}) assert gauge.get({"region": "us-east"}) == 100.0 assert gauge.get({"region": "us-west"}) == 200.0 class TestHistogram: """Tests for Histogram class.""" def test_histogram_initialization(self): """Test Histogram initialization.""" histogram = Histogram("latency", "Request latency") assert histogram.name == "latency" def test_histogram_observe(self): """Test histogram observe value.""" histogram = Histogram("latency", "Request latency") histogram.observe(0.05) histogram.observe(0.1) assert histogram.get_count() == 2 def test_histogram_bucket_counts(self): """Test histogram bucket counts.""" histogram = Histogram("latency", "Request latency", buckets=[0.01, 0.1, 1.0]) histogram.observe(0.005) # Goes in first bucket histogram.observe(0.05) # Goes in second bucket histogram.observe(0.5) # Goes in third bucket counts = histogram.get_bucket_counts() assert counts[0] == (0.01, 1) # 1 value <= 0.01 assert counts[1] == (0.1, 2) # 2 values <= 0.1 assert counts[2] == (1.0, 3) # 3 values <= 1.0 def test_histogram_sum(self): """Test histogram sum calculation.""" histogram = Histogram("latency", "Request latency") histogram.observe(0.1) histogram.observe(0.2) histogram.observe(0.3) # Get sum - use default labels assert abs(histogram.get_sum() - 0.6) < 0.0001 def test_histogram_to_prometheus(self): """Test histogram Prometheus export.""" histogram = Histogram("latency", "Request latency", buckets=[0.1, 0.5]) histogram.observe(0.05) output = histogram.to_prometheus() assert "# HELP latency Request latency" in output assert "# TYPE latency histogram" in output assert "latency_bucket" in output class TestSystemMonitor: """Tests for SystemMonitor class.""" def test_monitor_initialization(self): """Test SystemMonitor initialization.""" monitor = SystemMonitor() assert monitor.is_running is False assert isinstance(monitor.thresholds, AlertThresholds) def test_record_agent_decision(self): """Test recording agent decision.""" monitor = SystemMonitor() monitor.record_agent_decision("agent1", 0.5) metrics = monitor.get_agent_metrics("agent1") assert len(metrics) == 1 assert metrics[0].decision_count == 1 assert metrics[0].avg_response_time == 0.5 def test_record_multiple_decisions(self): """Test recording multiple agent decisions.""" monitor = SystemMonitor() monitor.record_agent_decision("agent1", 0.5) monitor.record_agent_decision("agent1", 1.5) metrics = monitor.get_agent_metrics("agent1") assert metrics[0].decision_count == 2 assert metrics[0].avg_response_time == 1.0 # (0.5 + 1.5) / 2 def test_record_agent_error(self): """Test recording agent error.""" monitor = SystemMonitor() monitor.record_agent_error("agent1") metrics = monitor.get_agent_metrics("agent1") assert metrics[0].error_count == 1 def test_get_all_agent_metrics(self): """Test getting metrics for all agents.""" monitor = SystemMonitor() monitor.record_agent_decision("agent1", 0.5) monitor.record_agent_decision("agent2", 0.3) all_metrics = monitor.get_agent_metrics() assert len(all_metrics) == 2 def test_unregister_agent(self): """Test unregistering an agent.""" monitor = SystemMonitor() monitor.record_agent_decision("agent1", 0.5) assert len(monitor.get_agent_metrics()) == 1 monitor.unregister_agent("agent1") assert len(monitor.get_agent_metrics()) == 0 def test_reset_agent_metrics_single(self): """Test resetting metrics for single agent.""" monitor = SystemMonitor() monitor.record_agent_decision("agent1", 0.5) monitor.record_agent_decision("agent2", 0.3) monitor.reset_agent_metrics("agent1") metrics1 = monitor.get_agent_metrics("agent1") metrics2 = monitor.get_agent_metrics("agent2") assert metrics1[0].decision_count == 0 assert metrics2[0].decision_count == 1 def test_reset_agent_metrics_all(self): """Test resetting metrics for all agents.""" monitor = SystemMonitor() monitor.record_agent_decision("agent1", 0.5) monitor.record_agent_decision("agent2", 0.3) monitor.reset_agent_metrics() assert len(monitor.get_agent_metrics()) == 0 def test_alert_thresholds_custom(self): """Test custom alert thresholds.""" thresholds = AlertThresholds( cpu_warning=60.0, cpu_critical=85.0, memory_warning=75.0, memory_critical=90.0, ) monitor = SystemMonitor(thresholds=thresholds) assert monitor.thresholds.cpu_warning == 60.0 assert monitor.thresholds.cpu_critical == 85.0 def test_get_agent_summary_empty(self): """Test agent summary with no agents.""" monitor = SystemMonitor() summary = monitor.get_agent_summary() assert summary["total_agents"] == 0 assert summary["total_decisions"] == 0 assert summary["total_errors"] == 0 def test_get_agent_summary_with_agents(self): """Test agent summary with registered agents.""" monitor = SystemMonitor() monitor.record_agent_decision("agent1", 0.5) monitor.record_agent_decision("agent1", 0.3) monitor.record_agent_error("agent1") summary = monitor.get_agent_summary() assert summary["total_agents"] == 1 assert summary["total_decisions"] == 2 assert summary["total_errors"] == 1 def test_prometheus_metrics_export(self): """Test Prometheus metrics export.""" monitor = SystemMonitor() monitor.record_agent_decision("agent1", 0.5) output = monitor.get_prometheus_metrics() assert "openclaw_agent_decisions_total" in output assert "openclaw_agent_response_time_seconds" in output