Files
evotraders/backend/core/pipeline.py
cillin 06a23c32a4 refactor: Fix code quality issues identified in analysis
1. Rename factory.py's EvoAgent data class to AgentConfig
   - Avoids naming conflict with base/evo_agent.py's EvoAgent

2. Export pipeline_runner functions in backend/core/__init__.py
   - Add create_agents, create_long_term_memory, stop_gateway

3. Consolidate PromptLoader to singleton pattern
   - Add get_prompt_loader() singleton function
   - Update all usages to use singleton

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-20 01:07:53 +08:00

1691 lines
61 KiB
Python

# -*- coding: utf-8 -*-
"""
Core Pipeline - Orchestrates multi-agent analysis and decision-making
"""
# flake8: noqa: E501
# pylint: disable=W0613,C0301
import json
import logging
import os
import re
from contextlib import nullcontext
from pathlib import Path
from typing import Any, Awaitable, Callable, Dict, List, Optional
from agentscope.message import Msg
from agentscope.pipeline import MsgHub
from backend.utils.settlement import SettlementCoordinator
from backend.utils.terminal_dashboard import get_dashboard
from backend.core.state_sync import StateSync
from backend.utils.trade_executor import PortfolioTradeExecutor
from backend.runtime.manager import TradingRuntimeManager
from backend.runtime.session import TradingSessionKey
from backend.agents.team_pipeline_config import (
resolve_active_analysts,
update_active_analysts,
)
from backend.agents import AnalystAgent
from backend.agents.toolkit_factory import create_agent_toolkit
from backend.agents.workspace_manager import WorkspaceManager
from backend.agents.prompt_loader import get_prompt_loader
from backend.llm.models import get_agent_formatter, get_agent_model
from backend.config.constants import ANALYST_TYPES
# Team infrastructure imports (graceful import - may not exist yet)
try:
from backend.agents.team.team_coordinator import TeamCoordinator
from backend.agents.team.msg_hub import MsgHub as TeamMsgHub
TEAM_COORD_AVAILABLE = True
except ImportError:
TEAM_COORD_AVAILABLE = False
TeamCoordinator = None
TeamMsgHub = None
logger = logging.getLogger(__name__)
def _log(msg: str):
    """Emit *msg* on the live terminal dashboard when active, else via the module logger."""
    board = get_dashboard()
    if not board.live:
        logger.info(msg)
        return
    board.log(msg)
class TradingPipeline:
"""
Trading Pipeline - Orchestrates the complete trading cycle
Flow:
1. Clear agent short-term memory (avoid cross-day context pollution)
2. Analysts analyze stocks
3. Risk Manager provides risk assessment
4. PM makes decisions (direction + quantity)
5. Execute trades with provided prices
6. Reflection phase: broadcast closing P&L, agents record to long-term memory
Real-time updates via StateSync after each agent completes.
Supports both legacy agent lists and new workspace-based agent loading.
"""
    def __init__(
        self,
        analysts: List[Any],
        risk_manager: Any,
        portfolio_manager: Any,
        state_sync: Optional["StateSync"] = None,
        settlement_coordinator: Optional[SettlementCoordinator] = None,
        max_comm_cycles: Optional[int] = None,
        workspace_id: Optional[str] = None,
        agent_factory: Optional[Any] = None,
        runtime_manager: Optional[TradingRuntimeManager] = None,
    ):
        """
        Args:
            analysts: Analyst agents producing per-ticker signals.
            risk_manager: Agent that runs the risk-assessment phase.
            portfolio_manager: Agent that makes the final trade decisions.
            state_sync: Optional real-time broadcaster for frontend updates.
            settlement_coordinator: Optional daily settlement / leaderboard helper.
            max_comm_cycles: Conference discussion rounds; falls back to the
                MAX_COMM_CYCLES environment variable (default "2").
            workspace_id: Optional workspace identifier for agent loading.
            agent_factory: Optional factory used to create analysts at runtime.
            runtime_manager: Optional runtime status/event tracker.
        """
        self.analysts = analysts
        self.risk_manager = risk_manager
        self.pm = portfolio_manager
        self.state_sync = state_sync
        self.settlement_coordinator = settlement_coordinator
        # Env fallback lets deployments tune discussion depth without code changes
        self.max_comm_cycles = max_comm_cycles or int(
            os.getenv("MAX_COMM_CYCLES", "2"),
        )
        self.conference_summary = None  # Store latest conference summary
        self.workspace_id = workspace_id
        self.agent_factory = agent_factory
        self.runtime_manager = runtime_manager
        self._session_key: Optional[str] = None  # Set per-cycle in run_cycle
        self._dynamic_analysts: Dict[str, Any] = {}  # Analysts created at runtime
        # Let the PM add/remove analysts mid-run when it supports team control
        if hasattr(self.pm, "set_team_controller"):
            self.pm.set_team_controller(
                create_agent_callback=self._create_runtime_analyst,
                remove_agent_callback=self._remove_runtime_analyst,
            )
    async def run_cycle(
        self,
        tickers: List[str],
        date: str,
        prices: Optional[Dict[str, float]] = None,
        close_prices: Optional[Dict[str, float]] = None,
        market_caps: Optional[Dict[str, float]] = None,
        get_open_prices_fn: Optional[
            Callable[[], Awaitable[Dict[str, float]]]
        ] = None,
        get_close_prices_fn: Optional[
            Callable[[], Awaitable[Dict[str, float]]]
        ] = None,
        execute_decisions: bool = True,
    ) -> Dict[str, Any]:
        """
        Run one complete trading cycle.

        Args:
            tickers: List of stock tickers
            date: Trading date (YYYY-MM-DD)
            prices: Open prices {ticker: price} (for backtest)
            close_prices: Close prices for settlement (for backtest)
            market_caps: Optional market caps for baseline calculation
                (defaults to 1e9 per ticker when omitted)
            get_open_prices_fn: Async callback to wait for open prices (live mode)
            get_close_prices_fn: Async callback to wait for close prices (live mode)
            execute_decisions: When False, PM decisions are produced but no
                trades are executed (dry run)

        For live mode:
            - Analysis runs immediately
            - Execution waits for market open via get_open_prices_fn
            - Settlement waits for market close via get_close_prices_fn

        Each agent's result is broadcast immediately via StateSync.

        Returns:
            Dict with analyst_results, risk_assessment, pm_decisions,
            executed_trades, portfolio and settlement_result (None when
            settlement did not run).
        """
        _log(f"Starting cycle {date} - {len(tickers)} tickers")
        # Session key scopes runtime bookkeeping to this trading date
        session_key = TradingSessionKey(date=date).key()
        self._session_key = session_key
        active_analysts = self._get_active_analysts()
        if self.runtime_manager:
            self.runtime_manager.set_session_key(session_key)
        self._runtime_log_event("cycle:start", {"tickers": tickers, "date": date})
        self._runtime_batch_status(active_analysts, "analysis_in_progress")
        # Phase 0: Clear short-term memory to avoid cross-day context pollution
        _log("Phase 0: Clearing memory")
        await self._clear_all_agent_memory()
        participants = self._all_analysts() + [self.risk_manager, self.pm]
        # Single MsgHub for entire cycle - no nesting
        async with MsgHub(
            participants=participants,
            announcement=Msg(
                "system",
                f"Starting analysis cycle for {date}. Tickers: {', '.join(tickers)}",
                "system",
            ),
        ):
            # Phase 1.1: Analysts (parallel execution with TeamCoordinator)
            _log("Phase 1.1: Analyst analysis (parallel)")
            analyst_results = await self._run_analysts_parallel(
                tickers,
                date,
                active_analysts=active_analysts,
            )
            # Phase 1.2: Risk Manager
            _log("Phase 1.2: Risk assessment")
            self._runtime_update_status(self.risk_manager, "risk_assessment")
            risk_assessment = await self._run_risk_manager_with_sync(
                tickers,
                date,
                prices,
            )
            # Phase 2.1: Conference discussion (within same MsgHub)
            _log("Phase 2.1: Conference discussion")
            conference_summary = await self._run_conference_cycles(
                tickers=tickers,
                date=date,
                prices=prices,
                analyst_results=analyst_results,
                risk_assessment=risk_assessment,
            )
            self.conference_summary = conference_summary
            # Phase 2.2: Analysts generate final structured predictions
            _log("Phase 2.2: Analysts generate final structured predictions")
            final_predictions = await self._collect_final_predictions(
                tickers,
                date,
                active_analysts=active_analysts,
            )
            # Record final predictions for leaderboard ranking
            if self.settlement_coordinator:
                self.settlement_coordinator.record_analyst_predictions(
                    final_predictions,
                )
            # Live mode: wait for market open before execution
            if get_open_prices_fn:
                _log("Waiting for market open...")
                prices = await get_open_prices_fn()
                _log(f"Got open prices: {prices}")
            # Phase 3: PM makes decisions
            _log("Phase 3.1: PM makes decisions")
            self._runtime_update_status(self.pm, "decision_phase")
            pm_result = await self._run_pm_with_sync(
                tickers,
                date,
                prices,
                analyst_results,
                risk_assessment,
            )
            decisions = pm_result.get("decisions", {})
            # Default execution result used when execution is skipped
            execution_result = {
                "executed_trades": [],
                "portfolio": self.pm.get_portfolio_state(),
            }
            if execute_decisions:
                _log("Phase 4: Executing trades")
                self._runtime_update_status(self.pm, "executing")
                execution_result = self._execute_decisions(decisions, prices, date)
            else:
                _log("Phase 4: Skipping trade execution")
            # Live mode: wait for market close before settlement
            if get_close_prices_fn:
                _log("Waiting for market close")
                close_prices = await get_close_prices_fn()
                _log(f"Got close prices: {close_prices}")
            # Phase 5: Settlement - run after close prices available
            settlement_result = None
            if close_prices and self.settlement_coordinator:
                _log("Phase 5: Daily review and generate memories")
                self._runtime_batch_status(
                    [self.risk_manager] + self._all_analysts() + [self.pm],
                    "settlement",
                )
                # Snapshot trajectories BEFORE reflection so long-term memory
                # can record the complete execution trace
                agent_trajectories = await self._capture_agent_trajectories()
                if market_caps is None:
                    market_caps = {ticker: 1e9 for ticker in tickers}
                settlement_result = (
                    self.settlement_coordinator.run_daily_settlement(
                        date=date,
                        tickers=tickers,
                        open_prices=prices,
                        close_prices=close_prices,
                        market_caps=market_caps,
                        agent_portfolio=execution_result.get("portfolio", {}),
                        analyst_results=analyst_results,
                        pm_decisions=decisions,
                    )
                )
                await self._run_reflection(
                    date=date,
                    agent_trajectories=agent_trajectories,
                    analyst_results=analyst_results,
                    decisions=decisions,
                    executed_trades=execution_result.get("executed_trades", []),
                    open_prices=prices,
                    close_prices=close_prices,
                    settlement_result=settlement_result,
                    conference_summary=self.conference_summary,
                )
                self._runtime_batch_status(
                    [self.risk_manager] + self._all_analysts() + [self.pm],
                    "reflection",
                )
        _log(f"Cycle complete: {date}")
        self._runtime_batch_status(
            self._all_analysts() + [self.risk_manager, self.pm],
            "idle",
        )
        self._runtime_log_event("cycle:end", {"tickers": tickers, "date": date})
        return {
            "analyst_results": analyst_results,
            "risk_assessment": risk_assessment,
            "pm_decisions": decisions,
            "executed_trades": execution_result.get("executed_trades", []),
            "portfolio": execution_result.get("portfolio", {}),
            "settlement_result": settlement_result,
        }
    def reload_runtime_assets(
        self,
        runtime_config: Optional[Dict[str, Any]] = None,
    ) -> Dict[str, Any]:
        """Reload prompt assets and safe in-process runtime settings.

        Re-resolves the active skill set for every agent and pushes the new
        skill directories into each agent via its own reload hook.

        Args:
            runtime_config: Optional overrides; only "max_comm_cycles" is
                honored here (other keys are ignored).

        Returns:
            Summary dict with config name, reloaded agent names, active skill
            file names per agent, and the effective max_comm_cycles.
        """
        # Imported locally, presumably to avoid an import cycle with the
        # agents package at module load time -- TODO confirm
        from backend.agents.skills_manager import SkillsManager
        from backend.agents.toolkit_factory import load_agent_profiles
        config_name = getattr(self.pm, "config", {}).get("config_name", "default")
        if runtime_config and "max_comm_cycles" in runtime_config:
            self.max_comm_cycles = int(runtime_config["max_comm_cycles"])
        skills_manager = SkillsManager()
        profiles = load_agent_profiles()
        # Per-agent skill defaults come from profile files; SkillsManager
        # narrows them to what is active for this config
        active_skill_map = skills_manager.prepare_active_skills(
            config_name=config_name,
            agent_defaults={
                agent_id: profile.get("skills", [])
                for agent_id, profile in profiles.items()
            },
        )
        for analyst in self._all_analysts():
            analyst.reload_runtime_assets(
                active_skill_dirs=active_skill_map.get(analyst.name, []),
            )
        self.risk_manager.reload_runtime_assets(
            active_skill_dirs=active_skill_map.get("risk_manager", []),
        )
        self.pm.reload_runtime_assets(
            active_skill_dirs=active_skill_map.get("portfolio_manager", []),
        )
        return {
            "config_name": config_name,
            "reloaded_agents": [agent.name for agent in self._all_analysts()]
            + ["risk_manager", "portfolio_manager"],
            "active_skills": {
                agent_id: [path.name for path in paths]
                for agent_id, paths in active_skill_map.items()
            },
            "max_comm_cycles": self.max_comm_cycles,
        }
async def _clear_all_agent_memory(self):
"""Clear short-term memory for all agents"""
for analyst in self._all_analysts():
await analyst.memory.clear()
await self.risk_manager.memory.clear()
await self.pm.memory.clear()
    async def _sync_memory_if_retrieved(self, agent: Any) -> None:
        """
        Check agent's short-term memory for retrieved long-term memory and sync to frontend.

        AgentScope's ReActAgent adds a Msg with name="long_term_memory" when
        memory is retrieved in static_control mode.

        No-op when no StateSync is attached. Any failure is logged and
        swallowed so a sync problem cannot abort the trading cycle.
        """
        if not self.state_sync:
            return
        try:
            msgs = await agent.memory.get_memory()
            for msg in msgs:
                if getattr(msg, "name", None) == "long_term_memory":
                    content = self._extract_text_content(msg.content)
                    if content:
                        parsed = self._parse_memory_content(content)
                        await self.state_sync.on_memory_retrieved(
                            agent_id=agent.name,
                            content=parsed,
                        )
                    break  # Only sync the first (most recent) memory retrieval
        except Exception as e:
            logger.warning(f"Failed to sync memory for {agent.name}: {e}")
def _parse_memory_content(self, content: str) -> str:
"""
Parse memory content to extract rewritten_context from JSON format.
AgentScope ReMe memory wraps content in <long_term_memory> tags with JSON.
"""
# Try to extract JSON from the content
print("memory content:\n", content)
json_match = re.search(
r"<long_term_memory>.*?```json\s*(\{[\s\S]*?\})\s*```\s*</long_term_memory>",
content,
re.DOTALL,
)
if not json_match:
json_match = re.search(
r'\{[^{}]*"rewritten_context"[^{}]*\}',
content,
re.DOTALL,
)
if json_match:
try:
json_str = json_match.group(1)
data = json.loads(json_str)
return data.get("rewritten_context", "")
except json.JSONDecodeError:
pass
# Fallback: strip XML tags and return cleaned content
content = re.sub(r"</?long_term_memory>", "", content)
content = re.sub(
r"The content below are retrieved from long-term memory.*?:\s*",
"",
content,
)
return content.strip()
async def _capture_agent_trajectories(self) -> Dict[str, List[Msg]]:
"""
Capture execution trajectories from all agents' short-term memory
This should be called BEFORE clearing memory to preserve the
complete execution trajectory for long-term memory recording.
Returns:
Dict mapping agent name to list of Msg objects (the trajectory)
"""
trajectories = {}
# Capture analyst trajectories
for analyst in self._all_analysts():
try:
msgs = await analyst.memory.get_memory()
if msgs:
trajectories[analyst.name] = list(msgs)
except Exception as e:
logger.warning(
f"Failed to capture trajectory for {analyst.name}: {e}",
)
# Capture risk manager trajectory
try:
msgs = await self.risk_manager.memory.get_memory()
if msgs:
trajectories["risk_manager"] = list(msgs)
except Exception as e:
logger.warning(
f"Failed to capture trajectory for risk_manager: {e}",
)
# Capture PM trajectory
try:
msgs = await self.pm.memory.get_memory()
if msgs:
trajectories["portfolio_manager"] = list(msgs)
except Exception as e:
logger.warning(
f"Failed to capture trajectory for portfolio_manager: {e}",
)
return trajectories
    async def _run_reflection(
        self,
        date: str,
        agent_trajectories: Dict[str, List[Msg]],
        analyst_results: List[Dict[str, Any]],
        decisions: Dict[str, Dict],
        executed_trades: List[Dict],
        open_prices: Optional[Dict[str, float]],
        close_prices: Optional[Dict[str, float]],
        settlement_result: Optional[Dict[str, Any]] = None,
        conference_summary: Optional[str] = None,
    ):
        """
        Run reflection phase after market close.

        Calculates actual P&L and records execution trajectory to long-term memory.

        Args:
            date: Trading date
            agent_trajectories: Dict mapping agent name to their execution trajectory
            analyst_results: Results from analyst agents
            decisions: PM decisions
            executed_trades: List of executed trades
            open_prices: Opening prices
            close_prices: Closing prices (dereferenced below -- must be
                provided whenever executed_trades is non-empty)
            settlement_result: Optional settlement results with baseline performance
            conference_summary: Optional summary from conference discussion
        """
        # Calculate P&L for each trade, marking positions against the day's
        # close; a missing close price falls back to entry (zero P&L)
        trade_pnl = []
        for trade in executed_trades:
            ticker = trade["ticker"]
            action = trade["action"]
            quantity = trade["quantity"]
            entry_price = trade["price"]
            exit_price = close_prices.get(ticker, entry_price)
            if action == "long":
                pnl = (exit_price - entry_price) * quantity
            elif action == "short":
                pnl = (entry_price - exit_price) * quantity
            else:
                # Unknown action: treat as flat
                pnl = 0
            # NOTE(review): guards quantity == 0 but not entry_price == 0;
            # a zero entry price would divide by zero -- confirm upstream
            # guarantees positive fill prices
            pnl_pct = (
                (pnl / (entry_price * quantity) * 100) if quantity > 0 else 0
            )
            trade_pnl.append(
                {
                    "ticker": ticker,
                    "action": action,
                    "quantity": quantity,
                    "entry_price": entry_price,
                    "exit_price": exit_price,
                    "pnl": pnl,
                    "pnl_pct": pnl_pct,
                },
            )
        total_pnl = sum(t["pnl"] for t in trade_pnl)
        # Build reflection summary with settlement info
        reflection_content = self._build_reflection_content(
            date=date,
            analyst_results=analyst_results,
            decisions=decisions,
            trade_pnl=trade_pnl,
            total_pnl=total_pnl,
            settlement_result=settlement_result,
            conference_summary=conference_summary,
        )
        # Record execution trajectories to long-term memory for agents that support it
        # Score based on profitability: 1.0 for a profitable day, else 0.0
        score = 1.0 if total_pnl > 0 else 0.0
        await self._record_to_long_term_memory(
            date=date,
            agent_trajectories=agent_trajectories,
            trade_pnl=trade_pnl,
            total_pnl=total_pnl,
            score=score,
        )
        # Broadcast reflection to StateSync
        if self.state_sync:
            await self.state_sync.on_agent_complete(
                agent_id="Daily Log",
                content=reflection_content,
            )
def _build_reflection_content(
self,
date: str,
analyst_results: List[Dict[str, Any]],
decisions: Dict[str, Dict],
trade_pnl: List[Dict],
total_pnl: float,
settlement_result: Optional[Dict[str, Any]] = None,
conference_summary: Optional[str] = None,
) -> str:
"""Build human-readable reflection content"""
lines = [f"Daily log for {date}:"]
lines.append(f"Total P&L: ${total_pnl:,.2f}")
lines.append("")
if conference_summary:
lines.append("Conference Discussion Summary:")
lines.append(conference_summary)
lines.append("")
if settlement_result:
baseline_values = settlement_result.get("baseline_values", {})
initial = 100000.0
lines.append("Baseline Comparison:")
lines.append(
f" Equal Weight: ${baseline_values.get('equal_weight', 0):,.2f} "
f"({(baseline_values.get('equal_weight', initial) - initial) / initial * 100:.2f}%)",
)
lines.append(
f" Market Cap Weighted: ${baseline_values.get('market_cap_weighted', 0):,.2f} "
f"({(baseline_values.get('market_cap_weighted', initial) - initial) / initial * 100:.2f}%)",
)
lines.append(
f" Momentum: ${baseline_values.get('momentum', 0):,.2f} "
f"({(baseline_values.get('momentum', initial) - initial) / initial * 100:.2f}%)",
)
lines.append("")
if trade_pnl:
lines.append("Trade Results:")
for t in trade_pnl:
pnl_sign = "+" if t["pnl"] >= 0 else ""
lines.append(
f" {t['ticker']}: {t['action'].upper()} {t['quantity']} @ "
f"${t['entry_price']:.2f} -> ${t['exit_price']:.2f}, "
f"P&L: {pnl_sign}${t['pnl']:.2f} ({pnl_sign}{t['pnl_pct']:.1f}%)",
)
else:
lines.append("No trades executed today.")
return "\n".join(lines)
    async def _record_to_long_term_memory(
        self,
        date: str,
        agent_trajectories: Dict[str, List[Msg]],
        trade_pnl: List[Dict],
        total_pnl: float,
        score: float,
    ):
        """
        Record execution trajectories to long-term memory for all agents.

        This method records the actual execution trajectory (conversation history)
        from each agent's short-term memory. This allows the ReMe memory system
        to learn from the complete task execution flow, not just summaries.
        Agents without a ``long_term_memory`` attribute (or with it set to
        None) are skipped; per-agent failures are logged, never raised.

        Args:
            date: Trading date
            agent_trajectories: Dict mapping agent name to their execution trajectory
            trade_pnl: P&L details for each trade
            total_pnl: Total P&L for the day
            score: Score for this trajectory (the caller passes 1.0 for a
                profitable day, 0.0 otherwise)
        """
        # Shared outcome message appended to every trajectory so the memory
        # system associates the day's result with the behavior that produced it
        outcome_msg = Msg(
            role="user",
            content=f"You are an analyst/financial manager, The Key point is to predict correctly and"
            f"have good P&L. The Definition of loss is when P&L < 0. "
            f"Focus on how to do good prediction but not only execution correctly."
            f"[Outcome] Trading day {date} - Total P&L: ${total_pnl:,.2f}. "
            f"{'Profitable day.' if total_pnl > 0 else 'Loss day.'}",
            name="system",
        )
        # Record for analysts
        for analyst in self._all_analysts():
            if (
                hasattr(analyst, "long_term_memory")
                and analyst.long_term_memory is not None
            ):
                trajectory = agent_trajectories.get(analyst.name, [])
                if trajectory:
                    # Append outcome to trajectory
                    trajectory_with_outcome = trajectory + [outcome_msg]
                    try:
                        await analyst.long_term_memory.record(
                            msgs=trajectory_with_outcome,
                            score=score,
                        )
                        logger.debug(
                            f"Recorded {len(trajectory_with_outcome)} messages "
                            f"to long-term memory for {analyst.name}",
                        )
                    except Exception as e:
                        logger.warning(
                            f"Failed to record to long-term memory for {analyst.name}: {e}",
                        )
        # Record for risk manager
        if (
            hasattr(self.risk_manager, "long_term_memory")
            and self.risk_manager.long_term_memory is not None
        ):
            trajectory = agent_trajectories.get("risk_manager", [])
            if trajectory:
                trajectory_with_outcome = trajectory + [outcome_msg]
                try:
                    await self.risk_manager.long_term_memory.record(
                        msgs=trajectory_with_outcome,
                        score=score,
                    )
                    logger.debug(
                        f"Recorded {len(trajectory_with_outcome)} messages "
                        f"to long-term memory for risk_manager",
                    )
                except Exception as e:
                    logger.warning(
                        f"Failed to record to long-term memory for risk_manager: {e}",
                    )
        # Record for PM with trade outcome details
        if (
            hasattr(self.pm, "long_term_memory")
            and self.pm.long_term_memory is not None
        ):
            trajectory = agent_trajectories.get("portfolio_manager", [])
            if trajectory:
                # Build detailed outcome message for PM
                pnl_details = []
                for t in trade_pnl:
                    pnl_sign = "+" if t["pnl"] >= 0 else ""
                    pnl_details.append(
                        f"{t['ticker']}: {t['action']} {t['quantity']} @ "
                        f"${t['entry_price']:.2f} -> ${t['exit_price']:.2f}, "
                        f"P&L: {pnl_sign}${t['pnl']:.2f}",
                    )
                # NOTE: the conditional expression binds over the whole
                # preceding concatenation -- the detailed message is used
                # when pnl_details is non-empty, the short one otherwise
                pm_outcome_msg = Msg(
                    role="user",
                    content=f"[Outcome] Trading day {date}\n"
                    f"Total P&L: ${total_pnl:,.2f} "
                    f"({'Profitable' if total_pnl >= 0 else 'Loss'})\n"
                    f"Trade details:\n" + "\n".join(pnl_details)
                    if pnl_details
                    else f"[Outcome] Trading day {date}\n"
                    f"Total P&L: ${total_pnl:,.2f}\nNo trades executed.",
                    name="system",
                )
                trajectory_with_outcome = trajectory + [pm_outcome_msg]
                try:
                    await self.pm.long_term_memory.record(
                        msgs=trajectory_with_outcome,
                        score=score,
                    )
                    logger.debug(
                        f"Recorded {len(trajectory_with_outcome)} messages "
                        f"to long-term memory for portfolio_manager",
                    )
                except Exception as e:
                    logger.warning(
                        f"Failed to record to long-term memory for portfolio_manager: {e}",
                    )
    async def _run_conference_cycles(
        self,
        tickers: List[str],
        date: str,
        prices: Optional[Dict[str, float]],
        analyst_results: List[Dict[str, Any]],
        risk_assessment: Dict[str, Any],
    ) -> Optional[str]:
        """
        Run conference discussion cycles (within existing MsgHub context).

        No nested MsgHub - this runs inside the main cycle's MsgHub. A
        TeamMsgHub is layered on top only when the optional team
        infrastructure imported successfully.

        Args:
            tickers: Tickers under discussion
            date: Trading date
            prices: Current prices forwarded into the PM's cycle-0 prompt
            analyst_results: Phase-1 analyst outputs (cycle-0 context)
            risk_assessment: Risk manager output (cycle-0 context)

        Returns:
            Conference summary string generated by PM, or None when
            max_comm_cycles disables the conference.
        """
        if self.max_comm_cycles <= 0:
            _log(
                "Phase 2.1: Conference discussion - "
                "Conference skipped (disabled)",
            )
            return None
        conference_title = f"Investment Discussion - {date}"
        if self.state_sync:
            await self.state_sync.on_conference_start(
                title=conference_title,
                date=date,
            )
        # Conference participants: analysts + PM
        conference_participants = self._get_active_analysts() + [self.pm]
        # Use TeamMsgHub for conference if available
        if TEAM_COORD_AVAILABLE and TeamMsgHub is not None:
            _log(
                f"Phase 2.1: Conference using TeamMsgHub with "
                f"{len(conference_participants)} participants"
            )
            conference_hub = TeamMsgHub(participants=conference_participants)
        else:
            _log("Phase 2.1: Conference using standard MsgHub context")
            conference_hub = None
        # Run discussion cycles (nullcontext keeps the with-statement shape
        # when no dedicated hub is in play)
        async with conference_hub if conference_hub else nullcontext(None):
            for cycle in range(self.max_comm_cycles):
                _log(
                    "Phase 2.1: Conference discussion - "
                    f"Conference {cycle + 1}/{self.max_comm_cycles}",
                )
                if self.state_sync:
                    await self.state_sync.on_conference_cycle_start(
                        cycle=cycle + 1,
                        total_cycles=self.max_comm_cycles,
                    )
                # PM sets agenda or asks questions
                pm_prompt = self._build_pm_discussion_prompt(
                    cycle=cycle,
                    tickers=tickers,
                    date=date,
                    prices=prices,
                    analyst_results=analyst_results,
                    risk_assessment=risk_assessment,
                )
                pm_msg = Msg(name="system", content=pm_prompt, role="user")
                pm_response = await self.pm.reply(pm_msg)
                if self.state_sync:
                    pm_content = self._extract_text_content(pm_response.content)
                    await self.state_sync.on_conference_message(
                        agent_id="portfolio_manager",
                        content=pm_content,
                    )
                # Analysts share perspectives (supports per-round active team updates)
                for analyst in self._get_active_analysts():
                    analyst_prompt = self._build_analyst_discussion_prompt(
                        cycle=cycle,
                        tickers=tickers,
                        date=date,
                    )
                    analyst_msg = Msg(
                        name="system",
                        content=analyst_prompt,
                        role="user",
                    )
                    analyst_response = await analyst.reply(analyst_msg)
                    if self.state_sync:
                        analyst_content = self._extract_text_content(
                            analyst_response.content,
                        )
                        await self.state_sync.on_conference_message(
                            agent_id=analyst.name,
                            content=analyst_content,
                        )
                if self.state_sync:
                    await self.state_sync.on_conference_cycle_end(
                        cycle=cycle + 1,
                    )
        # Generate conference summary by PM
        _log(
            "Phase 2.1: Conference discussion - Generating conference summary",
        )
        summary_prompt = (
            f"The conference discussion for {date} has concluded. "
            f"As Portfolio Manager, provide a concise summary of the key insights, "
            f"concerns, and consensus points discussed about {', '.join(tickers)}. "
            f"Highlight any critical factors that should be considered in the final decision-making."
        )
        summary_msg = Msg(name="system", content=summary_prompt, role="user")
        summary_response = await self.pm.reply(summary_msg)
        conference_summary = self._extract_text_content(
            summary_response.content,
        )
        _log(
            "Phase 2.1: Conference discussion - Conference summary generated",
        )
        if self.state_sync:
            await self.state_sync.on_conference_message(
                agent_id="conference summary",
                content=conference_summary,
            )
            await self.state_sync.on_conference_end()
        return conference_summary
def _build_pm_discussion_prompt(
self,
cycle: int,
tickers: List[str],
date: str,
prices: Optional[Dict[str, float]],
analyst_results: List[Dict[str, Any]],
risk_assessment: Dict[str, Any],
) -> str:
"""Build PM discussion prompt with full context"""
# Get current portfolio state
portfolio = self.pm.get_portfolio_state()
if cycle == 0:
# First cycle: provide full context
context_lines = [
f"As Portfolio Manager, review the following information for {date}:",
"",
"=== Current Portfolio ===",
f"Cash: ${portfolio.get('cash', 0):,.2f}",
f"Positions: {json.dumps(portfolio.get('positions', {}), indent=2)}",
"",
"=== Current Prices ===",
json.dumps(prices, indent=2),
"",
"=== Analyst Signals ===",
]
# Add analyst results summary
for result in analyst_results:
agent_name = result.get("agent", "Unknown")
content = result.get("content", "")
context_lines.append(f"{agent_name}: {content}")
context_lines.extend(
[
"",
"=== Risk Assessment ===",
str(risk_assessment.get("content", "")),
"",
"Based on the above context, share your key concerns or questions about the opportunities in "
f"{', '.join(tickers)}. Do not make final decisions yet - this is a discussion phase.",
],
)
return "\n".join(context_lines)
else:
return (
f"Continue the discussion. Share your thoughts on the perspectives raised "
f"and any remaining concerns about {', '.join(tickers)}."
)
def _build_analyst_discussion_prompt(
self,
cycle: int,
tickers: List[str],
date: str,
) -> str:
"""Build analyst discussion prompt"""
return (
f"Share your perspective on the discussion so far. "
f"Provide insights or address concerns raised by others about {', '.join(tickers)}. "
f"Do not use tools - focus on sharing your professional opinion."
)
    async def _collect_final_predictions(
        self,
        tickers: List[str],
        date: str,
        active_analysts: Optional[List[Any]] = None,
    ) -> List[Dict[str, Any]]:
        """
        Collect final predictions from all analysts as simple text responses.

        Analysts provide their predictions in plain text without tool calls.

        Args:
            tickers: Tickers each analyst must cover
            date: Trading date included in the prompt
            active_analysts: Subset of analysts to poll (defaults to all)

        Returns:
            One dict per analyst: "agent" (name), "predictions" (parsed
            per-ticker direction/confidence), "raw_content" (verbatim text).
        """
        _log(
            "Phase 2.2: Analysts generate final structured predictions\n"
            f" Starting _collect_final_predictions for {len(active_analysts or self.analysts)} analysts",
        )
        final_predictions = []
        analysts = active_analysts or self.analysts
        for i, analyst in enumerate(analysts):
            _log(
                "Phase 2.2: Analysts generate final structured predictions\n"
                f" Collecting prediction from analyst {i+1}/{len(analysts)}: {analyst.name}",
            )
            prompt = (
                f"Based on your analysis, provide your final prediction for {date}. "
                f"For each ticker ({', '.join(tickers)}), state: "
                f"TICKER: UP/DOWN/NEUTRAL (confidence: X%). "
                f"Do not use any tools, just respond with your predictions."
            )
            msg = Msg(name="system", content=prompt, role="user")
            _log(
                "Phase 2.2: Analysts generate final structured predictions\n"
                f" Sending prediction request to {analyst.name}",
            )
            response = await analyst.reply(msg)
            _log(
                "Phase 2.2: Analysts generate final structured predictions\n"
                f" Received response from {analyst.name}",
            )
            # Parse predictions from text response
            content = self._extract_text_content(response.content)
            predictions_data = self._parse_predictions_from_text(
                content,
                tickers,
            )
            _log(
                "Phase 2.2: Analysts generate final structured predictions\n"
                f" {analyst.name} final predictions: {predictions_data}",
            )
            final_predictions.append(
                {
                    "agent": analyst.name,
                    "predictions": predictions_data,
                    "raw_content": content,
                },
            )
            # if self.state_sync:
            #     await self.state_sync.on_agent_complete(
            #         agent_id=f"{analyst.name}_final_prediction",
            #         content=content,
            #     )
        return final_predictions
def _parse_predictions_from_text(
self,
content: str,
tickers: List[str],
) -> List[Dict[str, Any]]:
"""Parse predictions from analyst text response"""
predictions = []
content_upper = content.upper()
for ticker in tickers:
direction = "neutral"
confidence = 0.5
# Simple pattern matching for direction
ticker_idx = content_upper.find(ticker)
if ticker_idx >= 0:
# Look at text near ticker mention
context = content_upper[ticker_idx : ticker_idx + 100]
if (
"UP" in context
or "BULLISH" in context
or "LONG" in context
):
direction = "up"
confidence = 0.7
elif (
"DOWN" in context
or "BEARISH" in context
or "SHORT" in context
):
direction = "down"
confidence = 0.7
predictions.append(
{
"ticker": ticker,
"direction": direction,
"confidence": confidence,
},
)
return predictions
async def _run_analysts_with_sync(
self,
tickers: List[str],
date: str,
active_analysts: Optional[List[Any]] = None,
) -> List[Dict[str, Any]]:
"""Run all analysts with real-time sync after each completion"""
results = []
analysts = active_analysts or self.analysts
for analyst in analysts:
content = (
f"Analyze the following stocks for date {date}: {', '.join(tickers)}. "
f"Provide investment signals with confidence scores and reasoning."
)
msg = Msg(
name="system",
content=content,
role="user",
metadata={"tickers": tickers, "date": date},
)
result = await analyst.reply(msg)
extracted = self._extract_result_from_msg(result)
results.append(extracted)
# Sync retrieved memory first
await self._sync_memory_if_retrieved(analyst)
# Broadcast agent result via StateSync
if self.state_sync:
text_content = self._extract_text_content(result.content)
await self.state_sync.on_agent_complete(
agent_id=analyst.name,
content=text_content,
)
return results
async def _run_analysts_parallel(
self,
tickers: List[str],
date: str,
active_analysts: Optional[List[Any]] = None,
) -> List[Dict[str, Any]]:
"""Run all analysts in parallel using TeamCoordinator.
This method replaces the sequential analyst loop with parallel execution
using the TeamCoordinator for orchestration.
Args:
tickers: List of stock tickers to analyze
date: Trading date
active_analysts: Optional list of analysts to run
Returns:
List of analyst result dictionaries
"""
analysts = active_analysts or self.analysts
if not analysts:
return []
if not TEAM_COORD_AVAILABLE:
_log("TeamCoordinator not available, falling back to sequential execution")
return await self._run_analysts_with_sync(
tickers=tickers,
date=date,
active_analysts=active_analysts,
)
_log(
f"Phase 1.1: Running {len(analysts)} analysts in parallel "
f"[{', '.join(a.name for a in analysts)}]"
)
# Build the analyst prompt
content = (
f"Analyze the following stocks for date {date}: {', '.join(tickers)}. "
f"Provide investment signals with confidence scores and reasoning."
)
# Create coordinator for parallel execution
coordinator = TeamCoordinator(
participants=analysts,
task_content=content,
)
# Run analysts in parallel via TeamCoordinator
results = await coordinator.run_phase(
"analyst_analysis",
metadata={"tickers": tickers, "date": date},
)
# Process results and sync
processed_results = []
for i, (analyst, result) in enumerate(zip(analysts, results)):
if result is not None:
extracted = self._extract_result_from_msg(result)
processed_results.append(extracted)
# Sync retrieved memory
await self._sync_memory_if_retrieved(analyst)
# Broadcast agent result via StateSync
if self.state_sync:
text_content = self._extract_text_content(result.content)
await self.state_sync.on_agent_complete(
agent_id=analyst.name,
content=text_content,
)
else:
logger.warning(
"Analyst %s returned no result",
analyst.name,
)
processed_results.append({
"agent": analyst.name,
"content": "",
"success": False,
})
_log(
f"Phase 1.1: Parallel analyst execution complete "
f"({len(processed_results)}/{len(analysts)} successful)"
)
return processed_results
async def _run_analysts(
self,
tickers: List[str],
date: str,
active_analysts: Optional[List[Any]] = None,
) -> List[Dict[str, Any]]:
"""Run all analysts (without sync, for backward compatibility)"""
results = []
analysts = active_analysts or self.analysts
for analyst in analysts:
content = (
f"Analyze the following stocks for date {date}: {', '.join(tickers)}. "
f"Provide investment signals with confidence scores and reasoning."
)
msg = Msg(
name="system",
content=content,
role="user",
metadata={"tickers": tickers, "date": date},
)
result = await analyst.reply(msg)
results.append(self._extract_result_from_msg(result))
return results
async def _run_risk_manager_with_sync(
self,
tickers: List[str],
date: str,
prices: Optional[Dict[str, float]],
) -> Dict[str, Any]:
"""Run risk manager assessment with real-time sync"""
portfolio = self.pm.get_portfolio_state()
context = {
"portfolio": portfolio,
"tickers": tickers,
"date": date,
"current_prices": prices,
}
content = (
f"Assess risk for the following portfolio and market conditions:\n"
f"{json.dumps(context, indent=2)}\n"
f"Provide risk warnings and recommendations."
)
msg = Msg(name="system", content=content, role="user")
result = await self.risk_manager.reply(msg)
extracted = self._extract_result_from_msg(result)
# Sync retrieved memory first
await self._sync_memory_if_retrieved(self.risk_manager)
# Broadcast agent result via StateSync
if self.state_sync:
text_content = self._extract_text_content(result.content)
await self.state_sync.on_agent_complete(
agent_id="risk_manager",
content=text_content,
)
return extracted
async def _run_risk_manager(
self,
tickers: List[str],
date: str,
prices: Dict[str, float],
) -> Dict[str, Any]:
"""Run risk manager assessment (without sync, for backward compatibility)"""
portfolio = self.pm.get_portfolio_state()
context = {
"portfolio": portfolio,
"tickers": tickers,
"date": date,
"current_prices": prices,
}
content = (
f"Assess risk for the following portfolio and market conditions:\n"
f"{json.dumps(context, indent=2)}\n"
f"Provide risk warnings and recommendations."
)
msg = Msg(name="system", content=content, role="user")
result = await self.risk_manager.reply(msg)
return self._extract_result_from_msg(result)
async def _run_pm_with_sync(
self,
tickers: List[str],
date: str,
prices: Optional[Dict[str, float]],
analyst_results: List[Dict[str, Any]],
risk_assessment: Dict[str, Any],
) -> Dict[str, Any]:
"""Run PM decision-making with real-time sync"""
portfolio = self.pm.get_portfolio_state()
context = {
"analyst_signals": {
r["agent"]: r.get("content", "") for r in analyst_results
},
"risk_warnings": risk_assessment.get("content", ""),
"current_prices": prices,
"tickers": tickers,
"portfolio_cash": portfolio.get("cash", 0),
"portfolio_positions": portfolio.get("positions", {}),
}
# Add conference summary if available
if self.conference_summary:
context["conference_summary"] = self.conference_summary
content_parts = [
f"Based on the analyst signals, risk assessment, and conference discussion, "
f"make investment decisions for date {date}.\n",
f"Context:\n{json.dumps(context, indent=2)}\n",
]
if self.conference_summary:
content_parts.append(
f"\n=== Conference Summary ===\n{self.conference_summary}\n",
)
content_parts.append(
"\nUse the make_decision tool for each ticker to record your decisions. "
"After recording all decisions, provide a summary of your investment rationale.",
)
content = "".join(content_parts)
msg = Msg(name="system", content=content, role="user")
result = await self.pm.reply(msg)
extracted = self._extract_result_from_msg(result)
# Sync retrieved memory first
await self._sync_memory_if_retrieved(self.pm)
# Broadcast PM decision via StateSync
if self.state_sync:
text_content = self._extract_text_content(result.content)
await self.state_sync.on_agent_complete(
agent_id="portfolio_manager",
content=text_content,
)
return extracted
async def _run_pm(
self,
tickers: List[str],
date: str,
prices: Dict[str, float],
analyst_results: List[Dict[str, Any]],
risk_assessment: Dict[str, Any],
) -> Dict[str, Any]:
"""Run PM decision-making (without sync, for backward compatibility)"""
portfolio = self.pm.get_portfolio_state()
context = {
"analyst_signals": {
r["agent"]: r.get("content", "") for r in analyst_results
},
"risk_warnings": risk_assessment.get("content", ""),
"current_prices": prices,
"tickers": tickers,
"portfolio_cash": portfolio.get("cash", 0),
"portfolio_positions": portfolio.get("positions", {}),
}
content = (
f"Based on the analyst signals and risk assessment, make investment decisions "
f"for date {date}.\n"
f"Context:\n{json.dumps(context, indent=2)}\n\n"
f"Use the make_decision tool for each ticker to record your decisions. "
f"After recording all decisions, provide a summary of your investment rationale."
)
msg = Msg(name="system", content=content, role="user")
result = await self.pm.reply(msg)
return self._extract_result_from_msg(result)
def _execute_decisions(
self,
decisions: Dict[str, Dict],
prices: Optional[Dict[str, float]],
date: str,
) -> Dict[str, Any]:
"""Execute PM decisions with provided prices"""
if not decisions:
return {
"executed_trades": [],
"portfolio": self.pm.get_portfolio_state(),
}
executor = PortfolioTradeExecutor(
initial_portfolio=self.pm.get_portfolio_state(),
)
executed_trades = []
for ticker, decision in decisions.items():
action = decision.get("action", "hold")
quantity = decision.get("quantity", 0)
if action == "hold" or quantity == 0:
continue
price = prices.get(ticker)
if not price or price <= 0:
logger.warning(f"No price for {ticker}, skipping trade")
continue
result = executor.execute_trade(
ticker=ticker,
action=action,
quantity=quantity,
price=price,
current_date=date,
)
if result.get("status") == "success":
executed_trades.append(
{
"ticker": ticker,
"action": action,
"quantity": quantity,
"price": price,
},
)
updated_portfolio = executor.portfolio.copy()
self.pm.update_portfolio(updated_portfolio)
return {
"executed_trades": executed_trades,
"portfolio": updated_portfolio,
}
def _extract_result_from_msg(self, msg: Msg) -> Dict[str, Any]:
"""Extract result dictionary from Msg object"""
result = {
"agent": msg.name,
"content": msg.content,
}
if hasattr(msg, "metadata") and msg.metadata:
result.update(msg.metadata)
if isinstance(msg.content, str):
try:
result["content_parsed"] = json.loads(msg.content)
except json.JSONDecodeError:
pass
return result
def _extract_text_content(self, content: Any) -> str:
"""
Extract plain text from AgentScope Msg content
AgentScope content can be:
- str: plain text
- list: list of TextBlocks like [{'type': 'text', 'text': '...'}]
- dict: single TextBlock
"""
if content is None:
return ""
if isinstance(content, str):
return content
if isinstance(content, list):
texts = []
for item in content:
if isinstance(item, dict):
# TextBlock format: {'type': 'text', 'text': '...'}
if item.get("type") == "text" and "text" in item:
texts.append(item["text"])
elif "content" in item:
texts.append(str(item["content"]))
else:
texts.append(str(item))
elif isinstance(item, str):
texts.append(item)
else:
texts.append(str(item))
return "\n".join(texts)
if isinstance(content, dict):
if content.get("type") == "text" and "text" in content:
return content["text"]
return str(content)
return str(content)
def _format_pm_decisions(self, decisions: Dict[str, Dict]) -> str:
"""Format PM decisions as a human-readable string"""
if not decisions:
return "Portfolio analysis completed. No trades recommended."
decision_texts = []
for ticker, decision in decisions.items():
action = decision.get("action", "hold")
quantity = decision.get("quantity", 0)
reasoning = decision.get("reasoning", "")
if action != "hold" and quantity > 0:
decision_texts.append(
f"{action.upper()} {quantity} {ticker}: {reasoning}",
)
if decision_texts:
return "Decisions: " + "; ".join(decision_texts)
return "Portfolio analysis completed. No trades recommended."
    def load_agents_from_workspace(
        self,
        workspace_id: str,
        agent_factory: Optional[Any] = None,
    ) -> Dict[str, Any]:
        """
        Load agents from workspace using AgentFactory.
        This method supports the new EvoAgent architecture by loading
        agents from a workspace instead of using hardcoded agents.
        Args:
            workspace_id: Workspace identifier
            agent_factory: Optional AgentFactory instance (uses self.agent_factory if None)
        Returns:
            Dictionary with loaded agents:
            {
                "analysts": List[EvoAgent],
                "risk_manager": EvoAgent,
                "portfolio_manager": EvoAgent,
            }
        Raises:
            ValueError: If the workspaces root or workspace is missing, no
                agents are listed, or any of analysts / risk_manager /
                portfolio_manager could not be loaded.
        """
        # Prefer the explicit factory, then the pipeline's own; lazily
        # import AgentFactory only when neither is available.
        factory = agent_factory or self.agent_factory
        if factory is None:
            from backend.agents import AgentFactory
            factory = AgentFactory()
        # Check workspace exists
        if not factory.workspaces_root.exists():
            raise ValueError(f"Workspaces root does not exist: {factory.workspaces_root}")
        workspace_dir = factory.workspaces_root / workspace_id
        if not workspace_dir.exists():
            raise ValueError(f"Workspace '{workspace_id}' does not exist")
        # Load agents from workspace
        agents_data = factory.list_agents(workspace_id=workspace_id)
        if not agents_data:
            raise ValueError(f"No agents found in workspace '{workspace_id}'")
        # Categorize agents by type
        analysts = []
        risk_manager = None
        portfolio_manager = None
        for agent_data in agents_data:
            agent_type = agent_data.get("agent_type", "unknown")
            agent_id = agent_data.get("agent_id")
            # Load full agent configuration
            # NOTE(review): agents whose config_path is missing are silently
            # skipped here; the role checks below surface the gap only when a
            # required role ends up absent entirely.
            config_path = Path(agent_data.get("config_path", ""))
            if config_path.exists():
                agent = factory.load_agent(agent_id, workspace_id)
                # Any "*_analyst" type counts as an analyst; the last seen
                # risk_manager / portfolio_manager wins if duplicated.
                if agent_type.endswith("_analyst"):
                    analysts.append(agent)
                elif agent_type == "risk_manager":
                    risk_manager = agent
                elif agent_type == "portfolio_manager":
                    portfolio_manager = agent
        if not analysts:
            raise ValueError(f"No analysts found in workspace '{workspace_id}'")
        if risk_manager is None:
            raise ValueError(f"No risk_manager found in workspace '{workspace_id}'")
        if portfolio_manager is None:
            raise ValueError(f"No portfolio_manager found in workspace '{workspace_id}'")
        return {
            "analysts": analysts,
            "risk_manager": risk_manager,
            "portfolio_manager": portfolio_manager,
        }
def reload_agents_from_workspace(self, workspace_id: Optional[str] = None) -> None:
"""
Reload all agents from workspace.
This updates self.analysts, self.risk_manager, and self.pm
with agents loaded from the specified workspace.
Args:
workspace_id: Workspace ID (uses self.workspace_id if None)
"""
ws_id = workspace_id or self.workspace_id
if not ws_id:
raise ValueError("No workspace_id specified")
loaded = self.load_agents_from_workspace(ws_id)
self.analysts = loaded["analysts"]
self.risk_manager = loaded["risk_manager"]
self.pm = loaded["portfolio_manager"]
self.workspace_id = ws_id
logger.info(f"Reloaded {len(self.analysts)} analysts from workspace '{ws_id}'")
def _runtime_update_status(self, agent: Any, status: str) -> None:
if not self.runtime_manager:
return
agent_id = getattr(agent, "agent_id", None) or getattr(agent, "name", None)
if not agent_id:
return
self.runtime_manager.update_agent_status(agent_id, status, self._session_key)
def _runtime_batch_status(self, agents: List[Any], status: str) -> None:
for agent in agents:
self._runtime_update_status(agent, status)
def _all_analysts(self) -> List[Any]:
"""Return static analysts plus runtime-created analysts."""
return list(self.analysts) + list(self._dynamic_analysts.values())
    def _create_runtime_analyst(self, agent_id: str, analyst_type: str) -> str:
        """Create one runtime analyst instance.

        Validates the requested type and id, seeds the agent's workspace
        assets from the persona config, instantiates an AnalystAgent, and
        registers it both in ``self._dynamic_analysts`` and in the
        run-scoped team pipeline config.

        Args:
            agent_id: Unique name for the new analyst; must not collide
                with any existing (static or dynamic) analyst.
            analyst_type: One of the keys in ``ANALYST_TYPES``.

        Returns:
            A human-readable status string (success or the reason the
            analyst was not created) — errors are reported as text, not
            raised, presumably because this is exposed as a tool result.
        """
        if analyst_type not in ANALYST_TYPES:
            return (
                f"Unknown analyst_type '{analyst_type}'. "
                f"Available: {', '.join(ANALYST_TYPES.keys())}"
            )
        if agent_id in {agent.name for agent in self._all_analysts()}:
            return f"Analyst '{agent_id}' already exists."
        # Run under the PM's config name so the new analyst shares the run's
        # configuration namespace; "default" if the PM carries no config.
        config_name = getattr(self.pm, "config", {}).get("config_name", "default")
        project_root = Path(__file__).resolve().parents[2]
        # Seed role/style/policy files from the persona definition for this
        # analyst type (empty persona yields empty seeds).
        personas = get_prompt_loader().load_yaml_config("analyst", "personas")
        persona = personas.get(analyst_type, {})
        WorkspaceManager(project_root=project_root).ensure_agent_assets(
            config_name=config_name,
            agent_id=agent_id,
            role_seed=persona.get("description", "").strip(),
            style_seed="\n".join(f"- {item}" for item in persona.get("focus", [])),
            policy_seed=(
                "State a clear signal, confidence, and the conditions "
                "that would invalidate the thesis."
            ),
        )
        # New runtime analysts start with no active skill directories.
        agent = AnalystAgent(
            analyst_type=analyst_type,
            toolkit=create_agent_toolkit(
                agent_id=agent_id,
                config_name=config_name,
                active_skill_dirs=[],
            ),
            model=get_agent_model(analyst_type),
            formatter=get_agent_formatter(analyst_type),
            agent_id=agent_id,
            config={"config_name": config_name},
        )
        self._dynamic_analysts[agent_id] = agent
        # Mark the new analyst active in the run-scoped team pipeline config.
        update_active_analysts(
            project_root=project_root,
            config_name=config_name,
            available_analysts=[item.name for item in self._all_analysts()],
            add=[agent_id],
        )
        return f"Created runtime analyst '{agent_id}' ({analyst_type})."
def _remove_runtime_analyst(self, agent_id: str) -> str:
"""Remove one runtime-created analyst instance."""
if agent_id not in self._dynamic_analysts:
return f"Runtime analyst '{agent_id}' not found."
self._dynamic_analysts.pop(agent_id, None)
config_name = getattr(self.pm, "config", {}).get("config_name", "default")
project_root = Path(__file__).resolve().parents[2]
update_active_analysts(
project_root=project_root,
config_name=config_name,
available_analysts=[item.name for item in self._all_analysts()],
remove=[agent_id],
)
return f"Removed runtime analyst '{agent_id}'."
def _get_active_analysts(self) -> List[Any]:
"""Resolve active analyst participants from run-scoped team pipeline config."""
config_name = getattr(self.pm, "config", {}).get("config_name", "default")
project_root = Path(__file__).resolve().parents[2]
analyst_map = {agent.name: agent for agent in self._all_analysts()}
active_ids = resolve_active_analysts(
project_root=project_root,
config_name=config_name,
available_analysts=list(analyst_map.keys()),
)
return [analyst_map[agent_id] for agent_id in active_ids if agent_id in analyst_map]
def _runtime_log_event(self, event: str, details: Optional[Dict[str, Any]] = None) -> None:
if not self.runtime_manager:
return
self.runtime_manager.log_event(event, details)