From 38102d08053413241615eabaca28821ba94700c7 Mon Sep 17 00:00:00 2001 From: cillin Date: Mon, 23 Mar 2026 17:46:06 +0800 Subject: [PATCH] =?UTF-8?q?feat(backend):=20Agent=20=E7=B3=BB=E7=BB=9F?= =?UTF-8?q?=E5=92=8C=20API=20=E5=A2=9E=E5=BC=BA?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - task_delegator: 完善团队任务分发逻辑 - runtime API: 增强运行时管理功能 - skills_manager: 技能管理改进 - tool_guard: 工具调用守卫优化 - evo_agent: 核心 Agent 改进 Co-Authored-By: Claude Opus 4.6 --- backend/agents/base/evo_agent.py | 4 +- backend/agents/base/tool_guard.py | 186 +++++++------- backend/agents/skills_manager.py | 29 ++- backend/agents/team/task_delegator.py | 341 +++++++++++++++++++++++--- backend/api/runtime.py | 330 ++++++++++++++++++++++--- 5 files changed, 725 insertions(+), 165 deletions(-) diff --git a/backend/agents/base/evo_agent.py b/backend/agents/base/evo_agent.py index 5bd0e4d..9058429 100644 --- a/backend/agents/base/evo_agent.py +++ b/backend/agents/base/evo_agent.py @@ -470,7 +470,7 @@ class EvoAgent(ToolGuardMixin, ReActAgent): """ return self._messenger - def delegate_task( + async def delegate_task( self, task_type: str, task_data: Dict[str, Any], @@ -493,7 +493,7 @@ class EvoAgent(ToolGuardMixin, ReActAgent): } try: - return self._task_delegator.delegate_task( + return await self._task_delegator.delegate_task( task_type=task_type, task_data=task_data, target_agent=target_agent, diff --git a/backend/agents/base/tool_guard.py b/backend/agents/base/tool_guard.py index 7391073..24f038a 100644 --- a/backend/agents/base/tool_guard.py +++ b/backend/agents/base/tool_guard.py @@ -289,6 +289,7 @@ class ToolGuardMixin: self._approval_timeout = approval_timeout self._pending_approval: Optional[ToolApprovalRequest] = None self._approval_callback: Optional[Callable[[ToolApprovalRequest], None]] = None + self._approval_lock = asyncio.Lock() def set_approval_callback( self, @@ -383,73 +384,80 @@ class ToolGuardMixin: Returns: True if approved, False otherwise """ - record = TOOL_GUARD_STORE.create_pending( - tool_name=tool_name, - tool_input=tool_input, - agent_id=getattr(self, "agent_id", "unknown"), - workspace_id=getattr(self, "workspace_id", "default"), - session_id=getattr(self, "session_id", None), - findings=default_findings_for_tool(tool_name), - ) - - manager = get_global_runtime_manager() - if manager: - manager.register_pending_approval( - record.approval_id, - { - "tool_name": record.tool_name, - "agent_id": record.agent_id, - "workspace_id": record.workspace_id, - "session_id": record.session_id, - "tool_input": record.tool_input, - }, + async with self._approval_lock: + record = TOOL_GUARD_STORE.create_pending( + tool_name=tool_name, + tool_input=tool_input, + agent_id=getattr(self, "agent_id", "unknown"), + workspace_id=getattr(self, "workspace_id", "default"), + session_id=getattr(self, "session_id", None), + findings=default_findings_for_tool(tool_name), ) - self._pending_approval = ToolApprovalRequest( - approval_id=record.approval_id, - tool_name=tool_name, - tool_input=tool_input, - tool_call_id=tool_call_id, - session_id=getattr(self, "session_id", None), - ) - record.pending_request = self._pending_approval + manager = get_global_runtime_manager() + if manager: + manager.register_pending_approval( + record.approval_id, + { + "tool_name": record.tool_name, + "agent_id": record.agent_id, + "workspace_id": record.workspace_id, + "session_id": record.session_id, + "tool_input": record.tool_input, + }, + ) - # Notify via callback if set - if self._approval_callback: - self._approval_callback(self._pending_approval) + self._pending_approval = ToolApprovalRequest( + approval_id=record.approval_id, + tool_name=tool_name, + tool_input=tool_input, + tool_call_id=tool_call_id, + session_id=getattr(self, "session_id", None), + ) + record.pending_request = self._pending_approval - # Wait for approval - approval_request = self._pending_approval + # Notify via callback if set + if self._approval_callback: + self._approval_callback(self._pending_approval) + + # Wait for approval (lock is released during wait, re-acquired after) + approval_request = self._pending_approval + + # Wait for approval outside the lock to allow concurrent approval approved = await approval_request.wait_for_approval( timeout=self._approval_timeout ) - if approval_request: - status = ( - ApprovalStatus.APPROVED - if approval_request.approved is True - else ApprovalStatus.DENIED - if approval_request.approved is False - else ApprovalStatus.EXPIRED - ) - TOOL_GUARD_STORE.set_status( - approval_request.approval_id, - status, - resolved_by="agent", - notify_request=False, - ) - manager = get_global_runtime_manager() - if manager: - manager.resolve_pending_approval( - approval_request.approval_id, - resolved_by="agent", - status=status.value, + async with self._approval_lock: + if approval_request: + status = ( + ApprovalStatus.APPROVED + if approval_request.approved is True + else ApprovalStatus.DENIED + if approval_request.approved is False + else ApprovalStatus.EXPIRED ) + TOOL_GUARD_STORE.set_status( + approval_request.approval_id, + status, + resolved_by="agent", + notify_request=False, + ) + manager = get_global_runtime_manager() + if manager: + manager.resolve_pending_approval( + approval_request.approval_id, + resolved_by="agent", + status=status.value, + ) + + # Only clear if this is still the same request + if self._pending_approval is approval_request: + self._pending_approval = None - self._pending_approval = None return approved - def approve_guard_call(self, request_id: Optional[str] = None) -> bool: + async def approve_guard_call(self, request_id: Optional[str] = None) -> bool: """Approve a pending guard request. This method is called externally to approve a tool call @@ -461,28 +469,29 @@ class ToolGuardMixin: Returns: True if a request was approved, False if no pending request """ - if self._pending_approval is None: - logger.warning("No pending approval request to approve") - return False + async with self._approval_lock: + if self._pending_approval is None: + logger.warning("No pending approval request to approve") + return False - TOOL_GUARD_STORE.set_status( - self._pending_approval.approval_id, - ApprovalStatus.APPROVED, - resolved_by="agent", - notify_request=False, - ) - manager = get_global_runtime_manager() - if manager: - manager.resolve_pending_approval( + TOOL_GUARD_STORE.set_status( self._pending_approval.approval_id, + ApprovalStatus.APPROVED, resolved_by="agent", - status=ApprovalStatus.APPROVED.value, + notify_request=False, ) - self._pending_approval.approve() - logger.info("Approved tool call: %s", self._pending_approval.tool_name) - return True + manager = get_global_runtime_manager() + if manager: + manager.resolve_pending_approval( + self._pending_approval.approval_id, + resolved_by="agent", + status=ApprovalStatus.APPROVED.value, + ) + self._pending_approval.approve() + logger.info("Approved tool call: %s", self._pending_approval.tool_name) + return True - def deny_guard_call(self, request_id: Optional[str] = None) -> bool: + async def deny_guard_call(self, request_id: Optional[str] = None) -> bool: """Deny a pending guard request. This method is called externally to deny a tool call @@ -494,26 +503,27 @@ class ToolGuardMixin: Returns: True if a request was denied, False if no pending request """ - if self._pending_approval is None: - logger.warning("No pending approval request to deny") - return False + async with self._approval_lock: + if self._pending_approval is None: + logger.warning("No pending approval request to deny") + return False - TOOL_GUARD_STORE.set_status( - self._pending_approval.approval_id, - ApprovalStatus.DENIED, - resolved_by="agent", - notify_request=False, - ) - manager = get_global_runtime_manager() - if manager: - manager.resolve_pending_approval( + TOOL_GUARD_STORE.set_status( self._pending_approval.approval_id, + ApprovalStatus.DENIED, resolved_by="agent", - status=ApprovalStatus.DENIED.value, + notify_request=False, ) - self._pending_approval.deny() - logger.info("Denied tool call: %s", self._pending_approval.tool_name) - return True + manager = get_global_runtime_manager() + if manager: + manager.resolve_pending_approval( + self._pending_approval.approval_id, + resolved_by="agent", + status=ApprovalStatus.DENIED.value, + ) + self._pending_approval.deny() + logger.info("Denied tool call: %s", self._pending_approval.tool_name) + return True async def _acting(self, tool_call) -> dict | None: """Intercept sensitive tool calls before execution. diff --git a/backend/agents/skills_manager.py b/backend/agents/skills_manager.py index e922400..9271a90 100644 --- a/backend/agents/skills_manager.py +++ b/backend/agents/skills_manager.py @@ -5,6 +5,7 @@ from pathlib import Path import shutil import tempfile import zipfile +from threading import Lock from typing import Any, Dict, Iterable, Iterator, List, Optional, Set from urllib.parse import urlparse from urllib.request import urlretrieve @@ -39,6 +40,7 @@ class SkillsManager: self.project_root / "backend" / "skills" / "customized" ) self.runs_root = self.project_root / "runs" + self._lock = Lock() def get_active_root(self, config_name: str) -> Path: return self.runs_root / config_name / "skills" / "active" @@ -737,7 +739,7 @@ class SkillsManager: if local_root.exists(): watched_paths.append(local_root) - handler = _SkillsChangeHandler(watched_paths, callback) + handler = _SkillsChangeHandler(watched_paths, callback, self._lock) observer = Observer() for path in watched_paths: observer.schedule(handler, str(path), recursive=True) @@ -759,11 +761,13 @@ class SkillsManager: Map of agent_id -> list of reloaded skill paths, or empty dict if no changes were detected. """ - changed = self._pending_skill_changes.get(config_name) - if not changed: - return {} + with self._lock: + changed = self._pending_skill_changes.get(config_name) + if not changed: + return {} + + self._pending_skill_changes[config_name] = set() - self._pending_skill_changes[config_name] = set() return self.prepare_active_skills(config_name, agent_defaults) # ------------------------------------------------------------------------- @@ -821,10 +825,12 @@ class _SkillsChangeHandler(FileSystemEventHandler): self, watched_paths: List[Path], callback: Optional[Any] = None, + lock: Optional[Lock] = None, ) -> None: super().__init__() self._watched_paths = watched_paths self._callback = callback + self._lock = lock def on_any_event(self, event: FileSystemEvent) -> None: if event.is_directory: @@ -832,9 +838,16 @@ class _SkillsChangeHandler(FileSystemEventHandler): src_path = Path(event.src_path) for watched in self._watched_paths: if src_path.is_relative_to(watched): - SkillsManager._pending_skill_changes.setdefault( - self._run_id_from_path(src_path), set() - ).add(src_path) + run_id = self._run_id_from_path(src_path) + if self._lock: + with self._lock: + SkillsManager._pending_skill_changes.setdefault( + run_id, set() + ).add(src_path) + else: + SkillsManager._pending_skill_changes.setdefault( + run_id, set() + ).add(src_path) if self._callback: self._callback([src_path]) break diff --git a/backend/agents/team/task_delegator.py b/backend/agents/team/task_delegator.py index 184c50b..5c16bfb 100644 --- a/backend/agents/team/task_delegator.py +++ b/backend/agents/team/task_delegator.py @@ -17,6 +17,9 @@ from agentscope.message import Msg logger = logging.getLogger(__name__) +# Default timeout for subagent execution (seconds) +DEFAULT_EXECUTION_TIMEOUT = 120.0 + # Type alias for subagent specification SubagentSpec = Dict[str, Any] @@ -56,19 +59,26 @@ class TaskDelegator: } """ - def __init__(self, messenger: Any, registry: Any): + def __init__(self, agent: Any): """Initialize TaskDelegator. Args: - messenger: AgentMessenger for communication - registry: AgentRegistry for agent lookup + agent: Parent EvoAgent instance for accessing model, formatter, workspace """ - self._messenger = messenger - self._registry = registry + self._agent = agent + # Get messenger from parent agent if available + self._messenger = getattr(agent, "messenger", None) + self._registry = getattr(agent, "_registry", None) self._subagents: Dict[str, Any] = {} self._dynamic_subagents: Dict[str, SubagentSpec] = {} self._tasks: Dict[str, asyncio.Task] = {} + # Extract model and formatter from parent agent + self._model = getattr(agent, "model", None) + self._formatter = getattr(agent, "formatter", None) + self._workspace_dir = getattr(agent, "workspace_dir", None) + self._config_name = getattr(agent, "config_name", None) + async def delegate( self, agent_id: str, @@ -187,7 +197,7 @@ class TaskDelegator: """Get copy of active tasks dict.""" return dict(self._tasks) - def delegate_task( + async def delegate_task( self, task_type: str, task_data: Dict[str, Any], @@ -239,8 +249,8 @@ class TaskDelegator: else: effective_target = "default" - # Execute the task - task_result = self._execute_task( + # Execute the task (async) + task_result = await self._execute_task( task_type=task_type, task_data=task_data, target_agent=effective_target, @@ -263,13 +273,13 @@ class TaskDelegator: "error": str(e), } - def _execute_task( + async def _execute_task( self, task_type: str, task_data: Dict[str, Any], target_agent: str, - ) -> Any: - """Execute the delegated task. + ) -> Dict[str, Any]: + """Execute the delegated task with a real subagent. Args: task_type: Type of task @@ -277,48 +287,315 @@ class TaskDelegator: target_agent: Target agent identifier Returns: - Task execution result + Task execution result with success/failure info """ task_content = task_data.get("task", task_data.get("prompt", "")) + timeout = task_data.get("timeout", DEFAULT_EXECUTION_TIMEOUT) # Check if we have a dynamic subagent spec for this target agent_spec = self._dynamic_subagents.get(target_agent) if agent_spec: logger.info( - "Executing task '%s' with dynamic subagent '%s' (prompt: %s)", + "Executing task '%s' with dynamic subagent '%s'", task_type, target_agent, - agent_spec.get("prompt", "")[:50], ) - # In a full implementation, this would create and run an actual agent - # For now, return a structured result indicating the task was received + return await self._create_and_run_subagent( + agent_name=target_agent, + agent_spec=agent_spec, + task_content=task_content, + task_type=task_type, + timeout=timeout, + ) + + # Fallback: try to use parent agent's model to process the task directly + logger.info( + "Executing task '%s' with parent agent '%s' (no dynamic subagent)", + task_type, + target_agent, + ) + return await self._run_with_parent_agent( + task_content=task_content, + task_type=task_type, + timeout=timeout, + ) + + async def _create_and_run_subagent( + self, + agent_name: str, + agent_spec: SubagentSpec, + task_content: str, + task_type: str, + timeout: float, + ) -> Dict[str, Any]: + """Create and run a dynamic subagent. + + Args: + agent_name: Name identifier for the subagent + agent_spec: Subagent specification (description, prompt, tools, model) + task_content: Task prompt to send to the subagent + task_type: Type of task + timeout: Execution timeout in seconds + + Returns: + Dict with execution results + """ + subagent_id = f"subagent_{agent_name}_{uuid.uuid4().hex[:8]}" + + try: + # Create subagent instance + subagent = await self._create_subagent( + subagent_id=subagent_id, + agent_spec=agent_spec, + ) + + if subagent is None: + return { + "task_type": task_type, + "task": task_content, + "subagent": agent_name, + "status": "failed", + "error": "Failed to create subagent", + "message": f"Could not instantiate subagent '{agent_name}'", + } + + # Store for potential cleanup + self._subagents[subagent_id] = subagent + + # Execute with timeout + result = await asyncio.wait_for( + self._run_subagent(subagent, task_content), + timeout=timeout, + ) + + # Extract response content + response_content = "" + if isinstance(result, Msg): + response_content = result.content + elif hasattr(result, "content"): + response_content = str(result.content) + elif isinstance(result, dict): + response_content = result.get("content", str(result)) + else: + response_content = str(result) + + logger.info( + "Subagent '%s' completed task '%s' successfully", + agent_name, + task_type, + ) + return { "task_type": task_type, "task": task_content, "subagent": { - "name": target_agent, + "name": agent_name, + "id": subagent_id, "description": agent_spec.get("description", ""), - "prompt": agent_spec.get("prompt", ""), - "tools": agent_spec.get("tools", []), }, "status": "completed", - "message": f"Task '{task_type}' executed with dynamic subagent '{target_agent}'", + "response": response_content, + "message": f"Task '{task_type}' executed with subagent '{agent_name}'", } - # Fallback: execute with default behavior - logger.info( - "Executing task '%s' with default agent '%s'", - task_type, - target_agent, + except asyncio.TimeoutError: + logger.warning( + "Subagent '%s' timed out after %.1f seconds for task '%s'", + agent_name, + timeout, + task_type, + ) + # Cancel the task if still running + if subagent_id in self._subagents: + self._subagents.pop(subagent_id, None) + return { + "task_type": task_type, + "task": task_content, + "subagent": agent_name, + "status": "timeout", + "error": f"Execution timed out after {timeout} seconds", + "message": f"Task '{task_type}' timed out for subagent '{agent_name}'", + } + + except Exception as e: + logger.error( + "Subagent '%s' failed for task '%s': %s", + agent_name, + task_type, + e, + exc_info=True, + ) + # Cleanup on failure + if subagent_id in self._subagents: + self._subagents.pop(subagent_id, None) + return { + "task_type": task_type, + "task": task_content, + "subagent": agent_name, + "status": "error", + "error": str(e), + "message": f"Task '{task_type}' failed for subagent '{agent_name}': {e}", + } + + async def _create_subagent( + self, + subagent_id: str, + agent_spec: SubagentSpec, + ) -> Optional[Any]: + """Create a subagent instance. + + Uses the parent agent's model/formatter to create a lightweight + subagent for task execution. + + Args: + subagent_id: Unique identifier for the subagent + agent_spec: Subagent specification + + Returns: + Subagent instance or None if creation fails + """ + try: + # Import here to avoid circular imports + from agentscope.memory import InMemoryMemory + + # Get model and formatter from parent + model = self._model + formatter = self._formatter + + if model is None: + logger.error("Cannot create subagent: parent agent has no model") + return None + + # Build system prompt from agent spec + description = agent_spec.get("description", "") + prompt_template = agent_spec.get("prompt", "") + system_prompt = f"""You are {description} + +{prompt_template} + +Your task is to complete the user's request below. +""" + + # Create a minimal ReActAgent as the subagent + from agentscope.agent import ReActAgent + + subagent = ReActAgent( + name=subagent_id, + model=model, + sys_prompt=system_prompt, + toolkit=None, # Could load tools from agent_spec.get("tools", []) + memory=InMemoryMemory(), + formatter=formatter, + max_iters=agent_spec.get("max_iters", 5), + ) + + logger.debug("Created subagent: %s", subagent_id) + return subagent + + except Exception as e: + logger.error( + "Failed to create subagent '%s': %s", + subagent_id, + e, + exc_info=True, + ) + return None + + async def _run_subagent( + self, + subagent: Any, + task_content: str, + ) -> Any: + """Run a subagent with the given task. + + Args: + subagent: Subagent instance + task_content: Task prompt + + Returns: + Agent response (Msg or similar) + """ + from agentscope.message import Msg + + # Create message for the subagent + task_msg = Msg( + name="user", + content=task_content, + role="user", ) - return { - "task_type": task_type, - "task": task_content, - "target_agent": target_agent, - "status": "completed", - "message": f"Task '{task_type}' executed with agent '{target_agent}'", - } + + # Execute the agent + response = await subagent.reply(task_msg) + return response + + async def _run_with_parent_agent( + self, + task_content: str, + task_type: str, + timeout: float, + ) -> Dict[str, Any]: + """Run task using the parent agent directly. + + Used when no dynamic subagent is defined. + + Args: + task_content: Task prompt + task_type: Type of task + timeout: Execution timeout + + Returns: + Dict with execution results + """ + try: + result = await asyncio.wait_for( + self._agent.reply(Msg( + name="user", + content=task_content, + role="user", + )), + timeout=timeout, + ) + + response_content = "" + if isinstance(result, Msg): + response_content = result.content + elif hasattr(result, "content"): + response_content = str(result.content) + else: + response_content = str(result) + + return { + "task_type": task_type, + "task": task_content, + "status": "completed", + "response": response_content, + "message": f"Task '{task_type}' executed with parent agent", + } + + except asyncio.TimeoutError: + return { + "task_type": task_type, + "task": task_content, + "status": "timeout", + "error": f"Execution timed out after {timeout} seconds", + "message": f"Task '{task_type}' timed out", + } + + except Exception as e: + logger.error( + "Parent agent failed for task '%s': %s", + task_type, + e, + exc_info=True, + ) + return { + "task_type": task_type, + "task": task_content, + "status": "error", + "error": str(e), + "message": f"Task '{task_type}' failed: {e}", + } def get_dynamic_subagent(self, name: str) -> Optional[SubagentSpec]: """Get a dynamically defined subagent specification. diff --git a/backend/api/runtime.py b/backend/api/runtime.py index 6f2b7a2..61d94f7 100644 --- a/backend/api/runtime.py +++ b/backend/api/runtime.py @@ -16,20 +16,123 @@ from typing import Any, Dict, List, Optional logger = logging.getLogger(__name__) -from fastapi import APIRouter, HTTPException, BackgroundTasks +from fastapi import APIRouter, BackgroundTasks, HTTPException, Request from pydantic import BaseModel, Field from backend.runtime.agent_runtime import AgentRuntimeState -from backend.runtime.manager import TradingRuntimeManager, get_global_runtime_manager +from backend.config.bootstrap_config import ( + resolve_runtime_config, + update_bootstrap_values_for_run, +) router = APIRouter(prefix="/api/runtime", tags=["runtime"]) -runtime_manager: Optional[TradingRuntimeManager] = None PROJECT_ROOT = Path(__file__).resolve().parents[2] -# Gateway process management -_gateway_process: Optional[subprocess.Popen] = None -_gateway_port: int = 8765 + +class RuntimeState: + """Thread-safe singleton for managing runtime state. + + Encapsulates runtime_manager, _gateway_process, and _gateway_port + with asyncio.Lock protection for concurrent access. + """ + + _instance: Optional["RuntimeState"] = None + _lock: asyncio.Lock = asyncio.Lock() + + def __new__(cls) -> "RuntimeState": + if cls._instance is None: + cls._instance = super().__new__(cls) + cls._instance._initialized = False + return cls._instance + + def __init__(self) -> None: + if self._initialized: + return + self._runtime_manager: Optional[Any] = None + self._gateway_process: Optional[subprocess.Popen] = None + self._gateway_port: int = 8765 + self._state_lock = asyncio.Lock() + self._initialized = True + + @property + async def lock(self) -> asyncio.Lock: + """Get the asyncio lock for state synchronization.""" + return self._state_lock + + @property + def runtime_manager(self) -> Optional[Any]: + """Get the runtime manager (no lock - read only).""" + return self._runtime_manager + + @runtime_manager.setter + def runtime_manager(self, value: Optional[Any]) -> None: + """Set the runtime manager.""" + self._runtime_manager = value + + @property + def gateway_process(self) -> Optional[subprocess.Popen]: + """Get the gateway process (no lock - read only).""" + return self._gateway_process + + @gateway_process.setter + def gateway_process(self, value: Optional[subprocess.Popen]) -> None: + """Set the gateway process.""" + self._gateway_process = value + + @property + def gateway_port(self) -> int: + """Get the gateway port.""" + return self._gateway_port + + @gateway_port.setter + def gateway_port(self, value: int) -> None: + """Set the gateway port.""" + self._gateway_port = value + + async def set_runtime_manager(self, manager: Any) -> None: + """Set runtime manager with lock protection.""" + async with self._state_lock: + self._runtime_manager = manager + + async def get_runtime_manager(self) -> Optional[Any]: + """Get runtime manager with lock protection.""" + async with self._state_lock: + return self._runtime_manager + + async def set_gateway_process(self, process: Optional[subprocess.Popen]) -> None: + """Set gateway process with lock protection.""" + async with self._state_lock: + self._gateway_process = process + + async def get_gateway_process(self) -> Optional[subprocess.Popen]: + """Get gateway process with lock protection.""" + async with self._state_lock: + return self._gateway_process + + async def set_gateway_port(self, port: int) -> None: + """Set gateway port with lock protection.""" + async with self._state_lock: + self._gateway_port = port + + async def get_gateway_port(self) -> int: + """Get gateway port with lock protection.""" + async with self._state_lock: + return self._gateway_port + + +# Singleton instance +_runtime_state = RuntimeState() + + +def get_runtime_state() -> RuntimeState: + """Get the RuntimeState singleton instance.""" + return _runtime_state + + +# Backward compatibility: module-level runtime_manager for external imports +# This is set by register_runtime_manager() for backward compatibility +runtime_manager: Optional[Any] = None class RunContextResponse(BaseModel): @@ -96,6 +199,24 @@ class GatewayStatusResponse(BaseModel): run_id: Optional[str] = None +class RuntimeConfigResponse(BaseModel): + run_id: str + is_running: bool + gateway_port: int + bootstrap: Dict[str, Any] + resolved: Dict[str, Any] + + +class UpdateRuntimeConfigRequest(BaseModel): + schedule_mode: Optional[str] = None + interval_minutes: Optional[int] = Field(default=None, ge=1) + trigger_time: Optional[str] = None + max_comm_cycles: Optional[int] = Field(default=None, ge=1) + initial_cash: Optional[float] = Field(default=None, gt=0) + margin_requirement: Optional[float] = Field(default=None, ge=0) + enable_memory: Optional[bool] = None + + def _generate_run_id() -> str: """Generate timestamp-based run ID: YYYYMMDD_HHMMSS""" return datetime.now().strftime("%Y%m%d_%H%M%S") @@ -118,31 +239,31 @@ def _find_available_port(start_port: int = 8765, max_port: int = 9000) -> int: def _is_gateway_running() -> bool: """Check if Gateway process is running.""" - global _gateway_process - if _gateway_process is None: + process = _runtime_state.gateway_process + if process is None: return False - return _gateway_process.poll() is None + return process.poll() is None def _stop_gateway() -> bool: """Stop the Gateway process.""" - global _gateway_process - if _gateway_process is None: + process = _runtime_state.gateway_process + if process is None: return False try: # Try graceful shutdown first - _gateway_process.terminate() + process.terminate() try: - _gateway_process.wait(timeout=5) + process.wait(timeout=5) except subprocess.TimeoutExpired: # Force kill if graceful shutdown fails - _gateway_process.kill() - _gateway_process.wait() + process.kill() + process.wait() except Exception as e: logger.warning(f"Error during gateway shutdown: {e}") finally: - _gateway_process = None + _runtime_state.gateway_process = None return True @@ -237,8 +358,6 @@ async def get_runtime_events() -> RuntimeEventsResponse: @router.get("/gateway/status", response_model=GatewayStatusResponse) async def get_gateway_status() -> GatewayStatusResponse: """Get Gateway process status and port.""" - global _gateway_port - is_running = _is_gateway_running() run_id = None @@ -255,22 +374,128 @@ async def get_gateway_status() -> GatewayStatusResponse: return GatewayStatusResponse( is_running=is_running, - port=_gateway_port, + port=_runtime_state.gateway_port, run_id=run_id ) @router.get("/gateway/port") -async def get_gateway_port() -> Dict[str, Any]: +async def get_gateway_port(request: Request) -> Dict[str, Any]: """Get WebSocket Gateway port for frontend connection.""" - global _gateway_port + gateway_port = _runtime_state.gateway_port return { - "port": _gateway_port, + "port": gateway_port, "is_running": _is_gateway_running(), - "ws_url": f"ws://localhost:{_gateway_port}" + "ws_url": _build_gateway_ws_url(request, gateway_port), } +def _build_gateway_ws_url(request: Request, port: int) -> str: + """Build a proxy-safe Gateway WebSocket URL.""" + forwarded_proto = request.headers.get("x-forwarded-proto", "").split(",")[0].strip() + scheme = forwarded_proto or request.url.scheme + ws_scheme = "wss" if scheme == "https" else "ws" + + forwarded_host = request.headers.get("x-forwarded-host", "").split(",")[0].strip() + host = forwarded_host or request.url.hostname or "localhost" + if ":" in host and not host.startswith("["): + host = host.split(":", 1)[0] + + return f"{ws_scheme}://{host}:{port}" + + +def _load_latest_runtime_snapshot() -> Dict[str, Any]: + """Load the latest persisted runtime snapshot.""" + snapshots = sorted( + PROJECT_ROOT.glob("runs/*/state/runtime_state.json"), + key=lambda p: p.stat().st_mtime, + reverse=True, + ) + if not snapshots: + raise HTTPException(status_code=404, detail="No runtime information available") + return json.loads(snapshots[0].read_text(encoding="utf-8")) + + +def _get_current_runtime_context() -> Dict[str, Any]: + """Return the active runtime context from the latest snapshot.""" + if not _is_gateway_running(): + raise HTTPException(status_code=404, detail="No runtime is currently running") + latest = _load_latest_runtime_snapshot() + context = latest.get("context") or {} + if not context.get("config_name"): + raise HTTPException(status_code=404, detail="No runtime context available") + return context + + +def _resolve_runtime_response(run_id: str) -> RuntimeConfigResponse: + """Build a normalized runtime config response for the active run.""" + context = _get_current_runtime_context() + bootstrap = dict(context.get("bootstrap_values") or {}) + resolved = resolve_runtime_config( + project_root=PROJECT_ROOT, + config_name=run_id, + enable_memory=bool(bootstrap.get("enable_memory", False)), + schedule_mode=str(bootstrap.get("schedule_mode", "daily")), + interval_minutes=int(bootstrap.get("interval_minutes", 60) or 60), + trigger_time=str(bootstrap.get("trigger_time", "09:30") or "09:30"), + ) + return RuntimeConfigResponse( + run_id=run_id, + is_running=True, + gateway_port=_runtime_state.gateway_port, + bootstrap=bootstrap, + resolved=resolved, + ) + + +def _normalize_runtime_config_updates( + request: UpdateRuntimeConfigRequest, +) -> Dict[str, Any]: + """Validate and normalize runtime config updates.""" + updates: Dict[str, Any] = {} + + if request.schedule_mode is not None: + schedule_mode = str(request.schedule_mode).strip().lower() + if schedule_mode not in {"daily", "intraday"}: + raise HTTPException( + status_code=400, + detail="schedule_mode must be 'daily' or 'intraday'", + ) + updates["schedule_mode"] = schedule_mode + + if request.interval_minutes is not None: + updates["interval_minutes"] = int(request.interval_minutes) + + if request.trigger_time is not None: + trigger_time = str(request.trigger_time).strip() + if trigger_time and trigger_time != "now": + try: + datetime.strptime(trigger_time, "%H:%M") + except ValueError as exc: + raise HTTPException( + status_code=400, + detail="trigger_time must use HH:MM or 'now'", + ) from exc + updates["trigger_time"] = trigger_time or "09:30" + + if request.max_comm_cycles is not None: + updates["max_comm_cycles"] = int(request.max_comm_cycles) + + if request.initial_cash is not None: + updates["initial_cash"] = float(request.initial_cash) + + if request.margin_requirement is not None: + updates["margin_requirement"] = float(request.margin_requirement) + + if request.enable_memory is not None: + updates["enable_memory"] = bool(request.enable_memory) + + if not updates: + raise HTTPException(status_code=400, detail="No runtime config updates provided") + + return updates + + @router.post("/start", response_model=LaunchResponse) async def start_runtime( config: LaunchConfig, @@ -284,7 +509,8 @@ async def start_runtime( 4. Start Gateway as subprocess (Data Plane) 5. Return Gateway port for WebSocket connection """ - global _gateway_process, _gateway_port + # Lazy import to avoid circular dependency + from backend.runtime.manager import TradingRuntimeManager # 1. Stop existing Gateway if _is_gateway_running(): @@ -325,22 +551,24 @@ async def start_runtime( _write_bootstrap_md(run_dir, bootstrap) # 6. Find available port and start Gateway process - _gateway_port = _find_available_port(start_port=8765) + gateway_port = _find_available_port(start_port=8765) + _runtime_state.gateway_port = gateway_port try: - _gateway_process = _start_gateway_process( + process = _start_gateway_process( run_id=run_id, run_dir=run_dir, bootstrap=bootstrap, - port=_gateway_port + port=gateway_port ) + _runtime_state.gateway_process = process # Wait briefly to check if process started successfully await asyncio.sleep(2) if not _is_gateway_running(): - stdout, stderr = _gateway_process.communicate(timeout=1) - _gateway_process = None + stdout, stderr = process.communicate(timeout=1) + _runtime_state.gateway_process = None raise HTTPException( status_code=500, detail=f"Gateway failed to start: {stderr.decode() if stderr else 'Unknown error'}" @@ -354,16 +582,44 @@ async def start_runtime( run_id=run_id, status="started", run_dir=str(run_dir), - gateway_port=_gateway_port, - message=f"Runtime started with run_id: {run_id}, Gateway on port: {_gateway_port}", + gateway_port=gateway_port, + message=f"Runtime started with run_id: {run_id}, Gateway on port: {gateway_port}", ) +@router.get("/config", response_model=RuntimeConfigResponse) +async def get_runtime_config() -> RuntimeConfigResponse: + """Return the current runtime bootstrap and resolved settings.""" + context = _get_current_runtime_context() + return _resolve_runtime_response(context["config_name"]) + + +@router.put("/config", response_model=RuntimeConfigResponse) +async def update_runtime_config( + request: UpdateRuntimeConfigRequest, +) -> RuntimeConfigResponse: + """Persist selected runtime configuration updates for the active run.""" + context = _get_current_runtime_context() + run_id = context["config_name"] + updates = _normalize_runtime_config_updates(request) + updated = update_bootstrap_values_for_run(PROJECT_ROOT, run_id, updates) + + manager = _runtime_state.runtime_manager + if manager is not None and getattr(manager, "config_name", None) == run_id: + manager.bootstrap.update(updates) + if getattr(manager, "context", None) is not None: + manager.context.bootstrap_values.update(updates) + if hasattr(manager, "_persist_snapshot"): + manager._persist_snapshot() + + response = _resolve_runtime_response(run_id) + response.bootstrap = dict(updated.values) + return response + + @router.post("/stop", response_model=StopResponse) async def stop_runtime(force: bool = True) -> StopResponse: """Stop the current running runtime.""" - global _gateway_process - was_running = _is_gateway_running() if not was_running: @@ -421,21 +677,25 @@ async def get_current_runtime(): "run_id": context.get("config_name"), "run_dir": context.get("run_dir"), "is_running": True, - "gateway_port": _gateway_port, + "gateway_port": _runtime_state.gateway_port, "bootstrap": context.get("bootstrap_values", {}), } -def register_runtime_manager(manager: TradingRuntimeManager) -> None: +def register_runtime_manager(manager: Any) -> None: """Allow other modules to expose the runtime manager to the API.""" global runtime_manager runtime_manager = manager + # Also update the RuntimeState singleton for internal consistency + _runtime_state.runtime_manager = manager def unregister_runtime_manager() -> None: """Drop the runtime manager reference.""" global runtime_manager runtime_manager = None + # Also update the RuntimeState singleton for internal consistency + _runtime_state.runtime_manager = None def _write_bootstrap_md(run_dir: Path, bootstrap: Dict[str, Any]) -> None: