From f8e28ffff91de50c1d41dec513572e20e3e5c45c Mon Sep 17 00:00:00 2001 From: csviri Date: Thu, 16 Dec 2021 11:07:03 +0100 Subject: [PATCH 1/5] fix: resource cache interface for InformerEventSource --- operator-framework-core/pom.xml | 15 -------- .../event/source/CachingEventSource.java | 18 ++++----- .../inbound/CachingInboundEventSource.java | 6 --- .../source/informer/InformerEventSource.java | 7 ++++ .../informer/InformerResourceCache.java | 38 +++++++++++++++++++ .../PerResourcePollingEventSource.java | 9 ++--- .../source/polling/PollingEventSource.java | 10 ++--- .../event/source/CachingEventSourceTest.java | 18 +-------- .../PerResourcePollingEventSourceTest.java | 17 +-------- .../polling/PollingEventSourceTest.java | 14 +------ pom.xml | 17 --------- sample-operators/mysql-schema/pom.xml | 13 ------- .../sample/MySQLSchemaReconciler.java | 14 +------ 13 files changed, 63 insertions(+), 133 deletions(-) create mode 100644 operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerResourceCache.java diff --git a/operator-framework-core/pom.xml b/operator-framework-core/pom.xml index eafb9ec1b1..47fc3f9881 100644 --- a/operator-framework-core/pom.xml +++ b/operator-framework-core/pom.xml @@ -105,20 +105,5 @@ awaitility test - - javax.cache - cache-api - ${jcache.version} - - - com.github.ben-manes.caffeine - caffeine - test - - - com.github.ben-manes.caffeine - jcache - test - diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/CachingEventSource.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/CachingEventSource.java index 9a2be41a70..31d794f7f1 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/CachingEventSource.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/CachingEventSource.java @@ -1,8 +1,9 @@ package io.javaoperatorsdk.operator.processing.event.source; +import java.util.Collections; +import java.util.Map; import java.util.Optional; - -import javax.cache.Cache; +import java.util.concurrent.ConcurrentHashMap; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -25,13 +26,9 @@ */ public abstract class CachingEventSource extends LifecycleAwareEventSource { - private static final Logger log = LoggerFactory.getLogger(CachingEventSource.class); - - protected Cache cache; + protected Map cache = new ConcurrentHashMap<>(); - public CachingEventSource(Cache cache) { - this.cache = cache; - } + public CachingEventSource() {} protected void handleDelete(ResourceID relatedResourceID) { if (!isRunning()) { @@ -56,8 +53,8 @@ protected void handleEvent(T value, ResourceID relatedResourceID) { } } - public Cache getCache() { - return cache; + public Map getCache() { + return Collections.unmodifiableMap(cache); } public Optional getCachedValue(ResourceID resourceID) { @@ -67,6 +64,5 @@ public Optional getCachedValue(ResourceID resourceID) { @Override public void stop() throws OperatorException { super.stop(); - cache.close(); } } diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/inbound/CachingInboundEventSource.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/inbound/CachingInboundEventSource.java index 51d1ed287d..308b4a36de 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/inbound/CachingInboundEventSource.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/inbound/CachingInboundEventSource.java @@ -1,16 +1,10 @@ package io.javaoperatorsdk.operator.processing.event.source.inbound; -import javax.cache.Cache; - import io.javaoperatorsdk.operator.processing.event.ResourceID; import io.javaoperatorsdk.operator.processing.event.source.CachingEventSource; public class CachingInboundEventSource extends CachingEventSource { - public CachingInboundEventSource(Cache cache) { - super(cache); - } - public void handleResourceEvent(T resource, ResourceID relatedResourceID) { super.handleEvent(resource, relatedResourceID); } diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSource.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSource.java index 8095289221..197c67f457 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSource.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSource.java @@ -4,6 +4,7 @@ import java.util.Set; import java.util.function.Function; +import io.javaoperatorsdk.operator.processing.event.source.controller.ResourceCache; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -25,6 +26,7 @@ public class InformerEventSource extends AbstractEventSou private final Function> secondaryToPrimaryResourcesIdSet; private final Function associatedWith; private final boolean skipUpdateEventPropagationIfNoChange; + private final InformerResourceCache informerResourceCache; public InformerEventSource(SharedInformer sharedInformer, Function> resourceToTargetResourceIDSet) { @@ -48,6 +50,7 @@ public InformerEventSource(SharedInformer sharedInformer, Function associatedWith, boolean skipUpdateEventPropagationIfNoChange) { this.sharedInformer = sharedInformer; + this.informerResourceCache = new InformerResourceCache<>(sharedInformer); this.secondaryToPrimaryResourcesIdSet = resourceToTargetResourceIDSet; this.skipUpdateEventPropagationIfNoChange = skipUpdateEventPropagationIfNoChange; if (sharedInformer.isRunning()) { @@ -103,6 +106,10 @@ private void propagateEvent(T object) { }); } + public ResourceCache getResourceCache() { + return informerResourceCache; + } + @Override public void start() { sharedInformer.run(); diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerResourceCache.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerResourceCache.java new file mode 100644 index 0000000000..fe41dcd603 --- /dev/null +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerResourceCache.java @@ -0,0 +1,38 @@ +package io.javaoperatorsdk.operator.processing.event.source.informer; + +import io.fabric8.kubernetes.api.model.HasMetadata; +import io.fabric8.kubernetes.client.informers.SharedInformer; +import io.fabric8.kubernetes.client.informers.cache.Cache; +import io.javaoperatorsdk.operator.processing.event.ResourceID; +import io.javaoperatorsdk.operator.processing.event.source.controller.ResourceCache; + +import java.util.Optional; +import java.util.function.Predicate; +import java.util.stream.Stream; + +public class InformerResourceCache implements ResourceCache { + + private final SharedInformer sharedInformer; + + public InformerResourceCache(SharedInformer sharedInformer) { + this.sharedInformer = sharedInformer; + } + + @Override + public Optional get(ResourceID resourceID) { + return Optional.ofNullable(sharedInformer.getStore() + .getByKey(Cache.namespaceKeyFunc(resourceID.getNamespace().orElse(null), + resourceID.getName()))); + } + + @Override + public Stream list(Predicate predicate) { + return sharedInformer.getStore().list().stream().filter(predicate); + } + + @Override + public Stream list(String namespace, Predicate predicate) { + return sharedInformer.getStore().list().stream() + .filter(v -> namespace.equals(v.getMetadata().getNamespace()) && predicate.test(v)); + } +} diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/polling/PerResourcePollingEventSource.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/polling/PerResourcePollingEventSource.java index bf9b41cf0e..61b76a5863 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/polling/PerResourcePollingEventSource.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/polling/PerResourcePollingEventSource.java @@ -7,8 +7,6 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.function.Predicate; -import javax.cache.Cache; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -44,14 +42,13 @@ public class PerResourcePollingEventSource private final long period; public PerResourcePollingEventSource(ResourceSupplier resourceSupplier, - ResourceCache resourceCache, long period, Cache cache) { - this(resourceSupplier, resourceCache, period, cache, null); + ResourceCache resourceCache, long period) { + this(resourceSupplier, resourceCache, period, null); } public PerResourcePollingEventSource(ResourceSupplier resourceSupplier, - ResourceCache resourceCache, long period, Cache cache, + ResourceCache resourceCache, long period, Predicate registerPredicate) { - super(cache); this.resourceSupplier = resourceSupplier; this.resourceCache = resourceCache; this.period = period; diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/polling/PollingEventSource.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/polling/PollingEventSource.java index b2c3fdff78..c979d35558 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/polling/PollingEventSource.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/polling/PollingEventSource.java @@ -2,9 +2,6 @@ import java.util.*; import java.util.function.Supplier; -import java.util.stream.StreamSupport; - -import javax.cache.Cache; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -22,8 +19,7 @@ public class PollingEventSource extends CachingEventSource { private final long period; public PollingEventSource(Supplier> supplier, - long period, Cache cache) { - super(cache); + long period) { this.supplierToPoll = supplier; this.period = period; } @@ -46,8 +42,8 @@ public void run() { protected void getStateAndFillCache() { var values = supplierToPoll.get(); values.forEach((k, v) -> super.handleEvent(v, k)); - StreamSupport.stream(cache.spliterator(), false) - .filter(e -> !values.containsKey(e.getKey())).map(Cache.Entry::getKey) + cache.keySet().stream() + .filter(e -> !values.containsKey(e)) .forEach(super::handleDelete); } diff --git a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/CachingEventSourceTest.java b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/CachingEventSourceTest.java index 15fcca7253..7c264aa4ad 100644 --- a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/CachingEventSourceTest.java +++ b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/CachingEventSourceTest.java @@ -1,19 +1,11 @@ package io.javaoperatorsdk.operator.processing.event.source; -import javax.cache.Cache; -import javax.cache.CacheManager; -import javax.cache.configuration.MutableConfiguration; -import javax.cache.spi.CachingProvider; - import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import io.javaoperatorsdk.operator.processing.event.Event; import io.javaoperatorsdk.operator.processing.event.EventHandler; -import io.javaoperatorsdk.operator.processing.event.ResourceID; - -import com.github.benmanes.caffeine.jcache.spi.CaffeineCachingProvider; import static io.javaoperatorsdk.operator.processing.event.source.SampleExternalResource.*; import static org.assertj.core.api.Assertions.assertThat; @@ -22,16 +14,11 @@ class CachingEventSourceTest { private CachingEventSource cachingEventSource; - private Cache cache; private EventHandler eventHandlerMock = mock(EventHandler.class); @BeforeEach public void setup() { - CachingProvider cachingProvider = new CaffeineCachingProvider(); - CacheManager cacheManager = cachingProvider.getCacheManager(); - cache = cacheManager.createCache("test-caching", new MutableConfiguration<>()); - - cachingEventSource = new SimpleCachingEventSource(cache); + cachingEventSource = new SimpleCachingEventSource(); cachingEventSource.setEventHandler(eventHandlerMock); cachingEventSource.start(); } @@ -89,9 +76,6 @@ public void noEventOnDeleteIfResourceWasNotInCacheBefore() { public static class SimpleCachingEventSource extends CachingEventSource { - public SimpleCachingEventSource(Cache cache) { - super(cache); - } } } diff --git a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/polling/PerResourcePollingEventSourceTest.java b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/polling/PerResourcePollingEventSourceTest.java index 6eb4b98e3b..ddf1d505fd 100644 --- a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/polling/PerResourcePollingEventSourceTest.java +++ b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/polling/PerResourcePollingEventSourceTest.java @@ -2,11 +2,6 @@ import java.util.Optional; -import javax.cache.Cache; -import javax.cache.CacheManager; -import javax.cache.configuration.MutableConfiguration; -import javax.cache.spi.CachingProvider; - import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -17,10 +12,7 @@ import io.javaoperatorsdk.operator.processing.event.source.controller.ResourceCache; import io.javaoperatorsdk.operator.sample.simple.TestCustomResource; -import com.github.benmanes.caffeine.jcache.spi.CaffeineCachingProvider; - import static org.assertj.core.api.Assertions.assertThat; -import static org.junit.jupiter.api.Assertions.*; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.*; @@ -32,22 +24,17 @@ class PerResourcePollingEventSourceTest { private PerResourcePollingEventSource.ResourceSupplier supplier = mock(PerResourcePollingEventSource.ResourceSupplier.class); private ResourceCache resourceCache = mock(ResourceCache.class); - private Cache cache; private EventHandler eventHandler = mock(EventHandler.class); private TestCustomResource testCustomResource = TestUtils.testCustomResource(); @BeforeEach public void setup() { - CachingProvider cachingProvider = new CaffeineCachingProvider(); - CacheManager cacheManager = cachingProvider.getCacheManager(); - cache = cacheManager.createCache("test-caching", new MutableConfiguration<>()); - when(resourceCache.get(any())).thenReturn(Optional.of(testCustomResource)); when(supplier.getResources(any())) .thenReturn(Optional.of(SampleExternalResource.testResource1())); pollingEventSource = - new PerResourcePollingEventSource<>(supplier, resourceCache, PERIOD, cache); + new PerResourcePollingEventSource<>(supplier, resourceCache, PERIOD); pollingEventSource.setEventHandler(eventHandler); } @@ -63,7 +50,7 @@ public void pollsTheResourceAfterAwareOfIt() throws InterruptedException { @Test public void registeringTaskOnAPredicate() throws InterruptedException { - pollingEventSource = new PerResourcePollingEventSource<>(supplier, resourceCache, PERIOD, cache, + pollingEventSource = new PerResourcePollingEventSource<>(supplier, resourceCache, PERIOD, testCustomResource -> testCustomResource.getMetadata().getGeneration() > 1); pollingEventSource.setEventHandler(eventHandler); pollingEventSource.start(); diff --git a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/polling/PollingEventSourceTest.java b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/polling/PollingEventSourceTest.java index 177760f94e..3a0c39243e 100644 --- a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/polling/PollingEventSourceTest.java +++ b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/polling/PollingEventSourceTest.java @@ -4,11 +4,6 @@ import java.util.Map; import java.util.function.Supplier; -import javax.cache.Cache; -import javax.cache.CacheManager; -import javax.cache.configuration.MutableConfiguration; -import javax.cache.spi.CachingProvider; - import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -17,8 +12,6 @@ import io.javaoperatorsdk.operator.processing.event.ResourceID; import io.javaoperatorsdk.operator.processing.event.source.SampleExternalResource; -import com.github.benmanes.caffeine.jcache.spi.CaffeineCachingProvider; - import static io.javaoperatorsdk.operator.processing.event.source.SampleExternalResource.*; import static org.mockito.Mockito.*; @@ -26,16 +19,11 @@ class PollingEventSourceTest { private PollingEventSource pollingEventSource; private Supplier> supplier = mock(Supplier.class); - private Cache cache; private EventHandler eventHandler = mock(EventHandler.class); @BeforeEach public void setup() { - CachingProvider cachingProvider = new CaffeineCachingProvider(); - CacheManager cacheManager = cachingProvider.getCacheManager(); - cache = cacheManager.createCache("test-caching", new MutableConfiguration<>()); - - pollingEventSource = new PollingEventSource<>(supplier, 50, cache); + pollingEventSource = new PollingEventSource<>(supplier, 50); pollingEventSource.setEventHandler(eventHandler); } diff --git a/pom.xml b/pom.xml index 4248fca4ce..b708b6b6be 100644 --- a/pom.xml +++ b/pom.xml @@ -70,8 +70,6 @@ 2.17.1 1.0 1.6.2 - 1.1.1 - 3.0.4 @@ -170,21 +168,6 @@ operator-framework ${project.version} - - javax.cache - cache-api - ${jcache.version} - - - com.github.ben-manes.caffeine - caffeine - ${caffein.version} - - - com.github.ben-manes.caffeine - jcache - ${caffein.version} - diff --git a/sample-operators/mysql-schema/pom.xml b/sample-operators/mysql-schema/pom.xml index f5df325685..8693cf6154 100644 --- a/sample-operators/mysql-schema/pom.xml +++ b/sample-operators/mysql-schema/pom.xml @@ -67,19 +67,6 @@ jackson-dataformat-yaml 2.13.0 - - javax.cache - cache-api - ${jcache.version} - - - com.github.ben-manes.caffeine - caffeine - - - com.github.ben-manes.caffeine - jcache - diff --git a/sample-operators/mysql-schema/src/main/java/io/javaoperatorsdk/operator/sample/MySQLSchemaReconciler.java b/sample-operators/mysql-schema/src/main/java/io/javaoperatorsdk/operator/sample/MySQLSchemaReconciler.java index 33b8df52df..7ecd883c01 100644 --- a/sample-operators/mysql-schema/src/main/java/io/javaoperatorsdk/operator/sample/MySQLSchemaReconciler.java +++ b/sample-operators/mysql-schema/src/main/java/io/javaoperatorsdk/operator/sample/MySQLSchemaReconciler.java @@ -6,11 +6,6 @@ import java.util.Base64; import java.util.Optional; -import javax.cache.Cache; -import javax.cache.CacheManager; -import javax.cache.configuration.MutableConfiguration; -import javax.cache.spi.CachingProvider; - import org.apache.commons.lang3.RandomStringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -26,8 +21,6 @@ import io.javaoperatorsdk.operator.sample.schema.Schema; import io.javaoperatorsdk.operator.sample.schema.SchemaService; -import com.github.benmanes.caffeine.jcache.spi.CaffeineCachingProvider; - import static java.lang.String.format; @ControllerConfiguration @@ -50,15 +43,10 @@ public MySQLSchemaReconciler(KubernetesClient kubernetesClient, MySQLDbConfig my @Override public void prepareEventSources(EventSourceRegistry eventSourceRegistry) { - CachingProvider cachingProvider = new CaffeineCachingProvider(); - CacheManager cacheManager = cachingProvider.getCacheManager(); - Cache schemaCache = - cacheManager.createCache("schema-cache", new MutableConfiguration<>()); perResourcePollingEventSource = new PerResourcePollingEventSource<>(new SchemaPollingResourceSupplier(mysqlDbConfig), - eventSourceRegistry.getControllerResourceEventSource().getResourceCache(), POLL_PERIOD, - schemaCache); + eventSourceRegistry.getControllerResourceEventSource().getResourceCache(), POLL_PERIOD); eventSourceRegistry.registerEventSource(perResourcePollingEventSource); } From c6af17008383bd62785eacb4b7ed0018e3cd4d6a Mon Sep 17 00:00:00 2001 From: csviri Date: Thu, 16 Dec 2021 11:20:45 +0100 Subject: [PATCH 2/5] fix: formatting --- .../event/source/CachingEventSource.java | 3 -- .../source/informer/InformerEventSource.java | 2 +- .../informer/InformerResourceCache.java | 54 +++++++++---------- 3 files changed, 28 insertions(+), 31 deletions(-) diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/CachingEventSource.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/CachingEventSource.java index 31d794f7f1..cf0e709253 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/CachingEventSource.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/CachingEventSource.java @@ -5,9 +5,6 @@ import java.util.Optional; import java.util.concurrent.ConcurrentHashMap; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import io.javaoperatorsdk.operator.OperatorException; import io.javaoperatorsdk.operator.processing.event.Event; import io.javaoperatorsdk.operator.processing.event.ResourceID; diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSource.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSource.java index 197c67f457..d2d5d358b6 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSource.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSource.java @@ -4,7 +4,6 @@ import java.util.Set; import java.util.function.Function; -import io.javaoperatorsdk.operator.processing.event.source.controller.ResourceCache; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -17,6 +16,7 @@ import io.javaoperatorsdk.operator.processing.event.Event; import io.javaoperatorsdk.operator.processing.event.ResourceID; import io.javaoperatorsdk.operator.processing.event.source.AbstractEventSource; +import io.javaoperatorsdk.operator.processing.event.source.controller.ResourceCache; public class InformerEventSource extends AbstractEventSource { diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerResourceCache.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerResourceCache.java index fe41dcd603..bdb3a1a910 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerResourceCache.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerResourceCache.java @@ -1,38 +1,38 @@ package io.javaoperatorsdk.operator.processing.event.source.informer; +import java.util.Optional; +import java.util.function.Predicate; +import java.util.stream.Stream; + import io.fabric8.kubernetes.api.model.HasMetadata; import io.fabric8.kubernetes.client.informers.SharedInformer; import io.fabric8.kubernetes.client.informers.cache.Cache; import io.javaoperatorsdk.operator.processing.event.ResourceID; import io.javaoperatorsdk.operator.processing.event.source.controller.ResourceCache; -import java.util.Optional; -import java.util.function.Predicate; -import java.util.stream.Stream; - public class InformerResourceCache implements ResourceCache { - private final SharedInformer sharedInformer; - - public InformerResourceCache(SharedInformer sharedInformer) { - this.sharedInformer = sharedInformer; - } - - @Override - public Optional get(ResourceID resourceID) { - return Optional.ofNullable(sharedInformer.getStore() - .getByKey(Cache.namespaceKeyFunc(resourceID.getNamespace().orElse(null), - resourceID.getName()))); - } - - @Override - public Stream list(Predicate predicate) { - return sharedInformer.getStore().list().stream().filter(predicate); - } - - @Override - public Stream list(String namespace, Predicate predicate) { - return sharedInformer.getStore().list().stream() - .filter(v -> namespace.equals(v.getMetadata().getNamespace()) && predicate.test(v)); - } + private final SharedInformer sharedInformer; + + public InformerResourceCache(SharedInformer sharedInformer) { + this.sharedInformer = sharedInformer; + } + + @Override + public Optional get(ResourceID resourceID) { + return Optional.ofNullable(sharedInformer.getStore() + .getByKey(Cache.namespaceKeyFunc(resourceID.getNamespace().orElse(null), + resourceID.getName()))); + } + + @Override + public Stream list(Predicate predicate) { + return sharedInformer.getStore().list().stream().filter(predicate); + } + + @Override + public Stream list(String namespace, Predicate predicate) { + return sharedInformer.getStore().list().stream() + .filter(v -> namespace.equals(v.getMetadata().getNamespace()) && predicate.test(v)); + } } From ac259ad6023f958e35d73b60b4e00eee06911142 Mon Sep 17 00:00:00 2001 From: csviri Date: Thu, 16 Dec 2021 11:24:58 +0100 Subject: [PATCH 3/5] refactor: ResourceCache directly for InformerEventSource --- .../source/informer/InformerEventSource.java | 30 +++++++++++---- .../informer/InformerResourceCache.java | 38 ------------------- 2 files changed, 23 insertions(+), 45 deletions(-) delete mode 100644 operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerResourceCache.java diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSource.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSource.java index d2d5d358b6..07d1e20f54 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSource.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSource.java @@ -1,8 +1,11 @@ package io.javaoperatorsdk.operator.processing.event.source.informer; import java.util.Objects; +import java.util.Optional; import java.util.Set; import java.util.function.Function; +import java.util.function.Predicate; +import java.util.stream.Stream; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -18,7 +21,8 @@ import io.javaoperatorsdk.operator.processing.event.source.AbstractEventSource; import io.javaoperatorsdk.operator.processing.event.source.controller.ResourceCache; -public class InformerEventSource extends AbstractEventSource { +public class InformerEventSource extends AbstractEventSource + implements ResourceCache { private static final Logger log = LoggerFactory.getLogger(InformerEventSource.class); @@ -26,7 +30,6 @@ public class InformerEventSource extends AbstractEventSou private final Function> secondaryToPrimaryResourcesIdSet; private final Function associatedWith; private final boolean skipUpdateEventPropagationIfNoChange; - private final InformerResourceCache informerResourceCache; public InformerEventSource(SharedInformer sharedInformer, Function> resourceToTargetResourceIDSet) { @@ -50,7 +53,6 @@ public InformerEventSource(SharedInformer sharedInformer, Function associatedWith, boolean skipUpdateEventPropagationIfNoChange) { this.sharedInformer = sharedInformer; - this.informerResourceCache = new InformerResourceCache<>(sharedInformer); this.secondaryToPrimaryResourcesIdSet = resourceToTargetResourceIDSet; this.skipUpdateEventPropagationIfNoChange = skipUpdateEventPropagationIfNoChange; if (sharedInformer.isRunning()) { @@ -106,10 +108,6 @@ private void propagateEvent(T object) { }); } - public ResourceCache getResourceCache() { - return informerResourceCache; - } - @Override public void start() { sharedInformer.run(); @@ -139,4 +137,22 @@ public T getAssociated(HasMetadata resource) { public SharedInformer getSharedInformer() { return sharedInformer; } + + @Override + public Optional get(ResourceID resourceID) { + return Optional.ofNullable(sharedInformer.getStore() + .getByKey(Cache.namespaceKeyFunc(resourceID.getNamespace().orElse(null), + resourceID.getName()))); + } + + @Override + public Stream list(Predicate predicate) { + return sharedInformer.getStore().list().stream().filter(predicate); + } + + @Override + public Stream list(String namespace, Predicate predicate) { + return sharedInformer.getStore().list().stream() + .filter(v -> namespace.equals(v.getMetadata().getNamespace()) && predicate.test(v)); + } } diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerResourceCache.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerResourceCache.java deleted file mode 100644 index bdb3a1a910..0000000000 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerResourceCache.java +++ /dev/null @@ -1,38 +0,0 @@ -package io.javaoperatorsdk.operator.processing.event.source.informer; - -import java.util.Optional; -import java.util.function.Predicate; -import java.util.stream.Stream; - -import io.fabric8.kubernetes.api.model.HasMetadata; -import io.fabric8.kubernetes.client.informers.SharedInformer; -import io.fabric8.kubernetes.client.informers.cache.Cache; -import io.javaoperatorsdk.operator.processing.event.ResourceID; -import io.javaoperatorsdk.operator.processing.event.source.controller.ResourceCache; - -public class InformerResourceCache implements ResourceCache { - - private final SharedInformer sharedInformer; - - public InformerResourceCache(SharedInformer sharedInformer) { - this.sharedInformer = sharedInformer; - } - - @Override - public Optional get(ResourceID resourceID) { - return Optional.ofNullable(sharedInformer.getStore() - .getByKey(Cache.namespaceKeyFunc(resourceID.getNamespace().orElse(null), - resourceID.getName()))); - } - - @Override - public Stream list(Predicate predicate) { - return sharedInformer.getStore().list().stream().filter(predicate); - } - - @Override - public Stream list(String namespace, Predicate predicate) { - return sharedInformer.getStore().list().stream() - .filter(v -> namespace.equals(v.getMetadata().getNamespace()) && predicate.test(v)); - } -} From f849f19559bfafa9b7b6bebf7425a8b5b4ebaef1 Mon Sep 17 00:00:00 2001 From: csviri Date: Thu, 16 Dec 2021 12:18:02 +0100 Subject: [PATCH 4/5] fix: lock for sure --- .../event/source/CachingEventSource.java | 15 +++++++++++---- 1 file changed, 11 insertions(+), 4 deletions(-) diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/CachingEventSource.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/CachingEventSource.java index cf0e709253..0ff4eea11b 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/CachingEventSource.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/CachingEventSource.java @@ -4,6 +4,7 @@ import java.util.Map; import java.util.Optional; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.locks.ReentrantLock; import io.javaoperatorsdk.operator.OperatorException; import io.javaoperatorsdk.operator.processing.event.Event; @@ -24,6 +25,7 @@ public abstract class CachingEventSource extends LifecycleAwareEventSource { protected Map cache = new ConcurrentHashMap<>(); + private final ReentrantLock lock = new ReentrantLock(); public CachingEventSource() {} @@ -43,10 +45,15 @@ protected void handleEvent(T value, ResourceID relatedResourceID) { if (!isRunning()) { return; } - var cachedValue = cache.get(relatedResourceID); - if (cachedValue == null || !cachedValue.equals(value)) { - cache.put(relatedResourceID, value); - eventHandler.handleEvent(new Event(relatedResourceID)); + lock.lock(); + try { + var cachedValue = cache.get(relatedResourceID); + if (cachedValue == null || !cachedValue.equals(value)) { + cache.put(relatedResourceID, value); + eventHandler.handleEvent(new Event(relatedResourceID)); + } + } finally { + lock.unlock(); } } From 09f56505a80e49724ec4def206352acb68104542 Mon Sep 17 00:00:00 2001 From: csviri Date: Thu, 16 Dec 2021 13:02:32 +0100 Subject: [PATCH 5/5] fix: remove lock --- .../event/source/CachingEventSource.java | 15 ++++----------- 1 file changed, 4 insertions(+), 11 deletions(-) diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/CachingEventSource.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/CachingEventSource.java index 0ff4eea11b..cf0e709253 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/CachingEventSource.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/CachingEventSource.java @@ -4,7 +4,6 @@ import java.util.Map; import java.util.Optional; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.locks.ReentrantLock; import io.javaoperatorsdk.operator.OperatorException; import io.javaoperatorsdk.operator.processing.event.Event; @@ -25,7 +24,6 @@ public abstract class CachingEventSource extends LifecycleAwareEventSource { protected Map cache = new ConcurrentHashMap<>(); - private final ReentrantLock lock = new ReentrantLock(); public CachingEventSource() {} @@ -45,15 +43,10 @@ protected void handleEvent(T value, ResourceID relatedResourceID) { if (!isRunning()) { return; } - lock.lock(); - try { - var cachedValue = cache.get(relatedResourceID); - if (cachedValue == null || !cachedValue.equals(value)) { - cache.put(relatedResourceID, value); - eventHandler.handleEvent(new Event(relatedResourceID)); - } - } finally { - lock.unlock(); + var cachedValue = cache.get(relatedResourceID); + if (cachedValue == null || !cachedValue.equals(value)) { + cache.put(relatedResourceID, value); + eventHandler.handleEvent(new Event(relatedResourceID)); } }