Skip to content

Improve reconnection robustness for Bybit private/trading channels #2520

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Apr 16, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 7 additions & 3 deletions nautilus_trader/adapters/bybit/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

from nautilus_trader.config import LiveDataClientConfig
from nautilus_trader.config import LiveExecClientConfig
from nautilus_trader.config import PositiveFloat
from nautilus_trader.config import PositiveInt


Expand Down Expand Up @@ -116,11 +117,13 @@ class BybitExecClientConfig(LiveExecClientConfig, frozen=True):
The maximum delay (milliseconds) between retries.
recv_window_ms : PositiveInt, default 5000
The receive window (milliseconds) for Bybit HTTP requests.
ws_trade_timeout_secs : float, default 5.0
ws_trade_timeout_secs : PositiveFloat, default 5.0
The timeout for trade websocket messages.
ws_auth_timeout_secs : PositiveFloat, default 5.0
The timeout for auth websocket messages.
futures_leverages : dict[BybitSymbol, PositiveInt], optional
The leverages for futures.
position_mode : BybitPositionMode, optional
position_mode : dict[BybitSymbol, BybitPositionMode], optional
The position mode for `USDT perpetual` and `Inverse futures`.
margin_mode : BybitMarginMode, optional
Set Margin Mode.
Expand All @@ -147,7 +150,8 @@ class BybitExecClientConfig(LiveExecClientConfig, frozen=True):
retry_delay_initial_ms: PositiveInt | None = None
retry_delay_max_ms: PositiveInt | None = None
recv_window_ms: PositiveInt = 5_000
ws_trade_timeout_secs: float | None = 5.0
ws_trade_timeout_secs: PositiveFloat | None = 5.0
ws_auth_timeout_secs: PositiveFloat | None = 5.0
futures_leverages: dict[BybitSymbol, PositiveInt] | None = None
position_mode: dict[BybitSymbol, BybitPositionMode] | None = None
margin_mode: BybitMarginMode | None = None
42 changes: 34 additions & 8 deletions nautilus_trader/adapters/bybit/websocket/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
from nautilus_trader.common.component import LiveClock
from nautilus_trader.common.component import Logger
from nautilus_trader.common.enums import LogColor
from nautilus_trader.config import PositiveFloat
from nautilus_trader.core.nautilus_pyo3 import WebSocketClient
from nautilus_trader.core.nautilus_pyo3 import WebSocketClientError
from nautilus_trader.core.nautilus_pyo3 import WebSocketConfig
Expand Down Expand Up @@ -83,7 +84,7 @@ class BybitWebSocketClient:
Whether the client is a private channel.
is_trade : bool, optional
Whether the client is a trade channel.
ws_trade_timeout_secs: float, default 5.0
ws_trade_timeout_secs: PositiveFloat, default 5.0
The timeout for trade websocket messages.
recv_window_ms : int, default 5_000
The receive window (milliseconds) for Bybit WebSocket order requests.
Expand All @@ -106,7 +107,8 @@ def __init__(
loop: asyncio.AbstractEventLoop,
is_private: bool | None = False,
is_trade: bool | None = False,
ws_trade_timeout_secs: float | None = 5.0,
ws_trade_timeout_secs: PositiveFloat | None = 5.0,
ws_auth_timeout_secs: PositiveFloat | None = 5.0,
recv_window_ms: int = 5_000,
) -> None:
if is_private and is_trade:
Expand Down Expand Up @@ -135,6 +137,10 @@ def __init__(
self._is_trade = is_trade
self._auth_required = is_private or is_trade
self._is_authenticated = False
self._ws_auth_timeout_secs = ws_auth_timeout_secs

self._reconnect_task: asyncio.Task | None = None
self._auth_event = asyncio.Event()

self._decoder_ws_message_general = msgspec_json.Decoder(BybitWsMessageGeneral)
self._decoder_ws_private_channel_auth = msgspec_json.Decoder(BybitWsPrivateChannelAuthMsg)
Expand Down Expand Up @@ -189,7 +195,7 @@ def reconnect(self) -> None:

self._log.warning(f"Trying to reconnect to {self._base_url}")
self._reconnecting = True
self._loop.create_task(self._reconnect_wrapper())
self._reconnect_task = self._loop.create_task(self._reconnect_wrapper())

async def _reconnect_wrapper(self) -> None:
try:
Expand Down Expand Up @@ -266,18 +272,38 @@ def _check_auth_success(self, raw: bytes) -> None:

if msg.is_auth_success():
self._is_authenticated = True
self._auth_event.set()
self._log.info(f"{self.channel_type} channel authenticated", LogColor.GREEN)
else:
raise RuntimeError(f"{self.channel_type} channel authentication failed: {msg}")

async def _authenticate(self) -> None:
self._is_authenticated = False
self._auth_event.clear()

signature = self._get_signature()
await self._send(signature)

while not self._is_authenticated:
self._log.debug(f"Waiting for {self.channel_type} channel authentication")
await asyncio.sleep(0.1)
try:
await self._send(signature)
await asyncio.wait_for(
self._auth_event.wait(),
timeout=self._ws_auth_timeout_secs,
)
except TimeoutError:
self._log.warning(f"{self.channel_type} channel authentication timeout")
raise
except WebSocketClientError as e:
self._log.exception(
f"{self.channel_type} channel failed to send authentication request",
{e},
)
raise
except Exception as e:
self._log.exception(
f"{self.channel_type} channel unexpected error during authentication",
{e},
)
raise

async def _subscribe(self, subscription: str) -> None:
self._log.debug(f"Subscribing to {subscription}")
Expand Down Expand Up @@ -430,7 +456,7 @@ async def _order(
args: list[
BybitPlaceOrderPostParams | BybitAmendOrderPostParams | BybitCancelOrderPostParams
],
timeout_secs: float | None,
timeout_secs: PositiveFloat | None,
) -> BybitWsOrderResponseMsg:
req_id = UUID4().value

Expand Down
Loading