# -*- coding: utf-8 -*- """Initialize run-scoped agent workspace assets under `runs//`.""" from pathlib import Path from typing import Dict, Iterable, Optional import yaml from .skills_manager import SkillsManager from .team_pipeline_config import ensure_team_pipeline_config class RunWorkspaceManager: """Create and maintain run-level prompt asset files for each agent.""" def __init__(self, project_root: Optional[Path] = None): self.skills_manager = SkillsManager(project_root=project_root) self.project_root = self.skills_manager.project_root def get_run_dir(self, config_name: str) -> Path: return self.project_root / "runs" / config_name def ensure_run_workspace(self, config_name: str) -> Path: run_dir = self.get_run_dir(config_name) run_dir.mkdir(parents=True, exist_ok=True) self.skills_manager.ensure_activation_manifest(config_name) ensure_team_pipeline_config( project_root=self.project_root, config_name=config_name, default_analysts=[ "fundamentals_analyst", "technical_analyst", "sentiment_analyst", "valuation_analyst", ], ) bootstrap_path = run_dir / "BOOTSTRAP.md" if not bootstrap_path.exists(): bootstrap_path.write_text( "---\n" "tickers:\n" " - AAPL\n" " - MSFT\n" " - GOOGL\n" " - AMZN\n" " - NVDA\n" " - META\n" " - TSLA\n" " - AMD\n" " - NFLX\n" " - AVGO\n" " - PLTR\n" " - COIN\n" "initial_cash: 100000\n" "margin_requirement: 0.0\n" "enable_memory: false\n" "max_comm_cycles: 2\n" "agent_overrides: {}\n" "---\n\n" "# Bootstrap\n\n" "Use this file to describe run-specific setup notes, preferred tickers,\n" "risk bounds, or strategy constraints before the first execution.\n\n" "The YAML front matter above is machine-readable runtime configuration.\n" "The markdown body below is injected into agent prompts as run context.\n", encoding="utf-8", ) return run_dir def bootstrap_path(self, config_name: str) -> Path: return self.get_run_dir(config_name) / "BOOTSTRAP.md" def ensure_agent_assets( self, config_name: str, agent_id: str, file_contents: Optional[Dict[str, str]] = None, persona: Optional[Dict[str, object]] = None, ) -> Path: asset_dir = self.skills_manager.get_agent_asset_dir( config_name, agent_id, ) asset_dir.mkdir(parents=True, exist_ok=True) (asset_dir / "skills" / "installed").mkdir(parents=True, exist_ok=True) (asset_dir / "skills" / "active").mkdir(parents=True, exist_ok=True) (asset_dir / "skills" / "disabled").mkdir(parents=True, exist_ok=True) (asset_dir / "skills" / "local").mkdir(parents=True, exist_ok=True) file_contents = file_contents or self.build_default_agent_files(agent_id=agent_id) for filename, content in file_contents.items(): legacy_contents = self.build_legacy_agent_file_variants( agent_id=agent_id, filename=filename, persona=persona, ) self._ensure_file(asset_dir / filename, content, legacy_contents=legacy_contents) self._ensure_agent_yaml( asset_dir / "agent.yaml", agent_id=agent_id, ) return asset_dir def build_default_agent_files( self, *, agent_id: str, persona: Optional[Dict[str, object]] = None, ) -> Dict[str, str]: """Build default workspace markdown files for one agent.""" if agent_id.endswith("_analyst"): return self._build_analyst_files(agent_id=agent_id, persona=persona or {}) if agent_id == "portfolio_manager": return self._build_portfolio_manager_files() if agent_id == "risk_manager": return self._build_risk_manager_files() return self._build_generic_files(agent_id=agent_id) def build_legacy_agent_file_variants( self, *, agent_id: str, filename: str, persona: Optional[Dict[str, object]] = None, ) -> list[str]: """Return known generated legacy variants safe to upgrade in-place.""" persona = persona or {} variants: list[dict[str, str]] = [ self._build_legacy_english_files(agent_id=agent_id), self._build_previous_chinese_files(agent_id=agent_id, persona=persona), ] values: list[str] = [] for item in variants: content = item.get(filename) if content: values.append(content) return values def load_agent_file( self, *, config_name: str, agent_id: str, filename: str, ) -> str: """Load one run-scoped agent workspace file.""" path = self.skills_manager.get_agent_asset_dir(config_name, agent_id) / filename if not path.exists(): raise FileNotFoundError(f"File not found: {filename}") return path.read_text(encoding="utf-8") def update_agent_file( self, *, config_name: str, agent_id: str, filename: str, content: str, ) -> None: """Write one run-scoped agent workspace file.""" asset_dir = self.skills_manager.get_agent_asset_dir(config_name, agent_id) asset_dir.mkdir(parents=True, exist_ok=True) path = asset_dir / filename path.write_text(content, encoding="utf-8") def initialize_default_assets( self, config_name: str, agent_ids: Iterable[str], analyst_personas: Optional[Dict[str, Dict]] = None, ) -> None: self.ensure_run_workspace(config_name) analyst_personas = analyst_personas or {} for agent_id in agent_ids: if agent_id.endswith("_analyst"): persona = analyst_personas.get(agent_id, {}) file_contents = self.build_default_agent_files( agent_id=agent_id, persona=persona, ) else: persona = None file_contents = self.build_default_agent_files(agent_id=agent_id) asset_dir = self.skills_manager.get_agent_asset_dir(config_name, agent_id) asset_dir.mkdir(parents=True, exist_ok=True) (asset_dir / "skills" / "installed").mkdir(parents=True, exist_ok=True) (asset_dir / "skills" / "active").mkdir(parents=True, exist_ok=True) (asset_dir / "skills" / "disabled").mkdir(parents=True, exist_ok=True) (asset_dir / "skills" / "local").mkdir(parents=True, exist_ok=True) for filename, content in file_contents.items(): self._ensure_file( asset_dir / filename, content, legacy_contents=self.build_legacy_agent_file_variants( agent_id=agent_id, filename=filename, persona=persona, ), ) self._ensure_agent_yaml(asset_dir / "agent.yaml", agent_id=agent_id) @staticmethod def _ensure_file(path: Path, content: str, *, legacy_contents: Optional[list[str]] = None) -> None: if not path.exists(): path.write_text(content, encoding="utf-8") return existing = path.read_text(encoding="utf-8") normalized_existing = existing.strip() candidates = {item.strip() for item in (legacy_contents or []) if item and item.strip()} if normalized_existing in candidates: path.write_text(content, encoding="utf-8") @staticmethod def _build_generic_files(agent_id: str) -> Dict[str, str]: return { "SOUL.md": ( "# Soul\n\n" f"你是 `{agent_id}`,语气冷静、客观、专业。保持清晰推理,优先基于数据而不是情绪下结论。\n" ), "PROFILE.md": ( "# Profile\n\n" "记录这个 agent 长期稳定的分析风格、偏好、优势与盲点。\n" ), "AGENTS.md": ( "# Agent Guide\n\n" "工作要求:\n" "- 优先使用已激活的技能和工具\n" "- 结论要明确,过程要可追溯\n" "- 与其他 agent 协作时保持输入输出简洁\n" "- 最终输出必须使用简体中文;如需引用英文术语,仅保留专有名词,解释和结论必须用中文\n" ), "POLICY.md": ( "# Policy\n\n" "- 给出结论时说明核心驱动因素\n" "- 明确风险边界和结论失效条件\n" "- 出现反例时需要纳入最终判断\n" "- 不要输出英文报告标题、英文摘要或整段英文正文\n" ), "MEMORY.md": ( "# Memory\n\n" "记录可复用的经验、失误复盘、有效启发式和需要持续跟踪的提醒。\n" ), } @classmethod def _build_analyst_files(cls, *, agent_id: str, persona: Dict[str, object]) -> Dict[str, str]: role_name = str(persona.get("name") or agent_id) focus_items = [ str(item).strip() for item in persona.get("focus", []) if str(item).strip() ] focus_md = "\n".join(f"- {item}" for item in focus_items) or "- 根据当前任务选择最相关的分析维度" description = str(persona.get("description") or "").strip() files = cls._build_generic_files(agent_id) files["SOUL.md"] = ( "# Soul\n\n" f"你是一位专业的{role_name}。\n\n" "保持谦逊和开放,主动寻找与自己观点相悖的证据,并将其纳入最终评估。" "你的分析要体现持续演化的投资哲学,而不是一次性的结论。\n" ) files["PROFILE.md"] = ( "# Profile\n\n" f"角色定位:{role_name}\n\n" "你的关注重点:\n" f"{focus_md}\n\n" "角色说明:\n" f"{description or '围绕最关键的基本面、技术面、情绪面或估值因素形成高质量判断。'}\n" ) files["AGENTS.md"] = ( "# Agent Guide\n\n" "分析流程:\n" "- 优先识别真正驱动价值或价格变化的核心变量\n" "- 使用相关工具和技能补足证据链\n" "- 给出可验证、可复查、可执行的分析结果\n" "- 在团队讨论中清晰表达你的论点和反论点\n\n" "输出要求:\n" "- 给出明确投资信号:看涨、看跌或中性\n" "- 包含置信度(0-100)\n" "- 如果你确定要分享最终分析,请先给出结论,再给出推理依据\n" "- 最终输出必须使用简体中文,不要生成英文版 analysis report\n" ) files["POLICY.md"] = ( "# Policy\n\n" "- 深化你的投资逻辑,确保每项建议都有清晰、可追溯、可重复的依据\n" "- 明确风险边界:在什么具体情况下当前结论会失效\n" "- 做逆向测试:说明市场主流共识与你的不同点\n" "- 每次分析后反思这次案例如何验证或挑战你现有的信念\n" "- 即使输入新闻或财报原文是英文,最终表达也必须用中文\n" ) return files @classmethod def _build_portfolio_manager_files(cls) -> Dict[str, str]: files = cls._build_generic_files("portfolio_manager") files["SOUL.md"] = ( "# Soul\n\n" "你是一位负责做出投资决策的投资组合经理。你需要综合多个分析视角," "做出保守、明确、资本约束下可执行的组合决策。\n" ) files["PROFILE.md"] = ( "# Profile\n\n" "核心职责:\n" "- 分析分析师和风险管理经理的输入\n" "- 基于信号和市场情境做出投资决策\n" "- 使用可用工具记录每个 ticker 的决策\n" ) files["AGENTS.md"] = ( "# Agent Guide\n\n" "决策框架:\n" "- 审阅分析以理解市场观点\n" "- 在做决策前先考虑风险警告\n" "- 评估当前投资组合持仓、现金与保证金占用\n" "- 在做最终决策前,先判断当前团队是否足以覆盖任务;如果覆盖不足,不要勉强给结论,先扩编团队\n" "- 当现有团队覆盖不足、观点分歧过大、或出现新的专业分析需求时,优先考虑动态创建合适的分析师,再继续讨论\n" "- 决策必须与整体投资目标和风险约束一致\n\n" "动态扩编触发条件:\n" "- 出现当前团队未覆盖的研究领域:期权、宏观、行业专项、事件驱动、监管冲击、加密资产、商品链、特殊市场结构\n" "- 关键 ticker 的结论依赖某种专业知识,但现有 analyst 无法提供直接证据链\n" "- 分析师之间存在明显冲突,且仅靠风险经理无法完成裁决\n" "- 你需要第二个同类型但不同风格的 analyst 来验证一个高风险假设\n\n" "决策类型:\n" '- `long`:看涨,建议买入\n' '- `short`:看跌,建议卖出或做空\n' '- `hold`:中性,维持当前持仓\n\n' "输出要求:\n" "- 触发扩编条件时,必须先使用动态团队工具创建分析师,并在继续决策前吸收其分析输入\n" "- 不允许口头声称“需要更多分析”但不实际调用创建工具\n" "- 使用 `make_decision` 工具记录每个股票的最终决策\n" "- 记录完成后给出投资逻辑总结\n" "- 最终总结必须使用简体中文\n" ) files["POLICY.md"] = ( "# Policy\n\n" "- 在决定数量时考虑可用现金,不要超出现金允许范围\n" "- 考虑做空头寸的保证金要求\n" "- 仓位规模相对于组合总资产保持保守\n" "- 当任务涉及当前团队未覆盖的领域(如期权、宏观、行业专项、事件驱动、加密资产等)时,应优先创建或克隆对应分析师,而不是勉强用现有团队输出低质量结论\n" "- 当分析师之间长期存在高冲突且缺乏裁决信息时,应考虑增加一个补充视角的分析师\n" "- 如果你已经识别出覆盖缺口,却没有调用动态团队工具补齐团队,就不应直接输出高置信度交易决策\n" "- 对新创建分析师的输出必须纳入本轮决策依据,不能创建后忽略\n" "- 始终为决策提供清晰理由\n" "- 不要输出英文投资报告或英文结论\n" ) return files @classmethod def _build_risk_manager_files(cls) -> Dict[str, str]: files = cls._build_generic_files("risk_manager") files["SOUL.md"] = ( "# Soul\n\n" "你是一位专业的风险管理经理,负责监控投资组合风险并提供风险警告。" "你的目标不是输出空泛的谨慎,而是给出量化、可执行、可优先级排序的风险意见。\n" ) files["PROFILE.md"] = ( "# Profile\n\n" "核心职责:\n" "- 监控投资组合敞口和集中度风险\n" "- 评估仓位规模相对于波动性是否合理\n" "- 评估保证金使用和杠杆水平\n" "- 识别潜在风险因素并提供警告\n" "- 基于市场条件建议仓位限制\n" ) files["AGENTS.md"] = ( "# Agent Guide\n\n" "决策流程:\n" "- 优先使用可用的风险工具量化集中度、波动率和保证金压力\n" "- 结合工具结果与当前市场上下文做判断\n" "- 生成可操作的风险警告和仓位限制建议\n" "- 为风险评估提供清晰理由\n\n" "输出要求:\n" "- 风险评估要简洁但全面\n" "- 按严重程度优先排序警告\n" "- 提供具体、可操作的建议\n" "- 尽可能包含量化指标\n" "- 最终风险结论必须使用简体中文\n" ) files["POLICY.md"] = ( "# Policy\n\n" "- 先量化,再判断,不要只给抽象风险表述\n" "- 高严重度风险必须先说\n" "- 最终结论需要明确仓位限制或调整建议\n" "- 不要输出英文风险报告或英文摘要\n" ) return files @staticmethod def _build_legacy_english_files(agent_id: str) -> Dict[str, str]: policy_tail = "Optional run-scoped constraints, limits, or strategy policy.\n\n" if agent_id == "portfolio_manager": policy_tail += "Respect cash, margin, and portfolio concentration constraints before recording decisions.\n" elif agent_id == "risk_manager": policy_tail += "Use available risk tools before issuing the final risk memo.\n" elif agent_id.endswith("_analyst"): policy_tail += "State a clear signal, confidence, and the conditions that would invalidate the thesis.\n" return { "SOUL.md": "# Soul\n\nDescribe the agent's temperament, reasoning posture, and voice.\n\n", "PROFILE.md": "# Profile\n\nTrack this agent's long-lived investment style, preferences, and strengths.\n\n", "AGENTS.md": "# Agent Guide\n\nDocument how this agent should work, collaborate, and choose tools or skills.\n\n", "POLICY.md": "# Policy\n\n" + policy_tail, "MEMORY.md": "# Memory\n\nStore durable lessons, heuristics, and reminders for this agent.\n\n", } @classmethod def _build_previous_chinese_files(cls, *, agent_id: str, persona: Dict[str, object]) -> Dict[str, str]: if agent_id.endswith("_analyst"): role_name = str(persona.get("name") or agent_id) focus_items = [ str(item).strip() for item in persona.get("focus", []) if str(item).strip() ] focus_md = "\n".join(f"- {item}" for item in focus_items) or "- 根据当前任务选择最相关的分析维度" description = str(persona.get("description") or "").strip() return { "SOUL.md": ( "# Soul\n\n" f"你是一位专业的{role_name}。\n\n" "保持谦逊和开放,主动寻找与自己观点相悖的证据,并将其纳入最终评估。" "你的分析要体现持续演化的投资哲学,而不是一次性的结论。\n" ), "PROFILE.md": ( "# Profile\n\n" f"角色定位:{role_name}\n\n" "你的关注重点:\n" f"{focus_md}\n\n" "角色说明:\n" f"{description or '围绕最关键的基本面、技术面、情绪面或估值因素形成高质量判断。'}\n" ), "AGENTS.md": ( "# Agent Guide\n\n" "分析流程:\n" "- 优先识别真正驱动价值或价格变化的核心变量\n" "- 使用相关工具和技能补足证据链\n" "- 给出可验证、可复查、可执行的分析结果\n" "- 在团队讨论中清晰表达你的论点和反论点\n\n" "输出要求:\n" "- 给出明确投资信号:看涨、看跌或中性\n" "- 包含置信度(0-100)\n" "- 如果你确定要分享最终分析,请先给出结论,再给出推理依据\n" ), "POLICY.md": ( "# Policy\n\n" "- 深化你的投资逻辑,确保每项建议都有清晰、可追溯、可重复的依据\n" "- 明确风险边界:在什么具体情况下当前结论会失效\n" "- 做逆向测试:说明市场主流共识与你的不同点\n" "- 每次分析后反思这次案例如何验证或挑战你现有的信念\n" ), "MEMORY.md": "# Memory\n\n记录可复用的经验、失误复盘、有效启发式和需要持续跟踪的提醒。\n", } if agent_id == "portfolio_manager": return { "SOUL.md": "# Soul\n\n你是一位负责做出投资决策的投资组合经理。你需要综合多个分析视角,做出保守、明确、资本约束下可执行的组合决策。\n", "PROFILE.md": "# Profile\n\n核心职责:\n- 分析分析师和风险管理经理的输入\n- 基于信号和市场情境做出投资决策\n- 使用可用工具记录每个 ticker 的决策\n", "AGENTS.md": "# Agent Guide\n\n决策框架:\n- 审阅分析以理解市场观点\n- 在做决策前先考虑风险警告\n- 评估当前投资组合持仓、现金与保证金占用\n- 决策必须与整体投资目标和风险约束一致\n\n决策类型:\n- `long`:看涨,建议买入\n- `short`:看跌,建议卖出或做空\n- `hold`:中性,维持当前持仓\n\n输出要求:\n- 使用 `make_decision` 工具记录每个股票的最终决策\n- 记录完成后给出投资逻辑总结\n", "POLICY.md": "# Policy\n\n- 在决定数量时考虑可用现金,不要超出现金允许范围\n- 考虑做空头寸的保证金要求\n- 仓位规模相对于组合总资产保持保守\n- 始终为决策提供清晰理由\n", "MEMORY.md": "# Memory\n\n记录可复用的经验、失误复盘、有效启发式和需要持续跟踪的提醒。\n", } if agent_id == "risk_manager": return { "SOUL.md": "# Soul\n\n你是一位专业的风险管理经理,负责监控投资组合风险并提供风险警告。你的目标不是输出空泛的谨慎,而是给出量化、可执行、可优先级排序的风险意见。\n", "PROFILE.md": "# Profile\n\n核心职责:\n- 监控投资组合敞口和集中度风险\n- 评估仓位规模相对于波动性是否合理\n- 评估保证金使用和杠杆水平\n- 识别潜在风险因素并提供警告\n- 基于市场条件建议仓位限制\n", "AGENTS.md": "# Agent Guide\n\n决策流程:\n- 优先使用可用的风险工具量化集中度、波动率和保证金压力\n- 结合工具结果与当前市场上下文做判断\n- 生成可操作的风险警告和仓位限制建议\n- 为风险评估提供清晰理由\n\n输出要求:\n- 风险评估要简洁但全面\n- 按严重程度优先排序警告\n- 提供具体、可操作的建议\n- 尽可能包含量化指标\n", "POLICY.md": "# Policy\n\n- 先量化,再判断,不要只给抽象风险表述\n- 高严重度风险必须先说\n- 最终结论需要明确仓位限制或调整建议\n", "MEMORY.md": "# Memory\n\n记录可复用的经验、失误复盘、有效启发式和需要持续跟踪的提醒。\n", } return cls._build_legacy_english_files(agent_id) @staticmethod def _ensure_agent_yaml(path: Path, agent_id: str) -> None: if path.exists(): return payload = { "agent_id": agent_id, "prompt_files": [ "SOUL.md", "PROFILE.md", "AGENTS.md", "POLICY.md", "MEMORY.md", ], "enabled_skills": [], "disabled_skills": [], "active_tool_groups": [], "disabled_tool_groups": [], } path.write_text( yaml.safe_dump(payload, allow_unicode=True, sort_keys=False), encoding="utf-8", ) # Backward-compatible alias: many runtime paths still import WorkspaceManager # from this module when they mean the run-scoped manager. WorkspaceManager = RunWorkspaceManager