# -*- coding: utf-8 -*- """Dynamic Team Management Tools - Tools for PM to manage analyst team dynamically. This module provides tools for the Portfolio Manager to: - Create new analysts with custom configuration - Clone existing analysts with variations - Remove analysts from the team - List available analyst types - Get analyst information These tools are registered with the PM's toolkit and enable dynamic team management as described in the Dynamic Team Architecture. """ from __future__ import annotations from typing import Any, Dict, List, Optional, Callable from dataclasses import asdict from backend.agents.dynamic_team_types import ( AnalystPersona, AnalystConfig, CreateAnalystResult, AnalystTypeInfo, ) from backend.config.constants import ANALYST_TYPES # Type alias for callbacks set by pipeline CreateAnalystCallback = Callable[[str, str, Optional[AnalystConfig]], str] RemoveAnalystCallback = Callable[[str], str] class DynamicTeamController: """Controller for dynamic analyst team management. This class is instantiated by TradingPipeline and injected into the PM agent via set_team_controller(). It provides methods that the PM can call through tools to manage the analyst team dynamically. Attributes: create_callback: Callback to _create_runtime_analyst in pipeline remove_callback: Callback to _remove_runtime_analyst in pipeline get_analysts_callback: Callback to get current analysts list registered_types: Runtime-registered custom analyst types """ def __init__( self, create_callback: CreateAnalystCallback, remove_callback: RemoveAnalystCallback, get_analysts_callback: Optional[Callable[[], List[Any]]] = None, ): """Initialize the controller with callbacks from pipeline. Args: create_callback: Function to create a runtime analyst remove_callback: Function to remove a runtime analyst get_analysts_callback: Optional function to get current analysts """ self._create_callback = create_callback self._remove_callback = remove_callback self._get_analysts_callback = get_analysts_callback self._registered_types: Dict[str, AnalystPersona] = {} self._instance_configs: Dict[str, AnalystConfig] = {} def create_analyst( self, agent_id: str, analyst_type: str, name: Optional[str] = None, focus: Optional[List[str]] = None, description: Optional[str] = None, soul_md: Optional[str] = None, agents_md: Optional[str] = None, model_name: Optional[str] = None, preferred_tools: Optional[List[str]] = None, ) -> Dict[str, Any]: """Create a new analyst with optional custom configuration. This tool allows the Portfolio Manager to dynamically create new analysts during a trading session. The analyst can be based on a predefined type or fully customized with a unique persona. Args: agent_id: Unique identifier for the new analyst (e.g., "crypto_specialist_01") analyst_type: Base type (e.g., "technical_analyst") or custom identifier name: Display name for the analyst (overrides default) focus: List of focus areas (overrides default) description: Detailed description (overrides default) soul_md: Custom SOUL.md content for the analyst's workspace agents_md: Custom AGENTS.md content model_name: Override the default LLM model preferred_tools: List of preferred tool categories Returns: Dict with success status, message, and analyst info Example: >>> result = create_analyst( ... agent_id="options_specialist", ... analyst_type="technical_analyst", ... name="期权策略分析师", ... focus=["期权定价", "波动率交易"], ... description="专注于期权市场分析和波动率交易策略...", ... ) """ # Build custom config if any customization is provided custom_config = None if any([name, focus, description, soul_md, agents_md, model_name, preferred_tools]): persona = None if name or focus or description: persona = AnalystPersona( name=name or f"Custom {analyst_type}", focus=focus or ["General Analysis"], description=description or f"Custom analyst based on {analyst_type}", preferred_tools=preferred_tools, ) custom_config = AnalystConfig( persona=persona, analyst_type=analyst_type if analyst_type in ANALYST_TYPES else None, soul_md=soul_md, agents_md=agents_md, model_name=model_name, ) # Call the pipeline's create method result_message = self._create_callback(agent_id, analyst_type, custom_config) # Parse result success = result_message.startswith("Created") if success: self._instance_configs[agent_id] = custom_config if custom_config else AnalystConfig( analyst_type=analyst_type ) return { "success": success, "agent_id": agent_id if success else None, "message": result_message, "analyst_type": analyst_type, } def clone_analyst( self, source_id: str, new_id: str, name: Optional[str] = None, focus_additions: Optional[List[str]] = None, description_override: Optional[str] = None, model_name: Optional[str] = None, ) -> Dict[str, Any]: """Clone an existing analyst with optional modifications. Creates a new analyst by copying the configuration of an existing one and applying specified overrides. Useful for creating specialized variants (e.g., "crypto_technical" from "technical_analyst"). Args: source_id: ID of the analyst to clone new_id: Unique identifier for the new analyst name: New display name (if different from source) focus_additions: Additional focus areas to add description_override: Completely new description model_name: Override the model from source Returns: Dict with success status, message, and new analyst info Example: >>> result = clone_analyst( ... source_id="technical_analyst", ... new_id="crypto_technical_01", ... name="加密货币技术分析师", ... focus_additions=["链上数据", "DeFi协议分析"], ... ) """ # Get source config if available source_config = self._instance_configs.get(source_id) # Determine base type and config if source_config: base_type = source_config.analyst_type or source_id base_persona = source_config.persona else: # Assume source_id is a known type base_type = source_id base_persona = None # Build new persona new_focus = list(base_persona.focus) if base_persona else [] if focus_additions: new_focus.extend(focus_additions) new_name = name or (base_persona.name if base_persona else new_id) new_description = description_override or (base_persona.description if base_persona else "") # Create new config with parent reference new_config = AnalystConfig( persona=AnalystPersona( name=new_name, focus=new_focus, description=new_description, preferred_tools=base_persona.preferred_tools if base_persona else None, ), analyst_type=base_type if base_type in ANALYST_TYPES else None, soul_md=source_config.soul_md if source_config else None, agents_md=source_config.agents_md if source_config else None, model_name=model_name or (source_config.model_name if source_config else None), parent_id=source_id, ) # Create the new analyst result_message = self._create_callback(new_id, base_type, new_config) success = result_message.startswith("Created") if success: self._instance_configs[new_id] = new_config return { "success": success, "agent_id": new_id if success else None, "parent_id": source_id, "message": result_message, } def remove_analyst(self, agent_id: str) -> Dict[str, Any]: """Remove a dynamically created analyst from the team. Args: agent_id: ID of the analyst to remove Returns: Dict with success status and message Example: >>> result = remove_analyst("options_specialist") """ result_message = self._remove_callback(agent_id) success = result_message.startswith("Removed") or "not found" not in result_message.lower() if success and agent_id in self._instance_configs: del self._instance_configs[agent_id] return { "success": success, "agent_id": agent_id, "message": result_message, } def list_analyst_types(self) -> List[Dict[str, Any]]: """List all available analyst types. Returns a list of all available analyst types, including: - Built-in types from ANALYST_TYPES - Runtime registered custom types Returns: List of analyst type information dictionaries Example: >>> types = list_analyst_types() >>> print(types[0]["type_id"]) # "fundamentals_analyst" """ result = [] # Add built-in types for type_id, info in ANALYST_TYPES.items(): result.append({ "type_id": type_id, "name": info.get("display_name", type_id), "description": info.get("description", ""), "is_builtin": True, "source": "constants", }) # Add runtime registered types for type_id, persona in self._registered_types.items(): result.append({ "type_id": type_id, "name": persona.name, "description": persona.description, "is_builtin": False, "source": "runtime", }) return result def get_analyst_info(self, agent_id: str) -> Dict[str, Any]: """Get information about a specific analyst. Args: agent_id: ID of the analyst Returns: Dict with analyst configuration and status """ config = self._instance_configs.get(agent_id) if not config: return { "found": False, "agent_id": agent_id, "message": f"No configuration found for '{agent_id}'", } return { "found": True, "agent_id": agent_id, "config": config.to_dict(), "is_custom": config.persona is not None, "is_clone": config.parent_id is not None, "parent_id": config.parent_id, } def register_analyst_type( self, type_id: str, name: str, focus: List[str], description: str, preferred_tools: Optional[List[str]] = None, ) -> Dict[str, Any]: """Register a new analyst type for later creation. This allows defining reusable analyst personas that can be instantiated multiple times with different configurations. Args: type_id: Unique identifier for this type (e.g., "options_analyst") name: Display name focus: List of focus areas description: Detailed description preferred_tools: Optional list of preferred tool categories Returns: Dict with success status and type info Example: >>> result = register_analyst_type( ... type_id="options_analyst", ... name="期权分析师", ... focus=["期权定价", "希腊字母分析"], ... description="专注于期权策略和波动率分析", ... ) """ if type_id in self._registered_types or type_id in ANALYST_TYPES: return { "success": False, "type_id": type_id, "message": f"Type '{type_id}' already exists", } persona = AnalystPersona( name=name, focus=focus, description=description, preferred_tools=preferred_tools, ) self._registered_types[type_id] = persona return { "success": True, "type_id": type_id, "persona": persona.to_dict(), "message": f"Registered new analyst type '{type_id}'", } def get_team_summary(self) -> Dict[str, Any]: """Get a summary of the current analyst team. Returns: Dict with team composition information """ analysts = [] for agent_id, config in self._instance_configs.items(): analysts.append({ "agent_id": agent_id, "name": config.persona.name if config.persona else agent_id, "type": config.analyst_type, "is_custom": config.persona is not None, "is_clone": config.parent_id is not None, }) return { "total_analysts": len(analysts), "custom_analysts": len([a for a in analysts if a["is_custom"]]), "cloned_analysts": len([a for a in analysts if a["is_clone"]]), "analysts": analysts, "registered_types": len(self._registered_types), } # Global controller instance - set by pipeline _controller_instance: Optional[DynamicTeamController] = None def set_controller(controller: DynamicTeamController) -> None: """Set the global controller instance. Called by TradingPipeline when initializing the PM agent. """ global _controller_instance _controller_instance = controller def get_controller() -> Optional[DynamicTeamController]: """Get the global controller instance. Returns: DynamicTeamController instance or None if not set """ return _controller_instance # Tool functions that wrap the controller methods # These are registered with the PM's toolkit def create_analyst( agent_id: str, analyst_type: str, name: Optional[str] = None, focus: Optional[str] = None, description: Optional[str] = None, soul_md: Optional[str] = None, agents_md: Optional[str] = None, model_name: Optional[str] = None, ) -> Dict[str, Any]: """Tool wrapper for create_analyst. Note: focus parameter accepts comma-separated string for tool compatibility. """ controller = get_controller() if not controller: return {"success": False, "error": "Dynamic team controller not available"} focus_list = [f.strip() for f in focus.split(",")] if focus else None return controller.create_analyst( agent_id=agent_id, analyst_type=analyst_type, name=name, focus=focus_list, description=description, soul_md=soul_md, agents_md=agents_md, model_name=model_name, ) def clone_analyst( source_id: str, new_id: str, name: Optional[str] = None, focus_additions: Optional[str] = None, description_override: Optional[str] = None, model_name: Optional[str] = None, ) -> Dict[str, Any]: """Tool wrapper for clone_analyst. Note: focus_additions accepts comma-separated string. """ controller = get_controller() if not controller: return {"success": False, "error": "Dynamic team controller not available"} additions_list = [f.strip() for f in focus_additions.split(",")] if focus_additions else None return controller.clone_analyst( source_id=source_id, new_id=new_id, name=name, focus_additions=additions_list, description_override=description_override, model_name=model_name, ) def remove_analyst(agent_id: str) -> Dict[str, Any]: """Tool wrapper for remove_analyst.""" controller = get_controller() if not controller: return {"success": False, "error": "Dynamic team controller not available"} return controller.remove_analyst(agent_id) def list_analyst_types() -> List[Dict[str, Any]]: """Tool wrapper for list_analyst_types.""" controller = get_controller() if not controller: return [] return controller.list_analyst_types() def get_analyst_info(agent_id: str) -> Dict[str, Any]: """Tool wrapper for get_analyst_info.""" controller = get_controller() if not controller: return {"found": False, "error": "Controller not available"} return controller.get_analyst_info(agent_id) def get_team_summary() -> Dict[str, Any]: """Tool wrapper for get_team_summary.""" controller = get_controller() if not controller: return {"error": "Controller not available"} return controller.get_team_summary() __all__ = [ "DynamicTeamController", "set_controller", "get_controller", "create_analyst", "clone_analyst", "remove_analyst", "list_analyst_types", "get_analyst_info", "get_team_summary", ]