From 952f29bc08986b513ba416f36182f693eacbec01 Mon Sep 17 00:00:00 2001 From: csviri Date: Tue, 8 Mar 2022 16:41:13 +0100 Subject: [PATCH 1/5] fix: log message --- .../operator/processing/event/EventProcessor.java | 1 + 1 file changed, 1 insertion(+) diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventProcessor.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventProcessor.java index 29f9adab55..5b26418ae3 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventProcessor.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventProcessor.java @@ -224,6 +224,7 @@ void eventProcessingFinished( } private void postponeReconciliationAndHandleCacheSyncEvent(ResourceID resourceID) { + log.debug("Postponing reconciliation for resource id: {}",resourceID); eventSourceManager.getControllerResourceEventSource().whitelistNextEvent(resourceID); } From 2e841088b4908be16d356a36312950884075f062 Mon Sep 17 00:00:00 2001 From: csviri Date: Wed, 9 Mar 2022 08:36:46 +0100 Subject: [PATCH 2/5] fix: added temp res cache --- .../controller/TemporaryResourceCache.java | 65 +++++++++++++++++++ 1 file changed, 65 insertions(+) create mode 100644 operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/controller/TemporaryResourceCache.java diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/controller/TemporaryResourceCache.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/controller/TemporaryResourceCache.java new file mode 100644 index 0000000000..ce6163782b --- /dev/null +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/controller/TemporaryResourceCache.java @@ -0,0 +1,65 @@ +package io.javaoperatorsdk.operator.processing.event.source.controller; + +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.locks.ReentrantLock; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import io.fabric8.kubernetes.api.model.HasMetadata; +import io.javaoperatorsdk.operator.processing.event.ResourceID; + +public class TemporaryResourceCache { + + private static final Logger log = LoggerFactory.getLogger(TemporaryResourceCache.class); + + private final Map cache = new ConcurrentHashMap<>(); + private final ReentrantLock lock = new ReentrantLock(); + private final ControllerResourceCache managedInformerEventSource; + + public TemporaryResourceCache(ControllerResourceCache managedInformerEventSource) { + this.managedInformerEventSource = managedInformerEventSource; + } + + public void removeResourceFromCache(T resource) { + lock.lock(); + try { + cache.remove(ResourceID.fromResource(resource)); + } finally { + lock.unlock(); + } + } + + public void putUpdatedResource(T newResource, String previousResourceVersion) { + lock.lock(); + try { + var resourceId = ResourceID.fromResource(newResource); + var informerCacheResource = managedInformerEventSource.get(resourceId); + if (informerCacheResource.isEmpty()) { + log.debug("No cached value present for resource: {}", newResource); + return; + } + // if this is not true that means the cache was already updated + if (informerCacheResource.get().getMetadata().getResourceVersion() + .equals(previousResourceVersion)) { + log.debug("Putting resource to temporal cache with id: {}", resourceId); + cache.put(resourceId, newResource); + } else { + // if something is in cache it's surely obsolete now + cache.remove(resourceId); + } + } finally { + lock.unlock(); + } + } + + public Optional getResourceFromCache(ResourceID resourceID) { + try { + lock.lock(); + return Optional.ofNullable(cache.get(resourceID)); + } finally { + lock.unlock(); + } + } +} + From 95e022fb1cb443e707104847290b4c59362c9e16 Mon Sep 17 00:00:00 2001 From: csviri Date: Wed, 9 Mar 2022 09:30:27 +0100 Subject: [PATCH 3/5] fix: wip --- .../operator/processing/Controller.java | 2 +- .../processing/event/EventProcessor.java | 6 +- .../ControllerResourceEventSource.java | 25 ++++-- .../controller/TemporaryResourceCache.java | 86 ++++++++++--------- .../processing/event/EventProcessorTest.java | 18 ++-- 5 files changed, 76 insertions(+), 61 deletions(-) diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/Controller.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/Controller.java index f9a600f1a7..704945925f 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/Controller.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/Controller.java @@ -188,7 +188,7 @@ public void start() throws OperatorException { if (reconciler instanceof EventSourceInitializer) { ((EventSourceInitializer) reconciler) .prepareEventSources(new EventSourceContext<>( - eventSourceManager.getControllerResourceEventSource().getResourceCache(), + eventSourceManager.getControllerResourceEventSource(), configurationService(), kubernetesClient)) .forEach(eventSourceManager::registerEventSource); } diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventProcessor.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventProcessor.java index 5b26418ae3..ad29553b11 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventProcessor.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventProcessor.java @@ -50,7 +50,7 @@ class EventProcessor implements EventHandler, LifecycleAw EventProcessor(EventSourceManager eventSourceManager) { this( - eventSourceManager.getControllerResourceEventSource().getResourceCache(), + eventSourceManager.getControllerResourceEventSource(), ExecutorServiceManager.instance().executorService(), eventSourceManager.getController().getConfiguration().getName(), new ReconciliationDispatcher<>(eventSourceManager.getController()), @@ -73,7 +73,7 @@ class EventProcessor implements EventHandler, LifecycleAw Retry retry, Metrics metrics) { this( - eventSourceManager.getControllerResourceEventSource().getResourceCache(), + eventSourceManager.getControllerResourceEventSource(), null, relatedControllerName, reconciliationDispatcher, @@ -224,7 +224,7 @@ void eventProcessingFinished( } private void postponeReconciliationAndHandleCacheSyncEvent(ResourceID resourceID) { - log.debug("Postponing reconciliation for resource id: {}",resourceID); + log.debug("Postponing reconciliation for resource id: {}", resourceID); eventSourceManager.getControllerResourceEventSource().whitelistNextEvent(resourceID); } diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/controller/ControllerResourceEventSource.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/controller/ControllerResourceEventSource.java index 9feabf40bf..0c728c306f 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/controller/ControllerResourceEventSource.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/controller/ControllerResourceEventSource.java @@ -5,6 +5,8 @@ import java.util.Objects; import java.util.Optional; import java.util.concurrent.ConcurrentHashMap; +import java.util.function.Predicate; +import java.util.stream.Stream; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -22,6 +24,7 @@ import io.javaoperatorsdk.operator.processing.MDCUtils; import io.javaoperatorsdk.operator.processing.event.ResourceID; import io.javaoperatorsdk.operator.processing.event.source.AbstractResourceEventSource; +import io.javaoperatorsdk.operator.processing.event.source.ResourceCache; import static io.javaoperatorsdk.operator.processing.KubernetesResourceUtils.getName; import static io.javaoperatorsdk.operator.processing.KubernetesResourceUtils.getUID; @@ -29,7 +32,7 @@ public class ControllerResourceEventSource extends AbstractResourceEventSource - implements ResourceEventHandler { + implements ResourceEventHandler, ResourceCache { public static final String ANY_NAMESPACE_MAP_KEY = "anyNamespace"; @@ -42,6 +45,7 @@ public class ControllerResourceEventSource private final ResourceEventFilter filter; private final OnceWhitelistEventFilterEventFilter onceWhitelistEventFilterEventFilter; private final ControllerResourceCache cache; + private final TemporaryResourceCache temporaryResourceCache; public ControllerResourceEventSource(Controller controller) { super(controller.getConfiguration().getResourceClass()); @@ -50,7 +54,7 @@ public ControllerResourceEventSource(Controller controller) { var cloner = configurationService != null ? configurationService.getResourceCloner() : ConfigurationService.DEFAULT_CLONER; this.cache = new ControllerResourceCache<>(sharedIndexInformers, cloner); - + temporaryResourceCache = new TemporaryResourceCache<>(cache); var filters = new ResourceEventFilter[] { ResourceEventFilters.finalizerNeededAndApplied(), ResourceEventFilters.markedForDeletion(), @@ -162,8 +166,19 @@ public Optional get(ResourceID resourceID) { return cache.get(resourceID); } - public ControllerResourceCache getResourceCache() { - return cache; + @Override + public Stream keys() { + return cache.keys(); + } + + @Override + public Stream list(Predicate predicate) { + return cache.list(predicate); + } + + @Override + public Stream list(String namespace, Predicate predicate) { + return cache.list(namespace, predicate); } /** @@ -204,6 +219,6 @@ private void handleKubernetesClientException(Exception e) { @Override public Optional getAssociated(T primary) { - return cache.get(ResourceID.fromResource(primary)); + return get(ResourceID.fromResource(primary)); } } diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/controller/TemporaryResourceCache.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/controller/TemporaryResourceCache.java index ce6163782b..294ac95ef5 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/controller/TemporaryResourceCache.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/controller/TemporaryResourceCache.java @@ -4,62 +4,64 @@ import java.util.Optional; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.locks.ReentrantLock; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; + import io.fabric8.kubernetes.api.model.HasMetadata; import io.javaoperatorsdk.operator.processing.event.ResourceID; public class TemporaryResourceCache { - private static final Logger log = LoggerFactory.getLogger(TemporaryResourceCache.class); + private static final Logger log = LoggerFactory.getLogger(TemporaryResourceCache.class); - private final Map cache = new ConcurrentHashMap<>(); - private final ReentrantLock lock = new ReentrantLock(); - private final ControllerResourceCache managedInformerEventSource; + private final Map cache = new ConcurrentHashMap<>(); + private final ReentrantLock lock = new ReentrantLock(); + private final ControllerResourceCache managedInformerEventSource; - public TemporaryResourceCache(ControllerResourceCache managedInformerEventSource) { - this.managedInformerEventSource = managedInformerEventSource; - } + public TemporaryResourceCache(ControllerResourceCache managedInformerEventSource) { + this.managedInformerEventSource = managedInformerEventSource; + } - public void removeResourceFromCache(T resource) { - lock.lock(); - try { - cache.remove(ResourceID.fromResource(resource)); - } finally { - lock.unlock(); - } + public void removeResourceFromCache(T resource) { + lock.lock(); + try { + cache.remove(ResourceID.fromResource(resource)); + } finally { + lock.unlock(); } + } - public void putUpdatedResource(T newResource, String previousResourceVersion) { - lock.lock(); - try { - var resourceId = ResourceID.fromResource(newResource); - var informerCacheResource = managedInformerEventSource.get(resourceId); - if (informerCacheResource.isEmpty()) { - log.debug("No cached value present for resource: {}", newResource); - return; - } - // if this is not true that means the cache was already updated - if (informerCacheResource.get().getMetadata().getResourceVersion() - .equals(previousResourceVersion)) { - log.debug("Putting resource to temporal cache with id: {}", resourceId); - cache.put(resourceId, newResource); - } else { - // if something is in cache it's surely obsolete now - cache.remove(resourceId); - } - } finally { - lock.unlock(); - } + public void putUpdatedResource(T newResource, String previousResourceVersion) { + lock.lock(); + try { + var resourceId = ResourceID.fromResource(newResource); + var informerCacheResource = managedInformerEventSource.get(resourceId); + if (informerCacheResource.isEmpty()) { + log.debug("No cached value present for resource: {}", newResource); + return; + } + // if this is not true that means the cache was already updated + if (informerCacheResource.get().getMetadata().getResourceVersion() + .equals(previousResourceVersion)) { + log.debug("Putting resource to temporal cache with id: {}", resourceId); + cache.put(resourceId, newResource); + } else { + // if something is in cache it's surely obsolete now + cache.remove(resourceId); + } + } finally { + lock.unlock(); } + } - public Optional getResourceFromCache(ResourceID resourceID) { - try { - lock.lock(); - return Optional.ofNullable(cache.get(resourceID)); - } finally { - lock.unlock(); - } + public Optional getResourceFromCache(ResourceID resourceID) { + try { + lock.lock(); + return Optional.ofNullable(cache.get(resourceID)); + } finally { + lock.unlock(); } + } } diff --git a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/EventProcessorTest.java b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/EventProcessorTest.java index 5a3f73742e..bd170b8173 100644 --- a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/EventProcessorTest.java +++ b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/EventProcessorTest.java @@ -13,7 +13,6 @@ import io.fabric8.kubernetes.api.model.HasMetadata; import io.javaoperatorsdk.operator.api.monitoring.Metrics; -import io.javaoperatorsdk.operator.processing.event.source.controller.ControllerResourceCache; import io.javaoperatorsdk.operator.processing.event.source.controller.ControllerResourceEventSource; import io.javaoperatorsdk.operator.processing.event.source.controller.ResourceAction; import io.javaoperatorsdk.operator.processing.event.source.controller.ResourceEvent; @@ -44,8 +43,6 @@ class EventProcessorTest { private ReconciliationDispatcher reconciliationDispatcherMock = mock(ReconciliationDispatcher.class); private EventSourceManager eventSourceManagerMock = mock(EventSourceManager.class); - private ControllerResourceCache resourceCacheMock = - mock(ControllerResourceCache.class); private TimerEventSource retryTimerEventSourceMock = mock(TimerEventSource.class); private ControllerResourceEventSource controllerResourceEventSourceMock = mock(ControllerResourceEventSource.class); @@ -58,7 +55,6 @@ public void setup() { when(eventSourceManagerMock.getControllerResourceEventSource()) .thenReturn(controllerResourceEventSourceMock); - when(controllerResourceEventSourceMock.getResourceCache()).thenReturn(resourceCacheMock); eventProcessor = spy(new EventProcessor(reconciliationDispatcherMock, eventSourceManagerMock, "Test", null, @@ -83,7 +79,8 @@ public void dispatchesEventsIfNoExecutionInProgress() { @Test public void skipProcessingIfLatestCustomResourceNotInCache() { Event event = prepareCREvent(); - when(resourceCacheMock.get(event.getRelatedCustomResourceID())).thenReturn(Optional.empty()); + when(controllerResourceEventSourceMock.get(event.getRelatedCustomResourceID())) + .thenReturn(Optional.empty()); eventProcessor.handleEvent(event); @@ -222,7 +219,7 @@ public void whitelistNextEventIfTheCacheIsNotPropagatedAfterAnUpdate() { updatedCr.getMetadata().setResourceVersion("2"); var mockCREventSource = mock(ControllerResourceEventSource.class); eventProcessor.getEventMarker().markEventReceived(crID); - when(resourceCacheMock.get(eq(crID))).thenReturn(Optional.of(cr)); + when(controllerResourceEventSourceMock.get(eq(crID))).thenReturn(Optional.of(cr)); when(eventSourceManagerMock.getControllerResourceEventSource()).thenReturn(mockCREventSource); eventProcessor.eventProcessingFinished(new ExecutionScope(cr, null), @@ -241,7 +238,7 @@ public void dontWhitelistsEventWhenOtherChangeDuringExecution() { otherChangeCR.getMetadata().setResourceVersion("3"); var mockCREventSource = mock(ControllerResourceEventSource.class); eventProcessor.getEventMarker().markEventReceived(crID); - when(resourceCacheMock.get(eq(crID))).thenReturn(Optional.of(otherChangeCR)); + when(controllerResourceEventSourceMock.get(eq(crID))).thenReturn(Optional.of(otherChangeCR)); when(eventSourceManagerMock.getControllerResourceEventSource()).thenReturn(mockCREventSource); eventProcessor.eventProcessingFinished(new ExecutionScope(cr, null), @@ -256,7 +253,7 @@ public void dontWhitelistsEventIfUpdatedEventInCache() { var cr = testCustomResource(crID); var mockCREventSource = mock(ControllerResourceEventSource.class); eventProcessor.getEventMarker().markEventReceived(crID); - when(resourceCacheMock.get(eq(crID))).thenReturn(Optional.of(cr)); + when(controllerResourceEventSourceMock.get(eq(crID))).thenReturn(Optional.of(cr)); when(eventSourceManagerMock.getControllerResourceEventSource()).thenReturn(mockCREventSource); eventProcessor.eventProcessingFinished(new ExecutionScope(cr, null), @@ -282,7 +279,8 @@ public void startProcessedMarkedEventReceivedBefore() { eventProcessor = spy(new EventProcessor(reconciliationDispatcherMock, eventSourceManagerMock, "Test", null, metricsMock)); - when(resourceCacheMock.get(eq(crID))).thenReturn(Optional.of(testCustomResource())); + when(controllerResourceEventSourceMock.get(eq(crID))) + .thenReturn(Optional.of(testCustomResource())); eventProcessor.handleEvent(new Event(crID)); verify(reconciliationDispatcherMock, timeout(100).times(0)).handleExecution(any()); @@ -311,7 +309,7 @@ private ResourceEvent prepareCREvent() { private ResourceEvent prepareCREvent(ResourceID uid) { TestCustomResource customResource = testCustomResource(uid); - when(resourceCacheMock.get(eq(uid))).thenReturn(Optional.of(customResource)); + when(controllerResourceEventSourceMock.get(eq(uid))).thenReturn(Optional.of(customResource)); return new ResourceEvent(ResourceAction.UPDATED, ResourceID.fromResource(customResource)); } From d42dc63d61ede0123fac8aa4b940f6f28b0084c4 Mon Sep 17 00:00:00 2001 From: csviri Date: Wed, 9 Mar 2022 11:13:55 +0100 Subject: [PATCH 4/5] wip --- .../controller/TemporaryResourceCache.java | 59 +++++++------------ 1 file changed, 21 insertions(+), 38 deletions(-) diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/controller/TemporaryResourceCache.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/controller/TemporaryResourceCache.java index 294ac95ef5..fb402b0b86 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/controller/TemporaryResourceCache.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/controller/TemporaryResourceCache.java @@ -1,9 +1,8 @@ package io.javaoperatorsdk.operator.processing.event.source.controller; +import java.util.HashMap; import java.util.Map; import java.util.Optional; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.locks.ReentrantLock; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -15,53 +14,37 @@ public class TemporaryResourceCache { private static final Logger log = LoggerFactory.getLogger(TemporaryResourceCache.class); - private final Map cache = new ConcurrentHashMap<>(); - private final ReentrantLock lock = new ReentrantLock(); + private final Map cache = new HashMap<>(); private final ControllerResourceCache managedInformerEventSource; public TemporaryResourceCache(ControllerResourceCache managedInformerEventSource) { this.managedInformerEventSource = managedInformerEventSource; } - public void removeResourceFromCache(T resource) { - lock.lock(); - try { - cache.remove(ResourceID.fromResource(resource)); - } finally { - lock.unlock(); - } + public synchronized void removeResourceFromCache(T resource) { + cache.remove(ResourceID.fromResource(resource)); } - public void putUpdatedResource(T newResource, String previousResourceVersion) { - lock.lock(); - try { - var resourceId = ResourceID.fromResource(newResource); - var informerCacheResource = managedInformerEventSource.get(resourceId); - if (informerCacheResource.isEmpty()) { - log.debug("No cached value present for resource: {}", newResource); - return; - } - // if this is not true that means the cache was already updated - if (informerCacheResource.get().getMetadata().getResourceVersion() - .equals(previousResourceVersion)) { - log.debug("Putting resource to temporal cache with id: {}", resourceId); - cache.put(resourceId, newResource); - } else { - // if something is in cache it's surely obsolete now - cache.remove(resourceId); - } - } finally { - lock.unlock(); + public synchronized void putUpdatedResource(T newResource, String previousResourceVersion) { + var resourceId = ResourceID.fromResource(newResource); + var informerCacheResource = managedInformerEventSource.get(resourceId); + if (informerCacheResource.isEmpty()) { + log.debug("No cached value present for resource: {}", newResource); + return; + } + // if this is not true that means the cache was already updated + if (informerCacheResource.get().getMetadata().getResourceVersion() + .equals(previousResourceVersion)) { + log.debug("Putting resource to temporal cache with id: {}", resourceId); + cache.put(resourceId, newResource); + } else { + // if something is in cache it's surely obsolete now + cache.remove(resourceId); } } - public Optional getResourceFromCache(ResourceID resourceID) { - try { - lock.lock(); - return Optional.ofNullable(cache.get(resourceID)); - } finally { - lock.unlock(); - } + public synchronized Optional getResourceFromCache(ResourceID resourceID) { + return Optional.ofNullable(cache.get(resourceID)); } } From 9416ca0cda51566dc372ec638435890a2b02d3a6 Mon Sep 17 00:00:00 2001 From: csviri Date: Wed, 9 Mar 2022 11:29:14 +0100 Subject: [PATCH 5/5] fix: wip --- .../processing/event/EventProcessor.java | 47 ++--------------- .../ControllerResourceEventSource.java | 42 +++++++-------- .../OnceWhitelistEventFilterEventFilter.java | 35 ------------- .../processing/event/EventProcessorTest.java | 51 ------------------- ...ceWhitelistEventFilterEventFilterTest.java | 41 --------------- .../ControllerResourceEventSourceTest.java | 22 -------- 6 files changed, 23 insertions(+), 215 deletions(-) delete mode 100644 operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/controller/OnceWhitelistEventFilterEventFilter.java delete mode 100644 operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/OnceWhitelistEventFilterEventFilterTest.java diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventProcessor.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventProcessor.java index ad29553b11..a88d80783d 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventProcessor.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventProcessor.java @@ -29,7 +29,6 @@ import io.javaoperatorsdk.operator.processing.retry.RetryExecution; import static io.javaoperatorsdk.operator.processing.KubernetesResourceUtils.getName; -import static io.javaoperatorsdk.operator.processing.KubernetesResourceUtils.getVersion; class EventProcessor implements EventHandler, LifecycleAware { @@ -208,12 +207,12 @@ void eventProcessingFinished( if (eventMarker.deleteEventPresent(resourceID)) { cleanupForDeletedEvent(executionScope.getCustomResourceID()); } else { + postExecutionControl.getUpdatedCustomResource().ifPresent(r -> { + eventSourceManager.getControllerResourceEventSource().handleRecentResourceUpdate( + r, executionScope.getResource()); + }); if (eventMarker.eventPresent(resourceID)) { - if (isCacheReadyForInstantReconciliation(executionScope, postExecutionControl)) { - submitReconciliationExecution(resourceID); - } else { - postponeReconciliationAndHandleCacheSyncEvent(resourceID); - } + submitReconciliationExecution(resourceID); } else { reScheduleExecutionIfInstructed(postExecutionControl, executionScope.getResource()); } @@ -223,42 +222,6 @@ void eventProcessingFinished( } } - private void postponeReconciliationAndHandleCacheSyncEvent(ResourceID resourceID) { - log.debug("Postponing reconciliation for resource id: {}", resourceID); - eventSourceManager.getControllerResourceEventSource().whitelistNextEvent(resourceID); - } - - private boolean isCacheReadyForInstantReconciliation( - ExecutionScope executionScope, PostExecutionControl postExecutionControl) { - if (!postExecutionControl.customResourceUpdatedDuringExecution()) { - return true; - } - String originalResourceVersion = getVersion(executionScope.getResource()); - String customResourceVersionAfterExecution = - getVersion( - postExecutionControl - .getUpdatedCustomResource() - .orElseThrow( - () -> new IllegalStateException( - "Updated custom resource must be present at this point of time"))); - String cachedCustomResourceVersion = - getVersion( - cache - .get(executionScope.getCustomResourceID()) - .orElseThrow( - () -> new IllegalStateException( - "Cached custom resource must be present at this point"))); - - if (cachedCustomResourceVersion.equals(customResourceVersionAfterExecution)) { - return true; - } - // If the cached resource version equals neither the version before nor after execution - // probably an update happened on the custom resource independent of the framework during - // reconciliation. We cannot tell at this point if it happened before our update or before. - // (Well we could if we would parse resource version, but that should not be done by definition) - return !cachedCustomResourceVersion.equals(originalResourceVersion); - } - private void reScheduleExecutionIfInstructed( PostExecutionControl postExecutionControl, R customResource) { postExecutionControl diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/controller/ControllerResourceEventSource.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/controller/ControllerResourceEventSource.java index 0c728c306f..e2b4e89ebd 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/controller/ControllerResourceEventSource.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/controller/ControllerResourceEventSource.java @@ -43,7 +43,6 @@ public class ControllerResourceEventSource new ConcurrentHashMap<>(); private final ResourceEventFilter filter; - private final OnceWhitelistEventFilterEventFilter onceWhitelistEventFilterEventFilter; private final ControllerResourceCache cache; private final TemporaryResourceCache temporaryResourceCache; @@ -58,16 +57,8 @@ public ControllerResourceEventSource(Controller controller) { var filters = new ResourceEventFilter[] { ResourceEventFilters.finalizerNeededAndApplied(), ResourceEventFilters.markedForDeletion(), - ResourceEventFilters.generationAware(), - null + ResourceEventFilters.generationAware() }; - - if (controller.getConfiguration().isGenerationAware()) { - onceWhitelistEventFilterEventFilter = new OnceWhitelistEventFilterEventFilter<>(); - filters[filters.length - 1] = onceWhitelistEventFilterEventFilter; - } else { - onceWhitelistEventFilterEventFilter = null; - } if (controller.getConfiguration().getEventFilter() != null) { filter = controller.getConfiguration().getEventFilter().and(ResourceEventFilters.or(filters)); } else { @@ -130,6 +121,7 @@ public void eventReceived(ResourceAction action, T customResource, T oldResource try { log.debug( "Event received for resource: {}", getName(customResource)); + temporaryResourceCache.removeResourceFromCache(customResource); MDCUtils.addResourceInfo(customResource); controller.getEventSourceManager().broadcastOnResourceEvent(action, customResource, oldResource); @@ -162,8 +154,16 @@ public void onDelete(T resource, boolean b) { eventReceived(ResourceAction.DELETED, resource, null); } + + @Override public Optional get(ResourceID resourceID) { - return cache.get(resourceID); + Optional resource = temporaryResourceCache.getResourceFromCache(resourceID); + if (resource.isPresent()) { + log.debug("Resource found in temporal cache for Resource ID: {}", resourceID); + return resource; + } else { + return cache.get(resourceID); + } } @Override @@ -193,19 +193,6 @@ public SharedIndexInformer getInformer(String namespace) { return getInformers().get(Objects.requireNonNullElse(namespace, ANY_NAMESPACE_MAP_KEY)); } - /** - * This will ensure that the next event received after this method is called will not be filtered - * out. - * - * @param resourceID - to which the event is related - */ - public void whitelistNextEvent(ResourceID resourceID) { - if (onceWhitelistEventFilterEventFilter != null) { - onceWhitelistEventFilterEventFilter.whitelistNextEvent(resourceID); - } - } - - private void handleKubernetesClientException(Exception e) { KubernetesClientException ke = (KubernetesClientException) e; if (404 == ke.getCode()) { @@ -221,4 +208,11 @@ private void handleKubernetesClientException(Exception e) { public Optional getAssociated(T primary) { return get(ResourceID.fromResource(primary)); } + + public void handleRecentResourceUpdate(T resource, + T previousResourceVersion) { + temporaryResourceCache.putUpdatedResource(resource, + previousResourceVersion.getMetadata().getResourceVersion()); + } + } diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/controller/OnceWhitelistEventFilterEventFilter.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/controller/OnceWhitelistEventFilterEventFilter.java deleted file mode 100644 index 8262ff1c21..0000000000 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/controller/OnceWhitelistEventFilterEventFilter.java +++ /dev/null @@ -1,35 +0,0 @@ -package io.javaoperatorsdk.operator.processing.event.source.controller; - -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import io.fabric8.kubernetes.api.model.HasMetadata; -import io.javaoperatorsdk.operator.api.config.ControllerConfiguration; -import io.javaoperatorsdk.operator.processing.event.ResourceID; - -public class OnceWhitelistEventFilterEventFilter - implements ResourceEventFilter { - - private static final Logger log = - LoggerFactory.getLogger(OnceWhitelistEventFilterEventFilter.class); - - private final ConcurrentMap whiteList = new ConcurrentHashMap<>(); - - @Override - public boolean acceptChange(ControllerConfiguration configuration, T oldResource, - T newResource) { - ResourceID resourceID = ResourceID.fromResource(newResource); - boolean res = whiteList.remove(resourceID, resourceID); - if (res) { - log.debug("Accepting whitelisted event for CR id: {}", resourceID); - } - return res; - } - - public void whitelistNextEvent(ResourceID resourceID) { - whiteList.putIfAbsent(resourceID, resourceID); - } -} diff --git a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/EventProcessorTest.java b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/EventProcessorTest.java index bd170b8173..7422e6f572 100644 --- a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/EventProcessorTest.java +++ b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/EventProcessorTest.java @@ -211,57 +211,6 @@ public void doNotFireEventsIfClosing() { verify(reconciliationDispatcherMock, timeout(50).times(0)).handleExecution(any()); } - @Test - public void whitelistNextEventIfTheCacheIsNotPropagatedAfterAnUpdate() { - var crID = new ResourceID("test-cr", TEST_NAMESPACE); - var cr = testCustomResource(crID); - var updatedCr = testCustomResource(crID); - updatedCr.getMetadata().setResourceVersion("2"); - var mockCREventSource = mock(ControllerResourceEventSource.class); - eventProcessor.getEventMarker().markEventReceived(crID); - when(controllerResourceEventSourceMock.get(eq(crID))).thenReturn(Optional.of(cr)); - when(eventSourceManagerMock.getControllerResourceEventSource()).thenReturn(mockCREventSource); - - eventProcessor.eventProcessingFinished(new ExecutionScope(cr, null), - PostExecutionControl.customResourceUpdated(updatedCr)); - - verify(mockCREventSource, times(1)).whitelistNextEvent(eq(crID)); - } - - @Test - public void dontWhitelistsEventWhenOtherChangeDuringExecution() { - var crID = new ResourceID("test-cr", TEST_NAMESPACE); - var cr = testCustomResource(crID); - var updatedCr = testCustomResource(crID); - updatedCr.getMetadata().setResourceVersion("2"); - var otherChangeCR = testCustomResource(crID); - otherChangeCR.getMetadata().setResourceVersion("3"); - var mockCREventSource = mock(ControllerResourceEventSource.class); - eventProcessor.getEventMarker().markEventReceived(crID); - when(controllerResourceEventSourceMock.get(eq(crID))).thenReturn(Optional.of(otherChangeCR)); - when(eventSourceManagerMock.getControllerResourceEventSource()).thenReturn(mockCREventSource); - - eventProcessor.eventProcessingFinished(new ExecutionScope(cr, null), - PostExecutionControl.customResourceUpdated(updatedCr)); - - verify(mockCREventSource, times(0)).whitelistNextEvent(eq(crID)); - } - - @Test - public void dontWhitelistsEventIfUpdatedEventInCache() { - var crID = new ResourceID("test-cr", TEST_NAMESPACE); - var cr = testCustomResource(crID); - var mockCREventSource = mock(ControllerResourceEventSource.class); - eventProcessor.getEventMarker().markEventReceived(crID); - when(controllerResourceEventSourceMock.get(eq(crID))).thenReturn(Optional.of(cr)); - when(eventSourceManagerMock.getControllerResourceEventSource()).thenReturn(mockCREventSource); - - eventProcessor.eventProcessingFinished(new ExecutionScope(cr, null), - PostExecutionControl.customResourceUpdated(cr)); - - verify(mockCREventSource, times(0)).whitelistNextEvent(eq(crID)); - } - @Test public void cancelScheduleOnceEventsOnSuccessfulExecution() { var crID = new ResourceID("test-cr", TEST_NAMESPACE); diff --git a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/OnceWhitelistEventFilterEventFilterTest.java b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/OnceWhitelistEventFilterEventFilterTest.java deleted file mode 100644 index f82bea55c7..0000000000 --- a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/OnceWhitelistEventFilterEventFilterTest.java +++ /dev/null @@ -1,41 +0,0 @@ -package io.javaoperatorsdk.operator.processing.event.source; - -import org.junit.jupiter.api.Test; - -import io.javaoperatorsdk.operator.TestUtils; -import io.javaoperatorsdk.operator.processing.event.ResourceID; -import io.javaoperatorsdk.operator.processing.event.source.controller.OnceWhitelistEventFilterEventFilter; - -import static io.javaoperatorsdk.operator.TestUtils.testCustomResource; -import static org.assertj.core.api.Assertions.assertThat; - -class OnceWhitelistEventFilterEventFilterTest { - - private OnceWhitelistEventFilterEventFilter filter = new OnceWhitelistEventFilterEventFilter<>(); - - @Test - public void notAcceptCustomResourceNotWhitelisted() { - assertThat(filter.acceptChange(null, - testCustomResource(), testCustomResource())).isFalse(); - } - - @Test - public void allowCustomResourceWhitelisted() { - var cr = TestUtils.testCustomResource(); - - filter.whitelistNextEvent(ResourceID.fromResource(cr)); - - assertThat(filter.acceptChange(null, cr, cr)).isTrue(); - } - - @Test - public void allowCustomResourceWhitelistedOnlyOnce() { - var cr = TestUtils.testCustomResource(); - - filter.whitelistNextEvent(ResourceID.fromResource(cr)); - - assertThat(filter.acceptChange(null, cr, cr)).isTrue(); - assertThat(filter.acceptChange(null, cr, cr)).isFalse(); - } - -} diff --git a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/controller/ControllerResourceEventSourceTest.java b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/controller/ControllerResourceEventSourceTest.java index 5859115ee2..7f20918f13 100644 --- a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/controller/ControllerResourceEventSourceTest.java +++ b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/controller/ControllerResourceEventSourceTest.java @@ -14,7 +14,6 @@ import io.javaoperatorsdk.operator.processing.Controller; import io.javaoperatorsdk.operator.processing.event.EventHandler; import io.javaoperatorsdk.operator.processing.event.EventSourceManager; -import io.javaoperatorsdk.operator.processing.event.ResourceID; import io.javaoperatorsdk.operator.processing.event.source.AbstractEventSourceTestBase; import io.javaoperatorsdk.operator.sample.simple.TestCustomResource; @@ -103,27 +102,6 @@ public void eventWithNoGenerationProcessedIfNoFinalizer() { verify(eventHandler, times(1)).handleEvent(any()); } - @Test - public void handlesNextEventIfWhitelisted() { - TestCustomResource customResource = TestUtils.testCustomResource(); - customResource.getMetadata().setFinalizers(List.of(FINALIZER)); - source.whitelistNextEvent(ResourceID.fromResource(customResource)); - - source.eventReceived(ResourceAction.UPDATED, customResource, customResource); - - verify(eventHandler, times(1)).handleEvent(any()); - } - - @Test - public void notHandlesNextEventIfNotWhitelisted() { - TestCustomResource customResource = TestUtils.testCustomResource(); - customResource.getMetadata().setFinalizers(List.of(FINALIZER)); - - source.eventReceived(ResourceAction.UPDATED, customResource, customResource); - - verify(eventHandler, times(0)).handleEvent(any()); - } - @Test public void callsBroadcastsOnResourceEvents() { TestCustomResource customResource1 = TestUtils.testCustomResource();