#!/usr/bin/env python3
"""Demo script for WebSocket market data feeds.

This script demonstrates how to use the WebSocketManager to subscribe to
real-time market data from multiple exchanges (Binance, OKX, Bybit).

Usage:
    python demo_websocket.py

Press Ctrl+C to stop.
"""

from __future__ import annotations

import asyncio
import signal
import sys
from datetime import datetime
from typing import Optional

from loguru import logger

from openclaw.exchange.websocket_feed import (
    BinanceWebSocketFeed,
    BybitWebSocketFeed,
    OKXWebSocketFeed,
    WebSocketManager,
    OrderBook,
    Trade,
    KlineData,
    Ticker,
)

# Global flag for graceful shutdown; cleared by ``signal_handler``.
running = True

# How long each demo listens for data, and the pause between demos.
_DEMO_DURATION_SECONDS = 60
_PAUSE_BETWEEN_DEMOS_SECONDS = 2


async def on_ticker(symbol: str, ticker: Ticker) -> None:
    """Print a ticker update for ``symbol``.

    The 24h high/low and 24h volume lines are printed only when the
    corresponding ticker fields are truthy.
    """
    print(f"\n[TICKER] {symbol}")
    print(f" Last: {ticker.last}")
    print(f" Bid: {ticker.bid} | Ask: {ticker.ask}")
    print(f" Spread: {ticker.spread:.4f} ({ticker.spread_pct:.4f}%)")
    if ticker.high and ticker.low:
        print(f" 24h High: {ticker.high} | Low: {ticker.low}")
    if ticker.volume:
        print(f" 24h Volume: {ticker.volume:,.2f}")


async def on_orderbook(symbol: str, orderbook: OrderBook) -> None:
    """Print an order book update for ``symbol``, including the top 5 levels.

    A missing best bid or best ask is shown as ``N/A``.
    """
    print(f"\n[ORDERBOOK] {symbol}")
    print(f" Best Bid: {orderbook.best_bid.price if orderbook.best_bid else 'N/A'}")
    print(f" Best Ask: {orderbook.best_ask.price if orderbook.best_ask else 'N/A'}")
    # NOTE(review): spread and mid_price are formatted unconditionally;
    # confirm OrderBook never returns None for them when a side is empty.
    print(f" Spread: {orderbook.spread:.4f}")
    print(f" Mid Price: {orderbook.mid_price:.4f}")

    # Show top 5 levels
    print(" Bids (top 5):")
    for level in orderbook.bids[:5]:
        print(f" {level.price:.2f} x {level.amount:.4f}")
    print(" Asks (top 5):")
    for level in orderbook.asks[:5]:
        print(f" {level.price:.2f} x {level.amount:.4f}")


async def on_trade(symbol: str, trade: Trade) -> None:
    """Print a trade update for ``symbol``.

    The timestamp is rendered as ``HH:MM:SS.mmm`` by trimming the last three
    digits of the microsecond field.
    """
    side_emoji = "🟢" if trade.side == "buy" else "🔴"
    print(f"\n[TRADE] {side_emoji} {symbol}")
    print(f" Price: {trade.price}")
    print(f" Amount: {trade.amount}")
    print(f" Side: {trade.side}")
    print(f" Time: {trade.timestamp.strftime('%H:%M:%S.%f')[:-3]}")


async def on_kline(symbol: str, kline: KlineData) -> None:
    """Print a kline/candlestick update for ``symbol``.

    The header shows a check mark when ``kline.is_closed`` is truthy and
    ``...`` otherwise. The trade count is printed only when it is truthy.
    """
    status = "✓" if kline.is_closed else "..."
    print(f"\n[KLINE {status}] {symbol} ({kline.interval})")
    print(
        f" O: {kline.open:.2f} | H: {kline.high:.2f} | "
        f"L: {kline.low:.2f} | C: {kline.close:.2f}"
    )
    print(f" Volume: {kline.volume:.4f}")
    if kline.trades:
        print(f" Trades: {kline.trades}")


def signal_handler(sig: int, frame: Optional[object]) -> None:
    """Handle shutdown signals by clearing the global ``running`` flag.

    Args:
        sig: Signal number that was delivered (unused).
        frame: Stack frame that was interrupted (unused).
    """
    global running
    print("\n\nShutting down...")
    running = False


async def _wait_while_running(seconds: int = _DEMO_DURATION_SECONDS) -> None:
    """Sleep in one-second steps until ``seconds`` elapse or shutdown is requested.

    The ``running`` flag is checked before every one-second sleep, so a
    shutdown request ends the wait within about a second. A cancellation
    raised while sleeping is absorbed so the caller can still run its
    cleanup.
    """
    try:
        for _ in range(seconds):
            if not running:
                break
            await asyncio.sleep(1)
    except asyncio.CancelledError:
        pass


async def demo_single_exchange() -> None:
    """Demo with a single exchange (Binance).

    Subscribes to ticker, order book, trades and 1m klines for BTC/USDT,
    listens for the demo duration, then stops the manager.
    """
    print("=" * 60)
    print("Demo 1: Single Exchange (Binance)")
    print("=" * 60)

    # Create a WebSocket manager
    manager = WebSocketManager()

    # Add Binance feed
    binance = BinanceWebSocketFeed(futures=False)
    manager.add_feed(binance)

    # Start the manager
    await manager.start()

    # From here on the manager is started, so it must be stopped even if a
    # subscription call fails.
    try:
        # Subscribe to market data
        symbol = "BTC/USDT"
        await manager.subscribe_ticker(symbol, on_ticker)
        await manager.subscribe_orderbook(symbol, on_orderbook)
        await manager.subscribe_trades(symbol, on_trade)
        await manager.subscribe_klines(symbol, "1m", on_kline)

        print(f"\nSubscribed to {symbol} on Binance")
        print("Waiting for data... (Press Ctrl+C to stop)\n")

        # Run for 60 seconds
        await _wait_while_running()
    finally:
        # Cleanup
        await manager.stop()

    print("\nDemo 1 completed.\n")


async def demo_multiple_exchanges() -> None:
    """Demo with multiple exchanges.

    Adds Binance, OKX and Bybit feeds to one manager and subscribes to the
    ticker of BTC/USDT and ETH/USDT through it.
    """
    print("=" * 60)
    print("Demo 2: Multiple Exchanges (Binance, OKX, Bybit)")
    print("=" * 60)

    # Create a WebSocket manager
    manager = WebSocketManager()

    # Add multiple exchange feeds
    manager.add_feed(BinanceWebSocketFeed(futures=False))
    manager.add_feed(OKXWebSocketFeed())
    manager.add_feed(BybitWebSocketFeed(market="spot"))

    # Start the manager
    await manager.start()

    # Guarantee the manager is stopped even if a subscription call fails.
    try:
        # Subscribe to market data on all exchanges
        symbols = ["BTC/USDT", "ETH/USDT"]
        for symbol in symbols:
            await manager.subscribe_ticker(symbol, on_ticker)
            print(f"Subscribed to {symbol} ticker on all exchanges")

        print("\nWaiting for data from multiple exchanges... (Press Ctrl+C to stop)\n")

        # Run for 60 seconds
        await _wait_while_running()
    finally:
        # Cleanup
        await manager.stop()

    print("\nDemo 2 completed.\n")


async def demo_default_manager() -> None:
    """Demo using the default manager with all exchanges pre-configured.

    Subscribes to ticker and order book updates for BTC/USDT, ETH/USDT and
    SOL/USDT on the manager returned by ``create_default_manager``.
    """
    print("=" * 60)
    print("Demo 3: Default Manager (All Exchanges)")
    print("=" * 60)

    # Create default manager with all exchanges
    from openclaw.exchange.websocket_feed import create_default_manager

    manager = create_default_manager()
    await manager.start()

    # Guarantee the manager is stopped even if a subscription call fails.
    try:
        # Subscribe to popular symbols
        symbols = ["BTC/USDT", "ETH/USDT", "SOL/USDT"]
        for symbol in symbols:
            await manager.subscribe_ticker(symbol, on_ticker)
            await manager.subscribe_orderbook(symbol, on_orderbook)
            print(f"Subscribed to {symbol} on all available exchanges")

        print("\nWaiting for data... (Press Ctrl+C to stop)\n")

        # Run for 60 seconds
        await _wait_while_running()
    finally:
        # Cleanup
        await manager.stop()

    print("\nDemo 3 completed.\n")


async def main() -> None:
    """Main entry point: install signal handlers and run the demos in order.

    The demos run one after another with a short pause in between. A
    shutdown request (SIGINT/SIGTERM) skips any demo that has not started
    yet. Any exception from a demo is logged and re-raised.
    """
    # Setup signal handlers
    signal.signal(signal.SIGINT, signal_handler)
    signal.signal(signal.SIGTERM, signal_handler)

    print("\n" + "=" * 60)
    print("WebSocket Market Data Feed Demo")
    print("=" * 60)
    print("\nThis demo shows how to use WebSocket feeds from multiple")
    print("cryptocurrency exchanges to receive real-time market data.\n")

    demos = (
        demo_single_exchange,
        demo_multiple_exchanges,
        demo_default_manager,
    )

    try:
        # Run demos
        for index, demo in enumerate(demos):
            if not running:
                break
            if index:
                await asyncio.sleep(_PAUSE_BETWEEN_DEMOS_SECONDS)
                # Re-check after the pause so a shutdown requested while
                # waiting does not start (and connect) another demo.
                if not running:
                    break
            await demo()
    except Exception as e:
        logger.error(f"Demo error: {e}")
        raise

    print("\n" + "=" * 60)
    print("All demos completed!")
    print("=" * 60)


if __name__ == "__main__":
    # Run the async main function
    asyncio.run(main())