diff --git a/src/db/mod.rs b/src/db/mod.rs index 521874c..cba1437 100644 --- a/src/db/mod.rs +++ b/src/db/mod.rs @@ -1,6 +1,7 @@ mod migrations; pub mod schema; pub mod writer; +use std::collections::HashSet; pub use migrations::run as setup_db; use serde::Deserialize; @@ -12,7 +13,7 @@ use crate::{ schema::LogRangeParams, stream::{FlushBufferResponse, LogsStream}, }, - web::schema::{AvailableLogDate, ChannelLogsStats, LogsParams, UserLogsStats}, + web::schema::{AvailableLogDate, ChannelLogsStats, LogsParams, PreviousName, UserLogsStats}, Result, }; use chrono::{DateTime, Datelike, Duration, Utc}; @@ -20,6 +21,7 @@ use clickhouse::{query::RowCursor, Client, Row}; use rand::{seq::IteratorRandom, thread_rng}; use schema::StructuredMessage; use tracing::debug; +use futures::future::try_join_all; const CHANNEL_MULTI_QUERY_SIZE_DAYS: i64 = 14; @@ -381,6 +383,63 @@ pub async fn get_user_stats( }) } +pub async fn get_user_name_history( + db: &Client, + user_id: &str, +) -> Result> { + #[derive(Deserialize, Row)] + struct SingleNameHistory { + pub last_timestamp: i32, + pub first_timestamp: i32, + } + + let name_query = "SELECT DISTINCT user_login FROM message_structured WHERE user_id = ? SETTINGS use_query_cache = 1, query_cache_ttl = 600".to_owned(); + let name_query = db.query(&name_query).bind(user_id); + let distinct_logins = name_query.fetch_all::().await?; + if distinct_logins.is_empty() { + return Ok(vec![]); + } + + let sanitized_user_logins = distinct_logins + .iter() + .map(|login| login.trim_start_matches(':').to_owned()); + + let history_query = "SELECT toDateTime(MAX(timestamp)) AS last_timestamp, toDateTime(MIN(timestamp)) AS first_timestamp FROM message_structured WHERE (user_id = ?) AND (user_login = ?) SETTINGS use_query_cache = 1, query_cache_ttl = 600".to_owned(); + + let name_history_rows = try_join_all(sanitized_user_logins.into_iter().map(|login| { + let query = history_query.clone(); + async move { + let query = db.query(&query).bind(user_id).bind(&login); + query + .fetch_one::() + .await + .map(|history| (login, history)) + } + })) + .await?; + + let mut seen_logins = HashSet::new(); + + let names = name_history_rows + .into_iter() + .filter_map(|(login, history)| { + if seen_logins.insert(login.clone()) { + Some(PreviousName { + user_login: login, + last_timestamp: DateTime::from_timestamp(history.last_timestamp.into(), 0) + .expect("Invalid DateTime"), + first_timestamp: DateTime::from_timestamp(history.first_timestamp.into(), 0) + .expect("Invalid DateTime"), + }) + } else { + None + } + }) + .collect(); + + Ok(names) +} + fn apply_limit_offset(query: &mut String, buffer_response: &FlushBufferResponse) { if let Some(limit) = buffer_response.normalized_limit() { *query = format!("{query} LIMIT {limit}"); diff --git a/src/web/handlers.rs b/src/web/handlers.rs index f2e33a6..4df120f 100644 --- a/src/web/handlers.rs +++ b/src/web/handlers.rs @@ -3,7 +3,7 @@ use super::{ schema::{ AvailableLogs, AvailableLogsParams, Channel, ChannelIdType, ChannelLogsByDatePath, ChannelLogsStats, ChannelParam, ChannelsList, LogsParams, LogsPathChannel, SearchParams, - UserLogPathParams, UserLogsPath, UserLogsStats, UserParam, + UserLogPathParams, UserLogsPath, UserLogsStats, UserParam, UserNameHistoryParam }, }; use crate::{ @@ -505,6 +505,20 @@ async fn search_user_logs( Ok(logs) } + +pub async fn get_user_name_history( + app: State, + Path(UserNameHistoryParam { + user_id, + }): Path, +) -> Result { + app.check_opted_out(&user_id, None)?; + + let names = db::get_user_name_history(&app.db,&user_id).await?; + + Ok(Json(names)) +} + pub async fn optout(app: State) -> Json { let mut rng = thread_rng(); let optout_code: String = (0..5).map(|_| rng.sample(Alphanumeric) as char).collect(); diff --git a/src/web/mod.rs b/src/web/mod.rs index ba557cc..98d387e 100644 --- a/src/web/mod.rs +++ b/src/web/mod.rs @@ -167,6 +167,12 @@ pub async fn run(app: App, mut shutdown_rx: ShutdownRx, bot_tx: Sender, + #[schemars(with = "String")] + pub first_timestamp: DateTime, +}