diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/Context.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/Context.java index 963eb16f76..cc832a2e21 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/Context.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/Context.java @@ -3,11 +3,9 @@ import java.util.Optional; import io.fabric8.kubernetes.client.CustomResource; -import io.javaoperatorsdk.operator.processing.event.EventList; public interface Context { - EventList getEvents(); - Optional getRetryInfo(); + } diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/DefaultContext.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/DefaultContext.java index 4614b562df..f32a784e11 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/DefaultContext.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/DefaultContext.java @@ -3,21 +3,13 @@ import java.util.Optional; import io.fabric8.kubernetes.client.CustomResource; -import io.javaoperatorsdk.operator.processing.event.EventList; public class DefaultContext implements Context { private final RetryInfo retryInfo; - private final EventList events; - public DefaultContext(EventList events, RetryInfo retryInfo) { + public DefaultContext(RetryInfo retryInfo) { this.retryInfo = retryInfo; - this.events = events; - } - - @Override - public EventList getEvents() { - return events; } @Override diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/ResourceController.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/ResourceController.java index edad82be27..5779cc4eb9 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/ResourceController.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/ResourceController.java @@ -6,12 +6,15 @@ public interface ResourceController { /** + * Note that this method is used in combination of finalizers. If automatic finalizer handling is + * turned off for the controller, this method is not called. + * * The implementation should delete the associated component(s). Note that this is method is * called when an object is marked for deletion. After it's executed the custom resource finalizer * is automatically removed by the framework; unless the return value is * {@link DeleteControl#noFinalizerRemoval()}, which indicates that the controller has determined - * that the resource should not be deleted yet, in which case it is up to the controller to - * restore the resource's status so that it's not marked for deletion anymore. + * that the resource should not be deleted yet. This is usually a corner case, when a cleanup is + * tried again eventually. * *

* It's important that this method be idempotent, as it could be called several times, depending diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/DefaultEventHandler.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/DefaultEventHandler.java index aac6a08289..608a47567b 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/DefaultEventHandler.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/DefaultEventHandler.java @@ -20,11 +20,12 @@ import io.javaoperatorsdk.operator.processing.event.DefaultEventSourceManager; import io.javaoperatorsdk.operator.processing.event.Event; import io.javaoperatorsdk.operator.processing.event.EventHandler; +import io.javaoperatorsdk.operator.processing.event.internal.CustomResourceEvent; +import io.javaoperatorsdk.operator.processing.event.internal.ResourceAction; import io.javaoperatorsdk.operator.processing.retry.GenericRetry; import io.javaoperatorsdk.operator.processing.retry.Retry; import io.javaoperatorsdk.operator.processing.retry.RetryExecution; -import static io.javaoperatorsdk.operator.EventListUtils.containsCustomResourceDeletedEvent; import static io.javaoperatorsdk.operator.processing.KubernetesResourceUtils.getName; /** @@ -38,7 +39,6 @@ public class DefaultEventHandler> implements Even @Deprecated private static EventMonitor monitor = EventMonitor.NOOP; - private final EventBuffer eventBuffer; private final Set underProcessing = new HashSet<>(); private final EventDispatcher eventDispatcher; private final Retry retry; @@ -50,6 +50,7 @@ public class DefaultEventHandler> implements Even private volatile boolean running; private final ResourceCache resourceCache; private DefaultEventSourceManager eventSourceManager; + private final EventMarker eventMarker; public DefaultEventHandler(ConfiguredController controller, ResourceCache resourceCache) { this( @@ -58,18 +59,20 @@ public DefaultEventHandler(ConfiguredController controller, ResourceCache controller.getConfiguration().getName(), new EventDispatcher<>(controller), GenericRetry.fromConfiguration(controller.getConfiguration().getRetryConfiguration()), - controller.getConfiguration().getConfigurationService().getMetrics().getEventMonitor()); + controller.getConfiguration().getConfigurationService().getMetrics().getEventMonitor(), + new EventMarker()); } DefaultEventHandler(EventDispatcher eventDispatcher, ResourceCache resourceCache, String relatedControllerName, - Retry retry) { - this(resourceCache, null, relatedControllerName, eventDispatcher, retry, null); + Retry retry, EventMarker eventMarker) { + this(resourceCache, null, relatedControllerName, eventDispatcher, retry, null, eventMarker); } private DefaultEventHandler(ResourceCache resourceCache, ExecutorService executor, String relatedControllerName, - EventDispatcher eventDispatcher, Retry retry, EventMonitor monitor) { + EventDispatcher eventDispatcher, Retry retry, EventMonitor monitor, + EventMarker eventMarker) { this.running = true; this.executor = executor == null @@ -79,9 +82,9 @@ private DefaultEventHandler(ResourceCache resourceCache, ExecutorService exec this.controllerName = relatedControllerName; this.eventDispatcher = eventDispatcher; this.retry = retry; - eventBuffer = new EventBuffer(); this.resourceCache = resourceCache; this.eventMonitor = monitor != null ? monitor : EventMonitor.NOOP; + this.eventMarker = eventMarker; } public void setEventSourceManager(DefaultEventSourceManager eventSourceManager) { @@ -113,71 +116,75 @@ private EventMonitor monitor() { @Override public void handleEvent(Event event) { + lock.lock(); try { - lock.lock(); log.debug("Received event: {}", event); if (!this.running) { log.debug("Skipping event: {} because the event handler is shutting down", event); return; } final var monitor = monitor(); - eventBuffer.addEvent(event.getRelatedCustomResourceID(), event); monitor.processedEvent(event.getRelatedCustomResourceID(), event); - executeBufferedEvents(event.getRelatedCustomResourceID()); - } finally { - lock.unlock(); - } - } - @Override - public void close() { - try { - lock.lock(); - this.running = false; + handleEventMarking(event); + if (!eventMarker.deleteEventPresent(event.getRelatedCustomResourceID())) { + submitReconciliationExecution(event.getRelatedCustomResourceID()); + } else { + cleanupForDeletedEvent(event.getRelatedCustomResourceID()); + } } finally { lock.unlock(); } } - private boolean executeBufferedEvents(CustomResourceID customResourceUid) { - boolean newEventForResourceId = eventBuffer.containsEvents(customResourceUid); + private boolean submitReconciliationExecution(CustomResourceID customResourceUid) { boolean controllerUnderExecution = isControllerUnderExecution(customResourceUid); Optional latestCustomResource = resourceCache.getCustomResource(customResourceUid); - if (!controllerUnderExecution && newEventForResourceId && latestCustomResource.isPresent()) { + if (!controllerUnderExecution + && latestCustomResource.isPresent()) { setUnderExecutionProcessing(customResourceUid); ExecutionScope executionScope = new ExecutionScope( - eventBuffer.getAndRemoveEventsForExecution(customResourceUid), latestCustomResource.get(), retryInfo(customResourceUid)); + eventMarker.unMarkEventReceived(customResourceUid); log.debug("Executing events for custom resource. Scope: {}", executionScope); executor.execute(new ControllerExecution(executionScope)); return true; } else { log.debug( - "Skipping executing controller for resource id: {}. Events in queue: {}." + "Skipping executing controller for resource id: {}." + " Controller in execution: {}. Latest CustomResource present: {}", customResourceUid, - newEventForResourceId, controllerUnderExecution, latestCustomResource.isPresent()); if (latestCustomResource.isEmpty()) { - log.warn("no custom resource found in cache for CustomResourceID: {}", customResourceUid); + log.warn("no custom resource found in cache for CustomResourceID: {}", + customResourceUid); } return false; } } + private void handleEventMarking(Event event) { + if (event instanceof CustomResourceEvent && + ((CustomResourceEvent) event).getAction() == ResourceAction.DELETED) { + eventMarker.markDeleteEventReceived(event); + } else if (!eventMarker.deleteEventPresent(event.getRelatedCustomResourceID())) { + eventMarker.markEventReceived(event); + } + } + private RetryInfo retryInfo(CustomResourceID customResourceUid) { return retryState.get(customResourceUid); } void eventProcessingFinished( ExecutionScope executionScope, PostExecutionControl postExecutionControl) { + lock.lock(); try { - lock.lock(); if (!running) { return; } @@ -188,23 +195,29 @@ void eventProcessingFinished( postExecutionControl); unsetUnderExecution(executionScope.getCustomResourceID()); - if (retry != null && postExecutionControl.exceptionDuringExecution()) { + // If a delete event present at this phase, it was received during reconciliation. + // So we either removed the finalizer during reconciliation or we don't use finalizers. + // Either way we don't want to retry. + if (retry != null && postExecutionControl.exceptionDuringExecution() && + !eventMarker.deleteEventPresent(executionScope.getCustomResourceID())) { handleRetryOnException(executionScope); - final var monitor = monitor(); - executionScope.getEvents() - .forEach(e -> monitor.failedEvent(executionScope.getCustomResourceID(), e)); + // todo revisit monitoring since events are not present anymore + // final var monitor = monitor(); executionScope.getEvents().forEach(e -> + // monitor.failedEvent(executionScope.getCustomResourceID(), e)); return; } if (retry != null) { - markSuccessfulExecutionRegardingRetry(executionScope); + handleSuccessfulExecutionRegardingRetry(executionScope); } - if (containsCustomResourceDeletedEvent(executionScope.getEvents())) { - cleanupAfterDeletedEvent(executionScope.getCustomResourceID()); + if (eventMarker.deleteEventPresent(executionScope.getCustomResourceID())) { + cleanupForDeletedEvent(executionScope.getCustomResourceID()); } else { - var executed = executeBufferedEvents(executionScope.getCustomResourceID()); - if (!executed) { - reScheduleExecutionIfInstructed(postExecutionControl, executionScope.getCustomResource()); + if (eventMarker.eventPresent(executionScope.getCustomResourceID())) { + submitReconciliationExecution(executionScope.getCustomResourceID()); + } else { + reScheduleExecutionIfInstructed(postExecutionControl, + executionScope.getCustomResource()); } } } finally { @@ -227,13 +240,13 @@ private void reScheduleExecutionIfInstructed(PostExecutionControl postExecuti private void handleRetryOnException(ExecutionScope executionScope) { RetryExecution execution = getOrInitRetryExecution(executionScope); var customResourceID = executionScope.getCustomResourceID(); - boolean newEventsExists = eventBuffer - .newEventsExists(customResourceID); - eventBuffer.putBackEvents(customResourceID, executionScope.getEvents()); + boolean eventPresent = eventMarker.eventPresent(customResourceID); + eventMarker.markEventReceived(customResourceID); - if (newEventsExists) { - log.debug("New events exists for for resource id: {}", customResourceID); - executeBufferedEvents(customResourceID); + if (eventPresent) { + log.debug("New events exists for for resource id: {}", + customResourceID); + submitReconciliationExecution(customResourceID); return; } Optional nextDelay = execution.nextDelay(); @@ -251,7 +264,7 @@ private void handleRetryOnException(ExecutionScope executionScope) { () -> log.error("Exhausted retries for {}", executionScope)); } - private void markSuccessfulExecutionRegardingRetry(ExecutionScope executionScope) { + private void handleSuccessfulExecutionRegardingRetry(ExecutionScope executionScope) { log.debug( "Marking successful execution for resource: {}", getName(executionScope.getCustomResource())); @@ -270,9 +283,9 @@ private RetryExecution getOrInitRetryExecution(ExecutionScope executionScope) return retryExecution; } - private void cleanupAfterDeletedEvent(CustomResourceID customResourceUid) { + private void cleanupForDeletedEvent(CustomResourceID customResourceUid) { eventSourceManager.cleanupForCustomResource(customResourceUid); - eventBuffer.cleanup(customResourceUid); + eventMarker.cleanup(customResourceUid); } private boolean isControllerUnderExecution(CustomResourceID customResourceUid) { @@ -287,6 +300,15 @@ private void unsetUnderExecution(CustomResourceID customResourceUid) { underProcessing.remove(customResourceUid); } + @Override + public void close() { + lock.lock(); + try { + this.running = false; + } finally { + lock.unlock(); + } + } private class ControllerExecution implements Runnable { private final ExecutionScope executionScope; diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/EventBuffer.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/EventBuffer.java deleted file mode 100644 index b9c565a001..0000000000 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/EventBuffer.java +++ /dev/null @@ -1,45 +0,0 @@ -package io.javaoperatorsdk.operator.processing; - -import java.util.*; - -import io.javaoperatorsdk.operator.processing.event.CustomResourceID; -import io.javaoperatorsdk.operator.processing.event.Event; - -class EventBuffer { - - private final Map> events = new HashMap<>(); - - public void addEvent(Event event) { - addEvent(event.getRelatedCustomResourceID(), event); - } - - public void addEvent(CustomResourceID uid, Event event) { - Objects.requireNonNull(uid, "uid"); - Objects.requireNonNull(event, "event"); - - List crEvents = events.computeIfAbsent(uid, (customResourceID) -> new LinkedList<>()); - crEvents.add(event); - } - - public boolean newEventsExists(CustomResourceID resourceId) { - return events.get(resourceId) != null && !events.get(resourceId).isEmpty(); - } - - public void putBackEvents(CustomResourceID resourceUid, List oldEvents) { - List crEvents = events.computeIfAbsent(resourceUid, (id) -> new LinkedList<>()); - crEvents.addAll(0, oldEvents); - } - - public boolean containsEvents(CustomResourceID customResourceId) { - return events.get(customResourceId) != null; - } - - public List getAndRemoveEventsForExecution(CustomResourceID resourceUid) { - List crEvents = events.remove(resourceUid); - return crEvents == null ? Collections.emptyList() : crEvents; - } - - public void cleanup(CustomResourceID resourceUid) { - events.remove(resourceUid); - } -} diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/EventDispatcher.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/EventDispatcher.java index 0e385dc553..dd8ae8dbae 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/EventDispatcher.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/EventDispatcher.java @@ -10,9 +10,7 @@ import io.fabric8.kubernetes.client.dsl.Resource; import io.javaoperatorsdk.operator.api.*; import io.javaoperatorsdk.operator.api.config.ControllerConfiguration; -import io.javaoperatorsdk.operator.processing.event.EventList; -import static io.javaoperatorsdk.operator.EventListUtils.containsCustomResourceDeletedEvent; import static io.javaoperatorsdk.operator.processing.KubernetesResourceUtils.getName; import static io.javaoperatorsdk.operator.processing.KubernetesResourceUtils.getUID; import static io.javaoperatorsdk.operator.processing.KubernetesResourceUtils.getVersion; @@ -55,15 +53,7 @@ public PostExecutionControl handleExecution(ExecutionScope executionScope) private PostExecutionControl handleDispatch(ExecutionScope executionScope) { R resource = executionScope.getCustomResource(); - log.debug("Handling events: {} for resource {}", executionScope.getEvents(), getName(resource)); - - if (containsCustomResourceDeletedEvent(executionScope.getEvents())) { - log.debug( - "Skipping dispatch processing because of a Delete event: {} with version: {}", - getName(resource), - getVersion(resource)); - return PostExecutionControl.defaultDispatch(); - } + log.debug("Handling dispatch for resource {}", getName(resource)); final var markedForDeletion = resource.isMarkedForDeletion(); if (markedForDeletion && shouldNotDispatchToDelete(resource)) { @@ -75,8 +65,7 @@ private PostExecutionControl handleDispatch(ExecutionScope executionScope) } Context context = - new DefaultContext<>( - new EventList(executionScope.getEvents()), executionScope.getRetryInfo()); + new DefaultContext<>(executionScope.getRetryInfo()); if (markedForDeletion) { return handleDelete(resource, context); } else { diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/EventMarker.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/EventMarker.java new file mode 100644 index 0000000000..9be023416b --- /dev/null +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/EventMarker.java @@ -0,0 +1,84 @@ +package io.javaoperatorsdk.operator.processing; + +import java.util.HashMap; + +import io.javaoperatorsdk.operator.processing.event.CustomResourceID; +import io.javaoperatorsdk.operator.processing.event.Event; + +/** + * Manages the state of received events. Basically there can be only three distinct states relevant + * for event processing. Either an event is received, so we eventually process or no event for + * processing at the moment. The third case is if a DELETE event is received, this is a special case + * meaning that the custom resource is deleted. We don't want to do any processing anymore so other + * events are irrelevant for us from this point. Note that the dependant resources are either + * cleaned up by K8S garbage collection or by the controller implementation for cleanup. + */ +public class EventMarker { + + public enum EventingState { + /** Event but NOT Delete event present */ + EVENT_PRESENT, NO_EVENT_PRESENT, + /** Delete event present, from this point other events are not relevant */ + DELETE_EVENT_PRESENT, + } + + private final HashMap eventingState = new HashMap<>(); + + private EventingState getEventingState(CustomResourceID customResourceID) { + EventingState actualState = eventingState.get(customResourceID); + return actualState == null ? EventingState.NO_EVENT_PRESENT : actualState; + } + + private void setEventingState(CustomResourceID customResourceID, EventingState state) { + eventingState.put(customResourceID, state); + } + + public void markEventReceived(Event event) { + markEventReceived(event.getRelatedCustomResourceID()); + } + + public void markEventReceived(CustomResourceID customResourceID) { + if (deleteEventPresent(customResourceID)) { + throw new IllegalStateException("Cannot receive event after a delete event received"); + } + setEventingState(customResourceID, EventingState.EVENT_PRESENT); + } + + public void unMarkEventReceived(CustomResourceID customResourceID) { + var actualState = getEventingState(customResourceID); + switch (actualState) { + case EVENT_PRESENT: + setEventingState(customResourceID, + EventingState.NO_EVENT_PRESENT); + break; + case DELETE_EVENT_PRESENT: + throw new IllegalStateException("Cannot unmark delete event."); + } + } + + public void markDeleteEventReceived(Event event) { + markDeleteEventReceived(event.getRelatedCustomResourceID()); + } + + public void markDeleteEventReceived(CustomResourceID customResourceID) { + setEventingState(customResourceID, EventingState.DELETE_EVENT_PRESENT); + } + + public boolean deleteEventPresent(CustomResourceID customResourceID) { + return getEventingState(customResourceID) == EventingState.DELETE_EVENT_PRESENT; + } + + public boolean eventPresent(CustomResourceID customResourceID) { + var actualState = getEventingState(customResourceID); + return actualState == EventingState.EVENT_PRESENT; + } + + public boolean noEventPresent(CustomResourceID customResourceID) { + var actualState = getEventingState(customResourceID); + return actualState == EventingState.NO_EVENT_PRESENT; + } + + public void cleanup(CustomResourceID customResourceID) { + eventingState.remove(customResourceID); + } +} diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/ExecutionScope.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/ExecutionScope.java index 4f5f0ca7f7..6cf05e9308 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/ExecutionScope.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/ExecutionScope.java @@ -1,29 +1,20 @@ package io.javaoperatorsdk.operator.processing; -import java.util.List; - import io.fabric8.kubernetes.client.CustomResource; import io.javaoperatorsdk.operator.api.RetryInfo; import io.javaoperatorsdk.operator.processing.event.CustomResourceID; -import io.javaoperatorsdk.operator.processing.event.Event; public class ExecutionScope> { - private final List events; // the latest custom resource from cache private final R customResource; private final RetryInfo retryInfo; - public ExecutionScope(List list, R customResource, RetryInfo retryInfo) { - this.events = list; + public ExecutionScope(R customResource, RetryInfo retryInfo) { this.customResource = customResource; this.retryInfo = retryInfo; } - public List getEvents() { - return events; - } - public R getCustomResource() { return customResource; } @@ -35,8 +26,6 @@ public CustomResourceID getCustomResourceID() { @Override public String toString() { return "ExecutionScope{" - + "events=" - + events + ", customResource uid: " + customResource.getMetadata().getUid() + ", version: " diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/DefaultEvent.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/DefaultEvent.java index c445f2bf27..7c55939da5 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/DefaultEvent.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/DefaultEvent.java @@ -2,14 +2,13 @@ @SuppressWarnings("rawtypes") public class DefaultEvent implements Event { - private final CustomResourceID relatedCustomResource; + private final CustomResourceID relatedCustomResource; public DefaultEvent(CustomResourceID targetCustomResource) { this.relatedCustomResource = targetCustomResource; } - @Override public CustomResourceID getRelatedCustomResourceID() { return relatedCustomResource; diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventList.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventList.java deleted file mode 100644 index d9560f6f1c..0000000000 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventList.java +++ /dev/null @@ -1,27 +0,0 @@ -package io.javaoperatorsdk.operator.processing.event; - -import java.util.List; -import java.util.Optional; - -public class EventList { - - private final List eventList; - - public EventList(List eventList) { - this.eventList = eventList; - } - - public List getList() { - return eventList; - } - - public Optional getLatestOfType(Class eventType) { - for (int i = eventList.size() - 1; i >= 0; i--) { - Event event = eventList.get(i); - if (event.getClass().isAssignableFrom(eventType)) { - return (Optional) Optional.of(event); - } - } - return Optional.empty(); - } -} diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/internal/CustomResourceEvent.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/internal/CustomResourceEvent.java index 15a67108db..0c20369e0a 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/internal/CustomResourceEvent.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/internal/CustomResourceEvent.java @@ -1,19 +1,15 @@ package io.javaoperatorsdk.operator.processing.event.internal; -import io.fabric8.kubernetes.client.CustomResource; import io.javaoperatorsdk.operator.processing.event.CustomResourceID; import io.javaoperatorsdk.operator.processing.event.DefaultEvent; public class CustomResourceEvent extends DefaultEvent { private final ResourceAction action; - private final CustomResource customResource; - public CustomResourceEvent(ResourceAction action, - CustomResource resource) { - super(CustomResourceID.fromResource(resource)); - this.customResource = resource; + CustomResourceID customResourceID) { + super(customResourceID); this.action = action; } @@ -21,14 +17,9 @@ public CustomResourceEvent(ResourceAction action, public String toString() { return "CustomResourceEvent{" + "action=" + action + - ", customResource=" + customResource + '}'; } - public CustomResource getCustomResource() { - return customResource; - } - public ResourceAction getAction() { return action; } diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/internal/CustomResourceEventSource.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/internal/CustomResourceEventSource.java index 9964e7b450..3b56e5b685 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/internal/CustomResourceEventSource.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/internal/CustomResourceEventSource.java @@ -112,7 +112,8 @@ public void eventReceived(ResourceAction action, T customResource, T oldResource CustomResourceEventFilters.generationAware())); if (filter.acceptChange(controller.getConfiguration(), oldResource, customResource)) { - eventHandler.handleEvent(new CustomResourceEvent(action, clone(customResource))); + eventHandler.handleEvent( + new CustomResourceEvent(action, CustomResourceID.fromResource(customResource))); } else { log.debug( "Skipping event handling resource {} with version: {}", diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/internal/InformerEvent.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/internal/InformerEvent.java deleted file mode 100644 index 81c78d31b4..0000000000 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/internal/InformerEvent.java +++ /dev/null @@ -1,35 +0,0 @@ -package io.javaoperatorsdk.operator.processing.event.internal; - -import java.util.Optional; - -import io.fabric8.kubernetes.api.model.HasMetadata; -import io.javaoperatorsdk.operator.processing.event.CustomResourceID; -import io.javaoperatorsdk.operator.processing.event.DefaultEvent; - -public class InformerEvent extends DefaultEvent { - - private final ResourceAction action; - private final T resource; - private final T oldResource; - - public InformerEvent(ResourceAction action, - T resource, T oldResource) { - super(CustomResourceID.fromResource(resource)); - this.action = action; - this.resource = resource; - this.oldResource = oldResource; - } - - public T getResource() { - return resource; - } - - public Optional getOldResource() { - return Optional.ofNullable(oldResource); - } - - public ResourceAction getAction() { - return action; - } - -} diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/internal/InformerEventSource.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/internal/InformerEventSource.java index 34e409fa17..42b1c94084 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/internal/InformerEventSource.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/internal/InformerEventSource.java @@ -13,6 +13,7 @@ import io.fabric8.kubernetes.client.informers.cache.Store; import io.javaoperatorsdk.operator.processing.event.AbstractEventSource; import io.javaoperatorsdk.operator.processing.event.CustomResourceID; +import io.javaoperatorsdk.operator.processing.event.DefaultEvent; public class InformerEventSource extends AbstractEventSource { @@ -55,7 +56,7 @@ public InformerEventSource(SharedInformer sharedInformer, sharedInformer.addEventHandler(new ResourceEventHandler<>() { @Override public void onAdd(T t) { - propagateEvent(ResourceAction.ADDED, t, null); + propagateEvent(t); } @Override @@ -65,23 +66,23 @@ public void onUpdate(T oldObject, T newObject) { .equals(newObject.getMetadata().getResourceVersion())) { return; } - propagateEvent(ResourceAction.UPDATED, newObject, oldObject); + propagateEvent(newObject); } @Override public void onDelete(T t, boolean b) { - propagateEvent(ResourceAction.DELETED, t, null); + propagateEvent(t); } }); } - private void propagateEvent(ResourceAction action, T object, T oldObject) { + private void propagateEvent(T object) { var uids = resourceToUIDs.apply(object); if (uids.isEmpty()) { return; } uids.forEach(uid -> { - InformerEvent event = new InformerEvent(action, object, oldObject); + DefaultEvent event = new DefaultEvent(CustomResourceID.fromResource(object)); this.eventHandler.handleEvent(event); }); } diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/internal/TimerEvent.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/internal/TimerEvent.java deleted file mode 100644 index 9e9bfb5040..0000000000 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/internal/TimerEvent.java +++ /dev/null @@ -1,11 +0,0 @@ -package io.javaoperatorsdk.operator.processing.event.internal; - -import io.javaoperatorsdk.operator.processing.event.CustomResourceID; -import io.javaoperatorsdk.operator.processing.event.DefaultEvent; - -public class TimerEvent extends DefaultEvent { - - public TimerEvent(CustomResourceID relatedCustomResourceUid) { - super(relatedCustomResourceUid); - } -} diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/internal/TimerEventSource.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/internal/TimerEventSource.java index d4638f5c3b..5015df4281 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/internal/TimerEventSource.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/internal/TimerEventSource.java @@ -13,6 +13,7 @@ import io.fabric8.kubernetes.client.CustomResource; import io.javaoperatorsdk.operator.processing.event.AbstractEventSource; import io.javaoperatorsdk.operator.processing.event.CustomResourceID; +import io.javaoperatorsdk.operator.processing.event.DefaultEvent; public class TimerEventSource> extends AbstractEventSource { private static final Logger log = LoggerFactory.getLogger(TimerEventSource.class); @@ -94,7 +95,7 @@ public EventProducerTimeTask(CustomResourceID customResourceUid) { public void run() { if (running.get()) { log.debug("Producing event for custom resource id: {}", customResourceUid); - eventHandler.handleEvent(new TimerEvent(customResourceUid)); + eventHandler.handleEvent(new DefaultEvent(customResourceUid)); } } } diff --git a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/DefaultEventHandlerTest.java b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/DefaultEventHandlerTest.java index 023d589e03..1ecc551951 100644 --- a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/DefaultEventHandlerTest.java +++ b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/DefaultEventHandlerTest.java @@ -13,11 +13,11 @@ import io.fabric8.kubernetes.client.CustomResource; import io.javaoperatorsdk.operator.processing.event.CustomResourceID; +import io.javaoperatorsdk.operator.processing.event.DefaultEvent; import io.javaoperatorsdk.operator.processing.event.DefaultEventSourceManager; import io.javaoperatorsdk.operator.processing.event.Event; import io.javaoperatorsdk.operator.processing.event.internal.CustomResourceEvent; import io.javaoperatorsdk.operator.processing.event.internal.ResourceAction; -import io.javaoperatorsdk.operator.processing.event.internal.TimerEvent; import io.javaoperatorsdk.operator.processing.event.internal.TimerEventSource; import io.javaoperatorsdk.operator.processing.retry.GenericRetry; import io.javaoperatorsdk.operator.sample.simple.TestCustomResource; @@ -26,13 +26,7 @@ import static io.javaoperatorsdk.operator.processing.event.internal.ResourceAction.DELETED; import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.ArgumentMatchers.eq; -import static org.mockito.Mockito.any; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.never; -import static org.mockito.Mockito.timeout; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; +import static org.mockito.Mockito.*; class DefaultEventHandlerTest { @@ -41,6 +35,7 @@ class DefaultEventHandlerTest { public static final int FAKE_CONTROLLER_EXECUTION_DURATION = 250; public static final int SEPARATE_EXECUTION_TIMEOUT = 450; public static final String TEST_NAMESPACE = "default-event-handler-test"; + private EventMarker eventMarker = new EventMarker(); private EventDispatcher eventDispatcherMock = mock(EventDispatcher.class); private DefaultEventSourceManager defaultEventSourceManagerMock = mock(DefaultEventSourceManager.class); @@ -49,11 +44,11 @@ class DefaultEventHandlerTest { private TimerEventSource retryTimerEventSourceMock = mock(TimerEventSource.class); private DefaultEventHandler defaultEventHandler = - new DefaultEventHandler(eventDispatcherMock, resourceCache, "Test", null); + new DefaultEventHandler(eventDispatcherMock, resourceCache, "Test", null, eventMarker); private DefaultEventHandler defaultEventHandlerWithRetry = new DefaultEventHandler(eventDispatcherMock, resourceCache, "Test", - GenericRetry.defaultLimitedExponentialRetry()); + GenericRetry.defaultLimitedExponentialRetry(), eventMarker); @BeforeEach public void setup() { @@ -91,43 +86,11 @@ public void ifExecutionInProgressWaitsUntilItsFinished() throws InterruptedExcep .handleExecution(any()); } - @Test - public void buffersAllIncomingEventsWhileControllerInExecution() { - CustomResourceID resourceUid = eventAlreadyUnderProcessing(); - - defaultEventHandler.handleEvent(nonCREvent(resourceUid)); - defaultEventHandler.handleEvent(prepareCREvent(resourceUid)); - - ArgumentCaptor captor = ArgumentCaptor.forClass(ExecutionScope.class); - verify(eventDispatcherMock, timeout(SEPARATE_EXECUTION_TIMEOUT).times(2)) - .handleExecution(captor.capture()); - List events = captor.getAllValues().get(1).getEvents(); - assertThat(events).hasSize(2); - assertThat(events.get(0)).isInstanceOf(TimerEvent.class); - assertThat(events.get(1)).isInstanceOf(CustomResourceEvent.class); - } - - @Test - public void cleanUpAfterDeleteEvent() { - TestCustomResource customResource = testCustomResource(); - when(resourceCache.getCustomResource(CustomResourceID.fromResource(customResource))) - .thenReturn(Optional.of(customResource)); - CustomResourceEvent event = - new CustomResourceEvent(DELETED, customResource); - - defaultEventHandler.handleEvent(event); - - waitMinimalTime(); - verify(defaultEventSourceManagerMock, times(1)) - .cleanupForCustomResource(CustomResourceID.fromResource(customResource)); - } - @Test public void schedulesAnEventRetryOnException() { - Event event = prepareCREvent(); TestCustomResource customResource = testCustomResource(); - ExecutionScope executionScope = new ExecutionScope(List.of(event), customResource, null); + ExecutionScope executionScope = new ExecutionScope(customResource, null); PostExecutionControl postExecutionControl = PostExecutionControl.exceptionDuringExecution(new RuntimeException("test")); @@ -160,7 +123,6 @@ public void executesTheControllerInstantlyAfterErrorIfEventsBuffered() { .handleExecution(executionScopeArgumentCaptor.capture()); List allValues = executionScopeArgumentCaptor.getAllValues(); assertThat(allValues).hasSize(2); - assertThat(allValues.get(1).getEvents()).hasSize(2); verify(retryTimerEventSourceMock, never()) .scheduleOnce(eq(customResource), eq(GenericRetry.DEFAULT_INITIAL_INTERVAL)); } @@ -238,12 +200,29 @@ public void doNotFireEventsIfClosing() { verify(eventDispatcherMock, timeout(50).times(0)).handleExecution(any()); } - private void waitMinimalTime() { - try { - Thread.sleep(50); - } catch (InterruptedException e) { - throw new IllegalStateException(e); - } + @Test + public void cleansUpWhenDeleteEventReceivedAndNoEventPresent() { + Event deleteEvent = + new CustomResourceEvent(DELETED, prepareCREvent().getRelatedCustomResourceID()); + + defaultEventHandler.handleEvent(deleteEvent); + + verify(defaultEventSourceManagerMock, times(1)) + .cleanupForCustomResource(eq(deleteEvent.getRelatedCustomResourceID())); + } + + @Test + public void cleansUpAfterExecutionIfOnlyDeleteEventMarkLeft() { + var cr = testCustomResource(new CustomResourceID(UUID.randomUUID().toString())); + var crEvent = prepareCREvent(CustomResourceID.fromResource(cr)); + eventMarker.markDeleteEventReceived(crEvent.getRelatedCustomResourceID()); + var executionScope = new ExecutionScope(cr, null); + + defaultEventHandler.eventProcessingFinished(executionScope, + PostExecutionControl.defaultDispatch()); + + verify(defaultEventSourceManagerMock, times(1)) + .cleanupForCustomResource(eq(crEvent.getRelatedCustomResourceID())); } private CustomResourceID eventAlreadyUnderProcessing() { @@ -265,11 +244,12 @@ private CustomResourceEvent prepareCREvent() { private CustomResourceEvent prepareCREvent(CustomResourceID uid) { TestCustomResource customResource = testCustomResource(uid); when(resourceCache.getCustomResource(eq(uid))).thenReturn(Optional.of(customResource)); - return new CustomResourceEvent(ResourceAction.UPDATED, customResource); + return new CustomResourceEvent(ResourceAction.UPDATED, + CustomResourceID.fromResource(customResource)); } private Event nonCREvent(CustomResourceID relatedCustomResourceUid) { - return new TimerEvent(relatedCustomResourceUid); + return new DefaultEvent(relatedCustomResourceUid); } private void overrideData(CustomResourceID id, CustomResource applyTo) { diff --git a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/EventBufferTest.java b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/EventBufferTest.java deleted file mode 100644 index 6dc16f8a69..0000000000 --- a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/EventBufferTest.java +++ /dev/null @@ -1,60 +0,0 @@ -package io.javaoperatorsdk.operator.processing; - -import java.util.List; -import java.util.UUID; - -import org.junit.jupiter.api.Test; - -import io.javaoperatorsdk.operator.processing.event.CustomResourceID; -import io.javaoperatorsdk.operator.processing.event.Event; -import io.javaoperatorsdk.operator.processing.event.internal.TimerEvent; - -import static org.assertj.core.api.Assertions.assertThat; - -class EventBufferTest { - - private EventBuffer eventBuffer = new EventBuffer(); - - String name = UUID.randomUUID().toString(); - CustomResourceID customResourceID = new CustomResourceID(name); - Event testEvent1 = new TimerEvent(customResourceID); - Event testEvent2 = new TimerEvent(customResourceID); - - @Test - public void storesEvents() { - eventBuffer.addEvent(testEvent1); - eventBuffer.addEvent(testEvent2); - - assertThat(eventBuffer.containsEvents(testEvent1.getRelatedCustomResourceID())).isTrue(); - List events = eventBuffer.getAndRemoveEventsForExecution(customResourceID); - assertThat(events).hasSize(2); - } - - @Test - public void getsAndRemovesEvents() { - eventBuffer.addEvent(testEvent1); - eventBuffer.addEvent(testEvent2); - - List events = eventBuffer.getAndRemoveEventsForExecution(new CustomResourceID(name)); - assertThat(events).hasSize(2); - assertThat(events).contains(testEvent1, testEvent2); - } - - @Test - public void checksIfThereAreStoredEvents() { - eventBuffer.addEvent(testEvent1); - eventBuffer.addEvent(testEvent2); - - assertThat(eventBuffer.containsEvents(testEvent1.getRelatedCustomResourceID())).isTrue(); - } - - @Test - public void canClearEvents() { - eventBuffer.addEvent(testEvent1); - eventBuffer.addEvent(testEvent2); - - eventBuffer.cleanup(customResourceID); - - assertThat(eventBuffer.containsEvents(testEvent1.getRelatedCustomResourceID())).isFalse(); - } -} diff --git a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/EventDispatcherTest.java b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/EventDispatcherTest.java index 00693e2f34..b709c026e1 100644 --- a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/EventDispatcherTest.java +++ b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/EventDispatcherTest.java @@ -19,6 +19,7 @@ import io.javaoperatorsdk.operator.api.UpdateControl; import io.javaoperatorsdk.operator.api.config.ConfigurationService; import io.javaoperatorsdk.operator.api.config.ControllerConfiguration; +import io.javaoperatorsdk.operator.processing.event.CustomResourceID; import io.javaoperatorsdk.operator.processing.event.Event; import io.javaoperatorsdk.operator.processing.event.internal.CustomResourceEvent; import io.javaoperatorsdk.operator.processing.event.internal.ResourceAction; @@ -267,7 +268,6 @@ void propagatesRetryInfoToContextIfFinalizerSet() { eventDispatcher.handleExecution( new ExecutionScope( - List.of(), testCustomResource, new RetryInfo() { @Override @@ -330,10 +330,11 @@ private void removeFinalizers(CustomResource customResource) { public ExecutionScope executionScopeWithCREvent( ResourceAction action, CustomResource resource, Event... otherEvents) { - CustomResourceEvent event = new CustomResourceEvent(action, resource); + CustomResourceEvent event = + new CustomResourceEvent(action, CustomResourceID.fromResource(resource)); List eventList = new ArrayList<>(1 + otherEvents.length); eventList.add(event); eventList.addAll(Arrays.asList(otherEvents)); - return new ExecutionScope(eventList, resource, null); + return new ExecutionScope(resource, null); } } diff --git a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/EventMarkerTest.java b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/EventMarkerTest.java new file mode 100644 index 0000000000..98b87f1713 --- /dev/null +++ b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/EventMarkerTest.java @@ -0,0 +1,67 @@ +package io.javaoperatorsdk.operator.processing; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +import io.javaoperatorsdk.operator.processing.event.CustomResourceID; + +import static org.assertj.core.api.Assertions.assertThat; + +class EventMarkerTest { + + private final EventMarker eventMarker = new EventMarker(); + private CustomResourceID sampleCustomResourceID = new CustomResourceID("test-name"); + + @Test + public void returnsNoEventPresentIfNotMarkedYet() { + assertThat(eventMarker.noEventPresent(sampleCustomResourceID)).isTrue(); + } + + @Test + public void marksEvent() { + eventMarker.markEventReceived(sampleCustomResourceID); + + assertThat(eventMarker.eventPresent(sampleCustomResourceID)).isTrue(); + assertThat(eventMarker.deleteEventPresent(sampleCustomResourceID)).isFalse(); + } + + @Test + public void marksDeleteEvent() { + eventMarker.markDeleteEventReceived(sampleCustomResourceID); + + assertThat(eventMarker.deleteEventPresent(sampleCustomResourceID)) + .isTrue(); + assertThat(eventMarker.eventPresent(sampleCustomResourceID)).isFalse(); + } + + @Test + public void afterDeleteEventMarkEventIsNotRelevant() { + eventMarker.markEventReceived(sampleCustomResourceID); + + eventMarker.markDeleteEventReceived(sampleCustomResourceID); + + assertThat(eventMarker.deleteEventPresent(sampleCustomResourceID)) + .isTrue(); + assertThat(eventMarker.eventPresent(sampleCustomResourceID)).isFalse(); + } + + @Test + public void cleansUp() { + eventMarker.markEventReceived(sampleCustomResourceID); + eventMarker.markDeleteEventReceived(sampleCustomResourceID); + + eventMarker.cleanup(sampleCustomResourceID); + + assertThat(eventMarker.deleteEventPresent(sampleCustomResourceID)).isFalse(); + assertThat(eventMarker.eventPresent(sampleCustomResourceID)).isFalse(); + } + + @Test + public void cannotMarkEventAfterDeleteEventReceived() { + Assertions.assertThrows(IllegalStateException.class, () -> { + eventMarker.markDeleteEventReceived(sampleCustomResourceID); + eventMarker.markEventReceived(sampleCustomResourceID); + }); + } + +} diff --git a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/EventListTest.java b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/EventListTest.java deleted file mode 100644 index dc65981cff..0000000000 --- a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/EventListTest.java +++ /dev/null @@ -1,24 +0,0 @@ -package io.javaoperatorsdk.operator.processing.event; - -import java.util.Arrays; - -import org.junit.jupiter.api.Test; - -import io.javaoperatorsdk.operator.processing.event.internal.TimerEvent; - -import static org.assertj.core.api.Assertions.assertThat; -import static org.mockito.Mockito.mock; - -class EventListTest { - - @Test - public void returnsLatestOfEventType() { - TimerEvent event2 = new TimerEvent(new CustomResourceID("name1")); - EventList eventList = - new EventList( - Arrays.asList(mock(Event.class), new TimerEvent(new CustomResourceID("name2")), event2, - mock(Event.class))); - - assertThat(eventList.getLatestOfType(TimerEvent.class).get()).isEqualTo(event2); - } -} diff --git a/operator-framework/src/test/java/io/javaoperatorsdk/operator/sample/event/EventSourceTestCustomResourceController.java b/operator-framework/src/test/java/io/javaoperatorsdk/operator/sample/event/EventSourceTestCustomResourceController.java index 27b651c9fb..33e590f75f 100644 --- a/operator-framework/src/test/java/io/javaoperatorsdk/operator/sample/event/EventSourceTestCustomResourceController.java +++ b/operator-framework/src/test/java/io/javaoperatorsdk/operator/sample/event/EventSourceTestCustomResourceController.java @@ -41,7 +41,6 @@ public UpdateControl createOrUpdateResource( timerEventSource.schedule(resource, TIMER_DELAY, TIMER_PERIOD); - log.info("Events:: " + context.getEvents()); numberOfExecutions.addAndGet(1); ensureStatusExists(resource); resource.getStatus().setState(EventSourceTestCustomResourceStatus.State.SUCCESS);