1
Fork 0
mirror of https://github.com/Steffo99/todocolors.git synced 2024-11-25 17:54:18 +00:00

Implement Connect and Disconnect BoardChanges on Todored

This commit is contained in:
Steffo 2023-08-22 05:09:12 +02:00
parent d4ac9b9cbd
commit 022efc3ef7
Signed by: steffo
GPG key ID: 2A24051445686895
2 changed files with 29 additions and 1 deletions

View file

@ -2,6 +2,8 @@ use axum::extract::ws::{CloseCode, CloseFrame, Message, WebSocket};
use std::sync::Arc; use std::sync::Arc;
use deadqueue::unlimited::Queue; use deadqueue::unlimited::Queue;
use futures_util::StreamExt; use futures_util::StreamExt;
use uuid::Uuid;
use crate::task::BoardChange;
use super::{redis_xread, redis_xadd, ws_receive, ws_send}; use super::{redis_xread, redis_xadd, ws_receive, ws_send};
pub async fn handler( pub async fn handler(
@ -9,6 +11,16 @@ pub async fn handler(
rclient: redis::Client, rclient: redis::Client,
websocket: WebSocket, websocket: WebSocket,
) { ) {
log::trace!("Creating Redis connection for the main thread...");
let main_redis = rclient.get_async_connection().await;
if main_redis.is_err() {
log::error!("Could not open Redis connection for the main thread.");
let _ = websocket.close().await;
return;
}
let mut main_redis = main_redis.unwrap();
log::trace!("Created Redis connection for the main thread!");
log::trace!("Creating Redis connection for the XADD thread..."); log::trace!("Creating Redis connection for the XADD thread...");
let xadd_redis = rclient.get_async_connection().await; let xadd_redis = rclient.get_async_connection().await;
if xadd_redis.is_err() { if xadd_redis.is_err() {
@ -36,6 +48,14 @@ pub async fn handler(
let redis_key = format!("board:{{{board_name}}}:stream"); let redis_key = format!("board:{{{board_name}}}:stream");
log::trace!("Redis key is: {redis_key:?}"); log::trace!("Redis key is: {redis_key:?}");
log::trace!("Generating client UUID...");
let client_uuid = Uuid::new_v4();
log::trace!("Client UUID is: {client_uuid:?}");
log::trace!("Notifying clients of the new connection...");
let _connect_id = BoardChange::Connect(client_uuid).store_in_redis(&mut main_redis, &redis_key).await;
log::trace!("Notified clients of the new connection successfully!");
log::trace!("Creating synchronization structures..."); log::trace!("Creating synchronization structures...");
let strings_to_process: Arc<Queue<String>> = Arc::new(Queue::new()); let strings_to_process: Arc<Queue<String>> = Arc::new(Queue::new());
let messages_to_send: Arc<Queue<Message>> = Arc::new(Queue::new()); let messages_to_send: Arc<Queue<Message>> = Arc::new(Queue::new());
@ -65,7 +85,7 @@ pub async fn handler(
log::trace!("Spawning redis_xread_thread..."); log::trace!("Spawning redis_xread_thread...");
let redis_xread_thread = tokio::spawn(redis_xread::handler( let redis_xread_thread = tokio::spawn(redis_xread::handler(
xread_redis, xread_redis,
redis_key, redis_key.clone(),
messages_to_send.clone() messages_to_send.clone()
)); ));
let redis_xread_abort = redis_xread_thread.abort_handle(); let redis_xread_abort = redis_xread_thread.abort_handle();
@ -86,6 +106,10 @@ pub async fn handler(
redis_xadd_abort.abort(); redis_xadd_abort.abort();
redis_xread_abort.abort(); redis_xread_abort.abort();
log::trace!("Notifying clients of the disconnection...");
let _connect_id = BoardChange::Disconnect(client_uuid).store_in_redis(&mut main_redis, &redis_key).await;
log::trace!("Notified clients of the disconnection successfully!");
log::trace!("Waiting for the last messages to be sent..."); log::trace!("Waiting for the last messages to be sent...");
let _ws_send_join = tokio::join!(ws_send_thread); let _ws_send_join = tokio::join!(ws_send_thread);

View file

@ -12,6 +12,10 @@ pub enum BoardChange {
Title(String), Title(String),
/// Create, update, or delete the [`Task`] with the given [`Uuid`]. /// Create, update, or delete the [`Task`] with the given [`Uuid`].
Task(Uuid, Option<Task>), Task(Uuid, Option<Task>),
/// Add the given client to the connected clients list.
Connect(Uuid),
/// Remove the given client from the connected clients list.
Disconnect(Uuid),
} }
impl BoardChange { impl BoardChange {