From 3ff50d8584cb79b05206bdc0c319bc84ef04f655 Mon Sep 17 00:00:00 2001 From: Stefano Pigozzi Date: Mon, 31 Jul 2023 16:57:38 +0200 Subject: [PATCH] *Git refused to commit for a while* --- todored/Cargo.lock | 34 ++++++++- todored/Cargo.toml | 4 +- todored/src/main.rs | 7 +- todored/src/op.rs | 53 -------------- todored/src/outcome.rs | 91 +++++++++++++++++++++++++ todored/src/routes/board.rs | 67 ------------------ todored/src/routes/board/axum.rs | 12 ++++ todored/src/routes/board/mod.rs | 7 ++ todored/src/routes/board/redis_xadd.rs | 38 +++++++++++ todored/src/routes/board/redis_xread.rs | 26 +++++++ todored/src/routes/board/structs.rs | 56 +++++++++++++++ todored/src/routes/board/ws.rs | 52 ++++++++++++++ todored/src/routes/board/ws_receive.rs | 67 ++++++++++++++++++ todored/src/routes/board/ws_send.rs | 0 todored/src/routes/root.rs | 24 ++++--- todored/src/task.rs | 55 +++++++++++++++ todored/src/utils.rs | 49 ------------- 17 files changed, 456 insertions(+), 186 deletions(-) delete mode 100644 todored/src/op.rs create mode 100644 todored/src/outcome.rs delete mode 100644 todored/src/routes/board.rs create mode 100644 todored/src/routes/board/axum.rs create mode 100644 todored/src/routes/board/mod.rs create mode 100644 todored/src/routes/board/redis_xadd.rs create mode 100644 todored/src/routes/board/redis_xread.rs create mode 100644 todored/src/routes/board/structs.rs create mode 100644 todored/src/routes/board/ws.rs create mode 100644 todored/src/routes/board/ws_receive.rs create mode 100644 todored/src/routes/board/ws_send.rs delete mode 100644 todored/src/utils.rs diff --git a/todored/Cargo.lock b/todored/Cargo.lock index 9dbb3b9..a9c8b43 100644 --- a/todored/Cargo.lock +++ b/todored/Cargo.lock @@ -207,6 +207,25 @@ version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "338089f42c427b86394a5ee60ff321da23a5c89c9d89514c829687b26359fcff" +[[package]] +name = "crossbeam-queue" +version = "0.3.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d1cfb3ea8a53f37c40dea2c7bedcbd88bdfae54f5e2175d6ecaff1c988353add" +dependencies = [ + "cfg-if", + "crossbeam-utils", +] + +[[package]] +name = "crossbeam-utils" +version = "0.8.16" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5a22b2d63d4d1dc0b7f1b6b2747dd0088008a9be28b6ddf0b1e7d335e3037294" +dependencies = [ + "cfg-if", +] + [[package]] name = "crypto-common" version = "0.1.6" @@ -223,6 +242,16 @@ version = "2.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c2e66c9d817f1720209181c316d28635c050fa304f9c79e47a520882661b7308" +[[package]] +name = "deadqueue" +version = "0.2.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "16a2561fd313df162315935989dceb8c99db4ee1933358270a57a3cfb8c957f3" +dependencies = [ + "crossbeam-queue", + "tokio", +] + [[package]] name = "digest" version = "0.10.7" @@ -1039,8 +1068,8 @@ checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20" name = "todored" version = "0.1.0" dependencies = [ - "async-trait", "axum", + "deadqueue", "futures-util", "log", "micronfig", @@ -1048,6 +1077,7 @@ dependencies = [ "pretty_env_logger", "redis", "serde", + "serde_json", "tokio", "tower-http", "uuid", @@ -1261,8 +1291,8 @@ version = "1.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "79daa5ed5740825c40b389c5e50312b9c86df53fccd33f281df655642b43869d" dependencies = [ + "getrandom", "serde", - "sha1_smol", ] [[package]] diff --git a/todored/Cargo.toml b/todored/Cargo.toml index a2df0e8..90a2414 100644 --- a/todored/Cargo.toml +++ b/todored/Cargo.toml @@ -6,8 +6,8 @@ edition = "2021" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -async-trait = "0.1.72" axum = { version = "0.6.19", features = ["ws"] } +deadqueue = "0.2.4" futures-util = "0.3.28" log = "0.4.19" micronfig = "0.2.0" @@ -15,5 +15,7 @@ pkg-version = "1.0.0" pretty_env_logger = "0.5.0" redis = { version = "0.23.1", features = ["r2d2", "ahash", "cluster", "tokio-comp", "connection-manager"] } serde = { version = "1.0.178", features = ["derive"] } +serde_json = "1.0.104" tokio = { version = "1.29.1", features = ["macros", "rt-multi-thread"] } tower-http = { version = "0.4.3", features = ["cors"] } +uuid = { version = "1.4.1", features = ["serde", "v4"] } diff --git a/todored/src/main.rs b/todored/src/main.rs index ebd5ea2..2f843b4 100644 --- a/todored/src/main.rs +++ b/todored/src/main.rs @@ -2,11 +2,10 @@ use std::str::FromStr; use axum::routing::{get, post}; +pub mod outcome; pub mod task; -pub(crate) mod op; -mod config; +pub(crate) mod config; mod routes; -mod utils; #[tokio::main] async fn main() { @@ -40,4 +39,4 @@ async fn main() { .serve(router.into_make_service()) .await .expect("to be able to run the Axum server"); -} \ No newline at end of file +} diff --git a/todored/src/op.rs b/todored/src/op.rs deleted file mode 100644 index 6eb071b..0000000 --- a/todored/src/op.rs +++ /dev/null @@ -1,53 +0,0 @@ -use serde::{Deserialize, Serialize}; - -use super::task::Task; - -/// An operation sent from the client to the server. -#[derive(Clone, Debug, Serialize, Deserialize)] -pub enum ClientOperation { - /// Set the board's title. - Title(String), - /// Create a new [`Task`], or update or delete the task with the given [`Uuid`]. - Task(Option, Option), -} - -/// An operation sent from the server to the clients, and stored on the database. -#[derive(Clone, Debug, Serialize, Deserialize)] -pub enum BoardOperation { - /// Set the board's title. - Title(String), - /// Create, update, or delete the [`Task`] with the given [`Uuid`]. - Task(String, Option), -} - -impl ClientOperation { - fn handle(self) { - todo!() - } -} - -impl BoardOperation { - /// Store this in Redis. - pub(crate) async fn store_or_500(&self, rconn: &mut redis::aio::Connection, board: &str) -> Result { - log::debug!("Storing BoardOperation in Redis: {:?}", &self); - - log::trace!("Computing Redis key..."); - let stream_key = format!("board:{{{board}}}:stream"); - - log::trace!("Serializing BoardOperation to JSON..."); - let operation = serde_json::ser::to_string(self) - .expect_or_500_and_log("Failed to serialize BoardOperation")?; - - log::trace!("Adding to the Redis stream {stream_key:?}..."); - let id = redis::cmd("XADD") - .arg(stream_key) - .arg("*") - .arg("operation") - .arg(operation) - .query_async::(rconn).await - .expect_or_500_and_log("Failed to XADD to Redis")?; - - log::trace!("Added to Redis stream with id {id:?}!"); - Ok(id) - } -} diff --git a/todored/src/outcome.rs b/todored/src/outcome.rs new file mode 100644 index 0000000..af554ea --- /dev/null +++ b/todored/src/outcome.rs @@ -0,0 +1,91 @@ +//! Module containing various utility type and trait definitions related to error handling. + +use std::fmt::Debug; +use axum::http::StatusCode; + +pub type ResponseError = StatusCode; +pub type Response = Result; + +/// Trait to easily [`log`] function outcomes. +pub(crate) trait LoggableOutcome { + fn log_err_to_trace(self, msg: &str) -> Self; + fn log_err_to_debug(self, msg: &str) -> Self; + fn log_err_to_info(self, msg: &str) -> Self; + fn log_err_to_warn(self, msg: &str) -> Self; + fn log_err_to_error(self, msg: &str) -> Self; +} + +impl LoggableOutcome for Result where E: Debug { + fn log_err_to_trace(self, msg: &str) -> Self { + self.map_err(|err| { + log::trace!("{msg}: {err:?}"); + err + }) + } + + fn log_err_to_debug(self, msg: &str) -> Self { + self.map_err(|err| { + log::debug!("{msg}: {err:?}"); + err + }) + } + + fn log_err_to_info(self, msg: &str) -> Self { + self.map_err(|err| { + log::info!("{msg}: {err:?}"); + err + }) + } + + fn log_err_to_warn(self, msg: &str) -> Self { + self.map_err(|err| { + log::warn!("{msg}: {err:?}"); + err + }) + } + + fn log_err_to_error(self, msg: &str) -> Self { + self.map_err(|err| { + log::error!("{msg}: {err:?}"); + err + }) + } +} + +impl LoggableOutcome for Option { + fn log_err_to_trace(self, msg: &str) -> Self { + if self.is_none() { + log::trace!("{msg}"); + } + self + } + + fn log_err_to_debug(self, msg: &str) -> Self { + if self.is_none() { + log::debug!("{msg}"); + } + self + } + + fn log_err_to_info(self, msg: &str) -> Self { + if self.is_none() { + log::info!("{msg}"); + } + self + } + + fn log_err_to_warn(self, msg: &str) -> Self { + if self.is_none() { + log::warn!("{msg}"); + } + self + } + + fn log_err_to_error(self, msg: &str) -> Self { + if self.is_none() { + log::error!("{msg}"); + } + self + } + +} diff --git a/todored/src/routes/board.rs b/todored/src/routes/board.rs deleted file mode 100644 index 06c811c..0000000 --- a/todored/src/routes/board.rs +++ /dev/null @@ -1,67 +0,0 @@ -use axum::Extension; -use axum::extract::{Path, WebSocketUpgrade}; -use axum::extract::ws::{Message, WebSocket}; -use axum::response::Response; -use futures_util::{SinkExt, stream::{SplitSink, SplitStream, StreamExt}}; - -pub(crate) async fn websocket( - Path(board): Path, - Extension(rclient): Extension, - ws: WebSocketUpgrade, -) -> Response { - log::trace!("Received websocket request, upgrading..."); - ws.on_upgrade(|socket| splitter(socket, rclient, board)) -} - -async fn splitter( - socket: WebSocket, - rclient: redis::Client, - board: String, -) { - log::trace!("Splitting socket into two separate pipes..."); - let (mut sender, receiver) = socket.split(); - - log::trace!("Creating Redis connection for the reader thread..."); - let reader_redis = rclient.get_async_connection().await; - if reader_redis.is_err() { - log::error!("Could not open Redis connection for the reader thread."); - let _ = sender.close().await; - return; - } - let reader_redis = reader_redis.unwrap(); - log::trace!("Created Redis connection for the reader thread!"); - - log::trace!("Creating Redis connection for the writer thread..."); - let writer_redis = rclient.get_async_connection().await; - if writer_redis.is_err() { - log::error!("Could not open Redis connection for the writer thread."); - let _ = sender.close().await; - return; - } - let writer_redis = writer_redis.unwrap(); - log::trace!("Created Redis connection for the writer thread!"); - - let reader_thread = tokio::spawn(reader(receiver, reader_redis)); - let writer_thread = tokio::spawn(writer(sender, writer_redis)); -} - -async fn reader( - receiver: SplitStream, - reader_redis: redis::aio::Connection, -) -> SplitStream { - log::trace!("Reader thread spawned successfully!"); - todo!() -} - -async fn writer( - mut sender: SplitSink, - writer_redis: redis::aio::Connection, -) -> SplitSink { - log::trace!("Writer thread spawned successfully!"); - - log::trace!("Sending test message..."); - let _ = sender.send(Message::Text("\"Garasauto\"".to_string())).await; - log::trace!("Sent test message!"); - - todo!() -} \ No newline at end of file diff --git a/todored/src/routes/board/axum.rs b/todored/src/routes/board/axum.rs new file mode 100644 index 0000000..52d543e --- /dev/null +++ b/todored/src/routes/board/axum.rs @@ -0,0 +1,12 @@ +use axum::Extension; +use axum::extract::{Path, WebSocketUpgrade}; +use super::ws; + +pub(crate) async fn handler( + Path(board): Path, + Extension(rclient): Extension, + upgrade_request: WebSocketUpgrade, +) -> axum::response::Response { + log::trace!("Received websocket request, upgrading..."); + upgrade_request.on_upgrade(|websocket| ws::handler(board, rclient, websocket)) +} diff --git a/todored/src/routes/board/mod.rs b/todored/src/routes/board/mod.rs new file mode 100644 index 0000000..c3226f5 --- /dev/null +++ b/todored/src/routes/board/mod.rs @@ -0,0 +1,7 @@ +pub mod structs; +pub(crate) mod axum; +pub(self) mod ws; +pub(self) mod ws_receive; +pub(self) mod redis_xadd; +pub(self) mod redis_xread; +pub(self) mod ws_send; diff --git a/todored/src/routes/board/redis_xadd.rs b/todored/src/routes/board/redis_xadd.rs new file mode 100644 index 0000000..01abfa9 --- /dev/null +++ b/todored/src/routes/board/redis_xadd.rs @@ -0,0 +1,38 @@ +use axum::extract::ws::{Message, WebSocket}; +use futures_util::stream::SplitStream; +use crate::routes::board::structs::BoardAction; + +pub async fn handler( + mut receiver: SplitStream, + mut rconn: redis::aio::Connection, + board_name: &str, +) -> Result, ()> { + + log::trace!("Handling websocket frame..."); + match value { + Message::Text(value) => { + log::trace!("Trying to parse value from websocket as a BoardRequest..."); + let action = serde_json::de::from_str::(&value); + + if let Err(err) = action { + log::error!("Could not parse value received from websocket as a BoardRequest: {err:?}"); + return receiver; + } + let value = action.unwrap(); + + BoardRequest { + name: + } + + value.handle(&mut rconn).await; + } + Message::Binary(_) => {} + Message::Ping(_) => {} + Message::Pong(_) => {} + Message::Close(value) => { + log::debug!("Client closed websocket: {value:?}"); + return receiver; + } + } + } +} diff --git a/todored/src/routes/board/redis_xread.rs b/todored/src/routes/board/redis_xread.rs new file mode 100644 index 0000000..27c3fef --- /dev/null +++ b/todored/src/routes/board/redis_xread.rs @@ -0,0 +1,26 @@ +use axum::extract::ws::{Message, WebSocket}; +use futures_util::stream::SplitSink; + +pub async fn handler( + mut sender: SplitSink, + mut rconn: redis::aio::Connection, + board_name: &str, +) -> SplitSink { + log::trace!("Writer thread spawned successfully!"); + + log::trace!("Computing Redis key..."); + let stream_key = format!("board:{{{board}}}:stream"); + + loop { + let response = redis::cmd("XREAD") + .arg(&stream_key) + .arg("COUNT") + .arg(1) + .arg("BLOCK") + .arg(30000) + + } + + log::trace!("Nothing to do, returning..."); + sender +} diff --git a/todored/src/routes/board/structs.rs b/todored/src/routes/board/structs.rs new file mode 100644 index 0000000..53bcb89 --- /dev/null +++ b/todored/src/routes/board/structs.rs @@ -0,0 +1,56 @@ +use serde::{Deserialize, Serialize}; +use uuid::Uuid; +use crate::task::{BoardChange, Task}; + +/// A request sent from a client to the server to perform a [`BoardAction`] on a board. +#[derive(Clone, Debug, Serialize, Deserialize)] +pub struct BoardRequest { + /// The Redis key to act on. + key: String, + /// The [`BoardAction`] to perform. + action: BoardAction, +} + +impl BoardRequest { + pub(crate) async fn handle(self, rconn: &mut redis::aio::Connection) { + log::trace!("Handling BoardRequest: {self:?}"); + + match self.action { + BoardAction::Title(title) => { + log::debug!("Setting board Title: {title:?}"); + let operation = BoardChange::Title(title); + let _id = operation.store_in_redis(rconn, &self.key).await; + }, + BoardAction::Task(None, Some(task)) => { + log::debug!("Creating Task: {task:?}"); + let id = Uuid::new_v4(); + log::trace!("Assigned id {id:?} to Task: {task:?}"); + let operation = BoardChange::Task(id, Some(task)); + let _id = operation.store_in_redis(rconn, &self.key).await; + }, + BoardAction::Task(Some(id), Some(task)) => { + log::debug!("Editing Task {id:?}: {task:?}"); + let operation = BoardChange::Task(id, Some(task)); + let _id = operation.store_in_redis(rconn, &self.key).await; + }, + BoardAction::Task(Some(id), None) => { + log::debug!("Deleting Task {id:?}..."); + let operation = BoardChange::Task(id, None); + let _id = operation.store_in_redis(rconn, &self.key).await; + }, + _ => { + log::warn!("Received unknown BoardRequest: {self:?}"); + } + } + } +} + +/// An action that can be performed on a board. +#[derive(Clone, Debug, Serialize, Deserialize)] +#[non_exhaustive] +pub enum BoardAction { + /// Set the board's title. + Title(String), + /// Create a new [`Task`], or update or delete the task with the given id. + Task(Option, Option), +} diff --git a/todored/src/routes/board/ws.rs b/todored/src/routes/board/ws.rs new file mode 100644 index 0000000..ffea2ae --- /dev/null +++ b/todored/src/routes/board/ws.rs @@ -0,0 +1,52 @@ +use axum::extract::ws::{Message, WebSocket}; +use tokio::sync::RwLock; +use std::sync::Arc; +use deadqueue::unlimited::Queue; +use futures_util::{SinkExt, StreamExt}; +use super::{redis_xread, redis_xadd, ws_receive, ws_send}; + +pub async fn handler( + board_name: String, + rclient: redis::Client, + websocket: WebSocket, +) { + log::trace!("Splitting socket into two separate pipes..."); + let (mut sender, receiver) = websocket.split(); + + log::trace!("Creating Redis connection for the reader thread..."); + let reader_redis = rclient.get_async_connection().await; + if reader_redis.is_err() { + log::error!("Could not open Redis connection for the reader thread."); + let _ = sender.close().await; + return; + } + let reader_redis = reader_redis.unwrap(); + log::trace!("Created Redis connection for the reader thread!"); + + log::trace!("Creating Redis connection for the writer thread..."); + let writer_redis = rclient.get_async_connection().await; + if writer_redis.is_err() { + log::error!("Could not open Redis connection for the writer thread."); + let _ = sender.close().await; + return; + } + let writer_redis = writer_redis.unwrap(); + log::trace!("Created Redis connection for the writer thread!"); + + let is_ws_closing: Arc> = Arc::new(RwLock::new(false)); + let strings_to_process: Arc> = Arc::new(Queue::new()); + let messages_to_send: Arc> = Arc::new(Queue::new()); + + let ws_receive_thread = tokio::spawn(ws_receive::handler( + receiver, + is_ws_closing.clone(), + strings_to_process.clone(), + messages_to_send.clone(), + )); + + todo!(); + let ws_send_thread = tokio::spawn(ws_send::handler(sender)); + let redis_xadd_thread = tokio::spawn(redis_xadd::handler()); + let redis_xread_thread = tokio::spawn(redis_xread::handler()); + +} diff --git a/todored/src/routes/board/ws_receive.rs b/todored/src/routes/board/ws_receive.rs new file mode 100644 index 0000000..038dcb6 --- /dev/null +++ b/todored/src/routes/board/ws_receive.rs @@ -0,0 +1,67 @@ +use axum::extract::ws::{CloseFrame, Message, WebSocket}; +use futures_util::stream::SplitStream; +use deadqueue::unlimited::Queue; +use tokio::sync::RwLock; +use std::sync::Arc; +use futures_util::StreamExt; + +pub async fn handler( + mut receiver: SplitStream, + is_ws_closing: Arc>, + strings_to_process: Arc>, + messages_to_send: Arc>, +) { + log::trace!("ws_receive thread started!"); + + loop { + log::trace!("Awaiting data from the websocket..."); + let value = receiver.next().await; + log::trace!("Received from websocket: {value:?}"); + + log::trace!("Checking if the websocket timed out..."); + if value.is_none() { + log::debug!("Websocket timed out, closing connection."); + let is_ws_closing = is_ws_closing.write().await; + *is_ws_closing = true; + messages_to_send.push(Message::Close(Some(CloseFrame { code: 1001, reason: "Timed out".into() }))); + return; + } + let value = value.unwrap(); + + log::trace!("Checking if websocket returned an error..."); + if let Err(err) = value { + log::error!("Websocket returned error: {err:?}"); + return; + } + let value = value.unwrap(); + + log::trace!("Delegating websocket message..."); + match value { + Message::Text(msg) => { + log::trace!("Received a string, delegating to message handler."); + strings_to_process.push(msg); + } + Message::Binary(_) => { + log::warn!("Received a binary, closing connection."); + let is_ws_closing = is_ws_closing.write().await; + *is_ws_closing = true; + messages_to_send.push(Message::Close(Some(CloseFrame { code: 1003, reason: "Binary is unsupported".into() }))); + return; + } + Message::Ping(vec) => { + log::trace!("Received a ping, delegating to pong handler."); + messages_to_send.push(Message::Pong(vec)); + } + Message::Pong(_) => { + log::warn!("Received a pong, ignoring.") + } + Message::Close(_) => { + log::debug!("Received a close, closing connection."); + let is_ws_closing = is_ws_closing.write().await; + *is_ws_closing = true; + messages_to_send.push(Message::Close(Some(CloseFrame { code: 1000, reason: "Closing as requested".into() }))); + return; + } + } + } +} diff --git a/todored/src/routes/board/ws_send.rs b/todored/src/routes/board/ws_send.rs new file mode 100644 index 0000000..e69de29 diff --git a/todored/src/routes/root.rs b/todored/src/routes/root.rs index cc9aa65..b486052 100644 --- a/todored/src/routes/root.rs +++ b/todored/src/routes/root.rs @@ -1,7 +1,7 @@ use axum::{Extension, Json}; use axum::http::StatusCode; -use crate::utils::{RedisConnectOr500, UnwrapOr500, Result}; +use crate::outcome::{Response, LoggableOutcome}; const MAJOR: u32 = pkg_version::pkg_version_major!(); const MINOR: u32 = pkg_version::pkg_version_minor!(); @@ -12,24 +12,28 @@ fn compose_version() -> String { } -pub async fn version() -> Json { - Json(compose_version()) +pub async fn version() -> Response> { + Ok(Json(compose_version())) } pub async fn healthcheck( Extension(rclient): Extension -) -> Result> { - let mut rconn = rclient.get_connection_or_500().await?; +) -> Response> { + let mut rconn = rclient.get_async_connection().await + .log_err_to_error("Failed to connect to Redis") + .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?; log::trace!("Sending PING..."); let response = redis::cmd("PING") .query_async::(&mut rconn).await - .expect_or_500_and_log("Failed to PING Redis")?; + .log_err_to_error("Failed to PING Redis") + .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?; log::trace!("Sent PING and received: {:?}", response); + response.eq("PONG") + .then_some(()) + .log_err_to_error("Received invalid PONG from Redis") + .ok_or_else(|| StatusCode::INTERNAL_SERVER_ERROR)?; - match response == "PONG" { - false => Err(StatusCode::INTERNAL_SERVER_ERROR), - true => Ok(Json(compose_version())), - } + Ok(Json(compose_version())) } diff --git a/todored/src/task.rs b/todored/src/task.rs index 38af47f..93c9224 100644 --- a/todored/src/task.rs +++ b/todored/src/task.rs @@ -1,4 +1,59 @@ +//! Module with containers related to boards and tasks. + use serde::{Deserialize, Serialize}; +use crate::outcome::LoggableOutcome; +use uuid::Uuid; + +/// A change to a board's contents. +#[derive(Clone, Debug, Serialize, Deserialize)] +#[non_exhaustive] +pub enum BoardChange { + /// Set the board's title. + Title(String), + /// Set the board's grouping. + Group(BoardGrouping), + /// Create, update, or delete the [`Task`] with the given [`Uuid`]. + Task(Uuid, Option), +} + +impl BoardChange { + /// Store this in Redis. + pub(crate) async fn store_in_redis(&self, rconn: &mut redis::aio::Connection, key: &str) -> Result { + log::debug!("Storing BoardOperation in Redis: {:?}", &self); + + log::trace!("Serializing BoardOperation to JSON..."); + let operation = serde_json::ser::to_string(self) + .log_err_to_error("Failed to serialize BoardOperation") + .map_err(|_| ())?; + + log::trace!("Adding to the Redis stream {stream_key:?}..."); + let id = redis::cmd("XADD") + .arg(key) + .arg("*") + .arg("operation") + .arg(operation) + .query_async::(rconn).await + .log_err_to_error("Failed to XADD to Redis") + .map_err(|_| ())?; + + log::trace!("Added to Redis stream with id {id:?}!"); + Ok(id) + } +} + +/// A possible grouping of a board's tasks. +#[derive(Clone, Debug, Serialize, Deserialize)] +#[non_exhaustive] +pub enum BoardGrouping { + /// Group tasks by icon. + Icon, + /// Group tasks by importance. + Importance, + /// Group tasks by priority. + Priority, + /// Group tasks by status. + Status, +} /// A task that can be displayed on the board. #[derive(Clone, Debug, Serialize, Deserialize)] diff --git a/todored/src/utils.rs b/todored/src/utils.rs deleted file mode 100644 index 8401405..0000000 --- a/todored/src/utils.rs +++ /dev/null @@ -1,49 +0,0 @@ -use std::fmt::Debug; -use async_trait::async_trait; -use axum::http::StatusCode; - -pub type Result = std::result::Result; - -pub(crate) trait UnwrapOr500 { - fn unwrap_or_500(self) -> Result; - fn unwrap_or_500_and_log(self) -> Result where E: Debug; - fn expect_or_500_and_log(self, text: &str) -> Result where E: Debug; -} - -impl UnwrapOr500 for std::result::Result { - fn unwrap_or_500(self) -> Result { - self.or(Err(StatusCode::INTERNAL_SERVER_ERROR)) - } - - fn unwrap_or_500_and_log(self) -> Result where E: Debug { - self.map_err(|e| { - log::error!("{e:?}"); - e - }).unwrap_or_500() - } - - fn expect_or_500_and_log(self, text: &str) -> Result where E: Debug { - self.map_err(|e| { - log::error!("{text}: {e:?}"); - e - }).unwrap_or_500() - } -} - -#[async_trait] -pub(crate) trait RedisConnectOr500 { - async fn get_connection_or_500(&self) -> Result; -} - -#[async_trait] -impl RedisConnectOr500 for redis::Client { - async fn get_connection_or_500(&self) -> Result { - log::trace!("Connecting to Redis..."); - - let rconn = self.get_async_connection().await - .expect_or_500_and_log("Failed to connect to Redis")?; - - log::trace!("Connection successful!"); - Ok(rconn) - } -}