Skip to content

Commit 49e0da9

Browse files
authored
Add support for Binance's Mark Price Stream across all markets (#2670)
1 parent d1eb748 commit 49e0da9

File tree

5 files changed

+57
-15
lines changed

5 files changed

+57
-15
lines changed

nautilus_trader/adapters/binance/data.py

Lines changed: 16 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -170,6 +170,10 @@ def __init__(
170170
self._connect_websockets_delay: float = 0.0 # Delay for bulk subscriptions to come in
171171
self._connect_websockets_task: asyncio.Task | None = None
172172

173+
self._subscribe_allow_no_instrument_id = [
174+
BinanceFuturesMarkPriceUpdate,
175+
]
176+
173177
# HTTP API
174178
self._http_client = client
175179
self._http_market = market
@@ -296,7 +300,10 @@ def _should_retry(self, error_code: BinanceErrorCode, retries: int) -> bool:
296300

297301
async def _subscribe(self, command: SubscribeData) -> None:
298302
instrument_id: InstrumentId | None = command.data_type.metadata.get("instrument_id")
299-
if instrument_id is None:
303+
if (
304+
instrument_id is None
305+
and command.data_type.type not in self._subscribe_allow_no_instrument_id
306+
):
300307
self._log.error(
301308
f"Cannot subscribe to `{command.data_type.type}` no instrument ID in `data_type` metadata",
302309
)
@@ -307,21 +314,25 @@ async def _subscribe(self, command: SubscribeData) -> None:
307314
elif command.data_type.type == BinanceFuturesMarkPriceUpdate:
308315
if not self._binance_account_type.is_futures:
309316
self._log.error(
310-
f"Cannot subscribe to `BinanceFuturesMarkPriceUpdate` "
317+
"Cannot subscribe to `BinanceFuturesMarkPriceUpdate` "
311318
f"for {self._binance_account_type.value} account types",
312319
)
313320
return
314-
await self._ws_client.subscribe_mark_price(instrument_id.symbol.value, speed=1000)
321+
mark_price_symbol = instrument_id.symbol.value if instrument_id else None
322+
await self._ws_client.subscribe_mark_price(mark_price_symbol, speed=1000)
315323
else:
316324
self._log.error(
317325
f"Cannot subscribe to {command.data_type.type} (not implemented)",
318326
)
319327

320328
async def _unsubscribe(self, command: UnsubscribeData) -> None:
321329
instrument_id: InstrumentId | None = command.data_type.metadata.get("instrument_id")
322-
if instrument_id is None:
330+
if (
331+
instrument_id is None
332+
and command.data_type.type not in self._subscribe_allow_no_instrument_id
333+
):
323334
self._log.error(
324-
"Cannot subscribe to `BinanceFuturesMarkPriceUpdate` no instrument ID in `data_type` metadata",
335+
"Cannot unsubscribe to `BinanceFuturesMarkPriceUpdate` no instrument ID in `data_type` metadata",
325336
)
326337
return
327338

nautilus_trader/adapters/binance/futures/data.py

Lines changed: 19 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@
2222
from nautilus_trader.adapters.binance.data import BinanceCommonDataClient
2323
from nautilus_trader.adapters.binance.futures.enums import BinanceFuturesEnumParser
2424
from nautilus_trader.adapters.binance.futures.http.market import BinanceFuturesMarketHttpAPI
25+
from nautilus_trader.adapters.binance.futures.schemas.market import BinanceFuturesMarkPriceAllMsg
26+
from nautilus_trader.adapters.binance.futures.schemas.market import BinanceFuturesMarkPriceData
2527
from nautilus_trader.adapters.binance.futures.schemas.market import BinanceFuturesMarkPriceMsg
2628
from nautilus_trader.adapters.binance.futures.schemas.market import BinanceFuturesTradeMsg
2729
from nautilus_trader.adapters.binance.futures.types import BinanceFuturesMarkPriceUpdate
@@ -111,10 +113,14 @@ def __init__(
111113

112114
# Register additional futures websocket handlers
113115
self._ws_handlers["@markPrice"] = self._handle_mark_price
116+
self._ws_handlers["!markPrice@arr"] = self._handle_mark_price_all
114117

115118
# Websocket msgspec decoders
116119
self._decoder_futures_trade_msg = msgspec.json.Decoder(BinanceFuturesTradeMsg)
117120
self._decoder_futures_mark_price_msg = msgspec.json.Decoder(BinanceFuturesMarkPriceMsg)
121+
self._decoder_futures_mark_price_all_msg = msgspec.json.Decoder(
122+
BinanceFuturesMarkPriceAllMsg,
123+
)
118124

119125
# -- WEBSOCKET HANDLERS ---------------------------------------------------------------------------------
120126

@@ -149,10 +155,9 @@ def _handle_trade(self, raw: bytes) -> None:
149155
else:
150156
self._handle_data(trade_tick)
151157

152-
def _handle_mark_price(self, raw: bytes) -> None:
153-
msg = self._decoder_futures_mark_price_msg.decode(raw)
154-
instrument_id: InstrumentId = self._get_cached_instrument_id(msg.data.s)
155-
data = msg.data.parse_to_binance_futures_mark_price_update(
158+
def _handle_mark_price_data(self, data: BinanceFuturesMarkPriceData) -> None:
159+
instrument_id: InstrumentId = self._get_cached_instrument_id(data.s)
160+
data = data.parse_to_binance_futures_mark_price_update(
156161
instrument_id=instrument_id,
157162
ts_init=self._clock.timestamp_ns(),
158163
)
@@ -161,6 +166,7 @@ def _handle_mark_price(self, raw: bytes) -> None:
161166
metadata={"instrument_id": instrument_id},
162167
)
163168
generic = CustomData(data_type=data_type, data=data)
169+
164170
self._handle_data(generic)
165171
self._handle_data(
166172
MarkPriceUpdate(
@@ -170,3 +176,12 @@ def _handle_mark_price(self, raw: bytes) -> None:
170176
data.ts_init,
171177
),
172178
)
179+
180+
def _handle_mark_price(self, raw: bytes) -> None:
181+
msg = self._decoder_futures_mark_price_msg.decode(raw)
182+
self._handle_mark_price_data(msg.data)
183+
184+
def _handle_mark_price_all(self, raw: bytes) -> None:
185+
msg = self._decoder_futures_mark_price_all_msg.decode(raw)
186+
for data in msg.data:
187+
self._handle_mark_price_data(data)

nautilus_trader/adapters/binance/futures/schemas/market.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -248,3 +248,12 @@ class BinanceFuturesMarkPriceMsg(msgspec.Struct, frozen=True):
248248

249249
stream: str
250250
data: BinanceFuturesMarkPriceData
251+
252+
253+
class BinanceFuturesMarkPriceAllMsg(msgspec.Struct, frozen=True):
254+
"""
255+
WebSocket message from Binance Futures All Mark Price Update events.
256+
"""
257+
258+
stream: str
259+
data: list[BinanceFuturesMarkPriceData]

nautilus_trader/adapters/binance/futures/types.py

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,8 @@
1313
# limitations under the License.
1414
# -------------------------------------------------------------------------------------------------
1515

16+
from __future__ import annotations
17+
1618
from decimal import Decimal
1719
from typing import Any
1820

@@ -47,7 +49,7 @@ class BinanceFuturesMarkPriceUpdate(Data):
4749
4850
References
4951
----------
50-
https://binance-docs.github.io/apidocs/futures/en/#mark-price-stream
52+
https://developers.binance.com/docs/derivatives/usds-margined-futures/websocket-market-streams/Mark-Price-Stream
5153
5254
"""
5355

@@ -109,7 +111,7 @@ def ts_init(self) -> int:
109111
return self._ts_init
110112

111113
@staticmethod
112-
def from_dict(values: dict[str, Any]) -> "BinanceFuturesMarkPriceUpdate":
114+
def from_dict(values: dict[str, Any]) -> BinanceFuturesMarkPriceUpdate:
113115
"""
114116
Return a Binance Futures mark price update parsed from the given values.
115117
@@ -135,7 +137,7 @@ def from_dict(values: dict[str, Any]) -> "BinanceFuturesMarkPriceUpdate":
135137
)
136138

137139
@staticmethod
138-
def to_dict(obj: "BinanceFuturesMarkPriceUpdate") -> dict[str, Any]:
140+
def to_dict(obj: BinanceFuturesMarkPriceUpdate) -> dict[str, Any]:
139141
"""
140142
Return a dictionary representation of this object.
141143

nautilus_trader/adapters/binance/websocket/client.py

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -225,7 +225,7 @@ async def _connect_client(self, client_id: int, streams: list[str]) -> None:
225225
msg = self._create_subscribe_msg(streams=streams[1:])
226226
await self._send(client_id, msg)
227227
self._log.debug(
228-
f"Client {client_id}: Subscribed to additional {len(streams)-1} streams",
228+
f"Client {client_id}: Subscribed to additional {len(streams) - 1} streams",
229229
)
230230

231231
def _handle_ping(self, client_id: int, raw: bytes) -> None:
@@ -557,12 +557,17 @@ async def subscribe_mark_price(
557557
"""
558558
Subscribe to aggregate mark price stream.
559559
"""
560-
if speed not in (1000, 3000):
560+
if speed and speed not in (1000, 3000):
561561
raise ValueError(f"`speed` options are 1000ms or 3000ms only, was {speed}")
562+
562563
if symbol is None:
563564
stream = "!markPrice@arr"
564565
else:
565-
stream = f"{BinanceSymbol(symbol).lower()}@markPrice@{int(speed / 1000)}s"
566+
stream = f"{BinanceSymbol(symbol).lower()}@markPrice"
567+
568+
if speed:
569+
stream += f"@{int(speed / 1000)}s"
570+
566571
await self._subscribe(stream)
567572

568573
async def unsubscribe_mark_price(

0 commit comments

Comments
 (0)