Skip to content

Commit b83ba60

Browse files
committed
Classify PITR slicer errors as fatal vs retriable to stop infinite restart loop
When a slicer fails with a permanent error (e.g. insufficient oplog range, missing base backup), PITR now stops cluster-wide and stays stopped until a successful backup clears the error state. Previously, leadNomination unconditionally called InitMeta, which cleared the error, causing an infinite fail-stop-restart loop in which healthy replica sets accumulated useless oplog chunks.
1 parent 068bfb3 commit b83ba60

File tree

3 files changed

+72
-14
lines changed

3 files changed

+72
-14
lines changed

cmd/pbm-agent/backup.go

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import (
1111
"github.com/percona/percona-backup-mongodb/pbm/errors"
1212
"github.com/percona/percona-backup-mongodb/pbm/lock"
1313
"github.com/percona/percona-backup-mongodb/pbm/log"
14+
"github.com/percona/percona-backup-mongodb/pbm/oplog"
1415
"github.com/percona/percona-backup-mongodb/pbm/prio"
1516
"github.com/percona/percona-backup-mongodb/pbm/storage"
1617
"github.com/percona/percona-backup-mongodb/pbm/topo"
@@ -258,6 +259,19 @@ func (a *Agent) Backup(ctx context.Context, cmd *ctrl.BackupCmd, opid ctrl.OPID,
258259
}
259260
} else {
260261
l.Info("backup finished")
262+
263+
// A successful backup creates a new PITR starting point.
264+
// Clear any fatal PITR error so the supervisor can restart slicing.
265+
if nodeInfo.IsLeader() {
266+
status, serr := oplog.GetClusterStatus(ctx, a.leadConn)
267+
if serr == nil && status == oplog.StatusError {
268+
if ierr := oplog.SetClusterStatus(ctx, a.leadConn, oplog.StatusUnset); ierr != nil {
269+
l.Warning("clear PITR error status: %v", ierr)
270+
} else {
271+
l.Info("PITR error state cleared after successful backup")
272+
}
273+
}
274+
}
261275
}
262276
}
263277

cmd/pbm-agent/pitr.go

Lines changed: 41 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -245,6 +245,14 @@ func (a *Agent) pitr(ctx context.Context) error {
245245
return nil
246246
}
247247

248+
// If PITR is in a fatal error state (e.g. no base backup), skip
249+
// nomination and wait cycles. A new backup is needed to clear this.
250+
cStatus, serr := oplog.GetClusterStatus(ctx, a.leadConn)
251+
if serr == nil && cStatus == oplog.StatusError {
252+
l.Info("pitr is in error state, new backup is required to resume")
253+
return nil
254+
}
255+
248256
if nodeInfo.IsClusterLeader() {
249257
// start monitor jobs on cluster leader
250258
a.startMon(ctx, cfg)
@@ -284,8 +292,12 @@ func (a *Agent) pitr(ctx context.Context) error {
284292
}
285293

286294
defer func() {
287-
if err != nil {
288-
l.Debug("setting RS error status for err: %v", err)
295+
if err == nil {
296+
return
297+
}
298+
l.Debug("setting RS error status for err: %v", err)
299+
var fatalErr slicer.FatalSlicerError
300+
if errors.As(err, &fatalErr) {
289301
if err := oplog.SetErrorRSStatus(ctx, a.leadConn, nodeInfo.SetName, nodeInfo.Me, err.Error()); err != nil {
290302
l.Error("error while setting error status: %v", err)
291303
}
@@ -379,10 +391,19 @@ func (a *Agent) pitr(ctx context.Context) error {
379391
monitorPrio,
380392
)
381393
if streamErr != nil {
382-
l.Error("streaming oplog: %v", streamErr)
383-
retErr := errors.Wrap(streamErr, "streaming oplog")
384-
if err := oplog.SetErrorRSStatus(ctx, a.leadConn, nodeInfo.SetName, nodeInfo.Me, retErr.Error()); err != nil {
385-
l.Error("setting RS status to StatusError: %v", err)
394+
var movedErr slicer.OpMovedError
395+
var fatalErr slicer.FatalSlicerError
396+
397+
switch {
398+
case errors.As(streamErr, &movedErr):
399+
l.Info("streaming stopped: %v", streamErr)
400+
case errors.As(streamErr, &fatalErr):
401+
l.Error("streaming oplog: %v", streamErr)
402+
if err := oplog.SetErrorRSStatus(ctx, a.leadConn, nodeInfo.SetName, nodeInfo.Me, streamErr.Error()); err != nil {
403+
l.Error("setting RS status to StatusError: %v", err)
404+
}
405+
default:
406+
l.Error("streaming oplog: %v", streamErr)
386407
}
387408
}
388409

@@ -402,6 +423,16 @@ func (a *Agent) leadNomination(
402423
) {
403424
l := log.LogEventFromContext(ctx)
404425

426+
status, err := oplog.GetClusterStatus(ctx, a.leadConn)
427+
if err != nil && !errors.Is(err, errors.ErrNotFound) {
428+
l.Error("get cluster status: %v", err)
429+
return
430+
}
431+
if status == oplog.StatusError {
432+
l.Info("pitr is in error state, new backup is required to resume")
433+
return
434+
}
435+
405436
l.Debug("checking locks in the whole cluster")
406437
noLocks, err := a.waitAllOpLockRelease(ctx)
407438
if err != nil {
@@ -884,11 +915,14 @@ func (a *Agent) pitrErrorMonitor(ctx context.Context) {
884915
continue
885916
}
886917

887-
l.Debug("error while executing pitr, pitr procedure will be restarted")
918+
l.Debug("error while executing pitr, pitr procedure will be stopped")
888919
err = oplog.SetClusterStatus(ctx, a.leadConn, oplog.StatusError)
889920
if err != nil {
890921
l.Error("error while setting cluster status Error: %v", err)
891922
}
923+
a.removePitr()
924+
a.stopMon()
925+
return
892926

893927
case <-ctx.Done():
894928
return

pbm/slicer/slicer.go

Lines changed: 17 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,7 @@ func (s *Slicer) Catchup(ctx context.Context) error {
8484
lastBackup, err := backup.GetLastBackup(ctx, s.leadClient, nil)
8585
if err != nil {
8686
if errors.Is(err, errors.ErrNotFound) {
87-
err = errors.New("no backup found. full backup is required to start PITR")
87+
return FatalSlicerError{errors.New("no backup found. full backup is required to start PITR")}
8888
}
8989
return errors.Wrap(err, "get last backup")
9090
}
@@ -98,19 +98,19 @@ func (s *Slicer) Catchup(ctx context.Context) error {
9898
}
9999
}
100100
if rs == nil {
101-
return errors.Errorf("no replset %q in the last backup %q. "+
101+
return FatalSlicerError{errors.Errorf("no replset %q in the last backup %q. "+
102102
"full backup is required to start PITR",
103-
s.rs, lastBackup.Name)
103+
s.rs, lastBackup.Name)}
104104
}
105105

106106
lastRestore, err := restore.GetLastRestore(ctx, s.leadClient)
107107
if err != nil && !errors.Is(err, errors.ErrNotFound) {
108108
return errors.Wrap(err, "get last restore")
109109
}
110110
if lastRestore != nil && lastBackup.StartTS < lastRestore.StartTS {
111-
return errors.Errorf("no backup found after the restored %s, "+
111+
return FatalSlicerError{errors.Errorf("no backup found after the restored %s, "+
112112
"a new backup is required to resume PITR",
113-
lastRestore.Backup)
113+
lastRestore.Backup)}
114114
}
115115

116116
lastChunk, err := oplog.PITRLastChunkMeta(ctx, s.leadClient, s.rs)
@@ -220,7 +220,7 @@ func (s *Slicer) OplogOnlyCatchup(ctx context.Context) error {
220220
return errors.Wrapf(err, "check oplog sufficiency for %v", lastChunk)
221221
}
222222
if !ok {
223-
return oplog.InsuffRangeError{lastChunk.EndTS}
223+
return FatalSlicerError{oplog.InsuffRangeError{lastChunk.EndTS}}
224224
}
225225

226226
s.lastTS = lastChunk.EndTS
@@ -291,6 +291,16 @@ func (e OpMovedError) Is(err error) bool {
291291
return ok
292292
}
293293

294+
// FatalSlicerError wraps errors that are permanent and cannot be resolved
295+
// by retrying. PITR should not restart until the underlying issue is fixed
296+
// (e.g. a new backup is taken).
297+
type FatalSlicerError struct {
298+
Err error
299+
}
300+
301+
func (e FatalSlicerError) Error() string { return e.Err.Error() }
302+
func (e FatalSlicerError) Unwrap() error { return e.Err }
303+
294304
// LogStartMsg message to log on successful streaming start
295305
const LogStartMsg = "start_ok"
296306

@@ -321,7 +331,7 @@ func (s *Slicer) Stream(
321331
return errors.Wrap(err, "check oplog sufficiency")
322332
}
323333
if !ok {
324-
return oplog.InsuffRangeError{s.lastTS}
334+
return FatalSlicerError{oplog.InsuffRangeError{s.lastTS}}
325335
}
326336
s.l.Debug(LogStartMsg)
327337

0 commit comments

Comments
 (0)