diff --git a/network/src/participant.rs b/network/src/participant.rs index 0dec87fd70..1221dec4b5 100644 --- a/network/src/participant.rs +++ b/network/src/participant.rs @@ -63,6 +63,7 @@ pub struct BParticipant { running_mgr: AtomicUsize, run_channels: Option, metrics: Arc, + no_channel_error_info: RwLock<(Instant, u64)>, } impl BParticipant { @@ -104,6 +105,7 @@ impl BParticipant { running_mgr: AtomicUsize::new(0), run_channels, metrics, + no_channel_error_info: RwLock::new((Instant::now(), 0)), }, a2b_steam_open_s, b2a_stream_opened_r, @@ -251,7 +253,16 @@ impl BParticipant { true } } else { - error!("participant has no channel to communicate on"); + let mut guard = self.no_channel_error_info.write().await; + let now = Instant::now(); + if now.duration_since(guard.0) > Duration::from_secs(1) { + guard.0 = now; + let occurrences = guard.1 + 1; + guard.1 = 0; + error!(?occurrences, "participant has no channel to communicate on"); + } else { + guard.1 += 1; + } false } } diff --git a/network/src/prios.rs b/network/src/prios.rs index dac46270ee..aee7ffd2cc 100644 --- a/network/src/prios.rs +++ b/network/src/prios.rs @@ -281,8 +281,8 @@ impl PrioManager { } } } else { - error!(?msg.mid, "repush message"); - self.messages[prio as usize].push_back((sid, msg)); + trace!(?msg.mid, "repush message"); + self.messages[prio as usize].push_front((sid, msg)); } }, None => unreachable!("msg not in VecDeque, but queued"), @@ -516,11 +516,11 @@ mod tests { assert_header(&mut frames, 1, SIZE * 2 + 20); assert_data(&mut frames, 0, vec![48; USIZE]); + assert_data(&mut frames, SIZE, vec![49; USIZE]); + assert_data(&mut frames, SIZE * 2, vec![50; 20]); assert_header(&mut frames, 2, SIZE * 2 + 20); assert_data(&mut frames, 0, vec![48; USIZE]); assert_data(&mut frames, SIZE, vec![49; USIZE]); - assert_data(&mut frames, SIZE, vec![49; USIZE]); - assert_data(&mut frames, SIZE * 2, vec![50; 20]); assert_data(&mut frames, SIZE * 2, vec![50; 20]); assert!(frames.is_empty()); } @@ -531,12 +531,10 @@ mod tests { msg_tx.send(mock_out_large(16, 1)).unwrap(); msg_tx.send(mock_out_large(16, 2)).unwrap(); let mut frames = VecDeque::new(); - block_on(mgr.fill_frames(3, &mut frames)); + block_on(mgr.fill_frames(2, &mut frames)); assert_header(&mut frames, 1, SIZE * 2 + 20); assert_data(&mut frames, 0, vec![48; USIZE]); - assert_header(&mut frames, 2, SIZE * 2 + 20); - assert_data(&mut frames, 0, vec![48; USIZE]); assert_data(&mut frames, SIZE, vec![49; USIZE]); msg_tx.send(mock_out(0, 3)).unwrap(); @@ -545,8 +543,10 @@ mod tests { assert_header(&mut frames, 3, 3); assert_data(&mut frames, 0, vec![48, 49, 50]); - assert_data(&mut frames, SIZE, vec![49; USIZE]); assert_data(&mut frames, SIZE * 2, vec![50; 20]); + assert_header(&mut frames, 2, SIZE * 2 + 20); + assert_data(&mut frames, 0, vec![48; USIZE]); + assert_data(&mut frames, SIZE, vec![49; USIZE]); assert_data(&mut frames, SIZE * 2, vec![50; 20]); assert!(frames.is_empty()); } @@ -600,4 +600,97 @@ mod tests { assert_header(&mut frames, 2, 3); //unimportant } + + #[test] + fn gigantic_message() { + let (mut mgr, msg_tx, _flush_tx) = mock_new(); + let mut data = vec![1; USIZE]; + data.extend_from_slice(&vec![2; USIZE]); + data.extend_from_slice(&vec![3; USIZE]); + data.extend_from_slice(&vec![4; USIZE]); + data.extend_from_slice(&vec![5; USIZE]); + let sid = Sid::new(2); + msg_tx + .send((16, sid, OutgoingMessage { + buffer: Arc::new(MessageBuffer { data }), + cursor: 0, + mid: 1, + sid, + })) + .unwrap(); + + let mut frames = VecDeque::new(); + block_on(mgr.fill_frames(2000, &mut frames)); + + assert_header(&mut frames, 2, 7000); + assert_data(&mut frames, 0, vec![1; USIZE]); + assert_data(&mut frames, 1400, vec![2; USIZE]); + assert_data(&mut frames, 2800, vec![3; USIZE]); + assert_data(&mut frames, 4200, vec![4; USIZE]); + assert_data(&mut frames, 5600, vec![5; USIZE]); + } + + #[test] + fn gigantic_message_order() { + let (mut mgr, msg_tx, _flush_tx) = mock_new(); + let mut data = vec![1; USIZE]; + data.extend_from_slice(&vec![2; USIZE]); + data.extend_from_slice(&vec![3; USIZE]); + data.extend_from_slice(&vec![4; USIZE]); + data.extend_from_slice(&vec![5; USIZE]); + let sid = Sid::new(2); + msg_tx + .send((16, sid, OutgoingMessage { + buffer: Arc::new(MessageBuffer { data }), + cursor: 0, + mid: 1, + sid, + })) + .unwrap(); + msg_tx.send(mock_out(16, 8)).unwrap(); + + let mut frames = VecDeque::new(); + block_on(mgr.fill_frames(2000, &mut frames)); + + assert_header(&mut frames, 2, 7000); + assert_data(&mut frames, 0, vec![1; USIZE]); + assert_data(&mut frames, 1400, vec![2; USIZE]); + assert_data(&mut frames, 2800, vec![3; USIZE]); + assert_data(&mut frames, 4200, vec![4; USIZE]); + assert_data(&mut frames, 5600, vec![5; USIZE]); + assert_header(&mut frames, 8, 3); + assert_data(&mut frames, 0, vec![48, 49, 50]); + } + + #[test] + fn gigantic_message_order_other_prio() { + let (mut mgr, msg_tx, _flush_tx) = mock_new(); + let mut data = vec![1; USIZE]; + data.extend_from_slice(&vec![2; USIZE]); + data.extend_from_slice(&vec![3; USIZE]); + data.extend_from_slice(&vec![4; USIZE]); + data.extend_from_slice(&vec![5; USIZE]); + let sid = Sid::new(2); + msg_tx + .send((16, sid, OutgoingMessage { + buffer: Arc::new(MessageBuffer { data }), + cursor: 0, + mid: 1, + sid, + })) + .unwrap(); + msg_tx.send(mock_out(20, 8)).unwrap(); + + let mut frames = VecDeque::new(); + block_on(mgr.fill_frames(2000, &mut frames)); + + assert_header(&mut frames, 2, 7000); + assert_data(&mut frames, 0, vec![1; USIZE]); + assert_header(&mut frames, 8, 3); + assert_data(&mut frames, 0, vec![48, 49, 50]); + assert_data(&mut frames, 1400, vec![2; USIZE]); + assert_data(&mut frames, 2800, vec![3; USIZE]); + assert_data(&mut frames, 4200, vec![4; USIZE]); + assert_data(&mut frames, 5600, vec![5; USIZE]); + } }