1
Fork 0
mirror of https://github.com/Steffo99/todocolors.git synced 2024-11-22 08:14:18 +00:00

*Git refused to commit for a while*

This commit is contained in:
Steffo 2023-07-31 16:57:38 +02:00
parent 345d59268d
commit 3ff50d8584
Signed by: steffo
GPG key ID: 2A24051445686895
17 changed files with 456 additions and 186 deletions

34
todored/Cargo.lock generated
View file

@ -207,6 +207,25 @@ version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "338089f42c427b86394a5ee60ff321da23a5c89c9d89514c829687b26359fcff"
[[package]]
name = "crossbeam-queue"
version = "0.3.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d1cfb3ea8a53f37c40dea2c7bedcbd88bdfae54f5e2175d6ecaff1c988353add"
dependencies = [
"cfg-if",
"crossbeam-utils",
]
[[package]]
name = "crossbeam-utils"
version = "0.8.16"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5a22b2d63d4d1dc0b7f1b6b2747dd0088008a9be28b6ddf0b1e7d335e3037294"
dependencies = [
"cfg-if",
]
[[package]]
name = "crypto-common"
version = "0.1.6"
@ -223,6 +242,16 @@ version = "2.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c2e66c9d817f1720209181c316d28635c050fa304f9c79e47a520882661b7308"
[[package]]
name = "deadqueue"
version = "0.2.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "16a2561fd313df162315935989dceb8c99db4ee1933358270a57a3cfb8c957f3"
dependencies = [
"crossbeam-queue",
"tokio",
]
[[package]]
name = "digest"
version = "0.10.7"
@ -1039,8 +1068,8 @@ checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20"
name = "todored"
version = "0.1.0"
dependencies = [
"async-trait",
"axum",
"deadqueue",
"futures-util",
"log",
"micronfig",
@ -1048,6 +1077,7 @@ dependencies = [
"pretty_env_logger",
"redis",
"serde",
"serde_json",
"tokio",
"tower-http",
"uuid",
@ -1261,8 +1291,8 @@ version = "1.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "79daa5ed5740825c40b389c5e50312b9c86df53fccd33f281df655642b43869d"
dependencies = [
"getrandom",
"serde",
"sha1_smol",
]
[[package]]

View file

@ -6,8 +6,8 @@ edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
async-trait = "0.1.72"
axum = { version = "0.6.19", features = ["ws"] }
deadqueue = "0.2.4"
futures-util = "0.3.28"
log = "0.4.19"
micronfig = "0.2.0"
@ -15,5 +15,7 @@ pkg-version = "1.0.0"
pretty_env_logger = "0.5.0"
redis = { version = "0.23.1", features = ["r2d2", "ahash", "cluster", "tokio-comp", "connection-manager"] }
serde = { version = "1.0.178", features = ["derive"] }
serde_json = "1.0.104"
tokio = { version = "1.29.1", features = ["macros", "rt-multi-thread"] }
tower-http = { version = "0.4.3", features = ["cors"] }
uuid = { version = "1.4.1", features = ["serde", "v4"] }

View file

@ -2,11 +2,10 @@ use std::str::FromStr;
use axum::routing::{get, post};
pub mod outcome;
pub mod task;
pub(crate) mod op;
mod config;
pub(crate) mod config;
mod routes;
mod utils;
#[tokio::main]
async fn main() {
@ -40,4 +39,4 @@ async fn main() {
.serve(router.into_make_service())
.await
.expect("to be able to run the Axum server");
}
}

View file

@ -1,53 +0,0 @@
use serde::{Deserialize, Serialize};
use super::task::Task;
/// An operation sent from the client to the server.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub enum ClientOperation {
/// Set the board's title.
Title(String),
/// Create a new [`Task`], or update or delete the task with the given [`Uuid`].
Task(Option<String>, Option<Task>),
}
/// An operation sent from the server to the clients, and stored on the database.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub enum BoardOperation {
/// Set the board's title.
Title(String),
/// Create, update, or delete the [`Task`] with the given [`Uuid`].
Task(String, Option<Task>),
}
impl ClientOperation {
fn handle(self) {
todo!()
}
}
impl BoardOperation {
/// Store this in Redis.
pub(crate) async fn store_or_500(&self, rconn: &mut redis::aio::Connection, board: &str) -> Result<String> {
log::debug!("Storing BoardOperation in Redis: {:?}", &self);
log::trace!("Computing Redis key...");
let stream_key = format!("board:{{{board}}}:stream");
log::trace!("Serializing BoardOperation to JSON...");
let operation = serde_json::ser::to_string(self)
.expect_or_500_and_log("Failed to serialize BoardOperation")?;
log::trace!("Adding to the Redis stream {stream_key:?}...");
let id = redis::cmd("XADD")
.arg(stream_key)
.arg("*")
.arg("operation")
.arg(operation)
.query_async::<redis::aio::Connection, String>(rconn).await
.expect_or_500_and_log("Failed to XADD to Redis")?;
log::trace!("Added to Redis stream with id {id:?}!");
Ok(id)
}
}

91
todored/src/outcome.rs Normal file
View file

@ -0,0 +1,91 @@
//! Module containing various utility type and trait definitions related to error handling.
use std::fmt::Debug;
use axum::http::StatusCode;
pub type ResponseError = StatusCode;
pub type Response<T> = Result<T, ResponseError>;
/// Trait to easily [`log`] function outcomes.
pub(crate) trait LoggableOutcome {
fn log_err_to_trace(self, msg: &str) -> Self;
fn log_err_to_debug(self, msg: &str) -> Self;
fn log_err_to_info(self, msg: &str) -> Self;
fn log_err_to_warn(self, msg: &str) -> Self;
fn log_err_to_error(self, msg: &str) -> Self;
}
impl<T, E> LoggableOutcome for Result<T, E> where E: Debug {
fn log_err_to_trace(self, msg: &str) -> Self {
self.map_err(|err| {
log::trace!("{msg}: {err:?}");
err
})
}
fn log_err_to_debug(self, msg: &str) -> Self {
self.map_err(|err| {
log::debug!("{msg}: {err:?}");
err
})
}
fn log_err_to_info(self, msg: &str) -> Self {
self.map_err(|err| {
log::info!("{msg}: {err:?}");
err
})
}
fn log_err_to_warn(self, msg: &str) -> Self {
self.map_err(|err| {
log::warn!("{msg}: {err:?}");
err
})
}
fn log_err_to_error(self, msg: &str) -> Self {
self.map_err(|err| {
log::error!("{msg}: {err:?}");
err
})
}
}
impl<T> LoggableOutcome for Option<T> {
fn log_err_to_trace(self, msg: &str) -> Self {
if self.is_none() {
log::trace!("{msg}");
}
self
}
fn log_err_to_debug(self, msg: &str) -> Self {
if self.is_none() {
log::debug!("{msg}");
}
self
}
fn log_err_to_info(self, msg: &str) -> Self {
if self.is_none() {
log::info!("{msg}");
}
self
}
fn log_err_to_warn(self, msg: &str) -> Self {
if self.is_none() {
log::warn!("{msg}");
}
self
}
fn log_err_to_error(self, msg: &str) -> Self {
if self.is_none() {
log::error!("{msg}");
}
self
}
}

View file

@ -1,67 +0,0 @@
use axum::Extension;
use axum::extract::{Path, WebSocketUpgrade};
use axum::extract::ws::{Message, WebSocket};
use axum::response::Response;
use futures_util::{SinkExt, stream::{SplitSink, SplitStream, StreamExt}};
pub(crate) async fn websocket(
Path(board): Path<String>,
Extension(rclient): Extension<redis::Client>,
ws: WebSocketUpgrade,
) -> Response {
log::trace!("Received websocket request, upgrading...");
ws.on_upgrade(|socket| splitter(socket, rclient, board))
}
async fn splitter(
socket: WebSocket,
rclient: redis::Client,
board: String,
) {
log::trace!("Splitting socket into two separate pipes...");
let (mut sender, receiver) = socket.split();
log::trace!("Creating Redis connection for the reader thread...");
let reader_redis = rclient.get_async_connection().await;
if reader_redis.is_err() {
log::error!("Could not open Redis connection for the reader thread.");
let _ = sender.close().await;
return;
}
let reader_redis = reader_redis.unwrap();
log::trace!("Created Redis connection for the reader thread!");
log::trace!("Creating Redis connection for the writer thread...");
let writer_redis = rclient.get_async_connection().await;
if writer_redis.is_err() {
log::error!("Could not open Redis connection for the writer thread.");
let _ = sender.close().await;
return;
}
let writer_redis = writer_redis.unwrap();
log::trace!("Created Redis connection for the writer thread!");
let reader_thread = tokio::spawn(reader(receiver, reader_redis));
let writer_thread = tokio::spawn(writer(sender, writer_redis));
}
async fn reader(
receiver: SplitStream<WebSocket>,
reader_redis: redis::aio::Connection,
) -> SplitStream<WebSocket> {
log::trace!("Reader thread spawned successfully!");
todo!()
}
async fn writer(
mut sender: SplitSink<WebSocket, Message>,
writer_redis: redis::aio::Connection,
) -> SplitSink<WebSocket, Message> {
log::trace!("Writer thread spawned successfully!");
log::trace!("Sending test message...");
let _ = sender.send(Message::Text("\"Garasauto\"".to_string())).await;
log::trace!("Sent test message!");
todo!()
}

View file

@ -0,0 +1,12 @@
use axum::Extension;
use axum::extract::{Path, WebSocketUpgrade};
use super::ws;
pub(crate) async fn handler(
Path(board): Path<String>,
Extension(rclient): Extension<redis::Client>,
upgrade_request: WebSocketUpgrade,
) -> axum::response::Response {
log::trace!("Received websocket request, upgrading...");
upgrade_request.on_upgrade(|websocket| ws::handler(board, rclient, websocket))
}

View file

@ -0,0 +1,7 @@
pub mod structs;
pub(crate) mod axum;
pub(self) mod ws;
pub(self) mod ws_receive;
pub(self) mod redis_xadd;
pub(self) mod redis_xread;
pub(self) mod ws_send;

View file

@ -0,0 +1,38 @@
use axum::extract::ws::{Message, WebSocket};
use futures_util::stream::SplitStream;
use crate::routes::board::structs::BoardAction;
pub async fn handler(
mut receiver: SplitStream<WebSocket>,
mut rconn: redis::aio::Connection,
board_name: &str,
) -> Result<SplitStream<WebSocket>, ()> {
log::trace!("Handling websocket frame...");
match value {
Message::Text(value) => {
log::trace!("Trying to parse value from websocket as a BoardRequest...");
let action = serde_json::de::from_str::<BoardAction>(&value);
if let Err(err) = action {
log::error!("Could not parse value received from websocket as a BoardRequest: {err:?}");
return receiver;
}
let value = action.unwrap();
BoardRequest {
name:
}
value.handle(&mut rconn).await;
}
Message::Binary(_) => {}
Message::Ping(_) => {}
Message::Pong(_) => {}
Message::Close(value) => {
log::debug!("Client closed websocket: {value:?}");
return receiver;
}
}
}
}

View file

@ -0,0 +1,26 @@
use axum::extract::ws::{Message, WebSocket};
use futures_util::stream::SplitSink;
pub async fn handler(
mut sender: SplitSink<WebSocket, Message>,
mut rconn: redis::aio::Connection,
board_name: &str,
) -> SplitSink<WebSocket, Message> {
log::trace!("Writer thread spawned successfully!");
log::trace!("Computing Redis key...");
let stream_key = format!("board:{{{board}}}:stream");
loop {
let response = redis::cmd("XREAD")
.arg(&stream_key)
.arg("COUNT")
.arg(1)
.arg("BLOCK")
.arg(30000)
}
log::trace!("Nothing to do, returning...");
sender
}

View file

@ -0,0 +1,56 @@
use serde::{Deserialize, Serialize};
use uuid::Uuid;
use crate::task::{BoardChange, Task};
/// A request sent from a client to the server to perform a [`BoardAction`] on a board.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct BoardRequest {
/// The Redis key to act on.
key: String,
/// The [`BoardAction`] to perform.
action: BoardAction,
}
impl BoardRequest {
pub(crate) async fn handle(self, rconn: &mut redis::aio::Connection) {
log::trace!("Handling BoardRequest: {self:?}");
match self.action {
BoardAction::Title(title) => {
log::debug!("Setting board Title: {title:?}");
let operation = BoardChange::Title(title);
let _id = operation.store_in_redis(rconn, &self.key).await;
},
BoardAction::Task(None, Some(task)) => {
log::debug!("Creating Task: {task:?}");
let id = Uuid::new_v4();
log::trace!("Assigned id {id:?} to Task: {task:?}");
let operation = BoardChange::Task(id, Some(task));
let _id = operation.store_in_redis(rconn, &self.key).await;
},
BoardAction::Task(Some(id), Some(task)) => {
log::debug!("Editing Task {id:?}: {task:?}");
let operation = BoardChange::Task(id, Some(task));
let _id = operation.store_in_redis(rconn, &self.key).await;
},
BoardAction::Task(Some(id), None) => {
log::debug!("Deleting Task {id:?}...");
let operation = BoardChange::Task(id, None);
let _id = operation.store_in_redis(rconn, &self.key).await;
},
_ => {
log::warn!("Received unknown BoardRequest: {self:?}");
}
}
}
}
/// An action that can be performed on a board.
#[derive(Clone, Debug, Serialize, Deserialize)]
#[non_exhaustive]
pub enum BoardAction {
/// Set the board's title.
Title(String),
/// Create a new [`Task`], or update or delete the task with the given id.
Task(Option<Uuid>, Option<Task>),
}

View file

@ -0,0 +1,52 @@
use axum::extract::ws::{Message, WebSocket};
use tokio::sync::RwLock;
use std::sync::Arc;
use deadqueue::unlimited::Queue;
use futures_util::{SinkExt, StreamExt};
use super::{redis_xread, redis_xadd, ws_receive, ws_send};
pub async fn handler(
board_name: String,
rclient: redis::Client,
websocket: WebSocket,
) {
log::trace!("Splitting socket into two separate pipes...");
let (mut sender, receiver) = websocket.split();
log::trace!("Creating Redis connection for the reader thread...");
let reader_redis = rclient.get_async_connection().await;
if reader_redis.is_err() {
log::error!("Could not open Redis connection for the reader thread.");
let _ = sender.close().await;
return;
}
let reader_redis = reader_redis.unwrap();
log::trace!("Created Redis connection for the reader thread!");
log::trace!("Creating Redis connection for the writer thread...");
let writer_redis = rclient.get_async_connection().await;
if writer_redis.is_err() {
log::error!("Could not open Redis connection for the writer thread.");
let _ = sender.close().await;
return;
}
let writer_redis = writer_redis.unwrap();
log::trace!("Created Redis connection for the writer thread!");
let is_ws_closing: Arc<RwLock<bool>> = Arc::new(RwLock::new(false));
let strings_to_process: Arc<Queue<String>> = Arc::new(Queue::new());
let messages_to_send: Arc<Queue<Message>> = Arc::new(Queue::new());
let ws_receive_thread = tokio::spawn(ws_receive::handler(
receiver,
is_ws_closing.clone(),
strings_to_process.clone(),
messages_to_send.clone(),
));
todo!();
let ws_send_thread = tokio::spawn(ws_send::handler(sender));
let redis_xadd_thread = tokio::spawn(redis_xadd::handler());
let redis_xread_thread = tokio::spawn(redis_xread::handler());
}

View file

@ -0,0 +1,67 @@
use axum::extract::ws::{CloseFrame, Message, WebSocket};
use futures_util::stream::SplitStream;
use deadqueue::unlimited::Queue;
use tokio::sync::RwLock;
use std::sync::Arc;
use futures_util::StreamExt;
pub async fn handler(
mut receiver: SplitStream<WebSocket>,
is_ws_closing: Arc<RwLock<bool>>,
strings_to_process: Arc<Queue<String>>,
messages_to_send: Arc<Queue<Message>>,
) {
log::trace!("ws_receive thread started!");
loop {
log::trace!("Awaiting data from the websocket...");
let value = receiver.next().await;
log::trace!("Received from websocket: {value:?}");
log::trace!("Checking if the websocket timed out...");
if value.is_none() {
log::debug!("Websocket timed out, closing connection.");
let is_ws_closing = is_ws_closing.write().await;
*is_ws_closing = true;
messages_to_send.push(Message::Close(Some(CloseFrame { code: 1001, reason: "Timed out".into() })));
return;
}
let value = value.unwrap();
log::trace!("Checking if websocket returned an error...");
if let Err(err) = value {
log::error!("Websocket returned error: {err:?}");
return;
}
let value = value.unwrap();
log::trace!("Delegating websocket message...");
match value {
Message::Text(msg) => {
log::trace!("Received a string, delegating to message handler.");
strings_to_process.push(msg);
}
Message::Binary(_) => {
log::warn!("Received a binary, closing connection.");
let is_ws_closing = is_ws_closing.write().await;
*is_ws_closing = true;
messages_to_send.push(Message::Close(Some(CloseFrame { code: 1003, reason: "Binary is unsupported".into() })));
return;
}
Message::Ping(vec) => {
log::trace!("Received a ping, delegating to pong handler.");
messages_to_send.push(Message::Pong(vec));
}
Message::Pong(_) => {
log::warn!("Received a pong, ignoring.")
}
Message::Close(_) => {
log::debug!("Received a close, closing connection.");
let is_ws_closing = is_ws_closing.write().await;
*is_ws_closing = true;
messages_to_send.push(Message::Close(Some(CloseFrame { code: 1000, reason: "Closing as requested".into() })));
return;
}
}
}
}

View file

View file

@ -1,7 +1,7 @@
use axum::{Extension, Json};
use axum::http::StatusCode;
use crate::utils::{RedisConnectOr500, UnwrapOr500, Result};
use crate::outcome::{Response, LoggableOutcome};
const MAJOR: u32 = pkg_version::pkg_version_major!();
const MINOR: u32 = pkg_version::pkg_version_minor!();
@ -12,24 +12,28 @@ fn compose_version() -> String {
}
pub async fn version() -> Json<String> {
Json(compose_version())
pub async fn version() -> Response<Json<String>> {
Ok(Json(compose_version()))
}
pub async fn healthcheck(
Extension(rclient): Extension<redis::Client>
) -> Result<Json<String>> {
let mut rconn = rclient.get_connection_or_500().await?;
) -> Response<Json<String>> {
let mut rconn = rclient.get_async_connection().await
.log_err_to_error("Failed to connect to Redis")
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
log::trace!("Sending PING...");
let response = redis::cmd("PING")
.query_async::<redis::aio::Connection, String>(&mut rconn).await
.expect_or_500_and_log("Failed to PING Redis")?;
.log_err_to_error("Failed to PING Redis")
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
log::trace!("Sent PING and received: {:?}", response);
response.eq("PONG")
.then_some(())
.log_err_to_error("Received invalid PONG from Redis")
.ok_or_else(|| StatusCode::INTERNAL_SERVER_ERROR)?;
match response == "PONG" {
false => Err(StatusCode::INTERNAL_SERVER_ERROR),
true => Ok(Json(compose_version())),
}
Ok(Json(compose_version()))
}

View file

@ -1,4 +1,59 @@
//! Module with containers related to boards and tasks.
use serde::{Deserialize, Serialize};
use crate::outcome::LoggableOutcome;
use uuid::Uuid;
/// A change to a board's contents.
#[derive(Clone, Debug, Serialize, Deserialize)]
#[non_exhaustive]
pub enum BoardChange {
/// Set the board's title.
Title(String),
/// Set the board's grouping.
Group(BoardGrouping),
/// Create, update, or delete the [`Task`] with the given [`Uuid`].
Task(Uuid, Option<Task>),
}
impl BoardChange {
/// Store this in Redis.
pub(crate) async fn store_in_redis(&self, rconn: &mut redis::aio::Connection, key: &str) -> Result<String, ()> {
log::debug!("Storing BoardOperation in Redis: {:?}", &self);
log::trace!("Serializing BoardOperation to JSON...");
let operation = serde_json::ser::to_string(self)
.log_err_to_error("Failed to serialize BoardOperation")
.map_err(|_| ())?;
log::trace!("Adding to the Redis stream {stream_key:?}...");
let id = redis::cmd("XADD")
.arg(key)
.arg("*")
.arg("operation")
.arg(operation)
.query_async::<redis::aio::Connection, String>(rconn).await
.log_err_to_error("Failed to XADD to Redis")
.map_err(|_| ())?;
log::trace!("Added to Redis stream with id {id:?}!");
Ok(id)
}
}
/// A possible grouping of a board's tasks.
#[derive(Clone, Debug, Serialize, Deserialize)]
#[non_exhaustive]
pub enum BoardGrouping {
/// Group tasks by icon.
Icon,
/// Group tasks by importance.
Importance,
/// Group tasks by priority.
Priority,
/// Group tasks by status.
Status,
}
/// A task that can be displayed on the board.
#[derive(Clone, Debug, Serialize, Deserialize)]

View file

@ -1,49 +0,0 @@
use std::fmt::Debug;
use async_trait::async_trait;
use axum::http::StatusCode;
pub type Result<T> = std::result::Result<T, StatusCode>;
pub(crate) trait UnwrapOr500<T, E> {
fn unwrap_or_500(self) -> Result<T>;
fn unwrap_or_500_and_log(self) -> Result<T> where E: Debug;
fn expect_or_500_and_log(self, text: &str) -> Result<T> where E: Debug;
}
impl<T, E> UnwrapOr500<T, E> for std::result::Result<T, E> {
fn unwrap_or_500(self) -> Result<T> {
self.or(Err(StatusCode::INTERNAL_SERVER_ERROR))
}
fn unwrap_or_500_and_log(self) -> Result<T> where E: Debug {
self.map_err(|e| {
log::error!("{e:?}");
e
}).unwrap_or_500()
}
fn expect_or_500_and_log(self, text: &str) -> Result<T> where E: Debug {
self.map_err(|e| {
log::error!("{text}: {e:?}");
e
}).unwrap_or_500()
}
}
#[async_trait]
pub(crate) trait RedisConnectOr500 {
async fn get_connection_or_500(&self) -> Result<redis::aio::Connection>;
}
#[async_trait]
impl RedisConnectOr500 for redis::Client {
async fn get_connection_or_500(&self) -> Result<redis::aio::Connection> {
log::trace!("Connecting to Redis...");
let rconn = self.get_async_connection().await
.expect_or_500_and_log("Failed to connect to Redis")?;
log::trace!("Connection successful!");
Ok(rconn)
}
}