# -*- coding: utf-8 -*- """OpenClaw WebSocket handlers — gateway calls OpenClaw Gateway via WebSocket.""" from __future__ import annotations import json import logging from typing import TYPE_CHECKING if TYPE_CHECKING: from backend.services.gateway import Gateway logger = logging.getLogger(__name__) def _ensure_session_bridge(gateway) -> None: """Forward OpenClaw session events into 大时代 frontend websockets.""" if getattr(gateway, "_openclaw_session_bridge_ready", False): return async def _forward(event) -> None: payload = event.payload or {} session_key = str(payload.get("sessionKey") or payload.get("key") or "").strip() if not session_key: return subscriber_map = getattr(gateway, "_openclaw_session_subscribers", {}) targets = [ ws for ws, session_keys in list(subscriber_map.items()) if session_key in session_keys ] if not targets: return message = json.dumps( { "type": "openclaw_session_event", "event": event.event, "session_key": session_key, "payload": payload, } ) stale = [] for ws in targets: try: await ws.send(message) except Exception: stale.append(ws) for ws in stale: try: subscriber_map.pop(ws, None) except Exception: pass def _handler(event) -> None: try: import asyncio asyncio.create_task(_forward(event)) except Exception as exc: logger.debug("OpenClaw session bridge skipped event: %s", exc) client = _get_ws_client(gateway) client.add_event_handler(_handler) gateway._openclaw_session_bridge_ready = True gateway._openclaw_session_bridge_handler = _handler if not hasattr(gateway, "_openclaw_session_subscribers"): gateway._openclaw_session_subscribers = {} def _get_ws_client(gateway) -> "OpenClawWebSocketClient": """Get the OpenClaw WebSocket client from gateway.""" from shared.client.openclaw_websocket_client import OpenClawWebSocketClient client = gateway._openclaw_ws if client is None: raise RuntimeError("OpenClaw Gateway not connected") return client async def _ws_call(gateway, method: str, params: dict | None = None) -> dict: """Call OpenClaw Gateway via WebSocket and return result.""" try: client = _get_ws_client(gateway) return await client.call_method(method, params) except Exception as exc: logger.warning("OpenClaw Gateway call failed for %s: %s", method, exc) return {"error": str(exc)[:200]} async def handle_get_openclaw_status(gateway, websocket, data: dict) -> None: result = await _ws_call(gateway, "status") await websocket.send(json.dumps({"type": "openclaw_status_loaded", "data": result})) async def handle_get_openclaw_sessions(gateway, websocket, data: dict) -> None: result = await _ws_call(gateway, "sessions.list", {"limit": 50, "includeLastMessage": True}) await websocket.send(json.dumps({"type": "openclaw_sessions_loaded", "data": result})) async def handle_get_openclaw_session_detail(gateway, websocket, data: dict) -> None: session_key = data.get("session_key", "") result = await _ws_call(gateway, "sessions.list", {"limit": 200, "includeLastMessage": True}) session = None if isinstance(result, dict): for item in result.get("sessions", []) or []: if not isinstance(item, dict): continue if item.get("key") == session_key or item.get("sessionKey") == session_key: session = item break await websocket.send(json.dumps({ "type": "openclaw_session_detail_loaded", "data": {"session": session, "error": None if session else f"session '{session_key}' not found"}, "session_key": session_key, })) async def handle_get_openclaw_session_history(gateway, websocket, data: dict) -> None: session_key = data.get("session_key", "") limit = data.get("limit", 20) try: from backend.services.openclaw_cli import OpenClawCliService result = OpenClawCliService().get_session_history_model(session_key, limit=limit) payload = { "session_key": result.session_key, "session_id": result.session_id, "history": result.events, "events": result.events, "raw_text": result.raw_text, } except Exception as exc: payload = {"error": str(exc)[:200], "history": []} await websocket.send(json.dumps({ "type": "openclaw_session_history_loaded", "data": payload, "session_key": session_key, })) async def handle_openclaw_resolve_session(gateway, websocket, data: dict) -> None: params = {} agent_id = str(data.get("agent_id") or "").strip() label = str(data.get("label") or "").strip() channel = str(data.get("channel") or "").strip() if agent_id: params["agentId"] = agent_id if label: params["label"] = label if channel: params["channel"] = channel params["includeGlobal"] = bool(data.get("include_global", True)) result = await _ws_call(gateway, "sessions.resolve", params) await websocket.send(json.dumps({"type": "openclaw_session_resolved", "data": result})) async def handle_openclaw_create_session(gateway, websocket, data: dict) -> None: params = {} agent_id = str(data.get("agent_id") or "").strip() label = str(data.get("label") or "").strip() model = str(data.get("model") or "").strip() initial_message = str(data.get("initial_message") or "").strip() if agent_id: params["agentId"] = agent_id if label: params["label"] = label if model: params["model"] = model if initial_message: params["message"] = initial_message result = await _ws_call(gateway, "sessions.create", params) await websocket.send(json.dumps({"type": "openclaw_session_created", "data": result})) async def handle_openclaw_send_message(gateway, websocket, data: dict) -> None: session_key = str(data.get("session_key") or "").strip() message = str(data.get("message") or "").strip() thinking = str(data.get("thinking") or "").strip() if not session_key or not message: await websocket.send( json.dumps( { "type": "openclaw_message_sent", "data": {"error": "session_key and message are required"}, } ) ) return params = {"key": session_key, "message": message} if thinking: params["thinking"] = thinking result = await _ws_call(gateway, "sessions.send", params) await websocket.send( json.dumps( { "type": "openclaw_message_sent", "data": result, "session_key": session_key, } ) ) async def handle_openclaw_subscribe_session(gateway, websocket, data: dict) -> None: session_key = str(data.get("session_key") or "").strip() if not session_key: await websocket.send( json.dumps( { "type": "openclaw_session_subscribed", "data": {"error": "session_key is required"}, } ) ) return _ensure_session_bridge(gateway) result = await _ws_call(gateway, "sessions.messages.subscribe", {"key": session_key}) if not isinstance(result, dict) or not result.get("error"): subscriber_map = getattr(gateway, "_openclaw_session_subscribers", {}) subscriber_map.setdefault(websocket, set()).add(session_key) gateway._openclaw_session_subscribers = subscriber_map await websocket.send( json.dumps( { "type": "openclaw_session_subscribed", "data": result, "session_key": session_key, } ) ) async def handle_openclaw_unsubscribe_session(gateway, websocket, data: dict) -> None: session_key = str(data.get("session_key") or "").strip() if not session_key: await websocket.send( json.dumps( { "type": "openclaw_session_unsubscribed", "data": {"error": "session_key is required"}, } ) ) return result = await _ws_call(gateway, "sessions.messages.unsubscribe", {"key": session_key}) subscriber_map = getattr(gateway, "_openclaw_session_subscribers", {}) session_keys = subscriber_map.get(websocket) if isinstance(session_keys, set): session_keys.discard(session_key) if not session_keys: subscriber_map.pop(websocket, None) gateway._openclaw_session_subscribers = subscriber_map await websocket.send( json.dumps( { "type": "openclaw_session_unsubscribed", "data": result, "session_key": session_key, } ) ) async def handle_openclaw_reset_session(gateway, websocket, data: dict) -> None: session_key = str(data.get("session_key") or "").strip() if not session_key: await websocket.send( json.dumps( { "type": "openclaw_session_reset", "data": {"error": "session_key is required"}, } ) ) return result = await _ws_call(gateway, "sessions.reset", {"key": session_key}) await websocket.send( json.dumps( { "type": "openclaw_session_reset", "data": result, "session_key": session_key, } ) ) async def handle_openclaw_delete_session(gateway, websocket, data: dict) -> None: session_key = str(data.get("session_key") or "").strip() if not session_key: await websocket.send( json.dumps( { "type": "openclaw_session_deleted", "data": {"error": "session_key is required"}, } ) ) return result = await _ws_call(gateway, "sessions.delete", {"key": session_key}) await websocket.send( json.dumps( { "type": "openclaw_session_deleted", "data": result, "session_key": session_key, } ) ) async def handle_get_openclaw_cron(gateway, websocket, data: dict) -> None: result = await _ws_call(gateway, "cron.list") await websocket.send(json.dumps({"type": "openclaw_cron_loaded", "data": result})) async def handle_get_openclaw_approvals(gateway, websocket, data: dict) -> None: result = await _ws_call(gateway, "exec.approvals.get") await websocket.send(json.dumps({"type": "openclaw_approvals_loaded", "data": result})) async def handle_get_openclaw_agents(gateway, websocket, data: dict) -> None: result = await _ws_call(gateway, "agents.list") sessions_result = await _ws_call( gateway, "sessions.list", {"limit": 200, "includeLastMessage": True}, ) config_result = await _ws_call(gateway, "config.get") session_model_by_agent: dict[str, str] = {} default_session_model: str | None = None agent_skills_by_id: dict[str, list[str] | None] = {} default_agent_skills: list[str] | None = None parsed_config = config_result.get("parsed") if isinstance(config_result, dict) else None if isinstance(parsed_config, dict): agents_cfg = parsed_config.get("agents") if isinstance(agents_cfg, dict): defaults_cfg = agents_cfg.get("defaults") if isinstance(defaults_cfg, dict): default_skills = defaults_cfg.get("skills") if isinstance(default_skills, list): default_agent_skills = [ str(skill).strip() for skill in default_skills if str(skill).strip() ] list_cfg = agents_cfg.get("list") if isinstance(list_cfg, list): for entry in list_cfg: if not isinstance(entry, dict): continue agent_id = str(entry.get("id") or "").strip() if not agent_id: continue skills = entry.get("skills") if isinstance(skills, list): agent_skills_by_id[agent_id] = [ str(skill).strip() for skill in skills if str(skill).strip() ] elif skills == []: agent_skills_by_id[agent_id] = [] if isinstance(sessions_result, dict) and isinstance(sessions_result.get("sessions"), list): defaults = sessions_result.get("defaults") if isinstance(defaults, dict): value = ( defaults.get("model") or defaults.get("modelName") or defaults.get("model_name") ) if value: default_session_model = str(value) for session in sessions_result.get("sessions", []): if not isinstance(session, dict): continue agent_id = str( session.get("agentId") or session.get("agent_id") or "" ).strip() if not agent_id: key = str(session.get("key") or session.get("sessionKey") or "").strip() parts = key.split(":") if len(parts) >= 3 and parts[0] == "agent": agent_id = parts[1] model_value = ( session.get("model") or session.get("modelName") or session.get("model_name") or session.get("resolvedModel") or session.get("resolved_model") or session.get("defaultModel") or session.get("default_model") ) if agent_id and model_value and agent_id not in session_model_by_agent: session_model_by_agent[agent_id] = str(model_value) if isinstance(result, dict) and isinstance(result.get("agents"), list): normalized_agents = [] for agent in result.get("agents", []): if not isinstance(agent, dict): normalized_agents.append(agent) continue normalized = dict(agent) if not normalized.get("model"): normalized["model"] = ( normalized.get("modelName") or normalized.get("model_name") or normalized.get("resolvedModel") or normalized.get("resolved_model") or normalized.get("defaultModel") or normalized.get("default_model") or session_model_by_agent.get(str(normalized.get("id") or "").strip()) or default_session_model ) agent_id = str(normalized.get("id") or "").strip() if "skills" not in normalized: normalized["skills"] = agent_skills_by_id.get(agent_id, default_agent_skills) normalized_agents.append(normalized) result = {**result, "agents": normalized_agents} await websocket.send(json.dumps({"type": "openclaw_agents_loaded", "data": result})) async def handle_get_openclaw_agents_presence(gateway, websocket, data: dict) -> None: result = await _ws_call(gateway, "node.list") await websocket.send(json.dumps({"type": "openclaw_agents_presence_loaded", "data": result})) async def handle_get_openclaw_skills(gateway, websocket, data: dict) -> None: agent_id = str(data.get("agent_id") or "").strip() params = {"agentId": agent_id} if agent_id else {} result = await _ws_call(gateway, "skills.status", params) await websocket.send(json.dumps({"type": "openclaw_skills_loaded", "data": result})) async def handle_get_openclaw_models(gateway, websocket, data: dict) -> None: result = await _ws_call(gateway, "models.list") await websocket.send(json.dumps({"type": "openclaw_models_loaded", "data": result})) async def handle_get_openclaw_hooks(gateway, websocket, data: dict) -> None: result = await _ws_call(gateway, "tools.catalog") await websocket.send(json.dumps({"type": "openclaw_hooks_loaded", "data": result})) async def handle_get_openclaw_plugins(gateway, websocket, data: dict) -> None: result = await _ws_call(gateway, "config.get") await websocket.send(json.dumps({"type": "openclaw_plugins_loaded", "data": result})) async def handle_get_openclaw_secrets_audit(gateway, websocket, data: dict) -> None: result = await _ws_call(gateway, "secrets.reload") await websocket.send(json.dumps({"type": "openclaw_secrets_audit_loaded", "data": result})) async def handle_get_openclaw_security_audit(gateway, websocket, data: dict) -> None: result = await _ws_call(gateway, "gateway.identity.get") await websocket.send(json.dumps({"type": "openclaw_security_audit_loaded", "data": result})) async def handle_get_openclaw_daemon_status(gateway, websocket, data: dict) -> None: result = await _ws_call(gateway, "doctor.memory.status") await websocket.send(json.dumps({"type": "openclaw_daemon_status_loaded", "data": result})) async def handle_get_openclaw_pairing(gateway, websocket, data: dict) -> None: result = await _ws_call(gateway, "device.pair.list") await websocket.send(json.dumps({"type": "openclaw_pairing_loaded", "data": result})) async def handle_get_openclaw_qr(gateway, websocket, data: dict) -> None: await websocket.send(json.dumps({"type": "openclaw_qr_loaded", "data": {"error": "QR code not available via WebSocket"}})) async def handle_get_openclaw_update_status(gateway, websocket, data: dict) -> None: result = await _ws_call(gateway, "update.run") await websocket.send(json.dumps({"type": "openclaw_update_status_loaded", "data": result})) async def handle_get_openclaw_models_aliases(gateway, websocket, data: dict) -> None: result = await _ws_call(gateway, "models.list") await websocket.send(json.dumps({"type": "openclaw_models_aliases_loaded", "data": result})) async def handle_get_openclaw_models_fallbacks(gateway, websocket, data: dict) -> None: result = await _ws_call(gateway, "models.list") await websocket.send(json.dumps({"type": "openclaw_models_fallbacks_loaded", "data": result})) async def handle_get_openclaw_models_image_fallbacks(gateway, websocket, data: dict) -> None: result = await _ws_call(gateway, "models.list") await websocket.send(json.dumps({"type": "openclaw_models_image_fallbacks_loaded", "data": result})) async def handle_get_openclaw_skill_update(gateway, websocket, data: dict) -> None: slug = data.get("slug") all_flag = data.get("all", False) params = {} if slug is not None: params["slug"] = slug if all_flag: params["all"] = "true" result = await _ws_call(gateway, "skills.update", params) await websocket.send(json.dumps({"type": "openclaw_skill_update_loaded", "data": result})) async def handle_get_openclaw_workspace_files(gateway, websocket, data: dict) -> None: raw_workspace = data.get("workspace", "") # Use the workspace param (which is actually the agent.id from frontend) as agent_id agent_id = raw_workspace or "main" result = await _ws_call(gateway, "agents.files.list", {"agentId": agent_id}) if isinstance(result, dict): result["workspace"] = agent_id await websocket.send(json.dumps({"type": "openclaw_workspace_files_loaded", "data": result})) async def handle_get_openclaw_workspace_file(gateway, websocket, data: dict) -> None: agent_id = data.get("agent_id", "main") file_name = data.get("file_name", "") if not file_name: await websocket.send(json.dumps({"type": "openclaw_workspace_file_loaded", "data": {"error": "file_name is required"}})) return result = await _ws_call(gateway, "agents.files.get", {"agentId": agent_id, "name": file_name}) await websocket.send(json.dumps({"type": "openclaw_workspace_file_loaded", "data": result}))