diff --git a/todoblue/src/app/board/[board]/useWs.tsx b/todoblue/src/app/board/[board]/useWs.tsx index 51ccc1b..5d87070 100644 --- a/todoblue/src/app/board/[board]/useWs.tsx +++ b/todoblue/src/app/board/[board]/useWs.tsx @@ -3,10 +3,10 @@ import {default as React} from "react"; export interface UseWsHandlers { - onclose?: (event: CloseEvent) => void, - onerror?: (event: Event) => void, - onmessage?: (event: MessageEvent) => void, - onopen?: (event: Event) => void, + onclose?: (sock: WebSocket, event: CloseEvent) => void, + onerror?: (sock: WebSocket, event: Event) => void, + onmessage?: (sock: WebSocket, event: MessageEvent) => void, + onopen?: (sock: WebSocket, event: Event) => void, } @@ -15,10 +15,10 @@ export function useWs(url: string, {onclose, onerror, onmessage, onopen}: UseWsH React.useEffect(() => { const sock = new WebSocket(url); - sock.onclose = onclose ?? null; - sock.onerror = onerror ?? null; - sock.onmessage = onmessage ?? null; - sock.onopen = onopen ?? null; + sock.onclose = onclose ? (ev) => onclose(sock, ev) : null; + sock.onerror = onerror ? (ev) => onerror(sock, ev) : null; + sock.onmessage = onmessage ? (ev) => onmessage(sock, ev) : null; + sock.onopen = onopen ? (ev) => onopen(sock, ev) : null; setWebsocket(sock); return () => { sock.close(); diff --git a/todored/src/main.rs b/todored/src/main.rs index 2f843b4..b8a99a3 100644 --- a/todored/src/main.rs +++ b/todored/src/main.rs @@ -20,7 +20,7 @@ async fn main() { .route("/version", get(routes::root::version)) .route("/", post(routes::root::healthcheck)) .route("/healthcheck", post(routes::root::healthcheck)) - .route("/board/:board/ws", get(routes::board::websocket)) + .route("/board/:board/ws", get(routes::board::board_websocket)) .layer(axum::Extension(rclient)) .layer(tower_http::cors::CorsLayer::new() .allow_origin( diff --git a/todored/src/routes/board/mod.rs b/todored/src/routes/board/mod.rs index c3226f5..806a9b1 100644 --- a/todored/src/routes/board/mod.rs +++ b/todored/src/routes/board/mod.rs @@ -1,7 +1,9 @@ pub mod structs; -pub(crate) mod axum; +pub(self) mod axum; pub(self) mod ws; pub(self) mod ws_receive; pub(self) mod redis_xadd; pub(self) mod redis_xread; pub(self) mod ws_send; + +pub(crate) use self::axum::handler as board_websocket; diff --git a/todored/src/routes/board/redis_xadd.rs b/todored/src/routes/board/redis_xadd.rs index 01abfa9..b524e58 100644 --- a/todored/src/routes/board/redis_xadd.rs +++ b/todored/src/routes/board/redis_xadd.rs @@ -1,38 +1,31 @@ -use axum::extract::ws::{Message, WebSocket}; -use futures_util::stream::SplitStream; -use crate::routes::board::structs::BoardAction; +use std::sync::Arc; +use axum::extract::ws::CloseCode; +use deadqueue::unlimited::Queue; +use redis::aio::Connection; +use crate::routes::board::structs::{BoardAction, BoardRequest}; pub async fn handler( - mut receiver: SplitStream, - mut rconn: redis::aio::Connection, - board_name: &str, -) -> Result, ()> { + mut rconn: Connection, + key: String, + strings_to_process: Arc>, +) -> CloseCode { + log::trace!("Thread started!"); - log::trace!("Handling websocket frame..."); - match value { - Message::Text(value) => { - log::trace!("Trying to parse value from websocket as a BoardRequest..."); - let action = serde_json::de::from_str::(&value); + loop { + log::trace!("Waiting for strings to process..."); + let message = strings_to_process.pop().await; - if let Err(err) = action { - log::error!("Could not parse value received from websocket as a BoardRequest: {err:?}"); - return receiver; - } - let value = action.unwrap(); + log::trace!("Trying to parse string as a BoardAction..."); + let action = serde_json::de::from_str::(&message); - BoardRequest { - name: - } - - value.handle(&mut rconn).await; - } - Message::Binary(_) => {} - Message::Ping(_) => {} - Message::Pong(_) => {} - Message::Close(value) => { - log::debug!("Client closed websocket: {value:?}"); - return receiver; - } + if let Err(err) = action { + log::error!("Could not parse value received from websocket as a BoardRequest, closing connection: {err:?}"); + return 1002; } + let key = key.to_owned(); + let action = action.unwrap(); + + log::trace!("Handling BoardRequest..."); + BoardRequest { key, action }.handle(&mut rconn).await; } } diff --git a/todored/src/routes/board/redis_xread.rs b/todored/src/routes/board/redis_xread.rs index 27c3fef..ef881a3 100644 --- a/todored/src/routes/board/redis_xread.rs +++ b/todored/src/routes/board/redis_xread.rs @@ -1,26 +1,46 @@ -use axum::extract::ws::{Message, WebSocket}; -use futures_util::stream::SplitSink; +use std::sync::Arc; +use axum::extract::ws::{CloseCode, Message}; +use deadqueue::unlimited::Queue; +use redis::aio::Connection; + +pub type XReadResult = (String, String, String, String); pub async fn handler( - mut sender: SplitSink, - mut rconn: redis::aio::Connection, - board_name: &str, -) -> SplitSink { - log::trace!("Writer thread spawned successfully!"); + mut rconn: Connection, + key: String, + messages_to_send: Arc>, +) -> CloseCode { + log::trace!("Thread started!"); - log::trace!("Computing Redis key..."); - let stream_key = format!("board:{{{board}}}:stream"); + let mut seq = "0".to_string(); loop { + log::trace!("Waiting for events to broadcast for 5 seconds..."); let response = redis::cmd("XREAD") - .arg(&stream_key) .arg("COUNT") .arg(1) .arg("BLOCK") - .arg(30000) + .arg(5000) + .arg("STREAMS") + .arg(&key) + .arg(&seq) + .query_async::>(&mut rconn).await; + if let Err(err) = response { + log::error!("Could not XREAD Redis stream, closing connection: {err:?}"); + return 1002; + } + let response = response.unwrap(); + + if response.is_none() { + continue; + } + let response = response.unwrap(); + + seq = response.1; + let message = response.3; + + log::trace!("Received event, sending it: {message:?}"); + messages_to_send.push(Message::Text(message)) } - - log::trace!("Nothing to do, returning..."); - sender } diff --git a/todored/src/routes/board/structs.rs b/todored/src/routes/board/structs.rs index 53bcb89..0efa54d 100644 --- a/todored/src/routes/board/structs.rs +++ b/todored/src/routes/board/structs.rs @@ -6,9 +6,9 @@ use crate::task::{BoardChange, Task}; #[derive(Clone, Debug, Serialize, Deserialize)] pub struct BoardRequest { /// The Redis key to act on. - key: String, + pub key: String, /// The [`BoardAction`] to perform. - action: BoardAction, + pub action: BoardAction, } impl BoardRequest { diff --git a/todored/src/routes/board/ws.rs b/todored/src/routes/board/ws.rs index ffea2ae..8f8a03f 100644 --- a/todored/src/routes/board/ws.rs +++ b/todored/src/routes/board/ws.rs @@ -1,8 +1,7 @@ -use axum::extract::ws::{Message, WebSocket}; -use tokio::sync::RwLock; +use axum::extract::ws::{CloseCode, CloseFrame, Message, WebSocket}; use std::sync::Arc; use deadqueue::unlimited::Queue; -use futures_util::{SinkExt, StreamExt}; +use futures_util::StreamExt; use super::{redis_xread, redis_xadd, ws_receive, ws_send}; pub async fn handler( @@ -10,43 +9,92 @@ pub async fn handler( rclient: redis::Client, websocket: WebSocket, ) { + log::trace!("Creating Redis connection for the XADD thread..."); + let xadd_redis = rclient.get_async_connection().await; + if xadd_redis.is_err() { + log::error!("Could not open Redis connection for the XADD thread."); + let _ = websocket.close().await; + return; + } + let xadd_redis = xadd_redis.unwrap(); + log::trace!("Created Redis connection for the XADD thread!"); + + log::trace!("Creating Redis connection for the XREAD thread..."); + let xread_redis = rclient.get_async_connection().await; + if xread_redis.is_err() { + log::error!("Could not open Redis connection for the XREAD thread."); + let _ = websocket.close().await; + return; + } + let xread_redis = xread_redis.unwrap(); + log::trace!("Created Redis connection for the XREAD thread!"); + log::trace!("Splitting socket into two separate pipes..."); - let (mut sender, receiver) = websocket.split(); + let (sender, receiver) = websocket.split(); - log::trace!("Creating Redis connection for the reader thread..."); - let reader_redis = rclient.get_async_connection().await; - if reader_redis.is_err() { - log::error!("Could not open Redis connection for the reader thread."); - let _ = sender.close().await; - return; - } - let reader_redis = reader_redis.unwrap(); - log::trace!("Created Redis connection for the reader thread!"); + log::trace!("Determining Redis key to operate on..."); + let redis_key = format!("board:{{{board_name}}}:stream"); + log::trace!("Redis key is: {redis_key:?}"); - log::trace!("Creating Redis connection for the writer thread..."); - let writer_redis = rclient.get_async_connection().await; - if writer_redis.is_err() { - log::error!("Could not open Redis connection for the writer thread."); - let _ = sender.close().await; - return; - } - let writer_redis = writer_redis.unwrap(); - log::trace!("Created Redis connection for the writer thread!"); - - let is_ws_closing: Arc> = Arc::new(RwLock::new(false)); + log::trace!("Creating synchronization structures..."); let strings_to_process: Arc> = Arc::new(Queue::new()); let messages_to_send: Arc> = Arc::new(Queue::new()); + log::trace!("Spawning ws_receive_thread..."); let ws_receive_thread = tokio::spawn(ws_receive::handler( receiver, - is_ws_closing.clone(), strings_to_process.clone(), messages_to_send.clone(), )); + let ws_receive_abort = ws_receive_thread.abort_handle(); - todo!(); - let ws_send_thread = tokio::spawn(ws_send::handler(sender)); - let redis_xadd_thread = tokio::spawn(redis_xadd::handler()); - let redis_xread_thread = tokio::spawn(redis_xread::handler()); + log::trace!("Spawning ws_send_thread..."); + let ws_send_thread = tokio::spawn(ws_send::handler( + sender, + messages_to_send.clone(), + )); + log::trace!("Spawning redis_xadd_thread..."); + let redis_xadd_thread = tokio::spawn(redis_xadd::handler( + xadd_redis, + redis_key.clone(), + strings_to_process.clone() + )); + let redis_xadd_abort = redis_xadd_thread.abort_handle(); + + log::trace!("Spawning redis_xread_thread..."); + let redis_xread_thread = tokio::spawn(redis_xread::handler( + xread_redis, + redis_key, + messages_to_send.clone() + )); + let redis_xread_abort = redis_xread_thread.abort_handle(); + + log::trace!("Waiting for the socket to close..."); + tokio::select!( + cc = ws_receive_thread => { + close_with_code(messages_to_send, match cc { Ok(cc) => cc, _ => 1000 }); + }, + cc = redis_xadd_thread => { + close_with_code(messages_to_send, match cc { Ok(cc) => cc, _ => 1000 }); + }, + cc = redis_xread_thread => { + close_with_code(messages_to_send, match cc { Ok(cc) => cc, _ => 1000 }); + }, + ); + ws_receive_abort.abort(); + redis_xadd_abort.abort(); + redis_xread_abort.abort(); + + log::trace!("Waiting for the last messages to be sent..."); + let _ws_send_join = tokio::join!(ws_send_thread); + + log::debug!("Websocket threads closed successfully!") +} + +fn close_with_code( + messages_to_send: Arc>, + close_code: CloseCode, +) { + messages_to_send.push(Message::Close(Some(CloseFrame { code: close_code, reason: "".into() }))); } diff --git a/todored/src/routes/board/ws_receive.rs b/todored/src/routes/board/ws_receive.rs index 038dcb6..4d1273e 100644 --- a/todored/src/routes/board/ws_receive.rs +++ b/todored/src/routes/board/ws_receive.rs @@ -1,17 +1,15 @@ -use axum::extract::ws::{CloseFrame, Message, WebSocket}; +use axum::extract::ws::{CloseCode, Message, WebSocket}; use futures_util::stream::SplitStream; use deadqueue::unlimited::Queue; -use tokio::sync::RwLock; use std::sync::Arc; use futures_util::StreamExt; pub async fn handler( mut receiver: SplitStream, - is_ws_closing: Arc>, strings_to_process: Arc>, messages_to_send: Arc>, -) { - log::trace!("ws_receive thread started!"); +) -> CloseCode { + log::trace!("Thread started!"); loop { log::trace!("Awaiting data from the websocket..."); @@ -21,46 +19,37 @@ pub async fn handler( log::trace!("Checking if the websocket timed out..."); if value.is_none() { log::debug!("Websocket timed out, closing connection."); - let is_ws_closing = is_ws_closing.write().await; - *is_ws_closing = true; - messages_to_send.push(Message::Close(Some(CloseFrame { code: 1001, reason: "Timed out".into() }))); - return; + return 1001; } let value = value.unwrap(); log::trace!("Checking if websocket returned an error..."); if let Err(err) = value { log::error!("Websocket returned error: {err:?}"); - return; + return 1002; } let value = value.unwrap(); log::trace!("Delegating websocket message..."); match value { Message::Text(msg) => { - log::trace!("Received a string, delegating to message handler."); + log::trace!("Received a string, delegating to message handler: {msg:?}"); strings_to_process.push(msg); } Message::Binary(_) => { log::warn!("Received a binary, closing connection."); - let is_ws_closing = is_ws_closing.write().await; - *is_ws_closing = true; - messages_to_send.push(Message::Close(Some(CloseFrame { code: 1003, reason: "Binary is unsupported".into() }))); - return; + return 1003; } Message::Ping(vec) => { - log::trace!("Received a ping, delegating to pong handler."); + log::trace!("Received a ping, delegating to pong handler: {vec:?}"); messages_to_send.push(Message::Pong(vec)); } Message::Pong(_) => { log::warn!("Received a pong, ignoring.") } - Message::Close(_) => { - log::debug!("Received a close, closing connection."); - let is_ws_closing = is_ws_closing.write().await; - *is_ws_closing = true; - messages_to_send.push(Message::Close(Some(CloseFrame { code: 1000, reason: "Closing as requested".into() }))); - return; + Message::Close(cls) => { + log::debug!("Received a close, closing connection: {cls:?}"); + return 1000; } } } diff --git a/todored/src/routes/board/ws_send.rs b/todored/src/routes/board/ws_send.rs index e69de29..392fd43 100644 --- a/todored/src/routes/board/ws_send.rs +++ b/todored/src/routes/board/ws_send.rs @@ -0,0 +1,32 @@ +use std::sync::Arc; +use axum::extract::ws::{Message, WebSocket}; +use deadqueue::unlimited::Queue; +use futures_util::SinkExt; +use futures_util::stream::SplitSink; +use crate::outcome::LoggableOutcome; + +pub async fn handler( + mut sender: SplitSink, + messages_to_send: Arc>, +) { + log::trace!("Thread started!"); + + loop { + log::trace!("Awaiting data to send..."); + let message = messages_to_send.pop().await; + + let exit_when_done = match message { + Message::Close(_) => true, + _ => false, + }; + + log::trace!("Sending message: {message:?}"); + let _ = sender.send(message).await + .log_err_to_warn("Could not send message"); + + if exit_when_done { + log::trace!("Done sending messages, shutting down..."); + return; + } + } +} diff --git a/todored/src/task.rs b/todored/src/task.rs index 93c9224..6abf10c 100644 --- a/todored/src/task.rs +++ b/todored/src/task.rs @@ -26,7 +26,7 @@ impl BoardChange { .log_err_to_error("Failed to serialize BoardOperation") .map_err(|_| ())?; - log::trace!("Adding to the Redis stream {stream_key:?}..."); + log::trace!("Adding to the Redis stream {key:?}..."); let id = redis::cmd("XADD") .arg(key) .arg("*")