- switched participant to only error!() once a sec instead of every occurence in order to not spam error

- switched the behavior of prio to keep the order of a stream, EVEN if messages of different length are used!
  This needs to be addresses to fulfill PROMISES_ORDERED
  I think we need to make thoughs if PRIO needs to handle this, or we add additional meta info and handle it on receiver site
This commit is contained in:
Marcel Märtens 2020-06-26 15:30:56 +02:00
parent a3eafe9b70
commit 57453291e4
2 changed files with 113 additions and 9 deletions

View File

@ -63,6 +63,7 @@ pub struct BParticipant {
running_mgr: AtomicUsize,
run_channels: Option<ControlChannels>,
metrics: Arc<NetworkMetrics>,
no_channel_error_info: RwLock<(Instant, u64)>,
}
impl BParticipant {
@ -104,6 +105,7 @@ impl BParticipant {
running_mgr: AtomicUsize::new(0),
run_channels,
metrics,
no_channel_error_info: RwLock::new((Instant::now(), 0)),
},
a2b_steam_open_s,
b2a_stream_opened_r,
@ -251,7 +253,16 @@ impl BParticipant {
true
}
} else {
error!("participant has no channel to communicate on");
let mut guard = self.no_channel_error_info.write().await;
let now = Instant::now();
if now.duration_since(guard.0) > Duration::from_secs(1) {
guard.0 = now;
let occurrences = guard.1 + 1;
guard.1 = 0;
error!(?occurrences, "participant has no channel to communicate on");
} else {
guard.1 += 1;
}
false
}
}

View File

@ -281,8 +281,8 @@ impl PrioManager {
}
}
} else {
error!(?msg.mid, "repush message");
self.messages[prio as usize].push_back((sid, msg));
trace!(?msg.mid, "repush message");
self.messages[prio as usize].push_front((sid, msg));
}
},
None => unreachable!("msg not in VecDeque, but queued"),
@ -516,11 +516,11 @@ mod tests {
assert_header(&mut frames, 1, SIZE * 2 + 20);
assert_data(&mut frames, 0, vec![48; USIZE]);
assert_data(&mut frames, SIZE, vec![49; USIZE]);
assert_data(&mut frames, SIZE * 2, vec![50; 20]);
assert_header(&mut frames, 2, SIZE * 2 + 20);
assert_data(&mut frames, 0, vec![48; USIZE]);
assert_data(&mut frames, SIZE, vec![49; USIZE]);
assert_data(&mut frames, SIZE, vec![49; USIZE]);
assert_data(&mut frames, SIZE * 2, vec![50; 20]);
assert_data(&mut frames, SIZE * 2, vec![50; 20]);
assert!(frames.is_empty());
}
@ -531,12 +531,10 @@ mod tests {
msg_tx.send(mock_out_large(16, 1)).unwrap();
msg_tx.send(mock_out_large(16, 2)).unwrap();
let mut frames = VecDeque::new();
block_on(mgr.fill_frames(3, &mut frames));
block_on(mgr.fill_frames(2, &mut frames));
assert_header(&mut frames, 1, SIZE * 2 + 20);
assert_data(&mut frames, 0, vec![48; USIZE]);
assert_header(&mut frames, 2, SIZE * 2 + 20);
assert_data(&mut frames, 0, vec![48; USIZE]);
assert_data(&mut frames, SIZE, vec![49; USIZE]);
msg_tx.send(mock_out(0, 3)).unwrap();
@ -545,8 +543,10 @@ mod tests {
assert_header(&mut frames, 3, 3);
assert_data(&mut frames, 0, vec![48, 49, 50]);
assert_data(&mut frames, SIZE, vec![49; USIZE]);
assert_data(&mut frames, SIZE * 2, vec![50; 20]);
assert_header(&mut frames, 2, SIZE * 2 + 20);
assert_data(&mut frames, 0, vec![48; USIZE]);
assert_data(&mut frames, SIZE, vec![49; USIZE]);
assert_data(&mut frames, SIZE * 2, vec![50; 20]);
assert!(frames.is_empty());
}
@ -600,4 +600,97 @@ mod tests {
assert_header(&mut frames, 2, 3);
//unimportant
}
#[test]
fn gigantic_message() {
let (mut mgr, msg_tx, _flush_tx) = mock_new();
let mut data = vec![1; USIZE];
data.extend_from_slice(&vec![2; USIZE]);
data.extend_from_slice(&vec![3; USIZE]);
data.extend_from_slice(&vec![4; USIZE]);
data.extend_from_slice(&vec![5; USIZE]);
let sid = Sid::new(2);
msg_tx
.send((16, sid, OutgoingMessage {
buffer: Arc::new(MessageBuffer { data }),
cursor: 0,
mid: 1,
sid,
}))
.unwrap();
let mut frames = VecDeque::new();
block_on(mgr.fill_frames(2000, &mut frames));
assert_header(&mut frames, 2, 7000);
assert_data(&mut frames, 0, vec![1; USIZE]);
assert_data(&mut frames, 1400, vec![2; USIZE]);
assert_data(&mut frames, 2800, vec![3; USIZE]);
assert_data(&mut frames, 4200, vec![4; USIZE]);
assert_data(&mut frames, 5600, vec![5; USIZE]);
}
#[test]
fn gigantic_message_order() {
let (mut mgr, msg_tx, _flush_tx) = mock_new();
let mut data = vec![1; USIZE];
data.extend_from_slice(&vec![2; USIZE]);
data.extend_from_slice(&vec![3; USIZE]);
data.extend_from_slice(&vec![4; USIZE]);
data.extend_from_slice(&vec![5; USIZE]);
let sid = Sid::new(2);
msg_tx
.send((16, sid, OutgoingMessage {
buffer: Arc::new(MessageBuffer { data }),
cursor: 0,
mid: 1,
sid,
}))
.unwrap();
msg_tx.send(mock_out(16, 8)).unwrap();
let mut frames = VecDeque::new();
block_on(mgr.fill_frames(2000, &mut frames));
assert_header(&mut frames, 2, 7000);
assert_data(&mut frames, 0, vec![1; USIZE]);
assert_data(&mut frames, 1400, vec![2; USIZE]);
assert_data(&mut frames, 2800, vec![3; USIZE]);
assert_data(&mut frames, 4200, vec![4; USIZE]);
assert_data(&mut frames, 5600, vec![5; USIZE]);
assert_header(&mut frames, 8, 3);
assert_data(&mut frames, 0, vec![48, 49, 50]);
}
#[test]
fn gigantic_message_order_other_prio() {
let (mut mgr, msg_tx, _flush_tx) = mock_new();
let mut data = vec![1; USIZE];
data.extend_from_slice(&vec![2; USIZE]);
data.extend_from_slice(&vec![3; USIZE]);
data.extend_from_slice(&vec![4; USIZE]);
data.extend_from_slice(&vec![5; USIZE]);
let sid = Sid::new(2);
msg_tx
.send((16, sid, OutgoingMessage {
buffer: Arc::new(MessageBuffer { data }),
cursor: 0,
mid: 1,
sid,
}))
.unwrap();
msg_tx.send(mock_out(20, 8)).unwrap();
let mut frames = VecDeque::new();
block_on(mgr.fill_frames(2000, &mut frames));
assert_header(&mut frames, 2, 7000);
assert_data(&mut frames, 0, vec![1; USIZE]);
assert_header(&mut frames, 8, 3);
assert_data(&mut frames, 0, vec![48, 49, 50]);
assert_data(&mut frames, 1400, vec![2; USIZE]);
assert_data(&mut frames, 2800, vec![3; USIZE]);
assert_data(&mut frames, 4200, vec![4; USIZE]);
assert_data(&mut frames, 5600, vec![5; USIZE]);
}
}