Skip to content

Commit 6aed989

Browse files
authored
Merge pull request #21 from rabbitmq/perf-producer-names
Support message de-duplication in stream-perf-test
2 parents 0eb762d + 18f1281 commit 6aed989

File tree

7 files changed

+117
-27
lines changed

7 files changed

+117
-27
lines changed

src/docs/asciidoc/api.adoc

Lines changed: 15 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -346,8 +346,8 @@ The following table sums up the main settings to create a `Producer`:
346346

347347
|`name`
348348
|The logical name of the producer. Specify a name to enable
349-
<<outbound-message-de-deduplication,message de-deduplication>>.
350-
|`null` (no de-duplication)
349+
<<outbound-message-deduplication,message deduplication>>.
350+
|`null` (no deduplication)
351351

352352
|`batchSize`
353353
|The maximum number of messages to accumulate before sending them to the broker.
@@ -357,7 +357,7 @@ The following table sums up the main settings to create a `Producer`:
357357
|[[producer-sub-entry-size-configuration-entry]]The number of messages to put in a sub-entry. A sub-entry is one "slot" in a publishing
358358
frame, meaning outbound messages are not only batched in publishing frames, but in sub-entries
359359
as well. Use this feature to increase throughput at the cost of increased latency and
360-
potential duplicated messages even when de-duplication is enabled.
360+
potential duplicated messages even when deduplication is enabled.
361361
|1 (meaning no use of sub-entry batching)
362362

363363
|`maxUnconfirmedMessages`
@@ -475,8 +475,8 @@ type system. It provides good interoperability, which allows streams
475475
to be accessed as AMQP 0-9-1 queues, without data loss.
476476
====
477477

478-
[[outbound-message-de-deduplication]]
479-
===== Message De-deduplication
478+
[[outbound-message-deduplication]]
479+
===== Message Deduplication
480480

481481
RabbitMQ Stream provides publisher confirms to avoid losing messages: once
482482
the broker has persisted a message it sends a confirmation for this message.
@@ -490,9 +490,9 @@ Luckily RabbitMQ Stream can detect and filter out duplicated messages, based
490490
on 2 client-side elements: the _producer name_ and the _message publishing ID_.
491491

492492
[WARNING]
493-
.De-duplication is not guaranteed when using sub-entries batching
493+
.Deduplication is not guaranteed when using sub-entries batching
494494
====
495-
It is not possible to guarantee de-duplication when
495+
It is not possible to guarantee deduplication when
496496
<<producer-sub-entry-size-configuration-entry, sub-entry batching>> is in use.
497497
Sub-entry batching is disabled by default and it does not prevent from
498498
batching messages in a single publish frame, which can already provide
@@ -502,9 +502,9 @@ very high throughput.
502502
====== Setting the Name of a Producer
503503

504504
The producer name is set when creating the producer instance, which automatically
505-
enables de-duplication:
505+
enables deduplication:
506506

507-
.Naming a producer to enable message de-duplication
507+
.Naming a producer to enable message deduplication
508508
[source,java,indent=0]
509509
--------
510510
include::{test-examples}/ProducerUsage.java[tag=producer-with-name]
@@ -518,13 +518,13 @@ will automatically recover and retry outstanding messages. The broker will then
518518
filter out messages it has already received and persisted. No more duplicates!
519519

520520
[IMPORTANT]
521-
.Why setting `confirmTimeout` to 0 when using de-duplication?
521+
.Why setting `confirmTimeout` to 0 when using deduplication?
522522
====
523-
The point of de-duplication is to avoid duplicates when retrying unconfirmed messages.
523+
The point of deduplication is to avoid duplicates when retrying unconfirmed messages.
524524
But why retrying in the first place? To avoid _losing_ messages, that is enforcing
525525
_at-least-once_ semantics. If the client does not stubbornly retry messages and gives
526526
up at some point, messages can be lost, which maps to _at-most-once_ semantics. This
527-
is why the de-duplication examples set the
527+
is why the deduplication examples set the
528528
<<producer-confirm-timeout-configuration-entry,`confirmTimeout` setting>> to `Duration.ZERO`:
529529
to disable the background task that calls the confirmation callback for outstanding
530530
messages that time out. This way the client will do its best to retry messages
@@ -539,11 +539,11 @@ stream at the same time.
539539

540540
====== Understanding Publishing ID
541541

542-
The producer name is only one part of the de-duplication mechanism, the other part
542+
The producer name is only one part of the deduplication mechanism, the other part
543543
is the _message publishing ID_. If the producer has a name, the client automatically
544544
assigns a publishing ID to each outbound message for the producer. The publishing ID
545545
is a strictly increasing sequence, starting at 0 and incremented for each message. The default
546-
publishing sequence is good enough for de-duplication, but it is possible to
546+
publishing sequence is good enough for deduplication, but it is possible to
547547
assign a publishing ID to each message:
548548

549549
.Using an explicit publishing ID
@@ -570,7 +570,7 @@ properties (e.g. `messageId`).
570570
[IMPORTANT]
571571
.Do not mix client-assigned and custom publishing ID
572572
====
573-
As soon as a producer name is set, message de-duplication is enabled.
573+
As soon as a producer name is set, message deduplication is enabled.
574574
It is then possible to let the producer assign a publishing ID to each
575575
message or assign custom publishing IDs. *Do one or the other, not both!*
576576
====

src/docs/asciidoc/overview.adoc

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ you need to use an AMQP 0-9-1 client library.
4949
RabbitMQ stream provides at-least-once guarantees thanks to the
5050
publisher confirm mechanism, which is supported by the stream Java client.
5151

52-
Message <<api.adoc#outbound-message-de-deduplication,de-duplication>>
52+
Message <<api.adoc#outbound-message-deduplication,deduplication>>
5353
is also supported on the publisher side.
5454

5555
[[stream-client-overview]]
@@ -63,7 +63,7 @@ to build fast, efficient, and robust client applications.
6363
* _administrate streams (creation/deletion) directly from applications._ This
6464
can also be useful for development and testing.
6565
* _adapt publishing throughput_ thanks to the configurable batch size and flow control.
66-
* _avoid publishing duplicate messages_ thanks to message de-duplication.
66+
* _avoid publishing duplicate messages_ thanks to message deduplication.
6767
* _consume asynchronously from streams and resume where left off_ thanks to
6868
automatic or manual offset tracking.
6969
* _enforce https://blog.rabbitmq.com/posts/2021/07/connecting-to-streams/[best practices] to create client connections_ – to stream leaders for publishers to minimize inter-node traffic and to stream replicas for consumers to offload leaders.

src/docs/asciidoc/performance-tool.adoc

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -404,6 +404,7 @@ The following command shows how to store the offset every 100,000 messages:
404404
java -jar stream-perf-test.jar --store-every 100000
405405
----
406406

407+
[[consumer-names]]
407408
===== Consumer Names
408409

409410
When using `--store-every` (see above) for <<api.adoc#consumer-offset-tracking, offset tracking>>,
@@ -448,6 +449,21 @@ force the offset they start consuming from. With consumer names that do not chan
448449
tracking consumers would ignore the specified offset and would start where they left off
449450
(this is the purpose of offset tracking).
450451

452+
===== Producer Names
453+
You can use the `--producer-names` option to set the producer names pattern and therefore
454+
enable <<api.adoc#outbound-message-deduplication, message deduplication>> (using the default
455+
publishing sequence starting at 0 and incremented for each message).
456+
The same naming options apply as above in <<api.adoc#consumer-names, consumer names>> with the only
457+
difference that the default pattern is empty (i.e. no deduplication).
458+
459+
Here is an example of the usage of the `--producer-names` option:
460+
461+
----
462+
java -jar stream-perf-test.jar --producer-names %s-%d
463+
----
464+
465+
The run will start one producer and will use the `stream-1` producer reference (default stream is `stream` and the number of the producer is 1.)
466+
451467
=== Building the Performance Tool
452468

453469
To build the uber JAR:

src/main/java/com/rabbitmq/stream/perf/StreamPerfTest.java

Lines changed: 20 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import com.rabbitmq.stream.EnvironmentBuilder.TlsConfiguration;
3131
import com.rabbitmq.stream.OffsetSpecification;
3232
import com.rabbitmq.stream.Producer;
33+
import com.rabbitmq.stream.ProducerBuilder;
3334
import com.rabbitmq.stream.StreamCreator;
3435
import com.rabbitmq.stream.StreamCreator.LeaderLocator;
3536
import com.rabbitmq.stream.StreamException;
@@ -258,6 +259,16 @@ public class StreamPerfTest implements Callable<Integer> {
258259
converter = Utils.OneTo255RangeIntegerTypeConverter.class)
259260
private int producersByConnection;
260261

262+
@CommandLine.Option(
263+
names = {"--producer-names", "-pn"},
264+
description =
265+
"naming strategy for producer names. Valid values are 'uuid' or a pattern with "
266+
+ "stream name and producer index as arguments. "
267+
+ "If set, a publishing ID is automatically assigned to each outbound message.",
268+
defaultValue = "",
269+
converter = Utils.NameStrategyConverter.class)
270+
private BiFunction<String, Integer, String> producerNameStrategy;
271+
261272
@CommandLine.Option(
262273
names = {"--tracking-consumers-by-connection", "-ccbc"},
263274
description = "number of tracking consumers by connection. Value must be between 1 and 255.",
@@ -284,7 +295,7 @@ public class StreamPerfTest implements Callable<Integer> {
284295
"naming strategy for consumer names. Valid values are 'uuid' or a pattern with "
285296
+ "stream name and consumer index as arguments.",
286297
defaultValue = "%s-%d",
287-
converter = Utils.ConsumerNameStrategyConverter.class)
298+
converter = Utils.NameStrategyConverter.class)
288299
private BiFunction<String, Integer, String> consumerNameStrategy;
289300

290301
@CommandLine.Option(
@@ -597,10 +608,16 @@ public Integer call() throws Exception {
597608
}
598609

599610
String stream = stream();
611+
ProducerBuilder producerBuilder = environment.producerBuilder();
612+
613+
String producerName = this.producerNameStrategy.apply(stream, i + 1);
614+
if (producerName != "") {
615+
producerBuilder =
616+
producerBuilder.name(producerName).confirmTimeout(Duration.ZERO);
617+
}
600618

601619
Producer producer =
602-
environment
603-
.producerBuilder()
620+
producerBuilder
604621
.subEntrySize(this.subEntrySize)
605622
.batchSize(this.batchSize)
606623
.compression(

src/main/java/com/rabbitmq/stream/perf/Utils.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -151,15 +151,15 @@ public ByteCapacity convert(String value) {
151151
}
152152
}
153153

154-
static class ConsumerNameStrategyConverter
154+
static class NameStrategyConverter
155155
implements CommandLine.ITypeConverter<BiFunction<String, Integer, String>> {
156156

157157
@Override
158158
public BiFunction<String, Integer, String> convert(String input) {
159159
if ("uuid".equals(input)) {
160160
return (stream, index) -> UUID.randomUUID().toString();
161161
} else {
162-
return new PatternConsumerNameStrategy(input);
162+
return new PatternNameStrategy(input);
163163
}
164164
}
165165
}
@@ -429,11 +429,11 @@ public X509Certificate[] getAcceptedIssuers() {
429429
}
430430
}
431431

432-
static final class PatternConsumerNameStrategy implements BiFunction<String, Integer, String> {
432+
static final class PatternNameStrategy implements BiFunction<String, Integer, String> {
433433

434434
private final String pattern;
435435

436-
PatternConsumerNameStrategy(String pattern) {
436+
PatternNameStrategy(String pattern) {
437437
this.pattern = pattern;
438438
}
439439

src/test/java/com/rabbitmq/stream/perf/StreamPerfTestTest.java

Lines changed: 30 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -187,14 +187,38 @@ void offsetShouldBeStoredWhenOptionIsEnabled() throws Exception {
187187
void offsetShouldNotBeStoredWhenOptionIsNotEnabled() throws Exception {
188188
Future<?> run = run(builder());
189189
waitUntilStreamExists(s);
190-
String consumerName = s + "-0"; // convention
190+
String consumerName = s + "-0"; // default value when offset tracking is enabled
191191
assertThat(client.queryOffset(consumerName, s)).isZero();
192192
waitOneSecond();
193193
assertThat(client.queryOffset(consumerName, s)).isZero();
194194
run.cancel(true);
195195
waitRunEnds();
196196
}
197197

198+
@Test
199+
void publishingSequenceShouldBeStoredWhenProducerNamesAreSet() throws Exception {
200+
Future<?> run = run(builder().producerNames("producer-%2$d-on-stream-%1$s"));
201+
waitUntilStreamExists(s);
202+
String producerName = "producer-1-on-stream-" + s;
203+
long seq = client.queryPublisherSequence(producerName, s);
204+
waitOneSecond();
205+
waitAtMost(() -> client.queryPublisherSequence(producerName, s) > seq);
206+
run.cancel(true);
207+
waitRunEnds();
208+
}
209+
210+
@Test
211+
void publishingSequenceShouldNotBeStoredWhenProducerNamesAreNotSet() throws Exception {
212+
Future<?> run = run(builder());
213+
waitUntilStreamExists(s);
214+
String producerName = s + "-0"; // shooting in the dark here
215+
assertThat(client.queryPublisherSequence(producerName, s)).isZero();
216+
waitOneSecond();
217+
assertThat(client.queryPublisherSequence(producerName, s)).isZero();
218+
run.cancel(true);
219+
waitRunEnds();
220+
}
221+
198222
@Test
199223
@DisabledIfTlsNotEnabled
200224
void shouldConnectWithTls() throws Exception {
@@ -405,6 +429,11 @@ ArgumentsBuilder consumerNames(String pattern) {
405429
return this;
406430
}
407431

432+
ArgumentsBuilder producerNames(String pattern) {
433+
arguments.put("producer-names", pattern);
434+
return this;
435+
}
436+
408437
String build() {
409438
return this.arguments.entrySet().stream()
410439
.map(e -> "--" + e.getKey() + (e.getValue().isEmpty() ? "" : (" " + e.getValue())))

src/test/java/com/rabbitmq/stream/perf/UtilsTest.java

Lines changed: 30 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,14 +22,16 @@
2222
import com.rabbitmq.stream.OffsetSpecification;
2323
import com.rabbitmq.stream.compression.Compression;
2424
import com.rabbitmq.stream.perf.Utils.CompressionTypeConverter;
25-
import com.rabbitmq.stream.perf.Utils.PatternConsumerNameStrategy;
25+
import com.rabbitmq.stream.perf.Utils.NameStrategyConverter;
26+
import com.rabbitmq.stream.perf.Utils.PatternNameStrategy;
2627
import com.rabbitmq.stream.perf.Utils.RangeTypeConverter;
2728
import com.rabbitmq.stream.perf.Utils.SniServerNamesConverter;
2829
import java.util.Arrays;
2930
import java.util.Collections;
3031
import java.util.List;
3132
import java.util.Locale;
3233
import java.util.Random;
34+
import java.util.UUID;
3335
import java.util.function.BiFunction;
3436
import java.util.stream.IntStream;
3537
import java.util.stream.LongStream;
@@ -143,10 +145,36 @@ void compressionTypeConverterKo(String value) {
143145
"consumer-%2$d-on-stream-%1$s,consumer-2-on-stream-s1"
144146
})
145147
void consumerNameStrategy(String pattern, String expected) {
146-
BiFunction<String, Integer, String> strategy = new PatternConsumerNameStrategy(pattern);
148+
BiFunction<String, Integer, String> strategy = new PatternNameStrategy(pattern);
147149
assertThat(strategy.apply("s1", 2)).isEqualTo(expected);
148150
}
149151

152+
@Test
153+
void producerConsumerNameStrategyConverterShouldReturnUuidWhenAskedForUuid() {
154+
NameStrategyConverter nameStrategyConverter = new NameStrategyConverter();
155+
BiFunction<String, Integer, String> nameStrategy = nameStrategyConverter.convert("uuid");
156+
String name = nameStrategy.apply("stream", 1);
157+
UUID.fromString(name);
158+
assertThat(nameStrategy.apply("stream", 1)).isNotEqualTo(name);
159+
}
160+
161+
@Test
162+
void producerConsumerNameStrategyConverterShouldReturnEmptyStringWhenPatternIsEmptyString() {
163+
NameStrategyConverter nameStrategyConverter = new NameStrategyConverter();
164+
BiFunction<String, Integer, String> nameStrategy = nameStrategyConverter.convert("");
165+
assertThat(nameStrategy.apply("stream", 1)).isEmpty();
166+
assertThat(nameStrategy.apply("stream", 2)).isEmpty();
167+
}
168+
169+
@Test
170+
void producerConsumerNameStrategyConverterShouldReturnPatternStrategyWhenAsked() {
171+
NameStrategyConverter nameStrategyConverter = new NameStrategyConverter();
172+
BiFunction<String, Integer, String> nameStrategy =
173+
nameStrategyConverter.convert("stream-%s-consumer-%d");
174+
assertThat(nameStrategy).isInstanceOf(PatternNameStrategy.class);
175+
assertThat(nameStrategy.apply("s1", 2)).isEqualTo("stream-s1-consumer-2");
176+
}
177+
150178
@Test
151179
void sniServerNamesConverter() throws Exception {
152180
SniServerNamesConverter converter = new SniServerNamesConverter();

0 commit comments

Comments
 (0)