Skip to content

Commit f44b512

Browse files
committed
fix: ensure controller and retry sources are started first
This is needed to ensure that they are ready to process events as soon as the informer-backed sources start.
1 parent 596756a commit f44b512

File tree

1 file changed

+22
-8
lines changed

1 file changed

+22
-8
lines changed

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventSourceManager.java

Lines changed: 22 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,9 @@
11
package io.javaoperatorsdk.operator.processing.event;
22

3-
import java.util.Collections;
4-
import java.util.HashSet;
53
import java.util.Objects;
64
import java.util.Set;
5+
import java.util.concurrent.ConcurrentNavigableMap;
6+
import java.util.concurrent.ConcurrentSkipListMap;
77
import java.util.concurrent.locks.ReentrantLock;
88

99
import org.slf4j.Logger;
@@ -25,7 +25,8 @@ public class EventSourceManager<R extends HasMetadata>
2525
private static final Logger log = LoggerFactory.getLogger(EventSourceManager.class);
2626

2727
private final ReentrantLock lock = new ReentrantLock();
28-
private final Set<EventSource> eventSources = Collections.synchronizedSet(new HashSet<>());
28+
private final ConcurrentNavigableMap<String, EventSource> eventSources =
29+
new ConcurrentSkipListMap<>();
2930
private final EventProcessor<R> eventProcessor;
3031
private TimerEventSource<R> retryAndRescheduleTimerEventSource;
3132
private ControllerResourceEventSource<R> controllerResourceEventSource;
@@ -56,7 +57,7 @@ public void start() throws OperatorException {
5657
lock.lock();
5758
try {
5859
log.debug("Starting event sources.");
59-
for (var eventSource : eventSources) {
60+
for (var eventSource : eventSources.values()) {
6061
try {
6162
eventSource.start();
6263
} catch (Exception e) {
@@ -73,7 +74,7 @@ public void stop() {
7374
lock.lock();
7475
try {
7576
log.debug("Closing event sources.");
76-
for (var eventSource : eventSources) {
77+
for (var eventSource : eventSources.values()) {
7778
try {
7879
eventSource.stop();
7980
} catch (Exception e) {
@@ -93,7 +94,7 @@ public final void registerEventSource(EventSource eventSource)
9394
Objects.requireNonNull(eventSource, "EventSource must not be null");
9495
lock.lock();
9596
try {
96-
eventSources.add(eventSource);
97+
eventSources.put(keyFor(eventSource), eventSource);
9798
eventSource.setEventHandler(eventProcessor);
9899
eventSource.setEventRegistry(this);
99100
} catch (Throwable e) {
@@ -108,10 +109,23 @@ public final void registerEventSource(EventSource eventSource)
108109
}
109110
}
110111

112+
private String keyFor(EventSource source) {
113+
var name = source.getClass().getName();
114+
// make sure we process controller and timer event sources first
115+
// this is needed so that these sources are set when informer sources start so that events can
116+
// properly be processed
117+
if (source == controllerResourceEventSource) {
118+
name = 0 + name;
119+
} else if (source == retryAndRescheduleTimerEventSource) {
120+
name = 1 + name;
121+
}
122+
return name;
123+
}
124+
111125
public void cleanupForCustomResource(ResourceID customResourceUid) {
112126
lock.lock();
113127
try {
114-
for (EventSource eventSource : this.eventSources) {
128+
for (EventSource eventSource : this.eventSources.values()) {
115129
eventSource.cleanupForResource(customResourceUid);
116130
}
117131
} finally {
@@ -121,7 +135,7 @@ public void cleanupForCustomResource(ResourceID customResourceUid) {
121135

122136
@Override
123137
public Set<EventSource> getRegisteredEventSources() {
124-
return Collections.unmodifiableSet(eventSources);
138+
return Set.copyOf(eventSources.values());
125139
}
126140

127141
@Override

0 commit comments

Comments
 (0)