Files
evotraders/shared/models/openclaw.py
2026-03-30 17:46:44 +08:00

1104 lines
38 KiB
Python

# -*- coding: utf-8 -*-
"""Stable Pydantic models for OpenClaw CLI output.
These models normalize the raw JSON from `openclaw status --json`,
`sessions --json`, `cron list --json`, and `approvals get --json`
into a consistent, well-typed structure with safe accessors.
"""
from __future__ import annotations
from datetime import datetime, timezone
from typing import Any, Literal
from pydantic import AliasChoices, BaseModel, Field
# ---------------------------------------------------------------------------
# Status
# ---------------------------------------------------------------------------
class HeartbeatAgentStatus(BaseModel):
"""Heartbeat status for a single agent."""
agent_id: str
enabled: bool
every: str = ""
every_ms: int | None = None
class HeartbeatStatus(BaseModel):
"""Heartbeat section of status output."""
default_agent_id: str
agents: list[HeartbeatAgentStatus] = Field(default_factory=list)
class LinkChannel(BaseModel):
"""Linked channel info in status output."""
id: str
label: str
linked: bool
auth_age_ms: int | None = None
class SessionDefaults(BaseModel):
"""Session defaults in status output."""
model: str | None = None
context_tokens: int | None = None
class SessionStatus(BaseModel):
"""A single session's live status fields."""
key: str
kind: Literal["direct", "group", "global", "unknown"] = "unknown"
agent_id: str | None = None
session_id: str | None = None
updated_at: int | None = Field(default=None, validation_alias="updatedAt")
age_ms: int | None = Field(default=None, validation_alias="ageMs")
thinking_level: str | None = Field(default=None)
fast_mode: bool | None = None
verbose_level: str | None = None
reasoning_level: str | None = None
elevated_level: str | None = None
system_sent: bool | None = None
aborted_last_run: bool | None = None
input_tokens: int | None = None
output_tokens: int | None = None
total_tokens: int | None = None
total_tokens_fresh: bool | None = None
cache_read: int | None = None
cache_write: int | None = None
remaining_tokens: int | None = None
percent_used: float | None = None
model: str | None = None
context_tokens: int | None = None
flags: list[str] = Field(default_factory=list)
@property
def updated_at_dt(self) -> datetime | None:
if self.updated_at is None:
return None
return datetime.fromtimestamp(self.updated_at / 1000, tz=timezone.utc)
@property
def age_str(self) -> str:
if self.age_ms is None:
return "?"
seconds = int(self.age_ms / 1000)
if seconds < 60:
return f"{seconds}s"
minutes = seconds // 60
if minutes < 60:
return f"{minutes}m"
hours = minutes // 60
if hours < 24:
return f"{hours}h"
days = hours // 24
return f"{days}d"
class SessionsStatusGroup(BaseModel):
"""Sessions grouped by agent."""
agent_id: str
path: str
count: int
recent: list[SessionStatus] = Field(default_factory=list)
class SessionsStatus(BaseModel):
"""Sessions section of status output."""
paths: list[str] = Field(default_factory=list)
count: int = 0
defaults: SessionDefaults = Field(default_factory=SessionDefaults)
recent: list[SessionStatus] = Field(default_factory=list)
by_agent: list[SessionsStatusGroup] = Field(default_factory=list)
class UpdateChannelInfo(BaseModel):
"""Update channel info."""
channel: str | None = None
source: str | None = None
class UpdateInfo(BaseModel):
"""Update info in status output."""
install_kind: str | None = Field(default=None, validation_alias="installKind")
git_tag: str | None = Field(default=None, validation_alias="gitTag")
git_branch: str | None = Field(default=None, validation_alias="gitBranch")
class OsSummary(BaseModel):
"""OS summary in status output."""
os: str | None = None
arch: str | None = None
hostname: str | None = None
class MemoryInfo(BaseModel):
"""Memory info in status output."""
used_mb: float | None = Field(default=None, validation_alias="usedMb")
total_mb: float | None = None
percent_used: float | None = None
class SecretDiagnostics(BaseModel):
"""Secret diagnostics summary."""
total: int = 0
providers: dict[str, int] = Field(default_factory=dict)
class GatewayStatus(BaseModel):
"""Gateway connection status."""
mode: str | None = None
url: str | None = None
url_source: str | None = Field(default=None, validation_alias="urlSource")
misconfigured: bool | None = None
reachable: bool | None = None
connect_latency_ms: int | None = Field(default=None, validation_alias="connectLatencyMs")
self_: str | None = Field(default=None, validation_alias="self")
error: str | None = None
auth_warning: str | None = Field(default=None, validation_alias="authWarning")
class AgentHealth(BaseModel):
"""Per-agent health status."""
agent_id: str | None = Field(default=None, validation_alias="agentId")
status: str | None = None
model: str | None = None
error: str | None = None
class OpenClawStatus(BaseModel):
"""Complete parsed output of `openclaw status --json`.
All fields are optional so that partial/incomplete responses
from older CLI versions still parse cleanly.
"""
runtime_version: str | None = Field(default=None, validation_alias="runtimeVersion")
link_channel: LinkChannel | None = Field(default=None, validation_alias="linkChannel")
heartbeat: HeartbeatStatus | None = None
channel_summary: list[str] = Field(default_factory=list)
queued_system_events: list[str] = Field(default_factory=list)
sessions: SessionsStatus | None = None
os: OsSummary | None = None
update: UpdateInfo | None = None
update_channel: UpdateChannelInfo | None = Field(default=None, validation_alias="updateChannel")
memory: MemoryInfo | None = None
memory_plugin: str | None = Field(default=None, validation_alias="memoryPlugin")
gateway: GatewayStatus | None = None
gateway_service: dict[str, Any] | None = Field(default=None, validation_alias="gatewayService")
node_service: dict[str, Any] | None = Field(default=None, validation_alias="nodeService")
agents: list[AgentHealth] = Field(default_factory=list)
secret_diagnostics: SecretDiagnostics | None = Field(default=None, validation_alias="secretDiagnostics")
health: dict[str, Any] | None = None
usage: dict[str, Any] | None = None
last_heartbeat: dict[str, Any] | None = Field(default=None, validation_alias="lastHeartbeat")
# ---------------------------------------------------------------------------
# Sessions
# ---------------------------------------------------------------------------
class SessionEntry(BaseModel):
"""A single session entry from `openclaw sessions --json`."""
key: str = Field(validation_alias="key")
session_key: str | None = Field(default=None, validation_alias="sessionKey")
session_id: str | None = Field(default=None, validation_alias="sessionId")
updated_at: int | None = Field(default=None, validation_alias="updatedAt")
age_ms: int | None = Field(default=None, validation_alias="ageMs")
model: str | None = None
model_provider: str | None = Field(default=None, validation_alias="modelProvider")
kind: Literal["direct", "group", "global", "unknown"] = "unknown"
agent_id: str | None = Field(default=None, validation_alias="agentId")
flags: list[str] = Field(default_factory=list)
@property
def resolved_key(self) -> str:
return self.session_key or self.key
@property
def updated_at_dt(self) -> datetime | None:
if self.updated_at is None:
return None
return datetime.fromtimestamp(self.updated_at / 1000, tz=timezone.utc)
@property
def age_str(self) -> str:
if self.age_ms is None:
return "?"
seconds = int(self.age_ms / 1000)
if seconds < 60:
return f"{seconds}s"
minutes = seconds // 60
if minutes < 60:
return f"{minutes}m"
hours = minutes // 60
if hours < 24:
return f"{hours}h"
days = hours // 24
return f"{days}d"
class SessionHistoryEvent(BaseModel):
"""A single event within a session history."""
ts: int | None = None
type: str = ""
role: str | None = None
content: str | None = None
model: str | None = None
tokens: int | None = None
channel_id: str | None = Field(default=None, validation_alias="channelId")
@property
def ts_dt(self) -> datetime | None:
if self.ts is None:
return None
return datetime.fromtimestamp(self.ts / 1000, tz=timezone.utc)
class SessionHistory(BaseModel):
"""Session history response from `openclaw sessions history <key> --json`."""
session_key: str = Field(validation_alias="sessionKey")
session_id: str | None = Field(default=None, validation_alias="sessionId")
events: list[SessionHistoryEvent] = Field(default_factory=list)
raw_text: str | None = Field(default=None, validation_alias="rawText")
class SessionsList(BaseModel):
"""Parsed response from `openclaw sessions --json`."""
sessions: list[SessionEntry] = Field(default_factory=list)
def get(self, key: str) -> SessionEntry | None:
for s in self.sessions:
if s.key == key or s.session_key == key:
return s
return None
# ---------------------------------------------------------------------------
# Cron
# ---------------------------------------------------------------------------
class CronJob(BaseModel):
"""A single cron job from `openclaw cron list --json`."""
id: str | None = None
label: str | None = None
schedule: str | None = None
cron: str | None = None
command: str | None = None
agent_id: str | None = Field(default=None, validation_alias="agentId")
enabled: bool = True
status: str | None = None
next_run_at: int | None = Field(default=None, validation_alias="nextRunAt")
last_run_at: int | None = Field(default=None, validation_alias="lastRunAt")
last_exit_code: int | None = Field(default=None, validation_alias="lastExitCode")
@property
def display_schedule(self) -> str:
return self.schedule or self.cron or "?"
@property
def next_run_dt(self) -> datetime | None:
if self.next_run_at is None:
return None
return datetime.fromtimestamp(self.next_run_at / 1000, tz=timezone.utc)
class CronList(BaseModel):
"""Parsed response from `openclaw cron list --json`."""
cron: list[CronJob] = Field(default_factory=list)
jobs: list[CronJob] = Field(default_factory=list)
@classmethod
def from_raw(cls, raw: dict[str, Any] | None) -> "CronList":
if not raw:
return cls()
raw_jobs: list[dict[str, Any]] = raw.get("cron") or raw.get("jobs") or []
jobs = [CronJob.model_validate(j, strict=False) if isinstance(j, dict) else j for j in raw_jobs]
return cls(cron=jobs, jobs=jobs)
def __iter__(self):
return iter(self.cron)
def __len__(self):
return len(self.cron)
# ---------------------------------------------------------------------------
# Approvals
# ---------------------------------------------------------------------------
class ExecApprovalToolInput(BaseModel):
"""Structured tool_input from an approval request."""
command: str | None = None
command_preview: str | None = Field(default=None, validation_alias="commandPreview")
cwd: str | None = None
env_keys: list[str] = Field(default_factory=list, validation_alias="envKeys")
argv: list[str] = Field(default_factory=list)
class ExecApprovalFinding(BaseModel):
"""A security finding attached to an approval request."""
severity: Literal["critical", "high", "medium", "low", "info"]
message: str
check: str | None = None
class ApprovalRequest(BaseModel):
"""A single approval request from `openclaw approvals get --json`."""
approval_id: str = Field(validation_alias="approvalId")
tool_name: str = Field(validation_alias="toolName")
status: Literal["pending", "approved", "denied", "expired"] = "pending"
agent_id: str | None = Field(default=None, validation_alias="agentId")
workspace_id: str | None = Field(default=None, validation_alias="workspaceId")
session_id: str | None = Field(default=None, validation_alias="sessionId")
created_at: int | None = Field(default=None, validation_alias="createdAt")
expires_at: int | None = Field(default=None, validation_alias="expiresAt")
tool_input: ExecApprovalToolInput | None = Field(default=None, validation_alias="toolInput")
findings: list[ExecApprovalFinding] = Field(default_factory=list)
@property
def created_at_dt(self) -> datetime | None:
if self.created_at is None:
return None
return datetime.fromtimestamp(self.created_at / 1000, tz=timezone.utc)
@property
def is_expired(self) -> bool:
if self.expires_at is None:
return False
return datetime.now(timezone.utc).timestamp() * 1000 > self.expires_at
class ApprovalsList(BaseModel):
"""Parsed response from `openclaw approvals get --json`."""
approvals: list[ApprovalRequest] = Field(default_factory=list)
pending: list[ApprovalRequest] = Field(default_factory=list)
@classmethod
def from_raw(cls, raw: dict[str, Any] | None) -> "ApprovalsList":
if not raw:
return cls()
raw_approvals: list[dict[str, Any]] = raw.get("approvals") or []
approvals = [
ApprovalRequest.model_validate(a, strict=False) if isinstance(a, dict) else a
for a in raw_approvals
]
pending = [a for a in approvals if a.status == "pending" and not a.is_expired]
return cls(approvals=approvals, pending=pending)
def __iter__(self):
return iter(self.approvals)
def __len__(self):
return len(self.approvals)
# ---------------------------------------------------------------------------
# Normalization helpers
# ---------------------------------------------------------------------------
def normalize_status(raw: dict[str, Any] | None) -> OpenClawStatus:
"""Parse raw status dict into a strongly-typed OpenClawStatus.
Silently ignores unknown fields so schema changes in newer CLI
versions don't break existing callers.
"""
if not raw:
return OpenClawStatus()
return OpenClawStatus.model_validate(raw, strict=False)
def normalize_sessions(raw: dict[str, Any] | None) -> SessionsList:
"""Parse raw sessions dict into a strongly-typed SessionsList."""
if not raw:
return SessionsList()
sessions = raw.get("sessions") or []
return SessionsList(
sessions=[
SessionEntry.model_validate(s, strict=False) for s in sessions if isinstance(s, dict)
]
)
def normalize_session_history(
raw: dict[str, Any] | None,
session_key: str = "",
) -> SessionHistory:
"""Parse raw session history dict into a SessionHistory."""
if not raw:
return SessionHistory(session_key=session_key, events=[])
history = raw.get("history") or []
return SessionHistory.model_validate(
{
**raw,
"sessionKey": raw.get("sessionKey", session_key),
"events": [
SessionHistoryEvent.model_validate(e, strict=False)
for e in history
if isinstance(e, dict)
],
},
strict=False,
)
def normalize_cron_jobs(raw: dict[str, Any] | None) -> CronList:
"""Parse raw cron list dict into a CronList."""
return CronList.from_raw(raw)
def normalize_approvals(raw: dict[str, Any] | None) -> ApprovalsList:
"""Parse raw approvals dict into an ApprovalsList."""
return ApprovalsList.from_raw(raw)
# ---------------------------------------------------------------------------
# Agents
# ---------------------------------------------------------------------------
class AgentSummary(BaseModel):
"""A single agent summary from `openclaw agents list --json`."""
id: str
name: str | None = None
identity_name: str | None = Field(default=None, validation_alias=AliasChoices("identityName", "identity_name"))
identity_emoji: str | None = Field(default=None, validation_alias=AliasChoices("identityEmoji", "identity_emoji"))
identity_source: Literal["identity", "config"] | None = Field(default=None, validation_alias=AliasChoices("identitySource", "identity_source"))
workspace: str = ""
agent_dir: str = Field(default="", validation_alias=AliasChoices("agentDir", "agent_dir"))
model: str | None = None
bindings: int = 0
binding_details: list[str] = Field(default_factory=list, validation_alias=AliasChoices("bindingDetails", "binding_details"))
routes: list[str] = Field(default_factory=list)
providers: list[str] = Field(default_factory=list)
is_default: bool = Field(default=False, validation_alias=AliasChoices("isDefault", "is_default"))
class AgentsList(BaseModel):
"""Parsed response from `openclaw agents list --json`."""
agents: list[AgentSummary] = Field(default_factory=list)
def __iter__(self):
return iter(self.agents)
def __len__(self):
return len(self.agents)
# ---------------------------------------------------------------------------
# Skills
# ---------------------------------------------------------------------------
class SkillRequirement(BaseModel):
"""Requirements entry for a skill."""
bins: list[str] = Field(default_factory=list)
any_bins: list[str] = Field(default_factory=list, validation_alias="anyBins")
env: list[str] = Field(default_factory=list)
config: list[str] = Field(default_factory=list)
os: list[str] = Field(default_factory=list)
class SkillStatusEntry(BaseModel):
"""A single skill entry from `openclaw skills list --json`."""
name: str = ""
description: str = ""
source: str = ""
bundled: bool = False
file_path: str = Field(default="", validation_alias="filePath")
base_dir: str = Field(default="", validation_alias="baseDir")
skill_key: str = Field(default="", validation_alias="skillKey")
primary_env: str | None = Field(default=None, validation_alias="primaryEnv")
emoji: str | None = None
homepage: str | None = None
always: bool = False
disabled: bool = False
blocked_by_allowlist: bool = Field(default=False, validation_alias="blockedByAllowlist")
eligible: bool = True
requirements: SkillRequirement = Field(default_factory=SkillRequirement)
missing: SkillRequirement = Field(default_factory=SkillRequirement, validation_alias="missing")
config_checks: list[dict[str, Any]] = Field(default_factory=list, validation_alias="configChecks")
install: list[dict[str, Any]] = Field(default_factory=list)
class SkillStatusReport(BaseModel):
"""Parsed response from `openclaw skills list --json`."""
workspace_dir: str = Field(default="", validation_alias="workspaceDir")
managed_skills_dir: str = Field(default="", validation_alias="managedSkillsDir")
skills: list[SkillStatusEntry] = Field(default_factory=list)
def __iter__(self):
return iter(self.skills)
def __len__(self):
return len(self.skills)
# ---------------------------------------------------------------------------
# Models
# ---------------------------------------------------------------------------
class ModelRow(BaseModel):
"""A single model row from `openclaw models list --json`."""
key: str = ""
name: str = ""
input: str = ""
context_window: int | None = Field(default=None, validation_alias="contextWindow")
local: bool | None = None
available: bool | None = None
tags: list[str] = Field(default_factory=list)
missing: bool = False
class ModelsList(BaseModel):
"""Parsed response from `openclaw models list --json`."""
models: list[ModelRow] = Field(default_factory=list)
def __iter__(self):
return iter(self.models)
def __len__(self):
return len(self.models)
# ---------------------------------------------------------------------------
# Normalization helpers (agents, skills, models)
# ---------------------------------------------------------------------------
def normalize_agents(raw: dict[str, Any] | None) -> AgentsList:
"""Parse raw agents list dict into an AgentsList."""
if not raw:
return AgentsList()
agents = raw.get("agents") or (raw if isinstance(raw, list) else [])
if isinstance(agents, list):
return AgentsList(
agents=[
AgentSummary.model_validate(a, strict=False) for a in agents if isinstance(a, dict)
]
)
return AgentsList()
def normalize_skills(raw: dict[str, Any] | None) -> SkillStatusReport:
"""Parse raw skills list dict into a SkillStatusReport."""
if not raw:
return SkillStatusReport()
skills = raw.get("skills") or []
workspace_dir = raw.get("workspaceDir", "")
managed_skills_dir = raw.get("managedSkillsDir", "")
parsed_skills = [
SkillStatusEntry.model_validate(s, strict=False) for s in skills if isinstance(s, dict)
]
return SkillStatusReport(
workspace_dir=workspace_dir,
managed_skills_dir=managed_skills_dir,
skills=parsed_skills,
)
def normalize_models(raw: dict[str, Any] | None) -> ModelsList:
"""Parse raw models list dict into a ModelsList."""
if not raw:
return ModelsList()
models = raw.get("models") or raw if isinstance(raw, list) else []
if isinstance(models, list):
return ModelsList(
models=[ModelRow.model_validate(m, strict=False) for m in models if isinstance(m, dict)]
)
return ModelsList()
# ---------------------------------------------------------------------------
# Hooks
# ---------------------------------------------------------------------------
class HookInstallOption(BaseModel):
id: str = ""
kind: str = ""
label: str = ""
bins: list[str] = Field(default_factory=list)
class HookStatusEntry(BaseModel):
name: str = ""
description: str = ""
source: str = ""
plugin_id: str | None = Field(default=None, validation_alias="pluginId")
file_path: str = Field(default="", validation_alias="filePath")
base_dir: str = Field(default="", validation_alias="baseDir")
handler_path: str = Field(default="", validation_alias="handlerPath")
hook_key: str = Field(default="", validation_alias="hookKey")
emoji: str | None = None
homepage: str | None = None
events: list[str] = Field(default_factory=list)
always: bool = False
enabled_by_config: bool = Field(default=False, validation_alias="enabledByConfig")
requirements_satisfied: bool = Field(default=False, validation_alias="requirementsSatisfied")
loadable: bool = False
blocked_reason: str | None = Field(default=None, validation_alias="blockedReason")
managed_by_plugin: bool = Field(default=False, validation_alias="managedByPlugin")
requirements: SkillRequirement = Field(default_factory=SkillRequirement)
missing: SkillRequirement = Field(default_factory=SkillRequirement)
config_checks: list[dict[str, Any]] = Field(default_factory=list, validation_alias="configChecks")
install: list[HookInstallOption] = Field(default_factory=list)
class HookStatusReport(BaseModel):
workspace_dir: str = Field(default="", validation_alias="workspaceDir")
managed_hooks_dir: str = Field(default="", validation_alias="managedHooksDir")
hooks: list[HookStatusEntry] = Field(default_factory=list)
def __iter__(self):
return iter(self.hooks)
def __len__(self):
return len(self.hooks)
# ---------------------------------------------------------------------------
# Plugins
# ---------------------------------------------------------------------------
class PluginRecord(BaseModel):
id: str = ""
name: str = ""
version: str | None = None
description: str | None = None
format: str | None = None
bundle_format: str | None = Field(default=None, validation_alias="bundleFormat")
bundle_capabilities: list[str] = Field(default_factory=list, validation_alias="bundleCapabilities")
kind: str | None = None
source: str = ""
root_dir: str | None = Field(default=None, validation_alias="rootDir")
origin: dict[str, Any] = Field(default_factory=dict)
workspace_dir: str | None = Field(default=None, validation_alias="workspaceDir")
enabled: bool = False
status: Literal["loaded", "disabled", "error"] = "disabled"
error: str | None = None
tool_names: list[str] = Field(default_factory=list, validation_alias="toolNames")
hook_names: list[str] = Field(default_factory=list, validation_alias="hookNames")
channel_ids: list[str] = Field(default_factory=list, validation_alias="channelIds")
provider_ids: list[str] = Field(default_factory=list, validation_alias="providerIds")
speech_provider_ids: list[str] = Field(default_factory=list, validation_alias="speechProviderIds")
media_understanding_provider_ids: list[str] = Field(default_factory=list, validation_alias="mediaUnderstandingProviderIds")
image_generation_provider_ids: list[str] = Field(default_factory=list, validation_alias="imageGenerationProviderIds")
web_search_provider_ids: list[str] = Field(default_factory=list, validation_alias="webSearchProviderIds")
gateway_methods: list[str] = Field(default_factory=list, validation_alias="gatewayMethods")
cli_commands: list[str] = Field(default_factory=list, validation_alias="cliCommands")
services: list[str] = Field(default_factory=list)
commands: list[str] = Field(default_factory=list)
http_routes: int = 0
hook_count: int = Field(default=0, validation_alias="hookCount")
config_schema: bool = False
class PluginDiagnostic(BaseModel):
id: str = ""
message: str = ""
class PluginsList(BaseModel):
workspace_dir: str = Field(default="", validation_alias="workspaceDir")
plugins: list[PluginRecord] = Field(default_factory=list)
diagnostics: list[PluginDiagnostic] = Field(default_factory=list)
def __iter__(self):
return iter(self.plugins)
def __len__(self):
return len(self.plugins)
# ---------------------------------------------------------------------------
# Secrets
# ---------------------------------------------------------------------------
class SecretsAuditFinding(BaseModel):
code: str = ""
severity: Literal["info", "warn", "error"] = "info"
file: str = ""
json_path: str = Field(default="", validation_alias="jsonPath")
message: str = ""
provider: str | None = None
profile_id: str | None = Field(default=None, validation_alias="profileId")
class SecretsAuditSummary(BaseModel):
plaintext_count: int = Field(default=0, validation_alias="plaintextCount")
unresolved_ref_count: int = Field(default=0, validation_alias="unresolvedRefCount")
shadowed_ref_count: int = Field(default=0, validation_alias="shadowedRefCount")
legacy_residue_count: int = Field(default=0, validation_alias="legacyResidueCount")
class SecretsAuditResolution(BaseModel):
refs_checked: int = Field(default=0, validation_alias="refsChecked")
skipped_exec_refs: int = Field(default=0, validation_alias="skippedExecRefs")
resolvability_complete: bool = Field(default=False, validation_alias="resolvabilityComplete")
class SecretsAuditReport(BaseModel):
version: int = 1
status: Literal["clean", "findings", "unresolved"] = "clean"
resolution: SecretsAuditResolution = Field(default_factory=SecretsAuditResolution)
files_scanned: list[str] = Field(default_factory=list, validation_alias="filesScanned")
summary: SecretsAuditSummary = Field(default_factory=SecretsAuditSummary)
findings: list[SecretsAuditFinding] = Field(default_factory=list)
@classmethod
def from_raw(cls, raw: dict[str, Any] | None) -> "SecretsAuditReport":
if not raw:
return cls()
return cls.model_validate(raw, strict=False)
# ---------------------------------------------------------------------------
# Security
# ---------------------------------------------------------------------------
class SecurityAuditFinding(BaseModel):
check_id: str = Field(default="", validation_alias="checkId")
severity: Literal["info", "warn", "critical"] = "info"
title: str = ""
detail: str = ""
remediation: str | None = None
class SecurityAuditSummary(BaseModel):
critical: int = 0
warn: int = 0
info: int = 0
class SecurityAuditGatewayResult(BaseModel):
attempted: bool = False
url: str | None = None
ok: bool = False
error: str | None = None
close: dict[str, Any] | None = None
class SecurityAuditDeep(BaseModel):
gateway: SecurityAuditGatewayResult | None = None
class SecurityAuditReport(BaseModel):
ts: int = 0
summary: SecurityAuditSummary = Field(default_factory=SecurityAuditSummary)
findings: list[SecurityAuditFinding] = Field(default_factory=list)
deep: SecurityAuditDeep | None = None
class SecurityAuditResponse(BaseModel):
fix: dict[str, Any] | None = None
report: SecurityAuditReport | None = None
secret_diagnostics: list[str] = Field(default_factory=list, validation_alias="secretDiagnostics")
# ---------------------------------------------------------------------------
# Daemon
# ---------------------------------------------------------------------------
class DaemonServiceCommand(BaseModel):
program_arguments: list[str] = Field(default_factory=list, validation_alias="programArguments")
working_directory: str | None = Field(default=None, validation_alias="workingDirectory")
environment: dict[str, str] | None = None
source_path: str | None = Field(default=None, validation_alias="sourcePath")
class DaemonServiceInfo(BaseModel):
label: str = ""
loaded: bool = False
loaded_text: str = Field(default="", validation_alias="loadedText")
not_loaded_text: str = Field(default="", validation_alias="notLoadedText")
command: DaemonServiceCommand | None = None
runtime: dict[str, Any] | None = None
config_audit: dict[str, Any] | None = Field(default=None, validation_alias="configAudit")
class DaemonPortListener(BaseModel):
port: int = 0
status: str = ""
type: str = ""
class DaemonPortInfo(BaseModel):
port: int = 0
status: str = ""
listeners: list[DaemonPortListener] = Field(default_factory=list)
hints: list[str] = Field(default_factory=list)
class DaemonRpcInfo(BaseModel):
ok: bool = False
error: str | None = None
url: str | None = None
auth_warning: str | None = Field(default=None, validation_alias="authWarning")
class DaemonHealthInfo(BaseModel):
healthy: bool = False
stale_gateway_pids: list[int] = Field(default_factory=list, validation_alias="staleGatewayPids")
class DaemonExtraService(BaseModel):
label: str = ""
detail: str = ""
scope: str = ""
class DaemonStatus(BaseModel):
service: DaemonServiceInfo | None = None
config: dict[str, Any] | None = None
gateway: dict[str, Any] | None = None
port: DaemonPortInfo | None = None
port_cli: DaemonPortInfo | None = Field(default=None, validation_alias="portCli")
last_error: str | None = Field(default=None, validation_alias="lastError")
rpc: DaemonRpcInfo | None = None
health: DaemonHealthInfo | None = None
extra_services: list[DaemonExtraService] = Field(default_factory=list, validation_alias="extraServices")
# ---------------------------------------------------------------------------
# QR / Pairing
# ---------------------------------------------------------------------------
class PairingRequest(BaseModel):
code: str = ""
id: str = ""
meta: dict[str, Any] | None = None
created_at: str = Field(default="", validation_alias="createdAt")
class PairingListResponse(BaseModel):
channel: str = ""
requests: list[PairingRequest] = Field(default_factory=list)
class QrCodeResponse(BaseModel):
setup_code: str = Field(default="", validation_alias="setupCode")
gateway_url: str = Field(default="", validation_alias="gatewayUrl")
auth: Literal["token", "password"] = "token"
url_source: str = Field(default="", validation_alias="urlSource")
# ---------------------------------------------------------------------------
# Update status
# ---------------------------------------------------------------------------
class UpdateGitStatus(BaseModel):
root: str = ""
sha: str | None = None
tag: str | None = None
branch: str | None = None
upstream: str | None = None
dirty: bool | None = None
ahead: int | None = None
behind: int | None = None
fetch_ok: bool = Field(default=False, validation_alias="fetchOk")
error: str | None = None
class UpdateDepsStatus(BaseModel):
manager: str = ""
status: Literal["ok", "missing", "stale", "unknown"] = "unknown"
lockfile_path: str | None = Field(default=None, validation_alias="lockfilePath")
marker_path: str | None = Field(default=None, validation_alias="markerPath")
reason: str | None = None
class UpdateRegistryStatus(BaseModel):
latest_version: str | None = Field(default=None, validation_alias="latestVersion")
error: str | None = None
class UpdateCheckResult(BaseModel):
root: str | None = None
install_kind: Literal["git", "package", "unknown"] = "unknown"
package_manager: str = ""
git: UpdateGitStatus | None = None
deps: UpdateDepsStatus | None = None
registry: UpdateRegistryStatus | None = None
class UpdateChannelInfo2(BaseModel):
value: str | None = None
source: str = ""
label: str = ""
config: str | None = None
class UpdateStatusResponse(BaseModel):
update: UpdateCheckResult | None = None
channel: UpdateChannelInfo2 | None = None
availability: dict[str, Any] | None = None
# ---------------------------------------------------------------------------
# Models aliases / fallbacks
# ---------------------------------------------------------------------------
class ModelAliasEntry(BaseModel):
alias: str = ""
target: str = ""
class ModelAliasesList(BaseModel):
aliases: dict[str, str] = Field(default_factory=dict)
def items_list(self) -> list[ModelAliasEntry]:
return [ModelAliasEntry(alias=k, target=v) for k, v in self.aliases.items()]
class ModelFallbackItem(BaseModel):
raw: str = ""
provider: str = ""
model: str = ""
class ModelFallbacksList(BaseModel):
key: str = ""
label: str = ""
items: list[ModelFallbackItem] = Field(default_factory=list)
# ---------------------------------------------------------------------------
# Skills update result
# ---------------------------------------------------------------------------
class SkillUpdateResult(BaseModel):
ok: bool = False
slug: str = ""
previous_version: str | None = Field(default=None, validation_alias="previousVersion")
version: str = ""
changed: bool = False
target_dir: str = Field(default="", validation_alias="targetDir")
error: str | None = None
# ---------------------------------------------------------------------------
# Normalization helpers
# ---------------------------------------------------------------------------
def normalize_hooks(raw: dict[str, Any] | None) -> HookStatusReport:
if not raw:
return HookStatusReport()
hooks = raw.get("hooks") or []
workspace_dir = raw.get("workspaceDir", "")
managed_hooks_dir = raw.get("managedHooksDir", "")
parsed = [
HookStatusEntry.model_validate(h, strict=False) for h in hooks if isinstance(h, dict)
]
return HookStatusReport(
workspace_dir=workspace_dir,
managed_hooks_dir=managed_hooks_dir,
hooks=parsed,
)
def normalize_plugins(raw: dict[str, Any] | None) -> PluginsList:
if not raw:
return PluginsList()
plugins = raw.get("plugins") or []
diagnostics = raw.get("diagnostics") or []
return PluginsList(
workspace_dir=raw.get("workspaceDir", ""),
plugins=[PluginRecord.model_validate(p, strict=False) for p in plugins if isinstance(p, dict)],
diagnostics=[PluginDiagnostic.model_validate(d, strict=False) for d in diagnostics if isinstance(d, dict)],
)
def normalize_secrets_audit(raw: dict[str, Any] | None) -> SecretsAuditReport:
return SecretsAuditReport.from_raw(raw)
def normalize_security_audit(raw: dict[str, Any] | None) -> SecurityAuditResponse:
if not raw:
return SecurityAuditResponse()
return SecurityAuditResponse.model_validate(raw, strict=False)
def normalize_daemon_status(raw: dict[str, Any] | None) -> DaemonStatus:
if not raw:
return DaemonStatus()
return DaemonStatus.model_validate(raw, strict=False)
def normalize_pairing(raw: dict[str, Any] | None) -> PairingListResponse:
if not raw:
return PairingListResponse()
return PairingListResponse.model_validate(raw, strict=False)
def normalize_qr(raw: dict[str, Any] | None) -> QrCodeResponse:
if not raw:
return QrCodeResponse()
return QrCodeResponse.model_validate(raw, strict=False)
def normalize_update_status(raw: dict[str, Any] | None) -> UpdateStatusResponse:
if not raw:
return UpdateStatusResponse()
return UpdateStatusResponse.model_validate(raw, strict=False)
def normalize_model_aliases(raw: dict[str, Any] | None) -> ModelAliasesList:
if not raw:
return ModelAliasesList()
return ModelAliasesList.model_validate(raw, strict=False)
def normalize_model_fallbacks(raw: dict[str, Any] | None) -> ModelFallbacksList:
if not raw:
return ModelFallbacksList()
return ModelFallbacksList.model_validate(raw, strict=False)
def normalize_skill_update(raw: dict[str, Any] | None) -> SkillUpdateResult:
if not raw:
return SkillUpdateResult()
return SkillUpdateResult.model_validate(raw, strict=False)