diff --git a/src/docs/asciidoc/api.adoc b/src/docs/asciidoc/api.adoc
index 92c60946d0..a274e1339e 100644
--- a/src/docs/asciidoc/api.adoc
+++ b/src/docs/asciidoc/api.adoc
@@ -186,6 +186,10 @@ can maintain before a new connection is open. The value must be between 1 and 25
a new connection is open. The value must be between 1 and 255.
|255
+|`lazyInitialization`
+|To delay the connection opening until necessary.
+|false
+
|`tls`
|Configuration helper for TLS.
|TLS is enabled if a `rabbitmq-stream+tls` URI is provided.
diff --git a/src/main/java/com/rabbitmq/stream/EnvironmentBuilder.java b/src/main/java/com/rabbitmq/stream/EnvironmentBuilder.java
index 0fefd4ca79..93363b5855 100644
--- a/src/main/java/com/rabbitmq/stream/EnvironmentBuilder.java
+++ b/src/main/java/com/rabbitmq/stream/EnvironmentBuilder.java
@@ -286,6 +286,19 @@ public interface EnvironmentBuilder {
EnvironmentBuilder topologyUpdateBackOffDelayPolicy(
BackOffDelayPolicy topologyUpdateBackOffDelayPolicy);
+ /**
+ * To delay the connection opening until necessary.
+ *
+ *
No connection will be open before it is necessary (for stream management or
+ * producer/consumer creation).
+ *
+ *
Default is false.
+ *
+ * @param lazy
+ * @return this builder instance
+ */
+ EnvironmentBuilder lazyInitialization(boolean lazy);
+
/**
* Create the {@link Environment} instance.
*
diff --git a/src/main/java/com/rabbitmq/stream/impl/StreamEnvironment.java b/src/main/java/com/rabbitmq/stream/impl/StreamEnvironment.java
index 30596557c9..4a6cfd782d 100644
--- a/src/main/java/com/rabbitmq/stream/impl/StreamEnvironment.java
+++ b/src/main/java/com/rabbitmq/stream/impl/StreamEnvironment.java
@@ -29,6 +29,7 @@
import com.rabbitmq.stream.StreamCreator;
import com.rabbitmq.stream.StreamException;
import com.rabbitmq.stream.compression.CompressionCodecFactory;
+import com.rabbitmq.stream.impl.Client.ClientParameters;
import com.rabbitmq.stream.impl.OffsetTrackingCoordinator.Registration;
import com.rabbitmq.stream.impl.StreamConsumerBuilder.TrackingConfiguration;
import com.rabbitmq.stream.impl.StreamEnvironmentBuilder.DefaultTlsConfiguration;
@@ -83,6 +84,8 @@ class StreamEnvironment implements Environment {
private final Clock clock = new Clock();
private final ScheduledFuture> clockRefreshFuture;
private final ByteBufAllocator byteBufAllocator;
+ private final AtomicBoolean locatorInitialized = new AtomicBoolean(false);
+ private final Runnable locatorInitializationSequence;
private volatile Client locator;
StreamEnvironment(
@@ -96,7 +99,8 @@ class StreamEnvironment implements Environment {
int maxTrackingConsumersByConnection,
int maxConsumersByConnection,
DefaultTlsConfiguration tlsConfiguration,
- ByteBufAllocator byteBufAllocator) {
+ ByteBufAllocator byteBufAllocator,
+ boolean lazyInit) {
this(
scheduledExecutorService,
clientParametersPrototype,
@@ -109,6 +113,7 @@ class StreamEnvironment implements Environment {
maxConsumersByConnection,
tlsConfiguration,
byteBufAllocator,
+ lazyInit,
cp -> new Client(cp));
}
@@ -124,6 +129,7 @@ class StreamEnvironment implements Environment {
int maxConsumersByConnection,
DefaultTlsConfiguration tlsConfiguration,
ByteBufAllocator byteBufAllocator,
+ boolean lazyInit,
Function clientFactory) {
this.recoveryBackOffDelayPolicy = recoveryBackOffDelayPolicy;
this.topologyUpdateBackOffDelayPolicy = topologyBackOffDelayPolicy;
@@ -235,29 +241,41 @@ class StreamEnvironment implements Environment {
}
};
shutdownListenerReference.set(shutdownListener);
- RuntimeException lastException = null;
- for (Address address : addresses) {
- address = addressResolver.resolve(address);
- Client.ClientParameters locatorParameters =
- clientParametersPrototype
- .duplicate()
- .host(address.host())
- .port(address.port())
- .clientProperty("connection_name", "rabbitmq-stream-locator")
- .shutdownListener(shutdownListenerReference.get());
- try {
- this.locator = clientFactory.apply(locatorParameters);
- LOGGER.debug("Locator connected to {}", address);
- break;
- } catch (RuntimeException e) {
- LOGGER.debug("Error while try to connect to {}: {}", address, e.getMessage());
- lastException = e;
- }
- }
- if (this.locator == null) {
- throw lastException;
+ ClientParameters clientParametersForInit = clientParametersPrototype.duplicate();
+ Runnable locatorInitSequence =
+ () -> {
+ RuntimeException lastException = null;
+ for (Address address : addresses) {
+ address = addressResolver.resolve(address);
+ Client.ClientParameters locatorParameters =
+ clientParametersForInit
+ .duplicate()
+ .host(address.host())
+ .port(address.port())
+ .clientProperty("connection_name", "rabbitmq-stream-locator")
+ .shutdownListener(shutdownListenerReference.get());
+ try {
+ this.locator = clientFactory.apply(locatorParameters);
+ LOGGER.debug("Locator connected to {}", address);
+ break;
+ } catch (RuntimeException e) {
+ LOGGER.debug("Error while try to connect to {}: {}", address, e.getMessage());
+ lastException = e;
+ }
+ }
+ if (this.locator == null) {
+ throw lastException;
+ }
+ };
+ if (lazyInit) {
+ this.locatorInitializationSequence = locatorInitSequence;
+ } else {
+ locatorInitSequence.run();
+ locatorInitialized.set(true);
+ this.locatorInitializationSequence = () -> {};
}
- this.codec = locator.codec();
+ this.codec =
+ clientParametersPrototype.codec == null ? Codecs.DEFAULT : clientParametersPrototype.codec;
this.clockRefreshFuture =
this.scheduledExecutorService.scheduleAtFixedRate(
() -> this.clock.refresh(), 1, 1, SECONDS);
@@ -318,13 +336,26 @@ public ByteBufAllocator byteBufAllocator() {
return byteBufAllocator;
}
+ private void maybeInitializeLocator() {
+ if (this.locatorInitialized.compareAndSet(false, true)) {
+ try {
+ this.locatorInitializationSequence.run();
+ } catch (RuntimeException e) {
+ this.locatorInitialized.set(false);
+ throw e;
+ }
+ }
+ }
+
@Override
public StreamCreator streamCreator() {
+ maybeInitializeLocator();
return new StreamStreamCreator(this);
}
@Override
public void deleteStream(String stream) {
+ maybeInitializeLocator();
Client.Response response = this.locator().delete(stream);
if (!response.isOk()) {
throw new StreamException(
@@ -339,6 +370,7 @@ public void deleteStream(String stream) {
@Override
public ProducerBuilder producerBuilder() {
+ maybeInitializeLocator();
return new StreamProducerBuilder(this);
}
@@ -360,6 +392,7 @@ void removeConsumer(StreamConsumer consumer) {
@Override
public ConsumerBuilder consumerBuilder() {
+ maybeInitializeLocator();
return new StreamConsumerBuilder(this);
}
diff --git a/src/main/java/com/rabbitmq/stream/impl/StreamEnvironmentBuilder.java b/src/main/java/com/rabbitmq/stream/impl/StreamEnvironmentBuilder.java
index c2fb0b5374..e42bd71ba3 100644
--- a/src/main/java/com/rabbitmq/stream/impl/StreamEnvironmentBuilder.java
+++ b/src/main/java/com/rabbitmq/stream/impl/StreamEnvironmentBuilder.java
@@ -60,6 +60,7 @@ public class StreamEnvironmentBuilder implements EnvironmentBuilder {
private int maxConsumersByConnection = ConsumersCoordinator.MAX_SUBSCRIPTIONS_PER_CLIENT;
private CompressionCodecFactory compressionCodecFactory;
private ByteBufAllocator byteBufAllocator = ByteBufAllocator.DEFAULT;
+ private boolean lazyInit = false;
public StreamEnvironmentBuilder() {}
@@ -266,6 +267,12 @@ public EnvironmentBuilder maxConsumersByConnection(int maxConsumersByConnection)
return this;
}
+ @Override
+ public EnvironmentBuilder lazyInitialization(boolean lazy) {
+ this.lazyInit = lazy;
+ return this;
+ }
+
@Override
public TlsConfiguration tls() {
this.tls.enable();
@@ -290,7 +297,8 @@ public Environment build() {
maxTrackingConsumersByConnection,
maxConsumersByConnection,
tls,
- byteBufAllocator);
+ byteBufAllocator,
+ lazyInit);
}
static final class DefaultTlsConfiguration implements TlsConfiguration {
diff --git a/src/test/java/com/rabbitmq/stream/impl/StreamEnvironmentTest.java b/src/test/java/com/rabbitmq/stream/impl/StreamEnvironmentTest.java
index 99c1ca3a20..98408b3b5b 100644
--- a/src/test/java/com/rabbitmq/stream/impl/StreamEnvironmentTest.java
+++ b/src/test/java/com/rabbitmq/stream/impl/StreamEnvironmentTest.java
@@ -26,11 +26,14 @@
import com.rabbitmq.stream.AuthenticationFailureException;
import com.rabbitmq.stream.BackOffDelayPolicy;
import com.rabbitmq.stream.ChannelCustomizer;
+import com.rabbitmq.stream.ConfirmationHandler;
import com.rabbitmq.stream.Constants;
import com.rabbitmq.stream.Consumer;
import com.rabbitmq.stream.Environment;
import com.rabbitmq.stream.EnvironmentBuilder;
import com.rabbitmq.stream.Host;
+import com.rabbitmq.stream.Message;
+import com.rabbitmq.stream.OffsetSpecification;
import com.rabbitmq.stream.Producer;
import com.rabbitmq.stream.StreamException;
import com.rabbitmq.stream.impl.Client.StreamMetadata;
@@ -60,6 +63,8 @@
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
@ExtendWith(TestUtils.StreamTestInfrastructureExtension.class)
public class StreamEnvironmentTest {
@@ -158,9 +163,10 @@ void environmentCreationShouldSucceedWhenUsingTls() {
.close();
}
- @Test
- void producersAndConsumersShouldBeClosedWhenEnvironmentIsClosed() {
- Environment environment = environmentBuilder.build();
+ @ParameterizedTest
+ @ValueSource(booleans = {false, true})
+ void producersAndConsumersShouldBeClosedWhenEnvironmentIsClosed(boolean lazyInit) {
+ Environment environment = environmentBuilder.lazyInitialization(lazyInit).build();
Collection producers =
IntStream.range(0, 2)
.mapToObj(i -> environment.producerBuilder().stream(stream).build())
@@ -317,8 +323,7 @@ void growShrinkResourcesWhenProducersConsumersAreOpenedAndClosed(TestInfo info)
@Test
@TestUtils.DisabledIfRabbitMqCtlNotSet
- void environmentPublishersConsumersShouldCloseSuccessfullyWhenBrokerIsDown(TestInfo info)
- throws Exception {
+ void environmentPublishersConsumersShouldCloseSuccessfullyWhenBrokerIsDown() throws Exception {
Environment environment =
environmentBuilder
.recoveryBackOffDelayPolicy(BackOffDelayPolicy.fixed(Duration.ofSeconds(10)))
@@ -439,4 +444,66 @@ void streamCreationShouldBeIdempotent(TestInfo info) {
assertThat(client.delete(s).isOk()).isTrue();
}
}
+
+ @Test
+ void instanciationShouldSucceedWhenLazyInitIsEnabledAndHostIsNotKnown() {
+ String dummyHost = UUID.randomUUID().toString();
+ Address dummyAddress = new Address(dummyHost, Client.DEFAULT_PORT);
+ try (Environment env =
+ environmentBuilder
+ .host(dummyHost)
+ .addressResolver(a -> dummyAddress)
+ .lazyInitialization(true)
+ .build()) {
+
+ assertThatThrownBy(() -> env.streamCreator().stream("should not have been created").create())
+ .isInstanceOf(StreamException.class);
+ assertThatThrownBy(() -> env.deleteStream("should not exist"))
+ .isInstanceOf(StreamException.class);
+ assertThatThrownBy(() -> env.producerBuilder().stream(stream).build())
+ .isInstanceOf(StreamException.class);
+ assertThatThrownBy(
+ () ->
+ env.consumerBuilder().stream(stream)
+ .messageHandler((context, message) -> {})
+ .build())
+ .isInstanceOf(StreamException.class);
+ }
+ }
+
+ @ParameterizedTest
+ @ValueSource(booleans = {false, true})
+ void createPublishConsumeDelete(boolean lazyInit, TestInfo info) {
+ try (Environment env = environmentBuilder.lazyInitialization(lazyInit).build()) {
+ String s = streamName(info);
+ env.streamCreator().stream(s).create();
+ int messageCount = 50_000;
+ CountDownLatch confirmLatch = new CountDownLatch(messageCount);
+ CountDownLatch consumeLatch = new CountDownLatch(messageCount);
+
+ Producer producer = env.producerBuilder().stream(s).build();
+ ConfirmationHandler confirmationHandler = confirmationStatus -> confirmLatch.countDown();
+ IntStream.range(0, messageCount)
+ .forEach(
+ i -> {
+ Message message =
+ producer.messageBuilder().addData("".getBytes(StandardCharsets.UTF_8)).build();
+ producer.send(message, confirmationHandler);
+ });
+
+ latchAssert(confirmLatch).completes();
+
+ Consumer consumer =
+ env.consumerBuilder().stream(s)
+ .offset(OffsetSpecification.first())
+ .messageHandler((context, message) -> consumeLatch.countDown())
+ .build();
+
+ latchAssert(consumeLatch).completes();
+
+ producer.close();
+ consumer.close();
+ env.deleteStream(s);
+ }
+ }
}
diff --git a/src/test/java/com/rabbitmq/stream/impl/StreamEnvironmentUnitTest.java b/src/test/java/com/rabbitmq/stream/impl/StreamEnvironmentUnitTest.java
index 0baf10dc78..ddd84286c7 100644
--- a/src/test/java/com/rabbitmq/stream/impl/StreamEnvironmentUnitTest.java
+++ b/src/test/java/com/rabbitmq/stream/impl/StreamEnvironmentUnitTest.java
@@ -17,6 +17,7 @@
import static org.mockito.Mockito.*;
import com.rabbitmq.stream.BackOffDelayPolicy;
+import com.rabbitmq.stream.impl.Client.ClientParameters;
import io.netty.buffer.ByteBufAllocator;
import java.net.URI;
import java.time.Duration;
@@ -29,6 +30,8 @@
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.CsvSource;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
@@ -81,6 +84,7 @@ Client.ClientParameters duplicate() {
ConsumersCoordinator.MAX_SUBSCRIPTIONS_PER_CLIENT,
null,
ByteBufAllocator.DEFAULT,
+ false,
cf);
}
@@ -140,7 +144,32 @@ void shouldTryUrisOnInitializationFailure() throws Exception {
ConsumersCoordinator.MAX_SUBSCRIPTIONS_PER_CLIENT,
null,
ByteBufAllocator.DEFAULT,
+ false,
cf);
verify(cf, times(3)).apply(any(Client.ClientParameters.class));
}
+
+ @ParameterizedTest
+ @CsvSource({"false,1", "true,0"})
+ void shouldNotOpenConnectionWhenLazyInitIsEnabled(
+ boolean lazyInit, int expectedConnectionCreation) throws Exception {
+ reset(cf);
+ when(cf.apply(any(Client.ClientParameters.class))).thenReturn(client);
+ environment =
+ new StreamEnvironment(
+ scheduledExecutorService,
+ new ClientParameters(),
+ Collections.emptyList(),
+ recoveryBackOffDelayPolicy,
+ topologyUpdateBackOffDelayPolicy,
+ host -> host,
+ ProducersCoordinator.MAX_PRODUCERS_PER_CLIENT,
+ ProducersCoordinator.MAX_TRACKING_CONSUMERS_PER_CLIENT,
+ ConsumersCoordinator.MAX_SUBSCRIPTIONS_PER_CLIENT,
+ null,
+ ByteBufAllocator.DEFAULT,
+ lazyInit,
+ cf);
+ verify(cf, times(expectedConnectionCreation)).apply(any(Client.ClientParameters.class));
+ }
}