From 2f3cc201c75f900980bb809bc99ca7e8278e428f Mon Sep 17 00:00:00 2001 From: Stefano Pigozzi Date: Tue, 1 Aug 2023 01:52:12 +0200 Subject: [PATCH] Get `/board/{board}/ws` to work, definitely --- .idea/runConfigurations/Check_with_clippy.xml | 19 +++++ todoblue/src/app/board/[board]/useBoard.tsx | 6 +- todored/src/main.rs | 2 +- todored/src/routes/board/redis_xadd.rs | 2 +- todored/src/routes/board/redis_xread.rs | 75 ++++++++++++------- todored/src/routes/board/structs.rs | 10 +-- todored/src/routes/board/ws_send.rs | 5 +- todored/src/routes/root.rs | 2 +- todored/src/task.rs | 6 +- 9 files changed, 84 insertions(+), 43 deletions(-) create mode 100644 .idea/runConfigurations/Check_with_clippy.xml diff --git a/.idea/runConfigurations/Check_with_clippy.xml b/.idea/runConfigurations/Check_with_clippy.xml new file mode 100644 index 0000000..ffeb569 --- /dev/null +++ b/.idea/runConfigurations/Check_with_clippy.xml @@ -0,0 +1,19 @@ + + + + \ No newline at end of file diff --git a/todoblue/src/app/board/[board]/useBoard.tsx b/todoblue/src/app/board/[board]/useBoard.tsx index dfb4b0c..dca5999 100644 --- a/todoblue/src/app/board/[board]/useBoard.tsx +++ b/todoblue/src/app/board/[board]/useBoard.tsx @@ -1,14 +1,16 @@ 'use client'; + import {default as React} from "react"; import {useWs} from "@/app/board/[board]/useWs" export function useBoard(url: string) { const socket = useWs(url, { - onopen: React.useCallback(() => { + onopen: React.useCallback((sock: WebSocket, event: Event) => { console.debug("[useBoard] Connected to the server!"); + sock.send('{"Title": "sus"}') }, []), - onmessage: React.useCallback((event: MessageEvent) => { + onmessage: React.useCallback((sock: WebSocket, event: MessageEvent) => { const data = JSON.parse(event.data); console.debug("[useBoard] Received ServerOperation: ", data); }, []) diff --git a/todored/src/main.rs b/todored/src/main.rs index b8a99a3..5262fcf 100644 --- a/todored/src/main.rs +++ b/todored/src/main.rs @@ -35,7 +35,7 @@ async fn main() { ) ); - axum::Server::bind(&std::net::SocketAddr::from_str(&**config::AXUM_HOST).expect("AXUM_HOST to be a valid SocketAddr")) + axum::Server::bind(&std::net::SocketAddr::from_str(&config::AXUM_HOST).expect("AXUM_HOST to be a valid SocketAddr")) .serve(router.into_make_service()) .await .expect("to be able to run the Axum server"); diff --git a/todored/src/routes/board/redis_xadd.rs b/todored/src/routes/board/redis_xadd.rs index b524e58..9fe20d5 100644 --- a/todored/src/routes/board/redis_xadd.rs +++ b/todored/src/routes/board/redis_xadd.rs @@ -26,6 +26,6 @@ pub async fn handler( let action = action.unwrap(); log::trace!("Handling BoardRequest..."); - BoardRequest { key, action }.handle(&mut rconn).await; + BoardRequest { board: key, action }.handle(&mut rconn).await; } } diff --git a/todored/src/routes/board/redis_xread.rs b/todored/src/routes/board/redis_xread.rs index ef881a3..7e21f70 100644 --- a/todored/src/routes/board/redis_xread.rs +++ b/todored/src/routes/board/redis_xread.rs @@ -2,8 +2,8 @@ use std::sync::Arc; use axum::extract::ws::{CloseCode, Message}; use deadqueue::unlimited::Queue; use redis::aio::Connection; - -pub type XReadResult = (String, String, String, String); +use redis::{AsyncCommands, FromRedisValue, RedisResult}; +use redis::streams::{StreamReadOptions, StreamReadReply}; pub async fn handler( mut rconn: Connection, @@ -16,31 +16,54 @@ pub async fn handler( loop { log::trace!("Waiting for events to broadcast for 5 seconds..."); - let response = redis::cmd("XREAD") - .arg("COUNT") - .arg(1) - .arg("BLOCK") - .arg(5000) - .arg("STREAMS") - .arg(&key) - .arg(&seq) - .query_async::>(&mut rconn).await; + let response: RedisResult = rconn.xread_options( + &[&key], + &[&seq], + &StreamReadOptions::default().block(5000) + ).await; - if let Err(err) = response { - log::error!("Could not XREAD Redis stream, closing connection: {err:?}"); - return 1002; + match response { + Err(err) => { + log::error!("Could not XREAD Redis stream, closing connection: {err:?}"); + return 1002; + }, + Ok(reply) => { + match reply.keys.get(0) { + None => { + log::trace!("Stream does not exist yet, retrying..."); + } + Some(key) => { + key.ids.iter().for_each(|id| { + match id.map.get("change") { + None => { + log::warn!("Malformed event, skipping: {id:?}"); + } + Some(value) => { + match value { + redis::Value::Data(data) => { + match String::from_byte_vec(data) { + None => { + log::warn!("Event with no data, skipping: {data:?}"); + } + Some(strings) => { + strings.into_iter().for_each(|string| { + log::trace!("Received event, sending it: {string:?}"); + messages_to_send.push(Message::Text(string)) + }) + } + } + } + _ => { + log::warn!("Malformed value, skipping..."); + } + } + } + } + seq = id.id.clone(); + }) + } + } + }, } - let response = response.unwrap(); - - if response.is_none() { - continue; - } - let response = response.unwrap(); - - seq = response.1; - let message = response.3; - - log::trace!("Received event, sending it: {message:?}"); - messages_to_send.push(Message::Text(message)) } } diff --git a/todored/src/routes/board/structs.rs b/todored/src/routes/board/structs.rs index 0efa54d..75679c3 100644 --- a/todored/src/routes/board/structs.rs +++ b/todored/src/routes/board/structs.rs @@ -6,7 +6,7 @@ use crate::task::{BoardChange, Task}; #[derive(Clone, Debug, Serialize, Deserialize)] pub struct BoardRequest { /// The Redis key to act on. - pub key: String, + pub board: String, /// The [`BoardAction`] to perform. pub action: BoardAction, } @@ -19,24 +19,24 @@ impl BoardRequest { BoardAction::Title(title) => { log::debug!("Setting board Title: {title:?}"); let operation = BoardChange::Title(title); - let _id = operation.store_in_redis(rconn, &self.key).await; + let _id = operation.store_in_redis(rconn, &self.board).await; }, BoardAction::Task(None, Some(task)) => { log::debug!("Creating Task: {task:?}"); let id = Uuid::new_v4(); log::trace!("Assigned id {id:?} to Task: {task:?}"); let operation = BoardChange::Task(id, Some(task)); - let _id = operation.store_in_redis(rconn, &self.key).await; + let _id = operation.store_in_redis(rconn, &self.board).await; }, BoardAction::Task(Some(id), Some(task)) => { log::debug!("Editing Task {id:?}: {task:?}"); let operation = BoardChange::Task(id, Some(task)); - let _id = operation.store_in_redis(rconn, &self.key).await; + let _id = operation.store_in_redis(rconn, &self.board).await; }, BoardAction::Task(Some(id), None) => { log::debug!("Deleting Task {id:?}..."); let operation = BoardChange::Task(id, None); - let _id = operation.store_in_redis(rconn, &self.key).await; + let _id = operation.store_in_redis(rconn, &self.board).await; }, _ => { log::warn!("Received unknown BoardRequest: {self:?}"); diff --git a/todored/src/routes/board/ws_send.rs b/todored/src/routes/board/ws_send.rs index 392fd43..44f434f 100644 --- a/todored/src/routes/board/ws_send.rs +++ b/todored/src/routes/board/ws_send.rs @@ -15,10 +15,7 @@ pub async fn handler( log::trace!("Awaiting data to send..."); let message = messages_to_send.pop().await; - let exit_when_done = match message { - Message::Close(_) => true, - _ => false, - }; + let exit_when_done = matches!(message, Message::Close(_)); log::trace!("Sending message: {message:?}"); let _ = sender.send(message).await diff --git a/todored/src/routes/root.rs b/todored/src/routes/root.rs index b486052..2189170 100644 --- a/todored/src/routes/root.rs +++ b/todored/src/routes/root.rs @@ -33,7 +33,7 @@ pub async fn healthcheck( response.eq("PONG") .then_some(()) .log_err_to_error("Received invalid PONG from Redis") - .ok_or_else(|| StatusCode::INTERNAL_SERVER_ERROR)?; + .ok_or(StatusCode::INTERNAL_SERVER_ERROR)?; Ok(Json(compose_version())) } diff --git a/todored/src/task.rs b/todored/src/task.rs index 6abf10c..bf8097d 100644 --- a/todored/src/task.rs +++ b/todored/src/task.rs @@ -22,7 +22,7 @@ impl BoardChange { log::debug!("Storing BoardOperation in Redis: {:?}", &self); log::trace!("Serializing BoardOperation to JSON..."); - let operation = serde_json::ser::to_string(self) + let change = serde_json::ser::to_string(self) .log_err_to_error("Failed to serialize BoardOperation") .map_err(|_| ())?; @@ -30,8 +30,8 @@ impl BoardChange { let id = redis::cmd("XADD") .arg(key) .arg("*") - .arg("operation") - .arg(operation) + .arg("change") + .arg(change) .query_async::(rconn).await .log_err_to_error("Failed to XADD to Redis") .map_err(|_| ())?;