"""Unit tests for the log analyzer module.""" import json from datetime import datetime, timedelta from pathlib import Path import pytest from openclaw.monitoring.log_analyzer import ( ErrorPattern, LogAnalyzer, LogEntry, LogReport, ) class TestLogEntry: """Tests for LogEntry dataclass.""" def test_log_entry_creation(self) -> None: """Test creating a LogEntry.""" entry = LogEntry( timestamp=datetime(2024, 1, 1, 10, 0, 0), level="INFO", message="Test message", module="test_module", function="test_function", line=42, ) assert entry.level == "INFO" assert entry.message == "Test message" assert entry.agent_id is None assert entry.trade_id is None def test_log_entry_with_extra(self) -> None: """Test LogEntry with extra fields.""" entry = LogEntry( timestamp=datetime(2024, 1, 1, 10, 0, 0), level="INFO", message="Trade executed", module="trader", function="execute_trade", line=100, extra={"agent_id": "trader-001", "trade_id": "T001"}, ) assert entry.agent_id == "trader-001" assert entry.trade_id == "T001" def test_log_entry_to_dict(self) -> None: """Test converting LogEntry to dictionary.""" entry = LogEntry( timestamp=datetime(2024, 1, 1, 10, 0, 0), level="ERROR", message="Error occurred", module="test", function="run", line=10, extra={"key": "value"}, ) data = entry.to_dict() assert data["level"] == "ERROR" assert data["message"] == "Error occurred" assert data["extra"]["key"] == "value" def test_log_entry_matches_text(self) -> None: """Test text matching in LogEntry.""" entry = LogEntry( timestamp=datetime(2024, 1, 1, 10, 0, 0), level="INFO", message="Trade executed successfully", module="trading.agent", function="execute", line=50, extra={"trade_id": "T123"}, ) assert entry.matches_text("trade") assert entry.matches_text("executed") assert entry.matches_text("T123") assert entry.matches_text("trading.agent") assert not entry.matches_text("failure") class TestErrorPattern: """Tests for ErrorPattern dataclass.""" def test_error_pattern_creation(self) -> None: """Test creating an ErrorPattern.""" pattern = ErrorPattern( pattern="module:function", count=5, sample_messages=["Error 1", "Error 2"], ) assert pattern.pattern == "module:function" assert pattern.count == 5 assert len(pattern.sample_messages) == 2 def test_error_pattern_to_dict(self) -> None: """Test converting ErrorPattern to dictionary.""" pattern = ErrorPattern( pattern="test:func", count=3, sample_messages=["msg1"], affected_agents={"agent1", "agent2"}, ) data = pattern.to_dict() assert data["pattern"] == "test:func" assert data["count"] == 3 assert len(data["affected_agents"]) == 2 class TestLogAnalyzer: """Tests for LogAnalyzer class.""" def test_analyzer_creation(self) -> None: """Test creating a LogAnalyzer.""" analyzer = LogAnalyzer() assert analyzer.entry_count == 0 assert analyzer.time_range is None def test_analyzer_with_custom_log_dir(self) -> None: """Test creating analyzer with custom log directory.""" analyzer = LogAnalyzer(log_dir="/custom/logs") assert str(analyzer.log_dir) == "/custom/logs" def test_parse_jsonl_line_valid(self) -> None: """Test parsing valid JSONL line.""" analyzer = LogAnalyzer() line = json.dumps({ "timestamp": "2024-01-15T10:30:00", "level": "INFO", "message": "Test message", "module": "test_module", "function": "test_func", "line": 42, }) entry = analyzer._parse_jsonl_line(line) assert entry is not None assert entry.level == "INFO" assert entry.message == "Test message" assert entry.timestamp == datetime(2024, 1, 15, 10, 30, 0) def test_parse_jsonl_line_with_extra(self) -> None: """Test parsing JSONL line with extra fields.""" analyzer = LogAnalyzer() line = json.dumps({ "timestamp": "2024-01-15T10:30:00", "level": "INFO", "message": "Trade done", "module": "trader", "function": "trade", "line": 10, "extra": {"agent_id": "A1", "trade_id": "T1"}, }) entry = analyzer._parse_jsonl_line(line) assert entry is not None assert entry.agent_id == "A1" assert entry.trade_id == "T1" def test_parse_jsonl_line_invalid(self) -> None: """Test parsing invalid JSONL line.""" analyzer = LogAnalyzer() entry = analyzer._parse_jsonl_line("not valid json") assert entry is None def test_add_entry(self) -> None: """Test adding a single entry.""" analyzer = LogAnalyzer() entry = LogEntry( timestamp=datetime(2024, 1, 1, 10, 0, 0), level="INFO", message="Test", module="test", function="run", line=1, extra={"agent_id": "agent1"}, ) analyzer.add_entry(entry) assert analyzer.entry_count == 1 def test_filter_by_agent(self) -> None: """Test filtering by agent ID.""" analyzer = LogAnalyzer() analyzer.add_entry(LogEntry( timestamp=datetime(2024, 1, 1, 10, 0, 0), level="INFO", message="Msg 1", module="test", function="run", line=1, extra={"agent_id": "agent1"}, )) analyzer.add_entry(LogEntry( timestamp=datetime(2024, 1, 1, 10, 1, 0), level="INFO", message="Msg 2", module="test", function="run", line=1, extra={"agent_id": "agent2"}, )) analyzer.add_entry(LogEntry( timestamp=datetime(2024, 1, 1, 10, 2, 0), level="INFO", message="Msg 3", module="test", function="run", line=1, extra={"agent_id": "agent1"}, )) results = analyzer.filter_by_agent("agent1") assert len(results) == 2 assert all(e.agent_id == "agent1" for e in results) def test_filter_by_level(self) -> None: """Test filtering by log level.""" analyzer = LogAnalyzer() for level in ["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"]: analyzer.add_entry(LogEntry( timestamp=datetime(2024, 1, 1, 10, 0, 0), level=level, message=f"{level} message", module="test", function="run", line=1, )) errors = analyzer.filter_by_level("ERROR") assert len(errors) == 1 assert errors[0].level == "ERROR" def test_filter_by_level_min_level(self) -> None: """Test filtering with min_level flag.""" analyzer = LogAnalyzer() for level in ["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"]: analyzer.add_entry(LogEntry( timestamp=datetime(2024, 1, 1, 10, 0, 0), level=level, message=f"{level} message", module="test", function="run", line=1, )) # Get ERROR and above (ERROR, CRITICAL) results = analyzer.filter_by_level("ERROR", min_level=True) assert len(results) == 2 assert all(e.level in ["ERROR", "CRITICAL"] for e in results) def test_filter_by_time_range(self) -> None: """Test filtering by time range.""" analyzer = LogAnalyzer() base_time = datetime(2024, 1, 1, 10, 0, 0) for i in range(10): analyzer.add_entry(LogEntry( timestamp=base_time + timedelta(minutes=i), level="INFO", message=f"Msg {i}", module="test", function="run", line=1, )) results = analyzer.filter_by_time_range( start_time=base_time + timedelta(minutes=3), end_time=base_time + timedelta(minutes=6), ) assert len(results) == 4 # Minutes 3, 4, 5, 6 def test_search_logs(self) -> None: """Test full-text search.""" analyzer = LogAnalyzer() analyzer.add_entry(LogEntry( timestamp=datetime(2024, 1, 1, 10, 0, 0), level="INFO", message="Trade executed successfully", module="trading", function="execute", line=1, )) analyzer.add_entry(LogEntry( timestamp=datetime(2024, 1, 1, 10, 1, 0), level="ERROR", message="Trade failed", module="trading", function="execute", line=1, )) analyzer.add_entry(LogEntry( timestamp=datetime(2024, 1, 1, 10, 2, 0), level="INFO", message="Market data updated", module="market", function="update", line=1, )) results = analyzer.search_logs("trade") assert len(results) == 2 results = analyzer.search_logs("failed") assert len(results) == 1 def test_search_logs_with_filters(self) -> None: """Test search with additional filters.""" analyzer = LogAnalyzer() analyzer.add_entry(LogEntry( timestamp=datetime(2024, 1, 1, 10, 0, 0), level="INFO", message="Trade executed", module="trading", function="run", line=1, extra={"agent_id": "agent1"}, )) analyzer.add_entry(LogEntry( timestamp=datetime(2024, 1, 1, 10, 1, 0), level="ERROR", message="Trade executed with error", module="trading", function="run", line=1, extra={"agent_id": "agent2"}, )) results = analyzer.search_logs("trade", filters={"level": "ERROR"}) assert len(results) == 1 assert results[0].level == "ERROR" def test_get_trade_audit_trail(self) -> None: """Test getting trade audit trail.""" analyzer = LogAnalyzer() base_time = datetime(2024, 1, 1, 10, 0, 0) analyzer.add_entry(LogEntry( timestamp=base_time, level="INFO", message="Trade initiated", module="trading", function="initiate", line=1, extra={"trade_id": "T001"}, )) analyzer.add_entry(LogEntry( timestamp=base_time + timedelta(minutes=1), level="INFO", message="Trade validated", module="trading", function="validate", line=1, extra={"trade_id": "T001"}, )) analyzer.add_entry(LogEntry( timestamp=base_time + timedelta(minutes=2), level="INFO", message="Trade executed", module="trading", function="execute", line=1, extra={"trade_id": "T001"}, )) analyzer.add_entry(LogEntry( timestamp=base_time + timedelta(minutes=3), level="INFO", message="Different trade", module="trading", function="run", line=1, extra={"trade_id": "T002"}, )) trail = analyzer.get_trade_audit_trail("T001") assert len(trail) == 3 assert trail[0].message == "Trade initiated" assert trail[2].message == "Trade executed" def test_get_error_stats(self) -> None: """Test error statistics.""" analyzer = LogAnalyzer() for i in range(5): analyzer.add_entry(LogEntry( timestamp=datetime(2024, 1, 1, 10, i, 0), level="ERROR", message=f"Connection error {i}", module="network", function="connect", line=10, extra={"agent_id": f"agent{i % 2}"}, )) analyzer.add_entry(LogEntry( timestamp=datetime(2024, 1, 1, 10, 10, 0), level="CRITICAL", message="System failure", module="system", function="main", line=1, )) stats = analyzer.get_error_stats() assert stats["total_errors"] == 5 assert stats["total_critical"] == 1 assert len(stats["top_patterns"]) > 0 def test_get_agent_activity(self) -> None: """Test getting agent activity summary.""" analyzer = LogAnalyzer() base_time = datetime(2024, 1, 1, 10, 0, 0) for i in range(5): analyzer.add_entry(LogEntry( timestamp=base_time + timedelta(minutes=i), level="INFO" if i < 3 else "ERROR", message=f"Message {i}", module="test", function="run", line=1, extra={"agent_id": "agent1"}, )) activity = analyzer.get_agent_activity("agent1") assert activity["agent_id"] == "agent1" assert activity["total_entries"] == 5 assert activity["level_counts"]["INFO"] == 3 assert activity["level_counts"]["ERROR"] == 2 assert activity["error_count"] == 2 def test_generate_log_report(self) -> None: """Test generating log report.""" analyzer = LogAnalyzer() base_time = datetime(2024, 1, 1, 10, 0, 0) for i in range(10): analyzer.add_entry(LogEntry( timestamp=base_time + timedelta(minutes=i), level="INFO" if i < 8 else "ERROR", message=f"Message {i}", module="test", function="run", line=1, extra={"agent_id": f"agent{i % 2}"}, )) report = analyzer.generate_log_report() assert isinstance(report, LogReport) assert report.total_entries == 10 assert report.level_counts["INFO"] == 8 assert report.level_counts["ERROR"] == 2 assert len(report.agent_counts) == 2 def test_export_to_csv(self, tmp_path: Path) -> None: """Test exporting to CSV.""" analyzer = LogAnalyzer() analyzer.add_entry(LogEntry( timestamp=datetime(2024, 1, 1, 10, 0, 0), level="INFO", message="Test message", module="test", function="run", line=1, extra={"agent_id": "agent1", "trade_id": "T1"}, )) filepath = tmp_path / "logs.csv" analyzer.export_to_csv(filepath) assert filepath.exists() content = filepath.read_text() assert "timestamp,level,message" in content assert "Test message" in content assert "agent1" in content def test_export_to_json(self, tmp_path: Path) -> None: """Test exporting to JSON.""" analyzer = LogAnalyzer() analyzer.add_entry(LogEntry( timestamp=datetime(2024, 1, 1, 10, 0, 0), level="INFO", message="Test message", module="test", function="run", line=1, )) filepath = tmp_path / "logs.json" analyzer.export_to_json(filepath) assert filepath.exists() data = json.loads(filepath.read_text()) assert len(data) == 1 assert data[0]["message"] == "Test message" def test_get_unique_agents(self) -> None: """Test getting unique agent IDs.""" analyzer = LogAnalyzer() for agent_id in ["agent2", "agent1", "agent3", "agent1"]: analyzer.add_entry(LogEntry( timestamp=datetime(2024, 1, 1, 10, 0, 0), level="INFO", message="Test", module="test", function="run", line=1, extra={"agent_id": agent_id}, )) agents = analyzer.get_unique_agents() assert agents == ["agent1", "agent2", "agent3"] def test_get_unique_trades(self) -> None: """Test getting unique trade IDs.""" analyzer = LogAnalyzer() for trade_id in ["T2", "T1", "T3", "T1"]: analyzer.add_entry(LogEntry( timestamp=datetime(2024, 1, 1, 10, 0, 0), level="INFO", message="Test", module="test", function="run", line=1, extra={"trade_id": trade_id}, )) trades = analyzer.get_unique_trades() assert trades == ["T1", "T2", "T3"] def test_clear(self) -> None: """Test clearing all logs.""" analyzer = LogAnalyzer() analyzer.add_entry(LogEntry( timestamp=datetime(2024, 1, 1, 10, 0, 0), level="INFO", message="Test", module="test", function="run", line=1, )) assert analyzer.entry_count == 1 analyzer.clear() assert analyzer.entry_count == 0 assert analyzer.time_range is None def test_time_range_property(self) -> None: """Test time_range property.""" analyzer = LogAnalyzer() analyzer.add_entry(LogEntry( timestamp=datetime(2024, 1, 1, 10, 0, 0), level="INFO", message="First", module="test", function="run", line=1, )) analyzer.add_entry(LogEntry( timestamp=datetime(2024, 1, 1, 11, 0, 0), level="INFO", message="Last", module="test", function="run", line=1, )) time_range = analyzer.time_range assert time_range is not None assert time_range[0] == datetime(2024, 1, 1, 10, 0, 0) assert time_range[1] == datetime(2024, 1, 1, 11, 0, 0) def test_load_logs_from_directory(self, tmp_path: Path) -> None: """Test loading logs from directory.""" log_dir = tmp_path / "logs" log_dir.mkdir() log_file = log_dir / "openclaw_2024-01-15.jsonl" with open(log_file, "w") as f: for i in range(5): entry = { "timestamp": f"2024-01-15T10:{i:02d}:00", "level": "INFO", "message": f"Message {i}", "module": "test", "function": "run", "line": i, "extra": {"agent_id": f"agent{i}"}, } f.write(json.dumps(entry) + "\n") analyzer = LogAnalyzer(log_dir=str(log_dir)) count = analyzer.load_logs() assert count == 5 assert analyzer.entry_count == 5 def test_load_logs_with_date_filter(self, tmp_path: Path) -> None: """Test loading logs with date filter.""" log_dir = tmp_path / "logs" log_dir.mkdir() # Create log for Jan 15 log_file1 = log_dir / "openclaw_2024-01-15.jsonl" with open(log_file1, "w") as f: entry = { "timestamp": "2024-01-15T10:00:00", "level": "INFO", "message": "Jan 15 log", "module": "test", "function": "run", "line": 1, } f.write(json.dumps(entry) + "\n") # Create log for Jan 20 log_file2 = log_dir / "openclaw_2024-01-20.jsonl" with open(log_file2, "w") as f: entry = { "timestamp": "2024-01-20T10:00:00", "level": "INFO", "message": "Jan 20 log", "module": "test", "function": "run", "line": 1, } f.write(json.dumps(entry) + "\n") analyzer = LogAnalyzer(log_dir=str(log_dir)) count = analyzer.load_logs( start_date=datetime(2024, 1, 15), end_date=datetime(2024, 1, 15), ) assert count == 1 assert analyzer._entries[0].message == "Jan 15 log" def test_load_logs_nonexistent_directory(self) -> None: """Test loading from non-existent directory.""" analyzer = LogAnalyzer(log_dir="/nonexistent/path") count = analyzer.load_logs() assert count == 0 def test_generate_log_report_empty(self) -> None: """Test generating report with no entries.""" analyzer = LogAnalyzer() report = analyzer.generate_log_report() assert report.total_entries == 0 assert "No log entries" in report.summary class TestLogReport: """Tests for LogReport dataclass.""" def test_report_to_dict(self) -> None: """Test converting LogReport to dictionary.""" report = LogReport( start_time=datetime(2024, 1, 1, 10, 0, 0), end_time=datetime(2024, 1, 1, 11, 0, 0), total_entries=100, level_counts={"INFO": 90, "ERROR": 10}, agent_counts={"agent1": 50, "agent2": 50}, error_patterns=[], summary="Test summary", ) data = report.to_dict() assert data["total_entries"] == 100 assert data["level_counts"]["INFO"] == 90 def test_report_to_json(self) -> None: """Test converting LogReport to JSON.""" report = LogReport( start_time=datetime(2024, 1, 1, 10, 0, 0), end_time=datetime(2024, 1, 1, 11, 0, 0), total_entries=100, level_counts={"INFO": 90}, agent_counts={}, error_patterns=[], ) json_str = report.to_json() assert "total_entries" in json_str assert "100" in json_str