1 Commits

Author SHA1 Message Date
78f133617f Add run-scoped skill and prompt asset management 2026-03-16 00:04:04 +08:00
23 changed files with 1309 additions and 109 deletions

View File

@@ -11,9 +11,7 @@ from agentscope.message import Msg
from ..config.constants import ANALYST_TYPES
from ..utils.progress import progress
from .prompt_loader import PromptLoader
_prompt_loader = PromptLoader()
from .prompt_factory import build_agent_system_prompt, clear_prompt_factory_cache
class AnalystAgent(ReActAgent):
@@ -55,9 +53,10 @@ class AnalystAgent(ReActAgent):
if agent_id is None:
agent_id = analyst_type
self.agent_id = agent_id
self.config = config or {}
self.toolkit = toolkit
sys_prompt = self._load_system_prompt()
kwargs = {
@@ -77,27 +76,11 @@ class AnalystAgent(ReActAgent):
def _load_system_prompt(self) -> str:
"""Load system prompt for analyst"""
personas_config = _prompt_loader.load_yaml_config(
"analyst",
"personas",
)
persona = personas_config.get(self.analyst_type_key, {})
# Get focus items and format as bullet points
focus_items = persona.get("focus", [])
focus_text = "\n".join(f"- {item}" for item in focus_items)
# Get description
description = persona.get("description", "").strip()
return _prompt_loader.load_prompt(
"analyst",
"system",
variables={
"analyst_type": self.analyst_persona,
"focus": focus_text,
"description": description,
},
return build_agent_system_prompt(
agent_id=self.agent_id,
config_name=self.config.get("config_name", "default"),
toolkit=self.toolkit,
analyst_type=self.analyst_type_key,
)
async def reply(self, x: Msg = None) -> Msg:
@@ -131,3 +114,15 @@ class AnalystAgent(ReActAgent):
)
return result
def reload_runtime_assets(self, active_skill_dirs: Optional[list] = None) -> None:
"""Reload toolkit and system prompt from current run assets."""
from .toolkit_factory import create_agent_toolkit
clear_prompt_factory_cache()
self.toolkit = create_agent_toolkit(
self.agent_id,
self.config.get("config_name", "default"),
active_skill_dirs=active_skill_dirs,
)
self.sys_prompt = self._load_system_prompt()

View File

@@ -12,9 +12,7 @@ from agentscope.message import Msg, TextBlock
from agentscope.tool import Toolkit, ToolResponse
from ..utils.progress import progress
from .prompt_loader import PromptLoader
_prompt_loader = PromptLoader()
from .prompt_factory import build_agent_system_prompt, clear_prompt_factory_cache
class PMAgent(ReActAgent):
@@ -36,6 +34,9 @@ class PMAgent(ReActAgent):
margin_requirement: float = 0.25,
config: Optional[Dict[str, Any]] = None,
long_term_memory: Optional[LongTermMemoryBase] = None,
toolkit_factory: Any = None,
toolkit_factory_kwargs: Optional[Dict[str, Any]] = None,
toolkit: Optional[Toolkit] = None,
):
self.config = config or {}
@@ -49,11 +50,28 @@ class PMAgent(ReActAgent):
# Decisions made in current cycle
self._decisions: Dict[str, Dict] = {}
toolkit_factory_kwargs = toolkit_factory_kwargs or {}
self._toolkit_factory = toolkit_factory
self._toolkit_factory_kwargs = toolkit_factory_kwargs
# Create toolkit
toolkit = self._create_toolkit()
# Create toolkit after local state is ready so bound tool methods can be registered.
if toolkit is None:
if toolkit_factory is not None:
toolkit = toolkit_factory(
name,
self.config.get("config_name", "default"),
owner=self,
**toolkit_factory_kwargs,
)
else:
toolkit = self._create_toolkit()
self.toolkit = toolkit
sys_prompt = _prompt_loader.load_prompt("portfolio_manager", "system")
sys_prompt = build_agent_system_prompt(
agent_id=name,
config_name=self.config.get("config_name", "default"),
toolkit=self.toolkit,
)
kwargs = {
"name": name,
@@ -186,3 +204,25 @@ class PMAgent(ReActAgent):
def update_portfolio(self, portfolio: Dict[str, Any]):
"""Update portfolio after external execution"""
self.portfolio.update(portfolio)
def reload_runtime_assets(self, active_skill_dirs: Optional[list] = None) -> None:
"""Reload toolkit and system prompt from current run assets."""
from .toolkit_factory import create_agent_toolkit
clear_prompt_factory_cache()
toolkit_factory = self._toolkit_factory or create_agent_toolkit
toolkit_kwargs = dict(self._toolkit_factory_kwargs)
if active_skill_dirs is not None:
toolkit_kwargs["active_skill_dirs"] = active_skill_dirs
self.toolkit = toolkit_factory(
self.name,
self.config.get("config_name", "default"),
owner=self,
**toolkit_kwargs,
)
self.sys_prompt = build_agent_system_prompt(
agent_id=self.name,
config_name=self.config.get("config_name", "default"),
toolkit=self.toolkit,
)

View File

@@ -0,0 +1,112 @@
# -*- coding: utf-8 -*-
"""Assemble system prompts from base prompts, run assets, and toolkit context."""
from pathlib import Path
from typing import Any, Optional
from backend.config.bootstrap_config import get_bootstrap_config_for_run
from .prompt_loader import PromptLoader
from .skills_manager import SkillsManager
_prompt_loader = PromptLoader()
def _read_file_if_exists(path: Path) -> str:
if not path.exists() or not path.is_file():
return ""
return path.read_text(encoding="utf-8").strip()
def _append_section(parts: list[str], title: str, content: str) -> None:
content = content.strip()
if content:
parts.append(f"## {title}\n{content}")
def build_agent_system_prompt(
agent_id: str,
config_name: str,
toolkit: Any,
analyst_type: Optional[str] = None,
) -> str:
"""Build the final system prompt for an agent."""
sections: list[str] = []
if analyst_type:
personas_config = _prompt_loader.load_yaml_config(
"analyst",
"personas",
)
persona = personas_config.get(analyst_type, {})
focus_text = "\n".join(
f"- {item}" for item in persona.get("focus", [])
)
description = persona.get("description", "").strip()
base_prompt = _prompt_loader.load_prompt(
"analyst",
"system",
variables={
"analyst_type": persona.get("name", analyst_type),
"focus": focus_text,
"description": description,
},
)
elif agent_id == "portfolio_manager":
base_prompt = _prompt_loader.load_prompt(
"portfolio_manager",
"system",
)
elif agent_id == "risk_manager":
base_prompt = _prompt_loader.load_prompt(
"risk_manager",
"system",
)
else:
raise ValueError(f"Unsupported agent prompt build for: {agent_id}")
sections.append(base_prompt.strip())
skills_manager = SkillsManager()
asset_dir = skills_manager.get_agent_asset_dir(config_name, agent_id)
asset_dir.mkdir(parents=True, exist_ok=True)
bootstrap_config = get_bootstrap_config_for_run(
skills_manager.project_root,
config_name,
)
_append_section(
sections,
"Bootstrap",
bootstrap_config.prompt_body,
)
_append_section(
sections,
"Role",
_read_file_if_exists(asset_dir / "ROLE.md"),
)
_append_section(
sections,
"Style",
_read_file_if_exists(asset_dir / "STYLE.md"),
)
_append_section(
sections,
"Policy",
_read_file_if_exists(asset_dir / "POLICY.md"),
)
skill_prompt = toolkit.get_agent_skill_prompt()
if skill_prompt:
_append_section(sections, "Skills", str(skill_prompt))
activated_notes = toolkit.get_activated_notes()
if activated_notes:
_append_section(sections, "Tool Usage Notes", str(activated_notes))
return "\n\n".join(section for section in sections if section.strip())
def clear_prompt_factory_cache() -> None:
"""Clear cached prompt and YAML templates before hot reload."""
_prompt_loader.clear_cache()

View File

@@ -8,12 +8,6 @@ fundamentals_analyst:
- "管理层质量和公司治理"
- "行业地位和市场份额"
- "长期投资价值评估"
tools:
- "analyze_profitability"
- "analyze_growth"
- "analyze_financial_health"
- "analyze_valuation_ratios"
- "analyze_efficiency_ratios"
description: |
作为基本面分析师,你专注于:
- 公司财务健康状况和盈利能力
@@ -39,11 +33,6 @@ technical_analyst:
- 支撑/阻力位和关键价格点
- 中短期交易机会
你倾向于选择能够捕捉价格动态和市场趋势的工具,更偏好技术分析类工具。
tools:
- "analyze_trend_following"
- "analyze_momentum"
- "analyze_mean_reversion"
- "analyze_volatility"
sentiment_analyst:
name: "情绪分析师"
@@ -61,10 +50,6 @@ sentiment_analyst:
- 投资者恐慌和贪婪情绪
- 市场预期和心理因素
你倾向于选择能够反映市场情绪和投资者行为的工具,更偏好情绪和行为类工具。
tools:
- "analyze_news_sentiment"
- "analyze_insider_trading"
valuation_analyst:
name: "估值分析师"
focus:
@@ -81,12 +66,6 @@ valuation_analyst:
- 相对估值和绝对估值
- 投资安全边际评估
你倾向于选择能够准确计算公司价值的工具,更偏好估值模型和基本面工具。
tools:
- "dcf_valuation_analysis"
- "owner_earnings_valuation_analysis"
- "ev_ebitda_valuation_analysis"
- "residual_income_valuation_analysis"
comprehensive_analyst:
name: "综合分析师"
focus:
@@ -103,15 +82,3 @@ comprehensive_analyst:
- 提供全面的投资建议
- 适应不同市场环境
你会根据具体情况灵活选择各类工具,追求分析的全面性和准确性。
tools:
- "analyze_profitability"
- "analyze_growth"
- "analyze_financial_health"
- "analyze_valuation_ratios"
- "analyze_efficiency_ratios"
- "analyze_trend_following"
- "analyze_momentum"
- "analyze_mean_reversion"
- "analyze_volatility"
- "analyze_news_sentiment"
- "analyze_insider_trading"

View File

@@ -8,6 +8,8 @@
5. 基于市场条件建议仓位限制
你的决策流程:
1. 优先使用可用的风险工具量化集中度、波动率和保证金压力
2. 结合工具结果与当前市场上下文做判断
3. 生成可操作的风险警告和仓位限制建议
4. 为你的风险评估提供清晰的理由

View File

@@ -11,9 +11,7 @@ from agentscope.message import Msg
from agentscope.tool import Toolkit
from ..utils.progress import progress
from .prompt_loader import PromptLoader
_prompt_loader = PromptLoader()
from .prompt_factory import build_agent_system_prompt, clear_prompt_factory_cache
class RiskAgent(ReActAgent):
@@ -29,6 +27,7 @@ class RiskAgent(ReActAgent):
name: str = "risk_manager",
config: Optional[Dict[str, Any]] = None,
long_term_memory: Optional[LongTermMemoryBase] = None,
toolkit: Optional[Toolkit] = None,
):
"""
Initialize Risk Manager Agent
@@ -41,12 +40,14 @@ class RiskAgent(ReActAgent):
long_term_memory: Optional ReMeTaskLongTermMemory instance
"""
self.config = config or {}
self.agent_id = name
if toolkit is None:
toolkit = Toolkit()
self.toolkit = toolkit
sys_prompt = self._load_system_prompt()
# Create dedicated toolkit for this agent
toolkit = Toolkit()
kwargs = {
"name": name,
"sys_prompt": sys_prompt,
@@ -64,9 +65,10 @@ class RiskAgent(ReActAgent):
def _load_system_prompt(self) -> str:
"""Load system prompt for risk manager"""
return _prompt_loader.load_prompt(
"risk_manager",
"system",
return build_agent_system_prompt(
agent_id=self.agent_id,
config_name=self.config.get("config_name", "default"),
toolkit=self.toolkit,
)
async def reply(self, x: Msg = None) -> Msg:
@@ -86,3 +88,15 @@ class RiskAgent(ReActAgent):
progress.update_status(self.name, None, "Risk assessment completed")
return result
def reload_runtime_assets(self, active_skill_dirs: Optional[list] = None) -> None:
"""Reload toolkit and system prompt from current run assets."""
from .toolkit_factory import create_agent_toolkit
clear_prompt_factory_cache()
self.toolkit = create_agent_toolkit(
self.agent_id,
self.config.get("config_name", "default"),
active_skill_dirs=active_skill_dirs,
)
self.sys_prompt = self._load_system_prompt()

View File

@@ -0,0 +1,144 @@
# -*- coding: utf-8 -*-
"""Manage builtin/customized/active skill directories for each run."""
from pathlib import Path
import shutil
from typing import Dict, Iterable, List
import yaml
from backend.config.bootstrap_config import get_bootstrap_config_for_run
class SkillsManager:
"""Sync named skills into a run-scoped active skills workspace."""
def __init__(self, project_root: Path | None = None):
self.project_root = (
project_root or Path(__file__).resolve().parents[2]
)
self.builtin_root = self.project_root / "backend" / "skills" / "builtin"
self.customized_root = (
self.project_root / "backend" / "skills" / "customized"
)
self.runs_root = self.project_root / "runs"
def get_active_root(self, config_name: str) -> Path:
return self.runs_root / config_name / "skills" / "active"
def get_activation_manifest_path(self, config_name: str) -> Path:
return self.runs_root / config_name / "skills" / "activation.yaml"
def get_agent_asset_dir(self, config_name: str, agent_id: str) -> Path:
return self.runs_root / config_name / "agents" / agent_id
def ensure_activation_manifest(self, config_name: str) -> Path:
manifest_path = self.get_activation_manifest_path(config_name)
manifest_path.parent.mkdir(parents=True, exist_ok=True)
if not manifest_path.exists():
manifest_path.write_text(
"global_enabled_skills: []\n"
"global_disabled_skills: []\n"
"agent_enabled_skills: {}\n"
"agent_disabled_skills: {}\n",
encoding="utf-8",
)
return manifest_path
def load_activation_manifest(self, config_name: str) -> Dict[str, object]:
manifest_path = self.ensure_activation_manifest(config_name)
with open(manifest_path, "r", encoding="utf-8") as file:
parsed = yaml.safe_load(file) or {}
return parsed if isinstance(parsed, dict) else {}
def _resolve_source_dir(self, skill_name: str) -> Path:
customized_dir = self.customized_root / skill_name
if customized_dir.exists():
return customized_dir
builtin_dir = self.builtin_root / skill_name
if builtin_dir.exists():
return builtin_dir
raise FileNotFoundError(f"Unknown skill: {skill_name}")
def resolve_agent_skill_names(
self,
config_name: str,
agent_id: str,
default_skills: Iterable[str],
) -> List[str]:
"""Resolve final skill names after bootstrap and activation overlays."""
bootstrap = get_bootstrap_config_for_run(self.project_root, config_name)
override = bootstrap.agent_override(agent_id)
skills = list(override.get("skills", list(default_skills)))
manifest = self.load_activation_manifest(config_name)
for skill_name in manifest.get("global_enabled_skills", []):
if skill_name not in skills:
skills.append(skill_name)
for skill_name in manifest.get("agent_enabled_skills", {}).get(agent_id, []):
if skill_name not in skills:
skills.append(skill_name)
disabled = set(manifest.get("global_disabled_skills", []))
disabled.update(
manifest.get("agent_disabled_skills", {}).get(agent_id, []),
)
return [skill for skill in skills if skill not in disabled]
def sync_active_skills(
self,
config_name: str,
skill_names: Iterable[str],
) -> List[Path]:
"""Sync selected skills into the run workspace and return their paths."""
active_root = self.get_active_root(config_name)
active_root.mkdir(parents=True, exist_ok=True)
synced_paths: List[Path] = []
wanted = set(skill_names)
for existing in active_root.iterdir():
if existing.is_dir() and existing.name not in wanted:
shutil.rmtree(existing)
for skill_name in skill_names:
source_dir = self._resolve_source_dir(skill_name)
target_dir = active_root / skill_name
if target_dir.exists():
shutil.rmtree(target_dir)
shutil.copytree(source_dir, target_dir)
synced_paths.append(target_dir)
return synced_paths
def prepare_active_skills(
self,
config_name: str,
agent_defaults: Dict[str, Iterable[str]],
) -> Dict[str, List[Path]]:
"""Resolve all agent skills, sync the union once, and map paths per agent."""
resolved: Dict[str, List[str]] = {}
union: List[str] = []
for agent_id, default_skills in agent_defaults.items():
resolved_skills = self.resolve_agent_skill_names(
config_name=config_name,
agent_id=agent_id,
default_skills=default_skills,
)
resolved[agent_id] = resolved_skills
for skill_name in resolved_skills:
if skill_name not in union:
union.append(skill_name)
self.sync_active_skills(config_name=config_name, skill_names=union)
active_root = self.get_active_root(config_name)
return {
agent_id: [active_root / skill_name for skill_name in skill_names]
for agent_id, skill_names in resolved.items()
}

View File

@@ -0,0 +1,197 @@
# -*- coding: utf-8 -*-
"""Toolkit factory following AgentScope's skill + tool group practices."""
from typing import Any, Dict, Iterable
from backend.config.bootstrap_config import get_bootstrap_config_for_run
import yaml
from .skills_manager import SkillsManager
def load_agent_profiles() -> Dict[str, Dict[str, Any]]:
config_path = SkillsManager().project_root / "backend" / "config" / "agent_profiles.yaml"
with open(config_path, "r", encoding="utf-8") as file:
return yaml.safe_load(file) or {}
def _register_analysis_tool_groups(toolkit: Any) -> None:
from backend.tools.analysis_tools import TOOL_REGISTRY
tool_groups = {
"fundamentals": {
"description": "Financial health, profitability, growth, and efficiency analysis tools.",
"active": False,
"notes": (
"Use these tools to validate business quality, financial resilience, "
"and earnings durability before making directional conclusions."
),
"tools": [
"analyze_profitability",
"analyze_growth",
"analyze_financial_health",
"analyze_efficiency_ratios",
"analyze_valuation_ratios",
"get_financial_metrics_tool",
],
},
"technical": {
"description": "Trend, momentum, mean reversion, and volatility analysis tools.",
"active": False,
"notes": (
"Use these tools to assess timing, price structure, and risk-reward in "
"the current market regime."
),
"tools": [
"analyze_trend_following",
"analyze_momentum",
"analyze_mean_reversion",
"analyze_volatility",
],
},
"sentiment": {
"description": "News sentiment and insider activity analysis tools.",
"active": False,
"notes": (
"Use these tools to capture short-horizon catalysts, sentiment shifts, "
"and behavioral signals around each ticker."
),
"tools": [
"analyze_news_sentiment",
"analyze_insider_trading",
],
},
"valuation": {
"description": "Intrinsic value and relative valuation analysis tools.",
"active": False,
"notes": (
"Use these tools when the task requires fair value estimation, margin of "
"safety analysis, or valuation scenario comparison."
),
"tools": [
"dcf_valuation_analysis",
"owner_earnings_valuation_analysis",
"ev_ebitda_valuation_analysis",
"residual_income_valuation_analysis",
],
},
}
for group_name, group_config in tool_groups.items():
toolkit.create_tool_group(
group_name=group_name,
description=group_config["description"],
active=group_config["active"],
notes=group_config["notes"],
)
for tool_name in group_config["tools"]:
tool_func = TOOL_REGISTRY.get(tool_name)
if tool_func:
toolkit.register_tool_function(
tool_func,
group_name=group_name,
)
def _register_portfolio_tool_groups(toolkit: Any, pm_agent: Any) -> None:
toolkit.create_tool_group(
group_name="portfolio_ops",
description="Portfolio decision recording tools.",
active=False,
notes=(
"Use portfolio tools only after synthesizing analyst and risk inputs. "
"Record one explicit decision per ticker."
),
)
toolkit.register_tool_function(
pm_agent._make_decision,
group_name="portfolio_ops",
)
def _register_risk_tool_groups(toolkit: Any) -> None:
from backend.tools.risk_tools import (
assess_margin_and_liquidity,
assess_position_concentration,
assess_volatility_exposure,
)
toolkit.create_tool_group(
group_name="risk_ops",
description="Risk diagnostics for concentration, leverage, and volatility.",
active=False,
notes=(
"Use risk tools to quantify concentration, margin pressure, and volatility "
"before writing the final risk memo."
),
)
toolkit.register_tool_function(
assess_position_concentration,
group_name="risk_ops",
)
toolkit.register_tool_function(
assess_margin_and_liquidity,
group_name="risk_ops",
)
toolkit.register_tool_function(
assess_volatility_exposure,
group_name="risk_ops",
)
def create_agent_toolkit(
agent_id: str,
config_name: str,
owner: Any = None,
active_skill_dirs: Iterable[str] | None = None,
) -> Any:
"""Create a Toolkit with agent skills and grouped tools."""
from agentscope.tool import Toolkit
profiles = load_agent_profiles()
profile = profiles.get(agent_id, {})
skills_manager = SkillsManager()
bootstrap_config = get_bootstrap_config_for_run(
skills_manager.project_root,
config_name,
)
override = bootstrap_config.agent_override(agent_id)
active_groups = override.get(
"active_tool_groups",
profile.get("active_tool_groups", []),
)
toolkit = Toolkit(
agent_skill_instruction=(
"<system-info>You have access to project skills. Each skill lives in a "
"directory and is described by SKILL.md. Follow the skill instructions "
"when they are relevant to the current task.</system-info>"
),
agent_skill_template="- {name} (dir: {dir}): {description}",
)
if agent_id.endswith("_analyst"):
_register_analysis_tool_groups(toolkit)
elif agent_id == "portfolio_manager" and owner is not None:
_register_portfolio_tool_groups(toolkit, owner)
elif agent_id == "risk_manager":
_register_risk_tool_groups(toolkit)
if active_skill_dirs is None:
skill_names = skills_manager.resolve_agent_skill_names(
config_name=config_name,
agent_id=agent_id,
default_skills=profile.get("skills", []),
)
active_skill_dirs = [
skills_manager.get_active_root(config_name) / skill_name
for skill_name in skill_names
]
for skill_dir in active_skill_dirs:
toolkit.register_agent_skill(str(skill_dir))
if active_groups:
toolkit.update_tool_groups(group_names=active_groups, active=True)
return toolkit

View File

@@ -0,0 +1,140 @@
# -*- coding: utf-8 -*-
"""Initialize run-scoped agent workspace assets."""
from pathlib import Path
from typing import Dict, Iterable, Optional
from .skills_manager import SkillsManager
class WorkspaceManager:
"""Create and maintain run-level prompt asset files for each agent."""
def __init__(self, project_root: Optional[Path] = None):
self.skills_manager = SkillsManager(project_root=project_root)
self.project_root = self.skills_manager.project_root
def get_run_dir(self, config_name: str) -> Path:
return self.project_root / "runs" / config_name
def ensure_run_workspace(self, config_name: str) -> Path:
run_dir = self.get_run_dir(config_name)
run_dir.mkdir(parents=True, exist_ok=True)
self.skills_manager.ensure_activation_manifest(config_name)
bootstrap_path = run_dir / "BOOTSTRAP.md"
if not bootstrap_path.exists():
bootstrap_path.write_text(
"---\n"
"tickers:\n"
" - AAPL\n"
" - MSFT\n"
"initial_cash: 100000\n"
"margin_requirement: 0.0\n"
"enable_memory: false\n"
"max_comm_cycles: 2\n"
"agent_overrides: {}\n"
"---\n\n"
"# Bootstrap\n\n"
"Use this file to describe run-specific setup notes, preferred tickers,\n"
"risk bounds, or strategy constraints before the first execution.\n\n"
"The YAML front matter above is machine-readable runtime configuration.\n"
"The markdown body below is injected into agent prompts as run context.\n",
encoding="utf-8",
)
return run_dir
def bootstrap_path(self, config_name: str) -> Path:
return self.get_run_dir(config_name) / "BOOTSTRAP.md"
def ensure_agent_assets(
self,
config_name: str,
agent_id: str,
role_seed: str = "",
style_seed: str = "",
policy_seed: str = "",
) -> Path:
asset_dir = self.skills_manager.get_agent_asset_dir(
config_name,
agent_id,
)
asset_dir.mkdir(parents=True, exist_ok=True)
self._ensure_file(
asset_dir / "ROLE.md",
"# Role\n\n"
"Optional run-scoped role override.\n\n"
f"{role_seed}".strip()
+ "\n",
)
self._ensure_file(
asset_dir / "STYLE.md",
"# Style\n\n"
"Optional run-scoped communication or reasoning style.\n\n"
f"{style_seed}".strip()
+ "\n",
)
self._ensure_file(
asset_dir / "POLICY.md",
"# Policy\n\n"
"Optional run-scoped constraints, limits, or strategy policy.\n\n"
f"{policy_seed}".strip()
+ "\n",
)
return asset_dir
def initialize_default_assets(
self,
config_name: str,
agent_ids: Iterable[str],
analyst_personas: Optional[Dict[str, Dict]] = None,
) -> None:
self.ensure_run_workspace(config_name)
analyst_personas = analyst_personas or {}
for agent_id in agent_ids:
if agent_id.endswith("_analyst"):
persona = analyst_personas.get(agent_id, {})
role_seed = persona.get("description", "").strip()
focus_items = persona.get("focus", [])
style_seed = "\n".join(f"- {item}" for item in focus_items)
policy_seed = (
"State a clear signal, confidence, and the conditions that would invalidate the thesis."
)
elif agent_id == "portfolio_manager":
role_seed = (
"Synthesize analyst and risk inputs into explicit portfolio decisions."
)
style_seed = (
"Be concise, capital-aware, and explicit about sizing rationale."
)
policy_seed = (
"Respect cash, margin, and portfolio concentration constraints before recording decisions."
)
elif agent_id == "risk_manager":
role_seed = (
"Quantify concentration, leverage, liquidity, and volatility risk before trade execution."
)
style_seed = (
"Prioritize the highest-severity risk first and state concrete limits."
)
policy_seed = (
"Use available risk tools before issuing the final risk memo."
)
else:
role_seed = ""
style_seed = ""
policy_seed = ""
self.ensure_agent_assets(
config_name=config_name,
agent_id=agent_id,
role_seed=role_seed,
style_seed=style_seed,
policy_seed=policy_seed,
)
@staticmethod
def _ensure_file(path: Path, content: str) -> None:
if not path.exists():
path.write_text(content, encoding="utf-8")

View File

@@ -22,6 +22,9 @@ from rich.console import Console
from rich.panel import Panel
from rich.prompt import Confirm
from backend.agents.prompt_loader import PromptLoader
from backend.agents.workspace_manager import WorkspaceManager
app = typer.Typer(
name="evotraders",
help="EvoTraders: A self-evolving multi-agent trading system",
@@ -29,6 +32,7 @@ app = typer.Typer(
)
console = Console()
_prompt_loader = PromptLoader()
def get_project_root() -> Path:
@@ -180,6 +184,46 @@ def run_data_updater(project_root: Path) -> None:
)
def initialize_workspace(config_name: str) -> Path:
"""Create run-scoped workspace files for a config."""
workspace_manager = WorkspaceManager(project_root=get_project_root())
workspace_manager.initialize_default_assets(
config_name=config_name,
agent_ids=[
"fundamentals_analyst",
"technical_analyst",
"sentiment_analyst",
"valuation_analyst",
"risk_manager",
"portfolio_manager",
],
analyst_personas=_prompt_loader.load_yaml_config(
"analyst",
"personas",
),
)
return workspace_manager.get_run_dir(config_name)
@app.command("init-workspace")
def init_workspace(
config_name: str = typer.Option(
"default",
"--config-name",
"-c",
help="Configuration name for the workspace",
),
):
"""Initialize run-scoped BOOTSTRAP and agent prompt asset files."""
run_dir = initialize_workspace(config_name)
console.print(
Panel.fit(
f"[bold green]Workspace initialized[/bold green]\n[cyan]{run_dir}[/cyan]",
border_style="green",
),
)
@app.command()
def backtest(
start: Optional[str] = typer.Option(

View File

@@ -0,0 +1,37 @@
fundamentals_analyst:
skills:
- fundamental_review
active_tool_groups:
- fundamentals
- valuation
technical_analyst:
skills:
- technical_review
active_tool_groups:
- technical
sentiment_analyst:
skills:
- sentiment_review
active_tool_groups:
- sentiment
valuation_analyst:
skills:
- valuation_review
active_tool_groups:
- valuation
- fundamentals
portfolio_manager:
skills:
- portfolio_decisioning
active_tool_groups:
- portfolio_ops
risk_manager:
skills:
- risk_review
active_tool_groups:
- risk_ops

View File

@@ -0,0 +1,65 @@
# -*- coding: utf-8 -*-
"""Parse run-scoped BOOTSTRAP.md into structured configuration."""
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any, Dict
import re
import yaml
BOOTSTRAP_FRONT_MATTER_RE = re.compile(
r"^---\s*\n(.*?)\n---\s*\n?(.*)$",
re.DOTALL,
)
@dataclass(frozen=True)
class BootstrapConfig:
"""Structured configuration extracted from BOOTSTRAP.md."""
values: Dict[str, Any] = field(default_factory=dict)
prompt_body: str = ""
def get(self, key: str, default: Any = None) -> Any:
return self.values.get(key, default)
def agent_override(self, agent_id: str) -> Dict[str, Any]:
overrides = self.values.get("agent_overrides", {})
if not isinstance(overrides, dict):
return {}
override = overrides.get(agent_id, {})
return override if isinstance(override, dict) else {}
def load_bootstrap_config(bootstrap_path: Path) -> BootstrapConfig:
"""Load structured bootstrap config and free-form prompt body."""
if not bootstrap_path.exists():
return BootstrapConfig()
raw = bootstrap_path.read_text(encoding="utf-8").strip()
if not raw:
return BootstrapConfig()
match = BOOTSTRAP_FRONT_MATTER_RE.match(raw)
if not match:
return BootstrapConfig(prompt_body=raw)
front_matter = match.group(1).strip()
body = match.group(2).strip()
parsed = yaml.safe_load(front_matter) or {}
if not isinstance(parsed, dict):
parsed = {}
return BootstrapConfig(values=parsed, prompt_body=body)
def get_bootstrap_config_for_run(
project_root: Path,
config_name: str,
) -> BootstrapConfig:
"""Load BOOTSTRAP.md from the run workspace."""
return load_bootstrap_config(
project_root / "runs" / config_name / "BOOTSTRAP.md",
)

View File

@@ -226,6 +226,44 @@ class TradingPipeline:
"settlement_result": settlement_result,
}
def reload_runtime_assets(self) -> Dict[str, Any]:
"""Reload prompt assets, bootstrap config, and active skills for all agents."""
from backend.agents.skills_manager import SkillsManager
from backend.agents.toolkit_factory import load_agent_profiles
config_name = getattr(self.pm, "config", {}).get("config_name", "default")
skills_manager = SkillsManager()
profiles = load_agent_profiles()
active_skill_map = skills_manager.prepare_active_skills(
config_name=config_name,
agent_defaults={
agent_id: profile.get("skills", [])
for agent_id, profile in profiles.items()
},
)
for analyst in self.analysts:
analyst.reload_runtime_assets(
active_skill_dirs=active_skill_map.get(analyst.name, []),
)
self.risk_manager.reload_runtime_assets(
active_skill_dirs=active_skill_map.get("risk_manager", []),
)
self.pm.reload_runtime_assets(
active_skill_dirs=active_skill_map.get("portfolio_manager", []),
)
return {
"config_name": config_name,
"reloaded_agents": [agent.name for agent in self.analysts]
+ ["risk_manager", "portfolio_manager"],
"active_skills": {
agent_id: [path.name for path in paths]
for agent_id, paths in active_skill_map.items()
},
}
async def _clear_all_agent_memory(self):
"""Clear short-term memory for all agents"""
for analyst in self.analysts:

View File

@@ -14,6 +14,11 @@ import loguru
from dotenv import load_dotenv
from backend.agents import AnalystAgent, PMAgent, RiskAgent
from backend.agents.skills_manager import SkillsManager
from backend.agents.toolkit_factory import create_agent_toolkit, load_agent_profiles
from backend.agents.prompt_loader import PromptLoader
from backend.agents.workspace_manager import WorkspaceManager
from backend.config.bootstrap_config import get_bootstrap_config_for_run
from backend.config.constants import ANALYST_TYPES
from backend.config.env_config import get_env_float, get_env_int, get_env_list
from backend.core.pipeline import TradingPipeline
@@ -28,6 +33,38 @@ load_dotenv()
logger = logging.getLogger(__name__)
loguru.logger.disable("flowllm")
loguru.logger.disable("reme_ai")
_prompt_loader = PromptLoader()
def _resolve_runtime_config(args) -> dict:
"""Merge env defaults with run-scoped bootstrap config."""
project_root = Path(__file__).resolve().parents[1]
bootstrap = get_bootstrap_config_for_run(project_root, args.config_name)
return {
"tickers": bootstrap.get("tickers")
or get_env_list("TICKERS", ["AAPL", "MSFT"]),
"initial_cash": float(
bootstrap.get(
"initial_cash",
get_env_float("INITIAL_CASH", 100000.0),
),
),
"margin_requirement": float(
bootstrap.get(
"margin_requirement",
get_env_float("MARGIN_REQUIREMENT", 0.0),
),
),
"max_comm_cycles": int(
bootstrap.get(
"max_comm_cycles",
get_env_int("MAX_COMM_CYCLES", 2),
),
),
"enable_memory": args.enable_memory
or bool(bootstrap.get("enable_memory", False)),
}
def create_long_term_memory(agent_name: str, config_name: str):
@@ -84,11 +121,31 @@ def create_agents(
"""
analysts = []
long_term_memories = []
workspace_manager = WorkspaceManager()
workspace_manager.initialize_default_assets(
config_name=config_name,
agent_ids=list(ANALYST_TYPES.keys())
+ ["risk_manager", "portfolio_manager"],
analyst_personas=_prompt_loader.load_yaml_config("analyst", "personas"),
)
profiles = load_agent_profiles()
skills_manager = SkillsManager()
active_skill_map = skills_manager.prepare_active_skills(
config_name=config_name,
agent_defaults={
agent_id: profile.get("skills", [])
for agent_id, profile in profiles.items()
},
)
for analyst_type in ANALYST_TYPES:
model = get_agent_model(analyst_type)
formatter = get_agent_formatter(analyst_type)
toolkit = create_toolkit(analyst_type)
toolkit = create_agent_toolkit(
analyst_type,
config_name,
active_skill_dirs=active_skill_map.get(analyst_type, []),
)
long_term_memory = None
if enable_long_term_memory:
@@ -125,6 +182,11 @@ def create_agents(
name="risk_manager",
config={"config_name": config_name},
long_term_memory=risk_long_term_memory,
toolkit=create_agent_toolkit(
"risk_manager",
config_name,
active_skill_dirs=active_skill_map.get("risk_manager", []),
),
)
pm_long_term_memory = None
@@ -144,44 +206,25 @@ def create_agents(
margin_requirement=margin_requirement,
config={"config_name": config_name},
long_term_memory=pm_long_term_memory,
toolkit_factory=create_agent_toolkit,
toolkit_factory_kwargs={
"active_skill_dirs": active_skill_map.get(
"portfolio_manager",
[],
),
},
)
return analysts, risk_manager, portfolio_manager, long_term_memories
def create_toolkit(analyst_type: str):
"""Create AgentScope Toolkit with tools for specific analyst type"""
from agentscope.tool import Toolkit
from backend.agents.prompt_loader import PromptLoader
from backend.tools.analysis_tools import TOOL_REGISTRY
# Load analyst persona config
prompt_loader = PromptLoader()
personas_config = prompt_loader.load_yaml_config("analyst", "personas")
persona = personas_config.get(analyst_type, {})
# Get tool names for this analyst type
tool_names = persona.get("tools", [])
# Create toolkit and register tools
toolkit = Toolkit()
for tool_name in tool_names:
tool_func = TOOL_REGISTRY.get(tool_name)
if tool_func:
toolkit.register_tool_function(tool_func)
return toolkit
async def run_with_gateway(args):
"""Run with WebSocket gateway"""
is_backtest = args.mode == "backtest"
runtime_config = _resolve_runtime_config(args)
# Load config from env, override with args
tickers = get_env_list("TICKERS", ["AAPL", "MSFT"])
initial_cash = get_env_float("INITIAL_CASH", 100000.0)
margin_requirement = get_env_float("MARGIN_REQUIREMENT", 0.0)
config_name = args.config_name
tickers = runtime_config["tickers"]
initial_cash = runtime_config["initial_cash"]
margin_requirement = runtime_config["margin_requirement"]
# Create market service
market_service = MarketService(
@@ -213,7 +256,7 @@ async def run_with_gateway(args):
config_name=config_name,
initial_cash=initial_cash,
margin_requirement=margin_requirement,
enable_long_term_memory=args.enable_memory,
enable_long_term_memory=runtime_config["enable_memory"],
)
portfolio_state = storage_service.load_portfolio_state()
pm.load_portfolio_state(portfolio_state)
@@ -228,7 +271,7 @@ async def run_with_gateway(args):
risk_manager=risk_manager,
portfolio_manager=pm,
settlement_coordinator=settlement_coordinator,
max_comm_cycles=get_env_int("MAX_COMM_CYCLES", 2),
max_comm_cycles=runtime_config["max_comm_cycles"],
)
# Create scheduler callback
@@ -307,15 +350,17 @@ def main():
args = parser.parse_args()
# Load config from env for logging
tickers = get_env_list("TICKERS", ["AAPL", "MSFT"])
initial_cash = get_env_float("INITIAL_CASH", 100000.0)
runtime_config = _resolve_runtime_config(args)
tickers = runtime_config["tickers"]
initial_cash = runtime_config["initial_cash"]
logger.info("=" * 60)
logger.info(f"Mode: {args.mode}, Config: {args.config_name}")
logger.info(f"Tickers: {tickers}")
logger.info(f"Initial Cash: ${initial_cash:,.2f}")
logger.info(
f"Long-term Memory: {'enabled' if args.enable_memory else 'disabled'}",
"Long-term Memory: %s",
"enabled" if runtime_config["enable_memory"] else "disabled",
)
if args.mode == "backtest":
if not args.start_date or not args.end_date:

View File

@@ -219,6 +219,8 @@ class Gateway:
await self._send_initial_state(websocket)
elif msg_type == "start_backtest":
await self._handle_start_backtest(data)
elif msg_type == "reload_runtime_assets":
await self._handle_reload_runtime_assets()
except websockets.ConnectionClosed:
pass
@@ -236,6 +238,19 @@ class Gateway:
task.add_done_callback(self._handle_backtest_exception)
self._backtest_task = task
async def _handle_reload_runtime_assets(self):
"""Reload prompt assets and active skills without restarting the server."""
result = self.pipeline.reload_runtime_assets()
await self.state_sync.on_system_message(
"Runtime assets reloaded.",
)
await self.broadcast(
{
"type": "runtime_assets_reloaded",
**result,
},
)
async def broadcast(self, message: Dict[str, Any]):
"""Broadcast message to all connected clients"""
if not self.connected_clients:

View File

@@ -0,0 +1,21 @@
---
name: fundamental_review
description: Review a company from a fundamentals-first perspective before issuing a trading signal.
---
# Fundamental Review
Use this skill when the task requires judging business quality, balance-sheet strength, profitability, or long-term earnings durability.
## Workflow
1. Check profitability, growth, financial health, and efficiency before forming a conclusion.
2. Separate durable business quality from short-term noise.
3. State what would invalidate the thesis.
4. End with a clear signal, confidence, and the main drivers behind that signal.
## Guardrails
- Do not rely on one metric in isolation.
- Call out missing data explicitly.
- Prefer conservative conclusions when financial quality is mixed.

View File

@@ -0,0 +1,21 @@
---
name: portfolio_decisioning
description: Synthesize analyst inputs and risk feedback into explicit portfolio decisions.
---
# Portfolio Decisioning
Use this skill when you are responsible for converting team analysis into final trades.
## Workflow
1. Read analyst conclusions and risk warnings before acting.
2. Evaluate the current portfolio, cash, and margin constraints.
3. Record one explicit decision per ticker using the decision tool.
4. Summarize the portfolio-level rationale after all decisions are recorded.
## Guardrails
- Position sizing must respect capital and margin limits.
- Prefer smaller size when analyst conviction and risk signals disagree.
- Do not leave a ticker undecided when the task expects a full slate of decisions.

View File

@@ -0,0 +1,21 @@
---
name: risk_review
description: Assess portfolio and market risks before final position sizing and execution.
---
# Risk Review
Use this skill when you must identify concentration, volatility, leverage, and scenario risks.
## Workflow
1. Review the proposed exposure by ticker and theme.
2. Identify concentration, volatility, liquidity, and leverage concerns.
3. Rank warnings by severity.
4. Translate risk findings into concrete limits or cautions for the portfolio manager.
## Guardrails
- Focus on actionable risk controls.
- Quantify limits when the available data supports it.
- Distinguish fatal blockers from manageable risks.

View File

@@ -0,0 +1,21 @@
---
name: sentiment_review
description: Analyze news flow, market psychology, and insider behavior for catalyst-driven signals.
---
# Sentiment Review
Use this skill when the task depends on recent catalysts, news tone, or behavioral market signals.
## Workflow
1. Review recent news and identify the dominant narrative.
2. Check insider activity for confirming or conflicting signals.
3. Separate durable sentiment shifts from transient noise.
4. Explain how sentiment changes the near-term trade outlook.
## Guardrails
- Do not confuse attention with conviction.
- Highlight when sentiment is strong but unsupported by fundamentals.
- Be explicit about catalyst timing risk.

View File

@@ -0,0 +1,21 @@
---
name: technical_review
description: Evaluate price action, momentum, and volatility to judge timing and market regime.
---
# Technical Review
Use this skill when the task is sensitive to entry timing, trend quality, or short-term market structure.
## Workflow
1. Assess trend direction and strength.
2. Check momentum and mean-reversion conditions.
3. Review volatility before making aggressive recommendations.
4. Convert the setup into a trading view with explicit risk awareness.
## Guardrails
- Distinguish trend continuation from overshoot.
- Avoid strong conviction when signals conflict.
- Treat volatility as a sizing input, not only a directional input.

View File

@@ -0,0 +1,21 @@
---
name: valuation_review
description: Estimate fair value and margin of safety using multiple valuation lenses.
---
# Valuation Review
Use this skill when the task requires determining whether a stock is cheap, expensive, or fairly priced.
## Workflow
1. Use more than one valuation method when possible.
2. Compare intrinsic value estimates with current market pricing.
3. Explain the key assumptions behind the valuation view.
4. State the margin of safety and what could compress or expand it.
## Guardrails
- Treat valuation as a range, not a single precise number.
- Call out assumption sensitivity.
- Avoid high-confidence calls when inputs are sparse or unstable.

View File

@@ -0,0 +1 @@

218
backend/tools/risk_tools.py Normal file
View File

@@ -0,0 +1,218 @@
# -*- coding: utf-8 -*-
"""Risk management tools for the risk manager agent."""
import json
from typing import Any, Dict, Iterable, List
from agentscope.message import TextBlock
from agentscope.tool import ToolResponse
def _to_text_response(text: str) -> ToolResponse:
return ToolResponse(content=[TextBlock(type="text", text=text)])
def _parse_object(payload: Any) -> Dict[str, Any]:
if payload is None:
return {}
if isinstance(payload, dict):
return payload
if isinstance(payload, str):
try:
parsed = json.loads(payload)
return parsed if isinstance(parsed, dict) else {}
except json.JSONDecodeError:
return {}
return {}
def _parse_prices(payload: Any) -> Dict[str, float]:
parsed = _parse_object(payload)
prices = {}
for ticker, value in parsed.items():
try:
prices[str(ticker)] = float(value)
except (TypeError, ValueError):
continue
return prices
def _iter_positions(
portfolio: Dict[str, Any],
prices: Dict[str, float],
) -> Iterable[Dict[str, Any]]:
positions = portfolio.get("positions", {})
for ticker, raw_position in positions.items():
if not isinstance(raw_position, dict):
continue
price = prices.get(ticker, 0.0)
long_qty = int(raw_position.get("long", 0) or 0)
short_qty = int(raw_position.get("short", 0) or 0)
long_value = long_qty * price
short_value = short_qty * price
net_value = long_value - short_value
gross_value = long_value + short_value
yield {
"ticker": ticker,
"price": price,
"long_qty": long_qty,
"short_qty": short_qty,
"long_value": long_value,
"short_value": short_value,
"net_value": net_value,
"gross_value": gross_value,
}
def _portfolio_equity(portfolio: Dict[str, Any], prices: Dict[str, float]) -> float:
cash = float(portfolio.get("cash", 0.0) or 0.0)
margin_used = float(portfolio.get("margin_used", 0.0) or 0.0)
total = cash + margin_used
for position in _iter_positions(portfolio, prices):
total += position["net_value"]
return total
def assess_position_concentration(
portfolio: Dict[str, Any] | str,
current_prices: Dict[str, float] | str,
) -> ToolResponse:
"""
Assess single-name concentration and gross exposure in the current portfolio.
Args:
portfolio: Portfolio state with cash, positions, and margin fields.
current_prices: Current price map by ticker.
"""
portfolio_obj = _parse_object(portfolio)
prices = _parse_prices(current_prices)
equity = _portfolio_equity(portfolio_obj, prices)
if equity <= 0:
return _to_text_response("Unable to assess concentration: portfolio equity is non-positive.")
exposures: List[Dict[str, Any]] = sorted(
_iter_positions(portfolio_obj, prices),
key=lambda item: abs(item["net_value"]),
reverse=True,
)
if not exposures:
return _to_text_response(
"No open positions. Concentration risk is low because the portfolio is fully in cash."
)
lines = ["=== Position Concentration Assessment ==="]
gross_exposure = sum(item["gross_value"] for item in exposures)
net_exposure = sum(item["net_value"] for item in exposures)
lines.append(f"Portfolio equity: ${equity:,.2f}")
lines.append(f"Gross exposure: ${gross_exposure:,.2f} ({gross_exposure / equity:.1%} of equity)")
lines.append(f"Net exposure: ${net_exposure:,.2f} ({net_exposure / equity:.1%} of equity)")
lines.append("Largest positions by net exposure:")
for item in exposures[:5]:
weight = item["net_value"] / equity
gross_weight = item["gross_value"] / equity
direction = "NET LONG" if item["net_value"] >= 0 else "NET SHORT"
lines.append(
f"- {item['ticker']}: {direction}, net ${item['net_value']:,.2f} ({weight:.1%}), "
f"gross ${item['gross_value']:,.2f} ({gross_weight:.1%})"
)
top_weight = abs(exposures[0]["net_value"]) / equity
if top_weight >= 0.30:
lines.append("Risk flag: concentration is HIGH because the largest single-name exposure exceeds 30% of equity.")
elif top_weight >= 0.20:
lines.append("Risk flag: concentration is MODERATE because the largest single-name exposure exceeds 20% of equity.")
else:
lines.append("Risk flag: concentration is currently contained at the single-name level.")
return _to_text_response("\n".join(lines))
def assess_margin_and_liquidity(
portfolio: Dict[str, Any] | str,
current_prices: Dict[str, float] | str,
) -> ToolResponse:
"""
Assess available cash, margin usage, and short exposure pressure.
Args:
portfolio: Portfolio state with cash, positions, and margin fields.
current_prices: Current price map by ticker.
"""
portfolio_obj = _parse_object(portfolio)
prices = _parse_prices(current_prices)
equity = _portfolio_equity(portfolio_obj, prices)
cash = float(portfolio_obj.get("cash", 0.0) or 0.0)
margin_used = float(portfolio_obj.get("margin_used", 0.0) or 0.0)
margin_requirement = float(portfolio_obj.get("margin_requirement", 0.0) or 0.0)
short_exposure = sum(item["short_value"] for item in _iter_positions(portfolio_obj, prices))
margin_buffer = cash - margin_used
lines = ["=== Margin And Liquidity Assessment ==="]
lines.append(f"Portfolio equity: ${equity:,.2f}")
lines.append(f"Cash available: ${cash:,.2f}")
lines.append(f"Margin used: ${margin_used:,.2f}")
lines.append(f"Margin requirement: {margin_requirement:.1%}")
lines.append(f"Short exposure: ${short_exposure:,.2f}")
lines.append(f"Margin buffer (cash - used): ${margin_buffer:,.2f}")
if equity > 0:
lines.append(f"Margin used / equity: {margin_used / equity:.1%}")
lines.append(f"Short exposure / equity: {short_exposure / equity:.1%}")
if margin_buffer < 0:
lines.append("Risk flag: HIGH. Margin usage exceeds available cash buffer.")
elif equity > 0 and margin_used / equity > 0.35:
lines.append("Risk flag: MODERATE to HIGH. Margin usage is above 35% of equity.")
else:
lines.append("Risk flag: margin pressure is currently manageable.")
return _to_text_response("\n".join(lines))
def assess_volatility_exposure(
tickers: List[str] | str,
current_date: str | None = None,
) -> ToolResponse:
"""
Assess per-ticker volatility and risk level for the current watchlist.
Args:
tickers: List of stock tickers or JSON list string.
current_date: Analysis date in YYYY-MM-DD format.
"""
from datetime import datetime, timedelta
from backend.tools.analysis_tools import _parse_tickers, _resolved_date
from backend.tools.data_tools import get_prices, prices_to_df
from backend.tools.technical_signals import StockTechnicalAnalyzer
tickers_list = _parse_tickers(tickers)
current_date = _resolved_date(current_date)
end_dt = datetime.strptime(current_date, "%Y-%m-%d")
start_date = (end_dt - timedelta(days=90)).strftime("%Y-%m-%d")
analyzer = StockTechnicalAnalyzer()
lines = [f"=== Volatility Exposure Assessment ({current_date}) ==="]
for ticker in tickers_list:
prices = get_prices(
ticker=ticker,
start_date=start_date,
end_date=current_date,
)
if not prices or len(prices) < 5:
lines.append(f"- {ticker}: insufficient price data")
continue
signal = analyzer.analyze(ticker=ticker, df=prices_to_df(prices))
lines.append(
f"- {ticker}: annualized volatility {signal.annualized_volatility_pct:.1f}%, "
f"RSI14 {signal.rsi14:.1f}, trend {signal.trend}, risk level {signal.risk_level}"
)
if len(lines) == 1:
lines.append("No tickers provided.")
return _to_text_response("\n".join(lines))