# -*- coding: utf-8 -*- """Stable Pydantic models for OpenClaw CLI output. These models normalize the raw JSON from `openclaw status --json`, `sessions --json`, `cron list --json`, and `approvals get --json` into a consistent, well-typed structure with safe accessors. """ from __future__ import annotations from datetime import datetime, timezone from typing import Any, Literal from pydantic import AliasChoices, BaseModel, Field # --------------------------------------------------------------------------- # Status # --------------------------------------------------------------------------- class HeartbeatAgentStatus(BaseModel): """Heartbeat status for a single agent.""" agent_id: str enabled: bool every: str = "" every_ms: int | None = None class HeartbeatStatus(BaseModel): """Heartbeat section of status output.""" default_agent_id: str agents: list[HeartbeatAgentStatus] = Field(default_factory=list) class LinkChannel(BaseModel): """Linked channel info in status output.""" id: str label: str linked: bool auth_age_ms: int | None = None class SessionDefaults(BaseModel): """Session defaults in status output.""" model: str | None = None context_tokens: int | None = None class SessionStatus(BaseModel): """A single session's live status fields.""" key: str kind: Literal["direct", "group", "global", "unknown"] = "unknown" agent_id: str | None = None session_id: str | None = None updated_at: int | None = Field(default=None, validation_alias="updatedAt") age_ms: int | None = Field(default=None, validation_alias="ageMs") thinking_level: str | None = Field(default=None) fast_mode: bool | None = None verbose_level: str | None = None reasoning_level: str | None = None elevated_level: str | None = None system_sent: bool | None = None aborted_last_run: bool | None = None input_tokens: int | None = None output_tokens: int | None = None total_tokens: int | None = None total_tokens_fresh: bool | None = None cache_read: int | None = None cache_write: int | None = None remaining_tokens: int | None = None percent_used: float | None = None model: str | None = None context_tokens: int | None = None flags: list[str] = Field(default_factory=list) @property def updated_at_dt(self) -> datetime | None: if self.updated_at is None: return None return datetime.fromtimestamp(self.updated_at / 1000, tz=timezone.utc) @property def age_str(self) -> str: if self.age_ms is None: return "?" seconds = int(self.age_ms / 1000) if seconds < 60: return f"{seconds}s" minutes = seconds // 60 if minutes < 60: return f"{minutes}m" hours = minutes // 60 if hours < 24: return f"{hours}h" days = hours // 24 return f"{days}d" class SessionsStatusGroup(BaseModel): """Sessions grouped by agent.""" agent_id: str path: str count: int recent: list[SessionStatus] = Field(default_factory=list) class SessionsStatus(BaseModel): """Sessions section of status output.""" paths: list[str] = Field(default_factory=list) count: int = 0 defaults: SessionDefaults = Field(default_factory=SessionDefaults) recent: list[SessionStatus] = Field(default_factory=list) by_agent: list[SessionsStatusGroup] = Field(default_factory=list) class UpdateChannelInfo(BaseModel): """Update channel info.""" channel: str | None = None source: str | None = None class UpdateInfo(BaseModel): """Update info in status output.""" install_kind: str | None = Field(default=None, validation_alias="installKind") git_tag: str | None = Field(default=None, validation_alias="gitTag") git_branch: str | None = Field(default=None, validation_alias="gitBranch") class OsSummary(BaseModel): """OS summary in status output.""" os: str | None = None arch: str | None = None hostname: str | None = None class MemoryInfo(BaseModel): """Memory info in status output.""" used_mb: float | None = Field(default=None, validation_alias="usedMb") total_mb: float | None = None percent_used: float | None = None class SecretDiagnostics(BaseModel): """Secret diagnostics summary.""" total: int = 0 providers: dict[str, int] = Field(default_factory=dict) class GatewayStatus(BaseModel): """Gateway connection status.""" mode: str | None = None url: str | None = None url_source: str | None = Field(default=None, validation_alias="urlSource") misconfigured: bool | None = None reachable: bool | None = None connect_latency_ms: int | None = Field(default=None, validation_alias="connectLatencyMs") self_: str | None = Field(default=None, validation_alias="self") error: str | None = None auth_warning: str | None = Field(default=None, validation_alias="authWarning") class AgentHealth(BaseModel): """Per-agent health status.""" agent_id: str | None = Field(default=None, validation_alias="agentId") status: str | None = None model: str | None = None error: str | None = None class OpenClawStatus(BaseModel): """Complete parsed output of `openclaw status --json`. All fields are optional so that partial/incomplete responses from older CLI versions still parse cleanly. """ runtime_version: str | None = Field(default=None, validation_alias="runtimeVersion") link_channel: LinkChannel | None = Field(default=None, validation_alias="linkChannel") heartbeat: HeartbeatStatus | None = None channel_summary: list[str] = Field(default_factory=list) queued_system_events: list[str] = Field(default_factory=list) sessions: SessionsStatus | None = None os: OsSummary | None = None update: UpdateInfo | None = None update_channel: UpdateChannelInfo | None = Field(default=None, validation_alias="updateChannel") memory: MemoryInfo | None = None memory_plugin: str | None = Field(default=None, validation_alias="memoryPlugin") gateway: GatewayStatus | None = None gateway_service: dict[str, Any] | None = Field(default=None, validation_alias="gatewayService") node_service: dict[str, Any] | None = Field(default=None, validation_alias="nodeService") agents: list[AgentHealth] = Field(default_factory=list) secret_diagnostics: SecretDiagnostics | None = Field(default=None, validation_alias="secretDiagnostics") health: dict[str, Any] | None = None usage: dict[str, Any] | None = None last_heartbeat: dict[str, Any] | None = Field(default=None, validation_alias="lastHeartbeat") # --------------------------------------------------------------------------- # Sessions # --------------------------------------------------------------------------- class SessionEntry(BaseModel): """A single session entry from `openclaw sessions --json`.""" key: str = Field(validation_alias="key") session_key: str | None = Field(default=None, validation_alias="sessionKey") session_id: str | None = Field(default=None, validation_alias="sessionId") updated_at: int | None = Field(default=None, validation_alias="updatedAt") age_ms: int | None = Field(default=None, validation_alias="ageMs") model: str | None = None model_provider: str | None = Field(default=None, validation_alias="modelProvider") kind: Literal["direct", "group", "global", "unknown"] = "unknown" agent_id: str | None = Field(default=None, validation_alias="agentId") flags: list[str] = Field(default_factory=list) @property def resolved_key(self) -> str: return self.session_key or self.key @property def updated_at_dt(self) -> datetime | None: if self.updated_at is None: return None return datetime.fromtimestamp(self.updated_at / 1000, tz=timezone.utc) @property def age_str(self) -> str: if self.age_ms is None: return "?" seconds = int(self.age_ms / 1000) if seconds < 60: return f"{seconds}s" minutes = seconds // 60 if minutes < 60: return f"{minutes}m" hours = minutes // 60 if hours < 24: return f"{hours}h" days = hours // 24 return f"{days}d" class SessionHistoryEvent(BaseModel): """A single event within a session history.""" ts: int | None = None type: str = "" role: str | None = None content: str | None = None model: str | None = None tokens: int | None = None channel_id: str | None = Field(default=None, validation_alias="channelId") @property def ts_dt(self) -> datetime | None: if self.ts is None: return None return datetime.fromtimestamp(self.ts / 1000, tz=timezone.utc) class SessionHistory(BaseModel): """Session history response from `openclaw sessions history --json`.""" session_key: str = Field(validation_alias="sessionKey") session_id: str | None = Field(default=None, validation_alias="sessionId") events: list[SessionHistoryEvent] = Field(default_factory=list) raw_text: str | None = Field(default=None, validation_alias="rawText") class SessionsList(BaseModel): """Parsed response from `openclaw sessions --json`.""" sessions: list[SessionEntry] = Field(default_factory=list) def get(self, key: str) -> SessionEntry | None: for s in self.sessions: if s.key == key or s.session_key == key: return s return None # --------------------------------------------------------------------------- # Cron # --------------------------------------------------------------------------- class CronJob(BaseModel): """A single cron job from `openclaw cron list --json`.""" id: str | None = None label: str | None = None schedule: str | None = None cron: str | None = None command: str | None = None agent_id: str | None = Field(default=None, validation_alias="agentId") enabled: bool = True status: str | None = None next_run_at: int | None = Field(default=None, validation_alias="nextRunAt") last_run_at: int | None = Field(default=None, validation_alias="lastRunAt") last_exit_code: int | None = Field(default=None, validation_alias="lastExitCode") @property def display_schedule(self) -> str: return self.schedule or self.cron or "?" @property def next_run_dt(self) -> datetime | None: if self.next_run_at is None: return None return datetime.fromtimestamp(self.next_run_at / 1000, tz=timezone.utc) class CronList(BaseModel): """Parsed response from `openclaw cron list --json`.""" cron: list[CronJob] = Field(default_factory=list) jobs: list[CronJob] = Field(default_factory=list) @classmethod def from_raw(cls, raw: dict[str, Any] | None) -> "CronList": if not raw: return cls() raw_jobs: list[dict[str, Any]] = raw.get("cron") or raw.get("jobs") or [] jobs = [CronJob.model_validate(j, strict=False) if isinstance(j, dict) else j for j in raw_jobs] return cls(cron=jobs, jobs=jobs) def __iter__(self): return iter(self.cron) def __len__(self): return len(self.cron) # --------------------------------------------------------------------------- # Approvals # --------------------------------------------------------------------------- class ExecApprovalToolInput(BaseModel): """Structured tool_input from an approval request.""" command: str | None = None command_preview: str | None = Field(default=None, validation_alias="commandPreview") cwd: str | None = None env_keys: list[str] = Field(default_factory=list, validation_alias="envKeys") argv: list[str] = Field(default_factory=list) class ExecApprovalFinding(BaseModel): """A security finding attached to an approval request.""" severity: Literal["critical", "high", "medium", "low", "info"] message: str check: str | None = None class ApprovalRequest(BaseModel): """A single approval request from `openclaw approvals get --json`.""" approval_id: str = Field(validation_alias="approvalId") tool_name: str = Field(validation_alias="toolName") status: Literal["pending", "approved", "denied", "expired"] = "pending" agent_id: str | None = Field(default=None, validation_alias="agentId") workspace_id: str | None = Field(default=None, validation_alias="workspaceId") session_id: str | None = Field(default=None, validation_alias="sessionId") created_at: int | None = Field(default=None, validation_alias="createdAt") expires_at: int | None = Field(default=None, validation_alias="expiresAt") tool_input: ExecApprovalToolInput | None = Field(default=None, validation_alias="toolInput") findings: list[ExecApprovalFinding] = Field(default_factory=list) @property def created_at_dt(self) -> datetime | None: if self.created_at is None: return None return datetime.fromtimestamp(self.created_at / 1000, tz=timezone.utc) @property def is_expired(self) -> bool: if self.expires_at is None: return False return datetime.now(timezone.utc).timestamp() * 1000 > self.expires_at class ApprovalsList(BaseModel): """Parsed response from `openclaw approvals get --json`.""" approvals: list[ApprovalRequest] = Field(default_factory=list) pending: list[ApprovalRequest] = Field(default_factory=list) @classmethod def from_raw(cls, raw: dict[str, Any] | None) -> "ApprovalsList": if not raw: return cls() raw_approvals: list[dict[str, Any]] = raw.get("approvals") or [] approvals = [ ApprovalRequest.model_validate(a, strict=False) if isinstance(a, dict) else a for a in raw_approvals ] pending = [a for a in approvals if a.status == "pending" and not a.is_expired] return cls(approvals=approvals, pending=pending) def __iter__(self): return iter(self.approvals) def __len__(self): return len(self.approvals) # --------------------------------------------------------------------------- # Normalization helpers # --------------------------------------------------------------------------- def normalize_status(raw: dict[str, Any] | None) -> OpenClawStatus: """Parse raw status dict into a strongly-typed OpenClawStatus. Silently ignores unknown fields so schema changes in newer CLI versions don't break existing callers. """ if not raw: return OpenClawStatus() return OpenClawStatus.model_validate(raw, strict=False) def normalize_sessions(raw: dict[str, Any] | None) -> SessionsList: """Parse raw sessions dict into a strongly-typed SessionsList.""" if not raw: return SessionsList() sessions = raw.get("sessions") or [] return SessionsList( sessions=[ SessionEntry.model_validate(s, strict=False) for s in sessions if isinstance(s, dict) ] ) def normalize_session_history( raw: dict[str, Any] | None, session_key: str = "", ) -> SessionHistory: """Parse raw session history dict into a SessionHistory.""" if not raw: return SessionHistory(session_key=session_key, events=[]) history = raw.get("history") or [] return SessionHistory.model_validate( { **raw, "sessionKey": raw.get("sessionKey", session_key), "events": [ SessionHistoryEvent.model_validate(e, strict=False) for e in history if isinstance(e, dict) ], }, strict=False, ) def normalize_cron_jobs(raw: dict[str, Any] | None) -> CronList: """Parse raw cron list dict into a CronList.""" return CronList.from_raw(raw) def normalize_approvals(raw: dict[str, Any] | None) -> ApprovalsList: """Parse raw approvals dict into an ApprovalsList.""" return ApprovalsList.from_raw(raw) # --------------------------------------------------------------------------- # Agents # --------------------------------------------------------------------------- class AgentSummary(BaseModel): """A single agent summary from `openclaw agents list --json`.""" id: str name: str | None = None identity_name: str | None = Field(default=None, validation_alias=AliasChoices("identityName", "identity_name")) identity_emoji: str | None = Field(default=None, validation_alias=AliasChoices("identityEmoji", "identity_emoji")) identity_source: Literal["identity", "config"] | None = Field(default=None, validation_alias=AliasChoices("identitySource", "identity_source")) workspace: str = "" agent_dir: str = Field(default="", validation_alias=AliasChoices("agentDir", "agent_dir")) model: str | None = None bindings: int = 0 binding_details: list[str] = Field(default_factory=list, validation_alias=AliasChoices("bindingDetails", "binding_details")) routes: list[str] = Field(default_factory=list) providers: list[str] = Field(default_factory=list) is_default: bool = Field(default=False, validation_alias=AliasChoices("isDefault", "is_default")) class AgentsList(BaseModel): """Parsed response from `openclaw agents list --json`.""" agents: list[AgentSummary] = Field(default_factory=list) def __iter__(self): return iter(self.agents) def __len__(self): return len(self.agents) # --------------------------------------------------------------------------- # Skills # --------------------------------------------------------------------------- class SkillRequirement(BaseModel): """Requirements entry for a skill.""" bins: list[str] = Field(default_factory=list) any_bins: list[str] = Field(default_factory=list, validation_alias="anyBins") env: list[str] = Field(default_factory=list) config: list[str] = Field(default_factory=list) os: list[str] = Field(default_factory=list) class SkillStatusEntry(BaseModel): """A single skill entry from `openclaw skills list --json`.""" name: str = "" description: str = "" source: str = "" bundled: bool = False file_path: str = Field(default="", validation_alias="filePath") base_dir: str = Field(default="", validation_alias="baseDir") skill_key: str = Field(default="", validation_alias="skillKey") primary_env: str | None = Field(default=None, validation_alias="primaryEnv") emoji: str | None = None homepage: str | None = None always: bool = False disabled: bool = False blocked_by_allowlist: bool = Field(default=False, validation_alias="blockedByAllowlist") eligible: bool = True requirements: SkillRequirement = Field(default_factory=SkillRequirement) missing: SkillRequirement = Field(default_factory=SkillRequirement, validation_alias="missing") config_checks: list[dict[str, Any]] = Field(default_factory=list, validation_alias="configChecks") install: list[dict[str, Any]] = Field(default_factory=list) class SkillStatusReport(BaseModel): """Parsed response from `openclaw skills list --json`.""" workspace_dir: str = Field(default="", validation_alias="workspaceDir") managed_skills_dir: str = Field(default="", validation_alias="managedSkillsDir") skills: list[SkillStatusEntry] = Field(default_factory=list) def __iter__(self): return iter(self.skills) def __len__(self): return len(self.skills) # --------------------------------------------------------------------------- # Models # --------------------------------------------------------------------------- class ModelRow(BaseModel): """A single model row from `openclaw models list --json`.""" key: str = "" name: str = "" input: str = "" context_window: int | None = Field(default=None, validation_alias="contextWindow") local: bool | None = None available: bool | None = None tags: list[str] = Field(default_factory=list) missing: bool = False class ModelsList(BaseModel): """Parsed response from `openclaw models list --json`.""" models: list[ModelRow] = Field(default_factory=list) def __iter__(self): return iter(self.models) def __len__(self): return len(self.models) # --------------------------------------------------------------------------- # Normalization helpers (agents, skills, models) # --------------------------------------------------------------------------- def normalize_agents(raw: dict[str, Any] | None) -> AgentsList: """Parse raw agents list dict into an AgentsList.""" if not raw: return AgentsList() agents = raw.get("agents") or (raw if isinstance(raw, list) else []) if isinstance(agents, list): return AgentsList( agents=[ AgentSummary.model_validate(a, strict=False) for a in agents if isinstance(a, dict) ] ) return AgentsList() def normalize_skills(raw: dict[str, Any] | None) -> SkillStatusReport: """Parse raw skills list dict into a SkillStatusReport.""" if not raw: return SkillStatusReport() skills = raw.get("skills") or [] workspace_dir = raw.get("workspaceDir", "") managed_skills_dir = raw.get("managedSkillsDir", "") parsed_skills = [ SkillStatusEntry.model_validate(s, strict=False) for s in skills if isinstance(s, dict) ] return SkillStatusReport( workspace_dir=workspace_dir, managed_skills_dir=managed_skills_dir, skills=parsed_skills, ) def normalize_models(raw: dict[str, Any] | None) -> ModelsList: """Parse raw models list dict into a ModelsList.""" if not raw: return ModelsList() models = raw.get("models") or raw if isinstance(raw, list) else [] if isinstance(models, list): return ModelsList( models=[ModelRow.model_validate(m, strict=False) for m in models if isinstance(m, dict)] ) return ModelsList() # --------------------------------------------------------------------------- # Hooks # --------------------------------------------------------------------------- class HookInstallOption(BaseModel): id: str = "" kind: str = "" label: str = "" bins: list[str] = Field(default_factory=list) class HookStatusEntry(BaseModel): name: str = "" description: str = "" source: str = "" plugin_id: str | None = Field(default=None, validation_alias="pluginId") file_path: str = Field(default="", validation_alias="filePath") base_dir: str = Field(default="", validation_alias="baseDir") handler_path: str = Field(default="", validation_alias="handlerPath") hook_key: str = Field(default="", validation_alias="hookKey") emoji: str | None = None homepage: str | None = None events: list[str] = Field(default_factory=list) always: bool = False enabled_by_config: bool = Field(default=False, validation_alias="enabledByConfig") requirements_satisfied: bool = Field(default=False, validation_alias="requirementsSatisfied") loadable: bool = False blocked_reason: str | None = Field(default=None, validation_alias="blockedReason") managed_by_plugin: bool = Field(default=False, validation_alias="managedByPlugin") requirements: SkillRequirement = Field(default_factory=SkillRequirement) missing: SkillRequirement = Field(default_factory=SkillRequirement) config_checks: list[dict[str, Any]] = Field(default_factory=list, validation_alias="configChecks") install: list[HookInstallOption] = Field(default_factory=list) class HookStatusReport(BaseModel): workspace_dir: str = Field(default="", validation_alias="workspaceDir") managed_hooks_dir: str = Field(default="", validation_alias="managedHooksDir") hooks: list[HookStatusEntry] = Field(default_factory=list) def __iter__(self): return iter(self.hooks) def __len__(self): return len(self.hooks) # --------------------------------------------------------------------------- # Plugins # --------------------------------------------------------------------------- class PluginRecord(BaseModel): id: str = "" name: str = "" version: str | None = None description: str | None = None format: str | None = None bundle_format: str | None = Field(default=None, validation_alias="bundleFormat") bundle_capabilities: list[str] = Field(default_factory=list, validation_alias="bundleCapabilities") kind: str | None = None source: str = "" root_dir: str | None = Field(default=None, validation_alias="rootDir") origin: dict[str, Any] = Field(default_factory=dict) workspace_dir: str | None = Field(default=None, validation_alias="workspaceDir") enabled: bool = False status: Literal["loaded", "disabled", "error"] = "disabled" error: str | None = None tool_names: list[str] = Field(default_factory=list, validation_alias="toolNames") hook_names: list[str] = Field(default_factory=list, validation_alias="hookNames") channel_ids: list[str] = Field(default_factory=list, validation_alias="channelIds") provider_ids: list[str] = Field(default_factory=list, validation_alias="providerIds") speech_provider_ids: list[str] = Field(default_factory=list, validation_alias="speechProviderIds") media_understanding_provider_ids: list[str] = Field(default_factory=list, validation_alias="mediaUnderstandingProviderIds") image_generation_provider_ids: list[str] = Field(default_factory=list, validation_alias="imageGenerationProviderIds") web_search_provider_ids: list[str] = Field(default_factory=list, validation_alias="webSearchProviderIds") gateway_methods: list[str] = Field(default_factory=list, validation_alias="gatewayMethods") cli_commands: list[str] = Field(default_factory=list, validation_alias="cliCommands") services: list[str] = Field(default_factory=list) commands: list[str] = Field(default_factory=list) http_routes: int = 0 hook_count: int = Field(default=0, validation_alias="hookCount") config_schema: bool = False class PluginDiagnostic(BaseModel): id: str = "" message: str = "" class PluginsList(BaseModel): workspace_dir: str = Field(default="", validation_alias="workspaceDir") plugins: list[PluginRecord] = Field(default_factory=list) diagnostics: list[PluginDiagnostic] = Field(default_factory=list) def __iter__(self): return iter(self.plugins) def __len__(self): return len(self.plugins) # --------------------------------------------------------------------------- # Secrets # --------------------------------------------------------------------------- class SecretsAuditFinding(BaseModel): code: str = "" severity: Literal["info", "warn", "error"] = "info" file: str = "" json_path: str = Field(default="", validation_alias="jsonPath") message: str = "" provider: str | None = None profile_id: str | None = Field(default=None, validation_alias="profileId") class SecretsAuditSummary(BaseModel): plaintext_count: int = Field(default=0, validation_alias="plaintextCount") unresolved_ref_count: int = Field(default=0, validation_alias="unresolvedRefCount") shadowed_ref_count: int = Field(default=0, validation_alias="shadowedRefCount") legacy_residue_count: int = Field(default=0, validation_alias="legacyResidueCount") class SecretsAuditResolution(BaseModel): refs_checked: int = Field(default=0, validation_alias="refsChecked") skipped_exec_refs: int = Field(default=0, validation_alias="skippedExecRefs") resolvability_complete: bool = Field(default=False, validation_alias="resolvabilityComplete") class SecretsAuditReport(BaseModel): version: int = 1 status: Literal["clean", "findings", "unresolved"] = "clean" resolution: SecretsAuditResolution = Field(default_factory=SecretsAuditResolution) files_scanned: list[str] = Field(default_factory=list, validation_alias="filesScanned") summary: SecretsAuditSummary = Field(default_factory=SecretsAuditSummary) findings: list[SecretsAuditFinding] = Field(default_factory=list) @classmethod def from_raw(cls, raw: dict[str, Any] | None) -> "SecretsAuditReport": if not raw: return cls() return cls.model_validate(raw, strict=False) # --------------------------------------------------------------------------- # Security # --------------------------------------------------------------------------- class SecurityAuditFinding(BaseModel): check_id: str = Field(default="", validation_alias="checkId") severity: Literal["info", "warn", "critical"] = "info" title: str = "" detail: str = "" remediation: str | None = None class SecurityAuditSummary(BaseModel): critical: int = 0 warn: int = 0 info: int = 0 class SecurityAuditGatewayResult(BaseModel): attempted: bool = False url: str | None = None ok: bool = False error: str | None = None close: dict[str, Any] | None = None class SecurityAuditDeep(BaseModel): gateway: SecurityAuditGatewayResult | None = None class SecurityAuditReport(BaseModel): ts: int = 0 summary: SecurityAuditSummary = Field(default_factory=SecurityAuditSummary) findings: list[SecurityAuditFinding] = Field(default_factory=list) deep: SecurityAuditDeep | None = None class SecurityAuditResponse(BaseModel): fix: dict[str, Any] | None = None report: SecurityAuditReport | None = None secret_diagnostics: list[str] = Field(default_factory=list, validation_alias="secretDiagnostics") # --------------------------------------------------------------------------- # Daemon # --------------------------------------------------------------------------- class DaemonServiceCommand(BaseModel): program_arguments: list[str] = Field(default_factory=list, validation_alias="programArguments") working_directory: str | None = Field(default=None, validation_alias="workingDirectory") environment: dict[str, str] | None = None source_path: str | None = Field(default=None, validation_alias="sourcePath") class DaemonServiceInfo(BaseModel): label: str = "" loaded: bool = False loaded_text: str = Field(default="", validation_alias="loadedText") not_loaded_text: str = Field(default="", validation_alias="notLoadedText") command: DaemonServiceCommand | None = None runtime: dict[str, Any] | None = None config_audit: dict[str, Any] | None = Field(default=None, validation_alias="configAudit") class DaemonPortListener(BaseModel): port: int = 0 status: str = "" type: str = "" class DaemonPortInfo(BaseModel): port: int = 0 status: str = "" listeners: list[DaemonPortListener] = Field(default_factory=list) hints: list[str] = Field(default_factory=list) class DaemonRpcInfo(BaseModel): ok: bool = False error: str | None = None url: str | None = None auth_warning: str | None = Field(default=None, validation_alias="authWarning") class DaemonHealthInfo(BaseModel): healthy: bool = False stale_gateway_pids: list[int] = Field(default_factory=list, validation_alias="staleGatewayPids") class DaemonExtraService(BaseModel): label: str = "" detail: str = "" scope: str = "" class DaemonStatus(BaseModel): service: DaemonServiceInfo | None = None config: dict[str, Any] | None = None gateway: dict[str, Any] | None = None port: DaemonPortInfo | None = None port_cli: DaemonPortInfo | None = Field(default=None, validation_alias="portCli") last_error: str | None = Field(default=None, validation_alias="lastError") rpc: DaemonRpcInfo | None = None health: DaemonHealthInfo | None = None extra_services: list[DaemonExtraService] = Field(default_factory=list, validation_alias="extraServices") # --------------------------------------------------------------------------- # QR / Pairing # --------------------------------------------------------------------------- class PairingRequest(BaseModel): code: str = "" id: str = "" meta: dict[str, Any] | None = None created_at: str = Field(default="", validation_alias="createdAt") class PairingListResponse(BaseModel): channel: str = "" requests: list[PairingRequest] = Field(default_factory=list) class QrCodeResponse(BaseModel): setup_code: str = Field(default="", validation_alias="setupCode") gateway_url: str = Field(default="", validation_alias="gatewayUrl") auth: Literal["token", "password"] = "token" url_source: str = Field(default="", validation_alias="urlSource") # --------------------------------------------------------------------------- # Update status # --------------------------------------------------------------------------- class UpdateGitStatus(BaseModel): root: str = "" sha: str | None = None tag: str | None = None branch: str | None = None upstream: str | None = None dirty: bool | None = None ahead: int | None = None behind: int | None = None fetch_ok: bool = Field(default=False, validation_alias="fetchOk") error: str | None = None class UpdateDepsStatus(BaseModel): manager: str = "" status: Literal["ok", "missing", "stale", "unknown"] = "unknown" lockfile_path: str | None = Field(default=None, validation_alias="lockfilePath") marker_path: str | None = Field(default=None, validation_alias="markerPath") reason: str | None = None class UpdateRegistryStatus(BaseModel): latest_version: str | None = Field(default=None, validation_alias="latestVersion") error: str | None = None class UpdateCheckResult(BaseModel): root: str | None = None install_kind: Literal["git", "package", "unknown"] = "unknown" package_manager: str = "" git: UpdateGitStatus | None = None deps: UpdateDepsStatus | None = None registry: UpdateRegistryStatus | None = None class UpdateChannelInfo2(BaseModel): value: str | None = None source: str = "" label: str = "" config: str | None = None class UpdateStatusResponse(BaseModel): update: UpdateCheckResult | None = None channel: UpdateChannelInfo2 | None = None availability: dict[str, Any] | None = None # --------------------------------------------------------------------------- # Models aliases / fallbacks # --------------------------------------------------------------------------- class ModelAliasEntry(BaseModel): alias: str = "" target: str = "" class ModelAliasesList(BaseModel): aliases: dict[str, str] = Field(default_factory=dict) def items_list(self) -> list[ModelAliasEntry]: return [ModelAliasEntry(alias=k, target=v) for k, v in self.aliases.items()] class ModelFallbackItem(BaseModel): raw: str = "" provider: str = "" model: str = "" class ModelFallbacksList(BaseModel): key: str = "" label: str = "" items: list[ModelFallbackItem] = Field(default_factory=list) # --------------------------------------------------------------------------- # Skills update result # --------------------------------------------------------------------------- class SkillUpdateResult(BaseModel): ok: bool = False slug: str = "" previous_version: str | None = Field(default=None, validation_alias="previousVersion") version: str = "" changed: bool = False target_dir: str = Field(default="", validation_alias="targetDir") error: str | None = None # --------------------------------------------------------------------------- # Normalization helpers # --------------------------------------------------------------------------- def normalize_hooks(raw: dict[str, Any] | None) -> HookStatusReport: if not raw: return HookStatusReport() hooks = raw.get("hooks") or [] workspace_dir = raw.get("workspaceDir", "") managed_hooks_dir = raw.get("managedHooksDir", "") parsed = [ HookStatusEntry.model_validate(h, strict=False) for h in hooks if isinstance(h, dict) ] return HookStatusReport( workspace_dir=workspace_dir, managed_hooks_dir=managed_hooks_dir, hooks=parsed, ) def normalize_plugins(raw: dict[str, Any] | None) -> PluginsList: if not raw: return PluginsList() plugins = raw.get("plugins") or [] diagnostics = raw.get("diagnostics") or [] return PluginsList( workspace_dir=raw.get("workspaceDir", ""), plugins=[PluginRecord.model_validate(p, strict=False) for p in plugins if isinstance(p, dict)], diagnostics=[PluginDiagnostic.model_validate(d, strict=False) for d in diagnostics if isinstance(d, dict)], ) def normalize_secrets_audit(raw: dict[str, Any] | None) -> SecretsAuditReport: return SecretsAuditReport.from_raw(raw) def normalize_security_audit(raw: dict[str, Any] | None) -> SecurityAuditResponse: if not raw: return SecurityAuditResponse() return SecurityAuditResponse.model_validate(raw, strict=False) def normalize_daemon_status(raw: dict[str, Any] | None) -> DaemonStatus: if not raw: return DaemonStatus() return DaemonStatus.model_validate(raw, strict=False) def normalize_pairing(raw: dict[str, Any] | None) -> PairingListResponse: if not raw: return PairingListResponse() return PairingListResponse.model_validate(raw, strict=False) def normalize_qr(raw: dict[str, Any] | None) -> QrCodeResponse: if not raw: return QrCodeResponse() return QrCodeResponse.model_validate(raw, strict=False) def normalize_update_status(raw: dict[str, Any] | None) -> UpdateStatusResponse: if not raw: return UpdateStatusResponse() return UpdateStatusResponse.model_validate(raw, strict=False) def normalize_model_aliases(raw: dict[str, Any] | None) -> ModelAliasesList: if not raw: return ModelAliasesList() return ModelAliasesList.model_validate(raw, strict=False) def normalize_model_fallbacks(raw: dict[str, Any] | None) -> ModelFallbacksList: if not raw: return ModelFallbacksList() return ModelFallbacksList.model_validate(raw, strict=False) def normalize_skill_update(raw: dict[str, Any] | None) -> SkillUpdateResult: if not raw: return SkillUpdateResult() return SkillUpdateResult.model_validate(raw, strict=False)