568 lines
18 KiB
Python
568 lines
18 KiB
Python
"""Tests for WebSocket market data feeds.
|
|
|
|
This module contains tests for the WebSocketFeed implementations
|
|
including Binance, OKX, and Bybit feeds.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import asyncio
|
|
import json
|
|
from datetime import datetime
|
|
from unittest.mock import AsyncMock, MagicMock, Mock, patch
|
|
|
|
import pytest
|
|
|
|
from openclaw.exchange.websocket_feed import (
|
|
BinanceWebSocketFeed,
|
|
BybitWebSocketFeed,
|
|
ExchangeWebSocketFeed,
|
|
KlineData,
|
|
OKXWebSocketFeed,
|
|
OrderBook,
|
|
OrderBookLevel,
|
|
Trade,
|
|
WebSocketManager,
|
|
create_default_manager,
|
|
)
|
|
from openclaw.exchange.models import Ticker
|
|
|
|
|
|
# ============================================================================
|
|
# Test Fixtures
|
|
# ============================================================================
|
|
|
|
|
|
@pytest.fixture
|
|
def mock_websocket():
|
|
"""Create a mock WebSocket connection."""
|
|
ws = AsyncMock()
|
|
ws.send = AsyncMock()
|
|
ws.send_json = AsyncMock()
|
|
ws.close = AsyncMock()
|
|
return ws
|
|
|
|
|
|
@pytest.fixture
|
|
def ticker_callback():
|
|
"""Create a mock ticker callback."""
|
|
return AsyncMock()
|
|
|
|
|
|
@pytest.fixture
|
|
def orderbook_callback():
|
|
"""Create a mock orderbook callback."""
|
|
return AsyncMock()
|
|
|
|
|
|
@pytest.fixture
|
|
def trade_callback():
|
|
"""Create a mock trade callback."""
|
|
return AsyncMock()
|
|
|
|
|
|
@pytest.fixture
|
|
def kline_callback():
|
|
"""Create a mock kline callback."""
|
|
return AsyncMock()
|
|
|
|
|
|
@pytest.fixture
|
|
def binance_feed():
|
|
"""Create a Binance WebSocket feed instance."""
|
|
return BinanceWebSocketFeed(futures=False)
|
|
|
|
|
|
@pytest.fixture
|
|
def okx_feed():
|
|
"""Create an OKX WebSocket feed instance."""
|
|
return OKXWebSocketFeed()
|
|
|
|
|
|
@pytest.fixture
|
|
def bybit_feed():
|
|
"""Create a Bybit WebSocket feed instance."""
|
|
return BybitWebSocketFeed(market="spot")
|
|
|
|
|
|
@pytest.fixture
|
|
def ws_manager():
|
|
"""Create a WebSocket manager instance."""
|
|
return WebSocketManager()
|
|
|
|
|
|
# ============================================================================
|
|
# Test Data Classes
|
|
# ============================================================================
|
|
|
|
|
|
class TestOrderBook:
|
|
"""Tests for OrderBook data class."""
|
|
|
|
def test_orderbook_creation(self):
|
|
"""Test OrderBook creation."""
|
|
ob = OrderBook(symbol="BTC/USDT")
|
|
assert ob.symbol == "BTC/USDT"
|
|
assert ob.bids == []
|
|
assert ob.asks == []
|
|
assert ob.last_update_id == 0
|
|
|
|
def test_orderbook_best_bid_ask(self):
|
|
"""Test OrderBook best bid/ask properties."""
|
|
ob = OrderBook(symbol="BTC/USDT")
|
|
ob.bids = [
|
|
OrderBookLevel(price=50000.0, amount=1.0),
|
|
OrderBookLevel(price=49900.0, amount=2.0),
|
|
]
|
|
ob.asks = [
|
|
OrderBookLevel(price=50100.0, amount=1.5),
|
|
OrderBookLevel(price=50200.0, amount=2.5),
|
|
]
|
|
|
|
assert ob.best_bid.price == 50000.0
|
|
assert ob.best_ask.price == 50100.0
|
|
assert ob.spread == 100.0
|
|
assert ob.mid_price == 50050.0
|
|
|
|
def test_orderbook_empty(self):
|
|
"""Test OrderBook with empty bids/asks."""
|
|
ob = OrderBook(symbol="BTC/USDT")
|
|
assert ob.best_bid is None
|
|
assert ob.best_ask is None
|
|
assert ob.spread == 0.0
|
|
assert ob.mid_price == 0.0
|
|
|
|
def test_orderbook_update(self):
|
|
"""Test OrderBook update functionality."""
|
|
ob = OrderBook(symbol="BTC/USDT")
|
|
|
|
# Initial update
|
|
ob.update([[50000.0, 1.0], [49900.0, 2.0]], [[50100.0, 1.5]], 1)
|
|
|
|
assert len(ob.bids) == 2
|
|
assert len(ob.asks) == 1
|
|
assert ob.bids[0].price == 50000.0
|
|
|
|
# Update existing level
|
|
ob.update([[50000.0, 0.5]], [], 2)
|
|
assert ob.bids[0].amount == 0.5
|
|
|
|
# Remove level with zero amount
|
|
ob.update([[49900.0, 0.0]], [], 3)
|
|
assert len(ob.bids) == 1
|
|
|
|
|
|
class TestKlineData:
|
|
"""Tests for KlineData data class."""
|
|
|
|
def test_kline_creation(self):
|
|
"""Test KlineData creation."""
|
|
kline = KlineData(
|
|
symbol="BTC/USDT",
|
|
interval="1m",
|
|
timestamp=datetime.now(),
|
|
open=50000.0,
|
|
high=51000.0,
|
|
low=49000.0,
|
|
close=50500.0,
|
|
volume=100.0,
|
|
quote_volume=5000000.0,
|
|
trades=1000,
|
|
is_closed=True,
|
|
)
|
|
|
|
assert kline.symbol == "BTC/USDT"
|
|
assert kline.interval == "1m"
|
|
assert kline.open == 50000.0
|
|
assert kline.high == 51000.0
|
|
assert kline.low == 49000.0
|
|
assert kline.close == 50500.0
|
|
assert kline.is_closed is True
|
|
|
|
|
|
class TestTrade:
|
|
"""Tests for Trade data class."""
|
|
|
|
def test_trade_creation(self):
|
|
"""Test Trade creation."""
|
|
trade = Trade(
|
|
symbol="BTC/USDT",
|
|
trade_id="12345",
|
|
price=50000.0,
|
|
amount=0.5,
|
|
side="buy",
|
|
timestamp=datetime.now(),
|
|
is_buyer_maker=True,
|
|
)
|
|
|
|
assert trade.symbol == "BTC/USDT"
|
|
assert trade.trade_id == "12345"
|
|
assert trade.price == 50000.0
|
|
assert trade.amount == 0.5
|
|
assert trade.side == "buy"
|
|
assert trade.is_buyer_maker is True
|
|
|
|
|
|
# ============================================================================
|
|
# Test Binance WebSocket Feed
|
|
# ============================================================================
|
|
|
|
|
|
class TestBinanceWebSocketFeed:
|
|
"""Tests for BinanceWebSocketFeed."""
|
|
|
|
def test_normalize_symbol(self, binance_feed):
|
|
"""Test symbol normalization for Binance."""
|
|
assert binance_feed._normalize_symbol("BTC/USDT") == "btcusdt"
|
|
assert binance_feed._normalize_symbol("ETH/BTC") == "ethbtc"
|
|
|
|
def test_denormalize_symbol(self, binance_feed):
|
|
"""Test symbol denormalization from Binance format."""
|
|
assert binance_feed._denormalize_symbol("btcusdt") == "BTC/USDT"
|
|
assert binance_feed._denormalize_symbol("ethbtc") == "ETH/BTC"
|
|
|
|
def test_get_interval_str(self, binance_feed):
|
|
"""Test interval string conversion."""
|
|
from openclaw.data.interface import Interval
|
|
|
|
assert binance_feed._get_interval_str(Interval.MINUTE_1) == "1m"
|
|
assert binance_feed._get_interval_str(Interval.HOUR_1) == "1h"
|
|
assert binance_feed._get_interval_str("5m") == "5m"
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_add_remove_ticker_callback(self, binance_feed, ticker_callback):
|
|
"""Test adding and removing ticker callbacks."""
|
|
binance_feed.add_ticker_callback("BTC/USDT", ticker_callback)
|
|
assert ticker_callback in binance_feed._ticker_callbacks["BTC/USDT"]
|
|
|
|
binance_feed.remove_ticker_callback("BTC/USDT", ticker_callback)
|
|
assert ticker_callback not in binance_feed._ticker_callbacks["BTC/USDT"]
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_add_remove_orderbook_callback(self, binance_feed, orderbook_callback):
|
|
"""Test adding and removing orderbook callbacks."""
|
|
binance_feed.add_orderbook_callback("BTC/USDT", orderbook_callback)
|
|
assert orderbook_callback in binance_feed._orderbook_callbacks["BTC/USDT"]
|
|
|
|
binance_feed.remove_orderbook_callback("BTC/USDT", orderbook_callback)
|
|
assert orderbook_callback not in binance_feed._orderbook_callbacks["BTC/USDT"]
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_handle_ticker(self, binance_feed, ticker_callback):
|
|
"""Test handling ticker messages."""
|
|
binance_feed.add_ticker_callback("BTC/USDT", ticker_callback)
|
|
|
|
data = {
|
|
"s": "BTCUSDT",
|
|
"b": "50000.00",
|
|
"a": "50010.00",
|
|
"c": "50005.00",
|
|
"h": "51000.00",
|
|
"l": "49000.00",
|
|
"v": "1000.00",
|
|
}
|
|
|
|
await binance_feed._handle_ticker(data)
|
|
|
|
ticker_callback.assert_called_once()
|
|
call_args = ticker_callback.call_args
|
|
assert call_args[0][0] == "BTC/USDT"
|
|
assert isinstance(call_args[0][1], Ticker)
|
|
assert call_args[0][1].last == 50005.0
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_handle_trade(self, binance_feed, trade_callback):
|
|
"""Test handling trade messages."""
|
|
binance_feed.add_trade_callback("BTC/USDT", trade_callback)
|
|
|
|
data = {
|
|
"s": "BTCUSDT",
|
|
"t": 12345,
|
|
"p": "50000.00",
|
|
"q": "0.500",
|
|
"m": True,
|
|
"T": 1640000000000,
|
|
}
|
|
|
|
await binance_feed._handle_trade(data)
|
|
|
|
trade_callback.assert_called_once()
|
|
call_args = trade_callback.call_args
|
|
assert call_args[0][0] == "BTC/USDT"
|
|
assert isinstance(call_args[0][1], Trade)
|
|
assert call_args[0][1].price == 50000.0
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_handle_kline(self, binance_feed, kline_callback):
|
|
"""Test handling kline messages."""
|
|
binance_feed.add_kline_callback("BTC/USDT", kline_callback)
|
|
|
|
data = {
|
|
"k": {
|
|
"s": "BTCUSDT",
|
|
"i": "1m",
|
|
"t": 1640000000000,
|
|
"o": "50000.00",
|
|
"h": "51000.00",
|
|
"l": "49000.00",
|
|
"c": "50500.00",
|
|
"v": "100.00",
|
|
"q": "5000000.00",
|
|
"n": 1000,
|
|
"x": True,
|
|
}
|
|
}
|
|
|
|
await binance_feed._handle_kline(data)
|
|
|
|
kline_callback.assert_called_once()
|
|
call_args = kline_callback.call_args
|
|
assert call_args[0][0] == "BTC/USDT"
|
|
assert isinstance(call_args[0][1], KlineData)
|
|
assert call_args[0][1].open == 50000.0
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_handle_orderbook(self, binance_feed, orderbook_callback):
|
|
"""Test handling orderbook messages."""
|
|
binance_feed.add_orderbook_callback("BTC/USDT", orderbook_callback)
|
|
|
|
data = {
|
|
"s": "BTCUSDT",
|
|
"b": [["50000.00", "1.000"], ["49900.00", "2.000"]],
|
|
"a": [["50100.00", "1.500"]],
|
|
"u": 12345,
|
|
}
|
|
|
|
await binance_feed._handle_orderbook(data)
|
|
|
|
orderbook_callback.assert_called_once()
|
|
call_args = orderbook_callback.call_args
|
|
assert call_args[0][0] == "BTC/USDT"
|
|
assert isinstance(call_args[0][1], OrderBook)
|
|
|
|
|
|
# ============================================================================
|
|
# Test OKX WebSocket Feed
|
|
# ============================================================================
|
|
|
|
|
|
class TestOKXWebSocketFeed:
|
|
"""Tests for OKXWebSocketFeed."""
|
|
|
|
def test_normalize_symbol(self, okx_feed):
|
|
"""Test symbol normalization for OKX."""
|
|
assert okx_feed._normalize_symbol("BTC/USDT") == "BTC-USDT"
|
|
assert okx_feed._normalize_symbol("ETH/BTC") == "ETH-BTC"
|
|
|
|
def test_denormalize_symbol(self, okx_feed):
|
|
"""Test symbol denormalization from OKX format."""
|
|
assert okx_feed._denormalize_symbol("BTC-USDT") == "BTC/USDT"
|
|
assert okx_feed._denormalize_symbol("ETH-BTC") == "ETH/BTC"
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_handle_ticker(self, okx_feed, ticker_callback):
|
|
"""Test handling OKX ticker messages."""
|
|
okx_feed.add_ticker_callback("BTC/USDT", ticker_callback)
|
|
|
|
data = {
|
|
"instId": "BTC-USDT",
|
|
"bidPx": "50000.00",
|
|
"askPx": "50010.00",
|
|
"last": "50005.00",
|
|
"high24h": "51000.00",
|
|
"low24h": "49000.00",
|
|
"vol24h": "1000.00",
|
|
}
|
|
|
|
await okx_feed._handle_ticker("BTC/USDT", data)
|
|
|
|
ticker_callback.assert_called_once()
|
|
call_args = ticker_callback.call_args
|
|
assert call_args[0][0] == "BTC/USDT"
|
|
assert isinstance(call_args[0][1], Ticker)
|
|
|
|
|
|
# ============================================================================
|
|
# Test Bybit WebSocket Feed
|
|
# ============================================================================
|
|
|
|
|
|
class TestBybitWebSocketFeed:
|
|
"""Tests for BybitWebSocketFeed."""
|
|
|
|
def test_normalize_symbol(self, bybit_feed):
|
|
"""Test symbol normalization for Bybit."""
|
|
assert bybit_feed._normalize_symbol("BTC/USDT") == "BTCUSDT"
|
|
assert bybit_feed._normalize_symbol("ETH/BTC") == "ETHBTC"
|
|
|
|
def test_denormalize_symbol(self, bybit_feed):
|
|
"""Test symbol denormalization from Bybit format."""
|
|
assert bybit_feed._denormalize_symbol("BTCUSDT") == "BTC/USDT"
|
|
assert bybit_feed._denormalize_symbol("ETHBTC") == "ETH/BTC"
|
|
|
|
def test_get_interval_str(self, bybit_feed):
|
|
"""Test interval string conversion."""
|
|
from openclaw.data.interface import Interval
|
|
|
|
assert bybit_feed._get_interval_str(Interval.MINUTE_1) == "1"
|
|
assert bybit_feed._get_interval_str(Interval.HOUR_1) == "60"
|
|
assert bybit_feed._get_interval_str("1m") == "1"
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_handle_ticker(self, bybit_feed, ticker_callback):
|
|
"""Test handling Bybit ticker messages."""
|
|
bybit_feed.add_ticker_callback("BTC/USDT", ticker_callback)
|
|
|
|
data = {
|
|
"symbol": "BTCUSDT",
|
|
"bid1Price": "50000.00",
|
|
"ask1Price": "50010.00",
|
|
"lastPrice": "50005.00",
|
|
"highPrice24h": "51000.00",
|
|
"lowPrice24h": "49000.00",
|
|
"volume24h": "1000.00",
|
|
}
|
|
|
|
await bybit_feed._handle_ticker("BTC/USDT", data)
|
|
|
|
ticker_callback.assert_called_once()
|
|
call_args = ticker_callback.call_args
|
|
assert call_args[0][0] == "BTC/USDT"
|
|
assert isinstance(call_args[0][1], Ticker)
|
|
|
|
|
|
# ============================================================================
|
|
# Test WebSocket Manager
|
|
# ============================================================================
|
|
|
|
|
|
class TestWebSocketManager:
|
|
"""Tests for WebSocketManager."""
|
|
|
|
def test_add_remove_feed(self, ws_manager, binance_feed):
|
|
"""Test adding and removing feeds."""
|
|
ws_manager.add_feed(binance_feed)
|
|
assert "binance" in ws_manager.list_feeds()
|
|
|
|
ws_manager.remove_feed("binance")
|
|
assert "binance" not in ws_manager.list_feeds()
|
|
|
|
def test_get_feed(self, ws_manager, binance_feed):
|
|
"""Test getting a feed by ID."""
|
|
ws_manager.add_feed(binance_feed)
|
|
feed = ws_manager.get_feed("binance")
|
|
assert feed == binance_feed
|
|
|
|
def test_get_feed_not_found(self, ws_manager):
|
|
"""Test getting a non-existent feed."""
|
|
feed = ws_manager.get_feed("nonexistent")
|
|
assert feed is None
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_subscribe_unsubscribe_ticker(self, ws_manager, binance_feed, ticker_callback):
|
|
"""Test subscribing and unsubscribing from ticker."""
|
|
ws_manager.add_feed(binance_feed)
|
|
|
|
# Mock the feed's connect and subscribe methods
|
|
binance_feed.connect = AsyncMock()
|
|
binance_feed.subscribe_ticker = AsyncMock()
|
|
binance_feed.unsubscribe_ticker = AsyncMock()
|
|
|
|
await ws_manager.start()
|
|
await ws_manager.subscribe_ticker("BTC/USDT", ticker_callback)
|
|
|
|
binance_feed.subscribe_ticker.assert_called_once()
|
|
|
|
await ws_manager.unsubscribe_ticker("BTC/USDT")
|
|
binance_feed.unsubscribe_ticker.assert_called_once()
|
|
|
|
await ws_manager.stop()
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_get_all_tickers(self, ws_manager, binance_feed):
|
|
"""Test getting tickers from all feeds."""
|
|
ws_manager.add_feed(binance_feed)
|
|
|
|
# Create a mock ticker
|
|
ticker = Ticker(
|
|
symbol="BTC/USDT",
|
|
bid=50000.0,
|
|
ask=50010.0,
|
|
last=50005.0,
|
|
timestamp=datetime.now(),
|
|
)
|
|
binance_feed._tickers["BTC/USDT"] = ticker
|
|
|
|
tickers = ws_manager.get_all_tickers("BTC/USDT")
|
|
assert "binance" in tickers
|
|
assert tickers["binance"] == ticker
|
|
|
|
|
|
# ============================================================================
|
|
# Test Integration
|
|
# ============================================================================
|
|
|
|
|
|
class TestIntegration:
|
|
"""Integration tests for WebSocket feeds."""
|
|
|
|
def test_create_default_manager(self):
|
|
"""Test creating a default manager with all exchanges."""
|
|
manager = create_default_manager()
|
|
|
|
# Should have feeds for multiple exchanges
|
|
feeds = manager.list_feeds()
|
|
assert "binance" in feeds
|
|
assert "binance_futures" in feeds
|
|
assert "okx" in feeds
|
|
assert "bybit_spot" in feeds
|
|
assert "bybit_linear" in feeds
|
|
|
|
|
|
# ============================================================================
|
|
# Test Error Handling
|
|
# ============================================================================
|
|
|
|
|
|
class TestErrorHandling:
|
|
"""Tests for error handling."""
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_handle_invalid_json(self, binance_feed):
|
|
"""Test handling invalid JSON messages."""
|
|
# Should not raise an exception
|
|
await binance_feed._handle_message("invalid json")
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_handle_empty_message(self, binance_feed):
|
|
"""Test handling empty messages."""
|
|
# Should not raise an exception
|
|
await binance_feed._handle_message("")
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_handle_unknown_message_type(self, binance_feed):
|
|
"""Test handling unknown message types."""
|
|
# Should not raise an exception
|
|
await binance_feed._handle_message(json.dumps({"unknown": "message"}))
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_callback_error_handling(self, binance_feed):
|
|
"""Test that callback errors don't crash the feed."""
|
|
error_callback = AsyncMock(side_effect=Exception("Callback error"))
|
|
binance_feed.add_ticker_callback("BTC/USDT", error_callback)
|
|
|
|
data = {"s": "BTCUSDT", "b": "50000.00", "a": "50010.00", "c": "50005.00"}
|
|
|
|
# Should not raise an exception
|
|
await binance_feed._handle_ticker(data)
|
|
|
|
error_callback.assert_called_once()
|
|
|
|
|
|
# ============================================================================
|
|
# Main
|
|
# ============================================================================
|
|
|
|
if __name__ == "__main__":
|
|
pytest.main([__file__, "-v"])
|