Add channel setup for persistence to move character operations off the

main thread.
This commit is contained in:
Shane Handley 2020-06-02 18:16:23 +10:00
parent 3628bfc2a6
commit 38b146d840
7 changed files with 225 additions and 67 deletions

View File

@ -39,6 +39,7 @@ use common::{
vol::{ReadVol, RectVolSize},
};
use metrics::{ServerMetrics, TickMetrics};
use persistence::character::{CharacterListUpdater, CharacterUpdater};
use specs::{join::Join, Builder, Entity as EcsEntity, RunNow, SystemData, WorldExt};
use std::{
i32,
@ -100,10 +101,8 @@ impl Server {
state.ecs_mut().insert(ChunkGenerator::new());
state
.ecs_mut()
.insert(persistence::character::CharacterUpdater::new(
settings.persistence_db_dir.clone(),
));
state.ecs_mut().insert(crate::settings::PersistenceDBDir(
.insert(CharacterUpdater::new(settings.persistence_db_dir.clone()));
state.ecs_mut().insert(CharacterListUpdater::new(
settings.persistence_db_dir.clone(),
));
@ -302,8 +301,10 @@ impl Server {
// 5) Go through the terrain update queue and apply all changes to
// the terrain
// 6) Send relevant state updates to all clients
// 7) Update Metrics with current data
// 8) Finish the tick, passing control of the main thread back
// 7) Check for persistence updates related to character data, and message the
// relevant entities
// 8) Update Metrics with current data
// 9) Finish the tick, passing control of the main thread back
// to the frontend
// 1) Build up a list of events for this frame, to be passed to the frontend.
@ -374,14 +375,33 @@ impl Server {
.map(|(entity, _, _)| entity)
.collect::<Vec<_>>()
};
for entity in to_delete {
if let Err(e) = self.state.delete_entity_recorded(entity) {
error!(?e, "Failed to delete agent outside the terrain");
}
}
// 7 Persistence updates
// Get completed updates to character lists and notify the client
self.state
.ecs()
.read_resource::<persistence::character::CharacterListUpdater>()
.messages()
.for_each(|result| match result.result {
Ok(character_data) => self.notify_client(
result.entity,
ServerMsg::CharacterListUpdate(character_data),
),
Err(error) => self.notify_client(
result.entity,
ServerMsg::CharacterActionError(error.to_string()),
),
});
let end_of_server_tick = Instant::now();
// 7) Update Metrics
// 8) Update Metrics
// Get system timing info
let entity_sync_nanos = self
.state
@ -496,7 +516,7 @@ impl Server {
.set(end_of_server_tick.elapsed().as_nanos() as i64);
self.metrics.tick();
// 8) Finish the tick, pass control back to the frontend.
// 9) Finish the tick, pass control back to the frontend.
Ok(frontend_events)
}

View File

@ -1,3 +1,5 @@
//! Database operations related to characters
extern crate diesel;
use super::{
@ -14,10 +16,160 @@ use common::{
character::{Character as CharacterData, CharacterItem, MAX_CHARACTERS_PER_PLAYER},
LoadoutBuilder,
};
use crossbeam::channel;
use crossbeam::{channel, channel::TryIter};
use diesel::prelude::*;
use tracing::{error, warn};
/// Available database operations when modifying a player's characetr list
enum CharacterListRequestKind {
CreateCharacter {
player_uuid: String,
character_alias: String,
character_tool: Option<String>,
body: comp::Body,
},
DeleteCharacter {
player_uuid: String,
character_id: i32,
},
LoadCharacterList {
player_uuid: String,
},
}
/// Common format dispatched in response to an update request
#[derive(Debug)]
pub struct CharacterListResponse {
pub entity: specs::Entity,
pub result: CharacterListResult,
}
/// A bi-directional messaging resource for making modifications to a player's
/// character list in a background thread.
///
/// This is used exclusively during character selection, and handles loading the
/// player's character list, deleting characters, and creating new characters.
/// Operations not related to the character list (such as saving a character's
/// inventory, stats, etc..) are performed by the [`CharacterUpdater`]
pub struct CharacterListUpdater {
update_rx: Option<channel::Receiver<CharacterListResponse>>,
update_tx: Option<channel::Sender<CharacterListRequest>>,
handle: Option<std::thread::JoinHandle<()>>,
}
type CharacterListRequest = (specs::Entity, CharacterListRequestKind);
impl CharacterListUpdater {
pub fn new(db_dir: String) -> Self {
let (update_tx, internal_rx) = channel::unbounded::<CharacterListRequest>();
let (internal_tx, update_rx) = channel::unbounded::<CharacterListResponse>();
let handle = std::thread::spawn(move || {
while let Ok(request) = internal_rx.recv() {
let (entity, kind) = request;
if let Err(err) = internal_tx.send(CharacterListResponse {
entity,
result: match kind {
CharacterListRequestKind::CreateCharacter {
player_uuid,
character_alias,
character_tool,
body,
} => create_character(
&player_uuid,
character_alias,
character_tool,
&body,
&db_dir,
),
CharacterListRequestKind::DeleteCharacter {
player_uuid,
character_id,
} => delete_character(&player_uuid, character_id, &db_dir),
CharacterListRequestKind::LoadCharacterList { player_uuid } => {
load_character_list(&player_uuid, &db_dir)
},
},
}) {
log::error!("Could not send persistence request: {:?}", err);
}
}
});
Self {
update_tx: Some(update_tx),
update_rx: Some(update_rx),
handle: Some(handle),
}
}
/// Create a new character belonging to the player identified by
/// `player_uuid`.
pub fn create_character(
&self,
entity: specs::Entity,
player_uuid: String,
character_alias: String,
character_tool: Option<String>,
body: comp::Body,
) {
if let Err(err) = self.update_tx.as_ref().unwrap().send((
entity,
CharacterListRequestKind::CreateCharacter {
player_uuid,
character_alias,
character_tool,
body,
},
)) {
log::error!("Could not send character creation request: {:?}", err);
}
}
/// Delete a character by `id` and `player_uuid`.
pub fn delete_character(&self, entity: specs::Entity, player_uuid: String, character_id: i32) {
if let Err(err) = self.update_tx.as_ref().unwrap().send((
entity,
CharacterListRequestKind::DeleteCharacter {
player_uuid,
character_id,
},
)) {
log::error!("Could not send character deletion request: {:?}", err);
}
}
/// Loads a list of characters belonging to the player identified by
/// `player_uuid`
pub fn load_character_list(&self, entity: specs::Entity, player_uuid: String) {
if let Err(err) = self
.update_tx
.as_ref()
.unwrap()
.send((entity, CharacterListRequestKind::LoadCharacterList {
player_uuid,
}))
{
log::error!("Could not send character list load request: {:?}", err);
}
}
/// Returns a non-blocking iterator over CharacterListResponse messages
pub fn messages(&self) -> TryIter<CharacterListResponse> {
self.update_rx.as_ref().unwrap().try_iter()
}
}
impl Drop for CharacterListUpdater {
fn drop(&mut self) {
drop(self.update_tx.take());
if let Err(err) = self.handle.take().unwrap().join() {
log::error!("Error from joining character update thread: {:?}", err);
}
}
}
type CharacterListResult = Result<Vec<CharacterItem>, Error>;
/// Load stored data for a character.
@ -103,8 +255,7 @@ pub fn load_character_data(
/// In the event that a join fails, for a character (i.e. they lack an entry for
/// stats, body, etc...) the character is skipped, and no entry will be
/// returned.
pub fn load_character_list(player_uuid: &str, db_dir: &str) -> CharacterListResult {
fn load_character_list(player_uuid: &str, db_dir: &str) -> CharacterListResult {
let data = schema::character::dsl::character
.filter(schema::character::player_uuid.eq(player_uuid))
.order(schema::character::id.desc())
@ -146,8 +297,8 @@ pub fn load_character_list(player_uuid: &str, db_dir: &str) -> CharacterListResu
/// Note that sqlite does not support returning the inserted data after a
/// successful insert. To workaround, we wrap this in a transaction which
/// inserts, queries for the newly created chaacter id, then uses the character
/// id for insertion of the `body` table entry
pub fn create_character(
/// id for subsequent insertions
fn create_character(
uuid: &str,
character_alias: String,
character_tool: Option<String>,
@ -243,7 +394,7 @@ pub fn create_character(
}
/// Delete a character. Returns the updated character list.
pub fn delete_character(uuid: &str, character_id: i32, db_dir: &str) -> CharacterListResult {
fn delete_character(uuid: &str, character_id: i32, db_dir: &str) -> CharacterListResult {
use schema::character::dsl::*;
diesel::delete(
@ -256,6 +407,8 @@ pub fn delete_character(uuid: &str, character_id: i32, db_dir: &str) -> Characte
load_character_list(uuid, db_dir)
}
/// Before creating a character, we ensure that the limit on the number of
/// characters has not been exceeded
fn check_character_limit(uuid: &str, db_dir: &str) -> Result<(), Error> {
use diesel::dsl::count_star;
use schema::character::dsl::*;
@ -277,8 +430,13 @@ fn check_character_limit(uuid: &str, db_dir: &str) -> Result<(), Error> {
}
}
pub type CharacterUpdateData = (StatsUpdate, InventoryUpdate, LoadoutUpdate);
type CharacterUpdateData = (StatsUpdate, InventoryUpdate, LoadoutUpdate);
/// A unidirectional messaging resource for saving characters in a
/// background thread.
///
/// This is used to make updates to a character and their persisted components,
/// such as inventory, loadout, etc...
pub struct CharacterUpdater {
update_tx: Option<channel::Sender<Vec<(i32, CharacterUpdateData)>>>,
handle: Option<std::thread::JoinHandle<()>>,
@ -299,6 +457,7 @@ impl CharacterUpdater {
}
}
/// Updates a collection of characters based on their id and components
pub fn batch_update<'a>(
&self,
updates: impl Iterator<Item = (i32, &'a comp::Stats, &'a comp::Inventory, &'a comp::Loadout)>,
@ -321,6 +480,7 @@ impl CharacterUpdater {
}
}
/// Updates a single character based on their id and components
pub fn update(
&self,
character_id: i32,

View File

@ -1,3 +1,5 @@
//! Consolidates Diesel and validation errors under a common error type
extern crate diesel;
use std::fmt;

View File

@ -1,3 +1,11 @@
//! DB operations and schema migrations
//!
//! This code uses several [`Diesel ORM`](http://diesel.rs/) tools for DB operations:
//! - [`diesel-migrations`](https://docs.rs/diesel_migrations/1.4.0/diesel_migrations/)
//! for managing table migrations
//! - [`diesel-cli`](https://github.com/diesel-rs/diesel/tree/master/diesel_cli/)
//! for generating and testing migrations
pub mod character;
mod error;
@ -16,6 +24,7 @@ use tracing::warn;
// for the `embedded_migrations` call below.
embed_migrations!();
/// Runs any pending database migrations. This is executed during server startup
pub fn run_migrations(db_dir: &str) -> Result<(), diesel_migrations::RunMigrationsError> {
let db_dir = &apply_saves_dir_override(db_dir);
let _ = fs::create_dir(format!("{}/", db_dir));
@ -47,15 +56,13 @@ fn establish_connection(db_dir: &str) -> SqliteConnection {
connection
}
#[allow(clippy::single_match)] // TODO: Pending review in #587
fn apply_saves_dir_override(db_dir: &str) -> String {
if let Some(saves_dir) = env::var_os("VELOREN_SAVES_DIR") {
let path = PathBuf::from(saves_dir.clone());
if path.exists() || path.parent().map(|x| x.exists()).unwrap_or(false) {
// Only allow paths with valid unicode characters
match path.to_str() {
Some(path) => return path.to_owned(),
None => {},
if let Some(path) = path.to_str() {
return path.to_owned();
}
}
warn!(?saves_dir, "VELOREN_SAVES_DIR points to an invalid path.");

View File

@ -79,8 +79,8 @@ impl From<&Body> for comp::Body {
}
}
/// `Stats` represents the stats for a character, which has a one-to-one
/// relationship with Characters.
/// `Stats` represents the stats for a character, and have a one-to-one
/// relationship with `Character`.
#[derive(Associations, AsChangeset, Identifiable, Queryable, Debug, Insertable)]
#[belongs_to(Character)]
#[primary_key(character_id)]
@ -144,8 +144,8 @@ impl From<&comp::Stats> for StatsUpdate {
/// Inventory storage and conversion. Inventories have a one-to-one relationship
/// with characters.
///
/// We store the players inventory as a single TEXT column which is serialised
/// JSON representation of the Inventory component.
/// We store inventory rows as a (character_id, json) tuples, where the json is
/// a serialised Inventory component.
#[derive(Associations, AsChangeset, Identifiable, Queryable, Debug, Insertable)]
#[belongs_to(Character)]
#[primary_key(character_id)]

View File

@ -132,5 +132,3 @@ impl ServerSettings {
fn get_settings_path() -> PathBuf { PathBuf::from(r"server_settings.ron") }
}
pub struct PersistenceDBDir(pub String);

View File

@ -1,6 +1,6 @@
use super::SysTimer;
use crate::{
auth_provider::AuthProvider, client::Client, persistence, settings::PersistenceDBDir,
auth_provider::AuthProvider, client::Client, persistence::character::CharacterListUpdater,
CLIENT_TIMEOUT,
};
use common::{
@ -32,7 +32,7 @@ impl<'a> System<'a> for Sys {
Entities<'a>,
Read<'a, EventBus<ServerEvent>>,
Read<'a, Time>,
ReadExpect<'a, PersistenceDBDir>,
ReadExpect<'a, CharacterListUpdater>,
ReadExpect<'a, TerrainGrid>,
Write<'a, SysTimer<Self>>,
ReadStorage<'a, Uid>,
@ -60,7 +60,7 @@ impl<'a> System<'a> for Sys {
entities,
server_event_bus,
time,
persistence_db_dir,
char_list,
terrain,
mut timer,
uids,
@ -81,8 +81,6 @@ impl<'a> System<'a> for Sys {
) {
timer.start();
let persistence_db_dir = &persistence_db_dir.0;
let mut server_emitter = server_event_bus.emitter();
let mut new_chat_msgs = Vec::new();
@ -334,54 +332,27 @@ impl<'a> System<'a> for Sys {
},
ClientMsg::RequestCharacterList => {
if let Some(player) = players.get(entity) {
match persistence::character::load_character_list(
&player.uuid().to_string(),
persistence_db_dir,
) {
Ok(character_list) => {
client.notify(ServerMsg::CharacterListUpdate(character_list));
},
Err(error) => {
client
.notify(ServerMsg::CharacterActionError(error.to_string()));
},
}
char_list.load_character_list(entity, player.uuid().to_string())
}
},
ClientMsg::CreateCharacter { alias, tool, body } => {
if let Some(player) = players.get(entity) {
match persistence::character::create_character(
&player.uuid().to_string(),
char_list.create_character(
entity,
player.uuid().to_string(),
alias,
tool,
&body,
persistence_db_dir,
) {
Ok(character_list) => {
client.notify(ServerMsg::CharacterListUpdate(character_list));
},
Err(error) => {
client
.notify(ServerMsg::CharacterActionError(error.to_string()));
},
}
body,
);
}
},
ClientMsg::DeleteCharacter(character_id) => {
if let Some(player) = players.get(entity) {
match persistence::character::delete_character(
&player.uuid().to_string(),
char_list.delete_character(
entity,
player.uuid().to_string(),
character_id,
persistence_db_dir,
) {
Ok(character_list) => {
client.notify(ServerMsg::CharacterListUpdate(character_list));
},
Err(error) => {
client
.notify(ServerMsg::CharacterActionError(error.to_string()));
},
}
);
}
},
}