feat: Add agent workspace system and runtime management

- Add agent core modules (agent_core, factory, registry, skill_loader)
- Add runtime system for agent execution management
- Add REST API for agents, workspaces, and runtime control
- Add process supervisor for agent lifecycle management
- Add workspace template system with agent profiles
- Add frontend RuntimeView and runtime API integration
- Add per-agent skill workspaces for smoke_fullstack run
- Refactor skill system with active/installed separation

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-03-17 16:43:29 +08:00
parent 2daf5717ba
commit 59b44545d0
121 changed files with 8384 additions and 358 deletions

21
backend/api/__init__.py Normal file
View File

@@ -0,0 +1,21 @@
# -*- coding: utf-8 -*-
"""
API Routes Package
Provides REST API endpoints for:
- Agent management
- Workspace management
- Tool guard operations
"""
from .agents import router as agents_router
from .workspaces import router as workspaces_router
from .guard import router as guard_router
from .runtime import router as runtime_router
__all__ = [
"agents_router",
"workspaces_router",
"guard_router",
"runtime_router",
]

405
backend/api/agents.py Normal file
View File

@@ -0,0 +1,405 @@
# -*- coding: utf-8 -*-
"""
Agent API Routes
Provides REST API endpoints for agent management within workspaces.
"""
from typing import Any, Dict, List, Optional
from pathlib import Path
from fastapi import APIRouter, HTTPException, Depends, Body
from pydantic import BaseModel, Field
from backend.agents import AgentFactory, WorkspaceManager, get_registry
from backend.agents.skills_manager import SkillsManager
router = APIRouter(prefix="/api/workspaces/{workspace_id}/agents", tags=["agents"])
# Request/Response Models
class CreateAgentRequest(BaseModel):
"""Request to create a new agent."""
agent_id: str = Field(..., description="Unique agent identifier")
agent_type: str = Field(..., description="Type of agent (e.g., technical_analyst)")
name: Optional[str] = Field(None, description="Display name")
description: Optional[str] = Field(None, description="Agent description")
clone_from: Optional[str] = Field(None, description="Agent ID to clone from")
llm_model_config: Optional[Dict[str, Any]] = Field(None, description="LLM model configuration")
class UpdateAgentRequest(BaseModel):
"""Request to update an agent."""
name: Optional[str] = None
description: Optional[str] = None
enabled_skills: Optional[List[str]] = None
disabled_skills: Optional[List[str]] = None
class AgentResponse(BaseModel):
"""Agent information response."""
agent_id: str
agent_type: str
workspace_id: str
config_path: str
agent_dir: str
status: str = "inactive"
class AgentFileResponse(BaseModel):
"""Agent file content response."""
filename: str
content: str
# Dependencies
def get_agent_factory():
"""Get AgentFactory instance."""
return AgentFactory()
def get_workspace_manager():
"""Get WorkspaceManager instance."""
return WorkspaceManager()
def get_skills_manager():
"""Get SkillsManager instance."""
return SkillsManager()
# Routes
@router.post("", response_model=AgentResponse)
async def create_agent(
workspace_id: str,
request: CreateAgentRequest,
factory: AgentFactory = Depends(get_agent_factory),
registry = Depends(get_registry),
):
"""
Create a new agent in a workspace.
Args:
workspace_id: Workspace identifier
request: Agent creation parameters
Returns:
Created agent information
"""
# Check workspace exists
if not factory.workspaces_root.exists():
raise HTTPException(status_code=404, detail="Workspaces root not found")
workspace_dir = factory.workspaces_root / workspace_id
if not workspace_dir.exists():
raise HTTPException(status_code=404, detail=f"Workspace '{workspace_id}' not found")
try:
# Create agent
agent = factory.create_agent(
agent_id=request.agent_id,
agent_type=request.agent_type,
workspace_id=workspace_id,
clone_from=request.clone_from,
)
# Register in registry
registry.register(
agent_id=request.agent_id,
agent_type=request.agent_type,
workspace_id=workspace_id,
config_path=str(agent.config_path),
agent_dir=str(agent.agent_dir),
status="inactive",
)
return AgentResponse(
agent_id=agent.agent_id,
agent_type=agent.agent_type,
workspace_id=agent.workspace_id,
config_path=str(agent.config_path),
agent_dir=str(agent.agent_dir),
status="inactive",
)
except ValueError as e:
raise HTTPException(status_code=400, detail=str(e))
@router.get("", response_model=List[AgentResponse])
async def list_agents(
workspace_id: str,
factory: AgentFactory = Depends(get_agent_factory),
):
"""
List all agents in a workspace.
Args:
workspace_id: Workspace identifier
Returns:
List of agents
"""
try:
agents_data = factory.list_agents(workspace_id=workspace_id)
return [
AgentResponse(
agent_id=agent["agent_id"],
agent_type=agent["agent_type"],
workspace_id=workspace_id,
config_path=agent["config_path"],
agent_dir=str(Path(agent["config_path"]).parent),
status="inactive",
)
for agent in agents_data
]
except ValueError as e:
raise HTTPException(status_code=404, detail=str(e))
@router.get("/{agent_id}", response_model=AgentResponse)
async def get_agent(
workspace_id: str,
agent_id: str,
registry = Depends(get_registry),
):
"""
Get agent details.
Args:
workspace_id: Workspace identifier
agent_id: Agent identifier
Returns:
Agent information
"""
agent_info = registry.get(agent_id)
if not agent_info or agent_info.workspace_id != workspace_id:
raise HTTPException(status_code=404, detail=f"Agent '{agent_id}' not found")
return AgentResponse(
agent_id=agent_info.agent_id,
agent_type=agent_info.agent_type,
workspace_id=agent_info.workspace_id,
config_path=agent_info.config_path,
agent_dir=agent_info.agent_dir,
status=agent_info.status,
)
@router.delete("/{agent_id}")
async def delete_agent(
workspace_id: str,
agent_id: str,
factory: AgentFactory = Depends(get_agent_factory),
registry = Depends(get_registry),
):
"""
Delete an agent.
Args:
workspace_id: Workspace identifier
agent_id: Agent identifier
Returns:
Success message
"""
# Check agent exists in registry
agent_info = registry.get(agent_id)
if not agent_info or agent_info.workspace_id != workspace_id:
raise HTTPException(status_code=404, detail=f"Agent '{agent_id}' not found")
# Delete from factory
success = factory.delete_agent(agent_id, workspace_id)
if not success:
raise HTTPException(status_code=404, detail=f"Agent '{agent_id}' not found")
# Unregister
registry.unregister(agent_id)
return {"message": f"Agent '{agent_id}' deleted successfully"}
@router.patch("/{agent_id}", response_model=AgentResponse)
async def update_agent(
workspace_id: str,
agent_id: str,
request: UpdateAgentRequest,
registry = Depends(get_registry),
):
"""
Update agent configuration.
Args:
workspace_id: Workspace identifier
agent_id: Agent identifier
request: Update parameters
Returns:
Updated agent information
"""
agent_info = registry.get(agent_id)
if not agent_info or agent_info.workspace_id != workspace_id:
raise HTTPException(status_code=404, detail=f"Agent '{agent_id}' not found")
# Update metadata in registry
metadata_updates = {}
if request.name:
metadata_updates["name"] = request.name
if request.description:
metadata_updates["description"] = request.description
if metadata_updates:
registry.update_metadata(agent_id, metadata_updates)
# Update skills if provided
if request.enabled_skills or request.disabled_skills:
skills_manager = SkillsManager()
skills_manager.update_agent_skill_overrides(
config_name=workspace_id,
agent_id=agent_id,
enable=request.enabled_skills or [],
disable=request.disabled_skills or [],
)
# Get updated info
agent_info = registry.get(agent_id)
return AgentResponse(
agent_id=agent_info.agent_id,
agent_type=agent_info.agent_type,
workspace_id=agent_info.workspace_id,
config_path=agent_info.config_path,
agent_dir=agent_info.agent_dir,
status=agent_info.status,
)
@router.post("/{agent_id}/skills/{skill_name}/enable")
async def enable_skill(
workspace_id: str,
agent_id: str,
skill_name: str,
registry = Depends(get_registry),
):
"""
Enable a skill for an agent.
Args:
workspace_id: Workspace identifier
agent_id: Agent identifier
skill_name: Skill name to enable
Returns:
Success message
"""
agent_info = registry.get(agent_id)
if not agent_info or agent_info.workspace_id != workspace_id:
raise HTTPException(status_code=404, detail=f"Agent '{agent_id}' not found")
skills_manager = SkillsManager()
result = skills_manager.update_agent_skill_overrides(
config_name=workspace_id,
agent_id=agent_id,
enable=[skill_name],
)
return {
"message": f"Skill '{skill_name}' enabled for agent '{agent_id}'",
"enabled_skills": result["enabled_skills"],
}
@router.post("/{agent_id}/skills/{skill_name}/disable")
async def disable_skill(
workspace_id: str,
agent_id: str,
skill_name: str,
registry = Depends(get_registry),
):
"""
Disable a skill for an agent.
Args:
workspace_id: Workspace identifier
agent_id: Agent identifier
skill_name: Skill name to disable
Returns:
Success message
"""
agent_info = registry.get(agent_id)
if not agent_info or agent_info.workspace_id != workspace_id:
raise HTTPException(status_code=404, detail=f"Agent '{agent_id}' not found")
skills_manager = SkillsManager()
result = skills_manager.update_agent_skill_overrides(
config_name=workspace_id,
agent_id=agent_id,
disable=[skill_name],
)
return {
"message": f"Skill '{skill_name}' disabled for agent '{agent_id}'",
"disabled_skills": result["disabled_skills"],
}
@router.get("/{agent_id}/files/{filename}", response_model=AgentFileResponse)
async def get_agent_file(
workspace_id: str,
agent_id: str,
filename: str,
workspace_manager: WorkspaceManager = Depends(get_workspace_manager),
):
"""
Read an agent's workspace file.
Args:
workspace_id: Workspace identifier
agent_id: Agent identifier
filename: File to read (e.g., SOUL.md, ROLE.md)
Returns:
File content
"""
try:
content = workspace_manager.load_agent_file(
config_name=workspace_id,
agent_id=agent_id,
filename=filename,
)
return AgentFileResponse(filename=filename, content=content)
except FileNotFoundError:
raise HTTPException(status_code=404, detail=f"File '{filename}' not found")
@router.put("/{agent_id}/files/{filename}", response_model=AgentFileResponse)
async def update_agent_file(
workspace_id: str,
agent_id: str,
filename: str,
content: str = Body(..., media_type="text/plain"),
workspace_manager: WorkspaceManager = Depends(get_workspace_manager),
):
"""
Update an agent's workspace file.
Args:
workspace_id: Workspace identifier
agent_id: Agent identifier
filename: File to update
content: New file content
Returns:
Updated file information
"""
try:
workspace_manager.update_agent_file(
config_name=workspace_id,
agent_id=agent_id,
filename=filename,
content=content,
)
return AgentFileResponse(filename=filename, content=content)
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))

257
backend/api/guard.py Normal file
View File

@@ -0,0 +1,257 @@
# -*- coding: utf-8 -*-
"""
Tool Guard API Routes
Provides REST API endpoints for tool guard operations.
"""
from __future__ import annotations
from typing import Any, Dict, List, Optional
from datetime import datetime
from fastapi import APIRouter, HTTPException
from pydantic import BaseModel, Field
from backend.agents.base.tool_guard import (
ApprovalRecord,
ApprovalStatus,
SeverityLevel,
TOOL_GUARD_STORE,
default_findings_for_tool,
)
router = APIRouter(prefix="/api/guard", tags=["guard"])
# Request/Response Models
class ToolCallRequest(BaseModel):
"""Tool call request."""
tool_name: str = Field(..., description="Name of the tool")
tool_input: Dict[str, Any] = Field(default_factory=dict, description="Tool parameters")
agent_id: str = Field(..., description="Agent making the request")
workspace_id: str = Field(..., description="Workspace context")
session_id: Optional[str] = Field(None, description="Session identifier")
class ApprovalRequest(BaseModel):
"""Request to approve a tool call."""
approval_id: str = Field(..., description="Approval request ID")
one_time: bool = Field(True, description="Whether this is a one-time approval")
expires_in_minutes: Optional[int] = Field(30, description="Approval expiration time")
class DenyRequest(BaseModel):
"""Request to deny a tool call."""
approval_id: str = Field(..., description="Approval request ID")
reason: Optional[str] = Field(None, description="Reason for denial")
class ToolFinding(BaseModel):
"""Tool guard finding."""
severity: SeverityLevel
message: str
field: Optional[str] = None
class ApprovalResponse(BaseModel):
"""Tool approval response."""
approval_id: str
status: ApprovalStatus
tool_name: str
tool_input: Dict[str, Any]
agent_id: str
workspace_id: str
session_id: Optional[str] = None
findings: List[ToolFinding] = Field(default_factory=list)
created_at: str
resolved_at: Optional[str] = None
resolved_by: Optional[str] = None
class PendingApprovalsResponse(BaseModel):
"""List of pending approvals."""
approvals: List[ApprovalResponse]
total: int
STORE = TOOL_GUARD_STORE
SAFE_TOOLS = {
"get_price",
"get_fundamentals",
"get_news",
"analyze_technical",
}
def _to_response(record: ApprovalRecord) -> ApprovalResponse:
return ApprovalResponse(
approval_id=record.approval_id,
status=record.status,
tool_name=record.tool_name,
tool_input=record.tool_input,
agent_id=record.agent_id,
workspace_id=record.workspace_id,
session_id=record.session_id,
findings=[ToolFinding(**f.to_dict()) for f in record.findings],
created_at=record.created_at.isoformat(),
resolved_at=record.resolved_at.isoformat() if record.resolved_at else None,
resolved_by=record.resolved_by,
)
# Routes
@router.post("/check", response_model=ApprovalResponse)
async def check_tool_call(
request: ToolCallRequest,
):
"""
Check if a tool call requires approval.
Args:
request: Tool call details
Returns:
Approval status - may be auto-approved, auto-denied, or pending
"""
record = STORE.create_pending(
tool_name=request.tool_name,
tool_input=request.tool_input,
agent_id=request.agent_id,
workspace_id=request.workspace_id,
session_id=request.session_id,
findings=default_findings_for_tool(request.tool_name),
)
if request.tool_name in SAFE_TOOLS:
record.status = ApprovalStatus.APPROVED
record.resolved_at = datetime.utcnow()
record.resolved_by = "system"
STORE.set_status(
record.approval_id,
ApprovalStatus.APPROVED,
resolved_by="system",
notify_request=False,
)
return _to_response(record)
@router.post("/approve", response_model=ApprovalResponse)
async def approve_tool_call(
request: ApprovalRequest,
):
"""
Approve a pending tool call.
Args:
request: Approval parameters
Returns:
Updated approval status
"""
record = STORE.get(request.approval_id)
if not record:
raise HTTPException(status_code=404, detail="Approval request not found")
if record.status != ApprovalStatus.PENDING:
raise HTTPException(status_code=400, detail=f"Approval already {record.status}")
record.status = ApprovalStatus.APPROVED
record.resolved_at = datetime.utcnow()
record.resolved_by = "user"
return _to_response(record)
@router.post("/deny", response_model=ApprovalResponse)
async def deny_tool_call(
request: DenyRequest,
):
"""
Deny a pending tool call.
Args:
request: Denial parameters
Returns:
Updated approval status
"""
record = STORE.get(request.approval_id)
if not record:
raise HTTPException(status_code=404, detail="Approval request not found")
if record.status != ApprovalStatus.PENDING:
raise HTTPException(status_code=400, detail=f"Approval already {record.status}")
record.status = ApprovalStatus.DENIED
record.resolved_at = datetime.utcnow()
record.resolved_by = "user"
record.metadata["denial_reason"] = request.reason
return _to_response(record)
@router.get("/pending", response_model=PendingApprovalsResponse)
async def list_pending_approvals(
workspace_id: Optional[str] = None,
agent_id: Optional[str] = None,
):
"""
List pending tool approval requests.
Args:
workspace_id: Filter by workspace
agent_id: Filter by agent
Returns:
List of pending approvals
"""
pending = [
_to_response(record)
for record in STORE.list(
status=ApprovalStatus.PENDING,
workspace_id=workspace_id,
agent_id=agent_id,
)
]
return PendingApprovalsResponse(approvals=pending, total=len(pending))
@router.get("/approvals/{approval_id}", response_model=ApprovalResponse)
async def get_approval_status(
approval_id: str,
):
"""
Get the status of a specific approval request.
Args:
approval_id: Approval request ID
Returns:
Approval status
"""
record = STORE.get(approval_id)
if not record:
raise HTTPException(status_code=404, detail="Approval request not found")
return _to_response(record)
@router.delete("/approvals/{approval_id}")
async def cancel_approval(
approval_id: str,
):
"""
Cancel/delete a pending approval request.
Args:
approval_id: Approval request ID
Returns:
Success message
"""
record = STORE.get(approval_id)
if not record:
raise HTTPException(status_code=404, detail="Approval request not found")
STORE.cancel(approval_id)
return _to_response(record)

135
backend/api/runtime.py Normal file
View File

@@ -0,0 +1,135 @@
# -*- coding: utf-8 -*-
"""Runtime API routes exposing the latest trading run state."""
from __future__ import annotations
import json
from pathlib import Path
from typing import Any, Dict, List, Optional
from fastapi import APIRouter, HTTPException
from pydantic import BaseModel
from backend.runtime.agent_runtime import AgentRuntimeState
from backend.runtime.context import TradingRunContext
from backend.runtime.manager import TradingRuntimeManager
router = APIRouter(prefix="/api/runtime", tags=["runtime"])
runtime_manager: Optional[TradingRuntimeManager] = None
PROJECT_ROOT = Path(__file__).resolve().parents[2]
class RunContextResponse(BaseModel):
config_name: str
run_dir: str
bootstrap_values: Dict[str, Any]
class RuntimeAgentState(BaseModel):
agent_id: str
status: str
last_session: Optional[str] = None
last_updated: str
class RuntimeAgentsResponse(BaseModel):
agents: List[RuntimeAgentState]
class RuntimeEvent(BaseModel):
timestamp: str
event: str
details: Dict[str, Any]
session: Optional[str]
class RuntimeEventsResponse(BaseModel):
events: List[RuntimeEvent]
def _latest_snapshot_path() -> Optional[Path]:
candidates = sorted(
PROJECT_ROOT.glob("runs/*/state/runtime_state.json"),
key=lambda path: path.stat().st_mtime,
reverse=True,
)
return candidates[0] if candidates else None
def _load_snapshot() -> Dict[str, Any]:
snapshot_path = _latest_snapshot_path()
if snapshot_path is None or not snapshot_path.exists():
raise HTTPException(status_code=503, detail="runtime manager is not initialized")
return json.loads(snapshot_path.read_text(encoding="utf-8"))
def _get_runtime_payload() -> Dict[str, Any]:
if runtime_manager is not None:
return runtime_manager.build_snapshot()
return _load_snapshot()
def _to_state_response(state: AgentRuntimeState) -> RuntimeAgentState:
return RuntimeAgentState(
agent_id=state.agent_id,
status=state.status,
last_session=state.last_session,
last_updated=state.last_updated.isoformat(),
)
@router.get("/context", response_model=RunContextResponse)
async def get_run_context() -> RunContextResponse:
"""Return the most recent run context."""
payload = _get_runtime_payload()
context = payload.get("context")
if context is None:
raise HTTPException(status_code=404, detail="run context is not ready")
return RunContextResponse(
config_name=context["config_name"],
run_dir=context["run_dir"],
bootstrap_values=context["bootstrap_values"],
)
@router.get("/agents", response_model=RuntimeAgentsResponse)
async def list_agent_states() -> RuntimeAgentsResponse:
"""List the current runtime state of every registered agent."""
payload = _get_runtime_payload()
agents = [RuntimeAgentState(**agent) for agent in payload.get("agents", [])]
return RuntimeAgentsResponse(agents=agents)
@router.get("/events", response_model=RuntimeEventsResponse)
async def list_runtime_events() -> RuntimeEventsResponse:
"""Return the recent runtime events that TradingRuntimeManager emitted."""
payload = _get_runtime_payload()
events = [RuntimeEvent(**event) for event in payload.get("events", [])]
return RuntimeEventsResponse(events=events)
@router.get("/agents/{agent_id}", response_model=RuntimeAgentState)
async def get_agent_state(agent_id: str) -> RuntimeAgentState:
"""Return the current runtime state for a single agent."""
payload = _get_runtime_payload()
state = next(
(agent for agent in payload.get("agents", []) if agent["agent_id"] == agent_id),
None,
)
if state is None:
raise HTTPException(status_code=404, detail=f"agent '{agent_id}' not registered")
return RuntimeAgentState(**state)
def register_runtime_manager(manager: TradingRuntimeManager) -> None:
"""Allow other modules to expose the runtime manager to the API."""
global runtime_manager
runtime_manager = manager
def unregister_runtime_manager() -> None:
"""Drop the runtime manager reference (used for shutdown/testing)."""
global runtime_manager
runtime_manager = None

196
backend/api/workspaces.py Normal file
View File

@@ -0,0 +1,196 @@
# -*- coding: utf-8 -*-
"""
Workspace API Routes
Provides REST API endpoints for workspace management.
"""
from typing import Any, Dict, List, Optional
from fastapi import APIRouter, HTTPException, Depends
from pydantic import BaseModel, Field
from backend.agents import WorkspaceManager
router = APIRouter(prefix="/api/workspaces", tags=["workspaces"])
# Request/Response Models
class CreateWorkspaceRequest(BaseModel):
"""Request to create a new workspace."""
workspace_id: str = Field(..., description="Unique workspace identifier")
name: Optional[str] = Field(None, description="Display name")
description: Optional[str] = Field(None, description="Workspace description")
metadata: Optional[Dict[str, Any]] = Field(None, description="Additional metadata")
class UpdateWorkspaceRequest(BaseModel):
"""Request to update a workspace."""
name: Optional[str] = None
description: Optional[str] = None
metadata: Optional[Dict[str, Any]] = None
class WorkspaceResponse(BaseModel):
"""Workspace information response."""
workspace_id: str
name: str
description: str
created_at: Optional[str] = None
metadata: Dict[str, Any] = Field(default_factory=dict)
class WorkspaceListResponse(BaseModel):
"""List of workspaces response."""
workspaces: List[WorkspaceResponse]
total: int
# Dependencies
def get_workspace_manager():
"""Get WorkspaceManager instance."""
return WorkspaceManager()
# Routes
@router.post("", response_model=WorkspaceResponse)
async def create_workspace(
request: CreateWorkspaceRequest,
manager: WorkspaceManager = Depends(get_workspace_manager),
):
"""
Create a new workspace.
Args:
request: Workspace creation parameters
Returns:
Created workspace information
"""
try:
config = manager.create_workspace(
workspace_id=request.workspace_id,
name=request.name,
description=request.description,
metadata=request.metadata or {},
)
return WorkspaceResponse(
workspace_id=config.workspace_id,
name=config.name,
description=config.description,
created_at=config.created_at,
metadata=config.metadata,
)
except ValueError as e:
raise HTTPException(status_code=400, detail=str(e))
@router.get("", response_model=WorkspaceListResponse)
async def list_workspaces(
manager: WorkspaceManager = Depends(get_workspace_manager),
):
"""
List all workspaces.
Returns:
List of workspaces
"""
workspaces = manager.list_workspaces()
return WorkspaceListResponse(
workspaces=[
WorkspaceResponse(
workspace_id=ws.workspace_id,
name=ws.name,
description=ws.description,
created_at=ws.created_at,
metadata=ws.metadata,
)
for ws in workspaces
],
total=len(workspaces),
)
@router.get("/{workspace_id}", response_model=WorkspaceResponse)
async def get_workspace(
workspace_id: str,
manager: WorkspaceManager = Depends(get_workspace_manager),
):
"""
Get workspace details.
Args:
workspace_id: Workspace identifier
Returns:
Workspace information
"""
workspace = manager.get_workspace(workspace_id)
if not workspace:
raise HTTPException(status_code=404, detail=f"Workspace '{workspace_id}' not found")
return WorkspaceResponse(
workspace_id=workspace["workspace_id"],
name=workspace.get("name", workspace_id),
description=workspace.get("description", ""),
created_at=workspace.get("created_at"),
metadata=workspace.get("metadata", {}),
)
@router.patch("/{workspace_id}", response_model=WorkspaceResponse)
async def update_workspace(
workspace_id: str,
request: UpdateWorkspaceRequest,
manager: WorkspaceManager = Depends(get_workspace_manager),
):
"""
Update workspace configuration.
Args:
workspace_id: Workspace identifier
request: Update parameters
Returns:
Updated workspace information
"""
try:
config = manager.update_workspace_config(
workspace_id=workspace_id,
name=request.name,
description=request.description,
metadata=request.metadata,
)
return WorkspaceResponse(
workspace_id=config.workspace_id,
name=config.name,
description=config.description,
created_at=config.created_at,
metadata=config.metadata,
)
except ValueError as e:
raise HTTPException(status_code=404, detail=str(e))
@router.delete("/{workspace_id}")
async def delete_workspace(
workspace_id: str,
force: bool = False,
manager: WorkspaceManager = Depends(get_workspace_manager),
):
"""
Delete a workspace.
Args:
workspace_id: Workspace identifier
force: If True, delete even if workspace has agents
Returns:
Success message
"""
try:
success = manager.delete_workspace(workspace_id, force=force)
if not success:
raise HTTPException(status_code=404, detail=f"Workspace '{workspace_id}' not found")
return {"message": f"Workspace '{workspace_id}' deleted successfully"}
except ValueError as e:
raise HTTPException(status_code=400, detail=str(e))