# -*- coding: utf-8 -*- """Dynamic Team API - REST endpoints for managing analyst team dynamically. This module provides API endpoints for: - Creating new analysts with custom configuration - Cloning existing analysts - Removing analysts - Listing available analyst types - Getting analyst information - Managing team composition These endpoints allow both the PM agent (via tool calls) and frontend (via HTTP) to manage the analyst team dynamically. """ from __future__ import annotations import logging from pathlib import Path from typing import Any, Dict, List, Optional from fastapi import APIRouter, HTTPException from pydantic import BaseModel, Field from backend.agents.dynamic_team_types import ( AnalystPersona, AnalystConfig, AnalystTypeInfo, ) from backend.config.constants import ANALYST_TYPES from backend.agents.prompt_loader import get_prompt_loader logger = logging.getLogger(__name__) router = APIRouter(prefix="/api/dynamic-team", tags=["dynamic-team"]) PROJECT_ROOT = Path(__file__).resolve().parents[2] # Pydantic models for API requests/responses class AnalystPersonaRequest(BaseModel): """Request model for analyst persona definition.""" name: str = Field(..., description="Display name for the analyst") focus: List[str] = Field(default_factory=list, description="List of focus areas") description: str = Field(..., description="Detailed description") preferred_tools: Optional[List[str]] = Field(None, description="Preferred tool categories") icon: Optional[str] = Field(None, description="Icon identifier") class CreateAnalystRequest(BaseModel): """Request model for creating a new analyst.""" agent_id: str = Field(..., description="Unique identifier for the new analyst") analyst_type: str = Field(..., description="Base type or custom identifier") persona: Optional[AnalystPersonaRequest] = Field(None, description="Custom persona definition") soul_md: Optional[str] = Field(None, description="Custom SOUL.md content") agents_md: Optional[str] = Field(None, description="Custom AGENTS.md content") profile_md: Optional[str] = Field(None, description="Custom PROFILE.md content") bootstrap_md: Optional[str] = Field(None, description="Custom BOOTSTRAP.md content") model_name: Optional[str] = Field(None, description="Override default LLM model") skills: Optional[List[str]] = Field(None, description="List of skill IDs to enable") tags: Optional[List[str]] = Field(None, description="Classification tags") class CloneAnalystRequest(BaseModel): """Request model for cloning an analyst.""" source_id: str = Field(..., description="ID of the analyst to clone") new_id: str = Field(..., description="Unique identifier for the new analyst") name: Optional[str] = Field(None, description="New display name") focus_additions: Optional[List[str]] = Field(None, description="Additional focus areas") description_override: Optional[str] = Field(None, description="New description") model_name: Optional[str] = Field(None, description="Override model from source") class RegisterTypeRequest(BaseModel): """Request model for registering a new analyst type.""" type_id: str = Field(..., description="Unique identifier for this type") name: str = Field(..., description="Display name") focus: List[str] = Field(..., description="List of focus areas") description: str = Field(..., description="Detailed description") preferred_tools: Optional[List[str]] = Field(None, description="Preferred tool categories") class AnalystResponse(BaseModel): """Response model for analyst operations.""" success: bool agent_id: Optional[str] = None message: str error: Optional[str] = None class AnalystTypeResponse(BaseModel): """Response model for analyst type information.""" type_id: str name: str description: str is_builtin: bool source: str class AnalystInfoResponse(BaseModel): """Response model for detailed analyst information.""" found: bool agent_id: str config: Optional[Dict[str, Any]] = None is_custom: bool = False is_clone: bool = False parent_id: Optional[str] = None message: Optional[str] = None class TeamSummaryResponse(BaseModel): """Response model for team summary.""" total_analysts: int custom_analysts: int cloned_analysts: int analysts: List[Dict[str, Any]] registered_types: int # Helper function to get the current pipeline instance def _get_pipeline(run_id: str) -> Optional[Any]: """Get the TradingPipeline instance for a run. Args: run_id: The run configuration ID Returns: TradingPipeline instance or None if not found """ # Import here to avoid circular imports try: from backend.apps.runtime_service import get_runtime_state runtime_state = get_runtime_state() if runtime_state and hasattr(runtime_state, 'pipeline'): return runtime_state.pipeline except Exception as e: logger.warning(f"Could not get pipeline for run {run_id}: {e}") return None def _get_controller(run_id: str) -> Optional[Any]: """Get the DynamicTeamController for a run. Args: run_id: The run configuration ID Returns: DynamicTeamController instance or None if not available """ try: from backend.tools.dynamic_team_tools import get_controller return get_controller() except Exception as e: logger.warning(f"Could not get controller for run {run_id}: {e}") return None # API Endpoints @router.get("/types", response_model=List[AnalystTypeResponse]) async def list_analyst_types() -> List[AnalystTypeResponse]: """List all available analyst types. Returns both built-in types (from ANALYST_TYPES) and runtime-registered types. """ result = [] # Add built-in types for type_id, info in ANALYST_TYPES.items(): result.append(AnalystTypeResponse( type_id=type_id, name=info.get("display_name", type_id), description=info.get("description", ""), is_builtin=True, source="constants", )) # Try to get runtime registered types controller = _get_controller("default") if controller: for type_id, persona in controller._registered_types.items(): result.append(AnalystTypeResponse( type_id=type_id, name=persona.name, description=persona.description, is_builtin=False, source="runtime", )) return result @router.get("/personas") async def get_personas() -> Dict[str, Any]: """Get all analyst personas from personas.yaml. Returns the persona definitions used for analyst initialization. """ try: personas = get_prompt_loader().load_yaml_config("analyst", "personas") return {"success": True, "personas": personas} except Exception as e: logger.error(f"Failed to load personas: {e}") raise HTTPException(status_code=500, detail=f"Failed to load personas: {e}") @router.post("/runs/{run_id}/analysts", response_model=AnalystResponse) async def create_analyst( run_id: str, request: CreateAnalystRequest, ) -> AnalystResponse: """Create a new analyst in the specified run. Args: run_id: The run configuration ID request: Analyst creation configuration Returns: Result of the creation operation """ controller = _get_controller(run_id) if not controller: raise HTTPException( status_code=503, detail="Dynamic team controller not available. Is the pipeline running?" ) # Build persona if provided persona = None if request.persona: persona = AnalystPersona( name=request.persona.name, focus=request.persona.focus, description=request.persona.description, preferred_tools=request.persona.preferred_tools, icon=request.persona.icon, ) # Build config config = AnalystConfig( persona=persona, analyst_type=request.analyst_type if request.analyst_type in ANALYST_TYPES else None, soul_md=request.soul_md, agents_md=request.agents_md, profile_md=request.profile_md, bootstrap_md=request.bootstrap_md, model_name=request.model_name, skills=request.skills or [], tags=request.tags or [], ) # Create the analyst result = controller.create_analyst( agent_id=request.agent_id, analyst_type=request.analyst_type, name=persona.name if persona else None, focus=persona.focus if persona else None, description=persona.description if persona else None, soul_md=config.soul_md, agents_md=config.agents_md, model_name=config.model_name, ) return AnalystResponse(**result) @router.post("/runs/{run_id}/analysts/clone", response_model=AnalystResponse) async def clone_analyst( run_id: str, request: CloneAnalystRequest, ) -> AnalystResponse: """Clone an existing analyst. Args: run_id: The run configuration ID request: Clone configuration Returns: Result of the clone operation """ controller = _get_controller(run_id) if not controller: raise HTTPException( status_code=503, detail="Dynamic team controller not available. Is the pipeline running?" ) result = controller.clone_analyst( source_id=request.source_id, new_id=request.new_id, name=request.name, focus_additions=request.focus_additions, description_override=request.description_override, model_name=request.model_name, ) return AnalystResponse(**result) @router.delete("/runs/{run_id}/analysts/{agent_id}", response_model=AnalystResponse) async def remove_analyst(run_id: str, agent_id: str) -> AnalystResponse: """Remove a dynamically created analyst. Args: run_id: The run configuration ID agent_id: The analyst to remove Returns: Result of the removal operation """ controller = _get_controller(run_id) if not controller: raise HTTPException( status_code=503, detail="Dynamic team controller not available. Is the pipeline running?" ) result = controller.remove_analyst(agent_id) return AnalystResponse(**result) @router.get("/runs/{run_id}/analysts/{agent_id}", response_model=AnalystInfoResponse) async def get_analyst_info(run_id: str, agent_id: str) -> AnalystInfoResponse: """Get information about a specific analyst. Args: run_id: The run configuration ID agent_id: The analyst ID Returns: Analyst configuration and status """ controller = _get_controller(run_id) if not controller: raise HTTPException( status_code=503, detail="Dynamic team controller not available. Is the pipeline running?" ) result = controller.get_analyst_info(agent_id) return AnalystInfoResponse(**result) @router.get("/runs/{run_id}/summary", response_model=TeamSummaryResponse) async def get_team_summary(run_id: str) -> TeamSummaryResponse: """Get a summary of the current analyst team. Args: run_id: The run configuration ID Returns: Team composition information """ controller = _get_controller(run_id) if not controller: raise HTTPException( status_code=503, detail="Dynamic team controller not available. Is the pipeline running?" ) result = controller.get_team_summary() return TeamSummaryResponse(**result) @router.post("/runs/{run_id}/types", response_model=AnalystTypeResponse) async def register_analyst_type( run_id: str, request: RegisterTypeRequest, ) -> AnalystTypeResponse: """Register a new analyst type. Args: run_id: The run configuration ID request: Type registration configuration Returns: Registered type information """ controller = _get_controller(run_id) if not controller: raise HTTPException( status_code=503, detail="Dynamic team controller not available. Is the pipeline running?" ) result = controller.register_analyst_type( type_id=request.type_id, name=request.name, focus=request.focus, description=request.description, preferred_tools=request.preferred_tools, ) if not result.get("success", False): raise HTTPException(status_code=400, detail=result.get("message", "Registration failed")) return AnalystTypeResponse( type_id=request.type_id, name=request.name, description=request.description, is_builtin=False, source="runtime", )