
[FLINK-27714] Migrate to java-operator-sdk v3 #239

Merged: 1 commit, May 31, 2022
@@ -31,7 +31,6 @@
 import org.apache.flink.kubernetes.operator.crd.FlinkSessionJob;
 import org.apache.flink.kubernetes.operator.crd.status.FlinkDeploymentStatus;
 import org.apache.flink.kubernetes.operator.crd.status.FlinkSessionJobStatus;
-import org.apache.flink.kubernetes.operator.informer.InformerManager;
 import org.apache.flink.kubernetes.operator.metrics.MetricManager;
 import org.apache.flink.kubernetes.operator.metrics.OperatorMetricUtils;
 import org.apache.flink.kubernetes.operator.observer.Observer;
@@ -50,16 +49,15 @@
 import io.fabric8.kubernetes.client.DefaultKubernetesClient;
 import io.fabric8.kubernetes.client.KubernetesClient;
 import io.javaoperatorsdk.operator.Operator;
-import io.javaoperatorsdk.operator.api.config.ConfigurationService;
 import io.javaoperatorsdk.operator.api.config.ConfigurationServiceOverrider;
-import io.javaoperatorsdk.operator.config.runtime.DefaultConfigurationService;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

 import javax.annotation.Nullable;

 import java.util.Set;
 import java.util.concurrent.Executors;
+import java.util.function.Consumer;

 /** Main Class for Flink native k8s operator. */
 public class FlinkOperator {
@@ -69,34 +67,47 @@ public class FlinkOperator {

     private final KubernetesClient client;
     private final FlinkService flinkService;
-    private final ConfigurationService configurationService;
     private final FlinkConfigManager configManager;
     private final Set<FlinkResourceValidator> validators;
     private final MetricGroup metricGroup;
-    private final InformerManager informerManager;

     public FlinkOperator(@Nullable Configuration conf) {
         this.client = new DefaultKubernetesClient();
         this.configManager = conf != null ? new FlinkConfigManager(conf) : new FlinkConfigManager();
-        this.configurationService =
-                getConfigurationService(configManager.getOperatorConfiguration());
-        this.operator = new Operator(client, configurationService);
+        this.operator =
+                new Operator(
+                        client,
+                        getConfigurationServiceOverriderConsumer(
+                                configManager.getOperatorConfiguration()));
         this.flinkService = new FlinkService(client, configManager);
         this.validators = ValidatorUtils.discoverValidators(configManager);
         this.metricGroup =
                 OperatorMetricUtils.initOperatorMetrics(configManager.getDefaultConfig());
-        Set<String> watchedNamespaces =
-                configManager.getOperatorConfiguration().getWatchedNamespaces();
-        this.informerManager = new InformerManager(watchedNamespaces, client);
         PluginManager pluginManager =
                 PluginUtils.createPluginManagerFromRootFolder(configManager.getDefaultConfig());
         FileSystem.initialize(configManager.getDefaultConfig(), pluginManager);
     }

+    @VisibleForTesting
+    protected static Consumer<ConfigurationServiceOverrider>
+            getConfigurationServiceOverriderConsumer(
+                    FlinkOperatorConfiguration operatorConfiguration) {
+        return overrider -> {
+            int parallelism = operatorConfiguration.getReconcilerMaxParallelism();
+            if (parallelism == -1) {
+                LOG.info("Configuring operator with unbounded reconciliation thread pool.");
+                overrider.withExecutorService(Executors.newCachedThreadPool());
+            } else {
+                LOG.info("Configuring operator with {} reconciliation threads.", parallelism);
+                overrider.withConcurrentReconciliationThreads(parallelism);
+            }
+        };
+    }
+
     private void registerDeploymentController() {
         StatusHelper<FlinkDeploymentStatus> statusHelper = new StatusHelper<>(client);
         ReconcilerFactory reconcilerFactory =
-                new ReconcilerFactory(client, flinkService, configManager, informerManager);
+                new ReconcilerFactory(client, flinkService, configManager);
         ObserverFactory observerFactory =
                 new ObserverFactory(client, flinkService, configManager, statusHelper);

@@ -114,7 +125,6 @@ private void registerDeploymentController() {
                 new FlinkControllerConfig<>(
                         controller,
                         configManager.getOperatorConfiguration().getWatchedNamespaces());
-        controllerConfig.setConfigurationService(configurationService);
         operator.register(controller, controllerConfig);
     }

@@ -127,53 +137,26 @@ private void registerSessionJobController() {
         FlinkSessionJobController controller =
                 new FlinkSessionJobController(
                         configManager,
-                        client,
                         validators,
                         reconciler,
                         observer,
                         new MetricManager<>(metricGroup),
-                        statusHelper,
-                        informerManager);
+                        statusHelper);

         FlinkControllerConfig<FlinkSessionJob> controllerConfig =
                 new FlinkControllerConfig<>(
                         controller,
                         configManager.getOperatorConfiguration().getWatchedNamespaces());

-        controllerConfig.setConfigurationService(configurationService);
         operator.register(controller, controllerConfig);
     }

-    private ConfigurationService getConfigurationService(
-            FlinkOperatorConfiguration operatorConfiguration) {
-
-        ConfigurationServiceOverrider configOverrider =
-                new ConfigurationServiceOverrider(DefaultConfigurationService.instance())
-                        .checkingCRDAndValidateLocalModel(false);
-
-        int parallelism = operatorConfiguration.getReconcilerMaxParallelism();
-        if (parallelism == -1) {
-            LOG.info("Configuring operator with unbounded reconciliation thread pool.");
-            configOverrider = configOverrider.withExecutorService(Executors.newCachedThreadPool());
-        } else {
-            LOG.info("Configuring operator with {} reconciliation threads.", parallelism);
-            configOverrider = configOverrider.withConcurrentReconciliationThreads(parallelism);
-        }
-        return configOverrider.build();
-    }
-
     public void run() {
         registerDeploymentController();
         registerSessionJobController();
         operator.installShutdownHook();
         operator.start();
     }

     @VisibleForTesting
     protected Operator getOperator() {
         return operator;
     }

     public static void main(String... args) {
         EnvUtils.logEnvironmentInfo(LOG, "Flink Kubernetes Operator", args);
         new FlinkOperator(null).run();
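The FlinkOperator hunks above capture the main bootstrap change of the v3 migration: instead of building a ConfigurationService from DefaultConfigurationService up front, the Operator constructor now receives a Consumer<ConfigurationServiceOverrider> and applies the reconciliation-thread settings through it. Below is a minimal standalone sketch of that wiring, assuming java-operator-sdk 3.x; the class name and the hard-coded parallelism value are illustrative, and controller registration is elided.

import io.fabric8.kubernetes.client.DefaultKubernetesClient;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.javaoperatorsdk.operator.Operator;

import java.util.concurrent.Executors;

public class OperatorBootstrapSketch {
    public static void main(String[] args) {
        KubernetesClient client = new DefaultKubernetesClient();
        int parallelism = 4; // illustrative; the PR reads this from FlinkOperatorConfiguration

        // v3: the overrider is handed to the Operator as a Consumer instead of a prebuilt service.
        Operator operator =
                new Operator(
                        client,
                        overrider -> {
                            if (parallelism == -1) {
                                overrider.withExecutorService(Executors.newCachedThreadPool());
                            } else {
                                overrider.withConcurrentReconciliationThreads(parallelism);
                            }
                        });

        // Controllers would be registered here (operator.register(...)), followed by
        // operator.installShutdownHook() and operator.start(), as in the PR's run() method.
    }
}

Note that the old checkingCRDAndValidateLocalModel(false) call has no counterpart in the new consumer shown in the diff above.

The next hunks are from FlinkConfigManager.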
@@ -25,7 +25,7 @@
 import org.apache.flink.kubernetes.operator.crd.FlinkSessionJob;
 import org.apache.flink.kubernetes.operator.crd.spec.FlinkDeploymentSpec;
 import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
-import org.apache.flink.kubernetes.operator.utils.OperatorUtils;
+import org.apache.flink.kubernetes.operator.utils.EnvUtils;

 import org.apache.flink.shaded.guava30.com.google.common.cache.Cache;
 import org.apache.flink.shaded.guava30.com.google.common.cache.CacheBuilder;
@@ -63,7 +63,7 @@ public class FlinkConfigManager {
     private final AtomicLong defaultConfigVersion = new AtomicLong(0);

     private final LoadingCache<Key, Configuration> cache;
-    private Set<String> namespaces = OperatorUtils.getWatchedNamespaces();
+    private final Set<String> namespaces = EnvUtils.getWatchedNamespaces();

     public FlinkConfigManager() {
         this(GlobalConfiguration.loadConfiguration());
@@ -188,13 +188,6 @@ private void scheduleConfigWatcher(ScheduledExecutorService executorService) {
         LOG.info("Enabled dynamic config updates, checking config changes every {}", checkInterval);
     }

-    @VisibleForTesting
-    public void setWatchedNamespaces(Set<String> namespaces) {
-        this.namespaces = namespaces;
-        operatorConfiguration =
-                FlinkOperatorConfiguration.fromConfiguration(defaultConfig, namespaces);
-    }
-
     @VisibleForTesting
     protected Cache<Key, Configuration> getCache() {
         return cache;
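In the FlinkConfigManager hunks above, the watched-namespace lookup moves from OperatorUtils to EnvUtils, the namespaces field becomes final, and the test-only setWatchedNamespaces hook is dropped. The following is a rough sketch of the kind of environment-based helper such a getWatchedNamespaces() lookup typically is; the environment variable name and the separator are assumptions for illustration and are not taken from the PR.

import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

public final class WatchedNamespacesSketch {
    // Hypothetical variable name; the real lookup lives in EnvUtils and is not shown in this diff.
    private static final String WATCH_NAMESPACES_ENV = "FLINK_OPERATOR_WATCH_NAMESPACES";

    /** Parses a comma-separated namespace list from the environment; an empty set means watch all. */
    public static Set<String> getWatchedNamespaces() {
        String raw = System.getenv(WATCH_NAMESPACES_ENV);
        if (raw == null || raw.isEmpty()) {
            return Collections.emptySet();
        }
        return new HashSet<>(Arrays.asList(raw.split(",")));
    }

    private WatchedNamespacesSketch() {}
}

The next hunk is from FlinkControllerConfig.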
@@ -19,12 +19,13 @@

 import io.fabric8.kubernetes.api.model.HasMetadata;
 import io.javaoperatorsdk.operator.api.reconciler.Reconciler;
-import io.javaoperatorsdk.operator.config.runtime.AnnotationConfiguration;
+import io.javaoperatorsdk.operator.config.runtime.AnnotationControllerConfiguration;

 import java.util.Set;

 /** Custom config for {@link FlinkDeploymentController}. */
-public class FlinkControllerConfig<CR extends HasMetadata> extends AnnotationConfiguration<CR> {
+public class FlinkControllerConfig<CR extends HasMetadata>
+        extends AnnotationControllerConfiguration<CR> {

     private final Set<String> watchedNamespaces;

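The change above tracks the rename of the runtime configuration base class in java-operator-sdk v3: AnnotationConfiguration becomes AnnotationControllerConfiguration, with the configuration still derived from the reconciler's @ControllerConfiguration annotation. Below is a sketch of how such a subclass is typically wired; the constructor body and the getNamespaces() override are assumptions for illustration and are not part of the diff.

import io.fabric8.kubernetes.api.model.HasMetadata;
import io.javaoperatorsdk.operator.api.reconciler.Reconciler;
import io.javaoperatorsdk.operator.config.runtime.AnnotationControllerConfiguration;

import java.util.Set;

public class ExampleControllerConfig<CR extends HasMetadata>
        extends AnnotationControllerConfiguration<CR> {

    private final Set<String> watchedNamespaces;

    public ExampleControllerConfig(Reconciler<CR> reconciler, Set<String> watchedNamespaces) {
        super(reconciler); // v3 reads the configuration from the reconciler's annotation
        this.watchedNamespaces = watchedNamespaces;
    }

    @Override
    public Set<String> getNamespaces() {
        // restrict the controller to the operator's watched namespaces, if any are set
        return watchedNamespaces;
    }
}

The remaining hunks are from FlinkDeploymentController.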
@@ -17,7 +17,6 @@

 package org.apache.flink.kubernetes.operator.controller;

-import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.JobStatus;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
@@ -30,37 +29,38 @@
 import org.apache.flink.kubernetes.operator.observer.deployment.ObserverFactory;
 import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
 import org.apache.flink.kubernetes.operator.reconciler.deployment.ReconcilerFactory;
+import org.apache.flink.kubernetes.operator.utils.EventSourceUtils;
 import org.apache.flink.kubernetes.operator.utils.EventUtils;
-import org.apache.flink.kubernetes.operator.utils.OperatorUtils;
 import org.apache.flink.kubernetes.operator.utils.StatusHelper;
 import org.apache.flink.kubernetes.operator.validation.FlinkResourceValidator;

 import io.fabric8.kubernetes.client.KubernetesClient;
+import io.javaoperatorsdk.operator.api.reconciler.Cleaner;
 import io.javaoperatorsdk.operator.api.reconciler.Context;
 import io.javaoperatorsdk.operator.api.reconciler.ControllerConfiguration;
 import io.javaoperatorsdk.operator.api.reconciler.DeleteControl;
 import io.javaoperatorsdk.operator.api.reconciler.ErrorStatusHandler;
+import io.javaoperatorsdk.operator.api.reconciler.ErrorStatusUpdateControl;
 import io.javaoperatorsdk.operator.api.reconciler.EventSourceContext;
 import io.javaoperatorsdk.operator.api.reconciler.EventSourceInitializer;
 import io.javaoperatorsdk.operator.api.reconciler.Reconciler;
-import io.javaoperatorsdk.operator.api.reconciler.RetryInfo;
 import io.javaoperatorsdk.operator.api.reconciler.UpdateControl;
 import io.javaoperatorsdk.operator.processing.event.source.EventSource;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

-import java.util.List;
+import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.stream.Collectors;

 /** Controller that runs the main reconcile loop for Flink deployments. */
-@ControllerConfiguration
+@ControllerConfiguration()
 public class FlinkDeploymentController
         implements Reconciler<FlinkDeployment>,
                 ErrorStatusHandler<FlinkDeployment>,
-                EventSourceInitializer<FlinkDeployment> {
+                EventSourceInitializer<FlinkDeployment>,
+                Cleaner<FlinkDeployment> {
     private static final Logger LOG = LoggerFactory.getLogger(FlinkDeploymentController.class);

     private final FlinkConfigManager configManager;
@@ -71,7 +71,6 @@ public class FlinkDeploymentController
     private final ObserverFactory observerFactory;
     private final MetricManager<FlinkDeployment> metricManager;
     private final StatusHelper<FlinkDeploymentStatus> statusHelper;
-    private Set<String> effectiveNamespaces;
     private final ConcurrentHashMap<Tuple2<String, String>, FlinkDeploymentStatus> statusCache =
             new ConcurrentHashMap<>();

@@ -90,7 +89,6 @@ public FlinkDeploymentController(
         this.observerFactory = observerFactory;
         this.metricManager = metricManager;
         this.statusHelper = statusHelper;
-        this.effectiveNamespaces = configManager.getOperatorConfiguration().getWatchedNamespaces();
     }

     @Override
@@ -108,7 +106,8 @@ public DeleteControl cleanup(FlinkDeployment flinkApp, Context context) {
     }

     @Override
-    public UpdateControl<FlinkDeployment> reconcile(FlinkDeployment flinkApp, Context context) {
+    public UpdateControl<FlinkDeployment> reconcile(FlinkDeployment flinkApp, Context context)
+            throws Exception {
         LOG.info("Starting reconciliation");
         statusHelper.updateStatusFromCache(flinkApp);
         FlinkDeployment previousDeployment = ReconciliationUtils.clone(flinkApp);
@@ -152,26 +151,18 @@ private void handleDeploymentFailed(FlinkDeployment flinkApp, DeploymentFailedEx
     }

     @Override
-    public List<EventSource> prepareEventSources(EventSourceContext<FlinkDeployment> ctx) {
-        if (effectiveNamespaces.isEmpty()) {
-            return List.of(OperatorUtils.createJmDepInformerEventSource(kubernetesClient));
-        } else {
-            return effectiveNamespaces.stream()
-                    .map(ns -> OperatorUtils.createJmDepInformerEventSource(kubernetesClient, ns))
-                    .collect(Collectors.toList());
-        }
-    }
-
-    @VisibleForTesting
-    public void setEffectiveNamespaces(Set<String> effectiveNamespaces) {
-        this.effectiveNamespaces = effectiveNamespaces;
+    public Map<String, EventSource> prepareEventSources(
+            EventSourceContext<FlinkDeployment> context) {
+        return EventSourceInitializer.nameEventSources(
+                EventSourceUtils.getSessionJobInformerEventSource(context),
+                EventSourceUtils.getDeploymentInformerEventSource(context));
     }

     @Override
-    public Optional<FlinkDeployment> updateErrorStatus(
-            FlinkDeployment flinkApp, RetryInfo retryInfo, RuntimeException e) {
-        return ReconciliationUtils.updateErrorStatus(
-                flinkApp, retryInfo, e, metricManager, statusHelper);
+    public ErrorStatusUpdateControl<FlinkDeployment> updateErrorStatus(
+            FlinkDeployment flinkDeployment, Context<FlinkDeployment> context, Exception e) {
+        return ReconciliationUtils.toErrorStatusUpdateControl(
+                flinkDeployment, context.getRetryInfo(), e, metricManager, statusHelper);
     }

     private boolean validateDeployment(FlinkDeployment deployment) {
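Taken together, the FlinkDeploymentController hunks show the v3 reconciler surface this PR migrates to: cleanup() moves behind the new Cleaner interface, reconcile() may throw Exception, updateErrorStatus() returns ErrorStatusUpdateControl and reads retry information from the Context, and prepareEventSources() returns named event sources as a Map. A compact sketch of that shape follows, assuming java-operator-sdk 3.x; the class name, the status-update choices, and the empty event-source list are illustrative rather than taken from the PR.

import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;

import io.javaoperatorsdk.operator.api.reconciler.Cleaner;
import io.javaoperatorsdk.operator.api.reconciler.Context;
import io.javaoperatorsdk.operator.api.reconciler.ControllerConfiguration;
import io.javaoperatorsdk.operator.api.reconciler.DeleteControl;
import io.javaoperatorsdk.operator.api.reconciler.ErrorStatusHandler;
import io.javaoperatorsdk.operator.api.reconciler.ErrorStatusUpdateControl;
import io.javaoperatorsdk.operator.api.reconciler.EventSourceContext;
import io.javaoperatorsdk.operator.api.reconciler.EventSourceInitializer;
import io.javaoperatorsdk.operator.api.reconciler.Reconciler;
import io.javaoperatorsdk.operator.api.reconciler.UpdateControl;
import io.javaoperatorsdk.operator.processing.event.source.EventSource;

import java.util.Map;

@ControllerConfiguration
public class ReconcilerShapeSketch
        implements Reconciler<FlinkDeployment>,
                ErrorStatusHandler<FlinkDeployment>,
                EventSourceInitializer<FlinkDeployment>,
                Cleaner<FlinkDeployment> {

    @Override
    public UpdateControl<FlinkDeployment> reconcile(
            FlinkDeployment resource, Context<FlinkDeployment> context) throws Exception {
        // v3 lets reconcile() declare checked exceptions directly.
        return UpdateControl.noUpdate();
    }

    @Override
    public DeleteControl cleanup(FlinkDeployment resource, Context<FlinkDeployment> context) {
        // cleanup() now comes from the Cleaner interface; defaultDelete() removes the finalizer.
        return DeleteControl.defaultDelete();
    }

    @Override
    public ErrorStatusUpdateControl<FlinkDeployment> updateErrorStatus(
            FlinkDeployment resource, Context<FlinkDeployment> context, Exception e) {
        // Retry information is read via context.getRetryInfo() instead of a RetryInfo argument.
        return ErrorStatusUpdateControl.noStatusUpdate();
    }

    @Override
    public Map<String, EventSource> prepareEventSources(EventSourceContext<FlinkDeployment> context) {
        // Event sources are now returned as a name -> source map.
        return EventSourceInitializer.nameEventSources();
    }
}

In the actual controller the status-update and error-handling decisions are delegated to ReconciliationUtils, as the diff above shows.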