11mod migrations;
22pub mod schema;
33pub mod writer;
4+ use std:: collections:: HashSet ;
45
56pub use migrations:: run as setup_db;
67use serde:: Deserialize ;
@@ -12,14 +13,15 @@ use crate::{
1213 schema:: LogRangeParams ,
1314 stream:: { FlushBufferResponse , LogsStream } ,
1415 } ,
15- web:: schema:: { AvailableLogDate , ChannelLogsStats , LogsParams , UserLogsStats } ,
16+ web:: schema:: { AvailableLogDate , ChannelLogsStats , LogsParams , PreviousName , UserLogsStats } ,
1617 Result ,
1718} ;
1819use chrono:: { DateTime , Datelike , Duration , Utc } ;
1920use clickhouse:: { query:: RowCursor , Client , Row } ;
2021use rand:: { seq:: IteratorRandom , thread_rng} ;
2122use schema:: StructuredMessage ;
2223use tracing:: debug;
24+ use futures:: future:: try_join_all;
2325
2426const CHANNEL_MULTI_QUERY_SIZE_DAYS : i64 = 14 ;
2527
@@ -381,6 +383,63 @@ pub async fn get_user_stats(
381383 } )
382384}
383385
386+ pub async fn get_user_name_history (
387+ db : & Client ,
388+ user_id : & str ,
389+ ) -> Result < Vec < PreviousName > > {
390+ #[ derive( Deserialize , Row ) ]
391+ struct SingleNameHistory {
392+ pub last_timestamp : i32 ,
393+ pub first_timestamp : i32 ,
394+ }
395+
396+ let name_query = "SELECT DISTINCT user_login FROM message_structured WHERE user_id = ? SETTINGS use_query_cache = 1, query_cache_ttl = 600" . to_owned ( ) ;
397+ let name_query = db. query ( & name_query) . bind ( user_id) ;
398+ let distinct_logins = name_query. fetch_all :: < String > ( ) . await ?;
399+ if distinct_logins. is_empty ( ) {
400+ return Ok ( vec ! [ ] ) ;
401+ }
402+
403+ let sanitized_user_logins = distinct_logins
404+ . iter ( )
405+ . map ( |login| login. trim_start_matches ( ':' ) . to_owned ( ) ) ;
406+
407+ let history_query = "SELECT toDateTime(MAX(timestamp)) AS last_timestamp, toDateTime(MIN(timestamp)) AS first_timestamp FROM message_structured WHERE (user_id = ?) AND (user_login = ?) SETTINGS use_query_cache = 1, query_cache_ttl = 600" . to_owned ( ) ;
408+
409+ let name_history_rows = try_join_all ( sanitized_user_logins. into_iter ( ) . map ( |login| {
410+ let query = history_query. clone ( ) ;
411+ async move {
412+ let query = db. query ( & query) . bind ( user_id) . bind ( & login) ;
413+ query
414+ . fetch_one :: < SingleNameHistory > ( )
415+ . await
416+ . map ( |history| ( login, history) )
417+ }
418+ } ) )
419+ . await ?;
420+
421+ let mut seen_logins = HashSet :: new ( ) ;
422+
423+ let names = name_history_rows
424+ . into_iter ( )
425+ . filter_map ( |( login, history) | {
426+ if seen_logins. insert ( login. clone ( ) ) {
427+ Some ( PreviousName {
428+ user_login : login,
429+ last_timestamp : DateTime :: from_timestamp ( history. last_timestamp . into ( ) , 0 )
430+ . expect ( "Invalid DateTime" ) ,
431+ first_timestamp : DateTime :: from_timestamp ( history. first_timestamp . into ( ) , 0 )
432+ . expect ( "Invalid DateTime" ) ,
433+ } )
434+ } else {
435+ None
436+ }
437+ } )
438+ . collect ( ) ;
439+
440+ Ok ( names)
441+ }
442+
384443fn apply_limit_offset ( query : & mut String , buffer_response : & FlushBufferResponse ) {
385444 if let Some ( limit) = buffer_response. normalized_limit ( ) {
386445 * query = format ! ( "{query} LIMIT {limit}" ) ;
0 commit comments