From 49555b7809c2b0ee4f7b0a837504fe13f3333939 Mon Sep 17 00:00:00 2001 From: Todd Baert Date: Tue, 27 Aug 2024 16:51:52 -0400 Subject: [PATCH 1/4] fix: add more logging in sync stream Signed-off-by: Todd Baert --- .../connector/grpc/GrpcStreamConnector.java | 31 ++++++++++++------- 1 file changed, 20 insertions(+), 11 deletions(-) diff --git a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/grpc/GrpcStreamConnector.java b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/grpc/GrpcStreamConnector.java index 8184da048..c4aa51a3d 100644 --- a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/grpc/GrpcStreamConnector.java +++ b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/grpc/GrpcStreamConnector.java @@ -20,11 +20,12 @@ import java.util.concurrent.atomic.AtomicBoolean; /** - * Implements the {@link Connector} contract and emit flags obtained from flagd sync gRPC contract. + * Implements the {@link Connector} contract and emit flags obtained from flagd + * sync gRPC contract. */ @Slf4j -@SuppressFBWarnings(value = {"PREDICTABLE_RANDOM", "EI_EXPOSE_REP"}, - justification = "Random is used to generate a variation & flag configurations require exposing") +@SuppressFBWarnings(value = { "PREDICTABLE_RANDOM", + "EI_EXPOSE_REP" }, justification = "Random is used to generate a variation & flag configurations require exposing") public class GrpcStreamConnector implements Connector { private static final Random RANDOM = new Random(); @@ -111,40 +112,47 @@ public void shutdown() throws InterruptedException { * Contains blocking calls, to be used concurrently. */ static void observeEventStream(final BlockingQueue writeTo, - final AtomicBoolean shutdown, - final FlagSyncServiceStub serviceStub, - final SyncFlagsRequest request) + final AtomicBoolean shutdown, + final FlagSyncServiceStub serviceStub, + final SyncFlagsRequest request) throws InterruptedException { final BlockingQueue streamReceiver = new LinkedBlockingQueue<>(QUEUE_SIZE); int retryDelay = INIT_BACK_OFF; + log.info("Initializing sync stream observer"); + while (!shutdown.get()) { + log.debug("Initializing sync stream request"); serviceStub.syncFlags(request, new GrpcStreamHandler(streamReceiver)); while (!shutdown.get()) { final GrpcResponseModel response = streamReceiver.take(); if (response.isComplete()) { - // The stream is complete. This is not considered as an error + log.warn("Stream completed"); + // The stream is complete, this isn't really an error but we should try to + // reconnect break; } if (response.getError() != null) { - log.warn(String.format("Error from grpc connection, retrying in %dms", retryDelay), + log.error(String.format("Error from grpc connection, retrying in %dms", retryDelay), response.getError()); if (!writeTo.offer( new StreamPayload(StreamPayloadType.ERROR, "Error from stream connection, retrying"))) { - log.warn("Failed to convey ERROR satus, queue is full"); + log.error("Failed to convey ERROR satus, queue is full"); } break; } final SyncFlagsResponse flagsResponse = response.getSyncFlagsResponse(); + String data = flagsResponse.getFlagConfiguration(); + log.debug("Got stream response: " + data); if (!writeTo.offer( - new StreamPayload(StreamPayloadType.DATA, flagsResponse.getFlagConfiguration()))) { - log.warn("Stream writing failed"); + new StreamPayload(StreamPayloadType.DATA, data))) { + log.error("Stream writing failed"); } // reset retry delay if we succeeded in a retry attempt @@ -158,6 +166,7 @@ static void observeEventStream(final BlockingQueue writeTo, } // busy wait till next attempt + log.warn(String.format("Stream failed, retrying in %dms", retryDelay)); Thread.sleep(retryDelay + RANDOM.nextInt(INIT_BACK_OFF)); if (retryDelay < MAX_BACK_OFF) { From eea6ae45974e6e9e9a2fff9cede23b5982e4923b Mon Sep 17 00:00:00 2001 From: Todd Baert Date: Tue, 27 Aug 2024 16:54:19 -0400 Subject: [PATCH 2/4] Update providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/grpc/GrpcStreamConnector.java Signed-off-by: Todd Baert --- .../process/storage/connector/grpc/GrpcStreamConnector.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/grpc/GrpcStreamConnector.java b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/grpc/GrpcStreamConnector.java index c4aa51a3d..fc697e837 100644 --- a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/grpc/GrpcStreamConnector.java +++ b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/grpc/GrpcStreamConnector.java @@ -130,7 +130,7 @@ static void observeEventStream(final BlockingQueue writeTo, final GrpcResponseModel response = streamReceiver.take(); if (response.isComplete()) { - log.warn("Stream completed"); + log.info("Sync stream completed"); // The stream is complete, this isn't really an error but we should try to // reconnect break; From e2eeb19192535492af02d38b6148db3c0a8c8518 Mon Sep 17 00:00:00 2001 From: Todd Baert Date: Tue, 27 Aug 2024 16:54:46 -0400 Subject: [PATCH 3/4] Update providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/grpc/GrpcStreamConnector.java Signed-off-by: Todd Baert --- .../process/storage/connector/grpc/GrpcStreamConnector.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/grpc/GrpcStreamConnector.java b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/grpc/GrpcStreamConnector.java index fc697e837..d624a8ad5 100644 --- a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/grpc/GrpcStreamConnector.java +++ b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/grpc/GrpcStreamConnector.java @@ -142,7 +142,7 @@ static void observeEventStream(final BlockingQueue writeTo, if (!writeTo.offer( new StreamPayload(StreamPayloadType.ERROR, "Error from stream connection, retrying"))) { - log.error("Failed to convey ERROR satus, queue is full"); + log.error("Failed to convey ERROR status, queue is full"); } break; } From 10aa27b17cae84890f2ff27152fd946a5ea20a93 Mon Sep 17 00:00:00 2001 From: Todd Baert Date: Tue, 27 Aug 2024 16:59:31 -0400 Subject: [PATCH 4/4] fixup: lint Signed-off-by: Todd Baert --- .../process/storage/connector/grpc/GrpcStreamConnector.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/grpc/GrpcStreamConnector.java b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/grpc/GrpcStreamConnector.java index d624a8ad5..4e28f8bdf 100644 --- a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/grpc/GrpcStreamConnector.java +++ b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/grpc/GrpcStreamConnector.java @@ -25,7 +25,7 @@ */ @Slf4j @SuppressFBWarnings(value = { "PREDICTABLE_RANDOM", - "EI_EXPOSE_REP" }, justification = "Random is used to generate a variation & flag configurations require exposing") + "EI_EXPOSE_REP" }, justification = "Random is used to generate a variation & flag configurations require exposing") public class GrpcStreamConnector implements Connector { private static final Random RANDOM = new Random();