Skip to content

Commit 61e6a0f

Browse files
committed
Make buckets generic
It allows:
- reusable bucket storage, e.g. Arc<[f64]>
- optimized bucket storage, e.g. linear/exponential distribution
- optimized bucket position computation, e.g. power of 2 buckets
1 parent 3e13aab commit 61e6a0f

File tree

1 file changed

+74
-48
lines changed

1 file changed

+74
-48
lines changed

src/lib.rs

Lines changed: 74 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -22,29 +22,41 @@ use loom::{
2222
sync::atomic::{AtomicU64, AtomicU8, Ordering},
2323
};
2424

25-
pub trait HistogramValue: Clone + Add<Output = Self> + PartialOrd + Sized {
26-
const MAX: Self;
27-
fn from_bits(bits: u64) -> Self;
25+
pub trait HistogramValue: Add<Output = Self> + PartialOrd + Sized {
26+
const HAS_NAN: bool;
27+
fn into_f64(self) -> f64;
28+
fn is_nan(&self) -> bool;
2829
fn atomic_add(counter: &AtomicU64, value: Self, ordering: Ordering);
30+
fn from_bits(bits: u64) -> Self;
2931
}
3032

3133
impl HistogramValue for u64 {
32-
const MAX: Self = u64::MAX;
34+
const HAS_NAN: bool = false;
35+
fn into_f64(self) -> f64 {
36+
self as f64
37+
}
38+
fn is_nan(&self) -> bool {
39+
false
40+
}
3341
fn from_bits(bits: u64) -> Self {
3442
bits
3543
}
36-
3744
fn atomic_add(counter: &AtomicU64, value: Self, ordering: Ordering) {
3845
counter.fetch_add(value, ordering);
3946
}
4047
}
4148

4249
impl HistogramValue for f64 {
43-
const MAX: Self = f64::INFINITY;
50+
const HAS_NAN: bool = true;
51+
fn into_f64(self) -> f64 {
52+
self
53+
}
54+
fn is_nan(&self) -> bool {
55+
f64::is_nan(*self)
56+
}
4457
fn from_bits(bits: u64) -> Self {
4558
f64::from_bits(bits)
4659
}
47-
4860
fn atomic_add(counter: &AtomicU64, value: Self, ordering: Ordering) {
4961
counter
5062
.fetch_update(ordering, Ordering::Relaxed, |c| {
@@ -54,16 +66,37 @@ impl HistogramValue for f64 {
5466
}
5567
}
5668

69+
#[expect(clippy::len_without_is_empty)]
70+
pub trait HistogramBuckets<T> {
71+
fn len(&self) -> usize;
72+
fn bucket_index(&self, value: &T) -> Option<usize>;
73+
fn iter(&self) -> impl Iterator<Item = f64> + '_;
74+
}
75+
76+
impl<T: HistogramValue + Clone + 'static, B: AsRef<[T]>> HistogramBuckets<T> for B {
77+
fn len(&self) -> usize {
78+
self.as_ref().len()
79+
}
80+
81+
fn bucket_index(&self, value: &T) -> Option<usize> {
82+
self.as_ref().iter().position(|b| value <= b)
83+
}
84+
85+
fn iter(&self) -> impl Iterator<Item = f64> + '_ {
86+
self.as_ref().iter().cloned().map(T::into_f64)
87+
}
88+
}
89+
5790
#[derive(Debug)]
58-
pub struct Histogram<T = f64>(Arc<HistogramInner<T>>);
59-
60-
impl<T: HistogramValue> Histogram<T> {
61-
pub fn new(buckets: impl IntoIterator<Item = T>) -> Self {
62-
let buckets = buckets
63-
.into_iter()
64-
.chain(iter::once(T::MAX))
65-
.collect::<Vec<_>>();
66-
let shards = array::from_fn(|_| Shard::new(buckets.len()));
91+
pub struct Histogram<T = f64, B = Vec<T>>(Arc<HistogramInner<T, B>>);
92+
93+
impl<T: HistogramValue, B: HistogramBuckets<T>> Histogram<T, B> {
94+
fn bucket_count(buckets: &B) -> usize {
95+
buckets.len() + /* inf */ 1 + /* nan */ if T::HAS_NAN { 1 } else { 0 }
96+
}
97+
98+
pub fn new(buckets: B) -> Self {
99+
let shards = array::from_fn(|_| Shard::new(Self::bucket_count(&buckets)));
67100
Self(Arc::new(HistogramInner {
68101
buckets,
69102
hot_shard: AtomicU8::new(0),
@@ -74,38 +107,40 @@ impl<T: HistogramValue> Histogram<T> {
74107
}
75108

76109
pub fn observe(&self, value: T) {
77-
let bucket_index = self.0.buckets.iter().position(|b| value <= *b);
110+
let buckets = &self.0.buckets;
111+
let fallback_bucket = || buckets.len() + if value.is_nan() { 1 } else { 0 };
112+
let bucket_index = buckets.bucket_index(&value).unwrap_or_else(fallback_bucket);
78113
let hot_shard = self.0.hot_shard.load(Ordering::Relaxed) as usize;
79-
self.0.shards[hot_shard].observe(value, bucket_index, self.0.buckets.len(), &self.0.waker);
114+
self.0.shards[hot_shard].observe(value, bucket_index, &self.0.waker);
80115
}
81116

82-
pub fn collect(&self) -> (u64, T, Vec<(T, u64)>) {
117+
pub fn collect(&self) -> (u64, f64, Vec<(f64, u64)>) {
83118
let _guard = self.0.collector.lock().unwrap();
84119
let hot_shard = self.0.hot_shard.load(Ordering::Relaxed) as usize;
85120
let cold_shard = hot_shard ^ 1;
86-
let bucket_count = self.0.buckets.len();
121+
let bucket_count = Self::bucket_count(&self.0.buckets);
87122
let (count_cold, sum_cold, buckets_cold) =
88123
self.0.shards[cold_shard].collect(bucket_count, &self.0.waker);
89124
self.0.hot_shard.store(cold_shard as u8, Ordering::Relaxed);
90125
let (count_hot, sum_hot, buckets_hot) =
91126
self.0.shards[hot_shard].collect(bucket_count, &self.0.waker);
92-
let buckets = (self.0.buckets.iter())
127+
let buckets = (self.0.buckets.iter().chain([f64::INFINITY]))
93128
.zip(iter::zip(buckets_cold, buckets_hot))
94-
.map(|(b, (cold, hot))| (b.clone(), cold + hot))
129+
.map(|(b, (cold, hot))| (b, cold + hot))
95130
.collect();
96131
(count_cold + count_hot, sum_cold + sum_hot, buckets)
97132
}
98133
}
99134

100-
impl<T> Clone for Histogram<T> {
135+
impl<T, B> Clone for Histogram<T, B> {
101136
fn clone(&self) -> Self {
102137
Self(self.0.clone())
103138
}
104139
}
105140

106141
#[derive(Debug)]
107-
struct HistogramInner<T> {
108-
buckets: Vec<T>,
142+
struct HistogramInner<T, B> {
143+
buckets: B,
109144
hot_shard: AtomicU8,
110145
shards: [Shard<T>; 2],
111146
collector: Mutex<()>,
@@ -134,8 +169,7 @@ struct Shard<T> {
134169
impl<T: HistogramValue> Shard<T> {
135170
fn new(bucket_count: usize) -> Self {
136171
Self {
137-
// `+ 3` for _count, _sum and `NaN` bucket
138-
counters: (0..(bucket_count + 3).div_ceil(COUNTERS_PER_CACHE_LINE))
172+
counters: (0..(/* _count + _sum */2 + bucket_count).div_ceil(COUNTERS_PER_CACHE_LINE))
139173
.map(|_| CachePadded::new(array::from_fn(|_| AtomicU64::new(0))))
140174
.collect(),
141175
_phantom: PhantomData,
@@ -163,15 +197,8 @@ impl<T: HistogramValue> Shard<T> {
163197
.take(bucket_count)
164198
}
165199

166-
fn observe(
167-
&self,
168-
value: T,
169-
bucket_index: Option<usize>,
170-
bucket_count: usize,
171-
waker: &AtomicWaker,
172-
) {
173-
self.bucket(bucket_index.unwrap_or(bucket_count))
174-
.fetch_add(1, Ordering::Relaxed);
200+
fn observe(&self, value: T, bucket_index: usize, waker: &AtomicWaker) {
201+
self.bucket(bucket_index).fetch_add(1, Ordering::Relaxed);
175202
T::atomic_add(self.sum(), value, Ordering::Release);
176203
let count = self.count().fetch_add(1, Ordering::Release);
177204
if count & WAITING_FLAG != 0 {
@@ -183,19 +210,18 @@ impl<T: HistogramValue> Shard<T> {
183210
}
184211
}
185212

186-
fn read_sum_and_buckets(&self, buckets: &mut [u64]) -> (T, u64) {
213+
fn read_sum_and_buckets(&self, buckets: &mut [u64]) -> (f64, u64) {
187214
let bucket_count = buckets.len();
188-
let sum = T::from_bits(self.sum().load(Ordering::Acquire));
215+
let sum = T::from_bits(self.sum().load(Ordering::Acquire)).into_f64();
189216
let mut expected_count = 0;
190217
for (count, counter) in buckets.iter_mut().zip(self.buckets(bucket_count)) {
191218
*count = counter.load(Ordering::Relaxed);
192219
expected_count += *count;
193220
}
194-
expected_count += self.bucket(bucket_count).load(Ordering::Relaxed);
195221
(sum, expected_count)
196222
}
197223

198-
fn collect(&self, bucket_count: usize, waker: &AtomicWaker) -> (u64, T, Vec<u64>) {
224+
fn collect(&self, bucket_count: usize, waker: &AtomicWaker) -> (u64, f64, Vec<u64>) {
199225
let mut buckets = vec![0; bucket_count];
200226
for _ in 0..SPIN_LOOP_LIMIT {
201227
let count = self.count().load(Ordering::Acquire) & COUNT_MASK;
@@ -208,7 +234,7 @@ impl<T: HistogramValue> Shard<T> {
208234
}
209235

210236
#[cold]
211-
fn collect_cold(&self, buckets: &mut Vec<u64>, waker: &AtomicWaker) -> (u64, T, Vec<u64>) {
237+
fn collect_cold(&self, buckets: &mut Vec<u64>, waker: &AtomicWaker) -> (u64, f64, Vec<u64>) {
212238
block_on(poll_fn(move |cx| {
213239
#[cfg(not(loom))]
214240
waker.register(cx.waker());
@@ -258,19 +284,19 @@ mod tests {
258284
t1.join().unwrap();
259285
let (count, sum, buckets) = t2.join().unwrap();
260286
assert!(count <= 3);
261-
let [(10, b0), (100, b1), (u64::MAX, b2)] = buckets[..] else {
287+
let [(10.0, b0), (100.0, b1), (f64::INFINITY, b2)] = buckets[..] else {
262288
unreachable!()
263289
};
264290
assert_eq!(count, b0 + b1 + b2);
265-
let if_bucket = |b, v| if b == 1 { v } else { 0 };
291+
let if_bucket = |b, v| if b == 1 { v } else { 0.0 };
266292
assert_eq!(
267293
sum,
268-
if_bucket(b0, 7) + if_bucket(b1, 42) + if_bucket(b2, 80100)
294+
if_bucket(b0, 7.0) + if_bucket(b1, 42.0) + if_bucket(b2, 80100.0)
269295
);
270296
let (count, sum, buckets) = histogram.collect();
271297
assert_eq!(count, 3);
272-
assert_eq!(sum, 80149);
273-
assert_eq!(buckets, vec![(10, 1), (100, 1), (u64::MAX, 1)]);
298+
assert_eq!(sum, 80149.0);
299+
assert_eq!(buckets, vec![(10.0, 1), (100.0, 1), (f64::INFINITY, 1)]);
274300
});
275301
}
276302

@@ -289,12 +315,12 @@ mod tests {
289315
let (_, sum1, _) = histogram.collect();
290316
let (_, sum2, _) = histogram.collect();
291317
edge_case2.fetch_or(
292-
sum1 == 0 && sum2 == 42,
318+
sum1 == 0.0 && sum2 == 42.0,
293319
std::sync::atomic::Ordering::Relaxed,
294320
);
295321
t1.join().unwrap();
296322
let (_, sum, _) = histogram.collect();
297-
assert_eq!(sum, 49);
323+
assert_eq!(sum, 49.0);
298324
});
299325
assert!(edge_case.load(std::sync::atomic::Ordering::Relaxed));
300326
}

0 commit comments

Comments
 (0)