From 38b146d8405ab0a68626c1b7fc0a6f07b9b4181c Mon Sep 17 00:00:00 2001 From: Shane Handley Date: Tue, 2 Jun 2020 18:16:23 +1000 Subject: [PATCH] Add channel setup for persistence to move character operations off the main thread. --- server/src/lib.rs | 36 ++++-- server/src/persistence/character.rs | 174 ++++++++++++++++++++++++++-- server/src/persistence/error.rs | 2 + server/src/persistence/mod.rs | 15 ++- server/src/persistence/models.rs | 8 +- server/src/settings.rs | 2 - server/src/sys/message.rs | 55 +++------ 7 files changed, 225 insertions(+), 67 deletions(-) diff --git a/server/src/lib.rs b/server/src/lib.rs index db19408fc0..1b6a6daf59 100644 --- a/server/src/lib.rs +++ b/server/src/lib.rs @@ -39,6 +39,7 @@ use common::{ vol::{ReadVol, RectVolSize}, }; use metrics::{ServerMetrics, TickMetrics}; +use persistence::character::{CharacterListUpdater, CharacterUpdater}; use specs::{join::Join, Builder, Entity as EcsEntity, RunNow, SystemData, WorldExt}; use std::{ i32, @@ -100,10 +101,8 @@ impl Server { state.ecs_mut().insert(ChunkGenerator::new()); state .ecs_mut() - .insert(persistence::character::CharacterUpdater::new( - settings.persistence_db_dir.clone(), - )); - state.ecs_mut().insert(crate::settings::PersistenceDBDir( + .insert(CharacterUpdater::new(settings.persistence_db_dir.clone())); + state.ecs_mut().insert(CharacterListUpdater::new( settings.persistence_db_dir.clone(), )); @@ -302,8 +301,10 @@ impl Server { // 5) Go through the terrain update queue and apply all changes to // the terrain // 6) Send relevant state updates to all clients - // 7) Update Metrics with current data - // 8) Finish the tick, passing control of the main thread back + // 7) Check for persistence updates related to character data, and message the + // relevant entities + // 8) Update Metrics with current data + // 9) Finish the tick, passing control of the main thread back // to the frontend // 1) Build up a list of events for this frame, to be passed to the frontend. @@ -374,14 +375,33 @@ impl Server { .map(|(entity, _, _)| entity) .collect::>() }; + for entity in to_delete { if let Err(e) = self.state.delete_entity_recorded(entity) { error!(?e, "Failed to delete agent outside the terrain"); } } + // 7 Persistence updates + // Get completed updates to character lists and notify the client + self.state + .ecs() + .read_resource::() + .messages() + .for_each(|result| match result.result { + Ok(character_data) => self.notify_client( + result.entity, + ServerMsg::CharacterListUpdate(character_data), + ), + Err(error) => self.notify_client( + result.entity, + ServerMsg::CharacterActionError(error.to_string()), + ), + }); + let end_of_server_tick = Instant::now(); - // 7) Update Metrics + + // 8) Update Metrics // Get system timing info let entity_sync_nanos = self .state @@ -496,7 +516,7 @@ impl Server { .set(end_of_server_tick.elapsed().as_nanos() as i64); self.metrics.tick(); - // 8) Finish the tick, pass control back to the frontend. + // 9) Finish the tick, pass control back to the frontend. Ok(frontend_events) } diff --git a/server/src/persistence/character.rs b/server/src/persistence/character.rs index 58a08594ff..160b439fc8 100644 --- a/server/src/persistence/character.rs +++ b/server/src/persistence/character.rs @@ -1,3 +1,5 @@ +//! Database operations related to characters + extern crate diesel; use super::{ @@ -14,10 +16,160 @@ use common::{ character::{Character as CharacterData, CharacterItem, MAX_CHARACTERS_PER_PLAYER}, LoadoutBuilder, }; -use crossbeam::channel; +use crossbeam::{channel, channel::TryIter}; use diesel::prelude::*; use tracing::{error, warn}; +/// Available database operations when modifying a player's characetr list +enum CharacterListRequestKind { + CreateCharacter { + player_uuid: String, + character_alias: String, + character_tool: Option, + body: comp::Body, + }, + DeleteCharacter { + player_uuid: String, + character_id: i32, + }, + LoadCharacterList { + player_uuid: String, + }, +} + +/// Common format dispatched in response to an update request +#[derive(Debug)] +pub struct CharacterListResponse { + pub entity: specs::Entity, + pub result: CharacterListResult, +} + +/// A bi-directional messaging resource for making modifications to a player's +/// character list in a background thread. +/// +/// This is used exclusively during character selection, and handles loading the +/// player's character list, deleting characters, and creating new characters. +/// Operations not related to the character list (such as saving a character's +/// inventory, stats, etc..) are performed by the [`CharacterUpdater`] +pub struct CharacterListUpdater { + update_rx: Option>, + update_tx: Option>, + handle: Option>, +} + +type CharacterListRequest = (specs::Entity, CharacterListRequestKind); + +impl CharacterListUpdater { + pub fn new(db_dir: String) -> Self { + let (update_tx, internal_rx) = channel::unbounded::(); + let (internal_tx, update_rx) = channel::unbounded::(); + + let handle = std::thread::spawn(move || { + while let Ok(request) = internal_rx.recv() { + let (entity, kind) = request; + + if let Err(err) = internal_tx.send(CharacterListResponse { + entity, + result: match kind { + CharacterListRequestKind::CreateCharacter { + player_uuid, + character_alias, + character_tool, + body, + } => create_character( + &player_uuid, + character_alias, + character_tool, + &body, + &db_dir, + ), + CharacterListRequestKind::DeleteCharacter { + player_uuid, + character_id, + } => delete_character(&player_uuid, character_id, &db_dir), + CharacterListRequestKind::LoadCharacterList { player_uuid } => { + load_character_list(&player_uuid, &db_dir) + }, + }, + }) { + log::error!("Could not send persistence request: {:?}", err); + } + } + }); + + Self { + update_tx: Some(update_tx), + update_rx: Some(update_rx), + handle: Some(handle), + } + } + + /// Create a new character belonging to the player identified by + /// `player_uuid`. + pub fn create_character( + &self, + entity: specs::Entity, + player_uuid: String, + character_alias: String, + character_tool: Option, + body: comp::Body, + ) { + if let Err(err) = self.update_tx.as_ref().unwrap().send(( + entity, + CharacterListRequestKind::CreateCharacter { + player_uuid, + character_alias, + character_tool, + body, + }, + )) { + log::error!("Could not send character creation request: {:?}", err); + } + } + + /// Delete a character by `id` and `player_uuid`. + pub fn delete_character(&self, entity: specs::Entity, player_uuid: String, character_id: i32) { + if let Err(err) = self.update_tx.as_ref().unwrap().send(( + entity, + CharacterListRequestKind::DeleteCharacter { + player_uuid, + character_id, + }, + )) { + log::error!("Could not send character deletion request: {:?}", err); + } + } + + /// Loads a list of characters belonging to the player identified by + /// `player_uuid` + pub fn load_character_list(&self, entity: specs::Entity, player_uuid: String) { + if let Err(err) = self + .update_tx + .as_ref() + .unwrap() + .send((entity, CharacterListRequestKind::LoadCharacterList { + player_uuid, + })) + { + log::error!("Could not send character list load request: {:?}", err); + } + } + + /// Returns a non-blocking iterator over CharacterListResponse messages + pub fn messages(&self) -> TryIter { + self.update_rx.as_ref().unwrap().try_iter() + } +} + +impl Drop for CharacterListUpdater { + fn drop(&mut self) { + drop(self.update_tx.take()); + if let Err(err) = self.handle.take().unwrap().join() { + log::error!("Error from joining character update thread: {:?}", err); + } + } +} + type CharacterListResult = Result, Error>; /// Load stored data for a character. @@ -103,8 +255,7 @@ pub fn load_character_data( /// In the event that a join fails, for a character (i.e. they lack an entry for /// stats, body, etc...) the character is skipped, and no entry will be /// returned. - -pub fn load_character_list(player_uuid: &str, db_dir: &str) -> CharacterListResult { +fn load_character_list(player_uuid: &str, db_dir: &str) -> CharacterListResult { let data = schema::character::dsl::character .filter(schema::character::player_uuid.eq(player_uuid)) .order(schema::character::id.desc()) @@ -146,8 +297,8 @@ pub fn load_character_list(player_uuid: &str, db_dir: &str) -> CharacterListResu /// Note that sqlite does not support returning the inserted data after a /// successful insert. To workaround, we wrap this in a transaction which /// inserts, queries for the newly created chaacter id, then uses the character -/// id for insertion of the `body` table entry -pub fn create_character( +/// id for subsequent insertions +fn create_character( uuid: &str, character_alias: String, character_tool: Option, @@ -243,7 +394,7 @@ pub fn create_character( } /// Delete a character. Returns the updated character list. -pub fn delete_character(uuid: &str, character_id: i32, db_dir: &str) -> CharacterListResult { +fn delete_character(uuid: &str, character_id: i32, db_dir: &str) -> CharacterListResult { use schema::character::dsl::*; diesel::delete( @@ -256,6 +407,8 @@ pub fn delete_character(uuid: &str, character_id: i32, db_dir: &str) -> Characte load_character_list(uuid, db_dir) } +/// Before creating a character, we ensure that the limit on the number of +/// characters has not been exceeded fn check_character_limit(uuid: &str, db_dir: &str) -> Result<(), Error> { use diesel::dsl::count_star; use schema::character::dsl::*; @@ -277,8 +430,13 @@ fn check_character_limit(uuid: &str, db_dir: &str) -> Result<(), Error> { } } -pub type CharacterUpdateData = (StatsUpdate, InventoryUpdate, LoadoutUpdate); +type CharacterUpdateData = (StatsUpdate, InventoryUpdate, LoadoutUpdate); +/// A unidirectional messaging resource for saving characters in a +/// background thread. +/// +/// This is used to make updates to a character and their persisted components, +/// such as inventory, loadout, etc... pub struct CharacterUpdater { update_tx: Option>>, handle: Option>, @@ -299,6 +457,7 @@ impl CharacterUpdater { } } + /// Updates a collection of characters based on their id and components pub fn batch_update<'a>( &self, updates: impl Iterator, @@ -321,6 +480,7 @@ impl CharacterUpdater { } } + /// Updates a single character based on their id and components pub fn update( &self, character_id: i32, diff --git a/server/src/persistence/error.rs b/server/src/persistence/error.rs index 7d0398bf6a..0086fa5cb4 100644 --- a/server/src/persistence/error.rs +++ b/server/src/persistence/error.rs @@ -1,3 +1,5 @@ +//! Consolidates Diesel and validation errors under a common error type + extern crate diesel; use std::fmt; diff --git a/server/src/persistence/mod.rs b/server/src/persistence/mod.rs index 25363b00e3..34c1d4377a 100644 --- a/server/src/persistence/mod.rs +++ b/server/src/persistence/mod.rs @@ -1,3 +1,11 @@ +//! DB operations and schema migrations +//! +//! This code uses several [`Diesel ORM`](http://diesel.rs/) tools for DB operations: +//! - [`diesel-migrations`](https://docs.rs/diesel_migrations/1.4.0/diesel_migrations/) +//! for managing table migrations +//! - [`diesel-cli`](https://github.com/diesel-rs/diesel/tree/master/diesel_cli/) +//! for generating and testing migrations + pub mod character; mod error; @@ -16,6 +24,7 @@ use tracing::warn; // for the `embedded_migrations` call below. embed_migrations!(); +/// Runs any pending database migrations. This is executed during server startup pub fn run_migrations(db_dir: &str) -> Result<(), diesel_migrations::RunMigrationsError> { let db_dir = &apply_saves_dir_override(db_dir); let _ = fs::create_dir(format!("{}/", db_dir)); @@ -47,15 +56,13 @@ fn establish_connection(db_dir: &str) -> SqliteConnection { connection } -#[allow(clippy::single_match)] // TODO: Pending review in #587 fn apply_saves_dir_override(db_dir: &str) -> String { if let Some(saves_dir) = env::var_os("VELOREN_SAVES_DIR") { let path = PathBuf::from(saves_dir.clone()); if path.exists() || path.parent().map(|x| x.exists()).unwrap_or(false) { // Only allow paths with valid unicode characters - match path.to_str() { - Some(path) => return path.to_owned(), - None => {}, + if let Some(path) = path.to_str() { + return path.to_owned(); } } warn!(?saves_dir, "VELOREN_SAVES_DIR points to an invalid path."); diff --git a/server/src/persistence/models.rs b/server/src/persistence/models.rs index 9e4c910425..e3873777b7 100644 --- a/server/src/persistence/models.rs +++ b/server/src/persistence/models.rs @@ -79,8 +79,8 @@ impl From<&Body> for comp::Body { } } -/// `Stats` represents the stats for a character, which has a one-to-one -/// relationship with Characters. +/// `Stats` represents the stats for a character, and have a one-to-one +/// relationship with `Character`. #[derive(Associations, AsChangeset, Identifiable, Queryable, Debug, Insertable)] #[belongs_to(Character)] #[primary_key(character_id)] @@ -144,8 +144,8 @@ impl From<&comp::Stats> for StatsUpdate { /// Inventory storage and conversion. Inventories have a one-to-one relationship /// with characters. /// -/// We store the players inventory as a single TEXT column which is serialised -/// JSON representation of the Inventory component. +/// We store inventory rows as a (character_id, json) tuples, where the json is +/// a serialised Inventory component. #[derive(Associations, AsChangeset, Identifiable, Queryable, Debug, Insertable)] #[belongs_to(Character)] #[primary_key(character_id)] diff --git a/server/src/settings.rs b/server/src/settings.rs index 9cb9c60de2..98794d1c78 100644 --- a/server/src/settings.rs +++ b/server/src/settings.rs @@ -132,5 +132,3 @@ impl ServerSettings { fn get_settings_path() -> PathBuf { PathBuf::from(r"server_settings.ron") } } - -pub struct PersistenceDBDir(pub String); diff --git a/server/src/sys/message.rs b/server/src/sys/message.rs index 84e8fc099c..cfb22998d2 100644 --- a/server/src/sys/message.rs +++ b/server/src/sys/message.rs @@ -1,6 +1,6 @@ use super::SysTimer; use crate::{ - auth_provider::AuthProvider, client::Client, persistence, settings::PersistenceDBDir, + auth_provider::AuthProvider, client::Client, persistence::character::CharacterListUpdater, CLIENT_TIMEOUT, }; use common::{ @@ -32,7 +32,7 @@ impl<'a> System<'a> for Sys { Entities<'a>, Read<'a, EventBus>, Read<'a, Time>, - ReadExpect<'a, PersistenceDBDir>, + ReadExpect<'a, CharacterListUpdater>, ReadExpect<'a, TerrainGrid>, Write<'a, SysTimer>, ReadStorage<'a, Uid>, @@ -60,7 +60,7 @@ impl<'a> System<'a> for Sys { entities, server_event_bus, time, - persistence_db_dir, + char_list, terrain, mut timer, uids, @@ -81,8 +81,6 @@ impl<'a> System<'a> for Sys { ) { timer.start(); - let persistence_db_dir = &persistence_db_dir.0; - let mut server_emitter = server_event_bus.emitter(); let mut new_chat_msgs = Vec::new(); @@ -334,54 +332,27 @@ impl<'a> System<'a> for Sys { }, ClientMsg::RequestCharacterList => { if let Some(player) = players.get(entity) { - match persistence::character::load_character_list( - &player.uuid().to_string(), - persistence_db_dir, - ) { - Ok(character_list) => { - client.notify(ServerMsg::CharacterListUpdate(character_list)); - }, - Err(error) => { - client - .notify(ServerMsg::CharacterActionError(error.to_string())); - }, - } + char_list.load_character_list(entity, player.uuid().to_string()) } }, ClientMsg::CreateCharacter { alias, tool, body } => { if let Some(player) = players.get(entity) { - match persistence::character::create_character( - &player.uuid().to_string(), + char_list.create_character( + entity, + player.uuid().to_string(), alias, tool, - &body, - persistence_db_dir, - ) { - Ok(character_list) => { - client.notify(ServerMsg::CharacterListUpdate(character_list)); - }, - Err(error) => { - client - .notify(ServerMsg::CharacterActionError(error.to_string())); - }, - } + body, + ); } }, ClientMsg::DeleteCharacter(character_id) => { if let Some(player) = players.get(entity) { - match persistence::character::delete_character( - &player.uuid().to_string(), + char_list.delete_character( + entity, + player.uuid().to_string(), character_id, - persistence_db_dir, - ) { - Ok(character_list) => { - client.notify(ServerMsg::CharacterListUpdate(character_list)); - }, - Err(error) => { - client - .notify(ServerMsg::CharacterActionError(error.to_string())); - }, - } + ); } }, }