Skip to content

Add a lazy initialization option in the environment builder #15

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jul 21, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions src/docs/asciidoc/api.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,10 @@ can maintain before a new connection is open. The value must be between 1 and 25
a new connection is open. The value must be between 1 and 255.
|255

|`lazyInitialization`
|To delay the connection opening until necessary.
|false

|`tls`
|Configuration helper for TLS.
|TLS is enabled if a `rabbitmq-stream+tls` URI is provided.
Expand Down
13 changes: 13 additions & 0 deletions src/main/java/com/rabbitmq/stream/EnvironmentBuilder.java
Original file line number Diff line number Diff line change
Expand Up @@ -286,6 +286,19 @@ public interface EnvironmentBuilder {
EnvironmentBuilder topologyUpdateBackOffDelayPolicy(
BackOffDelayPolicy topologyUpdateBackOffDelayPolicy);

/**
* To delay the connection opening until necessary.
*
* <p>No connection will be open before it is necessary (for stream management or
* producer/consumer creation).
*
* <p>Default is false.
*
* @param lazy
* @return this builder instance
*/
EnvironmentBuilder lazyInitialization(boolean lazy);

/**
* Create the {@link Environment} instance.
*
Expand Down
79 changes: 56 additions & 23 deletions src/main/java/com/rabbitmq/stream/impl/StreamEnvironment.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import com.rabbitmq.stream.StreamCreator;
import com.rabbitmq.stream.StreamException;
import com.rabbitmq.stream.compression.CompressionCodecFactory;
import com.rabbitmq.stream.impl.Client.ClientParameters;
import com.rabbitmq.stream.impl.OffsetTrackingCoordinator.Registration;
import com.rabbitmq.stream.impl.StreamConsumerBuilder.TrackingConfiguration;
import com.rabbitmq.stream.impl.StreamEnvironmentBuilder.DefaultTlsConfiguration;
Expand Down Expand Up @@ -83,6 +84,8 @@ class StreamEnvironment implements Environment {
private final Clock clock = new Clock();
private final ScheduledFuture<?> clockRefreshFuture;
private final ByteBufAllocator byteBufAllocator;
private final AtomicBoolean locatorInitialized = new AtomicBoolean(false);
private final Runnable locatorInitializationSequence;
private volatile Client locator;

StreamEnvironment(
Expand All @@ -96,7 +99,8 @@ class StreamEnvironment implements Environment {
int maxTrackingConsumersByConnection,
int maxConsumersByConnection,
DefaultTlsConfiguration tlsConfiguration,
ByteBufAllocator byteBufAllocator) {
ByteBufAllocator byteBufAllocator,
boolean lazyInit) {
this(
scheduledExecutorService,
clientParametersPrototype,
Expand All @@ -109,6 +113,7 @@ class StreamEnvironment implements Environment {
maxConsumersByConnection,
tlsConfiguration,
byteBufAllocator,
lazyInit,
cp -> new Client(cp));
}

Expand All @@ -124,6 +129,7 @@ class StreamEnvironment implements Environment {
int maxConsumersByConnection,
DefaultTlsConfiguration tlsConfiguration,
ByteBufAllocator byteBufAllocator,
boolean lazyInit,
Function<Client.ClientParameters, Client> clientFactory) {
this.recoveryBackOffDelayPolicy = recoveryBackOffDelayPolicy;
this.topologyUpdateBackOffDelayPolicy = topologyBackOffDelayPolicy;
Expand Down Expand Up @@ -235,29 +241,41 @@ class StreamEnvironment implements Environment {
}
};
shutdownListenerReference.set(shutdownListener);
RuntimeException lastException = null;
for (Address address : addresses) {
address = addressResolver.resolve(address);
Client.ClientParameters locatorParameters =
clientParametersPrototype
.duplicate()
.host(address.host())
.port(address.port())
.clientProperty("connection_name", "rabbitmq-stream-locator")
.shutdownListener(shutdownListenerReference.get());
try {
this.locator = clientFactory.apply(locatorParameters);
LOGGER.debug("Locator connected to {}", address);
break;
} catch (RuntimeException e) {
LOGGER.debug("Error while try to connect to {}: {}", address, e.getMessage());
lastException = e;
}
}
if (this.locator == null) {
throw lastException;
ClientParameters clientParametersForInit = clientParametersPrototype.duplicate();
Runnable locatorInitSequence =
() -> {
RuntimeException lastException = null;
for (Address address : addresses) {
address = addressResolver.resolve(address);
Client.ClientParameters locatorParameters =
clientParametersForInit
.duplicate()
.host(address.host())
.port(address.port())
.clientProperty("connection_name", "rabbitmq-stream-locator")
.shutdownListener(shutdownListenerReference.get());
try {
this.locator = clientFactory.apply(locatorParameters);
LOGGER.debug("Locator connected to {}", address);
break;
} catch (RuntimeException e) {
LOGGER.debug("Error while try to connect to {}: {}", address, e.getMessage());
lastException = e;
}
}
if (this.locator == null) {
throw lastException;
}
};
if (lazyInit) {
this.locatorInitializationSequence = locatorInitSequence;
} else {
locatorInitSequence.run();
locatorInitialized.set(true);
this.locatorInitializationSequence = () -> {};
}
this.codec = locator.codec();
this.codec =
clientParametersPrototype.codec == null ? Codecs.DEFAULT : clientParametersPrototype.codec;
this.clockRefreshFuture =
this.scheduledExecutorService.scheduleAtFixedRate(
() -> this.clock.refresh(), 1, 1, SECONDS);
Expand Down Expand Up @@ -318,13 +336,26 @@ public ByteBufAllocator byteBufAllocator() {
return byteBufAllocator;
}

private void maybeInitializeLocator() {
if (this.locatorInitialized.compareAndSet(false, true)) {
try {
this.locatorInitializationSequence.run();
} catch (RuntimeException e) {
this.locatorInitialized.set(false);
throw e;
}
}
}

@Override
public StreamCreator streamCreator() {
maybeInitializeLocator();
return new StreamStreamCreator(this);
}

@Override
public void deleteStream(String stream) {
maybeInitializeLocator();
Client.Response response = this.locator().delete(stream);
if (!response.isOk()) {
throw new StreamException(
Expand All @@ -339,6 +370,7 @@ public void deleteStream(String stream) {

@Override
public ProducerBuilder producerBuilder() {
maybeInitializeLocator();
return new StreamProducerBuilder(this);
}

Expand All @@ -360,6 +392,7 @@ void removeConsumer(StreamConsumer consumer) {

@Override
public ConsumerBuilder consumerBuilder() {
maybeInitializeLocator();
return new StreamConsumerBuilder(this);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ public class StreamEnvironmentBuilder implements EnvironmentBuilder {
private int maxConsumersByConnection = ConsumersCoordinator.MAX_SUBSCRIPTIONS_PER_CLIENT;
private CompressionCodecFactory compressionCodecFactory;
private ByteBufAllocator byteBufAllocator = ByteBufAllocator.DEFAULT;
private boolean lazyInit = false;

public StreamEnvironmentBuilder() {}

Expand Down Expand Up @@ -266,6 +267,12 @@ public EnvironmentBuilder maxConsumersByConnection(int maxConsumersByConnection)
return this;
}

@Override
public EnvironmentBuilder lazyInitialization(boolean lazy) {
this.lazyInit = lazy;
return this;
}

@Override
public TlsConfiguration tls() {
this.tls.enable();
Expand All @@ -290,7 +297,8 @@ public Environment build() {
maxTrackingConsumersByConnection,
maxConsumersByConnection,
tls,
byteBufAllocator);
byteBufAllocator,
lazyInit);
}

static final class DefaultTlsConfiguration implements TlsConfiguration {
Expand Down
77 changes: 72 additions & 5 deletions src/test/java/com/rabbitmq/stream/impl/StreamEnvironmentTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,14 @@
import com.rabbitmq.stream.AuthenticationFailureException;
import com.rabbitmq.stream.BackOffDelayPolicy;
import com.rabbitmq.stream.ChannelCustomizer;
import com.rabbitmq.stream.ConfirmationHandler;
import com.rabbitmq.stream.Constants;
import com.rabbitmq.stream.Consumer;
import com.rabbitmq.stream.Environment;
import com.rabbitmq.stream.EnvironmentBuilder;
import com.rabbitmq.stream.Host;
import com.rabbitmq.stream.Message;
import com.rabbitmq.stream.OffsetSpecification;
import com.rabbitmq.stream.Producer;
import com.rabbitmq.stream.StreamException;
import com.rabbitmq.stream.impl.Client.StreamMetadata;
Expand Down Expand Up @@ -60,6 +63,8 @@
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;

@ExtendWith(TestUtils.StreamTestInfrastructureExtension.class)
public class StreamEnvironmentTest {
Expand Down Expand Up @@ -158,9 +163,10 @@ void environmentCreationShouldSucceedWhenUsingTls() {
.close();
}

@Test
void producersAndConsumersShouldBeClosedWhenEnvironmentIsClosed() {
Environment environment = environmentBuilder.build();
@ParameterizedTest
@ValueSource(booleans = {false, true})
void producersAndConsumersShouldBeClosedWhenEnvironmentIsClosed(boolean lazyInit) {
Environment environment = environmentBuilder.lazyInitialization(lazyInit).build();
Collection<Producer> producers =
IntStream.range(0, 2)
.mapToObj(i -> environment.producerBuilder().stream(stream).build())
Expand Down Expand Up @@ -317,8 +323,7 @@ void growShrinkResourcesWhenProducersConsumersAreOpenedAndClosed(TestInfo info)

@Test
@TestUtils.DisabledIfRabbitMqCtlNotSet
void environmentPublishersConsumersShouldCloseSuccessfullyWhenBrokerIsDown(TestInfo info)
throws Exception {
void environmentPublishersConsumersShouldCloseSuccessfullyWhenBrokerIsDown() throws Exception {
Environment environment =
environmentBuilder
.recoveryBackOffDelayPolicy(BackOffDelayPolicy.fixed(Duration.ofSeconds(10)))
Expand Down Expand Up @@ -439,4 +444,66 @@ void streamCreationShouldBeIdempotent(TestInfo info) {
assertThat(client.delete(s).isOk()).isTrue();
}
}

@Test
void instanciationShouldSucceedWhenLazyInitIsEnabledAndHostIsNotKnown() {
String dummyHost = UUID.randomUUID().toString();
Address dummyAddress = new Address(dummyHost, Client.DEFAULT_PORT);
try (Environment env =
environmentBuilder
.host(dummyHost)
.addressResolver(a -> dummyAddress)
.lazyInitialization(true)
.build()) {

assertThatThrownBy(() -> env.streamCreator().stream("should not have been created").create())
.isInstanceOf(StreamException.class);
assertThatThrownBy(() -> env.deleteStream("should not exist"))
.isInstanceOf(StreamException.class);
assertThatThrownBy(() -> env.producerBuilder().stream(stream).build())
.isInstanceOf(StreamException.class);
assertThatThrownBy(
() ->
env.consumerBuilder().stream(stream)
.messageHandler((context, message) -> {})
.build())
.isInstanceOf(StreamException.class);
}
}

@ParameterizedTest
@ValueSource(booleans = {false, true})
void createPublishConsumeDelete(boolean lazyInit, TestInfo info) {
try (Environment env = environmentBuilder.lazyInitialization(lazyInit).build()) {
String s = streamName(info);
env.streamCreator().stream(s).create();
int messageCount = 50_000;
CountDownLatch confirmLatch = new CountDownLatch(messageCount);
CountDownLatch consumeLatch = new CountDownLatch(messageCount);

Producer producer = env.producerBuilder().stream(s).build();
ConfirmationHandler confirmationHandler = confirmationStatus -> confirmLatch.countDown();
IntStream.range(0, messageCount)
.forEach(
i -> {
Message message =
producer.messageBuilder().addData("".getBytes(StandardCharsets.UTF_8)).build();
producer.send(message, confirmationHandler);
});

latchAssert(confirmLatch).completes();

Consumer consumer =
env.consumerBuilder().stream(s)
.offset(OffsetSpecification.first())
.messageHandler((context, message) -> consumeLatch.countDown())
.build();

latchAssert(consumeLatch).completes();

producer.close();
consumer.close();
env.deleteStream(s);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import static org.mockito.Mockito.*;

import com.rabbitmq.stream.BackOffDelayPolicy;
import com.rabbitmq.stream.impl.Client.ClientParameters;
import io.netty.buffer.ByteBufAllocator;
import java.net.URI;
import java.time.Duration;
Expand All @@ -29,6 +30,8 @@
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.CsvSource;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;

Expand Down Expand Up @@ -81,6 +84,7 @@ Client.ClientParameters duplicate() {
ConsumersCoordinator.MAX_SUBSCRIPTIONS_PER_CLIENT,
null,
ByteBufAllocator.DEFAULT,
false,
cf);
}

Expand Down Expand Up @@ -140,7 +144,32 @@ void shouldTryUrisOnInitializationFailure() throws Exception {
ConsumersCoordinator.MAX_SUBSCRIPTIONS_PER_CLIENT,
null,
ByteBufAllocator.DEFAULT,
false,
cf);
verify(cf, times(3)).apply(any(Client.ClientParameters.class));
}

@ParameterizedTest
@CsvSource({"false,1", "true,0"})
void shouldNotOpenConnectionWhenLazyInitIsEnabled(
boolean lazyInit, int expectedConnectionCreation) throws Exception {
reset(cf);
when(cf.apply(any(Client.ClientParameters.class))).thenReturn(client);
environment =
new StreamEnvironment(
scheduledExecutorService,
new ClientParameters(),
Collections.emptyList(),
recoveryBackOffDelayPolicy,
topologyUpdateBackOffDelayPolicy,
host -> host,
ProducersCoordinator.MAX_PRODUCERS_PER_CLIENT,
ProducersCoordinator.MAX_TRACKING_CONSUMERS_PER_CLIENT,
ConsumersCoordinator.MAX_SUBSCRIPTIONS_PER_CLIENT,
null,
ByteBufAllocator.DEFAULT,
lazyInit,
cf);
verify(cf, times(expectedConnectionCreation)).apply(any(Client.ClientParameters.class));
}
}