Commit d521cce

Refine Rust catalog file operations (#2454)
1 parent: a61e59e

File tree: 17 files changed, +619 -160 lines


Cargo.lock

Lines changed: 3 additions & 1 deletion
Some generated files are not rendered by default.

crates/adapters/tardis/src/replay.rs

Lines changed: 2 additions & 2 deletions
@@ -416,7 +416,7 @@ fn batch_and_write_bars(bars: Vec<Bar>, bar_type: &BarType, date: NaiveDate, pat
     };

     let filepath = path.join(parquet_filepath_bars(bar_type, date));
-    match write_batch_to_parquet(batch, &filepath, None, None) {
+    match write_batch_to_parquet(batch, &filepath, None, None, None) {
         Ok(()) => tracing::info!("File written: {filepath:?}"),
         Err(e) => tracing::error!("Error writing {filepath:?}: {e:?}"),
     }
@@ -449,7 +449,7 @@ fn write_batch(
     path: &Path,
 ) {
     let filepath = path.join(parquet_filepath(typename, instrument_id, date));
-    match write_batch_to_parquet(batch, &filepath, None, None) {
+    match write_batch_to_parquet(batch, &filepath, None, None, None) {
         Ok(()) => tracing::info!("File written: {filepath:?}"),
         Err(e) => tracing::error!("Error writing {filepath:?}: {e:?}"),
     }
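
The added fifth argument lines up with the expanded `write_batches_to_parquet` call in the catalog changes below, so it is presumably the optional write mode. A minimal sketch of the same call site with that assumed parameter spelled out rather than defaulted (the `NewFile` value is illustrative, not part of this commit):

    use nautilus_serialization::enums::ParquetWriteMode;

    // Illustrative only: the call above, with the assumed write-mode
    // parameter passed explicitly instead of as `None`.
    match write_batch_to_parquet(
        batch,
        &filepath,
        None,                            // compression: writer default
        None,                            // max_row_group_size: writer default
        Some(ParquetWriteMode::NewFile), // assumed fifth parameter
    ) {
        Ok(()) => tracing::info!("File written: {filepath:?}"),
        Err(e) => tracing::error!("Error writing {filepath:?}: {e:?}"),
    }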

crates/persistence/Cargo.toml

Lines changed: 0 additions & 1 deletion
@@ -46,7 +46,6 @@ serde_json = { workspace = true }
 serde = { workspace = true }
 tokio = { workspace = true }
 anyhow = { workspace = true }
-thiserror = { workspace = true }
 parquet = { workspace = true }
 binary-heap-plus = "0.5.0"
 compare = "0.1.0"

crates/persistence/src/backend/catalog.rs

Lines changed: 81 additions & 86 deletions
@@ -15,7 +15,7 @@

 use std::path::PathBuf;

-use anyhow::Result;
+use anyhow::{Result, anyhow};
 use datafusion::arrow::record_batch::RecordBatch;
 use heck::ToSnakeCase;
 use itertools::Itertools;
@@ -26,10 +26,8 @@ use nautilus_model::data::{
 };
 use nautilus_serialization::{
     arrow::{DecodeDataFromRecordBatch, EncodeToRecordBatch},
-    parquet::{
-        ParquetWriteMode, combine_data_files, min_max_from_parquet_metadata,
-        write_batches_to_parquet,
-    },
+    enums::ParquetWriteMode,
+    parquet::{combine_data_files, min_max_from_parquet_metadata, write_batches_to_parquet},
 };
 use serde::Serialize;

@@ -87,38 +85,34 @@ impl ParquetDataCatalog {
         let _ = self.write_to_parquet(bar, None, None, None, write_mode);
     }

-    #[must_use]
     pub fn write_to_parquet<T>(
         &self,
         data: Vec<T>,
         path: Option<PathBuf>,
         compression: Option<parquet::basic::Compression>,
         max_row_group_size: Option<usize>,
         write_mode: Option<ParquetWriteMode>,
-    ) -> PathBuf
+    ) -> Result<PathBuf>
     where
         T: GetTsInit + EncodeToRecordBatch + CatalogPathPrefix,
     {
         let type_name = std::any::type_name::<T>().to_snake_case();
         Self::check_ascending_timestamps(&data, &type_name);
-
-        let batches = self.data_to_record_batches(data);
-        let batch = batches.first().expect("Expected at least one batch");
-        let schema = batch.schema();
-        let instrument_id = schema.metadata.get("instrument_id");
-        let path =
-            path.unwrap_or_else(|| self.make_path(T::path_prefix(), instrument_id, write_mode));
+        let batches = self.data_to_record_batches(data)?;
+        let schema = batches.first().expect("Batches are empty.").schema();
+        let instrument_id = schema.metadata.get("instrument_id").cloned();
+        let new_path = self.make_path(T::path_prefix(), instrument_id, write_mode)?;
+        let path = path.unwrap_or(new_path);

         // Write all batches to parquet file
         info!(
             "Writing {} batches of {type_name} data to {path:?}",
             batches.len()
         );

-        write_batches_to_parquet(&batches, &path, compression, max_row_group_size, write_mode)
-            .unwrap_or_else(|_| panic!("Failed to write {type_name} to parquet"));
+        write_batches_to_parquet(&batches, &path, compression, max_row_group_size, write_mode)?;

-        path
+        Ok(path)
     }

     fn check_ascending_timestamps<T: GetTsInit>(data: &[T], type_name: &str) {
@@ -128,71 +122,77 @@ impl ParquetDataCatalog {
         );
     }

-    #[must_use]
-    pub fn data_to_record_batches<T>(&self, data: Vec<T>) -> Vec<RecordBatch>
+    pub fn data_to_record_batches<T>(&self, data: Vec<T>) -> Result<Vec<RecordBatch>>
     where
         T: GetTsInit + EncodeToRecordBatch,
     {
-        data.into_iter()
-            .chunks(self.batch_size)
-            .into_iter()
-            .map(|chunk| {
-                // Take first element and extract metadata
-                // SAFETY: Unwrap safe as already checked that `data` not empty
-                let data = chunk.collect_vec();
-                let metadata = EncodeToRecordBatch::chunk_metadata(&data);
-                T::encode_batch(&metadata, &data).expect("Expected to encode batch")
-            })
-            .collect()
+        let mut batches = Vec::new();
+
+        for chunk in &data.into_iter().chunks(self.batch_size) {
+            let data = chunk.collect_vec();
+            let metadata = EncodeToRecordBatch::chunk_metadata(&data);
+            let record_batch = T::encode_batch(&metadata, &data)?;
+            batches.push(record_batch);
+        }
+
+        Ok(batches)
     }

     fn make_path(
         &self,
         type_name: &str,
-        instrument_id: Option<&String>,
+        instrument_id: Option<String>,
         write_mode: Option<ParquetWriteMode>,
-    ) -> PathBuf {
+    ) -> Result<PathBuf> {
+        let path = self.make_directory_path(type_name, instrument_id);
+        std::fs::create_dir_all(&path)?;
         let used_write_mode = write_mode.unwrap_or(ParquetWriteMode::Overwrite);
-        let mut path = self.base_path.join("data").join(type_name);
+        let mut file_path = path.join("data-0.parquet");
+        let mut empty_path = file_path.clone();
+        let mut i = 0;

-        if let Some(id) = instrument_id {
-            path = path.join(id);
+        while empty_path.exists() {
+            i += 1;
+            let name = format!("data-{i}.parquet");
+            empty_path = path.join(name);
         }

-        std::fs::create_dir_all(&path).expect("Failed to create directory");
-        let mut file_path = path.join("data-0.parquet");
+        if i > 1 && used_write_mode != ParquetWriteMode::NewFile {
+            return Err(anyhow!(
+                "Only ParquetWriteMode::NewFile is allowed for a directory containing several parquet files."
+            ));
+        } else if used_write_mode == ParquetWriteMode::NewFile {
+            file_path = empty_path;
+        }
+
+        info!("Created directory path: {file_path:?}");

-        if used_write_mode == ParquetWriteMode::NewFile {
-            let mut i = 0;
+        Ok(file_path)
+    }

-            while file_path.exists() {
-                i += 1;
-                let name = format!("data-{i}.parquet");
-                file_path = path.join(name);
-            }
+    fn make_directory_path(&self, type_name: &str, instrument_id: Option<String>) -> PathBuf {
+        let mut path = self.base_path.join("data").join(type_name);
+
+        if let Some(id) = instrument_id {
+            path = path.join(id.replace('/', "")); // for FX symbols like EUR/USD
         }

-        info!("Created directory path: {file_path:?}");
-        file_path
+        path
     }
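
Note on the rewritten path logic above: `make_path` now scans for the first unused `data-{i}.parquet` name, so repeated `NewFile` writes produce numbered files, while any other mode is rejected once a directory already holds more than one file. A hedged sketch of the resulting behaviour (hypothetical `catalog` and data bindings; `TradeTick` assumed to satisfy the write traits):

    use anyhow::Result;

    // Hypothetical demonstration of the NewFile numbering scheme.
    fn write_twice(catalog: &ParquetDataCatalog, a: Vec<TradeTick>, b: Vec<TradeTick>) -> Result<()> {
        // The first write creates data-0.parquet, the second data-1.parquet.
        let first = catalog.write_to_parquet(a, None, None, None, Some(ParquetWriteMode::NewFile))?;
        let second = catalog.write_to_parquet(b, None, None, None, Some(ParquetWriteMode::NewFile))?;
        assert_ne!(first, second);

        // With both files present, any mode other than NewFile (including
        // the Overwrite default) now returns the anyhow error above instead
        // of silently clobbering an existing file.
        Ok(())
    }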

-    #[must_use]
     pub fn write_to_json<T>(
         &self,
         data: Vec<T>,
         path: Option<PathBuf>,
         write_metadata: bool,
-    ) -> PathBuf
+    ) -> Result<PathBuf>
     where
         T: GetTsInit + Serialize + CatalogPathPrefix + EncodeToRecordBatch,
     {
         let type_name = std::any::type_name::<T>().to_snake_case();
         Self::check_ascending_timestamps(&data, &type_name);
-
-        let json_path = path.unwrap_or_else(|| {
-            let path = self.make_path(T::path_prefix(), None, None);
-            path.with_extension("json")
-        });
+        let new_path = self.make_path(T::path_prefix(), None, None)?;
+        let json_path = path.unwrap_or(new_path.with_extension("json"));

         info!(
             "Writing {} records of {type_name} data to {json_path:?}",
@@ -203,22 +203,17 @@ impl ParquetDataCatalog {
             let metadata = T::chunk_metadata(&data);
             let metadata_path = json_path.with_extension("metadata.json");
             info!("Writing metadata to {metadata_path:?}");
-            let metadata_file = std::fs::File::create(&metadata_path)
-                .unwrap_or_else(|_| panic!("Failed to create metadata file at {metadata_path:?}"));
-            serde_json::to_writer_pretty(metadata_file, &metadata)
-                .unwrap_or_else(|_| panic!("Failed to write metadata to JSON"));
+            let metadata_file = std::fs::File::create(&metadata_path)?;
+            serde_json::to_writer_pretty(metadata_file, &metadata)?;
         }

-        let file = std::fs::File::create(&json_path)
-            .unwrap_or_else(|_| panic!("Failed to create JSON file at {json_path:?}"));
-
-        serde_json::to_writer_pretty(file, &serde_json::to_value(data).unwrap())
-            .unwrap_or_else(|_| panic!("Failed to write {type_name} to JSON"));
+        let file = std::fs::File::create(&json_path)?;
+        serde_json::to_writer_pretty(file, &serde_json::to_value(data)?)?;

-        json_path
+        Ok(json_path)
     }

-    pub fn consolidate_data(&self, type_name: &str, instrument_id: Option<&String>) -> Result<()> {
+    pub fn consolidate_data(&self, type_name: &str, instrument_id: Option<String>) -> Result<()> {
         let parquet_files = self.query_parquet_files(type_name, instrument_id)?;

         if !parquet_files.is_empty() {
@@ -273,7 +268,6 @@
             let entry = entry.unwrap();
             entry.path().is_dir()
         });
-
         let has_files = std::fs::read_dir(&directory)?.any(|entry| {
             let entry = entry.unwrap();
             entry.path().is_file()
@@ -294,42 +288,47 @@
         start: Option<UnixNanos>,
         end: Option<UnixNanos>,
         where_clause: Option<&str>,
-    ) -> datafusion::error::Result<QueryResult>
+    ) -> Result<QueryResult>
     where
         T: DecodeDataFromRecordBatch + CatalogPathPrefix,
     {
-        let path_str = path.to_str().unwrap();
-        let table_name = path.file_stem().unwrap().to_str().unwrap();
+        let path_str = path.to_str().expect("Failed to convert path to string");
+        let table_name = path
+            .file_stem()
+            .unwrap()
+            .to_str()
+            .expect("Failed to convert path to string");
         let query = build_query(table_name, start, end, where_clause);
         self.session
             .add_file::<T>(table_name, path_str, Some(&query))?;
+
         Ok(self.session.get_query_result())
     }

     /// Query data loaded in the catalog
     pub fn query_directory<T>(
         &mut self,
-        // use instrument_ids or bar_types to query specific subset of the data
         instrument_ids: Vec<String>,
         start: Option<UnixNanos>,
         end: Option<UnixNanos>,
         where_clause: Option<&str>,
-    ) -> datafusion::error::Result<QueryResult>
+    ) -> Result<QueryResult>
     where
         T: DecodeDataFromRecordBatch + CatalogPathPrefix,
     {
         let mut paths = Vec::new();
-        for instrument_id in &instrument_ids {
-            paths.push(self.make_path(T::path_prefix(), Some(instrument_id), None));
+
+        for instrument_id in instrument_ids {
+            paths.extend(self.query_parquet_files(T::path_prefix(), Some(instrument_id))?);
         }

         // If no specific instrument_id is selected query all files for the data type
         if paths.is_empty() {
-            paths.push(self.make_path(T::path_prefix(), None, None));
+            paths.push(self.make_path(T::path_prefix(), None, None)?);
         }

         for path in &paths {
-            let path = path.to_str().unwrap();
+            let path = path.to_str().expect("Failed to convert path to string");
             let query = build_query(path, start, end, where_clause);
             self.session.add_file::<T>(path, path, Some(&query))?;
         }
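
Since `query_directory` now expands each instrument into all of its numbered parquet files via `query_parquet_files`, one call covers every file written for that instrument. A hedged sketch of the call shape (hypothetical `catalog` binding; `QuoteTick` assumed to satisfy the query traits):

    use anyhow::Result;

    // Hypothetical helper: register every parquet file for one instrument
    // with the DataFusion session and collect the combined query result.
    fn query_eurusd_quotes(catalog: &mut ParquetDataCatalog) -> Result<QueryResult> {
        catalog.query_directory::<QuoteTick>(
            vec!["EURUSD".to_string()], // "EUR/USD" resolves to the same directory (slashes stripped)
            None, // start: Option<UnixNanos>
            None, // end: Option<UnixNanos>
            None, // optional WHERE clause passed to build_query
        )
    }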
@@ -338,12 +337,13 @@
     }

     #[allow(dead_code)]
-    fn query_timestamp_bound(
+    pub fn query_timestamp_bound(
         &self,
         data_cls: &str,
-        instrument_id: Option<&String>,
-        is_last: bool,
+        instrument_id: Option<String>,
+        is_last: Option<bool>,
     ) -> Result<Option<i64>> {
+        let is_last = is_last.unwrap_or(true);
         let parquet_files = self.query_parquet_files(data_cls, instrument_id)?;

         if parquet_files.is_empty() {
@@ -354,8 +354,8 @@
             .iter()
             .map(|file| min_max_from_parquet_metadata(file, "ts_init"))
             .collect::<Result<Vec<_>, _>>()?;
-
         let mut timestamps: Vec<i64> = Vec::new();
+
         for min_max in min_max_per_file {
             let (min, max) = min_max;

@@ -377,17 +377,12 @@
         }
     }

-    fn query_parquet_files(
+    pub fn query_parquet_files(
         &self,
         type_name: &str,
-        instrument_id: Option<&String>,
+        instrument_id: Option<String>,
     ) -> Result<Vec<PathBuf>> {
-        let mut path = self.base_path.join("data").join(type_name);
-
-        if let Some(id) = instrument_id {
-            path = path.join(id);
-        }
-
+        let path = self.make_directory_path(type_name, instrument_id);
         let mut files = Vec::new();

         if path.exists() {
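
Taken together, the write path is now fallible end to end: `write_to_parquet`, `write_to_json`, and `make_path` return `anyhow::Result` instead of panicking. A minimal migration sketch under assumed bindings (`catalog`, `quotes`) and an assumed "quote_tick" type-name string:

    use anyhow::Result;

    fn store_and_consolidate(catalog: &ParquetDataCatalog, quotes: Vec<QuoteTick>) -> Result<()> {
        // Formerly returned a bare PathBuf and panicked on failure;
        // errors now propagate with `?`.
        let path = catalog.write_to_parquet(quotes, None, None, None, None)?;
        println!("wrote {path:?}");

        // Merge the numbered per-write files back into one
        // (instrument_id is now Option<String>).
        catalog.consolidate_data("quote_tick", None)?;

        // is_last: Option<bool> defaults to true (the max ts_init) when None.
        let last_ts = catalog.query_timestamp_bound("quote_tick", None, None)?;
        println!("last ts_init: {last_ts:?}");
        Ok(())
    }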
