Added concurrent execution of plugins

This commit is contained in:
ccgauche 2021-06-21 22:28:54 +02:00
parent cd694b0d6c
commit f6af1e911e
6 changed files with 89 additions and 48 deletions

View File

@ -1,6 +1,6 @@
use std::sync::atomic::{AtomicPtr, AtomicU32, AtomicU64, Ordering}; use std::sync::atomic::{AtomicPtr, Ordering};
use serde::{de::DeserializeOwned, Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use specs::{ use specs::{
storage::GenericReadStorage, Component, Entities, Entity, Read, ReadStorage, WriteStorage, storage::GenericReadStorage, Component, Entities, Entity, Read, ReadStorage, WriteStorage,
}; };
@ -104,19 +104,8 @@ impl EcsAccessManager {
} }
} }
pub struct MemoryManager { #[derive(Default)]
pub pointer: AtomicU64, pub struct MemoryManager;
pub length: AtomicU32,
}
impl Default for MemoryManager {
fn default() -> Self {
Self {
pointer: AtomicU64::new(0),
length: AtomicU32::new(0),
}
}
}
impl MemoryManager { impl MemoryManager {
/// This function check if the buffer is wide enough if not it realloc the /// This function check if the buffer is wide enough if not it realloc the
@ -128,9 +117,6 @@ impl MemoryManager {
object_length: u32, object_length: u32,
allocator: &Function, allocator: &Function,
) -> Result<u64, MemoryAllocationError> { ) -> Result<u64, MemoryAllocationError> {
if self.length.load(Ordering::SeqCst) >= object_length {
return Ok(self.pointer.load(Ordering::SeqCst));
}
let pointer = allocator let pointer = allocator
.call(&[Value::I32(object_length as i32)]) .call(&[Value::I32(object_length as i32)])
.map_err(MemoryAllocationError::CantAllocate)?; .map_err(MemoryAllocationError::CantAllocate)?;
@ -139,8 +125,6 @@ impl MemoryManager {
.i64() .i64()
.ok_or(MemoryAllocationError::InvalidReturnType)?, .ok_or(MemoryAllocationError::InvalidReturnType)?,
); );
self.length.store(object_length, Ordering::SeqCst);
self.pointer.store(pointer, Ordering::SeqCst);
Ok(pointer) Ok(pointer)
} }

View File

@ -38,6 +38,39 @@ pub struct Plugin {
} }
impl Plugin { impl Plugin {
pub fn from_path(path: &Path) -> Result<Self, PluginError> {
if !path.is_dir() {
return Err(PluginError::NoConfig);
}
let mut toml = PathBuf::from(path);
toml.push("plugin.toml");
let data =
toml::de::from_slice::<PluginData>(&std::fs::read(toml).map_err(PluginError::Io)?)
.map_err(PluginError::Toml)?;
let modules = data
.modules
.iter()
.map(|path1| {
let mut module_file = PathBuf::from(path);
module_file.push(path1);
let wasm_data = std::fs::read(module_file).map_err(PluginError::Io)?;
PluginModule::new(data.name.to_owned(), &wasm_data).map_err(|e| {
PluginError::PluginModuleError(data.name.to_owned(), "<init>".to_owned(), e)
})
})
.collect::<Result<_, _>>()?;
Ok(Plugin {
data,
modules,
files: HashMap::new(),
})
}
pub fn from_reader<R: Read>(mut reader: R) -> Result<Self, PluginError> { pub fn from_reader<R: Read>(mut reader: R) -> Result<Self, PluginError> {
let mut buf = Vec::new(); let mut buf = Vec::new();
reader.read_to_end(&mut buf).map_err(PluginError::Io)?; reader.read_to_end(&mut buf).map_err(PluginError::Io)?;
@ -165,7 +198,7 @@ impl PluginMgr {
Plugin::from_reader(fs::File::open(entry.path()).map_err(PluginError::Io)?) Plugin::from_reader(fs::File::open(entry.path()).map_err(PluginError::Io)?)
.map(Some) .map(Some)
} else { } else {
Ok(None) Plugin::from_path(&entry.path()).map(Some)
} }
}) })
.filter_map(Result::transpose) .filter_map(Result::transpose)

View File

@ -3,7 +3,7 @@ use std::{
borrow::Cow, borrow::Cow,
convert::TryInto, convert::TryInto,
marker::PhantomData, marker::PhantomData,
sync::{Arc, Mutex}, sync::{Arc, RwLock},
}; };
use specs::saveload::MarkerAllocator; use specs::saveload::MarkerAllocator;
@ -17,14 +17,14 @@ use super::{
use plugin_api::{ use plugin_api::{
raw::{RawAction, RawRequest, RawResponse}, raw::{RawAction, RawRequest, RawResponse},
EcsAccessError, Event, Event,
}; };
#[derive(Clone)] #[derive(Clone)]
/// This structure represent the WASM State of the plugin. /// This structure represent the WASM State of the plugin.
pub struct PluginModule { pub struct PluginModule {
ecs: Arc<EcsAccessManager>, ecs: Arc<EcsAccessManager>,
wasm_state: Arc<Mutex<Instance>>, wasm_state: Arc<RwLock<Instance>>,
memory_manager: Arc<MemoryManager>, memory_manager: Arc<MemoryManager>,
events: HashSet<String>, events: HashSet<String>,
allocator: Function, allocator: Function,
@ -57,7 +57,7 @@ impl PluginModule {
fn raw_request(env: &HostFunctionEnvironment, ptr: i64, len: i64) -> i64 { fn raw_request(env: &HostFunctionEnvironment, ptr: i64, len: i64) -> i64 {
let out = match env.read_data(from_i64(ptr), from_i64(len)) { let out = match env.read_data(from_i64(ptr), from_i64(len)) {
Ok(data) => request(&env.ecs, data), Ok(data) => request(&env.ecs, data),
Err(e) => Err(()), Err(_) => Err(()),
}; };
// If an error happen set the i64 to 0 so the WASM side can tell an error // If an error happen set the i64 to 0 so the WASM side can tell an error
@ -109,7 +109,7 @@ impl PluginModule {
.iter() .iter()
.map(|(name, _)| name.to_string()) .map(|(name, _)| name.to_string())
.collect(), .collect(),
wasm_state: Arc::new(Mutex::new(instance)), wasm_state: Arc::new(RwLock::new(instance)),
name, name,
}) })
} }
@ -129,8 +129,8 @@ impl PluginModule {
} }
// Store the ECS Pointer for later use in `retreives` // Store the ECS Pointer for later use in `retreives`
let bytes = match self.ecs.execute_with(ecs, || { let bytes = match self.ecs.execute_with(ecs, || {
let mut state = self.wasm_state.lock().unwrap(); let state = self.wasm_state.read().unwrap();
execute_raw(self, &mut state, &request.function_name, &request.bytes) execute_raw(self, &state, &request.function_name, &request.bytes)
}) { }) {
Ok(e) => e, Ok(e) => e,
Err(e) => return Some(Err(e)), Err(e) => return Some(Err(e)),
@ -189,7 +189,7 @@ pub fn from_i64(i: i64) -> u64 { u64::from_le_bytes(i.to_le_bytes()) }
#[allow(clippy::needless_range_loop)] #[allow(clippy::needless_range_loop)]
fn execute_raw( fn execute_raw(
module: &PluginModule, module: &PluginModule,
instance: &mut Instance, instance: &Instance,
event_name: &str, event_name: &str,
bytes: &[u8], bytes: &[u8],
) -> Result<Vec<u8>, PluginModuleError> { ) -> Result<Vec<u8>, PluginModuleError> {

View File

@ -1,6 +1,6 @@
use std::sync::Arc; use std::sync::Arc;
use serde::{de::DeserializeOwned, Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use wasmer::{Function, HostEnvInitError, Instance, LazyInit, Memory, WasmerEnv}; use wasmer::{Function, HostEnvInitError, Instance, LazyInit, Memory, WasmerEnv};
use super::{ use super::{

View File

@ -14,16 +14,17 @@ pub fn on_load(game: &Game, init: event::Init) {
#[event_handler] #[event_handler]
pub fn on_command_test(game: &Game, cmd: event::Command) -> Result<Vec<String>, String> { pub fn on_command_test(game: &Game, cmd: event::Command) -> Result<Vec<String>, String> {
Ok(vec![ Ok(vec!["test".to_owned()])
format!( // Ok(vec![
"Entity with uid {:?} named {} with {:?} sent command with args {:?}", // format!(
cmd.entity().uid(), // "Entity with uid {:?} named {} with {:?} sent command with args
cmd.entity().get_name(), // {:?}", cmd.entity().uid(),
cmd.entity().get_health(), // cmd.entity().get_name(),
cmd.args(), // cmd.entity().get_health(),
) // cmd.args(),
.into(), // )
]) // .into(),
// ])
} }
#[global_state] #[global_state]

View File

@ -49,10 +49,13 @@ pub fn emit_actions(actions: &[api::raw::RawAction]) {
} }
pub fn print_str(s: &str) { pub fn print_str(s: &str) {
let bytes = s.as_bytes(); #[cfg(target_arch = "wasm32")]
unsafe { {
// Safety: ptr and len are valid for byte slice let bytes = s.as_bytes();
raw_print(to_i64(bytes.as_ptr() as _), to_i64(bytes.len() as _)); unsafe {
// Safety: ptr and len are valid for byte slice
raw_print(to_i64(bytes.as_ptr() as _), to_i64(bytes.len() as _));
}
} }
} }
@ -67,7 +70,15 @@ where
T: Deserialize<'a>, T: Deserialize<'a>,
{ {
let slice = unsafe { ::std::slice::from_raw_parts(from_i64(ptr) as _, from_i64(len) as _) }; let slice = unsafe { ::std::slice::from_raw_parts(from_i64(ptr) as _, from_i64(len) as _) };
bincode::deserialize(slice).map_err(|_| "Failed to deserialize function input") let output = bincode::deserialize(slice).map_err(|_| "Failed to deserialize function input");
// We free the allocated buffer if it exists.
if let Some((a, b)) = BUFFERS
.iter_mut()
.find(|(a, b)| !*a && b.as_ptr() as u64 == from_i64(ptr))
{
*a = true;
}
output
} }
/// This function split a u128 in two u64 encoding them as le bytes /// This function split a u128 in two u64 encoding them as le bytes
@ -107,13 +118,25 @@ pub fn write_output(value: impl Serialize) -> i64 {
} }
} }
static mut BUFFERS: Vec<u8> = Vec::new(); // Synchronisation safety is handled by the bool which enforces the Buffer to be
// used once at a time so no problem (is_free_to_use, data)
static mut BUFFERS: Vec<(bool, Vec<u8>)> = Vec::new();
/// Allocate buffer from wasm linear memory /// Allocate buffer from wasm linear memory
/// # Safety /// # Safety
/// This function should never be used only intented to by used by the host /// This function should never be used only intented to by used by the host
#[no_mangle] #[no_mangle]
pub unsafe fn wasm_prepare_buffer(size: i32) -> i64 { pub unsafe fn wasm_prepare_buffer(size: i32) -> i64 {
BUFFERS = vec![0u8; size as usize]; if let Some((a, x)) = BUFFERS.iter_mut().find(|(x, _)| *x) {
BUFFERS.as_ptr() as i64 *a = false;
if x.len() < size as usize {
*x = vec![0u8; size as usize];
}
x.as_ptr() as i64
} else {
let vec = vec![0u8; size as usize];
let ptr = vec.as_ptr() as i64;
BUFFERS.push((false, vec));
ptr
}
} }