1
1
package io .javaoperatorsdk .operator .processing .event .source .polling ;
2
2
3
- import com .github .benmanes .caffeine .jcache .spi .CaffeineCachingProvider ;
3
+ import java .util .Optional ;
4
+
5
+ import javax .cache .Cache ;
6
+ import javax .cache .CacheManager ;
7
+ import javax .cache .configuration .MutableConfiguration ;
8
+ import javax .cache .spi .CachingProvider ;
9
+
10
+ import org .junit .jupiter .api .BeforeEach ;
11
+ import org .junit .jupiter .api .Test ;
12
+
4
13
import io .javaoperatorsdk .operator .TestUtils ;
5
14
import io .javaoperatorsdk .operator .processing .event .EventHandler ;
6
15
import io .javaoperatorsdk .operator .processing .event .ResourceID ;
7
16
import io .javaoperatorsdk .operator .processing .event .source .SampleExternalResource ;
8
17
import io .javaoperatorsdk .operator .processing .event .source .controller .ResourceCache ;
9
18
import io .javaoperatorsdk .operator .sample .simple .TestCustomResource ;
10
- import org .junit .jupiter .api .BeforeEach ;
11
- import org .junit .jupiter .api .Test ;
12
-
13
- import javax .cache .Cache ;
14
- import javax .cache .CacheManager ;
15
- import javax .cache .configuration .MutableConfiguration ;
16
- import javax .cache .spi .CachingProvider ;
17
19
18
- import java .util .Optional ;
19
- import java .util .function .Predicate ;
20
+ import com .github .benmanes .caffeine .jcache .spi .CaffeineCachingProvider ;
20
21
21
22
import static org .junit .jupiter .api .Assertions .*;
22
23
import static org .mockito .ArgumentMatchers .any ;
25
26
26
27
class PerResourcePollingEventSourceTest {
27
28
28
- public static final int PERIOD = 50 ;
29
- private PerResourcePollingEventSource <SampleExternalResource , TestCustomResource > pollingEventSource ;
30
- private PerResourcePollingEventSource .ResourceSupplier <SampleExternalResource , TestCustomResource >
31
- supplier = mock (PerResourcePollingEventSource .ResourceSupplier .class );
32
- private ResourceCache <TestCustomResource > resourceCache = mock (ResourceCache .class );
33
- private Cache <ResourceID , SampleExternalResource > cache ;
34
- private EventHandler eventHandler = mock (EventHandler .class );
35
- private TestCustomResource testCustomResource = TestUtils .testCustomResource ();
36
-
37
- @ BeforeEach
38
- public void setup () {
39
- CachingProvider cachingProvider = new CaffeineCachingProvider ();
40
- CacheManager cacheManager = cachingProvider .getCacheManager ();
41
- cache = cacheManager .createCache ("test-caching" , new MutableConfiguration <>());
42
-
43
- when (resourceCache .get (any ())).thenReturn (Optional .of (testCustomResource ));
44
- when (supplier .getResources (any ())). thenReturn ( Optional . of ( SampleExternalResource . testResource1 ()));
45
-
46
- pollingEventSource = new PerResourcePollingEventSource <>
47
- ( supplier , resourceCache , PERIOD , cache );
48
- pollingEventSource . setEventHandler ( eventHandler );
49
- }
50
-
51
- @ Test
52
- public void pollsTheResourceAfterAwareOfIt () throws InterruptedException {
53
- pollingEventSource . start ();
54
- pollingEventSource .onResourceCreated ( testCustomResource );
55
-
56
- Thread . sleep ( 3 * PERIOD );
57
- verify ( supplier , atLeast ( 2 )). getResources ( eq ( testCustomResource ) );
58
- verify (eventHandler , times ( 1 )).handleEvent ( any ( ));
59
- }
60
-
61
- @ Test
62
- public void registeringTaskOnAPredicate () throws InterruptedException {
63
- pollingEventSource = new PerResourcePollingEventSource <>
64
- (supplier , resourceCache , PERIOD , cache ,
65
- testCustomResource -> testCustomResource .getMetadata ().getGeneration () > 1 );
66
- pollingEventSource .setEventHandler (eventHandler );
67
- pollingEventSource .start ();
68
- pollingEventSource .onResourceCreated (testCustomResource );
69
- Thread .sleep (2 * PERIOD );
70
-
71
- verify (supplier , times (0 )).getResources (eq (testCustomResource ));
72
- testCustomResource .getMetadata ().setGeneration (2L );
73
- pollingEventSource .onResourceUpdated (testCustomResource , testCustomResource );
74
-
75
- Thread .sleep (2 * PERIOD );
76
-
77
- verify (supplier , atLeast (1 )).getResources (eq (testCustomResource ));
78
- }
79
-
80
- @ Test
81
- public void propagateEventOnDeletedResource () throws InterruptedException {
82
- pollingEventSource .start ();
83
- pollingEventSource .onResourceCreated (testCustomResource );
84
- when (supplier .getResources (any ()))
85
- .thenReturn (Optional .of (SampleExternalResource .testResource1 ()))
86
- .thenReturn (Optional .empty ());
87
-
88
- Thread .sleep (3 * PERIOD );
89
- verify (supplier ,atLeast (2 )).getResources (eq (testCustomResource ));
90
- verify (eventHandler , times (2 )).handleEvent (any ());
91
- }
92
- }
29
+ public static final int PERIOD = 50 ;
30
+ private PerResourcePollingEventSource <SampleExternalResource , TestCustomResource > pollingEventSource ;
31
+ private PerResourcePollingEventSource .ResourceSupplier <SampleExternalResource , TestCustomResource > supplier =
32
+ mock (PerResourcePollingEventSource .ResourceSupplier .class );
33
+ private ResourceCache <TestCustomResource > resourceCache = mock (ResourceCache .class );
34
+ private Cache <ResourceID , SampleExternalResource > cache ;
35
+ private EventHandler eventHandler = mock (EventHandler .class );
36
+ private TestCustomResource testCustomResource = TestUtils .testCustomResource ();
37
+
38
+ @ BeforeEach
39
+ public void setup () {
40
+ CachingProvider cachingProvider = new CaffeineCachingProvider ();
41
+ CacheManager cacheManager = cachingProvider .getCacheManager ();
42
+ cache = cacheManager .createCache ("test-caching" , new MutableConfiguration <>());
43
+
44
+ when (resourceCache .get (any ())).thenReturn (Optional .of (testCustomResource ));
45
+ when (supplier .getResources (any ()))
46
+ . thenReturn ( Optional . of ( SampleExternalResource . testResource1 ()));
47
+
48
+ pollingEventSource =
49
+ new PerResourcePollingEventSource <>( supplier , resourceCache , PERIOD , cache );
50
+ pollingEventSource . setEventHandler ( eventHandler );
51
+ }
52
+
53
+ @ Test
54
+ public void pollsTheResourceAfterAwareOfIt () throws InterruptedException {
55
+ pollingEventSource .start ( );
56
+ pollingEventSource . onResourceCreated ( testCustomResource );
57
+
58
+ Thread . sleep ( 3 * PERIOD );
59
+ verify (supplier , atLeast ( 2 )).getResources ( eq ( testCustomResource ));
60
+ verify ( eventHandler , times ( 1 )). handleEvent ( any ());
61
+ }
62
+
63
+ @ Test
64
+ public void registeringTaskOnAPredicate () throws InterruptedException {
65
+ pollingEventSource = new PerResourcePollingEventSource <> (supplier , resourceCache , PERIOD , cache ,
66
+ testCustomResource -> testCustomResource .getMetadata ().getGeneration () > 1 );
67
+ pollingEventSource .setEventHandler (eventHandler );
68
+ pollingEventSource .start ();
69
+ pollingEventSource .onResourceCreated (testCustomResource );
70
+ Thread .sleep (2 * PERIOD );
71
+
72
+ verify (supplier , times (0 )).getResources (eq (testCustomResource ));
73
+ testCustomResource .getMetadata ().setGeneration (2L );
74
+ pollingEventSource .onResourceUpdated (testCustomResource , testCustomResource );
75
+
76
+ Thread .sleep (2 * PERIOD );
77
+
78
+ verify (supplier , atLeast (1 )).getResources (eq (testCustomResource ));
79
+ }
80
+
81
+ @ Test
82
+ public void propagateEventOnDeletedResource () throws InterruptedException {
83
+ pollingEventSource .start ();
84
+ pollingEventSource .onResourceCreated (testCustomResource );
85
+ when (supplier .getResources (any ()))
86
+ .thenReturn (Optional .of (SampleExternalResource .testResource1 ()))
87
+ .thenReturn (Optional .empty ());
88
+
89
+ Thread .sleep (3 * PERIOD );
90
+ verify (supplier , atLeast (2 )).getResources (eq (testCustomResource ));
91
+ verify (eventHandler , times (2 )).handleEvent (any ());
92
+ }
93
+ }
0 commit comments