Files
evotraders/backend/agents/toolkit_factory.py
cillin 45c3996434 refactor(cleanup): remove legacy agent classes and complete EvoAgent migration
Remove deprecated AnalystAgent, PMAgent, and RiskAgent classes.
All agent creation now goes through UnifiedAgentFactory creating EvoAgent instances.

- Delete backend/agents/analyst.py (169 lines)
- Delete backend/agents/portfolio_manager.py (420 lines)
- Delete backend/agents/risk_manager.py (139 lines)
- Update all imports to use EvoAgent exclusively
- Clean up unused imports across 25 files
- Update tests to work with simplified agent structure

Constraint: EvoAgent is now the single source of truth for all agent roles
Constraint: UnifiedAgentFactory handles runtime agent creation
Rejected: Keep legacy aliases | creates maintenance burden
Confidence: high
Scope-risk: moderate (affects agent instantiation paths)
Directive: All new agent features must be added to EvoAgent, not legacy classes
Not-tested: Kubernetes sandbox executor (marked with TODO)
2026-04-02 10:51:14 +08:00

517 lines
17 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# -*- coding: utf-8 -*-
"""Toolkit factory following AgentScope's skill + tool group practices.

Supports dynamically creating toolkits from an agent workspace, loading
builtin/customized skills, and merging agent-specific tools.
"""
from typing import Any, Dict, Iterable, List, Optional, Set
from pathlib import Path
import yaml
from backend.agents.agent_workspace import load_agent_workspace_config
from backend.agents.skills_manager import SkillsManager
from backend.agents.skill_metadata import parse_skill_metadata
from backend.config.bootstrap_config import get_bootstrap_config_for_run
def load_agent_profiles() -> Dict[str, Dict[str, Any]]:
    """Read the agent profile definitions from ``agent_profiles.yaml``.

    The file is looked up under ``backend/config`` relative to the
    ``project_root`` reported by a fresh ``SkillsManager``.

    Returns:
        The parsed YAML document, or an empty dict when the document parses
        to a falsy value (for example an empty file).
    """
    profiles_file = (
        SkillsManager().project_root
        / "backend"
        / "config"
        / "agent_profiles.yaml"
    )
    with open(profiles_file, "r", encoding="utf-8") as stream:
        parsed = yaml.safe_load(stream)
    return parsed or {}
def _register_analysis_tool_groups(toolkit: Any) -> None:
    """Create the analyst tool groups and attach their tool functions.

    Four groups are created (``fundamentals``, ``technical``, ``sentiment``,
    ``valuation``), all initially inactive. Tool functions are looked up by
    name in ``TOOL_REGISTRY``; names with no (truthy) registry entry are
    skipped silently.

    Args:
        toolkit: Object exposing ``create_tool_group`` and
            ``register_tool_function``.
    """
    # Imported lazily, as in the rest of this module's registration helpers.
    from backend.tools.analysis_tools import TOOL_REGISTRY

    # Each record: (group name, description, usage notes, tool names).
    group_specs = (
        (
            "fundamentals",
            "Financial health, profitability, growth, and efficiency analysis tools.",
            (
                "Use these tools to validate business quality, financial resilience, "
                "and earnings durability before making directional conclusions."
            ),
            (
                "analyze_profitability",
                "analyze_growth",
                "analyze_financial_health",
                "analyze_efficiency_ratios",
                "analyze_valuation_ratios",
                "get_financial_metrics_tool",
            ),
        ),
        (
            "technical",
            "Trend, momentum, mean reversion, and volatility analysis tools.",
            (
                "Use these tools to assess timing, price structure, and risk-reward in "
                "the current market regime."
            ),
            (
                "analyze_trend_following",
                "analyze_momentum",
                "analyze_mean_reversion",
                "analyze_volatility",
            ),
        ),
        (
            "sentiment",
            "News sentiment and insider activity analysis tools.",
            (
                "Use these tools to capture short-horizon catalysts, sentiment shifts, "
                "and behavioral signals around each ticker."
            ),
            (
                "analyze_news_sentiment",
                "analyze_insider_trading",
            ),
        ),
        (
            "valuation",
            "Intrinsic value and relative valuation analysis tools.",
            (
                "Use these tools when the task requires fair value estimation, margin of "
                "safety analysis, or valuation scenario comparison."
            ),
            (
                "dcf_valuation_analysis",
                "owner_earnings_valuation_analysis",
                "ev_ebitda_valuation_analysis",
                "residual_income_valuation_analysis",
            ),
        ),
    )

    for group_name, description, notes, tool_names in group_specs:
        # The group must exist before any function is registered into it.
        toolkit.create_tool_group(
            group_name=group_name,
            description=description,
            active=False,
            notes=notes,
        )
        for tool_name in tool_names:
            tool_func = TOOL_REGISTRY.get(tool_name)
            if not tool_func:
                # Unknown tool name: skip rather than fail.
                continue
            toolkit.register_tool_function(tool_func, group_name=group_name)
def _register_portfolio_tool_groups(toolkit: Any, pm_agent: Any) -> None:
    """Create the ``portfolio_ops`` group and register the agent's tools.

    ``pm_agent._make_decision`` is always registered (a missing attribute
    raises ``AttributeError``). The team-management methods are registered
    only when the agent object actually has them.

    Args:
        toolkit: Object exposing ``create_tool_group`` and
            ``register_tool_function``.
        pm_agent: Agent instance whose bound methods become tool functions.
    """
    group = "portfolio_ops"
    toolkit.create_tool_group(
        group_name=group,
        description="Portfolio decision recording tools.",
        active=False,
        notes=(
            "Use portfolio tools only after synthesizing analyst and risk inputs. "
            "Record one explicit decision per ticker."
        ),
    )

    # Mandatory tool.
    toolkit.register_tool_function(pm_agent._make_decision, group_name=group)

    # Optional tools, registered in this fixed order when available.
    optional_methods = (
        "_add_team_analyst",
        "_remove_team_analyst",
        "_set_active_analysts",
        "_create_team_analyst",
    )
    for method_name in optional_methods:
        if hasattr(pm_agent, method_name):
            toolkit.register_tool_function(
                getattr(pm_agent, method_name),
                group_name=group,
            )
def _register_risk_tool_groups(toolkit: Any) -> None:
    """Create the ``risk_ops`` group and register the risk diagnostics.

    The group is created inactive; three functions from
    ``backend.tools.risk_tools`` are then registered into it.

    Args:
        toolkit: Object exposing ``create_tool_group`` and
            ``register_tool_function``.
    """
    from backend.tools.risk_tools import (
        assess_margin_and_liquidity,
        assess_position_concentration,
        assess_volatility_exposure,
    )

    group = "risk_ops"
    toolkit.create_tool_group(
        group_name=group,
        description="Risk diagnostics for concentration, leverage, and volatility.",
        active=False,
        notes=(
            "Use risk tools to quantify concentration, margin pressure, and volatility "
            "before writing the final risk memo."
        ),
    )

    # Registration order: concentration, margin/liquidity, volatility.
    risk_functions = (
        assess_position_concentration,
        assess_margin_and_liquidity,
        assess_volatility_exposure,
    )
    for risk_function in risk_functions:
        toolkit.register_tool_function(risk_function, group_name=group)
def create_agent_toolkit(
    agent_id: str,
    config_name: str,
    owner: Any = None,
    active_skill_dirs: Iterable[str] | None = None,
) -> Any:
    """Create a Toolkit with agent skills and grouped tools.

    Active tool groups are resolved with this precedence: the bootstrap
    override for the agent (if it defines ``active_tool_groups``), then the
    agent's ``agent.yaml``, then the agent profile. Groups listed in the
    agent config's ``disabled_tool_groups`` are removed from the result.

    Args:
        agent_id: Agent identifier. IDs ending in ``_analyst`` get the
            analysis groups, ``portfolio_manager`` (with an ``owner``) gets
            the portfolio group, and ``risk_manager`` gets the risk group.
        config_name: Name of the run configuration.
        owner: Agent instance whose methods are registered as tools; only
            used for ``portfolio_manager``.
        active_skill_dirs: Explicit skill directories to register. Any
            iterable of path-like values is accepted, including one-shot
            iterators. When ``None``, the skill names are resolved through
            ``SkillsManager`` and looked up under the agent's active root.

    Returns:
        The configured Toolkit instance.
    """
    from agentscope.tool import Toolkit

    profiles = load_agent_profiles()
    profile = profiles.get(agent_id, {})
    skills_manager = SkillsManager()
    agent_config = load_agent_workspace_config(
        skills_manager.get_agent_asset_dir(config_name, agent_id) / "agent.yaml",
    )
    bootstrap_config = get_bootstrap_config_for_run(
        skills_manager.project_root,
        config_name,
    )
    override = bootstrap_config.agent_override(agent_id)
    active_groups = override.get(
        "active_tool_groups",
        agent_config.active_tool_groups
        or profile.get("active_tool_groups", []),
    )
    disabled_groups = set(agent_config.disabled_tool_groups)
    if disabled_groups:
        active_groups = [
            group_name
            for group_name in active_groups
            if group_name not in disabled_groups
        ]

    toolkit = Toolkit(
        agent_skill_instruction=(
            "<system-info>You have access to project skills. Each skill lives in a "
            "directory and is described by SKILL.md. Follow the skill instructions "
            "when they are relevant to the current task.</system-info>"
        ),
        agent_skill_template="- {name} (dir: {dir}): {description}",
    )

    # Role-specific default tool groups.
    if agent_id.endswith("_analyst"):
        _register_analysis_tool_groups(toolkit)
    elif agent_id == "portfolio_manager" and owner is not None:
        _register_portfolio_tool_groups(toolkit, owner)
    elif agent_id == "risk_manager":
        _register_risk_tool_groups(toolkit)

    if active_skill_dirs is None:
        skill_names = skills_manager.resolve_agent_skill_names(
            config_name=config_name,
            agent_id=agent_id,
            default_skills=profile.get("skills", []),
        )
        active_root = skills_manager.get_agent_active_root(config_name, agent_id)
        active_skill_dirs = [active_root / skill_name for skill_name in skill_names]

    # Materialise exactly once as Path objects. The directories are walked
    # twice below, so a generator argument would otherwise be exhausted
    # before the restriction pass. apply_skill_tool_restrictions also reads
    # ``skill_dir.name``, which plain strings do not provide.
    skill_paths: List[Path] = [Path(skill_dir) for skill_dir in active_skill_dirs]

    for skill_path in skill_paths:
        toolkit.register_agent_skill(str(skill_path))
    apply_skill_tool_restrictions(toolkit, skill_paths)

    if active_groups:
        toolkit.update_tool_groups(group_names=active_groups, active=True)
    return toolkit
def create_toolkit_from_workspace(
    agent_id: str,
    config_name: str,
    owner: Any = None,
    include_builtin: bool = True,
    include_customized: bool = True,
    include_local: bool = True,
    active_groups: Optional[List[str]] = None,
) -> Any:
    """Build a Toolkit by scanning the agent's workspace skill directories.

    Skill directories (sub-directories containing a ``SKILL.md``) are
    collected from the agent's active root, then its installed root, then,
    when ``include_local`` is true, its local root. Each root is scanned in
    sorted order and every collected directory is registered as a skill.

    NOTE(review): ``include_builtin`` and ``include_customized`` are accepted
    but never read by this function. Duplicate detection compares full
    paths, so same-named skills under different roots are all registered.

    Args:
        agent_id: Agent identifier; also selects the role-specific default
            tool groups.
        config_name: Name of the run configuration.
        owner: Agent instance; only used for ``portfolio_manager``.
        include_builtin: Whether to include builtin skills (currently unused).
        include_customized: Whether to include customized skills (currently
            unused).
        include_local: Whether to scan the agent-local skill root.
        active_groups: Tool groups to activate. When ``None``, they come from
            the agent config, falling back to the agent profile.

    Returns:
        The configured Toolkit instance.
    """
    from agentscope.tool import Toolkit

    skills_manager = SkillsManager()
    agent_config = load_agent_workspace_config(
        skills_manager.get_agent_asset_dir(config_name, agent_id) / "agent.yaml",
    )
    toolkit = Toolkit(
        agent_skill_instruction=(
            "<system-info>You have access to project skills. Each skill lives in a "
            "directory and is described by SKILL.md. Follow the skill instructions "
            "when they are relevant to the current task.</system-info>"
        ),
        agent_skill_template="- {name} (dir: {dir}): {description}",
    )

    # Default tool groups for the agent's role.
    if agent_id.endswith("_analyst"):
        _register_analysis_tool_groups(toolkit)
    elif agent_id == "portfolio_manager" and owner is not None:
        _register_portfolio_tool_groups(toolkit, owner)
    elif agent_id == "risk_manager":
        _register_risk_tool_groups(toolkit)

    discovered: List[Path] = []

    def _collect_from(root: Path) -> None:
        """Append every not-yet-seen skill directory found directly under root."""
        if not root.exists():
            return
        for candidate in sorted(root.iterdir()):
            if not candidate.is_dir():
                continue
            if not (candidate / "SKILL.md").exists():
                continue
            if candidate not in discovered:
                discovered.append(candidate)

    # 1. Skills already synced into the active directory.
    _collect_from(skills_manager.get_agent_active_root(config_name, agent_id))
    # 2. Skills from the installed directory.
    _collect_from(skills_manager.get_agent_installed_root(config_name, agent_id))
    # 3. Agent-local skills, when requested.
    if include_local:
        _collect_from(skills_manager.get_agent_local_root(config_name, agent_id))

    # Register the skills, then attach their per-skill tool restrictions.
    for skill_path in discovered:
        toolkit.register_agent_skill(str(skill_path))
    apply_skill_tool_restrictions(toolkit, discovered)

    # Resolve which tool groups to activate.
    if active_groups is None:
        profile = load_agent_profiles().get(agent_id, {})
        active_groups = agent_config.active_tool_groups or profile.get("active_tool_groups", [])

    # Apply the disabled list.
    blocked_groups = set(agent_config.disabled_tool_groups)
    if blocked_groups:
        active_groups = [name for name in active_groups if name not in blocked_groups]

    if active_groups:
        toolkit.update_tool_groups(group_names=active_groups, active=True)
    return toolkit
def get_toolkit_info(toolkit: Any) -> Dict[str, Any]:
    """Summarise a toolkit's tool groups and skills as plain data.

    Every attribute is read with ``getattr`` and a default, so an object
    lacking ``tool_groups`` or ``agent_skills`` yields an empty summary
    instead of an error.

    Args:
        toolkit: Toolkit instance. ``tool_groups`` is read as a mapping of
            group name to group object and ``agent_skills`` as an iterable
            of skill objects.
            NOTE(review): confirm the Toolkit class in use exposes these
            attribute names.

    Returns:
        A dict with ``tool_groups`` (per-group description, active flag and
        tool names), ``skills`` (name, path and description per skill) and
        ``tools_count`` (total number of tools across all groups).
    """
    group_summaries: Dict[str, Dict[str, Any]] = {}
    total_tools = 0
    for group_name, group in getattr(toolkit, "tool_groups", {}).items():
        members = getattr(group, "tools", [])
        group_summaries[group_name] = {
            "description": getattr(group, "description", ""),
            "active": getattr(group, "active", False),
            "tools": [member.name for member in members],
        }
        total_tools += len(members)

    skill_summaries = [
        {
            "name": getattr(skill, "name", "unknown"),
            "path": getattr(skill, "path", ""),
            "description": getattr(skill, "description", ""),
        }
        for skill in getattr(toolkit, "agent_skills", [])
    ]

    return {
        "tool_groups": group_summaries,
        "skills": skill_summaries,
        "tools_count": total_tools,
    }
def refresh_toolkit_skills(
    toolkit: Any,
    agent_id: str,
    config_name: str,
) -> None:
    """Reload a toolkit's skills from the agent workspace.

    Intended for runtime skill changes: the existing ``agent_skills``
    collection is cleared (when the toolkit has one), then every
    sub-directory containing a ``SKILL.md`` under the agent's active root
    and afterwards its local root is registered again, in sorted order.

    Args:
        toolkit: Toolkit instance to update in place.
        agent_id: Agent identifier.
        config_name: Name of the run configuration.
    """
    skills_manager = SkillsManager()

    # Drop whatever is currently registered.
    if hasattr(toolkit, "agent_skills"):
        toolkit.agent_skills.clear()

    # Active skills first, then agent-local ones. Each root is resolved just
    # before it is scanned.
    root_getters = (
        skills_manager.get_agent_active_root,
        skills_manager.get_agent_local_root,
    )
    for resolve_root in root_getters:
        root = resolve_root(config_name, agent_id)
        if not root.exists():
            continue
        for candidate in sorted(root.iterdir()):
            if candidate.is_dir() and (candidate / "SKILL.md").exists():
                toolkit.register_agent_skill(str(candidate))
def apply_skill_tool_restrictions(toolkit: Any, skill_dirs: List[Path]) -> None:
    """Attach per-skill allowed/denied tool sets to the toolkit's skills.

    Each directory's metadata is parsed with ``parse_skill_metadata``.
    Directories declaring ``allowed_tools`` and/or ``denied_tools`` yield a
    restriction entry keyed by the directory name. Every skill object in
    ``toolkit.agent_skills`` whose ``name`` equals such a key receives two
    attributes, ``_tool_allowed`` and ``_tool_denied`` (both sets), which
    downstream code can consult when resolving available tools.

    Nothing is stored on the toolkit object itself, and a toolkit without an
    ``agent_skills`` attribute is left untouched.

    NOTE(review): matching relies on the skill's ``name`` being identical to
    its directory name — confirm against how skills are registered.

    Args:
        toolkit: The agentscope Toolkit instance.
        skill_dirs: Skill directory paths to inspect.
    """
    restrictions: Dict[str, Dict[str, Set[str]]] = {}
    for directory in skill_dirs:
        meta = parse_skill_metadata(directory, source="active")
        if meta.allowed_tools or meta.denied_tools:
            restrictions[directory.name] = {
                "allowed": set(meta.allowed_tools),
                "denied": set(meta.denied_tools),
            }

    if not hasattr(toolkit, "agent_skills"):
        return

    for skill in toolkit.agent_skills:
        rule = restrictions.get(getattr(skill, "name", "") or "")
        if rule is None:
            continue
        skill._tool_allowed = rule["allowed"]
        skill._tool_denied = rule["denied"]
def get_skill_effective_tools(skill: Any) -> Optional[Set[str]]:
    """Return a skill's allow-list with its deny-list subtracted.

    The sets are read from the skill's ``_tool_allowed`` and ``_tool_denied``
    attributes. A skill without ``_tool_allowed`` (or with it set to
    ``None``) is treated as unrestricted and ``None`` is returned.

    NOTE(review): an empty allow-list produces an empty result even when a
    deny-list is present, because this function has no view of the full
    tool set. Callers wanting "all tools minus denied" must compute it
    themselves.

    Args:
        skill: A skill object previously registered via register_agent_skill.

    Returns:
        ``allowed - denied`` as a set of tool names, or ``None`` if the skill
        carries no allow-list.
    """
    permitted = getattr(skill, "_tool_allowed", None)
    blocked = getattr(skill, "_tool_denied", set())
    return None if permitted is None else permitted - blocked
def filter_toolkit_by_skill(
    toolkit: Any,
    skill_name: str,
) -> Set[str]:
    """Return the set of tool names that are accessible for a given skill.

    Resolution rules for the matching skill:

    * a non-empty allow-list (``_tool_allowed``) yields the allow-list minus
      the deny-list (``_tool_denied``);
    * no allow-list yields every registered tool name minus the deny-list,
      so an unrestricted skill gets all registered tools. Previously this
      case returned an empty set.

    Registered tool names are gathered from ``toolkit.tools`` when it is a
    dict, plus the ``name`` of every tool listed under ``toolkit.tool_groups``.
    NOTE(review): confirm these attribute names against the Toolkit class in
    use; a toolkit exposing neither still yields an empty set.

    Args:
        toolkit: The agentscope Toolkit instance.
        skill_name: Name of the skill to query.

    Returns:
        Set of accessible tool names. Empty when the toolkit has no
        ``agent_skills`` attribute or no skill has the requested name.
    """
    if not hasattr(toolkit, "agent_skills"):
        return set()

    for skill in toolkit.agent_skills:
        if (getattr(skill, "name", "") or "") != skill_name:
            continue

        allowed = getattr(skill, "_tool_allowed", None)
        denied = set(getattr(skill, "_tool_denied", None) or ())

        if allowed:
            # Explicit allow-list; denied tools take precedence.
            return set(allowed) - denied

        # No allow-list: start from everything registered on the toolkit.
        registered: Set[str] = set()
        tools_by_name = getattr(toolkit, "tools", None)
        if isinstance(tools_by_name, dict):
            registered.update(tools_by_name)
        for group in getattr(toolkit, "tool_groups", {}).values():
            registered.update(
                tool.name for tool in getattr(group, "tools", [])
            )
        return registered - denied

    return set()