[FLINK-27714] Migrate to java-operator-sdk v3 #239

Merged: 1 commit merged into apache:main on May 31, 2022

Conversation

morhidi
Contributor

@morhidi morhidi commented May 23, 2022

Hi folks, I started looking at the challenges of a potential java-operator-sdk v3 migration. The main motivations for the upgrade are:

  • Dynamic namespaces (without an operator restart)
  • Embracing v3 features that simplify the current logic (secondary resources / error handling / status patch / controller configs / etc.)

This PR contains basically just the changes necessary for the v3 upgrade:

  • Operator config overrides now use the built-in ConfigurationServiceOverrider
  • The EventSource implementations are unified in a dedicated EventSourceUtility
  • The InformerManager-based secondary resource lookup was replaced by the built-in functionality in the Operator
  • The InformerManager is still used in the Webhook, due to the lack of framework support in that part
  • Minor changes to handle functionality refactored between v2 and v3
  • Minor unit test changes
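
For readers who have not used an overrider-style configuration before, here is a minimal sketch of the general pattern behind the first bullet: a base configuration is wrapped, and only the values that were explicitly set are replaced. It is a stand-alone model, not the JOSDK API; OperatorSettings, SettingsOverrider, and the default values are hypothetical names chosen for illustration.

```java
public class ConfigOverrideSketch {

    /** Hypothetical, simplified operator-level settings (not the JOSDK interface). */
    interface OperatorSettings {
        int reconcilerThreads();
        boolean validateCrds();
    }

    /** Illustrative defaults used when nothing is overridden (values are assumptions). */
    static final OperatorSettings DEFAULTS = new OperatorSettings() {
        @Override public int reconcilerThreads() { return 5; }
        @Override public boolean validateCrds() { return true; }
    };

    /** Fluent overrider: fields that were never set fall through to the base settings. */
    static final class SettingsOverrider {
        private final OperatorSettings base;
        private Integer reconcilerThreads;
        private Boolean validateCrds;

        SettingsOverrider(OperatorSettings base) { this.base = base; }

        SettingsOverrider withReconcilerThreads(int n) { this.reconcilerThreads = n; return this; }

        SettingsOverrider withValidateCrds(boolean v) { this.validateCrds = v; return this; }

        OperatorSettings build() {
            // Snapshot the effective values so later changes to the overrider do not leak in.
            final int threads = reconcilerThreads != null ? reconcilerThreads : base.reconcilerThreads();
            final boolean validate = validateCrds != null ? validateCrds : base.validateCrds();
            return new OperatorSettings() {
                @Override public int reconcilerThreads() { return threads; }
                @Override public boolean validateCrds() { return validate; }
            };
        }
    }

    public static void main(String[] args) {
        // Override one value; the other keeps its default.
        OperatorSettings settings = new SettingsOverrider(DEFAULTS).withReconcilerThreads(10).build();
        System.out.println(settings.reconcilerThreads() + " " + settings.validateCrds());
    }
}
```

The point of the pattern is that operator-specific settings live in one place and everything else keeps following the framework defaults.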

cc @wangyang0918 @Aitozi @tweise Trying to create just a working version first without too many changes.

@morhidi morhidi force-pushed the FLINK-27714 branch 2 times, most recently from 78a7ccc to a048a58 Compare May 24, 2022 12:26
@morhidi
Contributor Author

morhidi commented May 24, 2022

Getting the following issue intermittently on submitting the basic-session-job.yaml

2022-05-24 14:17:29,863 i.j.o.p.e.ReconciliationDispatcher [ERROR] [default.basic-session-job-example] Error during event processing ExecutionScope{ resource id: ResourceID{name='basic-session-job-example', namespace='default'}, version: 853952} failed.
io.fabric8.kubernetes.client.KubernetesClientException: Failure executing: PUT at: https://127.0.0.1:55618/apis/flink.apache.org/v1beta1/namespaces/default/flinksessionjobs/basic-session-job-example. Message: Operation cannot be fulfilled on flinksessionjobs.flink.apache.org "basic-session-job-example": the object has been modified; please apply your changes to the latest version and try again. Received status: Status(apiVersion=v1, code=409, details=StatusDetails(causes=[], group=flink.apache.org, kind=flinksessionjobs, name=basic-session-job-example, retryAfterSeconds=null, uid=null, additionalProperties={}), kind=Status, message=Operation cannot be fulfilled on flinksessionjobs.flink.apache.org "basic-session-job-example": the object has been modified; please apply your changes to the latest version and try again, metadata=ListMeta(_continue=null, remainingItemCount=null, resourceVersion=null, selfLink=null, additionalProperties={}), reason=Conflict, status=Failure, additionalProperties={}).
	at io.fabric8.kubernetes.client.dsl.base.OperationSupport.requestFailure(OperationSupport.java:682)
	at io.fabric8.kubernetes.client.dsl.base.OperationSupport.requestFailure(OperationSupport.java:661)
	at io.fabric8.kubernetes.client.dsl.base.OperationSupport.assertResponseCode(OperationSupport.java:612)
	at io.fabric8.kubernetes.client.dsl.base.OperationSupport.handleResponse(OperationSupport.java:555)
	at io.fabric8.kubernetes.client.dsl.base.OperationSupport.handleResponse(OperationSupport.java:518)
	at io.fabric8.kubernetes.client.dsl.base.OperationSupport.handleUpdate(OperationSupport.java:342)
	at io.fabric8.kubernetes.client.dsl.base.OperationSupport.handleUpdate(OperationSupport.java:322)
	at io.fabric8.kubernetes.client.dsl.base.BaseOperation.handleUpdate(BaseOperation.java:649)
	at io.fabric8.kubernetes.client.dsl.base.HasMetadataOperation.lambda$replace$1(HasMetadataOperation.java:195)
	at io.fabric8.kubernetes.client.dsl.base.HasMetadataOperation.replace(HasMetadataOperation.java:200)
	at io.fabric8.kubernetes.client.dsl.base.HasMetadataOperation.replace(HasMetadataOperation.java:141)
	at io.fabric8.kubernetes.client.dsl.base.HasMetadataOperation.replace(HasMetadataOperation.java:43)
	at io.javaoperatorsdk.operator.processing.event.ReconciliationDispatcher$CustomResourceFacade.replaceResourceWithLock(ReconciliationDispatcher.java:344)
	at io.javaoperatorsdk.operator.processing.event.ReconciliationDispatcher.updateCustomResourceWithFinalizer(ReconciliationDispatcher.java:302)
	at io.javaoperatorsdk.operator.processing.event.ReconciliationDispatcher.handleReconcile(ReconciliationDispatcher.java:101)
	at io.javaoperatorsdk.operator.processing.event.ReconciliationDispatcher.handleDispatch(ReconciliationDispatcher.java:80)
	at io.javaoperatorsdk.operator.processing.event.ReconciliationDispatcher.handleExecution(ReconciliationDispatcher.java:55)
	at io.javaoperatorsdk.operator.processing.event.EventProcessor$ControllerExecution.run(EventProcessor.java:356)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:829)

@csviri suggested removing the status initialization, but the issue still persists.

protected FlinkSessionJobStatus initStatus() {
    return new FlinkSessionJobStatus();
}
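
For context on the 409 above: Kubernetes uses optimistic concurrency, so a PUT that carries an older resourceVersion than the stored object is rejected with a Conflict. The sketch below models only that check with an in-memory store. Store, Resource, and ConflictException are hypothetical names for illustration, not fabric8 or JOSDK classes, and the payload strings are made up.

```java
public class OptimisticLockSketch {

    /** Hypothetical analogue of the HTTP 409 Conflict seen in the log. */
    static class ConflictException extends RuntimeException {
        ConflictException(String message) { super(message); }
    }

    /** Immutable snapshot of an object: the version it was read at plus a payload. */
    static final class Resource {
        final long resourceVersion;
        final String payload;

        Resource(long resourceVersion, String payload) {
            this.resourceVersion = resourceVersion;
            this.payload = payload;
        }
    }

    /** In-memory stand-in for the API server's copy of one object. */
    static final class Store {
        private Resource current = new Resource(1, "created");

        synchronized Resource get() { return current; }

        /** Accepts the write only if the caller read the latest version. */
        synchronized Resource replace(Resource readByCaller, String newPayload) {
            if (readByCaller.resourceVersion != current.resourceVersion) {
                throw new ConflictException("the object has been modified");
            }
            current = new Resource(current.resourceVersion + 1, newPayload);
            return current;
        }
    }

    public static void main(String[] args) {
        Store store = new Store();
        Resource stale = store.get();                  // first writer reads version 1
        store.replace(store.get(), "status updated");  // second writer wins, version becomes 2
        try {
            store.replace(stale, "finalizer added");   // first writer still holds version 1
        } catch (ConflictException e) {
            System.out.println("conflict: " + e.getMessage());
        }
        // Re-reading the latest version lets the same change go through.
        Resource updated = store.replace(store.get(), "finalizer added");
        System.out.println("written at version " + updated.resourceVersion);
    }
}
```

In the stack trace the rejected write is the finalizer update (updateCustomResourceWithFinalizer), which fits this picture of two writers touching the same object close together.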

@morhidi morhidi marked this pull request as ready for review May 24, 2022 14:10
@Aitozi

Aitozi commented May 25, 2022

Thanks @morhidi for this nice work, I'll take a closer look today

@morhidi
Contributor Author

morhidi commented May 25, 2022

fyi @Aitozi I ran into an issue while testing with basic-checkpoint-ha.yaml. Once you restart the operator, the Flink Session Jobs won't find their FlinkDeployment anymore. This is a bug in the java-operator-sdk. @csviri was kind enough to help me with the issue: he quickly identified the root cause (operator-framework/java-operator-sdk#1238) and has already fixed it (operator-framework/java-operator-sdk#1237). A patch version is coming soon.

@csviri
Contributor

csviri commented May 25, 2022

Hi, thank you @morhidi for reporting this issue. It's already fixed on main. Once we have merged an additional related improvement, https://github.com/java-operator-sdk/java-operator-sdk/pull/1239/files (mainly making the event source ordering more explicit and better unit tested), we're going to release version 3.0.1, hopefully today or tomorrow.
Thx!

@Aitozi Aitozi left a comment

Sorry for the late response :) It looks very nice 👍🏻 I just left some minor comments.

@Aitozi Aitozi left a comment

Looks good!

@morhidi
Contributor Author

morhidi commented May 27, 2022

> Getting the following issue intermittently on submitting the basic-session-job.yaml [...]

There's already a fix for this: operator-framework/java-operator-sdk#1244

@morhidi
Contributor Author

morhidi commented May 27, 2022

We have identified a couple of additional corner cases with @csviri
operator-framework/java-operator-sdk#1243 <- fixed
operator-framework/java-operator-sdk#1245 <- expected to be fixed by operator-framework/java-operator-sdk#1233

@morhidi
Contributor Author

morhidi commented May 27, 2022

None of the aforementioned issues are blockers, and the retry mechanism in JOSDK solves them under the hood, but we can wait for another patch/minor release before merging to be completely on the safe side. According to @csviri a new version of JOSDK containing these fixes is expected to be released within 1-2 days.
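
To illustrate why a retry makes such intermittent failures non-blocking, here is a minimal sketch of a bounded retry with exponential backoff. It is a generic illustration, not JOSDK's retry implementation; retryWithBackoff and the attempt count, delay, and multiplier used in main are assumptions chosen for the example.

```java
import java.util.concurrent.Callable;

public class RetrySketch {

    /**
     * Runs the action until it succeeds or maxAttempts is exhausted,
     * sleeping between attempts with an exponentially growing delay.
     */
    static <T> T retryWithBackoff(Callable<T> action, int maxAttempts,
                                  long initialDelayMs, double multiplier) throws Exception {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        long delayMs = initialDelayMs;
        for (int attempt = 1; ; attempt++) {
            try {
                return action.call();
            } catch (Exception e) {
                if (attempt == maxAttempts) {
                    throw e; // out of attempts: surface the last failure
                }
                Thread.sleep(delayMs);
                delayMs = (long) (delayMs * multiplier);
            }
        }
    }

    public static void main(String[] args) throws Exception {
        int[] calls = {0};
        // Fails twice with a simulated conflict, then succeeds on the third attempt.
        String result = retryWithBackoff(() -> {
            if (++calls[0] < 3) {
                throw new IllegalStateException("conflict");
            }
            return "reconciled";
        }, 5, 10, 1.5);
        System.out.println(result + " after " + calls[0] + " attempts");
    }
}
```

A transient conflict only costs extra attempts here, while a persistent failure still surfaces once the attempts run out.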

@gyfora
Contributor

gyfora commented May 29, 2022

+1 for waiting 1-2 days for the JOSDK fixes before merging

@morhidi
Contributor Author

morhidi commented May 30, 2022

Bumped the JOSDK version to 3.0.2

@morhidi
Contributor Author

morhidi commented May 30, 2022

I haven't hit any issues while smoke testing. Thanks @csviri for quickly addressing our findings.

@csviri
Contributor

csviri commented May 31, 2022

> I haven't hit any issues while smoke testing. Thanks @csviri for quickly addressing our findings.

Thx for reporting, and for cooperating on fixing these issues!

@gyfora gyfora merged commit d2f1d71 into apache:main May 31, 2022
pgrefviau pushed a commit to pgrefviau/flink-kubernetes-operator that referenced this pull request Jan 6, 2025
@W-15381639: attempt to remove platform