- task_delegator: 完善团队任务分发逻辑 - runtime API: 增强运行时管理功能 - skills_manager: 技能管理改进 - tool_guard: 工具调用守卫优化 - evo_agent: 核心 Agent 改进 Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
621 lines
19 KiB
Python
621 lines
19 KiB
Python
# -*- coding: utf-8 -*-
|
|
"""TaskDelegator - Subagent spawning and task delegation.
|
|
|
|
Provides delegate() and delegate_parallel() for spawning subagents
|
|
with separate context and memory. Supports runtime dynamic subagent
|
|
definition via task_data with description, prompt, and tools.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import asyncio
|
|
import logging
|
|
import uuid
|
|
from typing import Any, Awaitable, Callable, Dict, List, Optional, Union
|
|
|
|
from agentscope.message import Msg
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
# Default timeout for subagent execution (seconds)
|
|
DEFAULT_EXECUTION_TIMEOUT = 120.0
|
|
|
|
|
|
# Type alias for subagent specification
|
|
SubagentSpec = Dict[str, Any]
|
|
"""Subagent specification format:
|
|
{
|
|
"description": "Expert code reviewer...",
|
|
"prompt": "Analyze code quality...",
|
|
"tools": ["Read", "Glob", "Grep"], # Optional: list of tool names
|
|
"model": "gpt-4o", # Optional: model name
|
|
}
|
|
"""
|
|
|
|
|
|
class TaskDelegator:
|
|
"""Delegates tasks to subagents with isolated context.
|
|
|
|
Supports:
|
|
- delegate(): Spawn single subagent for task
|
|
- delegate_parallel(): Spawn multiple subagents concurrently
|
|
- delegate_task(): Delegate with dynamic subagent definition from task_data
|
|
|
|
Each subagent gets its own memory/context to prevent
|
|
cross-contamination.
|
|
|
|
Dynamic Subagent Definition:
|
|
task_data can include an "agents" dict to define subagents inline:
|
|
|
|
task_data = {
|
|
"task": "Review the code changes",
|
|
"agents": {
|
|
"code-reviewer": {
|
|
"description": "Expert code reviewer for quality and security.",
|
|
"prompt": "Analyze code quality and suggest improvements.",
|
|
"tools": ["Read", "Glob", "Grep"],
|
|
}
|
|
}
|
|
}
|
|
"""
|
|
|
|
def __init__(self, agent: Any):
|
|
"""Initialize TaskDelegator.
|
|
|
|
Args:
|
|
agent: Parent EvoAgent instance for accessing model, formatter, workspace
|
|
"""
|
|
self._agent = agent
|
|
# Get messenger from parent agent if available
|
|
self._messenger = getattr(agent, "messenger", None)
|
|
self._registry = getattr(agent, "_registry", None)
|
|
self._subagents: Dict[str, Any] = {}
|
|
self._dynamic_subagents: Dict[str, SubagentSpec] = {}
|
|
self._tasks: Dict[str, asyncio.Task] = {}
|
|
|
|
# Extract model and formatter from parent agent
|
|
self._model = getattr(agent, "model", None)
|
|
self._formatter = getattr(agent, "formatter", None)
|
|
self._workspace_dir = getattr(agent, "workspace_dir", None)
|
|
self._config_name = getattr(agent, "config_name", None)
|
|
|
|
async def delegate(
|
|
self,
|
|
agent_id: str,
|
|
task: Callable[..., Awaitable[Msg]],
|
|
context: Optional[Dict[str, Any]] = None,
|
|
) -> asyncio.Task:
|
|
"""Delegate task to a single subagent.
|
|
|
|
Args:
|
|
agent_id: Unique identifier for this subagent instance
|
|
task: Async function representing the task
|
|
context: Optional context dict for the subagent
|
|
|
|
Returns:
|
|
asyncio.Task for the delegated task
|
|
"""
|
|
async def _run_with_context():
|
|
result = await task(context or {})
|
|
return result
|
|
|
|
self._tasks[agent_id] = asyncio.create_task(_run_with_context())
|
|
logger.info("Delegated task to subagent: %s", agent_id)
|
|
return self._tasks[agent_id]
|
|
|
|
async def delegate_parallel(
|
|
self,
|
|
tasks: List[Dict[str, Any]],
|
|
) -> List[asyncio.Task]:
|
|
"""Delegate multiple tasks in parallel.
|
|
|
|
Args:
|
|
tasks: List of task dicts with keys:
|
|
- agent_id: Unique identifier
|
|
- task: Async function to execute
|
|
- context: Optional context dict
|
|
|
|
Returns:
|
|
List of asyncio.Task for all delegated tasks
|
|
"""
|
|
async def _run_task(task_def: Dict[str, Any]):
|
|
agent_id = task_def["agent_id"]
|
|
task_func = task_def["task"]
|
|
context = task_def.get("context", {})
|
|
|
|
async def _run_with_context():
|
|
return await task_func(context)
|
|
|
|
self._tasks[agent_id] = asyncio.create_task(_run_with_context())
|
|
return self._tasks[agent_id]
|
|
|
|
gathered_tasks = await asyncio.gather(
|
|
*[_run_task(t) for t in tasks],
|
|
return_exceptions=True,
|
|
)
|
|
|
|
valid_tasks = [t for t in gathered_tasks if isinstance(t, asyncio.Task)]
|
|
logger.info(
|
|
"Delegated %d tasks in parallel (%d succeeded)",
|
|
len(tasks),
|
|
len(valid_tasks),
|
|
)
|
|
return valid_tasks
|
|
|
|
async def wait_for(self, agent_id: str, timeout: Optional[float] = None) -> Any:
|
|
"""Wait for subagent task to complete.
|
|
|
|
Args:
|
|
agent_id: Subagent identifier
|
|
timeout: Optional timeout in seconds
|
|
|
|
Returns:
|
|
Task result
|
|
|
|
Raises:
|
|
asyncio.TimeoutError: If task doesn't complete in time
|
|
KeyError: If agent_id not found
|
|
"""
|
|
if agent_id not in self._tasks:
|
|
raise KeyError(f"Unknown subagent: {agent_id}")
|
|
|
|
try:
|
|
return await asyncio.wait_for(
|
|
self._tasks[agent_id],
|
|
timeout=timeout,
|
|
)
|
|
except asyncio.TimeoutError:
|
|
logger.warning("Task %s timed out after %s seconds", agent_id, timeout)
|
|
raise
|
|
|
|
async def cancel(self, agent_id: str) -> bool:
|
|
"""Cancel a subagent task.
|
|
|
|
Args:
|
|
agent_id: Subagent identifier
|
|
|
|
Returns:
|
|
True if task was cancelled
|
|
"""
|
|
if agent_id in self._tasks:
|
|
self._tasks[agent_id].cancel()
|
|
del self._tasks[agent_id]
|
|
logger.info("Cancelled subagent task: %s", agent_id)
|
|
return True
|
|
return False
|
|
|
|
def list_tasks(self) -> List[str]:
|
|
"""List active subagent task IDs.
|
|
|
|
Returns:
|
|
List of agent_ids with pending tasks
|
|
"""
|
|
return list(self._tasks.keys())
|
|
|
|
@property
|
|
def tasks(self) -> Dict[str, asyncio.Task]:
|
|
"""Get copy of active tasks dict."""
|
|
return dict(self._tasks)
|
|
|
|
async def delegate_task(
|
|
self,
|
|
task_type: str,
|
|
task_data: Dict[str, Any],
|
|
target_agent: Optional[str] = None,
|
|
) -> Dict[str, Any]:
|
|
"""Delegate a task with optional dynamic subagent definition.
|
|
|
|
Supports runtime subagent definition via task_data["agents"]:
|
|
|
|
task_data = {
|
|
"task": "Review code changes",
|
|
"agents": {
|
|
"code-reviewer": {
|
|
"description": "Expert code reviewer...",
|
|
"prompt": "Analyze code quality...",
|
|
"tools": ["Read", "Glob", "Grep"],
|
|
}
|
|
}
|
|
}
|
|
|
|
Args:
|
|
task_type: Type of task (e.g., "analysis", "review", "research")
|
|
task_data: Task payload, may include "agents" for dynamic subagent def
|
|
target_agent: Optional specific agent ID to delegate to
|
|
|
|
Returns:
|
|
Dict with "success" and result/error
|
|
"""
|
|
try:
|
|
# Extract dynamic subagent definitions from task_data
|
|
agents_def = task_data.get("agents", {})
|
|
|
|
if agents_def:
|
|
# Register dynamic subagents
|
|
for agent_name, agent_spec in agents_def.items():
|
|
self._dynamic_subagents[agent_name] = agent_spec
|
|
logger.info(
|
|
"Registered dynamic subagent: %s (description: %s)",
|
|
agent_name,
|
|
agent_spec.get("description", "")[:50],
|
|
)
|
|
|
|
# Determine target agent
|
|
effective_target = target_agent
|
|
if not effective_target:
|
|
# Use first available dynamic subagent or default
|
|
if agents_def:
|
|
effective_target = next(iter(agents_def.keys()))
|
|
else:
|
|
effective_target = "default"
|
|
|
|
# Execute the task (async)
|
|
task_result = await self._execute_task(
|
|
task_type=task_type,
|
|
task_data=task_data,
|
|
target_agent=effective_target,
|
|
)
|
|
|
|
# Clean up dynamic subagents after execution
|
|
for agent_name in agents_def.keys():
|
|
self._dynamic_subagents.pop(agent_name, None)
|
|
|
|
return {
|
|
"success": True,
|
|
"result": task_result,
|
|
"subagents_used": list(agents_def.keys()) if agents_def else [],
|
|
}
|
|
|
|
except Exception as e:
|
|
logger.error("Task delegation failed: %s", e)
|
|
return {
|
|
"success": False,
|
|
"error": str(e),
|
|
}
|
|
|
|
async def _execute_task(
|
|
self,
|
|
task_type: str,
|
|
task_data: Dict[str, Any],
|
|
target_agent: str,
|
|
) -> Dict[str, Any]:
|
|
"""Execute the delegated task with a real subagent.
|
|
|
|
Args:
|
|
task_type: Type of task
|
|
task_data: Task payload
|
|
target_agent: Target agent identifier
|
|
|
|
Returns:
|
|
Task execution result with success/failure info
|
|
"""
|
|
task_content = task_data.get("task", task_data.get("prompt", ""))
|
|
timeout = task_data.get("timeout", DEFAULT_EXECUTION_TIMEOUT)
|
|
|
|
# Check if we have a dynamic subagent spec for this target
|
|
agent_spec = self._dynamic_subagents.get(target_agent)
|
|
|
|
if agent_spec:
|
|
logger.info(
|
|
"Executing task '%s' with dynamic subagent '%s'",
|
|
task_type,
|
|
target_agent,
|
|
)
|
|
return await self._create_and_run_subagent(
|
|
agent_name=target_agent,
|
|
agent_spec=agent_spec,
|
|
task_content=task_content,
|
|
task_type=task_type,
|
|
timeout=timeout,
|
|
)
|
|
|
|
# Fallback: try to use parent agent's model to process the task directly
|
|
logger.info(
|
|
"Executing task '%s' with parent agent '%s' (no dynamic subagent)",
|
|
task_type,
|
|
target_agent,
|
|
)
|
|
return await self._run_with_parent_agent(
|
|
task_content=task_content,
|
|
task_type=task_type,
|
|
timeout=timeout,
|
|
)
|
|
|
|
async def _create_and_run_subagent(
|
|
self,
|
|
agent_name: str,
|
|
agent_spec: SubagentSpec,
|
|
task_content: str,
|
|
task_type: str,
|
|
timeout: float,
|
|
) -> Dict[str, Any]:
|
|
"""Create and run a dynamic subagent.
|
|
|
|
Args:
|
|
agent_name: Name identifier for the subagent
|
|
agent_spec: Subagent specification (description, prompt, tools, model)
|
|
task_content: Task prompt to send to the subagent
|
|
task_type: Type of task
|
|
timeout: Execution timeout in seconds
|
|
|
|
Returns:
|
|
Dict with execution results
|
|
"""
|
|
subagent_id = f"subagent_{agent_name}_{uuid.uuid4().hex[:8]}"
|
|
|
|
try:
|
|
# Create subagent instance
|
|
subagent = await self._create_subagent(
|
|
subagent_id=subagent_id,
|
|
agent_spec=agent_spec,
|
|
)
|
|
|
|
if subagent is None:
|
|
return {
|
|
"task_type": task_type,
|
|
"task": task_content,
|
|
"subagent": agent_name,
|
|
"status": "failed",
|
|
"error": "Failed to create subagent",
|
|
"message": f"Could not instantiate subagent '{agent_name}'",
|
|
}
|
|
|
|
# Store for potential cleanup
|
|
self._subagents[subagent_id] = subagent
|
|
|
|
# Execute with timeout
|
|
result = await asyncio.wait_for(
|
|
self._run_subagent(subagent, task_content),
|
|
timeout=timeout,
|
|
)
|
|
|
|
# Extract response content
|
|
response_content = ""
|
|
if isinstance(result, Msg):
|
|
response_content = result.content
|
|
elif hasattr(result, "content"):
|
|
response_content = str(result.content)
|
|
elif isinstance(result, dict):
|
|
response_content = result.get("content", str(result))
|
|
else:
|
|
response_content = str(result)
|
|
|
|
logger.info(
|
|
"Subagent '%s' completed task '%s' successfully",
|
|
agent_name,
|
|
task_type,
|
|
)
|
|
|
|
return {
|
|
"task_type": task_type,
|
|
"task": task_content,
|
|
"subagent": {
|
|
"name": agent_name,
|
|
"id": subagent_id,
|
|
"description": agent_spec.get("description", ""),
|
|
},
|
|
"status": "completed",
|
|
"response": response_content,
|
|
"message": f"Task '{task_type}' executed with subagent '{agent_name}'",
|
|
}
|
|
|
|
except asyncio.TimeoutError:
|
|
logger.warning(
|
|
"Subagent '%s' timed out after %.1f seconds for task '%s'",
|
|
agent_name,
|
|
timeout,
|
|
task_type,
|
|
)
|
|
# Cancel the task if still running
|
|
if subagent_id in self._subagents:
|
|
self._subagents.pop(subagent_id, None)
|
|
return {
|
|
"task_type": task_type,
|
|
"task": task_content,
|
|
"subagent": agent_name,
|
|
"status": "timeout",
|
|
"error": f"Execution timed out after {timeout} seconds",
|
|
"message": f"Task '{task_type}' timed out for subagent '{agent_name}'",
|
|
}
|
|
|
|
except Exception as e:
|
|
logger.error(
|
|
"Subagent '%s' failed for task '%s': %s",
|
|
agent_name,
|
|
task_type,
|
|
e,
|
|
exc_info=True,
|
|
)
|
|
# Cleanup on failure
|
|
if subagent_id in self._subagents:
|
|
self._subagents.pop(subagent_id, None)
|
|
return {
|
|
"task_type": task_type,
|
|
"task": task_content,
|
|
"subagent": agent_name,
|
|
"status": "error",
|
|
"error": str(e),
|
|
"message": f"Task '{task_type}' failed for subagent '{agent_name}': {e}",
|
|
}
|
|
|
|
async def _create_subagent(
|
|
self,
|
|
subagent_id: str,
|
|
agent_spec: SubagentSpec,
|
|
) -> Optional[Any]:
|
|
"""Create a subagent instance.
|
|
|
|
Uses the parent agent's model/formatter to create a lightweight
|
|
subagent for task execution.
|
|
|
|
Args:
|
|
subagent_id: Unique identifier for the subagent
|
|
agent_spec: Subagent specification
|
|
|
|
Returns:
|
|
Subagent instance or None if creation fails
|
|
"""
|
|
try:
|
|
# Import here to avoid circular imports
|
|
from agentscope.memory import InMemoryMemory
|
|
|
|
# Get model and formatter from parent
|
|
model = self._model
|
|
formatter = self._formatter
|
|
|
|
if model is None:
|
|
logger.error("Cannot create subagent: parent agent has no model")
|
|
return None
|
|
|
|
# Build system prompt from agent spec
|
|
description = agent_spec.get("description", "")
|
|
prompt_template = agent_spec.get("prompt", "")
|
|
system_prompt = f"""You are {description}
|
|
|
|
{prompt_template}
|
|
|
|
Your task is to complete the user's request below.
|
|
"""
|
|
|
|
# Create a minimal ReActAgent as the subagent
|
|
from agentscope.agent import ReActAgent
|
|
|
|
subagent = ReActAgent(
|
|
name=subagent_id,
|
|
model=model,
|
|
sys_prompt=system_prompt,
|
|
toolkit=None, # Could load tools from agent_spec.get("tools", [])
|
|
memory=InMemoryMemory(),
|
|
formatter=formatter,
|
|
max_iters=agent_spec.get("max_iters", 5),
|
|
)
|
|
|
|
logger.debug("Created subagent: %s", subagent_id)
|
|
return subagent
|
|
|
|
except Exception as e:
|
|
logger.error(
|
|
"Failed to create subagent '%s': %s",
|
|
subagent_id,
|
|
e,
|
|
exc_info=True,
|
|
)
|
|
return None
|
|
|
|
async def _run_subagent(
|
|
self,
|
|
subagent: Any,
|
|
task_content: str,
|
|
) -> Any:
|
|
"""Run a subagent with the given task.
|
|
|
|
Args:
|
|
subagent: Subagent instance
|
|
task_content: Task prompt
|
|
|
|
Returns:
|
|
Agent response (Msg or similar)
|
|
"""
|
|
from agentscope.message import Msg
|
|
|
|
# Create message for the subagent
|
|
task_msg = Msg(
|
|
name="user",
|
|
content=task_content,
|
|
role="user",
|
|
)
|
|
|
|
# Execute the agent
|
|
response = await subagent.reply(task_msg)
|
|
return response
|
|
|
|
async def _run_with_parent_agent(
|
|
self,
|
|
task_content: str,
|
|
task_type: str,
|
|
timeout: float,
|
|
) -> Dict[str, Any]:
|
|
"""Run task using the parent agent directly.
|
|
|
|
Used when no dynamic subagent is defined.
|
|
|
|
Args:
|
|
task_content: Task prompt
|
|
task_type: Type of task
|
|
timeout: Execution timeout
|
|
|
|
Returns:
|
|
Dict with execution results
|
|
"""
|
|
try:
|
|
result = await asyncio.wait_for(
|
|
self._agent.reply(Msg(
|
|
name="user",
|
|
content=task_content,
|
|
role="user",
|
|
)),
|
|
timeout=timeout,
|
|
)
|
|
|
|
response_content = ""
|
|
if isinstance(result, Msg):
|
|
response_content = result.content
|
|
elif hasattr(result, "content"):
|
|
response_content = str(result.content)
|
|
else:
|
|
response_content = str(result)
|
|
|
|
return {
|
|
"task_type": task_type,
|
|
"task": task_content,
|
|
"status": "completed",
|
|
"response": response_content,
|
|
"message": f"Task '{task_type}' executed with parent agent",
|
|
}
|
|
|
|
except asyncio.TimeoutError:
|
|
return {
|
|
"task_type": task_type,
|
|
"task": task_content,
|
|
"status": "timeout",
|
|
"error": f"Execution timed out after {timeout} seconds",
|
|
"message": f"Task '{task_type}' timed out",
|
|
}
|
|
|
|
except Exception as e:
|
|
logger.error(
|
|
"Parent agent failed for task '%s': %s",
|
|
task_type,
|
|
e,
|
|
exc_info=True,
|
|
)
|
|
return {
|
|
"task_type": task_type,
|
|
"task": task_content,
|
|
"status": "error",
|
|
"error": str(e),
|
|
"message": f"Task '{task_type}' failed: {e}",
|
|
}
|
|
|
|
def get_dynamic_subagent(self, name: str) -> Optional[SubagentSpec]:
|
|
"""Get a dynamically defined subagent specification.
|
|
|
|
Args:
|
|
name: Subagent name
|
|
|
|
Returns:
|
|
Subagent spec dict or None if not found
|
|
"""
|
|
return self._dynamic_subagents.get(name)
|
|
|
|
def list_dynamic_subagents(self) -> List[str]:
|
|
"""List all registered dynamic subagent names.
|
|
|
|
Returns:
|
|
List of subagent names
|
|
"""
|
|
return list(self._dynamic_subagents.keys())
|
|
|
|
|
|
__all__ = ["TaskDelegator", "SubagentSpec"]
|