feat: add runtime dynamic team controls

This commit is contained in:
2026-04-03 13:48:31 +08:00
parent dc0b250adc
commit ecfbd87244
16 changed files with 2146 additions and 147 deletions

View File

@@ -0,0 +1,372 @@
# -*- coding: utf-8 -*-
"""Dynamic Team Types - Core data types for PM-driven analyst team management.
This module provides data structures for:
- Analyst persona definitions (custom analyst types)
- Analyst creation configuration (custom SOUL.md, AGENTS.md, etc.)
- Dynamic team runtime state tracking
These types enable the Portfolio Manager to dynamically create, clone, and manage
analyst agents with custom configurations beyond the predefined 4 analyst types.
"""
from __future__ import annotations
from dataclasses import dataclass, field
from typing import Optional, Dict, Any, List
from datetime import datetime
@dataclass
class AnalystPersona:
"""Analyst role definition - extends or replaces personas.yaml entries.
Defines the identity, focus areas, and characteristics of an analyst type.
Can be used to create entirely new analyst types at runtime.
Attributes:
name: Display name for the analyst (e.g., "期权策略分析师")
focus: List of focus areas (e.g., ["期权定价", "波动率交易"])
description: Detailed description of the analyst's role and expertise
preferred_tools: Optional list of preferred tool types or categories
icon: Optional icon identifier for frontend display
"""
name: str
focus: List[str]
description: str
preferred_tools: Optional[List[str]] = None
icon: Optional[str] = None
def to_dict(self) -> Dict[str, Any]:
"""Convert to dictionary for serialization."""
return {
"name": self.name,
"focus": self.focus,
"description": self.description,
"preferred_tools": self.preferred_tools,
"icon": self.icon,
}
@classmethod
def from_dict(cls, data: Dict[str, Any]) -> AnalystPersona:
"""Create from dictionary."""
return cls(
name=data["name"],
focus=data.get("focus", []),
description=data.get("description", ""),
preferred_tools=data.get("preferred_tools"),
icon=data.get("icon"),
)
@dataclass
class AnalystConfig:
"""Complete configuration for dynamically creating an analyst.
This dataclass allows the PM to specify all aspects of analyst creation,
including custom workspace files, model overrides, and skill selections.
Attributes:
persona: Complete persona definition (if creating custom type)
analyst_type: Reference to predefined type (e.g., "technical_analyst")
soul_md: Custom SOUL.md content (overrides default generation)
agents_md: Custom AGENTS.md content (overrides default generation)
profile_md: Custom PROFILE.md content (overrides default generation)
skills: List of skill IDs to enable for this analyst
model_name: Override default model for this analyst
memory_config: Custom memory system configuration
tags: Classification tags (e.g., ["options", "derivatives"])
parent_id: If cloned, the source analyst ID
"""
# Identity configuration
persona: Optional[AnalystPersona] = None
analyst_type: Optional[str] = None # Reference to predefined type
# Workspace file contents (override default generation)
soul_md: Optional[str] = None
agents_md: Optional[str] = None
profile_md: Optional[str] = None
bootstrap_md: Optional[str] = None
# Runtime configuration
skills: Optional[List[str]] = field(default_factory=list)
model_name: Optional[str] = None
memory_config: Optional[Dict[str, Any]] = field(default_factory=dict)
# Metadata
tags: Optional[List[str]] = field(default_factory=list)
parent_id: Optional[str] = None # For clone tracking
def __post_init__(self):
"""Initialize default collections."""
if self.skills is None:
self.skills = []
if self.memory_config is None:
self.memory_config = {}
if self.tags is None:
self.tags = []
def to_dict(self) -> Dict[str, Any]:
"""Convert to dictionary for serialization."""
return {
"persona": self.persona.to_dict() if self.persona else None,
"analyst_type": self.analyst_type,
"soul_md": self.soul_md,
"agents_md": self.agents_md,
"profile_md": self.profile_md,
"bootstrap_md": self.bootstrap_md,
"skills": self.skills,
"model_name": self.model_name,
"memory_config": self.memory_config,
"tags": self.tags,
"parent_id": self.parent_id,
}
@classmethod
def from_dict(cls, data: Dict[str, Any]) -> AnalystConfig:
"""Create from dictionary."""
persona_data = data.get("persona")
return cls(
persona=AnalystPersona.from_dict(persona_data) if persona_data else None,
analyst_type=data.get("analyst_type"),
soul_md=data.get("soul_md"),
agents_md=data.get("agents_md"),
profile_md=data.get("profile_md"),
bootstrap_md=data.get("bootstrap_md"),
skills=data.get("skills", []),
model_name=data.get("model_name"),
memory_config=data.get("memory_config", {}),
tags=data.get("tags", []),
parent_id=data.get("parent_id"),
)
def get_effective_analyst_type(self) -> Optional[str]:
"""Get the effective analyst type for tool selection.
Returns analyst_type if set, otherwise derives from persona name.
"""
if self.analyst_type:
return self.analyst_type
if self.persona:
# Derive type ID from persona name (e.g., "期权策略分析师" -> "options_strategist")
return self._derive_type_id(self.persona.name)
return None
@staticmethod
def _derive_type_id(name: str) -> str:
"""Derive a type ID from a display name."""
import re
# Convert Chinese or mixed names to snake_case
# Remove special characters, keep alphanumeric and spaces
cleaned = re.sub(r'[^\w\s]', '', name)
# Convert to lowercase and replace spaces with underscores
return cleaned.lower().strip().replace(' ', '_')
@dataclass
class DynamicAnalystInstance:
"""Runtime information about a dynamically created analyst.
Tracks the creation metadata and current state of a dynamic analyst.
Attributes:
agent_id: Unique identifier for this analyst instance
config: The configuration used to create this analyst
created_at: Timestamp when the analyst was created
created_by: Identifier of the agent that created this analyst (usually PM)
status: Current status (active, paused, removed)
"""
agent_id: str
config: AnalystConfig
created_at: str = field(default_factory=lambda: datetime.now().isoformat())
created_by: str = "portfolio_manager"
status: str = "active" # active, paused, removed
def to_dict(self) -> Dict[str, Any]:
"""Convert to dictionary for serialization."""
return {
"agent_id": self.agent_id,
"config": self.config.to_dict(),
"created_at": self.created_at,
"created_by": self.created_by,
"status": self.status,
}
@classmethod
def from_dict(cls, data: Dict[str, Any]) -> DynamicAnalystInstance:
"""Create from dictionary."""
return cls(
agent_id=data["agent_id"],
config=AnalystConfig.from_dict(data.get("config", {})),
created_at=data.get("created_at", datetime.now().isoformat()),
created_by=data.get("created_by", "portfolio_manager"),
status=data.get("status", "active"),
)
@dataclass
class DynamicTeamState:
"""Complete runtime state for dynamic analyst team management.
This state is persisted alongside TEAM_PIPELINE.yaml and tracks:
- Custom analyst types registered at runtime
- All dynamically created analyst instances
- Configuration snapshots for cloning
Attributes:
run_id: The run configuration this state belongs to
registered_types: Runtime-registered analyst type definitions
instances: Dynamically created analyst instances
version: State format version for migration handling
"""
run_id: str
registered_types: Dict[str, AnalystPersona] = field(default_factory=dict)
instances: Dict[str, DynamicAnalystInstance] = field(default_factory=dict)
version: int = 1
def register_type(self, type_id: str, persona: AnalystPersona) -> bool:
"""Register a new analyst type.
Returns:
True if registered, False if type_id already exists
"""
if type_id in self.registered_types:
return False
self.registered_types[type_id] = persona
return True
def add_instance(self, instance: DynamicAnalystInstance) -> None:
"""Add a new analyst instance."""
self.instances[instance.agent_id] = instance
def remove_instance(self, agent_id: str) -> bool:
"""Mark an instance as removed.
Returns:
True if instance was found and removed
"""
if agent_id in self.instances:
self.instances[agent_id].status = "removed"
return True
return False
def get_active_instances(self) -> List[DynamicAnalystInstance]:
"""Get all active (non-removed) analyst instances."""
return [
inst for inst in self.instances.values()
if inst.status == "active"
]
def to_dict(self) -> Dict[str, Any]:
"""Convert to dictionary for serialization."""
return {
"run_id": self.run_id,
"registered_types": {
k: v.to_dict() for k, v in self.registered_types.items()
},
"instances": {
k: v.to_dict() for k, v in self.instances.items()
},
"version": self.version,
}
@classmethod
def from_dict(cls, data: Dict[str, Any]) -> DynamicTeamState:
"""Create from dictionary."""
registered_types = {
k: AnalystPersona.from_dict(v)
for k, v in data.get("registered_types", {}).items()
}
instances = {
k: DynamicAnalystInstance.from_dict(v)
for k, v in data.get("instances", {}).items()
}
return cls(
run_id=data.get("run_id", "unknown"),
registered_types=registered_types,
instances=instances,
version=data.get("version", 1),
)
@dataclass
class CreateAnalystResult:
"""Result of creating a dynamic analyst.
Attributes:
success: Whether creation was successful
agent_id: The ID of the created analyst (if successful)
message: Human-readable result message
error: Error details (if failed)
"""
success: bool
agent_id: Optional[str] = None
message: str = ""
error: Optional[str] = None
def to_dict(self) -> Dict[str, Any]:
"""Convert to dictionary for API responses."""
return {
"success": self.success,
"agent_id": self.agent_id,
"message": self.message,
"error": self.error,
}
@dataclass
class CloneAnalystRequest:
"""Request to clone an existing analyst.
Attributes:
source_id: ID of the analyst to clone
new_id: ID for the new analyst
config_overrides: Configuration fields to override
"""
source_id: str
new_id: str
config_overrides: Optional[Dict[str, Any]] = field(default_factory=dict)
def __post_init__(self):
if self.config_overrides is None:
self.config_overrides = {}
@dataclass
class AnalystTypeInfo:
"""Information about an available analyst type.
Used for listing all available types (predefined + runtime-registered).
Attributes:
type_id: Unique identifier for this type
name: Display name
description: Type description
is_builtin: Whether this is a built-in type or runtime-registered
source: Source of this type (e.g., "constants", "runtime", "config")
"""
type_id: str
name: str
description: str
is_builtin: bool
source: str
def to_dict(self) -> Dict[str, Any]:
"""Convert to dictionary for API responses."""
return {
"type_id": self.type_id,
"name": self.name,
"description": self.description,
"is_builtin": self.is_builtin,
"source": self.source,
}
__all__ = [
"AnalystPersona",
"AnalystConfig",
"DynamicAnalystInstance",
"DynamicTeamState",
"CreateAnalystResult",
"CloneAnalystRequest",
"AnalystTypeInfo",
]

View File

@@ -14,6 +14,14 @@ from backend.agents.agent_workspace import load_agent_workspace_config
from backend.agents.skills_manager import SkillsManager
from backend.agents.skill_metadata import parse_skill_metadata
from backend.config.bootstrap_config import get_bootstrap_config_for_run
from backend.tools.dynamic_team_tools import (
create_analyst,
clone_analyst,
remove_analyst,
list_analyst_types,
get_analyst_info,
get_team_summary,
)
def load_agent_profiles() -> Dict[str, Dict[str, Any]]:
@@ -138,6 +146,23 @@ def _register_portfolio_tool_groups(toolkit: Any, pm_agent: Any) -> None:
group_name="portfolio_ops",
)
# Register dynamic team management tools
toolkit.create_tool_group(
group_name="dynamic_team",
description="Dynamic analyst team management tools.",
active=False,
notes=(
"Use these tools to create, clone, and manage analyst agents dynamically. "
"Only available when allow_dynamic_team_update is enabled."
),
)
toolkit.register_tool_function(create_analyst, group_name="dynamic_team")
toolkit.register_tool_function(clone_analyst, group_name="dynamic_team")
toolkit.register_tool_function(remove_analyst, group_name="dynamic_team")
toolkit.register_tool_function(list_analyst_types, group_name="dynamic_team")
toolkit.register_tool_function(get_analyst_info, group_name="dynamic_team")
toolkit.register_tool_function(get_team_summary, group_name="dynamic_team")
def _register_risk_tool_groups(toolkit: Any) -> None:
"""注册风险工具组"""