1
Fork 0
mirror of https://github.com/Steffo99/todocolors.git synced 2024-11-22 16:24:19 +00:00

Implement rate limits and code cleanup

This commit is contained in:
Steffo 2023-10-09 15:59:53 +02:00
parent e53583672c
commit 60952484de
Signed by: steffo
GPG key ID: 2A24051445686895
12 changed files with 172 additions and 40 deletions

View file

@ -0,0 +1,15 @@
<component name="ProjectRunConfigurationManager">
<configuration default="false" name="Run client with local proxy" type="js.build_tools.npm">
<package-json value="$PROJECT_DIR$/todoblue/package.json" />
<command value="run" />
<scripts>
<script value="dev" />
</scripts>
<arguments value="--port=8081" />
<node-interpreter value="project" />
<envs>
<env name="NEXT_PUBLIC_TODOBLUE_OVERRIDE_BASE_URL" value="http://ethernet.nitro.home.steffo.eu:80" />
</envs>
<method v="2" />
</configuration>
</component>

View file

@ -13,6 +13,7 @@
<env name="AXUM_HOST" value="0.0.0.0:8080" /> <env name="AXUM_HOST" value="0.0.0.0:8080" />
<env name="REDIS_CONN" value="redis://127.0.0.1:6379/" /> <env name="REDIS_CONN" value="redis://127.0.0.1:6379/" />
<env name="RUST_LOG" value="todored" /> <env name="RUST_LOG" value="todored" />
<env name="TODORED_RATE_LIMIT_CONNECTIONS" value="0" />
</envs> </envs>
<option name="isRedirectInput" value="false" /> <option name="isRedirectInput" value="false" />
<option name="redirectInputPath" value="" /> <option name="redirectInputPath" value="" />

View file

@ -0,0 +1,26 @@
<component name="ProjectRunConfigurationManager">
<configuration default="false" name="Run server with rate limiting" type="CargoCommandRunConfiguration" factoryName="Cargo Command">
<option name="command" value="run" />
<option name="workingDirectory" value="file://$PROJECT_DIR$/todored" />
<option name="emulateTerminal" value="true" />
<option name="channel" value="DEFAULT" />
<option name="requiredFeatures" value="true" />
<option name="allFeatures" value="false" />
<option name="withSudo" value="false" />
<option name="buildTarget" value="REMOTE" />
<option name="backtrace" value="SHORT" />
<envs>
<env name="AXUM_HOST" value="0.0.0.0:8080" />
<env name="AXUM_XFORWARDED" value="http://ethernet.nitro.home.steffo.eu" />
<env name="REDIS_CONN" value="redis://127.0.0.1:6379/" />
<env name="RUST_LOG" value="todored" />
<env name="TODORED_RATE_LIMIT_CONNECTIONS_PER_MINUTE" value="1" />
<env name="TODORED_RATE_LIMIT_MESSAGES_PER_MINUTE" value="1" />
</envs>
<option name="isRedirectInput" value="false" />
<option name="redirectInputPath" value="" />
<method v="2">
<option name="CARGO.BUILD_TASK_PROVIDER" enabled="true" />
</method>
</configuration>
</component>

View file

@ -8,7 +8,8 @@ export async function RootFooter() {
<p> <p>
© <a href="https://steffo.eu">Stefano Pigozzi</a> - © <a href="https://steffo.eu">Stefano Pigozzi</a> -
<a href="https://www.gnu.org/licenses/agpl-3.0.en.html">AGPL 3.0</a> - <a href="https://www.gnu.org/licenses/agpl-3.0.en.html">AGPL 3.0</a> -
<a href="https://github.com/Steffo99/todocolors">GitHub</a> <a href="https://github.com/Steffo99/todocolors">GitHub</a> -
Connecting to <a href={process.env.NEXT_PUBLIC_TODOBLUE_OVERRIDE_BASE_URL}>{process.env.NEXT_PUBLIC_TODOBLUE_OVERRIDE_BASE_URL}</a>
</p> </p>
</footer> </footer>
) )

25
todopod/compose-build.yml Normal file
View file

@ -0,0 +1,25 @@
services:
redis:
image: "redis"
restart: unless-stopped
command: >-
redis-server
--save 60 1
--loglevel notice
volumes:
- "./data/redis/rdata:/data"
red:
build: "../todored"
restart: unless-stopped
environment:
REDIS_CONN: "redis://redis:6379/" # You probably don't need to change this
blue:
build: "../todoblue"
restart: unless-stopped
caddy:
image: "caddy"
restart: unless-stopped
volumes:
- "./data/caddy:/data"
- "./config/caddy/Caddyfile:/etc/caddy/Caddyfile"
network_mode: host

View file

@ -4,4 +4,5 @@ use crate::proxy::ReverseProxyInfoList;
required!(REDIS_CONN, String); required!(REDIS_CONN, String);
required!(AXUM_HOST, String); // FIXME: Use SocketAddr when possible required!(AXUM_HOST, String); // FIXME: Use SocketAddr when possible
optional!(AXUM_XFORWARDED, ReverseProxyInfoList); optional!(AXUM_XFORWARDED, ReverseProxyInfoList);
required!(TODORED_RATE_LIMIT_CONNECTIONS, usize); required!(TODORED_RATE_LIMIT_CONNECTIONS_PER_MINUTE, usize);
required!(TODORED_RATE_LIMIT_MESSAGES_PER_MINUTE, usize);

View file

@ -37,6 +37,7 @@ async fn main() {
) )
); );
log::info!("Starting web server!");
axum::Server::bind(&std::net::SocketAddr::from_str(&config::AXUM_HOST).expect("AXUM_HOST to be a valid SocketAddr")) axum::Server::bind(&std::net::SocketAddr::from_str(&config::AXUM_HOST).expect("AXUM_HOST to be a valid SocketAddr"))
.serve(router.into_make_service()) .serve(router.into_make_service())
.await .await

View file

@ -1,9 +1,8 @@
use std::net::SocketAddr; use std::net::IpAddr;
use std::str::FromStr; use std::str::FromStr;
use axum::async_trait; use axum::async_trait;
use axum::extract::FromRequestParts; use axum::extract::FromRequestParts;
use axum::http::request::Parts; use axum::http::request::Parts;
use axum::http::StatusCode;
use crate::config; use crate::config;
use crate::outcome::ResponseError; use crate::outcome::ResponseError;
@ -39,7 +38,7 @@ impl FromStr for ReverseProxyInfoList {
#[derive(Clone, Debug, PartialEq, Eq)] #[derive(Clone, Debug, PartialEq, Eq)]
pub struct ExtractReverseProxy { pub struct ExtractReverseProxy {
pub r#for: SocketAddr, pub r#for: IpAddr,
pub info: ReverseProxyInfo, pub info: ReverseProxyInfo,
} }
@ -52,54 +51,84 @@ impl<S> FromRequestParts<S> for ExtractReverseProxyOption where S: Send + Sync {
// TODO: Pending a security audit, as in second thought this doesn't seem so secure... // TODO: Pending a security audit, as in second thought this doesn't seem so secure...
async fn from_request_parts(parts: &mut Parts, _state: &S) -> Result<Self, Self::Rejection> { async fn from_request_parts(parts: &mut Parts, _state: &S) -> Result<Self, Self::Rejection> {
log::debug!("Extracting reverse proxy headers...");
log::trace!("Getting authorized proxy list...");
let proxy_list = config::AXUM_XFORWARDED.clone(); let proxy_list = config::AXUM_XFORWARDED.clone();
// Reverse proxying is not configured // Reverse proxying is not configured
log::trace!("Making sure a proxy list has been defined...");
if proxy_list.is_none() { if proxy_list.is_none() {
log::trace!("No authorized proxies have been defined, extracting None...");
return Ok(ExtractReverseProxyOption(None)) return Ok(ExtractReverseProxyOption(None))
} }
let proxy_list = proxy_list.unwrap().0; let proxy_list = proxy_list.unwrap().0;
log::trace!("Authorized proxies are: {proxy_list:?}");
log::trace!("Parsing X-Forwarded headers...");
let r#for = parts.headers.get("X-Forwarded-For"); let r#for = parts.headers.get("X-Forwarded-For");
log::trace!("Raw X-Forwarded-For: {:?}", r#for);
let proto = parts.headers.get("X-Forwarded-Proto"); let proto = parts.headers.get("X-Forwarded-Proto");
log::trace!("Raw X-Forwarded-Proto: {:?}", proto);
let host = parts.headers.get("X-Forwarded-Host"); let host = parts.headers.get("X-Forwarded-Host");
log::trace!("Raw X-Forwarded-For: {:?}", host);
// Accessing the server without a reverse proxy // Accessing the server without a reverse proxy
log::trace!("Checking for presence of headers...");
if r#for.is_none() || proto.is_none() || host.is_none() { if r#for.is_none() || proto.is_none() || host.is_none() {
// TODO: Should this return None instead? log::warn!("X-Forwarded headers are missing, extracting None...");
return Err(StatusCode::BAD_GATEWAY) return Ok(ExtractReverseProxyOption(None))
} }
log::trace!("Converting X-Forwarded headers to &str...");
let r#for = r#for.unwrap().to_str(); let r#for = r#for.unwrap().to_str();
log::trace!("Stringified X-Forwarded-For: {:?}", r#for);
let proto = proto.unwrap().to_str(); let proto = proto.unwrap().to_str();
log::trace!("Stringified X-Forwarded-Proto: {:?}", proto);
let host = host.unwrap().to_str(); let host = host.unwrap().to_str();
log::trace!("Stringified X-Forwarded-For: {:?}", host);
// Control characters in X-Forwarded headers // Control characters in X-Forwarded headers
log::trace!("Checking for control characters...");
if r#for.is_err() || proto.is_err() || host.is_err() { if r#for.is_err() || proto.is_err() || host.is_err() {
return Err(StatusCode::BAD_GATEWAY) log::warn!("X-Forwarded headers have invalid characters in them, extracting None...");
return Ok(ExtractReverseProxyOption(None))
} }
log::trace!("Cloning X-Forwarded headers...");
let r#for = r#for.unwrap().to_string(); let r#for = r#for.unwrap().to_string();
let proto = proto.unwrap().to_string(); let proto = proto.unwrap().to_string();
let host = host.unwrap().to_string(); let host = host.unwrap().to_string();
log::trace!("Constructing ReverseProxyInfo...");
let info = ReverseProxyInfo { proto, host }; let info = ReverseProxyInfo { proto, host };
log::trace!("Constructed ReverseProxyInfo: {info:?}");
// X-Forwarded-Host is not authorized // X-Forwarded-Host is not authorized
log::trace!("Checking if proxy is authorized...");
if !proxy_list.contains(&info) { if !proxy_list.contains(&info) {
return Err(StatusCode::BAD_GATEWAY) log::warn!("X-Forwarded-Host is not an authorized proxy, extracting None...");
return Ok(ExtractReverseProxyOption(None))
} }
let r#for = r#for.parse::<SocketAddr>(); log::trace!("Parsing X-Forwarded-For as a IpAddr...");
let r#for = r#for.parse::<IpAddr>();
// X-Forwarded-For is not a valid IP address // X-Forwarded-For is not a valid IP address
log::trace!("Making sure X-Forwarded-For is a valid SocketAddr...");
if r#for.is_err() { if r#for.is_err() {
return Err(StatusCode::BAD_GATEWAY) log::warn!("X-Forwarded-For is not a valid SocketAddr, extracting None...");
return Ok(ExtractReverseProxyOption(None))
} }
let r#for = r#for.unwrap(); let r#for = r#for.unwrap();
log::trace!("Parsing X-Forwarded-For as: {:?}", r#for);
Ok(ExtractReverseProxyOption(Some(ExtractReverseProxy { r#for, info }))) log::trace!("Constructing result...");
let result = Ok(ExtractReverseProxyOption(Some(ExtractReverseProxy { r#for, info })));
log::debug!("Extracted reverse proxy headers as: {result:?}");
result
} }
} }

View file

@ -17,37 +17,41 @@ pub(crate) async fn handler(
log::trace!("Kebabified board name to: {board:?}"); log::trace!("Kebabified board name to: {board:?}");
log::trace!("Creating Redis connection for the handler..."); log::trace!("Creating Redis connection for the handler...");
let handle_redis = rclient.get_async_connection().await; let rconn = rclient.get_async_connection().await;
if handle_redis.is_err() { if rconn.is_err() {
log::error!("Could not open Redis connection for the handler."); log::error!("Could not open Redis connection for the handler.");
return Err::<(), StatusCode>(StatusCode::INTERNAL_SERVER_ERROR).into_response() return Err::<(), StatusCode>(StatusCode::INTERNAL_SERVER_ERROR).into_response()
} }
let mut handle_redis = handle_redis.unwrap(); let mut handle_redis = rconn.unwrap();
log::trace!("Created Redis connection for the main thread!"); log::trace!("Created Redis connection for the main thread!");
let count = *crate::config::TODORED_RATE_LIMIT_CONNECTIONS; let rate_limit_key = match proxy_opt {
log::trace!("TODORED_RATE_LIMIT_CONNECTIONS is {count}."); None => {
if count > 0 { log::warn!("Reverse proxying has not been detected, rate-limiting is disabled.");
if proxy_opt.is_none() { None
log::error!("TODORED_RATE_LIMIT_CONNECTIONS is {count}, but a request has been received without the proxy headers!");
return Err::<(), StatusCode>(StatusCode::BAD_GATEWAY).into_response();
} }
let proxy = proxy_opt.unwrap(); Some(proxy) => {
log::trace!("Checking X-Forwarded-For header..."); log::debug!("Reverse proxying has been detected, rate-limiting is enabled.");
let ip = proxy.r#for.ip(); let ip = proxy.r#for;
log::trace!("User's IP is: {ip}"); log::trace!("User agent's IP is: {ip}");
let key = format!("limit:{{{ip}}}:connections"); let key = format!("limit:{{{ip}}}:connections");
log::trace!("Rate-limiting key is: {key:?}"); log::trace!("Rate-limiting key is: {key:?}");
Some(key)
}
};
log::trace!("Running rate-limiting function..."); if let Some(rate_limit_key) = &rate_limit_key {
let result = super::limit::rate_limit_by_key(&mut handle_redis, key, 1, count, 1).await; let count = *crate::config::TODORED_RATE_LIMIT_CONNECTIONS_PER_MINUTE;
log::trace!("Connection rate limit is: {count} / 60 s");
if result.is_err() { if count > 0 {
log::warn!("User with IP {ip} hit connection rate limit!"); log::trace!("Checking rate limit...");
return Err::<(), StatusCode>(StatusCode::BAD_REQUEST).into_response() let result = super::limit::rate_limit_by_key(&mut handle_redis, &rate_limit_key, 1, count, 60).await;
if result.is_err() {
return Err::<(), StatusCode>(StatusCode::BAD_REQUEST).into_response()
}
} }
} }
log::trace!("Received websocket request, upgrading..."); log::trace!("Received websocket request, upgrading...");
upgrade_request.on_upgrade(|websocket| ws::handler(board, rclient, websocket)) upgrade_request.on_upgrade(|websocket| ws::handler(board, rclient, websocket, rate_limit_key))
} }

View file

@ -5,11 +5,11 @@ use crate::outcome::LoggableOutcome;
pub async fn rate_limit_by_key( pub async fn rate_limit_by_key(
rconn: &mut redis::aio::Connection, rconn: &mut redis::aio::Connection,
key: String, key: &str,
increment: usize, increment: usize,
limit: usize, limit: usize,
expiration_s: usize expiration_s: usize
) -> Result<(), CloseCode> { ) -> Result<usize, CloseCode> {
log::trace!("Incrementing rate limit counter for {key:?}..."); log::trace!("Incrementing rate limit counter for {key:?}...");
let response: usize = redis::cmd("INCRBY") let response: usize = redis::cmd("INCRBY")
.arg(&key) .arg(&key)
@ -25,11 +25,11 @@ pub async fn rate_limit_by_key(
.query_async::<redis::aio::Connection, ()>(rconn).await .query_async::<redis::aio::Connection, ()>(rconn).await
.log_err_to_warn("Could not set expiration for rate limit counter"); .log_err_to_warn("Could not set expiration for rate limit counter");
log::trace!("Checking rate limit of {limit} / {expiration_s} s for {key:?}..."); log::trace!("Checking rate limit of {limit} per {expiration_s} s for {key:?}...");
if response > limit { if response > limit {
log::warn!("Hit rate limit of {limit} / {expiration_s} s for {key:?}: counter is at {response}!"); log::warn!("Hit rate limit with {response} out of {limit} per {expiration_s} s for {key:?}!");
return Err(1008u16); return Err(1008u16);
} }
Ok(()) Ok(response)
} }

View file

@ -11,6 +11,7 @@ pub async fn handler(
board_name: String, board_name: String,
rclient: redis::Client, rclient: redis::Client,
websocket: WebSocket, websocket: WebSocket,
rate_limit_key: Option<String>,
) { ) {
log::trace!("Creating Redis connection for the main thread..."); log::trace!("Creating Redis connection for the main thread...");
let main_redis = rclient.get_async_connection().await; let main_redis = rclient.get_async_connection().await;
@ -22,6 +23,16 @@ pub async fn handler(
let mut main_redis = main_redis.unwrap(); let mut main_redis = main_redis.unwrap();
log::trace!("Created Redis connection for the main thread!"); log::trace!("Created Redis connection for the main thread!");
log::trace!("Creating Redis connection for the receive thread...");
let receive_redis = rclient.get_async_connection().await;
if receive_redis.is_err() {
log::error!("Could not open Redis connection for the receive thread.");
let _ = websocket.close().await;
return;
}
let receive_redis = receive_redis.unwrap();
log::trace!("Created Redis connection for the receive thread!");
log::trace!("Creating Redis connection for the XADD thread..."); log::trace!("Creating Redis connection for the XADD thread...");
let xadd_redis = rclient.get_async_connection().await; let xadd_redis = rclient.get_async_connection().await;
if xadd_redis.is_err() { if xadd_redis.is_err() {
@ -63,9 +74,11 @@ pub async fn handler(
log::trace!("Spawning ws_receive_thread..."); log::trace!("Spawning ws_receive_thread...");
let ws_receive_thread = tokio::spawn(ws_receive::handler( let ws_receive_thread = tokio::spawn(ws_receive::handler(
receive_redis,
receiver, receiver,
strings_to_process.clone(), strings_to_process.clone(),
messages_to_send.clone(), messages_to_send.clone(),
rate_limit_key,
)); ));
let ws_receive_abort = ws_receive_thread.abort_handle(); let ws_receive_abort = ws_receive_thread.abort_handle();
@ -116,7 +129,7 @@ pub async fn handler(
ws_send_thread ws_send_thread
); );
log::debug!("Websocket threads closed successfully!") log::debug!("Websocket threads closed successfully!");
} }
fn close_with_code( fn close_with_code(

View file

@ -2,12 +2,15 @@ use axum::extract::ws::{CloseCode, Message, WebSocket};
use futures_util::stream::SplitStream; use futures_util::stream::SplitStream;
use deadqueue::unlimited::Queue; use deadqueue::unlimited::Queue;
use std::sync::Arc; use std::sync::Arc;
use redis::aio::Connection;
use futures_util::StreamExt; use futures_util::StreamExt;
pub async fn handler( pub async fn handler(
mut rconn: Connection,
mut receiver: SplitStream<WebSocket>, mut receiver: SplitStream<WebSocket>,
strings_to_process: Arc<Queue<String>>, strings_to_process: Arc<Queue<String>>,
messages_to_send: Arc<Queue<Message>>, messages_to_send: Arc<Queue<Message>>,
rate_limit_key: Option<String>,
) -> Result<(), CloseCode> { ) -> Result<(), CloseCode> {
log::trace!("Thread started!"); log::trace!("Thread started!");
@ -16,6 +19,19 @@ pub async fn handler(
let value = receiver.next().await; let value = receiver.next().await;
log::trace!("Received from websocket: {value:?}"); log::trace!("Received from websocket: {value:?}");
if let Some(rate_limit_key) = &rate_limit_key {
let count = *crate::config::TODORED_RATE_LIMIT_MESSAGES_PER_MINUTE;
log::trace!("Connection rate limit is: {count} / 60 s");
if count > 0 {
log::trace!("Checking rate limit...");
let result = super::limit::rate_limit_by_key(&mut rconn, &rate_limit_key, 1, count, 60).await;
if result.is_err() {
log::warn!("Hit rate limit, closing connection.");
return Err(1008u16);
}
}
}
log::trace!("Checking if the websocket timed out..."); log::trace!("Checking if the websocket timed out...");
if value.is_none() { if value.is_none() {
log::debug!("Websocket timed out, closing connection."); log::debug!("Websocket timed out, closing connection.");