diff --git a/backend/agents/dynamic_team_types.py b/backend/agents/dynamic_team_types.py new file mode 100644 index 0000000..b40c3e9 --- /dev/null +++ b/backend/agents/dynamic_team_types.py @@ -0,0 +1,372 @@ +# -*- coding: utf-8 -*- +"""Dynamic Team Types - Core data types for PM-driven analyst team management. + +This module provides data structures for: +- Analyst persona definitions (custom analyst types) +- Analyst creation configuration (custom SOUL.md, AGENTS.md, etc.) +- Dynamic team runtime state tracking + +These types enable the Portfolio Manager to dynamically create, clone, and manage +analyst agents with custom configurations beyond the predefined 4 analyst types. +""" +from __future__ import annotations + +from dataclasses import dataclass, field +from typing import Optional, Dict, Any, List +from datetime import datetime + + +@dataclass +class AnalystPersona: + """Analyst role definition - extends or replaces personas.yaml entries. + + Defines the identity, focus areas, and characteristics of an analyst type. + Can be used to create entirely new analyst types at runtime. + + Attributes: + name: Display name for the analyst (e.g., "期权策略分析师") + focus: List of focus areas (e.g., ["期权定价", "波动率交易"]) + description: Detailed description of the analyst's role and expertise + preferred_tools: Optional list of preferred tool types or categories + icon: Optional icon identifier for frontend display + """ + name: str + focus: List[str] + description: str + preferred_tools: Optional[List[str]] = None + icon: Optional[str] = None + + def to_dict(self) -> Dict[str, Any]: + """Convert to dictionary for serialization.""" + return { + "name": self.name, + "focus": self.focus, + "description": self.description, + "preferred_tools": self.preferred_tools, + "icon": self.icon, + } + + @classmethod + def from_dict(cls, data: Dict[str, Any]) -> AnalystPersona: + """Create from dictionary.""" + return cls( + name=data["name"], + focus=data.get("focus", []), + description=data.get("description", ""), + preferred_tools=data.get("preferred_tools"), + icon=data.get("icon"), + ) + + +@dataclass +class AnalystConfig: + """Complete configuration for dynamically creating an analyst. + + This dataclass allows the PM to specify all aspects of analyst creation, + including custom workspace files, model overrides, and skill selections. + + Attributes: + persona: Complete persona definition (if creating custom type) + analyst_type: Reference to predefined type (e.g., "technical_analyst") + soul_md: Custom SOUL.md content (overrides default generation) + agents_md: Custom AGENTS.md content (overrides default generation) + profile_md: Custom PROFILE.md content (overrides default generation) + skills: List of skill IDs to enable for this analyst + model_name: Override default model for this analyst + memory_config: Custom memory system configuration + tags: Classification tags (e.g., ["options", "derivatives"]) + parent_id: If cloned, the source analyst ID + """ + # Identity configuration + persona: Optional[AnalystPersona] = None + analyst_type: Optional[str] = None # Reference to predefined type + + # Workspace file contents (override default generation) + soul_md: Optional[str] = None + agents_md: Optional[str] = None + profile_md: Optional[str] = None + bootstrap_md: Optional[str] = None + + # Runtime configuration + skills: Optional[List[str]] = field(default_factory=list) + model_name: Optional[str] = None + memory_config: Optional[Dict[str, Any]] = field(default_factory=dict) + + # Metadata + tags: Optional[List[str]] = field(default_factory=list) + parent_id: Optional[str] = None # For clone tracking + + def __post_init__(self): + """Initialize default collections.""" + if self.skills is None: + self.skills = [] + if self.memory_config is None: + self.memory_config = {} + if self.tags is None: + self.tags = [] + + def to_dict(self) -> Dict[str, Any]: + """Convert to dictionary for serialization.""" + return { + "persona": self.persona.to_dict() if self.persona else None, + "analyst_type": self.analyst_type, + "soul_md": self.soul_md, + "agents_md": self.agents_md, + "profile_md": self.profile_md, + "bootstrap_md": self.bootstrap_md, + "skills": self.skills, + "model_name": self.model_name, + "memory_config": self.memory_config, + "tags": self.tags, + "parent_id": self.parent_id, + } + + @classmethod + def from_dict(cls, data: Dict[str, Any]) -> AnalystConfig: + """Create from dictionary.""" + persona_data = data.get("persona") + return cls( + persona=AnalystPersona.from_dict(persona_data) if persona_data else None, + analyst_type=data.get("analyst_type"), + soul_md=data.get("soul_md"), + agents_md=data.get("agents_md"), + profile_md=data.get("profile_md"), + bootstrap_md=data.get("bootstrap_md"), + skills=data.get("skills", []), + model_name=data.get("model_name"), + memory_config=data.get("memory_config", {}), + tags=data.get("tags", []), + parent_id=data.get("parent_id"), + ) + + def get_effective_analyst_type(self) -> Optional[str]: + """Get the effective analyst type for tool selection. + + Returns analyst_type if set, otherwise derives from persona name. + """ + if self.analyst_type: + return self.analyst_type + if self.persona: + # Derive type ID from persona name (e.g., "期权策略分析师" -> "options_strategist") + return self._derive_type_id(self.persona.name) + return None + + @staticmethod + def _derive_type_id(name: str) -> str: + """Derive a type ID from a display name.""" + import re + # Convert Chinese or mixed names to snake_case + # Remove special characters, keep alphanumeric and spaces + cleaned = re.sub(r'[^\w\s]', '', name) + # Convert to lowercase and replace spaces with underscores + return cleaned.lower().strip().replace(' ', '_') + + +@dataclass +class DynamicAnalystInstance: + """Runtime information about a dynamically created analyst. + + Tracks the creation metadata and current state of a dynamic analyst. + + Attributes: + agent_id: Unique identifier for this analyst instance + config: The configuration used to create this analyst + created_at: Timestamp when the analyst was created + created_by: Identifier of the agent that created this analyst (usually PM) + status: Current status (active, paused, removed) + """ + agent_id: str + config: AnalystConfig + created_at: str = field(default_factory=lambda: datetime.now().isoformat()) + created_by: str = "portfolio_manager" + status: str = "active" # active, paused, removed + + def to_dict(self) -> Dict[str, Any]: + """Convert to dictionary for serialization.""" + return { + "agent_id": self.agent_id, + "config": self.config.to_dict(), + "created_at": self.created_at, + "created_by": self.created_by, + "status": self.status, + } + + @classmethod + def from_dict(cls, data: Dict[str, Any]) -> DynamicAnalystInstance: + """Create from dictionary.""" + return cls( + agent_id=data["agent_id"], + config=AnalystConfig.from_dict(data.get("config", {})), + created_at=data.get("created_at", datetime.now().isoformat()), + created_by=data.get("created_by", "portfolio_manager"), + status=data.get("status", "active"), + ) + + +@dataclass +class DynamicTeamState: + """Complete runtime state for dynamic analyst team management. + + This state is persisted alongside TEAM_PIPELINE.yaml and tracks: + - Custom analyst types registered at runtime + - All dynamically created analyst instances + - Configuration snapshots for cloning + + Attributes: + run_id: The run configuration this state belongs to + registered_types: Runtime-registered analyst type definitions + instances: Dynamically created analyst instances + version: State format version for migration handling + """ + run_id: str + registered_types: Dict[str, AnalystPersona] = field(default_factory=dict) + instances: Dict[str, DynamicAnalystInstance] = field(default_factory=dict) + version: int = 1 + + def register_type(self, type_id: str, persona: AnalystPersona) -> bool: + """Register a new analyst type. + + Returns: + True if registered, False if type_id already exists + """ + if type_id in self.registered_types: + return False + self.registered_types[type_id] = persona + return True + + def add_instance(self, instance: DynamicAnalystInstance) -> None: + """Add a new analyst instance.""" + self.instances[instance.agent_id] = instance + + def remove_instance(self, agent_id: str) -> bool: + """Mark an instance as removed. + + Returns: + True if instance was found and removed + """ + if agent_id in self.instances: + self.instances[agent_id].status = "removed" + return True + return False + + def get_active_instances(self) -> List[DynamicAnalystInstance]: + """Get all active (non-removed) analyst instances.""" + return [ + inst for inst in self.instances.values() + if inst.status == "active" + ] + + def to_dict(self) -> Dict[str, Any]: + """Convert to dictionary for serialization.""" + return { + "run_id": self.run_id, + "registered_types": { + k: v.to_dict() for k, v in self.registered_types.items() + }, + "instances": { + k: v.to_dict() for k, v in self.instances.items() + }, + "version": self.version, + } + + @classmethod + def from_dict(cls, data: Dict[str, Any]) -> DynamicTeamState: + """Create from dictionary.""" + registered_types = { + k: AnalystPersona.from_dict(v) + for k, v in data.get("registered_types", {}).items() + } + instances = { + k: DynamicAnalystInstance.from_dict(v) + for k, v in data.get("instances", {}).items() + } + return cls( + run_id=data.get("run_id", "unknown"), + registered_types=registered_types, + instances=instances, + version=data.get("version", 1), + ) + + +@dataclass +class CreateAnalystResult: + """Result of creating a dynamic analyst. + + Attributes: + success: Whether creation was successful + agent_id: The ID of the created analyst (if successful) + message: Human-readable result message + error: Error details (if failed) + """ + success: bool + agent_id: Optional[str] = None + message: str = "" + error: Optional[str] = None + + def to_dict(self) -> Dict[str, Any]: + """Convert to dictionary for API responses.""" + return { + "success": self.success, + "agent_id": self.agent_id, + "message": self.message, + "error": self.error, + } + + +@dataclass +class CloneAnalystRequest: + """Request to clone an existing analyst. + + Attributes: + source_id: ID of the analyst to clone + new_id: ID for the new analyst + config_overrides: Configuration fields to override + """ + source_id: str + new_id: str + config_overrides: Optional[Dict[str, Any]] = field(default_factory=dict) + + def __post_init__(self): + if self.config_overrides is None: + self.config_overrides = {} + + +@dataclass +class AnalystTypeInfo: + """Information about an available analyst type. + + Used for listing all available types (predefined + runtime-registered). + + Attributes: + type_id: Unique identifier for this type + name: Display name + description: Type description + is_builtin: Whether this is a built-in type or runtime-registered + source: Source of this type (e.g., "constants", "runtime", "config") + """ + type_id: str + name: str + description: str + is_builtin: bool + source: str + + def to_dict(self) -> Dict[str, Any]: + """Convert to dictionary for API responses.""" + return { + "type_id": self.type_id, + "name": self.name, + "description": self.description, + "is_builtin": self.is_builtin, + "source": self.source, + } + + +__all__ = [ + "AnalystPersona", + "AnalystConfig", + "DynamicAnalystInstance", + "DynamicTeamState", + "CreateAnalystResult", + "CloneAnalystRequest", + "AnalystTypeInfo", +] diff --git a/backend/agents/toolkit_factory.py b/backend/agents/toolkit_factory.py index 30c148a..689c4cd 100644 --- a/backend/agents/toolkit_factory.py +++ b/backend/agents/toolkit_factory.py @@ -14,6 +14,14 @@ from backend.agents.agent_workspace import load_agent_workspace_config from backend.agents.skills_manager import SkillsManager from backend.agents.skill_metadata import parse_skill_metadata from backend.config.bootstrap_config import get_bootstrap_config_for_run +from backend.tools.dynamic_team_tools import ( + create_analyst, + clone_analyst, + remove_analyst, + list_analyst_types, + get_analyst_info, + get_team_summary, +) def load_agent_profiles() -> Dict[str, Dict[str, Any]]: @@ -138,6 +146,23 @@ def _register_portfolio_tool_groups(toolkit: Any, pm_agent: Any) -> None: group_name="portfolio_ops", ) + # Register dynamic team management tools + toolkit.create_tool_group( + group_name="dynamic_team", + description="Dynamic analyst team management tools.", + active=False, + notes=( + "Use these tools to create, clone, and manage analyst agents dynamically. " + "Only available when allow_dynamic_team_update is enabled." + ), + ) + toolkit.register_tool_function(create_analyst, group_name="dynamic_team") + toolkit.register_tool_function(clone_analyst, group_name="dynamic_team") + toolkit.register_tool_function(remove_analyst, group_name="dynamic_team") + toolkit.register_tool_function(list_analyst_types, group_name="dynamic_team") + toolkit.register_tool_function(get_analyst_info, group_name="dynamic_team") + toolkit.register_tool_function(get_team_summary, group_name="dynamic_team") + def _register_risk_tool_groups(toolkit: Any) -> None: """注册风险工具组""" diff --git a/backend/api/__init__.py b/backend/api/__init__.py index 71bb5a1..4cbca98 100644 --- a/backend/api/__init__.py +++ b/backend/api/__init__.py @@ -13,6 +13,7 @@ from .workspaces import router as workspaces_router from .guard import router as guard_router from .runtime import router as runtime_router from .runs import router as runs_router +from .dynamic_team import router as dynamic_team_router __all__ = [ "agents_router", @@ -20,4 +21,5 @@ __all__ = [ "guard_router", "runtime_router", "runs_router", + "dynamic_team_router", ] diff --git a/backend/api/dynamic_team.py b/backend/api/dynamic_team.py new file mode 100644 index 0000000..1e3e716 --- /dev/null +++ b/backend/api/dynamic_team.py @@ -0,0 +1,404 @@ +# -*- coding: utf-8 -*- +"""Dynamic Team API - REST endpoints for managing analyst team dynamically. + +This module provides API endpoints for: +- Creating new analysts with custom configuration +- Cloning existing analysts +- Removing analysts +- Listing available analyst types +- Getting analyst information +- Managing team composition + +These endpoints allow both the PM agent (via tool calls) and frontend +(via HTTP) to manage the analyst team dynamically. +""" +from __future__ import annotations + +import logging +from pathlib import Path +from typing import Any, Dict, List, Optional + +from fastapi import APIRouter, HTTPException +from pydantic import BaseModel, Field + +from backend.agents.dynamic_team_types import ( + AnalystPersona, + AnalystConfig, + AnalystTypeInfo, +) +from backend.config.constants import ANALYST_TYPES +from backend.agents.prompt_loader import get_prompt_loader + +logger = logging.getLogger(__name__) + +router = APIRouter(prefix="/api/dynamic-team", tags=["dynamic-team"]) + +PROJECT_ROOT = Path(__file__).resolve().parents[2] + + +# Pydantic models for API requests/responses + +class AnalystPersonaRequest(BaseModel): + """Request model for analyst persona definition.""" + name: str = Field(..., description="Display name for the analyst") + focus: List[str] = Field(default_factory=list, description="List of focus areas") + description: str = Field(..., description="Detailed description") + preferred_tools: Optional[List[str]] = Field(None, description="Preferred tool categories") + icon: Optional[str] = Field(None, description="Icon identifier") + + +class CreateAnalystRequest(BaseModel): + """Request model for creating a new analyst.""" + agent_id: str = Field(..., description="Unique identifier for the new analyst") + analyst_type: str = Field(..., description="Base type or custom identifier") + persona: Optional[AnalystPersonaRequest] = Field(None, description="Custom persona definition") + soul_md: Optional[str] = Field(None, description="Custom SOUL.md content") + agents_md: Optional[str] = Field(None, description="Custom AGENTS.md content") + profile_md: Optional[str] = Field(None, description="Custom PROFILE.md content") + bootstrap_md: Optional[str] = Field(None, description="Custom BOOTSTRAP.md content") + model_name: Optional[str] = Field(None, description="Override default LLM model") + skills: Optional[List[str]] = Field(None, description="List of skill IDs to enable") + tags: Optional[List[str]] = Field(None, description="Classification tags") + + +class CloneAnalystRequest(BaseModel): + """Request model for cloning an analyst.""" + source_id: str = Field(..., description="ID of the analyst to clone") + new_id: str = Field(..., description="Unique identifier for the new analyst") + name: Optional[str] = Field(None, description="New display name") + focus_additions: Optional[List[str]] = Field(None, description="Additional focus areas") + description_override: Optional[str] = Field(None, description="New description") + model_name: Optional[str] = Field(None, description="Override model from source") + + +class RegisterTypeRequest(BaseModel): + """Request model for registering a new analyst type.""" + type_id: str = Field(..., description="Unique identifier for this type") + name: str = Field(..., description="Display name") + focus: List[str] = Field(..., description="List of focus areas") + description: str = Field(..., description="Detailed description") + preferred_tools: Optional[List[str]] = Field(None, description="Preferred tool categories") + + +class AnalystResponse(BaseModel): + """Response model for analyst operations.""" + success: bool + agent_id: Optional[str] = None + message: str + error: Optional[str] = None + + +class AnalystTypeResponse(BaseModel): + """Response model for analyst type information.""" + type_id: str + name: str + description: str + is_builtin: bool + source: str + + +class AnalystInfoResponse(BaseModel): + """Response model for detailed analyst information.""" + found: bool + agent_id: str + config: Optional[Dict[str, Any]] = None + is_custom: bool = False + is_clone: bool = False + parent_id: Optional[str] = None + message: Optional[str] = None + + +class TeamSummaryResponse(BaseModel): + """Response model for team summary.""" + total_analysts: int + custom_analysts: int + cloned_analysts: int + analysts: List[Dict[str, Any]] + registered_types: int + + +# Helper function to get the current pipeline instance + +def _get_pipeline(run_id: str) -> Optional[Any]: + """Get the TradingPipeline instance for a run. + + Args: + run_id: The run configuration ID + + Returns: + TradingPipeline instance or None if not found + """ + # Import here to avoid circular imports + try: + from backend.apps.runtime_service import get_runtime_state + runtime_state = get_runtime_state() + if runtime_state and hasattr(runtime_state, 'pipeline'): + return runtime_state.pipeline + except Exception as e: + logger.warning(f"Could not get pipeline for run {run_id}: {e}") + return None + + +def _get_controller(run_id: str) -> Optional[Any]: + """Get the DynamicTeamController for a run. + + Args: + run_id: The run configuration ID + + Returns: + DynamicTeamController instance or None if not available + """ + try: + from backend.tools.dynamic_team_tools import get_controller + return get_controller() + except Exception as e: + logger.warning(f"Could not get controller for run {run_id}: {e}") + return None + + +# API Endpoints + +@router.get("/types", response_model=List[AnalystTypeResponse]) +async def list_analyst_types() -> List[AnalystTypeResponse]: + """List all available analyst types. + + Returns both built-in types (from ANALYST_TYPES) and runtime-registered types. + """ + result = [] + + # Add built-in types + for type_id, info in ANALYST_TYPES.items(): + result.append(AnalystTypeResponse( + type_id=type_id, + name=info.get("display_name", type_id), + description=info.get("description", ""), + is_builtin=True, + source="constants", + )) + + # Try to get runtime registered types + controller = _get_controller("default") + if controller: + for type_id, persona in controller._registered_types.items(): + result.append(AnalystTypeResponse( + type_id=type_id, + name=persona.name, + description=persona.description, + is_builtin=False, + source="runtime", + )) + + return result + + +@router.get("/personas") +async def get_personas() -> Dict[str, Any]: + """Get all analyst personas from personas.yaml. + + Returns the persona definitions used for analyst initialization. + """ + try: + personas = get_prompt_loader().load_yaml_config("analyst", "personas") + return {"success": True, "personas": personas} + except Exception as e: + logger.error(f"Failed to load personas: {e}") + raise HTTPException(status_code=500, detail=f"Failed to load personas: {e}") + + +@router.post("/runs/{run_id}/analysts", response_model=AnalystResponse) +async def create_analyst( + run_id: str, + request: CreateAnalystRequest, +) -> AnalystResponse: + """Create a new analyst in the specified run. + + Args: + run_id: The run configuration ID + request: Analyst creation configuration + + Returns: + Result of the creation operation + """ + controller = _get_controller(run_id) + if not controller: + raise HTTPException( + status_code=503, + detail="Dynamic team controller not available. Is the pipeline running?" + ) + + # Build persona if provided + persona = None + if request.persona: + persona = AnalystPersona( + name=request.persona.name, + focus=request.persona.focus, + description=request.persona.description, + preferred_tools=request.persona.preferred_tools, + icon=request.persona.icon, + ) + + # Build config + config = AnalystConfig( + persona=persona, + analyst_type=request.analyst_type if request.analyst_type in ANALYST_TYPES else None, + soul_md=request.soul_md, + agents_md=request.agents_md, + profile_md=request.profile_md, + bootstrap_md=request.bootstrap_md, + model_name=request.model_name, + skills=request.skills or [], + tags=request.tags or [], + ) + + # Create the analyst + result = controller.create_analyst( + agent_id=request.agent_id, + analyst_type=request.analyst_type, + name=persona.name if persona else None, + focus=persona.focus if persona else None, + description=persona.description if persona else None, + soul_md=config.soul_md, + agents_md=config.agents_md, + model_name=config.model_name, + ) + + return AnalystResponse(**result) + + +@router.post("/runs/{run_id}/analysts/clone", response_model=AnalystResponse) +async def clone_analyst( + run_id: str, + request: CloneAnalystRequest, +) -> AnalystResponse: + """Clone an existing analyst. + + Args: + run_id: The run configuration ID + request: Clone configuration + + Returns: + Result of the clone operation + """ + controller = _get_controller(run_id) + if not controller: + raise HTTPException( + status_code=503, + detail="Dynamic team controller not available. Is the pipeline running?" + ) + + result = controller.clone_analyst( + source_id=request.source_id, + new_id=request.new_id, + name=request.name, + focus_additions=request.focus_additions, + description_override=request.description_override, + model_name=request.model_name, + ) + + return AnalystResponse(**result) + + +@router.delete("/runs/{run_id}/analysts/{agent_id}", response_model=AnalystResponse) +async def remove_analyst(run_id: str, agent_id: str) -> AnalystResponse: + """Remove a dynamically created analyst. + + Args: + run_id: The run configuration ID + agent_id: The analyst to remove + + Returns: + Result of the removal operation + """ + controller = _get_controller(run_id) + if not controller: + raise HTTPException( + status_code=503, + detail="Dynamic team controller not available. Is the pipeline running?" + ) + + result = controller.remove_analyst(agent_id) + return AnalystResponse(**result) + + +@router.get("/runs/{run_id}/analysts/{agent_id}", response_model=AnalystInfoResponse) +async def get_analyst_info(run_id: str, agent_id: str) -> AnalystInfoResponse: + """Get information about a specific analyst. + + Args: + run_id: The run configuration ID + agent_id: The analyst ID + + Returns: + Analyst configuration and status + """ + controller = _get_controller(run_id) + if not controller: + raise HTTPException( + status_code=503, + detail="Dynamic team controller not available. Is the pipeline running?" + ) + + result = controller.get_analyst_info(agent_id) + return AnalystInfoResponse(**result) + + +@router.get("/runs/{run_id}/summary", response_model=TeamSummaryResponse) +async def get_team_summary(run_id: str) -> TeamSummaryResponse: + """Get a summary of the current analyst team. + + Args: + run_id: The run configuration ID + + Returns: + Team composition information + """ + controller = _get_controller(run_id) + if not controller: + raise HTTPException( + status_code=503, + detail="Dynamic team controller not available. Is the pipeline running?" + ) + + result = controller.get_team_summary() + return TeamSummaryResponse(**result) + + +@router.post("/runs/{run_id}/types", response_model=AnalystTypeResponse) +async def register_analyst_type( + run_id: str, + request: RegisterTypeRequest, +) -> AnalystTypeResponse: + """Register a new analyst type. + + Args: + run_id: The run configuration ID + request: Type registration configuration + + Returns: + Registered type information + """ + controller = _get_controller(run_id) + if not controller: + raise HTTPException( + status_code=503, + detail="Dynamic team controller not available. Is the pipeline running?" + ) + + result = controller.register_analyst_type( + type_id=request.type_id, + name=request.name, + focus=request.focus, + description=request.description, + preferred_tools=request.preferred_tools, + ) + + if not result.get("success", False): + raise HTTPException(status_code=400, detail=result.get("message", "Registration failed")) + + return AnalystTypeResponse( + type_id=request.type_id, + name=request.name, + description=request.description, + is_builtin=False, + source="runtime", + ) diff --git a/backend/apps/runtime_service.py b/backend/apps/runtime_service.py index 5dd6e6f..7be8521 100644 --- a/backend/apps/runtime_service.py +++ b/backend/apps/runtime_service.py @@ -5,7 +5,7 @@ from __future__ import annotations from fastapi import FastAPI -from backend.api import runtime_router +from backend.api import runtime_router, dynamic_team_router from backend.api.runtime import get_runtime_state, _check_gateway_health, _get_gateway_process_details from backend.apps.cors import add_cors_middleware @@ -78,6 +78,7 @@ def create_app() -> FastAPI: } app.include_router(runtime_router) + app.include_router(dynamic_team_router) return app diff --git a/backend/config/bootstrap_config.py b/backend/config/bootstrap_config.py index 1f709f1..5e9de46 100644 --- a/backend/config/bootstrap_config.py +++ b/backend/config/bootstrap_config.py @@ -131,6 +131,13 @@ def _coerce_bool(value: Any) -> bool: return bool(value) +def _normalize_schedule_mode(value: Any) -> str: + mode = str(value or "daily").strip().lower() + if mode == "intraday": + return "interval" + return mode or "daily" + + def resolve_runtime_config( project_root: Path, config_name: str, @@ -162,9 +169,9 @@ def resolve_runtime_config( get_env_int("MAX_COMM_CYCLES", 2), ), ), - "schedule_mode": str( + "schedule_mode": _normalize_schedule_mode( bootstrap.get("schedule_mode", schedule_mode), - ).strip().lower() or schedule_mode, + ), "interval_minutes": int( bootstrap.get( "interval_minutes", diff --git a/backend/core/apo.py b/backend/core/apo.py new file mode 100644 index 0000000..34f599a --- /dev/null +++ b/backend/core/apo.py @@ -0,0 +1,197 @@ +# -*- coding: utf-8 -*- +""" +Autonomous Policy Optimizer (APO) +Automatically tunes agent policies based on performance feedback. +""" + +import logging +import json +import os +from datetime import datetime +from pathlib import Path +from typing import Any, Dict, List, Optional + +from agentscope.message import Msg +from backend.llm.models import get_agent_model, get_agent_formatter +from backend.agents.workspace_manager import WorkspaceManager + +logger = logging.getLogger(__name__) + +class PolicyOptimizer: + """ + PolicyOptimizer analyzes trading performance and automatically updates + agent workspace files (POLICY.md, AGENTS.md) to improve future results. + """ + + def __init__(self, config_name: str, project_root: Optional[Path] = None): + self.config_name = config_name + self.workspace_manager = WorkspaceManager(project_root=project_root) + # Use a high-capability model for the optimizer (meta-agent) + self.model = get_agent_model("portfolio_manager") + self.formatter = get_agent_formatter("portfolio_manager") + + async def run_optimization( + self, + date: str, + reflection_content: str, + settlement_result: Dict[str, Any], + analyst_results: List[Dict[str, Any]], + decisions: Dict[str, Dict], + ) -> Dict[str, Any]: + """ + Run the optimization loop if performance indicates a need for change. + """ + total_pnl = settlement_result.get("portfolio_value", 0) - 100000.0 # Assuming 100k initial + # You might want to use a more sophisticated trigger, like 3 consecutive losses + if total_pnl >= 0: + logger.info(f"APO: Positive P&L (${total_pnl:,.2f}) for {date}, skipping optimization.") + return {"status": "skipped", "reason": "positive_pnl"} + + logger.info(f"APO: Negative P&L (${total_pnl:,.2f}) detected for {date}. Starting optimization...") + + # 1. Identify underperforming agents or logic + # 2. Generate policy updates + # 3. Apply updates + + optimizations = [] + + # Focus on agents that gave high confidence but wrong direction + underperformers = self._identify_underperformers(settlement_result, analyst_results) + + for agent_id in underperformers: + update = await self._generate_policy_update( + agent_id, + date, + reflection_content, + settlement_result, + analyst_results, + decisions + ) + if update: + self._apply_update(agent_id, update) + optimizations.append({ + "agent_id": agent_id, + "file": update.get("file", "POLICY.md"), + "change": update.get("change", "") + }) + + return { + "status": "completed", + "date": date, + "total_pnl": total_pnl, + "optimizations": optimizations + } + + def _identify_underperformers( + self, + settlement_result: Dict[str, Any], + analyst_results: List[Dict[str, Any]] + ) -> List[str]: + """Identify which agents might need policy adjustments.""" + underperformers = [] + + # Simple logic: if the overall day was a loss, all active analysts might need a check, + # but specifically those whose predictions didn't match the market. + # For now, let's include all analysts involved in the day. + for result in analyst_results: + agent_id = result.get("agent") + if agent_id: + underperformers.append(agent_id) + + # Also include PM and Risk Manager as they are critical + underperformers.append("portfolio_manager") + underperformers.append("risk_manager") + + return list(set(underperformers)) + + async def _generate_policy_update( + self, + agent_id: str, + date: str, + reflection_content: str, + settlement_result: Dict[str, Any], + analyst_results: List[Dict[str, Any]], + decisions: Dict[str, Dict], + ) -> Optional[Dict[str, str]]: + """Use LLM to generate a specific policy update for an agent.""" + + # Load current policy + try: + current_policy = self.workspace_manager.load_agent_file( + config_name=self.config_name, + agent_id=agent_id, + filename="POLICY.md" + ) + except Exception: + current_policy = "No existing policy found." + + prompt = f""" +As an Expert Meta-Optimizer for a multi-agent trading system, your task is to update the operational POLICY for an agent named '{agent_id}' based on recent performance failures. + +[Current Context] +Date: {date} +Daily Reflection: +{reflection_content} + +[Agent's Current POLICY.md] +{current_policy} + +[Task] +Analyze why the system failed (loss occurred). Identify what '{agent_id}' could have done differently or what new constraint/heuristic should be added to its policy to prevent similar mistakes in the future. + +Provide a specific, concise addition or modification to the POLICY.md file. +The output MUST be a JSON object with: +1. "reasoning": Brief explanation of why this change is needed. +2. "file": Always "POLICY.md". +3. "change": The EXACT markdown text to APPEND or REPLACE in the file. Keep it in Chinese as the system uses Chinese prompts. + +Output ONLY the JSON object. +""" + msg = Msg(name="system", content=prompt, role="user") + response = await self.model.reply(msg) + + content = response.content + if isinstance(content, list): + content = content[0].get("text", "") + + # Clean JSON if wrapped in markdown + if "```json" in content: + content = content.split("```json")[1].split("```")[0].strip() + + try: + return json.loads(content) + except Exception as e: + logger.error(f"APO: Failed to parse optimization response for {agent_id}: {e}") + return None + + def _apply_update(self, agent_id: str, update: Dict[str, str]) -> None: + """Apply the suggested update to the agent's workspace.""" + filename = update.get("file", "POLICY.md") + change = update.get("change", "") + + if not change: + return + + try: + current_content = self.workspace_manager.load_agent_file( + config_name=self.config_name, + agent_id=agent_id, + filename=filename + ) + + # Check if change is already there to avoid duplicates + if change.strip() in current_content: + logger.info(f"APO: Change already present in {agent_id}/{filename}") + return + + new_content = current_content + "\n\n### APO Update (" + datetime.now().strftime("%Y-%m-%d") + ")\n" + change + + self.workspace_manager.update_agent_file( + config_name=self.config_name, + agent_id=agent_id, + filename=filename, + content=new_content + ) + logger.info(f"APO: Updated {agent_id}/{filename} with new heuristics.") + except Exception as e: + logger.error(f"APO: Failed to apply update to {agent_id}/{filename}: {e}") diff --git a/backend/core/pipeline.py b/backend/core/pipeline.py index 42530f5..2e7827f 100644 --- a/backend/core/pipeline.py +++ b/backend/core/pipeline.py @@ -33,6 +33,8 @@ from backend.agents.workspace_manager import WorkspaceManager from backend.agents.prompt_loader import get_prompt_loader from backend.llm.models import get_agent_formatter, get_agent_model from backend.config.constants import ANALYST_TYPES +from backend.agents.dynamic_team_types import AnalystConfig +from backend.tools.dynamic_team_tools import DynamicTeamController, set_controller def _resolve_evo_agent_ids() -> set[str]: @@ -84,6 +86,9 @@ def _log(msg: str) -> None: logger.info(msg) +from backend.core.apo import PolicyOptimizer + + class TradingPipeline: """ Trading Pipeline - Orchestrates the complete trading cycle @@ -127,7 +132,21 @@ class TradingPipeline: self.runtime_manager = runtime_manager self._session_key: Optional[str] = None self._dynamic_analysts: Dict[str, Any] = {} + self._dynamic_analyst_configs: Dict[str, AnalystConfig] = {} + # Initialize APO (Autonomous Policy Optimizer) + config_name = workspace_id or (runtime_manager.config_name if runtime_manager else "default") + self.apo = PolicyOptimizer(config_name=config_name) + + # Initialize dynamic team controller and inject into PM + self._team_controller = DynamicTeamController( + create_callback=self._create_runtime_analyst, + remove_callback=self._remove_runtime_analyst, + get_analysts_callback=self._all_analysts, + ) + set_controller(self._team_controller) + + # Backward compatibility: also set individual callbacks if PM expects them if hasattr(self.pm, "set_team_controller"): self.pm.set_team_controller( create_agent_callback=self._create_runtime_analyst, @@ -150,23 +169,7 @@ class TradingPipeline: execute_decisions: bool = True, ) -> Dict[str, Any]: """ - Run one complete trading cycle - - Args: - tickers: List of stock tickers - date: Trading date (YYYY-MM-DD) - prices: Open prices {ticker: price} (for backtest) - close_prices: Close prices for settlement (for backtest) - market_caps: Optional market caps for baseline calculation - get_open_prices_fn: Async callback to wait for open prices (live mode) - get_close_prices_fn: Async callback to wait for close prices (live mode) - - For live mode: - - Analysis runs immediately - - Execution waits for market open via get_open_prices_fn - - Settlement waits for market close via get_close_prices_fn - - Each agent's result is broadcast immediately via StateSync. + Run one complete trading cycle with checkpointing support. """ _log(f"Starting cycle {date} - {len(tickers)} tickers") session_key = TradingSessionKey(date=date).key() @@ -176,14 +179,45 @@ class TradingPipeline: agents=active_analysts + [self.risk_manager, self.pm], session_key=session_key, ) + + # Load checkpoint if exists + checkpoint = self._load_checkpoint(session_key) + checkpoint_data = checkpoint.get("data", {}) if checkpoint else {} + last_phase = checkpoint.get("phase") if checkpoint else None + + if checkpoint: + _log(f"Resuming from checkpoint: {last_phase}") + # Restore state from checkpoint + analyst_results = checkpoint_data.get("analyst_results", []) + risk_assessment = checkpoint_data.get("risk_assessment", {}) + self.conference_summary = checkpoint_data.get("conference_summary") + final_predictions = checkpoint_data.get("final_predictions", []) + pm_result = checkpoint_data.get("pm_result", {}) + execution_result = checkpoint_data.get("execution_result", {}) + settlement_result = checkpoint_data.get("settlement_result") + # Prefer passed prices if not hold in checkpoint + if not prices: + prices = checkpoint_data.get("prices") + if not close_prices: + close_prices = checkpoint_data.get("close_prices") + else: + analyst_results = [] + risk_assessment = {} + self.conference_summary = None + final_predictions = [] + pm_result = {} + execution_result = {} + settlement_result = None + if self.runtime_manager: self.runtime_manager.set_session_key(session_key) - self._runtime_log_event("cycle:start", {"tickers": tickers, "date": date}) + self._runtime_log_event("cycle:start", {"tickers": tickers, "date": date, "resumed": checkpoint is not None}) self._runtime_batch_status(active_analysts, "analysis_in_progress") - # Phase 0: Clear short-term memory to avoid cross-day context pollution - _log("Phase 0: Clearing memory") - await self._clear_all_agent_memory() + # Phase 0: Clear memory (only if not resuming or if resuming from very start) + if not last_phase: + _log("Phase 0: Clearing memory") + await self._clear_all_agent_memory() participants = self._all_analysts() + [self.risk_manager, self.pm] @@ -196,125 +230,219 @@ class TradingPipeline: "system", ), ): - # Phase 1.1: Analysts (parallel execution with TeamCoordinator) - _log("Phase 1.1: Analyst analysis (parallel)") - analyst_results = await self._run_analysts_parallel( - tickers, - date, - active_analysts=active_analysts, - ) + # Phase 1.1: Analysts + if not last_phase or last_phase == "cleared": + _log("Phase 1.1: Analyst analysis (parallel)") + analyst_results = await self._run_analysts_parallel( + tickers, + date, + active_analysts=active_analysts, + ) + self._save_checkpoint(session_key, "analysis", { + "analyst_results": analyst_results, + "prices": prices, + "close_prices": close_prices + }) + last_phase = "analysis" # Phase 1.2: Risk Manager - _log("Phase 1.2: Risk assessment") - self._runtime_update_status(self.risk_manager, "risk_assessment") - risk_assessment = await self._run_risk_manager_with_sync( - tickers, - date, - prices, - ) + if last_phase == "analysis": + _log("Phase 1.2: Risk assessment") + self._runtime_update_status(self.risk_manager, "risk_assessment") + risk_assessment = await self._run_risk_manager_with_sync( + tickers, + date, + prices, + ) + self._save_checkpoint(session_key, "risk_assessment", { + "analyst_results": analyst_results, + "risk_assessment": risk_assessment, + "prices": prices, + "close_prices": close_prices + }) + last_phase = "risk_assessment" - # Phase 2.1: Conference discussion (within same MsgHub) - _log("Phase 2.1: Conference discussion") - conference_summary = await self._run_conference_cycles( - tickers=tickers, - date=date, - prices=prices, - analyst_results=analyst_results, - risk_assessment=risk_assessment, - ) - self.conference_summary = conference_summary + # Phase 2.1: Conference discussion + if last_phase == "risk_assessment": + _log("Phase 2.1: Conference discussion") + conference_summary = await self._run_conference_cycles( + tickers=tickers, + date=date, + prices=prices, + analyst_results=analyst_results, + risk_assessment=risk_assessment, + ) + self.conference_summary = conference_summary + self._save_checkpoint(session_key, "conference", { + "analyst_results": analyst_results, + "risk_assessment": risk_assessment, + "conference_summary": conference_summary, + "prices": prices, + "close_prices": close_prices + }) + last_phase = "conference" # Phase 2.2: Analysts generate final structured predictions - _log("Phase 2.2: Analysts generate final structured predictions") - final_predictions = await self._collect_final_predictions( - tickers, - date, - active_analysts=active_analysts, - ) + if last_phase == "conference": + _log("Phase 2.2: Analysts generate final structured predictions") + final_predictions = await self._collect_final_predictions( + tickers, + date, + active_analysts=active_analysts, + ) + self._save_checkpoint(session_key, "predictions", { + "analyst_results": analyst_results, + "risk_assessment": risk_assessment, + "conference_summary": conference_summary, + "final_predictions": final_predictions, + "prices": prices, + "close_prices": close_prices + }) + last_phase = "predictions" - # Record final predictions for leaderboard ranking - if self.settlement_coordinator: + # Record final predictions + if last_phase == "predictions" and self.settlement_coordinator: self.settlement_coordinator.record_analyst_predictions( final_predictions, ) - # Live mode: wait for market open before execution - if get_open_prices_fn: + # Live mode: wait for market open + if not prices and get_open_prices_fn: _log("Waiting for market open...") prices = await get_open_prices_fn() _log(f"Got open prices: {prices}") + # Update prices in checkpoint if we just got them + self._save_checkpoint(session_key, "predictions", { + "analyst_results": analyst_results, + "risk_assessment": risk_assessment, + "conference_summary": conference_summary, + "final_predictions": final_predictions, + "prices": prices, + "close_prices": close_prices + }) # Phase 3: PM makes decisions - _log("Phase 3.1: PM makes decisions") - self._runtime_update_status(self.pm, "decision_phase") - pm_result = await self._run_pm_with_sync( - tickers, - date, - prices, - analyst_results, - risk_assessment, - ) + if last_phase == "predictions": + _log("Phase 3.1: PM makes decisions") + self._runtime_update_status(self.pm, "decision_phase") + pm_result = await self._run_pm_with_sync( + tickers, + date, + prices, + analyst_results, + risk_assessment, + ) + self._save_checkpoint(session_key, "decisions", { + "analyst_results": analyst_results, + "risk_assessment": risk_assessment, + "conference_summary": conference_summary, + "final_predictions": final_predictions, + "pm_result": pm_result, + "prices": prices, + "close_prices": close_prices + }) + last_phase = "decisions" - decisions = pm_result.get("decisions", {}) - execution_result = { - "executed_trades": [], - "portfolio": self.pm.get_portfolio_state(), - } - if execute_decisions: - _log("Phase 4: Executing trades") - self._runtime_update_status(self.pm, "executing") - execution_result = self._execute_decisions(decisions, prices, date) - else: - _log("Phase 4: Skipping trade execution") + # Outside MsgHub for execution and settlement + decisions = pm_result.get("decisions", {}) if pm_result else {} + if not execution_result: + execution_result = { + "executed_trades": [], + "portfolio": self.pm.get_portfolio_state(), + } - # Live mode: wait for market close before settlement - if get_close_prices_fn: + if last_phase == "decisions": + if execute_decisions: + _log("Phase 4: Executing trades") + self._runtime_update_status(self.pm, "executing") + execution_result = self._execute_decisions(decisions, prices, date) + else: + _log("Phase 4: Skipping trade execution") + + self._save_checkpoint(session_key, "execution", { + "analyst_results": analyst_results, + "risk_assessment": risk_assessment, + "conference_summary": conference_summary, + "final_predictions": final_predictions, + "pm_result": pm_result, + "execution_result": execution_result, + "prices": prices, + "close_prices": close_prices + }) + last_phase = "execution" + + # Live mode: wait for market close + if not close_prices and get_close_prices_fn: _log("Waiting for market close") close_prices = await get_close_prices_fn() _log(f"Got close prices: {close_prices}") + # Update close_prices in checkpoint + self._save_checkpoint(session_key, "execution", { + "analyst_results": analyst_results, + "risk_assessment": risk_assessment, + "conference_summary": conference_summary, + "final_predictions": final_predictions, + "pm_result": pm_result, + "execution_result": execution_result, + "prices": prices, + "close_prices": close_prices + }) - # Phase 5: Settlement - run after close prices available - settlement_result = None - if close_prices and self.settlement_coordinator: - _log("Phase 5: Daily review and generate memories") - self._runtime_batch_status( - [self.risk_manager] + self._all_analysts() + [self.pm], - "settlement", - ) + # Phase 5: Settlement + if last_phase == "execution": + if close_prices and self.settlement_coordinator: + _log("Phase 5: Daily review and generate memories") + self._runtime_batch_status( + [self.risk_manager] + self._all_analysts() + [self.pm], + "settlement", + ) - agent_trajectories = await self._capture_agent_trajectories() + agent_trajectories = await self._capture_agent_trajectories() - if market_caps is None: - market_caps = {ticker: 1e9 for ticker in tickers} + if market_caps is None: + market_caps = {ticker: 1e9 for ticker in tickers} - settlement_result = ( - self.settlement_coordinator.run_daily_settlement( + settlement_result = ( + self.settlement_coordinator.run_daily_settlement( + date=date, + tickers=tickers, + open_prices=prices, + close_prices=close_prices, + market_caps=market_caps, + agent_portfolio=execution_result.get("portfolio", {}), + analyst_results=analyst_results, + pm_decisions=decisions, + ) + ) + + await self._run_reflection( date=date, - tickers=tickers, + agent_trajectories=agent_trajectories, + analyst_results=analyst_results, + decisions=decisions, + executed_trades=execution_result.get("executed_trades", []), open_prices=prices, close_prices=close_prices, - market_caps=market_caps, - agent_portfolio=execution_result.get("portfolio", {}), - analyst_results=analyst_results, - pm_decisions=decisions, + settlement_result=settlement_result, + conference_summary=self.conference_summary, ) - ) - - await self._run_reflection( - date=date, - agent_trajectories=agent_trajectories, - analyst_results=analyst_results, - decisions=decisions, - executed_trades=execution_result.get("executed_trades", []), - open_prices=prices, - close_prices=close_prices, - settlement_result=settlement_result, - conference_summary=self.conference_summary, - ) - self._runtime_batch_status( - [self.risk_manager] + self._all_analysts() + [self.pm], - "reflection", - ) + self._runtime_batch_status( + [self.risk_manager] + self._all_analysts() + [self.pm], + "reflection", + ) + + self._save_checkpoint(session_key, "settlement", { + "analyst_results": analyst_results, + "risk_assessment": risk_assessment, + "conference_summary": conference_summary, + "final_predictions": final_predictions, + "pm_result": pm_result, + "execution_result": execution_result, + "settlement_result": settlement_result, + "prices": prices, + "close_prices": close_prices + }) + last_phase = "settlement" _log(f"Cycle complete: {date}") self._runtime_batch_status( @@ -323,6 +451,11 @@ class TradingPipeline: ) self._runtime_log_event("cycle:end", {"tickers": tickers, "date": date}) + # Optional: Clean up checkpoint after successful completion + # path = self._get_checkpoint_path(session_key) + # if path and path.exists(): + # path.unlink() + return { "analyst_results": analyst_results, "risk_assessment": risk_assessment, @@ -385,6 +518,44 @@ class TradingPipeline: await self.risk_manager.memory.clear() await self.pm.memory.clear() + def _get_checkpoint_path(self, session_key: str) -> Optional[Path]: + """Get the path to the pipeline checkpoint file.""" + if not self.runtime_manager or not self.runtime_manager.run_dir: + return None + checkpoint_dir = self.runtime_manager.run_dir / "state" / "checkpoints" + checkpoint_dir.mkdir(parents=True, exist_ok=True) + return checkpoint_dir / f"pipeline_{session_key}.json" + + def _save_checkpoint(self, session_key: str, phase: str, data: Dict[str, Any]) -> None: + """Save the current pipeline state to a checkpoint file.""" + path = self._get_checkpoint_path(session_key) + if not path: + return + + checkpoint = { + "session_key": session_key, + "phase": phase, + "timestamp": datetime.now().isoformat(), + "data": data + } + try: + path.write_text(json.dumps(checkpoint, ensure_ascii=False, indent=2, default=str), encoding="utf-8") + _log(f"Checkpoint saved: {phase} for {session_key}") + except Exception as e: + logger.error(f"Failed to save checkpoint: {e}") + + def _load_checkpoint(self, session_key: str) -> Optional[Dict[str, Any]]: + """Load the pipeline state from a checkpoint file.""" + path = self._get_checkpoint_path(session_key) + if not path or not path.exists(): + return None + + try: + return json.loads(path.read_text(encoding="utf-8")) + except Exception as e: + logger.error(f"Failed to load checkpoint: {e}") + return None + async def _sync_memory_if_retrieved(self, agent: Any) -> None: """ Check agent's short-term memory for retrieved long-term memory and sync to frontend. @@ -585,6 +756,25 @@ class TradingPipeline: content=reflection_content, ) + # Phase 6: APO (Autonomous Policy Optimization) + # If the day was a loss, let APO suggest and apply policy updates. + if hasattr(self, "apo") and self.apo: + _log(f"Phase 6: APO - Running autonomous policy optimization for {date}") + try: + apo_result = await self.apo.run_optimization( + date=date, + reflection_content=reflection_content, + settlement_result=settlement_result or {"portfolio_value": 100000.0 + total_pnl}, + analyst_results=analyst_results, + decisions=decisions + ) + if apo_result.get("status") == "completed": + _log(f"APO: Successfully applied {len(apo_result.get('optimizations', []))} policy updates.") + # Reload assets for next cycle to ensure they are picked up + self.reload_runtime_assets() + except Exception as e: + logger.error(f"APO: Optimization failed: {e}") + def _build_reflection_content( self, date: str, @@ -1562,28 +1752,74 @@ class TradingPipeline: """Return static analysts plus runtime-created analysts.""" return list(self.analysts) + list(self._dynamic_analysts.values()) - def _create_runtime_analyst(self, agent_id: str, analyst_type: str) -> str: - """Create one runtime analyst instance.""" - if analyst_type not in ANALYST_TYPES: + def _create_runtime_analyst( + self, + agent_id: str, + analyst_type: str, + custom_config: Optional[AnalystConfig] = None, + ) -> str: + """Create one runtime analyst instance. + + Args: + agent_id: Unique identifier for the new analyst + analyst_type: Type of analyst (e.g., "technical_analyst") + custom_config: Optional custom configuration for the analyst, + including persona, soul_md, agents_md, etc. + + Returns: + Success or error message + """ + # Validate analyst_type or custom_config + if analyst_type not in ANALYST_TYPES and not custom_config: return ( f"Unknown analyst_type '{analyst_type}'. " - f"Available: {', '.join(ANALYST_TYPES.keys())}" + f"Available: {', '.join(ANALYST_TYPES.keys())}. " + f"Or provide custom_config to create a custom analyst." ) if agent_id in {agent.name for agent in self._all_analysts()}: return f"Analyst '{agent_id}' already exists." config_name = getattr(self.pm, "config", {}).get("config_name", "default") project_root = Path(__file__).resolve().parents[2] - personas = get_prompt_loader().load_yaml_config("analyst", "personas") - persona = personas.get(analyst_type, {}) + + # Get persona: use custom_config if provided, else load from personas.yaml + if custom_config and custom_config.persona: + persona = { + "name": custom_config.persona.name, + "focus": custom_config.persona.focus, + "description": custom_config.persona.description, + } + else: + personas = get_prompt_loader().load_yaml_config("analyst", "personas") + persona = personas.get(analyst_type, {}) workspace_manager = WorkspaceManager(project_root=project_root) + + # Build file contents: use custom if provided, else generate from persona + file_contents = {} + if custom_config: + if custom_config.soul_md: + file_contents["SOUL.md"] = custom_config.soul_md + if custom_config.agents_md: + file_contents["AGENTS.md"] = custom_config.agents_md + if custom_config.profile_md: + file_contents["PROFILE.md"] = custom_config.profile_md + if custom_config.bootstrap_md: + file_contents["BOOTSTRAP.md"] = custom_config.bootstrap_md + + # Fill in any missing files with defaults + if not file_contents or len(file_contents) < 4: + default_files = workspace_manager.build_default_agent_files( + agent_id=agent_id, + persona=persona, + ) + for key, value in default_files.items(): + if key not in file_contents: + file_contents[key] = value + workspace_manager.ensure_agent_assets( config_name=config_name, agent_id=agent_id, - file_contents=workspace_manager.build_default_agent_files( - agent_id=agent_id, - persona=persona, - ), + file_contents=file_contents, ) # Create EvoAgent with workspace-driven configuration @@ -1594,11 +1830,23 @@ class TradingPipeline: agent_id, ) agent_config = load_agent_workspace_config(workspace_dir / "agent.yaml") + # Support model override from custom_config + if custom_config and custom_config.model_name: + # Import create_model for custom model creation + from backend.llm.models import create_model + # Use specified model name, default to openai provider + model = create_model( + model_name=custom_config.model_name, + model_provider=custom_config.memory_config.get("model_provider", "openai") if custom_config.memory_config else "openai" + ) + else: + model = get_agent_model(analyst_type) + agent = EvoAgent( agent_id=agent_id, config_name=config_name, workspace_dir=workspace_dir, - model=get_agent_model(analyst_type), + model=model, formatter=get_agent_formatter(analyst_type), prompt_files=agent_config.prompt_files, ) @@ -1611,6 +1859,11 @@ class TradingPipeline: # Keep workspace_id for backward compatibility setattr(agent, "workspace_id", config_name) self._dynamic_analysts[agent_id] = agent + + # Store custom config for future reference (e.g., cloning) + if custom_config: + self._dynamic_analyst_configs[agent_id] = custom_config + update_active_analysts( project_root=project_root, config_name=config_name, @@ -1624,6 +1877,8 @@ class TradingPipeline: if agent_id not in self._dynamic_analysts: return f"Runtime analyst '{agent_id}' not found." self._dynamic_analysts.pop(agent_id, None) + # Also remove stored config if exists + self._dynamic_analyst_configs.pop(agent_id, None) config_name = getattr(self.pm, "config", {}).get("config_name", "default") project_root = Path(__file__).resolve().parents[2] update_active_analysts( diff --git a/backend/core/scheduler.py b/backend/core/scheduler.py index 9bd2133..89bc905 100644 --- a/backend/core/scheduler.py +++ b/backend/core/scheduler.py @@ -17,6 +17,14 @@ NYSE_TZ = ZoneInfo("America/New_York") NYSE_CALENDAR = mcal.get_calendar("NYSE") +def normalize_schedule_mode(mode: str | None) -> str: + """Normalize schedule mode to the current public vocabulary.""" + value = str(mode or "daily").strip().lower() + if value == "intraday": + return "interval" + return value or "daily" + + class Scheduler: """ Market-aware scheduler for live trading. @@ -31,7 +39,7 @@ class Scheduler: heartbeat_interval: Optional[int] = None, config: Optional[dict] = None, ): - self.mode = mode + self.mode = normalize_schedule_mode(mode) self.trigger_time = trigger_time or "09:30" # NYSE timezone self.trigger_now = self.trigger_time == "now" self.interval_minutes = interval_minutes or 60 @@ -107,7 +115,7 @@ class Scheduler: if self.mode == "daily": self._task = asyncio.create_task(self._run_daily(self._callback)) - elif self.mode == "intraday": + elif self.mode == "interval": self._task = asyncio.create_task( self._run_intraday(self._callback), ) @@ -124,8 +132,13 @@ class Scheduler: """Update scheduler parameters in-place and restart its timing loop.""" changed = False - if mode and mode != self.mode: - self.mode = mode + if mode: + normalized_mode = normalize_schedule_mode(mode) + else: + normalized_mode = None + + if normalized_mode and normalized_mode != self.mode: + self.mode = normalized_mode changed = True if trigger_time and trigger_time != self.trigger_time: @@ -233,13 +246,13 @@ class Scheduler: await callback(date=current_date) async def _run_intraday(self, callback: Callable): - """Run every N minutes (for future use)""" + """Run every N minutes in interval mode.""" while self.running: now = self._now_nyse() current_date = now.strftime("%Y-%m-%d") if self._is_trading_day(now): - logger.info(f"Triggering intraday cycle for {current_date}") + logger.info(f"Triggering interval cycle for {current_date}") await callback(date=current_date) await asyncio.sleep(self.interval_minutes * 60) diff --git a/backend/services/gateway_admin_handlers.py b/backend/services/gateway_admin_handlers.py index 28659bc..6f75634 100644 --- a/backend/services/gateway_admin_handlers.py +++ b/backend/services/gateway_admin_handlers.py @@ -25,6 +25,13 @@ from backend.config.bootstrap_config import ( from backend.llm.models import get_agent_model_info +def _normalize_schedule_mode(value: Any) -> str: + mode = str(value or "daily").strip().lower() + if mode == "intraday": + return "interval" + return mode or "daily" + + async def handle_reload_runtime_assets(gateway: Any) -> None: config_name = gateway.config.get("config_name", "default") runtime_config = resolve_runtime_config( @@ -44,10 +51,10 @@ async def handle_reload_runtime_assets(gateway: Any) -> None: async def handle_update_runtime_config(gateway: Any, websocket: Any, data: dict[str, Any]) -> None: updates: dict[str, Any] = {} - schedule_mode = str(data.get("schedule_mode", "")).strip().lower() + schedule_mode = _normalize_schedule_mode(data.get("schedule_mode", "")) if schedule_mode: - if schedule_mode not in {"daily", "intraday"}: - await websocket.send(json.dumps({"type": "error", "message": "schedule_mode must be 'daily' or 'intraday'."}, ensure_ascii=False)) + if schedule_mode not in {"daily", "interval"}: + await websocket.send(json.dumps({"type": "error", "message": "schedule_mode must be 'daily' or 'interval'."}, ensure_ascii=False)) return updates["schedule_mode"] = schedule_mode diff --git a/backend/services/gateway_cycle_support.py b/backend/services/gateway_cycle_support.py index b69aadf..4b1dbf9 100644 --- a/backend/services/gateway_cycle_support.py +++ b/backend/services/gateway_cycle_support.py @@ -208,7 +208,7 @@ async def run_live_cycle(gateway: Any, date: str, tickers: list[str]) -> None: market_status = gateway.market_service.get_market_status() current_prices = gateway.market_service.get_all_prices() - if schedule_mode == "intraday": + if schedule_mode in {"interval", "intraday"}: execute_decisions = market_status.get("status") == "open" if execute_decisions: await gateway.state_sync.on_system_message("定时任务触发:当前处于交易时段,本轮将执行交易决策") diff --git a/backend/services/gateway_runtime_support.py b/backend/services/gateway_runtime_support.py index f288a2d..a5ffff4 100644 --- a/backend/services/gateway_runtime_support.py +++ b/backend/services/gateway_runtime_support.py @@ -8,6 +8,13 @@ from typing import Any from backend.data.provider_utils import normalize_symbol +def _normalize_schedule_mode(value: Any) -> str: + mode = str(value or "daily").strip().lower() + if mode == "intraday": + return "interval" + return mode or "daily" + + def normalize_watchlist(raw_tickers: Any) -> list[str]: """Parse watchlist payloads from websocket messages.""" if raw_tickers is None: @@ -51,9 +58,11 @@ def apply_runtime_config(gateway: Any, runtime_config: dict[str, Any]) -> dict[s gateway.pipeline.max_comm_cycles = int(runtime_config["max_comm_cycles"]) gateway.config["max_comm_cycles"] = gateway.pipeline.max_comm_cycles - gateway.config["schedule_mode"] = runtime_config.get( - "schedule_mode", - gateway.config.get("schedule_mode", "daily"), + gateway.config["schedule_mode"] = _normalize_schedule_mode( + runtime_config.get( + "schedule_mode", + gateway.config.get("schedule_mode", "daily"), + ), ) gateway.config["interval_minutes"] = int( runtime_config.get( diff --git a/backend/skills/builtin/dynamic_team_management/SKILL.md b/backend/skills/builtin/dynamic_team_management/SKILL.md new file mode 100644 index 0000000..511083f --- /dev/null +++ b/backend/skills/builtin/dynamic_team_management/SKILL.md @@ -0,0 +1,189 @@ +--- +name: dynamic_team_management +description: 动态管理团队中的分析师Agent,包括创建、克隆、移除分析师,以及查看可用分析师类型。 +version: 1.0.0 +tools: + - create_analyst + - clone_analyst + - remove_analyst + - list_analyst_types + - get_analyst_info + - get_team_summary +--- + +# 动态团队管理 + +当你需要调整分析师团队组成时,使用这个技能。投资经理可以动态创建新的分析师、克隆现有分析师进行定制、或移除不再需要的分析师。 + +## 1) When to use + +- 当前团队缺乏特定领域的分析能力(如期权、加密货币、ESG等) +- 需要多个相同类型但不同配置的分析师(如"激进型技术分析师"和"保守型技术分析师") +- 需要临时增加分析力量应对特殊市场环境 +- 发现某个分析师配置不当,需要移除并重建 +- 在团队讨论中发现需要新的分析视角 + +## 2) Required inputs + +### 创建分析师 (create_analyst) +- **agent_id**: 唯一标识符(如 "options_specialist_01") +- **analyst_type**: 基础类型(如 "technical_analyst")或自定义标识 +- **可选**: name, focus, description, soul_md, agents_md, model_name + +### 克隆分析师 (clone_analyst) +- **source_id**: 源分析师ID(如 "technical_analyst") +- **new_id**: 新分析师ID(如 "crypto_technical_01") +- **可选**: name, focus_additions, description_override, model_name + +### 移除分析师 (remove_analyst) +- **agent_id**: 要移除的分析师ID + +## 3) Decision procedure + +1. **评估当前团队能力缺口** + - 查看当前活跃的分析师列表 + - 识别缺失的分析视角或专业领域 + +2. **选择创建策略** + - 基于现有类型创建:指定analyst_type,提供自定义配置 + - 完全自定义:提供完整的persona定义 + - 克隆并修改:从现有分析师复制并应用覆盖 + +3. **配置分析师** + - 设置唯一agent_id + - 定义显示名称和关注点 + - 可选:提供自定义SOUL.md内容以精确定义行为 + +4. **验证创建结果** + - 检查返回的success状态 + - 确认新分析师已加入活跃列表 + +## 4) Tool call policy + +- **create_analyst**: 用于创建全新的分析师实例 + - 必须提供唯一的agent_id + - 基于预定义类型时,analyst_type必须在可用类型列表中,或提供完整自定义配置 + - 工具调用失败时,检查agent_id是否已存在 + +- **clone_analyst**: 用于基于现有分析师创建变体 + - 适用于:创建专注于特定行业的分析师(如从technical_analyst克隆crypto_technical) + - 新实例继承源配置,应用指定的覆盖 + +- **remove_analyst**: 用于移除动态创建的分析师 + - 只能移除通过本技能创建的分析师 + - 系统预定义分析师(fundamentals_analyst等)不可移除 + +- **list_analyst_types**: 用于查看所有可用分析师类型 + - 返回预定义类型 + 运行时注册类型 + +- **get_analyst_info**: 用于查看特定分析师的详细配置 + +- **get_team_summary**: 用于查看团队整体构成 + +## 5) Output schema + +### create_analyst / clone_analyst 输出 +```json +{ + "success": true, + "agent_id": "options_specialist_01", + "message": "Created runtime analyst 'options_specialist_01' (technical_analyst).", + "analyst_type": "technical_analyst" +} +``` + +### remove_analyst 输出 +```json +{ + "success": true, + "agent_id": "options_specialist_01", + "message": "Removed runtime analyst 'options_specialist_01'." +} +``` + +### list_analyst_types 输出 +```json +[ + { + "type_id": "fundamentals_analyst", + "name": "Fundamentals Analyst", + "description": "...", + "is_builtin": true, + "source": "constants" + } +] +``` + +## 6) Failure fallback + +- **agent_id已存在**: 返回错误,提示选择新的agent_id或使用clone_analyst基于现有创建变体 +- **analyst_type未知**: 提示使用list_analyst_types查看可用类型,或提供完整的自定义persona +- **创建失败**: 检查系统日志,可能原因包括:模型配置错误、工作空间权限问题 +- **移除失败**: 确认分析师是通过动态创建(系统预定义分析师不可移除) + +## 重要约定 + +### Agent ID 命名规则 + +为了使新创建的分析师能够正常工作,**agent_id 必须以 `_analyst` 结尾**。这是系统识别分析师类型并分配相应工具的关键约定。 + +- ✅ **正确**: `options_specialist_analyst`, `crypto_technical_analyst` +- ❌ **错误**: `options_specialist`, `crypto_expert` + +如果不遵循此约定,分析师将无法获得分析工具组(基本面、技术、情绪、估值等工具)。 + +### 全新自定义类型 vs 基于现有类型 + +**基于现有类型**(推荐用于快速创建): +- 使用 `analyst_type: "technical_analyst"` 等预定义类型 +- 可以覆盖 persona、SOUL.md 等配置 +- 工具组根据 `analyst_type` 自动选择 + +**全新自定义类型**(用于完全自定义): +- 设置 `analyst_type` 为自定义标识(如 `"custom"`)或任意字符串 +- 必须提供完整的 `persona` 定义 +- 建议提供 `soul_md` 精确定义行为 +- **agent_id 必须仍然以 `_analyst` 结尾** + +## 最佳实践 + +1. **命名约定**: 使用描述性agent_id,如 `industry_tech_analyst` 而非 `analyst_01`,**必须以 `_analyst` 结尾** +2. **版本控制**: 克隆分析师时,在new_id中包含版本信息,如 `technical_v2_crypto_analyst` +3. **文档记录**: 创建自定义分析师时,提供详细的description,便于后续理解和维护 +4. **资源管理**: 定期使用get_team_summary检查团队规模,移除不再需要的分析师 + +## 示例场景 + +### 场景1: 添加加密货币分析师 +``` +创建一个新的分析师,专注于加密货币技术分析: +- agent_id: "crypto_technical_01" +- analyst_type: "technical_analyst" +- name: "加密货币技术分析师" +- focus: ["链上数据分析", "DeFi协议", "加密货币技术指标"] +``` + +### 场景2: 克隆并定制 +``` +基于technical_analyst创建一个更激进的版本: +- source_id: "technical_analyst" +- new_id: "technical_aggressive_01" +- name: "激进技术分析师" +- focus_additions: ["高波动交易", "突破策略"] +- description_override: "专注于高风险高回报的技术策略..." +``` + +### 场景3: 创建全新自定义类型(期权专家) +``` +创建一个完全自定义的期权分析师(注意agent_id以_analyst结尾): +- agent_id: "options_strategist_analyst" +- analyst_type: "custom" # 使用非预定义类型 +- name: "期权策略分析师" +- focus: ["期权定价", "希腊字母", "波动率曲面"] +- soul_md: "# 角色定义\n你是期权策略专家,专注于..." +``` + +**说明**: +- 即使 `analyst_type` 是 "custom"(不在预定义类型中),只要提供完整的 `persona` 和 `soul_md`,系统就能创建功能完整的分析师 +- `agent_id` 必须以 `_analyst` 结尾才能获得分析工具 +- 模型使用全局默认,或通过 `model_name` 参数指定 diff --git a/backend/tests/test_gateway_support_modules.py b/backend/tests/test_gateway_support_modules.py index fe08d7d..fc5cb43 100644 --- a/backend/tests/test_gateway_support_modules.py +++ b/backend/tests/test_gateway_support_modules.py @@ -159,11 +159,11 @@ def test_apply_runtime_config_updates_gateway_state(): ) assert gateway.config["tickers"] == ["MSFT", "NVDA"] - assert gateway.config["schedule_mode"] == "intraday" + assert gateway.config["schedule_mode"] == "interval" assert gateway.storage.initial_cash == 150000.0 assert result["runtime_config_applied"]["max_comm_cycles"] == 4 assert gateway.scheduler.calls[-1] == { - "mode": "intraday", + "mode": "interval", "trigger_time": "10:30", "interval_minutes": 30, } diff --git a/backend/tests/test_runtime_service_app.py b/backend/tests/test_runtime_service_app.py index f67f60d..dcb3868 100644 --- a/backend/tests/test_runtime_service_app.py +++ b/backend/tests/test_runtime_service_app.py @@ -86,7 +86,7 @@ def test_runtime_service_get_runtime_config(monkeypatch, tmp_path): "---\n" "tickers:\n" " - AAPL\n" - "schedule_mode: intraday\n" + "schedule_mode: interval\n" "interval_minutes: 30\n" "trigger_time: '10:00'\n" "max_comm_cycles: 3\n" @@ -102,7 +102,7 @@ def test_runtime_service_get_runtime_config(monkeypatch, tmp_path): "run_dir": str(run_dir), "bootstrap_values": { "tickers": ["AAPL"], - "schedule_mode": "intraday", + "schedule_mode": "interval", "interval_minutes": 30, "trigger_time": "10:00", "max_comm_cycles": 3, @@ -123,7 +123,7 @@ def test_runtime_service_get_runtime_config(monkeypatch, tmp_path): assert response.status_code == 200 payload = response.json() assert payload["run_id"] == "demo" - assert payload["bootstrap"]["schedule_mode"] == "intraday" + assert payload["bootstrap"]["schedule_mode"] == "interval" assert payload["resolved"]["interval_minutes"] == 30 assert payload["resolved"]["enable_memory"] is True @@ -190,7 +190,7 @@ def test_runtime_service_update_runtime_config_persists_bootstrap(monkeypatch, t response = client.put( "/api/runtime/config", json={ - "schedule_mode": "intraday", + "schedule_mode": "interval", "interval_minutes": 15, "trigger_time": "10:15", "max_comm_cycles": 4, @@ -199,7 +199,7 @@ def test_runtime_service_update_runtime_config_persists_bootstrap(monkeypatch, t assert response.status_code == 200 payload = response.json() - assert payload["bootstrap"]["schedule_mode"] == "intraday" + assert payload["bootstrap"]["schedule_mode"] == "interval" assert payload["resolved"]["interval_minutes"] == 15 assert "interval_minutes: 15" in (run_dir / "BOOTSTRAP.md").read_text(encoding="utf-8") @@ -547,7 +547,7 @@ def test_start_runtime_restore_reuses_historical_run_id(monkeypatch, tmp_path): "run_dir": str(run_dir), "bootstrap_values": { "tickers": ["AAPL"], - "schedule_mode": "intraday", + "schedule_mode": "interval", "interval_minutes": 30, "trigger_time": "now", "max_comm_cycles": 2, diff --git a/backend/tools/dynamic_team_tools.py b/backend/tools/dynamic_team_tools.py new file mode 100644 index 0000000..990f4d8 --- /dev/null +++ b/backend/tools/dynamic_team_tools.py @@ -0,0 +1,518 @@ +# -*- coding: utf-8 -*- +"""Dynamic Team Management Tools - Tools for PM to manage analyst team dynamically. + +This module provides tools for the Portfolio Manager to: +- Create new analysts with custom configuration +- Clone existing analysts with variations +- Remove analysts from the team +- List available analyst types +- Get analyst information + +These tools are registered with the PM's toolkit and enable dynamic team management +as described in the Dynamic Team Architecture. +""" +from __future__ import annotations + +from typing import Any, Dict, List, Optional, Callable +from dataclasses import asdict + +from backend.agents.dynamic_team_types import ( + AnalystPersona, + AnalystConfig, + CreateAnalystResult, + AnalystTypeInfo, +) +from backend.config.constants import ANALYST_TYPES + + +# Type alias for callbacks set by pipeline +CreateAnalystCallback = Callable[[str, str, Optional[AnalystConfig]], str] +RemoveAnalystCallback = Callable[[str], str] + + +class DynamicTeamController: + """Controller for dynamic analyst team management. + + This class is instantiated by TradingPipeline and injected into the PM agent + via set_team_controller(). It provides methods that the PM can call through + tools to manage the analyst team dynamically. + + Attributes: + create_callback: Callback to _create_runtime_analyst in pipeline + remove_callback: Callback to _remove_runtime_analyst in pipeline + get_analysts_callback: Callback to get current analysts list + registered_types: Runtime-registered custom analyst types + """ + + def __init__( + self, + create_callback: CreateAnalystCallback, + remove_callback: RemoveAnalystCallback, + get_analysts_callback: Optional[Callable[[], List[Any]]] = None, + ): + """Initialize the controller with callbacks from pipeline. + + Args: + create_callback: Function to create a runtime analyst + remove_callback: Function to remove a runtime analyst + get_analysts_callback: Optional function to get current analysts + """ + self._create_callback = create_callback + self._remove_callback = remove_callback + self._get_analysts_callback = get_analysts_callback + self._registered_types: Dict[str, AnalystPersona] = {} + self._instance_configs: Dict[str, AnalystConfig] = {} + + def create_analyst( + self, + agent_id: str, + analyst_type: str, + name: Optional[str] = None, + focus: Optional[List[str]] = None, + description: Optional[str] = None, + soul_md: Optional[str] = None, + agents_md: Optional[str] = None, + model_name: Optional[str] = None, + preferred_tools: Optional[List[str]] = None, + ) -> Dict[str, Any]: + """Create a new analyst with optional custom configuration. + + This tool allows the Portfolio Manager to dynamically create new analysts + during a trading session. The analyst can be based on a predefined type + or fully customized with a unique persona. + + Args: + agent_id: Unique identifier for the new analyst (e.g., "crypto_specialist_01") + analyst_type: Base type (e.g., "technical_analyst") or custom identifier + name: Display name for the analyst (overrides default) + focus: List of focus areas (overrides default) + description: Detailed description (overrides default) + soul_md: Custom SOUL.md content for the analyst's workspace + agents_md: Custom AGENTS.md content + model_name: Override the default LLM model + preferred_tools: List of preferred tool categories + + Returns: + Dict with success status, message, and analyst info + + Example: + >>> result = create_analyst( + ... agent_id="options_specialist", + ... analyst_type="technical_analyst", + ... name="期权策略分析师", + ... focus=["期权定价", "波动率交易"], + ... description="专注于期权市场分析和波动率交易策略...", + ... ) + """ + # Build custom config if any customization is provided + custom_config = None + if any([name, focus, description, soul_md, agents_md, model_name, preferred_tools]): + persona = None + if name or focus or description: + persona = AnalystPersona( + name=name or f"Custom {analyst_type}", + focus=focus or ["General Analysis"], + description=description or f"Custom analyst based on {analyst_type}", + preferred_tools=preferred_tools, + ) + + custom_config = AnalystConfig( + persona=persona, + analyst_type=analyst_type if analyst_type in ANALYST_TYPES else None, + soul_md=soul_md, + agents_md=agents_md, + model_name=model_name, + ) + + # Call the pipeline's create method + result_message = self._create_callback(agent_id, analyst_type, custom_config) + + # Parse result + success = result_message.startswith("Created") + if success: + self._instance_configs[agent_id] = custom_config if custom_config else AnalystConfig( + analyst_type=analyst_type + ) + + return { + "success": success, + "agent_id": agent_id if success else None, + "message": result_message, + "analyst_type": analyst_type, + } + + def clone_analyst( + self, + source_id: str, + new_id: str, + name: Optional[str] = None, + focus_additions: Optional[List[str]] = None, + description_override: Optional[str] = None, + model_name: Optional[str] = None, + ) -> Dict[str, Any]: + """Clone an existing analyst with optional modifications. + + Creates a new analyst by copying the configuration of an existing one + and applying specified overrides. Useful for creating specialized + variants (e.g., "crypto_technical" from "technical_analyst"). + + Args: + source_id: ID of the analyst to clone + new_id: Unique identifier for the new analyst + name: New display name (if different from source) + focus_additions: Additional focus areas to add + description_override: Completely new description + model_name: Override the model from source + + Returns: + Dict with success status, message, and new analyst info + + Example: + >>> result = clone_analyst( + ... source_id="technical_analyst", + ... new_id="crypto_technical_01", + ... name="加密货币技术分析师", + ... focus_additions=["链上数据", "DeFi协议分析"], + ... ) + """ + # Get source config if available + source_config = self._instance_configs.get(source_id) + + # Determine base type and config + if source_config: + base_type = source_config.analyst_type or source_id + base_persona = source_config.persona + else: + # Assume source_id is a known type + base_type = source_id + base_persona = None + + # Build new persona + new_focus = list(base_persona.focus) if base_persona else [] + if focus_additions: + new_focus.extend(focus_additions) + + new_name = name or (base_persona.name if base_persona else new_id) + new_description = description_override or (base_persona.description if base_persona else "") + + # Create new config with parent reference + new_config = AnalystConfig( + persona=AnalystPersona( + name=new_name, + focus=new_focus, + description=new_description, + preferred_tools=base_persona.preferred_tools if base_persona else None, + ), + analyst_type=base_type if base_type in ANALYST_TYPES else None, + soul_md=source_config.soul_md if source_config else None, + agents_md=source_config.agents_md if source_config else None, + model_name=model_name or (source_config.model_name if source_config else None), + parent_id=source_id, + ) + + # Create the new analyst + result_message = self._create_callback(new_id, base_type, new_config) + + success = result_message.startswith("Created") + if success: + self._instance_configs[new_id] = new_config + + return { + "success": success, + "agent_id": new_id if success else None, + "parent_id": source_id, + "message": result_message, + } + + def remove_analyst(self, agent_id: str) -> Dict[str, Any]: + """Remove a dynamically created analyst from the team. + + Args: + agent_id: ID of the analyst to remove + + Returns: + Dict with success status and message + + Example: + >>> result = remove_analyst("options_specialist") + """ + result_message = self._remove_callback(agent_id) + success = result_message.startswith("Removed") or "not found" not in result_message.lower() + + if success and agent_id in self._instance_configs: + del self._instance_configs[agent_id] + + return { + "success": success, + "agent_id": agent_id, + "message": result_message, + } + + def list_analyst_types(self) -> List[Dict[str, Any]]: + """List all available analyst types. + + Returns a list of all available analyst types, including: + - Built-in types from ANALYST_TYPES + - Runtime registered custom types + + Returns: + List of analyst type information dictionaries + + Example: + >>> types = list_analyst_types() + >>> print(types[0]["type_id"]) # "fundamentals_analyst" + """ + result = [] + + # Add built-in types + for type_id, info in ANALYST_TYPES.items(): + result.append({ + "type_id": type_id, + "name": info.get("display_name", type_id), + "description": info.get("description", ""), + "is_builtin": True, + "source": "constants", + }) + + # Add runtime registered types + for type_id, persona in self._registered_types.items(): + result.append({ + "type_id": type_id, + "name": persona.name, + "description": persona.description, + "is_builtin": False, + "source": "runtime", + }) + + return result + + def get_analyst_info(self, agent_id: str) -> Dict[str, Any]: + """Get information about a specific analyst. + + Args: + agent_id: ID of the analyst + + Returns: + Dict with analyst configuration and status + """ + config = self._instance_configs.get(agent_id) + if not config: + return { + "found": False, + "agent_id": agent_id, + "message": f"No configuration found for '{agent_id}'", + } + + return { + "found": True, + "agent_id": agent_id, + "config": config.to_dict(), + "is_custom": config.persona is not None, + "is_clone": config.parent_id is not None, + "parent_id": config.parent_id, + } + + def register_analyst_type( + self, + type_id: str, + name: str, + focus: List[str], + description: str, + preferred_tools: Optional[List[str]] = None, + ) -> Dict[str, Any]: + """Register a new analyst type for later creation. + + This allows defining reusable analyst personas that can be instantiated + multiple times with different configurations. + + Args: + type_id: Unique identifier for this type (e.g., "options_analyst") + name: Display name + focus: List of focus areas + description: Detailed description + preferred_tools: Optional list of preferred tool categories + + Returns: + Dict with success status and type info + + Example: + >>> result = register_analyst_type( + ... type_id="options_analyst", + ... name="期权分析师", + ... focus=["期权定价", "希腊字母分析"], + ... description="专注于期权策略和波动率分析", + ... ) + """ + if type_id in self._registered_types or type_id in ANALYST_TYPES: + return { + "success": False, + "type_id": type_id, + "message": f"Type '{type_id}' already exists", + } + + persona = AnalystPersona( + name=name, + focus=focus, + description=description, + preferred_tools=preferred_tools, + ) + self._registered_types[type_id] = persona + + return { + "success": True, + "type_id": type_id, + "persona": persona.to_dict(), + "message": f"Registered new analyst type '{type_id}'", + } + + def get_team_summary(self) -> Dict[str, Any]: + """Get a summary of the current analyst team. + + Returns: + Dict with team composition information + """ + analysts = [] + for agent_id, config in self._instance_configs.items(): + analysts.append({ + "agent_id": agent_id, + "name": config.persona.name if config.persona else agent_id, + "type": config.analyst_type, + "is_custom": config.persona is not None, + "is_clone": config.parent_id is not None, + }) + + return { + "total_analysts": len(analysts), + "custom_analysts": len([a for a in analysts if a["is_custom"]]), + "cloned_analysts": len([a for a in analysts if a["is_clone"]]), + "analysts": analysts, + "registered_types": len(self._registered_types), + } + + +# Global controller instance - set by pipeline +_controller_instance: Optional[DynamicTeamController] = None + + +def set_controller(controller: DynamicTeamController) -> None: + """Set the global controller instance. + + Called by TradingPipeline when initializing the PM agent. + """ + global _controller_instance + _controller_instance = controller + + +def get_controller() -> Optional[DynamicTeamController]: + """Get the global controller instance. + + Returns: + DynamicTeamController instance or None if not set + """ + return _controller_instance + + +# Tool functions that wrap the controller methods +# These are registered with the PM's toolkit + +def create_analyst( + agent_id: str, + analyst_type: str, + name: Optional[str] = None, + focus: Optional[str] = None, + description: Optional[str] = None, + soul_md: Optional[str] = None, + agents_md: Optional[str] = None, + model_name: Optional[str] = None, +) -> Dict[str, Any]: + """Tool wrapper for create_analyst. + + Note: focus parameter accepts comma-separated string for tool compatibility. + """ + controller = get_controller() + if not controller: + return {"success": False, "error": "Dynamic team controller not available"} + + focus_list = [f.strip() for f in focus.split(",")] if focus else None + return controller.create_analyst( + agent_id=agent_id, + analyst_type=analyst_type, + name=name, + focus=focus_list, + description=description, + soul_md=soul_md, + agents_md=agents_md, + model_name=model_name, + ) + + +def clone_analyst( + source_id: str, + new_id: str, + name: Optional[str] = None, + focus_additions: Optional[str] = None, + description_override: Optional[str] = None, + model_name: Optional[str] = None, +) -> Dict[str, Any]: + """Tool wrapper for clone_analyst. + + Note: focus_additions accepts comma-separated string. + """ + controller = get_controller() + if not controller: + return {"success": False, "error": "Dynamic team controller not available"} + + additions_list = [f.strip() for f in focus_additions.split(",")] if focus_additions else None + return controller.clone_analyst( + source_id=source_id, + new_id=new_id, + name=name, + focus_additions=additions_list, + description_override=description_override, + model_name=model_name, + ) + + +def remove_analyst(agent_id: str) -> Dict[str, Any]: + """Tool wrapper for remove_analyst.""" + controller = get_controller() + if not controller: + return {"success": False, "error": "Dynamic team controller not available"} + return controller.remove_analyst(agent_id) + + +def list_analyst_types() -> List[Dict[str, Any]]: + """Tool wrapper for list_analyst_types.""" + controller = get_controller() + if not controller: + return [] + return controller.list_analyst_types() + + +def get_analyst_info(agent_id: str) -> Dict[str, Any]: + """Tool wrapper for get_analyst_info.""" + controller = get_controller() + if not controller: + return {"found": False, "error": "Controller not available"} + return controller.get_analyst_info(agent_id) + + +def get_team_summary() -> Dict[str, Any]: + """Tool wrapper for get_team_summary.""" + controller = get_controller() + if not controller: + return {"error": "Controller not available"} + return controller.get_team_summary() + + +__all__ = [ + "DynamicTeamController", + "set_controller", + "get_controller", + "create_analyst", + "clone_analyst", + "remove_analyst", + "list_analyst_types", + "get_analyst_info", + "get_team_summary", +]