diff --git a/network/src/message.rs b/network/src/message.rs index fbfb9f3ae0..d1e86f01ec 100644 --- a/network/src/message.rs +++ b/network/src/message.rs @@ -112,7 +112,7 @@ impl std::fmt::Debug for MessageBuffer { if len > 20 { write!( f, - "MessageBuffer(len: {}, {}, {}, {}, {:?}..{:?})", + "MessageBuffer(len: {}, {}, {}, {}, {:X?}..{:X?})", len, u32::from_le_bytes([self.data[0], self.data[1], self.data[2], self.data[3]]), u32::from_le_bytes([self.data[4], self.data[5], self.data[6], self.data[7]]), diff --git a/network/src/participant.rs b/network/src/participant.rs index 802fbb7759..68a9832354 100644 --- a/network/src/participant.rs +++ b/network/src/participant.rs @@ -380,7 +380,7 @@ impl BParticipant { } } }, - _ => unreachable!("never reaches frame!"), + f => unreachable!("never reaches frame!: {:?}", f), } } if dropped_cnt > 0 { diff --git a/network/src/prios.rs b/network/src/prios.rs index 2d9dc9a834..2ab4f57f1f 100644 --- a/network/src/prios.rs +++ b/network/src/prios.rs @@ -21,6 +21,7 @@ use tracing::*; const PRIO_MAX: usize = 64; +#[derive(Default)] struct PidSidInfo { len: u64, empty_notify: Option>, @@ -148,25 +149,19 @@ impl PrioManager { for (prio, sid, msg) in self.messages_rx.try_iter() { debug_assert!(prio as usize <= PRIO_MAX); messages += 1; + let sid_string = sid.to_string(); self.metrics .message_out_total - .with_label_values(&[&self.pid, &sid.to_string()]) + .with_label_values(&[&self.pid, &sid_string]) .inc(); self.metrics .message_out_throughput - .with_label_values(&[&self.pid, &sid.to_string()]) + .with_label_values(&[&self.pid, &sid_string]) .inc_by(msg.buffer.data.len() as i64); - //trace!(?prio, ?sid, "tick"); + //trace!(?prio, ?sid_string, "tick"); self.queued.insert(prio); self.messages[prio as usize].push_back((sid, msg)); - if let Some(cnt) = self.sid_owned.get_mut(&sid) { - cnt.len += 1; - } else { - self.sid_owned.insert(sid, PidSidInfo { - len: 1, - empty_notify: None, - }); - } + self.sid_owned.entry(sid).or_default().len += 1; } //this must be AFTER messages for (sid, return_sender) in self.sid_flushed_rx.try_iter() { @@ -200,6 +195,10 @@ impl PrioManager { } } lowest_id + /* + self.queued + .iter() + .min_by_key(|&n| self.points[*n as usize]).cloned()*/ } /// returns if msg is empty @@ -223,8 +222,7 @@ impl PrioManager { frames.extend(std::iter::once((msg_sid, Frame::Data { mid: msg.mid, start: msg.cursor, - data: msg.buffer.data[msg.cursor as usize..(msg.cursor + to_send) as usize] - .to_vec(), + data: msg.buffer.data[msg.cursor as usize..][..to_send as usize].to_vec(), }))); }; msg.cursor += to_send; @@ -258,32 +256,28 @@ impl PrioManager { //pop message from front of VecDeque, handle it and push it back, so that all // => messages with same prio get a fair chance :) //TODO: evalaute not poping every time - match self.messages[prio as usize].pop_front() { - Some((sid, mut msg)) => { - if Self::tick_msg(&mut msg, sid, frames) { - //debug!(?m.mid, "finish message"); - //check if prio is empty - if self.messages[prio as usize].is_empty() { - self.queued.remove(&prio); - } - //decrease pid_sid counter by 1 again - let cnt = self.sid_owned.get_mut(&sid).expect( - "the pid_sid_owned counter works wrong, more pid,sid removed \ - than inserted", - ); - cnt.len -= 1; - if cnt.len == 0 { - let cnt = self.sid_owned.remove(&sid).unwrap(); - if let Some(empty_notify) = cnt.empty_notify { - empty_notify.send(()).unwrap(); - } - } - } else { - trace!(?msg.mid, "repush message"); - self.messages[prio as usize].push_front((sid, msg)); + let (sid, mut msg) = self.messages[prio as usize].pop_front().unwrap(); + if Self::tick_msg(&mut msg, sid, frames) { + //trace!(?m.mid, "finish message"); + //check if prio is empty + if self.messages[prio as usize].is_empty() { + self.queued.remove(&prio); + } + //decrease pid_sid counter by 1 again + let cnt = self.sid_owned.get_mut(&sid).expect( + "the pid_sid_owned counter works wrong, more pid,sid removed than \ + inserted", + ); + cnt.len -= 1; + if cnt.len == 0 { + let cnt = self.sid_owned.remove(&sid).unwrap(); + if let Some(empty_notify) = cnt.empty_notify { + empty_notify.send(()).unwrap(); } - }, - None => unreachable!("msg not in VecDeque, but queued"), + } + } else { + trace!(?msg.mid, "repush message"); + self.messages[prio as usize].push_front((sid, msg)); } }, None => { diff --git a/network/src/protocols.rs b/network/src/protocols.rs index 9cd0db19cb..1fe32dd947 100644 --- a/network/src/protocols.rs +++ b/network/src/protocols.rs @@ -523,72 +523,20 @@ impl UdpProtocol { } => { let x = FRAME_HANDSHAKE.to_be_bytes(); buffer[0] = x[0]; - buffer[1] = magic_number[0]; - buffer[2] = magic_number[1]; - buffer[3] = magic_number[2]; - buffer[4] = magic_number[3]; - buffer[5] = magic_number[4]; - buffer[6] = magic_number[5]; - buffer[7] = magic_number[6]; - let x = version[0].to_le_bytes(); - buffer[8] = x[0]; - buffer[9] = x[1]; - buffer[10] = x[2]; - buffer[11] = x[3]; - let x = version[1].to_le_bytes(); - buffer[12] = x[0]; - buffer[13] = x[1]; - buffer[14] = x[2]; - buffer[15] = x[3]; - let x = version[2].to_le_bytes(); - buffer[16] = x[0]; - buffer[17] = x[1]; - buffer[18] = x[2]; - buffer[19] = x[3]; + buffer[1..8].copy_from_slice(&magic_number); + buffer[8..12].copy_from_slice(&version[0].to_le_bytes()); + buffer[12..16].copy_from_slice(&version[1].to_le_bytes()); + buffer[16..20].copy_from_slice(&version[2].to_le_bytes()); 20 }, Frame::Init { pid, secret } => { - let x = FRAME_INIT.to_be_bytes(); - buffer[0] = x[0]; - let x = pid.to_le_bytes(); - buffer[1] = x[0]; - buffer[2] = x[1]; - buffer[3] = x[2]; - buffer[4] = x[3]; - buffer[5] = x[4]; - buffer[6] = x[5]; - buffer[7] = x[6]; - buffer[8] = x[7]; - buffer[9] = x[8]; - buffer[10] = x[9]; - buffer[11] = x[10]; - buffer[12] = x[11]; - buffer[13] = x[12]; - buffer[14] = x[13]; - buffer[15] = x[14]; - buffer[16] = x[15]; - let x = secret.to_le_bytes(); - buffer[17] = x[0]; - buffer[18] = x[1]; - buffer[19] = x[2]; - buffer[20] = x[3]; - buffer[21] = x[4]; - buffer[22] = x[5]; - buffer[23] = x[6]; - buffer[24] = x[7]; - buffer[25] = x[8]; - buffer[26] = x[9]; - buffer[27] = x[10]; - buffer[28] = x[11]; - buffer[29] = x[12]; - buffer[30] = x[13]; - buffer[31] = x[14]; - buffer[32] = x[15]; + buffer[0] = FRAME_INIT.to_be_bytes()[0]; + buffer[1..17].copy_from_slice(&pid.to_le_bytes()); + buffer[17..33].copy_from_slice(&secret.to_le_bytes()); 33 }, Frame::Shutdown => { - let x = FRAME_SHUTDOWN.to_be_bytes(); - buffer[0] = x[0]; + buffer[0] = FRAME_SHUTDOWN.to_be_bytes()[0]; 1 }, Frame::OpenStream { @@ -596,103 +544,36 @@ impl UdpProtocol { prio, promises, } => { - let x = FRAME_OPEN_STREAM.to_be_bytes(); - buffer[0] = x[0]; - let x = sid.to_le_bytes(); - buffer[1] = x[0]; - buffer[2] = x[1]; - buffer[3] = x[2]; - buffer[4] = x[3]; - buffer[5] = x[4]; - buffer[6] = x[5]; - buffer[7] = x[6]; - buffer[8] = x[7]; - let x = prio.to_le_bytes(); - buffer[9] = x[0]; - let x = promises.to_le_bytes(); - buffer[10] = x[0]; + buffer[0] = FRAME_OPEN_STREAM.to_be_bytes()[0]; + buffer[1..9].copy_from_slice(&sid.to_le_bytes()); + buffer[9] = prio.to_le_bytes()[0]; + buffer[10] = promises.to_le_bytes()[0]; 11 }, Frame::CloseStream { sid } => { - let x = FRAME_CLOSE_STREAM.to_be_bytes(); - buffer[0] = x[0]; - let x = sid.to_le_bytes(); - buffer[1] = x[0]; - buffer[2] = x[1]; - buffer[3] = x[2]; - buffer[4] = x[3]; - buffer[5] = x[4]; - buffer[6] = x[5]; - buffer[7] = x[6]; - buffer[8] = x[7]; + buffer[0] = FRAME_CLOSE_STREAM.to_be_bytes()[0]; + buffer[1..9].copy_from_slice(&sid.to_le_bytes()); 9 }, Frame::DataHeader { mid, sid, length } => { - let x = FRAME_DATA_HEADER.to_be_bytes(); - buffer[0] = x[0]; - let x = mid.to_le_bytes(); - buffer[1] = x[0]; - buffer[2] = x[1]; - buffer[3] = x[2]; - buffer[4] = x[3]; - buffer[5] = x[4]; - buffer[6] = x[5]; - buffer[7] = x[6]; - buffer[8] = x[7]; - let x = sid.to_le_bytes(); - buffer[9] = x[0]; - buffer[10] = x[1]; - buffer[11] = x[2]; - buffer[12] = x[3]; - buffer[13] = x[4]; - buffer[14] = x[5]; - buffer[15] = x[6]; - buffer[16] = x[7]; - let x = length.to_le_bytes(); - buffer[17] = x[0]; - buffer[18] = x[1]; - buffer[19] = x[2]; - buffer[20] = x[3]; - buffer[21] = x[4]; - buffer[22] = x[5]; - buffer[23] = x[6]; - buffer[24] = x[7]; + buffer[0] = FRAME_DATA_HEADER.to_be_bytes()[0]; + buffer[1..9].copy_from_slice(&mid.to_le_bytes()); + buffer[9..17].copy_from_slice(&sid.to_le_bytes()); + buffer[17..25].copy_from_slice(&length.to_le_bytes()); 25 }, Frame::Data { mid, start, data } => { - let x = FRAME_DATA.to_be_bytes(); - buffer[0] = x[0]; - let x = mid.to_le_bytes(); - buffer[1] = x[0]; - buffer[2] = x[1]; - buffer[3] = x[2]; - buffer[4] = x[3]; - buffer[5] = x[4]; - buffer[6] = x[5]; - buffer[7] = x[6]; - buffer[8] = x[7]; - let x = start.to_le_bytes(); - buffer[9] = x[0]; - buffer[10] = x[1]; - buffer[11] = x[2]; - buffer[12] = x[3]; - buffer[13] = x[4]; - buffer[14] = x[5]; - buffer[15] = x[6]; - buffer[16] = x[7]; - let x = (data.len() as u16).to_le_bytes(); - buffer[17] = x[0]; - buffer[18] = x[1]; + buffer[0] = FRAME_DATA.to_be_bytes()[0]; + buffer[1..9].copy_from_slice(&mid.to_le_bytes()); + buffer[9..17].copy_from_slice(&start.to_le_bytes()); + buffer[17..19].copy_from_slice(&(data.len() as u16).to_le_bytes()); buffer[19..(data.len() + 19)].clone_from_slice(&data[..]); throughput_cache.inc_by(data.len() as i64); 19 + data.len() }, Frame::Raw(data) => { - let x = FRAME_RAW.to_be_bytes(); - buffer[0] = x[0]; - let x = (data.len() as u16).to_le_bytes(); - buffer[1] = x[0]; - buffer[2] = x[1]; + buffer[0] = FRAME_RAW.to_be_bytes()[0]; + buffer[1..3].copy_from_slice(&(data.len() as u16).to_le_bytes()); buffer[3..(data.len() + 3)].clone_from_slice(&data[..]); 3 + data.len() }, diff --git a/network/src/scheduler.rs b/network/src/scheduler.rs index 7179a8491d..31cd0bd7ba 100644 --- a/network/src/scheduler.rs +++ b/network/src/scheduler.rs @@ -260,7 +260,7 @@ impl Scheduler { )>, ) { trace!("start disconnect_mgr"); - while let Some((pid, return_once_successfull_shutdown)) = a2s_disconnect_r.next().await { + while let Some((pid, return_once_successful_shutdown)) = a2s_disconnect_r.next().await { //Closing Participants is done the following way: // 1. We drop our senders and receivers // 2. we need to close BParticipant, this will drop its senderns and receivers @@ -276,10 +276,10 @@ impl Scheduler { .unwrap(); drop(pi); let e = finished_receiver.await.unwrap(); - return_once_successfull_shutdown.send(e).unwrap(); + return_once_successful_shutdown.send(e).unwrap(); } else { debug!(?pid, "looks like participant is already dropped"); - return_once_successfull_shutdown.send(Ok(())).unwrap(); + return_once_successful_shutdown.send(Ok(())).unwrap(); } trace!(?pid, "closed participant"); } @@ -304,17 +304,19 @@ impl Scheduler { self.closed.store(true, Ordering::Relaxed); debug!("shutting down all BParticipants gracefully"); let mut participants = self.participants.write().await; - let mut waitings = vec![]; - for (pid, mut pi) in participants.drain() { - trace!(?pid, "shutting down BParticipants"); - let (finished_sender, finished_receiver) = oneshot::channel(); - waitings.push((pid, finished_receiver)); - pi.s2b_shutdown_bparticipant_s - .take() - .unwrap() - .send(finished_sender) - .unwrap(); - } + let waitings = participants + .drain() + .map(|(pid, mut pi)| { + trace!(?pid, "shutting down BParticipants"); + let (finished_sender, finished_receiver) = oneshot::channel(); + pi.s2b_shutdown_bparticipant_s + .take() + .unwrap() + .send(finished_sender) + .unwrap(); + (pid, finished_receiver) + }) + .collect::>(); debug!("wait for partiticipants to be shut down"); for (pid, recv) in waitings { if let Err(e) = recv.await { @@ -392,7 +394,8 @@ impl Scheduler { // have any state let mut listeners = HashMap::new(); let mut end_receiver = s2s_stop_listening_r.fuse(); - let mut data = [0u8; 9216]; + const UDP_MAXIMUM_SINGLE_PACKET_SIZE_EVER: usize = 9216; + let mut data = [0u8; UDP_MAXIMUM_SINGLE_PACKET_SIZE_EVER]; while let Ok((size, remote_addr)) = select! { next = socket.recv_from(&mut data).fuse() => next, _ = end_receiver => Err(std::io::Error::new(std::io::ErrorKind::Other, "")), diff --git a/network/src/types.rs b/network/src/types.rs index 288a954293..aeb409ac0c 100644 --- a/network/src/types.rs +++ b/network/src/types.rs @@ -108,28 +108,13 @@ impl Frame { pub fn get_int(&self) -> u8 { match self { - Frame::Handshake { - magic_number: _, - version: _, - } => 0, - Frame::Init { pid: _, secret: _ } => 1, + Frame::Handshake { .. } => 0, + Frame::Init { .. } => 1, Frame::Shutdown => 2, - Frame::OpenStream { - sid: _, - prio: _, - promises: _, - } => 3, - Frame::CloseStream { sid: _ } => 4, - Frame::DataHeader { - mid: _, - sid: _, - length: _, - } => 5, - Frame::Data { - mid: _, - start: _, - data: _, - } => 6, + Frame::OpenStream { .. } => 3, + Frame::CloseStream { .. } => 4, + Frame::DataHeader { .. } => 5, + Frame::Data { .. } => 6, Frame::Raw(_) => 7, } } @@ -238,75 +223,8 @@ impl std::fmt::Display for Sid { } } -#[inline] fn sixlet_to_str(sixlet: u128) -> char { - match sixlet { - 0 => 'A', - 1 => 'B', - 2 => 'C', - 3 => 'D', - 4 => 'E', - 5 => 'F', - 6 => 'G', - 7 => 'H', - 8 => 'I', - 9 => 'J', - 10 => 'K', - 11 => 'L', - 12 => 'M', - 13 => 'N', - 14 => 'O', - 15 => 'P', - 16 => 'Q', - 17 => 'R', - 18 => 'S', - 19 => 'T', - 20 => 'U', - 21 => 'V', - 22 => 'W', - 23 => 'X', - 24 => 'Y', - 25 => 'Z', - 26 => 'a', - 27 => 'b', - 28 => 'c', - 29 => 'd', - 30 => 'e', - 31 => 'f', - 32 => 'g', - 33 => 'h', - 34 => 'i', - 35 => 'j', - 36 => 'k', - 37 => 'l', - 38 => 'm', - 39 => 'n', - 40 => 'o', - 41 => 'p', - 42 => 'q', - 43 => 'r', - 44 => 's', - 45 => 't', - 46 => 'u', - 47 => 'v', - 48 => 'w', - 49 => 'x', - 50 => 'y', - 51 => 'z', - 52 => '0', - 53 => '1', - 54 => '2', - 55 => '3', - 56 => '4', - 57 => '5', - 58 => '6', - 59 => '7', - 60 => '8', - 61 => '9', - 62 => '+', - 63 => '/', - _ => '-', - } + b"ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/"[sixlet as usize] as char } #[cfg(test)] @@ -335,7 +253,7 @@ mod tests { #[test] fn test_sixlet_to_str() { assert_eq!(sixlet_to_str(0), 'A'); + assert_eq!(sixlet_to_str(29), 'd'); assert_eq!(sixlet_to_str(63), '/'); - assert_eq!(sixlet_to_str(64), '-'); } }