Skip to content

Commit 9f0a33f

Browse files
committed
Refine databento venue dataset mapping
1 parent d1eea6f commit 9f0a33f

File tree

4 files changed

+31
-31
lines changed

4 files changed

+31
-31
lines changed

nautilus_trader/adapters/databento/config.py

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -13,10 +13,8 @@
1313
# limitations under the License.
1414
# -------------------------------------------------------------------------------------------------
1515

16-
from nautilus_trader.adapters.databento.types import Dataset
1716
from nautilus_trader.config import LiveDataClientConfig
1817
from nautilus_trader.model.identifiers import InstrumentId
19-
from nautilus_trader.model.identifiers import Venue
2018

2119

2220
class DatabentoDataClientConfig(LiveDataClientConfig, frozen=True):
@@ -46,7 +44,7 @@ class DatabentoDataClientConfig(LiveDataClientConfig, frozen=True):
4644
e.g. {'GLBX.MDP3': ['ES.FUT', 'ES.OPT']} (for all E-mini S&P 500 futures and options products).
4745
instrument_ids : list[InstrumentId], optional
4846
The instrument IDs to request instrument definitions for on start.
49-
venue_dataset_map: dict[Venue, Dataset], optional
47+
venue_dataset_map : dict[str, str], optional
5048
A dictionary to override the default dataset used for a venue.
5149
5250
"""
@@ -59,4 +57,4 @@ class DatabentoDataClientConfig(LiveDataClientConfig, frozen=True):
5957
mbo_subscriptions_delay: float | None = 3.0 # Need to have received all definitions
6058
instrument_ids: list[InstrumentId] | None = None
6159
parent_symbols: dict[str, set[str]] | None = None
62-
venue_dataset_map: dict[Venue, Dataset] | None = None
60+
venue_dataset_map: dict[str, str] | None = None

nautilus_trader/adapters/databento/data.py

Lines changed: 20 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -162,7 +162,7 @@ def __init__(
162162

163163
# Cache instrument index
164164
for instrument_id in config.instrument_ids or []:
165-
dataset = self._get_dataset_for_venue(instrument_id.venue)
165+
dataset = self._loader.get_dataset_for_venue(instrument_id.venue)
166166
self._instrument_ids[dataset].add(instrument_id)
167167

168168
# MBO/L3 subscription buffering
@@ -175,11 +175,6 @@ def __init__(
175175
self._update_dataset_ranges_interval_secs: int = 60 * 60 # Once per hour (hard-coded)
176176
self._update_dataset_ranges_task: asyncio.Task | None = None
177177

178-
def _get_dataset_for_venue(self, venue: Venue) -> Dataset:
179-
return (
180-
self._venue_dataset_map and self._venue_dataset_map.get(venue)
181-
) or self._loader.get_dataset_for_venue(venue)
182-
183178
async def _connect(self) -> None:
184179
if not self._instrument_ids:
185180
return # Nothing else to do yet
@@ -331,7 +326,7 @@ def _send_all_instruments_to_data_engine(self) -> None:
331326

332327
async def _ensure_subscribed_for_instrument(self, instrument_id: InstrumentId) -> None:
333328
try:
334-
dataset: Dataset = self._get_dataset_for_venue(instrument_id.venue)
329+
dataset: Dataset = self._loader.get_dataset_for_venue(instrument_id.venue)
335330
subscribed_instruments = self._instrument_ids[dataset]
336331

337332
if instrument_id in subscribed_instruments:
@@ -420,7 +415,7 @@ async def _subscribe_imbalance(self, data_type: DataType) -> None:
420415
try:
421416
# TODO: Create `DatabentoTimeSeriesParams`
422417
instrument_id: InstrumentId = data_type.metadata["instrument_id"]
423-
dataset: Dataset = self._get_dataset_for_venue(instrument_id.venue)
418+
dataset: Dataset = self._loader.get_dataset_for_venue(instrument_id.venue)
424419
live_client = self._get_live_client(dataset)
425420
live_client.subscribe(
426421
schema=DatabentoSchema.IMBALANCE.value,
@@ -434,7 +429,7 @@ async def _subscribe_statistics(self, data_type: DataType) -> None:
434429
try:
435430
# TODO: Create `DatabentoTimeSeriesParams`
436431
instrument_id: InstrumentId = data_type.metadata["instrument_id"]
437-
dataset: Dataset = self._get_dataset_for_venue(instrument_id.venue)
432+
dataset: Dataset = self._loader.get_dataset_for_venue(instrument_id.venue)
438433
live_client = self._get_live_client(dataset)
439434
live_client.subscribe(
440435
schema=DatabentoSchema.STATISTICS.value,
@@ -450,7 +445,7 @@ async def _subscribe_instruments(self, command: SubscribeInstruments) -> None:
450445

451446
async def _subscribe_instrument(self, command: SubscribeInstrument) -> None:
452447
try:
453-
dataset: Dataset = self._get_dataset_for_venue(command.instrument_id.venue)
448+
dataset: Dataset = self._loader.get_dataset_for_venue(command.instrument_id.venue)
454449
start: int | None = command.params.get("start")
455450

456451
live_client = self._get_live_client(dataset)
@@ -510,7 +505,7 @@ async def _subscribe_order_book_deltas(self, command: SubscribeOrderBook) -> Non
510505
)
511506
return
512507

513-
dataset: Dataset = self._get_dataset_for_venue(command.instrument_id.venue)
508+
dataset: Dataset = self._loader.get_dataset_for_venue(command.instrument_id.venue)
514509

515510
if self._is_buffering_mbo_subscriptions:
516511
self._log.debug(
@@ -545,7 +540,7 @@ async def _subscribe_order_book_deltas_batch(
545540
if not instrument_ids:
546541
return # No subscribing instrument IDs were loaded in the cache
547542

548-
dataset: Dataset = self._get_dataset_for_venue(instrument_ids[0].venue)
543+
dataset: Dataset = self._loader.get_dataset_for_venue(instrument_ids[0].venue)
549544
live_client = self._get_live_client_mbo(dataset)
550545

551546
if dataset == "GLBX.MDP3":
@@ -600,7 +595,7 @@ async def _subscribe_order_book_snapshots(self, command: SubscribeOrderBook) ->
600595
)
601596
return
602597

603-
dataset: Dataset = self._get_dataset_for_venue(command.instrument_id.venue)
598+
dataset: Dataset = self._loader.get_dataset_for_venue(command.instrument_id.venue)
604599
live_client = self._get_live_client(dataset)
605600
live_client.subscribe(
606601
schema=schema,
@@ -625,7 +620,7 @@ async def _subscribe_quote_ticks(self, command: SubscribeQuoteTicks) -> None:
625620

626621
start: int | None = command.params.get("start")
627622

628-
dataset: Dataset = self._get_dataset_for_venue(command.instrument_id.venue)
623+
dataset: Dataset = self._loader.get_dataset_for_venue(command.instrument_id.venue)
629624
live_client = self._get_live_client(dataset)
630625
live_client.subscribe(
631626
schema=schema,
@@ -649,7 +644,7 @@ async def _subscribe_trade_ticks(self, command: SubscribeTradeTicks) -> None:
649644

650645
start: int | None = command.params.get("start")
651646

652-
dataset: Dataset = self._get_dataset_for_venue(command.instrument_id.venue)
647+
dataset: Dataset = self._loader.get_dataset_for_venue(command.instrument_id.venue)
653648
live_client = self._get_live_client(dataset)
654649
live_client.subscribe(
655650
schema=DatabentoSchema.TRADES.value,
@@ -662,7 +657,7 @@ async def _subscribe_trade_ticks(self, command: SubscribeTradeTicks) -> None:
662657

663658
async def _subscribe_bars(self, command: SubscribeBars) -> None:
664659
try:
665-
dataset: Dataset = self._get_dataset_for_venue(
660+
dataset: Dataset = self._loader.get_dataset_for_venue(
666661
command.bar_type.instrument_id.venue,
667662
)
668663

@@ -686,7 +681,7 @@ async def _subscribe_bars(self, command: SubscribeBars) -> None:
686681

687682
async def _subscribe_instrument_status(self, command: SubscribeInstrumentStatus) -> None:
688683
try:
689-
dataset: Dataset = self._get_dataset_for_venue(command.instrument_id.venue)
684+
dataset: Dataset = self._loader.get_dataset_for_venue(command.instrument_id.venue)
690685

691686
live_client = self._get_live_client(dataset)
692687
live_client.subscribe(
@@ -770,7 +765,7 @@ async def _request_instrument_status(
770765
start = data_type.metadata.get("start")
771766
end = data_type.metadata.get("end")
772767

773-
dataset: Dataset = self._get_dataset_for_venue(instrument_id.venue)
768+
dataset: Dataset = self._loader.get_dataset_for_venue(instrument_id.venue)
774769
_, available_end = await self._get_dataset_range(dataset)
775770

776771
start = start or available_end - pd.Timedelta(days=2)
@@ -803,7 +798,7 @@ async def _request_imbalance(self, data_type: DataType, correlation_id: UUID4) -
803798
start = data_type.metadata.get("start")
804799
end = data_type.metadata.get("end")
805800

806-
dataset: Dataset = self._get_dataset_for_venue(instrument_id.venue)
801+
dataset: Dataset = self._loader.get_dataset_for_venue(instrument_id.venue)
807802
_, available_end = await self._get_dataset_range(dataset)
808803

809804
start = start or available_end - pd.Timedelta(days=2)
@@ -834,7 +829,7 @@ async def _request_statistics(self, data_type: DataType, correlation_id: UUID4)
834829
start = data_type.metadata.get("start")
835830
end = data_type.metadata.get("end")
836831

837-
dataset: Dataset = self._get_dataset_for_venue(instrument_id.venue)
832+
dataset: Dataset = self._loader.get_dataset_for_venue(instrument_id.venue)
838833
_, available_end = await self._get_dataset_range(dataset)
839834

840835
start = start or available_end - pd.Timedelta(days=2)
@@ -861,7 +856,7 @@ async def _request_statistics(self, data_type: DataType, correlation_id: UUID4)
861856
)
862857

863858
async def _request_instrument(self, request: RequestInstrument) -> None:
864-
dataset: Dataset = self._get_dataset_for_venue(request.instrument_id.venue)
859+
dataset: Dataset = self._loader.get_dataset_for_venue(request.instrument_id.venue)
865860
_, available_end = await self._get_dataset_range(dataset)
866861

867862
start = request.start or available_end - pd.Timedelta(days=2)
@@ -892,7 +887,7 @@ async def _request_instrument(self, request: RequestInstrument) -> None:
892887
self._handle_instrument(instruments[0], request.id, request.params)
893888

894889
async def _request_instruments(self, request: RequestInstruments) -> None:
895-
dataset: Dataset = self._get_dataset_for_venue(request.venue)
890+
dataset: Dataset = self._loader.get_dataset_for_venue(request.venue)
896891
_, available_end = await self._get_dataset_range(dataset)
897892

898893
start = request.start or available_end - pd.Timedelta(days=2)
@@ -925,7 +920,7 @@ async def _request_instruments(self, request: RequestInstruments) -> None:
925920
self._handle_instruments(instruments, request.venue, request.id, request.params)
926921

927922
async def _request_quote_ticks(self, request: RequestQuoteTicks) -> None:
928-
dataset: Dataset = self._get_dataset_for_venue(request.instrument_id.venue)
923+
dataset: Dataset = self._loader.get_dataset_for_venue(request.instrument_id.venue)
929924
_, available_end = await self._get_dataset_range(dataset)
930925

931926
start = request.start or available_end - pd.Timedelta(days=1)
@@ -963,7 +958,7 @@ async def _request_quote_ticks(self, request: RequestQuoteTicks) -> None:
963958
self._handle_quote_ticks(request.instrument_id, quotes, request.id, request.params)
964959

965960
async def _request_trade_ticks(self, request: RequestTradeTicks) -> None:
966-
dataset: Dataset = self._get_dataset_for_venue(request.instrument_id.venue)
961+
dataset: Dataset = self._loader.get_dataset_for_venue(request.instrument_id.venue)
967962
_, available_end = await self._get_dataset_range(dataset)
968963

969964
start = request.start or available_end - pd.Timedelta(days=1)
@@ -991,7 +986,7 @@ async def _request_trade_ticks(self, request: RequestTradeTicks) -> None:
991986
self._handle_trade_ticks(request.instrument_id, trades, request.id, request.params)
992987

993988
async def _request_bars(self, request: RequestBars) -> None:
994-
dataset: Dataset = self._get_dataset_for_venue(request.bar_type.instrument_id.venue)
989+
dataset: Dataset = self._loader.get_dataset_for_venue(request.bar_type.instrument_id.venue)
995990
_, available_end = await self._get_dataset_range(dataset)
996991

997992
start = request.start or available_end - pd.Timedelta(days=1)

nautilus_trader/adapters/databento/factories.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -146,7 +146,7 @@ def create( # type: ignore
146146
gateway=config.http_gateway,
147147
)
148148

149-
loader = DatabentoDataLoader()
149+
loader = DatabentoDataLoader(config.venue_dataset_map)
150150
provider = get_cached_databento_instrument_provider(
151151
http_client=http_client,
152152
clock=clock,

nautilus_trader/adapters/databento/loaders.py

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,10 +55,14 @@ class DatabentoDataLoader:
5555
5656
"""
5757

58-
def __init__(self) -> None:
58+
def __init__(
59+
self,
60+
venue_dataset_map: dict[str, str] | None = None,
61+
) -> None:
5962
self._pyo3_loader: nautilus_pyo3.DatabentoDataLoader = nautilus_pyo3.DatabentoDataLoader(
6063
str(PUBLISHERS_FILEPATH),
6164
)
65+
self._venue_dataset_map = venue_dataset_map
6266

6367
def load_publishers(self, path: PathLike[str] | str) -> None:
6468
"""
@@ -102,6 +106,9 @@ def get_dataset_for_venue(self, venue: Venue) -> str:
102106
If `venue` is not in the map of publishers.
103107
104108
"""
109+
if self._venue_dataset_map and (dataset := self._venue_dataset_map.get(venue.value)):
110+
return dataset
111+
105112
dataset = self._pyo3_loader.get_dataset_for_venue(nautilus_pyo3.Venue(venue.value))
106113

107114
if dataset is None:

0 commit comments

Comments
 (0)