Skip to content

K8SPG-761: add support for the PGO_WORKERS env var #1135

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Apr 23, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 8 additions & 7 deletions cmd/postgres-operator/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@ import (
"github.com/percona/percona-postgresql-operator/percona/controller/pgcluster"
"github.com/percona/percona-postgresql-operator/percona/controller/pgrestore"
perconaPGUpgrade "github.com/percona/percona-postgresql-operator/percona/controller/pgupgrade"
"github.com/percona/percona-postgresql-operator/percona/k8s"
perconaRuntime "github.com/percona/percona-postgresql-operator/percona/runtime"
"github.com/percona/percona-postgresql-operator/percona/utils/registry"
v2 "github.com/percona/percona-postgresql-operator/pkg/apis/pgv2.percona.com/v2"
Expand Down Expand Up @@ -111,15 +110,13 @@ func main() {
// deprecation warnings when using an older version of a resource for backwards compatibility).
rest.SetDefaultWarningHandler(rest.NoWarnings{})

namespaces, err := k8s.GetWatchNamespace()
options, err := initManager(ctx)
assertNoError(err)

mgr, err := perconaRuntime.CreateRuntimeManager(
namespaces,
cfg,
false,
false,
features,
options,
)
assertNoError(err)

Expand Down Expand Up @@ -260,8 +257,8 @@ func addControllersToManager(ctx context.Context, mgr manager.Manager) error {

//+kubebuilder:rbac:groups="coordination.k8s.io",resources="leases",verbs={get,create,update}

func initManager() (runtime.Options, error) {
log := logging.FromContext(context.Background())
func initManager(ctx context.Context) (runtime.Options, error) {
log := logging.FromContext(ctx)

options := runtime.Options{}
options.Cache.SyncPeriod = initialize.Pointer(time.Hour)
Expand All @@ -279,6 +276,10 @@ func initManager() (runtime.Options, error) {
options.LeaderElection = true
options.LeaderElectionID = lease
options.LeaderElectionNamespace = os.Getenv("PGO_NAMESPACE")
} else {
// K8SPG-761
options.LeaderElection = true
options.LeaderElectionID = perconaRuntime.ElectionID
}

// Check PGO_TARGET_NAMESPACE for backwards compatibility with
Expand Down
20 changes: 12 additions & 8 deletions cmd/postgres-operator/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package main

import (
"context"
"reflect"
"testing"
"time"
Expand All @@ -14,8 +15,9 @@ import (
)

func TestInitManager(t *testing.T) {
ctx := context.Background()
t.Run("Defaults", func(t *testing.T) {
options, err := initManager()
options, err := initManager(ctx)
assert.NilError(t, err)

if assert.Check(t, options.Cache.SyncPeriod != nil) {
Expand All @@ -30,12 +32,14 @@ func TestInitManager(t *testing.T) {
})

assert.Assert(t, options.Cache.DefaultNamespaces == nil)
assert.Assert(t, options.LeaderElection == false)
assert.Assert(t, options.LeaderElection == true)

{
options.Cache.SyncPeriod = nil
options.Controller.GroupKindConcurrency = nil
options.HealthProbeBindAddress = ""
options.LeaderElection = false
options.LeaderElectionID = ""

assert.Assert(t, reflect.ValueOf(options).IsZero(),
"expected remaining fields to be unset:\n%+v", options)
Expand All @@ -48,7 +52,7 @@ func TestInitManager(t *testing.T) {
t.Run("Invalid", func(t *testing.T) {
t.Setenv("PGO_CONTROLLER_LEASE_NAME", "INVALID_NAME")

options, err := initManager()
options, err := initManager(ctx)
assert.ErrorContains(t, err, "PGO_CONTROLLER_LEASE_NAME")
assert.ErrorContains(t, err, "invalid")

Expand All @@ -59,7 +63,7 @@ func TestInitManager(t *testing.T) {
t.Run("Valid", func(t *testing.T) {
t.Setenv("PGO_CONTROLLER_LEASE_NAME", "valid-name")

options, err := initManager()
options, err := initManager(ctx)
assert.NilError(t, err)
assert.Assert(t, options.LeaderElection == true)
assert.Equal(t, options.LeaderElectionNamespace, "test-namespace")
Expand All @@ -70,7 +74,7 @@ func TestInitManager(t *testing.T) {
t.Run("PGO_TARGET_NAMESPACE", func(t *testing.T) {
t.Setenv("PGO_TARGET_NAMESPACE", "some-such")

options, err := initManager()
options, err := initManager(ctx)
assert.NilError(t, err)
assert.Assert(t, cmp.Len(options.Cache.DefaultNamespaces, 1),
"expected only one configured namespace")
Expand All @@ -81,7 +85,7 @@ func TestInitManager(t *testing.T) {
t.Run("PGO_TARGET_NAMESPACES", func(t *testing.T) {
t.Setenv("PGO_TARGET_NAMESPACES", "some-such,another-one")

options, err := initManager()
options, err := initManager(ctx)
assert.NilError(t, err)
assert.Assert(t, cmp.Len(options.Cache.DefaultNamespaces, 2),
"expect two configured namespaces")
Expand All @@ -95,7 +99,7 @@ func TestInitManager(t *testing.T) {
for _, v := range []string{"-3", "0", "3.14"} {
t.Setenv("PGO_WORKERS", v)

options, err := initManager()
options, err := initManager(ctx)
assert.NilError(t, err)
assert.DeepEqual(t, options.Controller.GroupKindConcurrency,
map[string]int{
Expand All @@ -107,7 +111,7 @@ func TestInitManager(t *testing.T) {
t.Run("Valid", func(t *testing.T) {
t.Setenv("PGO_WORKERS", "19")

options, err := initManager()
options, err := initManager(ctx)
assert.NilError(t, err)
assert.DeepEqual(t, options.Controller.GroupKindConcurrency,
map[string]int{
Expand Down
2 changes: 2 additions & 0 deletions config/manager/default/manager.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ spec:
value: INFO
- name: DISABLE_TELEMETRY
value: "false"
- name: PGO_WORKERS
value: "1"
ports:
- containerPort: 8080
name: metrics
Expand Down
2 changes: 2 additions & 0 deletions deploy/bundle.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -47675,6 +47675,8 @@ spec:
value: INFO
- name: DISABLE_TELEMETRY
value: "false"
- name: PGO_WORKERS
value: "1"
image: perconalab/percona-postgresql-operator:main
imagePullPolicy: Always
livenessProbe:
Expand Down
2 changes: 2 additions & 0 deletions deploy/cw-bundle.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -47673,6 +47673,8 @@ spec:
value: INFO
- name: DISABLE_TELEMETRY
value: "false"
- name: PGO_WORKERS
value: "1"
image: perconalab/percona-postgresql-operator:main
imagePullPolicy: Always
livenessProbe:
Expand Down
2 changes: 2 additions & 0 deletions deploy/cw-operator.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ spec:
value: INFO
- name: DISABLE_TELEMETRY
value: "false"
- name: PGO_WORKERS
value: "1"
image: perconalab/percona-postgresql-operator:main
imagePullPolicy: Always
livenessProbe:
Expand Down
2 changes: 2 additions & 0 deletions deploy/operator.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ spec:
value: INFO
- name: DISABLE_TELEMETRY
value: "false"
- name: PGO_WORKERS
value: "1"
image: perconalab/percona-postgresql-operator:main
imagePullPolicy: Always
livenessProbe:
Expand Down
13 changes: 12 additions & 1 deletion percona/controller/pgcluster/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"context"
"crypto/md5" //nolint:gosec
"fmt"
"os"
"strconv"
"sync"
"time"
Expand All @@ -24,8 +25,10 @@ import (
"k8s.io/apimachinery/pkg/types"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
metricsServer "sigs.k8s.io/controller-runtime/pkg/metrics/server"

"github.com/percona/percona-postgresql-operator/internal/controller/postgrescluster"
internalRuntime "github.com/percona/percona-postgresql-operator/internal/controller/runtime"
"github.com/percona/percona-postgresql-operator/internal/feature"
"github.com/percona/percona-postgresql-operator/internal/naming"
perconaController "github.com/percona/percona-postgresql-operator/percona/controller"
Expand Down Expand Up @@ -539,7 +542,15 @@ var _ = Describe("Watching secrets", Ordered, func() {
Expect(err).NotTo(HaveOccurred())

Expect(err).To(Not(HaveOccurred()))
mgr, err := runtime.CreateRuntimeManager(namespace.Name, cfg, true, true, gate)

os.Setenv("PGO_TARGET_NAMESPACE", "")
mgr, err := runtime.CreateRuntimeManager(cfg, gate, internalRuntime.Options{
LeaderElection: false,
HealthProbeBindAddress: "0",
Metrics: metricsServer.Options{
BindAddress: "0",
},
})
Expect(err).To(Succeed())
Expect(v2.AddToScheme(mgr.GetScheme())).To(Succeed())

Expand Down
61 changes: 19 additions & 42 deletions percona/runtime/runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,43 +6,34 @@ import (
"time"

"k8s.io/client-go/rest"
"k8s.io/client-go/util/flowcontrol"
"sigs.k8s.io/controller-runtime/pkg/cache"
"sigs.k8s.io/controller-runtime/pkg/manager"
metricsServer "sigs.k8s.io/controller-runtime/pkg/metrics/server"

r "github.com/percona/percona-postgresql-operator/internal/controller/runtime"
"github.com/percona/percona-postgresql-operator/internal/feature"
"github.com/percona/percona-postgresql-operator/internal/initialize"
"github.com/percona/percona-postgresql-operator/percona/k8s"
)

// default refresh interval in minutes
var refreshInterval = 60 * time.Minute
const refreshInterval time.Duration = 60 * time.Minute

const electionID string = "08db3feb.percona.com"
const ElectionID string = "08db3feb.percona.com"

// CreateRuntimeManager does the same thing as `internal/controller/runtime.CreateRuntimeManager`,
// excet it configures the manager to watch multiple namespaces.
func CreateRuntimeManager(namespaces string, config *rest.Config, disableMetrics, disableLeaderElection bool, features feature.MutableGate) (manager.Manager, error) {

var leaderElectionID string
if !disableLeaderElection {
leaderElectionID = electionID
}

options := manager.Options{
Cache: cache.Options{
SyncPeriod: &refreshInterval,
},
Scheme: r.Scheme,
LeaderElection: !disableLeaderElection,
LeaderElectionID: leaderElectionID,
// CreateRuntimeManager wraps internal/controller/runtime.NewManager and modifies the given options:
// - Fully overwrites the Cache field
// - Sets Cache.SyncPeriod to refreshInterval const
// - Sets Cache.DefaultNamespaces by using k8s.GetWatchNamespace() split by ","
// - Sets BaseContext to include the provided feature gates
func CreateRuntimeManager(config *rest.Config, features feature.MutableGate, options manager.Options) (manager.Manager, error) {
namespaces, err := k8s.GetWatchNamespace()
if err != nil {
return nil, err
}

options.BaseContext = func() context.Context {
ctx := context.Background()
return feature.NewContext(ctx, features)
options.Cache = cache.Options{
SyncPeriod: initialize.Pointer(refreshInterval),
}

nn := strings.Split(namespaces, ",")
if len(nn) > 0 && nn[0] != "" {
namespaces := make(map[string]cache.Config)
Expand All @@ -52,24 +43,10 @@ func CreateRuntimeManager(namespaces string, config *rest.Config, disableMetrics
options.Cache.DefaultNamespaces = namespaces
}

if disableMetrics {
options.HealthProbeBindAddress = "0"
options.Metrics = metricsServer.Options{
BindAddress: "0",
}
}

// Create a copy of the config to avoid modifying the original
configCopy := rest.CopyConfig(config)

// Ensure throttling is disabled by setting a fake rate limiter
configCopy.RateLimiter = flowcontrol.NewFakeAlwaysRateLimiter()

// create controller runtime manager
mgr, err := manager.New(configCopy, options)
if err != nil {
return nil, err
options.BaseContext = func() context.Context {
ctx := context.Background()
return feature.NewContext(ctx, features)
}

return mgr, nil
return r.NewManager(config, options)
}
Loading