Skip to content
4 changes: 4 additions & 0 deletions engine/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,7 @@ func NewWithScanners(opts Opts, scanners engstorage.Scanners) *Engine {
},
decodingConcurrency: decodingConcurrency,
selectorBatchSize: selectorBatchSize,
maxSamplesPerQuery: opts.MaxSamples,
}
}

Expand Down Expand Up @@ -227,6 +228,7 @@ type Engine struct {
selectorBatchSize int64
enableAnalysis bool
noStepSubqueryIntervalFn func(time.Duration) time.Duration
maxSamplesPerQuery int
}

func (e *Engine) MakeInstantQuery(ctx context.Context, q storage.Queryable, opts *QueryOpts, qs string, ts time.Time) (promql.Query, error) {
Expand Down Expand Up @@ -444,7 +446,9 @@ func (e *Engine) makeQueryOpts(start time.Time, end time.Time, step time.Duratio
EnableAnalysis: e.enableAnalysis,
NoStepSubqueryIntervalFn: e.noStepSubqueryIntervalFn,
DecodingConcurrency: e.decodingConcurrency,
SampleTracker: query.NewSampleTracker(e.maxSamplesPerQuery),
}

if opts == nil {
return res
}
Expand Down
150 changes: 150 additions & 0 deletions engine/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4794,6 +4794,156 @@ func TestQueryTimeout(t *testing.T) {
testutil.Equals(t, context.DeadlineExceeded, res.Err)
}

// TestMaxSamples verifies that the engine enforces EngineOpts.MaxSamples:
// queries whose in-memory sample footprint exceeds the configured limit
// fail Exec with a "too many samples" error, queries under the limit
// succeed, and an unset (zero) limit disables the check entirely.
func TestMaxSamples(t *testing.T) {
	t.Parallel()

	t.Run("max_samples with rate function", func(t *testing.T) {
		t.Parallel()
		storage := teststorage.New(t)
		defer storage.Close()

		app := storage.Appender(context.Background())
		// Create 1000 series with samples every 15s for 5 minutes.
		for i := range 1000 {
			for ts := int64(0); ts <= 300; ts += 15 {
				_, err := app.Append(0, labels.FromStrings(labels.MetricName, "test_metric", "series", strconv.Itoa(i)), ts*1000, float64(ts))
				require.NoError(t, err)
			}
		}
		require.NoError(t, app.Commit())

		// With 1000 series and a 2m window, rate() will keep ~8 samples per series in memory
		// = ~8000 samples total.
		query := `rate(test_metric[2m])`
		start := time.Unix(120, 0)
		end := time.Unix(300, 0)
		step := 30 * time.Second

		// Limit below the expected footprint: Exec must fail with the
		// sample-limit error rather than returning a result.
		t.Run("exceeds limit", func(t *testing.T) {
			ng := engine.New(engine.Opts{
				EngineOpts: promql.EngineOpts{
					Timeout:    1 * time.Hour,
					MaxSamples: 5000, // Lower than ~8000 expected
				},
			})
			q, err := ng.NewRangeQuery(context.Background(), storage, nil, query, start, end, step)
			require.NoError(t, err)
			res := q.Exec(context.Background())
			require.Error(t, res.Err, "expected max_samples error")
			require.Contains(t, res.Err.Error(), "query processing would load too many samples into memory")
		})

		// Same query with a generous limit must succeed.
		t.Run("within limit", func(t *testing.T) {
			ng := engine.New(engine.Opts{
				EngineOpts: promql.EngineOpts{
					Timeout:    1 * time.Hour,
					MaxSamples: 50000, // Higher than ~8000 expected
				},
			})
			q, err := ng.NewRangeQuery(context.Background(), storage, nil, query, start, end, step)
			require.NoError(t, err)
			res := q.Exec(context.Background())
			require.NoError(t, res.Err)
		})
	})

	t.Run("max_samples with vector selector", func(t *testing.T) {
		t.Parallel()
		storage := teststorage.New(t)
		defer storage.Close()

		app := storage.Appender(context.Background())
		// 10000 series, each step will have 10000 samples in memory.
		for i := range 10000 {
			for ts := int64(0); ts <= 300; ts += 30 {
				_, err := app.Append(0, labels.FromStrings(labels.MetricName, "test_metric", "series", strconv.Itoa(i)), ts*1000, float64(ts))
				require.NoError(t, err)
			}
		}
		require.NoError(t, app.Commit())

		query := `test_metric`
		start := time.Unix(0, 0)
		end := time.Unix(60, 0)
		step := 30 * time.Second

		ng := engine.New(engine.Opts{
			EngineOpts: promql.EngineOpts{
				Timeout:    1 * time.Hour,
				MaxSamples: 5000, // Lower than 10000 series per step
			},
		})
		q, err := ng.NewRangeQuery(context.Background(), storage, nil, query, start, end, step)
		require.NoError(t, err)
		res := q.Exec(context.Background())
		require.Error(t, res.Err)
		require.Contains(t, res.Err.Error(), "query processing would load too many samples into memory")
	})

	t.Run("max_samples with subquery", func(t *testing.T) {
		t.Parallel()
		storage := teststorage.New(t)
		defer storage.Close()

		app := storage.Appender(context.Background())
		// 1000 series with subquery that accumulates samples.
		for i := range 1000 {
			for ts := int64(0); ts <= 600; ts += 15 {
				_, err := app.Append(0, labels.FromStrings(labels.MetricName, "test_metric", "series", strconv.Itoa(i)), ts*1000, float64(ts))
				require.NoError(t, err)
			}
		}
		require.NoError(t, app.Commit())

		// Subquery with 2m range and 30s step = 5 steps per evaluation.
		// With 1000 series, that's ~5000 samples in ring buffer.
		query := `sum_over_time(test_metric[2m:30s])`
		start := time.Unix(120, 0)
		end := time.Unix(300, 0)
		step := 60 * time.Second

		ng := engine.New(engine.Opts{
			EngineOpts: promql.EngineOpts{
				Timeout:    1 * time.Hour,
				MaxSamples: 1000, // Lower than expected
			},
		})
		q, err := ng.NewRangeQuery(context.Background(), storage, nil, query, start, end, step)
		require.NoError(t, err)
		res := q.Exec(context.Background())
		require.Error(t, res.Err)
		require.Contains(t, res.Err.Error(), "query processing would load too many samples into memory")
	})

	// MaxSamples left at its zero value: the tracker must be a no-op and
	// the query must succeed regardless of how many samples it touches.
	t.Run("max_samples disabled by default", func(t *testing.T) {
		t.Parallel()
		storage := teststorage.New(t)
		defer storage.Close()

		app := storage.Appender(context.Background())
		for i := range 100 {
			for ts := int64(0); ts < 300; ts += 30 {
				_, err := app.Append(0, labels.FromStrings(labels.MetricName, "test_metric", "series", strconv.Itoa(i)), ts*1000, float64(ts))
				require.NoError(t, err)
			}
		}
		require.NoError(t, app.Commit())

		query := `rate(test_metric[1m])`
		start := time.Unix(0, 0)
		end := time.Unix(300, 0)
		step := 30 * time.Second

		ng := engine.New(engine.Opts{
			EngineOpts: promql.EngineOpts{Timeout: 1 * time.Hour},
		})
		q, err := ng.NewRangeQuery(context.Background(), storage, nil, query, start, end, step)
		require.NoError(t, err)
		res := q.Exec(context.Background())
		require.NoError(t, res.Err)
	})
}

type hintRecordingQuerier struct {
storage.Querier
mux sync.Mutex
Expand Down
49 changes: 49 additions & 0 deletions execution/scan/subquery.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ import (
"github.com/prometheus/prometheus/model/labels"
)

const sampleLimitCheckPercentage = 0.05

type subqueryOperator struct {
next model.VectorOperator
paramOp model.VectorOperator
Expand Down Expand Up @@ -52,6 +54,9 @@ type subqueryOperator struct {
paramBuf []model.StepVector
param2Buf []model.StepVector
tempBuf []model.StepVector

currentTrackedSamples int
lastTrackedSamples int
}

func NewSubqueryOperator(next, paramOp, paramOp2 model.VectorOperator, opts *query.Options, funcExpr *logicalplan.FunctionCall, subQuery *logicalplan.Subquery) (model.VectorOperator, error) {
Expand Down Expand Up @@ -150,6 +155,9 @@ func (o *subqueryOperator) Next(ctx context.Context, buf []model.StepVector) (in
for _, b := range o.buffers {
b.Reset(mint, maxt+o.subQuery.Offset.Milliseconds())
}
o.currentTrackedSamples = 0
o.lastTrackedSamples = 0
checkSampleLimitCounter := 0
if len(o.lastVectors) > 0 {
for _, v := range o.lastVectors[o.lastCollected+1:] {
if v.T > maxt {
Expand Down Expand Up @@ -184,6 +192,18 @@ func (o *subqueryOperator) Next(ctx context.Context, buf []model.StepVector) (in
o.collect(vector, mint)
}

checkSampleLimitCounter++
if o.shouldCheckSampleLimit(checkSampleLimitCounter) {
if err := o.checkSampleLimit(); err != nil {
return 0, err
}
checkSampleLimitCounter = 0
}
}
if checkSampleLimitCounter > 0 {
if err := o.checkSampleLimit(); err != nil {
return 0, err
}
}

buf[n].Reset(o.currentStep)
Expand All @@ -210,6 +230,15 @@ func (o *subqueryOperator) Next(ctx context.Context, buf []model.StepVector) (in
return n, nil
}

// checkSampleLimit reports the samples collected since the previous check to
// the shared SampleTracker and returns an error when the query-wide sample
// limit has been exceeded.
func (o *subqueryOperator) checkSampleLimit() error {
	if delta := o.currentTrackedSamples - o.lastTrackedSamples; delta > 0 {
		o.opts.SampleTracker.Add(delta)
	}
	o.lastTrackedSamples = o.currentTrackedSamples
	return o.opts.SampleTracker.CheckLimit()
}

func (o *subqueryOperator) collect(v model.StepVector, mint int64) {
if v.T < mint {
return
Expand All @@ -220,6 +249,7 @@ func (o *subqueryOperator) collect(v model.StepVector, mint int64) {
continue
}
buffer.Push(v.T, ringbuffer.Value{F: s})
o.currentTrackedSamples++
}
for i, s := range v.Histograms {
buffer := o.buffers[v.HistogramIDs[i]]
Expand All @@ -245,6 +275,7 @@ func (o *subqueryOperator) collect(v model.StepVector, mint int64) {
s.CounterResetHint = histogram.UnknownCounterReset
}
buffer.Push(v.T, ringbuffer.Value{H: s})
o.currentTrackedSamples += telemetry.CalculateHistogramSampleCount(s)
}

}
Expand Down Expand Up @@ -291,3 +322,21 @@ func (o *subqueryOperator) initSeries(ctx context.Context) error {
})
return err
}

// shouldCheckSampleLimit decides whether enough inner batches have been
// consumed (checkSampleLimitCounter) to warrant a sample-limit check. The
// interval is sized so that roughly sampleLimitCheckPercentage of the limit
// can accumulate between checks, keeping the bookkeeping overhead low while
// still catching overshoot promptly.
func (o *subqueryOperator) shouldCheckSampleLimit(checkSampleLimitCounter int) bool {
	seriesCount := len(o.series)
	if seriesCount == 0 {
		return checkSampleLimitCounter >= 1
	}

	// Upper bound on how many samples a single inner batch can contribute.
	maxSamplesPerCall := seriesCount * o.stepsBatch
	if maxSamplesPerCall == 0 {
		return checkSampleLimitCounter >= 1
	}

	targetSamplesPerCheck := int(float64(o.opts.SampleTracker.Limit()) * sampleLimitCheckPercentage)
	interval := targetSamplesPerCheck / maxSamplesPerCall
	if interval < 1 {
		interval = 1
	}
	return checkSampleLimitCounter >= interval
}
5 changes: 5 additions & 0 deletions query/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ type Options struct {
NoStepSubqueryIntervalFn func(time.Duration) time.Duration
EnableAnalysis bool
DecodingConcurrency int
SampleTracker SampleTracker // Tracks current samples in memory
}

// TotalSteps returns the total number of steps in the query, regardless of batching.
Expand Down Expand Up @@ -57,6 +58,10 @@ func NestedOptionsForSubquery(opts *Options, step, queryRange, offset time.Durat
NoStepSubqueryIntervalFn: opts.NoStepSubqueryIntervalFn,
EnableAnalysis: opts.EnableAnalysis,
DecodingConcurrency: opts.DecodingConcurrency,
SampleTracker: opts.SampleTracker,
}
if nOpts.SampleTracker == nil {
nOpts.SampleTracker = NewSampleTracker(0)
}
if step != 0 {
nOpts.Step = step
Expand Down
67 changes: 67 additions & 0 deletions query/sample_tracker.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
// Copyright (c) The Thanos Community Authors.
// Licensed under the Apache License 2.0.

package query

import (
"fmt"
"math"
"sync/atomic"
)

// SampleTracker keeps a running count of samples currently held in memory by
// a query and reports an error once a configured ceiling is crossed.
type SampleTracker interface {
	Add(count int)
	Remove(count int)
	CheckLimit() error
	Limit() int64
}

// sampleTracker is the limit-enforcing SampleTracker implementation. The
// counter is atomic so concurrent operators may update it safely.
type sampleTracker struct {
	current atomic.Int64
	limit   int64
}

// NewSampleTracker returns a SampleTracker that enforces maxSamples. A
// non-positive maxSamples disables tracking entirely (a no-op tracker is
// returned).
func NewSampleTracker(maxSamples int) SampleTracker {
	if maxSamples > 0 {
		return &sampleTracker{limit: int64(maxSamples)}
	}
	return nopSampleTracker{}
}

// Add records count additional samples held in memory.
func (st *sampleTracker) Add(count int) {
	st.current.Add(int64(count))
}

// Remove records that count samples have been released.
func (st *sampleTracker) Remove(count int) {
	st.current.Add(-int64(count))
}

// CheckLimit returns ErrMaxSamplesExceeded when the current count is strictly
// above the limit; sitting exactly at the limit is still allowed.
func (st *sampleTracker) CheckLimit() error {
	if cur := st.current.Load(); cur > st.limit {
		return ErrMaxSamplesExceeded{Current: cur, Limit: st.limit}
	}
	return nil
}

// Limit reports the configured maximum number of in-memory samples.
func (st *sampleTracker) Limit() int64 {
	return st.limit
}

// nopSampleTracker is used when no limit is configured: every operation is a
// no-op and the limit is effectively unbounded.
type nopSampleTracker struct{}

func (nopSampleTracker) Add(int)           {}
func (nopSampleTracker) Remove(int)        {}
func (nopSampleTracker) CheckLimit() error { return nil }
func (nopSampleTracker) Limit() int64      { return math.MaxInt64 }

// ErrMaxSamplesExceeded is returned when a query holds more samples in memory
// than the configured limit.
type ErrMaxSamplesExceeded struct {
	Current int64
	Limit   int64
}

// Error implements the error interface.
func (e ErrMaxSamplesExceeded) Error() string {
	return fmt.Sprintf("query processing would load too many samples into memory: current=%d, limit=%d", e.Current, e.Limit)
}
Loading
Loading