Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 57 additions & 9 deletions apis/operations/v1alpha1/opsrequest_validation.go
Original file line number Diff line number Diff line change
Expand Up @@ -604,21 +604,29 @@ func (r *OpsRequest) checkVolumesAllowExpansion(ctx context.Context, cli client.
allowExpansion bool
requestStorage resource.Quantity
isShardingComponent bool
hasPvc bool
}

// component name/ sharding name -> vct name -> entity
// [component name]/ [sharding name]/ [component name.its name] -> vct name -> entity
vols := make(map[string]map[string]Entity)
setVols := func(vcts []OpsRequestVolumeClaimTemplate, componentName string) {
for _, vct := range vcts {
if _, ok := vols[componentName]; !ok {
vols[componentName] = make(map[string]Entity)
}
vols[componentName][vct.Name] = Entity{false, nil, false, vct.Storage, false}
vols[componentName][vct.Name] = Entity{false, nil, false, vct.Storage, false, false}
}
}

for _, comp := range r.Spec.VolumeExpansionList {
setVols(comp.VolumeClaimTemplates, comp.ComponentOps.ComponentName)
for _, compSpec := range cluster.Spec.ComponentSpecs {
if compSpec.Name == comp.ComponentOps.ComponentName {
for _, its := range compSpec.Instances {
setVols(comp.VolumeClaimTemplates, fmt.Sprintf("%s.%s", compSpec.Name, its.Name))
}
}
}
}
fillVol := func(vct appsv1.PersistentVolumeClaimTemplate, key string, isShardingComp bool) {
e, ok := vols[key][vct.Name]
Expand All @@ -638,26 +646,43 @@ func (r *OpsRequest) checkVolumesAllowExpansion(ctx context.Context, cli client.
fillVol(vct, componentName, isShardingComp)
}
}
fillItsVols := func(itsSpec appsv1.InstanceTemplate, cmpVcts []appsv1.PersistentVolumeClaimTemplate, key string) {
if _, ok := vols[key]; !ok {
return // ignore not-exist its
}
mergedVcts := mergeItsCmpTemplates(itsSpec.VolumeClaimTemplates, cmpVcts)
for _, vct := range mergedVcts {
fillVol(vct, key, false)
}
}
// traverse the spec to update volumes
for _, comp := range cluster.Spec.ComponentSpecs {
fillCompVols(comp, comp.Name, false)
for _, its := range comp.Instances {
// update its vct volumes
fillItsVols(its, comp.VolumeClaimTemplates, fmt.Sprintf("%s.%s", comp.Name, its.Name))
}
}
for _, sharding := range cluster.Spec.Shardings {
fillCompVols(sharding.Template, sharding.Name, true)
}

// check all used storage classes
var err error
for key, compVols := range vols {
for vname := range compVols {
e := vols[key][vname]
if !e.existInSpec {
continue
}
e.storageClassName, err = r.getSCNameByPvcAndCheckStorageSize(ctx, cli, key, vname, e.isShardingComponent, e.requestStorage)
found, scName, err := r.getSCNameByPvcAndCheckStorageSize(ctx, cli, key, vname, e.isShardingComponent, e.requestStorage)
if err != nil {
return err
}
if !found {
continue
}
e.hasPvc = found
e.storageClassName = scName
allowExpansion, err := r.checkStorageClassAllowExpansion(ctx, cli, e.storageClassName)
if err != nil {
continue // ignore the error and take it as not-supported
Expand All @@ -674,6 +699,9 @@ func (r *OpsRequest) checkVolumesAllowExpansion(ctx context.Context, cli client.
notSupportSc []string
)
for vct, e := range compVols {
if !e.hasPvc {
continue
}
if !e.existInSpec {
notFound = append(notFound, vct)
}
Expand Down Expand Up @@ -724,7 +752,7 @@ func (r *OpsRequest) getSCNameByPvcAndCheckStorageSize(ctx context.Context,
key,
vctName string,
isShardingComponent bool,
requestStorage resource.Quantity) (*string, error) {
requestStorage resource.Quantity) (found bool, scName *string, err error) {
componentName := key
targetInsTPLName := ""
if strings.Contains(key, ".") {
Expand All @@ -743,7 +771,7 @@ func (r *OpsRequest) getSCNameByPvcAndCheckStorageSize(ctx context.Context,
}
pvcList := &corev1.PersistentVolumeClaimList{}
if err := cli.List(ctx, pvcList, client.InNamespace(r.Namespace), matchingLabels); err != nil {
return nil, err
return false, nil, err
}
var pvc *corev1.PersistentVolumeClaim
for _, pvcItem := range pvcList.Items {
Expand All @@ -753,14 +781,34 @@ func (r *OpsRequest) getSCNameByPvcAndCheckStorageSize(ctx context.Context,
}
}
if pvc == nil {
return nil, nil
return false, nil, nil
}
previousValue := *pvc.Status.Capacity.Storage()
if requestStorage.Cmp(previousValue) < 0 {
return nil, fmt.Errorf(`requested storage size of volumeClaimTemplate "%s" can not less than status.capacity.storage "%s" `,
return true, nil, fmt.Errorf(`requested storage size of volumeClaimTemplate "%s" can not less than status.capacity.storage "%s" `,
vctName, previousValue.String())
}
return pvc.Spec.StorageClassName, nil
return true, pvc.Spec.StorageClassName, nil
}

func mergeItsCmpTemplates(itsVcts []appsv1.PersistentVolumeClaimTemplate, cmpVcts []appsv1.PersistentVolumeClaimTemplate) []appsv1.PersistentVolumeClaimTemplate {
mergedVcts := make([]appsv1.PersistentVolumeClaimTemplate, 0)
mergedVcts = append(mergedVcts, cmpVcts...)
for _, itsVct := range itsVcts {
found := false
for i, cmpVct := range mergedVcts {
if itsVct.Name == cmpVct.Name {
// cmpVct will be override by itsVct
mergedVcts[i] = itsVct
found = true
break
}
}
if !found {
mergedVcts = append(mergedVcts, itsVct)
}
}
return mergedVcts
}

// validateVerticalResourceList checks if k8s resourceList is legal
Expand Down
6 changes: 3 additions & 3 deletions pkg/controller/instanceset/instance_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,7 @@ func GenerateAllInstanceNames(parentName string, replicas int32, templates []Ins
instanceNameList := make([]string, 0)
for _, template := range templates {
replicas := template.GetReplicas()
ordinalList, err := convertOrdinalsToSortedList(template.GetOrdinals())
ordinalList, err := ConvertOrdinalsToSortedList(template.GetOrdinals())
if err != nil {
return nil, err
}
Expand All @@ -271,7 +271,7 @@ func GenerateAllInstanceNames(parentName string, replicas int32, templates []Ins
totalReplicas += replicas
}
if totalReplicas < replicas {
ordinalList, err := convertOrdinalsToSortedList(defaultTemplateOrdinals)
ordinalList, err := ConvertOrdinalsToSortedList(defaultTemplateOrdinals)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -350,7 +350,7 @@ func generateInstanceNamesWithOrdinalList(parentName, templateName string,
return instanceNameList, nil
}

func convertOrdinalsToSortedList(ordinals kbappsv1.Ordinals) ([]int32, error) {
func ConvertOrdinalsToSortedList(ordinals kbappsv1.Ordinals) ([]int32, error) {
ordinalList := sets.New(ordinals.Discrete...)
for _, item := range ordinals.Ranges {
start := item.Start
Expand Down
4 changes: 2 additions & 2 deletions pkg/controller/instanceset/instance_util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -397,7 +397,7 @@ var _ = Describe("instance util test", func() {
},
Discrete: []int32{0, 6},
}
ordinalList, err := convertOrdinalsToSortedList(ordinals)
ordinalList, err := ConvertOrdinalsToSortedList(ordinals)
Expect(err).Should(BeNil())
sets.New(ordinalList...).Equal(sets.New[int32](0, 2, 3, 4, 6))
})
Expand All @@ -412,7 +412,7 @@ var _ = Describe("instance util test", func() {
},
Discrete: []int32{0},
}
ordinalList, err := convertOrdinalsToSortedList(ordinals)
ordinalList, err := ConvertOrdinalsToSortedList(ordinals)
errExpected := fmt.Errorf("range's end(%v) must >= start(%v)", 2, 4)
Expect(err).Should(Equal(errExpected))
Expect(ordinalList).Should(BeNil())
Expand Down
23 changes: 22 additions & 1 deletion pkg/operations/volume_expansion.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ type volumeExpansionHelper struct {
vctName string
expectCount int
offlineInstanceNames []string
templateName string
ordinals appsv1.Ordinals
}

var _ OpsHandler = volumeExpansionOpsHandler{}
Expand Down Expand Up @@ -137,6 +139,18 @@ func (ve volumeExpansionOpsHandler) ReconcileAction(reqCtx intctrlutil.RequestCt
vctName: vct.Name,
offlineInstanceNames: compSpec.OfflineInstances,
})
for _, template := range compSpec.Instances {
// todo: consider instance template with volumeClaimTemplates
veHelpers = append(veHelpers, volumeExpansionHelper{
compOps: compOps,
fullComponentName: fullComponentName,
expectCount: int(*template.Replicas),
vctName: vct.Name,
offlineInstanceNames: compSpec.OfflineInstances,
templateName: template.Name,
ordinals: template.Ordinals,
})
}
}
}
}
Expand Down Expand Up @@ -285,6 +299,9 @@ func (ve volumeExpansionOpsHandler) handleVCTExpansionProgress(reqCtx intctrluti
completedCount int
err error
)
if veHelper.expectCount <= 0 {
return 0, 0, nil
}
matchingLabels := client.MatchingLabels{
constant.AppInstanceLabelKey: opsRes.Cluster.Name,
constant.VolumeClaimTemplateNameLabelKey: veHelper.vctName,
Expand All @@ -295,7 +312,11 @@ func (ve volumeExpansionOpsHandler) handleVCTExpansionProgress(reqCtx intctrluti
return 0, 0, err
}
workloadName := constant.GenerateWorkloadNamePattern(opsRes.Cluster.Name, veHelper.fullComponentName)
instanceNames, err := instanceset.GenerateInstanceNamesFromTemplate(workloadName, "", int32(veHelper.expectCount), veHelper.offlineInstanceNames, nil)
ordinalList, err := instanceset.ConvertOrdinalsToSortedList(veHelper.ordinals)
if err != nil {
return 0, 0, err
}
instanceNames, err := instanceset.GenerateInstanceNamesFromTemplate(workloadName, veHelper.templateName, int32(veHelper.expectCount), veHelper.offlineInstanceNames, ordinalList)
if err != nil {
return 0, 0, err
}
Expand Down
Loading
Loading