MCP · copy · Index your code
hub / github.com/SkyworkAI/DeepResearchAgent / test_spot_websocket

Function test_spot_websocket

tests/test_binance.py:20–81  ·  view source on GitHub ↗

Test Binance Spot WebSocket.

()

Source from the content-addressed store, hash-verified

18
19
20def test_spot_websocket():
21 """Test Binance Spot WebSocket."""
22 print("=" * 60)
23 print("Testing Binance Spot WebSocket (1m closed klines only)")
24 print("=" * 60)
25
26 message_count = 0
27 should_stop = False
28
29 def on_message(ws, data):
30 nonlocal message_count, should_stop
31 message_count += 1
32
33 # Data is already processed by WebSocket class (only closed 1m klines)
34 # timestamp is already formatted as 'YYYY-MM-DD HH:MM:SS'
35 print(f"[Spot] {data['timestamp']} | {data['open_time']} | {data['close_time']} | {data['symbol']} | "
36 f"Open: {data['open']} | Close: {data['close']} | "
37 f"High: {data['high']} | Low: {data['low']} | "
38 f"Volume: {data['volume']} | Interval: {data['interval']}")
39
40 # Stop after receiving 5 closed klines
41 if message_count >= 5:
42 should_stop = True
43 try:
44 ws.close()
45 except:
46 pass
47
48 def on_error(ws, error):
49 print(f"[Spot] Error: {error}")
50
51 def on_close(ws):
52 print("[Spot] WebSocket closed")
53
54 def on_open(ws):
55 print("[Spot] WebSocket opened")
56
57 # Create and start Spot WebSocket
58 ws_client = BinanceSpotWebSocket(
59 on_message=on_message,
60 on_error=on_error,
61 on_close=on_close,
62 on_open=on_open,
63 testnet=False
64 )
65
66 # Subscribe to 1-minute klines (only closed klines will be processed)
67 ws_client.subscribe_kline("BTCUSDT", "1m")
68
69 print("Starting Spot WebSocket...")
70 ws_client.start()
71
72 # Wait for messages
73 try:
74 while ws_client.is_running() and not should_stop:
75 time.sleep(0.5)
76 except KeyboardInterrupt:
77 print("\n[Spot] Interrupted by user")

Callers

nothing calls this directly

Calls 6

subscribe_kline · Method · 0.95
start · Method · 0.95
is_running · Method · 0.95
stop · Method · 0.95
print · Function · 0.85

Tested by

no test coverage detected