protect against spoofed addressess

This commit is contained in:
crabman 2024-04-28 12:58:23 +00:00
parent 419ee88cc2
commit 8efe53ab45
No known key found for this signature in database
6 changed files with 122 additions and 33 deletions

1
Cargo.lock generated
View File

@ -7108,6 +7108,7 @@ name = "veloren-query-server"
version = "0.1.0"
dependencies = [
"protocol",
"rand 0.8.5",
"tokio",
"tracing",
"tracing-subscriber",

View File

@ -7,7 +7,7 @@ edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[features]
server = ["dep:tokio"]
server = ["dep:tokio", "dep:rand"]
client = ["dep:tokio", "tokio/time"]
example = ["tokio/macros", "tokio/rt-multi-thread", "dep:tracing-subscriber"]
default = ["server", "client"]
@ -17,6 +17,7 @@ tokio = { workspace = true, optional = true, features = ["net", "sync"] }
protocol = { version = "3.4.0", default-features = false, features = ["derive"] }
tracing-subscriber = { version = "0.3.7", optional = true }
tracing = { workspace = true }
rand = { workspace = true, optional = true }
[[example]]
name = "demo"

View File

@ -29,7 +29,7 @@ async fn main() {
tokio::task::spawn(async move { server.run(metrics2).await.unwrap() });
let client = QueryClient { addr };
let mut client = QueryClient::new(addr);
let (info, ping) = client.server_info().await.unwrap();
println!("Ping = {}ms", ping.as_millis());

View File

@ -6,26 +6,33 @@ use std::{
use protocol::Parcel;
use tokio::{net::UdpSocket, time::timeout};
use tracing::trace;
use crate::proto::{
QueryServerRequest, QueryServerResponse, ServerInfo, MAX_REQUEST_SIZE, MAX_RESPONSE_SIZE,
VELOREN_HEADER,
QueryServerRequest, QueryServerResponse, RawQueryServerRequest, RawQueryServerResponse,
ServerInfo, MAX_REQUEST_SIZE, MAX_RESPONSE_SIZE, VELOREN_HEADER,
};
const MAX_REQUEST_RETRIES: usize = 5;
#[derive(Debug)]
pub enum QueryClientError {
Io(tokio::io::Error),
Protocol(protocol::Error),
InvalidResponse,
Timeout,
ChallengeFailed,
}
pub struct QueryClient {
pub addr: SocketAddr,
p: u64,
}
impl QueryClient {
pub async fn server_info(&self) -> Result<(ServerInfo, Duration), QueryClientError> {
pub fn new(addr: SocketAddr) -> Self { Self { addr, p: 0 } }
pub async fn server_info(&mut self) -> Result<(ServerInfo, Duration), QueryClientError> {
self.send_query(QueryServerRequest::ServerInfo(Default::default()))
.await
.and_then(|(response, duration)| {
@ -39,33 +46,50 @@ impl QueryClient {
}
async fn send_query(
&self,
&mut self,
request: QueryServerRequest,
) -> Result<(QueryServerResponse, Duration), QueryClientError> {
let socket = UdpSocket::bind(SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, 0)).await?;
let mut buf = Vec::with_capacity(VELOREN_HEADER.len() + 2 + MAX_REQUEST_SIZE);
buf.extend(VELOREN_HEADER);
// 2 extra bytes for version information, currently unused
buf.extend([0; 2]);
buf.extend(<QueryServerRequest as Parcel>::raw_bytes(
&request,
&Default::default(),
)?);
let mut tries = 0;
while tries < MAX_REQUEST_RETRIES {
tries += 1;
let mut buf = Vec::with_capacity(VELOREN_HEADER.len() + 2 + MAX_REQUEST_SIZE);
let query_sent = Instant::now();
socket.send_to(&buf, self.addr).await?;
buf.extend(VELOREN_HEADER);
// 2 extra bytes for version information, currently unused
buf.extend([0; 2]);
buf.extend(<RawQueryServerRequest as Parcel>::raw_bytes(
&RawQueryServerRequest { p: self.p, request },
&Default::default(),
)?);
let mut buf = vec![0; MAX_RESPONSE_SIZE];
let _ = timeout(Duration::from_secs(2), socket.recv_from(&mut buf))
.await
.map_err(|_| QueryClientError::Timeout)?
.map_err(|_| QueryClientError::Timeout)?;
let query_sent = Instant::now();
socket.send_to(&buf, self.addr).await?;
let packet =
<QueryServerResponse as Parcel>::read(&mut io::Cursor::new(buf), &Default::default())?;
let mut buf = vec![0; MAX_RESPONSE_SIZE];
let _ = timeout(Duration::from_secs(2), socket.recv_from(&mut buf))
.await
.map_err(|_| QueryClientError::Timeout)?
.map_err(|_| QueryClientError::Timeout)?;
Ok((packet, query_sent.elapsed()))
let packet = <RawQueryServerResponse as Parcel>::read(
&mut io::Cursor::new(buf),
&Default::default(),
)?;
match packet {
RawQueryServerResponse::Response(response) => {
return Ok((response, query_sent.elapsed()));
},
RawQueryServerResponse::P(p) => {
trace!(?p, "Resetting p");
self.p = p
},
}
}
Err(QueryClientError::ChallengeFailed)
}
}

View File

@ -4,6 +4,14 @@ pub const VELOREN_HEADER: [u8; 7] = [b'v', b'e', b'l', b'o', b'r', b'e', b'n'];
pub const MAX_REQUEST_SIZE: usize = 300;
pub const MAX_RESPONSE_SIZE: usize = 256;
#[derive(Protocol, Debug, Clone, Copy)]
#[protocol(discriminant = "integer")]
#[protocol(discriminator(u8))]
pub struct RawQueryServerRequest {
pub p: u64,
pub request: QueryServerRequest,
}
#[derive(Protocol, Debug, Clone, Copy)]
#[protocol(discriminant = "integer")]
#[protocol(discriminator(u8))]
@ -13,6 +21,14 @@ pub enum QueryServerRequest {
// New requests should be added at the end to prevent breakage
}
#[derive(Protocol, Debug, Clone, Copy)]
#[protocol(discriminant = "integer")]
#[protocol(discriminator(u8))]
pub enum RawQueryServerResponse {
Response(QueryServerResponse),
P(u64),
}
#[derive(Protocol, Debug, Clone, Copy)]
#[protocol(discriminant = "integer")]
#[protocol(discriminator(u8))]

View File

@ -1,12 +1,15 @@
#[allow(deprecated)] use std::hash::SipHasher;
use std::{
future::Future,
hash::{Hash, Hasher},
io,
net::{Ipv4Addr, SocketAddr, SocketAddrV4},
sync::Arc,
time::Duration,
time::{Duration, Instant},
};
use protocol::Parcel;
use rand::{thread_rng, Rng};
use tokio::{
net::UdpSocket,
sync::{watch, RwLock},
@ -15,10 +18,12 @@ use tokio::{
use tracing::{debug, trace};
use crate::proto::{
QueryServerRequest, QueryServerResponse, ServerInfo, MAX_REQUEST_SIZE, VELOREN_HEADER,
QueryServerRequest, QueryServerResponse, RawQueryServerRequest, RawQueryServerResponse,
ServerInfo, MAX_REQUEST_SIZE, VELOREN_HEADER,
};
const RESPONSE_SEND_TIMEOUT: Duration = Duration::from_secs(2);
const SECRET_REGEN_INTERNVAL: Duration = Duration::from_secs(60);
pub struct QueryServer {
pub addr: SocketAddr,
@ -50,6 +55,13 @@ impl QueryServer {
pub async fn run(&mut self, metrics: Arc<RwLock<Metrics>>) -> Result<(), tokio::io::Error> {
let socket = UdpSocket::bind(self.addr).await?;
let gen_secret = || {
let mut rng = thread_rng();
(rng.gen::<u64>(), rng.gen::<u64>())
};
let mut secrets = gen_secret();
let mut last_secret_refresh = Instant::now();
let mut buf = Box::new([0; MAX_REQUEST_SIZE]);
loop {
*buf = [0; MAX_REQUEST_SIZE];
@ -78,6 +90,7 @@ impl QueryServer {
.process_datagram(
msg_buf,
remote_addr,
secrets,
(&mut new_metrics, Arc::clone(&metrics)),
)
.await
@ -87,6 +100,14 @@ impl QueryServer {
// Update metrics at the end of eath packet
*metrics.write().await += new_metrics;
{
let now = Instant::now();
if now.duration_since(last_secret_refresh) > SECRET_REGEN_INTERNVAL {
last_secret_refresh = now;
secrets = gen_secret();
}
}
}
}
@ -109,16 +130,27 @@ impl QueryServer {
&mut self,
datagram: &[u8],
remote: SocketAddr,
secrets: (u64, u64),
(new_metrics, metrics): (&mut Metrics, Arc<RwLock<Metrics>>),
) -> Result<(), tokio::io::Error> {
let Ok(packet): Result<QueryServerRequest, _> =
<QueryServerRequest as Parcel>::read(&mut io::Cursor::new(datagram), &self.settings)
let Ok(RawQueryServerRequest {
p: client_p,
request,
}) =
<RawQueryServerRequest as Parcel>::read(&mut io::Cursor::new(datagram), &self.settings)
else {
new_metrics.invalid_packets += 1;
return Ok(());
};
trace!(?packet, "Received packet");
trace!(?request, "Received packet");
#[allow(deprecated)]
let real_p = {
let mut hasher = SipHasher::new_with_keys(secrets.0, secrets.1);
remote.ip().hash(&mut hasher);
hasher.finish()
};
async fn timed<'a, F: Future<Output = O> + 'a, O>(
fut: F,
@ -131,14 +163,29 @@ impl QueryServer {
None
}
}
match packet {
if real_p != client_p {
tokio::task::spawn(async move {
timed(
Self::send_response(RawQueryServerResponse::P(real_p), remote, &metrics),
&metrics,
)
.await;
});
return Ok(());
}
match request {
QueryServerRequest::ServerInfo(_) => {
new_metrics.info_requests += 1;
let server_info = *self.server_info.borrow();
tokio::task::spawn(async move {
timed(
Self::send_response(
QueryServerResponse::ServerInfo(server_info),
RawQueryServerResponse::Response(QueryServerResponse::ServerInfo(
server_info,
)),
remote,
&metrics,
),
@ -153,7 +200,7 @@ impl QueryServer {
}
async fn send_response(
response: QueryServerResponse,
response: RawQueryServerResponse,
addr: SocketAddr,
metrics: &Arc<RwLock<Metrics>>,
) {
@ -165,7 +212,7 @@ impl QueryServer {
};
let buf = if let Ok(data) =
<QueryServerResponse as Parcel>::raw_bytes(&response, &Default::default())
<RawQueryServerResponse as Parcel>::raw_bytes(&response, &Default::default())
{
data
} else {