# -*- coding: utf-8 -*- """Unified Agent Factory - Centralized agent creation for 大时代. This module provides a unified factory for creating all agent types (analysts, risk manager, portfolio manager) with consistent configuration. It replaces the scattered agent creation logic in main.py, pipeline.py, and pipeline_runner.py. Key features: - Single entry point for all agent creation - Automatic EvoAgent vs Legacy Agent selection based on _resolve_evo_agent_ids() - Consistent parameter handling across all agent types - Support for workspace-driven configuration - Long-term memory integration """ from __future__ import annotations import os from pathlib import Path from typing import TYPE_CHECKING, Any, Optional, Protocol, TypeVar, Union if TYPE_CHECKING: from backend.agents.base.evo_agent import EvoAgent from backend.agents.analyst import AnalystAgent from backend.agents.risk_manager import RiskAgent from backend.agents.portfolio_manager import PMAgent # Type aliases for agent types AgentType = Union["EvoAgent", "AnalystAgent", "RiskAgent", "PMAgent"] T = TypeVar("T") class AgentFactoryProtocol(Protocol): """Protocol for agent factory implementations.""" def create_analyst( self, analyst_type: str, model: Any, formatter: Any, active_skill_dirs: Optional[list[Path]] = None, long_term_memory: Optional[Any] = None, ) -> AnalystAgent | EvoAgent: ... def create_risk_manager( self, model: Any, formatter: Any, active_skill_dirs: Optional[list[Path]] = None, long_term_memory: Optional[Any] = None, ) -> RiskAgent | EvoAgent: ... def create_portfolio_manager( self, model: Any, formatter: Any, initial_cash: float, margin_requirement: float, active_skill_dirs: Optional[list[Path]] = None, long_term_memory: Optional[Any] = None, ) -> PMAgent | EvoAgent: ... class UnifiedAgentFactory: """Unified factory for creating agents with consistent configuration. This factory centralizes agent creation logic and automatically selects between EvoAgent (new) and Legacy Agent based on the EVO_AGENT_IDS environment variable configuration. By default, all supported roles use EvoAgent. Set EVO_AGENT_IDS=legacy to disable EvoAgent entirely. Example: factory = UnifiedAgentFactory( config_name="smoke_fullstack", skills_manager=skills_manager, ) # Create analyst analyst = factory.create_analyst( analyst_type="fundamentals_analyst", model=model, formatter=formatter, ) # Create risk manager risk_mgr = factory.create_risk_manager( model=model, formatter=formatter, ) # Create portfolio manager pm = factory.create_portfolio_manager( model=model, formatter=formatter, initial_cash=100000.0, margin_requirement=0.5, ) """ def __init__( self, config_name: str, skills_manager: Any, toolkit_factory: Optional[Any] = None, evo_agent_ids: Optional[set[str]] = None, ): """Initialize the agent factory. Args: config_name: Run configuration name (e.g., "smoke_fullstack") skills_manager: SkillsManager instance for skill/asset management toolkit_factory: Optional factory function for creating toolkits evo_agent_ids: Optional set of agent IDs to use EvoAgent. If None, uses _resolve_evo_agent_ids() default. """ self.config_name = config_name self.skills_manager = skills_manager self.toolkit_factory = toolkit_factory # Determine which agents should use EvoAgent if evo_agent_ids is not None: self._evo_agent_ids = evo_agent_ids else: self._evo_agent_ids = self._resolve_evo_agent_ids() def _resolve_evo_agent_ids(self) -> set[str]: """Return agent ids selected to use EvoAgent. By default, all supported roles use EvoAgent. EVO_AGENT_IDS can be used to limit to specific roles. """ from backend.config.constants import ANALYST_TYPES all_supported = set(ANALYST_TYPES) | {"risk_manager", "portfolio_manager"} raw = os.getenv("EVO_AGENT_IDS", "") if not raw.strip(): # Default: all supported roles use EvoAgent return all_supported if raw.strip().lower() in ("legacy", "old", "none"): return set() requested = {item.strip() for item in raw.split(",") if item.strip()} return { agent_id for agent_id in requested if agent_id in ANALYST_TYPES or agent_id in {"risk_manager", "portfolio_manager"} } def _should_use_evo_agent(self, agent_id: str) -> bool: """Check if an agent should use EvoAgent.""" return agent_id in self._evo_agent_ids def _create_toolkit( self, agent_type: str, active_skill_dirs: Optional[list[Path]] = None, owner: Optional[Any] = None, ) -> Any: """Create toolkit for an agent.""" if self.toolkit_factory is None: from backend.agents.toolkit_factory import create_agent_toolkit self.toolkit_factory = create_agent_toolkit kwargs: dict[str, Any] = { "active_skill_dirs": active_skill_dirs or [], } if owner is not None: kwargs["owner"] = owner return self.toolkit_factory(agent_type, self.config_name, **kwargs) def _load_agent_config(self, agent_id: str) -> Any: """Load agent configuration from workspace.""" from backend.agents.agent_workspace import load_agent_workspace_config workspace_dir = self.skills_manager.get_agent_asset_dir( self.config_name, agent_id ) config_path = workspace_dir / "agent.yaml" if config_path.exists(): return load_agent_workspace_config(config_path) # Return default config if no agent.yaml return type( "AgentConfig", (), {"prompt_files": ["SOUL.md"]}, )() def _create_evo_agent( self, agent_id: str, model: Any, formatter: Any, toolkit: Any, agent_config: Any, long_term_memory: Optional[Any] = None, extra_kwargs: Optional[dict[str, Any]] = None, ) -> "EvoAgent": """Create an EvoAgent instance.""" from backend.agents.base.evo_agent import EvoAgent workspace_dir = self.skills_manager.get_agent_asset_dir( self.config_name, agent_id ) kwargs: dict[str, Any] = { "agent_id": agent_id, "config_name": self.config_name, "workspace_dir": workspace_dir, "model": model, "formatter": formatter, "skills_manager": self.skills_manager, "prompt_files": getattr(agent_config, "prompt_files", ["SOUL.md"]), "long_term_memory": long_term_memory, } if extra_kwargs: kwargs.update(extra_kwargs) agent = EvoAgent(**kwargs) agent.toolkit = toolkit setattr(agent, "run_id", self.config_name) # Keep workspace_id for backward compatibility setattr(agent, "workspace_id", self.config_name) return agent def create_analyst( self, analyst_type: str, model: Any, formatter: Any, active_skill_dirs: Optional[list[Path]] = None, long_term_memory: Optional[Any] = None, ) -> "AnalystAgent | EvoAgent": """Create an analyst agent. Args: analyst_type: Type of analyst (fundamentals, technical, sentiment, valuation) model: LLM model instance formatter: Message formatter instance active_skill_dirs: Optional list of active skill directories long_term_memory: Optional long-term memory instance Returns: AnalystAgent or EvoAgent instance """ toolkit = self._create_toolkit(analyst_type, active_skill_dirs) if self._should_use_evo_agent(analyst_type): agent_config = self._load_agent_config(analyst_type) return self._create_evo_agent( agent_id=analyst_type, model=model, formatter=formatter, toolkit=toolkit, agent_config=agent_config, long_term_memory=long_term_memory, ) # Legacy path from backend.agents.analyst import AnalystAgent return AnalystAgent( analyst_type=analyst_type, toolkit=toolkit, model=model, formatter=formatter, agent_id=analyst_type, config={"config_name": self.config_name}, long_term_memory=long_term_memory, ) def create_risk_manager( self, model: Any, formatter: Any, active_skill_dirs: Optional[list[Path]] = None, long_term_memory: Optional[Any] = None, ) -> "RiskAgent | EvoAgent": """Create a risk manager agent. Args: model: LLM model instance formatter: Message formatter instance active_skill_dirs: Optional list of active skill directories long_term_memory: Optional long-term memory instance Returns: RiskAgent or EvoAgent instance """ toolkit = self._create_toolkit("risk_manager", active_skill_dirs) if self._should_use_evo_agent("risk_manager"): agent_config = self._load_agent_config("risk_manager") return self._create_evo_agent( agent_id="risk_manager", model=model, formatter=formatter, toolkit=toolkit, agent_config=agent_config, long_term_memory=long_term_memory, ) # Legacy path from backend.agents.risk_manager import RiskAgent return RiskAgent( model=model, formatter=formatter, name="risk_manager", config={"config_name": self.config_name}, long_term_memory=long_term_memory, toolkit=toolkit, ) def create_portfolio_manager( self, model: Any, formatter: Any, initial_cash: float, margin_requirement: float, active_skill_dirs: Optional[list[Path]] = None, long_term_memory: Optional[Any] = None, ) -> "PMAgent | EvoAgent": """Create a portfolio manager agent. Args: model: LLM model instance formatter: Message formatter instance initial_cash: Initial cash allocation margin_requirement: Margin requirement ratio active_skill_dirs: Optional list of active skill directories long_term_memory: Optional long-term memory instance Returns: PMAgent or EvoAgent instance """ if self._should_use_evo_agent("portfolio_manager"): agent_config = self._load_agent_config("portfolio_manager") # For PM, toolkit is created after agent (needs owner reference) from backend.agents.base.evo_agent import EvoAgent workspace_dir = self.skills_manager.get_agent_asset_dir( self.config_name, "portfolio_manager" ) agent = EvoAgent( agent_id="portfolio_manager", config_name=self.config_name, workspace_dir=workspace_dir, model=model, formatter=formatter, skills_manager=self.skills_manager, prompt_files=getattr(agent_config, "prompt_files", ["SOUL.md"]), initial_cash=initial_cash, margin_requirement=margin_requirement, long_term_memory=long_term_memory, ) agent.toolkit = self._create_toolkit( "portfolio_manager", active_skill_dirs, owner=agent ) setattr(agent, "run_id", self.config_name) # Keep workspace_id for backward compatibility setattr(agent, "workspace_id", self.config_name) return agent # Legacy path from backend.agents.portfolio_manager import PMAgent return PMAgent( name="portfolio_manager", model=model, formatter=formatter, initial_cash=initial_cash, margin_requirement=margin_requirement, config={"config_name": self.config_name}, long_term_memory=long_term_memory, toolkit_factory=self.toolkit_factory, toolkit_factory_kwargs={"active_skill_dirs": active_skill_dirs or []}, ) # Singleton factory instance cache _factory_cache: dict[str, UnifiedAgentFactory] = {} def get_agent_factory( config_name: str, skills_manager: Any, toolkit_factory: Optional[Any] = None, ) -> UnifiedAgentFactory: """Get or create a cached agent factory instance. Args: config_name: Run configuration name skills_manager: SkillsManager instance toolkit_factory: Optional toolkit factory function Returns: UnifiedAgentFactory instance (cached per config_name) """ cache_key = f"{config_name}:{id(skills_manager)}" if cache_key not in _factory_cache: _factory_cache[cache_key] = UnifiedAgentFactory( config_name=config_name, skills_manager=skills_manager, toolkit_factory=toolkit_factory, ) return _factory_cache[cache_key] def clear_factory_cache() -> None: """Clear the factory cache. Useful for testing.""" _factory_cache.clear() __all__ = [ "UnifiedAgentFactory", "AgentFactoryProtocol", "get_agent_factory", "clear_factory_cache", ]