Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
93 changes: 81 additions & 12 deletions internal/persistence/leveldb/leveldb_persistence.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright © 2025 Kaleido, Inc.
// Copyright © 2025 - 2026 Kaleido, Inc.
//
// SPDX-License-Identifier: Apache-2.0
//
Expand Down Expand Up @@ -356,9 +356,9 @@ func (p *leveldbPersistence) cleanupOrphanedTXIdxKeys(ctx context.Context, orpha
}
}

func (p *leveldbPersistence) listTransactionsByIndex(ctx context.Context, collectionPrefix, collectionEnd, afterStr string, limit int, dir txhandler.SortDirection) ([]*apitypes.ManagedTX, error) {

p.txMux.RLock()
// listTransactionsByIndexLocked performs the actual list operation without acquiring locks.
// The caller must already hold a lock (read lock is sufficient)
func (p *leveldbPersistence) listTransactionsByIndexLocked(ctx context.Context, collectionPrefix, collectionEnd, afterStr string, limit int, dir txhandler.SortDirection) ([]*apitypes.ManagedTX, [][]byte, error) {
transactions := make([]*apitypes.ManagedTX, 0)
orphanedIdxKeys, err := p.listJSON(ctx, collectionPrefix, collectionEnd, afterStr, limit, dir,
func() interface{} { var v *apitypes.ManagedTX; return &v },
Expand All @@ -369,6 +369,16 @@ func (p *leveldbPersistence) listTransactionsByIndex(ctx context.Context, collec
},
p.indexLookupCallback,
)
if err != nil {
return nil, nil, err
}
return transactions, orphanedIdxKeys, nil
}

func (p *leveldbPersistence) listTransactionsByIndex(ctx context.Context, collectionPrefix, collectionEnd, afterStr string, limit int, dir txhandler.SortDirection) ([]*apitypes.ManagedTX, error) {

p.txMux.RLock()
transactions, orphanedIdxKeys, err := p.listTransactionsByIndexLocked(ctx, collectionPrefix, collectionEnd, afterStr, limit, dir)
p.txMux.RUnlock()
if err != nil {
return nil, err
Expand Down Expand Up @@ -499,6 +509,63 @@ func (p *leveldbPersistence) InsertTransactionWithNextNonce(ctx context.Context,

}

// InsertTransactionsWithNextNonce inserts multiple transactions, assigning each one the
// next nonce for its signing address.
//
// Returns one entry per input transaction: nil on success, or the error that prevented
// that particular transaction from being inserted. Returns nil for an empty batch.
func (p *leveldbPersistence) InsertTransactionsWithNextNonce(ctx context.Context, txs []*apitypes.ManagedTX, nextNonceCB txhandler.NextNonceCallback) []error {
	if len(txs) == 0 {
		return nil
	}
	errs := make([]error, len(txs))

	// For leveldb all nonce assignments and writes happen under a single write lock,
	// so the whole batch is processed atomically with respect to other writers.
	p.txMux.Lock()
	defer p.txMux.Unlock()

	// Process transactions in input order. Nonce assignment only depends on per-signer
	// ordering, which a straight iteration preserves, and this keeps the overall
	// processing order deterministic (grouping into a map would randomize it).
	for i, tx := range txs {
		if tx.From == "" {
			errs[i] = i18n.NewError(ctx, tmmsgs.MsgTransactionOpInvalid)
			continue
		}

		// use assignAndLockNonceLocked since we already hold the write lock
		lockedNonce, err := p.assignAndLockNonceLocked(ctx, tx.ID, tx.From, nextNonceCB)
		if err != nil {
			errs[i] = err
			continue
		}

		//nolint:gosec // Safe conversion as nonce is always positive
		tx.Nonce = fftypes.NewFFBigInt(int64(lockedNonce.nonce))

		// use writeTransactionLocked since we already hold the write lock
		if err = p.writeTransactionLocked(ctx, &apitypes.TXWithStatus{
			ManagedTX: tx,
		}, true); err != nil {
			// Release the nonce without marking it spent, so it can be re-assigned
			lockedNonce.complete(ctx)
			errs[i] = err
			continue
		}

		lockedNonce.spent = true
		lockedNonce.complete(ctx)
	}

	return errs
}

func (p *leveldbPersistence) InsertTransactionPreAssignedNonce(ctx context.Context, tx *apitypes.ManagedTX) (err error) {
return p.writeTransaction(ctx, &apitypes.TXWithStatus{
ManagedTX: tx,
Expand Down Expand Up @@ -591,14 +658,9 @@ func (p *leveldbPersistence) UpdateTransaction(ctx context.Context, txID string,
return p.writeTransaction(ctx, tx, false)
}

func (p *leveldbPersistence) writeTransaction(ctx context.Context, tx *apitypes.TXWithStatus, newTx bool) (err error) {
// We take a write-lock here, because we are writing multiple values (the indexes), and anybody
// attempting to read the critical nonce allocation index must know the difference between a partial write
// (we crashed before we completed all the writes) and an incomplete write that's in process.
// The reading code detects partial writes and cleans them up if it finds them.
p.txMux.Lock()
defer p.txMux.Unlock()

// writeTransactionLocked performs the actual write operation without acquiring locks.
// The caller must already hold the write lock
func (p *leveldbPersistence) writeTransactionLocked(ctx context.Context, tx *apitypes.TXWithStatus, newTx bool) (err error) {
// We don't double store these values.
// Would be great to reconcile out this historical oddity, once the only place it's available is on the
// API, and FireFly core and any other consumers of the API have had time to update to use the base fields
Expand Down Expand Up @@ -652,6 +714,13 @@ func (p *leveldbPersistence) writeTransaction(ctx context.Context, tx *apitypes.
return err
}

// writeTransaction acquires the write lock and delegates to writeTransactionLocked.
//
// We take a write-lock here, because we are writing multiple values (the indexes), and anybody
// attempting to read the critical nonce allocation index must know the difference between a partial write
// (we crashed before we completed all the writes) and an incomplete write that's in process.
// The reading code detects partial writes and cleans them up if it finds them.
func (p *leveldbPersistence) writeTransaction(ctx context.Context, tx *apitypes.TXWithStatus, newTx bool) (err error) {
	p.txMux.Lock()
	defer p.txMux.Unlock()

	return p.writeTransactionLocked(ctx, tx, newTx)
}

func (p *leveldbPersistence) DeleteTransaction(ctx context.Context, txID string) error {
var tx *apitypes.ManagedTX
err := p.readJSON(ctx, txDataKey(txID), &tx)
Expand Down
162 changes: 162 additions & 0 deletions internal/persistence/leveldb/leveldb_persistence_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1155,3 +1155,165 @@ func TestAddReceivedStatusWhenNothingSet(t *testing.T) {
assert.Equal(t, apitypes.TxSubStatusReceived, txh.History[0].Status)
assert.Equal(t, apitypes.TxActionSubmitTransaction, txh.History[0].Actions[0].Action)
}

// TestInsertTransactionsWithNextNonceLevelDB covers a batch where every transaction
// uses the same signing address: nonces must come out strictly sequential, and every
// transaction must be persisted and retrievable.
func TestInsertTransactionsWithNextNonceLevelDB(t *testing.T) {
	ctx, p, done := newTestLevelDBPersistence(t)
	defer done()

	// the callback hands out strictly increasing nonces starting above 1000
	next := uint64(1000)
	nextNonceCB := func(ctx context.Context, signer string) (uint64, error) {
		next++
		return next, nil
	}

	const batchSize = 5
	batch := make([]*apitypes.ManagedTX, batchSize)
	for i := range batch {
		batch[i] = &apitypes.ManagedTX{
			ID:      fmt.Sprintf("ns1:%s", fftypes.NewUUID()),
			Status:  apitypes.TxStatusPending,
			Created: fftypes.Now(),
			TransactionHeaders: ffcapi.TransactionHeaders{
				From: "signer_0",
			},
			TransactionData: fmt.Sprintf("0x%04d", i),
		}
	}

	errs := p.InsertTransactionsWithNextNonce(ctx, batch, nextNonceCB)
	assert.Len(t, errs, batchSize)
	for i := range batch {
		assert.NoError(t, errs[i], "Transaction %d should succeed", i)
		assert.NotEmpty(t, batch[i].SequenceID, "Transaction %d should have sequence ID", i)
		assert.NotNil(t, batch[i].Nonce, "Transaction %d should have nonce", i)
		if i > 0 {
			// nonces for a single signer must increase by exactly one per transaction
			assert.Equal(t, batch[i-1].Nonce.Int64()+1, batch[i].Nonce.Int64(), "Nonces should be sequential")
		}
	}

	// every transaction in the batch must be readable back with the stored nonce
	for i, tx := range batch {
		stored, err := p.GetTransactionByID(ctx, tx.ID)
		assert.NoError(t, err)
		assert.NotNil(t, stored, "Transaction %d should be retrievable", i)
		assert.Equal(t, tx.ID, stored.ID)
		assert.Equal(t, tx.Nonce, stored.Nonce)
	}
}

// TestInsertTransactionsWithNextNonceMixedSignersLevelDB covers a batch spanning two
// signing addresses, verifying that each signer gets its own independent nonce
// sequence regardless of how the transactions are interleaved in the batch.
func TestInsertTransactionsWithNextNonceMixedSignersLevelDB(t *testing.T) {
	ctx, p, done := newTestLevelDBPersistence(t)
	defer done()

	// test a batch using different signing addresses
	nonce1 := uint64(2000)
	nonce2 := uint64(3000)
	nextNonceCB := func(ctx context.Context, signer string) (uint64, error) {
		if signer == "signer_1" {
			nonce1++
			return nonce1, nil
		}
		nonce2++
		return nonce2, nil
	}

	txs := []*apitypes.ManagedTX{
		{
			ID:      fmt.Sprintf("ns1:%s", fftypes.NewUUID()),
			Status:  apitypes.TxStatusPending,
			Created: fftypes.Now(),
			TransactionHeaders: ffcapi.TransactionHeaders{
				From: "signer_1",
			},
		},
		{
			ID:      fmt.Sprintf("ns1:%s", fftypes.NewUUID()),
			Status:  apitypes.TxStatusPending,
			Created: fftypes.Now(),
			TransactionHeaders: ffcapi.TransactionHeaders{
				From: "signer_2",
			},
		},
		{
			ID:      fmt.Sprintf("ns1:%s", fftypes.NewUUID()),
			Status:  apitypes.TxStatusPending,
			Created: fftypes.Now(),
			TransactionHeaders: ffcapi.TransactionHeaders{
				From: "signer_1",
			},
		},
	}

	errs := p.InsertTransactionsWithNextNonce(ctx, txs, nextNonceCB)
	assert.Len(t, errs, 3)
	for i, err := range errs {
		assert.NoError(t, err, "Transaction %d should succeed", i)
		assert.NotEmpty(t, txs[i].SequenceID, "Transaction %d should have sequence ID", i)
	}

	// verify the actual nonce values: signer_1 gets 2001 then 2002, signer_2 gets 3001
	assert.Equal(t, int64(2001), txs[0].Nonce.Int64(), "First signer_1 transaction should get nonce 2001")
	assert.Equal(t, int64(3001), txs[1].Nonce.Int64(), "signer_2 transaction should get nonce 3001")
	assert.Equal(t, int64(2002), txs[2].Nonce.Int64(), "Second signer_1 transaction should get nonce 2002")
}

// TestInsertTransactionsWithNextNonceInvalidLevelDB covers a batch containing an
// invalid transaction (missing From): the valid entries must still be inserted, the
// invalid one must error out and be left completely untouched.
func TestInsertTransactionsWithNextNonceInvalidLevelDB(t *testing.T) {
	ctx, p, done := newTestLevelDBPersistence(t)
	defer done()

	// test a batch with an invalid transaction (missing From)
	nextNonceCB := func(ctx context.Context, signer string) (uint64, error) {
		return 1000, nil
	}

	txs := []*apitypes.ManagedTX{
		{
			ID:      fmt.Sprintf("ns1:%s", fftypes.NewUUID()),
			Status:  apitypes.TxStatusPending,
			Created: fftypes.Now(),
			TransactionHeaders: ffcapi.TransactionHeaders{
				From: "signer_0",
			},
		},
		{
			ID:      fmt.Sprintf("ns1:%s", fftypes.NewUUID()),
			Status:  apitypes.TxStatusPending,
			Created: fftypes.Now(),
			TransactionHeaders: ffcapi.TransactionHeaders{
				From: "", // Invalid - missing From
			},
		},
		{
			ID:      fmt.Sprintf("ns1:%s", fftypes.NewUUID()),
			Status:  apitypes.TxStatusPending,
			Created: fftypes.Now(),
			TransactionHeaders: ffcapi.TransactionHeaders{
				From: "signer_0",
			},
		},
	}

	errs := p.InsertTransactionsWithNextNonce(ctx, txs, nextNonceCB)
	assert.Len(t, errs, 3)
	assert.NoError(t, errs[0], "First transaction should succeed")
	assert.Error(t, errs[1], "Second transaction should fail (invalid)")
	assert.NoError(t, errs[2], "Third transaction should succeed")

	// the invalid transaction fails before nonce assignment or write, so it must
	// have no sequence ID and no nonce assigned
	assert.Empty(t, txs[1].SequenceID, "Invalid transaction should not get a sequence ID")
	assert.Nil(t, txs[1].Nonce, "Invalid transaction should not get a nonce")

	// verify valid transactions were persisted
	retrieved1, err := p.GetTransactionByID(ctx, txs[0].ID)
	assert.NoError(t, err)
	assert.NotNil(t, retrieved1)

	retrieved3, err := p.GetTransactionByID(ctx, txs[2].ID)
	assert.NoError(t, err)
	assert.NotNil(t, retrieved3)
}

// TestInsertTransactionsWithNextNonceEmptyLevelDB verifies that both an empty batch
// and a nil batch return nil without attempting any nonce assignment or write.
func TestInsertTransactionsWithNextNonceEmptyLevelDB(t *testing.T) {
	ctx, p, done := newTestLevelDBPersistence(t)
	defer done()

	nextNonceCB := func(ctx context.Context, signer string) (uint64, error) {
		return 1000, nil
	}

	// test an empty batch
	errs := p.InsertTransactionsWithNextNonce(ctx, []*apitypes.ManagedTX{}, nextNonceCB)
	assert.Nil(t, errs)

	// a nil slice takes the same zero-length path
	errs = p.InsertTransactionsWithNextNonce(ctx, nil, nextNonceCB)
	assert.Nil(t, errs)
}
48 changes: 40 additions & 8 deletions internal/persistence/leveldb/nonces.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,21 @@ func (ln *lockedNonce) complete(ctx context.Context) {
ln.th.nonceMux.Unlock()
}

// assignAndLockNonceLocked assigns and locks a nonce without acquiring the txMux write lock.
// The caller must already hold the write lock.
// Note the separate nonceMux is still acquired internally by assignAndLockNonceWithCalc.
func (p *leveldbPersistence) assignAndLockNonceLocked(ctx context.Context, nsOpID, signer string, nextNonceCB txhandler.NextNonceCallback) (*lockedNonce, error) {
	return p.assignAndLockNonceWithCalc(ctx, nsOpID, signer, nextNonceCB, p.calcNextNonceLocked)
}

// assignAndLockNonce assigns and locks a nonce for the given operation and signer,
// using calcNextNonce, which performs its own locking when reading from the DB.
// Callers must NOT already hold the txMux write lock (use assignAndLockNonceLocked for that).
func (p *leveldbPersistence) assignAndLockNonce(ctx context.Context, nsOpID, signer string, nextNonceCB txhandler.NextNonceCallback) (*lockedNonce, error) {
	return p.assignAndLockNonceWithCalc(ctx, nsOpID, signer, nextNonceCB, p.calcNextNonce)
}

// calcNextNonceFunc is the signature shared by calcNextNonce and calcNextNonceLocked.
// It is passed to assignAndLockNonceWithCalc (the common implementation for assigning
// and locking a nonce) so callers can select either the locked or unlocked variant.
type calcNextNonceFunc func(ctx context.Context, signer string, nextNonceCB txhandler.NextNonceCallback) (uint64, error)

func (p *leveldbPersistence) assignAndLockNonceWithCalc(ctx context.Context, nsOpID, signer string, nextNonceCB txhandler.NextNonceCallback, calcNextNonceFunc calcNextNonceFunc) (*lockedNonce, error) {
for {
// Take the lock to query our nonce cache, and check if we are already locked
p.nonceMux.Lock()
Expand All @@ -66,14 +79,13 @@ func (p *leveldbPersistence) assignAndLockNonce(ctx context.Context, nsOpID, sig
}
p.nonceMux.Unlock()

// If we're locked, then wait
if isLocked {
log.L(ctx).Debugf("Contention for next nonce for signer %s", signer)
<-locked.unlocked
} else if doLookup {
// We have to ensure we either successfully return a nonce,
// or otherwise we unlock when we send the error
nextNonce, err := p.calcNextNonce(ctx, signer, nextNonceCB)
nextNonce, err := calcNextNonceFunc(ctx, signer, nextNonceCB)
if err != nil {
locked.complete(ctx)
return nil, err
Expand All @@ -82,19 +94,40 @@ func (p *leveldbPersistence) assignAndLockNonce(ctx context.Context, nsOpID, sig
return locked, nil
}
}
}

// calcNextNonceLocked calculates the next nonce without acquiring locks.
// The caller must already hold the write lock (p.txMux.Lock()).
func (p *leveldbPersistence) calcNextNonceLocked(ctx context.Context, signer string, nextNonceCB txhandler.NextNonceCallback) (uint64, error) {
	// Query just the most recent transaction for this signer via the nonce index
	txns, orphanedIdxKeys, err := p.listTransactionsByIndexLocked(ctx, signerNoncePrefix(signer), signerNonceEnd(signer), "", 1, 1)
	if err != nil {
		return 0, err
	}
	// As we already hold the write lock, clean up any orphaned index keys we found
	if len(orphanedIdxKeys) > 0 {
		if delErr := p.deleteKeys(ctx, orphanedIdxKeys...); delErr != nil {
			log.L(ctx).Warnf("Failed to clean up orphaned index keys: %s", delErr)
		}
	}
	return p.calcNextNonceFromTxs(ctx, signer, nextNonceCB, txns)
}

// calcNextNonce calculates the next nonce for a signer, reading the latest
// transaction from the DB under its own locking (via ListTransactionsByNonce).
func (p *leveldbPersistence) calcNextNonce(ctx context.Context, signer string, nextNonceCB txhandler.NextNonceCallback) (uint64, error) {

	// First we check our DB to find the last nonce we used for this address.
	// Note we are within the nonce-lock in assignAndLockNonce for this signer, so we can be sure we're the
	// only routine attempting this right now.
	// (The leftover "var lastTxn" declaration is removed - lastTxn now lives in calcNextNonceFromTxs.)
	txns, err := p.ListTransactionsByNonce(ctx, signer, nil, 1, 1)
	if err != nil {
		return 0, err
	}
	return p.calcNextNonceFromTxs(ctx, signer, nextNonceCB, txns)
}

// calcNextNonceFromTxs contains the common logic for calculating the next nonce from a list of transactions.
// First we check our DB to find the last nonce we used for this address.
// Note we are within the nonce-lock in assignAndLockNonce for this signer, so we can be sure we're the
// only routine attempting this right now.
func (p *leveldbPersistence) calcNextNonceFromTxs(ctx context.Context, signer string, nextNonceCB txhandler.NextNonceCallback, txns []*apitypes.ManagedTX) (uint64, error) {
var lastTxn *apitypes.ManagedTX
if len(txns) > 0 {
lastTxn = txns[0]
if time.Since(*lastTxn.Created.Time()) < p.nonceStateTimeout {
Expand All @@ -119,5 +152,4 @@ func (p *leveldbPersistence) calcNextNonce(ctx context.Context, signer string, n
}

return nextNonce, nil

}
Loading