Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions baseapp/abci.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/cosmos/cosmos-sdk/telemetry"
sdk "github.com/cosmos/cosmos-sdk/types"
sdkerrors "github.com/cosmos/cosmos-sdk/types/errors"
"github.com/cosmos/cosmos-sdk/types/mempool"
)

// Supported ABCI Query prefixes and paths
Expand Down Expand Up @@ -965,6 +966,10 @@ func (app *BaseApp) Commit() (*abci.ResponseCommit, error) {
app.prepareCheckStater(app.checkState.Context())
}

if promoter, ok := app.mempool.(mempool.QueuePromoter); ok {
promoter.PromoteQueued(app.checkState.Context())
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

seems I missed this part, but lately I fixed to use app.getState() function to get state to prevent race condition
#5

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

so good to merge release/v0.50.x and fix to use getState function instead

}

// The SnapshotIfApplicable method will create the snapshot by starting the goroutine
app.snapshotManager.SnapshotIfApplicable(header.Height)

Expand Down
17 changes: 16 additions & 1 deletion baseapp/baseapp.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/cockroachdb/errors"
abci "github.com/cometbft/cometbft/abci/types"
"github.com/cometbft/cometbft/crypto/tmhash"
cmtmempool "github.com/cometbft/cometbft/mempool"
cmtproto "github.com/cometbft/cometbft/proto/tendermint/types"
dbm "github.com/cosmos/cosmos-db"
"github.com/cosmos/gogoproto/proto"
Expand Down Expand Up @@ -417,6 +418,19 @@ func (app *BaseApp) Mempool() mempool.Mempool {
return app.mempool
}

// ConnectMempoolEvents implements servertypes.MempoolEventConnector.
func (app *BaseApp) ConnectMempoolEvents(eventCh chan cmtmempool.AppMempoolEvent) {
if receiver, ok := app.mempool.(mempool.EventReceiver); ok {
receiver.SetEventCh(eventCh)
}
}

// GetContextForSimulate returns a non-mutating context derived from the latest
// committed state, suitable for transaction simulation (e.g. mempool cleanup).
func (app *BaseApp) GetContextForSimulate(txBytes []byte) sdk.Context {
return app.getContextForTx(execModeSimulate, txBytes)
}

// Init initializes the app. It seals the app, preventing any
// further modifications. In addition, it validates the app against
// the earlier provided settings. Returns an error if validation fails.
Expand Down Expand Up @@ -979,7 +993,8 @@ func (app *BaseApp) runTx(mode execMode, txBytes []byte) (gInfo sdk.GasInfo, res

switch mode {
case execModeCheck:
if err := app.mempool.Insert(mempoolCtx, tx); err != nil {
insertCtx := context.WithValue(mempoolCtx, mempool.TxBytesContextKey{}, txBytes)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
insertCtx := context.WithValue(mempoolCtx, mempool.TxBytesContextKey{}, txBytes)
insertCtx := mempoolCtx.WithValue(mempool.TxBytesContextKey{}, txBytes)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

and I can see getContextForTx return context with txBytes already, so probably this is unnecessary?

	ctx := modeState.Context().
		WithTxBytes(txBytes).
		WithGasMeter(storetypes.NewInfiniteGasMeter())

if err := app.mempool.Insert(insertCtx, tx); err != nil {
return gInfo, nil, anteEvents, err
}

Expand Down
17 changes: 17 additions & 0 deletions server/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
cmtcmd "github.com/cometbft/cometbft/cmd/cometbft/commands"
cmtcfg "github.com/cometbft/cometbft/config"
cmtjson "github.com/cometbft/cometbft/libs/json"
cmtmempool "github.com/cometbft/cometbft/mempool"
"github.com/cometbft/cometbft/node"
"github.com/cometbft/cometbft/p2p"
pvm "github.com/cometbft/cometbft/privval"
Expand Down Expand Up @@ -327,6 +328,8 @@ func startInProcess(svrCtx *Context, svrCfg serverconfig.Config, clientCtx clien
}
defer cleanupFn()

connectMempoolEvents(tmNode.Mempool(), app)

// Add the tx service to the gRPC router. We only need to register this
// service if API or gRPC is enabled, and avoid doing so in the general
// case, because it spawns a new local CometBFT RPC client.
Expand Down Expand Up @@ -403,6 +406,20 @@ func startCmtNode(
return tmNode, cleanupFn, nil
}

// connectMempoolEvents wires the CometBFT mempool event channel to the
// application if both sides support it.
func connectMempoolEvents(mp cmtmempool.Mempool, app types.Application) {
ep, ok := mp.(cmtmempool.EventProvider)
if !ok {
return
}
connector, ok := app.(types.MempoolEventConnector)
if !ok {
return
}
connector.ConnectMempoolEvents(ep.AppEventCh())
}

func getAndValidateConfig(svrCtx *Context) (serverconfig.Config, error) {
config, err := serverconfig.GetConfig(svrCtx.Viper)
if err != nil {
Expand Down
9 changes: 9 additions & 0 deletions server/types/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"encoding/json"
"io"

cmtmempool "github.com/cometbft/cometbft/mempool"
cmtproto "github.com/cometbft/cometbft/proto/tendermint/types"
cmttypes "github.com/cometbft/cometbft/types"
dbm "github.com/cosmos/cosmos-db"
Expand Down Expand Up @@ -84,6 +85,14 @@ type (
ConsensusParams cmtproto.ConsensusParams
}

// MempoolEventConnector is optionally implemented by applications that
// receive CometBFT ProxyMempool event channel after node startup.
// The app uses this channel to push EventTxInserted and EventTxRemoved
// events so the CometBFT reactor can manage gossip and tx caching.
MempoolEventConnector interface {
ConnectMempoolEvents(eventCh chan cmtmempool.AppMempoolEvent)
}

// AppExporter is a function that dumps all app state to
// JSON-serializable structure and returns the current validator set.
AppExporter func(
Expand Down
33 changes: 30 additions & 3 deletions simapp/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
upgradekeeper "cosmossdk.io/x/upgrade/keeper"
upgradetypes "cosmossdk.io/x/upgrade/types"
abci "github.com/cometbft/cometbft/abci/types"

dbm "github.com/cosmos/cosmos-db"
"github.com/cosmos/gogoproto/proto"
"github.com/spf13/cast"
Expand All @@ -53,6 +54,7 @@ import (
"github.com/cosmos/cosmos-sdk/std"
testdata_pulsar "github.com/cosmos/cosmos-sdk/testutil/testdata/testpb"
sdk "github.com/cosmos/cosmos-sdk/types"
sdkmempool "github.com/cosmos/cosmos-sdk/types/mempool"
"github.com/cosmos/cosmos-sdk/types/module"
"github.com/cosmos/cosmos-sdk/types/msgservice"
sigtypes "github.com/cosmos/cosmos-sdk/types/tx/signing"
Expand Down Expand Up @@ -249,6 +251,18 @@ func NewSimApp(
}
baseAppOptions = append(baseAppOptions, voteExtOp, baseapp.SetOptimisticExecution())

// create the ProxyAppMempool and set it as a BaseApp option so it is used
// before the default proposal handler is created in NewBaseApp.
proxyAppMempool := sdkmempool.NewProxyAppMempool(
sdkmempool.ProxyAppMempoolConfig{
MaxTxsPerSender: 16,
MaxTotalTxs: 500,
},
nil,
txConfig.TxEncoder(),
)
baseAppOptions = append(baseAppOptions, baseapp.SetMempool(proxyAppMempool))

bApp := baseapp.NewBaseApp(appName, logger, db, txConfig.TxDecoder(), baseAppOptions...)
bApp.SetCommitMultiStoreTracer(traceStore)
bApp.SetVersion(version.Version)
Expand Down Expand Up @@ -288,6 +302,9 @@ func NewSimApp(
// add keepers
app.AccountKeeper = authkeeper.NewAccountKeeper(appCodec, runtime.NewKVStoreService(keys[authtypes.StoreKey]), authtypes.ProtoBaseAccount, maccPerms, authcodec.NewBech32Codec(sdk.Bech32MainPrefix), sdk.Bech32MainPrefix, authtypes.NewModuleAddress(govtypes.ModuleName).String())

// wire the AccountKeeper
proxyAppMempool.SetAccountKeeper(app.AccountKeeper)

app.BankKeeper = bankkeeper.NewBaseKeeper(
appCodec,
runtime.NewKVStoreService(keys[banktypes.StoreKey]),
Expand Down Expand Up @@ -565,7 +582,8 @@ func NewSimApp(
}

func (app *SimApp) setAnteHandler(txConfig client.TxConfig) {
anteHandler, err := NewAnteHandler(
// Full ante handler for PrepareProposal/ProcessProposal/FinalizeBlock
fullHandler, err := NewAnteHandler(
HandlerOptions{
ante.HandlerOptions{
AccountKeeper: app.AccountKeeper,
Expand All @@ -581,8 +599,17 @@ func (app *SimApp) setAnteHandler(txConfig client.TxConfig) {
panic(err)
}

// Set the AnteHandler for the app
app.SetAnteHandler(anteHandler)
minimalHandler, err := ante.NewMinimalAnteHandler(ante.MinimalHandlerOptions{
AccountKeeper: app.AccountKeeper,
SignModeHandler: txConfig.SignModeHandler(),
SigGasConsumer: ante.DefaultSigVerificationGasConsumer,
})
if err != nil {
panic(err)
}

dualHandler := ante.NewDualAnteHandler(minimalHandler, fullHandler)
app.SetAnteHandler(dualHandler)
}

func (app *SimApp) setPostHandler() {
Expand Down
Loading