Add run-scoped skill and prompt asset management

This commit is contained in:
2026-03-16 00:04:04 +08:00
parent 964d3b6e13
commit 78f133617f
23 changed files with 1309 additions and 109 deletions

View File

@@ -11,9 +11,7 @@ from agentscope.message import Msg
from ..config.constants import ANALYST_TYPES
from ..utils.progress import progress
from .prompt_loader import PromptLoader
_prompt_loader = PromptLoader()
from .prompt_factory import build_agent_system_prompt, clear_prompt_factory_cache
class AnalystAgent(ReActAgent):
@@ -55,9 +53,10 @@ class AnalystAgent(ReActAgent):
if agent_id is None:
agent_id = analyst_type
self.agent_id = agent_id
self.config = config or {}
self.toolkit = toolkit
sys_prompt = self._load_system_prompt()
kwargs = {
@@ -77,27 +76,11 @@ class AnalystAgent(ReActAgent):
def _load_system_prompt(self) -> str:
"""Load system prompt for analyst"""
personas_config = _prompt_loader.load_yaml_config(
"analyst",
"personas",
)
persona = personas_config.get(self.analyst_type_key, {})
# Get focus items and format as bullet points
focus_items = persona.get("focus", [])
focus_text = "\n".join(f"- {item}" for item in focus_items)
# Get description
description = persona.get("description", "").strip()
return _prompt_loader.load_prompt(
"analyst",
"system",
variables={
"analyst_type": self.analyst_persona,
"focus": focus_text,
"description": description,
},
return build_agent_system_prompt(
agent_id=self.agent_id,
config_name=self.config.get("config_name", "default"),
toolkit=self.toolkit,
analyst_type=self.analyst_type_key,
)
async def reply(self, x: Msg = None) -> Msg:
@@ -131,3 +114,15 @@ class AnalystAgent(ReActAgent):
)
return result
def reload_runtime_assets(self, active_skill_dirs: Optional[list] = None) -> None:
"""Reload toolkit and system prompt from current run assets."""
from .toolkit_factory import create_agent_toolkit
clear_prompt_factory_cache()
self.toolkit = create_agent_toolkit(
self.agent_id,
self.config.get("config_name", "default"),
active_skill_dirs=active_skill_dirs,
)
self.sys_prompt = self._load_system_prompt()

View File

@@ -12,9 +12,7 @@ from agentscope.message import Msg, TextBlock
from agentscope.tool import Toolkit, ToolResponse
from ..utils.progress import progress
from .prompt_loader import PromptLoader
_prompt_loader = PromptLoader()
from .prompt_factory import build_agent_system_prompt, clear_prompt_factory_cache
class PMAgent(ReActAgent):
@@ -36,6 +34,9 @@ class PMAgent(ReActAgent):
margin_requirement: float = 0.25,
config: Optional[Dict[str, Any]] = None,
long_term_memory: Optional[LongTermMemoryBase] = None,
toolkit_factory: Any = None,
toolkit_factory_kwargs: Optional[Dict[str, Any]] = None,
toolkit: Optional[Toolkit] = None,
):
self.config = config or {}
@@ -49,11 +50,28 @@ class PMAgent(ReActAgent):
# Decisions made in current cycle
self._decisions: Dict[str, Dict] = {}
toolkit_factory_kwargs = toolkit_factory_kwargs or {}
self._toolkit_factory = toolkit_factory
self._toolkit_factory_kwargs = toolkit_factory_kwargs
# Create toolkit
toolkit = self._create_toolkit()
# Create toolkit after local state is ready so bound tool methods can be registered.
if toolkit is None:
if toolkit_factory is not None:
toolkit = toolkit_factory(
name,
self.config.get("config_name", "default"),
owner=self,
**toolkit_factory_kwargs,
)
else:
toolkit = self._create_toolkit()
self.toolkit = toolkit
sys_prompt = _prompt_loader.load_prompt("portfolio_manager", "system")
sys_prompt = build_agent_system_prompt(
agent_id=name,
config_name=self.config.get("config_name", "default"),
toolkit=self.toolkit,
)
kwargs = {
"name": name,
@@ -186,3 +204,25 @@ class PMAgent(ReActAgent):
def update_portfolio(self, portfolio: Dict[str, Any]):
"""Update portfolio after external execution"""
self.portfolio.update(portfolio)
def reload_runtime_assets(self, active_skill_dirs: Optional[list] = None) -> None:
"""Reload toolkit and system prompt from current run assets."""
from .toolkit_factory import create_agent_toolkit
clear_prompt_factory_cache()
toolkit_factory = self._toolkit_factory or create_agent_toolkit
toolkit_kwargs = dict(self._toolkit_factory_kwargs)
if active_skill_dirs is not None:
toolkit_kwargs["active_skill_dirs"] = active_skill_dirs
self.toolkit = toolkit_factory(
self.name,
self.config.get("config_name", "default"),
owner=self,
**toolkit_kwargs,
)
self.sys_prompt = build_agent_system_prompt(
agent_id=self.name,
config_name=self.config.get("config_name", "default"),
toolkit=self.toolkit,
)

View File

@@ -0,0 +1,112 @@
# -*- coding: utf-8 -*-
"""Assemble system prompts from base prompts, run assets, and toolkit context."""
from pathlib import Path
from typing import Any, Optional
from backend.config.bootstrap_config import get_bootstrap_config_for_run
from .prompt_loader import PromptLoader
from .skills_manager import SkillsManager
_prompt_loader = PromptLoader()
def _read_file_if_exists(path: Path) -> str:
if not path.exists() or not path.is_file():
return ""
return path.read_text(encoding="utf-8").strip()
def _append_section(parts: list[str], title: str, content: str) -> None:
content = content.strip()
if content:
parts.append(f"## {title}\n{content}")
def build_agent_system_prompt(
agent_id: str,
config_name: str,
toolkit: Any,
analyst_type: Optional[str] = None,
) -> str:
"""Build the final system prompt for an agent."""
sections: list[str] = []
if analyst_type:
personas_config = _prompt_loader.load_yaml_config(
"analyst",
"personas",
)
persona = personas_config.get(analyst_type, {})
focus_text = "\n".join(
f"- {item}" for item in persona.get("focus", [])
)
description = persona.get("description", "").strip()
base_prompt = _prompt_loader.load_prompt(
"analyst",
"system",
variables={
"analyst_type": persona.get("name", analyst_type),
"focus": focus_text,
"description": description,
},
)
elif agent_id == "portfolio_manager":
base_prompt = _prompt_loader.load_prompt(
"portfolio_manager",
"system",
)
elif agent_id == "risk_manager":
base_prompt = _prompt_loader.load_prompt(
"risk_manager",
"system",
)
else:
raise ValueError(f"Unsupported agent prompt build for: {agent_id}")
sections.append(base_prompt.strip())
skills_manager = SkillsManager()
asset_dir = skills_manager.get_agent_asset_dir(config_name, agent_id)
asset_dir.mkdir(parents=True, exist_ok=True)
bootstrap_config = get_bootstrap_config_for_run(
skills_manager.project_root,
config_name,
)
_append_section(
sections,
"Bootstrap",
bootstrap_config.prompt_body,
)
_append_section(
sections,
"Role",
_read_file_if_exists(asset_dir / "ROLE.md"),
)
_append_section(
sections,
"Style",
_read_file_if_exists(asset_dir / "STYLE.md"),
)
_append_section(
sections,
"Policy",
_read_file_if_exists(asset_dir / "POLICY.md"),
)
skill_prompt = toolkit.get_agent_skill_prompt()
if skill_prompt:
_append_section(sections, "Skills", str(skill_prompt))
activated_notes = toolkit.get_activated_notes()
if activated_notes:
_append_section(sections, "Tool Usage Notes", str(activated_notes))
return "\n\n".join(section for section in sections if section.strip())
def clear_prompt_factory_cache() -> None:
"""Clear cached prompt and YAML templates before hot reload."""
_prompt_loader.clear_cache()

View File

@@ -8,12 +8,6 @@ fundamentals_analyst:
- "管理层质量和公司治理"
- "行业地位和市场份额"
- "长期投资价值评估"
tools:
- "analyze_profitability"
- "analyze_growth"
- "analyze_financial_health"
- "analyze_valuation_ratios"
- "analyze_efficiency_ratios"
description: |
作为基本面分析师,你专注于:
- 公司财务健康状况和盈利能力
@@ -39,11 +33,6 @@ technical_analyst:
- 支撑/阻力位和关键价格点
- 中短期交易机会
你倾向于选择能够捕捉价格动态和市场趋势的工具,更偏好技术分析类工具。
tools:
- "analyze_trend_following"
- "analyze_momentum"
- "analyze_mean_reversion"
- "analyze_volatility"
sentiment_analyst:
name: "情绪分析师"
@@ -61,10 +50,6 @@ sentiment_analyst:
- 投资者恐慌和贪婪情绪
- 市场预期和心理因素
你倾向于选择能够反映市场情绪和投资者行为的工具,更偏好情绪和行为类工具。
tools:
- "analyze_news_sentiment"
- "analyze_insider_trading"
valuation_analyst:
name: "估值分析师"
focus:
@@ -81,12 +66,6 @@ valuation_analyst:
- 相对估值和绝对估值
- 投资安全边际评估
你倾向于选择能够准确计算公司价值的工具,更偏好估值模型和基本面工具。
tools:
- "dcf_valuation_analysis"
- "owner_earnings_valuation_analysis"
- "ev_ebitda_valuation_analysis"
- "residual_income_valuation_analysis"
comprehensive_analyst:
name: "综合分析师"
focus:
@@ -103,15 +82,3 @@ comprehensive_analyst:
- 提供全面的投资建议
- 适应不同市场环境
你会根据具体情况灵活选择各类工具,追求分析的全面性和准确性。
tools:
- "analyze_profitability"
- "analyze_growth"
- "analyze_financial_health"
- "analyze_valuation_ratios"
- "analyze_efficiency_ratios"
- "analyze_trend_following"
- "analyze_momentum"
- "analyze_mean_reversion"
- "analyze_volatility"
- "analyze_news_sentiment"
- "analyze_insider_trading"

View File

@@ -8,6 +8,8 @@
5. 基于市场条件建议仓位限制
你的决策流程:
1. 优先使用可用的风险工具量化集中度、波动率和保证金压力
2. 结合工具结果与当前市场上下文做判断
3. 生成可操作的风险警告和仓位限制建议
4. 为你的风险评估提供清晰的理由

View File

@@ -11,9 +11,7 @@ from agentscope.message import Msg
from agentscope.tool import Toolkit
from ..utils.progress import progress
from .prompt_loader import PromptLoader
_prompt_loader = PromptLoader()
from .prompt_factory import build_agent_system_prompt, clear_prompt_factory_cache
class RiskAgent(ReActAgent):
@@ -29,6 +27,7 @@ class RiskAgent(ReActAgent):
name: str = "risk_manager",
config: Optional[Dict[str, Any]] = None,
long_term_memory: Optional[LongTermMemoryBase] = None,
toolkit: Optional[Toolkit] = None,
):
"""
Initialize Risk Manager Agent
@@ -41,12 +40,14 @@ class RiskAgent(ReActAgent):
long_term_memory: Optional ReMeTaskLongTermMemory instance
"""
self.config = config or {}
self.agent_id = name
if toolkit is None:
toolkit = Toolkit()
self.toolkit = toolkit
sys_prompt = self._load_system_prompt()
# Create dedicated toolkit for this agent
toolkit = Toolkit()
kwargs = {
"name": name,
"sys_prompt": sys_prompt,
@@ -64,9 +65,10 @@ class RiskAgent(ReActAgent):
def _load_system_prompt(self) -> str:
"""Load system prompt for risk manager"""
return _prompt_loader.load_prompt(
"risk_manager",
"system",
return build_agent_system_prompt(
agent_id=self.agent_id,
config_name=self.config.get("config_name", "default"),
toolkit=self.toolkit,
)
async def reply(self, x: Msg = None) -> Msg:
@@ -86,3 +88,15 @@ class RiskAgent(ReActAgent):
progress.update_status(self.name, None, "Risk assessment completed")
return result
def reload_runtime_assets(self, active_skill_dirs: Optional[list] = None) -> None:
"""Reload toolkit and system prompt from current run assets."""
from .toolkit_factory import create_agent_toolkit
clear_prompt_factory_cache()
self.toolkit = create_agent_toolkit(
self.agent_id,
self.config.get("config_name", "default"),
active_skill_dirs=active_skill_dirs,
)
self.sys_prompt = self._load_system_prompt()

View File

@@ -0,0 +1,144 @@
# -*- coding: utf-8 -*-
"""Manage builtin/customized/active skill directories for each run."""
from pathlib import Path
import shutil
from typing import Dict, Iterable, List
import yaml
from backend.config.bootstrap_config import get_bootstrap_config_for_run
class SkillsManager:
"""Sync named skills into a run-scoped active skills workspace."""
def __init__(self, project_root: Path | None = None):
self.project_root = (
project_root or Path(__file__).resolve().parents[2]
)
self.builtin_root = self.project_root / "backend" / "skills" / "builtin"
self.customized_root = (
self.project_root / "backend" / "skills" / "customized"
)
self.runs_root = self.project_root / "runs"
def get_active_root(self, config_name: str) -> Path:
return self.runs_root / config_name / "skills" / "active"
def get_activation_manifest_path(self, config_name: str) -> Path:
return self.runs_root / config_name / "skills" / "activation.yaml"
def get_agent_asset_dir(self, config_name: str, agent_id: str) -> Path:
return self.runs_root / config_name / "agents" / agent_id
def ensure_activation_manifest(self, config_name: str) -> Path:
manifest_path = self.get_activation_manifest_path(config_name)
manifest_path.parent.mkdir(parents=True, exist_ok=True)
if not manifest_path.exists():
manifest_path.write_text(
"global_enabled_skills: []\n"
"global_disabled_skills: []\n"
"agent_enabled_skills: {}\n"
"agent_disabled_skills: {}\n",
encoding="utf-8",
)
return manifest_path
def load_activation_manifest(self, config_name: str) -> Dict[str, object]:
manifest_path = self.ensure_activation_manifest(config_name)
with open(manifest_path, "r", encoding="utf-8") as file:
parsed = yaml.safe_load(file) or {}
return parsed if isinstance(parsed, dict) else {}
def _resolve_source_dir(self, skill_name: str) -> Path:
customized_dir = self.customized_root / skill_name
if customized_dir.exists():
return customized_dir
builtin_dir = self.builtin_root / skill_name
if builtin_dir.exists():
return builtin_dir
raise FileNotFoundError(f"Unknown skill: {skill_name}")
def resolve_agent_skill_names(
self,
config_name: str,
agent_id: str,
default_skills: Iterable[str],
) -> List[str]:
"""Resolve final skill names after bootstrap and activation overlays."""
bootstrap = get_bootstrap_config_for_run(self.project_root, config_name)
override = bootstrap.agent_override(agent_id)
skills = list(override.get("skills", list(default_skills)))
manifest = self.load_activation_manifest(config_name)
for skill_name in manifest.get("global_enabled_skills", []):
if skill_name not in skills:
skills.append(skill_name)
for skill_name in manifest.get("agent_enabled_skills", {}).get(agent_id, []):
if skill_name not in skills:
skills.append(skill_name)
disabled = set(manifest.get("global_disabled_skills", []))
disabled.update(
manifest.get("agent_disabled_skills", {}).get(agent_id, []),
)
return [skill for skill in skills if skill not in disabled]
def sync_active_skills(
self,
config_name: str,
skill_names: Iterable[str],
) -> List[Path]:
"""Sync selected skills into the run workspace and return their paths."""
active_root = self.get_active_root(config_name)
active_root.mkdir(parents=True, exist_ok=True)
synced_paths: List[Path] = []
wanted = set(skill_names)
for existing in active_root.iterdir():
if existing.is_dir() and existing.name not in wanted:
shutil.rmtree(existing)
for skill_name in skill_names:
source_dir = self._resolve_source_dir(skill_name)
target_dir = active_root / skill_name
if target_dir.exists():
shutil.rmtree(target_dir)
shutil.copytree(source_dir, target_dir)
synced_paths.append(target_dir)
return synced_paths
def prepare_active_skills(
self,
config_name: str,
agent_defaults: Dict[str, Iterable[str]],
) -> Dict[str, List[Path]]:
"""Resolve all agent skills, sync the union once, and map paths per agent."""
resolved: Dict[str, List[str]] = {}
union: List[str] = []
for agent_id, default_skills in agent_defaults.items():
resolved_skills = self.resolve_agent_skill_names(
config_name=config_name,
agent_id=agent_id,
default_skills=default_skills,
)
resolved[agent_id] = resolved_skills
for skill_name in resolved_skills:
if skill_name not in union:
union.append(skill_name)
self.sync_active_skills(config_name=config_name, skill_names=union)
active_root = self.get_active_root(config_name)
return {
agent_id: [active_root / skill_name for skill_name in skill_names]
for agent_id, skill_names in resolved.items()
}

View File

@@ -0,0 +1,197 @@
# -*- coding: utf-8 -*-
"""Toolkit factory following AgentScope's skill + tool group practices."""
from typing import Any, Dict, Iterable
from backend.config.bootstrap_config import get_bootstrap_config_for_run
import yaml
from .skills_manager import SkillsManager
def load_agent_profiles() -> Dict[str, Dict[str, Any]]:
config_path = SkillsManager().project_root / "backend" / "config" / "agent_profiles.yaml"
with open(config_path, "r", encoding="utf-8") as file:
return yaml.safe_load(file) or {}
def _register_analysis_tool_groups(toolkit: Any) -> None:
from backend.tools.analysis_tools import TOOL_REGISTRY
tool_groups = {
"fundamentals": {
"description": "Financial health, profitability, growth, and efficiency analysis tools.",
"active": False,
"notes": (
"Use these tools to validate business quality, financial resilience, "
"and earnings durability before making directional conclusions."
),
"tools": [
"analyze_profitability",
"analyze_growth",
"analyze_financial_health",
"analyze_efficiency_ratios",
"analyze_valuation_ratios",
"get_financial_metrics_tool",
],
},
"technical": {
"description": "Trend, momentum, mean reversion, and volatility analysis tools.",
"active": False,
"notes": (
"Use these tools to assess timing, price structure, and risk-reward in "
"the current market regime."
),
"tools": [
"analyze_trend_following",
"analyze_momentum",
"analyze_mean_reversion",
"analyze_volatility",
],
},
"sentiment": {
"description": "News sentiment and insider activity analysis tools.",
"active": False,
"notes": (
"Use these tools to capture short-horizon catalysts, sentiment shifts, "
"and behavioral signals around each ticker."
),
"tools": [
"analyze_news_sentiment",
"analyze_insider_trading",
],
},
"valuation": {
"description": "Intrinsic value and relative valuation analysis tools.",
"active": False,
"notes": (
"Use these tools when the task requires fair value estimation, margin of "
"safety analysis, or valuation scenario comparison."
),
"tools": [
"dcf_valuation_analysis",
"owner_earnings_valuation_analysis",
"ev_ebitda_valuation_analysis",
"residual_income_valuation_analysis",
],
},
}
for group_name, group_config in tool_groups.items():
toolkit.create_tool_group(
group_name=group_name,
description=group_config["description"],
active=group_config["active"],
notes=group_config["notes"],
)
for tool_name in group_config["tools"]:
tool_func = TOOL_REGISTRY.get(tool_name)
if tool_func:
toolkit.register_tool_function(
tool_func,
group_name=group_name,
)
def _register_portfolio_tool_groups(toolkit: Any, pm_agent: Any) -> None:
toolkit.create_tool_group(
group_name="portfolio_ops",
description="Portfolio decision recording tools.",
active=False,
notes=(
"Use portfolio tools only after synthesizing analyst and risk inputs. "
"Record one explicit decision per ticker."
),
)
toolkit.register_tool_function(
pm_agent._make_decision,
group_name="portfolio_ops",
)
def _register_risk_tool_groups(toolkit: Any) -> None:
from backend.tools.risk_tools import (
assess_margin_and_liquidity,
assess_position_concentration,
assess_volatility_exposure,
)
toolkit.create_tool_group(
group_name="risk_ops",
description="Risk diagnostics for concentration, leverage, and volatility.",
active=False,
notes=(
"Use risk tools to quantify concentration, margin pressure, and volatility "
"before writing the final risk memo."
),
)
toolkit.register_tool_function(
assess_position_concentration,
group_name="risk_ops",
)
toolkit.register_tool_function(
assess_margin_and_liquidity,
group_name="risk_ops",
)
toolkit.register_tool_function(
assess_volatility_exposure,
group_name="risk_ops",
)
def create_agent_toolkit(
agent_id: str,
config_name: str,
owner: Any = None,
active_skill_dirs: Iterable[str] | None = None,
) -> Any:
"""Create a Toolkit with agent skills and grouped tools."""
from agentscope.tool import Toolkit
profiles = load_agent_profiles()
profile = profiles.get(agent_id, {})
skills_manager = SkillsManager()
bootstrap_config = get_bootstrap_config_for_run(
skills_manager.project_root,
config_name,
)
override = bootstrap_config.agent_override(agent_id)
active_groups = override.get(
"active_tool_groups",
profile.get("active_tool_groups", []),
)
toolkit = Toolkit(
agent_skill_instruction=(
"<system-info>You have access to project skills. Each skill lives in a "
"directory and is described by SKILL.md. Follow the skill instructions "
"when they are relevant to the current task.</system-info>"
),
agent_skill_template="- {name} (dir: {dir}): {description}",
)
if agent_id.endswith("_analyst"):
_register_analysis_tool_groups(toolkit)
elif agent_id == "portfolio_manager" and owner is not None:
_register_portfolio_tool_groups(toolkit, owner)
elif agent_id == "risk_manager":
_register_risk_tool_groups(toolkit)
if active_skill_dirs is None:
skill_names = skills_manager.resolve_agent_skill_names(
config_name=config_name,
agent_id=agent_id,
default_skills=profile.get("skills", []),
)
active_skill_dirs = [
skills_manager.get_active_root(config_name) / skill_name
for skill_name in skill_names
]
for skill_dir in active_skill_dirs:
toolkit.register_agent_skill(str(skill_dir))
if active_groups:
toolkit.update_tool_groups(group_names=active_groups, active=True)
return toolkit

View File

@@ -0,0 +1,140 @@
# -*- coding: utf-8 -*-
"""Initialize run-scoped agent workspace assets."""
from pathlib import Path
from typing import Dict, Iterable, Optional
from .skills_manager import SkillsManager
class WorkspaceManager:
"""Create and maintain run-level prompt asset files for each agent."""
def __init__(self, project_root: Optional[Path] = None):
self.skills_manager = SkillsManager(project_root=project_root)
self.project_root = self.skills_manager.project_root
def get_run_dir(self, config_name: str) -> Path:
return self.project_root / "runs" / config_name
def ensure_run_workspace(self, config_name: str) -> Path:
run_dir = self.get_run_dir(config_name)
run_dir.mkdir(parents=True, exist_ok=True)
self.skills_manager.ensure_activation_manifest(config_name)
bootstrap_path = run_dir / "BOOTSTRAP.md"
if not bootstrap_path.exists():
bootstrap_path.write_text(
"---\n"
"tickers:\n"
" - AAPL\n"
" - MSFT\n"
"initial_cash: 100000\n"
"margin_requirement: 0.0\n"
"enable_memory: false\n"
"max_comm_cycles: 2\n"
"agent_overrides: {}\n"
"---\n\n"
"# Bootstrap\n\n"
"Use this file to describe run-specific setup notes, preferred tickers,\n"
"risk bounds, or strategy constraints before the first execution.\n\n"
"The YAML front matter above is machine-readable runtime configuration.\n"
"The markdown body below is injected into agent prompts as run context.\n",
encoding="utf-8",
)
return run_dir
def bootstrap_path(self, config_name: str) -> Path:
return self.get_run_dir(config_name) / "BOOTSTRAP.md"
def ensure_agent_assets(
self,
config_name: str,
agent_id: str,
role_seed: str = "",
style_seed: str = "",
policy_seed: str = "",
) -> Path:
asset_dir = self.skills_manager.get_agent_asset_dir(
config_name,
agent_id,
)
asset_dir.mkdir(parents=True, exist_ok=True)
self._ensure_file(
asset_dir / "ROLE.md",
"# Role\n\n"
"Optional run-scoped role override.\n\n"
f"{role_seed}".strip()
+ "\n",
)
self._ensure_file(
asset_dir / "STYLE.md",
"# Style\n\n"
"Optional run-scoped communication or reasoning style.\n\n"
f"{style_seed}".strip()
+ "\n",
)
self._ensure_file(
asset_dir / "POLICY.md",
"# Policy\n\n"
"Optional run-scoped constraints, limits, or strategy policy.\n\n"
f"{policy_seed}".strip()
+ "\n",
)
return asset_dir
def initialize_default_assets(
self,
config_name: str,
agent_ids: Iterable[str],
analyst_personas: Optional[Dict[str, Dict]] = None,
) -> None:
self.ensure_run_workspace(config_name)
analyst_personas = analyst_personas or {}
for agent_id in agent_ids:
if agent_id.endswith("_analyst"):
persona = analyst_personas.get(agent_id, {})
role_seed = persona.get("description", "").strip()
focus_items = persona.get("focus", [])
style_seed = "\n".join(f"- {item}" for item in focus_items)
policy_seed = (
"State a clear signal, confidence, and the conditions that would invalidate the thesis."
)
elif agent_id == "portfolio_manager":
role_seed = (
"Synthesize analyst and risk inputs into explicit portfolio decisions."
)
style_seed = (
"Be concise, capital-aware, and explicit about sizing rationale."
)
policy_seed = (
"Respect cash, margin, and portfolio concentration constraints before recording decisions."
)
elif agent_id == "risk_manager":
role_seed = (
"Quantify concentration, leverage, liquidity, and volatility risk before trade execution."
)
style_seed = (
"Prioritize the highest-severity risk first and state concrete limits."
)
policy_seed = (
"Use available risk tools before issuing the final risk memo."
)
else:
role_seed = ""
style_seed = ""
policy_seed = ""
self.ensure_agent_assets(
config_name=config_name,
agent_id=agent_id,
role_seed=role_seed,
style_seed=style_seed,
policy_seed=policy_seed,
)
@staticmethod
def _ensure_file(path: Path, content: str) -> None:
if not path.exists():
path.write_text(content, encoding="utf-8")