Test Binance Spot WebSocket.
()
| 18 | |
| 19 | |
def test_spot_websocket():
    """Test Binance Spot WebSocket.

    Creates a ``BinanceSpotWebSocket`` with ``testnet=False``, subscribes to
    ``BTCUSDT`` 1-minute klines, and prints every message delivered to the
    ``on_message`` callback. After five messages the callback sets a stop
    flag and asks the socket to close. The calling thread polls every 0.5
    seconds until the client reports it is no longer running, the stop flag
    is set, or the user interrupts with Ctrl-C.
    """
    print("=" * 60)
    print("Testing Binance Spot WebSocket (1m closed klines only)")
    print("=" * 60)

    # Number of messages to receive before requesting shutdown.
    max_messages = 5

    message_count = 0
    should_stop = False

    def on_message(ws, data):
        """Print one kline message and request shutdown after the limit."""
        nonlocal message_count, should_stop
        message_count += 1

        # Data is already processed by WebSocket class (only closed 1m klines)
        # timestamp is already formatted as 'YYYY-MM-DD HH:MM:SS'
        print(f"[Spot] {data['timestamp']} | {data['open_time']} | {data['close_time']} | {data['symbol']} | "
              f"Open: {data['open']} | Close: {data['close']} | "
              f"High: {data['high']} | Low: {data['low']} | "
              f"Volume: {data['volume']} | Interval: {data['interval']}")

        # Stop after receiving 5 closed klines
        if message_count >= max_messages:
            should_stop = True
            try:
                ws.close()
            except Exception as exc:
                # Closing is best-effort: a failure must not escape the
                # callback, but it should be visible. Catching Exception
                # (not a bare except) lets KeyboardInterrupt and SystemExit
                # propagate.
                print(f"[Spot] Error while closing WebSocket: {exc}")

    def on_error(ws, error):
        """Print an error reported by the WebSocket client."""
        print(f"[Spot] Error: {error}")

    def on_close(ws):
        """Print a notice when the WebSocket is closed."""
        print("[Spot] WebSocket closed")

    def on_open(ws):
        """Print a notice when the WebSocket is opened."""
        print("[Spot] WebSocket opened")

    # Create and start Spot WebSocket
    ws_client = BinanceSpotWebSocket(
        on_message=on_message,
        on_error=on_error,
        on_close=on_close,
        on_open=on_open,
        testnet=False
    )

    # Subscribe to 1-minute klines (only closed klines will be processed)
    ws_client.subscribe_kline("BTCUSDT", "1m")

    print("Starting Spot WebSocket...")
    ws_client.start()

    # Wait for messages: poll until the client stops or the callback
    # signals that enough messages have been received.
    try:
        while ws_client.is_running() and not should_stop:
            time.sleep(0.5)
    except KeyboardInterrupt:
        print("\n[Spot] Interrupted by user")
nothing calls this directly
no test coverage detected