# -*- coding: utf-8 -*-
"""Evaluation hooks system for skills.

Provides evaluation metric collection and storage for skill performance tracking.
Based on the evaluation hooks design in SKILL_TEMPLATE.md.
"""

from __future__ import annotations

import json
import logging
from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum
from pathlib import Path
from typing import Any, Dict, List, Optional

logger = logging.getLogger(__name__)


class MetricType(Enum):
    """Types of evaluation metrics."""

    HIT_RATE = "hit_rate"                      # Signal hit rate
    RISK_VIOLATION = "risk_violation"          # Risk-control violation rate
    POSITION_DEVIATION = "position_deviation"  # Position deviation rate
    PnL_ATTRIBUTION = "pnl_attribution"        # P&L attribution consistency
    SIGNAL_CONSISTENCY = "signal_consistency"  # Signal consistency
    DECISION_LATENCY = "decision_latency"      # Decision latency
    TOOL_USAGE = "tool_usage"                  # Tool usage rate
    CUSTOM = "custom"                          # Custom metric


@dataclass
class EvaluationMetric:
    """A single evaluation metric."""

    name: str
    metric_type: MetricType
    value: float
    timestamp: str = field(default_factory=lambda: datetime.now().isoformat())
    metadata: Dict[str, Any] = field(default_factory=dict)

    def to_dict(self) -> Dict[str, Any]:
        return {
            "name": self.name,
            "metric_type": self.metric_type.value,
            "value": self.value,
            "timestamp": self.timestamp,
            "metadata": self.metadata,
        }


@dataclass
class EvaluationResult:
    """Evaluation result for a skill execution."""

    skill_name: str
    run_id: str
    agent_id: str
    metrics: List[EvaluationMetric] = field(default_factory=list)
    inputs: Dict[str, Any] = field(default_factory=dict)
    outputs: Dict[str, Any] = field(default_factory=dict)
    decision: Optional[str] = None
    success: bool = True
    error_message: Optional[str] = None
    started_at: Optional[str] = None
    completed_at: Optional[str] = field(default_factory=lambda: datetime.now().isoformat())

    def to_dict(self) -> Dict[str, Any]:
        return {
            "skill_name": self.skill_name,
            "run_id": self.run_id,
            "agent_id": self.agent_id,
            "metrics": [m.to_dict() for m in self.metrics],
            "inputs": self.inputs,
            "outputs": self.outputs,
            "decision": self.decision,
            "success": self.success,
            "error_message": self.error_message,
            "started_at": self.started_at,
            "completed_at": self.completed_at,
        }


class EvaluationHook:
    """Hook for collecting skill evaluation metrics.

    This hook collects and stores evaluation metrics after skill execution
    for later analysis and memory/reflection stages.
    """

    def __init__(
        self,
        storage_dir: Path,
        run_id: str,
        agent_id: str,
    ):
        """Initialize evaluation hook.

        Args:
            storage_dir: Directory to store evaluation results
            run_id: Current run identifier
            agent_id: Current agent identifier
        """
        self.storage_dir = Path(storage_dir)
        self.run_id = run_id
        self.agent_id = agent_id
        self._current_evaluation: Optional[EvaluationResult] = None

    def start_evaluation(
        self,
        skill_name: str,
        inputs: Dict[str, Any],
    ) -> None:
        """Start a new evaluation session.

        Args:
            skill_name: Name of the skill being evaluated
            inputs: Input parameters for the skill
        """
        self._current_evaluation = EvaluationResult(
            skill_name=skill_name,
            run_id=self.run_id,
            agent_id=self.agent_id,
            inputs=inputs,
            started_at=datetime.now().isoformat(),
        )
        logger.debug(f"Started evaluation for skill: {skill_name}")

    def add_metric(
        self,
        name: str,
        metric_type: MetricType,
        value: float,
        metadata: Optional[Dict[str, Any]] = None,
    ) -> None:
        """Add an evaluation metric.
        Args:
            name: Metric name
            metric_type: Type of metric
            value: Metric value
            metadata: Additional metadata
        """
        if self._current_evaluation is None:
            logger.warning("No active evaluation session, ignoring metric")
            return

        metric = EvaluationMetric(
            name=name,
            metric_type=metric_type,
            value=value,
            metadata=metadata or {},
        )
        self._current_evaluation.metrics.append(metric)
        logger.debug(f"Added metric: {name} = {value}")

    def add_metrics(self, metrics: List[EvaluationMetric]) -> None:
        """Add multiple evaluation metrics at once.

        Args:
            metrics: List of metrics to add
        """
        if self._current_evaluation is None:
            logger.warning("No active evaluation session, ignoring metrics")
            return
        self._current_evaluation.metrics.extend(metrics)

    def record_outputs(self, outputs: Dict[str, Any]) -> None:
        """Record skill outputs.

        Args:
            outputs: Output from skill execution
        """
        if self._current_evaluation is None:
            logger.warning("No active evaluation session, ignoring outputs")
            return
        self._current_evaluation.outputs = outputs

    def record_decision(self, decision: str) -> None:
        """Record the final decision.

        Args:
            decision: Final decision made by the skill
        """
        if self._current_evaluation is None:
            logger.warning("No active evaluation session, ignoring decision")
            return
        self._current_evaluation.decision = decision

    def complete_evaluation(
        self,
        success: bool = True,
        error_message: Optional[str] = None,
    ) -> Optional[EvaluationResult]:
        """Complete the evaluation session and persist results.

        Args:
            success: Whether the skill execution was successful
            error_message: Error message if failed

        Returns:
            The completed evaluation result, or None if no active evaluation
        """
        if self._current_evaluation is None:
            logger.warning("No active evaluation to complete")
            return None

        self._current_evaluation.success = success
        self._current_evaluation.error_message = error_message
        self._current_evaluation.completed_at = datetime.now().isoformat()

        # Persist to storage
        result = self._persist_evaluation(self._current_evaluation)
        self._current_evaluation = None

        logger.debug(f"Completed evaluation for skill: {result.skill_name}")
        return result

    def _persist_evaluation(self, evaluation: EvaluationResult) -> EvaluationResult:
        """Persist evaluation result to storage.

        Args:
            evaluation: Evaluation result to persist

        Returns:
            The persisted evaluation
        """
        # Create run-specific directory
        run_dir = self.storage_dir / self.run_id
        run_dir.mkdir(parents=True, exist_ok=True)

        # Create agent-specific subdirectory
        agent_dir = run_dir / self.agent_id
        agent_dir.mkdir(parents=True, exist_ok=True)

        # Generate filename with timestamp
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S_%f")
        filename = f"{evaluation.skill_name}_{timestamp}.json"
        filepath = agent_dir / filename

        # Write evaluation result
        try:
            with open(filepath, "w", encoding="utf-8") as f:
                json.dump(evaluation.to_dict(), f, ensure_ascii=False, indent=2)
            logger.info(f"Persisted evaluation to: {filepath}")
        except Exception as e:
            logger.error(f"Failed to persist evaluation: {e}")

        return evaluation

    def cancel_evaluation(self) -> None:
        """Cancel the current evaluation session without saving."""
        if self._current_evaluation is not None:
            logger.debug(f"Cancelled evaluation for: {self._current_evaluation.skill_name}")
            self._current_evaluation = None


class EvaluationCollector:
    """Collector for aggregating evaluation metrics across runs.

    Provides methods to query and analyze evaluation results.
    """

    def __init__(self, storage_dir: Path):
        """Initialize evaluation collector.
        Args:
            storage_dir: Root directory containing evaluation results
        """
        self.storage_dir = Path(storage_dir)

    def get_run_evaluations(
        self,
        run_id: str,
        agent_id: Optional[str] = None,
    ) -> List[EvaluationResult]:
        """Get all evaluations for a run.

        Args:
            run_id: Run identifier
            agent_id: Optional agent identifier to filter by

        Returns:
            List of evaluation results
        """
        run_dir = self.storage_dir / run_id
        if not run_dir.exists():
            return []

        evaluations = []
        agent_dirs = [run_dir / agent_id] if agent_id else run_dir.iterdir()

        for agent_dir in agent_dirs:
            if not agent_dir.is_dir():
                continue
            for eval_file in agent_dir.glob("*.json"):
                try:
                    with open(eval_file, "r", encoding="utf-8") as f:
                        data = json.load(f)
                    evaluations.append(self._parse_evaluation(data))
                except Exception as e:
                    logger.warning(f"Failed to load evaluation {eval_file}: {e}")

        return evaluations

    def get_skill_metrics(
        self,
        skill_name: str,
        run_ids: Optional[List[str]] = None,
    ) -> List[EvaluationMetric]:
        """Get all metrics for a specific skill.

        Args:
            skill_name: Name of the skill
            run_ids: Optional list of run IDs to filter by

        Returns:
            List of metrics for the skill
        """
        metrics = []

        if run_ids is None:
            run_ids = [d.name for d in self.storage_dir.iterdir() if d.is_dir()]

        for run_id in run_ids:
            evaluations = self.get_run_evaluations(run_id)
            for eval_result in evaluations:
                if eval_result.skill_name == skill_name:
                    metrics.extend(eval_result.metrics)

        return metrics

    def calculate_skill_stats(
        self,
        skill_name: str,
        metric_type: MetricType,
        run_ids: Optional[List[str]] = None,
    ) -> Dict[str, float]:
        """Calculate statistics for a specific metric type.

        Args:
            skill_name: Name of the skill
            metric_type: Type of metric to calculate
            run_ids: Optional list of run IDs to filter by

        Returns:
            Dictionary with min, max, avg, count statistics
        """
        metrics = self.get_skill_metrics(skill_name, run_ids)
        filtered = [m for m in metrics if m.metric_type == metric_type]

        if not filtered:
            return {"count": 0}

        values = [m.value for m in filtered]
        return {
            "count": len(values),
            "min": min(values),
            "max": max(values),
            "avg": sum(values) / len(values),
        }

    def _parse_evaluation(self, data: Dict[str, Any]) -> EvaluationResult:
        """Parse evaluation data into EvaluationResult.

        Args:
            data: Raw evaluation data

        Returns:
            Parsed EvaluationResult
        """
        metrics = []
        for m in data.get("metrics", []):
            metrics.append(EvaluationMetric(
                name=m["name"],
                metric_type=MetricType(m["metric_type"]),
                value=m["value"],
                timestamp=m.get("timestamp", ""),
                metadata=m.get("metadata", {}),
            ))

        return EvaluationResult(
            skill_name=data["skill_name"],
            run_id=data["run_id"],
            agent_id=data["agent_id"],
            metrics=metrics,
            inputs=data.get("inputs", {}),
            outputs=data.get("outputs", {}),
            decision=data.get("decision"),
            success=data.get("success", True),
            error_message=data.get("error_message"),
            started_at=data.get("started_at"),
            completed_at=data.get("completed_at"),
        )


def parse_evaluation_hooks(skill_dir: Path) -> Dict[str, Any]:
    """Parse evaluation hooks from SKILL.md.

    Extracts the "Optional: Evaluation hooks" section from skill documentation.
    Args:
        skill_dir: Skill directory path

    Returns:
        Dictionary containing evaluation hook definitions
    """
    skill_md = skill_dir / "SKILL.md"
    if not skill_md.exists():
        return {}

    try:
        content = skill_md.read_text(encoding="utf-8")

        # Extract evaluation hooks section
        if "## Optional: Evaluation hooks" in content:
            start = content.find("## Optional: Evaluation hooks")
            # Find the next ## section or end of file
            next_section = content.find("\n## ", start + 1)
            if next_section == -1:
                eval_section = content[start:]
            else:
                eval_section = content[start:next_section]

            # Parse metrics from the section
            metrics = []
            for metric_type in MetricType:
                if metric_type.value.replace("_", " ") in eval_section.lower():
                    metrics.append(metric_type.value)

            return {
                "supported_metrics": metrics,
                "section_content": eval_section.strip(),
            }
    except Exception as e:
        logger.warning(f"Failed to parse evaluation hooks: {e}")

    return {}


__all__ = [
    "MetricType",
    "EvaluationMetric",
    "EvaluationResult",
    "EvaluationHook",
    "EvaluationCollector",
    "parse_evaluation_hooks",
]
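

# --- Illustrative usage sketch --------------------------------------------
# A minimal, hedged example of the intended lifecycle: start an evaluation,
# record metrics, outputs, and a decision, persist the result, then query it
# back with EvaluationCollector and parse hooks from a SKILL.md file. The
# skill name, run/agent identifiers, metric values, and the sample SKILL.md
# content below are hypothetical and exist only for demonstration.
if __name__ == "__main__":
    import tempfile

    with tempfile.TemporaryDirectory() as tmp:
        storage = Path(tmp) / "evaluations"

        # Full hook lifecycle for one (hypothetical) skill execution.
        hook = EvaluationHook(storage_dir=storage, run_id="run_001", agent_id="agent_alpha")
        hook.start_evaluation("example_skill", inputs={"symbol": "DEMO"})
        hook.add_metric("hit_rate", MetricType.HIT_RATE, 0.62)
        hook.add_metric("decision_latency_ms", MetricType.DECISION_LATENCY, 125.0)
        hook.record_outputs({"signal": "long"})
        hook.record_decision("enter_long")
        result = hook.complete_evaluation(success=True)
        print("persisted:", result.skill_name if result else None)

        # Aggregate the persisted results across all runs in the storage dir.
        collector = EvaluationCollector(storage)
        stats = collector.calculate_skill_stats("example_skill", MetricType.HIT_RATE)
        print("hit_rate stats:", stats)

        # Parse evaluation hooks from a minimal, hypothetical SKILL.md.
        skill_dir = Path(tmp) / "example_skill"
        skill_dir.mkdir()
        (skill_dir / "SKILL.md").write_text(
            "# Example skill\n\n"
            "## Optional: Evaluation hooks\n"
            "Track hit rate and decision latency per run.\n",
            encoding="utf-8",
        )
        print("hooks:", parse_evaluation_hooks(skill_dir))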