Skip to content

feat: add a DependentResource implementation that's also an EventSource #1094

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 6 commits into from
Mar 28, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ public Optional<RetryInfo> getRetryInfo() {
public <T> Optional<T> getSecondaryResource(Class<T> expectedType, String eventSourceName) {
return controller.getEventSourceManager()
.getResourceEventSourceFor(expectedType, eventSourceName)
.flatMap(es -> es.getAssociated(primaryResource));
.flatMap(es -> es.getAssociatedResource(primaryResource));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,12 @@
public interface EventSourceInitializer<P extends HasMetadata> {

/**
* Prepares a list of {@link EventSource} implementations to be registered by the SDK.
* Prepares a map of {@link EventSource} implementations keyed by the name with which they need to
* be registered by the SDK.
*
* @param context a {@link EventSourceContext} providing access to information useful to event
* sources
* @return list of event sources to register
* @return a map of event sources to register
*/
Map<String, EventSource> prepareEventSources(EventSourceContext<P> context);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

import io.fabric8.kubernetes.api.model.HasMetadata;

@SuppressWarnings("rawtypes")
public class UpdateControl<T extends HasMetadata> extends BaseControl<UpdateControl<T>> {

private final T resource;
Expand Down Expand Up @@ -32,8 +31,7 @@ public static <T extends HasMetadata> UpdateControl<T> updateResource(T customRe
return new UpdateControl<>(customResource, false, true);
}

public static <T extends HasMetadata> UpdateControl<T> updateStatus(
T customResource) {
public static <T extends HasMetadata> UpdateControl<T> updateStatus(T customResource) {
return new UpdateControl<>(customResource, true, false);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,17 +1,16 @@
package io.javaoperatorsdk.operator.api.reconciler.dependent;

import java.util.Optional;

import io.fabric8.kubernetes.api.model.HasMetadata;
import io.javaoperatorsdk.operator.api.reconciler.Context;
import io.javaoperatorsdk.operator.processing.ResourceOwner;

/**
* An interface to implement and provide dependent resource support.
*
* @param <R> the dependent resource type
* @param <P> the associated primary resource type
*/
public interface DependentResource<R, P extends HasMetadata> {
public interface DependentResource<R, P extends HasMetadata> extends ResourceOwner<R, P> {

/**
* Reconciles the dependent resource given the desired primary state
Expand All @@ -22,25 +21,6 @@ public interface DependentResource<R, P extends HasMetadata> {
*/
ReconcileResult<R> reconcile(P primary, Context<P> context);

/**
* The intention with get resource is to return the actual state of the resource. Usually from a
* local cache, what was updated after the reconciliation, or typically from the event source that
* caches the resources.
*
* @param primaryResource the primary resource for which we want to retrieve the secondary
* resource
* @return an {@link Optional} containing the secondary resource or {@link Optional#empty()} if it
* doesn't exist
*/
Optional<R> getResource(P primaryResource);

/**
* Retrieves the resource type associated with this DependentResource
*
* @return the resource type associated with this DependentResource
*/
Class<R> resourceType();

/**
* Computes a default name for the specified DependentResource class
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,4 @@ public interface EventSourceProvider<P extends HasMetadata> {
* @return the initiated event source.
*/
EventSource initEventSource(EventSourceContext<P> context);

EventSource getEventSource();
}
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,7 @@ public UpdateControl<P> execute() throws Exception {
if (!exceptions.isEmpty()) {
throw new AggregatedOperatorException("One or more DependentResource(s) failed:\n" +
exceptions.stream()
.map(e -> "\t\t- " + e.getMessage())
.map(Controller.this::createExceptionInformation)
.collect(Collectors.joining("\n")),
exceptions);
}
Expand All @@ -215,6 +215,24 @@ public UpdateControl<P> execute() throws Exception {
});
}

private String createExceptionInformation(Exception e) {
final var exceptionLocation = Optional.ofNullable(e.getCause())
.map(Throwable::getStackTrace)
.filter(stackTrace -> stackTrace.length > 0)
.map(stackTrace -> {
int i = 0;
while (i < stackTrace.length) {
final var moduleName = stackTrace[i].getModuleName();
if (!"java.base".equals(moduleName)) {
return " at: " + stackTrace[i].toString();
}
i++;
}
return "";
});
return "\t\t- " + e.getMessage() + exceptionLocation.orElse("");
}

public void initAndRegisterEventSources(EventSourceContext<P> context) {
dependents.entrySet().stream()
.filter(drEntry -> drEntry.getValue() instanceof EventSourceProvider)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package io.javaoperatorsdk.operator.processing;

import java.util.Optional;

import io.fabric8.kubernetes.api.model.HasMetadata;

public interface ResourceOwner<R, P extends HasMetadata> {

/**
* Retrieves the resource type associated with this ResourceOwner
*
* @return the resource type associated with this ResourceOwner
*/
Class<R> resourceType();

/**
* Retrieves the resource associated with the specified primary one, returning the actual state of
* the resource. Typically, this state might come from a local cache, updated after
* reconciliation.
*
* @param primary the primary resource for which we want to retrieve the secondary resource
* @return an {@link Optional} containing the secondary resource or {@link Optional#empty()} if it
* doesn't exist
*/
Optional<R> getAssociatedResource(P primary);
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,6 @@
import io.javaoperatorsdk.operator.api.reconciler.Context;
import io.javaoperatorsdk.operator.api.reconciler.dependent.Deleter;
import io.javaoperatorsdk.operator.api.reconciler.dependent.DependentResource;
import io.javaoperatorsdk.operator.api.reconciler.dependent.EventSourceProvider;
import io.javaoperatorsdk.operator.api.reconciler.dependent.RecentOperationCacheFiller;
import io.javaoperatorsdk.operator.api.reconciler.dependent.RecentOperationEventFilter;
import io.javaoperatorsdk.operator.api.reconciler.dependent.ReconcileResult;
import io.javaoperatorsdk.operator.processing.event.ResourceID;

Expand All @@ -31,14 +28,12 @@ public AbstractDependentResource() {

@Override
public ReconcileResult<R> reconcile(P primary, Context<P> context) {
var maybeActual = getResource(primary);
var maybeActual = getAssociatedResource(primary);
if (creatable || updatable) {
if (maybeActual.isEmpty()) {
if (creatable) {
var desired = desired(primary, context);
log.info("Creating {} for primary {}", desired.getClass().getSimpleName(),
ResourceID.fromResource(primary));
log.debug("Creating dependent {} for primary {}", desired, primary);
logForOperation("Creating", primary, desired);
var createdResource = handleCreate(desired, primary, context);
return ReconcileResult.resourceCreated(createdResource);
}
Expand All @@ -48,9 +43,7 @@ public ReconcileResult<R> reconcile(P primary, Context<P> context) {
final var match = updater.match(actual, primary, context);
if (!match.matched()) {
final var desired = match.computedDesired().orElse(desired(primary, context));
log.info("Updating {} for primary {}", desired.getClass().getSimpleName(),
ResourceID.fromResource(primary));
log.debug("Updating dependent {} for primary {}", desired, primary);
logForOperation("Updating", primary, desired);
var updatedResource = handleUpdate(actual, desired, primary, context);
return ReconcileResult.resourceUpdated(updatedResource);
}
Expand All @@ -66,91 +59,47 @@ public ReconcileResult<R> reconcile(P primary, Context<P> context) {
return ReconcileResult.noOperation(maybeActual.orElse(null));
}

protected R handleCreate(R desired, P primary, Context<P> context) {
ResourceID resourceID = ResourceID.fromResource(primary);
R created = null;
try {
prepareEventFiltering(desired, resourceID);
created = creator.create(desired, primary, context);
cacheAfterCreate(resourceID, created);
return created;
} catch (RuntimeException e) {
cleanupAfterEventFiltering(desired, resourceID, created);
throw e;
}
}

private void cleanupAfterEventFiltering(R desired, ResourceID resourceID, R created) {
if (isFilteringEventSource()) {
eventSourceAsRecentOperationEventFilter()
.cleanupOnCreateOrUpdateEventFiltering(resourceID, created);
}
}

private void cacheAfterCreate(ResourceID resourceID, R created) {
if (isRecentOperationCacheFiller()) {
eventSourceAsRecentOperationCacheFiller().handleRecentResourceCreate(resourceID, created);
}
private void logForOperation(String operation, P primary, R desired) {
final var desiredDesc = desired instanceof HasMetadata
? "'" + ((HasMetadata) desired).getMetadata().getName() + "' "
+ ((HasMetadata) desired).getKind()
: desired.getClass().getSimpleName();
log.info("{} {} for primary {}", operation, desiredDesc, ResourceID.fromResource(primary));
log.debug("{} dependent {} for primary {}", operation, desired, primary);
}

private void cacheAfterUpdate(R actual, ResourceID resourceID, R updated) {
if (isRecentOperationCacheFiller()) {
eventSourceAsRecentOperationCacheFiller().handleRecentResourceUpdate(resourceID, updated,
actual);
}
protected R handleCreate(R desired, P primary, Context<P> context) {
ResourceID resourceID = ResourceID.fromResource(primary);
R created = creator.create(desired, primary, context);
onCreated(resourceID, created);
return created;
}

private void prepareEventFiltering(R desired, ResourceID resourceID) {
if (isFilteringEventSource()) {
eventSourceAsRecentOperationEventFilter().prepareForCreateOrUpdateEventFiltering(resourceID,
desired);
}
}
/**
* Allows sub-classes to perform additional processing (e.g. caching) on the created resource if
* needed.
*
* @param primaryResourceId the {@link ResourceID} of the primary resource associated with the
* newly created resource
* @param created the newly created resource
*/
protected abstract void onCreated(ResourceID primaryResourceId, R created);

/**
* Allows sub-classes to perform additional processing on the updated resource if needed.
*
* @param primaryResourceId the {@link ResourceID} of the primary resource associated with the
* newly updated resource
* @param updated the updated resource
* @param actual the resource as it was before the update
*/
protected abstract void onUpdated(ResourceID primaryResourceId, R updated, R actual);

protected R handleUpdate(R actual, R desired, P primary, Context<P> context) {
ResourceID resourceID = ResourceID.fromResource(primary);
R updated = null;
try {
prepareEventFiltering(desired, resourceID);
updated = updater.update(actual, desired, primary, context);
cacheAfterUpdate(actual, resourceID, updated);
return updated;
} catch (RuntimeException e) {
cleanupAfterEventFiltering(desired, resourceID, updated);
throw e;
}
}

@SuppressWarnings("unchecked")
private RecentOperationEventFilter<R> eventSourceAsRecentOperationEventFilter() {
return (RecentOperationEventFilter<R>) ((EventSourceProvider<P>) this).getEventSource();
}

@SuppressWarnings("unchecked")
private RecentOperationCacheFiller<R> eventSourceAsRecentOperationCacheFiller() {
return (RecentOperationCacheFiller<R>) ((EventSourceProvider<P>) this).getEventSource();
}

@SuppressWarnings("unchecked")
// this cannot be done in constructor since event source might be initialized later
protected boolean isFilteringEventSource() {
if (this instanceof EventSourceProvider) {
final var eventSource = ((EventSourceProvider<P>) this).getEventSource();
return eventSource instanceof RecentOperationEventFilter;
} else {
return false;
}
}

@SuppressWarnings("unchecked")
// this cannot be done in constructor since event source might be initialized later
protected boolean isRecentOperationCacheFiller() {
if (this instanceof EventSourceProvider) {
final var eventSource = ((EventSourceProvider<P>) this).getEventSource();
return eventSource instanceof RecentOperationCacheFiller;
} else {
return false;
}
R updated = updater.update(actual, desired, primary, context);
onUpdated(resourceID, updated, actual);
return updated;
}

protected R desired(P primary, Context<P> context) {
Expand Down
Loading