#!/usr/bin/env python3 # -*- coding: utf-8 -*- """Quick test script for OpenClaw WebSocket client.""" import asyncio import sys sys.path.insert(0, '.') from shared.client.openclaw_websocket_client import ( OpenClawWebSocketClient, DEFAULT_GATEWAY_URL, ) async def test_connection(): """Test basic connection to OpenClaw Gateway.""" print(f"Connecting to {DEFAULT_GATEWAY_URL}...") client = OpenClawWebSocketClient( url=DEFAULT_GATEWAY_URL, client_name="cli", client_version="1.0.0", gateway_token="d4b2d831b8a177b5cac07e781f438af3840bd0dfaca630ee", ) try: hello = await client.connect() print(f"āœ“ Connected!") print(f" Protocol version: {hello.protocol}") print(f" Server version: {hello.server_version}") print(f" Connection ID: {hello.conn_id}") # List sessions print("\nListing sessions...") sessions = await client.list_sessions(limit=5) print(f" Found {len(sessions)} sessions") for session in sessions[:3]: print(f" - {session.get('key', 'unknown')}") # List agents print("\nListing agents...") agents = await client.list_agents() print(f" Found {len(agents)} agents") for agent in agents[:3]: print(f" - {agent.get('id', 'unknown')}: {agent.get('name', 'unknown')}") # Send a message to agent if sessions: session_key = sessions[0].get('key') if session_key: print(f"\nSending message to session: {session_key}") result = await client.send_message(session_key, "Hello! This is a test message from Python.") print(f" Message sent: {result}") await client.disconnect() print("\nāœ“ All tests passed!") return True except Exception as e: print(f"āœ— Error: {e}") import traceback traceback.print_exc() return False if __name__ == "__main__": success = asyncio.run(test_connection()) sys.exit(0 if success else 1)