diff --git a/apis/operations/v1alpha1/opsrequest_validation.go b/apis/operations/v1alpha1/opsrequest_validation.go index ce051bebea5..673d185f2dc 100644 --- a/apis/operations/v1alpha1/opsrequest_validation.go +++ b/apis/operations/v1alpha1/opsrequest_validation.go @@ -604,21 +604,29 @@ func (r *OpsRequest) checkVolumesAllowExpansion(ctx context.Context, cli client. allowExpansion bool requestStorage resource.Quantity isShardingComponent bool + hasPvc bool } - // component name/ sharding name -> vct name -> entity + // [component name]/ [sharding name]/ [component name.its name] -> vct name -> entity vols := make(map[string]map[string]Entity) setVols := func(vcts []OpsRequestVolumeClaimTemplate, componentName string) { for _, vct := range vcts { if _, ok := vols[componentName]; !ok { vols[componentName] = make(map[string]Entity) } - vols[componentName][vct.Name] = Entity{false, nil, false, vct.Storage, false} + vols[componentName][vct.Name] = Entity{false, nil, false, vct.Storage, false, false} } } for _, comp := range r.Spec.VolumeExpansionList { setVols(comp.VolumeClaimTemplates, comp.ComponentOps.ComponentName) + for _, compSpec := range cluster.Spec.ComponentSpecs { + if compSpec.Name == comp.ComponentOps.ComponentName { + for _, its := range compSpec.Instances { + setVols(comp.VolumeClaimTemplates, fmt.Sprintf("%s.%s", compSpec.Name, its.Name)) + } + } + } } fillVol := func(vct appsv1.PersistentVolumeClaimTemplate, key string, isShardingComp bool) { e, ok := vols[key][vct.Name] @@ -638,26 +646,43 @@ func (r *OpsRequest) checkVolumesAllowExpansion(ctx context.Context, cli client. fillVol(vct, componentName, isShardingComp) } } + fillItsVols := func(itsSpec appsv1.InstanceTemplate, cmpVcts []appsv1.PersistentVolumeClaimTemplate, key string) { + if _, ok := vols[key]; !ok { + return // ignore not-exist its + } + mergedVcts := mergeItsCmpTemplates(itsSpec.VolumeClaimTemplates, cmpVcts) + for _, vct := range mergedVcts { + fillVol(vct, key, false) + } + } // traverse the spec to update volumes for _, comp := range cluster.Spec.ComponentSpecs { fillCompVols(comp, comp.Name, false) + for _, its := range comp.Instances { + // update its vct volumes + fillItsVols(its, comp.VolumeClaimTemplates, fmt.Sprintf("%s.%s", comp.Name, its.Name)) + } } for _, sharding := range cluster.Spec.Shardings { fillCompVols(sharding.Template, sharding.Name, true) } // check all used storage classes - var err error for key, compVols := range vols { for vname := range compVols { e := vols[key][vname] if !e.existInSpec { continue } - e.storageClassName, err = r.getSCNameByPvcAndCheckStorageSize(ctx, cli, key, vname, e.isShardingComponent, e.requestStorage) + found, scName, err := r.getSCNameByPvcAndCheckStorageSize(ctx, cli, key, vname, e.isShardingComponent, e.requestStorage) if err != nil { return err } + if !found { + continue + } + e.hasPvc = found + e.storageClassName = scName allowExpansion, err := r.checkStorageClassAllowExpansion(ctx, cli, e.storageClassName) if err != nil { continue // ignore the error and take it as not-supported @@ -674,6 +699,9 @@ func (r *OpsRequest) checkVolumesAllowExpansion(ctx context.Context, cli client. notSupportSc []string ) for vct, e := range compVols { + if !e.hasPvc { + continue + } if !e.existInSpec { notFound = append(notFound, vct) } @@ -724,7 +752,7 @@ func (r *OpsRequest) getSCNameByPvcAndCheckStorageSize(ctx context.Context, key, vctName string, isShardingComponent bool, - requestStorage resource.Quantity) (*string, error) { + requestStorage resource.Quantity) (found bool, scName *string, err error) { componentName := key targetInsTPLName := "" if strings.Contains(key, ".") { @@ -743,7 +771,7 @@ func (r *OpsRequest) getSCNameByPvcAndCheckStorageSize(ctx context.Context, } pvcList := &corev1.PersistentVolumeClaimList{} if err := cli.List(ctx, pvcList, client.InNamespace(r.Namespace), matchingLabels); err != nil { - return nil, err + return false, nil, err } var pvc *corev1.PersistentVolumeClaim for _, pvcItem := range pvcList.Items { @@ -753,14 +781,34 @@ func (r *OpsRequest) getSCNameByPvcAndCheckStorageSize(ctx context.Context, } } if pvc == nil { - return nil, nil + return false, nil, nil } previousValue := *pvc.Status.Capacity.Storage() if requestStorage.Cmp(previousValue) < 0 { - return nil, fmt.Errorf(`requested storage size of volumeClaimTemplate "%s" can not less than status.capacity.storage "%s" `, + return true, nil, fmt.Errorf(`requested storage size of volumeClaimTemplate "%s" can not less than status.capacity.storage "%s" `, vctName, previousValue.String()) } - return pvc.Spec.StorageClassName, nil + return true, pvc.Spec.StorageClassName, nil +} + +func mergeItsCmpTemplates(itsVcts []appsv1.PersistentVolumeClaimTemplate, cmpVcts []appsv1.PersistentVolumeClaimTemplate) []appsv1.PersistentVolumeClaimTemplate { + mergedVcts := make([]appsv1.PersistentVolumeClaimTemplate, 0) + mergedVcts = append(mergedVcts, cmpVcts...) + for _, itsVct := range itsVcts { + found := false + for i, cmpVct := range mergedVcts { + if itsVct.Name == cmpVct.Name { + // cmpVct will be override by itsVct + mergedVcts[i] = itsVct + found = true + break + } + } + if !found { + mergedVcts = append(mergedVcts, itsVct) + } + } + return mergedVcts } // validateVerticalResourceList checks if k8s resourceList is legal diff --git a/pkg/controller/instanceset/instance_util.go b/pkg/controller/instanceset/instance_util.go index 31889d59ea3..0fabd968044 100644 --- a/pkg/controller/instanceset/instance_util.go +++ b/pkg/controller/instanceset/instance_util.go @@ -259,7 +259,7 @@ func GenerateAllInstanceNames(parentName string, replicas int32, templates []Ins instanceNameList := make([]string, 0) for _, template := range templates { replicas := template.GetReplicas() - ordinalList, err := convertOrdinalsToSortedList(template.GetOrdinals()) + ordinalList, err := ConvertOrdinalsToSortedList(template.GetOrdinals()) if err != nil { return nil, err } @@ -271,7 +271,7 @@ func GenerateAllInstanceNames(parentName string, replicas int32, templates []Ins totalReplicas += replicas } if totalReplicas < replicas { - ordinalList, err := convertOrdinalsToSortedList(defaultTemplateOrdinals) + ordinalList, err := ConvertOrdinalsToSortedList(defaultTemplateOrdinals) if err != nil { return nil, err } @@ -350,7 +350,7 @@ func generateInstanceNamesWithOrdinalList(parentName, templateName string, return instanceNameList, nil } -func convertOrdinalsToSortedList(ordinals kbappsv1.Ordinals) ([]int32, error) { +func ConvertOrdinalsToSortedList(ordinals kbappsv1.Ordinals) ([]int32, error) { ordinalList := sets.New(ordinals.Discrete...) for _, item := range ordinals.Ranges { start := item.Start diff --git a/pkg/controller/instanceset/instance_util_test.go b/pkg/controller/instanceset/instance_util_test.go index 8cc9b3cafcb..f43dcd94b15 100644 --- a/pkg/controller/instanceset/instance_util_test.go +++ b/pkg/controller/instanceset/instance_util_test.go @@ -397,7 +397,7 @@ var _ = Describe("instance util test", func() { }, Discrete: []int32{0, 6}, } - ordinalList, err := convertOrdinalsToSortedList(ordinals) + ordinalList, err := ConvertOrdinalsToSortedList(ordinals) Expect(err).Should(BeNil()) sets.New(ordinalList...).Equal(sets.New[int32](0, 2, 3, 4, 6)) }) @@ -412,7 +412,7 @@ var _ = Describe("instance util test", func() { }, Discrete: []int32{0}, } - ordinalList, err := convertOrdinalsToSortedList(ordinals) + ordinalList, err := ConvertOrdinalsToSortedList(ordinals) errExpected := fmt.Errorf("range's end(%v) must >= start(%v)", 2, 4) Expect(err).Should(Equal(errExpected)) Expect(ordinalList).Should(BeNil()) diff --git a/pkg/operations/volume_expansion.go b/pkg/operations/volume_expansion.go index b2cb172afcf..23a62c72614 100644 --- a/pkg/operations/volume_expansion.go +++ b/pkg/operations/volume_expansion.go @@ -49,6 +49,8 @@ type volumeExpansionHelper struct { vctName string expectCount int offlineInstanceNames []string + templateName string + ordinals appsv1.Ordinals } var _ OpsHandler = volumeExpansionOpsHandler{} @@ -137,6 +139,18 @@ func (ve volumeExpansionOpsHandler) ReconcileAction(reqCtx intctrlutil.RequestCt vctName: vct.Name, offlineInstanceNames: compSpec.OfflineInstances, }) + for _, template := range compSpec.Instances { + // todo: consider instance template with volumeClaimTemplates + veHelpers = append(veHelpers, volumeExpansionHelper{ + compOps: compOps, + fullComponentName: fullComponentName, + expectCount: int(*template.Replicas), + vctName: vct.Name, + offlineInstanceNames: compSpec.OfflineInstances, + templateName: template.Name, + ordinals: template.Ordinals, + }) + } } } } @@ -285,6 +299,9 @@ func (ve volumeExpansionOpsHandler) handleVCTExpansionProgress(reqCtx intctrluti completedCount int err error ) + if veHelper.expectCount <= 0 { + return 0, 0, nil + } matchingLabels := client.MatchingLabels{ constant.AppInstanceLabelKey: opsRes.Cluster.Name, constant.VolumeClaimTemplateNameLabelKey: veHelper.vctName, @@ -295,7 +312,11 @@ func (ve volumeExpansionOpsHandler) handleVCTExpansionProgress(reqCtx intctrluti return 0, 0, err } workloadName := constant.GenerateWorkloadNamePattern(opsRes.Cluster.Name, veHelper.fullComponentName) - instanceNames, err := instanceset.GenerateInstanceNamesFromTemplate(workloadName, "", int32(veHelper.expectCount), veHelper.offlineInstanceNames, nil) + ordinalList, err := instanceset.ConvertOrdinalsToSortedList(veHelper.ordinals) + if err != nil { + return 0, 0, err + } + instanceNames, err := instanceset.GenerateInstanceNamesFromTemplate(workloadName, veHelper.templateName, int32(veHelper.expectCount), veHelper.offlineInstanceNames, ordinalList) if err != nil { return 0, 0, err } diff --git a/pkg/operations/volume_expansion_test.go b/pkg/operations/volume_expansion_test.go index c58087ad25f..2bfd21c50fd 100644 --- a/pkg/operations/volume_expansion_test.go +++ b/pkg/operations/volume_expansion_test.go @@ -21,6 +21,7 @@ package operations import ( "fmt" + "strings" "time" . "github.com/onsi/ginkgo/v2" @@ -32,6 +33,7 @@ import ( "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/kubectl/pkg/util/storage" + "k8s.io/utils/ptr" "sigs.k8s.io/controller-runtime/pkg/client" appsv1 "github.com/apecloud/kubeblocks/apis/apps/v1" @@ -67,9 +69,14 @@ var _ = Describe("OpsRequest Controller Volume Expansion Handler", func() { // delete cluster(and all dependent sub-resources), cluster definition testapps.ClearClusterResources(&testCtx) + // delete component definition resources + testapps.ClearComponentResourcesWithRemoveFinalizerOption(&testCtx) + // delete rest resources inNS := client.InNamespace(testCtx.DefaultNamespace) ml := client.HasLabels{testCtx.TestObjLabelKey} + // delete pvc resources + testapps.ClearResourcesWithRemoveFinalizerOption(&testCtx, generics.PersistentVolumeClaimSignature, true, inNS, ml) // namespaced testapps.ClearResources(&testCtx, generics.OpsRequestSignature, inNS, ml) // non-namespaced @@ -88,6 +95,25 @@ var _ = Describe("OpsRequest Controller Volume Expansion Handler", func() { constant.KBAppComponentLabelKey, consensusCompName).SetStorage("2Gi").SetStorageClass(storageClassName).CheckedCreate(&testCtx) } + createItsPVC := func(clusterName, scName, vctName, pvcName, cmpName string) { + var itsName string + parts := strings.Split(pvcName, cmpName+"-") + if len(parts) == 2 && strings.Contains(parts[1], "-") { + p := strings.Split(parts[1], "-") + itsName = p[0] + testapps.NewPersistentVolumeClaimFactory(testCtx.DefaultNamespace, pvcName, clusterName, + consensusCompName, testapps.DataVolumeName).AddLabels(constant.AppInstanceLabelKey, clusterName, + constant.VolumeClaimTemplateNameLabelKey, testapps.DataVolumeName, + constant.KBAppComponentLabelKey, consensusCompName, constant.KBAppInstanceTemplateLabelKey, + itsName).SetStorage("2Gi").SetStorageClass(storageClassName).CheckedCreate(&testCtx) + } else { + testapps.NewPersistentVolumeClaimFactory(testCtx.DefaultNamespace, pvcName, clusterName, + consensusCompName, testapps.DataVolumeName).AddLabels(constant.AppInstanceLabelKey, clusterName, + constant.VolumeClaimTemplateNameLabelKey, testapps.DataVolumeName, + constant.KBAppComponentLabelKey, consensusCompName).SetStorage("2Gi").SetStorageClass(storageClassName).CheckedCreate(&testCtx) + } + } + // getVolumeClaimNames gets all PVC names of component compName. // // cluster.Spec.GetComponentByName(compName).VolumeClaimTemplates[*].Name will be used if no claimNames provided @@ -140,16 +166,73 @@ var _ = Describe("OpsRequest Controller Volume Expansion Handler", func() { return pvcNames } - initResourcesForVolumeExpansion := func(clusterObject *appsv1.Cluster, opsRes *OpsResource, storage string, replicas int) (*opsv1alpha1.OpsRequest, []string) { - pvcNames := getVolumeClaimNames(opsRes.Cluster, consensusCompName) - for _, pvcName := range pvcNames { - createPVC(clusterObject.Name, storageClassName, vctName, pvcName) - // mock pvc is Bound - Expect(testapps.GetAndChangeObjStatus(&testCtx, client.ObjectKey{Name: pvcName, Namespace: testCtx.DefaultNamespace}, func(pvc *corev1.PersistentVolumeClaim) { - pvc.Status.Phase = corev1.ClaimBound - })()).ShouldNot(HaveOccurred()) + // getItsVolumeClaimNames gets all PVC names of component compName and instanceTemplateName. + // + // cluster.Spec.GetComponentByName(compName).VolumeClaimTemplates[*].Name will be used if no claimNames provided + // + // nil return if: + // 1. component compName not found or + // 2. len(VolumeClaimTemplates)==0 or + // 3. any claimNames not found + getItsVolumeClaimNames := func(cluster *appsv1.Cluster, compName string, claimNames ...string) []string { + if cluster == nil { + return nil + } + comp := cluster.Spec.GetComponentByName(compName) + if comp == nil { + return nil + } + if len(comp.VolumeClaimTemplates) == 0 { + return nil + } + if len(claimNames) == 0 { + for _, template := range comp.VolumeClaimTemplates { + claimNames = append(claimNames, template.Name) + } + } + allExist := true + for _, name := range claimNames { + found := false + for _, template := range comp.VolumeClaimTemplates { + if template.Name == name { + found = true + break + } + } + if !found { + allExist = false + break + } + } + if !allExist { + return nil + } + pvcNames := make([]string, 0) + for _, claimName := range claimNames { + restReplicas := comp.Replicas + for i := 0; i < int(comp.Replicas); i++ { + // here we did not handle ordinals in instance template + for _, its := range comp.Instances { + for j := 0; j < int(*its.Replicas); j++ { + pvcName := fmt.Sprintf("%s-%s-%s-%s-%d", claimName, cluster.Name, compName, its.Name, j) + pvcNames = append(pvcNames, pvcName) + restReplicas-- + i++ + } + } + for x := 0; x < int(restReplicas); x++ { + pvcName := fmt.Sprintf("%s-%s-%s-%d", claimName, cluster.Name, compName, x) + pvcNames = append(pvcNames, pvcName) + i++ + } + } } + return pvcNames + } + + initResourcesForVolumeExpansionBase := func(clusterObject *appsv1.Cluster, opsRes *OpsResource, storage string, replicas int, preparePvc func() []string) (*opsv1alpha1.OpsRequest, []string) { + pvcNames := preparePvc() currRandomStr := testCtx.GetRandomStr() ops := testops.NewOpsRequestObj("volumeexpansion-ops-"+currRandomStr, testCtx.DefaultNamespace, clusterObject.Name, opsv1alpha1.VolumeExpansionType) @@ -171,10 +254,44 @@ var _ = Describe("OpsRequest Controller Volume Expansion Handler", func() { return ops, pvcNames } + initResourceForItsVolumeExpansion := func(clusterObject *appsv1.Cluster, opsRes *OpsResource, storage string, replicas int) (*opsv1alpha1.OpsRequest, []string) { + return initResourcesForVolumeExpansionBase(clusterObject, opsRes, storage, replicas, func() []string { + pvcNames := getItsVolumeClaimNames(opsRes.Cluster, consensusCompName) + for _, pvcName := range pvcNames { + createItsPVC(clusterObject.Name, storageClassName, vctName, pvcName, consensusCompName) + // mock pvc is Bound + Expect(testapps.GetAndChangeObjStatus(&testCtx, client.ObjectKey{Name: pvcName, Namespace: testCtx.DefaultNamespace}, func(pvc *corev1.PersistentVolumeClaim) { + pvc.Status.Phase = corev1.ClaimBound + })()).ShouldNot(HaveOccurred()) + + } + return pvcNames + }) + } + + initResourcesForVolumeExpansion := func(clusterObject *appsv1.Cluster, opsRes *OpsResource, storage string, replicas int) (*opsv1alpha1.OpsRequest, []string) { + return initResourcesForVolumeExpansionBase(clusterObject, opsRes, storage, replicas, func() []string { + pvcNames := getVolumeClaimNames(opsRes.Cluster, consensusCompName) + for _, pvcName := range pvcNames { + createPVC(clusterObject.Name, storageClassName, vctName, pvcName) + // mock pvc is Bound + Expect(testapps.GetAndChangeObjStatus(&testCtx, client.ObjectKey{Name: pvcName, Namespace: testCtx.DefaultNamespace}, func(pvc *corev1.PersistentVolumeClaim) { + pvc.Status.Phase = corev1.ClaimBound + })()).ShouldNot(HaveOccurred()) + + } + return pvcNames + }) + } + mockVolumeExpansionActionAndReconcile := func(reqCtx intctrlutil.RequestCtx, opsRes *OpsResource, newOps *opsv1alpha1.OpsRequest, pvcNames []string) { // first step, validate ops and update phase to Creating _, err := GetOpsManager().Do(reqCtx, k8sClient, opsRes) Expect(err).Should(BeNil()) + // ops phase should not failed + Eventually(testapps.CheckObj(&testCtx, client.ObjectKey{Name: newOps.Name, Namespace: testCtx.DefaultNamespace}, func(g Gomega, tmpOps *opsv1alpha1.OpsRequest) { + g.Expect(tmpOps.Status.Phase).To(Not(Equal(opsv1alpha1.OpsFailedPhase))) + })).Should(Succeed()) // next step, do volume-expand action _, err = GetOpsManager().Do(reqCtx, k8sClient, opsRes) @@ -203,7 +320,8 @@ var _ = Describe("OpsRequest Controller Volume Expansion Handler", func() { clusterObject *appsv1.Cluster, opsRes *OpsResource, requestStorage, - actualStorage string) { + actualStorage string, + initResourceFunc func(clusterObject *appsv1.Cluster, opsRes *OpsResource, storage string, replicas int) (*opsv1alpha1.OpsRequest, []string)) { // mock cluster is Running to support volume expansion ops Expect(testapps.ChangeObjStatus(&testCtx, clusterObject, func() { clusterObject.Status.Phase = appsv1.RunningClusterPhase @@ -211,7 +329,7 @@ var _ = Describe("OpsRequest Controller Volume Expansion Handler", func() { // init resources for volume expansion comp := clusterObject.Spec.GetComponentByName(consensusCompName) - newOps, pvcNames := initResourcesForVolumeExpansion(clusterObject, opsRes, requestStorage, int(comp.Replicas)) + newOps, pvcNames := initResourceFunc(clusterObject, opsRes, requestStorage, int(comp.Replicas)) By("mock run volumeExpansion action and reconcileAction") mockVolumeExpansionActionAndReconcile(reqCtx, opsRes, newOps, pvcNames) @@ -303,13 +421,52 @@ var _ = Describe("OpsRequest Controller Volume Expansion Handler", func() { })).ShouldNot(HaveOccurred()) By("Test VolumeExpansion with consistent storageSize") - testVolumeExpansion(reqCtx, clusterObject, opsRes, "3Gi", "3Gi") + testVolumeExpansion(reqCtx, clusterObject, opsRes, "3Gi", "3Gi", initResourcesForVolumeExpansion) By("Test VolumeExpansion with inconsistent storageSize but it is valid") - testVolumeExpansion(reqCtx, clusterObject, opsRes, "5G", "5Gi") + testVolumeExpansion(reqCtx, clusterObject, opsRes, "5G", "5Gi", initResourcesForVolumeExpansion) By("Test delete the Running VolumeExpansion OpsRequest") testDeleteRunningVolumeExpansion(clusterObject, opsRes) }) + + It("VolumeExpandsion with InstanceTemplate", func() { + reqCtx := intctrlutil.RequestCtx{Ctx: ctx} + _, clusterObject := testapps.InitConsensusMysql(&testCtx, clusterName, compDefName, consensusCompName) + // all pods use instance template + Expect(testapps.ChangeObj(&testCtx, clusterObject, func(lc *appsv1.Cluster) { + lc.Spec.ComponentSpecs[0].Instances = []appsv1.InstanceTemplate{ + { + Name: "foo", + Replicas: ptr.To(int32(1)), + }, + { + Name: "bar", + Replicas: ptr.To(int32(2)), + }, + } + })).ShouldNot(HaveOccurred()) + // init storageClass + sc := testapps.CreateStorageClass(&testCtx, storageClassName, true) + Expect(testapps.ChangeObj(&testCtx, sc, func(lsc *storagev1.StorageClass) { + lsc.Annotations = map[string]string{storage.IsDefaultStorageClassAnnotation: "true"} + })).ShouldNot(HaveOccurred()) + opsRes := &OpsResource{ + Cluster: clusterObject, + Recorder: k8sManager.GetEventRecorderFor("opsrequest-controller"), + } + By("Test OpsManager.MainEnter function with ClusterOps") + Expect(testapps.ChangeObjStatus(&testCtx, clusterObject, func() { + clusterObject.Status.Phase = appsv1.RunningClusterPhase + clusterObject.Status.Components = map[string]appsv1.ClusterComponentStatus{ + consensusCompName: { + Phase: appsv1.RunningComponentPhase, + }, + } + })).ShouldNot(HaveOccurred()) + + By("Test volume expansion with instance template") + testVolumeExpansion(reqCtx, clusterObject, opsRes, "3Gi", "3Gi", initResourceForItsVolumeExpansion) + }) }) })