# -*- coding: utf-8 -*-
"""
Autonomous Policy Optimizer (APO)

Automatically tunes agent policies based on performance feedback: after a
losing trading day it asks a high-capability "meta" model to propose concrete
additions to each underperforming agent's workspace POLICY.md, then appends
those additions via the WorkspaceManager.
"""
import logging
import json
import os
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, List, Optional

from agentscope.message import Msg

from backend.llm.models import get_agent_model, get_agent_formatter
from backend.agents.workspace_manager import WorkspaceManager

logger = logging.getLogger(__name__)


class PolicyOptimizer:
    """
    PolicyOptimizer analyzes trading performance and automatically updates
    agent workspace files (POLICY.md, AGENTS.md) to improve future results.
    """

    def __init__(
        self,
        config_name: str,
        project_root: Optional[Path] = None,
        initial_capital: float = 100_000.0,
    ):
        """
        Args:
            config_name: Name of the trading configuration whose agent
                workspaces this optimizer edits.
            project_root: Optional root directory passed to WorkspaceManager.
            initial_capital: Starting portfolio value used as the P&L
                baseline in run_optimization (previously hard-coded to 100k).
        """
        self.config_name = config_name
        self.workspace_manager = WorkspaceManager(project_root=project_root)
        self.initial_capital = initial_capital
        # Use a high-capability model for the optimizer (meta-agent).
        self.model = get_agent_model("portfolio_manager")
        self.formatter = get_agent_formatter("portfolio_manager")

    async def run_optimization(
        self,
        date: str,
        reflection_content: str,
        settlement_result: Dict[str, Any],
        analyst_results: List[Dict[str, Any]],
        decisions: Dict[str, Dict],
    ) -> Dict[str, Any]:
        """
        Run the optimization loop if performance indicates a need for change.

        Only triggers on a negative daily P&L (portfolio value below
        ``initial_capital``); otherwise returns a "skipped" status.

        Returns:
            A status dict: ``{"status": "skipped", ...}`` on positive P&L, or
            ``{"status": "completed", "date", "total_pnl", "optimizations"}``
            where ``optimizations`` lists the applied per-agent changes.
        """
        total_pnl = settlement_result.get("portfolio_value", 0) - self.initial_capital
        # TODO(review): a more sophisticated trigger (e.g. 3 consecutive
        # losing days) would reduce over-reaction to single-day noise.
        if total_pnl >= 0:
            logger.info(
                f"APO: Positive P&L (${total_pnl:,.2f}) for {date}, skipping optimization."
            )
            return {"status": "skipped", "reason": "positive_pnl"}

        logger.info(
            f"APO: Negative P&L (${total_pnl:,.2f}) detected for {date}. Starting optimization..."
        )

        # 1. Identify underperforming agents; 2. generate policy updates;
        # 3. apply updates to each agent's workspace.
        optimizations = []
        underperformers = self._identify_underperformers(settlement_result, analyst_results)

        for agent_id in underperformers:
            update = await self._generate_policy_update(
                agent_id, date, reflection_content, settlement_result,
                analyst_results, decisions,
            )
            if update:
                self._apply_update(agent_id, update)
                optimizations.append({
                    "agent_id": agent_id,
                    "file": update.get("file", "POLICY.md"),
                    "change": update.get("change", ""),
                })

        return {
            "status": "completed",
            "date": date,
            "total_pnl": total_pnl,
            "optimizations": optimizations,
        }

    def _identify_underperformers(
        self,
        settlement_result: Dict[str, Any],
        analyst_results: List[Dict[str, Any]],
    ) -> List[str]:
        """Identify which agents might need policy adjustments.

        Simple heuristic: on a losing day, every analyst active that day is
        a candidate, plus the portfolio manager and risk manager, which are
        always critical. Returns a de-duplicated list (order unspecified).
        """
        underperformers = []
        # NOTE(review): ideally only agents whose predictions contradicted
        # the market would be selected; for now all active analysts qualify.
        for result in analyst_results:
            agent_id = result.get("agent")
            if agent_id:
                underperformers.append(agent_id)
        underperformers.append("portfolio_manager")
        underperformers.append("risk_manager")
        return list(set(underperformers))

    async def _generate_policy_update(
        self,
        agent_id: str,
        date: str,
        reflection_content: str,
        settlement_result: Dict[str, Any],
        analyst_results: List[Dict[str, Any]],
        decisions: Dict[str, Dict],
    ) -> Optional[Dict[str, str]]:
        """Use the meta-model to generate a specific policy update for an agent.

        Returns:
            A dict with keys "reasoning", "file", "change" parsed from the
            model's JSON reply, or None if the reply could not be parsed.
        """
        # Load the current policy; missing/unreadable files fall back to a
        # placeholder so the prompt still makes sense.
        try:
            current_policy = self.workspace_manager.load_agent_file(
                config_name=self.config_name,
                agent_id=agent_id,
                filename="POLICY.md",
            )
        except Exception:
            current_policy = "No existing policy found."

        prompt = f"""
As an Expert Meta-Optimizer for a multi-agent trading system, your task is to update the operational POLICY for an agent named '{agent_id}' based on recent performance failures.

[Current Context]
Date: {date}
Daily Reflection:
{reflection_content}

[Agent's Current POLICY.md]
{current_policy}

[Task]
Analyze why the system failed (loss occurred). Identify what '{agent_id}' could have done differently or what new constraint/heuristic should be added to its policy to prevent similar mistakes in the future.

Provide a specific, concise addition or modification to the POLICY.md file. The output MUST be a JSON object with:
1. "reasoning": Brief explanation of why this change is needed.
2. "file": Always "POLICY.md".
3. "change": The EXACT markdown text to APPEND or REPLACE in the file. Keep it in Chinese as the system uses Chinese prompts.

Output ONLY the JSON object.
"""
        msg = Msg(name="system", content=prompt, role="user")
        response = await self.model.reply(msg)

        content = response.content
        if isinstance(content, list):
            # Multi-part content: take the first text part; an empty list
            # previously raised IndexError here.
            content = content[0].get("text", "") if content else ""

        # Strip a markdown code fence if the model wrapped its JSON in one.
        # Handles both ```json ... ``` and bare ``` ... ``` fences.
        if "```json" in content:
            content = content.split("```json", 1)[1].split("```", 1)[0].strip()
        elif "```" in content:
            content = content.split("```", 1)[1].split("```", 1)[0].strip()

        try:
            parsed = json.loads(content)
        except (json.JSONDecodeError, TypeError) as e:
            logger.error(f"APO: Failed to parse optimization response for {agent_id}: {e}")
            return None
        # Guard the declared return type: a bare string/number that happens
        # to be valid JSON is not a usable update.
        if not isinstance(parsed, dict):
            logger.error(f"APO: Optimization response for {agent_id} is not a JSON object.")
            return None
        return parsed

    def _apply_update(self, agent_id: str, update: Dict[str, str]) -> None:
        """Append the suggested update to the agent's workspace file.

        No-ops when the change text is empty or already present in the file
        (duplicate-append guard). Failures are logged, never raised.
        """
        filename = update.get("file", "POLICY.md")
        change = update.get("change", "")
        if not change:
            return
        try:
            current_content = self.workspace_manager.load_agent_file(
                config_name=self.config_name,
                agent_id=agent_id,
                filename=filename,
            )
            # Skip if the change is already there to avoid duplicates.
            # (Log messages previously contained a literal "(unknown)" where
            # {filename} belongs — corrected here.)
            if change.strip() in current_content:
                logger.info(f"APO: Change already present in {agent_id}/{filename}")
                return

            new_content = (
                current_content
                + "\n\n### APO Update ("
                + datetime.now().strftime("%Y-%m-%d")
                + ")\n"
                + change
            )

            self.workspace_manager.update_agent_file(
                config_name=self.config_name,
                agent_id=agent_id,
                filename=filename,
                content=new_content,
            )
            logger.info(f"APO: Updated {agent_id}/{filename} with new heuristics.")
        except Exception as e:
            logger.error(f"APO: Failed to apply update to {agent_id}/{filename}: {e}")