veloren/network/examples/fileshare/server.rs
Marcel Märtens 9028578bc8 Change the way Network is dropped.
Instead of keeping Runtime and manually spawn a task on `drop` this task is spawned at start and will wait to be triggered.
The `drop` methods then wait for completion, UNLESS they are in a async context, then they MUST NOT BLOCK (deadlock potential), so they defer it to the Runtime and HOPE for the runtime to exist long enough.
This get rid of the weird `block_in_place` which is only accessable with `rt-multi-threaded` and has some disadvantages.
We also wont requiere the runtime to be active all the time. Though its needed for a clean shutdown
2021-03-03 11:28:40 +01:00

208 lines
8.2 KiB
Rust

use crate::commands::{Command, FileInfo, LocalCommand, RemoteInfo};
use futures_util::{FutureExt, StreamExt};
use std::{collections::HashMap, path::PathBuf, sync::Arc};
use tokio::{
fs, join,
runtime::Runtime,
sync::{mpsc, Mutex, RwLock},
};
use tokio_stream::wrappers::UnboundedReceiverStream;
use tracing::*;
use veloren_network::{Network, Participant, Pid, Promises, ProtocolAddr, Stream};
#[derive(Debug)]
struct ControlChannels {
command_receiver: mpsc::UnboundedReceiver<LocalCommand>,
}
pub struct Server {
run_channels: Option<ControlChannels>,
network: Network,
served: RwLock<Vec<FileInfo>>,
remotes: RwLock<HashMap<Pid, Arc<Mutex<RemoteInfo>>>>,
receiving_files: Mutex<HashMap<u32, Option<String>>>,
}
impl Server {
pub fn new(runtime: Arc<Runtime>) -> (Self, mpsc::UnboundedSender<LocalCommand>) {
let (command_sender, command_receiver) = mpsc::unbounded_channel();
let network = Network::new(Pid::new(), &runtime);
let run_channels = Some(ControlChannels { command_receiver });
(
Server {
run_channels,
network,
served: RwLock::new(vec![]),
remotes: RwLock::new(HashMap::new()),
receiving_files: Mutex::new(HashMap::new()),
},
command_sender,
)
}
pub async fn run(mut self, address: ProtocolAddr) {
let run_channels = self.run_channels.take().unwrap();
self.network.listen(address).await.unwrap();
join!(
self.command_manager(run_channels.command_receiver,),
self.connect_manager(),
);
}
async fn command_manager(&self, command_receiver: mpsc::UnboundedReceiver<LocalCommand>) {
trace!("Start command_manager");
let command_receiver = UnboundedReceiverStream::new(command_receiver);
command_receiver
.for_each_concurrent(None, async move |cmd| {
match cmd {
LocalCommand::Shutdown => println!("Shutting down service"),
LocalCommand::Disconnect => {
self.remotes.write().await.clear();
println!("Disconnecting all connections");
},
LocalCommand::Connect(addr) => {
println!("Trying to connect to: {:?}", &addr);
match self.network.connect(addr.clone()).await {
Ok(p) => self.loop_participant(p).await,
Err(e) => println!("Failed to connect to {:?}, err: {:?}", &addr, e),
}
},
LocalCommand::Serve(fileinfo) => {
self.served.write().await.push(fileinfo.clone());
println!("Serving file: {:?}", fileinfo.path);
},
LocalCommand::List => {
let mut total_file_infos = vec![];
for ri in self.remotes.read().await.values() {
let mut ri = ri.lock().await;
ri.cmd_out.send(Command::List).unwrap();
let mut file_infos = ri.cmd_out.recv::<Vec<FileInfo>>().await.unwrap();
ri.insert_infos(file_infos.clone());
total_file_infos.append(&mut file_infos);
}
print_fileinfos(&total_file_infos);
},
LocalCommand::Get(id, path) => {
// i dont know the owner, just broadcast, i am laaaazyyy
for ri in self.remotes.read().await.values() {
let mut ri = ri.lock().await;
if ri.get_info(id).is_some() {
//found provider, send request.
self.receiving_files.lock().await.insert(id, path.clone());
ri.cmd_out.send(Command::Get(id)).unwrap();
// the answer is handled via the other stream!
break;
}
}
},
}
})
.await;
trace!("Stop command_manager");
}
async fn connect_manager(&self) {
trace!("Start connect_manager");
let iter = futures_util::stream::unfold((), |_| {
self.network.connected().map(|r| r.ok().map(|v| (v, ())))
});
iter.for_each_concurrent(/* limit */ None, async move |participant| {
self.loop_participant(participant).await;
})
.await;
trace!("Stop connect_manager");
}
#[allow(clippy::eval_order_dependence)]
async fn loop_participant(&self, p: Participant) {
if let (Ok(cmd_out), Ok(file_out), Ok(cmd_in), Ok(file_in)) = (
p.open(3, Promises::ORDERED | Promises::CONSISTENCY, 0)
.await,
p.open(6, Promises::CONSISTENCY, 0).await,
p.opened().await,
p.opened().await,
) {
debug!(?p, "Connection successfully initiated");
let id = p.remote_pid();
let ri = Arc::new(Mutex::new(RemoteInfo::new(cmd_out, file_out, p)));
self.remotes.write().await.insert(id, ri.clone());
join!(
self.handle_remote_cmd(cmd_in, ri.clone()),
self.handle_files(file_in, ri.clone()),
);
}
}
async fn handle_remote_cmd(&self, mut stream: Stream, remote_info: Arc<Mutex<RemoteInfo>>) {
while let Ok(msg) = stream.recv::<Command>().await {
println!("Got message: {:?}", &msg);
match msg {
Command::List => {
info!("Request to send my list");
let served = self.served.read().await.clone();
stream.send(served).unwrap();
},
Command::Get(id) => {
for file_info in self.served.read().await.iter() {
if file_info.id() == id {
info!("Request to send file i got, sending it");
if let Ok(data) = file_info.load().await {
match remote_info.lock().await.file_out.send((file_info, data)) {
Ok(_) => debug!("send file"),
Err(e) => error!(?e, "sending file failed"),
}
} else {
warn!("Cannot send file as loading failed, oes it still exist?");
}
}
}
},
}
}
}
async fn handle_files(&self, mut stream: Stream, _remote_info: Arc<Mutex<RemoteInfo>>) {
while let Ok((fi, data)) = stream.recv::<(FileInfo, Vec<u8>)>().await {
debug!(?fi, "Got file");
let path = self.receiving_files.lock().await.remove(&fi.id()).flatten();
let path: PathBuf = match &path {
Some(path) => shellexpand::tilde(&path).parse().unwrap(),
None => {
let mut path = std::env::current_dir().unwrap();
path.push(fi.path().file_name().unwrap());
trace!("No path provided, saving down to {:?}", path);
path
},
};
debug!("Received file, going to save it under {:?}", path);
fs::write(path, data).await.unwrap();
}
}
}
fn print_fileinfos(infos: &[FileInfo]) {
let mut i = 0;
for info in infos {
let bytes = info.size;
match bytes {
0..100_000 => println!("{}: {}bytes '{}'", info.id(), bytes, info.path),
100_000..100_000_000 => {
println!("{}: {}bytes '{}'", info.id(), bytes / 1024, info.path)
},
_ => println!(
"{}: {}bytes '{}'",
info.id(),
bytes / 1024 / 1024,
info.path
),
}
i += 1;
}
println!("-- {} files available", i);
}