diff --git a/pkg/chains/legacyevm/chain.go b/pkg/chains/legacyevm/chain.go index e6a57b1259..837b753e83 100644 --- a/pkg/chains/legacyevm/chain.go +++ b/pkg/chains/legacyevm/chain.go @@ -236,6 +236,12 @@ func newChain(cfg *config.ChainScoped, nodes []*toml.Node, opts ChainRelayOpts, if opts.GenLogPoller != nil { logPoller = opts.GenLogPoller(chainID) } else { + var metrics *logpoller.PromBeholderMetrics + metrics, err = logpoller.NewPromBeholderMetrics(chainID.String(), chainselectors.FamilyEVM) + if err != nil { + return nil, fmt.Errorf("failed to create log poller metrics: %w", err) + } + lpOpts := logpoller.Opts{ PollPeriod: cfg.EVM().LogPollInterval(), UseFinalityTag: cfg.EVM().FinalityTagEnabled(), @@ -247,9 +253,11 @@ func newChain(cfg *config.ChainScoped, nodes []*toml.Node, opts ChainRelayOpts, BackupPollerBlockDelay: int64(cfg.EVM().BackupLogPollerBlockDelay()), ClientErrors: cfg.EVM().NodePool().Errors(), SkipEmptyBlocks: cfg.EVM().LogPollerSkipEmptyBlocks(), + Metrics: metrics, } - lpORM, err := logpoller.NewObservedORM(chainID, opts.DS, l) + var lpORM *logpoller.ObservedORM + lpORM, err = logpoller.NewObservedORM(chainID, opts.DS, l) if err != nil { return nil, fmt.Errorf("failed to create logpoller observed ORM: %w", err) } diff --git a/pkg/logpoller/log_poller.go b/pkg/logpoller/log_poller.go index 8d73e5ae89..4d02f64ec6 100644 --- a/pkg/logpoller/log_poller.go +++ b/pkg/logpoller/log_poller.go @@ -129,6 +129,7 @@ type logPoller struct { clientErrors config.ClientErrors backupPollerNextBlock int64 // next block to be processed by Backup LogPoller backupPollerBlockDelay int64 // how far behind regular LogPoller should BackupLogPoller run. 0 = disabled + metrics Metrics filterMu sync.RWMutex filters map[string]Filter @@ -161,6 +162,7 @@ type Opts struct { LogPrunePageSize int64 ClientErrors config.ClientErrors SkipEmptyBlocks bool + Metrics Metrics // optional, if nil, no metrics will be recorded } // NewLogPoller creates a log poller. Note there is an assumption @@ -174,13 +176,18 @@ type Opts struct { // How fast that can be done depends largely on network speed and DB, but even for the fastest // support chain, polygon, which has 2s block times, we need RPCs roughly with <= 500ms latency func NewLogPoller(orm ORM, ec Client, lggr logger.Logger, headTracker HeadTracker, opts Opts) *logPoller { + m := opts.Metrics + lpLggr := logger.Sugared(logger.Named(lggr, "LogPoller")) + if m == nil { + m = NoopMetrics + } return &logPoller{ stopCh: make(chan struct{}), ec: ec, orm: orm, headTracker: headTracker, latencyMonitor: NewLatencyMonitor(ec, lggr, opts.PollPeriod), - lggr: logger.Sugared(logger.Named(lggr, "LogPoller")), + lggr: lpLggr, replayStart: make(chan int64), replayComplete: make(chan error), pollPeriod: opts.PollPeriod, @@ -195,6 +202,7 @@ func NewLogPoller(orm ORM, ec Client, lggr logger.Logger, headTracker HeadTracke clientErrors: opts.ClientErrors, filters: make(map[string]Filter), filterDirty: true, // Always build Filter on first call to cache an empty filter if nothing registered yet. + metrics: m, } } @@ -698,6 +706,7 @@ func (lp *logPoller) run() { // Starting at the first finalized block. We do not backfill the first finalized block. start = latestFinalizedBlockNumber } else { + lp.metrics.RecordLastProcessedBlock(ctx, lastProcessed.BlockNumber) start = lastProcessed.BlockNumber + 1 } lp.PollAndSaveLogs(ctx, start, false) diff --git a/pkg/logpoller/metrics.go b/pkg/logpoller/metrics.go new file mode 100644 index 0000000000..95b6515a2f --- /dev/null +++ b/pkg/logpoller/metrics.go @@ -0,0 +1,60 @@ +package logpoller + +import ( + "context" + "fmt" + + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/metric" + + "github.com/smartcontractkit/chainlink-common/pkg/beholder" +) + +var ( + promLpLastProcessedBlock = promauto.NewGaugeVec(prometheus.GaugeOpts{ + Name: "evm_log_poller_last_processed_block", + Help: "The last block that the log poller has processed. Main purpose is to signal if the log poller is stuck and not processing new blocks. May be reported with a delay.", + }, []string{"chainFamily", "chainID"}) +) + +type Metrics interface { + RecordLastProcessedBlock(ctx context.Context, lastProcessedBlock int64) +} + +var _ Metrics = (*PromBeholderMetrics)(nil) + +type PromBeholderMetrics struct { + chainID string + chainFamily string + lastProcessedBlock metric.Int64Gauge +} + +func NewPromBeholderMetrics(chainID string, chainFamily string) (*PromBeholderMetrics, error) { + lastProcessedBlock, err := beholder.GetMeter().Int64Gauge("evm_log_poller_last_processed_block") + if err != nil { + return nil, fmt.Errorf("failed to register last processed block metric: %w", err) + } + + return &PromBeholderMetrics{ + chainID: chainID, + chainFamily: chainFamily, + lastProcessedBlock: lastProcessedBlock, + }, nil +} + +func (m *PromBeholderMetrics) RecordLastProcessedBlock(ctx context.Context, lastProcessedBlock int64) { + promLpLastProcessedBlock.WithLabelValues(m.chainFamily, m.chainID).Set(float64(lastProcessedBlock)) + m.lastProcessedBlock.Record(ctx, lastProcessedBlock, metric.WithAttributes( + attribute.String("chainFamily", m.chainFamily), + attribute.String("chainID", m.chainID))) +} + +var _ Metrics = (*noopMetrics)(nil) + +var NoopMetrics = &noopMetrics{} + +type noopMetrics struct{} + +func (m *noopMetrics) RecordLastProcessedBlock(_ context.Context, _ int64) {}