Skip to content

Commit 8fa9508

Browse files
committed
add noop exemplar reservoir
Signed-off-by: emdneto <9735060+emdneto@users.noreply.github.com>
1 parent 0e31c12 commit 8fa9508

File tree

7 files changed

+254
-4
lines changed

7 files changed

+254
-4
lines changed
Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
# Copyright The OpenTelemetry Authors
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
import pytest
16+
17+
from opentelemetry.sdk.metrics import (
18+
AlwaysOffExemplarFilter,
19+
MeterProvider,
20+
TraceBasedExemplarFilter,
21+
)
22+
from opentelemetry.sdk.metrics.export import (
23+
InMemoryMetricReader,
24+
)
25+
26+
27+
@pytest.fixture
28+
def meter_always_off():
29+
reader = InMemoryMetricReader()
30+
provider = MeterProvider(
31+
metric_readers=[reader],
32+
exemplar_filter=AlwaysOffExemplarFilter(),
33+
)
34+
return provider.get_meter("benchmark_always_off")
35+
36+
37+
@pytest.fixture
38+
def meter_trace_based():
39+
reader = InMemoryMetricReader()
40+
provider = MeterProvider(
41+
metric_readers=[reader],
42+
exemplar_filter=TraceBasedExemplarFilter(),
43+
)
44+
return provider.get_meter("benchmark_trace_based")
45+
46+
47+
def test_counter_add_always_off(benchmark, meter_always_off):
48+
"""Benchmark counter.add() with always_off exemplar filter (uses NoOpExemplarReservoir)"""
49+
counter = meter_always_off.create_counter("test_counter_always_off")
50+
labels = {"key": "value"}
51+
52+
def counter_add():
53+
counter.add(1, labels)
54+
55+
benchmark(counter_add)
56+
57+
58+
def test_counter_add_trace_based(benchmark, meter_trace_based):
59+
"""Benchmark counter.add() with trace_based exemplar filter"""
60+
counter = meter_trace_based.create_counter("test_counter_trace_based")
61+
labels = {"key": "value"}
62+
63+
def counter_add():
64+
counter.add(1, labels)
65+
66+
benchmark(counter_add)

opentelemetry-sdk/src/opentelemetry/sdk/metrics/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
Exemplar,
2323
ExemplarFilter,
2424
ExemplarReservoir,
25+
NoOpExemplarReservoir,
2526
SimpleFixedSizeExemplarReservoir,
2627
TraceBasedExemplarFilter,
2728
)
@@ -48,6 +49,7 @@
4849
"Counter",
4950
"Histogram",
5051
"_Gauge",
52+
"NoOpExemplarReservoir",
5153
"ObservableCounter",
5254
"ObservableGauge",
5355
"ObservableUpDownCounter",

opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/_view_instrument_match.py

Lines changed: 24 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,11 @@
2525
_Aggregation,
2626
_SumAggregation,
2727
)
28+
from opentelemetry.sdk.metrics._internal.exemplar import (
29+
AlwaysOffExemplarFilter,
30+
ExemplarFilter,
31+
NoOpExemplarReservoir,
32+
)
2833
from opentelemetry.sdk.metrics._internal.export import AggregationTemporality
2934
from opentelemetry.sdk.metrics._internal.measurement import Measurement
3035
from opentelemetry.sdk.metrics._internal.point import DataPointT
@@ -33,12 +38,18 @@
3338
_logger = getLogger(__name__)
3439

3540

41+
def _noop_reservoir_factory(aggregation_type):
42+
"""Reservoir factory that always returns a NoOpExemplarReservoir."""
43+
return NoOpExemplarReservoir
44+
45+
3646
class _ViewInstrumentMatch:
3747
def __init__(
3848
self,
3949
view: View,
4050
instrument: Instrument,
4151
instrument_class_aggregation: Dict[type, Aggregation],
52+
exemplar_filter: Optional[ExemplarFilter] = None,
4253
):
4354
self._view = view
4455
self._instrument = instrument
@@ -49,11 +60,20 @@ def __init__(
4960
self._description = (
5061
self._view._description or self._instrument.description
5162
)
63+
64+
# When the exemplar filter is AlwaysOff, short-circuit to a no-op
65+
# reservoir to avoid allocation and per-measurement overhead.
66+
if isinstance(exemplar_filter, AlwaysOffExemplarFilter):
67+
exemplar_reservoir_factory = _noop_reservoir_factory
68+
else:
69+
exemplar_reservoir_factory = self._view._exemplar_reservoir_factory
70+
self._exemplar_reservoir_factory = exemplar_reservoir_factory
71+
5272
if not isinstance(self._view._aggregation, DefaultAggregation):
5373
self._aggregation = self._view._aggregation._create_aggregation(
5474
self._instrument,
5575
None,
56-
self._view._exemplar_reservoir_factory,
76+
self._exemplar_reservoir_factory,
5777
0,
5878
)
5979
else:
@@ -62,7 +82,7 @@ def __init__(
6282
]._create_aggregation(
6383
self._instrument,
6484
None,
65-
self._view._exemplar_reservoir_factory,
85+
self._exemplar_reservoir_factory,
6686
0,
6787
)
6888

@@ -114,7 +134,7 @@ def consume_measurement(
114134
self._view._aggregation._create_aggregation(
115135
self._instrument,
116136
attributes,
117-
self._view._exemplar_reservoir_factory,
137+
self._exemplar_reservoir_factory,
118138
time_ns(),
119139
)
120140
)
@@ -124,7 +144,7 @@ def consume_measurement(
124144
]._create_aggregation(
125145
self._instrument,
126146
attributes,
127-
self._view._exemplar_reservoir_factory,
147+
self._exemplar_reservoir_factory,
128148
time_ns(),
129149
)
130150
self._attributes_aggregation[aggr_key] = aggregation

opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/exemplar/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
AlignedHistogramBucketExemplarReservoir,
2424
ExemplarReservoir,
2525
ExemplarReservoirBuilder,
26+
NoOpExemplarReservoir,
2627
SimpleFixedSizeExemplarReservoir,
2728
)
2829

@@ -35,5 +36,6 @@
3536
"AlignedHistogramBucketExemplarReservoir",
3637
"ExemplarReservoir",
3738
"ExemplarReservoirBuilder",
39+
"NoOpExemplarReservoir",
3840
"SimpleFixedSizeExemplarReservoir",
3941
]

opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/exemplar/exemplar_reservoir.py

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,30 @@ def collect(self, point_attributes: Attributes) -> List[Exemplar]:
8282
)
8383

8484

85+
class NoOpExemplarReservoir(ExemplarReservoir):
86+
"""A no-op reservoir that stores no exemplars.
87+
88+
This reservoir is used when the exemplar filter is ``AlwaysOffExemplarFilter``,
89+
avoiding the overhead of maintaining buckets, locks, and storage when no
90+
exemplars will ever be collected.
91+
"""
92+
93+
def __init__(self, **kwargs) -> None:
94+
pass
95+
96+
def offer(
97+
self,
98+
value: Union[int, float],
99+
time_unix_nano: int,
100+
attributes: Attributes,
101+
context: Context,
102+
) -> None:
103+
pass
104+
105+
def collect(self, point_attributes: Attributes) -> List[Exemplar]:
106+
return []
107+
108+
85109
class ExemplarBucket:
86110
def __init__(self) -> None:
87111
self.__value: Union[int, float] = 0

opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/metric_reader_storage.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,7 @@ def _get_or_init_view_instrument_match(
105105
instrument_class_aggregation=(
106106
self._instrument_class_aggregation
107107
),
108+
exemplar_filter=self._sdk_config.exemplar_filter,
108109
)
109110
)
110111
self._instrument_view_instrument_matches[instrument] = (
@@ -266,6 +267,7 @@ def _handle_view_instrument_match(
266267
instrument_class_aggregation=(
267268
self._instrument_class_aggregation
268269
),
270+
exemplar_filter=self._sdk_config.exemplar_filter,
269271
)
270272

271273
for (
Lines changed: 134 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,134 @@
1+
# Copyright The OpenTelemetry Authors
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
import os
16+
from time import time_ns
17+
from unittest import TestCase, mock
18+
from unittest.mock import MagicMock
19+
20+
from opentelemetry.context import Context
21+
from opentelemetry.sdk.metrics import MeterProvider
22+
from opentelemetry.sdk.metrics._internal._view_instrument_match import (
23+
_noop_reservoir_factory,
24+
_ViewInstrumentMatch,
25+
)
26+
from opentelemetry.sdk.metrics._internal.aggregation import (
27+
_ExplicitBucketHistogramAggregation,
28+
_SumAggregation,
29+
)
30+
from opentelemetry.sdk.metrics._internal.exemplar import (
31+
AlwaysOffExemplarFilter,
32+
NoOpExemplarReservoir,
33+
TraceBasedExemplarFilter,
34+
)
35+
from opentelemetry.sdk.metrics.export import InMemoryMetricReader
36+
from opentelemetry.sdk.metrics.view import DefaultAggregation, View
37+
38+
39+
class TestNoOpExemplarReservoir(TestCase):
40+
def test_offer_does_nothing(self):
41+
reservoir = NoOpExemplarReservoir()
42+
reservoir.offer(42.0, time_ns(), {"key": "value"}, Context())
43+
44+
def test_collect_returns_empty_list(self):
45+
reservoir = NoOpExemplarReservoir()
46+
reservoir.offer(1.0, time_ns(), {"key": "value"}, Context())
47+
exemplars = reservoir.collect({"key": "value"})
48+
self.assertEqual(exemplars, [])
49+
50+
def test_noop_exemplar_reservoir_kwargs(self):
51+
reservoir = NoOpExemplarReservoir(size=10, boundaries=[0, 5, 10])
52+
self.assertIsInstance(reservoir, NoOpExemplarReservoir)
53+
54+
def test_collect_always_empty_after_multiple_offers(self):
55+
reservoir = NoOpExemplarReservoir()
56+
for i in range(100):
57+
reservoir.offer(float(i), time_ns(), {"i": str(i)}, Context())
58+
self.assertEqual(reservoir.collect({}), [])
59+
60+
61+
class TestNoOpReservoirWithAlwaysOffFilter(TestCase):
62+
def test_view_instrument_match_uses_noop_with_always_off_filter(self):
63+
instrument = MagicMock(name="test_instrument")
64+
instrument.instrumentation_scope = MagicMock()
65+
66+
vim = _ViewInstrumentMatch(
67+
view=View(instrument_name="test_instrument"),
68+
instrument=instrument,
69+
instrument_class_aggregation=MagicMock(
70+
**{"__getitem__.return_value": DefaultAggregation()}
71+
),
72+
exemplar_filter=AlwaysOffExemplarFilter(),
73+
)
74+
75+
self.assertIs(vim._exemplar_reservoir_factory, _noop_reservoir_factory)
76+
77+
def test_view_instrument_match_uses_default_without_always_off(self):
78+
instrument = MagicMock(name="test_instrument")
79+
instrument.instrumentation_scope = MagicMock()
80+
81+
view = View(instrument_name="test_instrument")
82+
vim = _ViewInstrumentMatch(
83+
view=view,
84+
instrument=instrument,
85+
instrument_class_aggregation=MagicMock(
86+
**{"__getitem__.return_value": DefaultAggregation()}
87+
),
88+
exemplar_filter=TraceBasedExemplarFilter(),
89+
)
90+
91+
self.assertIs(
92+
vim._exemplar_reservoir_factory,
93+
view._exemplar_reservoir_factory,
94+
)
95+
96+
def test_noop_reservoir_factory_returns_noop(self):
97+
self.assertIs(
98+
_noop_reservoir_factory(_SumAggregation),
99+
NoOpExemplarReservoir,
100+
)
101+
self.assertIs(
102+
_noop_reservoir_factory(_ExplicitBucketHistogramAggregation),
103+
NoOpExemplarReservoir,
104+
)
105+
106+
def test_meter_provider_always_off_programmatic(self):
107+
reader = InMemoryMetricReader()
108+
provider = MeterProvider(
109+
metric_readers=[reader],
110+
exemplar_filter=AlwaysOffExemplarFilter(),
111+
)
112+
meter = provider.get_meter("test")
113+
counter = meter.create_counter("test_counter")
114+
counter.add(10, {"key": "value"})
115+
116+
data = reader.get_metrics_data()
117+
metrics = data.resource_metrics[0].scope_metrics[0].metrics
118+
data_point = metrics[0].data.data_points[0]
119+
self.assertEqual(data_point.exemplars, [])
120+
121+
@mock.patch.dict(
122+
os.environ, {"OTEL_METRICS_EXEMPLAR_FILTER": "always_off"}
123+
)
124+
def test_meter_provider_always_off_env_var(self):
125+
reader = InMemoryMetricReader()
126+
provider = MeterProvider(metric_readers=[reader])
127+
meter = provider.get_meter("test")
128+
counter = meter.create_counter("test_counter")
129+
counter.add(10, {"key": "value"})
130+
131+
data = reader.get_metrics_data()
132+
metrics = data.resource_metrics[0].scope_metrics[0].metrics
133+
data_point = metrics[0].data.data_points[0]
134+
self.assertEqual(data_point.exemplars, [])

0 commit comments

Comments
 (0)