# -*- coding: utf-8 -*-
"""
Tool Guard API Routes

Provides REST API endpoints for tool guard operations: checking whether a
tool call needs approval, approving/denying pending calls (singly or in
batch), and listing or inspecting approval records.
"""
from __future__ import annotations

from typing import Any, Dict, List, Optional
from datetime import datetime, timezone

from fastapi import APIRouter, HTTPException
from pydantic import BaseModel, Field

from backend.agents.base.tool_guard import (
    ApprovalRecord,
    ApprovalStatus,
    SeverityLevel,
    TOOL_GUARD_STORE,
    default_findings_for_tool,
)

router = APIRouter(prefix="/api/guard", tags=["guard"])


# Request/Response Models
#
# NOTE: models are declared so that every model referenced in a field
# annotation is already defined (ToolFinding -> ApprovalResponse ->
# BatchApprovalResponse / PendingApprovalsResponse). Because annotations are
# postponed by the `__future__` import, referencing a not-yet-defined model
# would otherwise leave an unresolved forward reference at class creation.


class ToolCallRequest(BaseModel):
    """Tool call request."""

    tool_name: str = Field(..., description="Name of the tool")
    tool_input: Dict[str, Any] = Field(default_factory=dict, description="Tool parameters")
    agent_id: str = Field(..., description="Agent making the request")
    workspace_id: str = Field(
        ...,
        description="Run context; historical field name retained for compatibility",
    )
    session_id: Optional[str] = Field(None, description="Session identifier")


class ApprovalRequest(BaseModel):
    """Request to approve a tool call."""

    approval_id: str = Field(..., description="Approval request ID")
    # NOTE(review): `one_time` and `expires_in_minutes` are accepted but not
    # read by any handler in this module -- confirm whether the store should
    # receive them.
    one_time: bool = Field(True, description="Whether this is a one-time approval")
    expires_in_minutes: Optional[int] = Field(30, description="Approval expiration time")


class DenyRequest(BaseModel):
    """Request to deny a tool call."""

    approval_id: str = Field(..., description="Approval request ID")
    reason: Optional[str] = Field(None, description="Reason for denial")


class BatchApprovalRequest(BaseModel):
    """Request to approve multiple tool calls."""

    approval_ids: List[str] = Field(..., description="List of approval request IDs")
    # NOTE(review): accepted but not read by the batch handler in this module.
    one_time: bool = Field(True, description="Whether these are one-time approvals")


class ToolFinding(BaseModel):
    """Tool guard finding."""

    severity: SeverityLevel
    message: str
    field: Optional[str] = None


class ApprovalResponse(BaseModel):
    """Tool approval response."""

    approval_id: str
    status: ApprovalStatus
    tool_name: str
    tool_input: Dict[str, Any]
    agent_id: str
    workspace_id: str
    run_id: str
    session_id: Optional[str] = None
    findings: List[ToolFinding] = Field(default_factory=list)
    created_at: str
    resolved_at: Optional[str] = None
    resolved_by: Optional[str] = None
    scope_type: str = "runtime_run"
    scope_note: str = (
        "Approvals are scoped to the active runtime run. `workspace_id` is "
        "retained as a compatibility field name; prefer `run_id` for display."
    )


class BatchApprovalResponse(BaseModel):
    """Response for batch approval operation."""

    approved: List[ApprovalResponse] = Field(
        default_factory=list, description="Successfully approved"
    )
    failed: List[Dict[str, Any]] = Field(
        default_factory=list, description="Failed approvals with errors"
    )
    total_requested: int
    total_approved: int
    total_failed: int


class PendingApprovalsResponse(BaseModel):
    """List of pending approvals."""

    approvals: List[ApprovalResponse]
    total: int


STORE = TOOL_GUARD_STORE

# Tools that `/check` approves immediately on behalf of "system".
SAFE_TOOLS = {
    "get_price",
    "get_fundamentals",
    "get_news",
    "analyze_technical",
}


# Helpers


def _to_response(record: ApprovalRecord) -> ApprovalResponse:
    """Convert a store record into the API response model.

    `run_id` is populated from `record.workspace_id`; both fields carry the
    same value. Timestamps are emitted as ISO-8601 strings.
    """
    return ApprovalResponse(
        approval_id=record.approval_id,
        status=record.status,
        tool_name=record.tool_name,
        tool_input=record.tool_input,
        agent_id=record.agent_id,
        workspace_id=record.workspace_id,
        run_id=record.workspace_id,
        session_id=record.session_id,
        findings=[ToolFinding(**f.to_dict()) for f in record.findings],
        created_at=record.created_at.isoformat(),
        resolved_at=record.resolved_at.isoformat() if record.resolved_at else None,
        resolved_by=record.resolved_by,
    )


def _status_label(status: Any) -> str:
    """Return a stable label for *status* to embed in error messages.

    Formatting an enum member directly in an f-string yields either its value
    or ``ClassName.MEMBER`` depending on the enum's base classes and the
    Python version, so the member's value is used explicitly when present.
    """
    return str(getattr(status, "value", status))


def _get_or_404(approval_id: str) -> ApprovalRecord:
    """Fetch a record from the store.

    Raises:
        HTTPException: 404 when the store returns a falsy record.
    """
    record = STORE.get(approval_id)
    if not record:
        raise HTTPException(status_code=404, detail="Approval request not found")
    return record


def _get_pending_or_raise(approval_id: str) -> ApprovalRecord:
    """Fetch a record and require that it is still pending.

    Raises:
        HTTPException: 404 when the record is missing, 400 when it has
            already been resolved.
    """
    record = _get_or_404(approval_id)
    if record.status != ApprovalStatus.PENDING:
        raise HTTPException(
            status_code=400,
            detail=f"Approval already {_status_label(record.status)}",
        )
    return record


# Routes


@router.post("/check", response_model=ApprovalResponse)
async def check_tool_call(
    request: ToolCallRequest,
):
    """
    Check if a tool call requires approval.

    A record is always created in the store first. When the tool is listed in
    `SAFE_TOOLS` it is then marked approved by "system"; otherwise the record
    is returned as the store created it.

    Args:
        request: Tool call details

    Returns:
        Approval record for the call
    """
    record = STORE.create_pending(
        tool_name=request.tool_name,
        tool_input=request.tool_input,
        agent_id=request.agent_id,
        workspace_id=request.workspace_id,
        session_id=request.session_id,
        findings=default_findings_for_tool(request.tool_name),
    )

    if request.tool_name in SAFE_TOOLS:
        # NOTE(review): the record is updated locally *and* through the store;
        # the store's return value is not used here. Kept as-is because the
        # store's copy/identity semantics are not visible from this module.
        record.status = ApprovalStatus.APPROVED
        record.resolved_at = datetime.now(timezone.utc)
        record.resolved_by = "system"
        STORE.set_status(
            record.approval_id,
            ApprovalStatus.APPROVED,
            resolved_by="system",
            notify_request=False,
        )

    return _to_response(record)


@router.post("/approve", response_model=ApprovalResponse)
async def approve_tool_call(
    request: ApprovalRequest,
):
    """
    Approve a pending tool call.

    Args:
        request: Approval parameters

    Returns:
        Updated approval status

    Raises:
        HTTPException: 404 if the approval is unknown, 400 if it is no longer
            pending.
    """
    _get_pending_or_raise(request.approval_id)

    record = STORE.set_status(
        request.approval_id,
        ApprovalStatus.APPROVED,
        resolved_by="user",
        notify_request=True,
    )
    return _to_response(record)


@router.post("/deny", response_model=ApprovalResponse)
async def deny_tool_call(
    request: DenyRequest,
):
    """
    Deny a pending tool call.

    Args:
        request: Denial parameters

    Returns:
        Updated approval status

    Raises:
        HTTPException: 404 if the approval is unknown, 400 if it is no longer
            pending.
    """
    _get_pending_or_raise(request.approval_id)

    record = STORE.set_status(
        request.approval_id,
        ApprovalStatus.DENIED,
        resolved_by="user",
        notify_request=True,
    )
    # NOTE(review): the reason is attached after the status change (which is
    # requested with notify_request=True), so anything triggered by that call
    # may not see it -- confirm against the store implementation.
    record.metadata["denial_reason"] = request.reason
    return _to_response(record)


@router.get("/pending", response_model=PendingApprovalsResponse)
async def list_pending_approvals(
    workspace_id: Optional[str] = None,
    agent_id: Optional[str] = None,
):
    """
    List pending tool approval requests.

    Args:
        workspace_id: Filter by run id (historical query parameter name retained)
        agent_id: Filter by agent

    Returns:
        List of pending approvals
    """
    pending = [
        _to_response(record)
        for record in STORE.list(
            status=ApprovalStatus.PENDING,
            workspace_id=workspace_id,
            agent_id=agent_id,
        )
    ]
    return PendingApprovalsResponse(approvals=pending, total=len(pending))


@router.get("/approvals/{approval_id}", response_model=ApprovalResponse)
async def get_approval_status(
    approval_id: str,
):
    """
    Get the status of a specific approval request.

    Args:
        approval_id: Approval request ID

    Returns:
        Approval status

    Raises:
        HTTPException: 404 if the approval is unknown.
    """
    return _to_response(_get_or_404(approval_id))


@router.delete("/approvals/{approval_id}", response_model=ApprovalResponse)
async def cancel_approval(
    approval_id: str,
):
    """
    Cancel/delete an approval request.

    The record is looked up before `STORE.cancel` is called, and that
    looked-up record is what gets serialized in the response. No status check
    is performed here, so the request is not rejected for records that are no
    longer pending.

    Args:
        approval_id: Approval request ID

    Returns:
        The approval record that was cancelled

    Raises:
        HTTPException: 404 if the approval is unknown.
    """
    record = _get_or_404(approval_id)
    STORE.cancel(approval_id)
    return _to_response(record)


@router.post("/approve/batch", response_model=BatchApprovalResponse)
async def batch_approve_tool_calls(
    request: BatchApprovalRequest,
):
    """
    Approve multiple pending tool calls in a single request.

    Each ID is processed independently; a failure for one ID is reported in
    `failed` and does not stop the remaining IDs from being processed.

    Args:
        request: Batch approval parameters with list of approval IDs

    Returns:
        Batch approval results with successful and failed approvals
    """
    approved: List[ApprovalResponse] = []
    failed: List[Dict[str, Any]] = []

    for approval_id in request.approval_ids:
        # Same validation as the single-approval route, reported per item
        # instead of aborting the whole request.
        try:
            _get_pending_or_raise(approval_id)
        except HTTPException as exc:
            failed.append({
                "approval_id": approval_id,
                "error": str(exc.detail),
            })
            continue

        try:
            record = STORE.set_status(
                approval_id,
                ApprovalStatus.APPROVED,
                resolved_by="user",
                notify_request=True,
            )
            approved.append(_to_response(record))
        except Exception as e:  # deliberate: isolate per-item failures
            failed.append({
                "approval_id": approval_id,
                "error": str(e),
            })

    return BatchApprovalResponse(
        approved=approved,
        failed=failed,
        total_requested=len(request.approval_ids),
        total_approved=len(approved),
        total_failed=len(failed),
    )