Merge branch 'xMAC94x/network_tracing' into 'master'

xmac94x/network tracing

See merge request veloren/veloren!1311
This commit is contained in:
Marcel 2020-08-21 17:24:31 +00:00
commit 7e9194eb73
14 changed files with 296 additions and 168 deletions

View File

@ -1383,8 +1383,10 @@ impl Client {
);
},
ServerMsg::Disconnect => {
debug!("finally sendinge ClientMsg::Terminate");
frontend_events.push(Event::Disconnect);
self.singleton_stream.send(ClientMsg::Terminate)?;
break Ok(());
},
ServerMsg::CharacterListUpdate(character_list) => {
self.character_list.characters = character_list;

View File

@ -1,6 +1,7 @@
#[cfg(feature = "metrics")]
use crate::metrics::NetworkMetrics;
use crate::{
participant::C2pFrame,
protocols::Protocols,
types::{
Cid, Frame, Pid, Sid, STREAM_ID_OFFSET1, STREAM_ID_OFFSET2, VELOREN_MAGIC_NUMBER,
@ -41,8 +42,8 @@ impl Channel {
pub async fn run(
mut self,
protocol: Protocols,
mut w2c_cid_frame_s: mpsc::UnboundedSender<(Cid, Frame)>,
mut leftover_cid_frame: Vec<(Cid, Frame)>,
mut w2c_cid_frame_s: mpsc::UnboundedSender<C2pFrame>,
mut leftover_cid_frame: Vec<C2pFrame>,
) {
let c2w_frame_r = self.c2w_frame_r.take().unwrap();
let read_stop_receiver = self.read_stop_receiver.take().unwrap();
@ -58,13 +59,13 @@ impl Channel {
trace!(?self.cid, "Start up channel");
match protocol {
Protocols::Tcp(tcp) => {
futures::join!(
join!(
tcp.read_from_wire(self.cid, &mut w2c_cid_frame_s, read_stop_receiver),
tcp.write_to_wire(self.cid, c2w_frame_r),
);
},
Protocols::Udp(udp) => {
futures::join!(
join!(
udp.read_from_wire(self.cid, &mut w2c_cid_frame_s, read_stop_receiver),
udp.write_to_wire(self.cid, c2w_frame_r),
);
@ -113,12 +114,9 @@ impl Handshake {
}
}
pub async fn setup(
self,
protocol: &Protocols,
) -> Result<(Pid, Sid, u128, Vec<(Cid, Frame)>), ()> {
pub async fn setup(self, protocol: &Protocols) -> Result<(Pid, Sid, u128, Vec<C2pFrame>), ()> {
let (c2w_frame_s, c2w_frame_r) = mpsc::unbounded::<Frame>();
let (mut w2c_cid_frame_s, mut w2c_cid_frame_r) = mpsc::unbounded::<(Cid, Frame)>();
let (mut w2c_cid_frame_s, mut w2c_cid_frame_r) = mpsc::unbounded::<C2pFrame>();
let (read_stop_sender, read_stop_receiver) = oneshot::channel();
let handler_future =
@ -160,7 +158,7 @@ impl Handshake {
async fn frame_handler(
&self,
w2c_cid_frame_r: &mut mpsc::UnboundedReceiver<(Cid, Frame)>,
w2c_cid_frame_r: &mut mpsc::UnboundedReceiver<C2pFrame>,
mut c2w_frame_s: mpsc::UnboundedSender<Frame>,
read_stop_sender: oneshot::Sender<()>,
) -> Result<(Pid, Sid, u128), ()> {
@ -176,7 +174,7 @@ impl Handshake {
let frame = w2c_cid_frame_r.next().await.map(|(_cid, frame)| frame);
#[cfg(feature = "metrics")]
{
if let Some(ref frame) = frame {
if let Some(Ok(ref frame)) = frame {
self.metrics
.frames_in_total
.with_label_values(&["", &cid_string, &frame.get_string()])
@ -184,10 +182,10 @@ impl Handshake {
}
}
let r = match frame {
Some(Frame::Handshake {
Some(Ok(Frame::Handshake {
magic_number,
version,
}) => {
})) => {
trace!(?magic_number, ?version, "Recv handshake");
if magic_number != VELOREN_MAGIC_NUMBER {
error!(?magic_number, "Connection with invalid magic_number");
@ -221,18 +219,24 @@ impl Handshake {
Ok(())
}
},
Some(Frame::Shutdown) => {
info!("Shutdown signal received");
Err(())
},
Some(Frame::Raw(bytes)) => {
match std::str::from_utf8(bytes.as_slice()) {
Ok(string) => error!(?string, ERR_S),
_ => error!(?bytes, ERR_S),
Some(Ok(frame)) => {
#[cfg(feature = "metrics")]
self.metrics
.frames_in_total
.with_label_values(&["", &cid_string, frame.get_string()])
.inc();
if let Frame::Raw(bytes) = frame {
match std::str::from_utf8(bytes.as_slice()) {
Ok(string) => error!(?string, ERR_S),
_ => error!(?bytes, ERR_S),
}
}
Err(())
},
Some(_) => Err(()),
Some(Err(())) => {
info!("Protocol got interrupted");
Err(())
},
None => Err(()),
};
if let Err(()) = r {
@ -248,7 +252,7 @@ impl Handshake {
let frame = w2c_cid_frame_r.next().await.map(|(_cid, frame)| frame);
let r = match frame {
Some(Frame::Init { pid, secret }) => {
Some(Ok(Frame::Init { pid, secret })) => {
debug!(?pid, "Participant send their ID");
let pid_string = pid.to_string();
#[cfg(feature = "metrics")]
@ -265,22 +269,24 @@ impl Handshake {
info!(?pid, "This Handshake is now configured!");
Ok((pid, stream_id_offset, secret))
},
Some(frame) => {
Some(Ok(frame)) => {
#[cfg(feature = "metrics")]
self.metrics
.frames_in_total
.with_label_values(&["", &cid_string, frame.get_string()])
.inc();
match frame {
Frame::Shutdown => info!("Shutdown signal received"),
Frame::Raw(bytes) => match std::str::from_utf8(bytes.as_slice()) {
if let Frame::Raw(bytes) = frame {
match std::str::from_utf8(bytes.as_slice()) {
Ok(string) => error!(?string, ERR_S),
_ => error!(?bytes, ERR_S),
},
_ => (),
}
}
Err(())
},
Some(Err(())) => {
info!("Protocol got interrupted");
Err(())
},
None => Err(()),
};
if r.is_err() {

View File

@ -34,6 +34,8 @@ pub struct NetworkMetrics {
pub message_out_total: IntCounterVec,
// send(prio) Messages throughput, separated by STREAM AND PARTICIPANT,
pub message_out_throughput: IntCounterVec,
// flushed(prio) stream count, separated by PARTICIPANT,
pub streams_flushed: IntCounterVec,
// TODO: queued Messages, separated by STREAM (add PART, CHANNEL),
// queued Messages, separated by PARTICIPANT
pub queued_count: IntGaugeVec,
@ -167,6 +169,13 @@ impl NetworkMetrics {
),
&["participant", "stream"],
)?;
let streams_flushed = IntCounterVec::new(
Opts::new(
"stream_flushed",
"Number of flushed streams requested to PrioManager at participant level",
),
&["participant"],
)?;
let queued_count = IntGaugeVec::new(
Opts::new(
"queued_count",
@ -207,6 +216,7 @@ impl NetworkMetrics {
wire_in_throughput,
message_out_total,
message_out_throughput,
streams_flushed,
queued_count,
queued_bytes,
participants_ping,

View File

@ -25,9 +25,11 @@ use std::{
time::{Duration, Instant},
};
use tracing::*;
use tracing_futures::Instrument;
pub(crate) type A2bStreamOpen = (Prio, Promises, oneshot::Sender<Stream>);
pub(crate) type S2bCreateChannel = (Cid, Sid, Protocols, Vec<(Cid, Frame)>, oneshot::Sender<()>);
pub(crate) type C2pFrame = (Cid, Result<Frame, ()>);
pub(crate) type S2bCreateChannel = (Cid, Sid, Protocols, Vec<C2pFrame>, oneshot::Sender<()>);
pub(crate) type S2bShutdownBparticipant = oneshot::Sender<Result<(), ParticipantError>>;
pub(crate) type B2sPrioStatistic = (Pid, u64, u64);
@ -69,7 +71,7 @@ pub struct BParticipant {
remote_pid: Pid,
remote_pid_string: String, //optimisation
offset_sid: Sid,
channels: Arc<RwLock<Vec<ChannelInfo>>>,
channels: Arc<RwLock<HashMap<Cid, ChannelInfo>>>,
streams: RwLock<HashMap<Sid, StreamInfo>>,
running_mgr: AtomicUsize,
run_channels: Option<ControlChannels>,
@ -118,7 +120,7 @@ impl BParticipant {
remote_pid,
remote_pid_string: remote_pid.to_string(),
offset_sid,
channels: Arc::new(RwLock::new(vec![])),
channels: Arc::new(RwLock::new(HashMap::new())),
streams: RwLock::new(HashMap::new()),
running_mgr: AtomicUsize::new(0),
run_channels,
@ -142,7 +144,7 @@ impl BParticipant {
oneshot::channel();
let (shutdown_open_mgr_sender, shutdown_open_mgr_receiver) = oneshot::channel();
let (b2b_prios_flushed_s, b2b_prios_flushed_r) = oneshot::channel();
let (w2b_frames_s, w2b_frames_r) = mpsc::unbounded::<(Cid, Frame)>();
let (w2b_frames_s, w2b_frames_r) = mpsc::unbounded::<C2pFrame>();
let (prios, a2p_msg_s, b2p_notify_empty_stream_s) = PrioManager::new(
#[cfg(feature = "metrics")]
self.metrics.clone(),
@ -205,13 +207,11 @@ impl BParticipant {
#[cfg(feature = "metrics")]
let mut send_cache =
PidCidFrameCache::new(self.metrics.frames_out_total.clone(), self.remote_pid);
let mut i: u64 = 0;
loop {
let mut frames = VecDeque::new();
prios.fill_frames(FRAMES_PER_TICK, &mut frames).await;
let len = frames.len();
if len > 0 {
trace!("Tick {}", len);
}
for (_, frame) in frames {
self.send_frame(
frame,
@ -225,6 +225,10 @@ impl BParticipant {
.await
.unwrap();
async_std::task::sleep(TICK_TIME).await;
i += 1;
if i.rem_euclid(1000) == 0 {
trace!("Did 1000 ticks");
}
//shutdown after all msg are send!
if closing_up && (len == 0) {
break;
@ -251,34 +255,30 @@ impl BParticipant {
// find out ideal channel here
//TODO: just take first
let mut lock = self.channels.write().await;
if let Some(ci) = lock.get_mut(0) {
//note: this is technically wrong we should only increase when it suceeded, but
// this requiered me to clone `frame` which is a to big performance impact for
// error handling
if let Some(ci) = lock.values_mut().next() {
//we are increasing metrics without checking the result to please the
// borrow_checker. otherwise we would need to clone `frame`, which we
// don't want!
#[cfg(feature = "metrics")]
frames_out_total_cache
.with_label_values(ci.cid, &frame)
.inc();
if let Err(e) = ci.b2w_frame_s.send(frame).await {
warn!(
?e,
"The channel got closed unexpectedly, cleaning it up now."
);
let ci = lock.remove(0);
if let Err(e) = ci.b2r_read_shutdown.send(()) {
debug!(
?e,
"Error shutdowning channel, which is prob fine as we detected it to no \
longer work in the first place"
);
};
let cid = ci.cid;
info!(?e, ?cid, "channel no longer available");
if let Some(ci) = self.channels.write().await.remove(&cid) {
trace!(?cid, "stopping read protocol");
if let Err(e) = ci.b2r_read_shutdown.send(()) {
trace!(?cid, ?e, "seems like was already shut down");
}
}
//TODO FIXME tags: takeover channel multiple
info!(
"FIXME: the frame is actually drop. which is fine for now as the participant \
will be closed, but not if we do channel-takeover"
);
//TEMP FIX: as we dont have channel takeover yet drop the whole bParticipant
self.close_api(Some(ParticipantError::ProtocolFailedUnrecoverable))
self.close_write_api(Some(ParticipantError::ProtocolFailedUnrecoverable))
.await;
false
} else {
@ -291,7 +291,12 @@ impl BParticipant {
guard.0 = now;
let occurrences = guard.1 + 1;
guard.1 = 0;
error!(?occurrences, "Participant has no channel to communicate on");
let lastframe = frame;
error!(
?occurrences,
?lastframe,
"Participant has no channel to communicate on"
);
} else {
guard.1 += 1;
}
@ -301,7 +306,7 @@ impl BParticipant {
async fn handle_frames_mgr(
&self,
mut w2b_frames_r: mpsc::UnboundedReceiver<(Cid, Frame)>,
mut w2b_frames_r: mpsc::UnboundedReceiver<C2pFrame>,
mut b2a_stream_opened_s: mpsc::UnboundedSender<Stream>,
a2b_close_stream_s: mpsc::UnboundedSender<Sid>,
a2p_msg_s: crossbeam_channel::Sender<(Prio, Sid, OutgoingMessage)>,
@ -313,8 +318,22 @@ impl BParticipant {
let mut dropped_cnt = 0u64;
let mut dropped_sid = Sid::new(0);
while let Some((cid, frame)) = w2b_frames_r.next().await {
//trace!("handling frame");
while let Some((cid, result_frame)) = w2b_frames_r.next().await {
//trace!(?result_frame, "handling frame");
let frame = match result_frame {
Ok(frame) => frame,
Err(()) => {
// The read protocol stopped, i need to make sure that write gets stopped
debug!("read protocol was closed. Stopping write protocol");
if let Some(ci) = self.channels.write().await.get_mut(&cid) {
ci.b2w_frame_s
.close()
.await
.expect("couldn't stop write protocol");
}
continue;
},
};
#[cfg(feature = "metrics")]
{
let cid_string = cid.to_string();
@ -323,8 +342,6 @@ impl BParticipant {
.with_label_values(&[&self.remote_pid_string, &cid_string, frame.get_string()])
.inc();
}
#[cfg(not(feature = "metrics"))]
let _cid = cid;
match frame {
Frame::OpenStream {
sid,
@ -395,7 +412,7 @@ impl BParticipant {
false
};
if finished {
//debug!(?mid, "finished receiving message");
//trace!(?mid, "finished receiving message");
let imsg = messages.remove(&mid).unwrap();
if let Some(si) = self.streams.write().await.get_mut(&imsg.sid) {
if let Err(e) = si.b2a_msg_recv_s.send(imsg).await {
@ -448,7 +465,7 @@ impl BParticipant {
async fn create_channel_mgr(
&self,
s2b_create_channel_r: mpsc::UnboundedReceiver<S2bCreateChannel>,
w2b_frames_s: mpsc::UnboundedSender<(Cid, Frame)>,
w2b_frames_s: mpsc::UnboundedSender<C2pFrame>,
) {
self.running_mgr.fetch_add(1, Ordering::Relaxed);
trace!("Start create_channel_mgr");
@ -462,7 +479,7 @@ impl BParticipant {
let channels = self.channels.clone();
async move {
let (channel, b2w_frame_s, b2r_read_shutdown) = Channel::new(cid);
channels.write().await.push(ChannelInfo {
channels.write().await.insert(cid, ChannelInfo {
cid,
cid_string: cid.to_string(),
b2w_frame_s,
@ -477,13 +494,20 @@ impl BParticipant {
trace!(?cid, "Running channel in participant");
channel
.run(protocol, w2b_frames_s, leftover_cid_frame)
.instrument(tracing::info_span!("", ?cid))
.await;
#[cfg(feature = "metrics")]
self.metrics
.channels_disconnected_total
.with_label_values(&[&self.remote_pid_string])
.inc();
trace!(?cid, "Channel got closed");
info!(?cid, "Channel got closed");
//maybe channel got already dropped, we don't know.
channels.write().await.remove(&cid);
trace!(?cid, "Channel cleanup completed");
//TEMP FIX: as we dont have channel takeover yet drop the whole
// bParticipant
self.close_write_api(None).await;
}
},
)
@ -562,6 +586,9 @@ impl BParticipant {
trace!("Start participant_shutdown_mgr");
let sender = s2b_shutdown_bparticipant_r.await.unwrap();
let mut send_cache =
PidCidFrameCache::new(self.metrics.frames_out_total.clone(), self.remote_pid);
self.close_api(None).await;
debug!("Closing all managers");
@ -572,10 +599,30 @@ impl BParticipant {
}
b2b_prios_flushed_r.await.unwrap();
if Some(ParticipantError::ParticipantDisconnected) != self.shutdown_info.read().await.error
{
debug!("Sending shutdown frame after flushed all prios");
if !self
.send_frame(
Frame::Shutdown,
#[cfg(feature = "metrics")]
&mut send_cache,
)
.await
{
warn!("couldn't send shutdown frame, are channels already closed?");
}
}
debug!("Closing all channels, after flushed prios");
for ci in self.channels.write().await.drain(..) {
for (cid, ci) in self.channels.write().await.drain() {
if let Err(e) = ci.b2r_read_shutdown.send(()) {
debug!(?e, ?ci.cid, "Seems like this read protocol got already dropped by closing the Stream itself, just ignoring the fact");
debug!(
?e,
?cid,
"Seems like this read protocol got already dropped by closing the Stream \
itself, ignoring"
);
};
}
@ -647,7 +694,7 @@ impl BParticipant {
si.send_closed.store(true, Ordering::Relaxed);
si.b2a_msg_recv_s.close_channel();
},
None => warn!("Couldn't find the stream, might be simulanious close from remote"),
None => warn!("Couldn't find the stream, might be simultaneous close from remote"),
}
//TODO: what happens if RIGHT NOW the remote sends a StreamClose and this
@ -711,21 +758,29 @@ impl BParticipant {
)
}
/// close streams and set err
async fn close_api(&self, reason: Option<ParticipantError>) {
//closing api::Participant is done by closing all channels, exepct for the
// shutdown channel at this point!
async fn close_write_api(&self, reason: Option<ParticipantError>) {
trace!(?reason, "close_api");
let mut lock = self.shutdown_info.write().await;
if let Some(r) = reason {
lock.error = Some(r);
}
lock.b2a_stream_opened_s.close_channel();
debug!("Closing all streams for write");
for (sid, si) in self.streams.write().await.iter() {
trace!(?sid, "Shutting down Stream for write");
si.send_closed.store(true, Ordering::Relaxed);
}
}
///closing api::Participant is done by closing all channels, except for the
/// shutdown channel at this point!
async fn close_api(&self, reason: Option<ParticipantError>) {
self.close_write_api(reason).await;
debug!("Closing all streams");
for (sid, si) in self.streams.write().await.drain() {
trace!(?sid, "Shutting down Stream");
si.b2a_msg_recv_s.close_channel();
si.send_closed.store(true, Ordering::Relaxed);
}
}
}

View File

@ -16,8 +16,6 @@ use futures::channel::oneshot;
use std::collections::{HashMap, HashSet, VecDeque};
#[cfg(feature = "metrics")] use std::sync::Arc;
use tracing::*;
const PRIO_MAX: usize = 64;
#[derive(Default)]
@ -148,11 +146,8 @@ impl PrioManager {
async fn tick(&mut self) {
// Check Range
let mut messages = 0;
let mut closed = 0;
for (prio, sid, msg) in self.messages_rx.try_iter() {
debug_assert!(prio as usize <= PRIO_MAX);
messages += 1;
#[cfg(feature = "metrics")]
{
let sid_string = sid.to_string();
@ -173,7 +168,11 @@ impl PrioManager {
}
//this must be AFTER messages
for (sid, return_sender) in self.sid_flushed_rx.try_iter() {
closed += 1;
#[cfg(feature = "metrics")]
self.metrics
.streams_flushed
.with_label_values(&[&self.pid])
.inc();
if let Some(cnt) = self.sid_owned.get_mut(&sid) {
// register sender
cnt.empty_notify = Some(return_sender);
@ -182,9 +181,6 @@ impl PrioManager {
return_sender.send(()).unwrap();
}
}
if messages > 0 || closed > 0 {
trace!(?messages, ?closed, "tick");
}
}
//if None returned, we are empty!
@ -256,7 +252,6 @@ impl PrioManager {
}
}
} else {
trace!(?msg.mid, "Repush message");
self.messages[prio as usize].push_front((sid, msg));
}
},

View File

@ -1,6 +1,9 @@
#[cfg(feature = "metrics")]
use crate::metrics::{CidFrameCache, NetworkMetrics};
use crate::types::{Cid, Frame, Mid, Pid, Sid};
use crate::{
participant::C2pFrame,
types::{Cid, Frame, Mid, Pid, Sid},
};
use async_std::{
net::{TcpStream, UdpSocket},
prelude::*,
@ -71,8 +74,8 @@ impl TcpProtocol {
cid: Cid,
mut stream: &TcpStream,
mut bytes: &mut [u8],
w2c_cid_frame_s: &mut mpsc::UnboundedSender<(Cid, Frame)>,
mut end_receiver: &mut Fuse<oneshot::Receiver<()>>,
w2c_cid_frame_s: &mut mpsc::UnboundedSender<C2pFrame>,
) -> bool {
match select! {
r = stream.read_exact(&mut bytes).fuse() => Some(r),
@ -80,20 +83,18 @@ impl TcpProtocol {
} {
Some(Ok(_)) => false,
Some(Err(e)) => {
debug!(
?cid,
?e,
"Closing tcp protocol due to read error, sending close frame to gracefully \
shutdown"
);
info!(?e, "Closing tcp protocol due to read error");
//w2c_cid_frame_s is shared, dropping it wouldn't notify the receiver as every
// channel is holding a sender! that's why we need an explicit
// STOP here
w2c_cid_frame_s
.send((cid, Frame::Shutdown))
.send((cid, Err(())))
.await
.expect("Channel or Participant seems no longer to exist to be Shutdown");
.expect("Channel or Participant seems no longer to exist");
true
},
None => {
trace!(?cid, "shutdown requested");
trace!("shutdown requested");
true
},
}
@ -102,7 +103,7 @@ impl TcpProtocol {
pub async fn read_from_wire(
&self,
cid: Cid,
w2c_cid_frame_s: &mut mpsc::UnboundedSender<(Cid, Frame)>,
w2c_cid_frame_s: &mut mpsc::UnboundedSender<C2pFrame>,
end_r: oneshot::Receiver<()>,
) {
trace!("Starting up tcp read()");
@ -118,8 +119,8 @@ impl TcpProtocol {
macro_rules! read_or_close {
($x:expr) => {
if TcpProtocol::read_or_close(cid, &stream, $x, w2c_cid_frame_s, &mut end_r).await {
info!("Tcp stream closed, shutting down read");
if TcpProtocol::read_or_close(cid, &stream, $x, &mut end_r, w2c_cid_frame_s).await {
trace!("read_or_close requested a shutdown");
break;
}
};
@ -213,7 +214,7 @@ impl TcpProtocol {
#[cfg(feature = "metrics")]
metrics_cache.with_label_values(&frame).inc();
w2c_cid_frame_s
.send((cid, frame))
.send((cid, Ok(frame)))
.await
.expect("Channel or Participant seems no longer to exist");
}
@ -228,7 +229,7 @@ impl TcpProtocol {
) -> bool {
match stream.write_all(&bytes).await {
Err(e) => {
debug!(
info!(
?e,
"Got an error writing to tcp, going to close this channel"
);
@ -255,7 +256,7 @@ impl TcpProtocol {
macro_rules! write_or_close {
($x:expr) => {
if TcpProtocol::write_or_close(&mut stream, $x, &mut c2w_frame_r).await {
info!("Tcp stream closed, shutting down write");
trace!("write_or_close requested a shutdown");
break;
}
};
@ -342,7 +343,7 @@ impl UdpProtocol {
pub async fn read_from_wire(
&self,
cid: Cid,
w2c_cid_frame_s: &mut mpsc::UnboundedSender<(Cid, Frame)>,
w2c_cid_frame_s: &mut mpsc::UnboundedSender<C2pFrame>,
end_r: oneshot::Receiver<()>,
) {
trace!("Starting up udp read()");
@ -356,7 +357,14 @@ impl UdpProtocol {
let mut data_in = self.data_in.lock().await;
let mut end_r = end_r.fuse();
while let Some(bytes) = select! {
r = data_in.next().fuse() => r,
r = data_in.next().fuse() => match r {
Some(r) => Some(r),
None => {
info!("Udp read ended");
w2c_cid_frame_s.send((cid, Err(()))).await.expect("Channel or Participant seems no longer to exist");
None
}
},
_ = end_r => None,
} {
trace!("Got raw UDP message with len: {}", bytes.len());
@ -454,7 +462,7 @@ impl UdpProtocol {
};
#[cfg(feature = "metrics")]
metrics_cache.with_label_values(&frame).inc();
w2c_cid_frame_s.send((cid, frame)).await.unwrap();
w2c_cid_frame_s.send((cid, Ok(frame))).await.unwrap();
}
trace!("Shutting down udp read()");
}
@ -565,10 +573,7 @@ impl UdpProtocol {
#[cfg(test)]
mod tests {
use super::*;
use crate::{
metrics::NetworkMetrics,
types::{Cid, Pid},
};
use crate::{metrics::NetworkMetrics, types::Pid};
use async_std::net;
use futures::{executor::block_on, stream::StreamExt};
use std::sync::Arc;
@ -595,7 +600,7 @@ mod tests {
client.flush();
//handle data
let (mut w2c_cid_frame_s, mut w2c_cid_frame_r) = mpsc::unbounded::<(Cid, Frame)>();
let (mut w2c_cid_frame_s, mut w2c_cid_frame_r) = mpsc::unbounded::<C2pFrame>();
let (read_stop_sender, read_stop_receiver) = oneshot::channel();
let cid2 = cid;
let t = std::thread::spawn(move || {
@ -608,10 +613,10 @@ mod tests {
//async_std::task::sleep(std::time::Duration::from_millis(1000));
let (cid_r, frame) = w2c_cid_frame_r.next().await.unwrap();
assert_eq!(cid, cid_r);
if let Frame::Handshake {
if let Ok(Frame::Handshake {
magic_number,
version,
} = frame
}) = frame
{
assert_eq!(&magic_number, b"HELLOWO");
assert_eq!(version, [1337, 0, 42]);
@ -644,7 +649,7 @@ mod tests {
client.flush();
//handle data
let (mut w2c_cid_frame_s, mut w2c_cid_frame_r) = mpsc::unbounded::<(Cid, Frame)>();
let (mut w2c_cid_frame_s, mut w2c_cid_frame_r) = mpsc::unbounded::<C2pFrame>();
let (read_stop_sender, read_stop_receiver) = oneshot::channel();
let cid2 = cid;
let t = std::thread::spawn(move || {
@ -656,7 +661,7 @@ mod tests {
// Assert that we get some value back! It's a Raw!
let (cid_r, frame) = w2c_cid_frame_r.next().await.unwrap();
assert_eq!(cid, cid_r);
if let Frame::Raw(data) = frame {
if let Ok(Frame::Raw(data)) = frame {
assert_eq!(&data.as_slice(), b"x4hrtzsektfhxugzdtz5r78gzrtzfhxf");
} else {
panic!("wrong frame type");

View File

@ -334,6 +334,13 @@ impl Scheduler {
);
};
}
debug!("shutting down protocol listeners");
for (addr, end_channel_sender) in self.channel_listener.write().await.drain() {
trace!(?addr, "stopping listen on protocol");
if let Err(e) = end_channel_sender.send(()) {
warn!(?addr, ?e, "listener crashed/disconnected already");
}
}
debug!("Scheduler shut down gracefully");
//removing the possibility to create new participants, needed to close down
// some mgr:
@ -512,7 +519,11 @@ impl Scheduler {
metrics.clone(),
send_handshake,
);
match handshake.setup(&protocol).await {
match handshake
.setup(&protocol)
.instrument(tracing::info_span!("handshake", ?cid))
.await
{
Ok((pid, sid, secret, leftover_cid_frame)) => {
trace!(
?cid,
@ -583,14 +594,18 @@ impl Scheduler {
}
} else {
let pi = &participants[&pid];
trace!("2nd+ channel of participant, going to compare security ids");
trace!(
?cid,
"2nd+ channel of participant, going to compare security ids"
);
if pi.secret != secret {
warn!(
?cid,
?pid,
?secret,
"Detected incompatible Secret!, this is probably an attack!"
);
error!("Just dropping here, TODO handle this correctly!");
error!(?cid, "Just dropping here, TODO handle this correctly!");
//TODO
if let Some(pid_oneshot) = s2a_return_pid_s {
// someone is waiting with `connect`, so give them their Error
@ -604,6 +619,7 @@ impl Scheduler {
return;
}
error!(
?cid,
"Ufff i cant answer the pid_oneshot. as i need to create the SAME \
participant. maybe switch to ARC"
);
@ -612,9 +628,10 @@ impl Scheduler {
// move directly to participant!
},
Err(()) => {
debug!(?cid, "Handshake from a new connection failed");
if let Some(pid_oneshot) = s2a_return_pid_s {
// someone is waiting with `connect`, so give them their Error
trace!("returning the Err to api who requested the connect");
trace!(?cid, "returning the Err to api who requested the connect");
pid_oneshot
.send(Err(std::io::Error::new(
std::io::ErrorKind::PermissionDenied,
@ -625,7 +642,7 @@ impl Scheduler {
},
}
}
.instrument(tracing::trace_span!("")),
.instrument(tracing::info_span!("")),
); /*WORKAROUND FOR SPAN NOT TO GET LOST*/
}
}

View File

@ -146,10 +146,18 @@ impl Pid {
/// This will panic if pid is greater than 7, as I do not want you to use
/// this in production!
#[doc(hidden)]
pub fn fake(pid: u8) -> Self {
assert!(pid < 8);
pub fn fake(pid_offset: u8) -> Self {
assert!(pid_offset < 8);
let o = pid_offset as u128;
const OFF: [u128; 5] = [
0x40,
0x40 * 0x40,
0x40 * 0x40 * 0x40,
0x40 * 0x40 * 0x40 * 0x40,
0x40 * 0x40 * 0x40 * 0x40 * 0x40,
];
Self {
internal: pid as u128,
internal: o + o * OFF[0] + o * OFF[1] + o * OFF[2] + o * OFF[3] + o * OFF[4],
}
}
@ -251,7 +259,9 @@ mod tests {
#[test]
fn frame_creation() {
Pid::new();
assert_eq!(format!("{}", Pid::fake(2)), "CAAAAA");
assert_eq!(format!("{}", Pid::fake(0)), "AAAAAA");
assert_eq!(format!("{}", Pid::fake(1)), "BBBBBB");
assert_eq!(format!("{}", Pid::fake(2)), "CCCCCC");
}
#[test]

View File

@ -42,10 +42,11 @@ fn close_participant() {
let (_n_a, p1_a, mut s1_a, _n_b, p1_b, mut s1_b) = block_on(network_participant_stream(tcp()));
block_on(p1_a.disconnect()).unwrap();
assert_eq!(
block_on(p1_b.disconnect()),
Err(ParticipantError::ParticipantDisconnected)
);
//As no more read/write is run disconnect is successful or already disconnected
match block_on(p1_b.disconnect()) {
Ok(_) | Err(ParticipantError::ParticipantDisconnected) => (),
e => panic!("wrong disconnect type {:?}", e),
};
assert_eq!(s1_a.send("Hello World"), Err(StreamError::StreamClosed));
assert_eq!(
@ -284,9 +285,9 @@ fn failed_stream_open_after_remote_part_is_closed() {
#[test]
fn open_participant_before_remote_part_is_closed() {
let (n_a, f) = Network::new(Pid::fake(1));
let (n_a, f) = Network::new(Pid::fake(0));
std::thread::spawn(f);
let (n_b, f) = Network::new(Pid::fake(2));
let (n_b, f) = Network::new(Pid::fake(1));
std::thread::spawn(f);
let addr = tcp();
block_on(n_a.listen(addr.clone())).unwrap();
@ -304,9 +305,9 @@ fn open_participant_before_remote_part_is_closed() {
#[test]
fn open_participant_after_remote_part_is_closed() {
let (n_a, f) = Network::new(Pid::fake(1));
let (n_a, f) = Network::new(Pid::fake(0));
std::thread::spawn(f);
let (n_b, f) = Network::new(Pid::fake(2));
let (n_b, f) = Network::new(Pid::fake(1));
std::thread::spawn(f);
let addr = tcp();
block_on(n_a.listen(addr.clone())).unwrap();
@ -321,3 +322,25 @@ fn open_participant_after_remote_part_is_closed() {
let mut s1_a = block_on(p_a.opened()).unwrap();
assert_eq!(block_on(s1_a.recv()), Ok("HelloWorld".to_string()));
}
/// Checks that two connected networks can be dropped and that the threads
/// running their `Network::new` closures then finish without panicking.
///
/// The test exchanges one message over a stream first, so the networks are
/// dropped while a participant and a stream still exist on both sides.
#[test]
fn close_network_scheduler_completely() {
// Create both networks; each returned closure `f` runs on its own thread and
// the join handles are kept so the threads can be awaited at the end.
let (n_a, f) = Network::new(Pid::fake(0));
let ha = std::thread::spawn(f);
let (n_b, f) = Network::new(Pid::fake(1));
let hb = std::thread::spawn(f);
// `n_a` listens on the address from the `tcp()` helper, `n_b` connects to it.
let addr = tcp();
block_on(n_a.listen(addr.clone())).unwrap();
let p_b = block_on(n_b.connect(addr)).unwrap();
// Open a stream from b (prio 10, no promises) and send a single message.
let mut s1_b = block_on(p_b.open(10, PROMISES_NONE)).unwrap();
s1_b.send("HelloWorld").unwrap();
// On a's side: accept the participant and the stream, then verify the message.
let p_a = block_on(n_a.connected()).unwrap();
let mut s1_a = block_on(p_a.opened()).unwrap();
assert_eq!(block_on(s1_a.recv()), Ok("HelloWorld".to_string()));
// Drop both networks while participants and streams are still alive.
// NOTE(review): presumably dropping `Network` triggers the shutdown of the
// closure running on `ha`/`hb` - confirm against `Network`'s `Drop` impl.
drop(n_a);
drop(n_b);
// Grace period before joining; `join().unwrap()` fails the test if either
// thread panicked.
std::thread::sleep(std::time::Duration::from_millis(1000));
ha.join().unwrap();
hb.join().unwrap();
}

View File

@ -10,10 +10,7 @@ use tracing_subscriber::EnvFilter;
use veloren_network::{Network, Participant, Pid, ProtocolAddr, Stream, PROMISES_NONE};
#[allow(dead_code)]
pub fn setup(tracing: bool, mut sleep: u64) -> (u64, u64) {
if tracing {
sleep += 1000
}
pub fn setup(tracing: bool, sleep: u64) -> (u64, u64) {
if sleep > 0 {
thread::sleep(Duration::from_millis(sleep));
}
@ -49,9 +46,9 @@ pub fn setup(tracing: bool, mut sleep: u64) -> (u64, u64) {
pub async fn network_participant_stream(
addr: ProtocolAddr,
) -> (Network, Participant, Stream, Network, Participant, Stream) {
let (n_a, f_a) = Network::new(Pid::fake(1));
let (n_a, f_a) = Network::new(Pid::fake(0));
std::thread::spawn(f_a);
let (n_b, f_b) = Network::new(Pid::fake(2));
let (n_b, f_b) = Network::new(Pid::fake(1));
std::thread::spawn(f_b);
n_a.listen(addr.clone()).await.unwrap();

View File

@ -79,30 +79,32 @@ pub fn handle_exit_ingame(server: &mut Server, entity: EcsEntity) {
pub fn handle_client_disconnect(server: &mut Server, entity: EcsEntity) -> Event {
if let Some(client) = server.state().read_storage::<Client>().get(entity) {
trace!("Closing participant of client");
let participant = match client.participant.try_lock() {
Ok(mut p) => p.take().unwrap(),
Err(e) => {
error!(?e, "coudln't lock participant for removal");
error!(?e, ?entity, "coudln't lock participant for removal");
return Event::ClientDisconnected { entity };
},
};
std::thread::spawn(|| {
let pid = participant.remote_pid();
let pid = participant.remote_pid();
std::thread::spawn(move || {
let span = tracing::span!(tracing::Level::DEBUG, "client_disconnect", ?pid, ?entity);
let _enter = span.enter();
let now = std::time::Instant::now();
trace!(?pid, "start disconnect");
debug!(?pid, ?entity, "Start handle disconnect of client");
if let Err(e) = block_on(participant.disconnect()) {
debug!(
?e,
?pid,
"Error when disconnecting client, maybe the pipe already broke"
);
};
trace!(?pid, "finished disconnect");
let elapsed = now.elapsed();
if elapsed.as_millis() > 100 {
warn!(?elapsed, "disconecting took quite long");
warn!(?elapsed, ?pid, "disconecting took quite long");
} else {
debug!(?elapsed, "disconecting took");
debug!(?elapsed, ?pid, "disconecting took");
}
});
}

View File

@ -274,16 +274,19 @@ impl Server {
let mut metrics = ServerMetrics::new();
// register all metrics submodules here
let tick_metrics = TickMetrics::new(metrics.registry(), metrics.tick_clone())
let tick_metrics = TickMetrics::new(metrics.tick_clone())
.expect("Failed to initialize server tick metrics submodule.");
metrics
.run(settings.metrics_address)
.expect("Failed to initialize server metrics submodule.");
tick_metrics
.register(&metrics.registry())
.expect("failed to register tick metrics");
let thread_pool = ThreadPoolBuilder::new()
.name("veloren-worker".to_string())
.build();
let (network, f) = Network::new(Pid::new());
let (network, f) = Network::new_with_registry(Pid::new(), &metrics.registry());
metrics
.run(settings.metrics_address)
.expect("Failed to initialize server metrics submodule.");
thread_pool.execute(f);
block_on(network.listen(ProtocolAddr::Tcp(settings.gameserver_address)))?;

View File

@ -33,7 +33,7 @@ pub struct ServerMetrics {
}
impl TickMetrics {
pub fn new(registry: &Registry, tick: Arc<AtomicU64>) -> Result<Self, Box<dyn Error>> {
pub fn new(tick: Arc<AtomicU64>) -> Result<Self, Box<dyn Error>> {
let player_online = IntGauge::with_opts(Opts::new(
"player_online",
"shows the number of clients connected to the server",
@ -74,15 +74,6 @@ impl TickMetrics {
.expect("Time went backwards");
start_time.set(since_the_epoch.as_secs().try_into()?);
registry.register(Box::new(player_online.clone()))?;
registry.register(Box::new(entity_count.clone()))?;
registry.register(Box::new(build_info.clone()))?;
registry.register(Box::new(start_time.clone()))?;
registry.register(Box::new(time_of_day.clone()))?;
registry.register(Box::new(chonks_count.clone()))?;
registry.register(Box::new(chunks_count.clone()))?;
registry.register(Box::new(tick_time.clone()))?;
Ok(Self {
chonks_count,
chunks_count,
@ -97,6 +88,18 @@ impl TickMetrics {
})
}
pub fn register(&self, registry: &Registry) -> Result<(), Box<dyn Error>> {
registry.register(Box::new(self.player_online.clone()))?;
registry.register(Box::new(self.entity_count.clone()))?;
registry.register(Box::new(self.build_info.clone()))?;
registry.register(Box::new(self.start_time.clone()))?;
registry.register(Box::new(self.time_of_day.clone()))?;
registry.register(Box::new(self.chonks_count.clone()))?;
registry.register(Box::new(self.chunks_count.clone()))?;
registry.register(Box::new(self.tick_time.clone()))?;
Ok(())
}
/// Returns `true` when the current tick counter is a multiple of 100.
pub fn is_100th_tick(&self) -> bool {
    // Relaxed load: only the counter value itself is inspected here.
    let current_tick = self.tick.load(Ordering::Relaxed);
    current_tick.rem_euclid(100) == 0
}
}

View File

@ -25,6 +25,7 @@ use hashbrown::HashMap;
use specs::{
Entities, Join, Read, ReadExpect, ReadStorage, System, Write, WriteExpect, WriteStorage,
};
use tracing::{debug, error, info, warn};
impl Sys {
///We needed to move this to a async fn, if we would use a async closures
@ -267,17 +268,13 @@ impl Sys {
let msg = mode.new_message(*from, message);
new_chat_msgs.push((Some(entity), msg));
} else {
tracing::error!("Could not send message. Missing player uid");
error!("Could not send message. Missing player uid");
}
},
Err(ChatMsgValidationError::TooLong) => {
let max = MAX_BYTES_CHAT_MSG;
let len = message.len();
tracing::warn!(
?len,
?max,
"Recieved a chat message that's too long"
)
warn!(?len, ?max, "Recieved a chat message that's too long")
},
}
},
@ -342,7 +339,9 @@ impl Sys {
client.notify(ServerMsg::Disconnect);
},
ClientMsg::Terminate => {
debug!(?entity, "Client send message to termitate session");
server_emitter.emit(ServerEvent::ClientDisconnect(entity));
break Ok(());
},
ClientMsg::RequestCharacterList => {
if let Some(player) = players.get(entity) {
@ -351,11 +350,7 @@ impl Sys {
},
ClientMsg::CreateCharacter { alias, tool, body } => {
if let Err(error) = alias_validator.validate(&alias) {
tracing::debug!(
?error,
?alias,
"denied alias as it contained a banned word"
);
debug!(?error, ?alias, "denied alias as it contained a banned word");
client.notify(ServerMsg::CharacterActionError(error.to_string()));
} else if let Some(player) = players.get(entity) {
character_loader.create_character(
@ -522,10 +517,15 @@ impl<'a> System<'a> for Sys {
// Update client ping.
if cnt > 0 {
client.last_ping = time.0
} else if time.0 - client.last_ping > CLIENT_TIMEOUT // Timeout
|| network_err.is_err()
} else if time.0 - client.last_ping > CLIENT_TIMEOUT
// Timeout
{
info!(?entity, "timeout error with client, disconnecting");
server_emitter.emit(ServerEvent::ClientDisconnect(entity));
} else if network_err.is_err()
// Postbox error
{
debug!(?entity, "postbox error with client, disconnecting");
server_emitter.emit(ServerEvent::ClientDisconnect(entity));
} else if time.0 - client.last_ping > CLIENT_TIMEOUT * 0.5 {
// Try pinging the client if the timeout is nearing.