Files
evotraders/backend/agents/unified_factory.py
cillin 16b54d5ccc feat(agent): complete EvoAgent integration for all 6 agent roles
Migrate all agent roles from Legacy to EvoAgent architecture:
- fundamentals_analyst, technical_analyst, sentiment_analyst, valuation_analyst
- risk_manager, portfolio_manager

Key changes:
- EvoAgent now supports Portfolio Manager compatibility methods (_make_decision,
  get_decisions, get_portfolio_state, load_portfolio_state, update_portfolio)
- Add UnifiedAgentFactory for centralized agent creation
- ToolGuard with batch approval API and WebSocket broadcast
- Legacy agents marked deprecated (AnalystAgent, RiskAgent, PMAgent)
- Remove backend/agents/compat.py migration shim
- Add run_id alongside workspace_id for semantic clarity
- Complete integration test coverage (13 tests)
- All smoke tests passing for 6 agent roles

Constraint: Must maintain backward compatibility with existing run configs
Constraint: Memory support must work with EvoAgent (no fallback to Legacy)
Rejected: Separate PM implementation for EvoAgent | unified approach cleaner
Confidence: high
Scope-risk: broad
Directive: EVO_AGENT_IDS env var still respected but defaults to all roles
Not-tested: Kubernetes sandbox mode for skill execution
2026-04-02 00:55:08 +08:00

433 lines
14 KiB
Python

# -*- coding: utf-8 -*-
"""Unified Agent Factory - Centralized agent creation for 大时代.
This module provides a unified factory for creating all agent types (analysts,
risk manager, portfolio manager) with consistent configuration. It replaces
the scattered agent creation logic in main.py, pipeline.py, and pipeline_runner.py.
Key features:
- Single entry point for all agent creation
- Automatic EvoAgent vs Legacy Agent selection based on _resolve_evo_agent_ids()
- Consistent parameter handling across all agent types
- Support for workspace-driven configuration
- Long-term memory integration
"""
from __future__ import annotations
import os
from pathlib import Path
from typing import TYPE_CHECKING, Any, Optional, Protocol, TypeVar, Union
if TYPE_CHECKING:
from backend.agents.base.evo_agent import EvoAgent
from backend.agents.analyst import AnalystAgent
from backend.agents.risk_manager import RiskAgent
from backend.agents.portfolio_manager import PMAgent
# Type aliases for agent types
# Union of every agent class named in this module's factory return
# annotations. The members are forward-reference strings because the classes
# are imported only under TYPE_CHECKING above.
AgentType = Union["EvoAgent", "AnalystAgent", "RiskAgent", "PMAgent"]
# Generic type variable; not referenced anywhere else in the visible part of
# this module.
T = TypeVar("T")
class AgentFactoryProtocol(Protocol):
    """Structural interface shared by agent factory implementations.

    Any object exposing these three creation methods with compatible
    signatures satisfies the protocol; no inheritance is required.
    """

    def create_analyst(
        self,
        analyst_type: str,
        model: Any,
        formatter: Any,
        active_skill_dirs: Optional[list[Path]] = None,
        long_term_memory: Optional[Any] = None,
    ) -> AnalystAgent | EvoAgent:
        """Build the analyst agent identified by ``analyst_type``."""
        ...

    def create_risk_manager(
        self,
        model: Any,
        formatter: Any,
        active_skill_dirs: Optional[list[Path]] = None,
        long_term_memory: Optional[Any] = None,
    ) -> RiskAgent | EvoAgent:
        """Build the risk manager agent."""
        ...

    def create_portfolio_manager(
        self,
        model: Any,
        formatter: Any,
        initial_cash: float,
        margin_requirement: float,
        active_skill_dirs: Optional[list[Path]] = None,
        long_term_memory: Optional[Any] = None,
    ) -> PMAgent | EvoAgent:
        """Build the portfolio manager agent with its starting capital settings."""
        ...
class UnifiedAgentFactory:
    """Unified factory for creating agents with consistent configuration.

    This factory centralizes agent creation logic and automatically selects
    between EvoAgent (new) and Legacy Agent based on the EVO_AGENT_IDS
    environment variable configuration.

    By default, all supported roles use EvoAgent. Set EVO_AGENT_IDS=legacy
    (or "old" / "none") to disable EvoAgent entirely, or to a comma-separated
    list of agent ids to enable EvoAgent for those roles only.

    Example:
        factory = UnifiedAgentFactory(
            config_name="smoke_fullstack",
            skills_manager=skills_manager,
        )
        # Create analyst
        analyst = factory.create_analyst(
            analyst_type="fundamentals_analyst",
            model=model,
            formatter=formatter,
        )
        # Create risk manager
        risk_mgr = factory.create_risk_manager(
            model=model,
            formatter=formatter,
        )
        # Create portfolio manager
        pm = factory.create_portfolio_manager(
            model=model,
            formatter=formatter,
            initial_cash=100000.0,
            margin_requirement=0.5,
        )
    """

    def __init__(
        self,
        config_name: str,
        skills_manager: Any,
        toolkit_factory: Optional[Any] = None,
        evo_agent_ids: Optional[set[str]] = None,
    ):
        """Initialize the agent factory.

        Args:
            config_name: Run configuration name (e.g., "smoke_fullstack")
            skills_manager: SkillsManager instance for skill/asset management
            toolkit_factory: Optional factory function for creating toolkits
            evo_agent_ids: Optional set of agent IDs to use EvoAgent.
                If None, uses _resolve_evo_agent_ids() default.
        """
        self.config_name = config_name
        self.skills_manager = skills_manager
        self.toolkit_factory = toolkit_factory
        # Determine which agents should use EvoAgent
        if evo_agent_ids is not None:
            # Copy so later mutation of the caller's collection cannot
            # silently change which roles this factory routes to EvoAgent.
            self._evo_agent_ids = set(evo_agent_ids)
        else:
            self._evo_agent_ids = self._resolve_evo_agent_ids()

    def _resolve_evo_agent_ids(self) -> set[str]:
        """Return agent ids selected to use EvoAgent.

        By default, all supported roles use EvoAgent.
        EVO_AGENT_IDS can be used to limit to specific roles; entries that are
        not supported roles are ignored and reported with a warning.

        Returns:
            The set of agent ids that should be created as EvoAgent.
        """
        raw = os.getenv("EVO_AGENT_IDS", "").strip()
        # The opt-out keywords need no role list, so answer before importing.
        if raw.lower() in ("legacy", "old", "none"):
            return set()

        from backend.config.constants import ANALYST_TYPES

        all_supported = set(ANALYST_TYPES) | {"risk_manager", "portfolio_manager"}
        if not raw:
            # Default: all supported roles use EvoAgent
            return all_supported

        requested = {item.strip() for item in raw.split(",") if item.strip()}
        unknown = requested - all_supported
        if unknown:
            # A typo here would otherwise silently push a role back to Legacy.
            import logging

            logging.getLogger(__name__).warning(
                "Ignoring unsupported EVO_AGENT_IDS entries: %s",
                ", ".join(sorted(unknown)),
            )
        return requested & all_supported

    def _should_use_evo_agent(self, agent_id: str) -> bool:
        """Check if an agent should use EvoAgent."""
        return agent_id in self._evo_agent_ids

    def _get_toolkit_factory(self) -> Any:
        """Return the toolkit factory, resolving the default on first use.

        The resolved default is stored on ``self.toolkit_factory`` so every
        later caller sees the same callable.
        """
        if self.toolkit_factory is None:
            from backend.agents.toolkit_factory import create_agent_toolkit

            self.toolkit_factory = create_agent_toolkit
        return self.toolkit_factory

    def _create_toolkit(
        self,
        agent_type: str,
        active_skill_dirs: Optional[list[Path]] = None,
        owner: Optional[Any] = None,
    ) -> Any:
        """Create toolkit for an agent.

        Args:
            agent_type: Agent id forwarded to the toolkit factory
            active_skill_dirs: Optional list of active skill directories
            owner: Optional owning agent; forwarded only when provided

        Returns:
            Whatever the toolkit factory returns.
        """
        toolkit_factory = self._get_toolkit_factory()
        kwargs: dict[str, Any] = {
            "active_skill_dirs": active_skill_dirs or [],
        }
        if owner is not None:
            kwargs["owner"] = owner
        return toolkit_factory(agent_type, self.config_name, **kwargs)

    def _load_agent_config(self, agent_id: str) -> Any:
        """Load agent configuration from workspace.

        Returns the parsed ``agent.yaml`` from the agent's asset directory
        when that file exists, otherwise a default object whose only
        attribute is ``prompt_files == ["SOUL.md"]``.
        """
        workspace_dir = self.skills_manager.get_agent_asset_dir(
            self.config_name, agent_id
        )
        config_path = workspace_dir / "agent.yaml"
        if config_path.exists():
            # Imported lazily: the loader is only needed when a file exists.
            from backend.agents.agent_workspace import load_agent_workspace_config

            return load_agent_workspace_config(config_path)

        # Return default config if no agent.yaml
        from types import SimpleNamespace

        return SimpleNamespace(prompt_files=["SOUL.md"])

    def _build_evo_agent(
        self,
        agent_id: str,
        model: Any,
        formatter: Any,
        agent_config: Any,
        long_term_memory: Optional[Any] = None,
        extra_kwargs: Optional[dict[str, Any]] = None,
    ) -> "EvoAgent":
        """Construct an EvoAgent without attaching a toolkit or run ids."""
        from backend.agents.base.evo_agent import EvoAgent

        workspace_dir = self.skills_manager.get_agent_asset_dir(
            self.config_name, agent_id
        )
        kwargs: dict[str, Any] = {
            "agent_id": agent_id,
            "config_name": self.config_name,
            "workspace_dir": workspace_dir,
            "model": model,
            "formatter": formatter,
            "skills_manager": self.skills_manager,
            "prompt_files": getattr(agent_config, "prompt_files", ["SOUL.md"]),
            "long_term_memory": long_term_memory,
        }
        if extra_kwargs:
            # Role-specific arguments may add to or override the defaults.
            kwargs.update(extra_kwargs)
        return EvoAgent(**kwargs)

    def _finalize_evo_agent(self, agent: "EvoAgent", toolkit: Any) -> "EvoAgent":
        """Attach the toolkit and run identifiers to a constructed EvoAgent."""
        agent.toolkit = toolkit
        agent.run_id = self.config_name
        # Keep workspace_id for backward compatibility
        agent.workspace_id = self.config_name
        return agent

    def _create_evo_agent(
        self,
        agent_id: str,
        model: Any,
        formatter: Any,
        toolkit: Any,
        agent_config: Any,
        long_term_memory: Optional[Any] = None,
        extra_kwargs: Optional[dict[str, Any]] = None,
    ) -> "EvoAgent":
        """Create an EvoAgent instance.

        Args:
            agent_id: Agent id, also used to locate the agent's asset directory
            model: LLM model instance
            formatter: Message formatter instance
            toolkit: Toolkit assigned to the agent after construction
            agent_config: Object whose ``prompt_files`` attribute, if present,
                selects the prompt files (defaults to ``["SOUL.md"]``)
            long_term_memory: Optional long-term memory instance
            extra_kwargs: Optional extra constructor arguments; these override
                the defaults on key collision

        Returns:
            EvoAgent with ``toolkit``, ``run_id`` and ``workspace_id`` set.
        """
        agent = self._build_evo_agent(
            agent_id=agent_id,
            model=model,
            formatter=formatter,
            agent_config=agent_config,
            long_term_memory=long_term_memory,
            extra_kwargs=extra_kwargs,
        )
        return self._finalize_evo_agent(agent, toolkit)

    def create_analyst(
        self,
        analyst_type: str,
        model: Any,
        formatter: Any,
        active_skill_dirs: Optional[list[Path]] = None,
        long_term_memory: Optional[Any] = None,
    ) -> "AnalystAgent | EvoAgent":
        """Create an analyst agent.

        Args:
            analyst_type: Type of analyst (fundamentals, technical, sentiment, valuation)
            model: LLM model instance
            formatter: Message formatter instance
            active_skill_dirs: Optional list of active skill directories
            long_term_memory: Optional long-term memory instance

        Returns:
            AnalystAgent or EvoAgent instance
        """
        toolkit = self._create_toolkit(analyst_type, active_skill_dirs)
        if self._should_use_evo_agent(analyst_type):
            agent_config = self._load_agent_config(analyst_type)
            return self._create_evo_agent(
                agent_id=analyst_type,
                model=model,
                formatter=formatter,
                toolkit=toolkit,
                agent_config=agent_config,
                long_term_memory=long_term_memory,
            )

        # Legacy path
        from backend.agents.analyst import AnalystAgent

        return AnalystAgent(
            analyst_type=analyst_type,
            toolkit=toolkit,
            model=model,
            formatter=formatter,
            agent_id=analyst_type,
            config={"config_name": self.config_name},
            long_term_memory=long_term_memory,
        )

    def create_risk_manager(
        self,
        model: Any,
        formatter: Any,
        active_skill_dirs: Optional[list[Path]] = None,
        long_term_memory: Optional[Any] = None,
    ) -> "RiskAgent | EvoAgent":
        """Create a risk manager agent.

        Args:
            model: LLM model instance
            formatter: Message formatter instance
            active_skill_dirs: Optional list of active skill directories
            long_term_memory: Optional long-term memory instance

        Returns:
            RiskAgent or EvoAgent instance
        """
        toolkit = self._create_toolkit("risk_manager", active_skill_dirs)
        if self._should_use_evo_agent("risk_manager"):
            agent_config = self._load_agent_config("risk_manager")
            return self._create_evo_agent(
                agent_id="risk_manager",
                model=model,
                formatter=formatter,
                toolkit=toolkit,
                agent_config=agent_config,
                long_term_memory=long_term_memory,
            )

        # Legacy path
        from backend.agents.risk_manager import RiskAgent

        return RiskAgent(
            model=model,
            formatter=formatter,
            name="risk_manager",
            config={"config_name": self.config_name},
            long_term_memory=long_term_memory,
            toolkit=toolkit,
        )

    def create_portfolio_manager(
        self,
        model: Any,
        formatter: Any,
        initial_cash: float,
        margin_requirement: float,
        active_skill_dirs: Optional[list[Path]] = None,
        long_term_memory: Optional[Any] = None,
    ) -> "PMAgent | EvoAgent":
        """Create a portfolio manager agent.

        Args:
            model: LLM model instance
            formatter: Message formatter instance
            initial_cash: Initial cash allocation
            margin_requirement: Margin requirement ratio
            active_skill_dirs: Optional list of active skill directories
            long_term_memory: Optional long-term memory instance

        Returns:
            PMAgent or EvoAgent instance
        """
        if self._should_use_evo_agent("portfolio_manager"):
            agent_config = self._load_agent_config("portfolio_manager")
            agent = self._build_evo_agent(
                agent_id="portfolio_manager",
                model=model,
                formatter=formatter,
                agent_config=agent_config,
                long_term_memory=long_term_memory,
                extra_kwargs={
                    "initial_cash": initial_cash,
                    "margin_requirement": margin_requirement,
                },
            )
            # For PM, toolkit is created after agent (needs owner reference)
            toolkit = self._create_toolkit(
                "portfolio_manager", active_skill_dirs, owner=agent
            )
            return self._finalize_evo_agent(agent, toolkit)

        # Legacy path
        from backend.agents.portfolio_manager import PMAgent

        return PMAgent(
            name="portfolio_manager",
            model=model,
            formatter=formatter,
            initial_cash=initial_cash,
            margin_requirement=margin_requirement,
            config={"config_name": self.config_name},
            long_term_memory=long_term_memory,
            # Resolve the factory explicitly: self.toolkit_factory stays None
            # until some toolkit has been created, so passing it directly made
            # the PM's toolkit depend on which agent was created first.
            toolkit_factory=self._get_toolkit_factory(),
            toolkit_factory_kwargs={"active_skill_dirs": active_skill_dirs or []},
        )
# Process-wide factory cache, keyed by
# (config_name, id(skills_manager), toolkit_factory-or-its-id).
_factory_cache: dict[tuple[str, int, Any], UnifiedAgentFactory] = {}


def get_agent_factory(
    config_name: str,
    skills_manager: Any,
    toolkit_factory: Optional[Any] = None,
) -> UnifiedAgentFactory:
    """Get or create a cached agent factory instance.

    Args:
        config_name: Run configuration name
        skills_manager: SkillsManager instance
        toolkit_factory: Optional toolkit factory function

    Returns:
        UnifiedAgentFactory instance, cached per combination of config_name,
        skills_manager object and toolkit_factory. Including toolkit_factory
        in the key means a later call with a different factory is not handed
        a cached instance that would ignore it.
    """
    try:
        hash(toolkit_factory)
    except TypeError:
        # Unhashable callable: fall back to identity. The cached factory keeps
        # a reference to it, so the id cannot be reused while the entry lives.
        factory_key: Any = id(toolkit_factory)
    else:
        factory_key = toolkit_factory

    # The cached factory also references skills_manager, which keeps its id
    # stable for as long as the entry exists.
    cache_key = (config_name, id(skills_manager), factory_key)
    factory = _factory_cache.get(cache_key)
    if factory is None:
        factory = UnifiedAgentFactory(
            config_name=config_name,
            skills_manager=skills_manager,
            toolkit_factory=toolkit_factory,
        )
        _factory_cache[cache_key] = factory
    return factory
def clear_factory_cache() -> None:
    """Drop every cached factory so the next lookup builds a fresh one.

    Intended mainly for tests. The module-level cache dict is emptied in
    place rather than rebound.
    """
    _factory_cache.clear()
# Public API of this module: the names exported by `from ... import *`.
__all__ = [
    "UnifiedAgentFactory",
    "AgentFactoryProtocol",
    "get_agent_factory",
    "clear_factory_cache",
]