Make the persistence system code more generic so that it handles all

data associated with a character, rather than individually as we were
planning to do with stats/inv/etc... This removes potential for DB locking when we deal with each individually, and we
should have plenty of room for additional writes within the transaction.
This commit is contained in:
S Handley 2020-06-01 21:34:52 +00:00 committed by Imbris
parent 43dc2322bf
commit b1d191301a
16 changed files with 286 additions and 118 deletions

View File

@ -16,6 +16,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- NPCs call for help when attacked - NPCs call for help when attacked
- Eyebrows and shapes can now be selected - Eyebrows and shapes can now be selected
- Character name and level information to chat, social tab and `/players` command. - Character name and level information to chat, social tab and `/players` command.
- Added inventory saving
### Changed ### Changed

1
Cargo.lock generated
View File

@ -5039,6 +5039,7 @@ dependencies = [
"scan_fmt", "scan_fmt",
"serde", "serde",
"serde_derive", "serde_derive",
"serde_json",
"specs", "specs",
"specs-idvs", "specs-idvs",
"uvth", "uvth",

View File

@ -23,6 +23,7 @@ scan_fmt = "0.2.4"
ron = "0.5.1" ron = "0.5.1"
serde = "1.0.102" serde = "1.0.102"
serde_derive = "1.0.102" serde_derive = "1.0.102"
serde_json = "1.0"
rand = { version = "0.7.2", features = ["small_rng"] } rand = { version = "0.7.2", features = ["small_rng"] }
chrono = "0.4.9" chrono = "0.4.9"
hashbrown = { version = "0.6.2", features = ["rayon", "serde", "nightly"] } hashbrown = { version = "0.6.2", features = ["rayon", "serde", "nightly"] }

View File

@ -72,13 +72,16 @@ pub fn handle_client_disconnect(server: &mut Server, entity: EcsEntity) -> Event
} }
// Sync the player's character data to the database // Sync the player's character data to the database
if let (Some(player), Some(stats), updater) = ( if let (Some(player), Some(stats), Some(inventory), updater) = (
state.read_storage::<Player>().get(entity), state.read_storage::<Player>().get(entity),
state.read_storage::<comp::Stats>().get(entity), state.read_storage::<comp::Stats>().get(entity),
state.ecs().read_resource::<persistence::stats::Updater>(), state.read_storage::<comp::Inventory>().get(entity),
state
.ecs()
.read_resource::<persistence::character::CharacterUpdater>(),
) { ) {
if let Some(character_id) = player.character_id { if let Some(character_id) = player.character_id {
updater.update(character_id, stats); updater.update(character_id, stats, inventory);
} }
} }

View File

@ -94,7 +94,9 @@ impl Server {
.insert(AuthProvider::new(settings.auth_server_address.clone())); .insert(AuthProvider::new(settings.auth_server_address.clone()));
state.ecs_mut().insert(Tick(0)); state.ecs_mut().insert(Tick(0));
state.ecs_mut().insert(ChunkGenerator::new()); state.ecs_mut().insert(ChunkGenerator::new());
state.ecs_mut().insert(persistence::stats::Updater::new( state
.ecs_mut()
.insert(persistence::character::CharacterUpdater::new(
settings.persistence_db_dir.clone(), settings.persistence_db_dir.clone(),
)); ));
state.ecs_mut().insert(crate::settings::PersistenceDBDir( state.ecs_mut().insert(crate::settings::PersistenceDBDir(
@ -110,16 +112,12 @@ impl Server {
state.ecs_mut().insert(sys::TerrainTimer::default()); state.ecs_mut().insert(sys::TerrainTimer::default());
state.ecs_mut().insert(sys::WaypointTimer::default()); state.ecs_mut().insert(sys::WaypointTimer::default());
state.ecs_mut().insert(sys::SpeechBubbleTimer::default()); state.ecs_mut().insert(sys::SpeechBubbleTimer::default());
state state.ecs_mut().insert(sys::PersistenceTimer::default());
.ecs_mut()
.insert(sys::StatsPersistenceTimer::default());
// System schedulers to control execution of systems // System schedulers to control execution of systems
state state
.ecs_mut() .ecs_mut()
.insert(sys::StatsPersistenceScheduler::every(Duration::from_secs( .insert(sys::PersistenceScheduler::every(Duration::from_secs(10)));
10,
)));
// Server-only components // Server-only components
state.ecs_mut().register::<RegionSubscription>(); state.ecs_mut().register::<RegionSubscription>();
@ -398,7 +396,7 @@ impl Server {
let stats_persistence_nanos = self let stats_persistence_nanos = self
.state .state
.ecs() .ecs()
.read_resource::<sys::StatsPersistenceTimer>() .read_resource::<sys::PersistenceTimer>()
.nanos as i64; .nanos as i64;
let total_sys_ran_in_dispatcher_nanos = terrain_nanos + waypoint_nanos; let total_sys_ran_in_dispatcher_nanos = terrain_nanos + waypoint_nanos;

View File

@ -0,0 +1 @@
DROP TABLE IF EXISTS "inventory";

View File

@ -0,0 +1,5 @@
CREATE TABLE IF NOT EXISTS "inventory" (
character_id INTEGER PRIMARY KEY NOT NULL,
items TEXT NOT NULL,
FOREIGN KEY(character_id) REFERENCES "character"(id) ON DELETE CASCADE
);

View File

@ -3,11 +3,15 @@ extern crate diesel;
use super::{ use super::{
error::Error, error::Error,
establish_connection, establish_connection,
models::{Body, Character, NewCharacter, Stats, StatsJoinData}, models::{
Body, Character, Inventory, InventoryUpdate, NewCharacter, Stats, StatsJoinData,
StatsUpdate,
},
schema, schema,
}; };
use crate::comp; use crate::comp;
use common::character::{Character as CharacterData, CharacterItem, MAX_CHARACTERS_PER_PLAYER}; use common::character::{Character as CharacterData, CharacterItem, MAX_CHARACTERS_PER_PLAYER};
use crossbeam::channel;
use diesel::prelude::*; use diesel::prelude::*;
type CharacterListResult = Result<Vec<CharacterItem>, Error>; type CharacterListResult = Result<Vec<CharacterItem>, Error>;
@ -16,18 +20,47 @@ type CharacterListResult = Result<Vec<CharacterItem>, Error>;
/// ///
/// After first logging in, and after a character is selected, we fetch this /// After first logging in, and after a character is selected, we fetch this
/// data for the purpose of inserting their persisted data for the entity. /// data for the purpose of inserting their persisted data for the entity.
pub fn load_character_data(character_id: i32, db_dir: &str) -> Result<comp::Stats, Error> { pub fn load_character_data(
let (character_data, body_data, stats_data) = schema::character::dsl::character character_id: i32,
db_dir: &str,
) -> Result<(comp::Stats, comp::Inventory), Error> {
let connection = establish_connection(db_dir);
let (character_data, body_data, stats_data, maybe_inventory) =
schema::character::dsl::character
.filter(schema::character::id.eq(character_id)) .filter(schema::character::id.eq(character_id))
.inner_join(schema::body::table) .inner_join(schema::body::table)
.inner_join(schema::stats::table) .inner_join(schema::stats::table)
.first::<(Character, Body, Stats)>(&establish_connection(db_dir))?; .left_join(schema::inventory::table)
.first::<(Character, Body, Stats, Option<Inventory>)>(&connection)?;
Ok(comp::Stats::from(StatsJoinData { Ok((
comp::Stats::from(StatsJoinData {
alias: &character_data.alias, alias: &character_data.alias,
body: &comp::Body::from(&body_data), body: &comp::Body::from(&body_data),
stats: &stats_data, stats: &stats_data,
})) }),
maybe_inventory.map_or_else(
|| {
// If no inventory record was found for the character, create it now
let row = Inventory::from((character_data.id, comp::Inventory::default()));
if let Err(error) = diesel::insert_into(schema::inventory::table)
.values(&row)
.execute(&connection)
{
log::warn!(
"Failed to create an inventory record for character {}: {}",
&character_data.id,
error
)
}
comp::Inventory::default()
},
|inv| comp::Inventory::from(inv),
),
))
} }
/// Loads a list of characters belonging to the player. This data is a small /// Loads a list of characters belonging to the player. This data is a small
@ -79,7 +112,7 @@ pub fn create_character(
let connection = establish_connection(db_dir); let connection = establish_connection(db_dir);
connection.transaction::<_, diesel::result::Error, _>(|| { connection.transaction::<_, diesel::result::Error, _>(|| {
use schema::{body, character, character::dsl::*, stats}; use schema::{body, character, character::dsl::*, inventory, stats};
match body { match body {
comp::Body::Humanoid(body_data) => { comp::Body::Humanoid(body_data) => {
@ -130,6 +163,14 @@ pub fn create_character(
diesel::insert_into(stats::table) diesel::insert_into(stats::table)
.values(&new_stats) .values(&new_stats)
.execute(&connection)?; .execute(&connection)?;
// Default inventory
let inventory =
Inventory::from((inserted_character.id, comp::Inventory::default()));
diesel::insert_into(inventory::table)
.values(&inventory)
.execute(&connection)?;
}, },
_ => log::warn!("Creating non-humanoid characters is not supported."), _ => log::warn!("Creating non-humanoid characters is not supported."),
}; };
@ -174,3 +215,103 @@ fn check_character_limit(uuid: &str, db_dir: &str) -> Result<(), Error> {
_ => Ok(()), _ => Ok(()),
} }
} }
pub type CharacterUpdateData = (StatsUpdate, InventoryUpdate);
pub struct CharacterUpdater {
update_tx: Option<channel::Sender<Vec<(i32, CharacterUpdateData)>>>,
handle: Option<std::thread::JoinHandle<()>>,
}
impl CharacterUpdater {
pub fn new(db_dir: String) -> Self {
let (update_tx, update_rx) = channel::unbounded::<Vec<(i32, CharacterUpdateData)>>();
let handle = std::thread::spawn(move || {
while let Ok(updates) = update_rx.recv() {
batch_update(updates.into_iter(), &db_dir);
}
});
Self {
update_tx: Some(update_tx),
handle: Some(handle),
}
}
pub fn batch_update<'a>(
&self,
updates: impl Iterator<Item = (i32, &'a comp::Stats, &'a comp::Inventory)>,
) {
let updates = updates
.map(|(id, stats, inventory)| {
(
id,
(StatsUpdate::from(stats), InventoryUpdate::from(inventory)),
)
})
.collect();
if let Err(err) = self.update_tx.as_ref().unwrap().send(updates) {
log::error!("Could not send stats updates: {:?}", err);
}
}
pub fn update(&self, character_id: i32, stats: &comp::Stats, inventory: &comp::Inventory) {
self.batch_update(std::iter::once((character_id, stats, inventory)));
}
}
fn batch_update(updates: impl Iterator<Item = (i32, CharacterUpdateData)>, db_dir: &str) {
let connection = establish_connection(db_dir);
if let Err(err) = connection.transaction::<_, diesel::result::Error, _>(|| {
updates.for_each(|(character_id, (stats_update, inventory_update))| {
update(character_id, &stats_update, &inventory_update, &connection)
});
Ok(())
}) {
log::error!("Error during stats batch update transaction: {:?}", err);
}
}
fn update(
character_id: i32,
stats: &StatsUpdate,
inventory: &InventoryUpdate,
connection: &SqliteConnection,
) {
if let Err(error) =
diesel::update(schema::stats::table.filter(schema::stats::character_id.eq(character_id)))
.set(stats)
.execute(connection)
{
log::warn!(
"Failed to update stats for character: {:?}: {:?}",
character_id,
error
)
}
if let Err(error) = diesel::update(
schema::inventory::table.filter(schema::inventory::character_id.eq(character_id)),
)
.set(inventory)
.execute(connection)
{
log::warn!(
"Failed to update inventory for character: {:?}: {:?}",
character_id,
error
)
}
}
impl Drop for CharacterUpdater {
fn drop(&mut self) {
drop(self.update_tx.take());
if let Err(err) = self.handle.take().unwrap().join() {
log::error!("Error from joining character update thread: {:?}", err);
}
}
}

View File

@ -1,5 +1,4 @@
pub mod character; pub mod character;
pub mod stats;
mod error; mod error;
mod models; mod models;

View File

@ -1,6 +1,10 @@
use super::schema::{body, character, stats}; extern crate serde_json;
use super::schema::{body, character, inventory, stats};
use crate::comp; use crate::comp;
use common::character::Character as CharacterData; use common::character::Character as CharacterData;
use diesel::sql_types::Text;
use serde::{Deserialize, Serialize};
/// The required elements to build comp::Stats from database data /// The required elements to build comp::Stats from database data
pub struct StatsJoinData<'a> { pub struct StatsJoinData<'a> {
@ -132,6 +136,84 @@ impl From<&comp::Stats> for StatsUpdate {
} }
} }
#[derive(Associations, AsChangeset, Identifiable, Queryable, Debug, Insertable)]
#[belongs_to(Character)]
#[primary_key(character_id)]
#[table_name = "inventory"]
pub struct Inventory {
character_id: i32,
items: InventoryData,
}
impl From<(i32, comp::Inventory)> for Inventory {
fn from(data: (i32, comp::Inventory)) -> Inventory {
let (character_id, inventory) = data;
Inventory {
character_id,
items: InventoryData(inventory),
}
}
}
impl From<Inventory> for comp::Inventory {
fn from(inventory: Inventory) -> comp::Inventory { inventory.items.0 }
}
#[derive(AsChangeset, Debug, PartialEq)]
#[primary_key(character_id)]
#[table_name = "inventory"]
pub struct InventoryUpdate {
pub items: InventoryData,
}
impl From<&comp::Inventory> for InventoryUpdate {
fn from(inventory: &comp::Inventory) -> InventoryUpdate {
InventoryUpdate {
items: InventoryData(inventory.clone()),
}
}
}
/// Type handling for a character's inventory, which is stored as JSON strings
#[derive(SqlType, AsExpression, Debug, Deserialize, Serialize, FromSqlRow, PartialEq)]
#[sql_type = "Text"]
pub struct InventoryData(comp::Inventory);
impl<DB> diesel::deserialize::FromSql<Text, DB> for InventoryData
where
DB: diesel::backend::Backend,
String: diesel::deserialize::FromSql<Text, DB>,
{
fn from_sql(
bytes: Option<&<DB as diesel::backend::Backend>::RawValue>,
) -> diesel::deserialize::Result<Self> {
let t = String::from_sql(bytes)?;
match serde_json::from_str(&t) {
Ok(data) => Ok(Self(data)),
Err(error) => {
log::warn!("Failed to deserialise inventory data: {}", error);
Ok(Self(comp::Inventory::default()))
},
}
}
}
impl<DB> diesel::serialize::ToSql<Text, DB> for InventoryData
where
DB: diesel::backend::Backend,
{
fn to_sql<W: std::io::Write>(
&self,
out: &mut diesel::serialize::Output<W, DB>,
) -> diesel::serialize::Result {
let s = serde_json::to_string(&self.0)?;
<String as diesel::serialize::ToSql<Text, DB>>::to_sql(&s, out)
}
}
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use super::*; use super::*;

View File

@ -22,6 +22,13 @@ table! {
} }
} }
table! {
inventory (character_id) {
character_id -> Integer,
items -> Text,
}
}
table! { table! {
stats (character_id) { stats (character_id) {
character_id -> Integer, character_id -> Integer,
@ -34,6 +41,7 @@ table! {
} }
joinable!(body -> character (character_id)); joinable!(body -> character (character_id));
joinable!(inventory -> character (character_id));
joinable!(stats -> character (character_id)); joinable!(stats -> character (character_id));
allow_tables_to_appear_in_same_query!(body, character, stats,); allow_tables_to_appear_in_same_query!(body, character, inventory, stats);

View File

@ -1,76 +0,0 @@
extern crate diesel;
use super::{establish_connection, models::StatsUpdate, schema};
use crate::comp;
use crossbeam::channel;
use diesel::prelude::*;
fn update(character_id: i32, stats: &StatsUpdate, connection: &SqliteConnection) {
if let Err(error) =
diesel::update(schema::stats::table.filter(schema::stats::character_id.eq(character_id)))
.set(stats)
.execute(connection)
{
log::warn!(
"Failed to update stats for character: {:?}: {:?}",
character_id,
error
)
}
}
fn batch_update(updates: impl Iterator<Item = (i32, StatsUpdate)>, db_dir: &str) {
let connection = establish_connection(db_dir);
if let Err(err) = connection.transaction::<_, diesel::result::Error, _>(|| {
updates.for_each(|(character_id, stats_update)| {
update(character_id, &stats_update, &connection)
});
Ok(())
}) {
log::error!("Error during stats batch update transaction: {:?}", err);
}
}
pub struct Updater {
update_tx: Option<channel::Sender<Vec<(i32, StatsUpdate)>>>,
handle: Option<std::thread::JoinHandle<()>>,
}
impl Updater {
pub fn new(db_dir: String) -> Self {
let (update_tx, update_rx) = channel::unbounded::<Vec<(i32, StatsUpdate)>>();
let handle = std::thread::spawn(move || {
while let Ok(updates) = update_rx.recv() {
batch_update(updates.into_iter(), &db_dir);
}
});
Self {
update_tx: Some(update_tx),
handle: Some(handle),
}
}
pub fn batch_update<'a>(&self, updates: impl Iterator<Item = (i32, &'a comp::Stats)>) {
let updates = updates
.map(|(id, stats)| (id, StatsUpdate::from(stats)))
.collect();
if let Err(err) = self.update_tx.as_ref().unwrap().send(updates) {
log::error!("Could not send stats updates: {:?}", err);
}
}
pub fn update(&self, character_id: i32, stats: &comp::Stats) {
self.batch_update(std::iter::once((character_id, stats)));
}
}
impl Drop for Updater {
fn drop(&mut self) {
drop(self.update_tx.take());
if let Err(err) = self.handle.take().unwrap().join() {
log::error!("Error from joining stats update thread: {:?}", err);
}
}
}

View File

@ -169,7 +169,10 @@ impl StateExt for State {
character_id, character_id,
&server_settings.persistence_db_dir, &server_settings.persistence_db_dir,
) { ) {
Ok(stats) => self.write_component(entity, stats), Ok((stats, inventory)) => {
self.write_component(entity, stats);
self.write_component(entity, inventory);
},
Err(error) => { Err(error) => {
log::warn!( log::warn!(
"{}", "{}",
@ -206,7 +209,6 @@ impl StateExt for State {
self.write_component(entity, comp::Gravity(1.0)); self.write_component(entity, comp::Gravity(1.0));
self.write_component(entity, comp::CharacterState::default()); self.write_component(entity, comp::CharacterState::default());
self.write_component(entity, comp::Alignment::Owned(entity)); self.write_component(entity, comp::Alignment::Owned(entity));
self.write_component(entity, comp::Inventory::default());
self.write_component( self.write_component(
entity, entity,
comp::InventoryUpdate::new(comp::InventoryUpdateEvent::default()), comp::InventoryUpdate::new(comp::InventoryUpdateEvent::default()),

View File

@ -22,8 +22,8 @@ pub type TerrainTimer = SysTimer<terrain::Sys>;
pub type TerrainSyncTimer = SysTimer<terrain_sync::Sys>; pub type TerrainSyncTimer = SysTimer<terrain_sync::Sys>;
pub type WaypointTimer = SysTimer<waypoint::Sys>; pub type WaypointTimer = SysTimer<waypoint::Sys>;
pub type SpeechBubbleTimer = SysTimer<speech_bubble::Sys>; pub type SpeechBubbleTimer = SysTimer<speech_bubble::Sys>;
pub type StatsPersistenceTimer = SysTimer<persistence::stats::Sys>; pub type PersistenceTimer = SysTimer<persistence::Sys>;
pub type StatsPersistenceScheduler = SysScheduler<persistence::stats::Sys>; pub type PersistenceScheduler = SysScheduler<persistence::Sys>;
// System names // System names
// Note: commented names may be useful in the future // Note: commented names may be useful in the future
@ -34,13 +34,13 @@ pub type StatsPersistenceScheduler = SysScheduler<persistence::stats::Sys>;
const TERRAIN_SYS: &str = "server_terrain_sys"; const TERRAIN_SYS: &str = "server_terrain_sys";
const WAYPOINT_SYS: &str = "waypoint_sys"; const WAYPOINT_SYS: &str = "waypoint_sys";
const SPEECH_BUBBLE_SYS: &str = "speech_bubble_sys"; const SPEECH_BUBBLE_SYS: &str = "speech_bubble_sys";
const STATS_PERSISTENCE_SYS: &str = "stats_persistence_sys"; const PERSISTENCE_SYS: &str = "persistence_sys";
pub fn add_server_systems(dispatch_builder: &mut DispatcherBuilder) { pub fn add_server_systems(dispatch_builder: &mut DispatcherBuilder) {
dispatch_builder.add(terrain::Sys, TERRAIN_SYS, &[]); dispatch_builder.add(terrain::Sys, TERRAIN_SYS, &[]);
dispatch_builder.add(waypoint::Sys, WAYPOINT_SYS, &[]); dispatch_builder.add(waypoint::Sys, WAYPOINT_SYS, &[]);
dispatch_builder.add(speech_bubble::Sys, SPEECH_BUBBLE_SYS, &[]); dispatch_builder.add(speech_bubble::Sys, SPEECH_BUBBLE_SYS, &[]);
dispatch_builder.add(persistence::stats::Sys, STATS_PERSISTENCE_SYS, &[]); dispatch_builder.add(persistence::Sys, PERSISTENCE_SYS, &[]);
} }
pub fn run_sync_systems(ecs: &mut specs::World) { pub fn run_sync_systems(ecs: &mut specs::World) {

View File

@ -1,8 +1,8 @@
use crate::{ use crate::{
persistence::stats, persistence::character,
sys::{SysScheduler, SysTimer}, sys::{SysScheduler, SysTimer},
}; };
use common::comp::{Player, Stats}; use common::comp::{Inventory, Player, Stats};
use specs::{Join, ReadExpect, ReadStorage, System, Write}; use specs::{Join, ReadExpect, ReadStorage, System, Write};
pub struct Sys; pub struct Sys;
@ -11,21 +11,24 @@ impl<'a> System<'a> for Sys {
type SystemData = ( type SystemData = (
ReadStorage<'a, Player>, ReadStorage<'a, Player>,
ReadStorage<'a, Stats>, ReadStorage<'a, Stats>,
ReadExpect<'a, stats::Updater>, ReadStorage<'a, Inventory>,
ReadExpect<'a, character::CharacterUpdater>,
Write<'a, SysScheduler<Self>>, Write<'a, SysScheduler<Self>>,
Write<'a, SysTimer<Self>>, Write<'a, SysTimer<Self>>,
); );
fn run( fn run(
&mut self, &mut self,
(players, player_stats, updater, mut scheduler, mut timer): Self::SystemData, (players, player_stats, player_inventories, updater, mut scheduler, mut timer): Self::SystemData,
) { ) {
if scheduler.should_run() { if scheduler.should_run() {
timer.start(); timer.start();
updater.batch_update( updater.batch_update(
(&players, &player_stats) (&players, &player_stats, &player_inventories)
.join() .join()
.filter_map(|(player, stats)| player.character_id.map(|id| (id, stats))), .filter_map(|(player, stats, inventory)| {
player.character_id.map(|id| (id, stats, inventory))
}),
); );
timer.end(); timer.end();
} }

View File

@ -1 +0,0 @@
pub mod stats;