query server ratelimiting

This commit is contained in:
crabman 2024-04-28 13:49:37 +00:00
parent 8efe53ab45
commit 4c0cadabcf
No known key found for this signature in database
5 changed files with 124 additions and 6 deletions

View File

@ -5,6 +5,7 @@ use std::{
}; };
use tokio::sync::{watch, RwLock}; use tokio::sync::{watch, RwLock};
use tracing::error;
use veloren_query_server::{ use veloren_query_server::{
client::QueryClient, client::QueryClient,
proto::{ServerBattleMode, ServerInfo}, proto::{ServerBattleMode, ServerInfo},
@ -38,8 +39,10 @@ async fn main() {
let start = Instant::now(); let start = Instant::now();
for _i in 0..10000 { for _i in 0..32 {
client.server_info().await.unwrap(); if let Err(error) = client.server_info().await {
error!(?error, "Server info request error");
}
} }
println!("Metrics = {:#?}", metrics.read().await); println!("Metrics = {:#?}", metrics.read().await);

View File

@ -1,3 +1,4 @@
#[cfg(feature = "client")] pub mod client; #[cfg(feature = "client")] pub mod client;
pub mod proto; pub mod proto;
mod ratelimit;
#[cfg(feature = "server")] pub mod server; #[cfg(feature = "server")] pub mod server;

View File

@ -0,0 +1,89 @@
use std::{
collections::HashMap,
net::IpAddr,
time::{Duration, Instant},
};
const SHIFT_EVERY: Duration = Duration::from_secs(15);
#[derive(Clone, Copy, PartialEq, Eq, Hash)]
pub enum ReducedIpAddr {
V4(u32),
V6(u64),
}
/// Per-IP state, divided into 4 segments of [`SHIFT_EVERY`] each (one minute at
/// the time of writing).
pub struct IpState([u16; 4]);
pub struct RateLimiter {
states: HashMap<ReducedIpAddr, IpState>,
last_shift: Instant,
/// Maximum amount requests that can be done in `4 * SHIFT_EVERY`
limit: u16,
}
impl RateLimiter {
pub fn new(limit: u16) -> Self {
Self {
states: Default::default(),
last_shift: Instant::now(),
limit,
}
}
pub fn maintain(&mut self, now: Instant) {
if now.duration_since(self.last_shift) > SHIFT_EVERY {
for (_, state) in self.states.iter_mut() {
state.shift();
}
// Remove empty states
self.states.retain(|_, state| !state.is_empty());
}
}
pub fn can_request(&mut self, ip: ReducedIpAddr) -> bool {
if let Some(state) = self.states.get_mut(&ip) {
if state.total() >= self.limit {
state.0[0] = state.0[0].saturating_add(1);
false
} else {
state.0[1] += 1;
true
}
} else {
self.states.insert(ip, IpState::default());
true
}
}
}
impl IpState {
fn shift(&mut self) {
self.0.rotate_right(1);
self.0[0] = 0;
}
fn is_empty(&self) -> bool { self.0.iter().all(|&freq| freq == 0) }
fn total(&self) -> u16 { self.0.iter().fold(0, |total, &v| total.saturating_add(v)) }
}
impl Default for IpState {
fn default() -> Self { Self([1, 0, 0, 0]) }
}
impl From<IpAddr> for ReducedIpAddr {
fn from(value: IpAddr) -> Self {
match value {
IpAddr::V4(v4) => Self::V4(u32::from_be_bytes(v4.octets())),
IpAddr::V6(v6) => {
let bytes = v6.octets();
Self::V6(u64::from_be_bytes([
bytes[0], bytes[1], bytes[2], bytes[3], bytes[4], bytes[5], bytes[6], bytes[7],
]))
},
}
}
}

View File

@ -17,9 +17,12 @@ use tokio::{
}; };
use tracing::{debug, trace}; use tracing::{debug, trace};
use crate::proto::{ use crate::{
QueryServerRequest, QueryServerResponse, RawQueryServerRequest, RawQueryServerResponse, proto::{
ServerInfo, MAX_REQUEST_SIZE, VELOREN_HEADER, QueryServerRequest, QueryServerResponse, RawQueryServerRequest, RawQueryServerResponse,
ServerInfo, MAX_REQUEST_SIZE, VELOREN_HEADER,
},
ratelimit::{RateLimiter, ReducedIpAddr},
}; };
const RESPONSE_SEND_TIMEOUT: Duration = Duration::from_secs(2); const RESPONSE_SEND_TIMEOUT: Duration = Duration::from_secs(2);
@ -29,6 +32,7 @@ pub struct QueryServer {
pub addr: SocketAddr, pub addr: SocketAddr,
server_info: watch::Receiver<ServerInfo>, server_info: watch::Receiver<ServerInfo>,
settings: protocol::Settings, settings: protocol::Settings,
ratelimit: RateLimiter,
} }
#[derive(Default, Clone, Copy, Debug)] #[derive(Default, Clone, Copy, Debug)]
@ -41,6 +45,7 @@ pub struct Metrics {
pub sent_responses: u32, pub sent_responses: u32,
pub failed_responses: u32, pub failed_responses: u32,
pub timed_out_responses: u32, pub timed_out_responses: u32,
pub ratelimited: u32,
} }
impl QueryServer { impl QueryServer {
@ -48,6 +53,7 @@ impl QueryServer {
Self { Self {
addr, addr,
server_info, server_info,
ratelimit: RateLimiter::new(30),
settings: Default::default(), settings: Default::default(),
} }
} }
@ -107,6 +113,8 @@ impl QueryServer {
last_secret_refresh = now; last_secret_refresh = now;
secrets = gen_secret(); secrets = gen_secret();
} }
self.ratelimit.maintain(now);
} }
} }
} }
@ -148,7 +156,7 @@ impl QueryServer {
#[allow(deprecated)] #[allow(deprecated)]
let real_p = { let real_p = {
let mut hasher = SipHasher::new_with_keys(secrets.0, secrets.1); let mut hasher = SipHasher::new_with_keys(secrets.0, secrets.1);
remote.ip().hash(&mut hasher); ReducedIpAddr::from(remote.ip()).hash(&mut hasher);
hasher.finish() hasher.finish()
}; };
@ -176,6 +184,12 @@ impl QueryServer {
return Ok(()); return Ok(());
} }
if !self.ratelimit.can_request(remote.ip().into()) {
trace!("Ratelimited request");
new_metrics.ratelimited += 1;
return Ok(());
}
match request { match request {
QueryServerRequest::ServerInfo(_) => { QueryServerRequest::ServerInfo(_) => {
new_metrics.info_requests += 1; new_metrics.info_requests += 1;
@ -242,6 +256,7 @@ impl std::ops::AddAssign for Metrics {
sent_responses, sent_responses,
failed_responses, failed_responses,
timed_out_responses, timed_out_responses,
ratelimited,
}: Self, }: Self,
) { ) {
self.received_packets += received_packets; self.received_packets += received_packets;
@ -252,6 +267,7 @@ impl std::ops::AddAssign for Metrics {
self.sent_responses += sent_responses; self.sent_responses += sent_responses;
self.failed_responses += failed_responses; self.failed_responses += failed_responses;
self.timed_out_responses += timed_out_responses; self.timed_out_responses += timed_out_responses;
self.ratelimited += ratelimited;
} }
} }

View File

@ -79,6 +79,7 @@ pub struct QueryServerMetrics {
pub sent_responses: IntGauge, pub sent_responses: IntGauge,
pub failed_responses: IntGauge, pub failed_responses: IntGauge,
pub timed_out_responses: IntGauge, pub timed_out_responses: IntGauge,
pub ratelimited: IntGauge,
} }
impl PhysicsMetrics { impl PhysicsMetrics {
@ -471,6 +472,10 @@ impl QueryServerMetrics {
"query_server::timed_out_responses", "query_server::timed_out_responses",
"Amount of responses which timed out", "Amount of responses which timed out",
))?; ))?;
let ratelimited = IntGauge::with_opts(Opts::new(
"query_server::ratelimited",
"Ratelimited requests to the query server",
))?;
registry.register(Box::new(received_packets.clone()))?; registry.register(Box::new(received_packets.clone()))?;
registry.register(Box::new(dropped_packets.clone()))?; registry.register(Box::new(dropped_packets.clone()))?;
@ -480,6 +485,7 @@ impl QueryServerMetrics {
registry.register(Box::new(sent_responses.clone()))?; registry.register(Box::new(sent_responses.clone()))?;
registry.register(Box::new(failed_responses.clone()))?; registry.register(Box::new(failed_responses.clone()))?;
registry.register(Box::new(timed_out_responses.clone()))?; registry.register(Box::new(timed_out_responses.clone()))?;
registry.register(Box::new(ratelimited.clone()))?;
Ok(Self { Ok(Self {
received_packets, received_packets,
@ -490,6 +496,7 @@ impl QueryServerMetrics {
sent_responses, sent_responses,
failed_responses, failed_responses,
timed_out_responses, timed_out_responses,
ratelimited,
}) })
} }
@ -504,6 +511,7 @@ impl QueryServerMetrics {
sent_responses, sent_responses,
failed_responses, failed_responses,
timed_out_responses, timed_out_responses,
ratelimited,
}: veloren_query_server::server::Metrics, }: veloren_query_server::server::Metrics,
) { ) {
self.received_packets.set(received_packets as i64); self.received_packets.set(received_packets as i64);
@ -514,5 +522,6 @@ impl QueryServerMetrics {
self.sent_responses.set(sent_responses as i64); self.sent_responses.set(sent_responses as i64);
self.failed_responses.set(failed_responses as i64); self.failed_responses.set(failed_responses as i64);
self.timed_out_responses.set(timed_out_responses as i64); self.timed_out_responses.set(timed_out_responses as i64);
self.ratelimited.set(ratelimited as i64);
} }
} }