Files
evotraders/backend/tools/dynamic_team_tools.py

519 lines
18 KiB
Python

# -*- coding: utf-8 -*-
"""Dynamic Team Management Tools - Tools for PM to manage analyst team dynamically.
This module provides tools for the Portfolio Manager to:
- Create new analysts with custom configuration
- Clone existing analysts with variations
- Remove analysts from the team
- List available analyst types
- Get analyst information
These tools are registered with the PM's toolkit and enable dynamic team management
as described in the Dynamic Team Architecture.
"""
from __future__ import annotations
from typing import Any, Dict, List, Optional, Callable
from dataclasses import asdict
from backend.agents.dynamic_team_types import (
AnalystPersona,
AnalystConfig,
CreateAnalystResult,
AnalystTypeInfo,
)
from backend.config.constants import ANALYST_TYPES
# Type aliases for the callbacks that the pipeline hands to DynamicTeamController.
# CreateAnalystCallback is invoked as (agent_id, analyst_type, custom_config or None)
# and returns a status message string.
CreateAnalystCallback = Callable[[str, str, Optional[AnalystConfig]], str]
# RemoveAnalystCallback is invoked as (agent_id) and returns a status message string.
RemoveAnalystCallback = Callable[[str], str]
class DynamicTeamController:
    """Controller for dynamic analyst team management.

    This class is instantiated by TradingPipeline and injected into the PM agent
    via set_team_controller(). It provides methods that the PM can call through
    tools to manage the analyst team dynamically.

    Attributes:
        _create_callback: Pipeline callback that creates a runtime analyst.
        _remove_callback: Pipeline callback that removes a runtime analyst.
        _get_analysts_callback: Optional callback returning the current
            analysts; stored here but not called by any method of this class.
        _registered_types: Personas registered at runtime, keyed by type id.
        _instance_configs: Config of every analyst created through this
            controller, keyed by agent id.
    """

    def __init__(
        self,
        create_callback: CreateAnalystCallback,
        remove_callback: RemoveAnalystCallback,
        get_analysts_callback: Optional[Callable[[], List[Any]]] = None,
    ):
        """Initialize the controller with callbacks from pipeline.

        Args:
            create_callback: Function to create a runtime analyst
            remove_callback: Function to remove a runtime analyst
            get_analysts_callback: Optional function to get current analysts
        """
        self._create_callback = create_callback
        self._remove_callback = remove_callback
        self._get_analysts_callback = get_analysts_callback
        self._registered_types: Dict[str, AnalystPersona] = {}
        self._instance_configs: Dict[str, AnalystConfig] = {}

    def create_analyst(
        self,
        agent_id: str,
        analyst_type: str,
        name: Optional[str] = None,
        focus: Optional[List[str]] = None,
        description: Optional[str] = None,
        soul_md: Optional[str] = None,
        agents_md: Optional[str] = None,
        model_name: Optional[str] = None,
        preferred_tools: Optional[List[str]] = None,
    ) -> Dict[str, Any]:
        """Create a new analyst with optional custom configuration.

        The analyst can be based on a built-in type, on a type previously
        registered with register_analyst_type(), or on a fully custom persona.
        When ``analyst_type`` is a runtime-registered type, its persona
        supplies the defaults for every persona field not overridden here.

        Args:
            agent_id: Unique identifier for the new analyst (e.g., "crypto_specialist_01")
            analyst_type: Base type (e.g., "technical_analyst") or custom identifier
            name: Display name for the analyst (overrides default)
            focus: List of focus areas (overrides default)
            description: Detailed description (overrides default)
            soul_md: Custom SOUL.md content for the analyst's workspace
            agents_md: Custom AGENTS.md content
            model_name: Override the default LLM model
            preferred_tools: List of preferred tool categories

        Returns:
            Dict with keys "success", "agent_id" (None on failure), "message"
            (the callback's status text) and "analyst_type".

        Example:
            >>> result = create_analyst(
            ...     agent_id="options_specialist",
            ...     analyst_type="technical_analyst",
            ...     name="Options Strategy Analyst",
            ...     focus=["Options pricing", "Volatility trading"],
            ...     description="Focused on options market analysis and volatility trading...",
            ... )
        """
        registered = self._registered_types.get(analyst_type)
        # preferred_tools lives on the persona, so it must trigger persona
        # construction too; otherwise it would be silently discarded.
        persona_overridden = any([name, focus, description, preferred_tools])
        workspace_overridden = any([soul_md, agents_md, model_name])

        custom_config = None
        if persona_overridden or workspace_overridden or registered is not None:
            persona = None
            if persona_overridden or registered is not None:
                if registered is not None:
                    # Copy the lists so the new analyst never shares mutable
                    # state with the registered template.
                    default_name = registered.name
                    default_focus = list(registered.focus)
                    default_description = registered.description
                    default_tools = (
                        list(registered.preferred_tools)
                        if registered.preferred_tools is not None
                        else None
                    )
                else:
                    default_name = f"Custom {analyst_type}"
                    default_focus = ["General Analysis"]
                    default_description = f"Custom analyst based on {analyst_type}"
                    default_tools = None
                persona = AnalystPersona(
                    name=name or default_name,
                    focus=focus or default_focus,
                    description=description or default_description,
                    preferred_tools=(
                        preferred_tools if preferred_tools is not None else default_tools
                    ),
                )
            custom_config = AnalystConfig(
                persona=persona,
                # Only built-in type ids are recorded on the config.
                analyst_type=analyst_type if analyst_type in ANALYST_TYPES else None,
                soul_md=soul_md,
                agents_md=agents_md,
                model_name=model_name,
            )

        result_message = self._create_callback(agent_id, analyst_type, custom_config)
        # The callback reports its outcome as text; a "Created" prefix means success.
        success = result_message.startswith("Created")
        if success:
            self._instance_configs[agent_id] = (
                custom_config if custom_config else AnalystConfig(analyst_type=analyst_type)
            )
        return {
            "success": success,
            "agent_id": agent_id if success else None,
            "message": result_message,
            "analyst_type": analyst_type,
        }

    def clone_analyst(
        self,
        source_id: str,
        new_id: str,
        name: Optional[str] = None,
        focus_additions: Optional[List[str]] = None,
        description_override: Optional[str] = None,
        model_name: Optional[str] = None,
    ) -> Dict[str, Any]:
        """Clone an existing analyst with optional modifications.

        The source is looked up first among analysts created through this
        controller, then among runtime-registered types; otherwise
        ``source_id`` is passed to the pipeline as the base type unchanged.

        Args:
            source_id: ID of the analyst to clone
            new_id: Unique identifier for the new analyst
            name: New display name (if different from source)
            focus_additions: Additional focus areas to add
            description_override: Completely new description
            model_name: Override the model from source

        Returns:
            Dict with keys "success", "agent_id" (None on failure),
            "parent_id" and "message".

        Example:
            >>> result = clone_analyst(
            ...     source_id="technical_analyst",
            ...     new_id="crypto_technical_01",
            ...     name="Crypto Technical Analyst",
            ...     focus_additions=["On-chain data", "DeFi protocol analysis"],
            ... )
        """
        source_config = self._instance_configs.get(source_id)
        if source_config:
            base_type = source_config.analyst_type or source_id
            base_persona = source_config.persona
        else:
            base_type = source_id
            # A runtime-registered type can serve as the template as well.
            base_persona = self._registered_types.get(source_id)

        new_focus = list(base_persona.focus) if base_persona else []
        if focus_additions:
            new_focus.extend(focus_additions)
        new_name = name or (base_persona.name if base_persona else new_id)
        new_description = description_override or (
            base_persona.description if base_persona else ""
        )
        base_tools = base_persona.preferred_tools if base_persona else None

        new_config = AnalystConfig(
            persona=AnalystPersona(
                name=new_name,
                focus=new_focus,
                description=new_description,
                # Copy so the clone does not alias the source's list.
                preferred_tools=list(base_tools) if base_tools is not None else None,
            ),
            analyst_type=base_type if base_type in ANALYST_TYPES else None,
            soul_md=source_config.soul_md if source_config else None,
            agents_md=source_config.agents_md if source_config else None,
            model_name=model_name or (source_config.model_name if source_config else None),
            parent_id=source_id,
        )

        result_message = self._create_callback(new_id, base_type, new_config)
        success = result_message.startswith("Created")
        if success:
            self._instance_configs[new_id] = new_config
        return {
            "success": success,
            "agent_id": new_id if success else None,
            "parent_id": source_id,
            "message": result_message,
        }

    def remove_analyst(self, agent_id: str) -> Dict[str, Any]:
        """Remove a dynamically created analyst from the team.

        Args:
            agent_id: ID of the analyst to remove

        Returns:
            Dict with keys "success", "agent_id" and "message".

        Example:
            >>> result = remove_analyst("options_specialist")
        """
        result_message = self._remove_callback(agent_id)
        # NOTE(review): any message lacking "not found" counts as success, so
        # other failure texts from the callback would be reported as success.
        # Confirm the callback's message vocabulary before tightening this.
        success = (
            result_message.startswith("Removed")
            or "not found" not in result_message.lower()
        )
        if success:
            self._instance_configs.pop(agent_id, None)
        return {
            "success": success,
            "agent_id": agent_id,
            "message": result_message,
        }

    def list_analyst_types(self) -> List[Dict[str, Any]]:
        """List all available analyst types.

        Built-in types from ANALYST_TYPES come first, followed by types
        registered at runtime.

        Returns:
            List of dicts with keys "type_id", "name", "description",
            "is_builtin" and "source".

        Example:
            >>> types = list_analyst_types()
            >>> print(types[0]["type_id"])  # "fundamentals_analyst"
        """
        builtin_types = [
            {
                "type_id": type_id,
                "name": info.get("display_name", type_id),
                "description": info.get("description", ""),
                "is_builtin": True,
                "source": "constants",
            }
            for type_id, info in ANALYST_TYPES.items()
        ]
        runtime_types = [
            {
                "type_id": type_id,
                "name": persona.name,
                "description": persona.description,
                "is_builtin": False,
                "source": "runtime",
            }
            for type_id, persona in self._registered_types.items()
        ]
        return builtin_types + runtime_types

    def get_analyst_info(self, agent_id: str) -> Dict[str, Any]:
        """Get information about a specific analyst.

        Only analysts created through this controller are known here.

        Args:
            agent_id: ID of the analyst

        Returns:
            Dict with "found": False and a message when the id is unknown;
            otherwise the serialized config plus custom/clone flags.
        """
        config = self._instance_configs.get(agent_id)
        if not config:
            return {
                "found": False,
                "agent_id": agent_id,
                "message": f"No configuration found for '{agent_id}'",
            }
        return {
            "found": True,
            "agent_id": agent_id,
            "config": config.to_dict(),
            "is_custom": config.persona is not None,
            "is_clone": config.parent_id is not None,
            "parent_id": config.parent_id,
        }

    def register_analyst_type(
        self,
        type_id: str,
        name: str,
        focus: List[str],
        description: str,
        preferred_tools: Optional[List[str]] = None,
    ) -> Dict[str, Any]:
        """Register a new analyst type for later creation.

        Registration is refused when ``type_id`` collides with a built-in or
        an already registered type.

        Args:
            type_id: Unique identifier for this type (e.g., "options_analyst")
            name: Display name
            focus: List of focus areas
            description: Detailed description
            preferred_tools: Optional list of preferred tool categories

        Returns:
            Dict with "success", "type_id", "message" and, on success, the
            serialized "persona".

        Example:
            >>> result = register_analyst_type(
            ...     type_id="options_analyst",
            ...     name="Options Analyst",
            ...     focus=["Options pricing", "Greeks analysis"],
            ...     description="Focused on options strategies and volatility analysis",
            ... )
        """
        if type_id in self._registered_types or type_id in ANALYST_TYPES:
            return {
                "success": False,
                "type_id": type_id,
                "message": f"Type '{type_id}' already exists",
            }
        persona = AnalystPersona(
            name=name,
            focus=focus,
            description=description,
            preferred_tools=preferred_tools,
        )
        self._registered_types[type_id] = persona
        return {
            "success": True,
            "type_id": type_id,
            "persona": persona.to_dict(),
            "message": f"Registered new analyst type '{type_id}'",
        }

    def get_team_summary(self) -> Dict[str, Any]:
        """Get a summary of the analysts created through this controller.

        Returns:
            Dict with total/custom/cloned counts, the per-analyst entries and
            the number of runtime-registered types.
        """
        analysts = [
            {
                "agent_id": agent_id,
                "name": config.persona.name if config.persona else agent_id,
                "type": config.analyst_type,
                "is_custom": config.persona is not None,
                "is_clone": config.parent_id is not None,
            }
            for agent_id, config in self._instance_configs.items()
        ]
        return {
            "total_analysts": len(analysts),
            "custom_analysts": sum(1 for entry in analysts if entry["is_custom"]),
            "cloned_analysts": sum(1 for entry in analysts if entry["is_clone"]),
            "analysts": analysts,
            "registered_types": len(self._registered_types),
        }
# Module-level controller instance read by get_controller(); starts as None
# and is assigned through set_controller() (called by the pipeline).
_controller_instance: Optional[DynamicTeamController] = None
def set_controller(controller: DynamicTeamController) -> None:
    """Install ``controller`` as the module-wide team controller.

    Called by TradingPipeline when initializing the PM agent.

    Args:
        controller: The controller instance to store in the module global.
    """
    global _controller_instance
    _controller_instance = controller
def get_controller() -> Optional[DynamicTeamController]:
    """Return the module-wide team controller.

    Returns:
        The stored DynamicTeamController, or None when none has been set.
    """
    return _controller_instance
# Tool functions that wrap the controller methods
# These are registered with the PM's toolkit
def create_analyst(
    agent_id: str,
    analyst_type: str,
    name: Optional[str] = None,
    focus: Optional[str] = None,
    description: Optional[str] = None,
    soul_md: Optional[str] = None,
    agents_md: Optional[str] = None,
    model_name: Optional[str] = None,
) -> Dict[str, Any]:
    """Tool wrapper that creates an analyst through the global controller.

    Args:
        agent_id: Unique identifier for the new analyst.
        analyst_type: Base type or custom identifier.
        name: Optional display name.
        focus: Focus areas as a comma-separated string (for tool
            compatibility). Blank entries are ignored; an already-split
            list or tuple is tolerated as well.
        description: Optional detailed description.
        soul_md: Optional custom SOUL.md content.
        agents_md: Optional custom AGENTS.md content.
        model_name: Optional LLM model override.

    Returns:
        The controller's result dict, or a dict with "success": False and an
        "error" text when no controller has been set.
    """
    controller = get_controller()
    if not controller:
        return {"success": False, "error": "Dynamic team controller not available"}

    focus_list = None
    if focus:
        raw_items = focus.split(",") if isinstance(focus, str) else focus
        stripped = (str(item).strip() for item in raw_items)
        # Drop blanks produced by stray or trailing commas; if nothing is
        # left, behave as if no focus had been given.
        focus_list = [item for item in stripped if item] or None

    return controller.create_analyst(
        agent_id=agent_id,
        analyst_type=analyst_type,
        name=name,
        focus=focus_list,
        description=description,
        soul_md=soul_md,
        agents_md=agents_md,
        model_name=model_name,
    )
def clone_analyst(
    source_id: str,
    new_id: str,
    name: Optional[str] = None,
    focus_additions: Optional[str] = None,
    description_override: Optional[str] = None,
    model_name: Optional[str] = None,
) -> Dict[str, Any]:
    """Tool wrapper that clones an analyst through the global controller.

    Args:
        source_id: ID of the analyst (or type) to clone.
        new_id: Unique identifier for the new analyst.
        name: Optional new display name.
        focus_additions: Extra focus areas as a comma-separated string.
            Blank entries are ignored; an already-split list or tuple is
            tolerated as well.
        description_override: Optional replacement description.
        model_name: Optional LLM model override.

    Returns:
        The controller's result dict, or a dict with "success": False and an
        "error" text when no controller has been set.
    """
    controller = get_controller()
    if not controller:
        return {"success": False, "error": "Dynamic team controller not available"}

    additions_list = None
    if focus_additions:
        raw_items = (
            focus_additions.split(",")
            if isinstance(focus_additions, str)
            else focus_additions
        )
        stripped = (str(item).strip() for item in raw_items)
        # Drop blanks produced by stray or trailing commas; if nothing is
        # left, behave as if no additions had been given.
        additions_list = [item for item in stripped if item] or None

    return controller.clone_analyst(
        source_id=source_id,
        new_id=new_id,
        name=name,
        focus_additions=additions_list,
        description_override=description_override,
        model_name=model_name,
    )
def remove_analyst(agent_id: str) -> Dict[str, Any]:
    """Tool wrapper that removes an analyst through the global controller.

    Args:
        agent_id: ID of the analyst to remove.

    Returns:
        The controller's result dict, or a dict with "success": False and an
        "error" text when no controller has been set.
    """
    team_controller = get_controller()
    if team_controller:
        return team_controller.remove_analyst(agent_id)
    return {"success": False, "error": "Dynamic team controller not available"}
def list_analyst_types() -> List[Dict[str, Any]]:
    """Tool wrapper that lists analyst types through the global controller.

    Returns:
        The controller's list of type-info dicts, or an empty list when no
        controller has been set.
    """
    team_controller = get_controller()
    if team_controller:
        return team_controller.list_analyst_types()
    return []
def get_analyst_info(agent_id: str) -> Dict[str, Any]:
    """Tool wrapper that looks up one analyst through the global controller.

    Args:
        agent_id: ID of the analyst to look up.

    Returns:
        The controller's info dict, or a dict with "found": False and an
        "error" text when no controller has been set.
    """
    team_controller = get_controller()
    if team_controller:
        return team_controller.get_analyst_info(agent_id)
    return {"found": False, "error": "Controller not available"}
def get_team_summary() -> Dict[str, Any]:
    """Tool wrapper that summarizes the team through the global controller.

    Returns:
        The controller's summary dict, or a dict holding only an "error"
        text when no controller has been set.
    """
    team_controller = get_controller()
    if team_controller:
        return team_controller.get_team_summary()
    return {"error": "Controller not available"}
# Public API: the controller class, the global-controller accessors, and the
# module-level tool wrapper functions.
__all__ = [
    "DynamicTeamController",
    "set_controller",
    "get_controller",
    "create_analyst",
    "clone_analyst",
    "remove_analyst",
    "list_analyst_types",
    "get_analyst_info",
    "get_team_summary",
]