1
Fork 0
mirror of https://github.com/Steffo99/todocolors.git synced 2024-10-16 07:17:28 +00:00

Version 0.3.0

- Rate limiting
- Server at the bottom of the page
- New run configurations
This commit is contained in:
Steffo 2023-10-09 16:22:30 +02:00
parent 1de08ce469
commit 72c0353c90
Signed by: steffo
GPG key ID: 2A24051445686895
15 changed files with 223 additions and 17 deletions

View file

@ -0,0 +1,15 @@
<component name="ProjectRunConfigurationManager">
<configuration default="false" name="Run client with local proxy" type="js.build_tools.npm">
<package-json value="$PROJECT_DIR$/todoblue/package.json" />
<command value="run" />
<scripts>
<script value="dev" />
</scripts>
<arguments value="--port=8081" />
<node-interpreter value="project" />
<envs>
<env name="NEXT_PUBLIC_TODOBLUE_OVERRIDE_BASE_URL" value="http://ethernet.nitro.home.steffo.eu:80" />
</envs>
<method v="2" />
</configuration>
</component>

View file

@ -13,6 +13,7 @@
<env name="AXUM_HOST" value="0.0.0.0:8080" />
<env name="REDIS_CONN" value="redis://127.0.0.1:6379/" />
<env name="RUST_LOG" value="todored" />
<env name="TODORED_RATE_LIMIT_CONNECTIONS" value="0" />
</envs>
<option name="isRedirectInput" value="false" />
<option name="redirectInputPath" value="" />

View file

@ -0,0 +1,26 @@
<component name="ProjectRunConfigurationManager">
<configuration default="false" name="Run server with rate limiting" type="CargoCommandRunConfiguration" factoryName="Cargo Command">
<option name="command" value="run" />
<option name="workingDirectory" value="file://$PROJECT_DIR$/todored" />
<option name="emulateTerminal" value="true" />
<option name="channel" value="DEFAULT" />
<option name="requiredFeatures" value="true" />
<option name="allFeatures" value="false" />
<option name="withSudo" value="false" />
<option name="buildTarget" value="REMOTE" />
<option name="backtrace" value="SHORT" />
<envs>
<env name="AXUM_HOST" value="0.0.0.0:8080" />
<env name="AXUM_XFORWARDED" value="http://ethernet.nitro.home.steffo.eu" />
<env name="REDIS_CONN" value="redis://127.0.0.1:6379/" />
<env name="RUST_LOG" value="todored" />
<env name="TODORED_RATE_LIMIT_CONNECTIONS_PER_MINUTE" value="1" />
<env name="TODORED_RATE_LIMIT_MESSAGES_PER_MINUTE" value="1" />
</envs>
<option name="isRedirectInput" value="false" />
<option name="redirectInputPath" value="" />
<method v="2">
<option name="CARGO.BUILD_TASK_PROVIDER" enabled="true" />
</method>
</configuration>
</component>

View file

@ -1,6 +1,6 @@
{
"name": "todoblue",
"version": "0.2.0",
"version": "0.3.0",
"private": true,
"scripts": {
"dev": "next dev",

View file

@ -8,7 +8,8 @@ export async function RootFooter() {
<p>
© <a href="https://steffo.eu">Stefano Pigozzi</a> -
<a href="https://www.gnu.org/licenses/agpl-3.0.en.html">AGPL 3.0</a> -
<a href="https://github.com/Steffo99/todocolors">GitHub</a>
<a href="https://github.com/Steffo99/todocolors">GitHub</a> -
Connecting to <a href={process.env.NEXT_PUBLIC_TODOBLUE_OVERRIDE_BASE_URL}>{process.env.NEXT_PUBLIC_TODOBLUE_OVERRIDE_BASE_URL}</a>
</p>
</footer>
)

25
todopod/compose-build.yml Normal file
View file

@ -0,0 +1,25 @@
services:
redis:
image: "redis"
restart: unless-stopped
command: >-
redis-server
--save 60 1
--loglevel notice
volumes:
- "./data/redis/rdata:/data"
red:
build: "../todored"
restart: unless-stopped
environment:
REDIS_CONN: "redis://redis:6379/" # You probably don't need to change this
blue:
build: "../todoblue"
restart: unless-stopped
caddy:
image: "caddy"
restart: unless-stopped
volumes:
- "./data/caddy:/data"
- "./config/caddy/Caddyfile:/etc/caddy/Caddyfile"
network_mode: host

View file

@ -1,6 +1,6 @@
[package]
name = "todored"
version = "0.2.0"
version = "0.3.0"
edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

View file

@ -4,3 +4,5 @@ use crate::proxy::ReverseProxyInfoList;
required!(REDIS_CONN, String);
required!(AXUM_HOST, String); // FIXME: Use SocketAddr when possible
optional!(AXUM_XFORWARDED, ReverseProxyInfoList);
required!(TODORED_RATE_LIMIT_CONNECTIONS_PER_MINUTE, usize);
required!(TODORED_RATE_LIMIT_MESSAGES_PER_MINUTE, usize);

View file

@ -37,6 +37,7 @@ async fn main() {
)
);
log::info!("Starting web server!");
axum::Server::bind(&std::net::SocketAddr::from_str(&config::AXUM_HOST).expect("AXUM_HOST to be a valid SocketAddr"))
.serve(router.into_make_service())
.await

View file

@ -1,9 +1,8 @@
use std::net::SocketAddr;
use std::net::IpAddr;
use std::str::FromStr;
use axum::async_trait;
use axum::extract::FromRequestParts;
use axum::http::request::Parts;
use axum::http::StatusCode;
use crate::config;
use crate::outcome::ResponseError;
@ -39,66 +38,97 @@ impl FromStr for ReverseProxyInfoList {
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct ExtractReverseProxy {
pub r#for: SocketAddr,
pub r#for: IpAddr,
pub info: ReverseProxyInfo,
}
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct ExtractReverseProxyOption ( Option<ExtractReverseProxy> );
pub struct ExtractReverseProxyOption ( pub Option<ExtractReverseProxy> );
#[async_trait]
impl<S> FromRequestParts<S> for ExtractReverseProxyOption where S: Send + Sync {
type Rejection = ResponseError;
// TODO: Pending a security audit, as in second thought this doesn't seem so secure...
async fn from_request_parts(parts: &mut Parts, _state: &S) -> Result<Self, Self::Rejection> {
log::debug!("Extracting reverse proxy headers...");
log::trace!("Getting authorized proxy list...");
let proxy_list = config::AXUM_XFORWARDED.clone();
// Reverse proxying is not configured
log::trace!("Making sure a proxy list has been defined...");
if proxy_list.is_none() {
log::trace!("No authorized proxies have been defined, extracting None...");
return Ok(ExtractReverseProxyOption(None))
}
let proxy_list = proxy_list.unwrap().0;
log::trace!("Authorized proxies are: {proxy_list:?}");
log::trace!("Parsing X-Forwarded headers...");
let r#for = parts.headers.get("X-Forwarded-For");
log::trace!("Raw X-Forwarded-For: {:?}", r#for);
let proto = parts.headers.get("X-Forwarded-Proto");
log::trace!("Raw X-Forwarded-Proto: {:?}", proto);
let host = parts.headers.get("X-Forwarded-Host");
log::trace!("Raw X-Forwarded-For: {:?}", host);
// Accessing the server without a reverse proxy
log::trace!("Checking for presence of headers...");
if r#for.is_none() || proto.is_none() || host.is_none() {
// TODO: Should this return None instead?
return Err(StatusCode::BAD_GATEWAY)
log::warn!("X-Forwarded headers are missing, extracting None...");
return Ok(ExtractReverseProxyOption(None))
}
log::trace!("Converting X-Forwarded headers to &str...");
let r#for = r#for.unwrap().to_str();
log::trace!("Stringified X-Forwarded-For: {:?}", r#for);
let proto = proto.unwrap().to_str();
log::trace!("Stringified X-Forwarded-Proto: {:?}", proto);
let host = host.unwrap().to_str();
log::trace!("Stringified X-Forwarded-For: {:?}", host);
// Control characters in X-Forwarded headers
log::trace!("Checking for control characters...");
if r#for.is_err() || proto.is_err() || host.is_err() {
return Err(StatusCode::BAD_GATEWAY)
log::warn!("X-Forwarded headers have invalid characters in them, extracting None...");
return Ok(ExtractReverseProxyOption(None))
}
log::trace!("Cloning X-Forwarded headers...");
let r#for = r#for.unwrap().to_string();
let proto = proto.unwrap().to_string();
let host = host.unwrap().to_string();
log::trace!("Constructing ReverseProxyInfo...");
let info = ReverseProxyInfo { proto, host };
log::trace!("Constructed ReverseProxyInfo: {info:?}");
// X-Forwarded-Host is not authorized
log::trace!("Checking if proxy is authorized...");
if !proxy_list.contains(&info) {
return Err(StatusCode::BAD_GATEWAY)
log::warn!("X-Forwarded-Host is not an authorized proxy, extracting None...");
return Ok(ExtractReverseProxyOption(None))
}
let r#for = r#for.parse::<SocketAddr>();
log::trace!("Parsing X-Forwarded-For as a IpAddr...");
let r#for = r#for.parse::<IpAddr>();
// X-Forwarded-For is not a valid IP address
log::trace!("Making sure X-Forwarded-For is a valid SocketAddr...");
if r#for.is_err() {
return Err(StatusCode::BAD_GATEWAY)
log::warn!("X-Forwarded-For is not a valid SocketAddr, extracting None...");
return Ok(ExtractReverseProxyOption(None))
}
let r#for = r#for.unwrap();
log::trace!("Parsing X-Forwarded-For as: {:?}", r#for);
Ok(ExtractReverseProxyOption(Some(ExtractReverseProxy { r#for, info })))
log::trace!("Constructing result...");
let result = Ok(ExtractReverseProxyOption(Some(ExtractReverseProxy { r#for, info })));
log::debug!("Extracted reverse proxy headers as: {result:?}");
result
}
}

View file

@ -1,17 +1,57 @@
use axum::Extension;
use axum::{Extension};
use axum::extract::{Path, WebSocketUpgrade};
use axum::http::StatusCode;
use axum::response::IntoResponse;
use crate::kebab::Skewer;
use crate::proxy::ExtractReverseProxyOption;
use super::ws;
pub(crate) async fn handler(
Path(board): Path<String>,
Extension(rclient): Extension<redis::Client>,
ExtractReverseProxyOption(proxy_opt): ExtractReverseProxyOption,
upgrade_request: WebSocketUpgrade,
) -> axum::response::Response {
log::trace!("Kebabifying board name...");
let board = board.to_kebab_lowercase();
log::trace!("Kebabified board name to: {board:?}");
log::trace!("Creating Redis connection for the handler...");
let rconn = rclient.get_async_connection().await;
if rconn.is_err() {
log::error!("Could not open Redis connection for the handler.");
return Err::<(), StatusCode>(StatusCode::INTERNAL_SERVER_ERROR).into_response()
}
let mut handle_redis = rconn.unwrap();
log::trace!("Created Redis connection for the main thread!");
let rate_limit_key = match proxy_opt {
None => {
log::warn!("Reverse proxying has not been detected, rate-limiting is disabled.");
None
}
Some(proxy) => {
log::debug!("Reverse proxying has been detected, rate-limiting is enabled.");
let ip = proxy.r#for;
log::trace!("User agent's IP is: {ip}");
let key = format!("limit:{{{ip}}}:connections");
log::trace!("Rate-limiting key is: {key:?}");
Some(key)
}
};
if let Some(rate_limit_key) = &rate_limit_key {
let count = *crate::config::TODORED_RATE_LIMIT_CONNECTIONS_PER_MINUTE;
log::trace!("Connection rate limit is: {count} / 60 s");
if count > 0 {
log::trace!("Checking rate limit...");
let result = super::limit::rate_limit_by_key(&mut handle_redis, &rate_limit_key, 1, count, 60).await;
if result.is_err() {
return Err::<(), StatusCode>(StatusCode::BAD_REQUEST).into_response()
}
}
}
log::trace!("Received websocket request, upgrading...");
upgrade_request.on_upgrade(|websocket| ws::handler(board, rclient, websocket))
upgrade_request.on_upgrade(|websocket| ws::handler(board, rclient, websocket, rate_limit_key))
}

View file

@ -0,0 +1,35 @@
//! Rate limiting for board websocket.
use axum::extract::ws::CloseCode;
use crate::outcome::LoggableOutcome;
pub async fn rate_limit_by_key(
rconn: &mut redis::aio::Connection,
key: &str,
increment: usize,
limit: usize,
expiration_s: usize
) -> Result<usize, CloseCode> {
log::trace!("Incrementing rate limit counter for {key:?}...");
let response: usize = redis::cmd("INCRBY")
.arg(&key)
.arg(increment)
.query_async::<redis::aio::Connection, usize>(rconn).await
.log_err_to_error("Could not increase rate limit counter")
.map_err(|_| 1011u16)?;
log::trace!("Refreshing rate limit counter expiration for {key:?}...");
let _ = redis::cmd("EXPIRE")
.arg(&key)
.arg(expiration_s)
.query_async::<redis::aio::Connection, ()>(rconn).await
.log_err_to_warn("Could not set expiration for rate limit counter");
log::trace!("Checking rate limit of {limit} per {expiration_s} s for {key:?}...");
if response > limit {
log::warn!("Hit rate limit with {response} out of {limit} per {expiration_s} s for {key:?}!");
return Err(1008u16);
}
Ok(response)
}

View file

@ -7,5 +7,6 @@ pub(self) mod ws_receive;
pub(self) mod redis_xadd;
pub(self) mod redis_xread;
pub(self) mod ws_send;
pub(self) mod limit;
pub(crate) use self::axum::handler as board_websocket;

View file

@ -11,6 +11,7 @@ pub async fn handler(
board_name: String,
rclient: redis::Client,
websocket: WebSocket,
rate_limit_key: Option<String>,
) {
log::trace!("Creating Redis connection for the main thread...");
let main_redis = rclient.get_async_connection().await;
@ -22,6 +23,16 @@ pub async fn handler(
let mut main_redis = main_redis.unwrap();
log::trace!("Created Redis connection for the main thread!");
log::trace!("Creating Redis connection for the receive thread...");
let receive_redis = rclient.get_async_connection().await;
if receive_redis.is_err() {
log::error!("Could not open Redis connection for the receive thread.");
let _ = websocket.close().await;
return;
}
let receive_redis = receive_redis.unwrap();
log::trace!("Created Redis connection for the receive thread!");
log::trace!("Creating Redis connection for the XADD thread...");
let xadd_redis = rclient.get_async_connection().await;
if xadd_redis.is_err() {
@ -63,9 +74,11 @@ pub async fn handler(
log::trace!("Spawning ws_receive_thread...");
let ws_receive_thread = tokio::spawn(ws_receive::handler(
receive_redis,
receiver,
strings_to_process.clone(),
messages_to_send.clone(),
rate_limit_key,
));
let ws_receive_abort = ws_receive_thread.abort_handle();
@ -116,7 +129,7 @@ pub async fn handler(
ws_send_thread
);
log::debug!("Websocket threads closed successfully!")
log::debug!("Websocket threads closed successfully!");
}
fn close_with_code(

View file

@ -2,12 +2,15 @@ use axum::extract::ws::{CloseCode, Message, WebSocket};
use futures_util::stream::SplitStream;
use deadqueue::unlimited::Queue;
use std::sync::Arc;
use redis::aio::Connection;
use futures_util::StreamExt;
pub async fn handler(
mut rconn: Connection,
mut receiver: SplitStream<WebSocket>,
strings_to_process: Arc<Queue<String>>,
messages_to_send: Arc<Queue<Message>>,
rate_limit_key: Option<String>,
) -> Result<(), CloseCode> {
log::trace!("Thread started!");
@ -16,6 +19,19 @@ pub async fn handler(
let value = receiver.next().await;
log::trace!("Received from websocket: {value:?}");
if let Some(rate_limit_key) = &rate_limit_key {
let count = *crate::config::TODORED_RATE_LIMIT_MESSAGES_PER_MINUTE;
log::trace!("Connection rate limit is: {count} / 60 s");
if count > 0 {
log::trace!("Checking rate limit...");
let result = super::limit::rate_limit_by_key(&mut rconn, &rate_limit_key, 1, count, 60).await;
if result.is_err() {
log::warn!("Hit rate limit, closing connection.");
return Err(1008u16);
}
}
}
log::trace!("Checking if the websocket timed out...");
if value.is_none() {
log::debug!("Websocket timed out, closing connection.");