1
Fork 0
mirror of https://github.com/Steffo99/todocolors.git synced 2024-11-22 16:24:19 +00:00

More WIP stuff on rate limiting

This commit is contained in:
Steffo 2023-10-09 01:36:43 +02:00
parent d47b7261e5
commit e53583672c
Signed by: steffo
GPG key ID: 2A24051445686895
4 changed files with 46 additions and 7 deletions

View file

@ -4,3 +4,4 @@ use crate::proxy::ReverseProxyInfoList;
required!(REDIS_CONN, String); required!(REDIS_CONN, String);
required!(AXUM_HOST, String); // FIXME: Use SocketAddr when possible required!(AXUM_HOST, String); // FIXME: Use SocketAddr when possible
optional!(AXUM_XFORWARDED, ReverseProxyInfoList); optional!(AXUM_XFORWARDED, ReverseProxyInfoList);
required!(TODORED_RATE_LIMIT_CONNECTIONS, usize);

View file

@ -44,12 +44,13 @@ pub struct ExtractReverseProxy {
} }
#[derive(Clone, Debug, PartialEq, Eq)] #[derive(Clone, Debug, PartialEq, Eq)]
pub struct ExtractReverseProxyOption ( Option<ExtractReverseProxy> ); pub struct ExtractReverseProxyOption ( pub Option<ExtractReverseProxy> );
#[async_trait] #[async_trait]
impl<S> FromRequestParts<S> for ExtractReverseProxyOption where S: Send + Sync { impl<S> FromRequestParts<S> for ExtractReverseProxyOption where S: Send + Sync {
type Rejection = ResponseError; type Rejection = ResponseError;
// TODO: Pending a security audit, as in second thought this doesn't seem so secure...
async fn from_request_parts(parts: &mut Parts, _state: &S) -> Result<Self, Self::Rejection> { async fn from_request_parts(parts: &mut Parts, _state: &S) -> Result<Self, Self::Rejection> {
let proxy_list = config::AXUM_XFORWARDED.clone(); let proxy_list = config::AXUM_XFORWARDED.clone();

View file

@ -1,17 +1,53 @@
use axum::Extension; use axum::{Extension};
use axum::extract::{Path, WebSocketUpgrade}; use axum::extract::{Path, WebSocketUpgrade};
use axum::http::StatusCode;
use axum::response::IntoResponse;
use crate::kebab::Skewer; use crate::kebab::Skewer;
use crate::proxy::ExtractReverseProxyOption;
use super::ws; use super::ws;
pub(crate) async fn handler( pub(crate) async fn handler(
Path(board): Path<String>, Path(board): Path<String>,
Extension(rclient): Extension<redis::Client>, Extension(rclient): Extension<redis::Client>,
ExtractReverseProxyOption(proxy_opt): ExtractReverseProxyOption,
upgrade_request: WebSocketUpgrade, upgrade_request: WebSocketUpgrade,
) -> axum::response::Response { ) -> axum::response::Response {
log::trace!("Kebabifying board name..."); log::trace!("Kebabifying board name...");
let board = board.to_kebab_lowercase(); let board = board.to_kebab_lowercase();
log::trace!("Kebabified board name to: {board:?}"); log::trace!("Kebabified board name to: {board:?}");
log::trace!("Creating Redis connection for the handler...");
let handle_redis = rclient.get_async_connection().await;
if handle_redis.is_err() {
log::error!("Could not open Redis connection for the handler.");
return Err::<(), StatusCode>(StatusCode::INTERNAL_SERVER_ERROR).into_response()
}
let mut handle_redis = handle_redis.unwrap();
log::trace!("Created Redis connection for the main thread!");
let count = *crate::config::TODORED_RATE_LIMIT_CONNECTIONS;
log::trace!("TODORED_RATE_LIMIT_CONNECTIONS is {count}.");
if count > 0 {
if proxy_opt.is_none() {
log::error!("TODORED_RATE_LIMIT_CONNECTIONS is {count}, but a request has been received without the proxy headers!");
return Err::<(), StatusCode>(StatusCode::BAD_GATEWAY).into_response();
}
let proxy = proxy_opt.unwrap();
log::trace!("Checking X-Forwarded-For header...");
let ip = proxy.r#for.ip();
log::trace!("User's IP is: {ip}");
let key = format!("limit:{{{ip}}}:connections");
log::trace!("Rate-limiting key is: {key:?}");
log::trace!("Running rate-limiting function...");
let result = super::limit::rate_limit_by_key(&mut handle_redis, key, 1, count, 1).await;
if result.is_err() {
log::warn!("User with IP {ip} hit connection rate limit!");
return Err::<(), StatusCode>(StatusCode::BAD_REQUEST).into_response()
}
}
log::trace!("Received websocket request, upgrading..."); log::trace!("Received websocket request, upgrading...");
upgrade_request.on_upgrade(|websocket| ws::handler(board, rclient, websocket)) upgrade_request.on_upgrade(|websocket| ws::handler(board, rclient, websocket))
} }

View file

@ -1,27 +1,28 @@
//! Rate limiting for board websocket. //! Rate limiting for board websocket.
use axum::extract::ws::CloseCode; use axum::extract::ws::CloseCode;
use redis::Connection;
use crate::outcome::LoggableOutcome; use crate::outcome::LoggableOutcome;
pub fn rate_limit_by_key( pub async fn rate_limit_by_key(
mut rconn: Connection, rconn: &mut redis::aio::Connection,
key: String, key: String,
increment: usize, increment: usize,
limit: usize, limit: usize,
expiration_s: usize expiration_s: usize
) -> Result<(), CloseCode> { ) -> Result<(), CloseCode> {
log::trace!("Incrementing rate limit counter for {key:?}..."); log::trace!("Incrementing rate limit counter for {key:?}...");
let response: usize = rconn.cmd("INCRBY") let response: usize = redis::cmd("INCRBY")
.arg(&key) .arg(&key)
.arg(increment) .arg(increment)
.query_async::<redis::aio::Connection, usize>(rconn).await
.log_err_to_error("Could not increase rate limit counter") .log_err_to_error("Could not increase rate limit counter")
.map_err(|_| 1011u16)?; .map_err(|_| 1011u16)?;
log::trace!("Refreshing rate limit counter expiration for {key:?}..."); log::trace!("Refreshing rate limit counter expiration for {key:?}...");
rconn.cmd("EXPIRE") let _ = redis::cmd("EXPIRE")
.arg(&key) .arg(&key)
.arg(expiration_s) .arg(expiration_s)
.query_async::<redis::aio::Connection, ()>(rconn).await
.log_err_to_warn("Could not set expiration for rate limit counter"); .log_err_to_warn("Could not set expiration for rate limit counter");
log::trace!("Checking rate limit of {limit} / {expiration_s} s for {key:?}..."); log::trace!("Checking rate limit of {limit} / {expiration_s} s for {key:?}...");