diff --git a/todored/src/routes/board/ws.rs b/todored/src/routes/board/ws.rs index 8f8a03f..1aac14a 100644 --- a/todored/src/routes/board/ws.rs +++ b/todored/src/routes/board/ws.rs @@ -2,6 +2,8 @@ use axum::extract::ws::{CloseCode, CloseFrame, Message, WebSocket}; use std::sync::Arc; use deadqueue::unlimited::Queue; use futures_util::StreamExt; +use uuid::Uuid; +use crate::task::BoardChange; use super::{redis_xread, redis_xadd, ws_receive, ws_send}; pub async fn handler( @@ -9,6 +11,16 @@ pub async fn handler( rclient: redis::Client, websocket: WebSocket, ) { + log::trace!("Creating Redis connection for the main thread..."); + let main_redis = rclient.get_async_connection().await; + if main_redis.is_err() { + log::error!("Could not open Redis connection for the main thread."); + let _ = websocket.close().await; + return; + } + let mut main_redis = main_redis.unwrap(); + log::trace!("Created Redis connection for the main thread!"); + log::trace!("Creating Redis connection for the XADD thread..."); let xadd_redis = rclient.get_async_connection().await; if xadd_redis.is_err() { @@ -36,6 +48,14 @@ pub async fn handler( let redis_key = format!("board:{{{board_name}}}:stream"); log::trace!("Redis key is: {redis_key:?}"); + log::trace!("Generating client UUID..."); + let client_uuid = Uuid::new_v4(); + log::trace!("Client UUID is: {client_uuid:?}"); + + log::trace!("Notifying clients of the new connection..."); + let _connect_id = BoardChange::Connect(client_uuid).store_in_redis(&mut main_redis, &redis_key).await; + log::trace!("Notified clients of the new connection successfully!"); + log::trace!("Creating synchronization structures..."); let strings_to_process: Arc> = Arc::new(Queue::new()); let messages_to_send: Arc> = Arc::new(Queue::new()); @@ -65,7 +85,7 @@ pub async fn handler( log::trace!("Spawning redis_xread_thread..."); let redis_xread_thread = tokio::spawn(redis_xread::handler( xread_redis, - redis_key, + redis_key.clone(), messages_to_send.clone() )); let redis_xread_abort = redis_xread_thread.abort_handle(); @@ -86,6 +106,10 @@ pub async fn handler( redis_xadd_abort.abort(); redis_xread_abort.abort(); + log::trace!("Notifying clients of the disconnection..."); + let _connect_id = BoardChange::Disconnect(client_uuid).store_in_redis(&mut main_redis, &redis_key).await; + log::trace!("Notified clients of the disconnection successfully!"); + log::trace!("Waiting for the last messages to be sent..."); let _ws_send_join = tokio::join!(ws_send_thread); diff --git a/todored/src/task.rs b/todored/src/task.rs index d268f5e..ae786de 100644 --- a/todored/src/task.rs +++ b/todored/src/task.rs @@ -12,6 +12,10 @@ pub enum BoardChange { Title(String), /// Create, update, or delete the [`Task`] with the given [`Uuid`]. Task(Uuid, Option), + /// Add the given client to the connected clients list. + Connect(Uuid), + /// Remove the given client from the connected clients list. + Disconnect(Uuid), } impl BoardChange {