22use std:: sync:: atomic:: { AtomicU64 , AtomicU8 , Ordering } ;
33use std:: {
44 array,
5+ fmt:: Error ,
56 future:: poll_fn,
67 iter,
78 marker:: PhantomData ,
@@ -21,6 +22,10 @@ use loom::{
2122 future:: { block_on, AtomicWaker } ,
2223 sync:: atomic:: { AtomicU64 , AtomicU8 , Ordering } ,
2324} ;
25+ use prometheus_client:: {
26+ encoding:: { EncodeMetric , MetricEncoder , NoLabelSet } ,
27+ metrics:: { MetricType , TypedMetric } ,
28+ } ;
2429
2530pub trait HistogramValue : Add < Output = Self > + PartialOrd + Sized {
2631 const HAS_NAN : bool ;
@@ -38,12 +43,12 @@ impl HistogramValue for u64 {
3843 fn is_nan ( & self ) -> bool {
3944 false
4045 }
41- fn from_bits ( bits : u64 ) -> Self {
42- bits
43- }
4446 fn atomic_add ( counter : & AtomicU64 , value : Self , ordering : Ordering ) {
4547 counter. fetch_add ( value, ordering) ;
4648 }
49+ fn from_bits ( bits : u64 ) -> Self {
50+ bits
51+ }
4752}
4853
4954impl HistogramValue for f64 {
@@ -54,23 +59,23 @@ impl HistogramValue for f64 {
5459 fn is_nan ( & self ) -> bool {
5560 f64:: is_nan ( * self )
5661 }
57- fn from_bits ( bits : u64 ) -> Self {
58- f64:: from_bits ( bits)
59- }
6062 fn atomic_add ( counter : & AtomicU64 , value : Self , ordering : Ordering ) {
6163 counter
6264 . fetch_update ( ordering, Ordering :: Relaxed , |c| {
6365 Some ( f64:: to_bits ( f64:: from_bits ( c) + value) )
6466 } )
6567 . unwrap ( ) ;
6668 }
69+ fn from_bits ( bits : u64 ) -> Self {
70+ f64:: from_bits ( bits)
71+ }
6772}
6873
6974#[ expect( clippy:: len_without_is_empty) ]
7075pub trait HistogramBuckets < T > {
7176 fn len ( & self ) -> usize ;
7277 fn bucket_index ( & self , value : & T ) -> Option < usize > ;
73- fn iter ( & self ) -> impl Iterator < Item = f64 > + ' _ ;
78+ fn iter ( & self ) -> impl Iterator < Item = f64 > ;
7479}
7580
7681impl < T : HistogramValue + Clone + ' static , B : AsRef < [ T ] > > HistogramBuckets < T > for B {
@@ -82,7 +87,7 @@ impl<T: HistogramValue + Clone + 'static, B: AsRef<[T]>> HistogramBuckets<T> for
8287 self . as_ref ( ) . iter ( ) . position ( |b| value <= b)
8388 }
8489
85- fn iter ( & self ) -> impl Iterator < Item = f64 > + ' _ {
90+ fn iter ( & self ) -> impl Iterator < Item = f64 > {
8691 self . as_ref ( ) . iter ( ) . cloned ( ) . map ( T :: into_f64)
8792 }
8893}
@@ -114,7 +119,7 @@ impl<T: HistogramValue, B: HistogramBuckets<T>> Histogram<T, B> {
114119 self . 0 . shards [ hot_shard] . observe ( value, bucket_index, & self . 0 . waker ) ;
115120 }
116121
117- pub fn collect ( & self ) -> ( u64 , f64 , Vec < ( f64 , u64 ) > ) {
122+ pub fn collect ( & self ) -> ( u64 , f64 , impl Iterator < Item = ( f64 , u64 ) > ) {
118123 let _guard = self . 0 . collector . lock ( ) . unwrap ( ) ;
119124 let hot_shard = self . 0 . hot_shard . load ( Ordering :: Relaxed ) as usize ;
120125 let cold_shard = hot_shard ^ 1 ;
@@ -126,8 +131,7 @@ impl<T: HistogramValue, B: HistogramBuckets<T>> Histogram<T, B> {
126131 self . 0 . shards [ hot_shard] . collect ( bucket_count, & self . 0 . waker ) ;
127132 let buckets = ( self . 0 . buckets . iter ( ) . chain ( [ f64:: INFINITY ] ) )
128133 . zip ( iter:: zip ( buckets_cold, buckets_hot) )
129- . map ( |( b, ( cold, hot) ) | ( b, cold + hot) )
130- . collect ( ) ;
134+ . map ( |( b, ( cold, hot) ) | ( b, cold + hot) ) ;
131135 ( count_cold + count_hot, sum_cold + sum_hot, buckets)
132136 }
133137}
@@ -256,11 +260,27 @@ impl<T: HistogramValue> Shard<T> {
256260 }
257261}
258262
263+ impl < T , B > TypedMetric for Histogram < T , B > {
264+ const TYPE : MetricType = MetricType :: Histogram ;
265+ }
266+
267+ impl < T : HistogramValue , B : HistogramBuckets < T > > EncodeMetric for Histogram < T , B > {
268+ fn encode ( & self , mut encoder : MetricEncoder ) -> Result < ( ) , Error > {
269+ let ( count, sum, buckets) = self . collect ( ) ;
270+ encoder. encode_histogram :: < NoLabelSet > ( sum, count, & buckets. collect :: < Vec < _ > > ( ) , None )
271+ }
272+
273+ fn metric_type ( & self ) -> MetricType {
274+ Self :: TYPE
275+ }
276+ }
277+
259278#[ cfg( test) ]
260279mod tests {
261280 #[ cfg( not( loom) ) ]
262281 use std:: thread;
263282
283+ use itertools:: Itertools ;
264284 #[ cfg( loom) ]
265285 use loom:: { model, thread} ;
266286
@@ -278,7 +298,10 @@ mod tests {
278298 let h1 = histogram. clone ( ) ;
279299 let h2 = histogram. clone ( ) ;
280300 let t1 = thread:: spawn ( move || h1. observe ( 42 ) ) ;
281- let t2 = thread:: spawn ( move || h2. collect ( ) ) ;
301+ let t2 = thread:: spawn ( move || {
302+ let ( count, sum, buckets) = h2. collect ( ) ;
303+ ( count, sum, buckets. collect_vec ( ) )
304+ } ) ;
282305 histogram. observe ( 7 ) ;
283306 histogram. observe ( 80100 ) ;
284307 t1. join ( ) . unwrap ( ) ;
@@ -296,7 +319,10 @@ mod tests {
296319 let ( count, sum, buckets) = histogram. collect ( ) ;
297320 assert_eq ! ( count, 3 ) ;
298321 assert_eq ! ( sum, 80149.0 ) ;
299- assert_eq ! ( buckets, vec![ ( 10.0 , 1 ) , ( 100.0 , 1 ) , ( f64 :: INFINITY , 1 ) ] ) ;
322+ assert_eq ! (
323+ buckets. collect_vec( ) ,
324+ vec![ ( 10.0 , 1 ) , ( 100.0 , 1 ) , ( f64 :: INFINITY , 1 ) ]
325+ ) ;
300326 } ) ;
301327 }
302328
@@ -330,10 +356,11 @@ mod tests {
330356 fn observe_inf ( ) {
331357 let histogram = Histogram :: new ( [ 1.0 ] ) ;
332358 histogram. observe ( f64:: INFINITY ) ;
333- assert_eq ! (
334- histogram. collect( ) ,
335- ( 1 , f64 :: INFINITY , vec![ ( 1.0 , 0 ) , ( f64 :: INFINITY , 1 ) ] )
336- ) ;
359+ histogram. observe ( 1.0 ) ;
360+ let ( count, sum, buckets) = histogram. collect ( ) ;
361+ assert_eq ! ( count, 1 ) ;
362+ assert_eq ! ( sum, f64 :: INFINITY ) ;
363+ assert_eq ! ( buckets. collect_vec( ) , vec![ ( 1.0 , 1 ) , ( f64 :: INFINITY , 1 ) ] ) ;
337364 }
338365
339366 #[ cfg( not( loom) ) ]
@@ -344,6 +371,6 @@ mod tests {
344371 let ( count, sum, buckets) = histogram. collect ( ) ;
345372 assert_eq ! ( count, 1 ) ;
346373 assert ! ( sum. is_nan( ) ) ;
347- assert_eq ! ( buckets, vec![ ( 1.0 , 0 ) , ( f64 :: INFINITY , 0 ) ] ) ;
374+ assert_eq ! ( buckets. collect_vec ( ) , vec![ ( 1.0 , 0 ) , ( f64 :: INFINITY , 0 ) ] ) ;
348375 }
349376}
0 commit comments