Skip to content

Commit 8a06367

Browse files
committed
tidyup
1 parent f80a9d0 commit 8a06367

11 files changed

+295
-342
lines changed

controllers/parameters/auto_reload_policy.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,6 @@ func init() {
2929

3030
type autoReloadPolicy struct{}
3131

32-
func (receiver autoReloadPolicy) Upgrade(params reconfigureContext) (returnedStatus, error) {
33-
_ = params
32+
func (receiver autoReloadPolicy) Upgrade(reconfigureContext) (returnedStatus, error) {
3433
return makeReturnedStatus(ESNone), nil
3534
}

controllers/parameters/combine_upgrade_policy.go renamed to controllers/parameters/combined_policy.go

Lines changed: 18 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -23,29 +23,31 @@ import (
2323
parametersv1alpha1 "github.com/apecloud/kubeblocks/apis/parameters/v1alpha1"
2424
)
2525

26-
var combineUpgradePolicyInstance = &combineUpgradePolicy{
27-
policyExecutors: []reconfigurePolicy{
28-
syncPolicyInstance,
29-
restartPolicyInstance,
30-
},
26+
func init() {
27+
registerPolicy(parametersv1alpha1.DynamicReloadAndRestartPolicy, combinedPolicyInst)
3128
}
3229

33-
type combineUpgradePolicy struct {
34-
policyExecutors []reconfigurePolicy
30+
var combinedPolicyInst = &combinedPolicy{
31+
policies: []reconfigurePolicy{
32+
syncPolicyInst,
33+
restartPolicyInst,
34+
},
3535
}
3636

37-
func init() {
38-
registerPolicy(parametersv1alpha1.DynamicReloadAndRestartPolicy, combineUpgradePolicyInstance)
37+
type combinedPolicy struct {
38+
policies []reconfigurePolicy
3939
}
4040

41-
func (h *combineUpgradePolicy) Upgrade(rctx reconfigureContext) (returnedStatus, error) {
42-
var ret returnedStatus
43-
for _, executor := range h.policyExecutors {
44-
retStatus, err := executor.Upgrade(rctx)
41+
func (h *combinedPolicy) Upgrade(rctx reconfigureContext) (returnedStatus, error) {
42+
var (
43+
status returnedStatus
44+
err error
45+
)
46+
for _, policy := range h.policies {
47+
status, err = policy.Upgrade(rctx)
4548
if err != nil {
46-
return retStatus, err
49+
return status, err
4750
}
48-
ret = retStatus
4951
}
50-
return ret, nil
52+
return status, nil
5153
}

controllers/parameters/combine_upgrade_policy_test.go renamed to controllers/parameters/combined_policy_test.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -46,11 +46,11 @@ var _ = Describe("Reconfigure CombineSyncPolicy", func() {
4646
Context("combine reconfigure policy test", func() {
4747
It("Should success without error", func() {
4848
By("check normal policy name")
49-
testPolicyExecs := &combineUpgradePolicy{
50-
policyExecutors: []reconfigurePolicy{&testPolicy{}},
49+
testPolicyExecs := &combinedPolicy{
50+
policies: []reconfigurePolicy{&testPolicy{}},
5151
}
5252

53-
Expect(upgradePolicyMap[parametersv1alpha1.DynamicReloadAndRestartPolicy]).ShouldNot(BeNil())
53+
Expect(reconfigurePolicyMap[parametersv1alpha1.DynamicReloadAndRestartPolicy]).ShouldNot(BeNil())
5454

5555
mockParam := newMockReconfigureParams("restartPolicy", k8sMockClient.Client(),
5656
withConfigSpec("for_test", map[string]string{
@@ -65,8 +65,8 @@ var _ = Describe("Reconfigure CombineSyncPolicy", func() {
6565

6666
It("Should success without error", func() {
6767
By("check failed policy name")
68-
testPolicyExecs := &combineUpgradePolicy{
69-
policyExecutors: []reconfigurePolicy{&testErrorPolicy{}},
68+
testPolicyExecs := &combinedPolicy{
69+
policies: []reconfigurePolicy{&testErrorPolicy{}},
7070
}
7171

7272
mockParam := newMockReconfigureParams("restartPolicy", k8sMockClient.Client(),

controllers/parameters/reconfigure_controller.go

Lines changed: 119 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -214,7 +214,7 @@ func (r *ReconfigureReconciler) sync(reqCtx intctrlutil.RequestCtx, configMap *c
214214
configPatch.UpdateConfig))
215215
}
216216

217-
tasks, err := buildReconfigureTasks(configSpec, rctx, configPatch, forceRestart)
217+
tasks, err := r.buildReconfigureTasks(configSpec, rctx, configPatch, forceRestart)
218218
if err != nil {
219219
return intctrlutil.RequeueWithErrorAndRecordEvent(configMap, r.Recorder, err, reqCtx.Log)
220220
}
@@ -296,3 +296,121 @@ func (r *ReconfigureReconciler) succeed(rctx *ReconcileContext, reloadType strin
296296
result := reconciled(status, reloadType, parametersv1alpha1.CFinishedPhase)
297297
return r.updateConfigCMStatus(rctx.RequestCtx, rctx.ConfigMap, reloadType, &result)
298298
}
299+
300+
type reconfigureTask struct {
301+
policy parametersv1alpha1.ReloadPolicy
302+
taskCtx reconfigureContext
303+
}
304+
305+
func (r reconfigureTask) reconfigure() (returnedStatus, error) {
306+
if executor, ok := reconfigurePolicyMap[r.policy]; ok {
307+
return executor.Upgrade(r.taskCtx)
308+
}
309+
return returnedStatus{}, fmt.Errorf("unknown reconfigure policy: %s", r.policy)
310+
}
311+
312+
func (r *ReconfigureReconciler) buildReconfigureTasks(templateSpec *appsv1.ComponentFileTemplate, rctx *ReconcileContext, patch *cfgcore.ConfigPatchInfo, restart bool) ([]reconfigureTask, error) {
313+
var tasks []reconfigureTask
314+
315+
// If the patch or ConfigRender is nil, return a single restart task.
316+
if patch == nil || rctx.ConfigRender == nil {
317+
return []reconfigureTask{r.buildRestartTask(templateSpec, rctx)}, nil
318+
}
319+
320+
// needReloadAction determines if a reload action is needed based on the ParametersDefinition and ReloadPolicy.
321+
needReloadAction := func(pd *parametersv1alpha1.ParametersDefinition, policy parametersv1alpha1.ReloadPolicy) bool {
322+
return !restart || (policy == parametersv1alpha1.SyncDynamicReloadPolicy && parameters.NeedDynamicReloadAction(&pd.Spec))
323+
}
324+
325+
for key, jsonPatch := range patch.UpdateConfig {
326+
pd, ok := rctx.ParametersDefs[key]
327+
// If the ParametersDefinition or its ReloadAction is nil, continue to the next iteration.
328+
if !ok || pd.Spec.ReloadAction == nil {
329+
continue
330+
}
331+
configFormat := parameters.GetComponentConfigDescription(&rctx.ConfigRender.Spec, key)
332+
if configFormat == nil || configFormat.FileFormatConfig == nil {
333+
continue
334+
}
335+
// Determine the appropriate ReloadPolicy.
336+
policy, err := r.resolveReconfigurePolicy(string(jsonPatch), configFormat.FileFormatConfig, &pd.Spec)
337+
if err != nil {
338+
return nil, err
339+
}
340+
// If a reload action is needed, append a new reload action task to the tasks slice.
341+
if needReloadAction(pd, policy) {
342+
tasks = append(tasks, r.buildReloadTask(policy, templateSpec, rctx, pd, configFormat, patch))
343+
}
344+
}
345+
346+
// If no tasks were added, return a single restart task.
347+
if len(tasks) == 0 {
348+
return []reconfigureTask{r.buildRestartTask(templateSpec, rctx)}, nil
349+
}
350+
351+
return tasks, nil
352+
}
353+
354+
func (r *ReconfigureReconciler) buildReloadTask(policy parametersv1alpha1.ReloadPolicy,
355+
templateSpec *appsv1.ComponentFileTemplate,
356+
rctx *ReconcileContext,
357+
pd *parametersv1alpha1.ParametersDefinition,
358+
configDescription *parametersv1alpha1.ComponentConfigDescription,
359+
patch *cfgcore.ConfigPatchInfo) reconfigureTask {
360+
return reconfigureTask{
361+
policy: policy,
362+
taskCtx: reconfigureContext{
363+
RequestCtx: rctx.RequestCtx,
364+
Client: rctx.Client,
365+
ConfigTemplate: *templateSpec,
366+
VersionHash: computeTargetVersionHash(rctx.RequestCtx, rctx.ConfigMap.Data),
367+
ParametersDef: &pd.Spec,
368+
ConfigDescription: configDescription,
369+
Cluster: rctx.ClusterObj,
370+
ClusterComponent: rctx.ClusterComObj,
371+
SynthesizedComponent: rctx.BuiltinComponent,
372+
its: rctx.its,
373+
Patch: patch,
374+
},
375+
}
376+
}
377+
378+
func (r *ReconfigureReconciler) buildRestartTask(configTemplate *appsv1.ComponentFileTemplate, rctx *ReconcileContext) reconfigureTask {
379+
return reconfigureTask{
380+
policy: parametersv1alpha1.RestartPolicy,
381+
taskCtx: reconfigureContext{
382+
RequestCtx: rctx.RequestCtx,
383+
Client: rctx.Client,
384+
ConfigTemplate: *configTemplate,
385+
VersionHash: computeTargetVersionHash(rctx.RequestCtx, rctx.ConfigMap.Data),
386+
ClusterComponent: rctx.ClusterComObj,
387+
Cluster: rctx.ClusterObj,
388+
SynthesizedComponent: rctx.BuiltinComponent,
389+
its: rctx.its,
390+
},
391+
}
392+
}
393+
394+
func (r *ReconfigureReconciler) resolveReconfigurePolicy(jsonPatch string, format *parametersv1alpha1.FileFormatConfig,
395+
pd *parametersv1alpha1.ParametersDefinitionSpec) (parametersv1alpha1.ReloadPolicy, error) {
396+
var policy = parametersv1alpha1.NonePolicy
397+
dynamicUpdate, err := cfgcore.CheckUpdateDynamicParameters(format, pd, jsonPatch)
398+
if err != nil {
399+
return policy, err
400+
}
401+
402+
// make decision
403+
switch {
404+
case !dynamicUpdate && parameters.NeedDynamicReloadAction(pd): // static parameters update and need to do hot update
405+
policy = parametersv1alpha1.DynamicReloadAndRestartPolicy
406+
case !dynamicUpdate: // static parameters update and only need to restart
407+
policy = parametersv1alpha1.RestartPolicy
408+
case parameters.IsAutoReload(pd.ReloadAction): // if core support hot update, don't need to do anything
409+
policy = parametersv1alpha1.AsyncDynamicReloadPolicy
410+
case enableSyncTrigger(pd.ReloadAction): // sync config-manager exec hot update
411+
policy = parametersv1alpha1.SyncDynamicReloadPolicy
412+
default: // config-manager auto trigger to hot update
413+
policy = parametersv1alpha1.AsyncDynamicReloadPolicy
414+
}
415+
return policy, nil
416+
}

controllers/parameters/reconfigure_controller_test.go

Lines changed: 130 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,12 +20,17 @@ along with this program. If not, see <http://www.gnu.org/licenses/>.
2020
package parameters
2121

2222
import (
23+
"testing"
24+
2325
. "github.com/onsi/ginkgo/v2"
2426
. "github.com/onsi/gomega"
2527

28+
"github.com/stretchr/testify/assert"
2629
corev1 "k8s.io/api/core/v1"
30+
"k8s.io/utils/ptr"
2731
"sigs.k8s.io/controller-runtime/pkg/client"
2832

33+
parametersv1alpha1 "github.com/apecloud/kubeblocks/apis/parameters/v1alpha1"
2934
"github.com/apecloud/kubeblocks/pkg/constant"
3035
"github.com/apecloud/kubeblocks/pkg/parameters/core"
3136
testapps "github.com/apecloud/kubeblocks/pkg/testutil/apps"
@@ -132,5 +137,129 @@ var _ = Describe("Reconfigure Controller", func() {
132137
}).Should(Succeed())
133138
})
134139
})
135-
136140
})
141+
142+
func Test_resolveReloadActionPolicy(t *testing.T) {
143+
type args struct {
144+
jsonPatch string
145+
format *parametersv1alpha1.FileFormatConfig
146+
pd *parametersv1alpha1.ParametersDefinitionSpec
147+
}
148+
tests := []struct {
149+
name string
150+
args args
151+
want parametersv1alpha1.ReloadPolicy
152+
wantErr bool
153+
}{{
154+
name: "restart policy",
155+
args: args{
156+
jsonPatch: `{"static1": "value1"}`,
157+
format: &parametersv1alpha1.FileFormatConfig{
158+
Format: parametersv1alpha1.JSON,
159+
},
160+
pd: &parametersv1alpha1.ParametersDefinitionSpec{
161+
StaticParameters: []string{
162+
"static1",
163+
"static2",
164+
},
165+
DynamicParameters: []string{
166+
"dynamic1",
167+
"dynamic2",
168+
},
169+
ReloadAction: &parametersv1alpha1.ReloadAction{
170+
ShellTrigger: &parametersv1alpha1.ShellTrigger{
171+
Command: []string{"/bin/true"},
172+
},
173+
},
174+
},
175+
},
176+
want: parametersv1alpha1.RestartPolicy,
177+
}, {
178+
name: "restart and reload policy",
179+
args: args{
180+
jsonPatch: `{"static1": "value1"}`,
181+
format: &parametersv1alpha1.FileFormatConfig{
182+
Format: parametersv1alpha1.JSON,
183+
},
184+
pd: &parametersv1alpha1.ParametersDefinitionSpec{
185+
ReloadAction: &parametersv1alpha1.ReloadAction{
186+
ShellTrigger: &parametersv1alpha1.ShellTrigger{
187+
Command: []string{"/bin/true"},
188+
},
189+
},
190+
MergeReloadAndRestart: ptr.To(false),
191+
},
192+
},
193+
want: parametersv1alpha1.DynamicReloadAndRestartPolicy,
194+
}, {
195+
name: "hot update policy",
196+
args: args{
197+
jsonPatch: `{"dynamic1": "value1"}`,
198+
format: &parametersv1alpha1.FileFormatConfig{
199+
Format: parametersv1alpha1.JSON,
200+
},
201+
pd: &parametersv1alpha1.ParametersDefinitionSpec{
202+
ReloadAction: &parametersv1alpha1.ReloadAction{
203+
AutoTrigger: &parametersv1alpha1.AutoTrigger{},
204+
},
205+
DynamicParameters: []string{
206+
"dynamic1",
207+
"dynamic2",
208+
},
209+
},
210+
},
211+
want: parametersv1alpha1.AsyncDynamicReloadPolicy,
212+
}, {
213+
name: "sync reload policy",
214+
args: args{
215+
jsonPatch: `{"dynamic1": "value1"}`,
216+
format: &parametersv1alpha1.FileFormatConfig{
217+
Format: parametersv1alpha1.JSON,
218+
},
219+
pd: &parametersv1alpha1.ParametersDefinitionSpec{
220+
ReloadAction: &parametersv1alpha1.ReloadAction{
221+
ShellTrigger: &parametersv1alpha1.ShellTrigger{
222+
Command: []string{"/bin/true"},
223+
Sync: ptr.To(true),
224+
},
225+
},
226+
DynamicParameters: []string{
227+
"dynamic1",
228+
"dynamic2",
229+
},
230+
},
231+
},
232+
want: parametersv1alpha1.SyncDynamicReloadPolicy,
233+
}, {
234+
name: "async reload policy",
235+
args: args{
236+
jsonPatch: `{"dynamic1": "value1"}`,
237+
format: &parametersv1alpha1.FileFormatConfig{
238+
Format: parametersv1alpha1.JSON,
239+
},
240+
pd: &parametersv1alpha1.ParametersDefinitionSpec{
241+
ReloadAction: &parametersv1alpha1.ReloadAction{
242+
ShellTrigger: &parametersv1alpha1.ShellTrigger{
243+
Command: []string{"/bin/true"},
244+
Sync: ptr.To(false),
245+
},
246+
},
247+
DynamicParameters: []string{
248+
"dynamic1",
249+
"dynamic2",
250+
},
251+
},
252+
},
253+
want: parametersv1alpha1.AsyncDynamicReloadPolicy,
254+
}}
255+
for _, tt := range tests {
256+
t.Run(tt.name, func(t *testing.T) {
257+
rr := &ReconfigureReconciler{}
258+
got, err := rr.resolveReconfigurePolicy(tt.args.jsonPatch, tt.args.format, tt.args.pd)
259+
if (err != nil) != tt.wantErr {
260+
t.Errorf("resolveReloadActionPolicy(%v, %v, %v)", tt.args.jsonPatch, tt.args.format, tt.args.pd)
261+
}
262+
assert.Equalf(t, tt.want, got, "resolveReloadActionPolicy(%v, %v, %v)", tt.args.jsonPatch, tt.args.format, tt.args.pd)
263+
})
264+
}
265+
}

0 commit comments

Comments
 (0)