From f6af1e911e5e64d78f024dc6bc39ddc1d6c3fbfb Mon Sep 17 00:00:00 2001 From: ccgauche Date: Mon, 21 Jun 2021 22:28:54 +0200 Subject: [PATCH] Added concurrent execution of plugins --- common/state/src/plugin/memory_manager.rs | 24 +++----------- common/state/src/plugin/mod.rs | 35 +++++++++++++++++++- common/state/src/plugin/module.rs | 16 +++++----- common/state/src/plugin/wasm_env.rs | 2 +- plugin/rt/examples/hello.rs | 21 ++++++------ plugin/rt/src/lib.rs | 39 ++++++++++++++++++----- 6 files changed, 89 insertions(+), 48 deletions(-) diff --git a/common/state/src/plugin/memory_manager.rs b/common/state/src/plugin/memory_manager.rs index 5e23e4f0e0..bf3a3ab36b 100644 --- a/common/state/src/plugin/memory_manager.rs +++ b/common/state/src/plugin/memory_manager.rs @@ -1,6 +1,6 @@ -use std::sync::atomic::{AtomicPtr, AtomicU32, AtomicU64, Ordering}; +use std::sync::atomic::{AtomicPtr, Ordering}; -use serde::{de::DeserializeOwned, Deserialize, Serialize}; +use serde::{Deserialize, Serialize}; use specs::{ storage::GenericReadStorage, Component, Entities, Entity, Read, ReadStorage, WriteStorage, }; @@ -104,19 +104,8 @@ impl EcsAccessManager { } } -pub struct MemoryManager { - pub pointer: AtomicU64, - pub length: AtomicU32, -} - -impl Default for MemoryManager { - fn default() -> Self { - Self { - pointer: AtomicU64::new(0), - length: AtomicU32::new(0), - } - } -} +#[derive(Default)] +pub struct MemoryManager; impl MemoryManager { /// This function check if the buffer is wide enough if not it realloc the @@ -128,9 +117,6 @@ impl MemoryManager { object_length: u32, allocator: &Function, ) -> Result { - if self.length.load(Ordering::SeqCst) >= object_length { - return Ok(self.pointer.load(Ordering::SeqCst)); - } let pointer = allocator .call(&[Value::I32(object_length as i32)]) .map_err(MemoryAllocationError::CantAllocate)?; @@ -139,8 +125,6 @@ impl MemoryManager { .i64() .ok_or(MemoryAllocationError::InvalidReturnType)?, ); - self.length.store(object_length, Ordering::SeqCst); - self.pointer.store(pointer, Ordering::SeqCst); Ok(pointer) } diff --git a/common/state/src/plugin/mod.rs b/common/state/src/plugin/mod.rs index 123762bd18..5617bf96ea 100644 --- a/common/state/src/plugin/mod.rs +++ b/common/state/src/plugin/mod.rs @@ -38,6 +38,39 @@ pub struct Plugin { } impl Plugin { + pub fn from_path(path: &Path) -> Result { + if !path.is_dir() { + return Err(PluginError::NoConfig); + } + + let mut toml = PathBuf::from(path); + + toml.push("plugin.toml"); + + let data = + toml::de::from_slice::(&std::fs::read(toml).map_err(PluginError::Io)?) + .map_err(PluginError::Toml)?; + + let modules = data + .modules + .iter() + .map(|path1| { + let mut module_file = PathBuf::from(path); + module_file.push(path1); + let wasm_data = std::fs::read(module_file).map_err(PluginError::Io)?; + PluginModule::new(data.name.to_owned(), &wasm_data).map_err(|e| { + PluginError::PluginModuleError(data.name.to_owned(), "".to_owned(), e) + }) + }) + .collect::>()?; + + Ok(Plugin { + data, + modules, + files: HashMap::new(), + }) + } + pub fn from_reader(mut reader: R) -> Result { let mut buf = Vec::new(); reader.read_to_end(&mut buf).map_err(PluginError::Io)?; @@ -165,7 +198,7 @@ impl PluginMgr { Plugin::from_reader(fs::File::open(entry.path()).map_err(PluginError::Io)?) .map(Some) } else { - Ok(None) + Plugin::from_path(&entry.path()).map(Some) } }) .filter_map(Result::transpose) diff --git a/common/state/src/plugin/module.rs b/common/state/src/plugin/module.rs index ccaff2112f..30680552c6 100644 --- a/common/state/src/plugin/module.rs +++ b/common/state/src/plugin/module.rs @@ -3,7 +3,7 @@ use std::{ borrow::Cow, convert::TryInto, marker::PhantomData, - sync::{Arc, Mutex}, + sync::{Arc, RwLock}, }; use specs::saveload::MarkerAllocator; @@ -17,14 +17,14 @@ use super::{ use plugin_api::{ raw::{RawAction, RawRequest, RawResponse}, - EcsAccessError, Event, + Event, }; #[derive(Clone)] /// This structure represent the WASM State of the plugin. pub struct PluginModule { ecs: Arc, - wasm_state: Arc>, + wasm_state: Arc>, memory_manager: Arc, events: HashSet, allocator: Function, @@ -57,7 +57,7 @@ impl PluginModule { fn raw_request(env: &HostFunctionEnvironment, ptr: i64, len: i64) -> i64 { let out = match env.read_data(from_i64(ptr), from_i64(len)) { Ok(data) => request(&env.ecs, data), - Err(e) => Err(()), + Err(_) => Err(()), }; // If an error happen set the i64 to 0 so the WASM side can tell an error @@ -109,7 +109,7 @@ impl PluginModule { .iter() .map(|(name, _)| name.to_string()) .collect(), - wasm_state: Arc::new(Mutex::new(instance)), + wasm_state: Arc::new(RwLock::new(instance)), name, }) } @@ -129,8 +129,8 @@ impl PluginModule { } // Store the ECS Pointer for later use in `retreives` let bytes = match self.ecs.execute_with(ecs, || { - let mut state = self.wasm_state.lock().unwrap(); - execute_raw(self, &mut state, &request.function_name, &request.bytes) + let state = self.wasm_state.read().unwrap(); + execute_raw(self, &state, &request.function_name, &request.bytes) }) { Ok(e) => e, Err(e) => return Some(Err(e)), @@ -189,7 +189,7 @@ pub fn from_i64(i: i64) -> u64 { u64::from_le_bytes(i.to_le_bytes()) } #[allow(clippy::needless_range_loop)] fn execute_raw( module: &PluginModule, - instance: &mut Instance, + instance: &Instance, event_name: &str, bytes: &[u8], ) -> Result, PluginModuleError> { diff --git a/common/state/src/plugin/wasm_env.rs b/common/state/src/plugin/wasm_env.rs index 210ece921b..0b23f8300b 100644 --- a/common/state/src/plugin/wasm_env.rs +++ b/common/state/src/plugin/wasm_env.rs @@ -1,6 +1,6 @@ use std::sync::Arc; -use serde::{de::DeserializeOwned, Deserialize, Serialize}; +use serde::{Deserialize, Serialize}; use wasmer::{Function, HostEnvInitError, Instance, LazyInit, Memory, WasmerEnv}; use super::{ diff --git a/plugin/rt/examples/hello.rs b/plugin/rt/examples/hello.rs index ca0b0ce547..f811dde9d5 100644 --- a/plugin/rt/examples/hello.rs +++ b/plugin/rt/examples/hello.rs @@ -14,16 +14,17 @@ pub fn on_load(game: &Game, init: event::Init) { #[event_handler] pub fn on_command_test(game: &Game, cmd: event::Command) -> Result, String> { - Ok(vec![ - format!( - "Entity with uid {:?} named {} with {:?} sent command with args {:?}", - cmd.entity().uid(), - cmd.entity().get_name(), - cmd.entity().get_health(), - cmd.args(), - ) - .into(), - ]) + Ok(vec!["test".to_owned()]) + // Ok(vec![ + // format!( + // "Entity with uid {:?} named {} with {:?} sent command with args + // {:?}", cmd.entity().uid(), + // cmd.entity().get_name(), + // cmd.entity().get_health(), + // cmd.args(), + // ) + // .into(), + // ]) } #[global_state] diff --git a/plugin/rt/src/lib.rs b/plugin/rt/src/lib.rs index d149f410b3..3afc94444e 100644 --- a/plugin/rt/src/lib.rs +++ b/plugin/rt/src/lib.rs @@ -49,10 +49,13 @@ pub fn emit_actions(actions: &[api::raw::RawAction]) { } pub fn print_str(s: &str) { - let bytes = s.as_bytes(); - unsafe { - // Safety: ptr and len are valid for byte slice - raw_print(to_i64(bytes.as_ptr() as _), to_i64(bytes.len() as _)); + #[cfg(target_arch = "wasm32")] + { + let bytes = s.as_bytes(); + unsafe { + // Safety: ptr and len are valid for byte slice + raw_print(to_i64(bytes.as_ptr() as _), to_i64(bytes.len() as _)); + } } } @@ -67,7 +70,15 @@ where T: Deserialize<'a>, { let slice = unsafe { ::std::slice::from_raw_parts(from_i64(ptr) as _, from_i64(len) as _) }; - bincode::deserialize(slice).map_err(|_| "Failed to deserialize function input") + let output = bincode::deserialize(slice).map_err(|_| "Failed to deserialize function input"); + // We free the allocated buffer if it exists. + if let Some((a, b)) = BUFFERS + .iter_mut() + .find(|(a, b)| !*a && b.as_ptr() as u64 == from_i64(ptr)) + { + *a = true; + } + output } /// This function split a u128 in two u64 encoding them as le bytes @@ -107,13 +118,25 @@ pub fn write_output(value: impl Serialize) -> i64 { } } -static mut BUFFERS: Vec = Vec::new(); +// Synchronisation safety is handled by the bool which enforces the Buffer to be +// used once at a time so no problem (is_free_to_use, data) +static mut BUFFERS: Vec<(bool, Vec)> = Vec::new(); /// Allocate buffer from wasm linear memory /// # Safety /// This function should never be used only intented to by used by the host #[no_mangle] pub unsafe fn wasm_prepare_buffer(size: i32) -> i64 { - BUFFERS = vec![0u8; size as usize]; - BUFFERS.as_ptr() as i64 + if let Some((a, x)) = BUFFERS.iter_mut().find(|(x, _)| *x) { + *a = false; + if x.len() < size as usize { + *x = vec![0u8; size as usize]; + } + x.as_ptr() as i64 + } else { + let vec = vec![0u8; size as usize]; + let ptr = vec.as_ptr() as i64; + BUFFERS.push((false, vec)); + ptr + } }