# -*- coding: utf-8 -*- """ Analyst Agent - Based on AgentScope ReActAgent Performs analysis using tools and LLM """ from typing import Any, Dict, Optional from agentscope.agent import ReActAgent from agentscope.memory import InMemoryMemory, LongTermMemoryBase from agentscope.message import Msg from ..config.constants import ANALYST_TYPES from ..utils.progress import progress from .prompt_factory import build_agent_system_prompt, clear_prompt_factory_cache class AnalystAgent(ReActAgent): """ Analyst Agent - Uses LLM for tool selection and analysis Inherits from AgentScope's ReActAgent """ def __init__( self, analyst_type: str, toolkit: Any, model: Any, formatter: Any, agent_id: Optional[str] = None, config: Optional[Dict[str, Any]] = None, long_term_memory: Optional[LongTermMemoryBase] = None, ): """ Initialize Analyst Agent Args: analyst_type: Type of analyst (e.g., "fundamentals", etc.) toolkit: AgentScope Toolkit instance model: LLM model instance formatter: Message formatter instance agent_id: Agent ID (defaults to "{analyst_type}_analyst") config: Configuration dictionary long_term_memory: Optional ReMeTaskLongTermMemory instance """ if analyst_type not in ANALYST_TYPES: raise ValueError( f"Unknown analyst type: {analyst_type}. " f"Must be one of: {list(ANALYST_TYPES.keys())}", ) object.__setattr__(self, "analyst_type_key", analyst_type) object.__setattr__( self, "analyst_persona", ANALYST_TYPES[analyst_type]["display_name"], ) if agent_id is None: agent_id = analyst_type object.__setattr__(self, "agent_id", agent_id) object.__setattr__(self, "config", config or {}) object.__setattr__(self, "toolkit", toolkit) sys_prompt = self._load_system_prompt() kwargs = { "name": agent_id, "sys_prompt": sys_prompt, "model": model, "formatter": formatter, "toolkit": toolkit, "memory": InMemoryMemory(), "max_iters": 10, } if long_term_memory: kwargs["long_term_memory"] = long_term_memory kwargs["long_term_memory_mode"] = "static_control" super().__init__(**kwargs) def _load_system_prompt(self) -> str: """Load system prompt for analyst""" return build_agent_system_prompt( agent_id=self.agent_id, config_name=self.config.get("config_name", "default"), toolkit=self.toolkit, ) async def reply(self, x: Msg = None) -> Msg: """ Override reply method to add progress tracking Args: x: Input message (content must be str) Returns: Response message (content is str) """ ticker = None if x and hasattr(x, "metadata") and x.metadata: ticker = x.metadata.get("tickers") if ticker: progress.update_status( self.name, ticker, f"Starting {self.analyst_persona} analysis", ) result = await super().reply(x) if ticker: progress.update_status( self.name, ticker, "Analysis completed", ) return result def reload_runtime_assets(self, active_skill_dirs: Optional[list] = None) -> None: """Reload toolkit and system prompt from current run assets.""" from .toolkit_factory import create_agent_toolkit clear_prompt_factory_cache() self.toolkit = create_agent_toolkit( self.agent_id, self.config.get("config_name", "default"), active_skill_dirs=active_skill_dirs, ) self._apply_runtime_sys_prompt(self._load_system_prompt()) def _apply_runtime_sys_prompt(self, sys_prompt: str) -> None: """Update the prompt used by future turns and the cached system msg.""" self._sys_prompt = sys_prompt for msg, _marks in self.memory.content: if getattr(msg, "role", None) == "system": msg.content = sys_prompt break