Skip to content

Commit 89272b4

Browse files
csvirimetacosm
authored andcommitted
fix: sample improvements
1 parent b4ddaae commit 89272b4

File tree

7 files changed

+123
-8
lines changed

7 files changed

+123
-8
lines changed

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/polling/PerResourcePollingEventSource.java

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,16 @@ private void pollForResource(R resource) {
8181
}
8282
}
8383

84+
private Optional<T> getAndCacheResource(ResourceID resourceID) {
85+
var resource = resourceCache.get(resourceID);
86+
if (resource.isPresent()) {
87+
var value = resourceSupplier.getResources(resource.get());
88+
value.ifPresent(v -> cache.put(resourceID, v));
89+
return value;
90+
}
91+
return Optional.empty();
92+
}
93+
8494
@Override
8595
public void onResourceCreated(R resource) {
8696
checkAndRegisterTask(resource);
@@ -121,6 +131,22 @@ public void run() {
121131
}
122132
}
123133

134+
/**
135+
*
136+
* @param resourceID of the target related resource
137+
* @return the cached value of the resource, if not present it gets the resource from the
138+
* supplier. If the supplier provides a value it is cached, so there will be no new event
139+
* related for the new event.
140+
*/
141+
public Optional<T> getValueFromCacheOrSupplier(ResourceID resourceID) {
142+
var cachedValue = getCachedValue(resourceID);
143+
if (cachedValue.isPresent()) {
144+
return cachedValue;
145+
} else {
146+
return getAndCacheResource(resourceID);
147+
}
148+
}
149+
124150
public interface ResourceSupplier<T, R> {
125151
Optional<T> getResources(R resource);
126152
}

sample-operators/mysql-schema/pom.xml

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,19 @@
6767
<artifactId>jackson-dataformat-yaml</artifactId>
6868
<version>2.13.0</version>
6969
</dependency>
70+
<dependency>
71+
<groupId>javax.cache</groupId>
72+
<artifactId>cache-api</artifactId>
73+
<version>${jcache.version}</version>
74+
</dependency>
75+
<dependency>
76+
<groupId>com.github.ben-manes.caffeine</groupId>
77+
<artifactId>caffeine</artifactId>
78+
</dependency>
79+
<dependency>
80+
<groupId>com.github.ben-manes.caffeine</groupId>
81+
<artifactId>jcache</artifactId>
82+
</dependency>
7083
</dependencies>
7184

7285
<build>

sample-operators/mysql-schema/src/main/java/io/javaoperatorsdk/operator/sample/MySQLSchemaReconciler.java

Lines changed: 40 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,11 @@
66
import java.util.Base64;
77
import java.util.Optional;
88

9+
import javax.cache.Cache;
10+
import javax.cache.CacheManager;
11+
import javax.cache.configuration.MutableConfiguration;
12+
import javax.cache.spi.CachingProvider;
13+
914
import org.apache.commons.lang3.RandomStringUtils;
1015
import org.slf4j.Logger;
1116
import org.slf4j.LoggerFactory;
@@ -15,30 +20,55 @@
1520
import io.fabric8.kubernetes.api.model.SecretBuilder;
1621
import io.fabric8.kubernetes.client.KubernetesClient;
1722
import io.javaoperatorsdk.operator.api.reconciler.*;
23+
import io.javaoperatorsdk.operator.processing.event.ResourceID;
24+
import io.javaoperatorsdk.operator.processing.event.source.EventSourceRegistry;
25+
import io.javaoperatorsdk.operator.processing.event.source.polling.PerResourcePollingEventSource;
26+
import io.javaoperatorsdk.operator.sample.schema.Schema;
1827
import io.javaoperatorsdk.operator.sample.schema.SchemaService;
1928

29+
import com.github.benmanes.caffeine.jcache.spi.CaffeineCachingProvider;
30+
2031
import static java.lang.String.format;
2132

2233
@ControllerConfiguration
2334
public class MySQLSchemaReconciler
24-
implements Reconciler<MySQLSchema>, ErrorStatusHandler<MySQLSchema> {
35+
implements Reconciler<MySQLSchema>, ErrorStatusHandler<MySQLSchema>,
36+
EventSourceInitializer<MySQLSchema> {
2537
public static final String SECRET_FORMAT = "%s-secret";
2638
public static final String USERNAME_FORMAT = "%s-user";
39+
public static final int POLL_PERIOD = 500;
2740
private final Logger log = LoggerFactory.getLogger(getClass());
2841

2942
private final KubernetesClient kubernetesClient;
3043
private final MySQLDbConfig mysqlDbConfig;
44+
PerResourcePollingEventSource<Schema, MySQLSchema> perResourcePollingEventSource;
3145

3246
public MySQLSchemaReconciler(KubernetesClient kubernetesClient, MySQLDbConfig mysqlDbConfig) {
3347
this.kubernetesClient = kubernetesClient;
3448
this.mysqlDbConfig = mysqlDbConfig;
3549
}
3650

51+
@Override
52+
public void prepareEventSources(EventSourceRegistry<MySQLSchema> eventSourceRegistry) {
53+
CachingProvider cachingProvider = new CaffeineCachingProvider();
54+
CacheManager cacheManager = cachingProvider.getCacheManager();
55+
Cache<ResourceID, Schema> schemaCache =
56+
cacheManager.createCache("schema-cache", new MutableConfiguration<>());
57+
58+
perResourcePollingEventSource =
59+
new PerResourcePollingEventSource<>(new SchemaPollingResourceSupplier(mysqlDbConfig),
60+
eventSourceRegistry.getControllerResourceEventSource().getResourceCache(), POLL_PERIOD,
61+
schemaCache);
62+
63+
eventSourceRegistry.registerEventSource(perResourcePollingEventSource);
64+
}
65+
3766
@Override
3867
public UpdateControl<MySQLSchema> reconcile(MySQLSchema schema,
3968
Context context) {
69+
var dbSchema = perResourcePollingEventSource
70+
.getValueFromCacheOrSupplier(ResourceID.fromResource(schema));
4071
try (Connection connection = getConnection()) {
41-
var dbSchema = SchemaService.getSchema(connection, schema.getMetadata().getName());
4272
if (!dbSchema.isPresent()) {
4373
var schemaName = schema.getMetadata().getName();
4474
String password = RandomStringUtils.randomAlphanumeric(16);
@@ -115,6 +145,12 @@ private void updateStatusPojo(MySQLSchema schema, String secretName, String user
115145

116146
private void createSecret(MySQLSchema schema, String password, String secretName,
117147
String userName) {
148+
149+
var currentSecret = kubernetesClient.secrets().inNamespace(schema.getMetadata().getNamespace())
150+
.withName(secretName).get();
151+
if (currentSecret != null) {
152+
return;
153+
}
118154
Secret credentialsSecret =
119155
new SecretBuilder()
120156
.withNewMetadata()
@@ -133,4 +169,6 @@ private void createSecret(MySQLSchema schema, String password, String secretName
133169
.inNamespace(schema.getMetadata().getNamespace())
134170
.create(credentialsSecret);
135171
}
172+
173+
136174
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
package io.javaoperatorsdk.operator.sample;
2+
3+
import java.util.Optional;
4+
5+
import io.javaoperatorsdk.operator.processing.event.source.polling.PerResourcePollingEventSource;
6+
import io.javaoperatorsdk.operator.sample.schema.Schema;
7+
import io.javaoperatorsdk.operator.sample.schema.SchemaService;
8+
9+
public class SchemaPollingResourceSupplier
10+
implements PerResourcePollingEventSource.ResourceSupplier<Schema, MySQLSchema> {
11+
12+
private final SchemaService schemaService;
13+
14+
public SchemaPollingResourceSupplier(MySQLDbConfig mySQLDbConfig) {
15+
this.schemaService = new SchemaService(mySQLDbConfig);
16+
}
17+
18+
@Override
19+
public Optional<Schema> getResources(MySQLSchema resource) {
20+
return schemaService.getSchema(resource.getMetadata().getName());
21+
}
22+
}

sample-operators/mysql-schema/src/main/java/io/javaoperatorsdk/operator/sample/SchemaStatus.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
package io.javaoperatorsdk.operator.sample;
22

3-
public class SchemaStatus {
3+
import io.javaoperatorsdk.operator.api.ObservedGenerationAwareStatus;
4+
5+
public class SchemaStatus extends ObservedGenerationAwareStatus {
46

57
private String url;
68

sample-operators/mysql-schema/src/main/java/io/javaoperatorsdk/operator/sample/schema/Schema.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,9 @@
11
package io.javaoperatorsdk.operator.sample.schema;
22

3+
import java.io.Serializable;
34
import java.util.Objects;
45

5-
public class Schema {
6+
public class Schema implements Serializable {
67

78
private String name;
89
private String characterSet;

sample-operators/mysql-schema/src/main/java/io/javaoperatorsdk/operator/sample/schema/SchemaService.java

Lines changed: 17 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -40,11 +40,11 @@ public static void createSchemaAndRelatedUser(Connection connection, String sche
4040
"CREATE SCHEMA `%1$s` DEFAULT CHARACTER SET %2$s",
4141
schemaName, encoding));
4242
}
43-
44-
try (Statement statement = connection.createStatement()) {
45-
statement.execute(format("CREATE USER '%1$s' IDENTIFIED BY '%2$s'", userName, password));
43+
if (!userExists(connection, userName)) {
44+
try (Statement statement = connection.createStatement()) {
45+
statement.execute(format("CREATE USER '%1$s' IDENTIFIED BY '%2$s'", userName, password));
46+
}
4647
}
47-
4848
try (Statement statement = connection.createStatement()) {
4949
statement.execute(
5050
format("GRANT ALL ON `%1$s`.* TO '%2$s'", schemaName, userName));
@@ -73,6 +73,19 @@ public static void deleteSchemaAndRelatedUser(Connection connection, String sche
7373
}
7474
}
7575

76+
private static boolean userExists(Connection connection, String username) {
77+
try (PreparedStatement ps =
78+
connection.prepareStatement(
79+
"SELECT EXISTS(SELECT 1 FROM mysql.user WHERE user = ?)")) {
80+
ps.setString(1, username);
81+
try (ResultSet resultSet = ps.executeQuery()) {
82+
return resultSet.next();
83+
}
84+
} catch (SQLException e) {
85+
throw new IllegalStateException(e);
86+
}
87+
}
88+
7689
public static Optional<Schema> getSchema(Connection connection, String schemaName) {
7790
try (PreparedStatement ps =
7891
connection.prepareStatement(

0 commit comments

Comments
 (0)