Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 9 additions & 1 deletion pkg/chains/legacyevm/chain.go
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,12 @@ func newChain(cfg *config.ChainScoped, nodes []*toml.Node, opts ChainRelayOpts,
if opts.GenLogPoller != nil {
logPoller = opts.GenLogPoller(chainID)
} else {
var metrics *logpoller.PromBeholderMetrics
metrics, err = logpoller.NewPromBeholderMetrics(chainID.String(), chainselectors.FamilyEVM)
if err != nil {
return nil, fmt.Errorf("failed to create log poller metrics: %w", err)
}

lpOpts := logpoller.Opts{
PollPeriod: cfg.EVM().LogPollInterval(),
UseFinalityTag: cfg.EVM().FinalityTagEnabled(),
Expand All @@ -247,9 +253,11 @@ func newChain(cfg *config.ChainScoped, nodes []*toml.Node, opts ChainRelayOpts,
BackupPollerBlockDelay: int64(cfg.EVM().BackupLogPollerBlockDelay()),
ClientErrors: cfg.EVM().NodePool().Errors(),
SkipEmptyBlocks: cfg.EVM().LogPollerSkipEmptyBlocks(),
Metrics: metrics,
}

lpORM, err := logpoller.NewObservedORM(chainID, opts.DS, l)
var lpORM *logpoller.ObservedORM
lpORM, err = logpoller.NewObservedORM(chainID, opts.DS, l)
if err != nil {
return nil, fmt.Errorf("failed to create logpoller observed ORM: %w", err)
}
Expand Down
11 changes: 10 additions & 1 deletion pkg/logpoller/log_poller.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,7 @@ type logPoller struct {
clientErrors config.ClientErrors
backupPollerNextBlock int64 // next block to be processed by Backup LogPoller
backupPollerBlockDelay int64 // how far behind regular LogPoller should BackupLogPoller run. 0 = disabled
metrics Metrics

filterMu sync.RWMutex
filters map[string]Filter
Expand Down Expand Up @@ -161,6 +162,7 @@ type Opts struct {
LogPrunePageSize int64
ClientErrors config.ClientErrors
SkipEmptyBlocks bool
Metrics Metrics // optional, if nil, no metrics will be recorded
}

// NewLogPoller creates a log poller. Note there is an assumption
Expand All @@ -174,13 +176,18 @@ type Opts struct {
// How fast that can be done depends largely on network speed and DB, but even for the fastest
// support chain, polygon, which has 2s block times, we need RPCs roughly with <= 500ms latency
func NewLogPoller(orm ORM, ec Client, lggr logger.Logger, headTracker HeadTracker, opts Opts) *logPoller {
m := opts.Metrics
lpLggr := logger.Sugared(logger.Named(lggr, "LogPoller"))
if m == nil {
m = NoopMetrics
}
return &logPoller{
stopCh: make(chan struct{}),
ec: ec,
orm: orm,
headTracker: headTracker,
latencyMonitor: NewLatencyMonitor(ec, lggr, opts.PollPeriod),
lggr: logger.Sugared(logger.Named(lggr, "LogPoller")),
lggr: lpLggr,
replayStart: make(chan int64),
replayComplete: make(chan error),
pollPeriod: opts.PollPeriod,
Expand All @@ -195,6 +202,7 @@ func NewLogPoller(orm ORM, ec Client, lggr logger.Logger, headTracker HeadTracke
clientErrors: opts.ClientErrors,
filters: make(map[string]Filter),
filterDirty: true, // Always build Filter on first call to cache an empty filter if nothing registered yet.
metrics: m,
}
}

Expand Down Expand Up @@ -698,6 +706,7 @@ func (lp *logPoller) run() {
// Starting at the first finalized block. We do not backfill the first finalized block.
start = latestFinalizedBlockNumber
} else {
lp.metrics.RecordLastProcessedBlock(ctx, lastProcessed.BlockNumber)
Comment thread
dhaidashenko marked this conversation as resolved.
start = lastProcessed.BlockNumber + 1
}
lp.PollAndSaveLogs(ctx, start, false)
Expand Down
60 changes: 60 additions & 0 deletions pkg/logpoller/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
package logpoller

import (
"context"
"fmt"

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"

"github.com/smartcontractkit/chainlink-common/pkg/beholder"
)

var (
promLpLastProcessedBlock = promauto.NewGaugeVec(prometheus.GaugeOpts{
Name: "evm_log_poller_last_processed_block",
Help: "The last block that the log poller has processed. Main purpose is to signal if the log poller is stuck and not processing new blocks. May be reported with a delay.",
}, []string{"chainFamily", "chainID"})
)

type Metrics interface {
RecordLastProcessedBlock(ctx context.Context, lastProcessedBlock int64)
}

var _ Metrics = (*PromBeholderMetrics)(nil)

type PromBeholderMetrics struct {
chainID string
chainFamily string
lastProcessedBlock metric.Int64Gauge
}

func NewPromBeholderMetrics(chainID string, chainFamily string) (*PromBeholderMetrics, error) {
lastProcessedBlock, err := beholder.GetMeter().Int64Gauge("evm_log_poller_last_processed_block")
if err != nil {
return nil, fmt.Errorf("failed to register last processed block metric: %w", err)
}

return &PromBeholderMetrics{
chainID: chainID,
chainFamily: chainFamily,
lastProcessedBlock: lastProcessedBlock,
}, nil
}

func (m *PromBeholderMetrics) RecordLastProcessedBlock(ctx context.Context, lastProcessedBlock int64) {
promLpLastProcessedBlock.WithLabelValues(m.chainFamily, m.chainID).Set(float64(lastProcessedBlock))
m.lastProcessedBlock.Record(ctx, lastProcessedBlock, metric.WithAttributes(
attribute.String("chainFamily", m.chainFamily),
attribute.String("chainID", m.chainID)))
}

var _ Metrics = (*noopMetrics)(nil)

var NoopMetrics = &noopMetrics{}

type noopMetrics struct{}

func (m *noopMetrics) RecordLastProcessedBlock(_ context.Context, _ int64) {}
Loading