- Migrate OpenClaw from HTTP (port 8004) to WebSocket (port 18789) - Add workspace file list and content preview handlers - Add OpenClawStatus component with agent/skills view - Add OpenClawView panel in trader interface - Add Zustand store for OpenClaw state management - Fix gateway logging noise (yfinance, websockets) - Fix RunWorkspaceManager.get_agent_asset_dir attribute error - Handle missing workspace files gracefully in preview Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1104 lines
38 KiB
Python
1104 lines
38 KiB
Python
# -*- coding: utf-8 -*-
|
|
"""Stable Pydantic models for OpenClaw CLI output.
|
|
|
|
These models normalize the raw JSON from `openclaw status --json`,
|
|
`sessions --json`, `cron list --json`, and `approvals get --json`
|
|
into a consistent, well-typed structure with safe accessors.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
from datetime import datetime, timezone
|
|
from typing import Any, Literal
|
|
|
|
from pydantic import AliasChoices, BaseModel, Field
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Status
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class HeartbeatAgentStatus(BaseModel):
|
|
"""Heartbeat status for a single agent."""
|
|
|
|
agent_id: str
|
|
enabled: bool
|
|
every: str = ""
|
|
every_ms: int | None = None
|
|
|
|
|
|
class HeartbeatStatus(BaseModel):
|
|
"""Heartbeat section of status output."""
|
|
|
|
default_agent_id: str
|
|
agents: list[HeartbeatAgentStatus] = Field(default_factory=list)
|
|
|
|
|
|
class LinkChannel(BaseModel):
|
|
"""Linked channel info in status output."""
|
|
|
|
id: str
|
|
label: str
|
|
linked: bool
|
|
auth_age_ms: int | None = None
|
|
|
|
|
|
class SessionDefaults(BaseModel):
|
|
"""Session defaults in status output."""
|
|
|
|
model: str | None = None
|
|
context_tokens: int | None = None
|
|
|
|
|
|
class SessionStatus(BaseModel):
|
|
"""A single session's live status fields."""
|
|
|
|
key: str
|
|
kind: Literal["direct", "group", "global", "unknown"] = "unknown"
|
|
agent_id: str | None = None
|
|
session_id: str | None = None
|
|
updated_at: int | None = Field(default=None, validation_alias="updatedAt")
|
|
age_ms: int | None = Field(default=None, validation_alias="ageMs")
|
|
thinking_level: str | None = Field(default=None)
|
|
fast_mode: bool | None = None
|
|
verbose_level: str | None = None
|
|
reasoning_level: str | None = None
|
|
elevated_level: str | None = None
|
|
system_sent: bool | None = None
|
|
aborted_last_run: bool | None = None
|
|
input_tokens: int | None = None
|
|
output_tokens: int | None = None
|
|
total_tokens: int | None = None
|
|
total_tokens_fresh: bool | None = None
|
|
cache_read: int | None = None
|
|
cache_write: int | None = None
|
|
remaining_tokens: int | None = None
|
|
percent_used: float | None = None
|
|
model: str | None = None
|
|
context_tokens: int | None = None
|
|
flags: list[str] = Field(default_factory=list)
|
|
|
|
@property
|
|
def updated_at_dt(self) -> datetime | None:
|
|
if self.updated_at is None:
|
|
return None
|
|
return datetime.fromtimestamp(self.updated_at / 1000, tz=timezone.utc)
|
|
|
|
@property
|
|
def age_str(self) -> str:
|
|
if self.age_ms is None:
|
|
return "?"
|
|
seconds = int(self.age_ms / 1000)
|
|
if seconds < 60:
|
|
return f"{seconds}s"
|
|
minutes = seconds // 60
|
|
if minutes < 60:
|
|
return f"{minutes}m"
|
|
hours = minutes // 60
|
|
if hours < 24:
|
|
return f"{hours}h"
|
|
days = hours // 24
|
|
return f"{days}d"
|
|
|
|
|
|
class SessionsStatusGroup(BaseModel):
|
|
"""Sessions grouped by agent."""
|
|
|
|
agent_id: str
|
|
path: str
|
|
count: int
|
|
recent: list[SessionStatus] = Field(default_factory=list)
|
|
|
|
|
|
class SessionsStatus(BaseModel):
|
|
"""Sessions section of status output."""
|
|
|
|
paths: list[str] = Field(default_factory=list)
|
|
count: int = 0
|
|
defaults: SessionDefaults = Field(default_factory=SessionDefaults)
|
|
recent: list[SessionStatus] = Field(default_factory=list)
|
|
by_agent: list[SessionsStatusGroup] = Field(default_factory=list)
|
|
|
|
|
|
class UpdateChannelInfo(BaseModel):
|
|
"""Update channel info."""
|
|
|
|
channel: str | None = None
|
|
source: str | None = None
|
|
|
|
|
|
class UpdateInfo(BaseModel):
|
|
"""Update info in status output."""
|
|
|
|
install_kind: str | None = Field(default=None, validation_alias="installKind")
|
|
git_tag: str | None = Field(default=None, validation_alias="gitTag")
|
|
git_branch: str | None = Field(default=None, validation_alias="gitBranch")
|
|
|
|
|
|
class OsSummary(BaseModel):
|
|
"""OS summary in status output."""
|
|
|
|
os: str | None = None
|
|
arch: str | None = None
|
|
hostname: str | None = None
|
|
|
|
|
|
class MemoryInfo(BaseModel):
|
|
"""Memory info in status output."""
|
|
|
|
used_mb: float | None = Field(default=None, validation_alias="usedMb")
|
|
total_mb: float | None = None
|
|
percent_used: float | None = None
|
|
|
|
|
|
class SecretDiagnostics(BaseModel):
|
|
"""Secret diagnostics summary."""
|
|
|
|
total: int = 0
|
|
providers: dict[str, int] = Field(default_factory=dict)
|
|
|
|
|
|
class GatewayStatus(BaseModel):
|
|
"""Gateway connection status."""
|
|
|
|
mode: str | None = None
|
|
url: str | None = None
|
|
url_source: str | None = Field(default=None, validation_alias="urlSource")
|
|
misconfigured: bool | None = None
|
|
reachable: bool | None = None
|
|
connect_latency_ms: int | None = Field(default=None, validation_alias="connectLatencyMs")
|
|
self_: str | None = Field(default=None, validation_alias="self")
|
|
error: str | None = None
|
|
auth_warning: str | None = Field(default=None, validation_alias="authWarning")
|
|
|
|
|
|
class AgentHealth(BaseModel):
|
|
"""Per-agent health status."""
|
|
|
|
agent_id: str | None = Field(default=None, validation_alias="agentId")
|
|
status: str | None = None
|
|
model: str | None = None
|
|
error: str | None = None
|
|
|
|
|
|
class OpenClawStatus(BaseModel):
|
|
"""Complete parsed output of `openclaw status --json`.
|
|
|
|
All fields are optional so that partial/incomplete responses
|
|
from older CLI versions still parse cleanly.
|
|
"""
|
|
|
|
runtime_version: str | None = Field(default=None, validation_alias="runtimeVersion")
|
|
link_channel: LinkChannel | None = Field(default=None, validation_alias="linkChannel")
|
|
heartbeat: HeartbeatStatus | None = None
|
|
channel_summary: list[str] = Field(default_factory=list)
|
|
queued_system_events: list[str] = Field(default_factory=list)
|
|
sessions: SessionsStatus | None = None
|
|
os: OsSummary | None = None
|
|
update: UpdateInfo | None = None
|
|
update_channel: UpdateChannelInfo | None = Field(default=None, validation_alias="updateChannel")
|
|
memory: MemoryInfo | None = None
|
|
memory_plugin: str | None = Field(default=None, validation_alias="memoryPlugin")
|
|
gateway: GatewayStatus | None = None
|
|
gateway_service: dict[str, Any] | None = Field(default=None, validation_alias="gatewayService")
|
|
node_service: dict[str, Any] | None = Field(default=None, validation_alias="nodeService")
|
|
agents: list[AgentHealth] = Field(default_factory=list)
|
|
secret_diagnostics: SecretDiagnostics | None = Field(default=None, validation_alias="secretDiagnostics")
|
|
health: dict[str, Any] | None = None
|
|
usage: dict[str, Any] | None = None
|
|
last_heartbeat: dict[str, Any] | None = Field(default=None, validation_alias="lastHeartbeat")
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Sessions
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class SessionEntry(BaseModel):
|
|
"""A single session entry from `openclaw sessions --json`."""
|
|
|
|
key: str = Field(validation_alias="key")
|
|
session_key: str | None = Field(default=None, validation_alias="sessionKey")
|
|
session_id: str | None = Field(default=None, validation_alias="sessionId")
|
|
updated_at: int | None = Field(default=None, validation_alias="updatedAt")
|
|
age_ms: int | None = Field(default=None, validation_alias="ageMs")
|
|
model: str | None = None
|
|
model_provider: str | None = Field(default=None, validation_alias="modelProvider")
|
|
kind: Literal["direct", "group", "global", "unknown"] = "unknown"
|
|
agent_id: str | None = Field(default=None, validation_alias="agentId")
|
|
flags: list[str] = Field(default_factory=list)
|
|
|
|
@property
|
|
def resolved_key(self) -> str:
|
|
return self.session_key or self.key
|
|
|
|
@property
|
|
def updated_at_dt(self) -> datetime | None:
|
|
if self.updated_at is None:
|
|
return None
|
|
return datetime.fromtimestamp(self.updated_at / 1000, tz=timezone.utc)
|
|
|
|
@property
|
|
def age_str(self) -> str:
|
|
if self.age_ms is None:
|
|
return "?"
|
|
seconds = int(self.age_ms / 1000)
|
|
if seconds < 60:
|
|
return f"{seconds}s"
|
|
minutes = seconds // 60
|
|
if minutes < 60:
|
|
return f"{minutes}m"
|
|
hours = minutes // 60
|
|
if hours < 24:
|
|
return f"{hours}h"
|
|
days = hours // 24
|
|
return f"{days}d"
|
|
|
|
|
|
class SessionHistoryEvent(BaseModel):
|
|
"""A single event within a session history."""
|
|
|
|
ts: int | None = None
|
|
type: str = ""
|
|
role: str | None = None
|
|
content: str | None = None
|
|
model: str | None = None
|
|
tokens: int | None = None
|
|
channel_id: str | None = Field(default=None, validation_alias="channelId")
|
|
|
|
@property
|
|
def ts_dt(self) -> datetime | None:
|
|
if self.ts is None:
|
|
return None
|
|
return datetime.fromtimestamp(self.ts / 1000, tz=timezone.utc)
|
|
|
|
|
|
class SessionHistory(BaseModel):
|
|
"""Session history response from `openclaw sessions history <key> --json`."""
|
|
|
|
session_key: str = Field(validation_alias="sessionKey")
|
|
session_id: str | None = Field(default=None, validation_alias="sessionId")
|
|
events: list[SessionHistoryEvent] = Field(default_factory=list)
|
|
raw_text: str | None = Field(default=None, validation_alias="rawText")
|
|
|
|
|
|
class SessionsList(BaseModel):
|
|
"""Parsed response from `openclaw sessions --json`."""
|
|
|
|
sessions: list[SessionEntry] = Field(default_factory=list)
|
|
|
|
def get(self, key: str) -> SessionEntry | None:
|
|
for s in self.sessions:
|
|
if s.key == key or s.session_key == key:
|
|
return s
|
|
return None
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Cron
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class CronJob(BaseModel):
|
|
"""A single cron job from `openclaw cron list --json`."""
|
|
|
|
id: str | None = None
|
|
label: str | None = None
|
|
schedule: str | None = None
|
|
cron: str | None = None
|
|
command: str | None = None
|
|
agent_id: str | None = Field(default=None, validation_alias="agentId")
|
|
enabled: bool = True
|
|
status: str | None = None
|
|
next_run_at: int | None = Field(default=None, validation_alias="nextRunAt")
|
|
last_run_at: int | None = Field(default=None, validation_alias="lastRunAt")
|
|
last_exit_code: int | None = Field(default=None, validation_alias="lastExitCode")
|
|
|
|
@property
|
|
def display_schedule(self) -> str:
|
|
return self.schedule or self.cron or "?"
|
|
|
|
@property
|
|
def next_run_dt(self) -> datetime | None:
|
|
if self.next_run_at is None:
|
|
return None
|
|
return datetime.fromtimestamp(self.next_run_at / 1000, tz=timezone.utc)
|
|
|
|
|
|
class CronList(BaseModel):
|
|
"""Parsed response from `openclaw cron list --json`."""
|
|
|
|
cron: list[CronJob] = Field(default_factory=list)
|
|
jobs: list[CronJob] = Field(default_factory=list)
|
|
|
|
@classmethod
|
|
def from_raw(cls, raw: dict[str, Any] | None) -> "CronList":
|
|
if not raw:
|
|
return cls()
|
|
raw_jobs: list[dict[str, Any]] = raw.get("cron") or raw.get("jobs") or []
|
|
jobs = [CronJob.model_validate(j, strict=False) if isinstance(j, dict) else j for j in raw_jobs]
|
|
return cls(cron=jobs, jobs=jobs)
|
|
|
|
def __iter__(self):
|
|
return iter(self.cron)
|
|
|
|
def __len__(self):
|
|
return len(self.cron)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Approvals
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class ExecApprovalToolInput(BaseModel):
|
|
"""Structured tool_input from an approval request."""
|
|
|
|
command: str | None = None
|
|
command_preview: str | None = Field(default=None, validation_alias="commandPreview")
|
|
cwd: str | None = None
|
|
env_keys: list[str] = Field(default_factory=list, validation_alias="envKeys")
|
|
argv: list[str] = Field(default_factory=list)
|
|
|
|
|
|
class ExecApprovalFinding(BaseModel):
|
|
"""A security finding attached to an approval request."""
|
|
|
|
severity: Literal["critical", "high", "medium", "low", "info"]
|
|
message: str
|
|
check: str | None = None
|
|
|
|
|
|
class ApprovalRequest(BaseModel):
|
|
"""A single approval request from `openclaw approvals get --json`."""
|
|
|
|
approval_id: str = Field(validation_alias="approvalId")
|
|
tool_name: str = Field(validation_alias="toolName")
|
|
status: Literal["pending", "approved", "denied", "expired"] = "pending"
|
|
agent_id: str | None = Field(default=None, validation_alias="agentId")
|
|
workspace_id: str | None = Field(default=None, validation_alias="workspaceId")
|
|
session_id: str | None = Field(default=None, validation_alias="sessionId")
|
|
created_at: int | None = Field(default=None, validation_alias="createdAt")
|
|
expires_at: int | None = Field(default=None, validation_alias="expiresAt")
|
|
tool_input: ExecApprovalToolInput | None = Field(default=None, validation_alias="toolInput")
|
|
findings: list[ExecApprovalFinding] = Field(default_factory=list)
|
|
|
|
@property
|
|
def created_at_dt(self) -> datetime | None:
|
|
if self.created_at is None:
|
|
return None
|
|
return datetime.fromtimestamp(self.created_at / 1000, tz=timezone.utc)
|
|
|
|
@property
|
|
def is_expired(self) -> bool:
|
|
if self.expires_at is None:
|
|
return False
|
|
return datetime.now(timezone.utc).timestamp() * 1000 > self.expires_at
|
|
|
|
|
|
class ApprovalsList(BaseModel):
|
|
"""Parsed response from `openclaw approvals get --json`."""
|
|
|
|
approvals: list[ApprovalRequest] = Field(default_factory=list)
|
|
pending: list[ApprovalRequest] = Field(default_factory=list)
|
|
|
|
@classmethod
|
|
def from_raw(cls, raw: dict[str, Any] | None) -> "ApprovalsList":
|
|
if not raw:
|
|
return cls()
|
|
raw_approvals: list[dict[str, Any]] = raw.get("approvals") or []
|
|
approvals = [
|
|
ApprovalRequest.model_validate(a, strict=False) if isinstance(a, dict) else a
|
|
for a in raw_approvals
|
|
]
|
|
pending = [a for a in approvals if a.status == "pending" and not a.is_expired]
|
|
return cls(approvals=approvals, pending=pending)
|
|
|
|
def __iter__(self):
|
|
return iter(self.approvals)
|
|
|
|
def __len__(self):
|
|
return len(self.approvals)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Normalization helpers
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def normalize_status(raw: dict[str, Any] | None) -> OpenClawStatus:
|
|
"""Parse raw status dict into a strongly-typed OpenClawStatus.
|
|
|
|
Silently ignores unknown fields so schema changes in newer CLI
|
|
versions don't break existing callers.
|
|
"""
|
|
if not raw:
|
|
return OpenClawStatus()
|
|
return OpenClawStatus.model_validate(raw, strict=False)
|
|
|
|
|
|
def normalize_sessions(raw: dict[str, Any] | None) -> SessionsList:
|
|
"""Parse raw sessions dict into a strongly-typed SessionsList."""
|
|
if not raw:
|
|
return SessionsList()
|
|
sessions = raw.get("sessions") or []
|
|
return SessionsList(
|
|
sessions=[
|
|
SessionEntry.model_validate(s, strict=False) for s in sessions if isinstance(s, dict)
|
|
]
|
|
)
|
|
|
|
|
|
def normalize_session_history(
|
|
raw: dict[str, Any] | None,
|
|
session_key: str = "",
|
|
) -> SessionHistory:
|
|
"""Parse raw session history dict into a SessionHistory."""
|
|
if not raw:
|
|
return SessionHistory(session_key=session_key, events=[])
|
|
history = raw.get("history") or []
|
|
return SessionHistory.model_validate(
|
|
{
|
|
**raw,
|
|
"sessionKey": raw.get("sessionKey", session_key),
|
|
"events": [
|
|
SessionHistoryEvent.model_validate(e, strict=False)
|
|
for e in history
|
|
if isinstance(e, dict)
|
|
],
|
|
},
|
|
strict=False,
|
|
)
|
|
|
|
|
|
def normalize_cron_jobs(raw: dict[str, Any] | None) -> CronList:
|
|
"""Parse raw cron list dict into a CronList."""
|
|
return CronList.from_raw(raw)
|
|
|
|
|
|
def normalize_approvals(raw: dict[str, Any] | None) -> ApprovalsList:
|
|
"""Parse raw approvals dict into an ApprovalsList."""
|
|
return ApprovalsList.from_raw(raw)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Agents
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class AgentSummary(BaseModel):
|
|
"""A single agent summary from `openclaw agents list --json`."""
|
|
|
|
id: str
|
|
name: str | None = None
|
|
identity_name: str | None = Field(default=None, validation_alias=AliasChoices("identityName", "identity_name"))
|
|
identity_emoji: str | None = Field(default=None, validation_alias=AliasChoices("identityEmoji", "identity_emoji"))
|
|
identity_source: Literal["identity", "config"] | None = Field(default=None, validation_alias=AliasChoices("identitySource", "identity_source"))
|
|
workspace: str = ""
|
|
agent_dir: str = Field(default="", validation_alias=AliasChoices("agentDir", "agent_dir"))
|
|
model: str | None = None
|
|
bindings: int = 0
|
|
binding_details: list[str] = Field(default_factory=list, validation_alias=AliasChoices("bindingDetails", "binding_details"))
|
|
routes: list[str] = Field(default_factory=list)
|
|
providers: list[str] = Field(default_factory=list)
|
|
is_default: bool = Field(default=False, validation_alias=AliasChoices("isDefault", "is_default"))
|
|
|
|
|
|
class AgentsList(BaseModel):
|
|
"""Parsed response from `openclaw agents list --json`."""
|
|
|
|
agents: list[AgentSummary] = Field(default_factory=list)
|
|
|
|
def __iter__(self):
|
|
return iter(self.agents)
|
|
|
|
def __len__(self):
|
|
return len(self.agents)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Skills
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class SkillRequirement(BaseModel):
|
|
"""Requirements entry for a skill."""
|
|
|
|
bins: list[str] = Field(default_factory=list)
|
|
any_bins: list[str] = Field(default_factory=list, validation_alias="anyBins")
|
|
env: list[str] = Field(default_factory=list)
|
|
config: list[str] = Field(default_factory=list)
|
|
os: list[str] = Field(default_factory=list)
|
|
|
|
|
|
class SkillStatusEntry(BaseModel):
|
|
"""A single skill entry from `openclaw skills list --json`."""
|
|
|
|
name: str = ""
|
|
description: str = ""
|
|
source: str = ""
|
|
bundled: bool = False
|
|
file_path: str = Field(default="", validation_alias="filePath")
|
|
base_dir: str = Field(default="", validation_alias="baseDir")
|
|
skill_key: str = Field(default="", validation_alias="skillKey")
|
|
primary_env: str | None = Field(default=None, validation_alias="primaryEnv")
|
|
emoji: str | None = None
|
|
homepage: str | None = None
|
|
always: bool = False
|
|
disabled: bool = False
|
|
blocked_by_allowlist: bool = Field(default=False, validation_alias="blockedByAllowlist")
|
|
eligible: bool = True
|
|
requirements: SkillRequirement = Field(default_factory=SkillRequirement)
|
|
missing: SkillRequirement = Field(default_factory=SkillRequirement, validation_alias="missing")
|
|
config_checks: list[dict[str, Any]] = Field(default_factory=list, validation_alias="configChecks")
|
|
install: list[dict[str, Any]] = Field(default_factory=list)
|
|
|
|
|
|
class SkillStatusReport(BaseModel):
|
|
"""Parsed response from `openclaw skills list --json`."""
|
|
|
|
workspace_dir: str = Field(default="", validation_alias="workspaceDir")
|
|
managed_skills_dir: str = Field(default="", validation_alias="managedSkillsDir")
|
|
skills: list[SkillStatusEntry] = Field(default_factory=list)
|
|
|
|
def __iter__(self):
|
|
return iter(self.skills)
|
|
|
|
def __len__(self):
|
|
return len(self.skills)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Models
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class ModelRow(BaseModel):
|
|
"""A single model row from `openclaw models list --json`."""
|
|
|
|
key: str = ""
|
|
name: str = ""
|
|
input: str = ""
|
|
context_window: int | None = Field(default=None, validation_alias="contextWindow")
|
|
local: bool | None = None
|
|
available: bool | None = None
|
|
tags: list[str] = Field(default_factory=list)
|
|
missing: bool = False
|
|
|
|
|
|
class ModelsList(BaseModel):
|
|
"""Parsed response from `openclaw models list --json`."""
|
|
|
|
models: list[ModelRow] = Field(default_factory=list)
|
|
|
|
def __iter__(self):
|
|
return iter(self.models)
|
|
|
|
def __len__(self):
|
|
return len(self.models)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Normalization helpers (agents, skills, models)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def normalize_agents(raw: dict[str, Any] | None) -> AgentsList:
|
|
"""Parse raw agents list dict into an AgentsList."""
|
|
if not raw:
|
|
return AgentsList()
|
|
agents = raw.get("agents") or (raw if isinstance(raw, list) else [])
|
|
if isinstance(agents, list):
|
|
return AgentsList(
|
|
agents=[
|
|
AgentSummary.model_validate(a, strict=False) for a in agents if isinstance(a, dict)
|
|
]
|
|
)
|
|
return AgentsList()
|
|
|
|
|
|
def normalize_skills(raw: dict[str, Any] | None) -> SkillStatusReport:
|
|
"""Parse raw skills list dict into a SkillStatusReport."""
|
|
if not raw:
|
|
return SkillStatusReport()
|
|
skills = raw.get("skills") or []
|
|
workspace_dir = raw.get("workspaceDir", "")
|
|
managed_skills_dir = raw.get("managedSkillsDir", "")
|
|
parsed_skills = [
|
|
SkillStatusEntry.model_validate(s, strict=False) for s in skills if isinstance(s, dict)
|
|
]
|
|
return SkillStatusReport(
|
|
workspace_dir=workspace_dir,
|
|
managed_skills_dir=managed_skills_dir,
|
|
skills=parsed_skills,
|
|
)
|
|
|
|
|
|
def normalize_models(raw: dict[str, Any] | None) -> ModelsList:
|
|
"""Parse raw models list dict into a ModelsList."""
|
|
if not raw:
|
|
return ModelsList()
|
|
models = raw.get("models") or raw if isinstance(raw, list) else []
|
|
if isinstance(models, list):
|
|
return ModelsList(
|
|
models=[ModelRow.model_validate(m, strict=False) for m in models if isinstance(m, dict)]
|
|
)
|
|
return ModelsList()
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Hooks
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class HookInstallOption(BaseModel):
|
|
id: str = ""
|
|
kind: str = ""
|
|
label: str = ""
|
|
bins: list[str] = Field(default_factory=list)
|
|
|
|
|
|
class HookStatusEntry(BaseModel):
|
|
name: str = ""
|
|
description: str = ""
|
|
source: str = ""
|
|
plugin_id: str | None = Field(default=None, validation_alias="pluginId")
|
|
file_path: str = Field(default="", validation_alias="filePath")
|
|
base_dir: str = Field(default="", validation_alias="baseDir")
|
|
handler_path: str = Field(default="", validation_alias="handlerPath")
|
|
hook_key: str = Field(default="", validation_alias="hookKey")
|
|
emoji: str | None = None
|
|
homepage: str | None = None
|
|
events: list[str] = Field(default_factory=list)
|
|
always: bool = False
|
|
enabled_by_config: bool = Field(default=False, validation_alias="enabledByConfig")
|
|
requirements_satisfied: bool = Field(default=False, validation_alias="requirementsSatisfied")
|
|
loadable: bool = False
|
|
blocked_reason: str | None = Field(default=None, validation_alias="blockedReason")
|
|
managed_by_plugin: bool = Field(default=False, validation_alias="managedByPlugin")
|
|
requirements: SkillRequirement = Field(default_factory=SkillRequirement)
|
|
missing: SkillRequirement = Field(default_factory=SkillRequirement)
|
|
config_checks: list[dict[str, Any]] = Field(default_factory=list, validation_alias="configChecks")
|
|
install: list[HookInstallOption] = Field(default_factory=list)
|
|
|
|
|
|
class HookStatusReport(BaseModel):
|
|
workspace_dir: str = Field(default="", validation_alias="workspaceDir")
|
|
managed_hooks_dir: str = Field(default="", validation_alias="managedHooksDir")
|
|
hooks: list[HookStatusEntry] = Field(default_factory=list)
|
|
|
|
def __iter__(self):
|
|
return iter(self.hooks)
|
|
|
|
def __len__(self):
|
|
return len(self.hooks)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Plugins
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class PluginRecord(BaseModel):
|
|
id: str = ""
|
|
name: str = ""
|
|
version: str | None = None
|
|
description: str | None = None
|
|
format: str | None = None
|
|
bundle_format: str | None = Field(default=None, validation_alias="bundleFormat")
|
|
bundle_capabilities: list[str] = Field(default_factory=list, validation_alias="bundleCapabilities")
|
|
kind: str | None = None
|
|
source: str = ""
|
|
root_dir: str | None = Field(default=None, validation_alias="rootDir")
|
|
origin: dict[str, Any] = Field(default_factory=dict)
|
|
workspace_dir: str | None = Field(default=None, validation_alias="workspaceDir")
|
|
enabled: bool = False
|
|
status: Literal["loaded", "disabled", "error"] = "disabled"
|
|
error: str | None = None
|
|
tool_names: list[str] = Field(default_factory=list, validation_alias="toolNames")
|
|
hook_names: list[str] = Field(default_factory=list, validation_alias="hookNames")
|
|
channel_ids: list[str] = Field(default_factory=list, validation_alias="channelIds")
|
|
provider_ids: list[str] = Field(default_factory=list, validation_alias="providerIds")
|
|
speech_provider_ids: list[str] = Field(default_factory=list, validation_alias="speechProviderIds")
|
|
media_understanding_provider_ids: list[str] = Field(default_factory=list, validation_alias="mediaUnderstandingProviderIds")
|
|
image_generation_provider_ids: list[str] = Field(default_factory=list, validation_alias="imageGenerationProviderIds")
|
|
web_search_provider_ids: list[str] = Field(default_factory=list, validation_alias="webSearchProviderIds")
|
|
gateway_methods: list[str] = Field(default_factory=list, validation_alias="gatewayMethods")
|
|
cli_commands: list[str] = Field(default_factory=list, validation_alias="cliCommands")
|
|
services: list[str] = Field(default_factory=list)
|
|
commands: list[str] = Field(default_factory=list)
|
|
http_routes: int = 0
|
|
hook_count: int = Field(default=0, validation_alias="hookCount")
|
|
config_schema: bool = False
|
|
|
|
|
|
class PluginDiagnostic(BaseModel):
|
|
id: str = ""
|
|
message: str = ""
|
|
|
|
|
|
class PluginsList(BaseModel):
|
|
workspace_dir: str = Field(default="", validation_alias="workspaceDir")
|
|
plugins: list[PluginRecord] = Field(default_factory=list)
|
|
diagnostics: list[PluginDiagnostic] = Field(default_factory=list)
|
|
|
|
def __iter__(self):
|
|
return iter(self.plugins)
|
|
|
|
def __len__(self):
|
|
return len(self.plugins)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Secrets
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class SecretsAuditFinding(BaseModel):
|
|
code: str = ""
|
|
severity: Literal["info", "warn", "error"] = "info"
|
|
file: str = ""
|
|
json_path: str = Field(default="", validation_alias="jsonPath")
|
|
message: str = ""
|
|
provider: str | None = None
|
|
profile_id: str | None = Field(default=None, validation_alias="profileId")
|
|
|
|
|
|
class SecretsAuditSummary(BaseModel):
|
|
plaintext_count: int = Field(default=0, validation_alias="plaintextCount")
|
|
unresolved_ref_count: int = Field(default=0, validation_alias="unresolvedRefCount")
|
|
shadowed_ref_count: int = Field(default=0, validation_alias="shadowedRefCount")
|
|
legacy_residue_count: int = Field(default=0, validation_alias="legacyResidueCount")
|
|
|
|
|
|
class SecretsAuditResolution(BaseModel):
|
|
refs_checked: int = Field(default=0, validation_alias="refsChecked")
|
|
skipped_exec_refs: int = Field(default=0, validation_alias="skippedExecRefs")
|
|
resolvability_complete: bool = Field(default=False, validation_alias="resolvabilityComplete")
|
|
|
|
|
|
class SecretsAuditReport(BaseModel):
|
|
version: int = 1
|
|
status: Literal["clean", "findings", "unresolved"] = "clean"
|
|
resolution: SecretsAuditResolution = Field(default_factory=SecretsAuditResolution)
|
|
files_scanned: list[str] = Field(default_factory=list, validation_alias="filesScanned")
|
|
summary: SecretsAuditSummary = Field(default_factory=SecretsAuditSummary)
|
|
findings: list[SecretsAuditFinding] = Field(default_factory=list)
|
|
|
|
@classmethod
|
|
def from_raw(cls, raw: dict[str, Any] | None) -> "SecretsAuditReport":
|
|
if not raw:
|
|
return cls()
|
|
return cls.model_validate(raw, strict=False)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Security
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class SecurityAuditFinding(BaseModel):
|
|
check_id: str = Field(default="", validation_alias="checkId")
|
|
severity: Literal["info", "warn", "critical"] = "info"
|
|
title: str = ""
|
|
detail: str = ""
|
|
remediation: str | None = None
|
|
|
|
|
|
class SecurityAuditSummary(BaseModel):
|
|
critical: int = 0
|
|
warn: int = 0
|
|
info: int = 0
|
|
|
|
|
|
class SecurityAuditGatewayResult(BaseModel):
|
|
attempted: bool = False
|
|
url: str | None = None
|
|
ok: bool = False
|
|
error: str | None = None
|
|
close: dict[str, Any] | None = None
|
|
|
|
|
|
class SecurityAuditDeep(BaseModel):
|
|
gateway: SecurityAuditGatewayResult | None = None
|
|
|
|
|
|
class SecurityAuditReport(BaseModel):
|
|
ts: int = 0
|
|
summary: SecurityAuditSummary = Field(default_factory=SecurityAuditSummary)
|
|
findings: list[SecurityAuditFinding] = Field(default_factory=list)
|
|
deep: SecurityAuditDeep | None = None
|
|
|
|
|
|
class SecurityAuditResponse(BaseModel):
|
|
fix: dict[str, Any] | None = None
|
|
report: SecurityAuditReport | None = None
|
|
secret_diagnostics: list[str] = Field(default_factory=list, validation_alias="secretDiagnostics")
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Daemon
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class DaemonServiceCommand(BaseModel):
|
|
program_arguments: list[str] = Field(default_factory=list, validation_alias="programArguments")
|
|
working_directory: str | None = Field(default=None, validation_alias="workingDirectory")
|
|
environment: dict[str, str] | None = None
|
|
source_path: str | None = Field(default=None, validation_alias="sourcePath")
|
|
|
|
|
|
class DaemonServiceInfo(BaseModel):
|
|
label: str = ""
|
|
loaded: bool = False
|
|
loaded_text: str = Field(default="", validation_alias="loadedText")
|
|
not_loaded_text: str = Field(default="", validation_alias="notLoadedText")
|
|
command: DaemonServiceCommand | None = None
|
|
runtime: dict[str, Any] | None = None
|
|
config_audit: dict[str, Any] | None = Field(default=None, validation_alias="configAudit")
|
|
|
|
|
|
class DaemonPortListener(BaseModel):
|
|
port: int = 0
|
|
status: str = ""
|
|
type: str = ""
|
|
|
|
|
|
class DaemonPortInfo(BaseModel):
|
|
port: int = 0
|
|
status: str = ""
|
|
listeners: list[DaemonPortListener] = Field(default_factory=list)
|
|
hints: list[str] = Field(default_factory=list)
|
|
|
|
|
|
class DaemonRpcInfo(BaseModel):
|
|
ok: bool = False
|
|
error: str | None = None
|
|
url: str | None = None
|
|
auth_warning: str | None = Field(default=None, validation_alias="authWarning")
|
|
|
|
|
|
class DaemonHealthInfo(BaseModel):
|
|
healthy: bool = False
|
|
stale_gateway_pids: list[int] = Field(default_factory=list, validation_alias="staleGatewayPids")
|
|
|
|
|
|
class DaemonExtraService(BaseModel):
|
|
label: str = ""
|
|
detail: str = ""
|
|
scope: str = ""
|
|
|
|
|
|
class DaemonStatus(BaseModel):
|
|
service: DaemonServiceInfo | None = None
|
|
config: dict[str, Any] | None = None
|
|
gateway: dict[str, Any] | None = None
|
|
port: DaemonPortInfo | None = None
|
|
port_cli: DaemonPortInfo | None = Field(default=None, validation_alias="portCli")
|
|
last_error: str | None = Field(default=None, validation_alias="lastError")
|
|
rpc: DaemonRpcInfo | None = None
|
|
health: DaemonHealthInfo | None = None
|
|
extra_services: list[DaemonExtraService] = Field(default_factory=list, validation_alias="extraServices")
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# QR / Pairing
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class PairingRequest(BaseModel):
|
|
code: str = ""
|
|
id: str = ""
|
|
meta: dict[str, Any] | None = None
|
|
created_at: str = Field(default="", validation_alias="createdAt")
|
|
|
|
|
|
class PairingListResponse(BaseModel):
|
|
channel: str = ""
|
|
requests: list[PairingRequest] = Field(default_factory=list)
|
|
|
|
|
|
class QrCodeResponse(BaseModel):
|
|
setup_code: str = Field(default="", validation_alias="setupCode")
|
|
gateway_url: str = Field(default="", validation_alias="gatewayUrl")
|
|
auth: Literal["token", "password"] = "token"
|
|
url_source: str = Field(default="", validation_alias="urlSource")
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Update status
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class UpdateGitStatus(BaseModel):
|
|
root: str = ""
|
|
sha: str | None = None
|
|
tag: str | None = None
|
|
branch: str | None = None
|
|
upstream: str | None = None
|
|
dirty: bool | None = None
|
|
ahead: int | None = None
|
|
behind: int | None = None
|
|
fetch_ok: bool = Field(default=False, validation_alias="fetchOk")
|
|
error: str | None = None
|
|
|
|
|
|
class UpdateDepsStatus(BaseModel):
|
|
manager: str = ""
|
|
status: Literal["ok", "missing", "stale", "unknown"] = "unknown"
|
|
lockfile_path: str | None = Field(default=None, validation_alias="lockfilePath")
|
|
marker_path: str | None = Field(default=None, validation_alias="markerPath")
|
|
reason: str | None = None
|
|
|
|
|
|
class UpdateRegistryStatus(BaseModel):
|
|
latest_version: str | None = Field(default=None, validation_alias="latestVersion")
|
|
error: str | None = None
|
|
|
|
|
|
class UpdateCheckResult(BaseModel):
|
|
root: str | None = None
|
|
install_kind: Literal["git", "package", "unknown"] = "unknown"
|
|
package_manager: str = ""
|
|
git: UpdateGitStatus | None = None
|
|
deps: UpdateDepsStatus | None = None
|
|
registry: UpdateRegistryStatus | None = None
|
|
|
|
|
|
class UpdateChannelInfo2(BaseModel):
|
|
value: str | None = None
|
|
source: str = ""
|
|
label: str = ""
|
|
config: str | None = None
|
|
|
|
|
|
class UpdateStatusResponse(BaseModel):
|
|
update: UpdateCheckResult | None = None
|
|
channel: UpdateChannelInfo2 | None = None
|
|
availability: dict[str, Any] | None = None
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Models aliases / fallbacks
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class ModelAliasEntry(BaseModel):
|
|
alias: str = ""
|
|
target: str = ""
|
|
|
|
|
|
class ModelAliasesList(BaseModel):
|
|
aliases: dict[str, str] = Field(default_factory=dict)
|
|
|
|
def items_list(self) -> list[ModelAliasEntry]:
|
|
return [ModelAliasEntry(alias=k, target=v) for k, v in self.aliases.items()]
|
|
|
|
|
|
class ModelFallbackItem(BaseModel):
|
|
raw: str = ""
|
|
provider: str = ""
|
|
model: str = ""
|
|
|
|
|
|
class ModelFallbacksList(BaseModel):
|
|
key: str = ""
|
|
label: str = ""
|
|
items: list[ModelFallbackItem] = Field(default_factory=list)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Skills update result
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class SkillUpdateResult(BaseModel):
|
|
ok: bool = False
|
|
slug: str = ""
|
|
previous_version: str | None = Field(default=None, validation_alias="previousVersion")
|
|
version: str = ""
|
|
changed: bool = False
|
|
target_dir: str = Field(default="", validation_alias="targetDir")
|
|
error: str | None = None
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Normalization helpers
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def normalize_hooks(raw: dict[str, Any] | None) -> HookStatusReport:
|
|
if not raw:
|
|
return HookStatusReport()
|
|
hooks = raw.get("hooks") or []
|
|
workspace_dir = raw.get("workspaceDir", "")
|
|
managed_hooks_dir = raw.get("managedHooksDir", "")
|
|
parsed = [
|
|
HookStatusEntry.model_validate(h, strict=False) for h in hooks if isinstance(h, dict)
|
|
]
|
|
return HookStatusReport(
|
|
workspace_dir=workspace_dir,
|
|
managed_hooks_dir=managed_hooks_dir,
|
|
hooks=parsed,
|
|
)
|
|
|
|
|
|
def normalize_plugins(raw: dict[str, Any] | None) -> PluginsList:
|
|
if not raw:
|
|
return PluginsList()
|
|
plugins = raw.get("plugins") or []
|
|
diagnostics = raw.get("diagnostics") or []
|
|
return PluginsList(
|
|
workspace_dir=raw.get("workspaceDir", ""),
|
|
plugins=[PluginRecord.model_validate(p, strict=False) for p in plugins if isinstance(p, dict)],
|
|
diagnostics=[PluginDiagnostic.model_validate(d, strict=False) for d in diagnostics if isinstance(d, dict)],
|
|
)
|
|
|
|
|
|
def normalize_secrets_audit(raw: dict[str, Any] | None) -> SecretsAuditReport:
|
|
return SecretsAuditReport.from_raw(raw)
|
|
|
|
|
|
def normalize_security_audit(raw: dict[str, Any] | None) -> SecurityAuditResponse:
|
|
if not raw:
|
|
return SecurityAuditResponse()
|
|
return SecurityAuditResponse.model_validate(raw, strict=False)
|
|
|
|
|
|
def normalize_daemon_status(raw: dict[str, Any] | None) -> DaemonStatus:
|
|
if not raw:
|
|
return DaemonStatus()
|
|
return DaemonStatus.model_validate(raw, strict=False)
|
|
|
|
|
|
def normalize_pairing(raw: dict[str, Any] | None) -> PairingListResponse:
|
|
if not raw:
|
|
return PairingListResponse()
|
|
return PairingListResponse.model_validate(raw, strict=False)
|
|
|
|
|
|
def normalize_qr(raw: dict[str, Any] | None) -> QrCodeResponse:
|
|
if not raw:
|
|
return QrCodeResponse()
|
|
return QrCodeResponse.model_validate(raw, strict=False)
|
|
|
|
|
|
def normalize_update_status(raw: dict[str, Any] | None) -> UpdateStatusResponse:
|
|
if not raw:
|
|
return UpdateStatusResponse()
|
|
return UpdateStatusResponse.model_validate(raw, strict=False)
|
|
|
|
|
|
def normalize_model_aliases(raw: dict[str, Any] | None) -> ModelAliasesList:
|
|
if not raw:
|
|
return ModelAliasesList()
|
|
return ModelAliasesList.model_validate(raw, strict=False)
|
|
|
|
|
|
def normalize_model_fallbacks(raw: dict[str, Any] | None) -> ModelFallbacksList:
|
|
if not raw:
|
|
return ModelFallbacksList()
|
|
return ModelFallbacksList.model_validate(raw, strict=False)
|
|
|
|
|
|
def normalize_skill_update(raw: dict[str, Any] | None) -> SkillUpdateResult:
|
|
if not raw:
|
|
return SkillUpdateResult()
|
|
return SkillUpdateResult.model_validate(raw, strict=False)
|