Merge branch 'xMAC94x/network_tracing' into 'master'

xmac94x/network tracing

See merge request veloren/veloren!1311
This commit is contained in:
Marcel 2020-08-21 17:24:31 +00:00
commit 6a2c2f2694
14 changed files with 296 additions and 168 deletions

View File

@ -1383,8 +1383,10 @@ impl Client {
); );
}, },
ServerMsg::Disconnect => { ServerMsg::Disconnect => {
debug!("finally sendinge ClientMsg::Terminate");
frontend_events.push(Event::Disconnect); frontend_events.push(Event::Disconnect);
self.singleton_stream.send(ClientMsg::Terminate)?; self.singleton_stream.send(ClientMsg::Terminate)?;
break Ok(());
}, },
ServerMsg::CharacterListUpdate(character_list) => { ServerMsg::CharacterListUpdate(character_list) => {
self.character_list.characters = character_list; self.character_list.characters = character_list;

View File

@ -1,6 +1,7 @@
#[cfg(feature = "metrics")] #[cfg(feature = "metrics")]
use crate::metrics::NetworkMetrics; use crate::metrics::NetworkMetrics;
use crate::{ use crate::{
participant::C2pFrame,
protocols::Protocols, protocols::Protocols,
types::{ types::{
Cid, Frame, Pid, Sid, STREAM_ID_OFFSET1, STREAM_ID_OFFSET2, VELOREN_MAGIC_NUMBER, Cid, Frame, Pid, Sid, STREAM_ID_OFFSET1, STREAM_ID_OFFSET2, VELOREN_MAGIC_NUMBER,
@ -41,8 +42,8 @@ impl Channel {
pub async fn run( pub async fn run(
mut self, mut self,
protocol: Protocols, protocol: Protocols,
mut w2c_cid_frame_s: mpsc::UnboundedSender<(Cid, Frame)>, mut w2c_cid_frame_s: mpsc::UnboundedSender<C2pFrame>,
mut leftover_cid_frame: Vec<(Cid, Frame)>, mut leftover_cid_frame: Vec<C2pFrame>,
) { ) {
let c2w_frame_r = self.c2w_frame_r.take().unwrap(); let c2w_frame_r = self.c2w_frame_r.take().unwrap();
let read_stop_receiver = self.read_stop_receiver.take().unwrap(); let read_stop_receiver = self.read_stop_receiver.take().unwrap();
@ -58,13 +59,13 @@ impl Channel {
trace!(?self.cid, "Start up channel"); trace!(?self.cid, "Start up channel");
match protocol { match protocol {
Protocols::Tcp(tcp) => { Protocols::Tcp(tcp) => {
futures::join!( join!(
tcp.read_from_wire(self.cid, &mut w2c_cid_frame_s, read_stop_receiver), tcp.read_from_wire(self.cid, &mut w2c_cid_frame_s, read_stop_receiver),
tcp.write_to_wire(self.cid, c2w_frame_r), tcp.write_to_wire(self.cid, c2w_frame_r),
); );
}, },
Protocols::Udp(udp) => { Protocols::Udp(udp) => {
futures::join!( join!(
udp.read_from_wire(self.cid, &mut w2c_cid_frame_s, read_stop_receiver), udp.read_from_wire(self.cid, &mut w2c_cid_frame_s, read_stop_receiver),
udp.write_to_wire(self.cid, c2w_frame_r), udp.write_to_wire(self.cid, c2w_frame_r),
); );
@ -113,12 +114,9 @@ impl Handshake {
} }
} }
pub async fn setup( pub async fn setup(self, protocol: &Protocols) -> Result<(Pid, Sid, u128, Vec<C2pFrame>), ()> {
self,
protocol: &Protocols,
) -> Result<(Pid, Sid, u128, Vec<(Cid, Frame)>), ()> {
let (c2w_frame_s, c2w_frame_r) = mpsc::unbounded::<Frame>(); let (c2w_frame_s, c2w_frame_r) = mpsc::unbounded::<Frame>();
let (mut w2c_cid_frame_s, mut w2c_cid_frame_r) = mpsc::unbounded::<(Cid, Frame)>(); let (mut w2c_cid_frame_s, mut w2c_cid_frame_r) = mpsc::unbounded::<C2pFrame>();
let (read_stop_sender, read_stop_receiver) = oneshot::channel(); let (read_stop_sender, read_stop_receiver) = oneshot::channel();
let handler_future = let handler_future =
@ -160,7 +158,7 @@ impl Handshake {
async fn frame_handler( async fn frame_handler(
&self, &self,
w2c_cid_frame_r: &mut mpsc::UnboundedReceiver<(Cid, Frame)>, w2c_cid_frame_r: &mut mpsc::UnboundedReceiver<C2pFrame>,
mut c2w_frame_s: mpsc::UnboundedSender<Frame>, mut c2w_frame_s: mpsc::UnboundedSender<Frame>,
read_stop_sender: oneshot::Sender<()>, read_stop_sender: oneshot::Sender<()>,
) -> Result<(Pid, Sid, u128), ()> { ) -> Result<(Pid, Sid, u128), ()> {
@ -176,7 +174,7 @@ impl Handshake {
let frame = w2c_cid_frame_r.next().await.map(|(_cid, frame)| frame); let frame = w2c_cid_frame_r.next().await.map(|(_cid, frame)| frame);
#[cfg(feature = "metrics")] #[cfg(feature = "metrics")]
{ {
if let Some(ref frame) = frame { if let Some(Ok(ref frame)) = frame {
self.metrics self.metrics
.frames_in_total .frames_in_total
.with_label_values(&["", &cid_string, &frame.get_string()]) .with_label_values(&["", &cid_string, &frame.get_string()])
@ -184,10 +182,10 @@ impl Handshake {
} }
} }
let r = match frame { let r = match frame {
Some(Frame::Handshake { Some(Ok(Frame::Handshake {
magic_number, magic_number,
version, version,
}) => { })) => {
trace!(?magic_number, ?version, "Recv handshake"); trace!(?magic_number, ?version, "Recv handshake");
if magic_number != VELOREN_MAGIC_NUMBER { if magic_number != VELOREN_MAGIC_NUMBER {
error!(?magic_number, "Connection with invalid magic_number"); error!(?magic_number, "Connection with invalid magic_number");
@ -221,18 +219,24 @@ impl Handshake {
Ok(()) Ok(())
} }
}, },
Some(Frame::Shutdown) => { Some(Ok(frame)) => {
info!("Shutdown signal received"); #[cfg(feature = "metrics")]
Err(()) self.metrics
}, .frames_in_total
Some(Frame::Raw(bytes)) => { .with_label_values(&["", &cid_string, frame.get_string()])
.inc();
if let Frame::Raw(bytes) = frame {
match std::str::from_utf8(bytes.as_slice()) { match std::str::from_utf8(bytes.as_slice()) {
Ok(string) => error!(?string, ERR_S), Ok(string) => error!(?string, ERR_S),
_ => error!(?bytes, ERR_S), _ => error!(?bytes, ERR_S),
} }
}
Err(())
},
Some(Err(())) => {
info!("Protocol got interrupted");
Err(()) Err(())
}, },
Some(_) => Err(()),
None => Err(()), None => Err(()),
}; };
if let Err(()) = r { if let Err(()) = r {
@ -248,7 +252,7 @@ impl Handshake {
let frame = w2c_cid_frame_r.next().await.map(|(_cid, frame)| frame); let frame = w2c_cid_frame_r.next().await.map(|(_cid, frame)| frame);
let r = match frame { let r = match frame {
Some(Frame::Init { pid, secret }) => { Some(Ok(Frame::Init { pid, secret })) => {
debug!(?pid, "Participant send their ID"); debug!(?pid, "Participant send their ID");
let pid_string = pid.to_string(); let pid_string = pid.to_string();
#[cfg(feature = "metrics")] #[cfg(feature = "metrics")]
@ -265,20 +269,22 @@ impl Handshake {
info!(?pid, "This Handshake is now configured!"); info!(?pid, "This Handshake is now configured!");
Ok((pid, stream_id_offset, secret)) Ok((pid, stream_id_offset, secret))
}, },
Some(frame) => { Some(Ok(frame)) => {
#[cfg(feature = "metrics")] #[cfg(feature = "metrics")]
self.metrics self.metrics
.frames_in_total .frames_in_total
.with_label_values(&["", &cid_string, frame.get_string()]) .with_label_values(&["", &cid_string, frame.get_string()])
.inc(); .inc();
match frame { if let Frame::Raw(bytes) = frame {
Frame::Shutdown => info!("Shutdown signal received"), match std::str::from_utf8(bytes.as_slice()) {
Frame::Raw(bytes) => match std::str::from_utf8(bytes.as_slice()) {
Ok(string) => error!(?string, ERR_S), Ok(string) => error!(?string, ERR_S),
_ => error!(?bytes, ERR_S), _ => error!(?bytes, ERR_S),
},
_ => (),
} }
}
Err(())
},
Some(Err(())) => {
info!("Protocol got interrupted");
Err(()) Err(())
}, },
None => Err(()), None => Err(()),

View File

@ -34,6 +34,8 @@ pub struct NetworkMetrics {
pub message_out_total: IntCounterVec, pub message_out_total: IntCounterVec,
// send(prio) Messages throughput, seperated by STREAM AND PARTICIPANT, // send(prio) Messages throughput, seperated by STREAM AND PARTICIPANT,
pub message_out_throughput: IntCounterVec, pub message_out_throughput: IntCounterVec,
// flushed(prio) stream count, seperated by PARTICIPANT,
pub streams_flushed: IntCounterVec,
// TODO: queued Messages, seperated by STREAM (add PART, CHANNEL), // TODO: queued Messages, seperated by STREAM (add PART, CHANNEL),
// queued Messages, seperated by PARTICIPANT // queued Messages, seperated by PARTICIPANT
pub queued_count: IntGaugeVec, pub queued_count: IntGaugeVec,
@ -167,6 +169,13 @@ impl NetworkMetrics {
), ),
&["participant", "stream"], &["participant", "stream"],
)?; )?;
let streams_flushed = IntCounterVec::new(
Opts::new(
"stream_flushed",
"Number of flushed streams requested to PrioManager at participant level",
),
&["participant"],
)?;
let queued_count = IntGaugeVec::new( let queued_count = IntGaugeVec::new(
Opts::new( Opts::new(
"queued_count", "queued_count",
@ -207,6 +216,7 @@ impl NetworkMetrics {
wire_in_throughput, wire_in_throughput,
message_out_total, message_out_total,
message_out_throughput, message_out_throughput,
streams_flushed,
queued_count, queued_count,
queued_bytes, queued_bytes,
participants_ping, participants_ping,

View File

@ -25,9 +25,11 @@ use std::{
time::{Duration, Instant}, time::{Duration, Instant},
}; };
use tracing::*; use tracing::*;
use tracing_futures::Instrument;
pub(crate) type A2bStreamOpen = (Prio, Promises, oneshot::Sender<Stream>); pub(crate) type A2bStreamOpen = (Prio, Promises, oneshot::Sender<Stream>);
pub(crate) type S2bCreateChannel = (Cid, Sid, Protocols, Vec<(Cid, Frame)>, oneshot::Sender<()>); pub(crate) type C2pFrame = (Cid, Result<Frame, ()>);
pub(crate) type S2bCreateChannel = (Cid, Sid, Protocols, Vec<C2pFrame>, oneshot::Sender<()>);
pub(crate) type S2bShutdownBparticipant = oneshot::Sender<Result<(), ParticipantError>>; pub(crate) type S2bShutdownBparticipant = oneshot::Sender<Result<(), ParticipantError>>;
pub(crate) type B2sPrioStatistic = (Pid, u64, u64); pub(crate) type B2sPrioStatistic = (Pid, u64, u64);
@ -69,7 +71,7 @@ pub struct BParticipant {
remote_pid: Pid, remote_pid: Pid,
remote_pid_string: String, //optimisation remote_pid_string: String, //optimisation
offset_sid: Sid, offset_sid: Sid,
channels: Arc<RwLock<Vec<ChannelInfo>>>, channels: Arc<RwLock<HashMap<Cid, ChannelInfo>>>,
streams: RwLock<HashMap<Sid, StreamInfo>>, streams: RwLock<HashMap<Sid, StreamInfo>>,
running_mgr: AtomicUsize, running_mgr: AtomicUsize,
run_channels: Option<ControlChannels>, run_channels: Option<ControlChannels>,
@ -118,7 +120,7 @@ impl BParticipant {
remote_pid, remote_pid,
remote_pid_string: remote_pid.to_string(), remote_pid_string: remote_pid.to_string(),
offset_sid, offset_sid,
channels: Arc::new(RwLock::new(vec![])), channels: Arc::new(RwLock::new(HashMap::new())),
streams: RwLock::new(HashMap::new()), streams: RwLock::new(HashMap::new()),
running_mgr: AtomicUsize::new(0), running_mgr: AtomicUsize::new(0),
run_channels, run_channels,
@ -142,7 +144,7 @@ impl BParticipant {
oneshot::channel(); oneshot::channel();
let (shutdown_open_mgr_sender, shutdown_open_mgr_receiver) = oneshot::channel(); let (shutdown_open_mgr_sender, shutdown_open_mgr_receiver) = oneshot::channel();
let (b2b_prios_flushed_s, b2b_prios_flushed_r) = oneshot::channel(); let (b2b_prios_flushed_s, b2b_prios_flushed_r) = oneshot::channel();
let (w2b_frames_s, w2b_frames_r) = mpsc::unbounded::<(Cid, Frame)>(); let (w2b_frames_s, w2b_frames_r) = mpsc::unbounded::<C2pFrame>();
let (prios, a2p_msg_s, b2p_notify_empty_stream_s) = PrioManager::new( let (prios, a2p_msg_s, b2p_notify_empty_stream_s) = PrioManager::new(
#[cfg(feature = "metrics")] #[cfg(feature = "metrics")]
self.metrics.clone(), self.metrics.clone(),
@ -205,13 +207,11 @@ impl BParticipant {
#[cfg(feature = "metrics")] #[cfg(feature = "metrics")]
let mut send_cache = let mut send_cache =
PidCidFrameCache::new(self.metrics.frames_out_total.clone(), self.remote_pid); PidCidFrameCache::new(self.metrics.frames_out_total.clone(), self.remote_pid);
let mut i: u64 = 0;
loop { loop {
let mut frames = VecDeque::new(); let mut frames = VecDeque::new();
prios.fill_frames(FRAMES_PER_TICK, &mut frames).await; prios.fill_frames(FRAMES_PER_TICK, &mut frames).await;
let len = frames.len(); let len = frames.len();
if len > 0 {
trace!("Tick {}", len);
}
for (_, frame) in frames { for (_, frame) in frames {
self.send_frame( self.send_frame(
frame, frame,
@ -225,6 +225,10 @@ impl BParticipant {
.await .await
.unwrap(); .unwrap();
async_std::task::sleep(TICK_TIME).await; async_std::task::sleep(TICK_TIME).await;
i += 1;
if i.rem_euclid(1000) == 0 {
trace!("Did 1000 ticks");
}
//shutdown after all msg are send! //shutdown after all msg are send!
if closing_up && (len == 0) { if closing_up && (len == 0) {
break; break;
@ -251,34 +255,30 @@ impl BParticipant {
// find out ideal channel here // find out ideal channel here
//TODO: just take first //TODO: just take first
let mut lock = self.channels.write().await; let mut lock = self.channels.write().await;
if let Some(ci) = lock.get_mut(0) { if let Some(ci) = lock.values_mut().next() {
//note: this is technically wrong we should only increase when it suceeded, but //we are increasing metrics without checking the result to please
// this requiered me to clone `frame` which is a to big performance impact for // borrow_checker. otherwise we would need to close `frame` what we
// error handling // dont want!
#[cfg(feature = "metrics")] #[cfg(feature = "metrics")]
frames_out_total_cache frames_out_total_cache
.with_label_values(ci.cid, &frame) .with_label_values(ci.cid, &frame)
.inc(); .inc();
if let Err(e) = ci.b2w_frame_s.send(frame).await { if let Err(e) = ci.b2w_frame_s.send(frame).await {
warn!( let cid = ci.cid;
?e, info!(?e, ?cid, "channel no longer available");
"The channel got closed unexpectedly, cleaning it up now." if let Some(ci) = self.channels.write().await.remove(&cid) {
); trace!(?cid, "stopping read protocol");
let ci = lock.remove(0);
if let Err(e) = ci.b2r_read_shutdown.send(()) { if let Err(e) = ci.b2r_read_shutdown.send(()) {
debug!( trace!(?cid, ?e, "seems like was already shut down");
?e, }
"Error shutdowning channel, which is prob fine as we detected it to no \ }
longer work in the first place"
);
};
//TODO FIXME tags: takeover channel multiple //TODO FIXME tags: takeover channel multiple
info!( info!(
"FIXME: the frame is actually drop. which is fine for now as the participant \ "FIXME: the frame is actually drop. which is fine for now as the participant \
will be closed, but not if we do channel-takeover" will be closed, but not if we do channel-takeover"
); );
//TEMP FIX: as we dont have channel takeover yet drop the whole bParticipant //TEMP FIX: as we dont have channel takeover yet drop the whole bParticipant
self.close_api(Some(ParticipantError::ProtocolFailedUnrecoverable)) self.close_write_api(Some(ParticipantError::ProtocolFailedUnrecoverable))
.await; .await;
false false
} else { } else {
@ -291,7 +291,12 @@ impl BParticipant {
guard.0 = now; guard.0 = now;
let occurrences = guard.1 + 1; let occurrences = guard.1 + 1;
guard.1 = 0; guard.1 = 0;
error!(?occurrences, "Participant has no channel to communicate on"); let lastframe = frame;
error!(
?occurrences,
?lastframe,
"Participant has no channel to communicate on"
);
} else { } else {
guard.1 += 1; guard.1 += 1;
} }
@ -301,7 +306,7 @@ impl BParticipant {
async fn handle_frames_mgr( async fn handle_frames_mgr(
&self, &self,
mut w2b_frames_r: mpsc::UnboundedReceiver<(Cid, Frame)>, mut w2b_frames_r: mpsc::UnboundedReceiver<C2pFrame>,
mut b2a_stream_opened_s: mpsc::UnboundedSender<Stream>, mut b2a_stream_opened_s: mpsc::UnboundedSender<Stream>,
a2b_close_stream_s: mpsc::UnboundedSender<Sid>, a2b_close_stream_s: mpsc::UnboundedSender<Sid>,
a2p_msg_s: crossbeam_channel::Sender<(Prio, Sid, OutgoingMessage)>, a2p_msg_s: crossbeam_channel::Sender<(Prio, Sid, OutgoingMessage)>,
@ -313,8 +318,22 @@ impl BParticipant {
let mut dropped_cnt = 0u64; let mut dropped_cnt = 0u64;
let mut dropped_sid = Sid::new(0); let mut dropped_sid = Sid::new(0);
while let Some((cid, frame)) = w2b_frames_r.next().await { while let Some((cid, result_frame)) = w2b_frames_r.next().await {
//trace!("handling frame"); //trace!(?result_frame, "handling frame");
let frame = match result_frame {
Ok(frame) => frame,
Err(()) => {
// The read protocol stopped, i need to make sure that write gets stopped
debug!("read protocol was closed. Stopping write protocol");
if let Some(ci) = self.channels.write().await.get_mut(&cid) {
ci.b2w_frame_s
.close()
.await
.expect("couldn't stop write protocol");
}
continue;
},
};
#[cfg(feature = "metrics")] #[cfg(feature = "metrics")]
{ {
let cid_string = cid.to_string(); let cid_string = cid.to_string();
@ -323,8 +342,6 @@ impl BParticipant {
.with_label_values(&[&self.remote_pid_string, &cid_string, frame.get_string()]) .with_label_values(&[&self.remote_pid_string, &cid_string, frame.get_string()])
.inc(); .inc();
} }
#[cfg(not(feature = "metrics"))]
let _cid = cid;
match frame { match frame {
Frame::OpenStream { Frame::OpenStream {
sid, sid,
@ -395,7 +412,7 @@ impl BParticipant {
false false
}; };
if finished { if finished {
//debug!(?mid, "finished receiving message"); //trace!(?mid, "finished receiving message");
let imsg = messages.remove(&mid).unwrap(); let imsg = messages.remove(&mid).unwrap();
if let Some(si) = self.streams.write().await.get_mut(&imsg.sid) { if let Some(si) = self.streams.write().await.get_mut(&imsg.sid) {
if let Err(e) = si.b2a_msg_recv_s.send(imsg).await { if let Err(e) = si.b2a_msg_recv_s.send(imsg).await {
@ -448,7 +465,7 @@ impl BParticipant {
async fn create_channel_mgr( async fn create_channel_mgr(
&self, &self,
s2b_create_channel_r: mpsc::UnboundedReceiver<S2bCreateChannel>, s2b_create_channel_r: mpsc::UnboundedReceiver<S2bCreateChannel>,
w2b_frames_s: mpsc::UnboundedSender<(Cid, Frame)>, w2b_frames_s: mpsc::UnboundedSender<C2pFrame>,
) { ) {
self.running_mgr.fetch_add(1, Ordering::Relaxed); self.running_mgr.fetch_add(1, Ordering::Relaxed);
trace!("Start create_channel_mgr"); trace!("Start create_channel_mgr");
@ -462,7 +479,7 @@ impl BParticipant {
let channels = self.channels.clone(); let channels = self.channels.clone();
async move { async move {
let (channel, b2w_frame_s, b2r_read_shutdown) = Channel::new(cid); let (channel, b2w_frame_s, b2r_read_shutdown) = Channel::new(cid);
channels.write().await.push(ChannelInfo { channels.write().await.insert(cid, ChannelInfo {
cid, cid,
cid_string: cid.to_string(), cid_string: cid.to_string(),
b2w_frame_s, b2w_frame_s,
@ -477,13 +494,20 @@ impl BParticipant {
trace!(?cid, "Running channel in participant"); trace!(?cid, "Running channel in participant");
channel channel
.run(protocol, w2b_frames_s, leftover_cid_frame) .run(protocol, w2b_frames_s, leftover_cid_frame)
.instrument(tracing::info_span!("", ?cid))
.await; .await;
#[cfg(feature = "metrics")] #[cfg(feature = "metrics")]
self.metrics self.metrics
.channels_disconnected_total .channels_disconnected_total
.with_label_values(&[&self.remote_pid_string]) .with_label_values(&[&self.remote_pid_string])
.inc(); .inc();
trace!(?cid, "Channel got closed"); info!(?cid, "Channel got closed");
//maybe channel got already dropped, we don't know.
channels.write().await.remove(&cid);
trace!(?cid, "Channel cleanup completed");
//TEMP FIX: as we dont have channel takeover yet drop the whole
// bParticipant
self.close_write_api(None).await;
} }
}, },
) )
@ -562,6 +586,9 @@ impl BParticipant {
trace!("Start participant_shutdown_mgr"); trace!("Start participant_shutdown_mgr");
let sender = s2b_shutdown_bparticipant_r.await.unwrap(); let sender = s2b_shutdown_bparticipant_r.await.unwrap();
let mut send_cache =
PidCidFrameCache::new(self.metrics.frames_out_total.clone(), self.remote_pid);
self.close_api(None).await; self.close_api(None).await;
debug!("Closing all managers"); debug!("Closing all managers");
@ -572,10 +599,30 @@ impl BParticipant {
} }
b2b_prios_flushed_r.await.unwrap(); b2b_prios_flushed_r.await.unwrap();
if Some(ParticipantError::ParticipantDisconnected) != self.shutdown_info.read().await.error
{
debug!("Sending shutdown frame after flushed all prios");
if !self
.send_frame(
Frame::Shutdown,
#[cfg(feature = "metrics")]
&mut send_cache,
)
.await
{
warn!("couldn't send shutdown frame, are channels already closed?");
}
}
debug!("Closing all channels, after flushed prios"); debug!("Closing all channels, after flushed prios");
for ci in self.channels.write().await.drain(..) { for (cid, ci) in self.channels.write().await.drain() {
if let Err(e) = ci.b2r_read_shutdown.send(()) { if let Err(e) = ci.b2r_read_shutdown.send(()) {
debug!(?e, ?ci.cid, "Seems like this read protocol got already dropped by closing the Stream itself, just ignoring the fact"); debug!(
?e,
?cid,
"Seems like this read protocol got already dropped by closing the Stream \
itself, ignoring"
);
}; };
} }
@ -647,7 +694,7 @@ impl BParticipant {
si.send_closed.store(true, Ordering::Relaxed); si.send_closed.store(true, Ordering::Relaxed);
si.b2a_msg_recv_s.close_channel(); si.b2a_msg_recv_s.close_channel();
}, },
None => warn!("Couldn't find the stream, might be simulanious close from remote"), None => warn!("Couldn't find the stream, might be simultaneous close from remote"),
} }
//TODO: what happens if RIGHT NOW the remote sends a StreamClose and this //TODO: what happens if RIGHT NOW the remote sends a StreamClose and this
@ -711,21 +758,29 @@ impl BParticipant {
) )
} }
/// close streams and set err async fn close_write_api(&self, reason: Option<ParticipantError>) {
async fn close_api(&self, reason: Option<ParticipantError>) { trace!(?reason, "close_api");
//closing api::Participant is done by closing all channels, exepct for the
// shutdown channel at this point!
let mut lock = self.shutdown_info.write().await; let mut lock = self.shutdown_info.write().await;
if let Some(r) = reason { if let Some(r) = reason {
lock.error = Some(r); lock.error = Some(r);
} }
lock.b2a_stream_opened_s.close_channel(); lock.b2a_stream_opened_s.close_channel();
debug!("Closing all streams for write");
for (sid, si) in self.streams.write().await.iter() {
trace!(?sid, "Shutting down Stream for write");
si.send_closed.store(true, Ordering::Relaxed);
}
}
///closing api::Participant is done by closing all channels, exepct for the
/// shutdown channel at this point!
async fn close_api(&self, reason: Option<ParticipantError>) {
self.close_write_api(reason).await;
debug!("Closing all streams"); debug!("Closing all streams");
for (sid, si) in self.streams.write().await.drain() { for (sid, si) in self.streams.write().await.drain() {
trace!(?sid, "Shutting down Stream"); trace!(?sid, "Shutting down Stream");
si.b2a_msg_recv_s.close_channel(); si.b2a_msg_recv_s.close_channel();
si.send_closed.store(true, Ordering::Relaxed);
} }
} }
} }

View File

@ -16,8 +16,6 @@ use futures::channel::oneshot;
use std::collections::{HashMap, HashSet, VecDeque}; use std::collections::{HashMap, HashSet, VecDeque};
#[cfg(feature = "metrics")] use std::sync::Arc; #[cfg(feature = "metrics")] use std::sync::Arc;
use tracing::*;
const PRIO_MAX: usize = 64; const PRIO_MAX: usize = 64;
#[derive(Default)] #[derive(Default)]
@ -148,11 +146,8 @@ impl PrioManager {
async fn tick(&mut self) { async fn tick(&mut self) {
// Check Range // Check Range
let mut messages = 0;
let mut closed = 0;
for (prio, sid, msg) in self.messages_rx.try_iter() { for (prio, sid, msg) in self.messages_rx.try_iter() {
debug_assert!(prio as usize <= PRIO_MAX); debug_assert!(prio as usize <= PRIO_MAX);
messages += 1;
#[cfg(feature = "metrics")] #[cfg(feature = "metrics")]
{ {
let sid_string = sid.to_string(); let sid_string = sid.to_string();
@ -173,7 +168,11 @@ impl PrioManager {
} }
//this must be AFTER messages //this must be AFTER messages
for (sid, return_sender) in self.sid_flushed_rx.try_iter() { for (sid, return_sender) in self.sid_flushed_rx.try_iter() {
closed += 1; #[cfg(feature = "metrics")]
self.metrics
.streams_flushed
.with_label_values(&[&self.pid])
.inc();
if let Some(cnt) = self.sid_owned.get_mut(&sid) { if let Some(cnt) = self.sid_owned.get_mut(&sid) {
// register sender // register sender
cnt.empty_notify = Some(return_sender); cnt.empty_notify = Some(return_sender);
@ -182,9 +181,6 @@ impl PrioManager {
return_sender.send(()).unwrap(); return_sender.send(()).unwrap();
} }
} }
if messages > 0 || closed > 0 {
trace!(?messages, ?closed, "tick");
}
} }
//if None returned, we are empty! //if None returned, we are empty!
@ -256,7 +252,6 @@ impl PrioManager {
} }
} }
} else { } else {
trace!(?msg.mid, "Repush message");
self.messages[prio as usize].push_front((sid, msg)); self.messages[prio as usize].push_front((sid, msg));
} }
}, },

View File

@ -1,6 +1,9 @@
#[cfg(feature = "metrics")] #[cfg(feature = "metrics")]
use crate::metrics::{CidFrameCache, NetworkMetrics}; use crate::metrics::{CidFrameCache, NetworkMetrics};
use crate::types::{Cid, Frame, Mid, Pid, Sid}; use crate::{
participant::C2pFrame,
types::{Cid, Frame, Mid, Pid, Sid},
};
use async_std::{ use async_std::{
net::{TcpStream, UdpSocket}, net::{TcpStream, UdpSocket},
prelude::*, prelude::*,
@ -71,8 +74,8 @@ impl TcpProtocol {
cid: Cid, cid: Cid,
mut stream: &TcpStream, mut stream: &TcpStream,
mut bytes: &mut [u8], mut bytes: &mut [u8],
w2c_cid_frame_s: &mut mpsc::UnboundedSender<(Cid, Frame)>,
mut end_receiver: &mut Fuse<oneshot::Receiver<()>>, mut end_receiver: &mut Fuse<oneshot::Receiver<()>>,
w2c_cid_frame_s: &mut mpsc::UnboundedSender<C2pFrame>,
) -> bool { ) -> bool {
match select! { match select! {
r = stream.read_exact(&mut bytes).fuse() => Some(r), r = stream.read_exact(&mut bytes).fuse() => Some(r),
@ -80,20 +83,18 @@ impl TcpProtocol {
} { } {
Some(Ok(_)) => false, Some(Ok(_)) => false,
Some(Err(e)) => { Some(Err(e)) => {
debug!( info!(?e, "Closing tcp protocol due to read error");
?cid, //w2c_cid_frame_s is shared, dropping it wouldn't notify the receiver as every
?e, // channel is holding a sender! thats why Ne need a explicit
"Closing tcp protocol due to read error, sending close frame to gracefully \ // STOP here
shutdown"
);
w2c_cid_frame_s w2c_cid_frame_s
.send((cid, Frame::Shutdown)) .send((cid, Err(())))
.await .await
.expect("Channel or Participant seems no longer to exist to be Shutdown"); .expect("Channel or Participant seems no longer to exist");
true true
}, },
None => { None => {
trace!(?cid, "shutdown requested"); trace!("shutdown requested");
true true
}, },
} }
@ -102,7 +103,7 @@ impl TcpProtocol {
pub async fn read_from_wire( pub async fn read_from_wire(
&self, &self,
cid: Cid, cid: Cid,
w2c_cid_frame_s: &mut mpsc::UnboundedSender<(Cid, Frame)>, w2c_cid_frame_s: &mut mpsc::UnboundedSender<C2pFrame>,
end_r: oneshot::Receiver<()>, end_r: oneshot::Receiver<()>,
) { ) {
trace!("Starting up tcp read()"); trace!("Starting up tcp read()");
@ -118,8 +119,8 @@ impl TcpProtocol {
macro_rules! read_or_close { macro_rules! read_or_close {
($x:expr) => { ($x:expr) => {
if TcpProtocol::read_or_close(cid, &stream, $x, w2c_cid_frame_s, &mut end_r).await { if TcpProtocol::read_or_close(cid, &stream, $x, &mut end_r, w2c_cid_frame_s).await {
info!("Tcp stream closed, shutting down read"); trace!("read_or_close requested a shutdown");
break; break;
} }
}; };
@ -213,7 +214,7 @@ impl TcpProtocol {
#[cfg(feature = "metrics")] #[cfg(feature = "metrics")]
metrics_cache.with_label_values(&frame).inc(); metrics_cache.with_label_values(&frame).inc();
w2c_cid_frame_s w2c_cid_frame_s
.send((cid, frame)) .send((cid, Ok(frame)))
.await .await
.expect("Channel or Participant seems no longer to exist"); .expect("Channel or Participant seems no longer to exist");
} }
@ -228,7 +229,7 @@ impl TcpProtocol {
) -> bool { ) -> bool {
match stream.write_all(&bytes).await { match stream.write_all(&bytes).await {
Err(e) => { Err(e) => {
debug!( info!(
?e, ?e,
"Got an error writing to tcp, going to close this channel" "Got an error writing to tcp, going to close this channel"
); );
@ -255,7 +256,7 @@ impl TcpProtocol {
macro_rules! write_or_close { macro_rules! write_or_close {
($x:expr) => { ($x:expr) => {
if TcpProtocol::write_or_close(&mut stream, $x, &mut c2w_frame_r).await { if TcpProtocol::write_or_close(&mut stream, $x, &mut c2w_frame_r).await {
info!("Tcp stream closed, shutting down write"); trace!("write_or_close requested a shutdown");
break; break;
} }
}; };
@ -342,7 +343,7 @@ impl UdpProtocol {
pub async fn read_from_wire( pub async fn read_from_wire(
&self, &self,
cid: Cid, cid: Cid,
w2c_cid_frame_s: &mut mpsc::UnboundedSender<(Cid, Frame)>, w2c_cid_frame_s: &mut mpsc::UnboundedSender<C2pFrame>,
end_r: oneshot::Receiver<()>, end_r: oneshot::Receiver<()>,
) { ) {
trace!("Starting up udp read()"); trace!("Starting up udp read()");
@ -356,7 +357,14 @@ impl UdpProtocol {
let mut data_in = self.data_in.lock().await; let mut data_in = self.data_in.lock().await;
let mut end_r = end_r.fuse(); let mut end_r = end_r.fuse();
while let Some(bytes) = select! { while let Some(bytes) = select! {
r = data_in.next().fuse() => r, r = data_in.next().fuse() => match r {
Some(r) => Some(r),
None => {
info!("Udp read ended");
w2c_cid_frame_s.send((cid, Err(()))).await.expect("Channel or Participant seems no longer to exist");
None
}
},
_ = end_r => None, _ = end_r => None,
} { } {
trace!("Got raw UDP message with len: {}", bytes.len()); trace!("Got raw UDP message with len: {}", bytes.len());
@ -454,7 +462,7 @@ impl UdpProtocol {
}; };
#[cfg(feature = "metrics")] #[cfg(feature = "metrics")]
metrics_cache.with_label_values(&frame).inc(); metrics_cache.with_label_values(&frame).inc();
w2c_cid_frame_s.send((cid, frame)).await.unwrap(); w2c_cid_frame_s.send((cid, Ok(frame))).await.unwrap();
} }
trace!("Shutting down udp read()"); trace!("Shutting down udp read()");
} }
@ -565,10 +573,7 @@ impl UdpProtocol {
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use super::*; use super::*;
use crate::{ use crate::{metrics::NetworkMetrics, types::Pid};
metrics::NetworkMetrics,
types::{Cid, Pid},
};
use async_std::net; use async_std::net;
use futures::{executor::block_on, stream::StreamExt}; use futures::{executor::block_on, stream::StreamExt};
use std::sync::Arc; use std::sync::Arc;
@ -595,7 +600,7 @@ mod tests {
client.flush(); client.flush();
//handle data //handle data
let (mut w2c_cid_frame_s, mut w2c_cid_frame_r) = mpsc::unbounded::<(Cid, Frame)>(); let (mut w2c_cid_frame_s, mut w2c_cid_frame_r) = mpsc::unbounded::<C2pFrame>();
let (read_stop_sender, read_stop_receiver) = oneshot::channel(); let (read_stop_sender, read_stop_receiver) = oneshot::channel();
let cid2 = cid; let cid2 = cid;
let t = std::thread::spawn(move || { let t = std::thread::spawn(move || {
@ -608,10 +613,10 @@ mod tests {
//async_std::task::sleep(std::time::Duration::from_millis(1000)); //async_std::task::sleep(std::time::Duration::from_millis(1000));
let (cid_r, frame) = w2c_cid_frame_r.next().await.unwrap(); let (cid_r, frame) = w2c_cid_frame_r.next().await.unwrap();
assert_eq!(cid, cid_r); assert_eq!(cid, cid_r);
if let Frame::Handshake { if let Ok(Frame::Handshake {
magic_number, magic_number,
version, version,
} = frame }) = frame
{ {
assert_eq!(&magic_number, b"HELLOWO"); assert_eq!(&magic_number, b"HELLOWO");
assert_eq!(version, [1337, 0, 42]); assert_eq!(version, [1337, 0, 42]);
@ -644,7 +649,7 @@ mod tests {
client.flush(); client.flush();
//handle data //handle data
let (mut w2c_cid_frame_s, mut w2c_cid_frame_r) = mpsc::unbounded::<(Cid, Frame)>(); let (mut w2c_cid_frame_s, mut w2c_cid_frame_r) = mpsc::unbounded::<C2pFrame>();
let (read_stop_sender, read_stop_receiver) = oneshot::channel(); let (read_stop_sender, read_stop_receiver) = oneshot::channel();
let cid2 = cid; let cid2 = cid;
let t = std::thread::spawn(move || { let t = std::thread::spawn(move || {
@ -656,7 +661,7 @@ mod tests {
// Assert than we get some value back! Its a Raw! // Assert than we get some value back! Its a Raw!
let (cid_r, frame) = w2c_cid_frame_r.next().await.unwrap(); let (cid_r, frame) = w2c_cid_frame_r.next().await.unwrap();
assert_eq!(cid, cid_r); assert_eq!(cid, cid_r);
if let Frame::Raw(data) = frame { if let Ok(Frame::Raw(data)) = frame {
assert_eq!(&data.as_slice(), b"x4hrtzsektfhxugzdtz5r78gzrtzfhxf"); assert_eq!(&data.as_slice(), b"x4hrtzsektfhxugzdtz5r78gzrtzfhxf");
} else { } else {
panic!("wrong frame type"); panic!("wrong frame type");

View File

@ -334,6 +334,13 @@ impl Scheduler {
); );
}; };
} }
debug!("shutting down protocol listeners");
for (addr, end_channel_sender) in self.channel_listener.write().await.drain() {
trace!(?addr, "stopping listen on protocol");
if let Err(e) = end_channel_sender.send(()) {
warn!(?addr, ?e, "listener crashed/disconnected already");
}
}
debug!("Scheduler shut down gracefully"); debug!("Scheduler shut down gracefully");
//removing the possibility to create new participants, needed to close down //removing the possibility to create new participants, needed to close down
// some mgr: // some mgr:
@ -512,7 +519,11 @@ impl Scheduler {
metrics.clone(), metrics.clone(),
send_handshake, send_handshake,
); );
match handshake.setup(&protocol).await { match handshake
.setup(&protocol)
.instrument(tracing::info_span!("handshake", ?cid))
.await
{
Ok((pid, sid, secret, leftover_cid_frame)) => { Ok((pid, sid, secret, leftover_cid_frame)) => {
trace!( trace!(
?cid, ?cid,
@ -583,14 +594,18 @@ impl Scheduler {
} }
} else { } else {
let pi = &participants[&pid]; let pi = &participants[&pid];
trace!("2nd+ channel of participant, going to compare security ids"); trace!(
?cid,
"2nd+ channel of participant, going to compare security ids"
);
if pi.secret != secret { if pi.secret != secret {
warn!( warn!(
?cid,
?pid, ?pid,
?secret, ?secret,
"Detected incompatible Secret!, this is probably an attack!" "Detected incompatible Secret!, this is probably an attack!"
); );
error!("Just dropping here, TODO handle this correctly!"); error!(?cid, "Just dropping here, TODO handle this correctly!");
//TODO //TODO
if let Some(pid_oneshot) = s2a_return_pid_s { if let Some(pid_oneshot) = s2a_return_pid_s {
// someone is waiting with `connect`, so give them their Error // someone is waiting with `connect`, so give them their Error
@ -604,6 +619,7 @@ impl Scheduler {
return; return;
} }
error!( error!(
?cid,
"Ufff i cant answer the pid_oneshot. as i need to create the SAME \ "Ufff i cant answer the pid_oneshot. as i need to create the SAME \
participant. maybe switch to ARC" participant. maybe switch to ARC"
); );
@ -612,9 +628,10 @@ impl Scheduler {
// move directly to participant! // move directly to participant!
}, },
Err(()) => { Err(()) => {
debug!(?cid, "Handshake from a new connection failed");
if let Some(pid_oneshot) = s2a_return_pid_s { if let Some(pid_oneshot) = s2a_return_pid_s {
// someone is waiting with `connect`, so give them their Error // someone is waiting with `connect`, so give them their Error
trace!("returning the Err to api who requested the connect"); trace!(?cid, "returning the Err to api who requested the connect");
pid_oneshot pid_oneshot
.send(Err(std::io::Error::new( .send(Err(std::io::Error::new(
std::io::ErrorKind::PermissionDenied, std::io::ErrorKind::PermissionDenied,
@ -625,7 +642,7 @@ impl Scheduler {
}, },
} }
} }
.instrument(tracing::trace_span!("")), .instrument(tracing::info_span!("")),
); /*WORKAROUND FOR SPAN NOT TO GET LOST*/ ); /*WORKAROUND FOR SPAN NOT TO GET LOST*/
} }
} }

View File

@ -146,10 +146,18 @@ impl Pid {
/// This will panic if pid i greater than 7, as I do not want you to use /// This will panic if pid i greater than 7, as I do not want you to use
/// this in production! /// this in production!
#[doc(hidden)] #[doc(hidden)]
pub fn fake(pid: u8) -> Self { pub fn fake(pid_offset: u8) -> Self {
assert!(pid < 8); assert!(pid_offset < 8);
let o = pid_offset as u128;
const OFF: [u128; 5] = [
0x40,
0x40 * 0x40,
0x40 * 0x40 * 0x40,
0x40 * 0x40 * 0x40 * 0x40,
0x40 * 0x40 * 0x40 * 0x40 * 0x40,
];
Self { Self {
internal: pid as u128, internal: o + o * OFF[0] + o * OFF[1] + o * OFF[2] + o * OFF[3] + o * OFF[4],
} }
} }
@ -251,7 +259,9 @@ mod tests {
#[test] #[test]
fn frame_creation() { fn frame_creation() {
Pid::new(); Pid::new();
assert_eq!(format!("{}", Pid::fake(2)), "CAAAAA"); assert_eq!(format!("{}", Pid::fake(0)), "AAAAAA");
assert_eq!(format!("{}", Pid::fake(1)), "BBBBBB");
assert_eq!(format!("{}", Pid::fake(2)), "CCCCCC");
} }
#[test] #[test]

View File

@ -42,10 +42,11 @@ fn close_participant() {
let (_n_a, p1_a, mut s1_a, _n_b, p1_b, mut s1_b) = block_on(network_participant_stream(tcp())); let (_n_a, p1_a, mut s1_a, _n_b, p1_b, mut s1_b) = block_on(network_participant_stream(tcp()));
block_on(p1_a.disconnect()).unwrap(); block_on(p1_a.disconnect()).unwrap();
assert_eq!( //As no more read/write is run disconnect is successful or already disconnected
block_on(p1_b.disconnect()), match block_on(p1_b.disconnect()) {
Err(ParticipantError::ParticipantDisconnected) Ok(_) | Err(ParticipantError::ParticipantDisconnected) => (),
); e => panic!("wrong disconnect type {:?}", e),
};
assert_eq!(s1_a.send("Hello World"), Err(StreamError::StreamClosed)); assert_eq!(s1_a.send("Hello World"), Err(StreamError::StreamClosed));
assert_eq!( assert_eq!(
@ -284,9 +285,9 @@ fn failed_stream_open_after_remote_part_is_closed() {
#[test] #[test]
fn open_participant_before_remote_part_is_closed() { fn open_participant_before_remote_part_is_closed() {
let (n_a, f) = Network::new(Pid::fake(1)); let (n_a, f) = Network::new(Pid::fake(0));
std::thread::spawn(f); std::thread::spawn(f);
let (n_b, f) = Network::new(Pid::fake(2)); let (n_b, f) = Network::new(Pid::fake(1));
std::thread::spawn(f); std::thread::spawn(f);
let addr = tcp(); let addr = tcp();
block_on(n_a.listen(addr.clone())).unwrap(); block_on(n_a.listen(addr.clone())).unwrap();
@ -304,9 +305,9 @@ fn open_participant_before_remote_part_is_closed() {
#[test] #[test]
fn open_participant_after_remote_part_is_closed() { fn open_participant_after_remote_part_is_closed() {
let (n_a, f) = Network::new(Pid::fake(1)); let (n_a, f) = Network::new(Pid::fake(0));
std::thread::spawn(f); std::thread::spawn(f);
let (n_b, f) = Network::new(Pid::fake(2)); let (n_b, f) = Network::new(Pid::fake(1));
std::thread::spawn(f); std::thread::spawn(f);
let addr = tcp(); let addr = tcp();
block_on(n_a.listen(addr.clone())).unwrap(); block_on(n_a.listen(addr.clone())).unwrap();
@ -321,3 +322,25 @@ fn open_participant_after_remote_part_is_closed() {
let mut s1_a = block_on(p_a.opened()).unwrap(); let mut s1_a = block_on(p_a.opened()).unwrap();
assert_eq!(block_on(s1_a.recv()), Ok("HelloWorld".to_string())); assert_eq!(block_on(s1_a.recv()), Ok("HelloWorld".to_string()));
} }
#[test]
fn close_network_scheduler_completely() {
let (n_a, f) = Network::new(Pid::fake(0));
let ha = std::thread::spawn(f);
let (n_b, f) = Network::new(Pid::fake(1));
let hb = std::thread::spawn(f);
let addr = tcp();
block_on(n_a.listen(addr.clone())).unwrap();
let p_b = block_on(n_b.connect(addr)).unwrap();
let mut s1_b = block_on(p_b.open(10, PROMISES_NONE)).unwrap();
s1_b.send("HelloWorld").unwrap();
let p_a = block_on(n_a.connected()).unwrap();
let mut s1_a = block_on(p_a.opened()).unwrap();
assert_eq!(block_on(s1_a.recv()), Ok("HelloWorld".to_string()));
drop(n_a);
drop(n_b);
std::thread::sleep(std::time::Duration::from_millis(1000));
ha.join().unwrap();
hb.join().unwrap();
}

View File

@ -10,10 +10,7 @@ use tracing_subscriber::EnvFilter;
use veloren_network::{Network, Participant, Pid, ProtocolAddr, Stream, PROMISES_NONE}; use veloren_network::{Network, Participant, Pid, ProtocolAddr, Stream, PROMISES_NONE};
#[allow(dead_code)] #[allow(dead_code)]
pub fn setup(tracing: bool, mut sleep: u64) -> (u64, u64) { pub fn setup(tracing: bool, sleep: u64) -> (u64, u64) {
if tracing {
sleep += 1000
}
if sleep > 0 { if sleep > 0 {
thread::sleep(Duration::from_millis(sleep)); thread::sleep(Duration::from_millis(sleep));
} }
@ -49,9 +46,9 @@ pub fn setup(tracing: bool, mut sleep: u64) -> (u64, u64) {
pub async fn network_participant_stream( pub async fn network_participant_stream(
addr: ProtocolAddr, addr: ProtocolAddr,
) -> (Network, Participant, Stream, Network, Participant, Stream) { ) -> (Network, Participant, Stream, Network, Participant, Stream) {
let (n_a, f_a) = Network::new(Pid::fake(1)); let (n_a, f_a) = Network::new(Pid::fake(0));
std::thread::spawn(f_a); std::thread::spawn(f_a);
let (n_b, f_b) = Network::new(Pid::fake(2)); let (n_b, f_b) = Network::new(Pid::fake(1));
std::thread::spawn(f_b); std::thread::spawn(f_b);
n_a.listen(addr.clone()).await.unwrap(); n_a.listen(addr.clone()).await.unwrap();

View File

@ -79,30 +79,32 @@ pub fn handle_exit_ingame(server: &mut Server, entity: EcsEntity) {
pub fn handle_client_disconnect(server: &mut Server, entity: EcsEntity) -> Event { pub fn handle_client_disconnect(server: &mut Server, entity: EcsEntity) -> Event {
if let Some(client) = server.state().read_storage::<Client>().get(entity) { if let Some(client) = server.state().read_storage::<Client>().get(entity) {
trace!("Closing participant of client");
let participant = match client.participant.try_lock() { let participant = match client.participant.try_lock() {
Ok(mut p) => p.take().unwrap(), Ok(mut p) => p.take().unwrap(),
Err(e) => { Err(e) => {
error!(?e, "coudln't lock participant for removal"); error!(?e, ?entity, "coudln't lock participant for removal");
return Event::ClientDisconnected { entity }; return Event::ClientDisconnected { entity };
}, },
}; };
std::thread::spawn(|| {
let pid = participant.remote_pid(); let pid = participant.remote_pid();
std::thread::spawn(move || {
let span = tracing::span!(tracing::Level::DEBUG, "client_disconnect", ?pid, ?entity);
let _enter = span.enter();
let now = std::time::Instant::now(); let now = std::time::Instant::now();
trace!(?pid, "start disconnect"); debug!(?pid, ?entity, "Start handle disconnect of client");
if let Err(e) = block_on(participant.disconnect()) { if let Err(e) = block_on(participant.disconnect()) {
debug!( debug!(
?e, ?e,
?pid,
"Error when disconnecting client, maybe the pipe already broke" "Error when disconnecting client, maybe the pipe already broke"
); );
}; };
trace!(?pid, "finished disconnect"); trace!(?pid, "finished disconnect");
let elapsed = now.elapsed(); let elapsed = now.elapsed();
if elapsed.as_millis() > 100 { if elapsed.as_millis() > 100 {
warn!(?elapsed, "disconecting took quite long"); warn!(?elapsed, ?pid, "disconecting took quite long");
} else { } else {
debug!(?elapsed, "disconecting took"); debug!(?elapsed, ?pid, "disconecting took");
} }
}); });
} }

View File

@ -274,16 +274,19 @@ impl Server {
let mut metrics = ServerMetrics::new(); let mut metrics = ServerMetrics::new();
// register all metrics submodules here // register all metrics submodules here
let tick_metrics = TickMetrics::new(metrics.registry(), metrics.tick_clone()) let tick_metrics = TickMetrics::new(metrics.tick_clone())
.expect("Failed to initialize server tick metrics submodule."); .expect("Failed to initialize server tick metrics submodule.");
metrics tick_metrics
.run(settings.metrics_address) .register(&metrics.registry())
.expect("Failed to initialize server metrics submodule."); .expect("failed to register tick metrics");
let thread_pool = ThreadPoolBuilder::new() let thread_pool = ThreadPoolBuilder::new()
.name("veloren-worker".to_string()) .name("veloren-worker".to_string())
.build(); .build();
let (network, f) = Network::new(Pid::new()); let (network, f) = Network::new_with_registry(Pid::new(), &metrics.registry());
metrics
.run(settings.metrics_address)
.expect("Failed to initialize server metrics submodule.");
thread_pool.execute(f); thread_pool.execute(f);
block_on(network.listen(ProtocolAddr::Tcp(settings.gameserver_address)))?; block_on(network.listen(ProtocolAddr::Tcp(settings.gameserver_address)))?;

View File

@ -33,7 +33,7 @@ pub struct ServerMetrics {
} }
impl TickMetrics { impl TickMetrics {
pub fn new(registry: &Registry, tick: Arc<AtomicU64>) -> Result<Self, Box<dyn Error>> { pub fn new(tick: Arc<AtomicU64>) -> Result<Self, Box<dyn Error>> {
let player_online = IntGauge::with_opts(Opts::new( let player_online = IntGauge::with_opts(Opts::new(
"player_online", "player_online",
"shows the number of clients connected to the server", "shows the number of clients connected to the server",
@ -74,15 +74,6 @@ impl TickMetrics {
.expect("Time went backwards"); .expect("Time went backwards");
start_time.set(since_the_epoch.as_secs().try_into()?); start_time.set(since_the_epoch.as_secs().try_into()?);
registry.register(Box::new(player_online.clone()))?;
registry.register(Box::new(entity_count.clone()))?;
registry.register(Box::new(build_info.clone()))?;
registry.register(Box::new(start_time.clone()))?;
registry.register(Box::new(time_of_day.clone()))?;
registry.register(Box::new(chonks_count.clone()))?;
registry.register(Box::new(chunks_count.clone()))?;
registry.register(Box::new(tick_time.clone()))?;
Ok(Self { Ok(Self {
chonks_count, chonks_count,
chunks_count, chunks_count,
@ -97,6 +88,18 @@ impl TickMetrics {
}) })
} }
pub fn register(&self, registry: &Registry) -> Result<(), Box<dyn Error>> {
registry.register(Box::new(self.player_online.clone()))?;
registry.register(Box::new(self.entity_count.clone()))?;
registry.register(Box::new(self.build_info.clone()))?;
registry.register(Box::new(self.start_time.clone()))?;
registry.register(Box::new(self.time_of_day.clone()))?;
registry.register(Box::new(self.chonks_count.clone()))?;
registry.register(Box::new(self.chunks_count.clone()))?;
registry.register(Box::new(self.tick_time.clone()))?;
Ok(())
}
pub fn is_100th_tick(&self) -> bool { self.tick.load(Ordering::Relaxed).rem_euclid(100) == 0 } pub fn is_100th_tick(&self) -> bool { self.tick.load(Ordering::Relaxed).rem_euclid(100) == 0 }
} }

View File

@ -25,6 +25,7 @@ use hashbrown::HashMap;
use specs::{ use specs::{
Entities, Join, Read, ReadExpect, ReadStorage, System, Write, WriteExpect, WriteStorage, Entities, Join, Read, ReadExpect, ReadStorage, System, Write, WriteExpect, WriteStorage,
}; };
use tracing::{debug, error, info, warn};
impl Sys { impl Sys {
///We needed to move this to a async fn, if we would use a async closures ///We needed to move this to a async fn, if we would use a async closures
@ -267,17 +268,13 @@ impl Sys {
let msg = mode.new_message(*from, message); let msg = mode.new_message(*from, message);
new_chat_msgs.push((Some(entity), msg)); new_chat_msgs.push((Some(entity), msg));
} else { } else {
tracing::error!("Could not send message. Missing player uid"); error!("Could not send message. Missing player uid");
} }
}, },
Err(ChatMsgValidationError::TooLong) => { Err(ChatMsgValidationError::TooLong) => {
let max = MAX_BYTES_CHAT_MSG; let max = MAX_BYTES_CHAT_MSG;
let len = message.len(); let len = message.len();
tracing::warn!( warn!(?len, ?max, "Recieved a chat message that's too long")
?len,
?max,
"Recieved a chat message that's too long"
)
}, },
} }
}, },
@ -342,7 +339,9 @@ impl Sys {
client.notify(ServerMsg::Disconnect); client.notify(ServerMsg::Disconnect);
}, },
ClientMsg::Terminate => { ClientMsg::Terminate => {
debug!(?entity, "Client send message to termitate session");
server_emitter.emit(ServerEvent::ClientDisconnect(entity)); server_emitter.emit(ServerEvent::ClientDisconnect(entity));
break Ok(());
}, },
ClientMsg::RequestCharacterList => { ClientMsg::RequestCharacterList => {
if let Some(player) = players.get(entity) { if let Some(player) = players.get(entity) {
@ -351,11 +350,7 @@ impl Sys {
}, },
ClientMsg::CreateCharacter { alias, tool, body } => { ClientMsg::CreateCharacter { alias, tool, body } => {
if let Err(error) = alias_validator.validate(&alias) { if let Err(error) = alias_validator.validate(&alias) {
tracing::debug!( debug!(?error, ?alias, "denied alias as it contained a banned word");
?error,
?alias,
"denied alias as it contained a banned word"
);
client.notify(ServerMsg::CharacterActionError(error.to_string())); client.notify(ServerMsg::CharacterActionError(error.to_string()));
} else if let Some(player) = players.get(entity) { } else if let Some(player) = players.get(entity) {
character_loader.create_character( character_loader.create_character(
@ -522,10 +517,15 @@ impl<'a> System<'a> for Sys {
// Update client ping. // Update client ping.
if cnt > 0 { if cnt > 0 {
client.last_ping = time.0 client.last_ping = time.0
} else if time.0 - client.last_ping > CLIENT_TIMEOUT // Timeout } else if time.0 - client.last_ping > CLIENT_TIMEOUT
|| network_err.is_err() // Timeout
{
info!(?entity, "timeout error with client, disconnecting");
server_emitter.emit(ServerEvent::ClientDisconnect(entity));
} else if network_err.is_err()
// Postbox error // Postbox error
{ {
debug!(?entity, "postbox error with client, disconnecting");
server_emitter.emit(ServerEvent::ClientDisconnect(entity)); server_emitter.emit(ServerEvent::ClientDisconnect(entity));
} else if time.0 - client.last_ping > CLIENT_TIMEOUT * 0.5 { } else if time.0 - client.last_ping > CLIENT_TIMEOUT * 0.5 {
// Try pinging the client if the timeout is nearing. // Try pinging the client if the timeout is nearing.