Files
evotraders/backend/core/pipeline.py

2213 lines
84 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# -*- coding: utf-8 -*-
"""
Core Pipeline - Orchestrates multi-agent analysis and decision-making
"""
# flake8: noqa: E501
# pylint: disable=W0613,C0301
import asyncio
import json
import logging
import os
import re
from contextlib import nullcontext
from datetime import datetime
from pathlib import Path
from typing import Any, Awaitable, Callable, Dict, List, Optional
from agentscope.message import Msg
from agentscope.pipeline import MsgHub
from backend.utils.settlement import SettlementCoordinator
from backend.core.state_sync import StateSync
from backend.utils.trade_executor import PortfolioTradeExecutor
from backend.runtime.manager import TradingRuntimeManager
from backend.runtime.session import TradingSessionKey
from backend.agents.team_pipeline_config import (
resolve_active_analysts,
update_active_analysts,
)
from backend.agents import EvoAgent
from backend.agents.agent_workspace import load_agent_workspace_config
from backend.agents.toolkit_factory import create_agent_toolkit
from backend.agents.workspace_manager import WorkspaceManager
from backend.agents.prompt_loader import get_prompt_loader
from backend.llm.models import get_agent_formatter, get_agent_model
from backend.config.constants import ANALYST_TYPES, AGENT_CONFIG
from backend.agents.dynamic_team_types import AnalystConfig
from backend.tools.dynamic_team_tools import DynamicTeamController, set_controller
def _resolve_evo_agent_ids() -> set[str]:
    """Return the set of agent ids that should run as EvoAgent.

    With no EVO_AGENT_IDS environment variable set, every supported role
    uses EvoAgent. Otherwise only the listed roles are enabled, filtered
    down to the supported ones.

    Supported roles:
    - analyst roles (fundamentals, technical, sentiment, valuation)
    - risk_manager
    - portfolio_manager

    Example:
    EVO_AGENT_IDS=fundamentals_analyst,risk_manager,portfolio_manager
    """
    manager_roles = {"risk_manager", "portfolio_manager"}
    raw = os.getenv("EVO_AGENT_IDS", "")
    if not raw.strip():
        # Default: all supported roles use EvoAgent
        return set(ANALYST_TYPES) | manager_roles
    requested = {token.strip() for token in raw.split(",") if token.strip()}
    return {
        role
        for role in requested
        if role in ANALYST_TYPES or role in manager_roles
    }
# Team infrastructure imports (graceful import - may not exist yet).
# TEAM_COORD_AVAILABLE is checked at call sites (e.g. conference setup)
# before any team-specific infrastructure is used.
try:
    from backend.agents.team.team_coordinator import TeamCoordinator
    TEAM_COORD_AVAILABLE = True
except ImportError:
    TEAM_COORD_AVAILABLE = False
    TeamCoordinator = None
# Module-level logger shared by the pipeline helpers below.
logger = logging.getLogger(__name__)
def _log(msg: str) -> None:
    """Record a pipeline progress message through the module logger."""
    logger.info(msg)
from backend.core.apo import PolicyOptimizer
class TradingPipeline:
    """
    Trading Pipeline - Orchestrates the complete trading cycle.

    Flow:
    1. Clear agent short-term memory (avoid cross-day context pollution)
    2. Analysts analyze stocks
    3. Risk Manager provides risk assessment
    4. PM makes decisions (direction + quantity)
    5. Execute trades with provided prices
    6. Reflection phase: broadcast closing P&L, agents record to long-term memory

    Real-time updates are pushed via StateSync after each agent completes,
    and every phase is checkpointed so an interrupted cycle can resume.
    Supports run-scoped EvoAgent loading with workspace-driven configuration.
    """
def __init__(
    self,
    analysts: List[Any],
    risk_manager: Any,
    portfolio_manager: Any,
    state_sync: Optional["StateSync"] = None,
    settlement_coordinator: Optional[SettlementCoordinator] = None,
    max_comm_cycles: Optional[int] = None,
    workspace_id: Optional[str] = None,
    agent_factory: Optional[Any] = None,
    runtime_manager: Optional[TradingRuntimeManager] = None,
):
    """
    Wire up the pipeline's agents, coordinators and team controller.

    Args:
        analysts: Analyst agents participating in each cycle.
        risk_manager: Risk-assessment agent.
        portfolio_manager: Decision-making agent (stored as ``self.pm``).
        state_sync: Optional frontend sync bridge; when absent, UI
            notifications are skipped throughout the pipeline.
        settlement_coordinator: Optional daily-settlement helper.
        max_comm_cycles: Conference rounds; falls back to the
            MAX_COMM_CYCLES env var (default "2").
        workspace_id: Optional workspace identifier; also preferred as
            the APO config name when provided.
        agent_factory: Optional factory for runtime-created agents.
        runtime_manager: Optional runtime manager providing run_dir,
            session keys and status reporting.
    """
    self.analysts = analysts
    self.risk_manager = risk_manager
    self.pm = portfolio_manager
    self.state_sync = state_sync
    self.settlement_coordinator = settlement_coordinator
    # Conference rounds: explicit argument wins, then env var, then 2.
    self.max_comm_cycles = max_comm_cycles or int(
        os.getenv("MAX_COMM_CYCLES", "2"),
    )
    self.conference_summary = None  # Store latest conference summary
    self.workspace_id = workspace_id
    self.agent_factory = agent_factory
    self.runtime_manager = runtime_manager
    self._session_key: Optional[str] = None
    # Analysts created on the fly by the dynamic team controller.
    self._dynamic_analysts: Dict[str, Any] = {}
    self._dynamic_analyst_configs: Dict[str, AnalystConfig] = {}
    # Initialize APO (Autonomous Policy Optimizer)
    config_name = workspace_id or (runtime_manager.config_name if runtime_manager else "default")
    self.apo = PolicyOptimizer(config_name=config_name)
    # Initialize dynamic team controller and inject into PM
    self._team_controller = DynamicTeamController(
        create_callback=self._create_runtime_analyst,
        remove_callback=self._remove_runtime_analyst,
        get_analysts_callback=lambda: self._all_analysts() + [self.risk_manager, self.pm],
    )
    set_controller(self._team_controller)
    # Backward compatibility: also set individual callbacks if PM expects them
    if hasattr(self.pm, "set_team_controller"):
        self.pm.set_team_controller(
            create_agent_callback=self._create_runtime_analyst,
            remove_agent_callback=self._remove_runtime_analyst,
        )
async def run_cycle(
    self,
    tickers: List[str],
    date: str,
    prices: Optional[Dict[str, float]] = None,
    close_prices: Optional[Dict[str, float]] = None,
    market_caps: Optional[Dict[str, float]] = None,
    get_open_prices_fn: Optional[
        Callable[[], Awaitable[Dict[str, float]]]
    ] = None,
    get_close_prices_fn: Optional[
        Callable[[], Awaitable[Dict[str, float]]]
    ] = None,
    execute_decisions: bool = True,
) -> Dict[str, Any]:
    """
    Run one complete trading cycle with checkpointing support.

    Each phase saves a checkpoint; on restart the cycle resumes from the
    last completed phase, restoring its intermediate state.

    Args:
        tickers: Symbols to analyze and trade this cycle.
        date: Trading date (also used to derive the session key).
        prices: Open prices; when absent, they come from the checkpoint
            or from ``get_open_prices_fn`` (live mode).
        close_prices: Close prices; same fallback rules as ``prices``.
        market_caps: Market caps for settlement baselines; defaults to
            1e9 per ticker when omitted.
        get_open_prices_fn: Awaitable supplier for live open prices.
        get_close_prices_fn: Awaitable supplier for live close prices.
        execute_decisions: When False, Phase 4 (trade execution) is skipped.

    Returns:
        Dict with analyst results, risk assessment, PM decisions,
        executed trades, portfolio snapshot and settlement result.
    """
    _log(f"Starting cycle {date} - {len(tickers)} tickers")
    session_key = TradingSessionKey(date=date).key()
    self._session_key = session_key
    active_analysts = self._get_active_analysts()
    self._sync_agent_runtime_context(
        agents=active_analysts + [self.risk_manager, self.pm],
        session_key=session_key,
    )
    # Load checkpoint if exists
    checkpoint = self._load_checkpoint(session_key)
    checkpoint_data = checkpoint.get("data", {}) if checkpoint else {}
    last_phase = checkpoint.get("phase") if checkpoint else None
    if checkpoint:
        _log(f"Resuming from checkpoint: {last_phase}")
        # Restore state from checkpoint
        analyst_results = checkpoint_data.get("analyst_results", [])
        risk_assessment = checkpoint_data.get("risk_assessment", {})
        self.conference_summary = checkpoint_data.get("conference_summary")
        # BUGFIX: also bind the local name. Later phases reference the
        # local `conference_summary` in their checkpoint payloads, which
        # raised NameError when resuming past the conference phase.
        conference_summary = self.conference_summary
        final_predictions = checkpoint_data.get("final_predictions", [])
        pm_result = checkpoint_data.get("pm_result", {})
        execution_result = checkpoint_data.get("execution_result", {})
        settlement_result = checkpoint_data.get("settlement_result")
        # Prefer passed prices if not held in checkpoint
        if not prices:
            prices = checkpoint_data.get("prices")
        if not close_prices:
            close_prices = checkpoint_data.get("close_prices")
    else:
        analyst_results = []
        risk_assessment = {}
        self.conference_summary = None
        conference_summary = None
        final_predictions = []
        pm_result = {}
        execution_result = {}
        settlement_result = None
    if self.runtime_manager:
        self.runtime_manager.set_session_key(session_key)
        self._runtime_log_event("cycle:start", {"tickers": tickers, "date": date, "resumed": checkpoint is not None})
        self._runtime_batch_status(active_analysts, "analysis_in_progress")
    # Phase 0: Clear memory (only if not resuming or if resuming from very start)
    if not last_phase:
        _log("Phase 0: Clearing memory")
        await self._clear_all_agent_memory()
    participants = self._all_analysts() + [self.risk_manager, self.pm]
    # Single MsgHub for entire cycle - no nesting
    async with MsgHub(
        participants=participants,
        announcement=Msg(
            "system",
            f"Starting analysis cycle for {date}. Tickers: {', '.join(tickers)}",
            "system",
        ),
    ):
        # Phase 1.0: PM assesses team coverage and expands if needed
        if not last_phase or last_phase == "cleared":
            _log("Phase 1.0: Team gap assessment")
            await self._run_team_gap_assessment(
                tickers=tickers,
                date=date,
                prices=prices,
            )
            # Team may have grown/shrunk; refresh the active roster.
            active_analysts = self._get_active_analysts()
            if self.runtime_manager:
                self._runtime_batch_status(active_analysts, "analysis_in_progress")
            self._save_checkpoint(session_key, "team_assessment", {
                "prices": prices,
                "close_prices": close_prices,
            })
            last_phase = "team_assessment"
        # Phase 1.1: Analysts
        if last_phase == "team_assessment":
            _log("Phase 1.1: Analyst analysis (parallel)")
            analyst_results = await self._run_analysts_parallel(
                tickers,
                date,
                active_analysts=active_analysts,
            )
            self._save_checkpoint(session_key, "analysis", {
                "analyst_results": analyst_results,
                "prices": prices,
                "close_prices": close_prices
            })
            last_phase = "analysis"
        # Phase 1.2: Risk Manager
        if last_phase == "analysis":
            _log("Phase 1.2: Risk assessment")
            self._runtime_update_status(self.risk_manager, "risk_assessment")
            risk_assessment = await self._run_risk_manager_with_sync(
                tickers,
                date,
                prices,
            )
            self._save_checkpoint(session_key, "risk_assessment", {
                "analyst_results": analyst_results,
                "risk_assessment": risk_assessment,
                "prices": prices,
                "close_prices": close_prices
            })
            last_phase = "risk_assessment"
        # Phase 2.1: Conference discussion
        if last_phase == "risk_assessment":
            _log("Phase 2.1: Conference discussion")
            conference_summary = await self._run_conference_cycles(
                tickers=tickers,
                date=date,
                prices=prices,
                analyst_results=analyst_results,
                risk_assessment=risk_assessment,
            )
            self.conference_summary = conference_summary
            self._save_checkpoint(session_key, "conference", {
                "analyst_results": analyst_results,
                "risk_assessment": risk_assessment,
                "conference_summary": conference_summary,
                "prices": prices,
                "close_prices": close_prices
            })
            last_phase = "conference"
        # Phase 2.2: Analysts generate final structured predictions
        if last_phase == "conference":
            _log("Phase 2.2: Analysts generate final structured predictions")
            final_predictions = await self._collect_final_predictions(
                tickers,
                date,
                active_analysts=active_analysts,
            )
            self._save_checkpoint(session_key, "predictions", {
                "analyst_results": analyst_results,
                "risk_assessment": risk_assessment,
                "conference_summary": conference_summary,
                "final_predictions": final_predictions,
                "prices": prices,
                "close_prices": close_prices
            })
            last_phase = "predictions"
        # Record final predictions
        if last_phase == "predictions" and self.settlement_coordinator:
            self.settlement_coordinator.record_analyst_predictions(
                final_predictions,
            )
        # Live mode: wait for market open
        if not prices and get_open_prices_fn:
            _log("Waiting for market open...")
            prices = await get_open_prices_fn()
            _log(f"Got open prices: {prices}")
            # Update prices in checkpoint if we just got them
            self._save_checkpoint(session_key, "predictions", {
                "analyst_results": analyst_results,
                "risk_assessment": risk_assessment,
                "conference_summary": conference_summary,
                "final_predictions": final_predictions,
                "prices": prices,
                "close_prices": close_prices
            })
        # Phase 3: PM makes decisions
        if last_phase == "predictions":
            _log("Phase 3.1: PM makes decisions")
            self._runtime_update_status(self.pm, "decision_phase")
            pm_result = await self._run_pm_with_sync(
                tickers,
                date,
                prices,
                analyst_results,
                risk_assessment,
            )
            self._save_checkpoint(session_key, "decisions", {
                "analyst_results": analyst_results,
                "risk_assessment": risk_assessment,
                "conference_summary": conference_summary,
                "final_predictions": final_predictions,
                "pm_result": pm_result,
                "prices": prices,
                "close_prices": close_prices
            })
            last_phase = "decisions"
    # Outside MsgHub for execution and settlement
    decisions = pm_result.get("decisions", {}) if pm_result else {}
    if not execution_result:
        execution_result = {
            "executed_trades": [],
            "portfolio": self.pm.get_portfolio_state(),
        }
    if last_phase == "decisions":
        if execute_decisions:
            _log("Phase 4: Executing trades")
            self._runtime_update_status(self.pm, "executing")
            execution_result = self._execute_decisions(decisions, prices, date)
        else:
            _log("Phase 4: Skipping trade execution")
        self._save_checkpoint(session_key, "execution", {
            "analyst_results": analyst_results,
            "risk_assessment": risk_assessment,
            "conference_summary": conference_summary,
            "final_predictions": final_predictions,
            "pm_result": pm_result,
            "execution_result": execution_result,
            "prices": prices,
            "close_prices": close_prices
        })
        last_phase = "execution"
    # Live mode: wait for market close
    if not close_prices and get_close_prices_fn:
        _log("Waiting for market close")
        close_prices = await get_close_prices_fn()
        _log(f"Got close prices: {close_prices}")
        # Update close_prices in checkpoint
        self._save_checkpoint(session_key, "execution", {
            "analyst_results": analyst_results,
            "risk_assessment": risk_assessment,
            "conference_summary": conference_summary,
            "final_predictions": final_predictions,
            "pm_result": pm_result,
            "execution_result": execution_result,
            "prices": prices,
            "close_prices": close_prices
        })
    # Phase 5: Settlement
    if last_phase == "execution":
        if close_prices and self.settlement_coordinator:
            _log("Phase 5: Daily review and generate memories")
            self._runtime_batch_status(
                [self.risk_manager] + self._all_analysts() + [self.pm],
                "settlement",
            )
            # Capture trajectories BEFORE memory is cleared next cycle.
            agent_trajectories = await self._capture_agent_trajectories()
            if market_caps is None:
                market_caps = {ticker: 1e9 for ticker in tickers}
            settlement_result = (
                self.settlement_coordinator.run_daily_settlement(
                    date=date,
                    tickers=tickers,
                    open_prices=prices,
                    close_prices=close_prices,
                    market_caps=market_caps,
                    agent_portfolio=execution_result.get("portfolio", {}),
                    analyst_results=analyst_results,
                    pm_decisions=decisions,
                )
            )
            await self._run_reflection(
                date=date,
                agent_trajectories=agent_trajectories,
                analyst_results=analyst_results,
                decisions=decisions,
                executed_trades=execution_result.get("executed_trades", []),
                open_prices=prices,
                close_prices=close_prices,
                settlement_result=settlement_result,
                conference_summary=self.conference_summary,
            )
            self._runtime_batch_status(
                [self.risk_manager] + self._all_analysts() + [self.pm],
                "reflection",
            )
            self._save_checkpoint(session_key, "settlement", {
                "analyst_results": analyst_results,
                "risk_assessment": risk_assessment,
                "conference_summary": conference_summary,
                "final_predictions": final_predictions,
                "pm_result": pm_result,
                "execution_result": execution_result,
                "settlement_result": settlement_result,
                "prices": prices,
                "close_prices": close_prices
            })
            last_phase = "settlement"
    _log(f"Cycle complete: {date}")
    self._runtime_batch_status(
        self._all_analysts() + [self.risk_manager, self.pm],
        "idle",
    )
    self._runtime_log_event("cycle:end", {"tickers": tickers, "date": date})
    # Optional: Clean up checkpoint after successful completion
    # path = self._get_checkpoint_path(session_key)
    # if path and path.exists():
    #     path.unlink()
    return {
        "analyst_results": analyst_results,
        "risk_assessment": risk_assessment,
        "pm_decisions": decisions,
        "executed_trades": execution_result.get("executed_trades", []),
        "portfolio": execution_result.get("portfolio", {}),
        "settlement_result": settlement_result,
    }
def reload_runtime_assets(
    self,
    runtime_config: Optional[Dict[str, Any]] = None,
) -> Dict[str, Any]:
    """Reload prompt assets and safe in-process runtime settings.

    Re-resolves every agent's active skill directories and pushes them
    down through each agent's own ``reload_runtime_assets`` hook. Only
    settings that are safe to change mid-run (currently
    ``max_comm_cycles``) are applied from ``runtime_config``.
    """
    from backend.agents.skills_manager import SkillsManager
    from backend.agents.toolkit_factory import load_agent_profiles
    config_name = getattr(self.pm, "config", {}).get("config_name", "default")
    if runtime_config and "max_comm_cycles" in runtime_config:
        self.max_comm_cycles = int(runtime_config["max_comm_cycles"])
    # Resolve the per-agent skill directories for this configuration.
    skills_manager = SkillsManager()
    agent_defaults = {
        agent_id: profile.get("skills", [])
        for agent_id, profile in load_agent_profiles().items()
    }
    active_skill_map = skills_manager.prepare_active_skills(
        config_name=config_name,
        agent_defaults=agent_defaults,
    )
    # Push reloaded skills into every agent: analysts first, then the
    # risk manager and the PM under their fixed skill-map keys.
    reload_targets = [
        (analyst.name, analyst) for analyst in self._all_analysts()
    ]
    reload_targets.append(("risk_manager", self.risk_manager))
    reload_targets.append(("portfolio_manager", self.pm))
    for skill_key, agent in reload_targets:
        agent.reload_runtime_assets(
            active_skill_dirs=active_skill_map.get(skill_key, []),
        )
    return {
        "config_name": config_name,
        "reloaded_agents": [agent.name for agent in self._all_analysts()]
        + ["risk_manager", "portfolio_manager"],
        "active_skills": {
            agent_id: [path.name for path in paths]
            for agent_id, paths in active_skill_map.items()
        },
        "max_comm_cycles": self.max_comm_cycles,
    }
async def _clear_all_agent_memory(self):
"""Clear short-term memory for all agents"""
for analyst in self._all_analysts():
await analyst.memory.clear()
await self.risk_manager.memory.clear()
await self.pm.memory.clear()
def _get_checkpoint_path(self, session_key: str) -> Optional[Path]:
"""Get the path to the pipeline checkpoint file."""
if not self.runtime_manager or not self.runtime_manager.run_dir:
return None
checkpoint_dir = self.runtime_manager.run_dir / "state" / "checkpoints"
checkpoint_dir.mkdir(parents=True, exist_ok=True)
return checkpoint_dir / f"pipeline_{session_key}.json"
def _save_checkpoint(self, session_key: str, phase: str, data: Dict[str, Any]) -> None:
    """Persist the pipeline state for *phase* so an interrupted cycle can resume.

    Silently returns when no runtime run directory is configured;
    serialization failures are logged, never raised.
    """
    target = self._get_checkpoint_path(session_key)
    if not target:
        return
    payload = {
        "session_key": session_key,
        "phase": phase,
        "timestamp": datetime.now().isoformat(),
        "data": data,
    }
    try:
        # default=str stringifies non-JSON values (e.g. Msg objects/dates).
        target.write_text(
            json.dumps(payload, ensure_ascii=False, indent=2, default=str),
            encoding="utf-8",
        )
        _log(f"Checkpoint saved: {phase} for {session_key}")
    except Exception as e:
        logger.error(f"Failed to save checkpoint: {e}")
def _load_checkpoint(self, session_key: str) -> Optional[Dict[str, Any]]:
"""Load the pipeline state from a checkpoint file."""
path = self._get_checkpoint_path(session_key)
if not path or not path.exists():
return None
try:
return json.loads(path.read_text(encoding="utf-8"))
except Exception as e:
logger.error(f"Failed to load checkpoint: {e}")
return None
async def _sync_memory_if_retrieved(self, agent: Any) -> None:
"""
Check agent's short-term memory for retrieved long-term memory and sync to frontend.
AgentScope's ReActAgent adds a Msg with name="long_term_memory" when
memory is retrieved in static_control mode.
"""
if not self.state_sync:
return
try:
msgs = await agent.memory.get_memory()
for msg in msgs:
if getattr(msg, "name", None) == "long_term_memory":
content = self._extract_text_content(msg.content)
if content:
parsed = self._parse_memory_content(content)
await self.state_sync.on_memory_retrieved(
agent_id=agent.name,
content=parsed,
)
break # Only sync the first (most recent) memory retrieval
except Exception as e:
logger.warning(f"Failed to sync memory for {agent.name}: {e}")
def _parse_memory_content(self, content: str) -> str:
"""
Parse memory content to extract rewritten_context from JSON format.
AgentScope ReMe memory wraps content in <long_term_memory> tags with JSON.
"""
# Try to extract JSON from the content
print("memory content:\n", content)
json_match = re.search(
r"<long_term_memory>.*?```json\s*(\{[\s\S]*?\})\s*```\s*</long_term_memory>",
content,
re.DOTALL,
)
if not json_match:
json_match = re.search(
r'\{[^{}]*"rewritten_context"[^{}]*\}',
content,
re.DOTALL,
)
if json_match:
try:
json_str = json_match.group(1)
data = json.loads(json_str)
return data.get("rewritten_context", "")
except json.JSONDecodeError:
pass
# Fallback: strip XML tags and return cleaned content
content = re.sub(r"</?long_term_memory>", "", content)
content = re.sub(
r"The content below are retrieved from long-term memory.*?:\s*",
"",
content,
)
return content.strip()
async def _capture_agent_trajectories(self) -> Dict[str, List[Msg]]:
    """
    Snapshot every agent's short-term memory as its execution trajectory.

    Must be called BEFORE memory is cleared so the complete execution
    trajectory is preserved for long-term memory recording. Agents whose
    memory is empty or unreadable are skipped (read failures are logged).

    Returns:
        Dict mapping agent name to list of Msg objects (the trajectory)
    """
    # Analysts are keyed by their own name; the two managers use
    # fixed keys that downstream consumers expect.
    named_agents = [(analyst.name, analyst) for analyst in self._all_analysts()]
    named_agents.append(("risk_manager", self.risk_manager))
    named_agents.append(("portfolio_manager", self.pm))
    trajectories: Dict[str, List[Msg]] = {}
    for agent_key, agent in named_agents:
        try:
            msgs = await agent.memory.get_memory()
            if msgs:
                trajectories[agent_key] = list(msgs)
        except Exception as e:
            logger.warning(
                f"Failed to capture trajectory for {agent_key}: {e}",
            )
    return trajectories
async def _run_reflection(
    self,
    date: str,
    agent_trajectories: Dict[str, List[Msg]],
    analyst_results: List[Dict[str, Any]],
    decisions: Dict[str, Dict],
    executed_trades: List[Dict],
    open_prices: Optional[Dict[str, float]],
    close_prices: Optional[Dict[str, float]],
    settlement_result: Optional[Dict[str, Any]] = None,
    conference_summary: Optional[str] = None,
):
    """
    Run reflection phase after market close.

    Calculates per-trade and total P&L, records each agent's execution
    trajectory to long-term memory, broadcasts a daily log via StateSync,
    and finally hands the day's outcome to APO for policy optimization.

    Args:
        date: Trading date
        agent_trajectories: Dict mapping agent name to their execution trajectory
        analyst_results: Results from analyst agents
        decisions: PM decisions
        executed_trades: List of executed trades; each needs "ticker",
            "action", "quantity" and "price" keys
        open_prices: Opening prices (not read here; kept for interface
            symmetry with the caller)
        close_prices: Closing prices used as exit marks
        settlement_result: Optional settlement results with baseline performance
        conference_summary: Optional summary from conference discussion
    """
    # Calculate P&L for each trade; exits are marked at the close, falling
    # back to the entry price (flat P&L) when a close is missing.
    trade_pnl = []
    for trade in executed_trades:
        ticker = trade["ticker"]
        action = trade["action"]
        quantity = trade["quantity"]
        entry_price = trade["price"]
        exit_price = close_prices.get(ticker, entry_price)
        if action == "long":
            pnl = (exit_price - entry_price) * quantity
        elif action == "short":
            pnl = (entry_price - exit_price) * quantity
        else:
            # Any other action (e.g. hold) contributes no P&L.
            pnl = 0
        pnl_pct = (
            (pnl / (entry_price * quantity) * 100) if quantity > 0 else 0
        )
        trade_pnl.append(
            {
                "ticker": ticker,
                "action": action,
                "quantity": quantity,
                "entry_price": entry_price,
                "exit_price": exit_price,
                "pnl": pnl,
                "pnl_pct": pnl_pct,
            },
        )
    total_pnl = sum(t["pnl"] for t in trade_pnl)
    # Build reflection summary with settlement info
    reflection_content = self._build_reflection_content(
        date=date,
        analyst_results=analyst_results,
        decisions=decisions,
        trade_pnl=trade_pnl,
        total_pnl=total_pnl,
        settlement_result=settlement_result,
        conference_summary=conference_summary,
    )
    # Record execution trajectories to long-term memory for agents that support it
    # Binary score: 1.0 for a profitable day, 0.0 otherwise.
    score = 1.0 if total_pnl > 0 else 0.0
    await self._record_to_long_term_memory(
        date=date,
        agent_trajectories=agent_trajectories,
        trade_pnl=trade_pnl,
        total_pnl=total_pnl,
        score=score,
    )
    # Broadcast reflection to StateSync (skipped when no UI is attached)
    if self.state_sync:
        await self.state_sync.on_agent_complete(
            agent_id="Daily Log",
            content=reflection_content,
            agent_name="每日记录",
        )
    # Phase 6: APO (Autonomous Policy Optimization)
    # If the day was a loss, let APO suggest and apply policy updates.
    if hasattr(self, "apo") and self.apo:
        _log(f"Phase 6: APO - Running autonomous policy optimization for {date}")
        try:
            apo_result = await self.apo.run_optimization(
                date=date,
                reflection_content=reflection_content,
                settlement_result=settlement_result or {"portfolio_value": 100000.0 + total_pnl},
                analyst_results=analyst_results,
                decisions=decisions
            )
            if apo_result.get("status") == "completed":
                _log(f"APO: Successfully applied {len(apo_result.get('optimizations', []))} policy updates.")
                # Reload assets for next cycle to ensure they are picked up
                self.reload_runtime_assets()
        except Exception as e:
            # APO is best-effort; a failed optimization never aborts the cycle.
            logger.error(f"APO: Optimization failed: {e}")
def _build_reflection_content(
self,
date: str,
analyst_results: List[Dict[str, Any]],
decisions: Dict[str, Dict],
trade_pnl: List[Dict],
total_pnl: float,
settlement_result: Optional[Dict[str, Any]] = None,
conference_summary: Optional[str] = None,
) -> str:
"""Build human-readable reflection content"""
lines = [f"Daily log for {date}:"]
lines.append(f"Total P&L: ${total_pnl:,.2f}")
lines.append("")
if conference_summary:
lines.append("Conference Discussion Summary:")
lines.append(conference_summary)
lines.append("")
if settlement_result:
baseline_values = settlement_result.get("baseline_values", {})
initial = 100000.0
lines.append("Baseline Comparison:")
lines.append(
f" Equal Weight: ${baseline_values.get('equal_weight', 0):,.2f} "
f"({(baseline_values.get('equal_weight', initial) - initial) / initial * 100:.2f}%)",
)
lines.append(
f" Market Cap Weighted: ${baseline_values.get('market_cap_weighted', 0):,.2f} "
f"({(baseline_values.get('market_cap_weighted', initial) - initial) / initial * 100:.2f}%)",
)
lines.append(
f" Momentum: ${baseline_values.get('momentum', 0):,.2f} "
f"({(baseline_values.get('momentum', initial) - initial) / initial * 100:.2f}%)",
)
lines.append("")
if trade_pnl:
lines.append("Trade Results:")
for t in trade_pnl:
pnl_sign = "+" if t["pnl"] >= 0 else ""
lines.append(
f" {t['ticker']}: {t['action'].upper()} {t['quantity']} @ "
f"${t['entry_price']:.2f} -> ${t['exit_price']:.2f}, "
f"P&L: {pnl_sign}${t['pnl']:.2f} ({pnl_sign}{t['pnl_pct']:.1f}%)",
)
else:
lines.append("No trades executed today.")
return "\n".join(lines)
async def _record_to_long_term_memory(
    self,
    date: str,
    agent_trajectories: Dict[str, List[Msg]],
    trade_pnl: List[Dict],
    total_pnl: float,
    score: float,
):
    """
    Record execution trajectories to long-term memory for all agents.

    Appends an outcome message (the day's P&L) to each agent's captured
    trajectory and records the result via the agent's ``long_term_memory``
    interface. This lets the ReMe memory system learn from the complete
    task execution flow, not just summaries. Agents without long-term
    memory or without a captured trajectory are skipped; recording
    failures are logged, never raised.

    Args:
        date: Trading date
        agent_trajectories: Dict mapping agent name to their execution trajectory
        trade_pnl: P&L details for each trade (used for the PM's
            per-trade outcome message)
        total_pnl: Total P&L for the day
        score: Score for this trajectory (the caller passes 1.0 for a
            profitable day and 0.0 otherwise)
    """
    # Build outcome message to append to trajectories (shared by analysts
    # and the risk manager; the PM gets a per-trade version below).
    outcome_msg = Msg(
        role="user",
        content=f"You are an analyst/financial manager, The Key point is to predict correctly and"
        f"have good P&L. The Definition of loss is when P&L < 0. "
        f"Focus on how to do good prediction but not only execution correctly."
        f"[Outcome] Trading day {date} - Total P&L: ${total_pnl:,.2f}. "
        f"{'Profitable day.' if total_pnl > 0 else 'Loss day.'}",
        name="system",
    )
    # Record for analysts
    for analyst in self._all_analysts():
        if (
            hasattr(analyst, "long_term_memory")
            and analyst.long_term_memory is not None
        ):
            trajectory = agent_trajectories.get(analyst.name, [])
            if trajectory:
                # Append outcome to trajectory
                trajectory_with_outcome = trajectory + [outcome_msg]
                try:
                    await analyst.long_term_memory.record(
                        msgs=trajectory_with_outcome,
                        score=score,
                    )
                    logger.debug(
                        f"Recorded {len(trajectory_with_outcome)} messages "
                        f"to long-term memory for {analyst.name}",
                    )
                except Exception as e:
                    logger.warning(
                        f"Failed to record to long-term memory for {analyst.name}: {e}",
                    )
    # Record for risk manager
    if (
        hasattr(self.risk_manager, "long_term_memory")
        and self.risk_manager.long_term_memory is not None
    ):
        trajectory = agent_trajectories.get("risk_manager", [])
        if trajectory:
            trajectory_with_outcome = trajectory + [outcome_msg]
            try:
                await self.risk_manager.long_term_memory.record(
                    msgs=trajectory_with_outcome,
                    score=score,
                )
                logger.debug(
                    f"Recorded {len(trajectory_with_outcome)} messages "
                    f"to long-term memory for risk_manager",
                )
            except Exception as e:
                logger.warning(
                    f"Failed to record to long-term memory for risk_manager: {e}",
                )
    # Record for PM with trade outcome details
    if (
        hasattr(self.pm, "long_term_memory")
        and self.pm.long_term_memory is not None
    ):
        trajectory = agent_trajectories.get("portfolio_manager", [])
        if trajectory:
            # Build detailed outcome message for PM
            pnl_details = []
            for t in trade_pnl:
                pnl_sign = "+" if t["pnl"] >= 0 else ""
                pnl_details.append(
                    f"{t['ticker']}: {t['action']} {t['quantity']} @ "
                    f"${t['entry_price']:.2f} -> ${t['exit_price']:.2f}, "
                    f"P&L: {pnl_sign}${t['pnl']:.2f}",
                )
            # Conditional expression: detailed per-trade body when trades
            # exist, otherwise a "No trades executed." variant.
            pm_outcome_msg = Msg(
                role="user",
                content=f"[Outcome] Trading day {date}\n"
                f"Total P&L: ${total_pnl:,.2f} "
                f"({'Profitable' if total_pnl >= 0 else 'Loss'})\n"
                f"Trade details:\n" + "\n".join(pnl_details)
                if pnl_details
                else f"[Outcome] Trading day {date}\n"
                f"Total P&L: ${total_pnl:,.2f}\nNo trades executed.",
                name="system",
            )
            trajectory_with_outcome = trajectory + [pm_outcome_msg]
            try:
                await self.pm.long_term_memory.record(
                    msgs=trajectory_with_outcome,
                    score=score,
                )
                logger.debug(
                    f"Recorded {len(trajectory_with_outcome)} messages "
                    f"to long-term memory for portfolio_manager",
                )
            except Exception as e:
                logger.warning(
                    f"Failed to record to long-term memory for portfolio_manager: {e}",
                )
async def _run_conference_cycles(
    self,
    tickers: List[str],
    date: str,
    prices: Optional[Dict[str, float]],
    analyst_results: List[Dict[str, Any]],
    risk_assessment: Dict[str, Any],
) -> Optional[str]:
    """
    Run conference discussion cycles (within existing MsgHub context).

    No nested MsgHub - this runs inside the main cycle's MsgHub. Each
    cycle has the PM set the agenda and every active analyst respond;
    messages are mirrored to the frontend via StateSync when attached.

    Returns:
        Conference summary string generated by PM, or None when the
        conference is disabled (max_comm_cycles <= 0).
    """
    if self.max_comm_cycles <= 0:
        _log(
            "Phase 2.1: Conference discussion - "
            "Conference skipped (disabled)",
        )
        return None
    conference_title = f"Investment Discussion - {date}"
    if self.state_sync:
        await self.state_sync.on_conference_start(
            title=conference_title,
            date=date,
        )
    # Conference participants: analysts + PM
    conference_participants = self._get_active_analysts() + [self.pm]
    # BUGFIX: `TeamMsgHub` is never imported at module level (the guarded
    # team import only binds `TeamCoordinator`), so referencing the name
    # directly raised NameError whenever TEAM_COORD_AVAILABLE was True.
    # Look it up defensively so the standard-hub fallback is taken instead.
    team_hub_cls = globals().get("TeamMsgHub")
    if TEAM_COORD_AVAILABLE and team_hub_cls is not None:
        _log(
            f"Phase 2.1: Conference using TeamMsgHub with "
            f"{len(conference_participants)} participants"
        )
        conference_hub = team_hub_cls(participants=conference_participants)
    else:
        _log("Phase 2.1: Conference using standard MsgHub context")
        conference_hub = None
    # Run discussion cycles
    async with conference_hub if conference_hub else nullcontext(None):
        for cycle in range(self.max_comm_cycles):
            _log(
                "Phase 2.1: Conference discussion - "
                f"Conference {cycle + 1}/{self.max_comm_cycles}",
            )
            if self.state_sync:
                await self.state_sync.on_conference_cycle_start(
                    cycle=cycle + 1,
                    total_cycles=self.max_comm_cycles,
                )
            # PM sets agenda or asks questions
            pm_prompt = self._build_pm_discussion_prompt(
                cycle=cycle,
                tickers=tickers,
                date=date,
                prices=prices,
                analyst_results=analyst_results,
                risk_assessment=risk_assessment,
            )
            pm_msg = Msg(name="system", content=pm_prompt, role="user")
            pm_response = await self.pm.reply(pm_msg)
            pm_content = self._extract_text_content(pm_response.content)
            if self.state_sync:
                await self.state_sync.on_conference_message(
                    agent_id="portfolio_manager",
                    content=pm_content,
                    agent_name=self._resolve_agent_display_name("portfolio_manager"),
                )
            # Analysts share perspectives (supports per-round active team updates)
            for analyst in self._get_active_analysts():
                analyst_prompt = self._build_analyst_discussion_prompt(
                    cycle=cycle,
                    tickers=tickers,
                    date=date,
                )
                analyst_msg = Msg(
                    name="system",
                    content=analyst_prompt,
                    role="user",
                )
                analyst_response = await analyst.reply(analyst_msg)
                if self.state_sync:
                    analyst_content = self._extract_text_content(
                        analyst_response.content,
                    )
                    await self.state_sync.on_conference_message(
                        agent_id=analyst.name,
                        content=analyst_content,
                        agent_name=self._resolve_agent_display_name(analyst.name),
                    )
            if self.state_sync:
                await self.state_sync.on_conference_cycle_end(
                    cycle=cycle + 1,
                )
    # Generate conference summary by PM
    _log(
        "Phase 2.1: Conference discussion - Generating conference summary",
    )
    summary_prompt = (
        f"The conference discussion for {date} has concluded. "
        f"As Portfolio Manager, provide a concise summary of the key insights, "
        f"concerns, and consensus points discussed about {', '.join(tickers)}. "
        f"Highlight any critical factors that should be considered in the final decision-making."
    )
    summary_msg = Msg(name="system", content=summary_prompt, role="user")
    summary_response = await self.pm.reply(summary_msg)
    conference_summary = self._extract_text_content(
        summary_response.content,
    )
    _log(
        "Phase 2.1: Conference discussion - Conference summary generated",
    )
    if self.state_sync:
        await self.state_sync.on_conference_message(
            agent_id="conference summary",
            content=conference_summary,
            agent_name="会议总结",
        )
        await self.state_sync.on_conference_end()
    return conference_summary
def _build_pm_discussion_prompt(
self,
cycle: int,
tickers: List[str],
date: str,
prices: Optional[Dict[str, float]],
analyst_results: List[Dict[str, Any]],
risk_assessment: Dict[str, Any],
) -> str:
"""Build PM discussion prompt with full context"""
# Get current portfolio state
portfolio = self.pm.get_portfolio_state()
if cycle == 0:
# First cycle: provide full context
context_lines = [
f"As Portfolio Manager, review the following information for {date}:",
"",
"=== Current Portfolio ===",
f"Cash: ${portfolio.get('cash', 0):,.2f}",
f"Positions: {json.dumps(portfolio.get('positions', {}), indent=2)}",
"",
"=== Current Prices ===",
json.dumps(prices, indent=2),
"",
"=== Analyst Signals ===",
]
# Add analyst results summary
for result in analyst_results:
agent_name = result.get("agent", "Unknown")
content = result.get("content", "")
context_lines.append(f"{agent_name}: {content}")
context_lines.extend(
[
"",
"=== Risk Assessment ===",
str(risk_assessment.get("content", "")),
"",
"Based on the above context, share your key concerns or questions about the opportunities in "
f"{', '.join(tickers)}. Do not make final decisions yet - this is a discussion phase.",
],
)
return "\n".join(context_lines)
else:
return (
f"Continue the discussion. Share your thoughts on the perspectives raised "
f"and any remaining concerns about {', '.join(tickers)}."
)
async def _run_team_gap_assessment(
    self,
    *,
    tickers: List[str],
    date: str,
    prices: Optional[Dict[str, float]],
) -> str:
    """Pre-analysis phase: ask the PM to audit analyst-team coverage.

    The PM is instructed to create/clone missing specialists via the
    dynamic team tools (not to make investment decisions). If the PM's
    reply only *talks* about expanding, a follow-up enforcement prompt
    is issued through ``_enforce_pm_team_expansion_if_needed``.

    Args:
        tickers: Tickers in scope for today's session.
        date: Trading date string.
        prices: Optional price snapshot included in the prompt.

    Returns:
        The PM's assessment text (replaced by the enforcement follow-up
        reply when one was issued).
    """
    active_analysts = self._get_active_analysts()
    # Compact roster (id + display name) so the PM can judge coverage.
    team_summary = [
        {
            "agent_id": analyst.name,
            "display_name": self._resolve_agent_display_name(analyst.name),
        }
        for analyst in active_analysts
    ]
    prompt = (
        f"As Portfolio Manager, perform a team coverage assessment before analysis for {date}.\n\n"
        f"Tickers: {', '.join(tickers)}\n"
        f"Current team: {json.dumps(team_summary, ensure_ascii=False, indent=2)}\n"
        f"Current prices snapshot: {json.dumps(prices, ensure_ascii=False, indent=2) if prices else 'N/A'}\n\n"
        "Your job in this phase is not to make investment decisions. "
        "First decide whether the current team has enough domain coverage. "
        "If the team is insufficient, immediately call dynamic team tools to create or clone the needed analysts now. "
        "Before creating any analyst, explicitly check whether an existing analyst already covers that role. "
        "Do not create duplicate roles with different IDs but the same responsibilities. "
        "If the current team is sufficient, explicitly say the current team is sufficient and explain why."
    )
    msg = Msg(name="system", content=prompt, role="user")
    response = await self.pm.reply(msg)
    pm_content = self._extract_text_content(response.content)
    # If the PM merely announced an expansion without executing it,
    # push once more and prefer the follow-up reply.
    enforced_pm_content = await self._enforce_pm_team_expansion_if_needed(
        tickers=tickers,
        date=date,
        pm_content=pm_content,
    )
    if enforced_pm_content:
        pm_content = enforced_pm_content
    if self.state_sync:
        # Broadcast the assessment to the live state stream.
        await self.state_sync.on_agent_complete(
            agent_id="portfolio_manager",
            agent_name=self._resolve_agent_display_name("portfolio_manager"),
            content=pm_content,
        )
    return pm_content
def _pm_requests_team_expansion(self, text: str) -> bool:
normalized = (text or "").strip().lower()
if not normalized:
return False
phrases = [
"创建",
"新增分析师",
"补充分析师",
"扩编团队",
"需要行业分析师",
"需要量化分析师",
"需要宏观分析师",
"需要补充",
"先扩编",
"create analyst",
"create a new analyst",
"add analyst",
"expand the team",
"need a specialist",
"need another analyst",
]
return any(phrase in normalized for phrase in phrases)
async def _enforce_pm_team_expansion_if_needed(
    self,
    *,
    tickers: List[str],
    date: str,
    pm_content: str,
) -> Optional[str]:
    """Push the PM to actually execute a team expansion it only described.

    Args:
        tickers: Tickers in scope for the session.
        date: Trading date string.
        pm_content: The PM's team-assessment reply text.

    Returns:
        The PM's follow-up reply text when enforcement was triggered,
        otherwise ``None`` (no expansion request detected in
        ``pm_content``).
    """
    if not self._pm_requests_team_expansion(pm_content):
        return None
    # Snapshot the roster so we can tell whether the follow-up reply
    # actually added analysts via the dynamic team tools.
    before_ids = {agent.name for agent in self._get_active_analysts()}
    followup_prompt = (
        f"You identified a team coverage gap for {date} across {', '.join(tickers)}. "
        "This is still the pre-analysis team assessment phase. "
        "Do not merely recommend adding analysts. If additional analysts are needed, "
        "you must now call the dynamic team tools (`create_analyst` or `clone_analyst`) "
        "to add the required specialists before analyst analysis begins. "
        "Only after the tool call succeeds may you explain why the new analysts were added. "
        "If you truly believe the current team is sufficient, explicitly say the current team is sufficient."
    )
    followup_msg = Msg(name="system", content=followup_prompt, role="user")
    followup_response = await self.pm.reply(followup_msg)
    followup_content = self._extract_text_content(followup_response.content)
    after_ids = {agent.name for agent in self._get_active_analysts()}
    if after_ids != before_ids:
        created = sorted(after_ids - before_ids)
        logger.info(
            "PM team expansion enforced successfully before analysis; added analysts=%s",
            created,
        )
    else:
        # PM still did not call the tools; log it and carry on with the
        # follow-up text as the assessment of record.
        logger.info(
            "PM mentioned expansion in team assessment but did not add analysts after enforcement prompt",
        )
    return followup_content
def _build_analyst_discussion_prompt(
self,
cycle: int,
tickers: List[str],
date: str,
) -> str:
"""Build analyst discussion prompt"""
return (
f"Share your perspective on the discussion so far. "
f"Provide insights or address concerns raised by others about {', '.join(tickers)}. "
f"Do not use tools - focus on sharing your professional opinion."
)
def _resolve_agent_display_name(self, agent_id: str) -> str:
runtime_name = None
if self.runtime_manager:
state = self.runtime_manager.get_agent_state(agent_id)
runtime_name = getattr(state, "display_name", None) if state else None
if isinstance(runtime_name, str) and runtime_name.strip():
return runtime_name.strip()
static_name = AGENT_CONFIG.get(agent_id, {}).get("name")
if isinstance(static_name, str) and static_name.strip():
return static_name.strip()
profile_path = Path(__file__).resolve().parents[2] / "runs" / self.runtime_manager.config_name / "agents" / agent_id / "PROFILE.md" if self.runtime_manager else None
if profile_path and profile_path.exists():
try:
raw = profile_path.read_text(encoding="utf-8")
for line in raw.splitlines():
text = line.strip()
if text.startswith("角色定位:"):
value = text.split("", 1)[1].strip()
if value:
return value
except Exception:
pass
return agent_id
@staticmethod
def _normalize_role_key(value: str) -> str:
normalized = (value or "").strip().lower()
normalized = normalized.replace("_", "")
normalized = normalized.replace(" ", "")
replacements = {
"analyst": "分析师",
"macro": "宏观",
"technical": "技术",
"tech": "技术",
"sentiment": "情绪",
"fundamentals": "基本面",
"fundamental": "基本面",
"valuation": "估值",
"crypto": "加密",
"cryptocurrency": "加密",
"semiconductor": "半导体",
"industry": "行业",
"sector": "行业",
"risk": "风险",
}
for src, target in replacements.items():
normalized = normalized.replace(src, target)
return normalized
@staticmethod
def _contains_cjk(value: str) -> bool:
text = (value or "").strip()
return any("\u4e00" <= ch <= "\u9fff" for ch in text)
def _find_similar_existing_analyst(
self,
*,
agent_id: str,
analyst_type: str,
custom_config: Optional[AnalystConfig],
) -> Optional[str]:
requested_names = {self._normalize_role_key(agent_id)}
if custom_config and custom_config.persona and custom_config.persona.name:
requested_names.add(self._normalize_role_key(custom_config.persona.name))
for agent in self._all_analysts():
existing_id = getattr(agent, "name", None) or getattr(agent, "agent_id", None)
if not existing_id or existing_id == agent_id:
continue
existing_names = {
self._normalize_role_key(existing_id),
self._normalize_role_key(self._resolve_agent_display_name(existing_id)),
}
if requested_names & existing_names:
return existing_id
return None
async def _collect_final_predictions(
    self,
    tickers: List[str],
    date: str,
    active_analysts: Optional[List[Any]] = None,
) -> List[Dict[str, Any]]:
    """
    Collect final predictions from all analysts as simple text responses.
    Analysts provide their predictions in plain text without tool calls.

    Args:
        tickers: Tickers each analyst must predict.
        date: Trading date string.
        active_analysts: Optional explicit roster; defaults to
            ``self.analysts``.

    Returns:
        One dict per analyst with keys ``agent``, ``predictions``
        (parsed per-ticker direction/confidence) and ``raw_content``.
    """
    _log(
        "Phase 2.2: Analysts generate final structured predictions\n"
        f" Starting _collect_final_predictions for {len(active_analysts or self.analysts)} analysts",
    )
    final_predictions = []
    analysts = active_analysts or self.analysts
    for i, analyst in enumerate(analysts):
        _log(
            "Phase 2.2: Analysts generate final structured predictions\n"
            f" Collecting prediction from analyst {i+1}/{len(analysts)}: {analyst.name}",
        )
        # Explicitly forbid tool use: the reply must be parseable text.
        prompt = (
            f"Based on your analysis, provide your final prediction for {date}. "
            f"For each ticker ({', '.join(tickers)}), state: "
            f"TICKER: UP/DOWN/NEUTRAL (confidence: X%). "
            f"Do not use any tools, just respond with your predictions."
        )
        msg = Msg(name="system", content=prompt, role="user")
        _log(
            "Phase 2.2: Analysts generate final structured predictions\n"
            f" Sending prediction request to {analyst.name}",
        )
        response = await analyst.reply(msg)
        _log(
            "Phase 2.2: Analysts generate final structured predictions\n"
            f" Received response from {analyst.name}",
        )
        # Parse predictions from text response
        content = self._extract_text_content(response.content)
        predictions_data = self._parse_predictions_from_text(
            content,
            tickers,
        )
        _log(
            "Phase 2.2: Analysts generate final structured predictions\n"
            f" {analyst.name} final predictions: {predictions_data}",
        )
        final_predictions.append(
            {
                "agent": analyst.name,
                "predictions": predictions_data,
                "raw_content": content,
            },
        )
        # Per-analyst StateSync broadcast is currently disabled
        # (kept for reference):
        # if self.state_sync:
        #     await self.state_sync.on_agent_complete(
        #         agent_id=f"{analyst.name}_final_prediction",
        #         content=content,
        #     )
    return final_predictions
def _parse_predictions_from_text(
self,
content: str,
tickers: List[str],
) -> List[Dict[str, Any]]:
"""Parse predictions from analyst text response"""
predictions = []
content_upper = content.upper()
for ticker in tickers:
direction = "neutral"
confidence = 0.5
# Simple pattern matching for direction
ticker_idx = content_upper.find(ticker)
if ticker_idx >= 0:
# Look at text near ticker mention
context = content_upper[ticker_idx : ticker_idx + 100]
if (
"UP" in context
or "BULLISH" in context
or "LONG" in context
):
direction = "up"
confidence = 0.7
elif (
"DOWN" in context
or "BEARISH" in context
or "SHORT" in context
):
direction = "down"
confidence = 0.7
predictions.append(
{
"ticker": ticker,
"direction": direction,
"confidence": confidence,
},
)
return predictions
async def _run_analysts_with_sync(
    self,
    tickers: List[str],
    date: str,
    active_analysts: Optional[List[Any]] = None,
) -> List[Dict[str, Any]]:
    """Run all analysts with real-time sync after each completion.

    Analysts run sequentially; after each reply the analyst's retrieved
    memory is synced and the result is broadcast through StateSync
    (when attached).

    Args:
        tickers: Stock tickers to analyze.
        date: Trading date string.
        active_analysts: Optional explicit roster; defaults to
            ``self.analysts``.

    Returns:
        One extracted result dict per analyst, in roster order.
    """
    results = []
    analysts = active_analysts or self.analysts
    for analyst in analysts:
        content = (
            f"Analyze the following stocks for date {date}: {', '.join(tickers)}. "
            f"Provide investment signals with confidence scores and reasoning."
        )
        msg = Msg(
            name="system",
            content=content,
            role="user",
            metadata={"tickers": tickers, "date": date},
        )
        result = await analyst.reply(msg)
        extracted = self._extract_result_from_msg(result)
        results.append(extracted)
        # Sync retrieved memory first
        await self._sync_memory_if_retrieved(analyst)
        # Broadcast agent result via StateSync
        if self.state_sync:
            text_content = self._extract_text_content(result.content)
            await self.state_sync.on_agent_complete(
                agent_id=analyst.name,
                content=text_content,
                agent_name=self._resolve_agent_display_name(analyst.name),
            )
    return results
async def _run_analysts_parallel(
    self,
    tickers: List[str],
    date: str,
    active_analysts: Optional[List[Any]] = None,
) -> List[Dict[str, Any]]:
    """Run all analysts in parallel using TeamCoordinator.

    This method replaces the sequential analyst loop with parallel
    execution using the TeamCoordinator for orchestration, and falls
    back to ``_run_analysts_with_sync`` when the coordinator is not
    available.

    Args:
        tickers: List of stock tickers to analyze
        date: Trading date
        active_analysts: Optional list of analysts to run

    Returns:
        List of analyst result dictionaries (a ``{"success": False}``
        placeholder is appended for analysts that returned no result).
    """
    analysts = active_analysts or self.analysts
    if not analysts:
        return []
    if not TEAM_COORD_AVAILABLE:
        _log("TeamCoordinator not available, falling back to sequential execution")
        return await self._run_analysts_with_sync(
            tickers=tickers,
            date=date,
            active_analysts=active_analysts,
        )
    _log(
        f"Phase 1.1: Running {len(analysts)} analysts in parallel "
        f"[{', '.join(a.name for a in analysts)}]"
    )
    # Build the analyst prompt
    content = (
        f"Analyze the following stocks for date {date}: {', '.join(tickers)}. "
        f"Provide investment signals with confidence scores and reasoning."
    )
    # Create coordinator for parallel execution
    coordinator = TeamCoordinator(
        participants=analysts,
        task_content=content,
    )
    # Run analysts in parallel via TeamCoordinator
    results = await coordinator.run_phase(
        "analyst_analysis",
        metadata={"tickers": tickers, "date": date},
    )
    # Process results and sync. zip pairs each analyst with its result;
    # the previous enumerate() index was unused and has been dropped.
    processed_results = []
    for analyst, result in zip(analysts, results):
        if result is not None:
            extracted = self._extract_result_from_msg(result)
            processed_results.append(extracted)
            # Sync retrieved memory
            await self._sync_memory_if_retrieved(analyst)
            # Broadcast agent result via StateSync
            if self.state_sync:
                text_content = self._extract_text_content(result.content)
                await self.state_sync.on_agent_complete(
                    agent_id=analyst.name,
                    content=text_content,
                    agent_name=self._resolve_agent_display_name(analyst.name),
                )
        else:
            logger.warning(
                "Analyst %s returned no result",
                analyst.name,
            )
            processed_results.append({
                "agent": analyst.name,
                "content": "",
                "success": False,
            })
    _log(
        f"Phase 1.1: Parallel analyst execution complete "
        f"({len(processed_results)}/{len(analysts)} successful)"
    )
    return processed_results
async def _run_analysts(
    self,
    tickers: List[str],
    date: str,
    active_analysts: Optional[List[Any]] = None,
) -> List[Dict[str, Any]]:
    """Run all analysts sequentially (no sync; kept for backward compatibility)."""
    roster = active_analysts or self.analysts
    # The request text is identical for every analyst.
    request_text = (
        f"Analyze the following stocks for date {date}: {', '.join(tickers)}. "
        f"Provide investment signals with confidence scores and reasoning."
    )
    collected: List[Dict[str, Any]] = []
    for member in roster:
        request = Msg(
            name="system",
            content=request_text,
            role="user",
            metadata={"tickers": tickers, "date": date},
        )
        reply = await member.reply(request)
        collected.append(self._extract_result_from_msg(reply))
    return collected
async def _run_risk_manager_with_sync(
    self,
    tickers: List[str],
    date: str,
    prices: Optional[Dict[str, float]],
) -> Dict[str, Any]:
    """Run risk manager assessment with real-time sync.

    Sends the current portfolio plus market snapshot to the risk
    manager, syncs its retrieved memory, then broadcasts the assessment
    through StateSync (when attached).

    Returns:
        The extracted risk-assessment result dict.
    """
    portfolio = self.pm.get_portfolio_state()
    context = {
        "portfolio": portfolio,
        "tickers": tickers,
        "date": date,
        "current_prices": prices,
    }
    content = (
        f"Assess risk for the following portfolio and market conditions:\n"
        f"{json.dumps(context, indent=2)}\n"
        f"Provide risk warnings and recommendations."
    )
    msg = Msg(name="system", content=content, role="user")
    result = await self.risk_manager.reply(msg)
    extracted = self._extract_result_from_msg(result)
    # Sync retrieved memory first
    await self._sync_memory_if_retrieved(self.risk_manager)
    # Broadcast agent result via StateSync
    if self.state_sync:
        text_content = self._extract_text_content(result.content)
        await self.state_sync.on_agent_complete(
            agent_id="risk_manager",
            content=text_content,
            agent_name=self._resolve_agent_display_name("risk_manager"),
        )
    return extracted
async def _run_risk_manager(
    self,
    tickers: List[str],
    date: str,
    prices: Dict[str, float],
) -> Dict[str, Any]:
    """Run the risk manager assessment (no sync; kept for backward compatibility)."""
    snapshot = {
        "portfolio": self.pm.get_portfolio_state(),
        "tickers": tickers,
        "date": date,
        "current_prices": prices,
    }
    request = Msg(
        name="system",
        content=(
            f"Assess risk for the following portfolio and market conditions:\n"
            f"{json.dumps(snapshot, indent=2)}\n"
            f"Provide risk warnings and recommendations."
        ),
        role="user",
    )
    reply = await self.risk_manager.reply(request)
    return self._extract_result_from_msg(reply)
async def _run_pm_with_sync(
    self,
    tickers: List[str],
    date: str,
    prices: Optional[Dict[str, float]],
    analyst_results: List[Dict[str, Any]],
    risk_assessment: Dict[str, Any],
) -> Dict[str, Any]:
    """Run PM decision-making with real-time sync.

    Aggregates analyst signals, risk warnings, prices, the portfolio
    snapshot and (when available) the conference summary into a single
    prompt, instructs the PM to record decisions via the make_decision
    tool, then broadcasts the PM's reply through StateSync.

    Returns:
        The extracted PM result dict.
    """
    portfolio = self.pm.get_portfolio_state()
    context = {
        "analyst_signals": {
            r["agent"]: r.get("content", "") for r in analyst_results
        },
        "risk_warnings": risk_assessment.get("content", ""),
        "current_prices": prices,
        "tickers": tickers,
        "portfolio_cash": portfolio.get("cash", 0),
        "portfolio_positions": portfolio.get("positions", {}),
    }
    # Add conference summary if available
    if self.conference_summary:
        context["conference_summary"] = self.conference_summary
    content_parts = [
        f"Based on the analyst signals, risk assessment, and conference discussion, "
        f"make investment decisions for date {date}.\n",
        f"Context:\n{json.dumps(context, indent=2)}\n",
    ]
    if self.conference_summary:
        # The summary is also appended verbatim so the PM sees the full text.
        content_parts.append(
            f"\n=== Conference Summary ===\n{self.conference_summary}\n",
        )
    content_parts.append(
        "\nUse the make_decision tool for each ticker to record your decisions. "
        "After recording all decisions, provide a summary of your investment rationale.",
    )
    content = "".join(content_parts)
    msg = Msg(name="system", content=content, role="user")
    result = await self.pm.reply(msg)
    extracted = self._extract_result_from_msg(result)
    # Sync retrieved memory first
    await self._sync_memory_if_retrieved(self.pm)
    # Broadcast PM decision via StateSync
    if self.state_sync:
        text_content = self._extract_text_content(result.content)
        await self.state_sync.on_agent_complete(
            agent_id="portfolio_manager",
            content=text_content,
            agent_name=self._resolve_agent_display_name("portfolio_manager"),
        )
    return extracted
async def _run_pm(
    self,
    tickers: List[str],
    date: str,
    prices: Dict[str, float],
    analyst_results: List[Dict[str, Any]],
    risk_assessment: Dict[str, Any],
) -> Dict[str, Any]:
    """Run PM decision-making (no sync; kept for backward compatibility)."""
    snapshot = self.pm.get_portfolio_state()
    decision_context = {
        "analyst_signals": {
            item["agent"]: item.get("content", "") for item in analyst_results
        },
        "risk_warnings": risk_assessment.get("content", ""),
        "current_prices": prices,
        "tickers": tickers,
        "portfolio_cash": snapshot.get("cash", 0),
        "portfolio_positions": snapshot.get("positions", {}),
    }
    request = Msg(
        name="system",
        content=(
            f"Based on the analyst signals and risk assessment, make investment decisions "
            f"for date {date}.\n"
            f"Context:\n{json.dumps(decision_context, indent=2)}\n\n"
            f"Use the make_decision tool for each ticker to record your decisions. "
            f"After recording all decisions, provide a summary of your investment rationale."
        ),
        role="user",
    )
    reply = await self.pm.reply(request)
    return self._extract_result_from_msg(reply)
def _execute_decisions(
self,
decisions: Dict[str, Dict],
prices: Optional[Dict[str, float]],
date: str,
) -> Dict[str, Any]:
"""Execute PM decisions with provided prices"""
if not decisions:
return {
"executed_trades": [],
"portfolio": self.pm.get_portfolio_state(),
}
executor = PortfolioTradeExecutor(
initial_portfolio=self.pm.get_portfolio_state(),
)
executed_trades = []
for ticker, decision in decisions.items():
action = decision.get("action", "hold")
quantity = decision.get("quantity", 0)
if action == "hold" or quantity == 0:
continue
price = prices.get(ticker)
if not price or price <= 0:
logger.warning(f"No price for {ticker}, skipping trade")
continue
result = executor.execute_trade(
ticker=ticker,
action=action,
quantity=quantity,
price=price,
current_date=date,
)
if result.get("status") == "success":
executed_trades.append(
{
"ticker": ticker,
"action": action,
"quantity": quantity,
"price": price,
},
)
updated_portfolio = executor.portfolio.copy()
self.pm.update_portfolio(updated_portfolio)
return {
"executed_trades": executed_trades,
"portfolio": updated_portfolio,
}
def _extract_result_from_msg(self, msg: Msg) -> Dict[str, Any]:
"""Extract result dictionary from Msg object"""
result = {
"agent": msg.name,
"content": msg.content,
}
if hasattr(msg, "metadata") and msg.metadata:
result.update(msg.metadata)
if isinstance(msg.content, str):
try:
result["content_parsed"] = json.loads(msg.content)
except json.JSONDecodeError:
pass
return result
def _extract_text_content(self, content: Any) -> str:
"""
Extract plain text from AgentScope Msg content
AgentScope content can be:
- str: plain text
- list: list of TextBlocks like [{'type': 'text', 'text': '...'}]
- dict: single TextBlock
"""
if content is None:
return ""
if isinstance(content, str):
return content
if isinstance(content, list):
texts = []
for item in content:
if isinstance(item, dict):
# TextBlock format: {'type': 'text', 'text': '...'}
if item.get("type") == "text" and "text" in item:
texts.append(item["text"])
elif "content" in item:
texts.append(str(item["content"]))
else:
texts.append(str(item))
elif isinstance(item, str):
texts.append(item)
else:
texts.append(str(item))
return "\n".join(texts)
if isinstance(content, dict):
if content.get("type") == "text" and "text" in content:
return content["text"]
return str(content)
return str(content)
def _format_pm_decisions(self, decisions: Dict[str, Dict]) -> str:
"""Format PM decisions as a human-readable string"""
if not decisions:
return "Portfolio analysis completed. No trades recommended."
decision_texts = []
for ticker, decision in decisions.items():
action = decision.get("action", "hold")
quantity = decision.get("quantity", 0)
reasoning = decision.get("reasoning", "")
if action != "hold" and quantity > 0:
decision_texts.append(
f"{action.upper()} {quantity} {ticker}: {reasoning}",
)
if decision_texts:
return "Decisions: " + "; ".join(decision_texts)
return "Portfolio analysis completed. No trades recommended."
def _runtime_update_status(self, agent: Any, status: str) -> None:
if not self.runtime_manager:
return
agent_id = getattr(agent, "agent_id", None) or getattr(agent, "name", None)
if not agent_id:
return
self.runtime_manager.update_agent_status(agent_id, status, self._session_key)
def _runtime_batch_status(self, agents: List[Any], status: str) -> None:
for agent in agents:
self._runtime_update_status(agent, status)
def _sync_agent_runtime_context(
self,
agents: List[Any],
session_key: str,
) -> None:
"""Propagate run/session identifiers onto agent instances.
EvoAgent's tool-guard approval records depend on workspace/session
context being present on the agent object at runtime.
"""
config_name = getattr(self.pm, "config", {}).get("config_name", "default")
for agent in agents:
try:
setattr(agent, "session_id", session_key)
if not getattr(agent, "run_id", None):
setattr(agent, "run_id", config_name)
# Keep workspace_id for backward compatibility
if not getattr(agent, "workspace_id", None):
setattr(agent, "workspace_id", config_name)
except Exception:
continue
def _all_analysts(self) -> List[Any]:
"""Return static analysts plus runtime-created analysts."""
return list(self.analysts) + list(self._dynamic_analysts.values())
def _create_runtime_analyst(
    self,
    agent_id: str,
    analyst_type: str,
    custom_config: Optional[AnalystConfig] = None,
) -> str:
    """Create one runtime analyst instance.

    Validates the request (known type or custom config, Chinese display
    name, no duplicate id or near-duplicate role), materializes the
    agent's workspace files, instantiates an EvoAgent with its toolkit,
    registers it with the runtime manager, broadcasts the change, and
    adds it to the run-scoped active-analyst config.

    Args:
        agent_id: Unique identifier for the new analyst
        analyst_type: Type of analyst (e.g., "technical_analyst")
        custom_config: Optional custom configuration for the analyst,
            including persona, soul_md, agents_md, etc.

    Returns:
        Success or error message (errors are returned, not raised, so
        the calling tool can surface them to the PM).
    """
    # Validate analyst_type or custom_config
    if analyst_type not in ANALYST_TYPES and not custom_config:
        return (
            f"Unknown analyst_type '{analyst_type}'. "
            f"Available: {', '.join(ANALYST_TYPES.keys())}. "
            f"Or provide custom_config to create a custom analyst."
        )
    display_name = (
        custom_config.persona.name
        if custom_config and custom_config.persona and custom_config.persona.name
        else ""
    )
    # Dynamic analysts must carry a Chinese display name for the UI.
    if not self._contains_cjk(display_name):
        return (
            f"Analyst '{agent_id}' requires a Chinese display name. "
            "Please provide `name` in Chinese characters when creating dynamic analysts."
        )
    if agent_id in {agent.name for agent in self._all_analysts()}:
        return f"Analyst '{agent_id}' already exists."
    # Reject near-duplicate roles created under a different id.
    similar_existing = self._find_similar_existing_analyst(
        agent_id=agent_id,
        analyst_type=analyst_type,
        custom_config=custom_config,
    )
    if similar_existing:
        return (
            f"Analyst '{agent_id}' is too similar to existing analyst "
            f"'{similar_existing}'. Reuse or clone the existing analyst instead of "
            f"creating a duplicate role."
        )
    config_name = getattr(self.pm, "config", {}).get("config_name", "default")
    project_root = Path(__file__).resolve().parents[2]
    # Get persona: use custom_config if provided, else load from personas.yaml
    if custom_config and custom_config.persona:
        persona = {
            "name": custom_config.persona.name,
            "focus": custom_config.persona.focus,
            "description": custom_config.persona.description,
        }
    else:
        personas = get_prompt_loader().load_yaml_config("analyst", "personas")
        persona = personas.get(analyst_type, {})
    workspace_manager = WorkspaceManager(project_root=project_root)
    # Build file contents: use custom if provided, else generate from persona
    file_contents = {}
    if custom_config:
        if custom_config.soul_md:
            file_contents["SOUL.md"] = custom_config.soul_md
        if custom_config.agents_md:
            file_contents["AGENTS.md"] = custom_config.agents_md
        if custom_config.profile_md:
            file_contents["PROFILE.md"] = custom_config.profile_md
        if custom_config.bootstrap_md:
            file_contents["BOOTSTRAP.md"] = custom_config.bootstrap_md
    # Fill in any missing files with defaults (4 files expected in total)
    if not file_contents or len(file_contents) < 4:
        default_files = workspace_manager.build_default_agent_files(
            agent_id=agent_id,
            persona=persona,
        )
        for key, value in default_files.items():
            if key not in file_contents:
                file_contents[key] = value
    workspace_manager.ensure_agent_assets(
        config_name=config_name,
        agent_id=agent_id,
        file_contents=file_contents,
    )
    # Create EvoAgent with workspace-driven configuration
    from backend.agents.skills_manager import SkillsManager
    skills_manager = SkillsManager(project_root=project_root)
    workspace_dir = skills_manager.get_agent_asset_dir(
        config_name,
        agent_id,
    )
    agent_config = load_agent_workspace_config(workspace_dir / "agent.yaml")
    # Support model override from custom_config
    if custom_config and custom_config.model_name:
        # Import create_model for custom model creation
        from backend.llm.models import create_model
        # Use specified model name, default to openai provider
        model = create_model(
            model_name=custom_config.model_name,
            model_provider=custom_config.memory_config.get("model_provider", "openai") if custom_config.memory_config else "openai"
        )
    else:
        model = get_agent_model(analyst_type)
    agent = EvoAgent(
        agent_id=agent_id,
        config_name=config_name,
        workspace_dir=workspace_dir,
        model=model,
        formatter=get_agent_formatter(analyst_type),
        prompt_files=agent_config.prompt_files,
    )
    agent.toolkit = create_agent_toolkit(
        agent_id=agent_id,
        config_name=config_name,
        active_skill_dirs=[],
    )
    setattr(agent, "run_id", config_name)
    # Keep workspace_id for backward compatibility
    setattr(agent, "workspace_id", config_name)
    self._dynamic_analysts[agent_id] = agent
    if self.runtime_manager:
        display_name = None
        if custom_config and custom_config.persona and custom_config.persona.name:
            display_name = custom_config.persona.name
        self.runtime_manager.register_agent(
            agent_id,
            display_name=display_name,
        )
        self.runtime_manager.log_event(
            "agent:created",
            {
                "agent_id": agent_id,
                "analyst_type": analyst_type,
                "display_name": display_name,
            },
        )
    logger.info(
        "Dynamic analyst created: agent_id=%s analyst_type=%s custom=%s",
        agent_id,
        analyst_type,
        bool(custom_config),
    )
    if self.state_sync:
        try:
            # Fire-and-forget UI notification; failures are non-fatal.
            asyncio.create_task(
                self.state_sync.emit(
                    {
                        "type": "runtime_agents_updated",
                        "action": "created",
                        "agentId": agent_id,
                        "agentName": display_name or self._resolve_agent_display_name(agent_id),
                    },
                    persist=False,
                )
            )
        except Exception as exc:
            logger.warning(
                "Failed to broadcast runtime_agents_updated(create) for %s: %s",
                agent_id,
                exc,
            )
    # Store custom config for future reference (e.g., cloning)
    if custom_config:
        self._dynamic_analyst_configs[agent_id] = custom_config
    # Persist the new analyst into the run-scoped active-team config.
    update_active_analysts(
        project_root=project_root,
        config_name=config_name,
        available_analysts=[item.name for item in self._all_analysts()],
        add=[agent_id],
    )
    return f"Created runtime analyst '{agent_id}' ({analyst_type})."
def _remove_runtime_analyst(self, agent_id: str) -> str:
    """Remove one runtime-created analyst instance.

    Only analysts created via ``_create_runtime_analyst`` can be
    removed; static analysts are never touched. Also unregisters the
    agent from the runtime manager, broadcasts the change, and drops it
    from the run-scoped active-analyst config.

    Returns:
        A human-readable status message.
    """
    if agent_id not in self._dynamic_analysts:
        return f"Runtime analyst '{agent_id}' not found."
    self._dynamic_analysts.pop(agent_id, None)
    # Also remove stored config if exists
    self._dynamic_analyst_configs.pop(agent_id, None)
    if self.runtime_manager:
        self.runtime_manager.unregister_agent(agent_id)
        self.runtime_manager.log_event(
            "agent:removed",
            {"agent_id": agent_id},
        )
    logger.info("Dynamic analyst removed: agent_id=%s", agent_id)
    if self.state_sync:
        try:
            # Fire-and-forget UI notification; failures are non-fatal.
            asyncio.create_task(
                self.state_sync.emit(
                    {
                        "type": "runtime_agents_updated",
                        "action": "removed",
                        "agentId": agent_id,
                    },
                    persist=False,
                )
            )
        except Exception as exc:
            logger.warning(
                "Failed to broadcast runtime_agents_updated(remove) for %s: %s",
                agent_id,
                exc,
            )
    config_name = getattr(self.pm, "config", {}).get("config_name", "default")
    project_root = Path(__file__).resolve().parents[2]
    # Persist the removal into the run-scoped active-team config.
    update_active_analysts(
        project_root=project_root,
        config_name=config_name,
        available_analysts=[item.name for item in self._all_analysts()],
        remove=[agent_id],
    )
    return f"Removed runtime analyst '{agent_id}'."
def _get_active_analysts(self) -> List[Any]:
    """Resolve active analyst participants from run-scoped team pipeline config."""
    run_name = getattr(self.pm, "config", {}).get("config_name", "default")
    root = Path(__file__).resolve().parents[2]
    agents_by_id = {agent.name: agent for agent in self._all_analysts()}
    selected_ids = resolve_active_analysts(
        project_root=root,
        config_name=run_name,
        available_analysts=list(agents_by_id),
    )
    # Preserve the configured order, dropping ids we no longer know.
    return [
        agents_by_id[agent_id]
        for agent_id in selected_ids
        if agent_id in agents_by_id
    ]
def _runtime_log_event(self, event: str, details: Optional[Dict[str, Any]] = None) -> None:
if not self.runtime_manager:
return
self.runtime_manager.log_event(event, details)