feat: initial commit - EvoTraders project

量化交易多智能体系统,包含:
- 分析师、投资组合经理、风险经理等智能体
- 股票分析、投资组合管理、风险控制工具
- React 前端界面
- FastAPI 后端服务

Co-Authored-By: Claude <noreply@anthropic.com>
This commit is contained in:
2026-03-13 04:34:06 +08:00
commit 12de93aa30
115 changed files with 29304 additions and 0 deletions

View File

@@ -0,0 +1,6 @@
# -*- coding: utf-8 -*-
from .analyst import AnalystAgent
from .portfolio_manager import PMAgent
from .risk_manager import RiskAgent
__all__ = ["AnalystAgent", "PMAgent", "RiskAgent"]

133
backend/agents/analyst.py Normal file
View File

@@ -0,0 +1,133 @@
# -*- coding: utf-8 -*-
"""
Analyst Agent - Based on AgentScope ReActAgent
Performs analysis using tools and LLM
"""
from typing import Any, Dict, Optional
from agentscope.agent import ReActAgent
from agentscope.memory import InMemoryMemory, LongTermMemoryBase
from agentscope.message import Msg
from ..config.constants import ANALYST_TYPES
from ..utils.progress import progress
from .prompt_loader import PromptLoader
_prompt_loader = PromptLoader()
class AnalystAgent(ReActAgent):
"""
Analyst Agent - Uses LLM for tool selection and analysis
Inherits from AgentScope's ReActAgent
"""
def __init__(
self,
analyst_type: str,
toolkit: Any,
model: Any,
formatter: Any,
agent_id: Optional[str] = None,
config: Optional[Dict[str, Any]] = None,
long_term_memory: Optional[LongTermMemoryBase] = None,
):
"""
Initialize Analyst Agent
Args:
analyst_type: Type of analyst (e.g., "fundamentals", etc.)
toolkit: AgentScope Toolkit instance
model: LLM model instance
formatter: Message formatter instance
agent_id: Agent ID (defaults to "{analyst_type}_analyst")
config: Configuration dictionary
long_term_memory: Optional ReMeTaskLongTermMemory instance
"""
if analyst_type not in ANALYST_TYPES:
raise ValueError(
f"Unknown analyst type: {analyst_type}. "
f"Must be one of: {list(ANALYST_TYPES.keys())}",
)
self.analyst_type_key = analyst_type
self.analyst_persona = ANALYST_TYPES[analyst_type]["display_name"]
if agent_id is None:
agent_id = analyst_type
self.config = config or {}
sys_prompt = self._load_system_prompt()
kwargs = {
"name": agent_id,
"sys_prompt": sys_prompt,
"model": model,
"formatter": formatter,
"toolkit": toolkit,
"memory": InMemoryMemory(),
"max_iters": 10,
}
if long_term_memory:
kwargs["long_term_memory"] = long_term_memory
kwargs["long_term_memory_mode"] = "static_control"
super().__init__(**kwargs)
def _load_system_prompt(self) -> str:
"""Load system prompt for analyst"""
personas_config = _prompt_loader.load_yaml_config(
"analyst",
"personas",
)
persona = personas_config.get(self.analyst_type_key, {})
# Get focus items and format as bullet points
focus_items = persona.get("focus", [])
focus_text = "\n".join(f"- {item}" for item in focus_items)
# Get description
description = persona.get("description", "").strip()
return _prompt_loader.load_prompt(
"analyst",
"system",
variables={
"analyst_type": self.analyst_persona,
"focus": focus_text,
"description": description,
},
)
async def reply(self, x: Msg = None) -> Msg:
"""
Override reply method to add progress tracking
Args:
x: Input message (content must be str)
Returns:
Response message (content is str)
"""
ticker = None
if x and hasattr(x, "metadata") and x.metadata:
ticker = x.metadata.get("tickers")
if ticker:
progress.update_status(
self.name,
ticker,
f"Starting {self.analyst_persona} analysis",
)
result = await super().reply(x)
if ticker:
progress.update_status(
self.name,
ticker,
"Analysis completed",
)
return result

View File

@@ -0,0 +1,188 @@
# -*- coding: utf-8 -*-
"""
Portfolio Manager Agent - Based on AgentScope ReActAgent
Responsible for decision-making (NOT trade execution)
"""
from typing import Any, Dict, Optional
from agentscope.agent import ReActAgent
from agentscope.memory import InMemoryMemory, LongTermMemoryBase
from agentscope.message import Msg, TextBlock
from agentscope.tool import Toolkit, ToolResponse
from ..utils.progress import progress
from .prompt_loader import PromptLoader
_prompt_loader = PromptLoader()
class PMAgent(ReActAgent):
"""
Portfolio Manager Agent - Makes investment decisions
Key features:
1. PM outputs decisions only (action + quantity per ticker)
2. Trade execution happens externally (in pipeline/executor)
3. Supports both backtest and live modes
"""
def __init__(
self,
name: str = "portfolio_manager",
model: Any = None,
formatter: Any = None,
initial_cash: float = 100000.0,
margin_requirement: float = 0.25,
config: Optional[Dict[str, Any]] = None,
long_term_memory: Optional[LongTermMemoryBase] = None,
):
self.config = config or {}
# Portfolio state
self.portfolio = {
"cash": initial_cash,
"positions": {},
"margin_used": 0.0,
"margin_requirement": margin_requirement,
}
# Decisions made in current cycle
self._decisions: Dict[str, Dict] = {}
# Create toolkit
toolkit = self._create_toolkit()
sys_prompt = _prompt_loader.load_prompt("portfolio_manager", "system")
kwargs = {
"name": name,
"sys_prompt": sys_prompt,
"model": model,
"formatter": formatter,
"toolkit": toolkit,
"memory": InMemoryMemory(),
"max_iters": 10,
}
if long_term_memory:
kwargs["long_term_memory"] = long_term_memory
kwargs["long_term_memory_mode"] = "both"
super().__init__(**kwargs)
def _create_toolkit(self) -> Toolkit:
"""Create toolkit with decision recording tool"""
toolkit = Toolkit()
toolkit.register_tool_function(self._make_decision)
return toolkit
def _make_decision(
self,
ticker: str,
action: str,
quantity: int,
confidence: int = 50,
reasoning: str = "",
) -> ToolResponse:
"""
Record a trading decision for a ticker.
Args:
ticker: Stock ticker symbol (e.g., "AAPL")
action: Decision - "long", "short" or "hold"
quantity: Number of shares to trade (0 for hold)
confidence: Confidence level 0-100
reasoning: Explanation for this decision
Returns:
ToolResponse confirming decision recorded
"""
if action not in ["long", "short", "hold"]:
return ToolResponse(
content=[
TextBlock(
type="text",
text=f"Invalid action: {action}. "
"Must be 'long', 'short', or 'hold'.",
),
],
)
self._decisions[ticker] = {
"action": action,
"quantity": quantity if action != "hold" else 0,
"confidence": confidence,
"reasoning": reasoning,
}
return ToolResponse(
content=[
TextBlock(
type="text",
text=f"Decision recorded: {action} "
f"{quantity} shares of {ticker}"
f" (confidence: {confidence}%)",
),
],
)
async def reply(self, x: Msg = None) -> Msg:
"""
Make investment decisions
Returns:
Msg with decisions in metadata
"""
if x is None:
return Msg(
name=self.name,
content="No input provided",
role="assistant",
)
# Clear previous decisions
self._decisions = {}
progress.update_status(
self.name,
None,
"Analyzing and making decisions",
)
result = await super().reply(x)
progress.update_status(self.name, None, "Completed")
# Attach decisions to metadata
if result.metadata is None:
result.metadata = {}
result.metadata["decisions"] = self._decisions.copy()
result.metadata["portfolio"] = self.portfolio.copy()
return result
def get_decisions(self) -> Dict[str, Dict]:
"""Get decisions from current cycle"""
return self._decisions.copy()
def get_portfolio_state(self) -> Dict[str, Any]:
"""Get current portfolio state"""
return self.portfolio.copy()
def load_portfolio_state(self, portfolio: Dict[str, Any]):
"""Load portfolio state"""
if not portfolio:
return
self.portfolio = {
"cash": portfolio.get("cash", self.portfolio["cash"]),
"positions": portfolio.get("positions", {}).copy(),
"margin_used": portfolio.get("margin_used", 0.0),
"margin_requirement": portfolio.get(
"margin_requirement",
self.portfolio["margin_requirement"],
),
}
def update_portfolio(self, portfolio: Dict[str, Any]):
"""Update portfolio after external execution"""
self.portfolio.update(portfolio)

View File

@@ -0,0 +1,184 @@
# -*- coding: utf-8 -*-
"""
Prompt Loader - Unified management and loading of Agent Prompts
Supports Markdown and YAML formats
Uses simple string replacement, does not depend on Jinja2
"""
import re
from pathlib import Path
from typing import Any, Dict, Optional
import yaml
class PromptLoader:
"""Unified Prompt loader"""
def __init__(self, prompts_dir: Optional[Path] = None):
"""
Initialize Prompt loader
Args:
prompts_dir: Prompts directory path,
defaults to prompts/ directory of current file
"""
if prompts_dir is None:
self.prompts_dir = Path(__file__).parent / "prompts"
else:
self.prompts_dir = Path(prompts_dir)
# Cache loaded prompts
self._prompt_cache: Dict[str, str] = {}
self._yaml_cache: Dict[str, Dict] = {}
def load_prompt(
self,
agent_type: str,
prompt_name: str,
variables: Optional[Dict[str, Any]] = None,
) -> str:
"""
Load and render Prompt
Args:
agent_type: Agent type (analyst, portfolio_manager, risk_manager)
prompt_name: Prompt file name (without extension)
variables: Variable dictionary for rendering Prompt
Returns:
Rendered prompt string
Examples:
loader = PromptLoader()
prompt = loader.load_prompt("analyst", "tool_selection",
{"analyst_persona": "Technical Analyst"})
"""
cache_key = f"{agent_type}/{prompt_name}"
# Try to load from cache
if cache_key not in self._prompt_cache:
prompt_path = self.prompts_dir / agent_type / f"{prompt_name}.md"
if not prompt_path.exists():
raise FileNotFoundError(
f"Prompt file not found: {prompt_path}\n"
f"Please create the prompt file or check the path.",
)
with open(prompt_path, "r", encoding="utf-8") as f:
self._prompt_cache[cache_key] = f.read()
prompt_template = self._prompt_cache[cache_key]
# If variables provided, use simple string replacement
if variables:
rendered = self._render_template(prompt_template, variables)
else:
rendered = prompt_template
# Smart escaping: escape braces in JSON code blocks
# rendered = self._escape_json_braces(rendered)
return rendered
def _render_template(
self,
template: str,
variables: Dict[str, Any],
) -> str:
"""
Render template using simple string replacement
Supports {{ variable }} syntax (compatible with previous Jinja2 format)
Args:
template: Template string
variables: Variable dictionary
Returns:
Rendered string
"""
rendered = template
# Replace {{ variable }} format
for key, value in variables.items():
# Support both {{ key }} and {{key}} formats
pattern1 = f"{{{{ {key} }}}}"
pattern2 = f"{{{{{key}}}}}"
rendered = rendered.replace(pattern1, str(value))
rendered = rendered.replace(pattern2, str(value))
return rendered
def _escape_json_braces(self, text: str) -> str:
"""
Escape braces in JSON code blocks, treating them as literals
Args:
text: Text to process
Returns:
Processed text
"""
def replace_code_block(match):
code_content = match.group(1)
# Escape all braces within code block
escaped = code_content.replace("{", "{{").replace("}", "}}")
return f"```json\n{escaped}\n```"
# Replace all braces in JSON code blocks
text = re.sub(
r"```json\n(.*?)\n```",
replace_code_block,
text,
flags=re.DOTALL,
)
return text
def load_yaml_config(
self,
agent_type: str,
config_name: str,
) -> Dict[str, Any]:
"""
Load YAML configuration file
Args:
agent_type: Agent type
config_name: Configuration file name (without extension)
Returns:
Configuration dictionary
Examples:
>>> loader = PromptLoader()
>>> config = loader.load_yaml_config("analyst", "personas")
"""
cache_key = f"{agent_type}/{config_name}"
if cache_key not in self._yaml_cache:
yaml_path = self.prompts_dir / agent_type / f"{config_name}.yaml"
if not yaml_path.exists():
raise FileNotFoundError(f"YAML config not found: {yaml_path}")
with open(yaml_path, "r", encoding="utf-8") as f:
self._yaml_cache[cache_key] = yaml.safe_load(f)
return self._yaml_cache[cache_key]
def clear_cache(self):
"""Clear cache (for hot reload)"""
self._prompt_cache.clear()
self._yaml_cache.clear()
def reload_prompt(self, agent_type: str, prompt_name: str):
"""Reload specified prompt (force cache refresh)"""
cache_key = f"{agent_type}/{prompt_name}"
if cache_key in self._prompt_cache:
del self._prompt_cache[cache_key]
def reload_config(self, agent_type: str, config_name: str):
"""Reload specified configuration (force cache refresh)"""
cache_key = f"{agent_type}/{config_name}"
if cache_key in self._yaml_cache:
del self._yaml_cache[cache_key]

View File

@@ -0,0 +1,117 @@
# 分析师角色配置
fundamentals_analyst:
name: "基本面分析师"
focus:
- "公司财务健康状况和盈利能力"
- "商业模式可持续性和竞争优势"
- "管理层质量和公司治理"
- "行业地位和市场份额"
- "长期投资价值评估"
tools:
- "analyze_profitability"
- "analyze_growth"
- "analyze_financial_health"
- "analyze_valuation_ratios"
- "analyze_efficiency_ratios"
description: |
作为基本面分析师,你专注于:
- 公司财务健康状况和盈利能力
- 商业模式可持续性和竞争优势
- 管理层质量和公司治理
- 行业地位和市场份额
- 长期投资价值评估
你倾向于选择能够深入了解公司内在价值的工具,更偏好基本面和估值类工具。
technical_analyst:
name: "技术分析师"
focus:
- "价格趋势和图表形态"
- "技术指标和交易信号"
- "市场情绪和资金流向"
- "支撑/阻力位和关键价格点"
- "中短期交易机会"
description: |
作为技术分析师,你专注于:
- 价格趋势和图表形态
- 技术指标和交易信号
- 市场情绪和资金流向
- 支撑/阻力位和关键价格点
- 中短期交易机会
你倾向于选择能够捕捉价格动态和市场趋势的工具,更偏好技术分析类工具。
tools:
- "analyze_trend_following"
- "analyze_momentum"
- "analyze_mean_reversion"
- "analyze_volatility"
sentiment_analyst:
name: "情绪分析师"
focus:
- "市场参与者情绪变化"
- "新闻舆情和媒体影响"
- "内部人交易行为"
- "投资者恐慌和贪婪情绪"
- "市场预期和心理因素"
description: |
作为情绪分析师,你专注于:
- 市场参与者情绪变化
- 新闻舆情和媒体影响
- 内部人交易行为
- 投资者恐慌和贪婪情绪
- 市场预期和心理因素
你倾向于选择能够反映市场情绪和投资者行为的工具,更偏好情绪和行为类工具。
tools:
- "analyze_news_sentiment"
- "analyze_insider_trading"
valuation_analyst:
name: "估值分析师"
focus:
- "公司内在价值计算"
- "不同估值方法的比较"
- "估值模型假设和敏感性分析"
- "相对估值和绝对估值"
- "投资安全边际评估"
description: |
作为估值分析师,你专注于:
- 公司内在价值计算
- 不同估值方法的比较
- 估值模型假设和敏感性分析
- 相对估值和绝对估值
- 投资安全边际评估
你倾向于选择能够准确计算公司价值的工具,更偏好估值模型和基本面工具。
tools:
- "dcf_valuation_analysis"
- "owner_earnings_valuation_analysis"
- "ev_ebitda_valuation_analysis"
- "residual_income_valuation_analysis"
comprehensive_analyst:
name: "综合分析师"
focus:
- "整合多种分析视角"
- "平衡短期和长期因素"
- "综合考虑基本面、技术面和情绪面"
- "提供全面的投资建议"
- "适应不同市场环境"
description: |
作为综合分析师,你需要:
- 整合多种分析视角
- 平衡短期和长期因素
- 综合考虑基本面、技术面和情绪面的影响
- 提供全面的投资建议
- 适应不同市场环境
你会根据具体情况灵活选择各类工具,追求分析的全面性和准确性。
tools:
- "analyze_profitability"
- "analyze_growth"
- "analyze_financial_health"
- "analyze_valuation_ratios"
- "analyze_efficiency_ratios"
- "analyze_trend_following"
- "analyze_momentum"
- "analyze_mean_reversion"
- "analyze_volatility"
- "analyze_news_sentiment"
- "analyze_insider_trading"

View File

@@ -0,0 +1,23 @@
你是一位专业的{{ analyst_type }}。
你的关注重点:
{{ focus }}
你的角色:
{{ description }}
注意:
- 构建并持续完善你的"投资哲学"。你的分析不应是孤立的事件,而应该是你整体投资世界观和核心信念的体现。每次分析后,你必须反思:
- 这个案例/数据如何验证或挑战了你现有的信念?
- 你从这次错误(或成功)中学到了关于市场、人性、估值或风险管理的什么关键原则?
- 深化你的"投资逻辑"。确保每一项投资建议都有清晰、可追溯、可重复的逻辑支撑。你的分析步骤应该像严谨的证明一样,涵盖:
- 核心驱动因素识别:真正影响价值的变量是什么?
- 风险边界设定:在什么具体情况下你的建议会失效?
- 逆向测试:市场主流共识是什么,你的观点有何不同?
保持谦逊和开放。投资大师的核心特质是持续学习和适应。在每次分析中,你必须积极寻找与自己观点相悖的证据和论据,并将其纳入最终评估。
- 你可以使用分析工具。用它们来收集相关数据并做出明智的建议。
输出指南:
- 给出明确的投资信号:看涨、看跌或中性
- 包含置信度0-100
- 为你的分析提供理由(如果你确定要分享最终分析,请先给出结论)

View File

@@ -0,0 +1,31 @@
你是一位负责做出投资决策的投资组合经理。
你的核心职责:
1. 分析分析师和风险管理经理的输入
2. 基于信号和市场情境做出投资决策
3. 使用可用工具记录你的决策
决策框架:
- 审阅分析以了解市场观点
- 在做决策前考虑风险警告
- 评估当前投资组合持仓和现金
- 做出与投资组合投资目标一致的决策
决策类型:
- "long":看涨 - 建议买入股票
- "short":看跌 - 建议卖出股票或做空
- "hold":中性 - 维持当前持仓
预算意识:
- 在决定数量时考虑可用现金
- 不要建议买入超过现金允许的数量
- 考虑做空头寸的保证金要求
输出:
使用 `make_decision` 工具记录你对每个股票代码的决策。
记录所有决策后,提供你的投资逻辑总结。
重要:
- 基于提供的分析师信号和风险评估做出决策
- 相对于投资组合价值保持保守的仓位规模
- 始终为你的决策提供理由

View File

@@ -0,0 +1,18 @@
你是一位专业的风险管理经理,负责监控投资组合风险并提供风险警告。
你的核心职责:
1. 监控投资组合敞口和集中度风险
2. 评估仓位规模相对于波动性
3. 评估保证金使用和杠杆水平
4. 识别潜在风险因素并提供警告
5. 基于市场条件建议仓位限制
你的决策流程:
3. 生成可操作的风险警告和仓位限制建议
4. 为你的风险评估提供清晰的理由
输出指南:
- 风险评估要简洁但全面
- 按严重程度优先排序警告
- 提供具体、可操作的建议
- 尽可能包含量化指标

View File

@@ -0,0 +1,88 @@
# -*- coding: utf-8 -*-
"""
Risk Manager Agent - Based on AgentScope ReActAgent
Uses LLM for risk assessment
"""
from typing import Any, Dict, Optional
from agentscope.agent import ReActAgent
from agentscope.memory import InMemoryMemory, LongTermMemoryBase
from agentscope.message import Msg
from agentscope.tool import Toolkit
from ..utils.progress import progress
from .prompt_loader import PromptLoader
_prompt_loader = PromptLoader()
class RiskAgent(ReActAgent):
"""
Risk Manager Agent - Uses LLM for risk assessment
Inherits from AgentScope's ReActAgent
"""
def __init__(
self,
model: Any,
formatter: Any,
name: str = "risk_manager",
config: Optional[Dict[str, Any]] = None,
long_term_memory: Optional[LongTermMemoryBase] = None,
):
"""
Initialize Risk Manager Agent
Args:
model: LLM model instance
formatter: Message formatter instance
name: Agent name
config: Configuration dictionary
long_term_memory: Optional ReMeTaskLongTermMemory instance
"""
self.config = config or {}
sys_prompt = self._load_system_prompt()
# Create dedicated toolkit for this agent
toolkit = Toolkit()
kwargs = {
"name": name,
"sys_prompt": sys_prompt,
"model": model,
"formatter": formatter,
"toolkit": toolkit,
"memory": InMemoryMemory(),
"max_iters": 10,
}
if long_term_memory:
kwargs["long_term_memory"] = long_term_memory
kwargs["long_term_memory_mode"] = "static_control"
super().__init__(**kwargs)
def _load_system_prompt(self) -> str:
"""Load system prompt for risk manager"""
return _prompt_loader.load_prompt(
"risk_manager",
"system",
)
async def reply(self, x: Msg = None) -> Msg:
"""
Provide risk assessment
Args:
x: Input message (content must be str)
Returns:
Msg with risk warnings (content is str)
"""
progress.update_status(self.name, None, "Assessing risk")
result = await super().reply(x)
progress.update_status(self.name, None, "Risk assessment completed")
return result