evotraders/backend/core/pipeline_runner.py
cillin 45c3996434 refactor(cleanup): remove legacy agent classes and complete EvoAgent migration
Remove deprecated AnalystAgent, PMAgent, and RiskAgent classes.
All agent creation now goes through UnifiedAgentFactory, which creates EvoAgent instances.

- Delete backend/agents/analyst.py (169 lines)
- Delete backend/agents/portfolio_manager.py (420 lines)
- Delete backend/agents/risk_manager.py (139 lines)
- Update all imports to use EvoAgent exclusively
- Clean up unused imports across 25 files
- Update tests to work with simplified agent structure

Constraint: EvoAgent is now the single source of truth for all agent roles
Constraint: UnifiedAgentFactory handles runtime agent creation
Rejected: Keep legacy aliases | creates maintenance burden
Confidence: high
Scope-risk: moderate (affects agent instantiation paths)
Directive: All new agent features must be added to EvoAgent, not legacy classes
Not-tested: Kubernetes sandbox executor (marked with TODO)
2026-04-02 10:51:14 +08:00


# -*- coding: utf-8 -*-
"""
Pipeline Runner - Independent trading pipeline execution
This module provides functions to start/stop trading pipelines
that can be called from the REST API.
"""
from __future__ import annotations
import asyncio
import os
from contextlib import AsyncExitStack
from pathlib import Path
from typing import Any, Dict, List, Optional, Callable
from backend.agents import EvoAgent
from backend.agents.agent_workspace import load_agent_workspace_config
from backend.agents.skills_manager import SkillsManager
from backend.agents.toolkit_factory import create_agent_toolkit, load_agent_profiles
from backend.agents.prompt_loader import get_prompt_loader
from backend.agents.workspace_manager import WorkspaceManager
from backend.config.constants import ANALYST_TYPES
from backend.core.pipeline import TradingPipeline
from backend.core.scheduler import BacktestScheduler, Scheduler
from backend.llm.models import get_agent_formatter, get_agent_model
from backend.runtime.manager import (
    TradingRuntimeManager,
    set_global_runtime_manager,
    clear_global_runtime_manager,
    set_shutdown_event,
    clear_shutdown_event,
    is_shutdown_requested,
)
from backend.services.market import MarketService
from backend.services.storage import StorageService
from backend.services.gateway import Gateway
from backend.utils.settlement import SettlementCoordinator

_prompt_loader = get_prompt_loader()

# Global gateway reference for cleanup
_gateway_instance: Optional[Gateway] = None

# Global long-term memory references for persistence
_long_term_memories: List[Any] = []
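
# These globals exist because pipeline start/stop are driven by separate REST
# calls (see module docstring), so the state must outlive any single request
# handler.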


def _set_gateway(gateway: Optional[Gateway]) -> None:
    """Set global gateway reference."""
    global _gateway_instance
    _gateway_instance = gateway


def stop_gateway() -> None:
    """Stop the running gateway, if one exists."""
    global _gateway_instance
    if _gateway_instance is not None:
        try:
            _gateway_instance.stop()
        except Exception as e:
            import logging
            logging.getLogger(__name__).error(f"Error stopping gateway: {e}")
        finally:
            _gateway_instance = None


def _set_long_term_memories(memories: List[Any]) -> None:
    """Set global long-term memory references."""
    global _long_term_memories
    _long_term_memories = memories


def _clear_long_term_memories() -> None:
    """Clear global long-term memory references."""
    global _long_term_memories
    _long_term_memories = []


def _persist_long_term_memories_sync() -> None:
    """
    Synchronously persist all long-term memories before shutdown.

    This function ensures all memory data is flushed to disk/vector store
    before the process exits. Should be called during cleanup.
    """
    global _long_term_memories
    if not _long_term_memories:
        return
    import logging
    logger = logging.getLogger(__name__)
    logger.info(f"[MemoryPersistence] Persisting {len(_long_term_memories)} memory instances...")
    for i, memory in enumerate(_long_term_memories):
        try:
            # Try to save memory if it has a save method
            if hasattr(memory, 'save') and callable(getattr(memory, 'save')):
                if hasattr(memory, 'sync') and callable(getattr(memory, 'sync')):
                    # Use the synchronous variant when available
                    memory.sync()
                    logger.debug(f"[MemoryPersistence] Synced memory {i}")
                else:
                    # Fall back to the async save(): schedule it when a loop
                    # is already running, otherwise drive it to completion.
                    try:
                        loop = asyncio.get_running_loop()
                    except RuntimeError:
                        loop = None
                    if loop is not None:
                        loop.create_task(memory.save())
                        logger.debug(f"[MemoryPersistence] Scheduled save for memory {i}")
                    else:
                        asyncio.run(memory.save())
                        logger.debug(f"[MemoryPersistence] Saved memory {i}")
            # Try to flush any pending writes
            if hasattr(memory, 'flush') and callable(getattr(memory, 'flush')):
                memory.flush()
                logger.debug(f"[MemoryPersistence] Flushed memory {i}")
        except Exception as e:
            logger.warning(f"[MemoryPersistence] Failed to persist memory {i}: {e}")
    logger.info("[MemoryPersistence] Memory persistence complete")


async def _persist_long_term_memories_async() -> None:
    """
    Asynchronously persist all long-term memories.

    This is the preferred method for persisting memories when
    an async context is available.
    """
    global _long_term_memories
    if not _long_term_memories:
        return
    import logging
    logger = logging.getLogger(__name__)
    logger.info(f"[MemoryPersistence] Persisting {len(_long_term_memories)} memory instances async...")
    for i, memory in enumerate(_long_term_memories):
        try:
            # Try async save first
            if hasattr(memory, 'save') and callable(getattr(memory, 'save')):
                await memory.save()
                logger.debug(f"[MemoryPersistence] Saved memory {i} (async)")
            # Try flush if available
            if hasattr(memory, 'flush') and callable(getattr(memory, 'flush')):
                memory.flush()
                logger.debug(f"[MemoryPersistence] Flushed memory {i}")
        except Exception as e:
            logger.warning(f"[MemoryPersistence] Failed to persist memory {i}: {e}")
    logger.info("[MemoryPersistence] Async memory persistence complete")


def create_long_term_memory(agent_name: str, run_id: str, run_dir: Path):
    """Create ReMeTaskLongTermMemory for an agent."""
    try:
        from agentscope.memory import ReMeTaskLongTermMemory
        from agentscope.model import DashScopeChatModel
        from agentscope.embedding import DashScopeTextEmbedding
    except ImportError:
        return None
    api_key = os.getenv("MEMORY_API_KEY")
    if not api_key:
        return None
    memory_dir = str(run_dir / "memory")
    return ReMeTaskLongTermMemory(
        agent_name=agent_name,
        user_name=agent_name,
        model=DashScopeChatModel(
            model_name=os.getenv("MEMORY_MODEL_NAME", "qwen3-max"),
            api_key=api_key,
            stream=False,
        ),
        embedding_model=DashScopeTextEmbedding(
            model_name=os.getenv("MEMORY_EMBEDDING_MODEL", "text-embedding-v4"),
            api_key=api_key,
            dimensions=1024,
        ),
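        # The dotted keys below are flattened vector-store settings as this
        # ReMeTaskLongTermMemory call site uses them: route the default store
        # to the local backend, rooted at the per-run memory directory.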
        **{
            "vector_store.default.backend": "local",
            "vector_store.default.params.store_dir": memory_dir,
        },
    )


def _resolve_evo_agent_ids() -> set[str]:
    """Return agent ids selected to use EvoAgent.

    By default, all supported roles use EvoAgent.
    """
    raw = os.getenv("EVO_AGENT_IDS", "")
    if not raw.strip():
        # Default: all supported roles use EvoAgent
        return set(ANALYST_TYPES) | {"risk_manager", "portfolio_manager"}
    requested = {
        item.strip()
        for item in raw.split(",")
        if item.strip()
    }
    return {
        agent_id
        for agent_id in requested
        if agent_id in ANALYST_TYPES or agent_id in {"risk_manager", "portfolio_manager"}
    }


def _create_analyst_agent(
    *,
    analyst_type: str,
    run_id: str,
    model,
    formatter,
    skills_manager: SkillsManager,
    active_skill_map: Dict[str, list[Path]],
    long_term_memory=None,
):
    """Create one analyst agent as an EvoAgent."""
    active_skill_dirs = active_skill_map.get(analyst_type, [])
    toolkit = create_agent_toolkit(
        analyst_type,
        run_id,
        active_skill_dirs=active_skill_dirs,
    )
    workspace_dir = skills_manager.get_agent_asset_dir(run_id, analyst_type)
    agent_config = load_agent_workspace_config(workspace_dir / "agent.yaml")
    agent = EvoAgent(
        agent_id=analyst_type,
        config_name=run_id,
        workspace_dir=workspace_dir,
        model=model,
        formatter=formatter,
        skills_manager=skills_manager,
        prompt_files=agent_config.prompt_files,
        long_term_memory=long_term_memory,
    )
    agent.toolkit = toolkit
    setattr(agent, "workspace_id", run_id)
    return agent
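

# The two manager factories below build EvoAgent instances the same way as
# the analysts but first consult _resolve_evo_agent_ids(), since the legacy
# RiskAgent/PMAgent fallback classes were removed in this refactor.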
def _create_risk_manager_agent(
    *,
    run_id: str,
    model,
    formatter,
    skills_manager: SkillsManager,
    active_skill_map: Dict[str, list[Path]],
    long_term_memory=None,
):
    """Create the risk manager as an EvoAgent."""
    active_skill_dirs = active_skill_map.get("risk_manager", [])
    toolkit = create_agent_toolkit(
        "risk_manager",
        run_id,
        active_skill_dirs=active_skill_dirs,
    )
    if "risk_manager" not in _resolve_evo_agent_ids():
        # The legacy RiskAgent class was deleted in the EvoAgent migration,
        # so there is no non-Evo fallback to construct here.
        raise RuntimeError(
            "risk_manager is excluded from EVO_AGENT_IDS, but the legacy "
            "RiskAgent has been removed; all roles must use EvoAgent"
        )
    workspace_dir = skills_manager.get_agent_asset_dir(run_id, "risk_manager")
    agent_config = load_agent_workspace_config(workspace_dir / "agent.yaml")
    agent = EvoAgent(
        agent_id="risk_manager",
        config_name=run_id,
        workspace_dir=workspace_dir,
        model=model,
        formatter=formatter,
        skills_manager=skills_manager,
        prompt_files=agent_config.prompt_files,
        long_term_memory=long_term_memory,
    )
    agent.toolkit = toolkit
    setattr(agent, "workspace_id", run_id)
    return agent


def _create_portfolio_manager_agent(
    *,
    run_id: str,
    model,
    formatter,
    initial_cash: float,
    margin_requirement: float,
    skills_manager: SkillsManager,
    active_skill_map: Dict[str, list[Path]],
    long_term_memory=None,
):
    """Create the portfolio manager as an EvoAgent."""
    active_skill_dirs = active_skill_map.get("portfolio_manager", [])
    if "portfolio_manager" not in _resolve_evo_agent_ids():
        # The legacy PMAgent class was deleted in the EvoAgent migration,
        # so there is no non-Evo fallback to construct here.
        raise RuntimeError(
            "portfolio_manager is excluded from EVO_AGENT_IDS, but the legacy "
            "PMAgent has been removed; all roles must use EvoAgent"
        )
    workspace_dir = skills_manager.get_agent_asset_dir(
        run_id,
        "portfolio_manager",
    )
    agent_config = load_agent_workspace_config(workspace_dir / "agent.yaml")
    agent = EvoAgent(
        agent_id="portfolio_manager",
        config_name=run_id,
        workspace_dir=workspace_dir,
        model=model,
        formatter=formatter,
        skills_manager=skills_manager,
        prompt_files=agent_config.prompt_files,
        initial_cash=initial_cash,
        margin_requirement=margin_requirement,
        long_term_memory=long_term_memory,
    )
    # Unlike the analyst and risk-manager toolkits, the PM toolkit is created
    # after the agent so it can be bound to its owner.
    agent.toolkit = create_agent_toolkit(
        "portfolio_manager",
        run_id,
        owner=agent,
        active_skill_dirs=active_skill_dirs,
    )
    setattr(agent, "workspace_id", run_id)
    return agent


def create_agents(
    run_id: str,
    run_dir: Path,
    initial_cash: float,
    margin_requirement: float,
    enable_long_term_memory: bool = False,
):
    """Create all agents for the system."""
    analysts = []
    long_term_memories = []
    # Initialize workspace manager and assets
    workspace_manager = WorkspaceManager()
    workspace_manager.initialize_default_assets(
        config_name=run_id,
        agent_ids=list(ANALYST_TYPES.keys()) + ["risk_manager", "portfolio_manager"],
        analyst_personas=_prompt_loader.load_yaml_config("analyst", "personas"),
    )
    profiles = load_agent_profiles()
    skills_manager = SkillsManager()
    active_skill_map = skills_manager.prepare_active_skills(
        config_name=run_id,
        agent_defaults={
            agent_id: profile.get("skills", [])
            for agent_id, profile in profiles.items()
        },
    )
    # Create analyst agents
    for analyst_type in ANALYST_TYPES:
        model = get_agent_model(analyst_type)
        formatter = get_agent_formatter(analyst_type)
        long_term_memory = None
        if enable_long_term_memory:
            long_term_memory = create_long_term_memory(analyst_type, run_id, run_dir)
            if long_term_memory:
                long_term_memories.append(long_term_memory)
        analyst = _create_analyst_agent(
            analyst_type=analyst_type,
            run_id=run_id,
            model=model,
            formatter=formatter,
            skills_manager=skills_manager,
            active_skill_map=active_skill_map,
            long_term_memory=long_term_memory,
        )
        analysts.append(analyst)
    # Create risk manager
    risk_long_term_memory = None
    if enable_long_term_memory:
        risk_long_term_memory = create_long_term_memory("risk_manager", run_id, run_dir)
        if risk_long_term_memory:
            long_term_memories.append(risk_long_term_memory)
    risk_manager = _create_risk_manager_agent(
        run_id=run_id,
        model=get_agent_model("risk_manager"),
        formatter=get_agent_formatter("risk_manager"),
        skills_manager=skills_manager,
        active_skill_map=active_skill_map,
        long_term_memory=risk_long_term_memory,
    )
    # Create portfolio manager
    pm_long_term_memory = None
    if enable_long_term_memory:
        pm_long_term_memory = create_long_term_memory("portfolio_manager", run_id, run_dir)
        if pm_long_term_memory:
            long_term_memories.append(pm_long_term_memory)
    portfolio_manager = _create_portfolio_manager_agent(
        run_id=run_id,
        model=get_agent_model("portfolio_manager"),
        formatter=get_agent_formatter("portfolio_manager"),
        initial_cash=initial_cash,
        margin_requirement=margin_requirement,
        skills_manager=skills_manager,
        active_skill_map=active_skill_map,
        long_term_memory=pm_long_term_memory,
    )
    return analysts, risk_manager, portfolio_manager, long_term_memories


async def run_pipeline(
    run_id: str,
    run_dir: Path,
    bootstrap: Dict[str, Any],
    stop_event: asyncio.Event,
    message_callback: Optional[Callable[[str, Any], None]] = None,
) -> None:
    """
    Run the trading pipeline with the given configuration.

    Service Startup Order:
        Phase 1: WebSocket Server - Frontend can connect
        Phase 2: Market Service - Price data starts flowing
        Phase 3: Agent Runtime - Create all agents
        Phase 4: Pipeline & Scheduler - Trading logic ready
        Phase 5: Gateway Fully Operational - All systems running

    Args:
        run_id: Unique run identifier (timestamp)
        run_dir: Run directory path
        bootstrap: Bootstrap configuration
        stop_event: Event to signal pipeline stop
        message_callback: Optional callback for sending messages to clients
    """
    import logging
    logger = logging.getLogger(__name__)
    # Set global shutdown event
    set_shutdown_event(stop_event)
    logger.info(f"[Pipeline {run_id}] ======================================")
    logger.info(f"[Pipeline {run_id}] Starting with 5-phase initialization...")
    logger.info(f"[Pipeline {run_id}] ======================================")
    try:
        # Extract config values
        tickers = bootstrap.get("tickers", [
            "AAPL", "MSFT", "GOOGL", "AMZN", "NVDA", "META",
            "TSLA", "AMD", "NFLX", "AVGO", "PLTR", "COIN",
        ])
        initial_cash = float(bootstrap.get("initial_cash", 100000.0))
        margin_requirement = float(bootstrap.get("margin_requirement", 0.0))
        max_comm_cycles = int(bootstrap.get("max_comm_cycles", 2))
        schedule_mode = bootstrap.get("schedule_mode", "daily")
        trigger_time = bootstrap.get("trigger_time", "09:30")
        interval_minutes = int(bootstrap.get("interval_minutes", 60))
        heartbeat_interval = int(bootstrap.get("heartbeat_interval", 0))
        mode = bootstrap.get("mode", "live")
        start_date = bootstrap.get("start_date")
        end_date = bootstrap.get("end_date")
        enable_memory = bootstrap.get("enable_memory", False)
        is_backtest = mode == "backtest"
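        # Every key above has a default, so an empty bootstrap dict yields a
        # live run on the daily 09:30 schedule with $100k initial cash.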

        # ======================================================================
        # PHASE 0: Initialize runtime manager
        # ======================================================================
        logger.info("[Phase 0/5] Initializing runtime manager...")
        from backend.api.runtime import runtime_manager
        if runtime_manager is None:
            runtime_manager = TradingRuntimeManager(
                config_name=run_id,
                run_dir=run_dir,
                bootstrap=bootstrap,
            )
        runtime_manager.prepare_run()
        set_global_runtime_manager(runtime_manager)

        # ======================================================================
        # PHASE 1 & 2: Create infrastructure services (Market, Storage)
        # These will be started by the Gateway in the correct order
        # ======================================================================
        logger.info("[Phase 1-2/5] Creating infrastructure services...")
        # Create storage service
        storage_service = StorageService(
            dashboard_dir=run_dir / "team_dashboard",
            initial_cash=initial_cash,
            config_name=run_id,
        )
        if not storage_service.files["summary"].exists():
            storage_service.initialize_empty_dashboard()
        else:
            storage_service.update_leaderboard_model_info()
        # Create market service (data source)
        market_service = MarketService(
            tickers=tickers,
            poll_interval=10,
            backtest_mode=is_backtest,
            api_key=os.getenv("FINNHUB_API_KEY") if not is_backtest else None,
            backtest_start_date=start_date if is_backtest else None,
            backtest_end_date=end_date if is_backtest else None,
        )

        # ======================================================================
        # PHASE 3: Create Agent Runtime
        # ======================================================================
        logger.info("[Phase 3/5] Creating agent runtime...")
        analysts, risk_manager, pm, long_term_memories = create_agents(
            run_id=run_id,
            run_dir=run_dir,
            initial_cash=initial_cash,
            margin_requirement=margin_requirement,
            enable_long_term_memory=enable_memory,
        )
        # Register agents with runtime manager
        for agent in analysts + [risk_manager, pm]:
            agent_id = getattr(agent, "agent_id", None) or getattr(agent, "name", None)
            if agent_id:
                runtime_manager.register_agent(agent_id)
        # Load portfolio state
        portfolio_state = storage_service.load_portfolio_state()
        pm.load_portfolio_state(portfolio_state)
        # Create settlement coordinator
        settlement_coordinator = SettlementCoordinator(
            storage=storage_service,
            initial_capital=initial_cash,
        )

        # ======================================================================
        # PHASE 4: Create Pipeline & Scheduler
        # ======================================================================
        logger.info("[Phase 4/5] Creating pipeline and scheduler...")
        # Create pipeline
        pipeline = TradingPipeline(
            analysts=analysts,
            risk_manager=risk_manager,
            portfolio_manager=pm,
            settlement_coordinator=settlement_coordinator,
            max_comm_cycles=max_comm_cycles,
            runtime_manager=runtime_manager,
        )
        # Create scheduler
        scheduler_callback = None
        live_scheduler = None
        if is_backtest:
            backtest_scheduler = BacktestScheduler(
                start_date=start_date,
                end_date=end_date,
                trading_calendar="NYSE",
                delay_between_days=0.5,
            )
            backtest_scheduler.get_trading_dates()

            async def scheduler_callback_fn(callback):
                await backtest_scheduler.start(callback)

            scheduler_callback = scheduler_callback_fn
        else:
            # Live mode
            live_scheduler = Scheduler(
                mode=schedule_mode,
                trigger_time=trigger_time,
                interval_minutes=interval_minutes,
                heartbeat_interval=heartbeat_interval if heartbeat_interval > 0 else None,
                config={"config_name": run_id},
            )

            async def scheduler_callback_fn(callback):
                await live_scheduler.start(callback)

            scheduler_callback = scheduler_callback_fn
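        # Both branches expose the same shape -- an awaitable that takes the
        # per-cycle callback -- so the Gateway wiring below is identical for
        # live and backtest runs.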

        # ======================================================================
        # PHASE 5: Start Gateway (WebSocket → Market → Scheduler)
        # Gateway.start() will handle the final startup sequence:
        #   - WebSocket Server first (frontend can connect)
        #   - Market Service second (price data flows)
        #   - Scheduler last (trading begins)
        # ======================================================================
        logger.info("[Phase 5/5] Starting Gateway (WebSocket → Market → Scheduler)...")
        gateway = Gateway(
            market_service=market_service,
            storage_service=storage_service,
            pipeline=pipeline,
            scheduler_callback=scheduler_callback,
            config={
                "mode": mode,
                "backtest_mode": is_backtest,
                "tickers": tickers,
                "config_name": run_id,
                "schedule_mode": schedule_mode,
                "interval_minutes": interval_minutes,
                "trigger_time": trigger_time,
                "heartbeat_interval": heartbeat_interval,
                "initial_cash": initial_cash,
                "margin_requirement": margin_requirement,
                "max_comm_cycles": max_comm_cycles,
                "enable_memory": enable_memory,
            },
            scheduler=live_scheduler,
        )
        _set_gateway(gateway)
        # Set global memory references for persistence
        _set_long_term_memories(long_term_memories)
        # Start pipeline execution
        async with AsyncExitStack() as stack:
            # Enter long-term memory contexts
            for memory in long_term_memories:
                await stack.enter_async_context(memory)
            # Start Gateway - this will execute the 4-phase startup:
            #   Phase 1: WebSocket Server (frontend can connect immediately)
            #   Phase 2: Market Service (price updates start flowing)
            #   Phase 3: Market Status Monitor
            #   Phase 4: Scheduler (trading cycles begin)
            gateway_task = asyncio.create_task(
                gateway.start(host="0.0.0.0", port=8765)
            )
            logger.info("[Pipeline] Gateway startup initiated on ws://localhost:8765")
            # Wait for Gateway to fully initialize all phases
            await asyncio.sleep(0.5)
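            # NOTE: this is a fixed grace period rather than a readiness
            # handshake; the Gateway continues initializing concurrently.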

            # Define the trading cycle callback
            async def trading_cycle(session_key: str) -> None:
                """Execute one trading cycle."""
                if is_shutdown_requested():
                    return
                runtime_manager.set_session_key(session_key)
                runtime_manager.log_event("cycle:start", {"session": session_key})
                try:
                    # Fetch market data
                    market_data = await market_service.get_all_data()
                    # Run pipeline
                    await pipeline.run_cycle(
                        session_key=session_key,
                        market_data=market_data,
                    )
                    runtime_manager.log_event("cycle:complete", {"session": session_key})
                except Exception as e:
                    runtime_manager.log_event("cycle:error", {"error": str(e)})
                    raise

            # Start scheduler
            if scheduler_callback:
                await scheduler_callback(trading_cycle)
            # Wait for stop signal
            while not stop_event.is_set():
                await asyncio.sleep(1)
            # Cancel gateway task
            if not gateway_task.done():
                gateway_task.cancel()
                try:
                    await gateway_task
                except asyncio.CancelledError:
                    pass
    except asyncio.CancelledError:
        # Handle cancellation gracefully
        raise
    finally:
        # Cleanup
        logger.info("[Pipeline] Cleaning up...")
        # Persist long-term memories before cleanup
        try:
            await _persist_long_term_memories_async()
        except Exception as e:
            logger.warning(f"[Pipeline] Memory persistence error: {e}")
        # Stop Gateway
        try:
            stop_gateway()
            logger.info("[Pipeline] Gateway stopped")
        except Exception as e:
            logger.error(f"[Pipeline] Error stopping gateway: {e}")
        # Clear memory references
        _clear_long_term_memories()
        clear_shutdown_event()
        clear_global_runtime_manager()
        from backend.api.runtime import unregister_runtime_manager
        unregister_runtime_manager()
        logger.info("[Pipeline] Cleanup complete")