Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 60 additions & 1 deletion src/db/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
mod migrations;
pub mod schema;
pub mod writer;
use std::collections::HashSet;

pub use migrations::run as setup_db;
use serde::Deserialize;
Expand All @@ -12,14 +13,15 @@ use crate::{
schema::LogRangeParams,
stream::{FlushBufferResponse, LogsStream},
},
web::schema::{AvailableLogDate, ChannelLogsStats, LogsParams, UserLogsStats},
web::schema::{AvailableLogDate, ChannelLogsStats, LogsParams, PreviousName, UserLogsStats},
Result,
};
use chrono::{DateTime, Datelike, Duration, Utc};
use clickhouse::{query::RowCursor, Client, Row};
use rand::{seq::IteratorRandom, thread_rng};
use schema::StructuredMessage;
use tracing::debug;
use futures::future::try_join_all;

const CHANNEL_MULTI_QUERY_SIZE_DAYS: i64 = 14;

Expand Down Expand Up @@ -381,6 +383,63 @@ pub async fn get_user_stats(
})
}

pub async fn get_user_name_history(
db: &Client,
user_id: &str,
) -> Result<Vec<PreviousName>> {
#[derive(Deserialize, Row)]
struct SingleNameHistory {
pub last_timestamp: i32,
pub first_timestamp: i32,
}

let name_query = "SELECT DISTINCT user_login FROM message_structured WHERE user_id = ? SETTINGS use_query_cache = 1, query_cache_ttl = 600".to_owned();
let name_query = db.query(&name_query).bind(user_id);
let distinct_logins = name_query.fetch_all::<String>().await?;
if distinct_logins.is_empty() {
return Ok(vec![]);
}

let sanitized_user_logins = distinct_logins
.iter()
.map(|login| login.trim_start_matches(':').to_owned());

let history_query = "SELECT toDateTime(MAX(timestamp)) AS last_timestamp, toDateTime(MIN(timestamp)) AS first_timestamp FROM message_structured WHERE (user_id = ?) AND (user_login = ?) SETTINGS use_query_cache = 1, query_cache_ttl = 600".to_owned();

let name_history_rows = try_join_all(sanitized_user_logins.into_iter().map(|login| {
let query = history_query.clone();
async move {
let query = db.query(&query).bind(user_id).bind(&login);
query
.fetch_one::<SingleNameHistory>()
.await
.map(|history| (login, history))
}
}))
.await?;

let mut seen_logins = HashSet::new();

let names = name_history_rows
.into_iter()
.filter_map(|(login, history)| {
if seen_logins.insert(login.clone()) {
Some(PreviousName {
user_login: login,
last_timestamp: DateTime::from_timestamp(history.last_timestamp.into(), 0)
.expect("Invalid DateTime"),
first_timestamp: DateTime::from_timestamp(history.first_timestamp.into(), 0)
.expect("Invalid DateTime"),
})
} else {
None
}
})
.collect();

Ok(names)
}

fn apply_limit_offset(query: &mut String, buffer_response: &FlushBufferResponse) {
if let Some(limit) = buffer_response.normalized_limit() {
*query = format!("{query} LIMIT {limit}");
Expand Down
16 changes: 15 additions & 1 deletion src/web/handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use super::{
schema::{
AvailableLogs, AvailableLogsParams, Channel, ChannelIdType, ChannelLogsByDatePath,
ChannelLogsStats, ChannelParam, ChannelsList, LogsParams, LogsPathChannel, SearchParams,
UserLogPathParams, UserLogsPath, UserLogsStats, UserParam,
UserLogPathParams, UserLogsPath, UserLogsStats, UserParam, UserNameHistoryParam
},
};
use crate::{
Expand Down Expand Up @@ -505,6 +505,20 @@ async fn search_user_logs(
Ok(logs)
}


pub async fn get_user_name_history(
app: State<App>,
Path(UserNameHistoryParam {
user_id,
}): Path<UserNameHistoryParam>,
) -> Result<impl IntoApiResponse> {
app.check_opted_out(&user_id, None)?;

let names = db::get_user_name_history(&app.db,&user_id).await?;

Ok(Json(names))
}

pub async fn optout(app: State<App>) -> Json<String> {
let mut rng = thread_rng();
let optout_code: String = (0..5).map(|_| rng.sample(Alphanumeric) as char).collect();
Expand Down
6 changes: 6 additions & 0 deletions src/web/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,12 @@ pub async fn run(app: App, mut shutdown_rx: ShutdownRx, bot_tx: Sender<BotMessag
op.description("Get user stats")
}),
)
.api_route(
"/namehistory/:user_id",
get_with(handlers::get_user_name_history, |op| {
op.description("Get user name history by provided user id")
}),
)
.api_route("/optout", post(handlers::optout))
.api_route("/capabilities", get(capabilities))
.route("/docs", Redoc::new("/openapi.json").axum_route())
Expand Down
15 changes: 15 additions & 0 deletions src/web/schema.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use super::responders::logs::{JsonResponseType, LogsResponseType};
use chrono::{DateTime, Utc};
use schemars::JsonSchema;
use serde::{Deserialize, Deserializer, Serialize};
use std::fmt::Display;
Expand Down Expand Up @@ -176,3 +177,17 @@ pub struct UserLogsStats {
pub user_id: String,
pub message_count: u64,
}

#[derive(Deserialize, JsonSchema)]
pub struct UserNameHistoryParam {
pub user_id: String,
}

#[derive(Serialize, JsonSchema)]
pub struct PreviousName {
pub user_login: String,
#[schemars(with = "String")]
pub last_timestamp: DateTime<Utc>,
#[schemars(with = "String")]
pub first_timestamp: DateTime<Utc>,
}