From 48e002649e79f5244f28dff3327f15df997befc1 Mon Sep 17 00:00:00 2001 From: utkarsh Date: Sat, 24 Aug 2024 19:07:02 +0530 Subject: [PATCH 1/2] fix:passing changed flags in configuration change event Signed-off-by: utkarsh --- .../providers/flagd/FlagdProvider.java | 11 ++- .../flagd/resolver/grpc/GrpcConnector.java | 13 ++-- .../flagd/resolver/grpc/GrpcResolver.java | 3 +- .../resolver/process/InProcessResolver.java | 37 +++++----- .../resolver/process/model/FeatureFlag.java | 2 + .../resolver/process/storage/FlagStore.java | 38 ++++++++-- .../resolver/process/storage/Storage.java | 2 +- .../process/storage/StorageStateDTO.java | 27 +++++++ .../providers/flagd/FlagdProviderTest.java | 12 ++- .../resolver/grpc/GrpcConnectorTest.java | 6 +- .../process/InProcessResolverTest.java | 73 ++++++++++++------- .../flagd/resolver/process/MockStorage.java | 9 ++- .../process/storage/FlagStoreTest.java | 44 +++++++++-- 13 files changed, 195 insertions(+), 82 deletions(-) create mode 100644 providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/StorageStateDTO.java diff --git a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/FlagdProvider.java b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/FlagdProvider.java index 24f614656..70e95b0af 100644 --- a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/FlagdProvider.java +++ b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/FlagdProvider.java @@ -4,6 +4,7 @@ import dev.openfeature.contrib.providers.flagd.resolver.grpc.GrpcResolver; import dev.openfeature.contrib.providers.flagd.resolver.grpc.cache.Cache; import dev.openfeature.contrib.providers.flagd.resolver.process.InProcessResolver; +import dev.openfeature.contrib.providers.flagd.resolver.process.model.FeatureFlag; import dev.openfeature.sdk.EvaluationContext; import dev.openfeature.sdk.EventProvider; import dev.openfeature.sdk.FeatureProvider; @@ -14,6 +15,7 @@ import dev.openfeature.sdk.Value; import lombok.extern.slf4j.Slf4j; +import java.util.List; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; @@ -142,7 +144,7 @@ private EvaluationContext mergeContext(final EvaluationContext clientCallCtx) { return clientCallCtx; } - private void setState(ProviderState newState) { + private void setState(ProviderState newState, List changedFlagsKeys) { ProviderState oldState; Lock l = this.lock.writeLock(); try { @@ -152,17 +154,17 @@ private void setState(ProviderState newState) { } finally { l.unlock(); } - this.handleStateTransition(oldState, newState); + this.handleStateTransition(oldState, newState, changedFlagsKeys); } - private void handleStateTransition(ProviderState oldState, ProviderState newState) { + private void handleStateTransition(ProviderState oldState, ProviderState newState, List changedFlagKeys) { // we got initialized if (ProviderState.NOT_READY.equals(oldState) && ProviderState.READY.equals(newState)) { // nothing to do, the SDK emits the events log.debug("Init completed"); return; } - // we got shutdown, not checking oldState as behavior remains the same for shutdown + // we got shutdown, not checking oldState as behavior remains the same for shutdown if (ProviderState.NOT_READY.equals(newState)) { // nothing to do log.debug("shutdown completed"); @@ -172,6 +174,7 @@ private void handleStateTransition(ProviderState oldState, ProviderState newStat if (ProviderState.READY.equals(oldState) && ProviderState.READY.equals(newState)) { log.debug("Configuration changed"); ProviderEventDetails details = ProviderEventDetails.builder().message("configuration changed").build(); + details.setFlagsChanged(changedFlagKeys); this.emitProviderConfigurationChanged(details); return; } diff --git a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/grpc/GrpcConnector.java b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/grpc/GrpcConnector.java index 290ba3669..092259dee 100644 --- a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/grpc/GrpcConnector.java +++ b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/grpc/GrpcConnector.java @@ -1,10 +1,11 @@ package dev.openfeature.contrib.providers.flagd.resolver.grpc; +import java.util.Collections; +import java.util.List; import java.util.Random; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.function.Consumer; - +import java.util.function.BiConsumer; import dev.openfeature.contrib.providers.flagd.FlagdOptions; import dev.openfeature.contrib.providers.flagd.resolver.common.ChannelBuilder; import dev.openfeature.contrib.providers.flagd.resolver.common.Util; @@ -37,7 +38,7 @@ public class GrpcConnector { private final long deadline; private final Cache cache; - private final Consumer stateConsumer; + private final BiConsumer> stateConsumer; private int eventStreamAttempt = 1; private int eventStreamRetryBackoff; @@ -52,7 +53,7 @@ public class GrpcConnector { * @param cache cache to use. * @param stateConsumer lambda to call for setting the state. */ - public GrpcConnector(final FlagdOptions options, final Cache cache, Consumer stateConsumer) { + public GrpcConnector(final FlagdOptions options, final Cache cache, BiConsumer> stateConsumer) { this.channel = ChannelBuilder.nettyChannel(options); this.serviceStub = ServiceGrpc.newStub(channel); this.serviceBlockingStub = ServiceGrpc.newBlockingStub(channel); @@ -100,7 +101,7 @@ public void shutdown() throws Exception { this.channel.awaitTermination(this.deadline, TimeUnit.MILLISECONDS); log.warn(String.format("Unable to shut down channel by %d deadline", this.deadline)); } - this.stateConsumer.accept(ProviderState.NOT_READY); + this.stateConsumer.accept(ProviderState.NOT_READY, Collections.emptyList()); } } @@ -162,6 +163,6 @@ private void grpcStateConsumer(final ProviderState state) { } // chain to initiator - this.stateConsumer.accept(state); + this.stateConsumer.accept(state, Collections.emptyList()); } } diff --git a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/grpc/GrpcResolver.java b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/grpc/GrpcResolver.java index 9879d6c55..a8cd77ec9 100644 --- a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/grpc/GrpcResolver.java +++ b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/grpc/GrpcResolver.java @@ -5,6 +5,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.function.BiConsumer; import java.util.function.Consumer; import java.util.function.Function; import java.util.function.Supplier; @@ -62,7 +63,7 @@ public final class GrpcResolver implements Resolver { * @param stateConsumer lambda to communicate back the state. */ public GrpcResolver(final FlagdOptions options, final Cache cache, final Supplier stateSupplier, - final Consumer stateConsumer) { + final BiConsumer> stateConsumer) { this.cache = cache; this.stateSupplier = stateSupplier; diff --git a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/InProcessResolver.java b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/InProcessResolver.java index e3a525e20..7ddbbd0dc 100644 --- a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/InProcessResolver.java +++ b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/InProcessResolver.java @@ -6,7 +6,7 @@ import dev.openfeature.contrib.providers.flagd.resolver.process.model.FeatureFlag; import dev.openfeature.contrib.providers.flagd.resolver.process.storage.FlagStore; import dev.openfeature.contrib.providers.flagd.resolver.process.storage.Storage; -import dev.openfeature.contrib.providers.flagd.resolver.process.storage.StorageState; +import dev.openfeature.contrib.providers.flagd.resolver.process.storage.StorageStateDTO; import dev.openfeature.contrib.providers.flagd.resolver.process.storage.connector.Connector; import dev.openfeature.contrib.providers.flagd.resolver.process.storage.connector.file.FileConnector; import dev.openfeature.contrib.providers.flagd.resolver.process.storage.connector.grpc.GrpcStreamConnector; @@ -23,8 +23,11 @@ import dev.openfeature.sdk.exceptions.TypeMismatchError; import lombok.extern.slf4j.Slf4j; +import java.util.List; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.function.Consumer; +import java.util.function.BiConsumer; +import java.util.stream.Collectors; + import static dev.openfeature.contrib.providers.flagd.resolver.process.model.FeatureFlag.EMPTY_TARGETING_STRING; @@ -36,7 +39,7 @@ @Slf4j public class InProcessResolver implements Resolver { private final Storage flagStore; - private final Consumer stateConsumer; + private final BiConsumer> stateConsumer; private final Operator operator; private final long deadline; private final ImmutableMetadata metadata; @@ -45,7 +48,7 @@ public class InProcessResolver implements Resolver { /** * Initialize an in-process resolver. */ - public InProcessResolver(FlagdOptions options, Consumer stateConsumer) { + public InProcessResolver(FlagdOptions options, BiConsumer> stateConsumer) { this.flagStore = new FlagStore(getConnector(options)); this.deadline = options.getDeadline(); this.stateConsumer = stateConsumer; @@ -64,20 +67,20 @@ public void init() throws Exception { final Thread stateWatcher = new Thread(() -> { try { while (true) { - final StorageState storageState = flagStore.getStateQueue().take(); - switch (storageState) { + final StorageStateDTO storageStateDTO = flagStore.getStateQueue().take(); + switch (storageStateDTO.getStorageState()) { case OK: - stateConsumer.accept(ProviderState.READY); + stateConsumer.accept(ProviderState.READY, storageStateDTO.getChangedFlagsKeys()); this.connected.set(true); break; case ERROR: - stateConsumer.accept(ProviderState.ERROR); + stateConsumer.accept(ProviderState.ERROR,null); this.connected.set(false); break; case STALE: // todo set stale state default: - log.info(String.format("Storage emitted unhandled status: %s", storageState)); + log.info(String.format("Storage emitted unhandled status: %s", storageStateDTO.getStorageState())); } } } catch (InterruptedException e) { @@ -100,38 +103,35 @@ public void init() throws Exception { public void shutdown() throws InterruptedException { flagStore.shutdown(); this.connected.set(false); - stateConsumer.accept(ProviderState.NOT_READY); + stateConsumer.accept(ProviderState.NOT_READY,null); } /** * Resolve a boolean flag. */ public ProviderEvaluation booleanEvaluation(String key, Boolean defaultValue, - EvaluationContext ctx) { + EvaluationContext ctx) { return resolve(Boolean.class, key, ctx); } /** * Resolve a string flag. */ - public ProviderEvaluation stringEvaluation(String key, String defaultValue, - EvaluationContext ctx) { + public ProviderEvaluation stringEvaluation(String key, String defaultValue, EvaluationContext ctx) { return resolve(String.class, key, ctx); } /** * Resolve a double flag. */ - public ProviderEvaluation doubleEvaluation(String key, Double defaultValue, - EvaluationContext ctx) { + public ProviderEvaluation doubleEvaluation(String key, Double defaultValue, EvaluationContext ctx) { return resolve(Double.class, key, ctx); } /** * Resolve an integer flag. */ - public ProviderEvaluation integerEvaluation(String key, Integer defaultValue, - EvaluationContext ctx) { + public ProviderEvaluation integerEvaluation(String key, Integer defaultValue, EvaluationContext ctx) { return resolve(Integer.class, key, ctx); } @@ -160,8 +160,7 @@ static Connector getConnector(final FlagdOptions options) { : new GrpcStreamConnector(options); } - private ProviderEvaluation resolve(Class type, String key, - EvaluationContext ctx) { + private ProviderEvaluation resolve(Class type, String key, EvaluationContext ctx) { final FeatureFlag flag = flagStore.getFlag(key); // missing flag diff --git a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/model/FeatureFlag.java b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/model/FeatureFlag.java index 0fb3dcf0b..4e687c369 100644 --- a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/model/FeatureFlag.java +++ b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/model/FeatureFlag.java @@ -5,6 +5,7 @@ import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.databind.annotation.JsonDeserialize; import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; +import lombok.EqualsAndHashCode; import lombok.Getter; import java.util.Map; @@ -16,6 +17,7 @@ @SuppressFBWarnings(value = {"EI_EXPOSE_REP"}, justification = "Feature flag comes as a Json configuration, hence they must be parsed and exposed") @JsonIgnoreProperties(ignoreUnknown = true) +@EqualsAndHashCode public class FeatureFlag { public static final String EMPTY_TARGETING_STRING = "{}"; diff --git a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/FlagStore.java b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/FlagStore.java index ee642cca9..12b0cbc17 100644 --- a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/FlagStore.java +++ b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/FlagStore.java @@ -6,8 +6,8 @@ import dev.openfeature.contrib.providers.flagd.resolver.process.storage.connector.StreamPayload; import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; import lombok.extern.slf4j.Slf4j; - import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; @@ -15,6 +15,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock; import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock; +import java.util.stream.Collectors; /** * Feature flag storage. @@ -28,7 +29,7 @@ public class FlagStore implements Storage { private final WriteLock writeLock = sync.writeLock(); private final AtomicBoolean shutdown = new AtomicBoolean(false); - private final BlockingQueue stateBlockingQueue = new LinkedBlockingQueue<>(1); + private final BlockingQueue stateBlockingQueue = new LinkedBlockingQueue<>(1); private final Map flags = new HashMap<>(); private final Connector connector; @@ -88,7 +89,7 @@ public FeatureFlag getFlag(final String key) { /** * Retrieve blocking queue to check storage status. */ - public BlockingQueue getStateQueue() { + public BlockingQueue getStateQueue() { return stateBlockingQueue; } @@ -100,27 +101,29 @@ private void streamerListener(final Connector connector) throws InterruptedExcep switch (take.getType()) { case DATA: try { + List changedFlagsKeys; Map flagMap = FlagParser.parseString(take.getData(), throwIfInvalid); writeLock.lock(); try { + changedFlagsKeys = getChangedFlagsKeys(flagMap); flags.clear(); flags.putAll(flagMap); } finally { writeLock.unlock(); } - if (!stateBlockingQueue.offer(StorageState.OK)) { + if (!stateBlockingQueue.offer(new StorageStateDTO(StorageState.OK, changedFlagsKeys))) { log.warn("Failed to convey OK satus, queue is full"); } } catch (Throwable e) { // catch all exceptions and avoid stream listener interruptions log.warn("Invalid flag sync payload from connector", e); - if (!stateBlockingQueue.offer(StorageState.STALE)) { + if (!stateBlockingQueue.offer(new StorageStateDTO(StorageState.STALE))) { log.warn("Failed to convey STALE satus, queue is full"); } } break; case ERROR: - if (!stateBlockingQueue.offer(StorageState.ERROR)) { + if (!stateBlockingQueue.offer(new StorageStateDTO(StorageState.ERROR))) { log.warn("Failed to convey ERROR satus, queue is full"); } break; @@ -132,4 +135,27 @@ private void streamerListener(final Connector connector) throws InterruptedExcep log.info("Shutting down store stream listener"); } + private List getChangedFlagsKeys(Map newFlags) { + Map changedFlags = new HashMap<>(); + Map addedFeatureFlags = new HashMap<>(); + Map removedFeatureFlags = new HashMap<>(); + Map updatedFeatureFlags = new HashMap<>(); + newFlags.forEach((key, value) -> { + if (!flags.containsKey(key)) { + addedFeatureFlags.put(key, value); + } else if (flags.containsKey(key) && !value.equals(flags.get(key))) { + updatedFeatureFlags.put(key, value); + } + }); + flags.forEach((key,value) -> { + if(!newFlags.containsKey(key)) { + removedFeatureFlags.put(key, value); + } + }); + changedFlags.putAll(addedFeatureFlags); + changedFlags.putAll(removedFeatureFlags); + changedFlags.putAll(updatedFeatureFlags); + return changedFlags.keySet().stream().collect(Collectors.toList()); + } + } diff --git a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/Storage.java b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/Storage.java index 32337094e..4589eb1c2 100644 --- a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/Storage.java +++ b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/Storage.java @@ -14,5 +14,5 @@ public interface Storage { FeatureFlag getFlag(final String key); - BlockingQueue getStateQueue(); + BlockingQueue getStateQueue(); } diff --git a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/StorageStateDTO.java b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/StorageStateDTO.java new file mode 100644 index 000000000..17de9f934 --- /dev/null +++ b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/StorageStateDTO.java @@ -0,0 +1,27 @@ +package dev.openfeature.contrib.providers.flagd.resolver.process.storage; + + +import lombok.EqualsAndHashCode; +import lombok.Getter; +import lombok.ToString; + +import java.util.Collections; +import java.util.List; + +@Getter +@ToString +@EqualsAndHashCode +public class StorageStateDTO { + private final StorageState storageState; + private final List changedFlagsKeys; + + public StorageStateDTO(StorageState storageState, List changedFlagsKeys) { + this.storageState = storageState; + this.changedFlagsKeys = changedFlagsKeys; + } + + public StorageStateDTO(StorageState storageState) { + this.storageState = storageState; + this.changedFlagsKeys = Collections.emptyList(); + } +} diff --git a/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/FlagdProviderTest.java b/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/FlagdProviderTest.java index fede10600..3b207aa33 100644 --- a/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/FlagdProviderTest.java +++ b/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/FlagdProviderTest.java @@ -18,6 +18,7 @@ import java.lang.reflect.Field; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -25,6 +26,7 @@ import java.util.function.Supplier; import java.util.concurrent.LinkedBlockingQueue; +import dev.openfeature.contrib.providers.flagd.resolver.process.storage.StorageStateDTO; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; import org.mockito.MockedStatic; @@ -499,7 +501,7 @@ void invalidate_cache() { .thenReturn(serviceStubMock); final Cache cache = new Cache("lru", 5); - grpc = new GrpcConnector(FlagdOptions.builder().build(), cache, state -> { + grpc = new GrpcConnector(FlagdOptions.builder().build(), cache, (state,changedFlagKeys) -> { }); } @@ -714,7 +716,7 @@ void disabled_cache() { mockStaticService.when(() -> ServiceGrpc.newStub(any())) .thenReturn(serviceStubMock); - grpc = new GrpcConnector(FlagdOptions.builder().build(), cache, state -> { + grpc = new GrpcConnector(FlagdOptions.builder().build(), cache, (state,changedFlagKeys) -> { }); } @@ -888,7 +890,7 @@ private FlagdProvider createProvider(GrpcConnector grpc, Supplier private FlagdProvider createProvider(GrpcConnector grpc, Cache cache, Supplier getState) { final FlagdOptions flagdOptions = FlagdOptions.builder().build(); final GrpcResolver grpcResolver = - new GrpcResolver(flagdOptions, cache, getState, (providerState) -> { + new GrpcResolver(flagdOptions, cache, getState, (providerState,changedFlagKeys) -> { }); final FlagdProvider provider = new FlagdProvider(); @@ -916,7 +918,9 @@ private FlagdProvider createInProcessProvider() { .deadline(1000) .build(); final FlagdProvider provider = new FlagdProvider(flagdOptions); - final MockStorage mockStorage = new MockStorage(new HashMap(), new LinkedBlockingQueue(Arrays.asList(StorageState.OK))); + final MockStorage mockStorage = new MockStorage(new HashMap(), + new LinkedBlockingQueue(Arrays.asList(new + StorageStateDTO(StorageState.OK)))); try { final Field flagResolver = FlagdProvider.class.getDeclaredField("flagResolver"); diff --git a/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/resolver/grpc/GrpcConnectorTest.java b/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/resolver/grpc/GrpcConnectorTest.java index 8750f0499..3e360229f 100644 --- a/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/resolver/grpc/GrpcConnectorTest.java +++ b/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/resolver/grpc/GrpcConnectorTest.java @@ -58,7 +58,7 @@ void validate_retry_calls(int retries) throws NoSuchFieldException, IllegalAcces final ServiceGrpc.ServiceStub mockStub = mock(ServiceGrpc.ServiceStub.class); doAnswer(invocation -> null).when(mockStub).eventStream(any(), any()); - final GrpcConnector connector = new GrpcConnector(options, cache, (state) -> { + final GrpcConnector connector = new GrpcConnector(options, cache, (state,changedFlagKeys) -> { }); Field serviceStubField = GrpcConnector.class.getDeclaredField("serviceStub"); @@ -94,7 +94,7 @@ void initialization_succeed_with_connected_status() throws NoSuchFieldException, final ServiceGrpc.ServiceStub mockStub = mock(ServiceGrpc.ServiceStub.class); doAnswer(invocation -> null).when(mockStub).eventStream(any(), any()); - final GrpcConnector connector = new GrpcConnector(FlagdOptions.builder().build(), cache, (state) -> { + final GrpcConnector connector = new GrpcConnector(FlagdOptions.builder().build(), cache, (state,changedFlagKeys) -> { }); Field serviceStubField = GrpcConnector.class.getDeclaredField("serviceStub"); @@ -118,7 +118,7 @@ void initialization_fail_with_timeout() throws Exception { final ServiceGrpc.ServiceStub mockStub = mock(ServiceGrpc.ServiceStub.class); doAnswer(invocation -> null).when(mockStub).eventStream(any(), any()); - final GrpcConnector connector = new GrpcConnector(FlagdOptions.builder().build(), cache, (state) -> { + final GrpcConnector connector = new GrpcConnector(FlagdOptions.builder().build(), cache, (state,changedFlagKeys) -> { }); Field serviceStubField = GrpcConnector.class.getDeclaredField("serviceStub"); diff --git a/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/resolver/process/InProcessResolverTest.java b/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/resolver/process/InProcessResolverTest.java index d6a063c16..c306baaaa 100644 --- a/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/resolver/process/InProcessResolverTest.java +++ b/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/resolver/process/InProcessResolverTest.java @@ -5,6 +5,7 @@ import dev.openfeature.contrib.providers.flagd.resolver.process.model.FeatureFlag; import dev.openfeature.contrib.providers.flagd.resolver.process.storage.MockConnector; import dev.openfeature.contrib.providers.flagd.resolver.process.storage.StorageState; +import dev.openfeature.contrib.providers.flagd.resolver.process.storage.StorageStateDTO; import dev.openfeature.contrib.providers.flagd.resolver.process.storage.connector.file.FileConnector; import dev.openfeature.contrib.providers.flagd.resolver.process.storage.connector.grpc.GrpcStreamConnector; import dev.openfeature.sdk.ImmutableContext; @@ -22,12 +23,14 @@ import java.lang.reflect.Field; import java.time.Duration; +import java.util.Collections; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; -import java.util.function.Consumer; +import java.util.function.BiConsumer; import static dev.openfeature.contrib.providers.flagd.resolver.process.MockFlags.BOOLEAN_FLAG; import static dev.openfeature.contrib.providers.flagd.resolver.process.MockFlags.DISABLED_FLAG; @@ -51,7 +54,7 @@ class InProcessResolverTest { public void connectorSetup(){ // given FlagdOptions forGrpcOptions = - FlagdOptions.builder().resolverType(Config.Resolver.IN_PROCESS).host("localhost").port(8080).build(); + FlagdOptions.builder().resolverType(Config.Resolver.IN_PROCESS).host("localhost").port(8080).build(); FlagdOptions forOfflineOptions = FlagdOptions.builder().resolverType(Config.Resolver.IN_PROCESS).offlineFlagSourcePath("path").build(); FlagdOptions forCustomConnectorOptions = @@ -67,13 +70,11 @@ public void connectorSetup(){ public void eventHandling() throws Throwable { // given // note - queues with adequate capacity - final BlockingQueue sender = new LinkedBlockingQueue<>(5); + final BlockingQueue sender = new LinkedBlockingQueue<>(5); final BlockingQueue receiver = new LinkedBlockingQueue<>(5); InProcessResolver inProcessResolver = getInProcessResolverWth(new MockStorage(new HashMap<>(), sender), - providerState -> { - receiver.offer(providerState); - }); + (providerState, changedFlagKeys) -> receiver.offer(providerState)); // when - init and emit events Thread initThread = new Thread(() -> { @@ -83,10 +84,10 @@ public void eventHandling() throws Throwable { } }); initThread.start(); - if (!sender.offer(StorageState.OK, 100, TimeUnit.MILLISECONDS)) { + if (!sender.offer(new StorageStateDTO(StorageState.OK, Collections.EMPTY_LIST), 100, TimeUnit.MILLISECONDS)) { Assertions.fail("failed to send the event"); } - if (!sender.offer(StorageState.ERROR, 100, TimeUnit.MILLISECONDS)) { + if (!sender.offer(new StorageStateDTO(StorageState.ERROR), 100, TimeUnit.MILLISECONDS)) { Assertions.fail("failed to send the event"); } @@ -106,8 +107,9 @@ public void simpleBooleanResolving() throws Exception { final Map flagMap = new HashMap<>(); flagMap.put("booleanFlag", BOOLEAN_FLAG); - InProcessResolver inProcessResolver = getInProcessResolverWth(new MockStorage(flagMap), providerState -> { - }); + InProcessResolver inProcessResolver = getInProcessResolverWth(new MockStorage(flagMap), + (providerState, changedFlagKeys) -> { + }); // when ProviderEvaluation providerEvaluation = inProcessResolver.booleanEvaluation("booleanFlag", false, @@ -125,8 +127,9 @@ public void simpleDoubleResolving() throws Exception { final Map flagMap = new HashMap<>(); flagMap.put("doubleFlag", DOUBLE_FLAG); - InProcessResolver inProcessResolver = getInProcessResolverWth(new MockStorage(flagMap), providerState -> { - }); + InProcessResolver inProcessResolver = getInProcessResolverWth(new MockStorage(flagMap), + (providerState, changedFlagKeys) -> { + }); // when ProviderEvaluation providerEvaluation = inProcessResolver.doubleEvaluation("doubleFlag", 0d, @@ -144,8 +147,9 @@ public void fetchIntegerAsDouble() throws Exception { final Map flagMap = new HashMap<>(); flagMap.put("doubleFlag", DOUBLE_FLAG); - InProcessResolver inProcessResolver = getInProcessResolverWth(new MockStorage(flagMap), providerState -> { - }); + InProcessResolver inProcessResolver = getInProcessResolverWth(new MockStorage(flagMap), + (providerState, changedFlagKeys) -> { + }); // when ProviderEvaluation providerEvaluation = inProcessResolver.integerEvaluation("doubleFlag", 0, @@ -163,7 +167,8 @@ public void fetchDoubleAsInt() throws Exception { final Map flagMap = new HashMap<>(); flagMap.put("integerFlag", INT_FLAG); - InProcessResolver inProcessResolver = getInProcessResolverWth(new MockStorage(flagMap), providerState -> { + InProcessResolver inProcessResolver = getInProcessResolverWth(new MockStorage(flagMap), + (providerState, changedFlagKeys) -> { }); // when @@ -182,7 +187,8 @@ public void simpleIntResolving() throws Exception { final Map flagMap = new HashMap<>(); flagMap.put("integerFlag", INT_FLAG); - InProcessResolver inProcessResolver = getInProcessResolverWth(new MockStorage(flagMap), providerState -> { + InProcessResolver inProcessResolver = getInProcessResolverWth(new MockStorage(flagMap), + (providerState,changedFlagKeys) -> { }); // when @@ -201,7 +207,8 @@ public void simpleObjectResolving() throws Exception { final Map flagMap = new HashMap<>(); flagMap.put("objectFlag", OBJECT_FLAG); - InProcessResolver inProcessResolver = getInProcessResolverWth(new MockStorage(flagMap), providerState -> { + InProcessResolver inProcessResolver = getInProcessResolverWth(new MockStorage(flagMap), + (providerState,changedFlagKeys) -> { }); Map typeDefault = new HashMap<>(); @@ -227,7 +234,8 @@ public void missingFlag() throws Exception { // given final Map flagMap = new HashMap<>(); - InProcessResolver inProcessResolver = getInProcessResolverWth(new MockStorage(flagMap), providerState -> { + InProcessResolver inProcessResolver = getInProcessResolverWth(new MockStorage(flagMap), + (providerState,changedFlagKeys) -> { }); // when/then @@ -243,7 +251,8 @@ public void disabledFlag() throws Exception { final Map flagMap = new HashMap<>(); flagMap.put("disabledFlag", DISABLED_FLAG); - InProcessResolver inProcessResolver = getInProcessResolverWth(new MockStorage(flagMap), providerState -> { + InProcessResolver inProcessResolver = getInProcessResolverWth(new MockStorage(flagMap), + (providerState,changedFlagKeys) -> { }); // when/then @@ -258,7 +267,8 @@ public void variantMismatchFlag() throws Exception { final Map flagMap = new HashMap<>(); flagMap.put("mismatchFlag", VARIANT_MISMATCH_FLAG); - InProcessResolver inProcessResolver = getInProcessResolverWth(new MockStorage(flagMap), providerState -> { + InProcessResolver inProcessResolver = getInProcessResolverWth(new MockStorage(flagMap), + (providerState,changedFlagKeys) -> { }); // when/then @@ -273,7 +283,8 @@ public void typeMismatchEvaluation() throws Exception { final Map flagMap = new HashMap<>(); flagMap.put("stringFlag", BOOLEAN_FLAG); - InProcessResolver inProcessResolver = getInProcessResolverWth(new MockStorage(flagMap), providerState -> { + InProcessResolver inProcessResolver = getInProcessResolverWth(new MockStorage(flagMap), + (providerState,changedFlagKeys) -> { }); // when/then @@ -288,7 +299,8 @@ public void booleanShorthandEvaluation() throws Exception { final Map flagMap = new HashMap<>(); flagMap.put("shorthand", FLAG_WIH_SHORTHAND_TARGETING); - InProcessResolver inProcessResolver = getInProcessResolverWth(new MockStorage(flagMap), providerState -> { + InProcessResolver inProcessResolver = getInProcessResolverWth(new MockStorage(flagMap), + (providerState,changedFlagKeys) -> { }); ProviderEvaluation providerEvaluation = inProcessResolver.booleanEvaluation("shorthand", false, @@ -306,7 +318,8 @@ public void targetingMatchedEvaluationFlag() throws Exception { final Map flagMap = new HashMap<>(); flagMap.put("stringFlag", FLAG_WIH_IF_IN_TARGET); - InProcessResolver inProcessResolver = getInProcessResolverWth(new MockStorage(flagMap), providerState -> { + InProcessResolver inProcessResolver = getInProcessResolverWth(new MockStorage(flagMap), + (providerState,changedFlagKeys) -> { }); // when @@ -325,7 +338,8 @@ public void targetingUnmatchedEvaluationFlag() throws Exception { final Map flagMap = new HashMap<>(); flagMap.put("stringFlag", FLAG_WIH_IF_IN_TARGET); - InProcessResolver inProcessResolver = getInProcessResolverWth(new MockStorage(flagMap), providerState -> { + InProcessResolver inProcessResolver = getInProcessResolverWth(new MockStorage(flagMap), + (providerState,changedFlagKeys) -> { }); // when @@ -344,7 +358,8 @@ public void explicitTargetingKeyHandling() throws NoSuchFieldException, IllegalA final Map flagMap = new HashMap<>(); flagMap.put("stringFlag", FLAG_WITH_TARGETING_KEY); - InProcessResolver inProcessResolver = getInProcessResolverWth(new MockStorage(flagMap), providerState -> { + InProcessResolver inProcessResolver = getInProcessResolverWth(new MockStorage(flagMap), + (providerState,changedFlagKeys) -> { }); // when @@ -363,7 +378,8 @@ public void targetingErrorEvaluationFlag() throws Exception { final Map flagMap = new HashMap<>(); flagMap.put("targetingErrorFlag", FLAG_WIH_INVALID_TARGET); - InProcessResolver inProcessResolver = getInProcessResolverWth(new MockStorage(flagMap), providerState -> { + InProcessResolver inProcessResolver = getInProcessResolverWth(new MockStorage(flagMap), + (providerState,changedFlagKeys) -> { }); // when/then @@ -396,13 +412,14 @@ public void validateMetadataInEvaluationResult() throws Exception { private InProcessResolver getInProcessResolverWth(final FlagdOptions options, final MockStorage storage) throws NoSuchFieldException, IllegalAccessException { - final InProcessResolver resolver = new InProcessResolver(options, providerState -> {}); + final InProcessResolver resolver = new InProcessResolver(options, (providerState, changedFlagKeys) -> { + }); return injectFlagStore(resolver, storage); } private InProcessResolver getInProcessResolverWth(final MockStorage storage, - final Consumer stateConsumer) + final BiConsumer> stateConsumer) throws NoSuchFieldException, IllegalAccessException { final InProcessResolver resolver = new InProcessResolver( diff --git a/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/resolver/process/MockStorage.java b/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/resolver/process/MockStorage.java index 04c043bf4..dbbd87d9d 100644 --- a/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/resolver/process/MockStorage.java +++ b/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/resolver/process/MockStorage.java @@ -2,7 +2,7 @@ import dev.openfeature.contrib.providers.flagd.resolver.process.model.FeatureFlag; import dev.openfeature.contrib.providers.flagd.resolver.process.storage.Storage; -import dev.openfeature.contrib.providers.flagd.resolver.process.storage.StorageState; +import dev.openfeature.contrib.providers.flagd.resolver.process.storage.StorageStateDTO; import javax.annotation.Nullable; import java.util.Map; @@ -11,9 +11,9 @@ public class MockStorage implements Storage { private final Map mockFlags; - private final BlockingQueue mockQueue; + private final BlockingQueue mockQueue; - public MockStorage(Map mockFlags, BlockingQueue mockQueue) { + public MockStorage(Map mockFlags, BlockingQueue mockQueue) { this.mockFlags = mockFlags; this.mockQueue = mockQueue; } @@ -35,8 +35,9 @@ public FeatureFlag getFlag(String key) { return mockFlags.get(key); } + @Nullable - public BlockingQueue getStateQueue() { + public BlockingQueue getStateQueue() { return mockQueue; } } diff --git a/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/FlagStoreTest.java b/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/FlagStoreTest.java index 7fd7ab056..4d51b618f 100644 --- a/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/FlagStoreTest.java +++ b/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/FlagStoreTest.java @@ -1,12 +1,17 @@ package dev.openfeature.contrib.providers.flagd.resolver.process.storage; +import dev.openfeature.contrib.providers.flagd.resolver.process.model.FeatureFlag; +import dev.openfeature.contrib.providers.flagd.resolver.process.model.FlagParser; import dev.openfeature.contrib.providers.flagd.resolver.process.storage.connector.StreamPayload; import dev.openfeature.contrib.providers.flagd.resolver.process.storage.connector.StreamPayloadType; +import org.junit.Assert; import org.junit.jupiter.api.Test; import java.time.Duration; +import java.util.Map; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; +import java.util.stream.Collectors; import static dev.openfeature.contrib.providers.flagd.resolver.process.TestUtils.INVALID_FLAG; import static dev.openfeature.contrib.providers.flagd.resolver.process.TestUtils.VALID_LONG; @@ -25,7 +30,7 @@ public void connectorHandling() throws Exception { FlagStore store = new FlagStore(new MockConnector(payload), true); store.init(); - final BlockingQueue states = store.getStateQueue(); + final BlockingQueue states = store.getStateQueue(); // OK for simple flag assertTimeoutPreemptively(Duration.ofMillis(maxDelay), ()-> { @@ -33,7 +38,7 @@ public void connectorHandling() throws Exception { }); assertTimeoutPreemptively(Duration.ofMillis(maxDelay), ()-> { - assertEquals(StorageState.OK, states.take()); + assertEquals(StorageState.OK, states.take().getStorageState()); }); // STALE for invalid flag @@ -42,7 +47,7 @@ public void connectorHandling() throws Exception { }); assertTimeoutPreemptively(Duration.ofMillis(maxDelay), ()-> { - assertEquals(StorageState.STALE, states.take()); + assertEquals(StorageState.STALE, states.take().getStorageState()); }); // OK again for next payload @@ -51,7 +56,7 @@ public void connectorHandling() throws Exception { }); assertTimeoutPreemptively(Duration.ofMillis(maxDelay), ()-> { - assertEquals(StorageState.OK, states.take()); + assertEquals(StorageState.OK, states.take().getStorageState()); }); // ERROR is propagated correctly @@ -60,15 +65,42 @@ public void connectorHandling() throws Exception { }); assertTimeoutPreemptively(Duration.ofMillis(maxDelay), ()-> { - assertEquals(StorageState.ERROR, states.take()); + assertEquals(StorageState.ERROR, states.take().getStorageState()); }); // Shutdown handling store.shutdown(); assertTimeoutPreemptively(Duration.ofMillis(maxDelay), ()-> { - assertEquals(StorageState.ERROR, states.take()); + assertEquals(StorageState.ERROR, states.take().getStorageState()); }); } + @Test + public void changedFlags() throws Exception { + final int maxDelay = 500; + final BlockingQueue payload = new LinkedBlockingQueue<>(); + FlagStore store = new FlagStore(new MockConnector(payload), true); + store.init(); + final BlockingQueue storageStateDTOS = store.getStateQueue(); + + assertTimeoutPreemptively(Duration.ofMillis(maxDelay), ()-> { + payload.offer(new StreamPayload(StreamPayloadType.DATA, getFlagsFromResource(VALID_SIMPLE))); + }); + // flags changed for first time + assertEquals(FlagParser.parseString( + getFlagsFromResource(VALID_SIMPLE), true).keySet().stream().collect(Collectors.toList()), + storageStateDTOS.take().getChangedFlagsKeys()); + + assertTimeoutPreemptively(Duration.ofMillis(maxDelay), ()-> { + payload.offer(new StreamPayload(StreamPayloadType.DATA, getFlagsFromResource(VALID_LONG))); + }); + Map expectedChangedFlags = + FlagParser.parseString(getFlagsFromResource(VALID_LONG),true); + expectedChangedFlags.remove("myBoolFlag"); + // flags changed from initial VALID_SIMPLE flag + Assert.assertEquals(expectedChangedFlags.keySet().stream().collect(Collectors.toList()), + storageStateDTOS.take().getChangedFlagsKeys()); + } + } From ac5896c39169b318fb0376177eeef15123cd5425 Mon Sep 17 00:00:00 2001 From: Todd Baert Date: Mon, 16 Sep 2024 15:24:14 -0400 Subject: [PATCH 2/2] fixup: minor fixes, add e2e assertion Signed-off-by: Todd Baert --- providers/flagd/lombok.config | 5 ++ .../providers/flagd/FlagdProvider.java | 8 +- .../resolver/grpc/EventStreamObserver.java | 47 +++++++----- .../flagd/resolver/grpc/GrpcConnector.java | 28 ++++--- .../flagd/resolver/grpc/GrpcResolver.java | 1 - .../resolver/process/InProcessResolver.java | 27 ++++--- .../resolver/process/storage/FlagStore.java | 18 ++--- .../resolver/process/storage/Storage.java | 2 +- ...eStateDTO.java => StorageStateChange.java} | 13 ++-- .../providers/flagd/FlagdProviderTest.java | 7 +- .../e2e/process/FlagdInProcessSetup.java | 1 - .../flagd/e2e/steps/StepDefinitions.java | 10 ++- .../grpc/EventStreamObserverTest.java | 2 +- .../process/InProcessResolverTest.java | 75 +++++++++---------- .../flagd/resolver/process/MockStorage.java | 8 +- .../process/storage/FlagStoreTest.java | 4 +- 16 files changed, 139 insertions(+), 117 deletions(-) create mode 100644 providers/flagd/lombok.config rename providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/{StorageStateDTO.java => StorageStateChange.java} (60%) diff --git a/providers/flagd/lombok.config b/providers/flagd/lombok.config new file mode 100644 index 000000000..bcd1afdae --- /dev/null +++ b/providers/flagd/lombok.config @@ -0,0 +1,5 @@ +# This file is needed to avoid errors throw by findbugs when working with lombok. +lombok.addSuppressWarnings = true +lombok.addLombokGeneratedAnnotation = true +config.stopBubbling = true +lombok.extern.findbugs.addSuppressFBWarnings = true diff --git a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/FlagdProvider.java b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/FlagdProvider.java index 70e95b0af..58b4f2e6b 100644 --- a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/FlagdProvider.java +++ b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/FlagdProvider.java @@ -4,10 +4,8 @@ import dev.openfeature.contrib.providers.flagd.resolver.grpc.GrpcResolver; import dev.openfeature.contrib.providers.flagd.resolver.grpc.cache.Cache; import dev.openfeature.contrib.providers.flagd.resolver.process.InProcessResolver; -import dev.openfeature.contrib.providers.flagd.resolver.process.model.FeatureFlag; import dev.openfeature.sdk.EvaluationContext; import dev.openfeature.sdk.EventProvider; -import dev.openfeature.sdk.FeatureProvider; import dev.openfeature.sdk.Metadata; import dev.openfeature.sdk.ProviderEvaluation; import dev.openfeature.sdk.ProviderEventDetails; @@ -25,7 +23,7 @@ */ @Slf4j @SuppressWarnings({"PMD.TooManyStaticImports", "checkstyle:NoFinalizer"}) -public class FlagdProvider extends EventProvider implements FeatureProvider { +public class FlagdProvider extends EventProvider { private static final String FLAGD_PROVIDER = "flagD Provider"; private final ReadWriteLock lock = new ReentrantReadWriteLock(); @@ -173,8 +171,8 @@ private void handleStateTransition(ProviderState oldState, ProviderState newStat // configuration changed if (ProviderState.READY.equals(oldState) && ProviderState.READY.equals(newState)) { log.debug("Configuration changed"); - ProviderEventDetails details = ProviderEventDetails.builder().message("configuration changed").build(); - details.setFlagsChanged(changedFlagKeys); + ProviderEventDetails details = ProviderEventDetails.builder().flagsChanged(changedFlagKeys) + .message("configuration changed").build(); this.emitProviderConfigurationChanged(details); return; } diff --git a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/grpc/EventStreamObserver.java b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/grpc/EventStreamObserver.java index bdcd2da83..b749adfd2 100644 --- a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/grpc/EventStreamObserver.java +++ b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/grpc/EventStreamObserver.java @@ -8,8 +8,11 @@ import io.grpc.stub.StreamObserver; import lombok.extern.slf4j.Slf4j; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; import java.util.Map; -import java.util.function.Consumer; +import java.util.function.BiConsumer; /** * EventStreamObserver handles events emitted by flagd. @@ -17,7 +20,7 @@ @Slf4j @SuppressFBWarnings(justification = "cache needs to be read and write by multiple objects") class EventStreamObserver implements StreamObserver { - private final Consumer stateConsumer; + private final BiConsumer> stateConsumer; private final Object sync; private final Cache cache; @@ -28,11 +31,11 @@ class EventStreamObserver implements StreamObserver { /** * Create a gRPC stream that get notified about flag changes. * - * @param sync synchronization object from caller - * @param cache cache to update - * @param stateConsumer lambda to call for setting the state + * @param sync synchronization object from caller + * @param cache cache to update + * @param stateConsumer lambda to call for setting the state */ - EventStreamObserver(Object sync, Cache cache, Consumer stateConsumer) { + EventStreamObserver(Object sync, Cache cache, BiConsumer> stateConsumer) { this.sync = sync; this.cache = cache; this.stateConsumer = stateConsumer; @@ -58,7 +61,7 @@ public void onError(Throwable t) { if (this.cache.getEnabled()) { this.cache.clear(); } - this.stateConsumer.accept(ProviderState.ERROR); + this.stateConsumer.accept(ProviderState.ERROR, Collections.emptyList()); // handle last call of this stream handleEndOfStream(); @@ -69,32 +72,38 @@ public void onCompleted() { if (this.cache.getEnabled()) { this.cache.clear(); } - this.stateConsumer.accept(ProviderState.ERROR); + this.stateConsumer.accept(ProviderState.ERROR, Collections.emptyList()); // handle last call of this stream handleEndOfStream(); } private void handleConfigurationChangeEvent(EventStreamResponse value) { - this.stateConsumer.accept(ProviderState.READY); - if (!this.cache.getEnabled()) { - return; - } + List changedFlags = new ArrayList<>(); + boolean cachingEnabled = this.cache.getEnabled(); + Map data = value.getData().getFieldsMap(); Value flagsValue = data.get(FLAGS_KEY); if (flagsValue == null) { - this.cache.clear(); - return; + if (cachingEnabled) { + this.cache.clear(); + } + } else { + Map flags = flagsValue.getStructValue().getFieldsMap(); + this.cache.getEnabled(); + for (String flagKey : flags.keySet()) { + changedFlags.add(flagKey); + if (cachingEnabled) { + this.cache.remove(flagKey); + } + } } - Map flags = flagsValue.getStructValue().getFieldsMap(); - for (String flagKey : flags.keySet()) { - this.cache.remove(flagKey); - } + this.stateConsumer.accept(ProviderState.READY, changedFlags); } private void handleProviderReadyEvent() { - this.stateConsumer.accept(ProviderState.READY); + this.stateConsumer.accept(ProviderState.READY, Collections.emptyList()); if (this.cache.getEnabled()) { this.cache.clear(); } diff --git a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/grpc/GrpcConnector.java b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/grpc/GrpcConnector.java index 092259dee..08de59824 100644 --- a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/grpc/GrpcConnector.java +++ b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/grpc/GrpcConnector.java @@ -53,7 +53,8 @@ public class GrpcConnector { * @param cache cache to use. * @param stateConsumer lambda to call for setting the state. */ - public GrpcConnector(final FlagdOptions options, final Cache cache, BiConsumer> stateConsumer) { + public GrpcConnector(final FlagdOptions options, final Cache cache, + BiConsumer> stateConsumer) { this.channel = ChannelBuilder.nettyChannel(options); this.serviceStub = ServiceGrpc.newStub(channel); this.serviceBlockingStub = ServiceGrpc.newBlockingStub(channel); @@ -81,7 +82,8 @@ public void initialize() throws Exception { /** * Shuts down all gRPC resources. * - * @throws Exception is something goes wrong while terminating the communication. + * @throws Exception is something goes wrong while terminating the + * communication. */ public void shutdown() throws Exception { // first shutdown the event listener @@ -115,12 +117,13 @@ public ServiceGrpc.ServiceBlockingStub getResolver() { } /** - * Event stream observer logic. This contains blocking mechanisms, hence must be run in a dedicated thread. + * Event stream observer logic. This contains blocking mechanisms, hence must be + * run in a dedicated thread. */ private void observeEventStream() { while (this.eventStreamAttempt <= this.maxEventStreamRetries) { - final StreamObserver responseObserver = - new EventStreamObserver(sync, this.cache, this::grpcStateConsumer); + final StreamObserver responseObserver = new EventStreamObserver(sync, this.cache, + this::grpcStateConsumer); this.serviceStub.eventStream(EventStreamRequest.getDefaultInstance(), responseObserver); try { @@ -128,8 +131,10 @@ private void observeEventStream() { sync.wait(); } } catch (InterruptedException e) { - // Interruptions are considered end calls for this observer, hence log and return - // Note - this is the most common interruption when shutdown, hence the log level debug + // Interruptions are considered end calls for this observer, hence log and + // return + // Note - this is the most common interruption when shutdown, hence the log + // level debug log.debug("interruption while waiting for condition", e); Thread.currentThread().interrupt(); } @@ -141,17 +146,18 @@ private void observeEventStream() { try { Thread.sleep(this.eventStreamRetryBackoff); } catch (InterruptedException e) { - // Interruptions are considered end calls for this observer, hence log and return + // Interruptions are considered end calls for this observer, hence log and + // return log.warn("interrupted while restarting gRPC Event Stream"); Thread.currentThread().interrupt(); } } log.error("failed to connect to event stream, exhausted retries"); - this.grpcStateConsumer(ProviderState.ERROR); + this.grpcStateConsumer(ProviderState.ERROR, null); } - private void grpcStateConsumer(final ProviderState state) { + private void grpcStateConsumer(final ProviderState state, final List changedFlags) { // check for readiness if (ProviderState.READY.equals(state)) { this.eventStreamAttempt = 1; @@ -163,6 +169,6 @@ private void grpcStateConsumer(final ProviderState state) { } // chain to initiator - this.stateConsumer.accept(state, Collections.emptyList()); + this.stateConsumer.accept(state, changedFlags); } } diff --git a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/grpc/GrpcResolver.java b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/grpc/GrpcResolver.java index a8cd77ec9..06b7470cc 100644 --- a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/grpc/GrpcResolver.java +++ b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/grpc/GrpcResolver.java @@ -6,7 +6,6 @@ import java.util.List; import java.util.Map; import java.util.function.BiConsumer; -import java.util.function.Consumer; import java.util.function.Function; import java.util.function.Supplier; import java.util.stream.Collectors; diff --git a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/InProcessResolver.java b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/InProcessResolver.java index 7ddbbd0dc..120ebad5a 100644 --- a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/InProcessResolver.java +++ b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/InProcessResolver.java @@ -6,7 +6,7 @@ import dev.openfeature.contrib.providers.flagd.resolver.process.model.FeatureFlag; import dev.openfeature.contrib.providers.flagd.resolver.process.storage.FlagStore; import dev.openfeature.contrib.providers.flagd.resolver.process.storage.Storage; -import dev.openfeature.contrib.providers.flagd.resolver.process.storage.StorageStateDTO; +import dev.openfeature.contrib.providers.flagd.resolver.process.storage.StorageStateChange; import dev.openfeature.contrib.providers.flagd.resolver.process.storage.connector.Connector; import dev.openfeature.contrib.providers.flagd.resolver.process.storage.connector.file.FileConnector; import dev.openfeature.contrib.providers.flagd.resolver.process.storage.connector.grpc.GrpcStreamConnector; @@ -26,8 +26,6 @@ import java.util.List; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.BiConsumer; -import java.util.stream.Collectors; - import static dev.openfeature.contrib.providers.flagd.resolver.process.model.FeatureFlag.EMPTY_TARGETING_STRING; @@ -53,8 +51,8 @@ public InProcessResolver(FlagdOptions options, BiConsumer { try { while (true) { - final StorageStateDTO storageStateDTO = flagStore.getStateQueue().take(); - switch (storageStateDTO.getStorageState()) { + final StorageStateChange storageStateChange = flagStore.getStateQueue().take(); + switch (storageStateChange.getStorageState()) { case OK: - stateConsumer.accept(ProviderState.READY, storageStateDTO.getChangedFlagsKeys()); + stateConsumer.accept(ProviderState.READY, storageStateChange.getChangedFlagsKeys()); this.connected.set(true); break; case ERROR: - stateConsumer.accept(ProviderState.ERROR,null); + stateConsumer.accept(ProviderState.ERROR, null); this.connected.set(false); break; case STALE: // todo set stale state default: - log.info(String.format("Storage emitted unhandled status: %s", storageStateDTO.getStorageState())); + log.info(String.format("Storage emitted unhandled status: %s", + storageStateChange.getStorageState())); } } } catch (InterruptedException e) { @@ -103,14 +102,14 @@ public void init() throws Exception { public void shutdown() throws InterruptedException { flagStore.shutdown(); this.connected.set(false); - stateConsumer.accept(ProviderState.NOT_READY,null); + stateConsumer.accept(ProviderState.NOT_READY, null); } /** * Resolve a boolean flag. */ public ProviderEvaluation booleanEvaluation(String key, Boolean defaultValue, - EvaluationContext ctx) { + EvaluationContext ctx) { return resolve(Boolean.class, key, ctx); } @@ -221,7 +220,7 @@ private ProviderEvaluation resolve(Class type, String key, EvaluationC .variant(resolvedVariant) .reason(reason); - return this.metadata == null ? evaluationBuilder.build() : - evaluationBuilder.flagMetadata(this.metadata).build(); + return this.metadata == null ? evaluationBuilder.build() + : evaluationBuilder.flagMetadata(this.metadata).build(); } } diff --git a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/FlagStore.java b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/FlagStore.java index 12b0cbc17..3b4c03113 100644 --- a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/FlagStore.java +++ b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/FlagStore.java @@ -21,15 +21,15 @@ * Feature flag storage. */ @Slf4j -@SuppressFBWarnings(value = {"EI_EXPOSE_REP"}, - justification = "Feature flag comes as a Json configuration, hence they must be exposed") +@SuppressFBWarnings(value = { + "EI_EXPOSE_REP" }, justification = "Feature flag comes as a Json configuration, hence they must be exposed") public class FlagStore implements Storage { private final ReentrantReadWriteLock sync = new ReentrantReadWriteLock(); private final ReadLock readLock = sync.readLock(); private final WriteLock writeLock = sync.writeLock(); private final AtomicBoolean shutdown = new AtomicBoolean(false); - private final BlockingQueue stateBlockingQueue = new LinkedBlockingQueue<>(1); + private final BlockingQueue stateBlockingQueue = new LinkedBlockingQueue<>(1); private final Map flags = new HashMap<>(); private final Connector connector; @@ -89,7 +89,7 @@ public FeatureFlag getFlag(final String key) { /** * Retrieve blocking queue to check storage status. */ - public BlockingQueue getStateQueue() { + public BlockingQueue getStateQueue() { return stateBlockingQueue; } @@ -111,19 +111,19 @@ private void streamerListener(final Connector connector) throws InterruptedExcep } finally { writeLock.unlock(); } - if (!stateBlockingQueue.offer(new StorageStateDTO(StorageState.OK, changedFlagsKeys))) { + if (!stateBlockingQueue.offer(new StorageStateChange(StorageState.OK, changedFlagsKeys))) { log.warn("Failed to convey OK satus, queue is full"); } } catch (Throwable e) { // catch all exceptions and avoid stream listener interruptions log.warn("Invalid flag sync payload from connector", e); - if (!stateBlockingQueue.offer(new StorageStateDTO(StorageState.STALE))) { + if (!stateBlockingQueue.offer(new StorageStateChange(StorageState.STALE))) { log.warn("Failed to convey STALE satus, queue is full"); } } break; case ERROR: - if (!stateBlockingQueue.offer(new StorageStateDTO(StorageState.ERROR))) { + if (!stateBlockingQueue.offer(new StorageStateChange(StorageState.ERROR))) { log.warn("Failed to convey ERROR satus, queue is full"); } break; @@ -147,8 +147,8 @@ private List getChangedFlagsKeys(Map newFlags) { updatedFeatureFlags.put(key, value); } }); - flags.forEach((key,value) -> { - if(!newFlags.containsKey(key)) { + flags.forEach((key, value) -> { + if (!newFlags.containsKey(key)) { removedFeatureFlags.put(key, value); } }); diff --git a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/Storage.java b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/Storage.java index 4589eb1c2..10772154f 100644 --- a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/Storage.java +++ b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/Storage.java @@ -14,5 +14,5 @@ public interface Storage { FeatureFlag getFlag(final String key); - BlockingQueue getStateQueue(); + BlockingQueue getStateQueue(); } diff --git a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/StorageStateDTO.java b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/StorageStateChange.java similarity index 60% rename from providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/StorageStateDTO.java rename to providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/StorageStateChange.java index 17de9f934..cf85b0432 100644 --- a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/StorageStateDTO.java +++ b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/StorageStateChange.java @@ -1,26 +1,29 @@ package dev.openfeature.contrib.providers.flagd.resolver.process.storage; - import lombok.EqualsAndHashCode; import lombok.Getter; import lombok.ToString; +import java.util.ArrayList; import java.util.Collections; import java.util.List; +/** + * Represents a change in the stored flags. + */ @Getter @ToString @EqualsAndHashCode -public class StorageStateDTO { +public class StorageStateChange { private final StorageState storageState; private final List changedFlagsKeys; - public StorageStateDTO(StorageState storageState, List changedFlagsKeys) { + public StorageStateChange(StorageState storageState, List changedFlagsKeys) { this.storageState = storageState; - this.changedFlagsKeys = changedFlagsKeys; + this.changedFlagsKeys = new ArrayList<>(changedFlagsKeys); } - public StorageStateDTO(StorageState storageState) { + public StorageStateChange(StorageState storageState) { this.storageState = storageState; this.changedFlagsKeys = Collections.emptyList(); } diff --git a/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/FlagdProviderTest.java b/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/FlagdProviderTest.java index 3b207aa33..1a219f099 100644 --- a/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/FlagdProviderTest.java +++ b/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/FlagdProviderTest.java @@ -18,7 +18,6 @@ import java.lang.reflect.Field; import java.util.ArrayList; import java.util.Arrays; -import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -26,7 +25,7 @@ import java.util.function.Supplier; import java.util.concurrent.LinkedBlockingQueue; -import dev.openfeature.contrib.providers.flagd.resolver.process.storage.StorageStateDTO; +import dev.openfeature.contrib.providers.flagd.resolver.process.storage.StorageStateChange; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; import org.mockito.MockedStatic; @@ -919,8 +918,8 @@ private FlagdProvider createInProcessProvider() { .build(); final FlagdProvider provider = new FlagdProvider(flagdOptions); final MockStorage mockStorage = new MockStorage(new HashMap(), - new LinkedBlockingQueue(Arrays.asList(new - StorageStateDTO(StorageState.OK)))); + new LinkedBlockingQueue(Arrays.asList(new + StorageStateChange(StorageState.OK)))); try { final Field flagResolver = FlagdProvider.class.getDeclaredField("flagResolver"); diff --git a/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/e2e/process/FlagdInProcessSetup.java b/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/e2e/process/FlagdInProcessSetup.java index d23caf42a..a101a631d 100644 --- a/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/e2e/process/FlagdInProcessSetup.java +++ b/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/e2e/process/FlagdInProcessSetup.java @@ -12,7 +12,6 @@ import dev.openfeature.sdk.FeatureProvider; import io.cucumber.java.BeforeAll; import org.testcontainers.containers.GenericContainer; -import org.testcontainers.utility.DockerImageName; @Isolated() @Order(value = Integer.MAX_VALUE) diff --git a/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/e2e/steps/StepDefinitions.java b/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/e2e/steps/StepDefinitions.java index c3d451ce9..68db6bf5e 100644 --- a/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/e2e/steps/StepDefinitions.java +++ b/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/e2e/steps/StepDefinitions.java @@ -65,6 +65,7 @@ public class StepDefinitions { private FlagEvaluationDetails typeErrorDetails; private boolean isChangeHandlerRun = false; + private String changedFlag; private boolean isReadyHandlerRun = false; private Consumer changeHandler; @@ -418,7 +419,12 @@ public void the_returned_value_should_be(Integer expectedValue) { @When("a PROVIDER_CONFIGURATION_CHANGED handler is added") public void a_provider_configuration_changed_handler_is_added() { this.changeHandler = (EventDetails details) -> { - this.isChangeHandlerRun = true; + if (details.getFlagsChanged().size() > 0) { + // we get multiple change events from the test container... + // we're only interested in the ones with the changed flag in question + this.changedFlag = details.getFlagsChanged().get(0); + this.isChangeHandlerRun = true; + } }; client.onProviderConfigurationChanged(this.changeHandler); @@ -440,7 +446,7 @@ public void the_provider_configuration_changed_handler_must_run() { @Then("the event details must indicate {string} was altered") public void the_event_details_must_indicate_was_altered(String flagKey) { - // TODO: In-process-provider doesnt support flag change list. + assertEquals(flagKey, this.changedFlag); } // Provider ready event diff --git a/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/resolver/grpc/EventStreamObserverTest.java b/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/resolver/grpc/EventStreamObserverTest.java index 2f81ba28e..0345add4d 100644 --- a/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/resolver/grpc/EventStreamObserverTest.java +++ b/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/resolver/grpc/EventStreamObserverTest.java @@ -42,7 +42,7 @@ void setUp() { cache = mock(Cache.class); reconnect = mock(Runnable.class); when(cache.getEnabled()).thenReturn(true); - stream = new EventStreamObserver(sync, cache, state -> states.add(state)); + stream = new EventStreamObserver(sync, cache, (state, changed) -> states.add(state)); } @Test diff --git a/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/resolver/process/InProcessResolverTest.java b/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/resolver/process/InProcessResolverTest.java index c306baaaa..9953b337f 100644 --- a/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/resolver/process/InProcessResolverTest.java +++ b/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/resolver/process/InProcessResolverTest.java @@ -5,7 +5,7 @@ import dev.openfeature.contrib.providers.flagd.resolver.process.model.FeatureFlag; import dev.openfeature.contrib.providers.flagd.resolver.process.storage.MockConnector; import dev.openfeature.contrib.providers.flagd.resolver.process.storage.StorageState; -import dev.openfeature.contrib.providers.flagd.resolver.process.storage.StorageStateDTO; +import dev.openfeature.contrib.providers.flagd.resolver.process.storage.StorageStateChange; import dev.openfeature.contrib.providers.flagd.resolver.process.storage.connector.file.FileConnector; import dev.openfeature.contrib.providers.flagd.resolver.process.storage.connector.grpc.GrpcStreamConnector; import dev.openfeature.sdk.ImmutableContext; @@ -51,14 +51,14 @@ class InProcessResolverTest { @Test - public void connectorSetup(){ + public void connectorSetup() { // given - FlagdOptions forGrpcOptions = - FlagdOptions.builder().resolverType(Config.Resolver.IN_PROCESS).host("localhost").port(8080).build(); - FlagdOptions forOfflineOptions = - FlagdOptions.builder().resolverType(Config.Resolver.IN_PROCESS).offlineFlagSourcePath("path").build(); - FlagdOptions forCustomConnectorOptions = - FlagdOptions.builder().resolverType(Config.Resolver.IN_PROCESS).customConnector(new MockConnector(null)).build(); + FlagdOptions forGrpcOptions = FlagdOptions.builder().resolverType(Config.Resolver.IN_PROCESS).host("localhost") + .port(8080).build(); + FlagdOptions forOfflineOptions = FlagdOptions.builder().resolverType(Config.Resolver.IN_PROCESS) + .offlineFlagSourcePath("path").build(); + FlagdOptions forCustomConnectorOptions = FlagdOptions.builder().resolverType(Config.Resolver.IN_PROCESS) + .customConnector(new MockConnector(null)).build(); // then assertInstanceOf(GrpcStreamConnector.class, InProcessResolver.getConnector(forGrpcOptions)); @@ -70,7 +70,7 @@ public void connectorSetup(){ public void eventHandling() throws Throwable { // given // note - queues with adequate capacity - final BlockingQueue sender = new LinkedBlockingQueue<>(5); + final BlockingQueue sender = new LinkedBlockingQueue<>(5); final BlockingQueue receiver = new LinkedBlockingQueue<>(5); InProcessResolver inProcessResolver = getInProcessResolverWth(new MockStorage(new HashMap<>(), sender), @@ -84,10 +84,10 @@ public void eventHandling() throws Throwable { } }); initThread.start(); - if (!sender.offer(new StorageStateDTO(StorageState.OK, Collections.EMPTY_LIST), 100, TimeUnit.MILLISECONDS)) { + if (!sender.offer(new StorageStateChange(StorageState.OK, Collections.EMPTY_LIST), 100, TimeUnit.MILLISECONDS)) { Assertions.fail("failed to send the event"); } - if (!sender.offer(new StorageStateDTO(StorageState.ERROR), 100, TimeUnit.MILLISECONDS)) { + if (!sender.offer(new StorageStateChange(StorageState.ERROR), 100, TimeUnit.MILLISECONDS)) { Assertions.fail("failed to send the event"); } @@ -169,7 +169,7 @@ public void fetchDoubleAsInt() throws Exception { InProcessResolver inProcessResolver = getInProcessResolverWth(new MockStorage(flagMap), (providerState, changedFlagKeys) -> { - }); + }); // when ProviderEvaluation providerEvaluation = inProcessResolver.doubleEvaluation("integerFlag", 0d, @@ -188,8 +188,8 @@ public void simpleIntResolving() throws Exception { flagMap.put("integerFlag", INT_FLAG); InProcessResolver inProcessResolver = getInProcessResolverWth(new MockStorage(flagMap), - (providerState,changedFlagKeys) -> { - }); + (providerState, changedFlagKeys) -> { + }); // when ProviderEvaluation providerEvaluation = inProcessResolver.integerEvaluation("integerFlag", 0, @@ -208,8 +208,8 @@ public void simpleObjectResolving() throws Exception { flagMap.put("objectFlag", OBJECT_FLAG); InProcessResolver inProcessResolver = getInProcessResolverWth(new MockStorage(flagMap), - (providerState,changedFlagKeys) -> { - }); + (providerState, changedFlagKeys) -> { + }); Map typeDefault = new HashMap<>(); typeDefault.put("key", "0164"); @@ -235,8 +235,8 @@ public void missingFlag() throws Exception { final Map flagMap = new HashMap<>(); InProcessResolver inProcessResolver = getInProcessResolverWth(new MockStorage(flagMap), - (providerState,changedFlagKeys) -> { - }); + (providerState, changedFlagKeys) -> { + }); // when/then assertThrows(FlagNotFoundError.class, () -> { @@ -252,8 +252,8 @@ public void disabledFlag() throws Exception { flagMap.put("disabledFlag", DISABLED_FLAG); InProcessResolver inProcessResolver = getInProcessResolverWth(new MockStorage(flagMap), - (providerState,changedFlagKeys) -> { - }); + (providerState, changedFlagKeys) -> { + }); // when/then assertThrows(FlagNotFoundError.class, () -> { @@ -268,8 +268,8 @@ public void variantMismatchFlag() throws Exception { flagMap.put("mismatchFlag", VARIANT_MISMATCH_FLAG); InProcessResolver inProcessResolver = getInProcessResolverWth(new MockStorage(flagMap), - (providerState,changedFlagKeys) -> { - }); + (providerState, changedFlagKeys) -> { + }); // when/then assertThrows(TypeMismatchError.class, () -> { @@ -284,8 +284,8 @@ public void typeMismatchEvaluation() throws Exception { flagMap.put("stringFlag", BOOLEAN_FLAG); InProcessResolver inProcessResolver = getInProcessResolverWth(new MockStorage(flagMap), - (providerState,changedFlagKeys) -> { - }); + (providerState, changedFlagKeys) -> { + }); // when/then assertThrows(TypeMismatchError.class, () -> { @@ -300,8 +300,8 @@ public void booleanShorthandEvaluation() throws Exception { flagMap.put("shorthand", FLAG_WIH_SHORTHAND_TARGETING); InProcessResolver inProcessResolver = getInProcessResolverWth(new MockStorage(flagMap), - (providerState,changedFlagKeys) -> { - }); + (providerState, changedFlagKeys) -> { + }); ProviderEvaluation providerEvaluation = inProcessResolver.booleanEvaluation("shorthand", false, new ImmutableContext()); @@ -319,8 +319,8 @@ public void targetingMatchedEvaluationFlag() throws Exception { flagMap.put("stringFlag", FLAG_WIH_IF_IN_TARGET); InProcessResolver inProcessResolver = getInProcessResolverWth(new MockStorage(flagMap), - (providerState,changedFlagKeys) -> { - }); + (providerState, changedFlagKeys) -> { + }); // when ProviderEvaluation providerEvaluation = inProcessResolver.stringEvaluation("stringFlag", "loopAlg", @@ -339,8 +339,8 @@ public void targetingUnmatchedEvaluationFlag() throws Exception { flagMap.put("stringFlag", FLAG_WIH_IF_IN_TARGET); InProcessResolver inProcessResolver = getInProcessResolverWth(new MockStorage(flagMap), - (providerState,changedFlagKeys) -> { - }); + (providerState, changedFlagKeys) -> { + }); // when ProviderEvaluation providerEvaluation = inProcessResolver.stringEvaluation("stringFlag", "loopAlg", @@ -359,12 +359,12 @@ public void explicitTargetingKeyHandling() throws NoSuchFieldException, IllegalA flagMap.put("stringFlag", FLAG_WITH_TARGETING_KEY); InProcessResolver inProcessResolver = getInProcessResolverWth(new MockStorage(flagMap), - (providerState,changedFlagKeys) -> { - }); + (providerState, changedFlagKeys) -> { + }); // when - ProviderEvaluation providerEvaluation = - inProcessResolver.stringEvaluation("stringFlag", "loop", new MutableContext("xyz")); + ProviderEvaluation providerEvaluation = inProcessResolver.stringEvaluation("stringFlag", "loop", + new MutableContext("xyz")); // then assertEquals("binetAlg", providerEvaluation.getValue()); @@ -379,8 +379,8 @@ public void targetingErrorEvaluationFlag() throws Exception { flagMap.put("targetingErrorFlag", FLAG_WIH_INVALID_TARGET); InProcessResolver inProcessResolver = getInProcessResolverWth(new MockStorage(flagMap), - (providerState,changedFlagKeys) -> { - }); + (providerState, changedFlagKeys) -> { + }); // when/then assertThrows(ParseError.class, () -> { @@ -417,9 +417,8 @@ private InProcessResolver getInProcessResolverWth(final FlagdOptions options, fi return injectFlagStore(resolver, storage); } - private InProcessResolver getInProcessResolverWth(final MockStorage storage, - final BiConsumer> stateConsumer) + final BiConsumer> stateConsumer) throws NoSuchFieldException, IllegalAccessException { final InProcessResolver resolver = new InProcessResolver( diff --git a/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/resolver/process/MockStorage.java b/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/resolver/process/MockStorage.java index dbbd87d9d..3e183d04c 100644 --- a/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/resolver/process/MockStorage.java +++ b/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/resolver/process/MockStorage.java @@ -2,7 +2,7 @@ import dev.openfeature.contrib.providers.flagd.resolver.process.model.FeatureFlag; import dev.openfeature.contrib.providers.flagd.resolver.process.storage.Storage; -import dev.openfeature.contrib.providers.flagd.resolver.process.storage.StorageStateDTO; +import dev.openfeature.contrib.providers.flagd.resolver.process.storage.StorageStateChange; import javax.annotation.Nullable; import java.util.Map; @@ -11,9 +11,9 @@ public class MockStorage implements Storage { private final Map mockFlags; - private final BlockingQueue mockQueue; + private final BlockingQueue mockQueue; - public MockStorage(Map mockFlags, BlockingQueue mockQueue) { + public MockStorage(Map mockFlags, BlockingQueue mockQueue) { this.mockFlags = mockFlags; this.mockQueue = mockQueue; } @@ -37,7 +37,7 @@ public FeatureFlag getFlag(String key) { @Nullable - public BlockingQueue getStateQueue() { + public BlockingQueue getStateQueue() { return mockQueue; } } diff --git a/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/FlagStoreTest.java b/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/FlagStoreTest.java index 4d51b618f..54c660477 100644 --- a/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/FlagStoreTest.java +++ b/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/FlagStoreTest.java @@ -30,7 +30,7 @@ public void connectorHandling() throws Exception { FlagStore store = new FlagStore(new MockConnector(payload), true); store.init(); - final BlockingQueue states = store.getStateQueue(); + final BlockingQueue states = store.getStateQueue(); // OK for simple flag assertTimeoutPreemptively(Duration.ofMillis(maxDelay), ()-> { @@ -82,7 +82,7 @@ public void changedFlags() throws Exception { final BlockingQueue payload = new LinkedBlockingQueue<>(); FlagStore store = new FlagStore(new MockConnector(payload), true); store.init(); - final BlockingQueue storageStateDTOS = store.getStateQueue(); + final BlockingQueue storageStateDTOS = store.getStateQueue(); assertTimeoutPreemptively(Duration.ofMillis(maxDelay), ()-> { payload.offer(new StreamPayload(StreamPayloadType.DATA, getFlagsFromResource(VALID_SIMPLE)));