zests fix - capitalize first letter

Marcel Märtens 2020-07-06 00:13:53 +02:00
parent 5f902b5eab
commit 4cefdcefea
13 changed files with 184 additions and 184 deletions
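
The change is mechanical: human-readable messages passed to `println!`, `panic!`, `expect`, and the `tracing` macros now start with a capital letter, while code identifiers and structured fields keep their casing. A minimal sketch of the convention (the `pid` field is a placeholder):

```rust
use tracing::{debug, info};

fn log_convention(pid: u64) {
    // Message text is capitalized...
    info!("Waiting for participant to connect");
    // ...but structured fields and identifiers are left untouched.
    debug!(?pid, "Shutting down Participant");
}
```
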

View File

@ -44,7 +44,7 @@ impl FileInfo {
let mt = match fs::metadata(&path).await { let mt = match fs::metadata(&path).await {
Err(e) => { Err(e) => {
println!( println!(
"cannot get metadata for file: {:?}, does it exist? Error: {:?}", "Cannot get metadata for file: {:?}, does it exist? Error: {:?}",
&path, &e &path, &e
); );
return None; return None;

View File

@ -70,7 +70,7 @@ fn file_exists(file: String) -> Result<(), String> {
if file.exists() { if file.exists() {
Ok(()) Ok(())
} else { } else {
Err(format!("file does not exist")) Err(format!("File does not exist"))
} }
} }

View File

@ -54,12 +54,12 @@ impl Server {
} }
async fn command_manager(&self, command_receiver: mpsc::UnboundedReceiver<LocalCommand>) { async fn command_manager(&self, command_receiver: mpsc::UnboundedReceiver<LocalCommand>) {
trace!("start command_manager"); trace!("Start command_manager");
command_receiver command_receiver
.for_each_concurrent(None, async move |cmd| { .for_each_concurrent(None, async move |cmd| {
match cmd { match cmd {
LocalCommand::Shutdown => { LocalCommand::Shutdown => {
println!("shutting down service"); println!("Shutting down service");
return; return;
}, },
LocalCommand::Disconnect => { LocalCommand::Disconnect => {
@ -67,21 +67,21 @@ impl Server {
for (_, p) in self.network.participants().await.drain() { for (_, p) in self.network.participants().await.drain() {
self.network.disconnect(p).await.unwrap(); self.network.disconnect(p).await.unwrap();
} }
println!("disconnecting all connections"); println!("Disconnecting all connections");
return; return;
}, },
LocalCommand::Connect(addr) => { LocalCommand::Connect(addr) => {
println!("trying to connect to: {:?}", &addr); println!("Trying to connect to: {:?}", &addr);
match self.network.connect(addr.clone()).await { match self.network.connect(addr.clone()).await {
Ok(p) => self.loop_participant(p).await, Ok(p) => self.loop_participant(p).await,
Err(e) => { Err(e) => {
println!("failled to connect to {:?}, err: {:?}", &addr, e); println!("Failled to connect to {:?}, err: {:?}", &addr, e);
}, },
} }
}, },
LocalCommand::Serve(fileinfo) => { LocalCommand::Serve(fileinfo) => {
self.served.write().await.push(fileinfo.clone()); self.served.write().await.push(fileinfo.clone());
println!("serving file: {:?}", fileinfo.path); println!("Serving file: {:?}", fileinfo.path);
}, },
LocalCommand::List => { LocalCommand::List => {
let mut total_file_infos = vec![]; let mut total_file_infos = vec![];
@ -110,11 +110,11 @@ impl Server {
} }
}) })
.await; .await;
trace!("stop command_manager"); trace!("Stop command_manager");
} }
async fn connect_manager(&self) { async fn connect_manager(&self) {
trace!("start connect_manager"); trace!("Start connect_manager");
let iter = futures::stream::unfold((), |_| { let iter = futures::stream::unfold((), |_| {
self.network.connected().map(|r| r.ok().map(|v| (v, ()))) self.network.connected().map(|r| r.ok().map(|v| (v, ())))
}); });
@ -123,7 +123,7 @@ impl Server {
self.loop_participant(participant).await; self.loop_participant(participant).await;
}) })
.await; .await;
trace!("stop connect_manager"); trace!("Stop connect_manager");
} }
async fn loop_participant(&self, p: Arc<Participant>) { async fn loop_participant(&self, p: Arc<Participant>) {
@ -133,7 +133,7 @@ impl Server {
p.opened().await, p.opened().await,
p.opened().await, p.opened().await,
) { ) {
debug!(?p, "connection successfully initiated"); debug!(?p, "Connection successfully initiated");
let id = p.remote_pid(); let id = p.remote_pid();
let ri = Arc::new(Mutex::new(RemoteInfo::new(cmd_out, file_out, p))); let ri = Arc::new(Mutex::new(RemoteInfo::new(cmd_out, file_out, p)));
self.remotes.write().await.insert(id, ri.clone()); self.remotes.write().await.insert(id, ri.clone());
@ -146,24 +146,24 @@ impl Server {
async fn handle_remote_cmd(&self, mut stream: Stream, remote_info: Arc<Mutex<RemoteInfo>>) { async fn handle_remote_cmd(&self, mut stream: Stream, remote_info: Arc<Mutex<RemoteInfo>>) {
while let Ok(msg) = stream.recv::<Command>().await { while let Ok(msg) = stream.recv::<Command>().await {
println!("got message: {:?}", &msg); println!("Got message: {:?}", &msg);
match msg { match msg {
Command::List => { Command::List => {
info!("request to send my list"); info!("Request to send my list");
let served = self.served.read().await.clone(); let served = self.served.read().await.clone();
stream.send(served).unwrap(); stream.send(served).unwrap();
}, },
Command::Get(id) => { Command::Get(id) => {
for file_info in self.served.read().await.iter() { for file_info in self.served.read().await.iter() {
if file_info.id() == id { if file_info.id() == id {
info!("request to send file i got, sending it"); info!("Request to send file i got, sending it");
if let Ok(data) = file_info.load().await { if let Ok(data) = file_info.load().await {
match remote_info.lock().await.file_out.send((file_info, data)) { match remote_info.lock().await.file_out.send((file_info, data)) {
Ok(_) => debug!("send file"), Ok(_) => debug!("send file"),
Err(e) => error!(?e, "sending file failed"), Err(e) => error!(?e, "sending file failed"),
} }
} else { } else {
warn!("cannot send file as loading failed, oes it still exist?"); warn!("Cannot send file as loading failed, oes it still exist?");
} }
} }
} }
@ -174,18 +174,18 @@ impl Server {
async fn handle_files(&self, mut stream: Stream, _remote_info: Arc<Mutex<RemoteInfo>>) { async fn handle_files(&self, mut stream: Stream, _remote_info: Arc<Mutex<RemoteInfo>>) {
while let Ok((fi, data)) = stream.recv::<(FileInfo, Vec<u8>)>().await { while let Ok((fi, data)) = stream.recv::<(FileInfo, Vec<u8>)>().await {
debug!(?fi, "got file"); debug!(?fi, "Got file");
let path = self.receiving_files.lock().await.remove(&fi.id()).flatten(); let path = self.receiving_files.lock().await.remove(&fi.id()).flatten();
let path: PathBuf = match &path { let path: PathBuf = match &path {
Some(path) => shellexpand::tilde(&path).parse().unwrap(), Some(path) => shellexpand::tilde(&path).parse().unwrap(),
None => { None => {
let mut path = std::env::current_dir().unwrap(); let mut path = std::env::current_dir().unwrap();
path.push(fi.path().file_name().unwrap()); path.push(fi.path().file_name().unwrap());
trace!("no path provided, saving down to {:?}", path); trace!("No path provided, saving down to {:?}", path);
PathBuf::from(path) PathBuf::from(path)
}, },
}; };
debug!("received file, going to save it under {:?}", path); debug!("Received file, going to save it under {:?}", path);
fs::write(path, data).await.unwrap(); fs::write(path, data).await.unwrap();
} }
} }
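
For context, `handle_files` resolves an optional user-supplied target path before writing the received bytes. A small sketch of that path resolution, assuming the `shellexpand` crate used above; `resolve_target` is a hypothetical helper:

```rust
use std::path::PathBuf;

// Hypothetical helper mirroring the path logic in handle_files.
fn resolve_target(path: Option<&str>, fallback_name: &str) -> PathBuf {
    match path {
        // "~/downloads/x" and friends are expanded before use.
        Some(p) => PathBuf::from(shellexpand::tilde(p).into_owned()),
        None => {
            // No explicit target: save into the current working directory.
            let mut p = std::env::current_dir().expect("cwd should exist");
            p.push(fallback_name);
            p
        },
    }
}
```
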

View File

@ -98,7 +98,7 @@ fn main() {
let address = match matches.value_of("protocol") { let address = match matches.value_of("protocol") {
Some("tcp") => Address::Tcp(format!("{}:{}", ip, port).parse().unwrap()), Some("tcp") => Address::Tcp(format!("{}:{}", ip, port).parse().unwrap()),
Some("udp") => Address::Udp(format!("{}:{}", ip, port).parse().unwrap()), Some("udp") => Address::Udp(format!("{}:{}", ip, port).parse().unwrap()),
_ => panic!("invalid mode, run --help!"), _ => panic!("Invalid mode, run --help!"),
}; };
let mut background = None; let mut background = None;
@ -111,7 +111,7 @@ fn main() {
thread::sleep(Duration::from_millis(200)); //start client after server thread::sleep(Duration::from_millis(200)); //start client after server
client(address); client(address);
}, },
_ => panic!("invalid mode, run --help!"), _ => panic!("Invalid mode, run --help!"),
}; };
if let Some(background) = background { if let Some(background) = background {
background.join().unwrap(); background.join().unwrap();
@ -126,7 +126,7 @@ fn server(address: Address) {
block_on(server.listen(address)).unwrap(); block_on(server.listen(address)).unwrap();
loop { loop {
info!("waiting for participant to connect"); info!("Waiting for participant to connect");
let p1 = block_on(server.connected()).unwrap(); //remote representation of p1 let p1 = block_on(server.connected()).unwrap(); //remote representation of p1
let mut s1 = block_on(p1.opened()).unwrap(); //remote representation of s1 let mut s1 = block_on(p1.opened()).unwrap(); //remote representation of s1
block_on(async { block_on(async {
@ -138,10 +138,10 @@ fn server(address: Address) {
let new = Instant::now(); let new = Instant::now();
let diff = new.duration_since(last); let diff = new.duration_since(last);
last = new; last = new;
println!("recv 1.000.000 took {}", diff.as_millis()); println!("Recv 1.000.000 took {}", diff.as_millis());
} }
} }
info!("other stream was closed"); info!("Other stream was closed");
}); });
} }
} }
@ -170,17 +170,17 @@ fn client(address: Address) {
let new = Instant::now(); let new = Instant::now();
let diff = new.duration_since(last); let diff = new.duration_since(last);
last = new; last = new;
println!("send 1.000.000 took {}", diff.as_millis()); println!("Send 1.000.000 took {}", diff.as_millis());
} }
if id > 2000000 { if id > 2000000 {
println!("stop"); println!("Stop");
std::thread::sleep(std::time::Duration::from_millis(5000)); std::thread::sleep(std::time::Duration::from_millis(5000));
break; break;
} }
} }
drop(s1); drop(s1);
std::thread::sleep(std::time::Duration::from_millis(5000)); std::thread::sleep(std::time::Duration::from_millis(5000));
info!("closing participant"); info!("Closing participant");
block_on(client.disconnect(p1)).unwrap(); block_on(client.disconnect(p1)).unwrap();
std::thread::sleep(std::time::Duration::from_millis(25000)); std::thread::sleep(std::time::Duration::from_millis(25000));
info!("DROPPING! client"); info!("DROPPING! client");

View File

@ -55,7 +55,7 @@ impl SimpleMetrics {
Ok(Some(rq)) => rq, Ok(Some(rq)) => rq,
Ok(None) => continue, Ok(None) => continue,
Err(e) => { Err(e) => {
println!("error: {}", e); println!("Error: {}", e);
break; break;
}, },
}; };
@ -76,7 +76,7 @@ impl SimpleMetrics {
_ => (), _ => (),
} }
} }
debug!("stopping tiny_http server to serve metrics"); debug!("Stopping tiny_http server to serve metrics");
})); }));
Ok(()) Ok(())
} }

View File

@ -198,7 +198,7 @@ impl Network {
registry: Option<&Registry>, registry: Option<&Registry>,
) -> (Self, impl std::ops::FnOnce()) { ) -> (Self, impl std::ops::FnOnce()) {
let p = participant_id; let p = participant_id;
debug!(?p, "starting Network"); debug!(?p, "Starting Network");
let (scheduler, listen_sender, connect_sender, connected_receiver, shutdown_sender) = let (scheduler, listen_sender, connect_sender, connected_receiver, shutdown_sender) =
Scheduler::new(participant_id, registry); Scheduler::new(participant_id, registry);
( (
@ -211,13 +211,13 @@ impl Network {
shutdown_sender: Some(shutdown_sender), shutdown_sender: Some(shutdown_sender),
}, },
move || { move || {
trace!(?p, "starting sheduler in own thread"); trace!(?p, "Starting sheduler in own thread");
let _handle = task::block_on( let _handle = task::block_on(
scheduler scheduler
.run() .run()
.instrument(tracing::info_span!("scheduler", ?p)), .instrument(tracing::info_span!("scheduler", ?p)),
); );
trace!(?p, "stopping sheduler and his own thread"); trace!(?p, "Stopping sheduler and his own thread");
}, },
) )
} }
@ -309,7 +309,7 @@ impl Network {
/// [`Addresses`]: crate::api::Address /// [`Addresses`]: crate::api::Address
pub async fn connect(&self, address: Address) -> Result<Arc<Participant>, NetworkError> { pub async fn connect(&self, address: Address) -> Result<Arc<Participant>, NetworkError> {
let (pid_sender, pid_receiver) = oneshot::channel::<io::Result<Participant>>(); let (pid_sender, pid_receiver) = oneshot::channel::<io::Result<Participant>>();
debug!(?address, "connect to address"); debug!(?address, "Connect to address");
self.connect_sender self.connect_sender
.write() .write()
.await .await
@ -322,7 +322,7 @@ impl Network {
let pid = participant.remote_pid; let pid = participant.remote_pid;
debug!( debug!(
?pid, ?pid,
"received Participant id from remote and return to user" "Received Participant id from remote and return to user"
); );
let participant = Arc::new(participant); let participant = Arc::new(participant);
self.participants self.participants
@ -427,21 +427,21 @@ impl Network {
pub async fn disconnect(&self, participant: Arc<Participant>) -> Result<(), NetworkError> { pub async fn disconnect(&self, participant: Arc<Participant>) -> Result<(), NetworkError> {
// Remove, Close and try_unwrap error when unwrap fails! // Remove, Close and try_unwrap error when unwrap fails!
let pid = participant.remote_pid; let pid = participant.remote_pid;
debug!(?pid, "removing participant from network"); debug!(?pid, "Removing participant from network");
self.participants.write().await.remove(&pid)?; self.participants.write().await.remove(&pid)?;
participant.closed.store(true, Ordering::Relaxed); participant.closed.store(true, Ordering::Relaxed);
match Arc::try_unwrap(participant) { match Arc::try_unwrap(participant) {
Err(_) => { Err(_) => {
warn!( warn!(
"you are disconnecting and still keeping a reference to this participant, \ "You are disconnecting and still keeping a reference to this participant, \
this is a bad idea. Participant will only be dropped when you drop your last \ this is a bad idea. Participant will only be dropped when you drop your last \
reference" reference"
); );
Ok(()) Ok(())
}, },
Ok(mut participant) => { Ok(mut participant) => {
trace!("waiting now for participant to close"); trace!("Waiting now for participant to close");
let (finished_sender, finished_receiver) = oneshot::channel(); let (finished_sender, finished_receiver) = oneshot::channel();
// we are deleting here asynchronously before DROP is called. Because this is done // we are deleting here asynchronously before DROP is called. Because this is done
// natively async, while drop needs a BLOCK! Drop will recognis // natively async, while drop needs a BLOCK! Drop will recognis
@ -452,7 +452,7 @@ impl Network {
.unwrap() .unwrap()
.send((pid, finished_sender)) .send((pid, finished_sender))
.await .await
.expect("something is wrong in internal scheduler coding"); .expect("Something is wrong in internal scheduler coding");
match finished_receiver.await { match finished_receiver.await {
Ok(Ok(())) => { Ok(Ok(())) => {
trace!(?pid, "Participant is now closed"); trace!(?pid, "Participant is now closed");
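
The `disconnect` path above hinges on `Arc::try_unwrap`: synchronous teardown is only possible when the caller held the last reference. A minimal sketch of that ownership check, with a stub `Participant` standing in for the real type:

```rust
use std::sync::Arc;

// Stub standing in for the real network Participant.
struct Participant {
    remote_pid: u64,
}

fn disconnect(participant: Arc<Participant>) {
    match Arc::try_unwrap(participant) {
        // Another clone is still alive: warn and let the last drop clean up.
        Err(shared) => eprintln!("Participant {} is still referenced elsewhere", shared.remote_pid),
        // We held the last reference, so a clean synchronous shutdown is possible.
        Ok(owned) => println!("Participant {} closed cleanly", owned.remote_pid),
    }
}
```
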
@ -618,13 +618,13 @@ impl Participant {
// TODO: not sure if we can parallelise that, check in future // TODO: not sure if we can parallelise that, check in future
let mut stream_opened_receiver = self.b2a_stream_opened_r.write().await; let mut stream_opened_receiver = self.b2a_stream_opened_r.write().await;
if self.closed.load(Ordering::Relaxed) { if self.closed.load(Ordering::Relaxed) {
warn!(?self.remote_pid, "participant is closed but another open is tried on it"); warn!(?self.remote_pid, "Participant is closed but another open is tried on it");
return Err(ParticipantError::ParticipantClosed); return Err(ParticipantError::ParticipantClosed);
} }
match stream_opened_receiver.next().await { match stream_opened_receiver.next().await {
Some(stream) => { Some(stream) => {
let sid = stream.sid; let sid = stream.sid;
debug!(?sid, ?self.remote_pid, "receive opened stream"); debug!(?sid, ?self.remote_pid, "Receive opened stream");
Ok(stream) Ok(stream)
}, },
None => { None => {
@ -840,10 +840,10 @@ impl Stream {
impl Drop for Network { impl Drop for Network {
fn drop(&mut self) { fn drop(&mut self) {
let pid = self.local_pid; let pid = self.local_pid;
debug!(?pid, "shutting down Network"); debug!(?pid, "Shutting down Network");
debug!( debug!(
?pid, ?pid,
"shutting down Participants of Network, while we still have metrics" "Shutting down Participants of Network, while we still have metrics"
); );
task::block_on(async { task::block_on(async {
// we need to carefully shut down here! as otherwise we might call // we need to carefully shut down here! as otherwise we might call
@ -855,20 +855,20 @@ impl Drop for Network {
if let Err(e) = self.disconnect(p).await { if let Err(e) = self.disconnect(p).await {
error!( error!(
?e, ?e,
"error while dropping network, the error occured when dropping a \ "Error while dropping network, the error occured when dropping a \
participant but can't be notified to the user any more" participant but can't be notified to the user any more"
); );
} }
} }
self.participants.write().await.clear(); self.participants.write().await.clear();
}); });
debug!(?pid, "shutting down Scheduler"); debug!(?pid, "Shutting down Scheduler");
self.shutdown_sender self.shutdown_sender
.take() .take()
.unwrap() .unwrap()
.send(()) .send(())
.expect("scheduler is closed, but nobody other should be able to close it"); .expect("Scheduler is closed, but nobody other should be able to close it");
debug!(?pid, "participants have shut down!"); debug!(?pid, "Participants have shut down!");
} }
} }
@ -877,7 +877,7 @@ impl Drop for Participant {
// ignore closed, as we need to send it even though we disconnected the // ignore closed, as we need to send it even though we disconnected the
// participant from network // participant from network
let pid = self.remote_pid; let pid = self.remote_pid;
debug!(?pid, "shutting down Participant"); debug!(?pid, "Shutting down Participant");
match self.a2s_disconnect_s.take() { match self.a2s_disconnect_s.take() {
None => debug!( None => debug!(
?pid, ?pid,
@ -886,14 +886,14 @@ impl Drop for Participant {
Some(mut a2s_disconnect_s) => { Some(mut a2s_disconnect_s) => {
debug!( debug!(
?pid, ?pid,
"unclean shutdown detected, active waiting for client to be disconnected" "Unclean shutdown detected, active waiting for client to be disconnected"
); );
task::block_on(async { task::block_on(async {
let (finished_sender, finished_receiver) = oneshot::channel(); let (finished_sender, finished_receiver) = oneshot::channel();
a2s_disconnect_s a2s_disconnect_s
.send((self.remote_pid, finished_sender)) .send((self.remote_pid, finished_sender))
.await .await
.expect("something is wrong in internal scheduler coding"); .expect("Something is wrong in internal scheduler coding");
match finished_receiver.await { match finished_receiver.await {
Ok(Err(e)) => error!( Ok(Err(e)) => error!(
?pid, ?pid,
@ -912,7 +912,7 @@ impl Drop for Participant {
}); });
}, },
} }
debug!(?pid, "network dropped"); debug!(?pid, "Network dropped");
} }
} }
@ -922,7 +922,7 @@ impl Drop for Stream {
if !self.closed.load(Ordering::Relaxed) { if !self.closed.load(Ordering::Relaxed) {
let sid = self.sid; let sid = self.sid;
let pid = self.pid; let pid = self.pid;
debug!(?pid, ?sid, "shutting down Stream"); debug!(?pid, ?sid, "Shutting down Stream");
if task::block_on(self.a2b_close_stream_s.take().unwrap().send(self.sid)).is_err() { if task::block_on(self.a2b_close_stream_s.take().unwrap().send(self.sid)).is_err() {
warn!( warn!(
"Other side got already dropped, probably due to timing, other side will \ "Other side got already dropped, probably due to timing, other side will \
@ -932,7 +932,7 @@ impl Drop for Stream {
} else { } else {
let sid = self.sid; let sid = self.sid;
let pid = self.pid; let pid = self.pid;
debug!(?pid, ?sid, "not needed"); debug!(?pid, ?sid, "Drop not needed");
} }
} }
} }
@ -1011,7 +1011,7 @@ impl core::fmt::Display for StreamError {
impl core::fmt::Display for ParticipantError { impl core::fmt::Display for ParticipantError {
fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
match self { match self {
ParticipantError::ParticipantClosed => write!(f, "participant closed"), ParticipantError::ParticipantClosed => write!(f, "Participant closed"),
} }
} }
} }
@ -1019,10 +1019,10 @@ impl core::fmt::Display for ParticipantError {
impl core::fmt::Display for NetworkError { impl core::fmt::Display for NetworkError {
fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
match self { match self {
NetworkError::NetworkClosed => write!(f, "network closed"), NetworkError::NetworkClosed => write!(f, "Network closed"),
NetworkError::ListenFailed(_) => write!(f, "listening failed"), NetworkError::ListenFailed(_) => write!(f, "Listening failed"),
NetworkError::ConnectFailed(_) => write!(f, "connecting failed"), NetworkError::ConnectFailed(_) => write!(f, "Connecting failed"),
NetworkError::GracefulDisconnectFailed(_) => write!(f, "graceful disconnect failed"), NetworkError::GracefulDisconnectFailed(_) => write!(f, "Graceful disconnect failed"),
} }
} }
} }

View File

@ -48,13 +48,13 @@ impl Channel {
//reapply leftovers from handshake //reapply leftovers from handshake
let cnt = leftover_cid_frame.len(); let cnt = leftover_cid_frame.len();
trace!(?self.cid, ?cnt, "reapplying leftovers"); trace!(?self.cid, ?cnt, "Reapplying leftovers");
for cid_frame in leftover_cid_frame.drain(..) { for cid_frame in leftover_cid_frame.drain(..) {
w2c_cid_frame_s.send(cid_frame).await.unwrap(); w2c_cid_frame_s.send(cid_frame).await.unwrap();
} }
trace!(?self.cid, ?cnt, "all leftovers reapplied"); trace!(?self.cid, ?cnt, "All leftovers reapplied");
trace!(?self.cid, "start up channel"); trace!(?self.cid, "Start up channel");
match protocol { match protocol {
Protocols::Tcp(tcp) => { Protocols::Tcp(tcp) => {
futures::join!( futures::join!(
@ -70,7 +70,7 @@ impl Channel {
}, },
} }
trace!(?self.cid, "shut down channel"); trace!(?self.cid, "Shut down channel");
} }
} }
@ -178,20 +178,20 @@ impl Handshake {
version, version,
}, },
)) => { )) => {
trace!(?magic_number, ?version, "recv handshake"); trace!(?magic_number, ?version, "Recv handshake");
self.metrics self.metrics
.frames_in_total .frames_in_total
.with_label_values(&["", &cid_string, "Handshake"]) .with_label_values(&["", &cid_string, "Handshake"])
.inc(); .inc();
if magic_number != VELOREN_MAGIC_NUMBER { if magic_number != VELOREN_MAGIC_NUMBER {
error!(?magic_number, "connection with invalid magic_number"); error!(?magic_number, "Connection with invalid magic_number");
#[cfg(debug_assertions)] #[cfg(debug_assertions)]
{ {
self.metrics self.metrics
.frames_out_total .frames_out_total
.with_label_values(&["", &cid_string, "Raw"]) .with_label_values(&["", &cid_string, "Raw"])
.inc(); .inc();
debug!("sending client instructions before killing"); debug!("Sending client instructions before killing");
c2w_frame_s c2w_frame_s
.send(Frame::Raw(Self::WRONG_NUMBER.to_vec())) .send(Frame::Raw(Self::WRONG_NUMBER.to_vec()))
.await .await
@ -201,10 +201,10 @@ impl Handshake {
return Err(()); return Err(());
} }
if version != VELOREN_NETWORK_VERSION { if version != VELOREN_NETWORK_VERSION {
error!(?version, "connection with wrong network version"); error!(?version, "Connection with wrong network version");
#[cfg(debug_assertions)] #[cfg(debug_assertions)]
{ {
debug!("sending client instructions before killing"); debug!("Sending client instructions before killing");
self.metrics self.metrics
.frames_out_total .frames_out_total
.with_label_values(&["", &cid_string, "Raw"]) .with_label_values(&["", &cid_string, "Raw"])
@ -227,7 +227,7 @@ impl Handshake {
} }
return Err(()); return Err(());
} }
debug!("handshake completed"); debug!("Handshake completed");
if self.init_handshake { if self.init_handshake {
self.send_init(&mut c2w_frame_s, &pid_string).await; self.send_init(&mut c2w_frame_s, &pid_string).await;
} else { } else {
@ -235,7 +235,7 @@ impl Handshake {
} }
}, },
Some((_, Frame::Shutdown)) => { Some((_, Frame::Shutdown)) => {
info!("shutdown signal received"); info!("Shutdown signal received");
self.metrics self.metrics
.frames_in_total .frames_in_total
.with_label_values(&[&pid_string, &cid_string, "Shutdown"]) .with_label_values(&[&pid_string, &cid_string, "Shutdown"])
@ -277,11 +277,11 @@ impl Handshake {
self.send_init(&mut c2w_frame_s, &pid_string).await; self.send_init(&mut c2w_frame_s, &pid_string).await;
STREAM_ID_OFFSET2 STREAM_ID_OFFSET2
}; };
info!(?pid, "this Handshake is now configured!"); info!(?pid, "This Handshake is now configured!");
Ok((pid, stream_id_offset, secret)) Ok((pid, stream_id_offset, secret))
}, },
Some((_, Frame::Shutdown)) => { Some((_, Frame::Shutdown)) => {
info!("shutdown signal received"); info!("Shutdown signal received");
self.metrics self.metrics
.frames_in_total .frames_in_total
.with_label_values(&[&pid_string, &cid_string, "Shutdown"]) .with_label_values(&[&pid_string, &cid_string, "Shutdown"])

View File

@ -67,7 +67,7 @@
//! let client = server_network.connected().await?; //! let client = server_network.connected().await?;
//! let mut stream = client.opened().await?; //! let mut stream = client.opened().await?;
//! let msg: String = stream.recv().await?; //! let msg: String = stream.recv().await?;
//! println!("got message: {}", msg); //! println!("Got message: {}", msg);
//! assert_eq!(msg, "Hello World"); //! assert_eq!(msg, "Hello World");
//! Ok(()) //! Ok(())
//! } //! }

View File

@ -50,50 +50,50 @@ impl NetworkMetrics {
let listen_requests_total = IntCounterVec::new( let listen_requests_total = IntCounterVec::new(
Opts::new( Opts::new(
"listen_requests_total", "listen_requests_total",
"shows the number of listen requests to the scheduler", "Shows the number of listen requests to the scheduler",
), ),
&["protocol"], &["protocol"],
)?; )?;
let connect_requests_total = IntCounterVec::new( let connect_requests_total = IntCounterVec::new(
Opts::new( Opts::new(
"connect_requests_total", "connect_requests_total",
"shows the number of connect requests to the scheduler", "Shows the number of connect requests to the scheduler",
), ),
&["protocol"], &["protocol"],
)?; )?;
let participants_connected_total = IntCounter::with_opts(Opts::new( let participants_connected_total = IntCounter::with_opts(Opts::new(
"participants_connected_total", "participants_connected_total",
"shows the number of participants connected to the network", "Shows the number of participants connected to the network",
))?; ))?;
let participants_disconnected_total = IntCounter::with_opts(Opts::new( let participants_disconnected_total = IntCounter::with_opts(Opts::new(
"participants_disconnected_total", "participants_disconnected_total",
"shows the number of participants disconnected to the network", "Shows the number of participants disconnected to the network",
))?; ))?;
let channels_connected_total = IntCounterVec::new( let channels_connected_total = IntCounterVec::new(
Opts::new( Opts::new(
"channels_connected_total", "channels_connected_total",
"number of all channels currently connected on the network", "Number of all channels currently connected on the network",
), ),
&["participant"], &["participant"],
)?; )?;
let channels_disconnected_total = IntCounterVec::new( let channels_disconnected_total = IntCounterVec::new(
Opts::new( Opts::new(
"channels_disconnected_total", "channels_disconnected_total",
"number of all channels currently disconnected on the network", "Number of all channels currently disconnected on the network",
), ),
&["participant"], &["participant"],
)?; )?;
let streams_opened_total = IntCounterVec::new( let streams_opened_total = IntCounterVec::new(
Opts::new( Opts::new(
"streams_opened_total", "streams_opened_total",
"number of all streams currently open on the network", "Number of all streams currently open on the network",
), ),
&["participant"], &["participant"],
)?; )?;
let streams_closed_total = IntCounterVec::new( let streams_closed_total = IntCounterVec::new(
Opts::new( Opts::new(
"streams_closed_total", "streams_closed_total",
"number of all streams currently open on the network", "Number of all streams currently open on the network",
), ),
&["participant"], &["participant"],
)?; )?;
@ -112,42 +112,42 @@ impl NetworkMetrics {
let frames_out_total = IntCounterVec::new( let frames_out_total = IntCounterVec::new(
Opts::new( Opts::new(
"frames_out_total", "frames_out_total",
"number of all frames send per channel, at the channel level", "Number of all frames send per channel, at the channel level",
), ),
&["participant", "channel", "frametype"], &["participant", "channel", "frametype"],
)?; )?;
let frames_in_total = IntCounterVec::new( let frames_in_total = IntCounterVec::new(
Opts::new( Opts::new(
"frames_in_total", "frames_in_total",
"number of all frames received per channel, at the channel level", "Number of all frames received per channel, at the channel level",
), ),
&["participant", "channel", "frametype"], &["participant", "channel", "frametype"],
)?; )?;
let frames_wire_out_total = IntCounterVec::new( let frames_wire_out_total = IntCounterVec::new(
Opts::new( Opts::new(
"frames_wire_out_total", "frames_wire_out_total",
"number of all frames send per channel, at the protocol level", "Number of all frames send per channel, at the protocol level",
), ),
&["channel", "frametype"], &["channel", "frametype"],
)?; )?;
let frames_wire_in_total = IntCounterVec::new( let frames_wire_in_total = IntCounterVec::new(
Opts::new( Opts::new(
"frames_wire_in_total", "frames_wire_in_total",
"number of all frames received per channel, at the protocol level", "Number of all frames received per channel, at the protocol level",
), ),
&["channel", "frametype"], &["channel", "frametype"],
)?; )?;
let wire_out_throughput = IntCounterVec::new( let wire_out_throughput = IntCounterVec::new(
Opts::new( Opts::new(
"wire_out_throughput", "wire_out_throughput",
"throupgput of all data frames send per channel, at the protocol level", "Throupgput of all data frames send per channel, at the protocol level",
), ),
&["channel"], &["channel"],
)?; )?;
let wire_in_throughput = IntCounterVec::new( let wire_in_throughput = IntCounterVec::new(
Opts::new( Opts::new(
"wire_in_throughput", "wire_in_throughput",
"throupgput of all data frames send per channel, at the protocol level", "Throupgput of all data frames send per channel, at the protocol level",
), ),
&["channel"], &["channel"],
)?; )?;
@ -155,7 +155,7 @@ impl NetworkMetrics {
let message_out_total = IntCounterVec::new( let message_out_total = IntCounterVec::new(
Opts::new( Opts::new(
"message_out_total", "message_out_total",
"number of messages send by streams on the network", "Number of messages send by streams on the network",
), ),
&["participant", "stream"], &["participant", "stream"],
)?; )?;
@ -163,28 +163,28 @@ impl NetworkMetrics {
let message_out_throughput = IntCounterVec::new( let message_out_throughput = IntCounterVec::new(
Opts::new( Opts::new(
"message_out_throughput", "message_out_throughput",
"throughput of messages send by streams on the network", "Throughput of messages send by streams on the network",
), ),
&["participant", "stream"], &["participant", "stream"],
)?; )?;
let queued_count = IntGaugeVec::new( let queued_count = IntGaugeVec::new(
Opts::new( Opts::new(
"queued_count", "queued_count",
"queued number of messages by participant on the network", "Queued number of messages by participant on the network",
), ),
&["channel"], &["channel"],
)?; )?;
let queued_bytes = IntGaugeVec::new( let queued_bytes = IntGaugeVec::new(
Opts::new( Opts::new(
"queued_bytes", "queued_bytes",
"queued bytes of messages by participant on the network", "Queued bytes of messages by participant on the network",
), ),
&["channel"], &["channel"],
)?; )?;
let participants_ping = IntGaugeVec::new( let participants_ping = IntGaugeVec::new(
Opts::new( Opts::new(
"participants_ping", "participants_ping",
"ping time to participants on the network", "Ping time to participants on the network",
), ),
&["channel"], &["channel"],
)?; )?;
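
These help strings are Prometheus metric descriptions. A short sketch of how one of the counters above is created, registered, and incremented with the `prometheus` crate (the registry and label values are local to the example):

```rust
use prometheus::{IntCounterVec, Opts, Registry};

fn register_frames_counter() -> prometheus::Result<IntCounterVec> {
    let frames_out_total = IntCounterVec::new(
        Opts::new(
            "frames_out_total",
            "Number of all frames sent per channel, at the channel level",
        ),
        &["participant", "channel", "frametype"],
    )?;
    let registry = Registry::new();
    registry.register(Box::new(frames_out_total.clone()))?;
    // One counter per (participant, channel, frametype) label combination.
    frames_out_total
        .with_label_values(&["pid0", "cid0", "Data"])
        .inc();
    Ok(frames_out_total)
}
```
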

View File

@ -187,7 +187,7 @@ impl BParticipant {
const FRAMES_PER_TICK: usize = 10005; const FRAMES_PER_TICK: usize = 10005;
self.running_mgr.fetch_add(1, Ordering::Relaxed); self.running_mgr.fetch_add(1, Ordering::Relaxed);
let mut closing_up = false; let mut closing_up = false;
trace!("start send_mgr"); trace!("Start send_mgr");
let mut send_cache = let mut send_cache =
PidCidFrameCache::new(self.metrics.frames_out_total.clone(), self.remote_pid); PidCidFrameCache::new(self.metrics.frames_out_total.clone(), self.remote_pid);
//while !self.closed.load(Ordering::Relaxed) { //while !self.closed.load(Ordering::Relaxed) {
@ -196,7 +196,7 @@ impl BParticipant {
prios.fill_frames(FRAMES_PER_TICK, &mut frames).await; prios.fill_frames(FRAMES_PER_TICK, &mut frames).await;
let len = frames.len(); let len = frames.len();
if len > 0 { if len > 0 {
trace!("tick {}", len); trace!("Tick {}", len);
} }
for (_, frame) in frames { for (_, frame) in frames {
self.send_frame(frame, &mut send_cache).await; self.send_frame(frame, &mut send_cache).await;
@ -216,7 +216,7 @@ impl BParticipant {
closing_up = true; closing_up = true;
} }
} }
trace!("stop send_mgr"); trace!("Stop send_mgr");
b2b_prios_flushed_s.send(()).unwrap(); b2b_prios_flushed_s.send(()).unwrap();
self.running_mgr.fetch_sub(1, Ordering::Relaxed); self.running_mgr.fetch_sub(1, Ordering::Relaxed);
} }
@ -242,13 +242,13 @@ impl BParticipant {
if let Err(e) = ci.b2w_frame_s.send(frame).await { if let Err(e) = ci.b2w_frame_s.send(frame).await {
warn!( warn!(
?e, ?e,
"the channel got closed unexpectedly, cleaning it up now." "The channel got closed unexpectedly, cleaning it up now."
); );
let ci = lock.remove(0); let ci = lock.remove(0);
if let Err(e) = ci.b2r_read_shutdown.send(()) { if let Err(e) = ci.b2r_read_shutdown.send(()) {
debug!( debug!(
?e, ?e,
"error shutdowning channel, which is prob fine as we detected it to no \ "Error shutdowning channel, which is prob fine as we detected it to no \
longer work in the first place" longer work in the first place"
); );
}; };
@ -270,7 +270,7 @@ impl BParticipant {
guard.0 = now; guard.0 = now;
let occurrences = guard.1 + 1; let occurrences = guard.1 + 1;
guard.1 = 0; guard.1 = 0;
error!(?occurrences, "participant has no channel to communicate on"); error!(?occurrences, "Participant has no channel to communicate on");
} else { } else {
guard.1 += 1; guard.1 += 1;
} }
@ -286,7 +286,7 @@ impl BParticipant {
a2p_msg_s: crossbeam_channel::Sender<(Prio, Sid, OutgoingMessage)>, a2p_msg_s: crossbeam_channel::Sender<(Prio, Sid, OutgoingMessage)>,
) { ) {
self.running_mgr.fetch_add(1, Ordering::Relaxed); self.running_mgr.fetch_add(1, Ordering::Relaxed);
trace!("start handle_frames_mgr"); trace!("Start handle_frames_mgr");
let mut messages = HashMap::new(); let mut messages = HashMap::new();
let mut dropped_instant = Instant::now(); let mut dropped_instant = Instant::now();
let mut dropped_cnt = 0u64; let mut dropped_cnt = 0u64;
@ -310,7 +310,7 @@ impl BParticipant {
.create_stream(sid, prio, promises, a2p_msg_s, &a2b_close_stream_s) .create_stream(sid, prio, promises, a2p_msg_s, &a2b_close_stream_s)
.await; .await;
b2a_stream_opened_s.send(stream).await.unwrap(); b2a_stream_opened_s.send(stream).await.unwrap();
trace!("opened frame from remote"); trace!("Opened frame from remote");
}, },
Frame::CloseStream { sid } => { Frame::CloseStream { sid } => {
// Closing is realised by setting a AtomicBool to true, however we also have a // Closing is realised by setting a AtomicBool to true, however we also have a
@ -320,7 +320,7 @@ impl BParticipant {
// be dropped... from remote, notify local // be dropped... from remote, notify local
trace!( trace!(
?sid, ?sid,
"got remote request to close a stream, without flushing it, local \ "Got remote request to close a stream, without flushing it, local \
messages are dropped" messages are dropped"
); );
// no wait for flush here, as the remote wouldn't care anyway. // no wait for flush here, as the remote wouldn't care anyway.
@ -330,11 +330,11 @@ impl BParticipant {
.with_label_values(&[&self.remote_pid_string]) .with_label_values(&[&self.remote_pid_string])
.inc(); .inc();
si.closed.store(true, Ordering::Relaxed); si.closed.store(true, Ordering::Relaxed);
trace!(?sid, "closed stream from remote"); trace!(?sid, "Closed stream from remote");
} else { } else {
warn!( warn!(
?sid, ?sid,
"couldn't find stream to close, either this is a duplicate message, \ "Couldn't find stream to close, either this is a duplicate message, \
or the local copy of the Stream got closed simultaniously" or the local copy of the Stream got closed simultaniously"
); );
} }
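
As the comment above explains, closing is realised by flipping a shared `AtomicBool` rather than destroying the stream object. A minimal sketch of that flag pattern; `StreamInfo` is a hypothetical stand-in for the real bookkeeping entry:

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;

// Hypothetical per-stream bookkeeping entry.
struct StreamInfo {
    closed: Arc<AtomicBool>,
}

// Remote sent Frame::CloseStream: mark the stream closed, don't drop it.
fn close_from_remote(si: &StreamInfo) {
    si.closed.store(true, Ordering::Relaxed);
}

// The local api checks the flag before every send and surfaces an error.
fn try_send(si: &StreamInfo) -> Result<(), &'static str> {
    if si.closed.load(Ordering::Relaxed) {
        return Err("stream already closed");
    }
    Ok(())
}
```
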
@ -367,7 +367,7 @@ impl BParticipant {
warn!( warn!(
?e, ?e,
?mid, ?mid,
"dropping message, as streams seem to be in act of beeing \ "Dropping message, as streams seem to be in act of beeing \
dropped right now" dropped right now"
); );
} }
@ -379,7 +379,7 @@ impl BParticipant {
{ {
warn!( warn!(
?dropped_cnt, ?dropped_cnt,
"dropping multiple messages as stream no longer seems to \ "Dropping multiple messages as stream no longer seems to \
exist because it was dropped probably." exist because it was dropped probably."
); );
dropped_cnt = 0; dropped_cnt = 0;
@ -395,17 +395,17 @@ impl BParticipant {
debug!("Shutdown received from remote side"); debug!("Shutdown received from remote side");
self.close_participant(2).await; self.close_participant(2).await;
}, },
f => unreachable!("never reaches frame!: {:?}", f), f => unreachable!("Frame should never reache participant!: {:?}", f),
} }
} }
if dropped_cnt > 0 { if dropped_cnt > 0 {
warn!( warn!(
?dropped_cnt, ?dropped_cnt,
"dropping multiple messages as stream no longer seems to exist because it was \ "Dropping multiple messages as stream no longer seems to exist because it was \
dropped probably." dropped probably."
); );
} }
trace!("stop handle_frames_mgr"); trace!("Stop handle_frames_mgr");
self.running_mgr.fetch_sub(1, Ordering::Relaxed); self.running_mgr.fetch_sub(1, Ordering::Relaxed);
} }
@ -422,7 +422,7 @@ impl BParticipant {
w2b_frames_s: mpsc::UnboundedSender<(Cid, Frame)>, w2b_frames_s: mpsc::UnboundedSender<(Cid, Frame)>,
) { ) {
self.running_mgr.fetch_add(1, Ordering::Relaxed); self.running_mgr.fetch_add(1, Ordering::Relaxed);
trace!("start create_channel_mgr"); trace!("Start create_channel_mgr");
s2b_create_channel_r s2b_create_channel_r
.for_each_concurrent( .for_each_concurrent(
None, None,
@ -444,7 +444,7 @@ impl BParticipant {
.channels_connected_total .channels_connected_total
.with_label_values(&[&self.remote_pid_string]) .with_label_values(&[&self.remote_pid_string])
.inc(); .inc();
trace!(?cid, "running channel in participant"); trace!(?cid, "Running channel in participant");
channel channel
.run(protocol, w2b_frames_s, leftover_cid_frame) .run(protocol, w2b_frames_s, leftover_cid_frame)
.await; .await;
@ -452,12 +452,12 @@ impl BParticipant {
.channels_disconnected_total .channels_disconnected_total
.with_label_values(&[&self.remote_pid_string]) .with_label_values(&[&self.remote_pid_string])
.inc(); .inc();
trace!(?cid, "channel got closed"); trace!(?cid, "Channel got closed");
} }
}, },
) )
.await; .await;
trace!("stop create_channel_mgr"); trace!("Stop create_channel_mgr");
self.running_mgr.fetch_sub(1, Ordering::Relaxed); self.running_mgr.fetch_sub(1, Ordering::Relaxed);
} }
@ -469,7 +469,7 @@ impl BParticipant {
shutdown_open_mgr_receiver: oneshot::Receiver<()>, shutdown_open_mgr_receiver: oneshot::Receiver<()>,
) { ) {
self.running_mgr.fetch_add(1, Ordering::Relaxed); self.running_mgr.fetch_add(1, Ordering::Relaxed);
trace!("start open_mgr"); trace!("Start open_mgr");
let mut stream_ids = self.offset_sid; let mut stream_ids = self.offset_sid;
let mut send_cache = let mut send_cache =
PidCidFrameCache::new(self.metrics.frames_out_total.clone(), self.remote_pid); PidCidFrameCache::new(self.metrics.frames_out_total.clone(), self.remote_pid);
@ -479,7 +479,7 @@ impl BParticipant {
next = a2b_steam_open_r.next().fuse() => next, next = a2b_steam_open_r.next().fuse() => next,
_ = shutdown_open_mgr_receiver => None, _ = shutdown_open_mgr_receiver => None,
} { } {
debug!(?prio, ?promises, "got request to open a new steam"); debug!(?prio, ?promises, "Got request to open a new steam");
let a2p_msg_s = a2p_msg_s.clone(); let a2p_msg_s = a2p_msg_s.clone();
let sid = stream_ids; let sid = stream_ids;
let stream = self let stream = self
@ -502,7 +502,7 @@ impl BParticipant {
stream_ids += Sid::from(1); stream_ids += Sid::from(1);
} }
} }
trace!("stop open_mgr"); trace!("Stop open_mgr");
self.running_mgr.fetch_sub(1, Ordering::Relaxed); self.running_mgr.fetch_sub(1, Ordering::Relaxed);
} }
@ -515,11 +515,11 @@ impl BParticipant {
s2b_shutdown_bparticipant_r: oneshot::Receiver<oneshot::Sender<async_std::io::Result<()>>>, s2b_shutdown_bparticipant_r: oneshot::Receiver<oneshot::Sender<async_std::io::Result<()>>>,
) { ) {
self.running_mgr.fetch_add(1, Ordering::Relaxed); self.running_mgr.fetch_add(1, Ordering::Relaxed);
trace!("start participant_shutdown_mgr"); trace!("Start participant_shutdown_mgr");
let sender = s2b_shutdown_bparticipant_r.await.unwrap(); let sender = s2b_shutdown_bparticipant_r.await.unwrap();
self.close_participant(1).await; self.close_participant(1).await;
sender.send(Ok(())).unwrap(); sender.send(Ok(())).unwrap();
trace!("stop participant_shutdown_mgr"); trace!("Stop participant_shutdown_mgr");
self.running_mgr.fetch_sub(1, Ordering::Relaxed); self.running_mgr.fetch_sub(1, Ordering::Relaxed);
} }
@ -530,7 +530,7 @@ impl BParticipant {
b2p_notify_empty_stream_s: crossbeam_channel::Sender<(Sid, oneshot::Sender<()>)>, b2p_notify_empty_stream_s: crossbeam_channel::Sender<(Sid, oneshot::Sender<()>)>,
) { ) {
self.running_mgr.fetch_add(1, Ordering::Relaxed); self.running_mgr.fetch_add(1, Ordering::Relaxed);
trace!("start stream_close_mgr"); trace!("Start stream_close_mgr");
let mut send_cache = let mut send_cache =
PidCidFrameCache::new(self.metrics.frames_out_total.clone(), self.remote_pid); PidCidFrameCache::new(self.metrics.frames_out_total.clone(), self.remote_pid);
let mut shutdown_stream_close_mgr_receiver = shutdown_stream_close_mgr_receiver.fuse(); let mut shutdown_stream_close_mgr_receiver = shutdown_stream_close_mgr_receiver.fuse();
@ -542,7 +542,7 @@ impl BParticipant {
} { } {
//TODO: make this concurrent! //TODO: make this concurrent!
//TODO: Performance, closing is slow! //TODO: Performance, closing is slow!
trace!(?sid, "got request from api to close steam"); trace!(?sid, "Got request from api to close steam");
//This needs to first stop clients from sending any more. //This needs to first stop clients from sending any more.
//Then it will wait for all pending messages (in prio) to be sent to the //Then it will wait for all pending messages (in prio) to be sent to the
// protocol. After this happened the stream is closed // protocol. After this happened the stream is closed
@ -550,24 +550,24 @@ impl BParticipant {
// frame! If we sent it before, all follow-up messages couldn't // frame! If we sent it before, all follow-up messages couldn't
// be handled at the remote side. // be handled at the remote side.
trace!(?sid, "stopping api to use this stream"); trace!(?sid, "Stopping api to use this stream");
match self.streams.read().await.get(&sid) { match self.streams.read().await.get(&sid) {
Some(si) => { Some(si) => {
si.closed.store(true, Ordering::Relaxed); si.closed.store(true, Ordering::Relaxed);
}, },
None => warn!("couldn't find the stream, might be simulanious close from remote"), None => warn!("Couldn't find the stream, might be simulanious close from remote"),
} }
//TODO: what happens if RIGHT NOW the remote sends a StreamClose and this //TODO: what happens if RIGHT NOW the remote sends a StreamClose and this
// stream gets closed and removed? RACE CONDITION // stream gets closed and removed? RACE CONDITION
trace!(?sid, "wait for stream to be flushed"); trace!(?sid, "Wait for stream to be flushed");
let (s2b_stream_finished_closed_s, s2b_stream_finished_closed_r) = oneshot::channel(); let (s2b_stream_finished_closed_s, s2b_stream_finished_closed_r) = oneshot::channel();
b2p_notify_empty_stream_s b2p_notify_empty_stream_s
.send((sid, s2b_stream_finished_closed_s)) .send((sid, s2b_stream_finished_closed_s))
.unwrap(); .unwrap();
s2b_stream_finished_closed_r.await.unwrap(); s2b_stream_finished_closed_r.await.unwrap();
trace!(?sid, "stream was successfully flushed"); trace!(?sid, "Stream was successfully flushed");
self.metrics self.metrics
.streams_closed_total .streams_closed_total
.with_label_values(&[&self.remote_pid_string]) .with_label_values(&[&self.remote_pid_string])
@ -577,7 +577,7 @@ impl BParticipant {
self.send_frame(Frame::CloseStream { sid }, &mut send_cache) self.send_frame(Frame::CloseStream { sid }, &mut send_cache)
.await; .await;
} }
trace!("stop stream_close_mgr"); trace!("Stop stream_close_mgr");
self.running_mgr.fetch_sub(1, Ordering::Relaxed); self.running_mgr.fetch_sub(1, Ordering::Relaxed);
} }
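
The ordering spelled out in the comments of `stream_close_mgr` (stop the api, flush the prios, only then send `Frame::CloseStream`) can be condensed into a sketch; all three helpers below are hypothetical stubs:

```rust
// Hypothetical stubs for the bookkeeping, prio and frame layers.
fn mark_stream_closed(_sid: u64) {}
async fn flush_pending_messages(_sid: u64) {}
async fn send_close_frame(_sid: u64) {}

// The order matters: a CloseStream frame sent before the flush would make the
// remote discard messages that are still queued locally.
async fn close_stream_ordered(sid: u64) {
    mark_stream_closed(sid);           // 1. the api refuses any further sends
    flush_pending_messages(sid).await; // 2. drain what the prio queue already holds
    send_close_frame(sid).await;       // 3. only now may the remote tear down
}
```
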
@ -617,7 +617,7 @@ impl BParticipant {
/// allowed_managers: the number of open managers to sleep on. Must be 1 for /// allowed_managers: the number of open managers to sleep on. Must be 1 for
/// shutdown_mgr and 2 if it comes from a send error. /// shutdown_mgr and 2 if it comes from a send error.
async fn close_participant(&self, allowed_managers: usize) { async fn close_participant(&self, allowed_managers: usize) {
trace!("participant shutdown triggered"); trace!("Participant shutdown triggered");
let mut info = match self.shutdown_info.lock().await.take() { let mut info = match self.shutdown_info.lock().await.take() {
Some(info) => info, Some(info) => info,
None => { None => {
@ -628,23 +628,23 @@ impl BParticipant {
return; return;
}, },
}; };
debug!("closing all managers"); debug!("Closing all managers");
for sender in info.mgr_to_shutdown.drain(..) { for sender in info.mgr_to_shutdown.drain(..) {
if let Err(e) = sender.send(()) { if let Err(e) = sender.send(()) {
warn!(?e, "manager seems to be closed already, weird, maybe a bug"); warn!(?e, "Manager seems to be closed already, weird, maybe a bug");
}; };
} }
debug!("closing all streams"); debug!("Closing all streams");
for (sid, si) in self.streams.write().await.drain() { for (sid, si) in self.streams.write().await.drain() {
trace!(?sid, "shutting down Stream"); trace!(?sid, "Shutting down Stream");
si.closed.store(true, Ordering::Relaxed); si.closed.store(true, Ordering::Relaxed);
} }
debug!("waiting for prios to be flushed"); debug!("Waiting for prios to be flushed");
info.b2b_prios_flushed_r.await.unwrap(); info.b2b_prios_flushed_r.await.unwrap();
debug!("closing all channels"); debug!("Closing all channels");
for ci in self.channels.write().await.drain(..) { for ci in self.channels.write().await.drain(..) {
if let Err(e) = ci.b2r_read_shutdown.send(()) { if let Err(e) = ci.b2r_read_shutdown.send(()) {
debug!(?e, ?ci.cid, "seems like this read protocol got already dropped by closing the Stream itself, just ignoring the fact"); debug!(?e, ?ci.cid, "Seems like this read protocol got already dropped by closing the Stream itself, just ignoring the fact");
}; };
} }
//Wait for other bparticipants mgr to close via AtomicUsize //Wait for other bparticipants mgr to close via AtomicUsize
@ -656,14 +656,14 @@ impl BParticipant {
if i.rem_euclid(10) == 1 { if i.rem_euclid(10) == 1 {
trace!( trace!(
?allowed_managers, ?allowed_managers,
"waiting for bparticipant mgr to shut down, remaining {}", "Waiting for bparticipant mgr to shut down, remaining {}",
self.running_mgr.load(Ordering::Relaxed) - allowed_managers self.running_mgr.load(Ordering::Relaxed) - allowed_managers
); );
} }
async_std::task::sleep(SLEEP_TIME * i).await; async_std::task::sleep(SLEEP_TIME * i).await;
} }
trace!("all bparticipant mgr (except me) are shut down now"); trace!("All BParticipant mgr (except me) are shut down now");
self.metrics.participants_disconnected_total.inc(); self.metrics.participants_disconnected_total.inc();
debug!("bparticipant close done"); debug!("BParticipant close done");
} }
} }

View File

@ -265,7 +265,7 @@ impl PrioManager {
} }
//decrease pid_sid counter by 1 again //decrease pid_sid counter by 1 again
let cnt = self.sid_owned.get_mut(&sid).expect( let cnt = self.sid_owned.get_mut(&sid).expect(
"the pid_sid_owned counter works wrong, more pid,sid removed than \ "The pid_sid_owned counter works wrong, more pid,sid removed than \
inserted", inserted",
); );
cnt.len -= 1; cnt.len -= 1;
@ -276,7 +276,7 @@ impl PrioManager {
} }
} }
} else { } else {
trace!(?msg.mid, "repush message"); trace!(?msg.mid, "Repush message");
self.messages[prio as usize].push_front((sid, msg)); self.messages[prio as usize].push_front((sid, msg));
} }
}, },
@ -358,28 +358,28 @@ mod tests {
fn assert_header(frames: &mut VecDeque<(Sid, Frame)>, f_sid: u64, f_length: u64) { fn assert_header(frames: &mut VecDeque<(Sid, Frame)>, f_sid: u64, f_length: u64) {
let frame = frames let frame = frames
.pop_front() .pop_front()
.expect("frames vecdeque doesn't contain enough frames!") .expect("Frames vecdeque doesn't contain enough frames!")
.1; .1;
if let Frame::DataHeader { mid, sid, length } = frame { if let Frame::DataHeader { mid, sid, length } = frame {
assert_eq!(mid, 1); assert_eq!(mid, 1);
assert_eq!(sid, Sid::new(f_sid)); assert_eq!(sid, Sid::new(f_sid));
assert_eq!(length, f_length); assert_eq!(length, f_length);
} else { } else {
panic!("wrong frame type!, expected DataHeader"); panic!("Wrong frame type!, expected DataHeader");
} }
} }
fn assert_data(frames: &mut VecDeque<(Sid, Frame)>, f_start: u64, f_data: Vec<u8>) { fn assert_data(frames: &mut VecDeque<(Sid, Frame)>, f_start: u64, f_data: Vec<u8>) {
let frame = frames let frame = frames
.pop_front() .pop_front()
.expect("frames vecdeque doesn't contain enough frames!") .expect("Frames vecdeque doesn't contain enough frames!")
.1; .1;
if let Frame::Data { mid, start, data } = frame { if let Frame::Data { mid, start, data } = frame {
assert_eq!(mid, 1); assert_eq!(mid, 1);
assert_eq!(start, f_start); assert_eq!(start, f_start);
assert_eq!(data, f_data); assert_eq!(data, f_data);
} else { } else {
panic!("wrong frame type!, expected Data"); panic!("Wrong frame type!, expected Data");
} }
} }

View File

@ -68,7 +68,7 @@ impl TcpProtocol {
if let Err(e) = stream.read_exact(&mut bytes).await { if let Err(e) = stream.read_exact(&mut bytes).await {
warn!( warn!(
?e, ?e,
"closing tcp protocol due to read error, sending close frame to gracefully \ "Closing tcp protocol due to read error, sending close frame to gracefully \
shutdown" shutdown"
); );
w2c_cid_frame_s w2c_cid_frame_s
@ -84,7 +84,7 @@ impl TcpProtocol {
w2c_cid_frame_s: &mut mpsc::UnboundedSender<(Cid, Frame)>, w2c_cid_frame_s: &mut mpsc::UnboundedSender<(Cid, Frame)>,
end_receiver: oneshot::Receiver<()>, end_receiver: oneshot::Receiver<()>,
) { ) {
trace!("starting up tcp read()"); trace!("Starting up tcp read()");
let mut metrics_cache = CidFrameCache::new(self.metrics.frames_wire_in_total.clone(), cid); let mut metrics_cache = CidFrameCache::new(self.metrics.frames_wire_in_total.clone(), cid);
let throughput_cache = self let throughput_cache = self
.metrics .metrics
@ -100,7 +100,7 @@ impl TcpProtocol {
_ = end_receiver => break, _ = end_receiver => break,
}; };
if r.is_err() { if r.is_err() {
info!("tcp stream closed, shutting down read"); info!("Tcp stream closed, shutting down read");
break; break;
} }
let frame_no = bytes[0]; let frame_no = bytes[0];
@ -210,7 +210,7 @@ impl TcpProtocol {
.await .await
.expect("Channel or Participant seems no longer to exist"); .expect("Channel or Participant seems no longer to exist");
} }
trace!("shutting down tcp read()"); trace!("Shutting down tcp read()");
} }
/// read_exact, and if it fails, close the protocol /// read_exact, and if it fails, close the protocol
@ -223,7 +223,7 @@ impl TcpProtocol {
Err(e) => { Err(e) => {
warn!( warn!(
?e, ?e,
"got an error writing to tcp, going to close this channel" "Got an error writing to tcp, going to close this channel"
); );
to_wire_receiver.close(); to_wire_receiver.close();
true true
@ -236,7 +236,7 @@ impl TcpProtocol {
// Limits throughput per single receiver but stays in the same thread (maybe as it's // Limits throughput per single receiver but stays in the same thread (maybe as it's
// in a threadpool) for TCP, UDP and MPSC // in a threadpool) for TCP, UDP and MPSC
pub async fn write_to_wire(&self, cid: Cid, mut c2w_frame_r: mpsc::UnboundedReceiver<Frame>) { pub async fn write_to_wire(&self, cid: Cid, mut c2w_frame_r: mpsc::UnboundedReceiver<Frame>) {
trace!("starting up tcp write()"); trace!("Starting up tcp write()");
let mut stream = self.stream.clone(); let mut stream = self.stream.clone();
let mut metrics_cache = CidFrameCache::new(self.metrics.frames_wire_out_total.clone(), cid); let mut metrics_cache = CidFrameCache::new(self.metrics.frames_wire_out_total.clone(), cid);
let throughput_cache = self let throughput_cache = self
@ -404,7 +404,7 @@ impl UdpProtocol {
w2c_cid_frame_s: &mut mpsc::UnboundedSender<(Cid, Frame)>, w2c_cid_frame_s: &mut mpsc::UnboundedSender<(Cid, Frame)>,
end_receiver: oneshot::Receiver<()>, end_receiver: oneshot::Receiver<()>,
) { ) {
trace!("starting up udp read()"); trace!("Starting up udp read()");
let mut metrics_cache = CidFrameCache::new(self.metrics.frames_wire_in_total.clone(), cid); let mut metrics_cache = CidFrameCache::new(self.metrics.frames_wire_in_total.clone(), cid);
let throughput_cache = self let throughput_cache = self
.metrics .metrics
@ -416,7 +416,7 @@ impl UdpProtocol {
r = data_in.next().fuse() => r, r = data_in.next().fuse() => r,
_ = end_receiver => None, _ = end_receiver => None,
} { } {
trace!("got raw UDP message with len: {}", bytes.len()); trace!("Got raw UDP message with len: {}", bytes.len());
let frame_no = bytes[0]; let frame_no = bytes[0];
let frame = match frame_no { let frame = match frame_no {
FRAME_HANDSHAKE => { FRAME_HANDSHAKE => {
@ -511,11 +511,11 @@ impl UdpProtocol {
metrics_cache.with_label_values(&frame).inc(); metrics_cache.with_label_values(&frame).inc();
w2c_cid_frame_s.send((cid, frame)).await.unwrap(); w2c_cid_frame_s.send((cid, frame)).await.unwrap();
} }
trace!("shutting down udp read()"); trace!("Shutting down udp read()");
} }
pub async fn write_to_wire(&self, cid: Cid, mut c2w_frame_r: mpsc::UnboundedReceiver<Frame>) { pub async fn write_to_wire(&self, cid: Cid, mut c2w_frame_r: mpsc::UnboundedReceiver<Frame>) {
trace!("starting up udp write()"); trace!("Starting up udp write()");
let mut buffer = [0u8; 2000]; let mut buffer = [0u8; 2000];
let mut metrics_cache = CidFrameCache::new(self.metrics.frames_wire_out_total.clone(), cid); let mut metrics_cache = CidFrameCache::new(self.metrics.frames_wire_out_total.clone(), cid);
let throughput_cache = self let throughput_cache = self
@ -588,7 +588,7 @@ impl UdpProtocol {
}; };
let mut start = 0; let mut start = 0;
while start < len { while start < len {
trace!(?start, ?len, "splitting up udp frame in multiple packages"); trace!(?start, ?len, "Splitting up udp frame in multiple packages");
match self match self
.socket .socket
.send_to(&buffer[start..len], self.remote_addr) .send_to(&buffer[start..len], self.remote_addr)
@ -603,10 +603,10 @@ impl UdpProtocol {
); );
} }
}, },
Err(e) => error!(?e, "need to handle that error!"), Err(e) => error!(?e, "Need to handle that error!"),
} }
} }
} }
trace!("shutting down udp write()"); trace!("Shutting down udp write()");
} }
} }
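
Both read loops above rely on `read_exact` semantics: a frame tag byte decides how much body follows, and a failed read is the signal to shut the channel down. A minimal sketch of that first step, assuming `async-std` as in the surrounding code:

```rust
use async_std::net::TcpStream;
use async_std::prelude::*;

// Read the 1-byte frame tag; the tag decides how many body bytes follow.
async fn read_frame_tag(stream: &mut TcpStream) -> std::io::Result<u8> {
    let mut tag = [0u8; 1];
    // read_exact only returns Ok once the buffer is completely filled, so a
    // closed socket surfaces as an error and the caller can stop the loop.
    stream.read_exact(&mut tag).await?;
    Ok(tag[0])
}
```
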

View File

@@ -158,13 +158,13 @@ impl Scheduler {
         &self,
         a2s_listen_r: mpsc::UnboundedReceiver<(Address, oneshot::Sender<io::Result<()>>)>,
     ) {
-        trace!("start listen_mgr");
+        trace!("Start listen_mgr");
         a2s_listen_r
             .for_each_concurrent(None, |(address, s2a_listen_result_s)| {
                 let address = address;
                 async move {
-                    debug!(?address, "got request to open a channel_creator");
+                    debug!(?address, "Got request to open a channel_creator");
                     self.metrics
                         .listen_requests_total
                         .with_label_values(&[match address {
@@ -183,7 +183,7 @@ impl Scheduler {
                 }
             })
             .await;
-        trace!("stop listen_mgr");
+        trace!("Stop listen_mgr");
     }
 
     async fn connect_mgr(
@@ -193,7 +193,7 @@ impl Scheduler {
             oneshot::Sender<io::Result<Participant>>,
         )>,
     ) {
-        trace!("start connect_mgr");
+        trace!("Start connect_mgr");
         while let Some((addr, pid_sender)) = a2s_connect_r.next().await {
             let (protocol, handshake) = match addr {
                 Address::Tcp(addr) => {
@@ -249,7 +249,7 @@ impl Scheduler {
             self.init_protocol(protocol, Some(pid_sender), handshake)
                 .await;
         }
-        trace!("stop connect_mgr");
+        trace!("Stop connect_mgr");
     }
 
     async fn disconnect_mgr(
@@ -259,14 +259,14 @@ impl Scheduler {
             oneshot::Sender<async_std::io::Result<()>>,
         )>,
     ) {
-        trace!("start disconnect_mgr");
+        trace!("Start disconnect_mgr");
         while let Some((pid, return_once_successful_shutdown)) = a2s_disconnect_r.next().await {
             // Closing Participants is done the following way:
             // 1. We drop our senders and receivers
             // 2. We need to close BParticipant, which will drop its senders and receivers
             // 3. Participant will try to access the BParticipant senders and receivers with
             //    their next api action, it will fail and be closed then.
-            trace!(?pid, "got request to close participant");
+            trace!(?pid, "Got request to close participant");
             if let Some(mut pi) = self.participants.write().await.remove(&pid) {
                 let (finished_sender, finished_receiver) = oneshot::channel();
                 pi.s2b_shutdown_bparticipant_s
@@ -278,36 +278,36 @@ impl Scheduler {
                 let e = finished_receiver.await.unwrap();
                 return_once_successful_shutdown.send(e).unwrap();
             } else {
-                debug!(?pid, "looks like participant is already dropped");
+                debug!(?pid, "Looks like participant is already dropped");
                 return_once_successful_shutdown.send(Ok(())).unwrap();
             }
-            trace!(?pid, "closed participant");
+            trace!(?pid, "Closed participant");
         }
-        trace!("stop disconnect_mgr");
+        trace!("Stop disconnect_mgr");
     }
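The pattern here is worth isolating: every disconnect request carries a `oneshot::Sender`, so the caller of `disconnect` can await a confirmation once the participant is really gone. A self-contained sketch under simplified assumptions (a plain `u64` in place of `Pid`, a `String` error type, no real BParticipant behind it), using only the `futures` crate:

use futures::channel::{mpsc, oneshot};
use futures::{executor::block_on, join, StreamExt};

// Each request pairs a participant id with a oneshot sender; the manager
// answers on that sender once the shutdown has finished.
async fn disconnect_mgr(
    mut requests: mpsc::UnboundedReceiver<(u64, oneshot::Sender<Result<(), String>>)>,
) {
    while let Some((pid, reply)) = requests.next().await {
        // The real code forwards a shutdown signal to BParticipant and
        // awaits its finished_receiver; here we acknowledge right away.
        let _ = reply.send(Ok(()));
        println!("Closed participant {}", pid);
    }
}

fn main() {
    block_on(async {
        let (req_s, req_r) = mpsc::unbounded();
        let (ack_s, ack_r) = oneshot::channel();
        let caller = async move {
            req_s.unbounded_send((42u64, ack_s)).unwrap();
            drop(req_s); // closing the channel lets the manager loop end
            ack_r.await.unwrap().unwrap();
        };
        join!(disconnect_mgr(req_r), caller);
    });
}

Compared to a shared flag, the oneshot reply carries a typed result and needs no polling, which is presumably why the scheduler threads these senders through every manager.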
 
     async fn prio_adj_mgr(
         &self,
         mut b2s_prio_statistic_r: mpsc::UnboundedReceiver<(Pid, u64, u64)>,
     ) {
-        trace!("start prio_adj_mgr");
+        trace!("Start prio_adj_mgr");
         while let Some((_pid, _frame_cnt, _unused)) = b2s_prio_statistic_r.next().await {
             //TODO adjust prios in participants here!
         }
-        trace!("stop prio_adj_mgr");
+        trace!("Stop prio_adj_mgr");
     }
 
     async fn scheduler_shutdown_mgr(&self, a2s_scheduler_shutdown_r: oneshot::Receiver<()>) {
-        trace!("start scheduler_shutdown_mgr");
+        trace!("Start scheduler_shutdown_mgr");
         a2s_scheduler_shutdown_r.await.unwrap();
         self.closed.store(true, Ordering::Relaxed);
-        debug!("shutting down all BParticipants gracefully");
+        debug!("Shutting down all BParticipants gracefully");
         let mut participants = self.participants.write().await;
         let waitings = participants
             .drain()
             .map(|(pid, mut pi)| {
-                trace!(?pid, "shutting down BParticipants");
+                trace!(?pid, "Shutting down BParticipants");
                 let (finished_sender, finished_receiver) = oneshot::channel();
                 pi.s2b_shutdown_bparticipant_s
                     .take()
@@ -317,13 +317,13 @@ impl Scheduler {
                 (pid, finished_receiver)
             })
             .collect::<Vec<_>>();
-        debug!("wait for partiticipants to be shut down");
+        debug!("Wait for participants to be shut down");
         for (pid, recv) in waitings {
             if let Err(e) = recv.await {
                 error!(
                     ?pid,
                     ?e,
-                    "failed to finish sending all remainding messages to participant when \
+                    "Failed to finish sending all remaining messages to participant when \
                      shutting down"
                 );
             };
@@ -332,7 +332,7 @@ impl Scheduler {
         // some mgr:
         self.participant_channels.lock().await.take();
-        trace!("stop scheduler_shutdown_mgr");
+        trace!("Stop scheduler_shutdown_mgr");
     }
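Shutdown is a fan-out of the same oneshot idea: hand every participant a `finished_sender`, keep the matching receivers, and only return once each receiver has answered. A compilable sketch with stand-in participants (the ids and the immediate confirmation are illustrative, not the real BParticipant wiring):

use futures::channel::oneshot;
use futures::executor::block_on;

fn main() {
    block_on(async {
        // Stand-ins for the drained participant map.
        let pids = vec![1u64, 2, 3];
        let waitings: Vec<(u64, oneshot::Receiver<()>)> = pids
            .into_iter()
            .map(|pid| {
                let (finished_sender, finished_receiver) = oneshot::channel();
                // In the real code finished_sender travels to a BParticipant,
                // which fires it when done; here we confirm immediately.
                finished_sender.send(()).unwrap();
                (pid, finished_receiver)
            })
            .collect();
        // Await every confirmation so shutdown cannot complete early.
        for (pid, recv) in waitings {
            if let Err(e) = recv.await {
                eprintln!("Participant {} never confirmed shutdown: {:?}", pid, e);
            }
        }
        println!("All participants shut down");
    });
}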
 
     async fn channel_creator(
@@ -341,7 +341,7 @@ impl Scheduler {
         s2s_stop_listening_r: oneshot::Receiver<()>,
         s2a_listen_result_s: oneshot::Sender<io::Result<()>>,
     ) {
-        trace!(?addr, "start up channel creator");
+        trace!(?addr, "Start up channel creator");
         match addr {
             Address::Tcp(addr) => {
                 let listener = match net::TcpListener::bind(addr).await {
@@ -353,13 +353,13 @@ impl Scheduler {
                         info!(
                             ?addr,
                             ?e,
-                            "listener couldn't be started due to error on tcp bind"
+                            "Listener couldn't be started due to error on tcp bind"
                         );
                         s2a_listen_result_s.send(Err(e)).unwrap();
                         return;
                     },
                 };
-                trace!(?addr, "listener bound");
+                trace!(?addr, "Listener bound");
                 let mut incoming = listener.incoming();
                 let mut end_receiver = s2s_stop_listening_r.fuse();
                 while let Some(stream) = select! {
@@ -383,13 +383,13 @@ impl Scheduler {
                         info!(
                             ?addr,
                             ?e,
-                            "listener couldn't be started due to error on udp bind"
+                            "Listener couldn't be started due to error on udp bind"
                         );
                         s2a_listen_result_s.send(Err(e)).unwrap();
                         return;
                     },
                 };
-                trace!(?addr, "listener bound");
+                trace!(?addr, "Listener bound");
                 // receiving is done from here and will be piped to protocol as UDP does not
                 // have any state
                 let mut listeners = HashMap::new();
@@ -424,7 +424,7 @@ impl Scheduler {
             },
             _ => unimplemented!(),
         }
-        trace!(?addr, "ending channel creator");
+        trace!(?addr, "Ending channel creator");
     }
 
     async fn udp_single_channel_connect(
@@ -432,7 +432,7 @@ impl Scheduler {
         mut w2p_udp_package_s: mpsc::UnboundedSender<Vec<u8>>,
     ) {
         let addr = socket.local_addr();
-        trace!(?addr, "start udp_single_channel_connect");
+        trace!(?addr, "Start udp_single_channel_connect");
         //TODO: implement real closing
         let (_end_sender, end_receiver) = oneshot::channel::<()>();
@@ -448,7 +448,7 @@ impl Scheduler {
             datavec.extend_from_slice(&data[0..size]);
             w2p_udp_package_s.send(datavec).await.unwrap();
         }
-        trace!(?addr, "stop udp_single_channel_connect");
+        trace!(?addr, "Stop udp_single_channel_connect");
     }
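Both read loops in this file share one cancellation shape: `select!` races the pending socket read against an end signal, and the branch that yields `None` breaks the `while let`. A runnable sketch of that shape with `async_std` and `futures` (the buffer size, the port, and the immediate stop signal are illustrative):

use async_std::net::UdpSocket;
use futures::channel::oneshot;
use futures::{select, FutureExt};

fn main() -> std::io::Result<()> {
    async_std::task::block_on(async {
        let socket = UdpSocket::bind("127.0.0.1:0").await?;
        let (end_sender, mut end_receiver) = oneshot::channel::<()>();
        // Stand-in for a real shutdown trigger: stop immediately.
        end_sender.send(()).unwrap();
        let mut data = [0u8; 9216];
        // recv_from stays pending until a datagram arrives; the oneshot
        // branch yielding None ends the while-let without a read.
        while let Some(res) = select! {
            r = socket.recv_from(&mut data).fuse() => Some(r),
            _ = &mut end_receiver => None,
        } {
            let (size, remote) = res?;
            println!("Got {} bytes from {:?}", size, remote);
        }
        println!("Stop reading, end signal received");
        Ok(())
    })
}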
 
     async fn init_protocol(
@@ -477,7 +477,7 @@ impl Scheduler {
         // this is necessary for UDP to work at all and to remove code duplication
         self.pool.spawn_ok(
             async move {
-                trace!(?cid, "open channel and be ready for Handshake");
+                trace!(?cid, "Open channel and be ready for Handshake");
                 let handshake = Handshake::new(
                     cid,
                     local_pid,
@@ -490,11 +490,11 @@ impl Scheduler {
                         trace!(
                             ?cid,
                             ?pid,
-                            "detected that my channel is ready!, activating it :)"
+                            "Detected that my channel is ready, activating it :)"
                         );
                         let mut participants = participants.write().await;
                         if !participants.contains_key(&pid) {
-                            debug!(?cid, "new participant connected via a channel");
+                            debug!(?cid, "New participant connected via a channel");
                             let (
                                 bparticipant,
                                 a2b_steam_open_s,
@@ -557,7 +557,7 @@ impl Scheduler {
                                 ?secret,
                                 "Detected incompatible Secret! This is probably an attack!"
                             );
-                            error!("just dropping here, TODO handle this correctly!");
+                            error!("Just dropping here, TODO handle this correctly!");
                             //TODO
                             if let Some(pid_oneshot) = s2a_return_pid_s {
                                 // someone is waiting with `connect`, so give them their Error
@@ -571,7 +571,7 @@ impl Scheduler {
                                 return;
                             }
                             error!(
-                                "ufff i cant answer the pid_oneshot. as i need to create the SAME \
+                                "Ufff, I can't answer the pid_oneshot, as I need to create the SAME \
                                  participant. maybe switch to ARC"
                             );
                         }
@@ -584,7 +584,7 @@ impl Scheduler {
                             pid_oneshot
                                 .send(Err(std::io::Error::new(
                                     std::io::ErrorKind::PermissionDenied,
-                                    "handshake failed, denying connection",
+                                    "Handshake failed, denying connection",
                                 )))
                                 .unwrap();
                         }