"""Tests for WebSocket market data feeds. This module contains tests for the WebSocketFeed implementations including Binance, OKX, and Bybit feeds. """ from __future__ import annotations import asyncio import json from datetime import datetime from unittest.mock import AsyncMock, MagicMock, Mock, patch import pytest from openclaw.exchange.websocket_feed import ( BinanceWebSocketFeed, BybitWebSocketFeed, ExchangeWebSocketFeed, KlineData, OKXWebSocketFeed, OrderBook, OrderBookLevel, Trade, WebSocketManager, create_default_manager, ) from openclaw.exchange.models import Ticker # ============================================================================ # Test Fixtures # ============================================================================ @pytest.fixture def mock_websocket(): """Create a mock WebSocket connection.""" ws = AsyncMock() ws.send = AsyncMock() ws.send_json = AsyncMock() ws.close = AsyncMock() return ws @pytest.fixture def ticker_callback(): """Create a mock ticker callback.""" return AsyncMock() @pytest.fixture def orderbook_callback(): """Create a mock orderbook callback.""" return AsyncMock() @pytest.fixture def trade_callback(): """Create a mock trade callback.""" return AsyncMock() @pytest.fixture def kline_callback(): """Create a mock kline callback.""" return AsyncMock() @pytest.fixture def binance_feed(): """Create a Binance WebSocket feed instance.""" return BinanceWebSocketFeed(futures=False) @pytest.fixture def okx_feed(): """Create an OKX WebSocket feed instance.""" return OKXWebSocketFeed() @pytest.fixture def bybit_feed(): """Create a Bybit WebSocket feed instance.""" return BybitWebSocketFeed(market="spot") @pytest.fixture def ws_manager(): """Create a WebSocket manager instance.""" return WebSocketManager() # ============================================================================ # Test Data Classes # ============================================================================ class TestOrderBook: """Tests for OrderBook data class.""" def test_orderbook_creation(self): """Test OrderBook creation.""" ob = OrderBook(symbol="BTC/USDT") assert ob.symbol == "BTC/USDT" assert ob.bids == [] assert ob.asks == [] assert ob.last_update_id == 0 def test_orderbook_best_bid_ask(self): """Test OrderBook best bid/ask properties.""" ob = OrderBook(symbol="BTC/USDT") ob.bids = [ OrderBookLevel(price=50000.0, amount=1.0), OrderBookLevel(price=49900.0, amount=2.0), ] ob.asks = [ OrderBookLevel(price=50100.0, amount=1.5), OrderBookLevel(price=50200.0, amount=2.5), ] assert ob.best_bid.price == 50000.0 assert ob.best_ask.price == 50100.0 assert ob.spread == 100.0 assert ob.mid_price == 50050.0 def test_orderbook_empty(self): """Test OrderBook with empty bids/asks.""" ob = OrderBook(symbol="BTC/USDT") assert ob.best_bid is None assert ob.best_ask is None assert ob.spread == 0.0 assert ob.mid_price == 0.0 def test_orderbook_update(self): """Test OrderBook update functionality.""" ob = OrderBook(symbol="BTC/USDT") # Initial update ob.update([[50000.0, 1.0], [49900.0, 2.0]], [[50100.0, 1.5]], 1) assert len(ob.bids) == 2 assert len(ob.asks) == 1 assert ob.bids[0].price == 50000.0 # Update existing level ob.update([[50000.0, 0.5]], [], 2) assert ob.bids[0].amount == 0.5 # Remove level with zero amount ob.update([[49900.0, 0.0]], [], 3) assert len(ob.bids) == 1 class TestKlineData: """Tests for KlineData data class.""" def test_kline_creation(self): """Test KlineData creation.""" kline = KlineData( symbol="BTC/USDT", interval="1m", timestamp=datetime.now(), open=50000.0, high=51000.0, low=49000.0, close=50500.0, volume=100.0, quote_volume=5000000.0, trades=1000, is_closed=True, ) assert kline.symbol == "BTC/USDT" assert kline.interval == "1m" assert kline.open == 50000.0 assert kline.high == 51000.0 assert kline.low == 49000.0 assert kline.close == 50500.0 assert kline.is_closed is True class TestTrade: """Tests for Trade data class.""" def test_trade_creation(self): """Test Trade creation.""" trade = Trade( symbol="BTC/USDT", trade_id="12345", price=50000.0, amount=0.5, side="buy", timestamp=datetime.now(), is_buyer_maker=True, ) assert trade.symbol == "BTC/USDT" assert trade.trade_id == "12345" assert trade.price == 50000.0 assert trade.amount == 0.5 assert trade.side == "buy" assert trade.is_buyer_maker is True # ============================================================================ # Test Binance WebSocket Feed # ============================================================================ class TestBinanceWebSocketFeed: """Tests for BinanceWebSocketFeed.""" def test_normalize_symbol(self, binance_feed): """Test symbol normalization for Binance.""" assert binance_feed._normalize_symbol("BTC/USDT") == "btcusdt" assert binance_feed._normalize_symbol("ETH/BTC") == "ethbtc" def test_denormalize_symbol(self, binance_feed): """Test symbol denormalization from Binance format.""" assert binance_feed._denormalize_symbol("btcusdt") == "BTC/USDT" assert binance_feed._denormalize_symbol("ethbtc") == "ETH/BTC" def test_get_interval_str(self, binance_feed): """Test interval string conversion.""" from openclaw.data.interface import Interval assert binance_feed._get_interval_str(Interval.MINUTE_1) == "1m" assert binance_feed._get_interval_str(Interval.HOUR_1) == "1h" assert binance_feed._get_interval_str("5m") == "5m" @pytest.mark.asyncio async def test_add_remove_ticker_callback(self, binance_feed, ticker_callback): """Test adding and removing ticker callbacks.""" binance_feed.add_ticker_callback("BTC/USDT", ticker_callback) assert ticker_callback in binance_feed._ticker_callbacks["BTC/USDT"] binance_feed.remove_ticker_callback("BTC/USDT", ticker_callback) assert ticker_callback not in binance_feed._ticker_callbacks["BTC/USDT"] @pytest.mark.asyncio async def test_add_remove_orderbook_callback(self, binance_feed, orderbook_callback): """Test adding and removing orderbook callbacks.""" binance_feed.add_orderbook_callback("BTC/USDT", orderbook_callback) assert orderbook_callback in binance_feed._orderbook_callbacks["BTC/USDT"] binance_feed.remove_orderbook_callback("BTC/USDT", orderbook_callback) assert orderbook_callback not in binance_feed._orderbook_callbacks["BTC/USDT"] @pytest.mark.asyncio async def test_handle_ticker(self, binance_feed, ticker_callback): """Test handling ticker messages.""" binance_feed.add_ticker_callback("BTC/USDT", ticker_callback) data = { "s": "BTCUSDT", "b": "50000.00", "a": "50010.00", "c": "50005.00", "h": "51000.00", "l": "49000.00", "v": "1000.00", } await binance_feed._handle_ticker(data) ticker_callback.assert_called_once() call_args = ticker_callback.call_args assert call_args[0][0] == "BTC/USDT" assert isinstance(call_args[0][1], Ticker) assert call_args[0][1].last == 50005.0 @pytest.mark.asyncio async def test_handle_trade(self, binance_feed, trade_callback): """Test handling trade messages.""" binance_feed.add_trade_callback("BTC/USDT", trade_callback) data = { "s": "BTCUSDT", "t": 12345, "p": "50000.00", "q": "0.500", "m": True, "T": 1640000000000, } await binance_feed._handle_trade(data) trade_callback.assert_called_once() call_args = trade_callback.call_args assert call_args[0][0] == "BTC/USDT" assert isinstance(call_args[0][1], Trade) assert call_args[0][1].price == 50000.0 @pytest.mark.asyncio async def test_handle_kline(self, binance_feed, kline_callback): """Test handling kline messages.""" binance_feed.add_kline_callback("BTC/USDT", kline_callback) data = { "k": { "s": "BTCUSDT", "i": "1m", "t": 1640000000000, "o": "50000.00", "h": "51000.00", "l": "49000.00", "c": "50500.00", "v": "100.00", "q": "5000000.00", "n": 1000, "x": True, } } await binance_feed._handle_kline(data) kline_callback.assert_called_once() call_args = kline_callback.call_args assert call_args[0][0] == "BTC/USDT" assert isinstance(call_args[0][1], KlineData) assert call_args[0][1].open == 50000.0 @pytest.mark.asyncio async def test_handle_orderbook(self, binance_feed, orderbook_callback): """Test handling orderbook messages.""" binance_feed.add_orderbook_callback("BTC/USDT", orderbook_callback) data = { "s": "BTCUSDT", "b": [["50000.00", "1.000"], ["49900.00", "2.000"]], "a": [["50100.00", "1.500"]], "u": 12345, } await binance_feed._handle_orderbook(data) orderbook_callback.assert_called_once() call_args = orderbook_callback.call_args assert call_args[0][0] == "BTC/USDT" assert isinstance(call_args[0][1], OrderBook) # ============================================================================ # Test OKX WebSocket Feed # ============================================================================ class TestOKXWebSocketFeed: """Tests for OKXWebSocketFeed.""" def test_normalize_symbol(self, okx_feed): """Test symbol normalization for OKX.""" assert okx_feed._normalize_symbol("BTC/USDT") == "BTC-USDT" assert okx_feed._normalize_symbol("ETH/BTC") == "ETH-BTC" def test_denormalize_symbol(self, okx_feed): """Test symbol denormalization from OKX format.""" assert okx_feed._denormalize_symbol("BTC-USDT") == "BTC/USDT" assert okx_feed._denormalize_symbol("ETH-BTC") == "ETH/BTC" @pytest.mark.asyncio async def test_handle_ticker(self, okx_feed, ticker_callback): """Test handling OKX ticker messages.""" okx_feed.add_ticker_callback("BTC/USDT", ticker_callback) data = { "instId": "BTC-USDT", "bidPx": "50000.00", "askPx": "50010.00", "last": "50005.00", "high24h": "51000.00", "low24h": "49000.00", "vol24h": "1000.00", } await okx_feed._handle_ticker("BTC/USDT", data) ticker_callback.assert_called_once() call_args = ticker_callback.call_args assert call_args[0][0] == "BTC/USDT" assert isinstance(call_args[0][1], Ticker) # ============================================================================ # Test Bybit WebSocket Feed # ============================================================================ class TestBybitWebSocketFeed: """Tests for BybitWebSocketFeed.""" def test_normalize_symbol(self, bybit_feed): """Test symbol normalization for Bybit.""" assert bybit_feed._normalize_symbol("BTC/USDT") == "BTCUSDT" assert bybit_feed._normalize_symbol("ETH/BTC") == "ETHBTC" def test_denormalize_symbol(self, bybit_feed): """Test symbol denormalization from Bybit format.""" assert bybit_feed._denormalize_symbol("BTCUSDT") == "BTC/USDT" assert bybit_feed._denormalize_symbol("ETHBTC") == "ETH/BTC" def test_get_interval_str(self, bybit_feed): """Test interval string conversion.""" from openclaw.data.interface import Interval assert bybit_feed._get_interval_str(Interval.MINUTE_1) == "1" assert bybit_feed._get_interval_str(Interval.HOUR_1) == "60" assert bybit_feed._get_interval_str("1m") == "1" @pytest.mark.asyncio async def test_handle_ticker(self, bybit_feed, ticker_callback): """Test handling Bybit ticker messages.""" bybit_feed.add_ticker_callback("BTC/USDT", ticker_callback) data = { "symbol": "BTCUSDT", "bid1Price": "50000.00", "ask1Price": "50010.00", "lastPrice": "50005.00", "highPrice24h": "51000.00", "lowPrice24h": "49000.00", "volume24h": "1000.00", } await bybit_feed._handle_ticker("BTC/USDT", data) ticker_callback.assert_called_once() call_args = ticker_callback.call_args assert call_args[0][0] == "BTC/USDT" assert isinstance(call_args[0][1], Ticker) # ============================================================================ # Test WebSocket Manager # ============================================================================ class TestWebSocketManager: """Tests for WebSocketManager.""" def test_add_remove_feed(self, ws_manager, binance_feed): """Test adding and removing feeds.""" ws_manager.add_feed(binance_feed) assert "binance" in ws_manager.list_feeds() ws_manager.remove_feed("binance") assert "binance" not in ws_manager.list_feeds() def test_get_feed(self, ws_manager, binance_feed): """Test getting a feed by ID.""" ws_manager.add_feed(binance_feed) feed = ws_manager.get_feed("binance") assert feed == binance_feed def test_get_feed_not_found(self, ws_manager): """Test getting a non-existent feed.""" feed = ws_manager.get_feed("nonexistent") assert feed is None @pytest.mark.asyncio async def test_subscribe_unsubscribe_ticker(self, ws_manager, binance_feed, ticker_callback): """Test subscribing and unsubscribing from ticker.""" ws_manager.add_feed(binance_feed) # Mock the feed's connect and subscribe methods binance_feed.connect = AsyncMock() binance_feed.subscribe_ticker = AsyncMock() binance_feed.unsubscribe_ticker = AsyncMock() await ws_manager.start() await ws_manager.subscribe_ticker("BTC/USDT", ticker_callback) binance_feed.subscribe_ticker.assert_called_once() await ws_manager.unsubscribe_ticker("BTC/USDT") binance_feed.unsubscribe_ticker.assert_called_once() await ws_manager.stop() @pytest.mark.asyncio async def test_get_all_tickers(self, ws_manager, binance_feed): """Test getting tickers from all feeds.""" ws_manager.add_feed(binance_feed) # Create a mock ticker ticker = Ticker( symbol="BTC/USDT", bid=50000.0, ask=50010.0, last=50005.0, timestamp=datetime.now(), ) binance_feed._tickers["BTC/USDT"] = ticker tickers = ws_manager.get_all_tickers("BTC/USDT") assert "binance" in tickers assert tickers["binance"] == ticker # ============================================================================ # Test Integration # ============================================================================ class TestIntegration: """Integration tests for WebSocket feeds.""" def test_create_default_manager(self): """Test creating a default manager with all exchanges.""" manager = create_default_manager() # Should have feeds for multiple exchanges feeds = manager.list_feeds() assert "binance" in feeds assert "binance_futures" in feeds assert "okx" in feeds assert "bybit_spot" in feeds assert "bybit_linear" in feeds # ============================================================================ # Test Error Handling # ============================================================================ class TestErrorHandling: """Tests for error handling.""" @pytest.mark.asyncio async def test_handle_invalid_json(self, binance_feed): """Test handling invalid JSON messages.""" # Should not raise an exception await binance_feed._handle_message("invalid json") @pytest.mark.asyncio async def test_handle_empty_message(self, binance_feed): """Test handling empty messages.""" # Should not raise an exception await binance_feed._handle_message("") @pytest.mark.asyncio async def test_handle_unknown_message_type(self, binance_feed): """Test handling unknown message types.""" # Should not raise an exception await binance_feed._handle_message(json.dumps({"unknown": "message"})) @pytest.mark.asyncio async def test_callback_error_handling(self, binance_feed): """Test that callback errors don't crash the feed.""" error_callback = AsyncMock(side_effect=Exception("Callback error")) binance_feed.add_ticker_callback("BTC/USDT", error_callback) data = {"s": "BTCUSDT", "b": "50000.00", "a": "50010.00", "c": "50005.00"} # Should not raise an exception await binance_feed._handle_ticker(data) error_callback.assert_called_once() # ============================================================================ # Main # ============================================================================ if __name__ == "__main__": pytest.main([__file__, "-v"])