mirror of
https://gitlab.com/veloren/veloren.git
synced 2024-08-30 18:12:32 +00:00
tcp protocol hardening
- make it harder for the server to crash and also kill invalid sessions properly (instead of waiting for them to close) - introduce macros to reduce code duplication - added tests to check for valid handshake as well as garbage tcp
This commit is contained in:
parent
9d32e3f884
commit
df45d35c0e
@ -7,7 +7,11 @@ use crate::{
|
||||
scheduler::Scheduler,
|
||||
types::{Mid, Pid, Prio, Promises, Sid},
|
||||
};
|
||||
use async_std::{io, sync::RwLock, task};
|
||||
use async_std::{
|
||||
io,
|
||||
sync::{Mutex, RwLock},
|
||||
task,
|
||||
};
|
||||
use futures::{
|
||||
channel::{mpsc, oneshot},
|
||||
sink::SinkExt,
|
||||
@ -50,9 +54,7 @@ pub struct Participant {
|
||||
a2b_steam_open_s: RwLock<mpsc::UnboundedSender<(Prio, Promises, oneshot::Sender<Stream>)>>,
|
||||
b2a_stream_opened_r: RwLock<mpsc::UnboundedReceiver<Stream>>,
|
||||
closed: AtomicBool,
|
||||
//We need a std::Mutex here, the async Mutex requeres a block in `Drop` which can `panic!`
|
||||
//It's only okay because `disconnect` is the only `fn` accessing it and it consumes self!
|
||||
a2s_disconnect_s: Arc<std::sync::Mutex<Option<ParticipantCloseChannel>>>,
|
||||
a2s_disconnect_s: Arc<Mutex<Option<ParticipantCloseChannel>>>,
|
||||
}
|
||||
|
||||
/// `Streams` represents a channel to send `n` messages with a certain priority
|
||||
@ -142,7 +144,7 @@ pub enum StreamError {
|
||||
pub struct Network {
|
||||
local_pid: Pid,
|
||||
participant_disconnect_sender:
|
||||
RwLock<HashMap<Pid, Arc<std::sync::Mutex<Option<ParticipantCloseChannel>>>>>,
|
||||
RwLock<HashMap<Pid, Arc<Mutex<Option<ParticipantCloseChannel>>>>>,
|
||||
listen_sender:
|
||||
RwLock<mpsc::UnboundedSender<(ProtocolAddr, oneshot::Sender<async_std::io::Result<()>>)>>,
|
||||
connect_sender:
|
||||
@ -394,7 +396,7 @@ impl Participant {
|
||||
a2b_steam_open_s: RwLock::new(a2b_steam_open_s),
|
||||
b2a_stream_opened_r: RwLock::new(b2a_stream_opened_r),
|
||||
closed: AtomicBool::new(false),
|
||||
a2s_disconnect_s: Arc::new(std::sync::Mutex::new(Some(a2s_disconnect_s))),
|
||||
a2s_disconnect_s: Arc::new(Mutex::new(Some(a2s_disconnect_s))),
|
||||
}
|
||||
}
|
||||
|
||||
@ -575,7 +577,7 @@ impl Participant {
|
||||
self.closed.store(true, Ordering::Relaxed);
|
||||
//Streams will be closed by BParticipant
|
||||
|
||||
match self.a2s_disconnect_s.lock().unwrap().take() {
|
||||
match self.a2s_disconnect_s.lock().await.take() {
|
||||
Some(mut a2s_disconnect_s) => {
|
||||
let (finished_sender, finished_receiver) = oneshot::channel();
|
||||
// Participant is connecting to Scheduler here, not as usual
|
||||
@ -840,13 +842,12 @@ impl Drop for Network {
|
||||
);
|
||||
let mut finished_receiver_list = vec![];
|
||||
task::block_on(async {
|
||||
// we need to carefully shut down here! as otherwise we might call
|
||||
// Participant::Drop with a2s_disconnect_s here which would open
|
||||
// another task::block, which would panic!
|
||||
// we MUST avoid nested block_on, good that Network::Drop no longer triggers
|
||||
// Participant::Drop directly but just the BParticipant
|
||||
for (remote_pid, a2s_disconnect_s) in
|
||||
self.participant_disconnect_sender.write().await.drain()
|
||||
{
|
||||
match a2s_disconnect_s.lock().unwrap().take() {
|
||||
match a2s_disconnect_s.lock().await.take() {
|
||||
Some(mut a2s_disconnect_s) => {
|
||||
trace!(?remote_pid, "Participants will be closed");
|
||||
let (finished_sender, finished_receiver) = oneshot::channel();
|
||||
@ -893,7 +894,7 @@ impl Drop for Participant {
|
||||
let pid = self.remote_pid;
|
||||
debug!(?pid, "Shutting down Participant");
|
||||
|
||||
match self.a2s_disconnect_s.lock().unwrap().take() {
|
||||
match task::block_on(self.a2s_disconnect_s.lock()).take() {
|
||||
None => trace!(
|
||||
?pid,
|
||||
"Participant has been shutdown cleanly, no further waiting is requiered!"
|
||||
|
@ -159,7 +159,7 @@ impl Handshake {
|
||||
&self,
|
||||
w2c_cid_frame_r: &mut mpsc::UnboundedReceiver<(Cid, Frame)>,
|
||||
mut c2w_frame_s: mpsc::UnboundedSender<Frame>,
|
||||
_read_stop_sender: oneshot::Sender<()>,
|
||||
read_stop_sender: oneshot::Sender<()>,
|
||||
) -> Result<(Pid, Sid, u128), ()> {
|
||||
const ERR_S: &str = "Got A Raw Message, these are usually Debug Messages indicating that \
|
||||
something went wrong on network layer and connection will be closed";
|
||||
@ -170,7 +170,7 @@ impl Handshake {
|
||||
self.send_handshake(&mut c2w_frame_s).await;
|
||||
}
|
||||
|
||||
match w2c_cid_frame_r.next().await {
|
||||
let r = match w2c_cid_frame_r.next().await {
|
||||
Some((
|
||||
_,
|
||||
Frame::Handshake {
|
||||
@ -198,9 +198,8 @@ impl Handshake {
|
||||
.unwrap();
|
||||
c2w_frame_s.send(Frame::Shutdown).await.unwrap();
|
||||
}
|
||||
return Err(());
|
||||
}
|
||||
if version != VELOREN_NETWORK_VERSION {
|
||||
Err(())
|
||||
} else if version != VELOREN_NETWORK_VERSION {
|
||||
error!(?version, "Connection with wrong network version");
|
||||
#[cfg(debug_assertions)]
|
||||
{
|
||||
@ -225,13 +224,15 @@ impl Handshake {
|
||||
.unwrap();
|
||||
c2w_frame_s.send(Frame::Shutdown {}).await.unwrap();
|
||||
}
|
||||
return Err(());
|
||||
}
|
||||
debug!("Handshake completed");
|
||||
if self.init_handshake {
|
||||
self.send_init(&mut c2w_frame_s, &pid_string).await;
|
||||
Err(())
|
||||
} else {
|
||||
self.send_handshake(&mut c2w_frame_s).await;
|
||||
debug!("Handshake completed");
|
||||
if self.init_handshake {
|
||||
self.send_init(&mut c2w_frame_s, &pid_string).await;
|
||||
} else {
|
||||
self.send_handshake(&mut c2w_frame_s).await;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
},
|
||||
Some((_, Frame::Shutdown)) => {
|
||||
@ -240,7 +241,7 @@ impl Handshake {
|
||||
.frames_in_total
|
||||
.with_label_values(&[&pid_string, &cid_string, "Shutdown"])
|
||||
.inc();
|
||||
return Err(());
|
||||
Err(())
|
||||
},
|
||||
Some((_, Frame::Raw(bytes))) => {
|
||||
self.metrics
|
||||
@ -251,19 +252,29 @@ impl Handshake {
|
||||
Ok(string) => error!(?string, ERR_S),
|
||||
_ => error!(?bytes, ERR_S),
|
||||
}
|
||||
return Err(());
|
||||
Err(())
|
||||
},
|
||||
Some((_, frame)) => {
|
||||
self.metrics
|
||||
.frames_in_total
|
||||
.with_label_values(&[&pid_string, &cid_string, frame.get_string()])
|
||||
.inc();
|
||||
return Err(());
|
||||
Err(())
|
||||
},
|
||||
None => return Err(()),
|
||||
None => Err(()),
|
||||
};
|
||||
if let Err(()) = r {
|
||||
if let Err(e) = read_stop_sender.send(()) {
|
||||
trace!(
|
||||
?e,
|
||||
"couldn't stop protocol, probably it encountered a Protocol Stop and closed \
|
||||
itself already, which is fine"
|
||||
);
|
||||
}
|
||||
return Err(());
|
||||
}
|
||||
|
||||
match w2c_cid_frame_r.next().await {
|
||||
let r = match w2c_cid_frame_r.next().await {
|
||||
Some((_, Frame::Init { pid, secret })) => {
|
||||
debug!(?pid, "Participant send their ID");
|
||||
pid_string = pid.to_string();
|
||||
@ -307,7 +318,17 @@ impl Handshake {
|
||||
Err(())
|
||||
},
|
||||
None => Err(()),
|
||||
};
|
||||
if r.is_err() {
|
||||
if let Err(e) = read_stop_sender.send(()) {
|
||||
trace!(
|
||||
?e,
|
||||
"couldn't stop protocol, probably it encountered a Protocol Stop and closed \
|
||||
itself already, which is fine"
|
||||
);
|
||||
}
|
||||
}
|
||||
r
|
||||
}
|
||||
|
||||
async fn send_handshake(&self, c2w_frame_s: &mut mpsc::UnboundedSender<Frame>) {
|
||||
|
@ -8,7 +8,7 @@ use async_std::{
|
||||
};
|
||||
use futures::{
|
||||
channel::{mpsc, oneshot},
|
||||
future::FutureExt,
|
||||
future::{Fuse, FutureExt},
|
||||
lock::Mutex,
|
||||
select,
|
||||
sink::SinkExt,
|
||||
@ -59,22 +59,35 @@ impl TcpProtocol {
|
||||
}
|
||||
|
||||
/// read_except and if it fails, close the protocol
|
||||
async fn read_except_or_close(
|
||||
async fn read_or_close(
|
||||
cid: Cid,
|
||||
mut stream: &TcpStream,
|
||||
mut bytes: &mut [u8],
|
||||
w2c_cid_frame_s: &mut mpsc::UnboundedSender<(Cid, Frame)>,
|
||||
) {
|
||||
if let Err(e) = stream.read_exact(&mut bytes).await {
|
||||
warn!(
|
||||
?e,
|
||||
"Closing tcp protocol due to read error, sending close frame to gracefully \
|
||||
shutdown"
|
||||
);
|
||||
w2c_cid_frame_s
|
||||
.send((cid, Frame::Shutdown))
|
||||
.await
|
||||
.expect("Channel or Participant seems no longer to exist to be Shutdown");
|
||||
mut end_receiver: &mut Fuse<oneshot::Receiver<()>>,
|
||||
) -> bool {
|
||||
match select! {
|
||||
r = stream.read_exact(&mut bytes).fuse() => Some(r),
|
||||
_ = end_receiver => None,
|
||||
} {
|
||||
Some(Ok(_)) => false,
|
||||
Some(Err(e)) => {
|
||||
debug!(
|
||||
?cid,
|
||||
?e,
|
||||
"Closing tcp protocol due to read error, sending close frame to gracefully \
|
||||
shutdown"
|
||||
);
|
||||
w2c_cid_frame_s
|
||||
.send((cid, Frame::Shutdown))
|
||||
.await
|
||||
.expect("Channel or Participant seems no longer to exist to be Shutdown");
|
||||
true
|
||||
},
|
||||
None => {
|
||||
trace!(?cid, "shutdown requested");
|
||||
true
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
@ -82,7 +95,7 @@ impl TcpProtocol {
|
||||
&self,
|
||||
cid: Cid,
|
||||
w2c_cid_frame_s: &mut mpsc::UnboundedSender<(Cid, Frame)>,
|
||||
end_receiver: oneshot::Receiver<()>,
|
||||
end_r: oneshot::Receiver<()>,
|
||||
) {
|
||||
trace!("Starting up tcp read()");
|
||||
let mut metrics_cache = CidFrameCache::new(self.metrics.frames_wire_in_total.clone(), cid);
|
||||
@ -90,24 +103,28 @@ impl TcpProtocol {
|
||||
.metrics
|
||||
.wire_in_throughput
|
||||
.with_label_values(&[&cid.to_string()]);
|
||||
let mut stream = self.stream.clone();
|
||||
let mut end_receiver = end_receiver.fuse();
|
||||
let stream = self.stream.clone();
|
||||
let mut end_r = end_r.fuse();
|
||||
|
||||
macro_rules! read_or_close {
|
||||
($x:expr) => {
|
||||
if TcpProtocol::read_or_close(cid, &stream, $x, w2c_cid_frame_s, &mut end_r).await {
|
||||
info!("Tcp stream closed, shutting down read");
|
||||
break;
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
loop {
|
||||
let mut bytes = [0u8; 1];
|
||||
let r = select! {
|
||||
r = stream.read_exact(&mut bytes).fuse() => r,
|
||||
_ = end_receiver => break,
|
||||
let frame_no = {
|
||||
let mut bytes = [0u8; 1];
|
||||
read_or_close!(&mut bytes);
|
||||
bytes[0]
|
||||
};
|
||||
if r.is_err() {
|
||||
info!("Tcp stream closed, shutting down read");
|
||||
break;
|
||||
}
|
||||
let frame_no = bytes[0];
|
||||
let frame = match frame_no {
|
||||
FRAME_HANDSHAKE => {
|
||||
let mut bytes = [0u8; 19];
|
||||
Self::read_except_or_close(cid, &stream, &mut bytes, w2c_cid_frame_s).await;
|
||||
read_or_close!(&mut bytes);
|
||||
let magic_number = *<&[u8; 7]>::try_from(&bytes[0..7]).unwrap();
|
||||
Frame::Handshake {
|
||||
magic_number,
|
||||
@ -120,16 +137,16 @@ impl TcpProtocol {
|
||||
},
|
||||
FRAME_INIT => {
|
||||
let mut bytes = [0u8; 16];
|
||||
Self::read_except_or_close(cid, &stream, &mut bytes, w2c_cid_frame_s).await;
|
||||
read_or_close!(&mut bytes);
|
||||
let pid = Pid::from_le_bytes(bytes);
|
||||
Self::read_except_or_close(cid, &stream, &mut bytes, w2c_cid_frame_s).await;
|
||||
read_or_close!(&mut bytes);
|
||||
let secret = u128::from_le_bytes(bytes);
|
||||
Frame::Init { pid, secret }
|
||||
},
|
||||
FRAME_SHUTDOWN => Frame::Shutdown,
|
||||
FRAME_OPEN_STREAM => {
|
||||
let mut bytes = [0u8; 10];
|
||||
Self::read_except_or_close(cid, &stream, &mut bytes, w2c_cid_frame_s).await;
|
||||
read_or_close!(&mut bytes);
|
||||
let sid = Sid::from_le_bytes(*<&[u8; 8]>::try_from(&bytes[0..8]).unwrap());
|
||||
let prio = bytes[8];
|
||||
let promises = bytes[9];
|
||||
@ -141,13 +158,13 @@ impl TcpProtocol {
|
||||
},
|
||||
FRAME_CLOSE_STREAM => {
|
||||
let mut bytes = [0u8; 8];
|
||||
Self::read_except_or_close(cid, &stream, &mut bytes, w2c_cid_frame_s).await;
|
||||
read_or_close!(&mut bytes);
|
||||
let sid = Sid::from_le_bytes(*<&[u8; 8]>::try_from(&bytes[0..8]).unwrap());
|
||||
Frame::CloseStream { sid }
|
||||
},
|
||||
FRAME_DATA_HEADER => {
|
||||
let mut bytes = [0u8; 24];
|
||||
Self::read_except_or_close(cid, &stream, &mut bytes, w2c_cid_frame_s).await;
|
||||
read_or_close!(&mut bytes);
|
||||
let mid = Mid::from_le_bytes(*<&[u8; 8]>::try_from(&bytes[0..8]).unwrap());
|
||||
let sid = Sid::from_le_bytes(*<&[u8; 8]>::try_from(&bytes[8..16]).unwrap());
|
||||
let length = u64::from_le_bytes(*<&[u8; 8]>::try_from(&bytes[16..24]).unwrap());
|
||||
@ -155,28 +172,30 @@ impl TcpProtocol {
|
||||
},
|
||||
FRAME_DATA => {
|
||||
let mut bytes = [0u8; 18];
|
||||
Self::read_except_or_close(cid, &stream, &mut bytes, w2c_cid_frame_s).await;
|
||||
read_or_close!(&mut bytes);
|
||||
let mid = Mid::from_le_bytes(*<&[u8; 8]>::try_from(&bytes[0..8]).unwrap());
|
||||
let start = u64::from_le_bytes(*<&[u8; 8]>::try_from(&bytes[8..16]).unwrap());
|
||||
let length = u16::from_le_bytes(*<&[u8; 2]>::try_from(&bytes[16..18]).unwrap());
|
||||
let mut data = vec![0; length as usize];
|
||||
throughput_cache.inc_by(length as i64);
|
||||
Self::read_except_or_close(cid, &stream, &mut data, w2c_cid_frame_s).await;
|
||||
read_or_close!(&mut data);
|
||||
Frame::Data { mid, start, data }
|
||||
},
|
||||
FRAME_RAW => {
|
||||
let mut bytes = [0u8; 2];
|
||||
Self::read_except_or_close(cid, &stream, &mut bytes, w2c_cid_frame_s).await;
|
||||
read_or_close!(&mut bytes);
|
||||
let length = u16::from_le_bytes([bytes[0], bytes[1]]);
|
||||
let mut data = vec![0; length as usize];
|
||||
Self::read_except_or_close(cid, &stream, &mut data, w2c_cid_frame_s).await;
|
||||
read_or_close!(&mut data);
|
||||
Frame::Raw(data)
|
||||
},
|
||||
_ => {
|
||||
other => {
|
||||
// report a RAW frame, but cannot rely on the next 2 bytes to be a size.
|
||||
// guessing 256 bytes, which might help to sort down issues
|
||||
let mut data = vec![0; 256];
|
||||
Self::read_except_or_close(cid, &stream, &mut data, w2c_cid_frame_s).await;
|
||||
// guessing 32 bytes, which might help to sort down issues
|
||||
let mut data = vec![0; 32];
|
||||
//keep the first byte!
|
||||
read_or_close!(&mut data[1..]);
|
||||
data[0] = other;
|
||||
Frame::Raw(data)
|
||||
},
|
||||
};
|
||||
@ -193,24 +212,21 @@ impl TcpProtocol {
|
||||
async fn write_or_close(
|
||||
stream: &mut TcpStream,
|
||||
bytes: &[u8],
|
||||
to_wire_receiver: &mut mpsc::UnboundedReceiver<Frame>,
|
||||
c2w_frame_r: &mut mpsc::UnboundedReceiver<Frame>,
|
||||
) -> bool {
|
||||
match stream.write_all(&bytes).await {
|
||||
Err(e) => {
|
||||
warn!(
|
||||
debug!(
|
||||
?e,
|
||||
"Got an error writing to tcp, going to close this channel"
|
||||
);
|
||||
to_wire_receiver.close();
|
||||
c2w_frame_r.close();
|
||||
true
|
||||
},
|
||||
_ => false,
|
||||
}
|
||||
}
|
||||
|
||||
//dezerialize here as this is executed in a seperate thread PER channel.
|
||||
// Limites Throughput per single Receiver but stays in same thread (maybe as its
|
||||
// in a threadpool) for TCP, UDP and MPSC
|
||||
pub async fn write_to_wire(&self, cid: Cid, mut c2w_frame_r: mpsc::UnboundedReceiver<Frame>) {
|
||||
trace!("Starting up tcp write()");
|
||||
let mut stream = self.stream.clone();
|
||||
@ -219,139 +235,70 @@ impl TcpProtocol {
|
||||
.metrics
|
||||
.wire_out_throughput
|
||||
.with_label_values(&[&cid.to_string()]);
|
||||
|
||||
macro_rules! write_or_close {
|
||||
($x:expr) => {
|
||||
if TcpProtocol::write_or_close(&mut stream, $x, &mut c2w_frame_r).await {
|
||||
info!("Tcp stream closed, shutting down write");
|
||||
break;
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
while let Some(frame) = c2w_frame_r.next().await {
|
||||
metrics_cache.with_label_values(&frame).inc();
|
||||
if match frame {
|
||||
match frame {
|
||||
Frame::Handshake {
|
||||
magic_number,
|
||||
version,
|
||||
} => {
|
||||
Self::write_or_close(
|
||||
&mut stream,
|
||||
&FRAME_HANDSHAKE.to_be_bytes(),
|
||||
&mut c2w_frame_r,
|
||||
)
|
||||
.await
|
||||
|| Self::write_or_close(&mut stream, &magic_number, &mut c2w_frame_r).await
|
||||
|| Self::write_or_close(
|
||||
&mut stream,
|
||||
&version[0].to_le_bytes(),
|
||||
&mut c2w_frame_r,
|
||||
)
|
||||
.await
|
||||
|| Self::write_or_close(
|
||||
&mut stream,
|
||||
&version[1].to_le_bytes(),
|
||||
&mut c2w_frame_r,
|
||||
)
|
||||
.await
|
||||
|| Self::write_or_close(
|
||||
&mut stream,
|
||||
&version[2].to_le_bytes(),
|
||||
&mut c2w_frame_r,
|
||||
)
|
||||
.await
|
||||
write_or_close!(&FRAME_HANDSHAKE.to_be_bytes());
|
||||
write_or_close!(&magic_number);
|
||||
write_or_close!(&version[0].to_le_bytes());
|
||||
write_or_close!(&version[1].to_le_bytes());
|
||||
write_or_close!(&version[2].to_le_bytes());
|
||||
},
|
||||
Frame::Init { pid, secret } => {
|
||||
Self::write_or_close(&mut stream, &FRAME_INIT.to_be_bytes(), &mut c2w_frame_r)
|
||||
.await
|
||||
|| Self::write_or_close(&mut stream, &pid.to_le_bytes(), &mut c2w_frame_r)
|
||||
.await
|
||||
|| Self::write_or_close(
|
||||
&mut stream,
|
||||
&secret.to_le_bytes(),
|
||||
&mut c2w_frame_r,
|
||||
)
|
||||
.await
|
||||
write_or_close!(&FRAME_INIT.to_be_bytes());
|
||||
write_or_close!(&pid.to_le_bytes());
|
||||
write_or_close!(&secret.to_le_bytes());
|
||||
},
|
||||
Frame::Shutdown => {
|
||||
Self::write_or_close(
|
||||
&mut stream,
|
||||
&FRAME_SHUTDOWN.to_be_bytes(),
|
||||
&mut c2w_frame_r,
|
||||
)
|
||||
.await
|
||||
write_or_close!(&FRAME_SHUTDOWN.to_be_bytes());
|
||||
},
|
||||
Frame::OpenStream {
|
||||
sid,
|
||||
prio,
|
||||
promises,
|
||||
} => {
|
||||
Self::write_or_close(
|
||||
&mut stream,
|
||||
&FRAME_OPEN_STREAM.to_be_bytes(),
|
||||
&mut c2w_frame_r,
|
||||
)
|
||||
.await
|
||||
|| Self::write_or_close(&mut stream, &sid.to_le_bytes(), &mut c2w_frame_r)
|
||||
.await
|
||||
|| Self::write_or_close(&mut stream, &prio.to_le_bytes(), &mut c2w_frame_r)
|
||||
.await
|
||||
|| Self::write_or_close(
|
||||
&mut stream,
|
||||
&promises.to_le_bytes(),
|
||||
&mut c2w_frame_r,
|
||||
)
|
||||
.await
|
||||
write_or_close!(&FRAME_OPEN_STREAM.to_be_bytes());
|
||||
write_or_close!(&sid.to_le_bytes());
|
||||
write_or_close!(&prio.to_le_bytes());
|
||||
write_or_close!(&promises.to_le_bytes());
|
||||
},
|
||||
Frame::CloseStream { sid } => {
|
||||
Self::write_or_close(
|
||||
&mut stream,
|
||||
&FRAME_CLOSE_STREAM.to_be_bytes(),
|
||||
&mut c2w_frame_r,
|
||||
)
|
||||
.await
|
||||
|| Self::write_or_close(&mut stream, &sid.to_le_bytes(), &mut c2w_frame_r)
|
||||
.await
|
||||
write_or_close!(&FRAME_CLOSE_STREAM.to_be_bytes());
|
||||
write_or_close!(&sid.to_le_bytes());
|
||||
},
|
||||
Frame::DataHeader { mid, sid, length } => {
|
||||
Self::write_or_close(
|
||||
&mut stream,
|
||||
&FRAME_DATA_HEADER.to_be_bytes(),
|
||||
&mut c2w_frame_r,
|
||||
)
|
||||
.await
|
||||
|| Self::write_or_close(&mut stream, &mid.to_le_bytes(), &mut c2w_frame_r)
|
||||
.await
|
||||
|| Self::write_or_close(&mut stream, &sid.to_le_bytes(), &mut c2w_frame_r)
|
||||
.await
|
||||
|| Self::write_or_close(
|
||||
&mut stream,
|
||||
&length.to_le_bytes(),
|
||||
&mut c2w_frame_r,
|
||||
)
|
||||
.await
|
||||
write_or_close!(&FRAME_DATA_HEADER.to_be_bytes());
|
||||
write_or_close!(&mid.to_le_bytes());
|
||||
write_or_close!(&sid.to_le_bytes());
|
||||
write_or_close!(&length.to_le_bytes());
|
||||
},
|
||||
Frame::Data { mid, start, data } => {
|
||||
throughput_cache.inc_by(data.len() as i64);
|
||||
Self::write_or_close(&mut stream, &FRAME_DATA.to_be_bytes(), &mut c2w_frame_r)
|
||||
.await
|
||||
|| Self::write_or_close(&mut stream, &mid.to_le_bytes(), &mut c2w_frame_r)
|
||||
.await
|
||||
|| Self::write_or_close(&mut stream, &start.to_le_bytes(), &mut c2w_frame_r)
|
||||
.await
|
||||
|| Self::write_or_close(
|
||||
&mut stream,
|
||||
&(data.len() as u16).to_le_bytes(),
|
||||
&mut c2w_frame_r,
|
||||
)
|
||||
.await
|
||||
|| Self::write_or_close(&mut stream, &data, &mut c2w_frame_r).await
|
||||
write_or_close!(&FRAME_DATA.to_be_bytes());
|
||||
write_or_close!(&mid.to_le_bytes());
|
||||
write_or_close!(&start.to_le_bytes());
|
||||
write_or_close!(&(data.len() as u16).to_le_bytes());
|
||||
write_or_close!(&data);
|
||||
},
|
||||
Frame::Raw(data) => {
|
||||
Self::write_or_close(&mut stream, &FRAME_RAW.to_be_bytes(), &mut c2w_frame_r)
|
||||
.await
|
||||
|| Self::write_or_close(
|
||||
&mut stream,
|
||||
&(data.len() as u16).to_le_bytes(),
|
||||
&mut c2w_frame_r,
|
||||
)
|
||||
.await
|
||||
|| Self::write_or_close(&mut stream, &data, &mut c2w_frame_r).await
|
||||
write_or_close!(&FRAME_RAW.to_be_bytes());
|
||||
write_or_close!(&(data.len() as u16).to_le_bytes());
|
||||
write_or_close!(&data);
|
||||
},
|
||||
} {
|
||||
//failure
|
||||
return;
|
||||
}
|
||||
}
|
||||
trace!("shutting down tcp write()");
|
||||
@ -377,7 +324,7 @@ impl UdpProtocol {
|
||||
&self,
|
||||
cid: Cid,
|
||||
w2c_cid_frame_s: &mut mpsc::UnboundedSender<(Cid, Frame)>,
|
||||
end_receiver: oneshot::Receiver<()>,
|
||||
end_r: oneshot::Receiver<()>,
|
||||
) {
|
||||
trace!("Starting up udp read()");
|
||||
let mut metrics_cache = CidFrameCache::new(self.metrics.frames_wire_in_total.clone(), cid);
|
||||
@ -386,10 +333,10 @@ impl UdpProtocol {
|
||||
.wire_in_throughput
|
||||
.with_label_values(&[&cid.to_string()]);
|
||||
let mut data_in = self.data_in.lock().await;
|
||||
let mut end_receiver = end_receiver.fuse();
|
||||
let mut end_r = end_r.fuse();
|
||||
while let Some(bytes) = select! {
|
||||
r = data_in.next().fuse() => r,
|
||||
_ = end_receiver => None,
|
||||
_ = end_r => None,
|
||||
} {
|
||||
trace!("Got raw UDP message with len: {}", bytes.len());
|
||||
let frame_no = bytes[0];
|
||||
@ -585,3 +532,108 @@ impl UdpProtocol {
|
||||
trace!("Shutting down udp write()");
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use crate::{
|
||||
metrics::NetworkMetrics,
|
||||
types::{Cid, Pid},
|
||||
};
|
||||
use async_std::net;
|
||||
use futures::{executor::block_on, stream::StreamExt};
|
||||
use std::sync::Arc;
|
||||
|
||||
#[test]
|
||||
fn tcp_read_handshake() {
|
||||
let pid = Pid::new();
|
||||
let cid = 80085;
|
||||
let metrics = Arc::new(NetworkMetrics::new(&pid).unwrap());
|
||||
let addr = std::net::SocketAddrV4::new(std::net::Ipv4Addr::new(127, 0, 0, 1), 50500);
|
||||
block_on(async {
|
||||
let server = net::TcpListener::bind(addr).await.unwrap();
|
||||
let mut client = net::TcpStream::connect(addr).await.unwrap();
|
||||
|
||||
let s_stream = server.incoming().next().await.unwrap().unwrap();
|
||||
let prot = TcpProtocol::new(s_stream, metrics);
|
||||
|
||||
//Send Handshake
|
||||
client.write_all(&[FRAME_HANDSHAKE]).await.unwrap();
|
||||
client.write_all(b"HELLOWO").await.unwrap();
|
||||
client.write_all(&1337u32.to_le_bytes()).await.unwrap();
|
||||
client.write_all(&0u32.to_le_bytes()).await.unwrap();
|
||||
client.write_all(&42u32.to_le_bytes()).await.unwrap();
|
||||
client.flush();
|
||||
|
||||
//handle data
|
||||
let (mut w2c_cid_frame_s, mut w2c_cid_frame_r) = mpsc::unbounded::<(Cid, Frame)>();
|
||||
let (read_stop_sender, read_stop_receiver) = oneshot::channel();
|
||||
let cid2 = cid;
|
||||
let t = std::thread::spawn(move || {
|
||||
block_on(async {
|
||||
prot.read_from_wire(cid2, &mut w2c_cid_frame_s, read_stop_receiver)
|
||||
.await;
|
||||
})
|
||||
});
|
||||
// Assert than we get some value back! Its a Handshake!
|
||||
//async_std::task::sleep(std::time::Duration::from_millis(1000));
|
||||
let (cid_r, frame) = w2c_cid_frame_r.next().await.unwrap();
|
||||
assert_eq!(cid, cid_r);
|
||||
if let Frame::Handshake {
|
||||
magic_number,
|
||||
version,
|
||||
} = frame
|
||||
{
|
||||
assert_eq!(&magic_number, b"HELLOWO");
|
||||
assert_eq!(version, [1337, 0, 42]);
|
||||
} else {
|
||||
panic!("wrong handshake");
|
||||
}
|
||||
read_stop_sender.send(()).unwrap();
|
||||
t.join().unwrap();
|
||||
});
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn tcp_read_garbage() {
|
||||
let pid = Pid::new();
|
||||
let cid = 80085;
|
||||
let metrics = Arc::new(NetworkMetrics::new(&pid).unwrap());
|
||||
let addr = std::net::SocketAddrV4::new(std::net::Ipv4Addr::new(127, 0, 0, 1), 50501);
|
||||
block_on(async {
|
||||
let server = net::TcpListener::bind(addr).await.unwrap();
|
||||
let mut client = net::TcpStream::connect(addr).await.unwrap();
|
||||
|
||||
let s_stream = server.incoming().next().await.unwrap().unwrap();
|
||||
let prot = TcpProtocol::new(s_stream, metrics);
|
||||
|
||||
//Send Handshake
|
||||
client
|
||||
.write_all("x4hrtzsektfhxugzdtz5r78gzrtzfhxfdthfthuzhfzzufasgasdfg".as_bytes())
|
||||
.await
|
||||
.unwrap();
|
||||
client.flush();
|
||||
|
||||
//handle data
|
||||
let (mut w2c_cid_frame_s, mut w2c_cid_frame_r) = mpsc::unbounded::<(Cid, Frame)>();
|
||||
let (read_stop_sender, read_stop_receiver) = oneshot::channel();
|
||||
let cid2 = cid;
|
||||
let t = std::thread::spawn(move || {
|
||||
block_on(async {
|
||||
prot.read_from_wire(cid2, &mut w2c_cid_frame_s, read_stop_receiver)
|
||||
.await;
|
||||
})
|
||||
});
|
||||
// Assert than we get some value back! Its a Raw!
|
||||
let (cid_r, frame) = w2c_cid_frame_r.next().await.unwrap();
|
||||
assert_eq!(cid, cid_r);
|
||||
if let Frame::Raw(data) = frame {
|
||||
assert_eq!(&data.as_slice(), b"x4hrtzsektfhxugzdtz5r78gzrtzfhxf");
|
||||
} else {
|
||||
panic!("wrong frame type");
|
||||
}
|
||||
read_stop_sender.send(()).unwrap();
|
||||
t.join().unwrap();
|
||||
});
|
||||
}
|
||||
}
|
||||
|
@ -368,8 +368,21 @@ impl Scheduler {
|
||||
next = incoming.next().fuse() => next,
|
||||
_ = end_receiver => None,
|
||||
} {
|
||||
let stream = stream.unwrap();
|
||||
info!("Accepting Tcp from: {}", stream.peer_addr().unwrap());
|
||||
let stream = match stream {
|
||||
Ok(s) => s,
|
||||
Err(e) => {
|
||||
warn!(?e, "TcpStream Error, ignoring connection attempt");
|
||||
continue;
|
||||
},
|
||||
};
|
||||
let peer_addr = match stream.peer_addr() {
|
||||
Ok(s) => s,
|
||||
Err(e) => {
|
||||
warn!(?e, "TcpStream Error, ignoring connection attempt");
|
||||
continue;
|
||||
},
|
||||
};
|
||||
info!("Accepting Tcp from: {}", peer_addr);
|
||||
let protocol = TcpProtocol::new(stream, self.metrics.clone());
|
||||
self.init_protocol(Protocols::Tcp(protocol), None, true)
|
||||
.await;
|
||||
@ -583,6 +596,7 @@ impl Scheduler {
|
||||
Err(()) => {
|
||||
if let Some(pid_oneshot) = s2a_return_pid_s {
|
||||
// someone is waiting with `connect`, so give them their Error
|
||||
trace!("returning the Err to api who requested the connect");
|
||||
pid_oneshot
|
||||
.send(Err(std::io::Error::new(
|
||||
std::io::ErrorKind::PermissionDenied,
|
||||
|
@ -343,9 +343,9 @@ impl Server {
|
||||
|
||||
// 3) Handle inputs from clients
|
||||
block_on(async {
|
||||
//TIMEOUT 0.01 ms for msg handling
|
||||
//TIMEOUT 0.1 ms for msg handling
|
||||
select!(
|
||||
_ = Delay::new(std::time::Duration::from_micros(10)).fuse() => Ok(()),
|
||||
_ = Delay::new(std::time::Duration::from_micros(100)).fuse() => Ok(()),
|
||||
err = self.handle_new_connections(&mut frontend_events).fuse() => err,
|
||||
)
|
||||
})?;
|
||||
@ -597,6 +597,7 @@ impl Server {
|
||||
) -> Result<(), Error> {
|
||||
loop {
|
||||
let participant = self.network.connected().await?;
|
||||
debug!("New Participant connected to the server");
|
||||
let singleton_stream = participant.opened().await?;
|
||||
|
||||
let mut client = Client {
|
||||
@ -635,9 +636,9 @@ impl Server {
|
||||
time_of_day: *self.state.ecs().read_resource(),
|
||||
world_map: (WORLD_SIZE.map(|e| e as u32), self.map.clone()),
|
||||
});
|
||||
debug!("Done initial sync with client.");
|
||||
|
||||
frontend_events.push(Event::ClientConnected { entity });
|
||||
debug!("Done initial sync with client.");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -473,9 +473,9 @@ impl<'a> System<'a> for Sys {
|
||||
let mut cnt = 0;
|
||||
|
||||
let network_err: Result<(), crate::error::Error> = block_on(async {
|
||||
//TIMEOUT 0.01 ms for msg handling
|
||||
//TIMEOUT 0.02 ms for msg handling
|
||||
select!(
|
||||
_ = Delay::new(std::time::Duration::from_micros(10)).fuse() => Ok(()),
|
||||
_ = Delay::new(std::time::Duration::from_micros(20)).fuse() => Ok(()),
|
||||
err = Self::handle_client_msg(
|
||||
&mut server_emitter,
|
||||
&mut new_chat_msgs,
|
||||
|
Loading…
Reference in New Issue
Block a user