1
Fork 0
mirror of https://github.com/Steffo99/todocolors.git synced 2024-11-22 08:14:18 +00:00

Get /board/{board}/ws to work, probably

This commit is contained in:
Steffo 2023-07-31 20:26:37 +02:00
parent 3ff50d8584
commit 88dae2ed3b
Signed by: steffo
GPG key ID: 2A24051445686895
10 changed files with 192 additions and 108 deletions

View file

@ -3,10 +3,10 @@ import {default as React} from "react";
export interface UseWsHandlers { export interface UseWsHandlers {
onclose?: (event: CloseEvent) => void, onclose?: (sock: WebSocket, event: CloseEvent) => void,
onerror?: (event: Event) => void, onerror?: (sock: WebSocket, event: Event) => void,
onmessage?: (event: MessageEvent) => void, onmessage?: (sock: WebSocket, event: MessageEvent) => void,
onopen?: (event: Event) => void, onopen?: (sock: WebSocket, event: Event) => void,
} }
@ -15,10 +15,10 @@ export function useWs(url: string, {onclose, onerror, onmessage, onopen}: UseWsH
React.useEffect(() => { React.useEffect(() => {
const sock = new WebSocket(url); const sock = new WebSocket(url);
sock.onclose = onclose ?? null; sock.onclose = onclose ? (ev) => onclose(sock, ev) : null;
sock.onerror = onerror ?? null; sock.onerror = onerror ? (ev) => onerror(sock, ev) : null;
sock.onmessage = onmessage ?? null; sock.onmessage = onmessage ? (ev) => onmessage(sock, ev) : null;
sock.onopen = onopen ?? null; sock.onopen = onopen ? (ev) => onopen(sock, ev) : null;
setWebsocket(sock); setWebsocket(sock);
return () => { return () => {
sock.close(); sock.close();

View file

@ -20,7 +20,7 @@ async fn main() {
.route("/version", get(routes::root::version)) .route("/version", get(routes::root::version))
.route("/", post(routes::root::healthcheck)) .route("/", post(routes::root::healthcheck))
.route("/healthcheck", post(routes::root::healthcheck)) .route("/healthcheck", post(routes::root::healthcheck))
.route("/board/:board/ws", get(routes::board::websocket)) .route("/board/:board/ws", get(routes::board::board_websocket))
.layer(axum::Extension(rclient)) .layer(axum::Extension(rclient))
.layer(tower_http::cors::CorsLayer::new() .layer(tower_http::cors::CorsLayer::new()
.allow_origin( .allow_origin(

View file

@ -1,7 +1,9 @@
pub mod structs; pub mod structs;
pub(crate) mod axum; pub(self) mod axum;
pub(self) mod ws; pub(self) mod ws;
pub(self) mod ws_receive; pub(self) mod ws_receive;
pub(self) mod redis_xadd; pub(self) mod redis_xadd;
pub(self) mod redis_xread; pub(self) mod redis_xread;
pub(self) mod ws_send; pub(self) mod ws_send;
pub(crate) use self::axum::handler as board_websocket;

View file

@ -1,38 +1,31 @@
use axum::extract::ws::{Message, WebSocket}; use std::sync::Arc;
use futures_util::stream::SplitStream; use axum::extract::ws::CloseCode;
use crate::routes::board::structs::BoardAction; use deadqueue::unlimited::Queue;
use redis::aio::Connection;
use crate::routes::board::structs::{BoardAction, BoardRequest};
pub async fn handler( pub async fn handler(
mut receiver: SplitStream<WebSocket>, mut rconn: Connection,
mut rconn: redis::aio::Connection, key: String,
board_name: &str, strings_to_process: Arc<Queue<String>>,
) -> Result<SplitStream<WebSocket>, ()> { ) -> CloseCode {
log::trace!("Thread started!");
log::trace!("Handling websocket frame..."); loop {
match value { log::trace!("Waiting for strings to process...");
Message::Text(value) => { let message = strings_to_process.pop().await;
log::trace!("Trying to parse value from websocket as a BoardRequest...");
let action = serde_json::de::from_str::<BoardAction>(&value); log::trace!("Trying to parse string as a BoardAction...");
let action = serde_json::de::from_str::<BoardAction>(&message);
if let Err(err) = action { if let Err(err) = action {
log::error!("Could not parse value received from websocket as a BoardRequest: {err:?}"); log::error!("Could not parse value received from websocket as a BoardRequest, closing connection: {err:?}");
return receiver; return 1002;
} }
let value = action.unwrap(); let key = key.to_owned();
let action = action.unwrap();
BoardRequest { log::trace!("Handling BoardRequest...");
name: BoardRequest { key, action }.handle(&mut rconn).await;
}
value.handle(&mut rconn).await;
}
Message::Binary(_) => {}
Message::Ping(_) => {}
Message::Pong(_) => {}
Message::Close(value) => {
log::debug!("Client closed websocket: {value:?}");
return receiver;
}
}
} }
} }

View file

@ -1,26 +1,46 @@
use axum::extract::ws::{Message, WebSocket}; use std::sync::Arc;
use futures_util::stream::SplitSink; use axum::extract::ws::{CloseCode, Message};
use deadqueue::unlimited::Queue;
use redis::aio::Connection;
pub type XReadResult = (String, String, String, String);
pub async fn handler( pub async fn handler(
mut sender: SplitSink<WebSocket, Message>, mut rconn: Connection,
mut rconn: redis::aio::Connection, key: String,
board_name: &str, messages_to_send: Arc<Queue<Message>>,
) -> SplitSink<WebSocket, Message> { ) -> CloseCode {
log::trace!("Writer thread spawned successfully!"); log::trace!("Thread started!");
log::trace!("Computing Redis key..."); let mut seq = "0".to_string();
let stream_key = format!("board:{{{board}}}:stream");
loop { loop {
log::trace!("Waiting for events to broadcast for 5 seconds...");
let response = redis::cmd("XREAD") let response = redis::cmd("XREAD")
.arg(&stream_key)
.arg("COUNT") .arg("COUNT")
.arg(1) .arg(1)
.arg("BLOCK") .arg("BLOCK")
.arg(30000) .arg(5000)
.arg("STREAMS")
.arg(&key)
.arg(&seq)
.query_async::<Connection, Option<XReadResult>>(&mut rconn).await;
if let Err(err) = response {
log::error!("Could not XREAD Redis stream, closing connection: {err:?}");
return 1002;
} }
let response = response.unwrap();
log::trace!("Nothing to do, returning..."); if response.is_none() {
sender continue;
}
let response = response.unwrap();
seq = response.1;
let message = response.3;
log::trace!("Received event, sending it: {message:?}");
messages_to_send.push(Message::Text(message))
}
} }

View file

@ -6,9 +6,9 @@ use crate::task::{BoardChange, Task};
#[derive(Clone, Debug, Serialize, Deserialize)] #[derive(Clone, Debug, Serialize, Deserialize)]
pub struct BoardRequest { pub struct BoardRequest {
/// The Redis key to act on. /// The Redis key to act on.
key: String, pub key: String,
/// The [`BoardAction`] to perform. /// The [`BoardAction`] to perform.
action: BoardAction, pub action: BoardAction,
} }
impl BoardRequest { impl BoardRequest {

View file

@ -1,8 +1,7 @@
use axum::extract::ws::{Message, WebSocket}; use axum::extract::ws::{CloseCode, CloseFrame, Message, WebSocket};
use tokio::sync::RwLock;
use std::sync::Arc; use std::sync::Arc;
use deadqueue::unlimited::Queue; use deadqueue::unlimited::Queue;
use futures_util::{SinkExt, StreamExt}; use futures_util::StreamExt;
use super::{redis_xread, redis_xadd, ws_receive, ws_send}; use super::{redis_xread, redis_xadd, ws_receive, ws_send};
pub async fn handler( pub async fn handler(
@ -10,43 +9,92 @@ pub async fn handler(
rclient: redis::Client, rclient: redis::Client,
websocket: WebSocket, websocket: WebSocket,
) { ) {
log::trace!("Creating Redis connection for the XADD thread...");
let xadd_redis = rclient.get_async_connection().await;
if xadd_redis.is_err() {
log::error!("Could not open Redis connection for the XADD thread.");
let _ = websocket.close().await;
return;
}
let xadd_redis = xadd_redis.unwrap();
log::trace!("Created Redis connection for the XADD thread!");
log::trace!("Creating Redis connection for the XREAD thread...");
let xread_redis = rclient.get_async_connection().await;
if xread_redis.is_err() {
log::error!("Could not open Redis connection for the XREAD thread.");
let _ = websocket.close().await;
return;
}
let xread_redis = xread_redis.unwrap();
log::trace!("Created Redis connection for the XREAD thread!");
log::trace!("Splitting socket into two separate pipes..."); log::trace!("Splitting socket into two separate pipes...");
let (mut sender, receiver) = websocket.split(); let (sender, receiver) = websocket.split();
log::trace!("Creating Redis connection for the reader thread..."); log::trace!("Determining Redis key to operate on...");
let reader_redis = rclient.get_async_connection().await; let redis_key = format!("board:{{{board_name}}}:stream");
if reader_redis.is_err() { log::trace!("Redis key is: {redis_key:?}");
log::error!("Could not open Redis connection for the reader thread.");
let _ = sender.close().await;
return;
}
let reader_redis = reader_redis.unwrap();
log::trace!("Created Redis connection for the reader thread!");
log::trace!("Creating Redis connection for the writer thread..."); log::trace!("Creating synchronization structures...");
let writer_redis = rclient.get_async_connection().await;
if writer_redis.is_err() {
log::error!("Could not open Redis connection for the writer thread.");
let _ = sender.close().await;
return;
}
let writer_redis = writer_redis.unwrap();
log::trace!("Created Redis connection for the writer thread!");
let is_ws_closing: Arc<RwLock<bool>> = Arc::new(RwLock::new(false));
let strings_to_process: Arc<Queue<String>> = Arc::new(Queue::new()); let strings_to_process: Arc<Queue<String>> = Arc::new(Queue::new());
let messages_to_send: Arc<Queue<Message>> = Arc::new(Queue::new()); let messages_to_send: Arc<Queue<Message>> = Arc::new(Queue::new());
log::trace!("Spawning ws_receive_thread...");
let ws_receive_thread = tokio::spawn(ws_receive::handler( let ws_receive_thread = tokio::spawn(ws_receive::handler(
receiver, receiver,
is_ws_closing.clone(),
strings_to_process.clone(), strings_to_process.clone(),
messages_to_send.clone(), messages_to_send.clone(),
)); ));
let ws_receive_abort = ws_receive_thread.abort_handle();
todo!(); log::trace!("Spawning ws_send_thread...");
let ws_send_thread = tokio::spawn(ws_send::handler(sender)); let ws_send_thread = tokio::spawn(ws_send::handler(
let redis_xadd_thread = tokio::spawn(redis_xadd::handler()); sender,
let redis_xread_thread = tokio::spawn(redis_xread::handler()); messages_to_send.clone(),
));
log::trace!("Spawning redis_xadd_thread...");
let redis_xadd_thread = tokio::spawn(redis_xadd::handler(
xadd_redis,
redis_key.clone(),
strings_to_process.clone()
));
let redis_xadd_abort = redis_xadd_thread.abort_handle();
log::trace!("Spawning redis_xread_thread...");
let redis_xread_thread = tokio::spawn(redis_xread::handler(
xread_redis,
redis_key,
messages_to_send.clone()
));
let redis_xread_abort = redis_xread_thread.abort_handle();
log::trace!("Waiting for the socket to close...");
tokio::select!(
cc = ws_receive_thread => {
close_with_code(messages_to_send, match cc { Ok(cc) => cc, _ => 1000 });
},
cc = redis_xadd_thread => {
close_with_code(messages_to_send, match cc { Ok(cc) => cc, _ => 1000 });
},
cc = redis_xread_thread => {
close_with_code(messages_to_send, match cc { Ok(cc) => cc, _ => 1000 });
},
);
ws_receive_abort.abort();
redis_xadd_abort.abort();
redis_xread_abort.abort();
log::trace!("Waiting for the last messages to be sent...");
let _ws_send_join = tokio::join!(ws_send_thread);
log::debug!("Websocket threads closed successfully!")
}
fn close_with_code(
messages_to_send: Arc<Queue<Message>>,
close_code: CloseCode,
) {
messages_to_send.push(Message::Close(Some(CloseFrame { code: close_code, reason: "".into() })));
} }

View file

@ -1,17 +1,15 @@
use axum::extract::ws::{CloseFrame, Message, WebSocket}; use axum::extract::ws::{CloseCode, Message, WebSocket};
use futures_util::stream::SplitStream; use futures_util::stream::SplitStream;
use deadqueue::unlimited::Queue; use deadqueue::unlimited::Queue;
use tokio::sync::RwLock;
use std::sync::Arc; use std::sync::Arc;
use futures_util::StreamExt; use futures_util::StreamExt;
pub async fn handler( pub async fn handler(
mut receiver: SplitStream<WebSocket>, mut receiver: SplitStream<WebSocket>,
is_ws_closing: Arc<RwLock<bool>>,
strings_to_process: Arc<Queue<String>>, strings_to_process: Arc<Queue<String>>,
messages_to_send: Arc<Queue<Message>>, messages_to_send: Arc<Queue<Message>>,
) { ) -> CloseCode {
log::trace!("ws_receive thread started!"); log::trace!("Thread started!");
loop { loop {
log::trace!("Awaiting data from the websocket..."); log::trace!("Awaiting data from the websocket...");
@ -21,46 +19,37 @@ pub async fn handler(
log::trace!("Checking if the websocket timed out..."); log::trace!("Checking if the websocket timed out...");
if value.is_none() { if value.is_none() {
log::debug!("Websocket timed out, closing connection."); log::debug!("Websocket timed out, closing connection.");
let is_ws_closing = is_ws_closing.write().await; return 1001;
*is_ws_closing = true;
messages_to_send.push(Message::Close(Some(CloseFrame { code: 1001, reason: "Timed out".into() })));
return;
} }
let value = value.unwrap(); let value = value.unwrap();
log::trace!("Checking if websocket returned an error..."); log::trace!("Checking if websocket returned an error...");
if let Err(err) = value { if let Err(err) = value {
log::error!("Websocket returned error: {err:?}"); log::error!("Websocket returned error: {err:?}");
return; return 1002;
} }
let value = value.unwrap(); let value = value.unwrap();
log::trace!("Delegating websocket message..."); log::trace!("Delegating websocket message...");
match value { match value {
Message::Text(msg) => { Message::Text(msg) => {
log::trace!("Received a string, delegating to message handler."); log::trace!("Received a string, delegating to message handler: {msg:?}");
strings_to_process.push(msg); strings_to_process.push(msg);
} }
Message::Binary(_) => { Message::Binary(_) => {
log::warn!("Received a binary, closing connection."); log::warn!("Received a binary, closing connection.");
let is_ws_closing = is_ws_closing.write().await; return 1003;
*is_ws_closing = true;
messages_to_send.push(Message::Close(Some(CloseFrame { code: 1003, reason: "Binary is unsupported".into() })));
return;
} }
Message::Ping(vec) => { Message::Ping(vec) => {
log::trace!("Received a ping, delegating to pong handler."); log::trace!("Received a ping, delegating to pong handler: {vec:?}");
messages_to_send.push(Message::Pong(vec)); messages_to_send.push(Message::Pong(vec));
} }
Message::Pong(_) => { Message::Pong(_) => {
log::warn!("Received a pong, ignoring.") log::warn!("Received a pong, ignoring.")
} }
Message::Close(_) => { Message::Close(cls) => {
log::debug!("Received a close, closing connection."); log::debug!("Received a close, closing connection: {cls:?}");
let is_ws_closing = is_ws_closing.write().await; return 1000;
*is_ws_closing = true;
messages_to_send.push(Message::Close(Some(CloseFrame { code: 1000, reason: "Closing as requested".into() })));
return;
} }
} }
} }

View file

@ -0,0 +1,32 @@
use std::sync::Arc;
use axum::extract::ws::{Message, WebSocket};
use deadqueue::unlimited::Queue;
use futures_util::SinkExt;
use futures_util::stream::SplitSink;
use crate::outcome::LoggableOutcome;
pub async fn handler(
mut sender: SplitSink<WebSocket, Message>,
messages_to_send: Arc<Queue<Message>>,
) {
log::trace!("Thread started!");
loop {
log::trace!("Awaiting data to send...");
let message = messages_to_send.pop().await;
let exit_when_done = match message {
Message::Close(_) => true,
_ => false,
};
log::trace!("Sending message: {message:?}");
let _ = sender.send(message).await
.log_err_to_warn("Could not send message");
if exit_when_done {
log::trace!("Done sending messages, shutting down...");
return;
}
}
}

View file

@ -26,7 +26,7 @@ impl BoardChange {
.log_err_to_error("Failed to serialize BoardOperation") .log_err_to_error("Failed to serialize BoardOperation")
.map_err(|_| ())?; .map_err(|_| ())?;
log::trace!("Adding to the Redis stream {stream_key:?}..."); log::trace!("Adding to the Redis stream {key:?}...");
let id = redis::cmd("XADD") let id = redis::cmd("XADD")
.arg(key) .arg(key)
.arg("*") .arg("*")