-
Notifications
You must be signed in to change notification settings - Fork 2
Expand file tree
/
Copy pathstream_trades.py
More file actions
110 lines (81 loc) · 2.7 KB
/
stream_trades.py
File metadata and controls
110 lines (81 loc) · 2.7 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
#!/usr/bin/env python3
"""
WebSocket Streaming Example - Real-Time Trade Data
Stream trades via WebSocket.
Available WebSocket streams:
- trades: Executed trades with price, size, direction
- orders: Order lifecycle events (open, filled, cancelled)
- book_updates: Order book changes (incremental deltas)
- events: Balance changes, transfers, deposits, withdrawals
- twap: TWAP execution data
- writer_actions: HyperCore <-> HyperEVM asset transfers
Note: L2/L4 order book snapshots are available via gRPC (see stream_orderbook.py).
Requirements:
pip install hyperliquid-sdk[websocket]
Usage:
export ENDPOINT="https://your-endpoint.example.com/TOKEN"
python stream_trades.py
"""
import os
import signal
import sys
import time
from datetime import datetime
from hyperliquid_sdk import HyperliquidSDK
ENDPOINT = os.environ.get("ENDPOINT") or os.environ.get("QUICKNODE_ENDPOINT")
if not ENDPOINT:
print("WebSocket Streaming Example")
print("=" * 60)
print()
print("Usage:")
print(" export ENDPOINT='https://your-endpoint.example.com/TOKEN'")
print(" python stream_trades.py")
sys.exit(1)
def timestamp():
return datetime.now().isoformat()[11:23]
def main():
print("=" * 60)
print("WebSocket Trade Streaming")
print("=" * 60)
trade_count = [0]
# Create SDK client
sdk = HyperliquidSDK(ENDPOINT)
stream = sdk.stream
def on_connect():
print("[CONNECTED]")
def on_error(err):
print(f"[ERROR] {err}")
def on_trade(data):
# Events are [[user, trade_data], ...]
block = data.get("block", {})
for event in block.get("events", []):
if isinstance(event, list) and len(event) >= 2:
t = event[1] # trade_data is second element
trade_count[0] += 1
coin = t.get("coin", "?")
px = float(t.get("px", "0"))
sz = t.get("sz", "?")
side = "BUY " if t.get("side") == "B" else "SELL"
print(f"[{timestamp()}] {side} {sz} {coin} @ ${px:,.2f}")
stream.on_connect = on_connect
stream.on_error = on_error
stream.trades(["BTC", "ETH"], on_trade)
print("\nSubscribing to BTC and ETH trades...")
print("-" * 60)
# Handle Ctrl+C gracefully
def signal_handler(sig, frame):
print("\nShutting down gracefully...")
stream.stop()
sys.exit(0)
signal.signal(signal.SIGINT, signal_handler)
stream.start()
start = time.time()
while trade_count[0] < 20 and time.time() - start < 60:
time.sleep(0.1)
print(f"\nReceived {trade_count[0]} trades.")
stream.stop()
print()
print("=" * 60)
print("Done!")
if __name__ == "__main__":
main()