Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
348 changes: 346 additions & 2 deletions controllers/apps/component/transformer_component_workload.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import (
"github.com/apecloud/kubeblocks/pkg/controller/graph"
"github.com/apecloud/kubeblocks/pkg/controller/model"
intctrlutil "github.com/apecloud/kubeblocks/pkg/controllerutil"
viper "github.com/apecloud/kubeblocks/pkg/viperx"
)

// componentWorkloadTransformer handles component workload generation
Expand Down Expand Up @@ -167,7 +168,7 @@ func (t *componentWorkloadTransformer) handleUpdate(transCtx *componentTransform
}
}

objCopy := copyAndMergeITS(runningITS, protoITS)
objCopy := copyAndMergeITS(runningITS, protoITS, legacyConfigManagerRequired(comp))
if objCopy != nil {
cli.Update(dag, nil, objCopy, &model.ReplaceIfExistingOption{})
// make sure the workload is updated after the env CM
Expand Down Expand Up @@ -220,7 +221,7 @@ func (t *componentWorkloadTransformer) handleWorkloadUpdate(transCtx *componentT
// copyAndMergeITS merges two ITS objects for updating:
// 1. new an object targetObj by copying from oldObj
// 2. merge all fields can be updated from newObj into targetObj
func copyAndMergeITS(oldITS, newITS *workloads.InstanceSet) *workloads.InstanceSet {
func copyAndMergeITS(oldITS, newITS *workloads.InstanceSet, legacyConfigManagerPolicy legacyConfigManagerPolicy) *workloads.InstanceSet {
itsObjCopy := oldITS.DeepCopy()
itsProto := newITS

Expand All @@ -241,6 +242,9 @@ func copyAndMergeITS(oldITS, newITS *workloads.InstanceSet) *workloads.InstanceS
podTemplateCopy.Annotations = itsObjCopy.Spec.Template.Annotations

itsObjCopy.Spec.Template = podTemplateCopy
// Preserve the legacy config-manager only for existing workloads that still have it in their live template.
// This avoids an upgrade-only template diff from forcing all old Pods to restart after config-manager moved to kbagent.
preserveLegacyConfigManagerPodSpec(oldITS, itsProto, itsObjCopy, legacyConfigManagerPolicy)
itsObjCopy.Spec.Replicas = itsProto.Spec.Replicas
itsObjCopy.Spec.Roles = itsProto.Spec.Roles
itsObjCopy.Spec.LifecycleActions = itsProto.Spec.LifecycleActions
Expand Down Expand Up @@ -289,6 +293,346 @@ func copyAndMergeITS(oldITS, newITS *workloads.InstanceSet) *workloads.InstanceS
return itsObjCopy
}

const (
legacyConfigManagerContainerName = "config-manager"
legacyConfigManagerToolsInitName = "install-config-manager-tool"
legacyConfigManagerToolsVolumeName = "kb-tools"
)

type legacyConfigManagerPolicy string

const (
legacyConfigManagerPolicyKeep legacyConfigManagerPolicy = "keep"
legacyConfigManagerPolicyCleanup legacyConfigManagerPolicy = "cleanup"
)

func preserveLegacyConfigManagerPodSpec(oldITS, desiredITS, mergedITS *workloads.InstanceSet, legacyPolicy legacyConfigManagerPolicy) {
if oldITS == nil || desiredITS == nil || mergedITS == nil {
return
}
oldSpec := &oldITS.Spec.Template.Spec
newSpec := &mergedITS.Spec.Template.Spec
_, oldCfg := intctrlutil.GetContainerByName(oldSpec.Containers, legacyConfigManagerContainerName)
if oldCfg == nil {
return
}
// The compatibility marker is owned by upstream parameters logic. Existing workloads should keep the
// legacy config-manager unless upstream has explicitly marked cleanup as safe.
if legacyPolicy == legacyConfigManagerPolicyCleanup && shouldCleanupLegacyConfigManager(oldITS, desiredITS) {
return
}
newSpec.Containers = preserveLegacyContainerOrder(oldSpec.Containers, newSpec.Containers, legacyConfigManagerContainerName, oldCfg)

var oldInit *corev1.Container
if _, initContainer := intctrlutil.GetContainerByName(oldSpec.InitContainers, legacyConfigManagerToolsInitName); initContainer != nil {
oldInit = initContainer
newSpec.InitContainers = preserveLegacyContainerOrder(oldSpec.InitContainers, newSpec.InitContainers, legacyConfigManagerToolsInitName, initContainer)
}

volumeNames := collectVolumeNamesFromContainers(oldCfg, oldInit)
newSpec.Volumes = preserveLegacyVolumeOrder(oldSpec.Volumes, newSpec.Volumes, volumeNames)
if _, ok := volumeNames[legacyConfigManagerToolsVolumeName]; ok {
mergeVolumeMountsByVolumeName(oldSpec.Containers, &newSpec.Containers, legacyConfigManagerToolsVolumeName)
mergeVolumeMountsByVolumeName(oldSpec.InitContainers, &newSpec.InitContainers, legacyConfigManagerToolsVolumeName)
}
}

func legacyConfigManagerRequired(comp *appsv1.Component) legacyConfigManagerPolicy {
if comp == nil || comp.Annotations == nil {
return legacyConfigManagerPolicyKeep
}
value, ok := comp.Annotations[constant.LegacyConfigManagerRequiredAnnotationKey]
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This annotaion will be set by user?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is not intended to be user-managed. It is written by the parameters controller on the Cluster and then inherited by the Component as an internal compatibility marker.

if !ok {
return legacyConfigManagerPolicyKeep
}
if value == "false" {
return legacyConfigManagerPolicyCleanup
}
return legacyConfigManagerPolicyKeep
}

func shouldCleanupLegacyConfigManager(oldITS, desiredITS *workloads.InstanceSet) bool {
oldTemplate := stripLegacyConfigManagerPodTemplate(oldITS.Spec.Template)
newTemplate := stripLegacyConfigManagerPodTemplate(desiredITS.Spec.Template)
if reflect.DeepEqual(oldTemplate, newTemplate) {
return false
}
// Container list or init container changes are evaluated by InstanceSet as pod upgrade changes.
// Clean up only when the configured policy will recreate Pods. Otherwise we keep the live template aligned
// with old Pods that still carry the legacy sidecar and avoid creating an extra rollout only for cleanup.
if hasPodUpgradeTemplateChanges(oldTemplate, newTemplate) {
return desiredITS.Spec.PodUpgradePolicy == appsv1.ReCreatePodUpdatePolicyType
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What about PreferInPlace policy? If ITS controller decides to recreate pod, then we should also clean up config manager.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. PreferInPlace is only a preference, not a guarantee.

The cleanup logic has been updated to follow whether this rollout would actually recreate Pods, rather than matching the policy name literally. So if ITS still ends up recreating Pods under PreferInPlace, legacy config-manager will be cleaned up in the same rollout; otherwise it will be preserved.

}
// Resource-only updates may still be in-place when vertical scaling is enabled; otherwise they recreate Pods.
if hasPodResourceChanges(oldTemplate.Spec, newTemplate.Spec) {
return !viper.GetBool(constant.FeatureGateInPlacePodVerticalScaling)
}
// Annotation/label/toleration style changes follow the regular pod update policy.
if hasPodUpdateTemplateChanges(oldTemplate, newTemplate) {
return desiredITS.Spec.PodUpdatePolicy == appsv1.ReCreatePodUpdatePolicyType
}
return false
}

func stripLegacyConfigManagerPodTemplate(template corev1.PodTemplateSpec) corev1.PodTemplateSpec {
template = *template.DeepCopy()
volumeNames := map[string]struct{}{}
template.Spec.Containers = filterLegacyContainers(template.Spec.Containers, volumeNames, legacyConfigManagerContainerName)
template.Spec.InitContainers = filterLegacyContainers(template.Spec.InitContainers, volumeNames, legacyConfigManagerToolsInitName)
template.Spec.Containers = filterLegacyVolumeMounts(template.Spec.Containers, volumeNames)
template.Spec.InitContainers = filterLegacyVolumeMounts(template.Spec.InitContainers, volumeNames)
template.Spec.Volumes = filterLegacyVolumes(template.Spec.Volumes, volumeNames)
return template
}

func hasPodUpgradeTemplateChanges(oldTemplate, newTemplate corev1.PodTemplateSpec) bool {
oldTemplate = *oldTemplate.DeepCopy()
newTemplate = *newTemplate.DeepCopy()
oldTemplate.Annotations = nil
oldTemplate.Labels = nil
newTemplate.Annotations = nil
newTemplate.Labels = nil
oldTemplate.Spec.ActiveDeadlineSeconds = nil
newTemplate.Spec.ActiveDeadlineSeconds = nil
oldTemplate.Spec.Tolerations = nil
newTemplate.Spec.Tolerations = nil
clearContainerResources(oldTemplate.Spec.Containers)
clearContainerResources(oldTemplate.Spec.InitContainers)
clearContainerResources(newTemplate.Spec.Containers)
clearContainerResources(newTemplate.Spec.InitContainers)
return !reflect.DeepEqual(oldTemplate.Spec, newTemplate.Spec)
}

func hasPodResourceChanges(oldSpec, newSpec corev1.PodSpec) bool {
return !reflect.DeepEqual(getContainerResources(oldSpec.Containers), getContainerResources(newSpec.Containers)) ||
!reflect.DeepEqual(getContainerResources(oldSpec.InitContainers), getContainerResources(newSpec.InitContainers))
}

func hasPodUpdateTemplateChanges(oldTemplate, newTemplate corev1.PodTemplateSpec) bool {
if !reflect.DeepEqual(oldTemplate.Annotations, newTemplate.Annotations) {
return true
}
if !reflect.DeepEqual(oldTemplate.Labels, newTemplate.Labels) {
return true
}
if !reflect.DeepEqual(oldTemplate.Spec.ActiveDeadlineSeconds, newTemplate.Spec.ActiveDeadlineSeconds) {
return true
}
return !reflect.DeepEqual(oldTemplate.Spec.Tolerations, newTemplate.Spec.Tolerations)
}

func filterLegacyContainers(containers []corev1.Container, volumeNames map[string]struct{}, legacyContainerName string) []corev1.Container {
filtered := make([]corev1.Container, 0, len(containers))
for _, container := range containers {
if container.Name == legacyContainerName {
for _, mount := range container.VolumeMounts {
volumeNames[mount.Name] = struct{}{}
}
continue
}
filtered = append(filtered, container)
}
return filtered
}

func filterLegacyVolumeMounts(containers []corev1.Container, volumeNames map[string]struct{}) []corev1.Container {
for i := range containers {
mounts := containers[i].VolumeMounts[:0]
for _, mount := range containers[i].VolumeMounts {
if _, ok := volumeNames[mount.Name]; ok {
continue
}
mounts = append(mounts, mount)
}
containers[i].VolumeMounts = mounts
}
return containers
}

func filterLegacyVolumes(volumes []corev1.Volume, volumeNames map[string]struct{}) []corev1.Volume {
filtered := make([]corev1.Volume, 0, len(volumes))
for _, volume := range volumes {
if _, ok := volumeNames[volume.Name]; ok {
continue
}
filtered = append(filtered, volume)
}
return filtered
}

func clearContainerResources(containers []corev1.Container) {
for i := range containers {
containers[i].Resources = corev1.ResourceRequirements{}
}
}

func getContainerResources(containers []corev1.Container) map[string]corev1.ResourceRequirements {
resources := make(map[string]corev1.ResourceRequirements, len(containers))
for _, container := range containers {
resources[container.Name] = container.Resources
}
return resources
}

func collectVolumeNamesFromContainers(containers ...*corev1.Container) map[string]struct{} {
volumeNames := map[string]struct{}{}
for _, container := range containers {
if container == nil {
continue
}
for _, mount := range container.VolumeMounts {
volumeNames[mount.Name] = struct{}{}
}
}
return volumeNames
}

func preserveLegacyContainerOrder(oldContainers, newContainers []corev1.Container, legacyName string, legacyContainer *corev1.Container) []corev1.Container {
if legacyContainer == nil {
return newContainers
}
if _, container := intctrlutil.GetContainerByName(newContainers, legacyName); container != nil {
return newContainers
}

newContainerMap := make(map[string]corev1.Container, len(newContainers))
used := make(map[string]struct{}, len(newContainers))
for _, container := range newContainers {
newContainerMap[container.Name] = container
}

merged := make([]corev1.Container, 0, len(newContainers)+1)
for _, oldContainer := range oldContainers {
switch {
case oldContainer.Name == legacyName:
// Reinsert the legacy container at its historical position to avoid order-only PodSpec diffs.
merged = append(merged, *legacyContainer.DeepCopy())
case hasContainerByName(newContainerMap, oldContainer.Name):
merged = append(merged, newContainerMap[oldContainer.Name])
used[oldContainer.Name] = struct{}{}
}
}
for _, container := range newContainers {
if _, ok := used[container.Name]; ok {
continue
}
merged = append(merged, container)
}
return merged
}

func preserveLegacyVolumeOrder(oldVolumes, newVolumes []corev1.Volume, legacyVolumeNames map[string]struct{}) []corev1.Volume {
if len(legacyVolumeNames) == 0 {
return newVolumes
}
newVolumeMap := make(map[string]corev1.Volume, len(newVolumes))
used := make(map[string]struct{}, len(newVolumes))
for _, volume := range newVolumes {
newVolumeMap[volume.Name] = volume
}

merged := make([]corev1.Volume, 0, len(newVolumes)+len(legacyVolumeNames))
for _, oldVolume := range oldVolumes {
switch {
case hasVolumeByName(legacyVolumeNames, oldVolume.Name):
// Keep the original volume order for the same reason as containers: avoid restart-causing order diffs.
merged = append(merged, oldVolume)
case hasVolumeByNameMap(newVolumeMap, oldVolume.Name):
merged = append(merged, newVolumeMap[oldVolume.Name])
used[oldVolume.Name] = struct{}{}
}
}
for _, volume := range newVolumes {
if _, ok := used[volume.Name]; ok {
continue
}
if hasVolumeByName(legacyVolumeNames, volume.Name) {
continue
}
merged = append(merged, volume)
}
return merged
}

func mergeVolumeMountsByVolumeName(oldContainers []corev1.Container, newContainers *[]corev1.Container, volumeName string) {
for i := range *newContainers {
container := &(*newContainers)[i]
_, oldContainer := intctrlutil.GetContainerByName(oldContainers, container.Name)
if oldContainer == nil {
continue
}
container.VolumeMounts = preserveLegacyVolumeMountOrder(oldContainer.VolumeMounts, container.VolumeMounts, volumeName)
}
}

func preserveLegacyVolumeMountOrder(oldMounts, newMounts []corev1.VolumeMount, volumeName string) []corev1.VolumeMount {
hasLegacyMount := false
for _, mount := range oldMounts {
if mount.Name == volumeName {
hasLegacyMount = true
break
}
}
if !hasLegacyMount || hasVolumeMount(newMounts, volumeName) {
return newMounts
}

newMountMap := make(map[string]corev1.VolumeMount, len(newMounts))
used := make(map[string]struct{}, len(newMounts))
for _, mount := range newMounts {
newMountMap[volumeMountKey(mount)] = mount
}

merged := make([]corev1.VolumeMount, 0, len(newMounts)+1)
for _, oldMount := range oldMounts {
if oldMount.Name == volumeName {
merged = append(merged, oldMount)
} else {
key := volumeMountKey(oldMount)
if mount, ok := newMountMap[key]; ok {
merged = append(merged, mount)
used[key] = struct{}{}
}
}
}
for _, mount := range newMounts {
key := volumeMountKey(mount)
if _, ok := used[key]; ok {
continue
}
merged = append(merged, mount)
}
return merged
}

func hasContainerByName(containers map[string]corev1.Container, name string) bool {
_, ok := containers[name]
return ok
}

func hasVolumeByName(names map[string]struct{}, name string) bool {
_, ok := names[name]
return ok
}

func hasVolumeByNameMap(volumes map[string]corev1.Volume, name string) bool {
_, ok := volumes[name]
return ok
}

func hasVolumeMount(mounts []corev1.VolumeMount, name string) bool {
for _, mount := range mounts {
if mount.Name == name {
return true
}
}
return false
}

func volumeMountKey(mount corev1.VolumeMount) string {
return mount.Name + "|" + mount.MountPath + "|" + mount.SubPath
}

func checkNRollbackProtoImages(itsObj, itsProto *workloads.InstanceSet) {
if itsObj.Annotations == nil || itsProto.Annotations == nil {
return
Expand Down
Loading
Loading