From eec8823185ea71e6fdeeb5bed0e1a413217e2998 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arnaud=20Cogolu=C3=A8gnes?= Date: Wed, 21 Jul 2021 17:55:43 +0200 Subject: [PATCH] Add a lazy initialization option in the environment builder Fixes #14 --- src/docs/asciidoc/api.adoc | 4 + .../rabbitmq/stream/EnvironmentBuilder.java | 13 +++ .../stream/impl/StreamEnvironment.java | 79 +++++++++++++------ .../stream/impl/StreamEnvironmentBuilder.java | 10 ++- .../stream/impl/StreamEnvironmentTest.java | 77 ++++++++++++++++-- .../impl/StreamEnvironmentUnitTest.java | 29 +++++++ 6 files changed, 183 insertions(+), 29 deletions(-) diff --git a/src/docs/asciidoc/api.adoc b/src/docs/asciidoc/api.adoc index 92c60946d0..a274e1339e 100644 --- a/src/docs/asciidoc/api.adoc +++ b/src/docs/asciidoc/api.adoc @@ -186,6 +186,10 @@ can maintain before a new connection is open. The value must be between 1 and 25 a new connection is open. The value must be between 1 and 255. |255 +|`lazyInitialization` +|To delay the connection opening until necessary. +|false + |`tls` |Configuration helper for TLS. |TLS is enabled if a `rabbitmq-stream+tls` URI is provided. diff --git a/src/main/java/com/rabbitmq/stream/EnvironmentBuilder.java b/src/main/java/com/rabbitmq/stream/EnvironmentBuilder.java index 0fefd4ca79..93363b5855 100644 --- a/src/main/java/com/rabbitmq/stream/EnvironmentBuilder.java +++ b/src/main/java/com/rabbitmq/stream/EnvironmentBuilder.java @@ -286,6 +286,19 @@ public interface EnvironmentBuilder { EnvironmentBuilder topologyUpdateBackOffDelayPolicy( BackOffDelayPolicy topologyUpdateBackOffDelayPolicy); + /** + * To delay the connection opening until necessary. + * + *

No connection will be open before it is necessary (for stream management or + * producer/consumer creation). + * + *

Default is false. + * + * @param lazy + * @return this builder instance + */ + EnvironmentBuilder lazyInitialization(boolean lazy); + /** * Create the {@link Environment} instance. * diff --git a/src/main/java/com/rabbitmq/stream/impl/StreamEnvironment.java b/src/main/java/com/rabbitmq/stream/impl/StreamEnvironment.java index 30596557c9..4a6cfd782d 100644 --- a/src/main/java/com/rabbitmq/stream/impl/StreamEnvironment.java +++ b/src/main/java/com/rabbitmq/stream/impl/StreamEnvironment.java @@ -29,6 +29,7 @@ import com.rabbitmq.stream.StreamCreator; import com.rabbitmq.stream.StreamException; import com.rabbitmq.stream.compression.CompressionCodecFactory; +import com.rabbitmq.stream.impl.Client.ClientParameters; import com.rabbitmq.stream.impl.OffsetTrackingCoordinator.Registration; import com.rabbitmq.stream.impl.StreamConsumerBuilder.TrackingConfiguration; import com.rabbitmq.stream.impl.StreamEnvironmentBuilder.DefaultTlsConfiguration; @@ -83,6 +84,8 @@ class StreamEnvironment implements Environment { private final Clock clock = new Clock(); private final ScheduledFuture clockRefreshFuture; private final ByteBufAllocator byteBufAllocator; + private final AtomicBoolean locatorInitialized = new AtomicBoolean(false); + private final Runnable locatorInitializationSequence; private volatile Client locator; StreamEnvironment( @@ -96,7 +99,8 @@ class StreamEnvironment implements Environment { int maxTrackingConsumersByConnection, int maxConsumersByConnection, DefaultTlsConfiguration tlsConfiguration, - ByteBufAllocator byteBufAllocator) { + ByteBufAllocator byteBufAllocator, + boolean lazyInit) { this( scheduledExecutorService, clientParametersPrototype, @@ -109,6 +113,7 @@ class StreamEnvironment implements Environment { maxConsumersByConnection, tlsConfiguration, byteBufAllocator, + lazyInit, cp -> new Client(cp)); } @@ -124,6 +129,7 @@ class StreamEnvironment implements Environment { int maxConsumersByConnection, DefaultTlsConfiguration tlsConfiguration, ByteBufAllocator byteBufAllocator, + boolean lazyInit, Function clientFactory) { this.recoveryBackOffDelayPolicy = recoveryBackOffDelayPolicy; this.topologyUpdateBackOffDelayPolicy = topologyBackOffDelayPolicy; @@ -235,29 +241,41 @@ class StreamEnvironment implements Environment { } }; shutdownListenerReference.set(shutdownListener); - RuntimeException lastException = null; - for (Address address : addresses) { - address = addressResolver.resolve(address); - Client.ClientParameters locatorParameters = - clientParametersPrototype - .duplicate() - .host(address.host()) - .port(address.port()) - .clientProperty("connection_name", "rabbitmq-stream-locator") - .shutdownListener(shutdownListenerReference.get()); - try { - this.locator = clientFactory.apply(locatorParameters); - LOGGER.debug("Locator connected to {}", address); - break; - } catch (RuntimeException e) { - LOGGER.debug("Error while try to connect to {}: {}", address, e.getMessage()); - lastException = e; - } - } - if (this.locator == null) { - throw lastException; + ClientParameters clientParametersForInit = clientParametersPrototype.duplicate(); + Runnable locatorInitSequence = + () -> { + RuntimeException lastException = null; + for (Address address : addresses) { + address = addressResolver.resolve(address); + Client.ClientParameters locatorParameters = + clientParametersForInit + .duplicate() + .host(address.host()) + .port(address.port()) + .clientProperty("connection_name", "rabbitmq-stream-locator") + .shutdownListener(shutdownListenerReference.get()); + try { + this.locator = clientFactory.apply(locatorParameters); + LOGGER.debug("Locator connected to {}", address); + break; + } catch (RuntimeException e) { + LOGGER.debug("Error while try to connect to {}: {}", address, e.getMessage()); + lastException = e; + } + } + if (this.locator == null) { + throw lastException; + } + }; + if (lazyInit) { + this.locatorInitializationSequence = locatorInitSequence; + } else { + locatorInitSequence.run(); + locatorInitialized.set(true); + this.locatorInitializationSequence = () -> {}; } - this.codec = locator.codec(); + this.codec = + clientParametersPrototype.codec == null ? Codecs.DEFAULT : clientParametersPrototype.codec; this.clockRefreshFuture = this.scheduledExecutorService.scheduleAtFixedRate( () -> this.clock.refresh(), 1, 1, SECONDS); @@ -318,13 +336,26 @@ public ByteBufAllocator byteBufAllocator() { return byteBufAllocator; } + private void maybeInitializeLocator() { + if (this.locatorInitialized.compareAndSet(false, true)) { + try { + this.locatorInitializationSequence.run(); + } catch (RuntimeException e) { + this.locatorInitialized.set(false); + throw e; + } + } + } + @Override public StreamCreator streamCreator() { + maybeInitializeLocator(); return new StreamStreamCreator(this); } @Override public void deleteStream(String stream) { + maybeInitializeLocator(); Client.Response response = this.locator().delete(stream); if (!response.isOk()) { throw new StreamException( @@ -339,6 +370,7 @@ public void deleteStream(String stream) { @Override public ProducerBuilder producerBuilder() { + maybeInitializeLocator(); return new StreamProducerBuilder(this); } @@ -360,6 +392,7 @@ void removeConsumer(StreamConsumer consumer) { @Override public ConsumerBuilder consumerBuilder() { + maybeInitializeLocator(); return new StreamConsumerBuilder(this); } diff --git a/src/main/java/com/rabbitmq/stream/impl/StreamEnvironmentBuilder.java b/src/main/java/com/rabbitmq/stream/impl/StreamEnvironmentBuilder.java index c2fb0b5374..e42bd71ba3 100644 --- a/src/main/java/com/rabbitmq/stream/impl/StreamEnvironmentBuilder.java +++ b/src/main/java/com/rabbitmq/stream/impl/StreamEnvironmentBuilder.java @@ -60,6 +60,7 @@ public class StreamEnvironmentBuilder implements EnvironmentBuilder { private int maxConsumersByConnection = ConsumersCoordinator.MAX_SUBSCRIPTIONS_PER_CLIENT; private CompressionCodecFactory compressionCodecFactory; private ByteBufAllocator byteBufAllocator = ByteBufAllocator.DEFAULT; + private boolean lazyInit = false; public StreamEnvironmentBuilder() {} @@ -266,6 +267,12 @@ public EnvironmentBuilder maxConsumersByConnection(int maxConsumersByConnection) return this; } + @Override + public EnvironmentBuilder lazyInitialization(boolean lazy) { + this.lazyInit = lazy; + return this; + } + @Override public TlsConfiguration tls() { this.tls.enable(); @@ -290,7 +297,8 @@ public Environment build() { maxTrackingConsumersByConnection, maxConsumersByConnection, tls, - byteBufAllocator); + byteBufAllocator, + lazyInit); } static final class DefaultTlsConfiguration implements TlsConfiguration { diff --git a/src/test/java/com/rabbitmq/stream/impl/StreamEnvironmentTest.java b/src/test/java/com/rabbitmq/stream/impl/StreamEnvironmentTest.java index 99c1ca3a20..98408b3b5b 100644 --- a/src/test/java/com/rabbitmq/stream/impl/StreamEnvironmentTest.java +++ b/src/test/java/com/rabbitmq/stream/impl/StreamEnvironmentTest.java @@ -26,11 +26,14 @@ import com.rabbitmq.stream.AuthenticationFailureException; import com.rabbitmq.stream.BackOffDelayPolicy; import com.rabbitmq.stream.ChannelCustomizer; +import com.rabbitmq.stream.ConfirmationHandler; import com.rabbitmq.stream.Constants; import com.rabbitmq.stream.Consumer; import com.rabbitmq.stream.Environment; import com.rabbitmq.stream.EnvironmentBuilder; import com.rabbitmq.stream.Host; +import com.rabbitmq.stream.Message; +import com.rabbitmq.stream.OffsetSpecification; import com.rabbitmq.stream.Producer; import com.rabbitmq.stream.StreamException; import com.rabbitmq.stream.impl.Client.StreamMetadata; @@ -60,6 +63,8 @@ import org.junit.jupiter.api.Test; import org.junit.jupiter.api.TestInfo; import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; @ExtendWith(TestUtils.StreamTestInfrastructureExtension.class) public class StreamEnvironmentTest { @@ -158,9 +163,10 @@ void environmentCreationShouldSucceedWhenUsingTls() { .close(); } - @Test - void producersAndConsumersShouldBeClosedWhenEnvironmentIsClosed() { - Environment environment = environmentBuilder.build(); + @ParameterizedTest + @ValueSource(booleans = {false, true}) + void producersAndConsumersShouldBeClosedWhenEnvironmentIsClosed(boolean lazyInit) { + Environment environment = environmentBuilder.lazyInitialization(lazyInit).build(); Collection producers = IntStream.range(0, 2) .mapToObj(i -> environment.producerBuilder().stream(stream).build()) @@ -317,8 +323,7 @@ void growShrinkResourcesWhenProducersConsumersAreOpenedAndClosed(TestInfo info) @Test @TestUtils.DisabledIfRabbitMqCtlNotSet - void environmentPublishersConsumersShouldCloseSuccessfullyWhenBrokerIsDown(TestInfo info) - throws Exception { + void environmentPublishersConsumersShouldCloseSuccessfullyWhenBrokerIsDown() throws Exception { Environment environment = environmentBuilder .recoveryBackOffDelayPolicy(BackOffDelayPolicy.fixed(Duration.ofSeconds(10))) @@ -439,4 +444,66 @@ void streamCreationShouldBeIdempotent(TestInfo info) { assertThat(client.delete(s).isOk()).isTrue(); } } + + @Test + void instanciationShouldSucceedWhenLazyInitIsEnabledAndHostIsNotKnown() { + String dummyHost = UUID.randomUUID().toString(); + Address dummyAddress = new Address(dummyHost, Client.DEFAULT_PORT); + try (Environment env = + environmentBuilder + .host(dummyHost) + .addressResolver(a -> dummyAddress) + .lazyInitialization(true) + .build()) { + + assertThatThrownBy(() -> env.streamCreator().stream("should not have been created").create()) + .isInstanceOf(StreamException.class); + assertThatThrownBy(() -> env.deleteStream("should not exist")) + .isInstanceOf(StreamException.class); + assertThatThrownBy(() -> env.producerBuilder().stream(stream).build()) + .isInstanceOf(StreamException.class); + assertThatThrownBy( + () -> + env.consumerBuilder().stream(stream) + .messageHandler((context, message) -> {}) + .build()) + .isInstanceOf(StreamException.class); + } + } + + @ParameterizedTest + @ValueSource(booleans = {false, true}) + void createPublishConsumeDelete(boolean lazyInit, TestInfo info) { + try (Environment env = environmentBuilder.lazyInitialization(lazyInit).build()) { + String s = streamName(info); + env.streamCreator().stream(s).create(); + int messageCount = 50_000; + CountDownLatch confirmLatch = new CountDownLatch(messageCount); + CountDownLatch consumeLatch = new CountDownLatch(messageCount); + + Producer producer = env.producerBuilder().stream(s).build(); + ConfirmationHandler confirmationHandler = confirmationStatus -> confirmLatch.countDown(); + IntStream.range(0, messageCount) + .forEach( + i -> { + Message message = + producer.messageBuilder().addData("".getBytes(StandardCharsets.UTF_8)).build(); + producer.send(message, confirmationHandler); + }); + + latchAssert(confirmLatch).completes(); + + Consumer consumer = + env.consumerBuilder().stream(s) + .offset(OffsetSpecification.first()) + .messageHandler((context, message) -> consumeLatch.countDown()) + .build(); + + latchAssert(consumeLatch).completes(); + + producer.close(); + consumer.close(); + env.deleteStream(s); + } + } } diff --git a/src/test/java/com/rabbitmq/stream/impl/StreamEnvironmentUnitTest.java b/src/test/java/com/rabbitmq/stream/impl/StreamEnvironmentUnitTest.java index 0baf10dc78..ddd84286c7 100644 --- a/src/test/java/com/rabbitmq/stream/impl/StreamEnvironmentUnitTest.java +++ b/src/test/java/com/rabbitmq/stream/impl/StreamEnvironmentUnitTest.java @@ -17,6 +17,7 @@ import static org.mockito.Mockito.*; import com.rabbitmq.stream.BackOffDelayPolicy; +import com.rabbitmq.stream.impl.Client.ClientParameters; import io.netty.buffer.ByteBufAllocator; import java.net.URI; import java.time.Duration; @@ -29,6 +30,8 @@ import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.CsvSource; import org.mockito.Mock; import org.mockito.MockitoAnnotations; @@ -81,6 +84,7 @@ Client.ClientParameters duplicate() { ConsumersCoordinator.MAX_SUBSCRIPTIONS_PER_CLIENT, null, ByteBufAllocator.DEFAULT, + false, cf); } @@ -140,7 +144,32 @@ void shouldTryUrisOnInitializationFailure() throws Exception { ConsumersCoordinator.MAX_SUBSCRIPTIONS_PER_CLIENT, null, ByteBufAllocator.DEFAULT, + false, cf); verify(cf, times(3)).apply(any(Client.ClientParameters.class)); } + + @ParameterizedTest + @CsvSource({"false,1", "true,0"}) + void shouldNotOpenConnectionWhenLazyInitIsEnabled( + boolean lazyInit, int expectedConnectionCreation) throws Exception { + reset(cf); + when(cf.apply(any(Client.ClientParameters.class))).thenReturn(client); + environment = + new StreamEnvironment( + scheduledExecutorService, + new ClientParameters(), + Collections.emptyList(), + recoveryBackOffDelayPolicy, + topologyUpdateBackOffDelayPolicy, + host -> host, + ProducersCoordinator.MAX_PRODUCERS_PER_CLIENT, + ProducersCoordinator.MAX_TRACKING_CONSUMERS_PER_CLIENT, + ConsumersCoordinator.MAX_SUBSCRIPTIONS_PER_CLIENT, + null, + ByteBufAllocator.DEFAULT, + lazyInit, + cf); + verify(cf, times(expectedConnectionCreation)).apply(any(Client.ClientParameters.class)); + } }