From 3ee1152c9f45695cd99eb2dbdb5536ae44f37760 Mon Sep 17 00:00:00 2001 From: Stefano Pigozzi Date: Sun, 30 Jul 2023 02:24:43 +0200 Subject: [PATCH] Create initial board server page --- todored/Cargo.lock | 1 + todored/Cargo.toml | 1 + todored/src/main.rs | 1 + todored/src/routes/board.rs | 68 +++++++++++++++++++++++++++++++++++++ todored/src/routes/mod.rs | 3 +- 5 files changed, 73 insertions(+), 1 deletion(-) create mode 100644 todored/src/routes/board.rs diff --git a/todored/Cargo.lock b/todored/Cargo.lock index 9a516ae..9dbb3b9 100644 --- a/todored/Cargo.lock +++ b/todored/Cargo.lock @@ -1041,6 +1041,7 @@ version = "0.1.0" dependencies = [ "async-trait", "axum", + "futures-util", "log", "micronfig", "pkg-version", diff --git a/todored/Cargo.toml b/todored/Cargo.toml index 8c18728..e76d818 100644 --- a/todored/Cargo.toml +++ b/todored/Cargo.toml @@ -8,6 +8,7 @@ edition = "2021" [dependencies] async-trait = "0.1.72" axum = { version = "0.6.19", features = ["ws"] } +futures-util = "0.3.28" log = "0.4.19" micronfig = "0.2.0" pkg-version = "1.0.0" diff --git a/todored/src/main.rs b/todored/src/main.rs index 9a15ce5..0e024e0 100644 --- a/todored/src/main.rs +++ b/todored/src/main.rs @@ -21,6 +21,7 @@ async fn main() { .route("/version", get(routes::root::version)) .route("/", post(routes::root::healthcheck)) .route("/healthcheck", post(routes::root::healthcheck)) + .route("/board/:board/ws", get(routes::board::websocket)) .layer(axum::Extension(rclient)) .layer(tower_http::cors::CorsLayer::new() .allow_origin( diff --git a/todored/src/routes/board.rs b/todored/src/routes/board.rs new file mode 100644 index 0000000..0327d90 --- /dev/null +++ b/todored/src/routes/board.rs @@ -0,0 +1,68 @@ +use axum::Extension; +use axum::extract::{Path, WebSocketUpgrade}; +use axum::extract::ws::{Message, WebSocket}; +use futures_util::{SinkExt, stream::{StreamExt, SplitSink, SplitStream}}; +use axum::response::Response; + + +pub(crate) async fn websocket( + Path(board): Path, + Extension(rclient): Extension, + ws: WebSocketUpgrade, +) -> Response { + log::trace!("Received websocket request, upgrading..."); + ws.on_upgrade(|socket| splitter(socket, rclient, board)) +} + +async fn splitter( + socket: WebSocket, + rclient: redis::Client, + board: String, +) { + log::trace!("Splitting socket into two separate pipes..."); + let (mut sender, receiver) = socket.split(); + + log::trace!("Creating Redis connection for the reader thread..."); + let reader_redis = rclient.get_async_connection().await; + if reader_redis.is_err() { + log::error!("Could not open Redis connection for the reader thread."); + let _ = sender.close().await; + return; + } + let reader_redis = reader_redis.unwrap(); + log::trace!("Created Redis connection for the reader thread!"); + + log::trace!("Creating Redis connection for the writer thread..."); + let writer_redis = rclient.get_async_connection().await; + if writer_redis.is_err() { + log::error!("Could not open Redis connection for the writer thread."); + let _ = sender.close().await; + return; + } + let writer_redis = writer_redis.unwrap(); + log::trace!("Created Redis connection for the writer thread!"); + + let reader_thread = tokio::spawn(reader(receiver, reader_redis)); + let writer_thread = tokio::spawn(writer(sender, writer_redis)); +} + +async fn reader( + receiver: SplitStream, + reader_redis: redis::aio::Connection, +) -> SplitStream { + log::trace!("Reader thread spawned successfully!"); + todo!() +} + +async fn writer( + mut sender: SplitSink, + writer_redis: redis::aio::Connection, +) -> SplitSink { + log::trace!("Writer thread spawned successfully!"); + + log::trace!("Sending test message..."); + let _ = sender.send(Message::Text("\"Garasauto\"".to_string())).await; + log::trace!("Sent test message!"); + + todo!() +} \ No newline at end of file diff --git a/todored/src/routes/mod.rs b/todored/src/routes/mod.rs index b7ca40a..0de7e88 100644 --- a/todored/src/routes/mod.rs +++ b/todored/src/routes/mod.rs @@ -1 +1,2 @@ -pub(crate) mod root; \ No newline at end of file +pub(crate) mod root; +pub(crate) mod board;