- now last digit version is compatible 0.6.0 will connect to 0.6.1 - the TCP DATA Frames no longer contain START field, as it's not needed - the TCP OPENSTREAM Frames will now contain the BANDWIDTH field - MID is not Protocol internal Update network - update API with Bandwidth Update veloren - introduce better runtime and `async` things that are IO bound. - Remove `uvth` and instead use `tokio::runtime::Runtime::spawn_blocking` - remove futures_execute from client and server use tokio::runtime::Runtime instead - give threads a Name
//!run with
//! ```bash
//! RUST_BACKTRACE=1 cargo run --example chat -- --trace=info --port 15006
//! RUST_BACKTRACE=1 cargo run --example chat -- --trace=info --port 15006 --mode=client
//! ```
use clap::{App, Arg};
use std::{sync::Arc, thread, time::Duration};
use tokio::{io, io::AsyncBufReadExt, runtime::Runtime, sync::RwLock};
use tracing::*;
use tracing_subscriber::EnvFilter;
use veloren_network::{Network, Participant, Pid, Promises, ProtocolAddr};
///This example contains a simple chatserver, that allows to send messages
/// between participants, it's neither pretty nor perfect, but it should show
/// how to integrate network
fn main() {
let matches = App::new("Chat example")
.author("Marcel Märtens <marcel.cochem@googlemail.com>")
.about("example chat implemented with veloren-network")
.possible_values(&["server", "client", "both"])
"choose whether you want to start the server or client or both needed for \
this program",
.help("port to listen on"),
.help("ip to listen and connect to"),
.possible_values(&["tcp", "upd", "mpsc"])
"underlying protocol used for this test, mpsc can only combined with mode=both",
.possible_values(&["trace", "debug", "info", "warn", "error"])
.help("set trace level, not this has a performance impact!"),
let trace = matches.value_of("trace").unwrap();
let filter = EnvFilter::from_default_env().add_directive(trace.parse().unwrap());
let port: u16 = matches.value_of("port").unwrap().parse().unwrap();
let ip: &str = matches.value_of("ip").unwrap();
let address = match matches.value_of("protocol") {
Some("tcp") => ProtocolAddr::Tcp(format!("{}:{}", ip, port).parse().unwrap()),
Some("udp") => ProtocolAddr::Udp(format!("{}:{}", ip, port).parse().unwrap()),
_ => panic!("invalid mode, run --help!"),
let mut background = None;
match matches.value_of("mode") {
Some("server") => server(address),
Some("client") => client(address),
Some("both") => {
let address1 = address.clone();
background = Some(thread::spawn(|| server(address1)));
thread::sleep(Duration::from_millis(200)); //start client after server
_ => panic!("invalid mode, run --help!"),
if let Some(background) = background {
fn server(address: ProtocolAddr) {
let r = Arc::new(Runtime::new().unwrap());
let server = Network::new(Pid::new(), Arc::clone(&r));
let server = Arc::new(server);
let participants = Arc::new(RwLock::new(Vec::new()));
r.block_on(async {
loop {
let p1 = Arc::new(server.connected().await.unwrap());
let server1 = server.clone();
tokio::spawn(client_connection(server1, p1, participants.clone()));
async fn client_connection(
_network: Arc<Network>,
participant: Arc<Participant>,
participants: Arc<RwLock<Vec<Arc<Participant>>>>,
) {
let mut s1 = participant.opened().await.unwrap();
let username = s1.recv::<String>().await.unwrap();
println!("[{}] connected", username);
loop {
match s1.recv::<String>().await {
Err(_) => {
Ok(msg) => {
println!("[{}]: {}", username, msg);
for p in participants.read().await.iter() {
match p
.open(4, Promises::ORDERED | Promises::CONSISTENCY, 0)
Err(_) => info!("error talking to client, //TODO drop it"),
Ok(mut s) => s.send((username.clone(), msg.clone())).unwrap(),
println!("[{}] disconnected", username);
fn client(address: ProtocolAddr) {
let r = Arc::new(Runtime::new().unwrap());
let client = Network::new(Pid::new(), Arc::clone(&r));
r.block_on(async {
let p1 = client.connect(address.clone()).await.unwrap(); //remote representation of p1
let mut s1 = p1
.open(4, Promises::ORDERED | Promises::CONSISTENCY, 0)
.unwrap(); //remote representation of s1
let mut input_lines = io::BufReader::new(io::stdin());
println!("Enter your username:");
let mut username = String::new();
input_lines.read_line(&mut username).await.unwrap();
username = username.split_whitespace().collect();
println!("Your username is: {}", username);
println!("write /quit to close");
loop {
let mut line = String::new();
input_lines.read_line(&mut line).await.unwrap();
line = line.split_whitespace().collect();
if line.as_str() == "/quit" {
} else {
thread::sleep(Duration::from_millis(30)); // TODO: still needed for correct shutdown
// I am quite lazy, the sending is done in a single stream above, but for
// receiving i open and close a stream per message. this can be done easier but
// this allows me to be quite lazy on the server side and just get a list of
// all participants and send to them...
async fn read_messages(participant: Participant) {
while let Ok(mut s) = participant.opened().await {
let (username, message) = s.recv::<(String, String)>().await.unwrap();
println!("[{}]: {}", username, message);
println!("gracefully shut down");