Skip to content

Commit f903581

Browse files
authored
Optimize message bus topic-matching logic by 100× (#2634)
- Add is_matching regex based randomized testing - Add bulk benchmarking logic - Add iterative backtracking implementation - Use iterative matching for 1000x improvement
1 parent e36b75b commit f903581

File tree

5 files changed

+271
-4
lines changed

5 files changed

+271
-4
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

crates/common/Cargo.toml

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,8 @@ sysinfo = "0.35.1"
7777
proptest = { workspace = true }
7878
tempfile = { workspace = true }
7979
criterion = {workspace = true}
80+
rand.workspace = true
81+
regex = "1.11.1"
8082

8183
[build-dependencies]
8284
cbindgen = { workspace = true, optional = true }
@@ -85,3 +87,8 @@ cbindgen = { workspace = true, optional = true }
8587
name = "cache_orders"
8688
path = "benches/cache/orders.rs"
8789
harness = false
90+
91+
[[bench]]
92+
name = "matching"
93+
path = "benches/matching.rs"
94+
harness = false

crates/common/benches/matching.rs

Lines changed: 106 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,106 @@
1+
// -------------------------------------------------------------------------------------------------
2+
// Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3+
// https://nautechsystems.io
4+
//
5+
// Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6+
// You may not use this file except in compliance with the License.
7+
// You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
// -------------------------------------------------------------------------------------------------
15+
16+
use criterion::{Criterion, black_box, criterion_group, criterion_main};
17+
use nautilus_common::msgbus::{is_matching, is_matching_backtracking};
18+
use rand::{Rng, SeedableRng, rngs::StdRng};
19+
use regex::Regex;
20+
use ustr::Ustr;
21+
22+
fn create_topics(n: usize, rng: &mut StdRng) -> Vec<Ustr> {
23+
let cat = ["data", "info", "order"];
24+
let model = ["quotes", "trades", "orderbooks", "depths"];
25+
let venue = ["BINANCE", "BYBIT", "OKX", "FTX", "KRAKEN"];
26+
let instrument = ["BTCUSDT", "ETHUSDT", "SOLUSDT", "XRPUSDT", "DOGEUSDT"];
27+
28+
let mut topics = Vec::new();
29+
for _ in 0..n {
30+
let cat = cat[rng.random_range(0..cat.len())];
31+
let model = model[rng.random_range(0..model.len())];
32+
let venue = venue[rng.random_range(0..venue.len())];
33+
let instrument = instrument[rng.random_range(0..instrument.len())];
34+
topics.push(Ustr::from(&format!(
35+
"{}.{}.{}.{}",
36+
cat, model, venue, instrument
37+
)));
38+
}
39+
topics
40+
}
41+
42+
fn bench_matching(c: &mut Criterion) {
43+
let pattern = "data.*.BINANCE.ETH???";
44+
let pattern_ustr = Ustr::from(pattern);
45+
46+
{
47+
let mut rng = StdRng::seed_from_u64(42);
48+
let mut custom_group = c.benchmark_group("Custom matching");
49+
50+
for ele in [1, 10, 100, 1000] {
51+
let topics = create_topics(ele, &mut rng);
52+
53+
custom_group.bench_function(format!("{} topics", ele), |b| {
54+
b.iter(|| {
55+
for topic in topics.iter() {
56+
black_box(is_matching(&pattern_ustr, topic));
57+
}
58+
});
59+
});
60+
}
61+
62+
custom_group.finish();
63+
}
64+
65+
{
66+
let mut rng = StdRng::seed_from_u64(42);
67+
let mut regex_group = c.benchmark_group("Regex matching");
68+
69+
for ele in [1, 10, 100, 1000] {
70+
let topics = create_topics(ele, &mut rng);
71+
72+
regex_group.bench_function(format!("{} topics", ele), |b| {
73+
b.iter(|| {
74+
let regex = Regex::new(pattern).unwrap();
75+
for topic in topics.iter() {
76+
black_box(regex.is_match(topic));
77+
}
78+
});
79+
});
80+
}
81+
82+
regex_group.finish();
83+
}
84+
85+
{
86+
let mut rng = StdRng::seed_from_u64(42);
87+
let mut iter_group = c.benchmark_group("Iterative backtracking matching");
88+
89+
for ele in [1, 10, 100, 1000] {
90+
let topics = create_topics(ele, &mut rng);
91+
92+
iter_group.bench_function(format!("{} topics", ele), |b| {
93+
b.iter(|| {
94+
for topic in topics.iter() {
95+
black_box(is_matching_backtracking(&pattern_ustr, topic));
96+
}
97+
});
98+
});
99+
}
100+
101+
iter_group.finish();
102+
}
103+
}
104+
105+
criterion_group!(benches, bench_matching);
106+
criterion_main!(benches);

crates/common/src/msgbus/mod.rs

Lines changed: 58 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -182,7 +182,7 @@ pub fn subscribe<T: AsRef<str>>(topic: T, handler: ShareableMessageHandler, prio
182182
// Find existing patterns which match this topic
183183
let mut matches = Vec::new();
184184
for (pattern, subs) in msgbus_ref_mut.patterns.iter_mut() {
185-
if is_matching(&Ustr::from(topic.as_ref()), pattern) {
185+
if is_matching_backtracking(&Ustr::from(topic.as_ref()), pattern) {
186186
subs.push(sub.clone());
187187
subs.sort();
188188
// subs.sort_by(|a, b| a.priority.cmp(&b.priority).then_with(|| a.cmp(b)));
@@ -465,7 +465,7 @@ impl MessageBus {
465465

466466
// Collect matching subscriptions from direct subscriptions
467467
matching_subs.extend(self.subscriptions.iter().filter_map(|(sub, _)| {
468-
if is_matching(&sub.topic, pattern) {
468+
if is_matching_backtracking(&sub.topic, pattern) {
469469
Some(sub.clone())
470470
} else {
471471
None
@@ -511,7 +511,7 @@ impl MessageBus {
511511
pattern: &'a Ustr,
512512
) -> impl Iterator<Item = &'a ShareableMessageHandler> {
513513
self.subscriptions.iter().filter_map(move |(sub, _)| {
514-
if is_matching(&sub.topic, pattern) {
514+
if is_matching_backtracking(&sub.topic, pattern) {
515515
Some(&sub.handler)
516516
} else {
517517
None
@@ -593,6 +593,61 @@ pub fn is_matching(topic: &Ustr, pattern: &Ustr) -> bool {
593593
table[n][m]
594594
}
595595

596+
/// Match a topic and a string pattern using iterative backtracking algorithm
597+
/// pattern can contains -
598+
/// '*' - match 0 or more characters after this
599+
/// '?' - match any character once
600+
/// 'a-z' - match the specific character
601+
#[must_use]
602+
pub fn is_matching_backtracking(topic: &Ustr, pattern: &Ustr) -> bool {
603+
let topic_bytes = topic.as_bytes();
604+
let pattern_bytes = pattern.as_bytes();
605+
606+
// Stack to store states for backtracking (topic_idx, pattern_idx)
607+
let mut stack = vec![(0, 0)];
608+
609+
while let Some((mut i, mut j)) = stack.pop() {
610+
loop {
611+
// Found a match if we've consumed both strings
612+
if i == topic.len() && j == pattern.len() {
613+
return true;
614+
}
615+
616+
// If we've reached the end of the pattern, break to try other paths
617+
if j == pattern.len() {
618+
break;
619+
}
620+
621+
// Handle '*' wildcard
622+
if pattern_bytes[j] == b'*' {
623+
// Try skipping '*' entirely first
624+
stack.push((i, j + 1));
625+
626+
// Continue with matching current character and keeping '*'
627+
if i < topic.len() {
628+
i += 1;
629+
continue;
630+
}
631+
break;
632+
}
633+
// Handle '?' or exact character match
634+
else if i < topic.len()
635+
&& (pattern_bytes[j] == b'?' || topic_bytes[i] == pattern_bytes[j])
636+
{
637+
// Continue matching linearly without stack operations
638+
i += 1;
639+
j += 1;
640+
continue;
641+
}
642+
643+
// No match found in current path
644+
break;
645+
}
646+
}
647+
648+
false
649+
}
650+
596651
impl Default for MessageBus {
597652
/// Creates a new default [`MessageBus`] instance.
598653
fn default() -> Self {

crates/common/src/msgbus/tests.rs

Lines changed: 99 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,11 +15,13 @@
1515

1616
use nautilus_core::UUID4;
1717
use nautilus_model::identifiers::TraderId;
18+
use rand::{Rng, SeedableRng, rngs::StdRng};
19+
use regex::Regex;
1820
use rstest::rstest;
1921
use ustr::Ustr;
2022

2123
use crate::msgbus::{
22-
self, MessageBus, get_message_bus, is_matching,
24+
self, MessageBus, get_message_bus, is_matching, is_matching_backtracking,
2325
stubs::{
2426
check_handler_was_called, get_call_check_shareable_handler, get_stub_shareable_handler,
2527
},
@@ -217,4 +219,100 @@ fn test_is_matching(#[case] topic: &str, #[case] pattern: &str, #[case] expected
217219
is_matching(&Ustr::from(topic), &Ustr::from(pattern)),
218220
expected
219221
);
222+
assert_eq!(
223+
is_matching_backtracking(&Ustr::from(topic), &Ustr::from(pattern)),
224+
expected
225+
);
226+
}
227+
228+
/// Translates a topic wildcard pattern into an anchored regular expression string.
///
/// `*` becomes `.*`, `?` becomes `.`, and regex metacharacters are backslash-escaped
/// so they match literally. The result is wrapped in `^` ... `$` so it must match
/// the whole input.
fn convert_pattern_to_regex(pattern: &str) -> String {
    // Two extra bytes for the anchors; escapes and `.*` may grow it further
    let mut regex = String::with_capacity(pattern.len() + 2);
    regex.push('^');

    for c in pattern.chars() {
        match c {
            '*' => regex.push_str(".*"),
            '?' => regex.push('.'),
            // Characters that carry meaning in a regex must be matched literally
            '\\' | '.' | '+' | '(' | ')' | '|' | '[' | ']' | '{' | '}' | '^' | '$' => {
                regex.push('\\');
                regex.push(c);
            }
            _ => regex.push(c),
        }
    }

    regex.push('$');
    regex
}
244+
245+
#[rstest]
246+
#[case("a??.quo*es.?I?AN*ET?US*T", "^a..\\.quo.*es\\..I.AN.*ET.US.*T$")]
247+
#[case("da?*.?u*?s??*NC**ETH?", "^da..*\\..u.*.s...*NC.*.*ETH.$")]
248+
fn test_convert_pattern_to_regex(#[case] pat: &str, #[case] regex: &str) {
249+
assert_eq!(convert_pattern_to_regex(pat), regex);
250+
}
251+
252+
fn generate_pattern_from_topic(topic: &str, rng: &mut StdRng) -> String {
253+
let mut pattern = String::new();
254+
255+
for c in topic.chars() {
256+
let val: f64 = rng.random();
257+
// 10% chance of wildcard
258+
if val < 0.1 {
259+
pattern.push('*');
260+
}
261+
// 20% chance of question mark
262+
else if val < 0.3 {
263+
pattern.push('?');
264+
}
265+
// 20% chance of skipping
266+
else if val < 0.5 {
267+
continue;
268+
}
269+
// 50% chance of keeping the character
270+
else {
271+
pattern.push(c);
272+
};
273+
}
274+
275+
pattern
276+
}
277+
278+
#[rstest]
279+
fn test_matching_with_regex() {
280+
let topic = "data.quotes.BINANCE.ETHUSDT";
281+
let mut rng = StdRng::seed_from_u64(42);
282+
283+
for i in 0..1000 {
284+
let pattern = generate_pattern_from_topic(topic, &mut rng);
285+
let regex_pattern = convert_pattern_to_regex(&pattern);
286+
let regex = Regex::new(&regex_pattern).unwrap();
287+
assert_eq!(
288+
is_matching(&Ustr::from(topic), &Ustr::from(&pattern)),
289+
regex.is_match(topic),
290+
"Failed to match on iteration: {}, pattern: \"{}\", topic: {}, regex: \"{}\"",
291+
i,
292+
pattern,
293+
topic,
294+
regex_pattern
295+
);
296+
}
297+
}
298+
299+
#[rstest]
300+
fn test_matching_backtracking() {
301+
let topic = "data.quotes.BINANCE.ETHUSDT";
302+
let mut rng = StdRng::seed_from_u64(42);
303+
304+
for i in 0..1000 {
305+
let pattern = generate_pattern_from_topic(topic, &mut rng);
306+
let regex_pattern = convert_pattern_to_regex(&pattern);
307+
let regex = Regex::new(&regex_pattern).unwrap();
308+
assert_eq!(
309+
is_matching_backtracking(&Ustr::from(topic), &Ustr::from(&pattern)),
310+
regex.is_match(topic),
311+
"Failed to match on iteration: {}, pattern: \"{}\", topic: {}, regex: \"{}\"",
312+
i,
313+
pattern,
314+
topic,
315+
regex_pattern
316+
);
317+
}
220318
}

0 commit comments

Comments
 (0)