diff --git a/network/src/api.rs b/network/src/api.rs index 972c71b400..be4a7118c9 100644 --- a/network/src/api.rs +++ b/network/src/api.rs @@ -824,7 +824,7 @@ impl Stream { /// # stream_p.send("Hello World"); /// let participant_a = network.connected().await?; /// let mut stream_a = participant_a.opened().await?; - /// //Send Message + /// //Recv Message /// println!("{}", stream_a.recv::().await?); /// # Ok(()) /// }) @@ -849,6 +849,50 @@ impl Stream { let msg = self.b2a_msg_recv_r.next().await?; Ok(msg.buffer) } + + /// use `try_recv` to check for a Message send from the remote side by their + /// `Stream`. This function does not block and returns immediately. It's + /// intended for use in non-async context only. Other then that, the + /// same rules apply than for [`recv`]. + /// + /// # Example + /// ``` + /// use veloren_network::{Network, ProtocolAddr, Pid}; + /// # use veloren_network::Promises; + /// use futures::executor::block_on; + /// + /// # fn main() -> std::result::Result<(), Box> { + /// // Create a Network, listen on Port `2220` and wait for a Stream to be opened, then listen on it + /// let (network, f) = Network::new(Pid::new()); + /// std::thread::spawn(f); + /// # let (remote, fr) = Network::new(Pid::new()); + /// # std::thread::spawn(fr); + /// block_on(async { + /// network.listen(ProtocolAddr::Tcp("127.0.0.1:2230".parse().unwrap())).await?; + /// # let remote_p = remote.connect(ProtocolAddr::Tcp("127.0.0.1:2230".parse().unwrap())).await?; + /// # let mut stream_p = remote_p.open(16, Promises::ORDERED | Promises::CONSISTENCY).await?; + /// # stream_p.send("Hello World"); + /// # std::thread::sleep(std::time::Duration::from_secs(1)); + /// let participant_a = network.connected().await?; + /// let mut stream_a = participant_a.opened().await?; + /// //Try Recv Message + /// println!("{:?}", stream_a.try_recv::()?); + /// # Ok(()) + /// }) + /// # } + /// ``` + #[inline] + pub fn try_recv(&mut self) -> Result, StreamError> { + match self.b2a_msg_recv_r.try_next() { + Err(_) => Ok(None), + Ok(None) => Err(StreamError::StreamClosed), + Ok(Some(msg)) => Ok(Some(message::deserialize::( + msg.buffer, + #[cfg(feature = "compression")] + self.promises.contains(Promises::COMPRESSED), + )?)), + } + } } /// diff --git a/network/tests/integration.rs b/network/tests/integration.rs index 30e1f79fd6..58088afed7 100644 --- a/network/tests/integration.rs +++ b/network/tests/integration.rs @@ -23,6 +23,16 @@ fn stream_simple() { assert_eq!(block_on(s1_b.recv()), Ok("Hello World".to_string())); } +#[test] +fn stream_try_recv() { + let (_, _) = helper::setup(false, 0); + let (_n_a, _p_a, mut s1_a, _n_b, _p_b, mut s1_b) = block_on(network_participant_stream(tcp())); + + s1_a.send(4242u32).unwrap(); + std::thread::sleep(std::time::Duration::from_secs(1)); + assert_eq!(s1_b.try_recv(), Ok(Some(4242u32))); +} + #[test] fn stream_simple_3msg() { let (_, _) = helper::setup(false, 0); @@ -182,3 +192,33 @@ fn wrong_parse() { _ => panic!("this should fail, but it doesnt!"), } } + +#[test] +fn multiple_try_recv() { + let (_, _) = helper::setup(false, 0); + let (_n_a, _p_a, mut s1_a, _n_b, _p_b, mut s1_b) = block_on(network_participant_stream(tcp())); + + s1_a.send("asd").unwrap(); + s1_a.send(11u32).unwrap(); + std::thread::sleep(std::time::Duration::from_secs(1)); + assert_eq!(s1_b.try_recv(), Ok(Some("asd".to_string()))); + assert_eq!(s1_b.try_recv::(), Ok(Some(11u32))); + assert_eq!(s1_b.try_recv::(), Ok(None)); + + drop(s1_a); + std::thread::sleep(std::time::Duration::from_secs(1)); + assert_eq!(s1_b.try_recv::(), Err(StreamError::StreamClosed)); +} + +#[test] +fn dont_panic_on_multiply_recv_after_close() { + let (_, _) = helper::setup(false, 0); + let (_n_a, _p_a, mut s1_a, _n_b, _p_b, mut s1_b) = block_on(network_participant_stream(tcp())); + + s1_a.send(11u32).unwrap(); + drop(s1_a); + std::thread::sleep(std::time::Duration::from_secs(1)); + assert_eq!(s1_b.try_recv::(), Ok(Some(11u32))); + assert_eq!(s1_b.try_recv::(), Err(StreamError::StreamClosed)); + assert_eq!(s1_b.try_recv::(), Err(StreamError::StreamClosed)); +}