Files
evotraders/backend/agents/workspace_manager.py
cillin 16b54d5ccc feat(agent): complete EvoAgent integration for all 6 agent roles
Migrate all agent roles from Legacy to EvoAgent architecture:
- fundamentals_analyst, technical_analyst, sentiment_analyst, valuation_analyst
- risk_manager, portfolio_manager

Key changes:
- EvoAgent now supports Portfolio Manager compatibility methods (_make_decision,
  get_decisions, get_portfolio_state, load_portfolio_state, update_portfolio)
- Add UnifiedAgentFactory for centralized agent creation
- ToolGuard with batch approval API and WebSocket broadcast
- Legacy agents marked deprecated (AnalystAgent, RiskAgent, PMAgent)
- Remove backend/agents/compat.py migration shim
- Add run_id alongside workspace_id for semantic clarity
- Complete integration test coverage (13 tests)
- All smoke tests passing for 6 agent roles

Constraint: Must maintain backward compatibility with existing run configs
Constraint: Memory support must work with EvoAgent (no fallback to Legacy)
Rejected: Separate PM implementation for EvoAgent | unified approach cleaner
Confidence: high
Scope-risk: broad
Directive: EVO_AGENT_IDS env var still respected but defaults to all roles
Not-tested: Kubernetes sandbox mode for skill execution
2026-04-02 00:55:08 +08:00

485 lines
23 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# -*- coding: utf-8 -*-
"""Initialize run-scoped agent workspace assets under `runs/<run_id>/`."""
from pathlib import Path
from typing import Dict, Iterable, Optional
import yaml
from .skills_manager import SkillsManager
from .team_pipeline_config import ensure_team_pipeline_config
class RunWorkspaceManager:
"""Create and maintain run-level prompt asset files for each agent."""
def __init__(self, project_root: Optional[Path] = None):
self.skills_manager = SkillsManager(project_root=project_root)
self.project_root = self.skills_manager.project_root
def get_run_dir(self, config_name: str) -> Path:
return self.project_root / "runs" / config_name
def ensure_run_workspace(self, config_name: str) -> Path:
run_dir = self.get_run_dir(config_name)
run_dir.mkdir(parents=True, exist_ok=True)
self.skills_manager.ensure_activation_manifest(config_name)
ensure_team_pipeline_config(
project_root=self.project_root,
config_name=config_name,
default_analysts=[
"fundamentals_analyst",
"technical_analyst",
"sentiment_analyst",
"valuation_analyst",
],
)
bootstrap_path = run_dir / "BOOTSTRAP.md"
if not bootstrap_path.exists():
bootstrap_path.write_text(
"---\n"
"tickers:\n"
" - AAPL\n"
" - MSFT\n"
" - GOOGL\n"
" - AMZN\n"
" - NVDA\n"
" - META\n"
" - TSLA\n"
" - AMD\n"
" - NFLX\n"
" - AVGO\n"
" - PLTR\n"
" - COIN\n"
"initial_cash: 100000\n"
"margin_requirement: 0.0\n"
"enable_memory: false\n"
"max_comm_cycles: 2\n"
"agent_overrides: {}\n"
"---\n\n"
"# Bootstrap\n\n"
"Use this file to describe run-specific setup notes, preferred tickers,\n"
"risk bounds, or strategy constraints before the first execution.\n\n"
"The YAML front matter above is machine-readable runtime configuration.\n"
"The markdown body below is injected into agent prompts as run context.\n",
encoding="utf-8",
)
return run_dir
def bootstrap_path(self, config_name: str) -> Path:
return self.get_run_dir(config_name) / "BOOTSTRAP.md"
def ensure_agent_assets(
self,
config_name: str,
agent_id: str,
file_contents: Optional[Dict[str, str]] = None,
persona: Optional[Dict[str, object]] = None,
) -> Path:
asset_dir = self.skills_manager.get_agent_asset_dir(
config_name,
agent_id,
)
asset_dir.mkdir(parents=True, exist_ok=True)
(asset_dir / "skills" / "installed").mkdir(parents=True, exist_ok=True)
(asset_dir / "skills" / "active").mkdir(parents=True, exist_ok=True)
(asset_dir / "skills" / "disabled").mkdir(parents=True, exist_ok=True)
(asset_dir / "skills" / "local").mkdir(parents=True, exist_ok=True)
file_contents = file_contents or self.build_default_agent_files(agent_id=agent_id)
for filename, content in file_contents.items():
legacy_contents = self.build_legacy_agent_file_variants(
agent_id=agent_id,
filename=filename,
persona=persona,
)
self._ensure_file(asset_dir / filename, content, legacy_contents=legacy_contents)
self._ensure_agent_yaml(
asset_dir / "agent.yaml",
agent_id=agent_id,
)
return asset_dir
def build_default_agent_files(
self,
*,
agent_id: str,
persona: Optional[Dict[str, object]] = None,
) -> Dict[str, str]:
"""Build default workspace markdown files for one agent."""
if agent_id.endswith("_analyst"):
return self._build_analyst_files(agent_id=agent_id, persona=persona or {})
if agent_id == "portfolio_manager":
return self._build_portfolio_manager_files()
if agent_id == "risk_manager":
return self._build_risk_manager_files()
return self._build_generic_files(agent_id=agent_id)
def build_legacy_agent_file_variants(
self,
*,
agent_id: str,
filename: str,
persona: Optional[Dict[str, object]] = None,
) -> list[str]:
"""Return known generated legacy variants safe to upgrade in-place."""
persona = persona or {}
variants: list[dict[str, str]] = [
self._build_legacy_english_files(agent_id=agent_id),
self._build_previous_chinese_files(agent_id=agent_id, persona=persona),
]
values: list[str] = []
for item in variants:
content = item.get(filename)
if content:
values.append(content)
return values
def load_agent_file(
self,
*,
config_name: str,
agent_id: str,
filename: str,
) -> str:
"""Load one run-scoped agent workspace file."""
path = self.skills_manager.get_agent_asset_dir(config_name, agent_id) / filename
if not path.exists():
raise FileNotFoundError(f"File not found: {filename}")
return path.read_text(encoding="utf-8")
def update_agent_file(
self,
*,
config_name: str,
agent_id: str,
filename: str,
content: str,
) -> None:
"""Write one run-scoped agent workspace file."""
asset_dir = self.skills_manager.get_agent_asset_dir(config_name, agent_id)
asset_dir.mkdir(parents=True, exist_ok=True)
path = asset_dir / filename
path.write_text(content, encoding="utf-8")
def initialize_default_assets(
self,
config_name: str,
agent_ids: Iterable[str],
analyst_personas: Optional[Dict[str, Dict]] = None,
) -> None:
self.ensure_run_workspace(config_name)
analyst_personas = analyst_personas or {}
for agent_id in agent_ids:
if agent_id.endswith("_analyst"):
persona = analyst_personas.get(agent_id, {})
file_contents = self.build_default_agent_files(
agent_id=agent_id,
persona=persona,
)
else:
persona = None
file_contents = self.build_default_agent_files(agent_id=agent_id)
asset_dir = self.skills_manager.get_agent_asset_dir(config_name, agent_id)
asset_dir.mkdir(parents=True, exist_ok=True)
(asset_dir / "skills" / "installed").mkdir(parents=True, exist_ok=True)
(asset_dir / "skills" / "active").mkdir(parents=True, exist_ok=True)
(asset_dir / "skills" / "disabled").mkdir(parents=True, exist_ok=True)
(asset_dir / "skills" / "local").mkdir(parents=True, exist_ok=True)
for filename, content in file_contents.items():
self._ensure_file(
asset_dir / filename,
content,
legacy_contents=self.build_legacy_agent_file_variants(
agent_id=agent_id,
filename=filename,
persona=persona,
),
)
self._ensure_agent_yaml(asset_dir / "agent.yaml", agent_id=agent_id)
@staticmethod
def _ensure_file(path: Path, content: str, *, legacy_contents: Optional[list[str]] = None) -> None:
if not path.exists():
path.write_text(content, encoding="utf-8")
return
existing = path.read_text(encoding="utf-8")
normalized_existing = existing.strip()
candidates = {item.strip() for item in (legacy_contents or []) if item and item.strip()}
if normalized_existing in candidates:
path.write_text(content, encoding="utf-8")
@staticmethod
def _build_generic_files(agent_id: str) -> Dict[str, str]:
return {
"SOUL.md": (
"# Soul\n\n"
f"你是 `{agent_id}`,语气冷静、客观、专业。保持清晰推理,优先基于数据而不是情绪下结论。\n"
),
"PROFILE.md": (
"# Profile\n\n"
"记录这个 agent 长期稳定的分析风格、偏好、优势与盲点。\n"
),
"AGENTS.md": (
"# Agent Guide\n\n"
"工作要求:\n"
"- 优先使用已激活的技能和工具\n"
"- 结论要明确,过程要可追溯\n"
"- 与其他 agent 协作时保持输入输出简洁\n"
"- 最终输出必须使用简体中文;如需引用英文术语,仅保留专有名词,解释和结论必须用中文\n"
),
"POLICY.md": (
"# Policy\n\n"
"- 给出结论时说明核心驱动因素\n"
"- 明确风险边界和结论失效条件\n"
"- 出现反例时需要纳入最终判断\n"
"- 不要输出英文报告标题、英文摘要或整段英文正文\n"
),
"MEMORY.md": (
"# Memory\n\n"
"记录可复用的经验、失误复盘、有效启发式和需要持续跟踪的提醒。\n"
),
}
@classmethod
def _build_analyst_files(cls, *, agent_id: str, persona: Dict[str, object]) -> Dict[str, str]:
role_name = str(persona.get("name") or agent_id)
focus_items = [
str(item).strip()
for item in persona.get("focus", [])
if str(item).strip()
]
focus_md = "\n".join(f"- {item}" for item in focus_items) or "- 根据当前任务选择最相关的分析维度"
description = str(persona.get("description") or "").strip()
files = cls._build_generic_files(agent_id)
files["SOUL.md"] = (
"# Soul\n\n"
f"你是一位专业的{role_name}\n\n"
"保持谦逊和开放,主动寻找与自己观点相悖的证据,并将其纳入最终评估。"
"你的分析要体现持续演化的投资哲学,而不是一次性的结论。\n"
)
files["PROFILE.md"] = (
"# Profile\n\n"
f"角色定位:{role_name}\n\n"
"你的关注重点:\n"
f"{focus_md}\n\n"
"角色说明:\n"
f"{description or '围绕最关键的基本面、技术面、情绪面或估值因素形成高质量判断。'}\n"
)
files["AGENTS.md"] = (
"# Agent Guide\n\n"
"分析流程:\n"
"- 优先识别真正驱动价值或价格变化的核心变量\n"
"- 使用相关工具和技能补足证据链\n"
"- 给出可验证、可复查、可执行的分析结果\n"
"- 在团队讨论中清晰表达你的论点和反论点\n\n"
"输出要求:\n"
"- 给出明确投资信号:看涨、看跌或中性\n"
"- 包含置信度0-100\n"
"- 如果你确定要分享最终分析,请先给出结论,再给出推理依据\n"
"- 最终输出必须使用简体中文,不要生成英文版 analysis report\n"
)
files["POLICY.md"] = (
"# Policy\n\n"
"- 深化你的投资逻辑,确保每项建议都有清晰、可追溯、可重复的依据\n"
"- 明确风险边界:在什么具体情况下当前结论会失效\n"
"- 做逆向测试:说明市场主流共识与你的不同点\n"
"- 每次分析后反思这次案例如何验证或挑战你现有的信念\n"
"- 即使输入新闻或财报原文是英文,最终表达也必须用中文\n"
)
return files
@classmethod
def _build_portfolio_manager_files(cls) -> Dict[str, str]:
files = cls._build_generic_files("portfolio_manager")
files["SOUL.md"] = (
"# Soul\n\n"
"你是一位负责做出投资决策的投资组合经理。你需要综合多个分析视角,"
"做出保守、明确、资本约束下可执行的组合决策。\n"
)
files["PROFILE.md"] = (
"# Profile\n\n"
"核心职责:\n"
"- 分析分析师和风险管理经理的输入\n"
"- 基于信号和市场情境做出投资决策\n"
"- 使用可用工具记录每个 ticker 的决策\n"
)
files["AGENTS.md"] = (
"# Agent Guide\n\n"
"决策框架:\n"
"- 审阅分析以理解市场观点\n"
"- 在做决策前先考虑风险警告\n"
"- 评估当前投资组合持仓、现金与保证金占用\n"
"- 决策必须与整体投资目标和风险约束一致\n\n"
"决策类型:\n"
'- `long`:看涨,建议买入\n'
'- `short`:看跌,建议卖出或做空\n'
'- `hold`:中性,维持当前持仓\n\n'
"输出要求:\n"
"- 使用 `make_decision` 工具记录每个股票的最终决策\n"
"- 记录完成后给出投资逻辑总结\n"
"- 最终总结必须使用简体中文\n"
)
files["POLICY.md"] = (
"# Policy\n\n"
"- 在决定数量时考虑可用现金,不要超出现金允许范围\n"
"- 考虑做空头寸的保证金要求\n"
"- 仓位规模相对于组合总资产保持保守\n"
"- 始终为决策提供清晰理由\n"
"- 不要输出英文投资报告或英文结论\n"
)
return files
@classmethod
def _build_risk_manager_files(cls) -> Dict[str, str]:
files = cls._build_generic_files("risk_manager")
files["SOUL.md"] = (
"# Soul\n\n"
"你是一位专业的风险管理经理,负责监控投资组合风险并提供风险警告。"
"你的目标不是输出空泛的谨慎,而是给出量化、可执行、可优先级排序的风险意见。\n"
)
files["PROFILE.md"] = (
"# Profile\n\n"
"核心职责:\n"
"- 监控投资组合敞口和集中度风险\n"
"- 评估仓位规模相对于波动性是否合理\n"
"- 评估保证金使用和杠杆水平\n"
"- 识别潜在风险因素并提供警告\n"
"- 基于市场条件建议仓位限制\n"
)
files["AGENTS.md"] = (
"# Agent Guide\n\n"
"决策流程:\n"
"- 优先使用可用的风险工具量化集中度、波动率和保证金压力\n"
"- 结合工具结果与当前市场上下文做判断\n"
"- 生成可操作的风险警告和仓位限制建议\n"
"- 为风险评估提供清晰理由\n\n"
"输出要求:\n"
"- 风险评估要简洁但全面\n"
"- 按严重程度优先排序警告\n"
"- 提供具体、可操作的建议\n"
"- 尽可能包含量化指标\n"
"- 最终风险结论必须使用简体中文\n"
)
files["POLICY.md"] = (
"# Policy\n\n"
"- 先量化,再判断,不要只给抽象风险表述\n"
"- 高严重度风险必须先说\n"
"- 最终结论需要明确仓位限制或调整建议\n"
"- 不要输出英文风险报告或英文摘要\n"
)
return files
@staticmethod
def _build_legacy_english_files(agent_id: str) -> Dict[str, str]:
policy_tail = "Optional run-scoped constraints, limits, or strategy policy.\n\n"
if agent_id == "portfolio_manager":
policy_tail += "Respect cash, margin, and portfolio concentration constraints before recording decisions.\n"
elif agent_id == "risk_manager":
policy_tail += "Use available risk tools before issuing the final risk memo.\n"
elif agent_id.endswith("_analyst"):
policy_tail += "State a clear signal, confidence, and the conditions that would invalidate the thesis.\n"
return {
"SOUL.md": "# Soul\n\nDescribe the agent's temperament, reasoning posture, and voice.\n\n",
"PROFILE.md": "# Profile\n\nTrack this agent's long-lived investment style, preferences, and strengths.\n\n",
"AGENTS.md": "# Agent Guide\n\nDocument how this agent should work, collaborate, and choose tools or skills.\n\n",
"POLICY.md": "# Policy\n\n" + policy_tail,
"MEMORY.md": "# Memory\n\nStore durable lessons, heuristics, and reminders for this agent.\n\n",
}
@classmethod
def _build_previous_chinese_files(cls, *, agent_id: str, persona: Dict[str, object]) -> Dict[str, str]:
if agent_id.endswith("_analyst"):
role_name = str(persona.get("name") or agent_id)
focus_items = [
str(item).strip()
for item in persona.get("focus", [])
if str(item).strip()
]
focus_md = "\n".join(f"- {item}" for item in focus_items) or "- 根据当前任务选择最相关的分析维度"
description = str(persona.get("description") or "").strip()
return {
"SOUL.md": (
"# Soul\n\n"
f"你是一位专业的{role_name}\n\n"
"保持谦逊和开放,主动寻找与自己观点相悖的证据,并将其纳入最终评估。"
"你的分析要体现持续演化的投资哲学,而不是一次性的结论。\n"
),
"PROFILE.md": (
"# Profile\n\n"
f"角色定位:{role_name}\n\n"
"你的关注重点:\n"
f"{focus_md}\n\n"
"角色说明:\n"
f"{description or '围绕最关键的基本面、技术面、情绪面或估值因素形成高质量判断。'}\n"
),
"AGENTS.md": (
"# Agent Guide\n\n"
"分析流程:\n"
"- 优先识别真正驱动价值或价格变化的核心变量\n"
"- 使用相关工具和技能补足证据链\n"
"- 给出可验证、可复查、可执行的分析结果\n"
"- 在团队讨论中清晰表达你的论点和反论点\n\n"
"输出要求:\n"
"- 给出明确投资信号:看涨、看跌或中性\n"
"- 包含置信度0-100\n"
"- 如果你确定要分享最终分析,请先给出结论,再给出推理依据\n"
),
"POLICY.md": (
"# Policy\n\n"
"- 深化你的投资逻辑,确保每项建议都有清晰、可追溯、可重复的依据\n"
"- 明确风险边界:在什么具体情况下当前结论会失效\n"
"- 做逆向测试:说明市场主流共识与你的不同点\n"
"- 每次分析后反思这次案例如何验证或挑战你现有的信念\n"
),
"MEMORY.md": "# Memory\n\n记录可复用的经验、失误复盘、有效启发式和需要持续跟踪的提醒。\n",
}
if agent_id == "portfolio_manager":
return {
"SOUL.md": "# Soul\n\n你是一位负责做出投资决策的投资组合经理。你需要综合多个分析视角,做出保守、明确、资本约束下可执行的组合决策。\n",
"PROFILE.md": "# Profile\n\n核心职责:\n- 分析分析师和风险管理经理的输入\n- 基于信号和市场情境做出投资决策\n- 使用可用工具记录每个 ticker 的决策\n",
"AGENTS.md": "# Agent Guide\n\n决策框架:\n- 审阅分析以理解市场观点\n- 在做决策前先考虑风险警告\n- 评估当前投资组合持仓、现金与保证金占用\n- 决策必须与整体投资目标和风险约束一致\n\n决策类型:\n- `long`:看涨,建议买入\n- `short`:看跌,建议卖出或做空\n- `hold`:中性,维持当前持仓\n\n输出要求:\n- 使用 `make_decision` 工具记录每个股票的最终决策\n- 记录完成后给出投资逻辑总结\n",
"POLICY.md": "# Policy\n\n- 在决定数量时考虑可用现金,不要超出现金允许范围\n- 考虑做空头寸的保证金要求\n- 仓位规模相对于组合总资产保持保守\n- 始终为决策提供清晰理由\n",
"MEMORY.md": "# Memory\n\n记录可复用的经验、失误复盘、有效启发式和需要持续跟踪的提醒。\n",
}
if agent_id == "risk_manager":
return {
"SOUL.md": "# Soul\n\n你是一位专业的风险管理经理,负责监控投资组合风险并提供风险警告。你的目标不是输出空泛的谨慎,而是给出量化、可执行、可优先级排序的风险意见。\n",
"PROFILE.md": "# Profile\n\n核心职责:\n- 监控投资组合敞口和集中度风险\n- 评估仓位规模相对于波动性是否合理\n- 评估保证金使用和杠杆水平\n- 识别潜在风险因素并提供警告\n- 基于市场条件建议仓位限制\n",
"AGENTS.md": "# Agent Guide\n\n决策流程:\n- 优先使用可用的风险工具量化集中度、波动率和保证金压力\n- 结合工具结果与当前市场上下文做判断\n- 生成可操作的风险警告和仓位限制建议\n- 为风险评估提供清晰理由\n\n输出要求:\n- 风险评估要简洁但全面\n- 按严重程度优先排序警告\n- 提供具体、可操作的建议\n- 尽可能包含量化指标\n",
"POLICY.md": "# Policy\n\n- 先量化,再判断,不要只给抽象风险表述\n- 高严重度风险必须先说\n- 最终结论需要明确仓位限制或调整建议\n",
"MEMORY.md": "# Memory\n\n记录可复用的经验、失误复盘、有效启发式和需要持续跟踪的提醒。\n",
}
return cls._build_legacy_english_files(agent_id)
@staticmethod
def _ensure_agent_yaml(path: Path, agent_id: str) -> None:
if path.exists():
return
payload = {
"agent_id": agent_id,
"prompt_files": [
"SOUL.md",
"PROFILE.md",
"AGENTS.md",
"POLICY.md",
"MEMORY.md",
],
"enabled_skills": [],
"disabled_skills": [],
"active_tool_groups": [],
"disabled_tool_groups": [],
}
path.write_text(
yaml.safe_dump(payload, allow_unicode=True, sort_keys=False),
encoding="utf-8",
)
# Backward-compatible alias: many runtime paths still import WorkspaceManager
# from this module when they mean the run-scoped manager.
WorkspaceManager = RunWorkspaceManager