mirror of
https://gitlab.com/veloren/veloren.git
synced 2024-08-30 18:12:32 +00:00
- switched participant to only error!() once a sec instead of every occurence in order to not spam error
- switched the behavior of prio to keep the order of a stream, EVEN if messages of different length are used! This needs to be addresses to fulfill PROMISES_ORDERED I think we need to make thoughs if PRIO needs to handle this, or we add additional meta info and handle it on receiver site
This commit is contained in:
parent
74f668f0b6
commit
dc5cc593ff
@ -63,6 +63,7 @@ pub struct BParticipant {
|
|||||||
running_mgr: AtomicUsize,
|
running_mgr: AtomicUsize,
|
||||||
run_channels: Option<ControlChannels>,
|
run_channels: Option<ControlChannels>,
|
||||||
metrics: Arc<NetworkMetrics>,
|
metrics: Arc<NetworkMetrics>,
|
||||||
|
no_channel_error_info: RwLock<(Instant, u64)>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl BParticipant {
|
impl BParticipant {
|
||||||
@ -104,6 +105,7 @@ impl BParticipant {
|
|||||||
running_mgr: AtomicUsize::new(0),
|
running_mgr: AtomicUsize::new(0),
|
||||||
run_channels,
|
run_channels,
|
||||||
metrics,
|
metrics,
|
||||||
|
no_channel_error_info: RwLock::new((Instant::now(), 0)),
|
||||||
},
|
},
|
||||||
a2b_steam_open_s,
|
a2b_steam_open_s,
|
||||||
b2a_stream_opened_r,
|
b2a_stream_opened_r,
|
||||||
@ -251,7 +253,16 @@ impl BParticipant {
|
|||||||
true
|
true
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
error!("participant has no channel to communicate on");
|
let mut guard = self.no_channel_error_info.write().await;
|
||||||
|
let now = Instant::now();
|
||||||
|
if now.duration_since(guard.0) > Duration::from_secs(1) {
|
||||||
|
guard.0 = now;
|
||||||
|
let occurrences = guard.1 + 1;
|
||||||
|
guard.1 = 0;
|
||||||
|
error!(?occurrences, "participant has no channel to communicate on");
|
||||||
|
} else {
|
||||||
|
guard.1 += 1;
|
||||||
|
}
|
||||||
false
|
false
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -281,8 +281,8 @@ impl PrioManager {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
error!(?msg.mid, "repush message");
|
trace!(?msg.mid, "repush message");
|
||||||
self.messages[prio as usize].push_back((sid, msg));
|
self.messages[prio as usize].push_front((sid, msg));
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
None => unreachable!("msg not in VecDeque, but queued"),
|
None => unreachable!("msg not in VecDeque, but queued"),
|
||||||
@ -516,11 +516,11 @@ mod tests {
|
|||||||
|
|
||||||
assert_header(&mut frames, 1, SIZE * 2 + 20);
|
assert_header(&mut frames, 1, SIZE * 2 + 20);
|
||||||
assert_data(&mut frames, 0, vec![48; USIZE]);
|
assert_data(&mut frames, 0, vec![48; USIZE]);
|
||||||
|
assert_data(&mut frames, SIZE, vec![49; USIZE]);
|
||||||
|
assert_data(&mut frames, SIZE * 2, vec![50; 20]);
|
||||||
assert_header(&mut frames, 2, SIZE * 2 + 20);
|
assert_header(&mut frames, 2, SIZE * 2 + 20);
|
||||||
assert_data(&mut frames, 0, vec![48; USIZE]);
|
assert_data(&mut frames, 0, vec![48; USIZE]);
|
||||||
assert_data(&mut frames, SIZE, vec![49; USIZE]);
|
assert_data(&mut frames, SIZE, vec![49; USIZE]);
|
||||||
assert_data(&mut frames, SIZE, vec![49; USIZE]);
|
|
||||||
assert_data(&mut frames, SIZE * 2, vec![50; 20]);
|
|
||||||
assert_data(&mut frames, SIZE * 2, vec![50; 20]);
|
assert_data(&mut frames, SIZE * 2, vec![50; 20]);
|
||||||
assert!(frames.is_empty());
|
assert!(frames.is_empty());
|
||||||
}
|
}
|
||||||
@ -531,12 +531,10 @@ mod tests {
|
|||||||
msg_tx.send(mock_out_large(16, 1)).unwrap();
|
msg_tx.send(mock_out_large(16, 1)).unwrap();
|
||||||
msg_tx.send(mock_out_large(16, 2)).unwrap();
|
msg_tx.send(mock_out_large(16, 2)).unwrap();
|
||||||
let mut frames = VecDeque::new();
|
let mut frames = VecDeque::new();
|
||||||
block_on(mgr.fill_frames(3, &mut frames));
|
block_on(mgr.fill_frames(2, &mut frames));
|
||||||
|
|
||||||
assert_header(&mut frames, 1, SIZE * 2 + 20);
|
assert_header(&mut frames, 1, SIZE * 2 + 20);
|
||||||
assert_data(&mut frames, 0, vec![48; USIZE]);
|
assert_data(&mut frames, 0, vec![48; USIZE]);
|
||||||
assert_header(&mut frames, 2, SIZE * 2 + 20);
|
|
||||||
assert_data(&mut frames, 0, vec![48; USIZE]);
|
|
||||||
assert_data(&mut frames, SIZE, vec![49; USIZE]);
|
assert_data(&mut frames, SIZE, vec![49; USIZE]);
|
||||||
|
|
||||||
msg_tx.send(mock_out(0, 3)).unwrap();
|
msg_tx.send(mock_out(0, 3)).unwrap();
|
||||||
@ -545,8 +543,10 @@ mod tests {
|
|||||||
assert_header(&mut frames, 3, 3);
|
assert_header(&mut frames, 3, 3);
|
||||||
assert_data(&mut frames, 0, vec![48, 49, 50]);
|
assert_data(&mut frames, 0, vec![48, 49, 50]);
|
||||||
|
|
||||||
assert_data(&mut frames, SIZE, vec![49; USIZE]);
|
|
||||||
assert_data(&mut frames, SIZE * 2, vec![50; 20]);
|
assert_data(&mut frames, SIZE * 2, vec![50; 20]);
|
||||||
|
assert_header(&mut frames, 2, SIZE * 2 + 20);
|
||||||
|
assert_data(&mut frames, 0, vec![48; USIZE]);
|
||||||
|
assert_data(&mut frames, SIZE, vec![49; USIZE]);
|
||||||
assert_data(&mut frames, SIZE * 2, vec![50; 20]);
|
assert_data(&mut frames, SIZE * 2, vec![50; 20]);
|
||||||
assert!(frames.is_empty());
|
assert!(frames.is_empty());
|
||||||
}
|
}
|
||||||
@ -600,4 +600,97 @@ mod tests {
|
|||||||
assert_header(&mut frames, 2, 3);
|
assert_header(&mut frames, 2, 3);
|
||||||
//unimportant
|
//unimportant
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn gigantic_message() {
|
||||||
|
let (mut mgr, msg_tx, _flush_tx) = mock_new();
|
||||||
|
let mut data = vec![1; USIZE];
|
||||||
|
data.extend_from_slice(&vec![2; USIZE]);
|
||||||
|
data.extend_from_slice(&vec![3; USIZE]);
|
||||||
|
data.extend_from_slice(&vec![4; USIZE]);
|
||||||
|
data.extend_from_slice(&vec![5; USIZE]);
|
||||||
|
let sid = Sid::new(2);
|
||||||
|
msg_tx
|
||||||
|
.send((16, sid, OutgoingMessage {
|
||||||
|
buffer: Arc::new(MessageBuffer { data }),
|
||||||
|
cursor: 0,
|
||||||
|
mid: 1,
|
||||||
|
sid,
|
||||||
|
}))
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
|
let mut frames = VecDeque::new();
|
||||||
|
block_on(mgr.fill_frames(2000, &mut frames));
|
||||||
|
|
||||||
|
assert_header(&mut frames, 2, 7000);
|
||||||
|
assert_data(&mut frames, 0, vec![1; USIZE]);
|
||||||
|
assert_data(&mut frames, 1400, vec![2; USIZE]);
|
||||||
|
assert_data(&mut frames, 2800, vec![3; USIZE]);
|
||||||
|
assert_data(&mut frames, 4200, vec![4; USIZE]);
|
||||||
|
assert_data(&mut frames, 5600, vec![5; USIZE]);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn gigantic_message_order() {
|
||||||
|
let (mut mgr, msg_tx, _flush_tx) = mock_new();
|
||||||
|
let mut data = vec![1; USIZE];
|
||||||
|
data.extend_from_slice(&vec![2; USIZE]);
|
||||||
|
data.extend_from_slice(&vec![3; USIZE]);
|
||||||
|
data.extend_from_slice(&vec![4; USIZE]);
|
||||||
|
data.extend_from_slice(&vec![5; USIZE]);
|
||||||
|
let sid = Sid::new(2);
|
||||||
|
msg_tx
|
||||||
|
.send((16, sid, OutgoingMessage {
|
||||||
|
buffer: Arc::new(MessageBuffer { data }),
|
||||||
|
cursor: 0,
|
||||||
|
mid: 1,
|
||||||
|
sid,
|
||||||
|
}))
|
||||||
|
.unwrap();
|
||||||
|
msg_tx.send(mock_out(16, 8)).unwrap();
|
||||||
|
|
||||||
|
let mut frames = VecDeque::new();
|
||||||
|
block_on(mgr.fill_frames(2000, &mut frames));
|
||||||
|
|
||||||
|
assert_header(&mut frames, 2, 7000);
|
||||||
|
assert_data(&mut frames, 0, vec![1; USIZE]);
|
||||||
|
assert_data(&mut frames, 1400, vec![2; USIZE]);
|
||||||
|
assert_data(&mut frames, 2800, vec![3; USIZE]);
|
||||||
|
assert_data(&mut frames, 4200, vec![4; USIZE]);
|
||||||
|
assert_data(&mut frames, 5600, vec![5; USIZE]);
|
||||||
|
assert_header(&mut frames, 8, 3);
|
||||||
|
assert_data(&mut frames, 0, vec![48, 49, 50]);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn gigantic_message_order_other_prio() {
|
||||||
|
let (mut mgr, msg_tx, _flush_tx) = mock_new();
|
||||||
|
let mut data = vec![1; USIZE];
|
||||||
|
data.extend_from_slice(&vec![2; USIZE]);
|
||||||
|
data.extend_from_slice(&vec![3; USIZE]);
|
||||||
|
data.extend_from_slice(&vec![4; USIZE]);
|
||||||
|
data.extend_from_slice(&vec![5; USIZE]);
|
||||||
|
let sid = Sid::new(2);
|
||||||
|
msg_tx
|
||||||
|
.send((16, sid, OutgoingMessage {
|
||||||
|
buffer: Arc::new(MessageBuffer { data }),
|
||||||
|
cursor: 0,
|
||||||
|
mid: 1,
|
||||||
|
sid,
|
||||||
|
}))
|
||||||
|
.unwrap();
|
||||||
|
msg_tx.send(mock_out(20, 8)).unwrap();
|
||||||
|
|
||||||
|
let mut frames = VecDeque::new();
|
||||||
|
block_on(mgr.fill_frames(2000, &mut frames));
|
||||||
|
|
||||||
|
assert_header(&mut frames, 2, 7000);
|
||||||
|
assert_data(&mut frames, 0, vec![1; USIZE]);
|
||||||
|
assert_header(&mut frames, 8, 3);
|
||||||
|
assert_data(&mut frames, 0, vec![48, 49, 50]);
|
||||||
|
assert_data(&mut frames, 1400, vec![2; USIZE]);
|
||||||
|
assert_data(&mut frames, 2800, vec![3; USIZE]);
|
||||||
|
assert_data(&mut frames, 4200, vec![4; USIZE]);
|
||||||
|
assert_data(&mut frames, 5600, vec![5; USIZE]);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user