diff --git a/go.mod b/go.mod index f7e765eb9..d524f720a 100644 --- a/go.mod +++ b/go.mod @@ -18,7 +18,7 @@ require ( github.com/lucasjones/reggen v0.0.0-20180717132126-cdb49ff09d77 github.com/mitchellh/mapstructure v1.3.3 github.com/neilotoole/errgroup v0.1.5 - github.com/pkg/errors v0.9.1 // indirect + github.com/pkg/errors v0.9.1 github.com/segmentio/fasthash v1.0.3 github.com/stretchr/objx v0.1.1 // indirect github.com/stretchr/testify v1.6.1 diff --git a/reconciler/reconciler.go b/reconciler/reconciler.go index c280f6ce9..a4ff5fb52 100644 --- a/reconciler/reconciler.go +++ b/reconciler/reconciler.go @@ -16,11 +16,11 @@ package reconciler import ( "context" - "errors" "fmt" "log" "time" + "github.com/pkg/errors" "golang.org/x/sync/errgroup" "github.com/coinbase/rosetta-sdk-go/parser" @@ -149,7 +149,7 @@ func (r *Reconciler) QueueChanges( }: return nil case <-ctx.Done(): - return ctx.Err() + return errors.WithStack(ctx.Err()) } } @@ -159,10 +159,10 @@ func (r *Reconciler) queueWorker(ctx context.Context) error { select { case req := <-r.processQueue: if err := r.queueChanges(ctx, req.Block, req.Changes); err != nil { - return err + return errors.WithStack(err) } case <-ctx.Done(): - return ctx.Err() + return errors.WithStack(ctx.Err()) } } } @@ -212,7 +212,7 @@ func (r *Reconciler) queueChanges( change.Currency, HeadBehind, ); err != nil { - return err + return errors.WithStack(err) } continue @@ -230,7 +230,7 @@ func (r *Reconciler) queueChanges( err := r.inactiveAccountQueue(false, acctCurrency, block, true) r.inactiveQueueMutex.Unlock() if err != nil { - return err + return errors.WithStack(err) } // Add change to queueMap before enqueuing to ensure @@ -282,11 +282,7 @@ func (r *Reconciler) CompareBalance( // Head block should be set before we CompareBalance head, err := r.helper.CurrentBlock(ctx, dbTx) if err != nil { - return zeroString, "", 0, fmt.Errorf( - "%w: %v", - ErrGetCurrentBlockFailed, - err, - ) + return zeroString, "", 0, errors.Wrapf(ErrGetCurrentBlockFailed, "%v", err) } // Check if live block is < head (or wait) @@ -302,17 +298,17 @@ func (r *Reconciler) CompareBalance( // Check if live block is in store (ensure not reorged) canonical, err := r.helper.CanonicalBlock(ctx, dbTx, liveBlock) if err != nil { - return zeroString, "", 0, fmt.Errorf( - "%w: %v: on live block %+v", + return zeroString, "", 0, errors.Wrapf( ErrBlockExistsFailed, + "%v: on live block %+v", err, liveBlock, ) } if !canonical { - return zeroString, "", head.Index, fmt.Errorf( - "%w %+v", + return zeroString, "", head.Index, errors.Wrapf( ErrBlockGone, + "%+v", liveBlock, ) } @@ -327,17 +323,17 @@ func (r *Reconciler) CompareBalance( ) if err != nil { if errors.Is(err, storageErrors.ErrAccountMissing) { - return zeroString, "", head.Index, fmt.Errorf( - "%w for %+v:%+v", + return zeroString, "", head.Index, errors.Wrapf( storageErrors.ErrAccountMissing, + "for %+v:%+v", account, currency, ) } - return zeroString, "", head.Index, fmt.Errorf( - "%w for %+v:%+v: %v", + return zeroString, "", head.Index, errors.Wrapf( ErrGetComputedBalanceFailed, + "for %+v:%+v: %v", account, currency, err, @@ -346,7 +342,7 @@ func (r *Reconciler) CompareBalance( difference, err := types.SubtractValues(liveBalance, computedBalance.Value) if err != nil { - return "", "", -1, err + return "", "", -1, errors.WithStack(err) } return difference, computedBalance.Value, head.Index, nil @@ -377,9 +373,9 @@ func (r *Reconciler) bestLiveBalance( lookupIndex, ) if err != nil { - return nil, nil, fmt.Errorf( - "%w: unable to get live balance for %s %s at %d", + return nil, nil, errors.Wrapf( err, + "unable to get live balance for %s %s at %d", types.PrintStruct(account), types.PrintStruct(currency), lookupIndex, @@ -411,7 +407,7 @@ func (r *Reconciler) handleBalanceMismatch( if exemption != nil { // Return handler result (regardless if error) so that we don't invoke the handler for // a failed reconciliation as well. - return r.handler.ReconciliationExempt( + return errors.WithStack(r.handler.ReconciliationExempt( ctx, reconciliationType, account, @@ -420,7 +416,7 @@ func (r *Reconciler) handleBalanceMismatch( liveBalance, block, exemption, - ) + )) } // If we didn't find a matching exemption, @@ -436,7 +432,7 @@ func (r *Reconciler) handleBalanceMismatch( block, ) if err != nil { // error only returned if we should exit on failure - return err + return errors.WithStack(err) } return nil @@ -495,13 +491,13 @@ func (r *Reconciler) accountReconciliation( // after this new highWaterMark. r.highWaterMark = liveBlock.Index - return r.handler.ReconciliationSkipped( + return errors.WithStack(r.handler.ReconciliationSkipped( ctx, reconciliationType, account, currency, HeadBehind, - ) + )) } if errors.Is(err, ErrBlockGone) { @@ -512,13 +508,13 @@ func (r *Reconciler) accountReconciliation( types.PrintStruct(liveBlock), ) - return r.handler.ReconciliationSkipped( + return errors.WithStack(r.handler.ReconciliationSkipped( ctx, reconciliationType, account, currency, BlockGone, - ) + )) } if errors.Is(err, storageErrors.ErrAccountMissing) { @@ -547,7 +543,7 @@ func (r *Reconciler) accountReconciliation( } if difference != zeroString { - return r.handleBalanceMismatch( + return errors.WithStack(r.handleBalanceMismatch( ctx, difference, reconciliationType, @@ -556,20 +552,20 @@ func (r *Reconciler) accountReconciliation( computedBalance, liveAmount, liveBlock, - ) + )) } - return r.handler.ReconciliationSucceeded( + return errors.WithStack(r.handler.ReconciliationSucceeded( ctx, reconciliationType, accountCurrency.Account, accountCurrency.Currency, liveAmount, liveBlock, - ) + )) } - return ctx.Err() + return errors.WithStack(ctx.Err()) } func (r *Reconciler) inactiveAccountQueue( @@ -577,7 +573,7 @@ func (r *Reconciler) inactiveAccountQueue( accountCurrency *types.AccountCurrency, liveBlock *types.BlockIdentifier, hasLock bool, -) error { +) error { // nolint if !hasLock { r.inactiveQueueMutex.Lock(false) defer r.inactiveQueueMutex.Unlock() @@ -627,12 +623,12 @@ func (r *Reconciler) pruneBalances( return nil } - return r.helper.PruneBalances( + return errors.WithStack(r.helper.PruneBalances( ctx, acctCurrency.Account, acctCurrency.Currency, index-safeBalancePruneDepth, - ) + )) } // skipAndPrune calls the ReconciliationSkipped @@ -649,10 +645,10 @@ func (r *Reconciler) skipAndPrune( change.Currency, skipCause, ); err != nil { - return err + return errors.WithStack(err) } - return r.updateQueueMap( + return errors.WithStack(r.updateQueueMap( ctx, &types.AccountCurrency{ Account: change.Account, @@ -660,7 +656,7 @@ func (r *Reconciler) skipAndPrune( }, change.Block.Index, pruneActiveReconciliation, - ) + )) } // updateQueueMap removes a *parser.BalanceChange @@ -708,7 +704,7 @@ func (r *Reconciler) updateQueueMap( return nil } - return r.pruneBalances(ctx, acctCurrency, index) + return errors.WithStack(r.pruneBalances(ctx, acctCurrency, index)) } // reconcileActiveAccounts selects an account @@ -720,7 +716,7 @@ func (r *Reconciler) reconcileActiveAccounts(ctx context.Context) error { // nol for { select { case <-ctx.Done(): - return ctx.Err() + return errors.WithStack(ctx.Err()) case balanceChange := <-r.changeQueue: if balanceChange.Block.Index < r.highWaterMark { r.debugLog( @@ -728,7 +724,7 @@ func (r *Reconciler) reconcileActiveAccounts(ctx context.Context) error { // nol ) if err := r.skipAndPrune(ctx, balanceChange, HeadBehind); err != nil { - return err + return errors.WithStack(err) } continue @@ -745,14 +741,14 @@ func (r *Reconciler) reconcileActiveAccounts(ctx context.Context) error { // nol // context is canceled. if errors.Is(err, context.Canceled) { r.wrappedActiveEnqueue(ctx, balanceChange) - return err + return errors.WithStack(err) } tip, tErr := r.helper.IndexAtTip(ctx, balanceChange.Block.Index) switch { case tErr == nil && tip: if err := r.skipAndPrune(ctx, balanceChange, TipFailure); err != nil { - return err + return errors.WithStack(err) } continue @@ -760,7 +756,7 @@ func (r *Reconciler) reconcileActiveAccounts(ctx context.Context) error { // nol fmt.Printf("%v: could not determine if at tip\n", tErr) } - return fmt.Errorf("%w: %v", ErrLiveBalanceLookupFailed, err) + return errors.Wrapf(ErrLiveBalanceLookupFailed, "%v", err) } err = r.accountReconciliation( @@ -778,7 +774,7 @@ func (r *Reconciler) reconcileActiveAccounts(ctx context.Context) error { // nol r.wrappedActiveEnqueue(ctx, balanceChange) } - return err + return errors.WithStack(err) } // Attempt to prune historical balances that will not be used @@ -792,7 +788,7 @@ func (r *Reconciler) reconcileActiveAccounts(ctx context.Context) error { // nol balanceChange.Block.Index, pruneActiveReconciliation, ); err != nil { - return err + return errors.WithStack(err) } r.updateLastChecked(balanceChange.Block.Index) @@ -894,7 +890,7 @@ func (r *Reconciler) reconcileInactiveAccounts( // nolint:gocognit // Ensure we don't leak reconciliations r.wrappedInactiveEnqueue(nextAcct.Entry, block) if errors.Is(err, context.Canceled) { - return err + return errors.WithStack(err) } tip, tErr := r.helper.IndexAtTip(ctx, head.Index) @@ -907,7 +903,7 @@ func (r *Reconciler) reconcileInactiveAccounts( // nolint:gocognit nextAcct.Entry.Currency, TipFailure, ); err != nil { - return err + return errors.WithStack(err) } if err := r.updateQueueMap( @@ -916,7 +912,7 @@ func (r *Reconciler) reconcileInactiveAccounts( // nolint:gocognit head.Index, pruneInactiveReconciliation, ); err != nil { - return err + return errors.WithStack(err) } continue @@ -924,7 +920,7 @@ func (r *Reconciler) reconcileInactiveAccounts( // nolint:gocognit fmt.Printf("%v: could not determine if at tip\n", tErr) } - return fmt.Errorf("%w: %v", ErrLiveBalanceLookupFailed, err) + return errors.Wrapf(ErrLiveBalanceLookupFailed, "%v", err) } err = r.accountReconciliation( @@ -937,7 +933,7 @@ func (r *Reconciler) reconcileInactiveAccounts( // nolint:gocognit ) if err != nil { r.wrappedInactiveEnqueue(nextAcct.Entry, block) - return err + return errors.WithStack(err) } // We always prune relative to the index we inserted @@ -950,7 +946,7 @@ func (r *Reconciler) reconcileInactiveAccounts( // nolint:gocognit head.Index, pruneInactiveReconciliation, ); err != nil { - return err + return errors.WithStack(err) } // Always re-enqueue accounts after they have been inactively @@ -958,7 +954,7 @@ func (r *Reconciler) reconcileInactiveAccounts( // nolint:gocognit // these accounts again. err = r.inactiveAccountQueue(true, nextAcct.Entry, block, false) if err != nil { - return err + return errors.WithStack(err) } } else { r.inactiveQueueMutex.Unlock() @@ -972,7 +968,7 @@ func (r *Reconciler) reconcileInactiveAccounts( // nolint:gocognit } } - return ctx.Err() + return errors.WithStack(ctx.Err()) } // Reconcile starts the active and inactive Reconciler goroutines. @@ -980,23 +976,23 @@ func (r *Reconciler) reconcileInactiveAccounts( // nolint:gocognit func (r *Reconciler) Reconcile(ctx context.Context) error { g, ctx := errgroup.WithContext(ctx) g.Go(func() error { - return r.queueWorker(ctx) + return errors.WithStack(r.queueWorker(ctx)) }) for j := 0; j < r.ActiveConcurrency; j++ { g.Go(func() error { - return r.reconcileActiveAccounts(ctx) + return errors.WithStack(r.reconcileActiveAccounts(ctx)) }) } for j := 0; j < r.InactiveConcurrency; j++ { g.Go(func() error { - return r.reconcileInactiveAccounts(ctx) + return errors.WithStack(r.reconcileInactiveAccounts(ctx)) }) } if err := g.Wait(); err != nil { - return err + return errors.WithStack(err) } return nil