diff --git a/core/services/llo/data_source.go b/core/services/llo/data_source.go
index 255b85e09e2..289bab30da0 100644
--- a/core/services/llo/data_source.go
+++ b/core/services/llo/data_source.go
@@ -8,8 +8,10 @@ import (
 	"sort"
 	"strconv"
 	"sync"
+	"sync/atomic"
 	"time"
 
+	"github.com/patrickmn/go-cache"
 	"github.com/prometheus/client_golang/prometheus"
 	"github.com/prometheus/client_golang/prometheus/promauto"
 	"golang.org/x/exp/maps"
@@ -39,6 +41,22 @@ var (
 	},
 		[]string{"streamID"},
 	)
+	promCacheHitCount = promauto.NewCounterVec(prometheus.CounterOpts{
+		Namespace: "llo",
+		Subsystem: "datasource",
+		Name:      "cache_hit_count",
+		Help:      "Number of local observation cache hits",
+	},
+		[]string{"streamID"},
+	)
+	promCacheMissCount = promauto.NewCounterVec(prometheus.CounterOpts{
+		Namespace: "llo",
+		Subsystem: "datasource",
+		Name:      "cache_miss_count",
+		Help:      "Number of local observation cache misses",
+	},
+		[]string{"streamID"},
+	)
 )
 
 type Registry interface {
@@ -77,16 +95,32 @@ var _ llo.DataSource = &dataSource{}
 type dataSource struct {
 	lggr     logger.Logger
 	registry Registry
+	t        Telemeter
 
-	t Telemeter
+	shouldCache *atomic.Bool
+	cache       *cache.Cache
 }
 
 func NewDataSource(lggr logger.Logger, registry Registry, t Telemeter) llo.DataSource {
-	return newDataSource(lggr, registry, t)
+	return newDataSource(lggr, registry, t, true)
 }
 
-func newDataSource(lggr logger.Logger, registry Registry, t Telemeter) *dataSource {
-	return &dataSource{logger.Named(lggr, "DataSource"), registry, t}
+func newDataSource(lggr logger.Logger, registry Registry, t Telemeter, cacheEnabled bool) *dataSource {
+	shouldCache := &atomic.Bool{}
+	shouldCache.Store(cacheEnabled)
+
+	return &dataSource{
+		lggr:     logger.Named(lggr, "DataSource"),
+		registry: registry,
+		t:        t,
+
+		// Cache valid observations between rounds for 1 second to avoid exhausting
+		// the node's network and the underlying adapter's resources when dealing
+		// with a large number of streams. The cache is cleaned up every minute to
+		// remove stale observations for removed streams.
+ shouldCache: shouldCache, + cache: cache.New(time.Second, time.Minute), + } } // Observe looks up all streams in the registry and populates a map of stream ID => value @@ -137,17 +171,26 @@ func (d *dataSource) Observe(ctx context.Context, streamValues llo.StreamValues, for _, streamID := range maps.Keys(streamValues) { go func(streamID llotypes.StreamID) { defer wg.Done() - val, err := oc.Observe(ctx, streamID, opts) - if err != nil { - strmIDStr := strconv.FormatUint(uint64(streamID), 10) - if errors.As(err, &MissingStreamError{}) { - promMissingStreamCount.WithLabelValues(strmIDStr).Inc() + var val llo.StreamValue + var err error + + // check for valid cached value before observing + if val = d.fromCache(streamID); val == nil { + // no valid cached value, observe the stream + if val, err = oc.Observe(ctx, streamID, opts); err != nil { + strmIDStr := strconv.FormatUint(uint64(streamID), 10) + if errors.As(err, &MissingStreamError{}) { + promMissingStreamCount.WithLabelValues(strmIDStr).Inc() + } + promObservationErrorCount.WithLabelValues(strmIDStr).Inc() + mu.Lock() + errs = append(errs, ErrObservationFailed{inner: err, streamID: streamID, reason: "failed to observe stream"}) + mu.Unlock() + return } - promObservationErrorCount.WithLabelValues(strmIDStr).Inc() - mu.Lock() - errs = append(errs, ErrObservationFailed{inner: err, streamID: streamID, reason: "failed to observe stream"}) - mu.Unlock() - return + + // cache the observed value + d.toCache(streamID, val) } mu.Lock() @@ -192,3 +235,25 @@ func (d *dataSource) Observe(ctx context.Context, streamValues llo.StreamValues, return nil } + +func (d *dataSource) fromCache(streamID llotypes.StreamID) llo.StreamValue { + if d.shouldCache.Load() { + cacheKey := strconv.FormatUint(uint64(streamID), 10) + if cachedVal, found := d.cache.Get(cacheKey); found && cachedVal != nil { + streamValue := cachedVal.(llo.StreamValue) + promCacheHitCount.WithLabelValues(cacheKey).Inc() + return streamValue + } + promCacheMissCount.WithLabelValues(cacheKey).Inc() + } + return nil +} + +func (d *dataSource) toCache(streamID llotypes.StreamID, val llo.StreamValue) { + if d.shouldCache.Load() && val != nil { + cacheKey := strconv.FormatUint(uint64(streamID), 10) + + // set with default expiration + d.cache.SetDefault(cacheKey, val) + } +} diff --git a/core/services/llo/data_source_test.go b/core/services/llo/data_source_test.go index 7a97b116d53..55ab4faefa5 100644 --- a/core/services/llo/data_source_test.go +++ b/core/services/llo/data_source_test.go @@ -12,6 +12,8 @@ import ( "testing" "time" + gocache "github.com/patrickmn/go-cache" + "github.com/prometheus/client_golang/prometheus/testutil" "github.com/shopspring/decimal" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -21,9 +23,7 @@ import ( ocr2types "github.com/smartcontractkit/libocr/offchainreporting2plus/types" llotypes "github.com/smartcontractkit/chainlink-common/pkg/types/llo" - "github.com/smartcontractkit/chainlink-data-streams/llo" - datastreamsllo "github.com/smartcontractkit/chainlink-data-streams/llo" "github.com/smartcontractkit/chainlink/v2/core/bridges" "github.com/smartcontractkit/chainlink/v2/core/internal/testutils" clhttptest "github.com/smartcontractkit/chainlink/v2/core/internal/testutils/httptest" @@ -121,17 +121,17 @@ func (m *mockTelemeter) MakeObservationScopedTelemetryCh(opts llo.DSOpts, size i m.ch = make(chan interface{}, size) return m.ch } -func (m *mockTelemeter) GetOutcomeTelemetryCh() chan<- *datastreamsllo.LLOOutcomeTelemetry { +func (m 
*mockTelemeter) GetOutcomeTelemetryCh() chan<- *llo.LLOOutcomeTelemetry { return nil } -func (m *mockTelemeter) GetReportTelemetryCh() chan<- *datastreamsllo.LLOReportTelemetry { return nil } -func (m *mockTelemeter) CaptureEATelemetry() bool { return true } -func (m *mockTelemeter) CaptureObservationTelemetry() bool { return true } +func (m *mockTelemeter) GetReportTelemetryCh() chan<- *llo.LLOReportTelemetry { return nil } +func (m *mockTelemeter) CaptureEATelemetry() bool { return true } +func (m *mockTelemeter) CaptureObservationTelemetry() bool { return true } func Test_DataSource(t *testing.T) { lggr := logger.TestLogger(t) reg := &mockRegistry{make(map[streams.StreamID]*mockPipeline)} - ds := newDataSource(lggr, reg, NullTelemeter) + ds := newDataSource(lggr, reg, telem.NullTelemeter, false) ctx := testutils.Context(t) opts := &mockOpts{} @@ -168,8 +168,8 @@ func Test_DataSource(t *testing.T) { assert.NoError(t, err) assert.Equal(t, llo.StreamValues{ - 2: llo.ToDecimal(decimal.NewFromInt(40602)), 1: nil, + 2: llo.ToDecimal(decimal.NewFromInt(40602)), 3: nil, }, vals) }) @@ -258,6 +258,122 @@ func Test_DataSource(t *testing.T) { assert.Nil(t, pkt.val) assert.Error(t, pkt.err) }) + + t.Run("uses cached values when available", func(t *testing.T) { + ds := newDataSource(lggr, reg, telem.NullTelemeter, true) + + // First observation to populate cache + reg.pipelines[1] = makePipelineWithSingleResult[*big.Int](1, big.NewInt(2181), nil) + reg.pipelines[2] = makePipelineWithSingleResult[*big.Int](2, big.NewInt(40602), nil) + + vals := makeStreamValues() + err := ds.Observe(ctx, vals, opts) + require.NoError(t, err) + + // Verify initial values + assert.Equal(t, llo.StreamValues{ + 1: llo.ToDecimal(decimal.NewFromInt(2181)), + 2: llo.ToDecimal(decimal.NewFromInt(40602)), + 3: nil, + }, vals) + + // Change pipeline results + reg.pipelines[1] = makePipelineWithSingleResult[*big.Int](1, big.NewInt(9999), nil) + reg.pipelines[2] = makePipelineWithSingleResult[*big.Int](2, big.NewInt(8888), nil) + + // Second observation should use cached values + vals = makeStreamValues() + err = ds.Observe(ctx, vals, opts) + require.NoError(t, err) + + // Should still have original values from cache + assert.Equal(t, llo.StreamValues{ + 1: llo.ToDecimal(decimal.NewFromInt(2181)), + 2: llo.ToDecimal(decimal.NewFromInt(40602)), + 3: nil, + }, vals) + + // Verify cache metrics + assert.InEpsilon(t, float64(1), testutil.ToFloat64(promCacheHitCount.WithLabelValues("1")), 0.0001) + assert.InEpsilon(t, float64(1), testutil.ToFloat64(promCacheHitCount.WithLabelValues("2")), 0.0001) + assert.InEpsilon(t, float64(1), testutil.ToFloat64(promCacheMissCount.WithLabelValues("1")), 0.0001) + assert.InEpsilon(t, float64(1), testutil.ToFloat64(promCacheMissCount.WithLabelValues("2")), 0.0001) + }) + + t.Run("refreshes cache after expiration", func(t *testing.T) { + // Create a new data source with a very short cache TTL + ds := newDataSource(lggr, reg, telem.NullTelemeter, true) + ds.cache = gocache.New(10*time.Millisecond, 1*time.Minute) + + // First observation + reg.pipelines[1] = makePipelineWithSingleResult[*big.Int](1, big.NewInt(100), nil) + vals := llo.StreamValues{1: nil} + + err := ds.Observe(ctx, vals, opts) + require.NoError(t, err) + + // Wait for cache to expire + time.Sleep(20 * time.Millisecond) + + // Change pipeline result + reg.pipelines[1] = makePipelineWithSingleResult[*big.Int](1, big.NewInt(200), nil) + + // Second observation should use new value + vals = llo.StreamValues{1: nil} + err = ds.Observe(ctx, 
vals, opts) + require.NoError(t, err) + + assert.Equal(t, llo.StreamValues{1: llo.ToDecimal(decimal.NewFromInt(200))}, vals) + }) + + t.Run("handles concurrent cache access", func(t *testing.T) { + // Create a new data source + ds := newDataSource(lggr, reg, telem.NullTelemeter, true) + + // Set up pipeline to return different values + reg.pipelines[1] = makePipelineWithSingleResult[*big.Int](1, big.NewInt(100), nil) + + // First observation to cache + vals := llo.StreamValues{1: nil} + err := ds.Observe(ctx, vals, opts) + require.NoError(t, err) + + // Run multiple observations concurrently + var wg sync.WaitGroup + for i := 0; i < 10; i++ { + wg.Add(1) + go func() { + defer wg.Done() + vals := llo.StreamValues{1: nil} + err := ds.Observe(ctx, vals, opts) + assert.NoError(t, err) + assert.Equal(t, llo.StreamValues{1: llo.ToDecimal(decimal.NewFromInt(100))}, vals) + }() + } + wg.Wait() + + // Verify pipeline was only called once + assert.Equal(t, 1, reg.pipelines[1].runCount) + }) + + t.Run("handles cache errors gracefully", func(t *testing.T) { + ds := newDataSource(lggr, reg, telem.NullTelemeter, true) + ds.cache = gocache.New(100*time.Millisecond, 1*time.Minute) + + // First observation with error + reg.pipelines[1] = makePipelineWithSingleResult[*big.Int](1, nil, errors.New("pipeline error")) + vals := makeStreamValues() + err := ds.Observe(ctx, vals, opts) + require.NoError(t, err) // Observe returns nil error even if some streams fail + + // Second observation should try again (not use cache for error case) + reg.pipelines[1] = makePipelineWithSingleResult[*big.Int](1, big.NewInt(100), nil) + vals = llo.StreamValues{1: nil} + err = ds.Observe(ctx, vals, opts) + require.NoError(t, err) + + assert.Equal(t, llo.StreamValues{1: llo.ToDecimal(decimal.NewFromInt(100))}, vals) + }) }) } @@ -317,7 +433,7 @@ multiply3 [type=multiply times=1 streamID=%d index=2]; // force conversion result1 -> multiply2; result2 -> result2_parse; -result3 -> result3_parse -> multiply3; +result3 -> result3_parse -> multiply3; `, i+n, i+2*n, i+3*n), }, } @@ -325,7 +441,7 @@ result3 -> result3_parse -> multiply3; require.NoError(b, err) } - ds := newDataSource(lggr, r, NullTelemeter) + ds := newDataSource(lggr, r, telem.NullTelemeter, false) vals := make(map[llotypes.StreamID]llo.StreamValue) for i := uint32(0); i < 4*n; i++ { vals[i] = nil diff --git a/core/services/llo/mercurytransmitter/orm.go b/core/services/llo/mercurytransmitter/orm.go index 54be2c866b1..00e87f530db 100644 --- a/core/services/llo/mercurytransmitter/orm.go +++ b/core/services/llo/mercurytransmitter/orm.go @@ -208,10 +208,10 @@ func (o *orm) Prune(ctx context.Context, serverURL string, maxSize, batchSize in res, err = o.ds.ExecContext(ctx, ` DELETE FROM llo_mercury_transmit_queue AS q USING ( - SELECT transmission_hash + SELECT transmission_hash FROM llo_mercury_transmit_queue - WHERE don_id = $1 - AND server_url = $2 + WHERE don_id = $1 + AND server_url = $2 AND seq_nr < $3 ORDER BY seq_nr ASC LIMIT $4 diff --git a/core/services/llo/mercurytransmitter/queue.go b/core/services/llo/mercurytransmitter/queue.go index d715b849eee..60525b2776d 100644 --- a/core/services/llo/mercurytransmitter/queue.go +++ b/core/services/llo/mercurytransmitter/queue.go @@ -145,6 +145,12 @@ func (tq *transmitQueue) IsEmpty() bool { return tq.pq.Len() == 0 } +func (tq *transmitQueue) Len() int { + tq.mu.RLock() + defer tq.mu.RUnlock() + return tq.pq.Len() +} + func (tq *transmitQueue) Start(context.Context) error { return tq.StartOnce("TransmitQueue", func() error { 
t := services.NewTicker(promInterval)
diff --git a/core/services/llo/mercurytransmitter/queue_test.go b/core/services/llo/mercurytransmitter/queue_test.go
index 253e5cf9b42..276896d9d3b 100644
--- a/core/services/llo/mercurytransmitter/queue_test.go
+++ b/core/services/llo/mercurytransmitter/queue_test.go
@@ -133,7 +133,7 @@ func Test_Queue(t *testing.T) {
 	}
 	tq.Push(testTransmissions[maxSize+3]) // push one more to trigger eviction
 
-	require.Equal(t, maxSize, tq.(*transmitQueue).pq.Len())
+	require.Equal(t, maxSize, tq.(*transmitQueue).Len())
 	require.Len(t, deleter.hashes, 4) // evicted overfill entries (3 oversize plus 1 more to make room)
 
 	// oldest entries removed
diff --git a/core/services/llo/mercurytransmitter/transmitter.go b/core/services/llo/mercurytransmitter/transmitter.go
index 654768e97ed..50632ff2248 100644
--- a/core/services/llo/mercurytransmitter/transmitter.go
+++ b/core/services/llo/mercurytransmitter/transmitter.go
@@ -9,6 +9,7 @@ import (
 	"fmt"
 	"io"
 	"sync"
+	"time"
 
 	"github.com/prometheus/client_golang/prometheus"
 	"github.com/prometheus/client_golang/prometheus/promauto"
@@ -30,7 +31,9 @@ import (
 
 const (
 	// Mercury server error codes
-	DuplicateReport = 2
+	DuplicateReport  = 2
+	commitInterval   = time.Millisecond * 25
+	commitBufferSize = 1000
 )
 
 var (
@@ -124,6 +127,8 @@ type transmitter struct {
 
 	stopCh services.StopChan
 	wg     *sync.WaitGroup
+
+	commitCh chan *Transmission
 }
 
 type Opts struct {
@@ -159,6 +164,7 @@ func newTransmitter(opts Opts) *transmitter {
 		fmt.Sprintf("%x", opts.FromAccount),
 		make(services.StopChan),
 		&sync.WaitGroup{},
+		make(chan *Transmission, commitBufferSize*len(servers)),
 	}
 }
 
@@ -201,6 +207,7 @@ func (mt *transmitter) Start(ctx context.Context) (err error) {
 		})
 	}
 
+	mt.spawnCommitLoops()
 	return g.Wait()
 	})
 }
@@ -249,37 +256,36 @@ func (mt *transmitter) Transmit(
 	sigs []types.AttributedOnchainSignature,
 ) (err error) {
 	ok := mt.IfStarted(func() {
-		err = mt.transmit(ctx, digest, seqNr, report, sigs)
+		for serverURL := range mt.servers {
+			t := &Transmission{
+				ServerURL:    serverURL,
+				ConfigDigest: digest,
+				SeqNr:        seqNr,
+				Report:       report,
+				Sigs:         sigs,
+			}
+			select {
+			case mt.commitCh <- t:
+			case <-ctx.Done():
+				err = fmt.Errorf("failed to add transmission to commit channel: %w", ctx.Err())
+			}
+		}
 	})
+
 	if !ok {
 		return errors.New("transmitter is not started")
 	}
-	return
+
+	return err
 }
 
-func (mt *transmitter) transmit(
-	ctx context.Context,
-	digest types.ConfigDigest,
-	seqNr uint64,
-	report ocr3types.ReportWithInfo[llotypes.ReportInfo],
-	sigs []types.AttributedOnchainSignature,
-) error {
+func (mt *transmitter) transmit(ctx context.Context, transmissions []*Transmission) error {
 	// On shutdown appears that libocr can pass us a pre-canceled context;
 	// don't even bother trying to insert/transmit in this case
 	if ctx.Err() != nil {
 		return fmt.Errorf("cannot transmit; context already canceled: %w", ctx.Err())
 	}
 
-	transmissions := make([]*Transmission, 0, len(mt.servers))
-	for serverURL := range mt.servers {
-		transmissions = append(transmissions, &Transmission{
-			ServerURL:    serverURL,
-			ConfigDigest: digest,
-			SeqNr:        seqNr,
-			Report:       report,
-			Sigs:         sigs,
-		})
-	}
 	// NOTE: This insert on its own can leave orphaned records in the case of
 	// shutdown, because:
 	// 1. 
Transmitter is shut down after oracle
@@ -313,11 +319,15 @@ func (mt *transmitter) transmit(
 	for i := range transmissions {
 		t := transmissions[i]
 		if mt.verboseLogging {
-			mt.lggr.Debugw("Transmit report", "digest", digest.Hex(), "seqNr", seqNr, "reportFormat", report.Info.ReportFormat, "reportLifeCycleStage", report.Info.LifeCycleStage, "transmissionHash", fmt.Sprintf("%x", t.Hash()))
+			mt.lggr.Debugw("Transmit report",
+				"digest", t.ConfigDigest.Hex(), "seqNr", t.SeqNr, "reportFormat", t.Report.Info.ReportFormat,
+				"reportLifeCycleStage", t.Report.Info.LifeCycleStage,
+				"transmissionHash", fmt.Sprintf("%x", t.Hash()))
 		}
-		s := mt.servers[t.ServerURL]
+
 		// OK to do this synchronously since pushing to queue is just a mutex
 		// lock and array append and ought to be extremely fast
+		s := mt.servers[t.ServerURL]
 		if ok := s.q.Push(t); !ok {
 			s.transmitQueuePushErrorCount.Inc()
 			// This shouldn't be possible since transmitter is always shut down
@@ -333,3 +343,53 @@
 func (mt *transmitter) FromAccount(ctx context.Context) (ocrtypes.Account, error) {
 	return ocrtypes.Account(mt.fromAccount), nil
 }
+
+// spawnCommitLoops starts len(mt.servers) worker goroutines. Each worker batches
+// transmissions from commitCh and flushes them to the per-server queues whenever
+// a batch fills up or on every commitInterval tick.
+func (mt *transmitter) spawnCommitLoops() {
+	for x := 0; x < len(mt.servers); x++ {
+		mt.wg.Add(1)
+
+		go func() {
+			defer mt.wg.Done()
+
+			ctx := context.Background()
+			buff := cap(mt.commitCh) / 10
+			transmissions := make([]*Transmission, 0, buff)
+			ticker := time.NewTicker(commitInterval)
+			defer ticker.Stop()
+
+			for {
+				var err error
+				select {
+				case <-mt.stopCh:
+					// flush anything still buffered before exiting
+					if len(transmissions) > 0 {
+						if err = mt.transmit(ctx, transmissions); err != nil {
+							mt.lggr.Error("Error transmitting records", "error", err)
+						}
+					}
+					return
+
+				case <-ticker.C:
+					if len(transmissions) > 0 {
+						err = mt.transmit(ctx, transmissions)
+						transmissions = make([]*Transmission, 0, buff)
+					}
+
+				case t := <-mt.commitCh:
+					transmissions = append(transmissions, t)
+					if len(transmissions) >= buff {
+						err = mt.transmit(ctx, transmissions)
+						transmissions = make([]*Transmission, 0, buff)
+					}
+				}
+
+				if err != nil {
+					mt.lggr.Error("Error transmitting records", "error", err)
+				}
+			}
+		}()
+	}
+}
diff --git a/core/services/llo/mercurytransmitter/transmitter_test.go b/core/services/llo/mercurytransmitter/transmitter_test.go
index 3769a4f247b..fbd6eeacd22 100644
--- a/core/services/llo/mercurytransmitter/transmitter_test.go
+++ b/core/services/llo/mercurytransmitter/transmitter_test.go
@@ -109,6 +109,7 @@ func Test_Transmitter_Transmit(t *testing.T) {
 		require.NoError(t, mt.servers[sURL].q.Init([]*Transmission{}))
 		require.NoError(t, mt.servers[sURL2].q.Init([]*Transmission{}))
 		require.NoError(t, mt.servers[sURL3].q.Init([]*Transmission{}))
+		mt.spawnCommitLoops()
 
 		return nil
 	})
@@ -124,8 +125,11 @@ func Test_Transmitter_Transmit(t *testing.T) {
 	err = mt.Transmit(testutils.Context(t), digest, seqNr, report, sigs)
 	require.NoError(t, err)
 
+	// wait for the commit loop to run
+	time.Sleep(2 * commitInterval)
+
 	// ensure it was added to the queue
-	require.Equal(t, 1, mt.servers[sURL].q.(*transmitQueue).pq.Len())
+	require.Equal(t, 1, mt.servers[sURL].q.(*transmitQueue).Len())
 	assert.Equal(t, &Transmission{
 		ServerURL:    sURL,
 		ConfigDigest: digest,
@@ -133,7 +137,7 @@ func Test_Transmitter_Transmit(t *testing.T) {
 		Report: report,
 		Sigs:   sigs,
 	}, mt.servers[sURL].q.(*transmitQueue).pq.Pop().(*Transmission))
-	require.Equal(t, 1, mt.servers[sURL2].q.(*transmitQueue).pq.Len())
+	require.Equal(t, 1, mt.servers[sURL2].q.(*transmitQueue).Len())
 	assert.Equal(t, &Transmission{
 		ServerURL:    sURL2,
 		ConfigDigest: digest,
@@ -141,7 +145,7 @@ func Test_Transmitter_Transmit(t *testing.T) { Report: report, Sigs: sigs, }, mt.servers[sURL2].q.(*transmitQueue).pq.Pop().(*Transmission)) - require.Equal(t, 1, mt.servers[sURL3].q.(*transmitQueue).pq.Len()) + require.Equal(t, 1, mt.servers[sURL3].q.(*transmitQueue).Len()) assert.Equal(t, &Transmission{ ServerURL: sURL3, ConfigDigest: digest,