Skip to content

Commit ef33dd8

Browse files
authored
Merge pull request #783 from rabbitmq/close-resources-on-env-init-failure
Close resources if environment initialization fails
2 parents 518cd09 + b430f7e commit ef33dd8

File tree

4 files changed

+242
-130
lines changed

4 files changed

+242
-130
lines changed
Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
// Copyright (c) 2020-2025 Broadcom. All Rights Reserved.
2+
// The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries.
3+
//
4+
// This software, the RabbitMQ Stream Java client library, is dual-licensed under the
5+
// Mozilla Public License 2.0 ("MPL"), and the Apache License version 2 ("ASL").
6+
// For the MPL, please see LICENSE-MPL-RabbitMQ. For the ASL,
7+
// please see LICENSE-APACHE2.
8+
//
9+
// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND,
10+
// either express or implied. See the LICENSE file for specific language governing
11+
// rights and limitations of this software.
12+
//
13+
// If you have any questions regarding licensing, please contact us at
14+
// info@rabbitmq.com.
15+
package com.rabbitmq.stream.impl;
16+
17+
import java.util.ArrayList;
18+
import java.util.Collections;
19+
import java.util.List;
20+
import java.util.concurrent.atomic.AtomicBoolean;
21+
import org.slf4j.Logger;
22+
import org.slf4j.LoggerFactory;
23+
24+
/**
25+
* Helper to register callbacks and call them in reverse order. Registered callbacks are made
26+
* automatically idempotent.
27+
*
28+
* <p>This class can be used to register closing callbacks, call them individually, and/or call all
29+
* of them (in LIFO order) with the {@link #close()} method.
30+
*
31+
* <p>From
32+
* https://github.com/rabbitmq/rabbitmq-perf-test/blob/main/src/main/java/com/rabbitmq/perf/ShutdownService.java.
33+
*/
34+
final class ShutdownService implements AutoCloseable {
35+
36+
private static final Logger LOGGER = LoggerFactory.getLogger(ShutdownService.class);
37+
38+
private final List<AutoCloseable> closeables = Collections.synchronizedList(new ArrayList<>());
39+
40+
/**
41+
* Wrap and register the callback into an idempotent {@link AutoCloseable}.
42+
*
43+
* @param closeCallback
44+
* @return the callback as an idempotent {@link AutoCloseable}
45+
*/
46+
AutoCloseable wrap(CloseCallback closeCallback) {
47+
AtomicBoolean closingOrAlreadyClosed = new AtomicBoolean(false);
48+
AutoCloseable idempotentCloseCallback =
49+
new AutoCloseable() {
50+
@Override
51+
public void close() throws Exception {
52+
if (closingOrAlreadyClosed.compareAndSet(false, true)) {
53+
closeCallback.run();
54+
}
55+
}
56+
57+
@Override
58+
public String toString() {
59+
return closeCallback.toString();
60+
}
61+
};
62+
closeables.add(idempotentCloseCallback);
63+
return idempotentCloseCallback;
64+
}
65+
66+
/** Close all the registered callbacks, in the reverse order of registration. */
67+
@Override
68+
public synchronized void close() {
69+
if (!closeables.isEmpty()) {
70+
for (int i = closeables.size() - 1; i >= 0; i--) {
71+
try {
72+
closeables.get(i).close();
73+
} catch (Exception e) {
74+
LOGGER.warn("Could not properly execute closing step '{}'", closeables.get(i), e);
75+
}
76+
}
77+
}
78+
}
79+
80+
@FunctionalInterface
81+
interface CloseCallback {
82+
83+
void run() throws Exception;
84+
}
85+
}

src/main/java/com/rabbitmq/stream/impl/StreamEnvironment.java

Lines changed: 145 additions & 127 deletions
Original file line numberDiff line numberDiff line change
@@ -204,122 +204,136 @@ class StreamEnvironment implements Environment {
204204
.collect(toList());
205205
this.locators = List.copyOf(lctrs);
206206

207-
this.executorServiceFactory =
208-
new DefaultExecutorServiceFactory(
209-
this.addresses.size(), 1, "rabbitmq-stream-locator-connection-");
210-
211-
if (clientParametersPrototype.eventLoopGroup == null) {
212-
this.eventLoopGroup = Utils.eventLoopGroup();
213-
this.clientParametersPrototype =
214-
clientParametersPrototype.duplicate().eventLoopGroup(this.eventLoopGroup);
215-
} else {
216-
this.eventLoopGroup = null;
217-
this.clientParametersPrototype =
218-
clientParametersPrototype
219-
.duplicate()
220-
.eventLoopGroup(clientParametersPrototype.eventLoopGroup);
221-
}
222-
ScheduledExecutorService executorService;
223-
if (scheduledExecutorService == null) {
224-
int threads = AVAILABLE_PROCESSORS;
225-
LOGGER.debug("Creating scheduled executor service with {} thread(s)", threads);
226-
ThreadFactory threadFactory = threadFactory("rabbitmq-stream-environment-scheduler-");
227-
executorService = Executors.newScheduledThreadPool(threads, threadFactory);
228-
this.privateScheduleExecutorService = true;
229-
} else {
230-
executorService = scheduledExecutorService;
231-
this.privateScheduleExecutorService = false;
232-
}
233-
this.scheduledExecutorService = executorService;
234-
235-
this.producersCoordinator =
236-
new ProducersCoordinator(
237-
this,
238-
maxProducersByConnection,
239-
maxTrackingConsumersByConnection,
240-
connectionNamingStrategy,
241-
coordinatorClientFactory(this, producerNodeRetryDelay),
242-
forceLeaderForProducers);
243-
this.consumersCoordinator =
244-
new ConsumersCoordinator(
245-
this,
246-
maxConsumersByConnection,
247-
connectionNamingStrategy,
248-
coordinatorClientFactory(this, consumerNodeRetryDelay),
249-
forceReplicaForConsumers,
250-
Utils.brokerPicker());
251-
this.offsetTrackingCoordinator = new OffsetTrackingCoordinator(this);
252-
253-
ThreadFactory threadFactory = threadFactory("rabbitmq-stream-environment-locator-scheduler-");
254-
this.locatorReconnectionScheduledExecutorService =
255-
Executors.newScheduledThreadPool(this.locators.size(), threadFactory);
256-
257-
ClientParameters clientParametersForInit = locatorParametersCopy();
258-
Runnable locatorInitSequence =
259-
() -> {
260-
RuntimeException lastException = null;
261-
for (int i = 0; i < locators.size(); i++) {
262-
Address address = addresses.get(i % addresses.size());
263-
Locator locator = locator(i);
264-
address = addressResolver.resolve(address);
265-
String connectionName = connectionNamingStrategy.apply(ClientConnectionType.LOCATOR);
266-
Client.ClientParameters locatorParameters =
267-
clientParametersForInit
268-
.duplicate()
269-
.host(address.host())
270-
.port(address.port())
271-
.clientProperty("connection_name", connectionName)
272-
.shutdownListener(
273-
shutdownListener(locator, connectionNamingStrategy, clientFactory));
274-
try {
275-
Client client = clientFactory.apply(locatorParameters);
276-
locator.client(client);
277-
LOGGER.debug("Created locator connection '{}'", connectionName);
278-
LOGGER.debug("Locator connected to {}", address);
279-
} catch (RuntimeException e) {
280-
LOGGER.debug("Error while try to connect to {}: {}", address, e.getMessage());
281-
lastException = e;
207+
ShutdownService shutdownService = new ShutdownService();
208+
try {
209+
this.executorServiceFactory =
210+
new DefaultExecutorServiceFactory(
211+
this.addresses.size(), 1, "rabbitmq-stream-locator-connection-");
212+
shutdownService.wrap(this.executorServiceFactory::close);
213+
214+
if (clientParametersPrototype.eventLoopGroup == null) {
215+
this.eventLoopGroup = Utils.eventLoopGroup();
216+
shutdownService.wrap(() -> closeEventLoopGroup(this.eventLoopGroup));
217+
this.clientParametersPrototype =
218+
clientParametersPrototype.duplicate().eventLoopGroup(this.eventLoopGroup);
219+
} else {
220+
this.eventLoopGroup = null;
221+
this.clientParametersPrototype =
222+
clientParametersPrototype
223+
.duplicate()
224+
.eventLoopGroup(clientParametersPrototype.eventLoopGroup);
225+
}
226+
ScheduledExecutorService executorService;
227+
if (scheduledExecutorService == null) {
228+
int threads = AVAILABLE_PROCESSORS;
229+
LOGGER.debug("Creating scheduled executor service with {} thread(s)", threads);
230+
ThreadFactory threadFactory = threadFactory("rabbitmq-stream-environment-scheduler-");
231+
executorService = Executors.newScheduledThreadPool(threads, threadFactory);
232+
shutdownService.wrap(executorService::shutdownNow);
233+
this.privateScheduleExecutorService = true;
234+
} else {
235+
executorService = scheduledExecutorService;
236+
this.privateScheduleExecutorService = false;
237+
}
238+
this.scheduledExecutorService = executorService;
239+
240+
this.producersCoordinator =
241+
new ProducersCoordinator(
242+
this,
243+
maxProducersByConnection,
244+
maxTrackingConsumersByConnection,
245+
connectionNamingStrategy,
246+
coordinatorClientFactory(this, producerNodeRetryDelay),
247+
forceLeaderForProducers);
248+
shutdownService.wrap(this.producersCoordinator::close);
249+
this.consumersCoordinator =
250+
new ConsumersCoordinator(
251+
this,
252+
maxConsumersByConnection,
253+
connectionNamingStrategy,
254+
coordinatorClientFactory(this, consumerNodeRetryDelay),
255+
forceReplicaForConsumers,
256+
Utils.brokerPicker());
257+
shutdownService.wrap(this.consumersCoordinator::close);
258+
this.offsetTrackingCoordinator = new OffsetTrackingCoordinator(this);
259+
shutdownService.wrap(this.offsetTrackingCoordinator::close);
260+
261+
ThreadFactory threadFactory = threadFactory("rabbitmq-stream-environment-locator-scheduler-");
262+
this.locatorReconnectionScheduledExecutorService =
263+
Executors.newScheduledThreadPool(this.locators.size(), threadFactory);
264+
shutdownService.wrap(this.locatorReconnectionScheduledExecutorService::shutdownNow);
265+
266+
ClientParameters clientParametersForInit = locatorParametersCopy();
267+
Runnable locatorInitSequence =
268+
() -> {
269+
RuntimeException lastException = null;
270+
for (int i = 0; i < locators.size(); i++) {
271+
Address address = addresses.get(i % addresses.size());
272+
Locator locator = locator(i);
273+
address = addressResolver.resolve(address);
274+
String connectionName = connectionNamingStrategy.apply(ClientConnectionType.LOCATOR);
275+
Client.ClientParameters locatorParameters =
276+
clientParametersForInit
277+
.duplicate()
278+
.host(address.host())
279+
.port(address.port())
280+
.clientProperty("connection_name", connectionName)
281+
.shutdownListener(
282+
shutdownListener(locator, connectionNamingStrategy, clientFactory));
283+
try {
284+
Client client = clientFactory.apply(locatorParameters);
285+
locator.client(client);
286+
LOGGER.debug("Created locator connection '{}'", connectionName);
287+
LOGGER.debug("Locator connected to {}", address);
288+
} catch (RuntimeException e) {
289+
LOGGER.debug("Error while try to connect to {}: {}", address, e.getMessage());
290+
lastException = e;
291+
}
282292
}
283-
}
284-
if (this.locators.stream().allMatch(Locator::isNotSet)) {
285-
throw lastException == null
286-
? new StreamException("Not locator available")
287-
: lastException;
288-
} else {
289-
this.locators.forEach(
290-
l -> {
291-
if (l.isNotSet()) {
292-
ShutdownListener shutdownListener =
293-
shutdownListener(l, connectionNamingStrategy, clientFactory);
294-
Client.ClientParameters newLocatorParameters =
295-
this.locatorParametersCopy().shutdownListener(shutdownListener);
296-
scheduleLocatorConnection(
297-
newLocatorParameters,
298-
this.addressResolver,
299-
l,
300-
connectionNamingStrategy,
301-
clientFactory,
302-
this.locatorReconnectionScheduledExecutorService,
303-
this.recoveryBackOffDelayPolicy,
304-
l.label());
305-
}
306-
});
307-
}
308-
};
309-
if (lazyInit) {
310-
this.locatorInitializationSequence = locatorInitSequence;
311-
} else {
312-
locatorInitSequence.run();
313-
locatorsInitialized.set(true);
314-
this.locatorInitializationSequence = () -> {};
293+
if (this.locators.stream().allMatch(Locator::isNotSet)) {
294+
throw lastException == null
295+
? new StreamException("Not locator available")
296+
: lastException;
297+
} else {
298+
this.locators.forEach(
299+
l -> {
300+
if (l.isNotSet()) {
301+
ShutdownListener shutdownListener =
302+
shutdownListener(l, connectionNamingStrategy, clientFactory);
303+
Client.ClientParameters newLocatorParameters =
304+
this.locatorParametersCopy().shutdownListener(shutdownListener);
305+
scheduleLocatorConnection(
306+
newLocatorParameters,
307+
this.addressResolver,
308+
l,
309+
connectionNamingStrategy,
310+
clientFactory,
311+
this.locatorReconnectionScheduledExecutorService,
312+
this.recoveryBackOffDelayPolicy,
313+
l.label());
314+
}
315+
});
316+
}
317+
};
318+
if (lazyInit) {
319+
this.locatorInitializationSequence = locatorInitSequence;
320+
} else {
321+
locatorInitSequence.run();
322+
locatorsInitialized.set(true);
323+
this.locatorInitializationSequence = () -> {};
324+
}
325+
this.codec =
326+
clientParametersPrototype.codec() == null
327+
? Codecs.DEFAULT
328+
: clientParametersPrototype.codec();
329+
this.clockRefreshFuture =
330+
this.scheduledExecutorService.scheduleAtFixedRate(
331+
namedRunnable(this.clock::refresh, "Background clock refresh"), 1, 1, SECONDS);
332+
shutdownService.wrap(() -> this.clockRefreshFuture.cancel(false));
333+
} catch (RuntimeException e) {
334+
shutdownService.close();
335+
throw e;
315336
}
316-
this.codec =
317-
clientParametersPrototype.codec() == null
318-
? Codecs.DEFAULT
319-
: clientParametersPrototype.codec();
320-
this.clockRefreshFuture =
321-
this.scheduledExecutorService.scheduleAtFixedRate(
322-
namedRunnable(this.clock::refresh, "Background clock refresh"), 1, 1, SECONDS);
323337
}
324338

325339
private ShutdownListener shutdownListener(
@@ -717,20 +731,24 @@ public void close() {
717731
if (this.locatorReconnectionScheduledExecutorService != null) {
718732
this.locatorReconnectionScheduledExecutorService.shutdownNow();
719733
}
720-
try {
721-
if (this.eventLoopGroup != null
722-
&& (!this.eventLoopGroup.isShuttingDown() || !this.eventLoopGroup.isShutdown())) {
723-
LOGGER.debug("Closing Netty event loop group");
724-
this.eventLoopGroup.shutdownGracefully(1, 10, SECONDS).get(10, SECONDS);
725-
}
726-
} catch (InterruptedException e) {
727-
LOGGER.info("Event loop group closing has been interrupted");
728-
Thread.currentThread().interrupt();
729-
} catch (ExecutionException e) {
730-
LOGGER.info("Event loop group closing failed", e);
731-
} catch (TimeoutException e) {
732-
LOGGER.info("Could not close event loop group in 10 seconds");
734+
closeEventLoopGroup(this.eventLoopGroup);
735+
}
736+
}
737+
738+
private static void closeEventLoopGroup(EventLoopGroup eventLoopGroup) {
739+
try {
740+
if (eventLoopGroup != null
741+
&& (!eventLoopGroup.isShuttingDown() || !eventLoopGroup.isShutdown())) {
742+
LOGGER.debug("Closing Netty event loop group");
743+
eventLoopGroup.shutdownGracefully(1, 10, SECONDS).get(10, SECONDS);
733744
}
745+
} catch (InterruptedException e) {
746+
LOGGER.info("Event loop group closing has been interrupted");
747+
Thread.currentThread().interrupt();
748+
} catch (ExecutionException e) {
749+
LOGGER.info("Event loop group closing failed", e);
750+
} catch (TimeoutException e) {
751+
LOGGER.info("Could not close event loop group in 10 seconds");
734752
}
735753
}
736754

0 commit comments

Comments
 (0)