//!run with
//! ```bash
//! RUST_BACKTRACE=1 cargo run --example chat -- --trace=info --port 15006
//! RUST_BACKTRACE=1 cargo run --example chat -- --trace=info --port 15006 --mode=client
//! ```
use clap::{Arg, Command};
use std::{sync::Arc, thread, time::Duration};
use tokio::{io, io::AsyncBufReadExt, runtime::Runtime, sync::RwLock};
use tracing::*;
use tracing_subscriber::EnvFilter;
use veloren_network::{ConnectAddr, ListenAddr, Network, Participant, Pid, Promises, Stream};

///This example contains a simple chatserver, that allows to send messages
/// between participants, it's neither pretty nor perfect, but it should show
/// how to integrate network
fn main() {
    let matches = Command::new("Chat example")
        .version("0.1.0")
        .author("Marcel Märtens <marcel.cochem@googlemail.com>")
        .about("example chat implemented with veloren-network")
        .arg(
            Arg::new("mode")
                .short('m')
                .long("mode")
                .value_parser(["server", "client", "both"])
                .default_value("both")
                .help(
                    "choose whether you want to start the server or client or both needed for \
                     this program",
                ),
        )
        .arg(
            Arg::new("port")
                .short('p')
                .long("port")
                .default_value("52000")
                .help("port to listen on"),
        )
        .arg(
            Arg::new("ip")
                .long("ip")
                .default_value("127.0.0.1")
                .help("ip to listen and connect to"),
        )
        .arg(
            Arg::new("protocol")
                .long("protocol")
                .default_value("tcp")
                .value_parser(["tcp", "upd", "mpsc"])
                .help(
                    "underlying protocol used for this test, mpsc can only combined with mode=both",
                ),
        )
        .arg(
            Arg::new("trace")
                .short('t')
                .long("trace")
                .default_value("warn")
                .value_parser(["trace", "debug", "info", "warn", "error"])
                .help("set trace level, not this has a performance impact!"),
        )
        .get_matches();

    let trace = matches.get_one::<String>("trace").unwrap();
    let filter = EnvFilter::from_default_env().add_directive(trace.parse().unwrap());
    tracing_subscriber::FmtSubscriber::builder()
        .with_max_level(Level::TRACE)
        .with_env_filter(filter)
        .init();

    let port = matches.get_one::<u16>("port").unwrap();
    let ip: &str = matches.get_one::<String>("ip").unwrap();
    let addresses = match matches.get_one::<String>("protocol").map(|s| s.as_str()) {
        Some("tcp") => (
            ListenAddr::Tcp(format!("{}:{}", ip, port).parse().unwrap()),
            ConnectAddr::Tcp(format!("{}:{}", ip, port).parse().unwrap()),
        ),
        Some("udp") => (
            ListenAddr::Udp(format!("{}:{}", ip, port).parse().unwrap()),
            ConnectAddr::Udp(format!("{}:{}", ip, port).parse().unwrap()),
        ),
        _ => panic!("invalid mode, run --help!"),
    };

    let mut background = None;
    match matches.get_one::<String>("mode").map(|s| s.as_str()) {
        Some("server") => server(addresses.0),
        Some("client") => client(addresses.1),
        Some("both") => {
            let s = addresses.0;
            background = Some(thread::spawn(|| server(s)));
            thread::sleep(Duration::from_millis(200)); //start client after server
            client(addresses.1)
        },
        _ => panic!("invalid mode, run --help!"),
    };
    if let Some(background) = background {
        background.join().unwrap();
    }
}

fn server(address: ListenAddr) {
    let r = Arc::new(Runtime::new().unwrap());
    let mut server = Network::new(Pid::new(), &r);
    let participants = Arc::new(RwLock::new(Vec::new()));
    r.block_on(async {
        server.listen(address).await.unwrap();
        loop {
            let mut p1 = server.connected().await.unwrap();
            let s1 = p1.opened().await.unwrap();
            participants.write().await.push(p1);
            tokio::spawn(client_connection(s1, participants.clone()));
        }
    });
}

async fn client_connection(mut s1: Stream, participants: Arc<RwLock<Vec<Participant>>>) {
    let username = s1.recv::<String>().await.unwrap();
    println!("[{}] connected", username);
    loop {
        match s1.recv::<String>().await {
            Err(_) => {
                break;
            },
            Ok(msg) => {
                println!("[{}]: {}", username, msg);
                for p in participants.read().await.iter() {
                    match p
                        .open(4, Promises::ORDERED | Promises::CONSISTENCY, 0)
                        .await
                    {
                        Err(_) => info!("error talking to client, //TODO drop it"),
                        Ok(s) => s.send((username.clone(), msg.clone())).unwrap(),
                    };
                }
            },
        }
    }
    println!("[{}] disconnected", username);
}

fn client(address: ConnectAddr) {
    let r = Arc::new(Runtime::new().unwrap());
    let client = Network::new(Pid::new(), &r);

    r.block_on(async {
        let p1 = client.connect(address.clone()).await.unwrap(); //remote representation of p1
        let s1 = p1
            .open(4, Promises::ORDERED | Promises::CONSISTENCY, 0)
            .await
            .unwrap(); //remote representation of s1
        let mut input_lines = io::BufReader::new(io::stdin());
        println!("Enter your username:");
        let mut username = String::new();
        input_lines.read_line(&mut username).await.unwrap();
        username = username.split_whitespace().collect();
        println!("Your username is: {}", username);
        println!("write /quit to close");
        tokio::spawn(read_messages(p1));
        s1.send(username).unwrap();
        loop {
            let mut line = String::new();
            input_lines.read_line(&mut line).await.unwrap();
            line = line.split_whitespace().collect();
            if line.as_str() == "/quit" {
                println!("goodbye");
                break;
            } else {
                s1.send(line).unwrap();
            }
        }
    });
    thread::sleep(Duration::from_millis(30)); // TODO: still needed for correct shutdown
}

// I am quite lazy, the sending is done in a single stream above, but for
// receiving i open and close a stream per message. this can be done easier but
// this allows me to be quite lazy on the server side and just get a list of
// all participants and send to them...
async fn read_messages(mut participant: Participant) {
    while let Ok(mut s) = participant.opened().await {
        let (username, message) = s.recv::<(String, String)>().await.unwrap();
        println!("[{}]: {}", username, message);
    }
    println!("gracefully shut down");
}