# -*- coding: utf-8 -*- """Thin service wrapper around the OpenClaw CLI.""" from __future__ import annotations import json import os import shlex import shutil import subprocess import sys from dataclasses import dataclass from pathlib import Path from typing import Any from shared.models.openclaw import ( AgentSummary, AgentsList, ApprovalsList, CronList, DaemonStatus, HookStatusReport, ModelAliasesList, ModelFallbacksList, ModelRow, ModelsList, OpenClawStatus, PairingListResponse, PluginsList, QrCodeResponse, SecretsAuditReport, SecurityAuditResponse, SessionEntry, SessionHistory, SessionsList, SkillStatusReport, SkillUpdateResult, UpdateStatusResponse, normalize_agents, normalize_approvals, normalize_cron_jobs, normalize_daemon_status, normalize_hooks, normalize_model_aliases, normalize_model_fallbacks, normalize_models, normalize_pairing, normalize_plugins, normalize_qr, normalize_security_audit, normalize_secrets_audit, normalize_session_history, normalize_sessions, normalize_skill_update, normalize_skills, normalize_status, normalize_update_status, ) PROJECT_ROOT = Path(__file__).resolve().parents[2] REFERENCE_OPENCLAW_ROOT = PROJECT_ROOT / "reference" / "openclaw" REFERENCE_OPENCLAW_ENTRY = REFERENCE_OPENCLAW_ROOT / "openclaw.mjs" class OpenClawCliError(RuntimeError): """Raised when the OpenClaw CLI invocation fails.""" def __init__( self, message: str, *, command: list[str], exit_code: int | None = None, stdout: str = "", stderr: str = "", ) -> None: super().__init__(message) self.command = command self.exit_code = exit_code self.stdout = stdout self.stderr = stderr @dataclass(frozen=True) class OpenClawCliResult: """Command execution result.""" command: list[str] exit_code: int stdout: str stderr: str def resolve_openclaw_base_command() -> list[str]: """Resolve the command prefix used to launch OpenClaw.""" explicit = os.getenv("OPENCLAW_CMD", "").strip() if explicit: return shlex.split(explicit) installed = shutil.which("openclaw") if installed: return [installed] if REFERENCE_OPENCLAW_ENTRY.exists(): return [sys.executable if sys.executable.endswith("node") else "node", str(REFERENCE_OPENCLAW_ENTRY)] return ["openclaw"] def resolve_openclaw_cwd() -> Path: """Resolve the working directory for CLI execution.""" explicit = os.getenv("OPENCLAW_CWD", "").strip() if explicit: return Path(explicit).expanduser() if REFERENCE_OPENCLAW_ROOT.exists(): return REFERENCE_OPENCLAW_ROOT return PROJECT_ROOT class OpenClawCliService: """OpenClaw CLI integration service.""" def __init__( self, *, base_command: list[str] | None = None, cwd: Path | None = None, timeout_seconds: float | None = None, ) -> None: self.base_command = list(base_command or resolve_openclaw_base_command()) self.cwd = cwd or resolve_openclaw_cwd() self.timeout_seconds = timeout_seconds or float( os.getenv("OPENCLAW_TIMEOUT_SECONDS", "15") ) def health(self) -> dict[str, Any]: """Return the current CLI wiring state.""" binary = self.base_command[0] if self.base_command else "openclaw" resolved = shutil.which(binary) if len(self.base_command) == 1 else binary return { "status": "healthy", "service": "openclaw-service", "base_command": self.base_command, "cwd": str(self.cwd), "binary_resolved": resolved is not None, "reference_entry_available": REFERENCE_OPENCLAW_ENTRY.exists(), "timeout_seconds": self.timeout_seconds, } def status(self) -> dict[str, Any]: """Read `openclaw status --json`.""" return self.run_json(["status", "--json"]) def list_sessions(self) -> dict[str, Any]: """Read `openclaw sessions --json`.""" return self.run_json(["sessions", "--json"]) def get_session(self, session_key: str) -> dict[str, Any]: """Resolve a single session out of the sessions list.""" payload = self.list_sessions() sessions = payload.get("sessions") or [] for item in sessions: if not isinstance(item, dict): continue if item.get("key") == session_key or item.get("sessionKey") == session_key: return item raise KeyError(session_key) def get_session_history(self, session_key: str, *, limit: int = 20) -> dict[str, Any]: """Read session history with a JSON-first fallback to raw text.""" args = ["sessions", "history", session_key, "--json", "--limit", str(limit)] try: return self.run_json(args) except OpenClawCliError as exc: raise exc except json.JSONDecodeError: result = self.run(args) return { "sessionKey": session_key, "limit": limit, "rawText": result.stdout, } def list_cron_jobs(self) -> dict[str, Any]: """Read `openclaw cron list --json`.""" return self.run_json(["cron", "list", "--json"]) def list_approvals(self) -> dict[str, Any]: """Read `openclaw approvals get --json`.""" return self.run_json(["approvals", "get", "--json"]) def list_agents(self) -> dict[str, Any]: """Read `openclaw agents list --json`.""" return self.run_json(["agents", "list", "--json"]) def list_skills(self) -> dict[str, Any]: """Read `openclaw skills list --json`.""" return self.run_json(["skills", "list", "--json"]) def list_models(self) -> dict[str, Any]: """Read `openclaw models list --json`.""" return self.run_json(["models", "list", "--json"]) def list_hooks(self) -> dict[str, Any]: """Read `openclaw hooks list --json`.""" return self.run_json(["hooks", "list", "--json"]) def list_plugins(self) -> dict[str, Any]: """Read `openclaw plugins list --json`.""" return self.run_json(["plugins", "list", "--json"]) def secrets_audit(self) -> dict[str, Any]: """Read `openclaw secrets audit --json`.""" return self.run_json(["secrets", "audit", "--json"]) def security_audit(self) -> dict[str, Any]: """Read `openclaw security audit --json`.""" return self.run_json(["security", "audit", "--json"]) def daemon_status(self) -> dict[str, Any]: """Read `openclaw daemon status --json`.""" return self.run_json(["daemon", "status", "--json"]) def pairing_list(self) -> dict[str, Any]: """Read `openclaw pairing list --json`.""" return self.run_json(["pairing", "list", "--json"]) def qr_code(self) -> dict[str, Any]: """Read `openclaw qr --json`.""" return self.run_json(["qr", "--json"]) def update_status(self) -> dict[str, Any]: """Read `openclaw update status --json`.""" return self.run_json(["update", "status", "--json"]) def list_model_aliases(self) -> dict[str, Any]: """Read `openclaw models aliases list --json`.""" return self.run_json(["models", "aliases", "list", "--json"]) def list_model_fallbacks(self) -> dict[str, Any]: """Read `openclaw models fallbacks list --json`.""" return self.run_json(["models", "fallbacks", "list", "--json"]) def list_model_image_fallbacks(self) -> dict[str, Any]: """Read `openclaw models image-fallbacks list --json`.""" return self.run_json(["models", "image-fallbacks", "list", "--json"]) def skill_update(self, *, slug: str | None = None, all: bool = False) -> dict[str, Any]: """Read `openclaw skills update --json`.""" args = ["skills", "update", "--json"] if slug: args.append(slug) if all: args.append("--all") return self.run_json(args) def models_status(self, *, probe: bool = False) -> dict[str, Any]: """Read `openclaw models status --json [--probe]`.""" args = ["models", "status", "--json"] if probe: args.append("--probe") return self.run_json(args) def channels_status(self, *, probe: bool = False) -> dict[str, Any]: """Read `openclaw channels status [--probe] --json`.""" args = ["channels", "status", "--json"] if probe: args.append("--probe") return self.run_json(args) def list_workspace_files(self, workspace_path: str) -> dict[str, Any]: """List .md files in an OpenClaw agent workspace with their content. Reads the workspace directory and returns metadata + content for each .md file. """ from pathlib import Path wp = Path(workspace_path).expanduser().resolve() if not wp.exists() or not wp.is_dir(): return {"workspace": str(wp), "files": [], "error": "workspace not found"} md_files = sorted(wp.glob("*.md")) files = [] for md_file in md_files: try: content = md_file.read_text(encoding="utf-8") # Preview: first 300 chars preview = content[:300].strip() files.append({ "name": md_file.name, "path": str(md_file), "size": len(content), "preview": preview, "previewTruncated": len(content) > 300, }) except OSError as exc: files.append({ "name": md_file.name, "path": str(md_file), "size": 0, "preview": "", "error": str(exc), }) return {"workspace": str(wp), "files": files} def channels_list(self) -> dict[str, Any]: """Read `openclaw channels list --json`.""" return self.run_json(["channels", "list", "--json"]) def hook_info(self, name: str) -> dict[str, Any]: """Read `openclaw hooks info --json`.""" args = ["hooks", "info", name, "--json"] try: return self.run_json(args) except json.JSONDecodeError: result = self.run(args) return {"raw": result.stdout} def hooks_check(self) -> dict[str, Any]: """Read `openclaw hooks check --json`.""" return self.run_json(["hooks", "check", "--json"]) def plugins_inspect(self, *, plugin_id: str | None = None, all: bool = False) -> dict[str, Any]: """Read `openclaw plugins inspect [--json] [--all]`.""" args = ["plugins", "inspect", "--json"] if all: args.append("--all") elif plugin_id: args.append(plugin_id) return self.run_json(args) # ------------------------------------------------------------------------- # Typed variants — these use Pydantic models and are the preferred path. # ------------------------------------------------------------------------- def status_model(self) -> OpenClawStatus: """Read and parse `openclaw status --json` into a typed model.""" raw = self.status() return normalize_status(raw) def list_sessions_model(self) -> SessionsList: """Read and parse `openclaw sessions --json` into a typed model.""" raw = self.list_sessions() return normalize_sessions(raw) def get_session_model(self, session_key: str) -> SessionEntry: """Resolve a single session and return a typed model.""" raw = self.get_session(session_key) return SessionEntry.model_validate(raw, strict=False) def get_session_history_model(self, session_key: str, *, limit: int = 20) -> SessionHistory: """Read session history and return a typed model.""" raw = self.get_session_history(session_key, limit=limit) return normalize_session_history(raw, session_key=session_key) def list_cron_jobs_model(self) -> CronList: """Read and parse `openclaw cron list --json` into a typed model.""" raw = self.list_cron_jobs() return normalize_cron_jobs(raw) def list_approvals_model(self) -> ApprovalsList: """Read and parse `openclaw approvals get --json` into a typed model.""" raw = self.list_approvals() return normalize_approvals(raw) # ------------------------------------------------------------------------- # Typed variants # ------------------------------------------------------------------------- def list_agents_model(self) -> AgentsList: """Read and parse `openclaw agents list --json` into a typed model.""" raw = self.list_agents() if isinstance(raw, list): return AgentsList(agents=[AgentSummary.model_validate(a, strict=False) for a in raw if isinstance(a, dict)]) return normalize_agents(raw) def list_skills_model(self) -> SkillStatusReport: """Read and parse `openclaw skills list --json` into a typed model.""" raw = self.list_skills() return normalize_skills(raw) def list_models_model(self) -> ModelsList: """Read and parse `openclaw models list --json` into a typed model.""" raw = self.list_models() if isinstance(raw, list): return ModelsList(models=[ModelRow.model_validate(m, strict=False) for m in raw if isinstance(m, dict)]) return normalize_models(raw) def list_hooks_model(self) -> HookStatusReport: raw = self.list_hooks() return normalize_hooks(raw) def list_plugins_model(self) -> PluginsList: raw = self.list_plugins() return normalize_plugins(raw) def secrets_audit_model(self) -> SecretsAuditReport: raw = self.secrets_audit() return normalize_secrets_audit(raw) def security_audit_model(self) -> SecurityAuditResponse: raw = self.security_audit() return normalize_security_audit(raw) def daemon_status_model(self) -> DaemonStatus: raw = self.daemon_status() return normalize_daemon_status(raw) def pairing_list_model(self) -> PairingListResponse: raw = self.pairing_list() return normalize_pairing(raw) def qr_code_model(self) -> QrCodeResponse: raw = self.qr_code() return normalize_qr(raw) def update_status_model(self) -> UpdateStatusResponse: raw = self.update_status() return normalize_update_status(raw) def list_model_aliases_model(self) -> ModelAliasesList: raw = self.list_model_aliases() return normalize_model_aliases(raw) def list_model_fallbacks_model(self) -> ModelFallbacksList: raw = self.list_model_fallbacks() return normalize_model_fallbacks(raw) def list_model_image_fallbacks_model(self) -> ModelFallbacksList: raw = self.list_model_image_fallbacks() return normalize_model_fallbacks(raw) def skill_update_model(self, *, slug: str | None = None, all: bool = False) -> SkillUpdateResult: raw = self.skill_update(slug=slug, all=all) return normalize_skill_update(raw) def models_status_model(self, *, probe: bool = False) -> dict[str, Any]: """Read `openclaw models status --json` and return the raw dict.""" return self.models_status(probe=probe) def channels_status_model(self, *, probe: bool = False) -> dict[str, Any]: """Read `openclaw channels status --json` and return the raw dict.""" return self.channels_status(probe=probe) def channels_list_model(self) -> dict[str, Any]: """Read `openclaw channels list --json` and return the raw dict.""" return self.channels_list() def hook_info_model(self, name: str) -> dict[str, Any]: """Read `openclaw hooks info --json` and return the raw dict.""" return self.hook_info(name) def hooks_check_model(self) -> dict[str, Any]: """Read `openclaw hooks check --json` and return the raw dict.""" return self.hooks_check() def plugins_inspect_model(self, *, plugin_id: str | None = None, all: bool = False) -> dict[str, Any]: """Read `openclaw plugins inspect --json [--all]` and return the raw dict.""" return self.plugins_inspect(plugin_id=plugin_id, all=all) def agents_bindings(self, *, agent: str | None = None) -> dict[str, Any]: """Read `openclaw agents bindings --json [--agent ]`.""" args = ["agents", "bindings", "--json"] if agent: args.extend(["--agent", agent]) return self.run_json(args) def agents_bindings_model(self, *, agent: str | None = None) -> dict[str, Any]: """Read `openclaw agents bindings --json` and return the raw dict.""" return self.agents_bindings(agent=agent) def agents_presence(self) -> dict[str, Any]: """Read session presence for all agents from runtime session files. Reads ~/.openclaw/agents/{agentId}/sessions/sessions.json for each agent and counts sessions in active states within a recency window. """ import json from pathlib import Path openclaw_home = Path.home() / ".openclaw" agents_path = openclaw_home / "agents" if not agents_path.exists(): return {"status": "not_connected", "agents": {}} ACTIVE_STATES = { "running", "active", "busy", "blocked", "waiting_approval", "working", "in_progress", "processing", "thinking", "executing", "streaming", } 45 * 60 * 1000 # 45 minutes result: dict[str, Any] = {"status": "connected", "agents": {}} try: for agent_dir in agents_path.iterdir(): if not agent_dir.is_dir(): continue sessions_file = agent_dir / "sessions" / "sessions.json" if not sessions_file.exists(): continue try: sessions_data = json.loads(sessions_file.read_text()) except (json.JSONDecodeError, OSError): continue sessions = sessions_data if isinstance(sessions_data, list) else [] active_count = 0 for session in sessions: if not isinstance(session, dict): continue state = str(session.get("state") or session.get("status") or "").lower() if state in ACTIVE_STATES: active_count += 1 if active_count > 0: result["agents"][agent_dir.name] = { "activeSessions": active_count, "status": "active", } else: result["agents"][agent_dir.name] = { "activeSessions": 0, "status": "idle", } except OSError: result["status"] = "partial" return result def agents_from_config(self) -> dict[str, Any]: """Read agent list directly from openclaw.json config file. Falls back to scanning ~/.openclaw/agents/ directories when config is absent. This avoids the CLI timeout from `agents list --json`. """ import json openclaw_home = Path.home() / ".openclaw" config_path = openclaw_home / "openclaw.json" if not config_path.exists(): return {"status": "not_connected", "agents": []} try: raw = json.loads(config_path.read_text()) except (json.JSONDecodeError, OSError): return {"status": "partial", "agents": []} agents_list = raw.get("agents", {}).get("list", []) if not agents_list: return {"status": "partial", "agents": [], "detail": "agents.list is empty"} agents = [] for entry in agents_list: if not isinstance(entry, dict): continue agent_id = entry.get("id", "").strip() if not agent_id: continue agents.append({ "id": agent_id, "name": entry.get("name", "").strip() or agent_id, "model": entry.get("model") or "", "workspace": entry.get("workspace") or "", "is_default": entry.get("id") == raw.get("agents", {}).get("defaults", {}).get("id"), }) return {"status": "connected", "agents": agents} def gateway_status(self, *, url: str | None = None, token: str | None = None) -> dict[str, Any]: """Read `openclaw gateway status --json [--url ] [--token ]`. May fail if gateway is unreachable.""" args = ["gateway", "status", "--json"] if url: args.extend(["--url", url]) if token: args.extend(["--token", token]) return self.run_json(args) def memory_status(self, *, agent: str | None = None, deep: bool = False) -> dict[str, Any]: """Read `openclaw memory status --json [--agent ] [--deep]`. Returns array of per-agent status.""" args = ["memory", "status", "--json"] if agent: args.extend(["--agent", agent]) if deep: args.append("--deep") return self.run_json(args) # ------------------------------------------------------------------------- # Write agents commands # ------------------------------------------------------------------------- def agents_add( self, name: str, *, workspace: str | None = None, model: str | None = None, agent_dir: str | None = None, bind: list[str] | None = None, non_interactive: bool = False, ) -> dict[str, Any]: """Run `openclaw agents add [--workspace ] [--model ] [--agent-dir ] [--bind ] [--non-interactive] --json`.""" args = ["agents", "add", name, "--json"] if workspace: args.extend(["--workspace", workspace]) if model: args.extend(["--model", model]) if agent_dir: args.extend(["--agent-dir", agent_dir]) if bind: for b in bind: args.extend(["--bind", b]) if non_interactive: args.append("--non-interactive") return self.run_json(args) def agents_delete(self, id: str, *, force: bool = False) -> dict[str, Any]: """Run `openclaw agents delete [--force] --json`.""" args = ["agents", "delete", id, "--json"] if force: args.append("--force") return self.run_json(args) def agents_bind( self, *, agent: str | None = None, bind: list[str] | None = None, ) -> dict[str, Any]: """Run `openclaw agents bind [--agent ] [--bind ] --json`.""" args = ["agents", "bind", "--json"] if agent: args.extend(["--agent", agent]) if bind: for b in bind: args.extend(["--bind", b]) return self.run_json(args) def agents_unbind( self, *, agent: str | None = None, bind: list[str] | None = None, all: bool = False, ) -> dict[str, Any]: """Run `openclaw agents unbind [--agent ] [--bind ] [--all] --json`.""" args = ["agents", "unbind", "--json"] if agent: args.extend(["--agent", agent]) if bind: for b in bind: args.extend(["--bind", b]) if all: args.append("--all") return self.run_json(args) def agents_set_identity( self, *, agent: str | None = None, workspace: str | None = None, identity_file: str | None = None, name: str | None = None, emoji: str | None = None, theme: str | None = None, avatar: str | None = None, from_identity: bool = False, ) -> dict[str, Any]: """Run `openclaw agents set-identity [--agent ] [--workspace ] [--identity-file ] [--from-identity] [--name ] [--emoji ] [--theme ] [--avatar ] --json`.""" args = ["agents", "set-identity", "--json"] if agent: args.extend(["--agent", agent]) if workspace: args.extend(["--workspace", workspace]) if identity_file: args.extend(["--identity-file", identity_file]) if from_identity: args.append("--from-identity") if name: args.extend(["--name", name]) if emoji: args.extend(["--emoji", emoji]) if theme: args.extend(["--theme", theme]) if avatar: args.extend(["--avatar", avatar]) return self.run_json(args) def run_json(self, args: list[str]) -> dict[str, Any]: """Run the CLI and decode JSON stdout, falling back to stderr.""" result = self.run(args) text = result.stdout.strip() or result.stderr.strip() if not text: return {} return json.loads(text) def run(self, args: list[str]) -> OpenClawCliResult: """Run the CLI and return stdout/stderr.""" command = [*self.base_command, *args] env = os.environ.copy() try: completed = subprocess.run( command, cwd=self.cwd, env=env, capture_output=True, text=True, timeout=self.timeout_seconds, check=False, ) except FileNotFoundError as exc: raise OpenClawCliError( "OpenClaw CLI executable was not found.", command=command, ) from exc except subprocess.TimeoutExpired as exc: raise OpenClawCliError( f"OpenClaw CLI timed out after {self.timeout_seconds:.1f}s.", command=command, stdout=exc.stdout or "", stderr=exc.stderr or "", ) from exc if completed.returncode != 0: raise OpenClawCliError( "OpenClaw CLI command failed.", command=command, exit_code=completed.returncode, stdout=completed.stdout, stderr=completed.stderr, ) return OpenClawCliResult( command=command, exit_code=completed.returncode, stdout=completed.stdout, stderr=completed.stderr, )