fixing various smaller network party from issue 657

This commit is contained in:
Marcel Märtens 2020-07-04 02:04:33 +02:00
parent c590bf7b65
commit 1da6f15a43
6 changed files with 84 additions and 288 deletions

View File

@ -112,7 +112,7 @@ impl std::fmt::Debug for MessageBuffer {
if len > 20 {
write!(
f,
"MessageBuffer(len: {}, {}, {}, {}, {:?}..{:?})",
"MessageBuffer(len: {}, {}, {}, {}, {:X?}..{:X?})",
len,
u32::from_le_bytes([self.data[0], self.data[1], self.data[2], self.data[3]]),
u32::from_le_bytes([self.data[4], self.data[5], self.data[6], self.data[7]]),

View File

@ -380,7 +380,7 @@ impl BParticipant {
}
}
},
_ => unreachable!("never reaches frame!"),
f => unreachable!("never reaches frame!: {:?}", f),
}
}
if dropped_cnt > 0 {

View File

@ -21,6 +21,7 @@ use tracing::*;
const PRIO_MAX: usize = 64;
#[derive(Default)]
struct PidSidInfo {
len: u64,
empty_notify: Option<oneshot::Sender<()>>,
@ -148,25 +149,19 @@ impl PrioManager {
for (prio, sid, msg) in self.messages_rx.try_iter() {
debug_assert!(prio as usize <= PRIO_MAX);
messages += 1;
let sid_string = sid.to_string();
self.metrics
.message_out_total
.with_label_values(&[&self.pid, &sid.to_string()])
.with_label_values(&[&self.pid, &sid_string])
.inc();
self.metrics
.message_out_throughput
.with_label_values(&[&self.pid, &sid.to_string()])
.with_label_values(&[&self.pid, &sid_string])
.inc_by(msg.buffer.data.len() as i64);
//trace!(?prio, ?sid, "tick");
//trace!(?prio, ?sid_string, "tick");
self.queued.insert(prio);
self.messages[prio as usize].push_back((sid, msg));
if let Some(cnt) = self.sid_owned.get_mut(&sid) {
cnt.len += 1;
} else {
self.sid_owned.insert(sid, PidSidInfo {
len: 1,
empty_notify: None,
});
}
self.sid_owned.entry(sid).or_default().len += 1;
}
//this must be AFTER messages
for (sid, return_sender) in self.sid_flushed_rx.try_iter() {
@ -200,6 +195,10 @@ impl PrioManager {
}
}
lowest_id
/*
self.queued
.iter()
.min_by_key(|&n| self.points[*n as usize]).cloned()*/
}
/// returns if msg is empty
@ -223,8 +222,7 @@ impl PrioManager {
frames.extend(std::iter::once((msg_sid, Frame::Data {
mid: msg.mid,
start: msg.cursor,
data: msg.buffer.data[msg.cursor as usize..(msg.cursor + to_send) as usize]
.to_vec(),
data: msg.buffer.data[msg.cursor as usize..][..to_send as usize].to_vec(),
})));
};
msg.cursor += to_send;
@ -258,32 +256,28 @@ impl PrioManager {
//pop message from front of VecDeque, handle it and push it back, so that all
// => messages with same prio get a fair chance :)
//TODO: evalaute not poping every time
match self.messages[prio as usize].pop_front() {
Some((sid, mut msg)) => {
if Self::tick_msg(&mut msg, sid, frames) {
//debug!(?m.mid, "finish message");
//check if prio is empty
if self.messages[prio as usize].is_empty() {
self.queued.remove(&prio);
}
//decrease pid_sid counter by 1 again
let cnt = self.sid_owned.get_mut(&sid).expect(
"the pid_sid_owned counter works wrong, more pid,sid removed \
than inserted",
);
cnt.len -= 1;
if cnt.len == 0 {
let cnt = self.sid_owned.remove(&sid).unwrap();
if let Some(empty_notify) = cnt.empty_notify {
empty_notify.send(()).unwrap();
}
}
} else {
trace!(?msg.mid, "repush message");
self.messages[prio as usize].push_front((sid, msg));
let (sid, mut msg) = self.messages[prio as usize].pop_front().unwrap();
if Self::tick_msg(&mut msg, sid, frames) {
//trace!(?m.mid, "finish message");
//check if prio is empty
if self.messages[prio as usize].is_empty() {
self.queued.remove(&prio);
}
//decrease pid_sid counter by 1 again
let cnt = self.sid_owned.get_mut(&sid).expect(
"the pid_sid_owned counter works wrong, more pid,sid removed than \
inserted",
);
cnt.len -= 1;
if cnt.len == 0 {
let cnt = self.sid_owned.remove(&sid).unwrap();
if let Some(empty_notify) = cnt.empty_notify {
empty_notify.send(()).unwrap();
}
},
None => unreachable!("msg not in VecDeque, but queued"),
}
} else {
trace!(?msg.mid, "repush message");
self.messages[prio as usize].push_front((sid, msg));
}
},
None => {

View File

@ -523,72 +523,20 @@ impl UdpProtocol {
} => {
let x = FRAME_HANDSHAKE.to_be_bytes();
buffer[0] = x[0];
buffer[1] = magic_number[0];
buffer[2] = magic_number[1];
buffer[3] = magic_number[2];
buffer[4] = magic_number[3];
buffer[5] = magic_number[4];
buffer[6] = magic_number[5];
buffer[7] = magic_number[6];
let x = version[0].to_le_bytes();
buffer[8] = x[0];
buffer[9] = x[1];
buffer[10] = x[2];
buffer[11] = x[3];
let x = version[1].to_le_bytes();
buffer[12] = x[0];
buffer[13] = x[1];
buffer[14] = x[2];
buffer[15] = x[3];
let x = version[2].to_le_bytes();
buffer[16] = x[0];
buffer[17] = x[1];
buffer[18] = x[2];
buffer[19] = x[3];
buffer[1..8].copy_from_slice(&magic_number);
buffer[8..12].copy_from_slice(&version[0].to_le_bytes());
buffer[12..16].copy_from_slice(&version[1].to_le_bytes());
buffer[16..20].copy_from_slice(&version[2].to_le_bytes());
20
},
Frame::Init { pid, secret } => {
let x = FRAME_INIT.to_be_bytes();
buffer[0] = x[0];
let x = pid.to_le_bytes();
buffer[1] = x[0];
buffer[2] = x[1];
buffer[3] = x[2];
buffer[4] = x[3];
buffer[5] = x[4];
buffer[6] = x[5];
buffer[7] = x[6];
buffer[8] = x[7];
buffer[9] = x[8];
buffer[10] = x[9];
buffer[11] = x[10];
buffer[12] = x[11];
buffer[13] = x[12];
buffer[14] = x[13];
buffer[15] = x[14];
buffer[16] = x[15];
let x = secret.to_le_bytes();
buffer[17] = x[0];
buffer[18] = x[1];
buffer[19] = x[2];
buffer[20] = x[3];
buffer[21] = x[4];
buffer[22] = x[5];
buffer[23] = x[6];
buffer[24] = x[7];
buffer[25] = x[8];
buffer[26] = x[9];
buffer[27] = x[10];
buffer[28] = x[11];
buffer[29] = x[12];
buffer[30] = x[13];
buffer[31] = x[14];
buffer[32] = x[15];
buffer[0] = FRAME_INIT.to_be_bytes()[0];
buffer[1..17].copy_from_slice(&pid.to_le_bytes());
buffer[17..33].copy_from_slice(&secret.to_le_bytes());
33
},
Frame::Shutdown => {
let x = FRAME_SHUTDOWN.to_be_bytes();
buffer[0] = x[0];
buffer[0] = FRAME_SHUTDOWN.to_be_bytes()[0];
1
},
Frame::OpenStream {
@ -596,103 +544,36 @@ impl UdpProtocol {
prio,
promises,
} => {
let x = FRAME_OPEN_STREAM.to_be_bytes();
buffer[0] = x[0];
let x = sid.to_le_bytes();
buffer[1] = x[0];
buffer[2] = x[1];
buffer[3] = x[2];
buffer[4] = x[3];
buffer[5] = x[4];
buffer[6] = x[5];
buffer[7] = x[6];
buffer[8] = x[7];
let x = prio.to_le_bytes();
buffer[9] = x[0];
let x = promises.to_le_bytes();
buffer[10] = x[0];
buffer[0] = FRAME_OPEN_STREAM.to_be_bytes()[0];
buffer[1..9].copy_from_slice(&sid.to_le_bytes());
buffer[9] = prio.to_le_bytes()[0];
buffer[10] = promises.to_le_bytes()[0];
11
},
Frame::CloseStream { sid } => {
let x = FRAME_CLOSE_STREAM.to_be_bytes();
buffer[0] = x[0];
let x = sid.to_le_bytes();
buffer[1] = x[0];
buffer[2] = x[1];
buffer[3] = x[2];
buffer[4] = x[3];
buffer[5] = x[4];
buffer[6] = x[5];
buffer[7] = x[6];
buffer[8] = x[7];
buffer[0] = FRAME_CLOSE_STREAM.to_be_bytes()[0];
buffer[1..9].copy_from_slice(&sid.to_le_bytes());
9
},
Frame::DataHeader { mid, sid, length } => {
let x = FRAME_DATA_HEADER.to_be_bytes();
buffer[0] = x[0];
let x = mid.to_le_bytes();
buffer[1] = x[0];
buffer[2] = x[1];
buffer[3] = x[2];
buffer[4] = x[3];
buffer[5] = x[4];
buffer[6] = x[5];
buffer[7] = x[6];
buffer[8] = x[7];
let x = sid.to_le_bytes();
buffer[9] = x[0];
buffer[10] = x[1];
buffer[11] = x[2];
buffer[12] = x[3];
buffer[13] = x[4];
buffer[14] = x[5];
buffer[15] = x[6];
buffer[16] = x[7];
let x = length.to_le_bytes();
buffer[17] = x[0];
buffer[18] = x[1];
buffer[19] = x[2];
buffer[20] = x[3];
buffer[21] = x[4];
buffer[22] = x[5];
buffer[23] = x[6];
buffer[24] = x[7];
buffer[0] = FRAME_DATA_HEADER.to_be_bytes()[0];
buffer[1..9].copy_from_slice(&mid.to_le_bytes());
buffer[9..17].copy_from_slice(&sid.to_le_bytes());
buffer[17..25].copy_from_slice(&length.to_le_bytes());
25
},
Frame::Data { mid, start, data } => {
let x = FRAME_DATA.to_be_bytes();
buffer[0] = x[0];
let x = mid.to_le_bytes();
buffer[1] = x[0];
buffer[2] = x[1];
buffer[3] = x[2];
buffer[4] = x[3];
buffer[5] = x[4];
buffer[6] = x[5];
buffer[7] = x[6];
buffer[8] = x[7];
let x = start.to_le_bytes();
buffer[9] = x[0];
buffer[10] = x[1];
buffer[11] = x[2];
buffer[12] = x[3];
buffer[13] = x[4];
buffer[14] = x[5];
buffer[15] = x[6];
buffer[16] = x[7];
let x = (data.len() as u16).to_le_bytes();
buffer[17] = x[0];
buffer[18] = x[1];
buffer[0] = FRAME_DATA.to_be_bytes()[0];
buffer[1..9].copy_from_slice(&mid.to_le_bytes());
buffer[9..17].copy_from_slice(&start.to_le_bytes());
buffer[17..19].copy_from_slice(&(data.len() as u16).to_le_bytes());
buffer[19..(data.len() + 19)].clone_from_slice(&data[..]);
throughput_cache.inc_by(data.len() as i64);
19 + data.len()
},
Frame::Raw(data) => {
let x = FRAME_RAW.to_be_bytes();
buffer[0] = x[0];
let x = (data.len() as u16).to_le_bytes();
buffer[1] = x[0];
buffer[2] = x[1];
buffer[0] = FRAME_RAW.to_be_bytes()[0];
buffer[1..3].copy_from_slice(&(data.len() as u16).to_le_bytes());
buffer[3..(data.len() + 3)].clone_from_slice(&data[..]);
3 + data.len()
},

View File

@ -260,7 +260,7 @@ impl Scheduler {
)>,
) {
trace!("start disconnect_mgr");
while let Some((pid, return_once_successfull_shutdown)) = a2s_disconnect_r.next().await {
while let Some((pid, return_once_successful_shutdown)) = a2s_disconnect_r.next().await {
//Closing Participants is done the following way:
// 1. We drop our senders and receivers
// 2. we need to close BParticipant, this will drop its senderns and receivers
@ -276,10 +276,10 @@ impl Scheduler {
.unwrap();
drop(pi);
let e = finished_receiver.await.unwrap();
return_once_successfull_shutdown.send(e).unwrap();
return_once_successful_shutdown.send(e).unwrap();
} else {
debug!(?pid, "looks like participant is already dropped");
return_once_successfull_shutdown.send(Ok(())).unwrap();
return_once_successful_shutdown.send(Ok(())).unwrap();
}
trace!(?pid, "closed participant");
}
@ -304,17 +304,19 @@ impl Scheduler {
self.closed.store(true, Ordering::Relaxed);
debug!("shutting down all BParticipants gracefully");
let mut participants = self.participants.write().await;
let mut waitings = vec![];
for (pid, mut pi) in participants.drain() {
trace!(?pid, "shutting down BParticipants");
let (finished_sender, finished_receiver) = oneshot::channel();
waitings.push((pid, finished_receiver));
pi.s2b_shutdown_bparticipant_s
.take()
.unwrap()
.send(finished_sender)
.unwrap();
}
let waitings = participants
.drain()
.map(|(pid, mut pi)| {
trace!(?pid, "shutting down BParticipants");
let (finished_sender, finished_receiver) = oneshot::channel();
pi.s2b_shutdown_bparticipant_s
.take()
.unwrap()
.send(finished_sender)
.unwrap();
(pid, finished_receiver)
})
.collect::<Vec<_>>();
debug!("wait for partiticipants to be shut down");
for (pid, recv) in waitings {
if let Err(e) = recv.await {
@ -392,7 +394,8 @@ impl Scheduler {
// have any state
let mut listeners = HashMap::new();
let mut end_receiver = s2s_stop_listening_r.fuse();
let mut data = [0u8; 9216];
const UDP_MAXIMUM_SINGLE_PACKET_SIZE_EVER: usize = 9216;
let mut data = [0u8; UDP_MAXIMUM_SINGLE_PACKET_SIZE_EVER];
while let Ok((size, remote_addr)) = select! {
next = socket.recv_from(&mut data).fuse() => next,
_ = end_receiver => Err(std::io::Error::new(std::io::ErrorKind::Other, "")),

View File

@ -108,28 +108,13 @@ impl Frame {
pub fn get_int(&self) -> u8 {
match self {
Frame::Handshake {
magic_number: _,
version: _,
} => 0,
Frame::Init { pid: _, secret: _ } => 1,
Frame::Handshake { .. } => 0,
Frame::Init { .. } => 1,
Frame::Shutdown => 2,
Frame::OpenStream {
sid: _,
prio: _,
promises: _,
} => 3,
Frame::CloseStream { sid: _ } => 4,
Frame::DataHeader {
mid: _,
sid: _,
length: _,
} => 5,
Frame::Data {
mid: _,
start: _,
data: _,
} => 6,
Frame::OpenStream { .. } => 3,
Frame::CloseStream { .. } => 4,
Frame::DataHeader { .. } => 5,
Frame::Data { .. } => 6,
Frame::Raw(_) => 7,
}
}
@ -238,75 +223,8 @@ impl std::fmt::Display for Sid {
}
}
#[inline]
fn sixlet_to_str(sixlet: u128) -> char {
match sixlet {
0 => 'A',
1 => 'B',
2 => 'C',
3 => 'D',
4 => 'E',
5 => 'F',
6 => 'G',
7 => 'H',
8 => 'I',
9 => 'J',
10 => 'K',
11 => 'L',
12 => 'M',
13 => 'N',
14 => 'O',
15 => 'P',
16 => 'Q',
17 => 'R',
18 => 'S',
19 => 'T',
20 => 'U',
21 => 'V',
22 => 'W',
23 => 'X',
24 => 'Y',
25 => 'Z',
26 => 'a',
27 => 'b',
28 => 'c',
29 => 'd',
30 => 'e',
31 => 'f',
32 => 'g',
33 => 'h',
34 => 'i',
35 => 'j',
36 => 'k',
37 => 'l',
38 => 'm',
39 => 'n',
40 => 'o',
41 => 'p',
42 => 'q',
43 => 'r',
44 => 's',
45 => 't',
46 => 'u',
47 => 'v',
48 => 'w',
49 => 'x',
50 => 'y',
51 => 'z',
52 => '0',
53 => '1',
54 => '2',
55 => '3',
56 => '4',
57 => '5',
58 => '6',
59 => '7',
60 => '8',
61 => '9',
62 => '+',
63 => '/',
_ => '-',
}
b"ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/"[sixlet as usize] as char
}
#[cfg(test)]
@ -335,7 +253,7 @@ mod tests {
#[test]
fn test_sixlet_to_str() {
assert_eq!(sixlet_to_str(0), 'A');
assert_eq!(sixlet_to_str(29), 'd');
assert_eq!(sixlet_to_str(63), '/');
assert_eq!(sixlet_to_str(64), '-');
}
}