# -*- coding: utf-8 -*-
"""Toolkit factory following AgentScope's skill + tool group practices.

Supports creating toolkits dynamically from an agent workspace, loading
builtin/customized skills, and merging agent-specific tools.
"""
import logging
from pathlib import Path
from typing import Any, Dict, Iterable, List, Optional, Set, Union

import yaml

from backend.agents.agent_workspace import load_agent_workspace_config
from backend.agents.skills_manager import SkillsManager
from backend.agents.skill_metadata import parse_skill_metadata
from backend.config.bootstrap_config import get_bootstrap_config_for_run
from backend.tools.dynamic_team_tools import (
    create_analyst,
    clone_analyst,
    remove_analyst,
    list_analyst_types,
    get_analyst_info,
    get_team_summary,
)

logger = logging.getLogger(__name__)

# Prompt fragments handed to every Toolkit built by this module.
_SKILL_INSTRUCTION = (
    "You have access to project skills. Each skill lives in a "
    "directory and is described by SKILL.md. Follow the skill instructions "
    "when they are relevant to the current task."
)
_SKILL_TEMPLATE = "- {name} (dir: {dir}): {description}"

# A directory only counts as a skill when it contains this manifest file.
_SKILL_MANIFEST = "SKILL.md"

# Analyst tool groups: group name -> creation arguments plus the names of the
# TOOL_REGISTRY entries that belong to the group.
_ANALYSIS_TOOL_GROUPS: Dict[str, Dict[str, Any]] = {
    "fundamentals": {
        "description": "Financial health, profitability, growth, and efficiency analysis tools.",
        "active": False,
        "notes": (
            "Use these tools to validate business quality, financial resilience, "
            "and earnings durability before making directional conclusions."
        ),
        "tools": [
            "analyze_profitability",
            "analyze_growth",
            "analyze_financial_health",
            "analyze_efficiency_ratios",
            "analyze_valuation_ratios",
            "get_financial_metrics_tool",
        ],
    },
    "technical": {
        "description": "Trend, momentum, mean reversion, and volatility analysis tools.",
        "active": False,
        "notes": (
            "Use these tools to assess timing, price structure, and risk-reward in "
            "the current market regime."
        ),
        "tools": [
            "analyze_trend_following",
            "analyze_momentum",
            "analyze_mean_reversion",
            "analyze_volatility",
        ],
    },
    "sentiment": {
        "description": "News sentiment and insider activity analysis tools.",
        "active": False,
        "notes": (
            "Use these tools to capture short-horizon catalysts, sentiment shifts, "
            "and behavioral signals around each ticker."
        ),
        "tools": [
            "analyze_news_sentiment",
            "analyze_insider_trading",
        ],
    },
    "valuation": {
        "description": "Intrinsic value and relative valuation analysis tools.",
        "active": False,
        "notes": (
            "Use these tools when the task requires fair value estimation, margin of "
            "safety analysis, or valuation scenario comparison."
        ),
        "tools": [
            "dcf_valuation_analysis",
            "owner_earnings_valuation_analysis",
            "ev_ebitda_valuation_analysis",
            "residual_income_valuation_analysis",
        ],
    },
}

# Portfolio-manager methods that are registered only when the owner has them.
_OPTIONAL_PM_TEAM_METHODS = (
    "_add_team_analyst",
    "_remove_team_analyst",
    "_set_active_analysts",
    "_create_team_analyst",
)


def load_agent_profiles() -> Dict[str, Dict[str, Any]]:
    """Load the agent profile definitions from ``agent_profiles.yaml``.

    Returns:
        The parsed YAML document, or an empty dict when the file is empty.
    """
    config_path = SkillsManager().project_root / "backend" / "config" / "agent_profiles.yaml"
    with open(config_path, "r", encoding="utf-8") as file:
        return yaml.safe_load(file) or {}


def _register_analysis_tool_groups(toolkit: Any) -> None:
    """Create the analyst tool groups and register their tools.

    Every group is created inactive; activation happens later through
    ``update_tool_groups``. Tool names absent from ``TOOL_REGISTRY`` are
    skipped, with a warning so a typo or a missing tool is visible.
    """
    # Imported lazily, as in the rest of this module's tool registration.
    from backend.tools.analysis_tools import TOOL_REGISTRY

    for group_name, group_config in _ANALYSIS_TOOL_GROUPS.items():
        toolkit.create_tool_group(
            group_name=group_name,
            description=group_config["description"],
            active=group_config["active"],
            notes=group_config["notes"],
        )
        for tool_name in group_config["tools"]:
            tool_func = TOOL_REGISTRY.get(tool_name)
            if not tool_func:
                logger.warning(
                    "Analysis tool %r is not in TOOL_REGISTRY; skipping it for group %r",
                    tool_name,
                    group_name,
                )
                continue
            toolkit.register_tool_function(tool_func, group_name=group_name)


def _register_portfolio_tool_groups(toolkit: Any, pm_agent: Any) -> None:
    """Create the portfolio-manager tool groups and register their tools.

    Registers ``pm_agent._make_decision`` plus whichever optional team
    management methods the agent defines in ``portfolio_ops``, then the
    module-level dynamic team functions in ``dynamic_team``. Both groups are
    created inactive.

    Args:
        toolkit: Toolkit to register into.
        pm_agent: Portfolio manager instance; must provide ``_make_decision``.
    """
    toolkit.create_tool_group(
        group_name="portfolio_ops",
        description="Portfolio decision recording tools.",
        active=False,
        notes=(
            "Use portfolio tools only after synthesizing analyst and risk inputs. "
            "Record one explicit decision per ticker."
        ),
    )
    toolkit.register_tool_function(
        pm_agent._make_decision,
        group_name="portfolio_ops",
    )
    for method_name in _OPTIONAL_PM_TEAM_METHODS:
        if hasattr(pm_agent, method_name):
            toolkit.register_tool_function(
                getattr(pm_agent, method_name),
                group_name="portfolio_ops",
            )

    # Register dynamic team management tools.
    toolkit.create_tool_group(
        group_name="dynamic_team",
        description="Dynamic analyst team management tools.",
        active=False,
        notes=(
            "Use these tools to create, clone, and manage analyst agents dynamically. "
            "Only available when allow_dynamic_team_update is enabled."
        ),
    )
    for team_tool in (
        create_analyst,
        clone_analyst,
        remove_analyst,
        list_analyst_types,
        get_analyst_info,
        get_team_summary,
    ):
        toolkit.register_tool_function(team_tool, group_name="dynamic_team")


def _register_risk_tool_groups(toolkit: Any) -> None:
    """Create the inactive ``risk_ops`` group and register the risk tools."""
    from backend.tools.risk_tools import (
        assess_margin_and_liquidity,
        assess_position_concentration,
        assess_volatility_exposure,
    )

    toolkit.create_tool_group(
        group_name="risk_ops",
        description="Risk diagnostics for concentration, leverage, and volatility.",
        active=False,
        notes=(
            "Use risk tools to quantify concentration, margin pressure, and volatility "
            "before writing the final risk memo."
        ),
    )
    for risk_tool in (
        assess_position_concentration,
        assess_margin_and_liquidity,
        assess_volatility_exposure,
    ):
        toolkit.register_tool_function(risk_tool, group_name="risk_ops")


def _new_toolkit() -> Any:
    """Build an empty Toolkit carrying this module's skill prompt settings."""
    # Imported lazily so importing this module does not require agentscope.
    from agentscope.tool import Toolkit

    return Toolkit(
        agent_skill_instruction=_SKILL_INSTRUCTION,
        agent_skill_template=_SKILL_TEMPLATE,
    )


def _register_role_tool_groups(toolkit: Any, agent_id: str, owner: Any) -> None:
    """Register the default tool groups matching the agent's role.

    Portfolio tools are bound methods of ``owner``, so they are registered
    only when an owner instance is supplied.
    """
    if agent_id.endswith("_analyst"):
        _register_analysis_tool_groups(toolkit)
    elif agent_id == "portfolio_manager" and owner is not None:
        _register_portfolio_tool_groups(toolkit, owner)
    elif agent_id == "risk_manager":
        _register_risk_tool_groups(toolkit)


def _discover_skill_dirs(root: Path) -> List[Path]:
    """Return the sorted sub-directories of ``root`` that contain SKILL.md."""
    if not root.exists():
        return []
    return [
        candidate
        for candidate in sorted(root.iterdir())
        if candidate.is_dir() and (candidate / _SKILL_MANIFEST).exists()
    ]


def _drop_disabled_groups(
    active_groups: Optional[Iterable[str]],
    disabled_groups: Optional[Iterable[str]],
) -> List[str]:
    """Return ``active_groups`` as a list without any disabled group.

    ``None`` is accepted for either argument and treated as empty.
    """
    disabled = set(disabled_groups or ())
    return [name for name in (active_groups or ()) if name not in disabled]


def create_agent_toolkit(
    agent_id: str,
    config_name: str,
    owner: Any = None,
    active_skill_dirs: Iterable[str] | None = None,
) -> Any:
    """Create a Toolkit with agent skills and grouped tools.

    Active tool groups come from the bootstrap override when it defines
    ``active_tool_groups``, otherwise from the workspace ``agent.yaml``, and
    finally from the agent profile. Groups listed as disabled in the
    workspace config are removed from that list.

    Args:
        agent_id: Agent identifier.
        config_name: Run configuration name.
        owner: Agent instance (used to register agent-bound methods).
        active_skill_dirs: Explicit skill directories to register. When
            omitted, they are resolved through the skills manager.

    Returns:
        The configured Toolkit instance.
    """
    profile = load_agent_profiles().get(agent_id, {}) or {}
    skills_manager = SkillsManager()
    agent_config = load_agent_workspace_config(
        skills_manager.get_agent_asset_dir(config_name, agent_id) / "agent.yaml",
    )
    bootstrap_config = get_bootstrap_config_for_run(
        skills_manager.project_root,
        config_name,
    )
    override = bootstrap_config.agent_override(agent_id)
    active_groups = _drop_disabled_groups(
        override.get(
            "active_tool_groups",
            agent_config.active_tool_groups or profile.get("active_tool_groups", []),
        ),
        agent_config.disabled_tool_groups,
    )

    toolkit = _new_toolkit()
    _register_role_tool_groups(toolkit, agent_id, owner)

    if active_skill_dirs is None:
        skill_names = skills_manager.resolve_agent_skill_names(
            config_name=config_name,
            agent_id=agent_id,
            default_skills=profile.get("skills", []),
        )
        active_root = skills_manager.get_agent_active_root(config_name, agent_id)
        skill_dirs = [active_root / skill_name for skill_name in skill_names]
    else:
        # Materialise once as Paths: the directories are walked twice below
        # (a one-shot iterable would be exhausted after the first pass) and
        # the restriction step needs ``Path.name``, which plain strings lack.
        skill_dirs = [Path(skill_dir) for skill_dir in active_skill_dirs]

    for skill_dir in skill_dirs:
        toolkit.register_agent_skill(str(skill_dir))

    apply_skill_tool_restrictions(toolkit, skill_dirs)

    if active_groups:
        toolkit.update_tool_groups(group_names=active_groups, active=True)

    return toolkit


def create_toolkit_from_workspace(
    agent_id: str,
    config_name: str,
    owner: Any = None,
    include_builtin: bool = True,
    include_customized: bool = True,
    include_local: bool = True,
    active_groups: Optional[List[str]] = None,
) -> Any:
    """Create a toolkit from the agent workspace.

    Variant of ``create_agent_toolkit`` that discovers skills by scanning the
    workspace's active, installed and (optionally) local skill directories
    instead of resolving a list of skill names.

    Args:
        agent_id: Agent identifier.
        config_name: Run configuration name.
        owner: Agent instance.
        include_builtin: Whether to include builtin skills.
        include_customized: Whether to include customized skills.
        include_local: Whether to include agent-local skills.
        active_groups: Explicit list of tool groups to activate. When omitted,
            the workspace config and then the agent profile are consulted.

    Returns:
        The configured Toolkit instance.

    Note:
        ``include_builtin`` and ``include_customized`` are accepted for
        interface compatibility but are not consulted by this function: the
        active and installed directories are always scanned.
    """
    skills_manager = SkillsManager()
    agent_config = load_agent_workspace_config(
        skills_manager.get_agent_asset_dir(config_name, agent_id) / "agent.yaml",
    )

    toolkit = _new_toolkit()

    # Register the default tool groups for this agent type.
    _register_role_tool_groups(toolkit, agent_id, owner)

    # Collect every skill directory to load, in priority order:
    # 1. skills already synced into the active directory,
    # 2. skills from the installed directory,
    # 3. agent-local skills (optional).
    roots = [
        skills_manager.get_agent_active_root(config_name, agent_id),
        skills_manager.get_agent_installed_root(config_name, agent_id),
    ]
    if include_local:
        roots.append(skills_manager.get_agent_local_root(config_name, agent_id))

    skill_dirs: List[Path] = []
    seen: Set[Path] = set()
    for root in roots:
        for skill_dir in _discover_skill_dirs(root):
            # De-duplicates identical paths only.
            # NOTE(review): same-named skills living under different roots
            # are all kept — confirm whether they should collapse by name.
            if skill_dir not in seen:
                seen.add(skill_dir)
                skill_dirs.append(skill_dir)

    # Register the skills on the toolkit.
    for skill_dir in skill_dirs:
        toolkit.register_agent_skill(str(skill_dir))

    apply_skill_tool_restrictions(toolkit, skill_dirs)

    # Resolve the tool groups to activate.
    if active_groups is None:
        profile = load_agent_profiles().get(agent_id, {}) or {}
        active_groups = agent_config.active_tool_groups or profile.get("active_tool_groups", [])

    # Apply the disabled list, including to an explicitly supplied list.
    active_groups = _drop_disabled_groups(active_groups, agent_config.disabled_tool_groups)

    if active_groups:
        toolkit.update_tool_groups(group_names=active_groups, active=True)

    return toolkit


def get_toolkit_info(toolkit: Any) -> Dict[str, Any]:
    """Summarise a toolkit's tool groups and skills.

    Reads the ``tool_groups`` and ``agent_skills`` attributes when present;
    missing attributes are treated as empty.

    Args:
        toolkit: Toolkit instance.

    Returns:
        A dict with ``tool_groups`` (per-group description, active flag and
        tool names), ``skills`` (name, path, description) and ``tools_count``
        (total number of tools across groups).
    """
    info: Dict[str, Any] = {
        "tool_groups": {},
        "skills": [],
        "tools_count": 0,
    }

    # Tool group details.
    for name, group in getattr(toolkit, "tool_groups", {}).items():
        group_tools = getattr(group, "tools", [])
        info["tool_groups"][name] = {
            "description": getattr(group, "description", ""),
            "active": getattr(group, "active", False),
            "tools": [tool.name for tool in group_tools],
        }
        info["tools_count"] += len(group_tools)

    # Skill details.
    for skill in getattr(toolkit, "agent_skills", []):
        info["skills"].append({
            "name": getattr(skill, "name", "unknown"),
            "path": getattr(skill, "path", ""),
            "description": getattr(skill, "description", ""),
        })

    return info


def refresh_toolkit_skills(
    toolkit: Any,
    agent_id: str,
    config_name: str,
) -> None:
    """Reload the toolkit's skills from the workspace.

    Intended for runtime skill changes: clears the registered skills, then
    re-registers everything found in the active and local skill directories
    and re-applies the per-skill tool restrictions, which would otherwise be
    lost together with the cleared skill objects.

    Args:
        toolkit: Toolkit instance.
        agent_id: Agent identifier.
        config_name: Run configuration name.
    """
    skills_manager = SkillsManager()

    # Drop the currently registered skills.
    if hasattr(toolkit, "agent_skills"):
        toolkit.agent_skills.clear()

    # Reload active skills first, then agent-local skills.
    roots = (
        skills_manager.get_agent_active_root(config_name, agent_id),
        skills_manager.get_agent_local_root(config_name, agent_id),
    )
    refreshed: List[Path] = []
    for root in roots:
        for skill_dir in _discover_skill_dirs(root):
            toolkit.register_agent_skill(str(skill_dir))
            refreshed.append(skill_dir)

    apply_skill_tool_restrictions(toolkit, refreshed)


def apply_skill_tool_restrictions(
    toolkit: Any,
    skill_dirs: Iterable[Union[str, Path]],
) -> None:
    """Apply per-skill allowed_tools / denied_tools restrictions to a toolkit.

    If a skill specifies allowed_tools, only those tools are accessible when
    that skill is active. If a skill specifies denied_tools, those tools are
    removed regardless of allowed_tools. Denied tools take precedence.

    The restrictions are published in two places:

    * ``toolkit._skill_tool_restrictions`` maps a skill directory name to
      ``{"allowed": set, "denied": set}`` for every restricted skill;
    * each object in ``toolkit.agent_skills`` whose ``name`` matches a
      restricted directory name gets ``_tool_allowed`` / ``_tool_denied``.

    Args:
        toolkit: The agentscope Toolkit instance.
        skill_dirs: Skill directory paths to inspect (strings or Paths).
    """
    restrictions: Dict[str, Dict[str, Set[str]]] = {}

    for raw_dir in skill_dirs:
        skill_dir = Path(raw_dir)
        metadata = parse_skill_metadata(skill_dir, source="active")
        if not metadata.allowed_tools and not metadata.denied_tools:
            continue
        restrictions[skill_dir.name] = {
            "allowed": set(metadata.allowed_tools or ()),
            "denied": set(metadata.denied_tools or ()),
        }

    try:
        setattr(toolkit, "_skill_tool_restrictions", restrictions)
    except AttributeError:
        # Best effort: some objects do not accept new attributes.
        logger.debug("Toolkit does not accept a _skill_tool_restrictions attribute")

    # NOTE(review): skills are matched by comparing their `name` attribute
    # with the skill directory name — confirm the two always coincide.
    for skill in getattr(toolkit, "agent_skills", None) or []:
        skill_name = getattr(skill, "name", "") or ""
        if skill_name in restrictions:
            setattr(skill, "_tool_allowed", restrictions[skill_name]["allowed"])
            setattr(skill, "_tool_denied", restrictions[skill_name]["denied"])


def get_skill_effective_tools(
    skill: Any,
    all_tools: Optional[Iterable[str]] = None,
) -> Optional[Set[str]]:
    """Return the effective tool set for a skill after applying restrictions.

    * No restrictions (no allowed_tools / denied_tools): returns None to
      indicate "all tools allowed".
    * allowed_tools set: returns those tools minus denied_tools.
    * Only denied_tools set: returns ``all_tools`` minus denied_tools. When
      ``all_tools`` is not supplied the universe is unknown, so an empty set
      is returned (fail closed) rather than ignoring the deny list.

    An empty allow-list is treated as "no allow-list", matching how
    ``apply_skill_tool_restrictions`` interprets skill metadata.

    Args:
        skill: A skill object previously registered via register_agent_skill.
        all_tools: Names of every registered tool; only needed to resolve a
            deny-only restriction.

    Returns:
        A set of allowed tool names, or None if unrestricted.
    """
    allowed = getattr(skill, "_tool_allowed", None)
    denied = set(getattr(skill, "_tool_denied", None) or ())

    if allowed:
        return set(allowed) - denied
    if not denied:
        return None
    if all_tools is None:
        return set()
    return set(all_tools) - denied


def _registered_tool_names(toolkit: Any) -> Set[str]:
    """Collect the names of the tools registered on ``toolkit``.

    Gathers names from the ``tool_groups`` attribute (the shape read by
    ``get_toolkit_info``) and from a name-keyed ``tools`` dict when present.
    """
    names: Set[str] = set()

    # NOTE(review): presumably agentscope's Toolkit keeps a name-keyed `tools`
    # mapping — confirm against the installed version.
    direct = getattr(toolkit, "tools", None)
    if isinstance(direct, dict):
        names.update(str(tool_name) for tool_name in direct)

    groups = getattr(toolkit, "tool_groups", None)
    if isinstance(groups, dict):
        for group in groups.values():
            for tool in getattr(group, "tools", None) or []:
                tool_name = getattr(tool, "name", None)
                if tool_name:
                    names.add(tool_name)

    return names


def filter_toolkit_by_skill(
    toolkit: Any,
    skill_name: str,
) -> Set[str]:
    """Return the set of tool names that are accessible for a given skill.

    Args:
        toolkit: The agentscope Toolkit instance.
        skill_name: Name of the skill to query.

    Returns:
        Set of allowed tool names, or all registered tool names if the skill
        is unrestricted. An empty set is returned when the toolkit exposes no
        ``agent_skills`` or the skill is not registered.
    """
    if not hasattr(toolkit, "agent_skills"):
        return set()

    for skill in toolkit.agent_skills:
        if (getattr(skill, "name", "") or "") != skill_name:
            continue
        registered = _registered_tool_names(toolkit)
        effective = get_skill_effective_tools(skill, all_tools=registered)
        return registered if effective is None else effective

    return set()