1
Fork 0
mirror of https://github.com/Steffo99/todocolors.git synced 2024-11-25 01:34:18 +00:00

Get /board/{board}/ws to work, definitely

This commit is contained in:
Steffo 2023-08-01 01:52:12 +02:00
parent 88dae2ed3b
commit 2f3cc201c7
Signed by: steffo
GPG key ID: 2A24051445686895
9 changed files with 84 additions and 43 deletions

View file

@ -0,0 +1,19 @@
<component name="ProjectRunConfigurationManager">
<configuration default="false" name="Check with clippy" type="CargoCommandRunConfiguration" factoryName="Cargo Command">
<option name="command" value="clippy" />
<option name="workingDirectory" value="file://$PROJECT_DIR$/todored" />
<option name="emulateTerminal" value="false" />
<option name="channel" value="DEFAULT" />
<option name="requiredFeatures" value="true" />
<option name="allFeatures" value="false" />
<option name="withSudo" value="false" />
<option name="buildTarget" value="REMOTE" />
<option name="backtrace" value="SHORT" />
<envs />
<option name="isRedirectInput" value="false" />
<option name="redirectInputPath" value="" />
<method v="2">
<option name="CARGO.BUILD_TASK_PROVIDER" enabled="true" />
</method>
</configuration>
</component>

View file

@ -1,14 +1,16 @@
'use client'; 'use client';
import {default as React} from "react"; import {default as React} from "react";
import {useWs} from "@/app/board/[board]/useWs" import {useWs} from "@/app/board/[board]/useWs"
export function useBoard(url: string) { export function useBoard(url: string) {
const socket = useWs(url, { const socket = useWs(url, {
onopen: React.useCallback(() => { onopen: React.useCallback((sock: WebSocket, event: Event) => {
console.debug("[useBoard] Connected to the server!"); console.debug("[useBoard] Connected to the server!");
sock.send('{"Title": "sus"}')
}, []), }, []),
onmessage: React.useCallback((event: MessageEvent) => { onmessage: React.useCallback((sock: WebSocket, event: MessageEvent) => {
const data = JSON.parse(event.data); const data = JSON.parse(event.data);
console.debug("[useBoard] Received ServerOperation: ", data); console.debug("[useBoard] Received ServerOperation: ", data);
}, []) }, [])

View file

@ -35,7 +35,7 @@ async fn main() {
) )
); );
axum::Server::bind(&std::net::SocketAddr::from_str(&**config::AXUM_HOST).expect("AXUM_HOST to be a valid SocketAddr")) axum::Server::bind(&std::net::SocketAddr::from_str(&config::AXUM_HOST).expect("AXUM_HOST to be a valid SocketAddr"))
.serve(router.into_make_service()) .serve(router.into_make_service())
.await .await
.expect("to be able to run the Axum server"); .expect("to be able to run the Axum server");

View file

@ -26,6 +26,6 @@ pub async fn handler(
let action = action.unwrap(); let action = action.unwrap();
log::trace!("Handling BoardRequest..."); log::trace!("Handling BoardRequest...");
BoardRequest { key, action }.handle(&mut rconn).await; BoardRequest { board: key, action }.handle(&mut rconn).await;
} }
} }

View file

@ -2,8 +2,8 @@ use std::sync::Arc;
use axum::extract::ws::{CloseCode, Message}; use axum::extract::ws::{CloseCode, Message};
use deadqueue::unlimited::Queue; use deadqueue::unlimited::Queue;
use redis::aio::Connection; use redis::aio::Connection;
use redis::{AsyncCommands, FromRedisValue, RedisResult};
pub type XReadResult = (String, String, String, String); use redis::streams::{StreamReadOptions, StreamReadReply};
pub async fn handler( pub async fn handler(
mut rconn: Connection, mut rconn: Connection,
@ -16,31 +16,54 @@ pub async fn handler(
loop { loop {
log::trace!("Waiting for events to broadcast for 5 seconds..."); log::trace!("Waiting for events to broadcast for 5 seconds...");
let response = redis::cmd("XREAD") let response: RedisResult<StreamReadReply> = rconn.xread_options(
.arg("COUNT") &[&key],
.arg(1) &[&seq],
.arg("BLOCK") &StreamReadOptions::default().block(5000)
.arg(5000) ).await;
.arg("STREAMS")
.arg(&key)
.arg(&seq)
.query_async::<Connection, Option<XReadResult>>(&mut rconn).await;
if let Err(err) = response { match response {
log::error!("Could not XREAD Redis stream, closing connection: {err:?}"); Err(err) => {
return 1002; log::error!("Could not XREAD Redis stream, closing connection: {err:?}");
return 1002;
},
Ok(reply) => {
match reply.keys.get(0) {
None => {
log::trace!("Stream does not exist yet, retrying...");
}
Some(key) => {
key.ids.iter().for_each(|id| {
match id.map.get("change") {
None => {
log::warn!("Malformed event, skipping: {id:?}");
}
Some(value) => {
match value {
redis::Value::Data(data) => {
match String::from_byte_vec(data) {
None => {
log::warn!("Event with no data, skipping: {data:?}");
}
Some(strings) => {
strings.into_iter().for_each(|string| {
log::trace!("Received event, sending it: {string:?}");
messages_to_send.push(Message::Text(string))
})
}
}
}
_ => {
log::warn!("Malformed value, skipping...");
}
}
}
}
seq = id.id.clone();
})
}
}
},
} }
let response = response.unwrap();
if response.is_none() {
continue;
}
let response = response.unwrap();
seq = response.1;
let message = response.3;
log::trace!("Received event, sending it: {message:?}");
messages_to_send.push(Message::Text(message))
} }
} }

View file

@ -6,7 +6,7 @@ use crate::task::{BoardChange, Task};
#[derive(Clone, Debug, Serialize, Deserialize)] #[derive(Clone, Debug, Serialize, Deserialize)]
pub struct BoardRequest { pub struct BoardRequest {
/// The Redis key to act on. /// The Redis key to act on.
pub key: String, pub board: String,
/// The [`BoardAction`] to perform. /// The [`BoardAction`] to perform.
pub action: BoardAction, pub action: BoardAction,
} }
@ -19,24 +19,24 @@ impl BoardRequest {
BoardAction::Title(title) => { BoardAction::Title(title) => {
log::debug!("Setting board Title: {title:?}"); log::debug!("Setting board Title: {title:?}");
let operation = BoardChange::Title(title); let operation = BoardChange::Title(title);
let _id = operation.store_in_redis(rconn, &self.key).await; let _id = operation.store_in_redis(rconn, &self.board).await;
}, },
BoardAction::Task(None, Some(task)) => { BoardAction::Task(None, Some(task)) => {
log::debug!("Creating Task: {task:?}"); log::debug!("Creating Task: {task:?}");
let id = Uuid::new_v4(); let id = Uuid::new_v4();
log::trace!("Assigned id {id:?} to Task: {task:?}"); log::trace!("Assigned id {id:?} to Task: {task:?}");
let operation = BoardChange::Task(id, Some(task)); let operation = BoardChange::Task(id, Some(task));
let _id = operation.store_in_redis(rconn, &self.key).await; let _id = operation.store_in_redis(rconn, &self.board).await;
}, },
BoardAction::Task(Some(id), Some(task)) => { BoardAction::Task(Some(id), Some(task)) => {
log::debug!("Editing Task {id:?}: {task:?}"); log::debug!("Editing Task {id:?}: {task:?}");
let operation = BoardChange::Task(id, Some(task)); let operation = BoardChange::Task(id, Some(task));
let _id = operation.store_in_redis(rconn, &self.key).await; let _id = operation.store_in_redis(rconn, &self.board).await;
}, },
BoardAction::Task(Some(id), None) => { BoardAction::Task(Some(id), None) => {
log::debug!("Deleting Task {id:?}..."); log::debug!("Deleting Task {id:?}...");
let operation = BoardChange::Task(id, None); let operation = BoardChange::Task(id, None);
let _id = operation.store_in_redis(rconn, &self.key).await; let _id = operation.store_in_redis(rconn, &self.board).await;
}, },
_ => { _ => {
log::warn!("Received unknown BoardRequest: {self:?}"); log::warn!("Received unknown BoardRequest: {self:?}");

View file

@ -15,10 +15,7 @@ pub async fn handler(
log::trace!("Awaiting data to send..."); log::trace!("Awaiting data to send...");
let message = messages_to_send.pop().await; let message = messages_to_send.pop().await;
let exit_when_done = match message { let exit_when_done = matches!(message, Message::Close(_));
Message::Close(_) => true,
_ => false,
};
log::trace!("Sending message: {message:?}"); log::trace!("Sending message: {message:?}");
let _ = sender.send(message).await let _ = sender.send(message).await

View file

@ -33,7 +33,7 @@ pub async fn healthcheck(
response.eq("PONG") response.eq("PONG")
.then_some(()) .then_some(())
.log_err_to_error("Received invalid PONG from Redis") .log_err_to_error("Received invalid PONG from Redis")
.ok_or_else(|| StatusCode::INTERNAL_SERVER_ERROR)?; .ok_or(StatusCode::INTERNAL_SERVER_ERROR)?;
Ok(Json(compose_version())) Ok(Json(compose_version()))
} }

View file

@ -22,7 +22,7 @@ impl BoardChange {
log::debug!("Storing BoardOperation in Redis: {:?}", &self); log::debug!("Storing BoardOperation in Redis: {:?}", &self);
log::trace!("Serializing BoardOperation to JSON..."); log::trace!("Serializing BoardOperation to JSON...");
let operation = serde_json::ser::to_string(self) let change = serde_json::ser::to_string(self)
.log_err_to_error("Failed to serialize BoardOperation") .log_err_to_error("Failed to serialize BoardOperation")
.map_err(|_| ())?; .map_err(|_| ())?;
@ -30,8 +30,8 @@ impl BoardChange {
let id = redis::cmd("XADD") let id = redis::cmd("XADD")
.arg(key) .arg(key)
.arg("*") .arg("*")
.arg("operation") .arg("change")
.arg(operation) .arg(change)
.query_async::<redis::aio::Connection, String>(rconn).await .query_async::<redis::aio::Connection, String>(rconn).await
.log_err_to_error("Failed to XADD to Redis") .log_err_to_error("Failed to XADD to Redis")
.map_err(|_| ())?; .map_err(|_| ())?;