use crate::commands::{Command, FileInfo, LocalCommand, RemoteInfo};
use futures_util::{FutureExt, StreamExt};
use std::{collections::HashMap, path::PathBuf, sync::Arc};
use tokio::{
    fs, join,
    runtime::Runtime,
    sync::{mpsc, Mutex, RwLock},
};
use tokio_stream::wrappers::UnboundedReceiverStream;
use tracing::*;
use veloren_network::{Network, Participant, Pid, Promises, ProtocolAddr, Stream};

/// Channels that are handed to the server's manager tasks when it starts.
///
/// Kept in its own struct so `Server::run` can move it out of the `Server`
/// (via `Option::take`) before borrowing the server immutably.
#[derive(Debug)]
struct ControlChannels {
    /// Receiving end of the local command queue; the matching sender is
    /// returned by `Server::new`.
    command_receiver: mpsc::UnboundedReceiver<LocalCommand>,
}

/// File sharing server: executes local commands and talks to remote
/// participants over `veloren_network`.
pub struct Server {
    /// Channels consumed by `run`; `Some` after `new`, taken when `run` starts.
    run_channels: Option<ControlChannels>,
    /// Network handle used to listen for and to initiate connections.
    network: Network,
    /// Files offered to remotes; appended to by `LocalCommand::Serve` and read
    /// when a remote sends `Command::List` or `Command::Get`.
    served: RwLock<Vec<FileInfo>>,
    /// Connected remotes, keyed by the remote's `Pid`.
    remotes: RwLock<HashMap<Pid, Arc<Mutex<RemoteInfo>>>>,
    /// Pending downloads: file id -> optional destination path given with
    /// `LocalCommand::Get`. The entry is removed when the file arrives.
    receiving_files: Mutex<HashMap<u32, Option<String>>>,
}

impl Server {
    pub fn new(runtime: Arc<Runtime>) -> (Self, mpsc::UnboundedSender<LocalCommand>) {
        let (command_sender, command_receiver) = mpsc::unbounded_channel();

        let network = Network::new(Pid::new(), &runtime);

        let run_channels = Some(ControlChannels { command_receiver });
        (
            Server {
                run_channels,
                network,
                served: RwLock::new(vec![]),
                remotes: RwLock::new(HashMap::new()),
                receiving_files: Mutex::new(HashMap::new()),
            },
            command_sender,
        )
    }

    pub async fn run(mut self, address: ProtocolAddr) {
        let run_channels = self.run_channels.take().unwrap();

        self.network.listen(address).await.unwrap();

        join!(
            self.command_manager(run_channels.command_receiver,),
            self.connect_manager(),
        );
    }

    async fn command_manager(&self, command_receiver: mpsc::UnboundedReceiver<LocalCommand>) {
        trace!("Start command_manager");
        let command_receiver = UnboundedReceiverStream::new(command_receiver);
        command_receiver
            .for_each_concurrent(None, async move |cmd| {
                match cmd {
                    LocalCommand::Shutdown => println!("Shutting down service"),
                    LocalCommand::Disconnect => {
                        self.remotes.write().await.clear();
                        println!("Disconnecting all connections");
                    },
                    LocalCommand::Connect(addr) => {
                        println!("Trying to connect to: {:?}", &addr);
                        match self.network.connect(addr.clone()).await {
                            Ok(p) => self.loop_participant(p).await,
                            Err(e) => println!("Failed to connect to {:?}, err: {:?}", &addr, e),
                        }
                    },
                    LocalCommand::Serve(fileinfo) => {
                        self.served.write().await.push(fileinfo.clone());
                        println!("Serving file: {:?}", fileinfo.path);
                    },
                    LocalCommand::List => {
                        let mut total_file_infos = vec![];
                        for ri in self.remotes.read().await.values() {
                            let mut ri = ri.lock().await;
                            ri.cmd_out.send(Command::List).unwrap();
                            let mut file_infos = ri.cmd_out.recv::<Vec<FileInfo>>().await.unwrap();
                            ri.insert_infos(file_infos.clone());
                            total_file_infos.append(&mut file_infos);
                        }
                        print_fileinfos(&total_file_infos);
                    },
                    LocalCommand::Get(id, path) => {
                        // i dont know the owner, just broadcast, i am laaaazyyy
                        for ri in self.remotes.read().await.values() {
                            let mut ri = ri.lock().await;
                            if ri.get_info(id).is_some() {
                                //found provider, send request.
                                self.receiving_files.lock().await.insert(id, path.clone());
                                ri.cmd_out.send(Command::Get(id)).unwrap();
                                // the answer is handled via the other stream!
                                break;
                            }
                        }
                    },
                }
            })
            .await;
        trace!("Stop command_manager");
    }

    /// Accepts incoming participants and serves each of them concurrently.
    ///
    /// Runs until `Network::connected` reports an error, which ends the stream
    /// of participants, and all accepted participants have been handled.
    async fn connect_manager(&self) {
        trace!("Start connect_manager");
        // Turn repeated `connected()` calls into a stream; the first error
        // maps to `None` and thereby terminates the stream.
        let incoming = futures_util::stream::unfold((), |()| {
            self.network
                .connected()
                .map(|result| result.ok().map(|participant| (participant, ())))
        });

        incoming
            .for_each_concurrent(None, async move |participant| {
                self.loop_participant(participant).await;
            })
            .await;
        trace!("Stop connect_manager");
    }

    #[allow(clippy::eval_order_dependence)]
    async fn loop_participant(&self, p: Participant) {
        if let (Ok(cmd_out), Ok(file_out), Ok(cmd_in), Ok(file_in)) = (
            p.open(3, Promises::ORDERED | Promises::CONSISTENCY, 0)
                .await,
            p.open(6, Promises::CONSISTENCY, 0).await,
            p.opened().await,
            p.opened().await,
        ) {
            debug!(?p, "Connection successfully initiated");
            let id = p.remote_pid();
            let ri = Arc::new(Mutex::new(RemoteInfo::new(cmd_out, file_out, p)));
            self.remotes.write().await.insert(id, ri.clone());
            join!(
                self.handle_remote_cmd(cmd_in, ri.clone()),
                self.handle_files(file_in, ri.clone()),
            );
        }
    }

    async fn handle_remote_cmd(&self, mut stream: Stream, remote_info: Arc<Mutex<RemoteInfo>>) {
        while let Ok(msg) = stream.recv::<Command>().await {
            println!("Got message: {:?}", &msg);
            match msg {
                Command::List => {
                    info!("Request to send my list");
                    let served = self.served.read().await.clone();
                    stream.send(served).unwrap();
                },
                Command::Get(id) => {
                    for file_info in self.served.read().await.iter() {
                        if file_info.id() == id {
                            info!("Request to send file i got, sending it");
                            if let Ok(data) = file_info.load().await {
                                match remote_info.lock().await.file_out.send((file_info, data)) {
                                    Ok(_) => debug!("send file"),
                                    Err(e) => error!(?e, "sending file failed"),
                                }
                            } else {
                                warn!("Cannot send file as loading failed, oes it still exist?");
                            }
                        }
                    }
                },
            }
        }
    }

    async fn handle_files(&self, mut stream: Stream, _remote_info: Arc<Mutex<RemoteInfo>>) {
        while let Ok((fi, data)) = stream.recv::<(FileInfo, Vec<u8>)>().await {
            debug!(?fi, "Got file");
            let path = self.receiving_files.lock().await.remove(&fi.id()).flatten();
            let path: PathBuf = match &path {
                Some(path) => shellexpand::tilde(&path).parse().unwrap(),
                None => {
                    let mut path = std::env::current_dir().unwrap();
                    path.push(fi.path().file_name().unwrap());
                    trace!("No path provided, saving down to {:?}", path);
                    path
                },
            };
            debug!("Received file, going to save it under {:?}", path);
            fs::write(path, data).await.unwrap();
        }
    }
}

/// Prints one line per file (`id: size 'path'`) followed by the total count.
///
/// Sizes below 100 000 bytes are shown in bytes, sizes below 100 000 000
/// bytes in KiB (integer division by 1024) and everything else in MiB.
fn print_fileinfos(infos: &[FileInfo]) {
    for info in infos {
        let bytes = info.size;
        // Pick the scaled value together with its unit so the label always
        // matches the number that is printed.
        let (amount, unit) = match bytes {
            0..=99_999 => (bytes, "bytes"),
            100_000..=99_999_999 => (bytes / 1024, "KiB"),
            _ => (bytes / 1024 / 1024, "MiB"),
        };
        println!("{}: {}{} '{}'", info.id(), amount, unit, info.path);
    }
    println!("-- {} files available", infos.len());
}