Skip to content

Commit 8b790de

Browse files
committed
sharding restore and inc backup validations
1 parent a24a1e6 commit 8b790de

File tree

5 files changed

+49
-22
lines changed

5 files changed

+49
-22
lines changed

controllers/apps/cluster/transformer_cluster_restore.go

Lines changed: 22 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -72,43 +72,51 @@ func (c *clusterRestoreTransformer) Transform(ctx graph.TransformContext, dag *g
7272
if err != nil {
7373
return err
7474
}
75+
targets := backup.Status.Targets
7576
// obtain components that have already been assigned targets.
7677
allocateTargetMap := map[string]string{}
7778
restoreDoneForShardComponents := true
7879
for _, v := range shardComponents {
7980
if model.IsObjectDeleting(&v) {
8081
continue
8182
}
82-
if v.Annotations[constant.RestoreDoneAnnotationKey] != "true" {
83+
84+
compName := v.Labels[constant.KBAppComponentLabelKey]
85+
compAnnotations := c.initClusterAnnotations(compName)
86+
87+
if v.Annotations[constant.BackupSourceTargetAnnotationKey] != "" && v.Annotations[constant.RestoreDoneAnnotationKey] != "true" {
8388
restoreDoneForShardComponents = false
8489
}
8590
if targetName, ok := v.Annotations[constant.BackupSourceTargetAnnotationKey]; ok {
86-
compName := v.Labels[constant.KBAppComponentLabelKey]
8791
allocateTargetMap[targetName] = compName
88-
c.initClusterAnnotations(compName)
89-
c.annotations[compName][constant.BackupSourceTargetAnnotationKey] = targetName
92+
compAnnotations[constant.BackupSourceTargetAnnotationKey] = targetName
9093
}
9194
}
92-
if len(allocateTargetMap) == len(backup.Status.Targets) {
95+
if len(allocateTargetMap) == len(targets) {
9396
// check if the restore is completed when all source target have allocated.
9497
if err = c.cleanupRestoreAnnotationForSharding(dag, spec.Name, restoreDoneForShardComponents); err != nil {
9598
return err
9699
}
97-
continue
98100
}
99-
for _, target := range backup.Status.Targets {
101+
for _, target := range targets {
100102
if _, ok = allocateTargetMap[target.Name]; ok {
101103
continue
102104
}
103105
for _, compSpec := range c.shardingComps[spec.Name] {
104-
if _, ok = c.annotations[compSpec.Name][constant.BackupSourceTargetAnnotationKey]; ok {
106+
compAnnotations := c.initClusterAnnotations(compSpec.Name)
107+
if _, ok = compAnnotations[constant.BackupSourceTargetAnnotationKey]; ok {
105108
continue
106109
}
107-
c.initClusterAnnotations(compSpec.Name)
108-
c.annotations[compSpec.Name][constant.BackupSourceTargetAnnotationKey] = target.Name
110+
compAnnotations[constant.BackupSourceTargetAnnotationKey] = target.Name
109111
break
110112
}
111113
}
114+
for _, compSpec := range c.shardingComps[spec.Name] {
115+
compAnnotations := c.initClusterAnnotations(compSpec.Name)
116+
if compAnnotations[constant.BackupSourceTargetAnnotationKey] == "" {
117+
compAnnotations[constant.SkipRestoreAnnotationKey] = "true"
118+
}
119+
}
112120
}
113121
// if component needs to do post ready restore after cluster is running, annotate component
114122
if c.Cluster.Status.Phase == appsv1.RunningClusterPhase {
@@ -132,13 +140,14 @@ func (c *clusterRestoreTransformer) Transform(ctx graph.TransformContext, dag *g
132140
return nil
133141
}
134142

135-
func (c *clusterRestoreTransformer) initClusterAnnotations(compName string) {
143+
func (c *clusterRestoreTransformer) initClusterAnnotations(compName string) map[string]string {
136144
if c.annotations == nil {
137-
c.annotations = map[string]map[string]string{}
145+
c.annotations = make(map[string]map[string]string)
138146
}
139147
if c.annotations[compName] == nil {
140-
c.annotations[compName] = map[string]string{}
148+
c.annotations[compName] = make(map[string]string)
141149
}
150+
return c.annotations[compName]
142151
}
143152

144153
func (c *clusterRestoreTransformer) cleanupRestoreAnnotationForSharding(dag *graph.DAG,

controllers/dataprotection/backup_controller.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1071,7 +1071,7 @@ func prepare4Incremental(request *dpbackup.Request) (*dpbackup.Request, error) {
10711071
return nil, fmt.Errorf("backupRepo for incremental backup can't be empty")
10721072
}
10731073
// get and validate parent backup
1074-
parentBackup, err := GetParentBackup(request.Ctx, request.Client, request.Backup, request.BackupMethod, request.BackupRepo.Name)
1074+
parentBackup, err := GetParentBackup(request.Ctx, request.Client, request.Backup, request.BackupMethod, request.BackupPolicy, request.BackupRepo.Name)
10751075
if err != nil {
10761076
return nil, err
10771077
}

controllers/dataprotection/utils.go

Lines changed: 22 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -590,7 +590,7 @@ func fromFlattenName(flatten string) (name string, namespace string) {
590590
// then validate and return the parent backup.
591591
// If parentBackupName is not specified, find the latest valid parent backup.
592592
func GetParentBackup(ctx context.Context, cli client.Client, backup *dpv1alpha1.Backup,
593-
backupMethod *dpv1alpha1.BackupMethod, backupRepoName string) (*dpv1alpha1.Backup, error) {
593+
backupMethod *dpv1alpha1.BackupMethod, backupPolicy *dpv1alpha1.BackupPolicy, backupRepoName string) (*dpv1alpha1.Backup, error) {
594594
if backup == nil || backupMethod == nil {
595595
return nil, fmt.Errorf("backup or backupMethod is nil")
596596
}
@@ -616,16 +616,23 @@ func GetParentBackup(ctx context.Context, cli client.Client, backup *dpv1alpha1.
616616
}, parentBackup); err != nil {
617617
return nil, err
618618
}
619-
if err := ValidateParentBackup(ctx, cli, backup, parentBackup, backupMethod, backupRepoName); err != nil {
619+
if err := ValidateParentBackup(ctx, cli, backup, parentBackup, backupMethod, backupPolicy, backupRepoName); err != nil {
620620
return nil, fmt.Errorf("failed to validate parent backup %s: %w", parentBackupName, err)
621621
}
622622
return parentBackup, nil
623623
}
624-
parentBackup, err := FindParentBackupIfNotSet(ctx, cli, backup, backupMethod, backupRepoName)
624+
parentBackup, err := FindParentBackupIfNotSet(ctx, cli, backup, backupMethod, backupPolicy, backupRepoName)
625625
if err != nil {
626626
return nil, fmt.Errorf("failed to find parent backup: %w", err)
627627
}
628628
if parentBackup == nil {
629+
// output sharding message
630+
expectedTargets := dputils.GetBackupTargets(backupPolicy, backupMethod)
631+
if len(expectedTargets) > 0 {
632+
return nil, fmt.Errorf("failed to find a valid parent backup for backup %s/%s: "+
633+
"current backup expects %d shards, this may be due to shard count mismatch with existing backups",
634+
backup.Namespace, backup.Name, len(expectedTargets))
635+
}
629636
return nil, fmt.Errorf("failed to find a valid parent backup for backup %s/%s", backup.Namespace, backup.Name)
630637
}
631638
return parentBackup, nil
@@ -637,7 +644,7 @@ func GetParentBackup(ctx context.Context, cli client.Client, backup *dpv1alpha1.
637644
// b. return the latest incremental backup.
638645
// c. return the latest full backup if incremental backups are not found.
639646
func FindParentBackupIfNotSet(ctx context.Context, cli client.Client, backup *dpv1alpha1.Backup,
640-
backupMethod *dpv1alpha1.BackupMethod, backupRepoName string) (*dpv1alpha1.Backup, error) {
647+
backupMethod *dpv1alpha1.BackupMethod, backupPolicy *dpv1alpha1.BackupPolicy, backupRepoName string) (*dpv1alpha1.Backup, error) {
641648
getLatestBackup := func(backupList []*dpv1alpha1.Backup) *dpv1alpha1.Backup {
642649
if len(backupList) == 0 {
643650
return nil
@@ -655,7 +662,7 @@ func FindParentBackupIfNotSet(ctx context.Context, cli client.Client, backup *dp
655662
client.MatchingLabels(labels)); err != nil && !apierrors.IsNotFound(err) {
656663
return nil, err
657664
}
658-
filteredbackupList := FilterParentBackups(ctx, cli, backupList, backup, backupMethod, incremental, backupRepoName)
665+
filteredbackupList := FilterParentBackups(ctx, cli, backupList, backup, backupMethod, backupPolicy, incremental, backupRepoName)
659666
return getLatestBackup(filteredbackupList), nil
660667
}
661668

@@ -701,13 +708,13 @@ func FindParentBackupIfNotSet(ctx context.Context, cli client.Client, backup *dp
701708

702709
// FilterParentBackups filters the parent backups by backup phase, backup method and end time.
703710
func FilterParentBackups(ctx context.Context, cli client.Client, backupList *dpv1alpha1.BackupList, targetBackup *dpv1alpha1.Backup,
704-
backupMethod *dpv1alpha1.BackupMethod, incremental bool, backupRepoName string) []*dpv1alpha1.Backup {
711+
backupMethod *dpv1alpha1.BackupMethod, backupPolicy *dpv1alpha1.BackupPolicy, incremental bool, backupRepoName string) []*dpv1alpha1.Backup {
705712
var res []*dpv1alpha1.Backup
706713
if backupList == nil || len(backupList.Items) == 0 {
707714
return res
708715
}
709716
for i, backup := range backupList.Items {
710-
if err := ValidateParentBackup(ctx, cli, targetBackup, &backup, backupMethod, backupRepoName); err != nil {
717+
if err := ValidateParentBackup(ctx, cli, targetBackup, &backup, backupMethod, backupPolicy, backupRepoName); err != nil {
711718
continue
712719
}
713720
// backups are listed by backup type label, validate if the backup method matches
@@ -728,7 +735,7 @@ func FilterParentBackups(ctx context.Context, cli client.Client, backupList *dpv
728735

729736
// ValidateParentBackup validates the parent backup.
730737
func ValidateParentBackup(ctx context.Context, cli client.Client, backup *dpv1alpha1.Backup, parentBackup *dpv1alpha1.Backup,
731-
backupMethod *dpv1alpha1.BackupMethod, backupRepoName string) error {
738+
backupMethod *dpv1alpha1.BackupMethod, backupPolicy *dpv1alpha1.BackupPolicy, backupRepoName string) error {
732739
// validate parent backup is completed
733740
if parentBackup.Status.Phase != dpv1alpha1.BackupPhaseCompleted {
734741
return fmt.Errorf("parent backup %s/%s is not completed", parentBackup.Namespace, parentBackup.Name)
@@ -759,6 +766,13 @@ func ValidateParentBackup(ctx context.Context, cli client.Client, backup *dpv1al
759766
parentBackup.Namespace, parentBackup.Name, err)
760767
}
761768
}
769+
// validate shard count consistency for incremental backup
770+
expectedTargets := dputils.GetBackupTargets(backupPolicy, backupMethod)
771+
if len(expectedTargets) > 0 && len(parentBackup.Status.Targets) != len(expectedTargets) {
772+
return fmt.Errorf("parent backup %s/%s has %d shards, but current backup expects %d shards; "+
773+
"incremental backup requires consistent shard count",
774+
parentBackup.Namespace, parentBackup.Name, len(parentBackup.Status.Targets), len(expectedTargets))
775+
}
762776
return nil
763777
}
764778

pkg/constant/annotations.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ const (
3333
RestoreFromBackupAnnotationKey = "kubeblocks.io/restore-from-backup"
3434
RestoreDoneAnnotationKey = "kubeblocks.io/restore-done"
3535
BackupSourceTargetAnnotationKey = "kubeblocks.io/backup-source-target" // RestoreFromBackupAnnotationKey specifies the component to recover from the backup.
36+
SkipRestoreAnnotationKey = "kubeblocks.io/skip-restore" // SkipRestoreAnnotationKey indicates the shard component should skip sharding restore scheduling.
3637

3738
KBAppClusterUIDKey = "apps.kubeblocks.io/cluster-uid"
3839
BackupPolicyTemplateAnnotationKey = "apps.kubeblocks.io/backup-policy-template"

pkg/controller/plan/restore.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,9 @@ func NewRestoreManager(ctx context.Context,
8686
}
8787

8888
func (r *RestoreManager) DoRestore(comp *component.SynthesizedComponent, compObj *appsv1.Component, postProvisionDone bool) error {
89+
if compObj.Annotations[constant.SkipRestoreAnnotationKey] == "true" {
90+
return nil
91+
}
8992
backupObj, err := r.initFromAnnotation(comp, compObj)
9093
if err != nil {
9194
return err

0 commit comments

Comments
 (0)