add a try_recv fn to Stream which is NOT async

This commit is contained in:
Marcel Märtens 2020-10-15 13:22:34 +02:00
parent ab411406b0
commit 572b83e262
2 changed files with 85 additions and 1 deletions

View File

@ -824,7 +824,7 @@ impl Stream {
/// # stream_p.send("Hello World");
/// let participant_a = network.connected().await?;
/// let mut stream_a = participant_a.opened().await?;
/// //Send Message
/// //Recv Message
/// println!("{}", stream_a.recv::<String>().await?);
/// # Ok(())
/// })
@ -849,6 +849,50 @@ impl Stream {
let msg = self.b2a_msg_recv_r.next().await?;
Ok(msg.buffer)
}
/// use `try_recv` to check for a Message send from the remote side by their
/// `Stream`. This function does not block and returns immediately. It's
/// intended for use in non-async context only. Other then that, the
/// same rules apply than for [`recv`].
///
/// # Example
/// ```
/// use veloren_network::{Network, ProtocolAddr, Pid};
/// # use veloren_network::Promises;
/// use futures::executor::block_on;
///
/// # fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
/// // Create a Network, listen on Port `2220` and wait for a Stream to be opened, then listen on it
/// let (network, f) = Network::new(Pid::new());
/// std::thread::spawn(f);
/// # let (remote, fr) = Network::new(Pid::new());
/// # std::thread::spawn(fr);
/// block_on(async {
/// network.listen(ProtocolAddr::Tcp("127.0.0.1:2230".parse().unwrap())).await?;
/// # let remote_p = remote.connect(ProtocolAddr::Tcp("127.0.0.1:2230".parse().unwrap())).await?;
/// # let mut stream_p = remote_p.open(16, Promises::ORDERED | Promises::CONSISTENCY).await?;
/// # stream_p.send("Hello World");
/// # std::thread::sleep(std::time::Duration::from_secs(1));
/// let participant_a = network.connected().await?;
/// let mut stream_a = participant_a.opened().await?;
/// //Try Recv Message
/// println!("{:?}", stream_a.try_recv::<String>()?);
/// # Ok(())
/// })
/// # }
/// ```
#[inline]
pub fn try_recv<M: DeserializeOwned>(&mut self) -> Result<Option<M>, StreamError> {
match self.b2a_msg_recv_r.try_next() {
Err(_) => Ok(None),
Ok(None) => Err(StreamError::StreamClosed),
Ok(Some(msg)) => Ok(Some(message::deserialize::<M>(
msg.buffer,
#[cfg(feature = "compression")]
self.promises.contains(Promises::COMPRESSED),
)?)),
}
}
}
///

View File

@ -23,6 +23,16 @@ fn stream_simple() {
assert_eq!(block_on(s1_b.recv()), Ok("Hello World".to_string()));
}
#[test]
fn stream_try_recv() {
let (_, _) = helper::setup(false, 0);
let (_n_a, _p_a, mut s1_a, _n_b, _p_b, mut s1_b) = block_on(network_participant_stream(tcp()));
s1_a.send(4242u32).unwrap();
std::thread::sleep(std::time::Duration::from_secs(1));
assert_eq!(s1_b.try_recv(), Ok(Some(4242u32)));
}
#[test]
fn stream_simple_3msg() {
let (_, _) = helper::setup(false, 0);
@ -182,3 +192,33 @@ fn wrong_parse() {
_ => panic!("this should fail, but it doesnt!"),
}
}
#[test]
fn multiple_try_recv() {
let (_, _) = helper::setup(false, 0);
let (_n_a, _p_a, mut s1_a, _n_b, _p_b, mut s1_b) = block_on(network_participant_stream(tcp()));
s1_a.send("asd").unwrap();
s1_a.send(11u32).unwrap();
std::thread::sleep(std::time::Duration::from_secs(1));
assert_eq!(s1_b.try_recv(), Ok(Some("asd".to_string())));
assert_eq!(s1_b.try_recv::<u32>(), Ok(Some(11u32)));
assert_eq!(s1_b.try_recv::<String>(), Ok(None));
drop(s1_a);
std::thread::sleep(std::time::Duration::from_secs(1));
assert_eq!(s1_b.try_recv::<String>(), Err(StreamError::StreamClosed));
}
#[test]
fn dont_panic_on_multiply_recv_after_close() {
let (_, _) = helper::setup(false, 0);
let (_n_a, _p_a, mut s1_a, _n_b, _p_b, mut s1_b) = block_on(network_participant_stream(tcp()));
s1_a.send(11u32).unwrap();
drop(s1_a);
std::thread::sleep(std::time::Duration::from_secs(1));
assert_eq!(s1_b.try_recv::<u32>(), Ok(Some(11u32)));
assert_eq!(s1_b.try_recv::<String>(), Err(StreamError::StreamClosed));
assert_eq!(s1_b.try_recv::<String>(), Err(StreamError::StreamClosed));
}