1
1
package dev .openfeature .contrib .providers .flagd ;
2
2
3
3
import dev .openfeature .contrib .providers .flagd .resolver .Resolver ;
4
- import dev .openfeature .contrib .providers .flagd .resolver .common .ConnectionEvent ;
4
+ import dev .openfeature .contrib .providers .flagd .resolver .common .FlagdProviderEvent ;
5
+ import dev .openfeature .contrib .providers .flagd .resolver .common .Util ;
5
6
import dev .openfeature .contrib .providers .flagd .resolver .grpc .GrpcResolver ;
6
7
import dev .openfeature .contrib .providers .flagd .resolver .grpc .cache .Cache ;
7
8
import dev .openfeature .contrib .providers .flagd .resolver .process .InProcessResolver ;
12
13
import dev .openfeature .sdk .ImmutableStructure ;
13
14
import dev .openfeature .sdk .Metadata ;
14
15
import dev .openfeature .sdk .ProviderEvaluation ;
16
+ import dev .openfeature .sdk .ProviderEvent ;
15
17
import dev .openfeature .sdk .ProviderEventDetails ;
16
18
import dev .openfeature .sdk .Structure ;
17
19
import dev .openfeature .sdk .Value ;
18
20
import java .util .ArrayList ;
19
21
import java .util .Collections ;
20
22
import java .util .List ;
23
+ import java .util .concurrent .Executors ;
24
+ import java .util .concurrent .ScheduledExecutorService ;
25
+ import java .util .concurrent .ScheduledFuture ;
26
+ import java .util .concurrent .TimeUnit ;
21
27
import java .util .function .Function ;
22
28
import lombok .extern .slf4j .Slf4j ;
23
29
@@ -30,11 +36,31 @@ public class FlagdProvider extends EventProvider {
30
36
private Function <Structure , EvaluationContext > contextEnricher ;
31
37
private static final String FLAGD_PROVIDER = "flagd" ;
32
38
private final Resolver flagResolver ;
33
- private volatile boolean initialized = false ;
34
- private volatile boolean connected = false ;
35
- private volatile Structure syncMetadata = new ImmutableStructure ();
36
- private volatile EvaluationContext enrichedContext = new ImmutableContext ();
37
39
private final List <Hook > hooks = new ArrayList <>();
40
+ private final EventsLock eventsLock = new EventsLock ();
41
+
42
+ /**
43
+ * An executor service responsible for emitting
44
+ * {@link ProviderEvent#PROVIDER_ERROR} after the provider went
45
+ * {@link ProviderEvent#PROVIDER_STALE} for {@link #gracePeriod} seconds.
46
+ */
47
+ private final ScheduledExecutorService errorExecutor ;
48
+
49
+ /**
50
+ * A scheduled task for emitting {@link ProviderEvent#PROVIDER_ERROR}.
51
+ */
52
+ private ScheduledFuture <?> errorTask ;
53
+
54
+ /**
55
+ * The grace period in milliseconds to wait after
56
+ * {@link ProviderEvent#PROVIDER_STALE} before emitting a
57
+ * {@link ProviderEvent#PROVIDER_ERROR}.
58
+ */
59
+ private final long gracePeriod ;
60
+ /**
61
+ * The deadline in milliseconds for GRPC operations.
62
+ */
63
+ private final long deadline ;
38
64
39
65
protected final void finalize () {
40
66
// DO NOT REMOVE, spotbugs: CT_CONSTRUCTOR_THROW
@@ -55,18 +81,34 @@ public FlagdProvider() {
55
81
public FlagdProvider (final FlagdOptions options ) {
56
82
switch (options .getResolverType ().asString ()) {
57
83
case Config .RESOLVER_IN_PROCESS :
58
- this .flagResolver = new InProcessResolver (options , this ::isConnected , this :: onConnectionEvent );
84
+ this .flagResolver = new InProcessResolver (options , this ::onProviderEvent );
59
85
break ;
60
86
case Config .RESOLVER_RPC :
61
87
this .flagResolver = new GrpcResolver (
62
- options , new Cache (options .getCacheType (), options .getMaxCacheSize ()), this ::onConnectionEvent );
88
+ options , new Cache (options .getCacheType (), options .getMaxCacheSize ()), this ::onProviderEvent );
63
89
break ;
64
90
default :
65
91
throw new IllegalStateException (
66
92
String .format ("Requested unsupported resolver type of %s" , options .getResolverType ()));
67
93
}
68
94
hooks .add (new SyncMetadataHook (this ::getEnrichedContext ));
69
95
contextEnricher = options .getContextEnricher ();
96
+ errorExecutor = Executors .newSingleThreadScheduledExecutor ();
97
+ gracePeriod = options .getRetryGracePeriod ();
98
+ deadline = options .getDeadline ();
99
+ }
100
+
101
+ /**
102
+ * Internal constructor for test cases.
103
+ * DO NOT MAKE PUBLIC
104
+ */
105
+ FlagdProvider (Resolver resolver , boolean initialized ) {
106
+ this .flagResolver = resolver ;
107
+ deadline = Config .DEFAULT_DEADLINE ;
108
+ gracePeriod = Config .DEFAULT_STREAM_RETRY_GRACE_PERIOD ;
109
+ hooks .add (new SyncMetadataHook (this ::getEnrichedContext ));
110
+ errorExecutor = Executors .newSingleThreadScheduledExecutor ();
111
+ this .eventsLock .initialized = initialized ;
70
112
}
71
113
72
114
@ Override
@@ -75,27 +117,39 @@ public List<Hook> getProviderHooks() {
75
117
}
76
118
77
119
@ Override
78
- public synchronized void initialize (EvaluationContext evaluationContext ) throws Exception {
79
- if (this .initialized ) {
80
- return ;
81
- }
120
+ public void initialize (EvaluationContext evaluationContext ) throws Exception {
121
+ synchronized (eventsLock ) {
122
+ if (eventsLock .initialized ) {
123
+ return ;
124
+ }
82
125
83
- this .flagResolver .init ();
84
- this .initialized = this .connected = true ;
126
+ flagResolver .init ();
127
+ }
128
+ // block till ready - this works with deadline fine for rpc, but with in_process
129
+ // we also need to take parsing into the equation
130
+ // TODO: evaluate where we are losing time, so we can remove this magic number -
131
+ // follow up
132
+ // wait outside of the synchonrization or we'll deadlock
133
+ Util .busyWaitAndCheck (this .deadline * 2 , () -> eventsLock .initialized );
85
134
}
86
135
87
136
@ Override
88
- public synchronized void shutdown () {
89
- if (!this .initialized ) {
90
- return ;
91
- }
92
-
93
- try {
94
- this .flagResolver .shutdown ();
95
- } catch (Exception e ) {
96
- log .error ("Error during shutdown {}" , FLAGD_PROVIDER , e );
97
- } finally {
98
- this .initialized = false ;
137
+ public void shutdown () {
138
+ synchronized (eventsLock ) {
139
+ if (!eventsLock .initialized ) {
140
+ return ;
141
+ }
142
+ try {
143
+ this .flagResolver .shutdown ();
144
+ if (errorExecutor != null ) {
145
+ errorExecutor .shutdownNow ();
146
+ errorExecutor .awaitTermination (deadline , TimeUnit .MILLISECONDS );
147
+ }
148
+ } catch (Exception e ) {
149
+ log .error ("Error during shutdown {}" , FLAGD_PROVIDER , e );
150
+ } finally {
151
+ eventsLock .initialized = false ;
152
+ }
99
153
}
100
154
}
101
155
@@ -106,27 +160,27 @@ public Metadata getMetadata() {
106
160
107
161
@ Override
108
162
public ProviderEvaluation <Boolean > getBooleanEvaluation (String key , Boolean defaultValue , EvaluationContext ctx ) {
109
- return this . flagResolver .booleanEvaluation (key , defaultValue , ctx );
163
+ return flagResolver .booleanEvaluation (key , defaultValue , ctx );
110
164
}
111
165
112
166
@ Override
113
167
public ProviderEvaluation <String > getStringEvaluation (String key , String defaultValue , EvaluationContext ctx ) {
114
- return this . flagResolver .stringEvaluation (key , defaultValue , ctx );
168
+ return flagResolver .stringEvaluation (key , defaultValue , ctx );
115
169
}
116
170
117
171
@ Override
118
172
public ProviderEvaluation <Double > getDoubleEvaluation (String key , Double defaultValue , EvaluationContext ctx ) {
119
- return this . flagResolver .doubleEvaluation (key , defaultValue , ctx );
173
+ return flagResolver .doubleEvaluation (key , defaultValue , ctx );
120
174
}
121
175
122
176
@ Override
123
177
public ProviderEvaluation <Integer > getIntegerEvaluation (String key , Integer defaultValue , EvaluationContext ctx ) {
124
- return this . flagResolver .integerEvaluation (key , defaultValue , ctx );
178
+ return flagResolver .integerEvaluation (key , defaultValue , ctx );
125
179
}
126
180
127
181
@ Override
128
182
public ProviderEvaluation <Value > getObjectEvaluation (String key , Value defaultValue , EvaluationContext ctx ) {
129
- return this . flagResolver .objectEvaluation (key , defaultValue , ctx );
183
+ return flagResolver .objectEvaluation (key , defaultValue , ctx );
130
184
}
131
185
132
186
/**
@@ -139,7 +193,7 @@ public ProviderEvaluation<Value> getObjectEvaluation(String key, Value defaultVa
139
193
* @return Object map representing sync metadata
140
194
*/
141
195
protected Structure getSyncMetadata () {
142
- return new ImmutableStructure (syncMetadata .asMap ());
196
+ return new ImmutableStructure (eventsLock . syncMetadata .asMap ());
143
197
}
144
198
145
199
/**
@@ -148,50 +202,109 @@ protected Structure getSyncMetadata() {
148
202
* @return context
149
203
*/
150
204
EvaluationContext getEnrichedContext () {
151
- return enrichedContext ;
205
+ return eventsLock . enrichedContext ;
152
206
}
153
207
154
- private boolean isConnected () {
155
- return this .connected ;
156
- }
208
+ @ SuppressWarnings ("checkstyle:fallthrough" )
209
+ private void onProviderEvent (FlagdProviderEvent flagdProviderEvent ) {
157
210
158
- private void onConnectionEvent (ConnectionEvent connectionEvent ) {
159
- final boolean wasConnected = connected ;
160
- final boolean isConnected = connected = connectionEvent .isConnected ();
211
+ synchronized (eventsLock ) {
212
+ log .info ("FlagdProviderEvent: {}" , flagdProviderEvent );
213
+ eventsLock .syncMetadata = flagdProviderEvent .getSyncMetadata ();
214
+ if (flagdProviderEvent .getSyncMetadata () != null ) {
215
+ eventsLock .enrichedContext = contextEnricher .apply (flagdProviderEvent .getSyncMetadata ());
216
+ }
161
217
162
- syncMetadata = connectionEvent .getSyncMetadata ();
163
- enrichedContext = contextEnricher .apply (connectionEvent .getSyncMetadata ());
218
+ /*
219
+ * We only use Error and Ready as previous states.
220
+ * As error will first be emitted as Stale, and only turns after a while into an
221
+ * emitted Error.
222
+ * Ready is needed, as the InProcessResolver does not have a dedicated ready
223
+ * event, hence we need to
224
+ * forward a configuration changed to the ready, if we are not in the ready
225
+ * state.
226
+ */
227
+ switch (flagdProviderEvent .getEvent ()) {
228
+ case PROVIDER_CONFIGURATION_CHANGED :
229
+ if (eventsLock .previousEvent == ProviderEvent .PROVIDER_READY ) {
230
+ onConfigurationChanged (flagdProviderEvent );
231
+ break ;
232
+ }
233
+ // intentional fall through, a not-ready change will trigger a ready.
234
+ case PROVIDER_READY :
235
+ onReady ();
236
+ eventsLock .previousEvent = ProviderEvent .PROVIDER_READY ;
237
+ break ;
164
238
165
- if (!initialized ) {
166
- return ;
239
+ case PROVIDER_ERROR :
240
+ if (eventsLock .previousEvent != ProviderEvent .PROVIDER_ERROR ) {
241
+ onError ();
242
+ }
243
+ eventsLock .previousEvent = ProviderEvent .PROVIDER_ERROR ;
244
+ break ;
245
+ default :
246
+ log .info ("Unknown event {}" , flagdProviderEvent .getEvent ());
247
+ }
167
248
}
249
+ }
250
+
251
+ private void onConfigurationChanged (FlagdProviderEvent flagdProviderEvent ) {
252
+ this .emitProviderConfigurationChanged (ProviderEventDetails .builder ()
253
+ .flagsChanged (flagdProviderEvent .getFlagsChanged ())
254
+ .message ("configuration changed" )
255
+ .build ());
256
+ }
168
257
169
- if (!wasConnected && isConnected ) {
170
- ProviderEventDetails details = ProviderEventDetails .builder ()
171
- .flagsChanged (connectionEvent .getFlagsChanged ())
172
- .message ("connected to flagd" )
173
- .build ();
174
- this .emitProviderReady (details );
175
- return ;
258
+ private void onReady () {
259
+ if (!eventsLock .initialized ) {
260
+ eventsLock .initialized = true ;
261
+ log .info ("initialized FlagdProvider" );
176
262
}
263
+ if (errorTask != null && !errorTask .isCancelled ()) {
264
+ errorTask .cancel (false );
265
+ log .debug ("Reconnection task cancelled as connection became READY." );
266
+ }
267
+ this .emitProviderReady (
268
+ ProviderEventDetails .builder ().message ("connected to flagd" ).build ());
269
+ }
177
270
178
- if (wasConnected && isConnected ) {
179
- ProviderEventDetails details = ProviderEventDetails .builder ()
180
- .flagsChanged (connectionEvent .getFlagsChanged ())
181
- .message ("configuration changed" )
182
- .build ();
183
- this .emitProviderConfigurationChanged (details );
184
- return ;
271
+ private void onError () {
272
+ log .info ("Connection lost. Emit STALE event..." );
273
+ log .debug ("Waiting {}s for connection to become available..." , gracePeriod );
274
+ this .emitProviderStale (ProviderEventDetails .builder ()
275
+ .message ("there has been an error" )
276
+ .build ());
277
+
278
+ if (errorTask != null && !errorTask .isCancelled ()) {
279
+ errorTask .cancel (false );
185
280
}
186
281
187
- if (connectionEvent .isStale ()) {
188
- this .emitProviderStale (ProviderEventDetails .builder ()
189
- .message ("there has been an error" )
190
- .build ());
191
- } else {
192
- this .emitProviderError (ProviderEventDetails .builder ()
193
- .message ("there has been an error" )
194
- .build ());
282
+ if (!errorExecutor .isShutdown ()) {
283
+ errorTask = errorExecutor .schedule (
284
+ () -> {
285
+ if (eventsLock .previousEvent == ProviderEvent .PROVIDER_ERROR ) {
286
+ log .debug (
287
+ "Provider did not reconnect successfully within {}s. Emit ERROR event..." ,
288
+ gracePeriod );
289
+ flagResolver .onError ();
290
+ this .emitProviderError (ProviderEventDetails .builder ()
291
+ .message ("there has been an error" )
292
+ .build ());
293
+ }
294
+ },
295
+ gracePeriod ,
296
+ TimeUnit .SECONDS );
195
297
}
196
298
}
299
+
300
+ /**
301
+ * Contains all fields we need to worry about locking, used as intrinsic lock
302
+ * for sync blocks.
303
+ */
304
+ static class EventsLock {
305
+ volatile ProviderEvent previousEvent = null ;
306
+ volatile Structure syncMetadata = new ImmutableStructure ();
307
+ volatile boolean initialized = false ;
308
+ volatile EvaluationContext enrichedContext = new ImmutableContext ();
309
+ }
197
310
}
0 commit comments