* Replaced diesel with rusqlite and refinery

* Added "migration of migrations" to transfer the data from the __diesel_schema_migrations table to the refinery_schema_history table
* Removed all down migrations as refinery does not support down migrations
* Changed all diesel up migrations to refinery naming format
* Added --sql-log-mode parameter to veloren-server-cli to allow SQL tracing and profiling
* Added /disconnect_all_players admin command
* Added disconnectall CLI command
* Fixes for several potential persistence-related race conditions
This commit is contained in:
Ben Wallis 2021-04-13 22:05:47 +00:00
parent 9ccaec1aca
commit 1de94a9979
94 changed files with 1455 additions and 958 deletions

View File

@ -29,6 +29,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- Villagers and guards now spawn with potions, and know how to use them.
- Combat music in dungeons when within range of enemies.
- New Command: "kit", place a set of items into your inventory
- Added --sql-log-mode profile/trace parameter to veloren-server-cli
- Added /disconnect_all_players admin command
- Added disconnectall CLI command
### Changed
@ -50,6 +53,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- Removed infinite armour values from most admin items
- Item tooltips during trades will now inform the user of what ctrl-click and shift-click do
- International keyboards can now display more key names on Linux and Windows instead of `Unknown`.
- There is now a brief period after a character leaves the world where they cannot rejoin until their data is saved
### Removed

140
Cargo.lock generated
View File

@ -1325,38 +1325,6 @@ version = "1.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c0b7756d6eb729250618a3693b34b3311b282e12aeeee7970ae2a70997c03eb6"
[[package]]
name = "diesel"
version = "1.4.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "047bfc4d5c3bd2ef6ca6f981941046113524b9a9f9a7cbdfdd7ff40f58e6f542"
dependencies = [
"byteorder",
"diesel_derives",
"libsqlite3-sys",
]
[[package]]
name = "diesel_derives"
version = "1.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "45f5098f628d02a7a0f68ddba586fb61e80edec3bdc1be3b921f4ceec60858d3"
dependencies = [
"proc-macro2 1.0.26",
"quote 1.0.9",
"syn 1.0.69",
]
[[package]]
name = "diesel_migrations"
version = "1.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bf3cde8413353dc7f5d72fa8ce0b99a560a359d2c5ef1e5817ca731cd9008f4c"
dependencies = [
"migrations_internals",
"migrations_macros",
]
[[package]]
name = "directories-next"
version = "2.0.0"
@ -1559,6 +1527,12 @@ version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4443176a9f2c162692bd3d352d745ef9413eec5782a80d8fd6f8a1ac692a07f7"
[[package]]
name = "fallible-streaming-iterator"
version = "0.1.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7360491ce676a36bf9bb3c56c1aa791658183a54d2744120f27285738d90465a"
[[package]]
name = "fehler"
version = "1.0.0"
@ -2197,6 +2171,15 @@ dependencies = [
"serde",
]
[[package]]
name = "hashlink"
version = "0.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d99cf782f0dc4372d26846bec3de7804ceb5df083c2d4462c0b8d2330e894fa8"
dependencies = [
"hashbrown",
]
[[package]]
name = "heapless"
version = "0.5.6"
@ -2688,9 +2671,9 @@ checksum = "c7d73b3f436185384286bd8098d17ec07c9a7d2388a6599f824d8502b529702a"
[[package]]
name = "libsqlite3-sys"
version = "0.18.0"
version = "0.20.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1e704a02bcaecd4a08b93a23f6be59d0bd79cd161e0963e9499165a0a35df7bd"
checksum = "64d31059f22935e6c31830db5249ba2b7ecd54fd73a9909286f0a67aa55c2fbd"
dependencies = [
"cc",
"pkg-config",
@ -2879,27 +2862,6 @@ dependencies = [
"autocfg",
]
[[package]]
name = "migrations_internals"
version = "1.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2b4fc84e4af020b837029e017966f86a1c2d5e83e64b589963d5047525995860"
dependencies = [
"diesel",
]
[[package]]
name = "migrations_macros"
version = "1.4.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9753f12909fd8d923f75ae5c3258cae1ed3c8ec052e1b38c93c21a6d157f789c"
dependencies = [
"migrations_internals",
"proc-macro2 1.0.26",
"quote 1.0.9",
"syn 1.0.69",
]
[[package]]
name = "minifb"
version = "0.19.1"
@ -4108,6 +4070,47 @@ dependencies = [
"redox_syscall 0.2.5",
]
[[package]]
name = "refinery"
version = "0.5.0"
source = "git+https://gitlab.com/veloren/refinery.git?rev=8ecf4b4772d791e6c8c0a3f9b66a7530fad1af3e#8ecf4b4772d791e6c8c0a3f9b66a7530fad1af3e"
dependencies = [
"refinery-core",
"refinery-macros",
]
[[package]]
name = "refinery-core"
version = "0.5.0"
source = "git+https://gitlab.com/veloren/refinery.git?rev=8ecf4b4772d791e6c8c0a3f9b66a7530fad1af3e#8ecf4b4772d791e6c8c0a3f9b66a7530fad1af3e"
dependencies = [
"async-trait",
"cfg-if 1.0.0",
"chrono",
"lazy_static",
"log",
"regex",
"rusqlite",
"serde",
"siphasher",
"thiserror",
"toml",
"url",
"walkdir 2.3.2",
]
[[package]]
name = "refinery-macros"
version = "0.5.0"
source = "git+https://gitlab.com/veloren/refinery.git?rev=8ecf4b4772d791e6c8c0a3f9b66a7530fad1af3e#8ecf4b4772d791e6c8c0a3f9b66a7530fad1af3e"
dependencies = [
"proc-macro2 1.0.26",
"quote 1.0.9",
"refinery-core",
"regex",
"syn 1.0.69",
]
[[package]]
name = "regalloc"
version = "0.0.31"
@ -4210,6 +4213,22 @@ version = "0.0.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "84348444bd7ad45729d0c49a4240d7cdc11c9d512c06c5ad1835c1ad4acda6db"
[[package]]
name = "rusqlite"
version = "0.24.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d5f38ee71cbab2c827ec0ac24e76f82eca723cee92c509a65f67dee393c25112"
dependencies = [
"bitflags",
"fallible-iterator",
"fallible-streaming-iterator",
"hashlink",
"lazy_static",
"libsqlite3-sys",
"memchr",
"smallvec",
]
[[package]]
name = "rust-argon2"
version = "0.8.3"
@ -4621,6 +4640,12 @@ dependencies = [
"libc",
]
[[package]]
name = "siphasher"
version = "0.3.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fa8f3741c7372e75519bd9346068370c9cdaabcc1f9599cbcf2a2719352286b7"
[[package]]
name = "slab"
version = "0.4.2"
@ -5619,14 +5644,11 @@ dependencies = [
"authc",
"chrono",
"crossbeam-channel",
"diesel",
"diesel_migrations",
"dotenv",
"futures-util",
"hashbrown",
"itertools 0.10.0",
"lazy_static",
"libsqlite3-sys",
"num_cpus",
"portpicker",
"prometheus",
@ -5634,7 +5656,9 @@ dependencies = [
"rand 0.8.3",
"rand_distr",
"rayon",
"refinery",
"ron",
"rusqlite",
"scan_fmt",
"serde",
"serde_json",

View File

@ -4,6 +4,9 @@ const VELOREN_USERDATA_ENV: &str = "VELOREN_USERDATA";
// TODO: consider expanding this to a general install strategy variable that is
// also used for finding assets
// TODO: Ensure there are no NUL (\0) characters in userdata_dir (possible on
// MacOS but not Windows or Linux) as SQLite requires the database path does not
// include this character.
/// # `VELOREN_USERDATA_STRATEGY` environment variable
/// Read during compilation
/// Useful to set when compiling for distribution

View File

@ -47,6 +47,7 @@ pub enum ChatCommand {
BuildAreaRemove,
Campfire,
DebugColumn,
DisconnectAllPlayers,
DropAll,
Dummy,
Explosion,
@ -108,6 +109,7 @@ pub static CHAT_COMMANDS: &[ChatCommand] = &[
ChatCommand::BuildAreaRemove,
ChatCommand::Campfire,
ChatCommand::DebugColumn,
ChatCommand::DisconnectAllPlayers,
ChatCommand::DropAll,
ChatCommand::Dummy,
ChatCommand::Explosion,
@ -293,6 +295,11 @@ impl ChatCommand {
"Prints some debug information about a column",
NoAdmin,
),
ChatCommand::DisconnectAllPlayers => cmd(
vec![Any("confirm", Required)],
"Disconnects all players from the server",
Admin,
),
ChatCommand::DropAll => cmd(vec![], "Drops all your items on the ground", Admin),
ChatCommand::Dummy => cmd(vec![], "Spawns a training dummy", Admin),
ChatCommand::Explosion => cmd(
@ -538,6 +545,7 @@ impl ChatCommand {
ChatCommand::BuildAreaRemove => "build_area_remove",
ChatCommand::Campfire => "campfire",
ChatCommand::DebugColumn => "debug_column",
ChatCommand::DisconnectAllPlayers => "disconnect_all_players",
ChatCommand::DropAll => "dropall",
ChatCommand::Dummy => "dummy",
ChatCommand::Explosion => "explosion",

View File

@ -135,6 +135,7 @@ pub enum ServerEvent {
},
CreateWaypoint(Vec3<f32>),
ClientDisconnect(EcsEntity),
ClientDisconnectWithoutPersistence(EcsEntity),
ChunkRequest(EcsEntity, Vec2<i32>),
ChatCmd(EcsEntity, String),
/// Send a chat message to the player from an npc or other player

View File

@ -1,4 +1,5 @@
use core::time::Duration;
use server::persistence::SqlLogMode;
use std::sync::mpsc::Sender;
use tracing::{error, info, warn};
@ -10,6 +11,8 @@ pub enum Message {
AddAdmin(String),
RemoveAdmin(String),
LoadArea(u32),
SetSqlLogMode(SqlLogMode),
DisconnectAllClients,
}
struct Command<'a> {
@ -22,13 +25,13 @@ struct Command<'a> {
}
// TODO: maybe we could be using clap here?
const COMMANDS: [Command; 6] = [
const COMMANDS: [Command; 8] = [
Command {
name: "quit",
description: "Closes the server",
split_spaces: true,
args: 0,
cmd: |_, sender| sender.send(Message::Quit).unwrap(),
cmd: |_, sender| send(sender, Message::Quit),
},
Command {
name: "shutdown",
@ -38,16 +41,21 @@ const COMMANDS: [Command; 6] = [
args: 1,
cmd: |args, sender| {
if let Ok(grace_period) = args.first().unwrap().parse::<u64>() {
sender
.send(Message::Shutdown {
send(sender, Message::Shutdown {
grace_period: Duration::from_secs(grace_period),
})
.unwrap()
} else {
error!("Grace period must an integer")
}
},
},
Command {
name: "disconnectall",
description: "Disconnects all connected clients",
split_spaces: true,
args: 0,
cmd: |_, sender| send(sender, Message::DisconnectAllClients),
},
Command {
name: "loadarea",
description: "Loads up the chunks in a random area and adds a entity that mimics a player \
@ -56,7 +64,7 @@ const COMMANDS: [Command; 6] = [
args: 1,
cmd: |args, sender| {
if let Ok(view_distance) = args.first().unwrap().parse::<u32>() {
sender.send(Message::LoadArea(view_distance)).unwrap();
send(sender, Message::LoadArea(view_distance));
} else {
error!("View distance must be an integer");
}
@ -67,7 +75,7 @@ const COMMANDS: [Command; 6] = [
description: "Aborts a shutdown if one is in progress",
split_spaces: false,
args: 0,
cmd: |_, sender| sender.send(Message::AbortShutdown).unwrap(),
cmd: |_, sender| send(sender, Message::AbortShutdown),
},
Command {
name: "admin",
@ -76,15 +84,38 @@ const COMMANDS: [Command; 6] = [
args: 2,
cmd: |args, sender| match args.get(..2) {
Some([op, username]) if op == "add" => {
sender.send(Message::AddAdmin(username.clone())).unwrap()
send(sender, Message::AddAdmin(username.clone()));
},
Some([op, username]) if op == "remove" => {
sender.send(Message::RemoveAdmin(username.clone())).unwrap()
send(sender, Message::RemoveAdmin(username.clone()));
},
Some(_) => error!("First arg must be add or remove"),
_ => error!("Not enough args, should be unreachable"),
},
},
Command {
name: "sqllog",
description: "Sets the SQL logging mode, valid values are off, trace and profile",
split_spaces: true,
args: 1,
cmd: |args, sender| match args.get(0) {
Some(arg) => {
let sql_log_mode = match arg.to_lowercase().as_str() {
"off" => Some(SqlLogMode::Disabled),
"profile" => Some(SqlLogMode::Profile),
"trace" => Some(SqlLogMode::Trace),
_ => None,
};
if let Some(sql_log_mode) = sql_log_mode {
send(sender, Message::SetSqlLogMode(sql_log_mode));
} else {
error!("Invalid SQL log mode");
}
},
_ => error!("Not enough args"),
},
},
Command {
name: "help",
description: "List all command available",
@ -100,6 +131,12 @@ const COMMANDS: [Command; 6] = [
},
];
fn send(sender: &mut Sender<Message>, message: Message) {
sender
.send(message)
.unwrap_or_else(|err| error!("Failed to send CLI message, err: {:?}", err));
}
pub fn parse_command(input: &str, msg_s: &mut Sender<Message>) {
let mut args = input.split_whitespace();

View File

@ -17,7 +17,10 @@ use clap::{App, Arg, SubCommand};
use common::clock::Clock;
use common_base::span;
use core::sync::atomic::{AtomicUsize, Ordering};
use server::{Event, Input, Server};
use server::{
persistence::{DatabaseSettings, SqlLogMode},
Event, Input, Server,
};
use std::{
io,
sync::{atomic::AtomicBool, mpsc, Arc},
@ -48,6 +51,11 @@ fn main() -> io::Result<()> {
Arg::with_name("no-auth")
.long("no-auth")
.help("Runs without auth enabled"),
Arg::with_name("sql-log-mode")
.long("sql-log-mode")
.help("Enables SQL logging, valid values are \"trace\" and \"profile\"")
.possible_values(&["trace", "profile"])
.takes_value(true)
])
.subcommand(
SubCommand::with_name("admin")
@ -78,6 +86,12 @@ fn main() -> io::Result<()> {
let noninteractive = matches.is_present("non-interactive");
let no_auth = matches.is_present("no-auth");
let sql_log_mode = match matches.value_of("sql-log-mode") {
Some("trace") => SqlLogMode::Trace,
Some("profile") => SqlLogMode::Profile,
_ => SqlLogMode::Disabled,
};
// noninteractive implies basic
let basic = basic || noninteractive;
@ -118,6 +132,15 @@ fn main() -> io::Result<()> {
// Load server settings
let mut server_settings = server::Settings::load(&server_data_dir);
let mut editable_settings = server::EditableSettings::load(&server_data_dir);
// Relative to data_dir
const PERSISTENCE_DB_DIR: &str = "saves";
let database_settings = DatabaseSettings {
db_dir: server_data_dir.join(PERSISTENCE_DB_DIR),
sql_log_mode,
};
#[allow(clippy::single_match)] // Note: remove this when there are more subcommands
match matches.subcommand() {
("admin", Some(sub_m)) => {
@ -157,6 +180,7 @@ fn main() -> io::Result<()> {
let mut server = Server::new(
server_settings,
editable_settings,
database_settings,
&server_data_dir,
runtime,
)
@ -226,6 +250,12 @@ fn main() -> io::Result<()> {
Message::LoadArea(view_distance) => {
server.create_centered_persister(view_distance);
},
Message::SetSqlLogMode(sql_log_mode) => {
server.set_sql_log_mode(sql_log_mode);
},
Message::DisconnectAllClients => {
server.disconnect_all_clients();
},
},
Err(mpsc::TryRecvError::Empty) | Err(mpsc::TryRecvError::Disconnected) => {},
}

View File

@ -44,12 +44,12 @@ crossbeam-channel = "0.5"
prometheus = { version = "0.12", default-features = false}
portpicker = { git = "https://github.com/xMAC94x/portpicker-rs", rev = "df6b37872f3586ac3b21d08b56c8ec7cd92fb172" }
authc = { git = "https://gitlab.com/veloren/auth.git", rev = "fb3dcbc4962b367253f8f2f92760ef44d2679c9a" }
libsqlite3-sys = { version = "0.18", features = ["bundled"] }
diesel = { version = "1.4.3", features = ["sqlite"] }
diesel_migrations = "1.4.0"
dotenv = "0.15.0"
slab = "0.4"
rand_distr = "0.4.0"
rusqlite = { version = "0.24.2", features = ["array", "vtab", "bundled", "trace"] }
refinery = { git = "https://gitlab.com/veloren/refinery.git", rev = "8ecf4b4772d791e6c8c0a3f9b66a7530fad1af3e", features = ["rusqlite"] }
# Plugins
plugin-api = { package = "veloren-plugin-api", path = "../plugin/api"}

View File

@ -45,7 +45,7 @@ use world::util::Sampler;
use crate::{client::Client, login_provider::LoginProvider};
use scan_fmt::{scan_fmt, scan_fmt_some};
use tracing::error;
use tracing::{error, info, warn};
pub trait ChatCommandExt {
fn execute(&self, server: &mut Server, entity: EcsEntity, args: String);
@ -102,6 +102,7 @@ fn get_handler(cmd: &ChatCommand) -> CommandHandler {
ChatCommand::BuildAreaRemove => handle_build_area_remove,
ChatCommand::Campfire => handle_spawn_campfire,
ChatCommand::DebugColumn => handle_debug_column,
ChatCommand::DisconnectAllPlayers => handle_disconnect_all_players,
ChatCommand::DropAll => handle_drop_all,
ChatCommand::Dummy => handle_spawn_training_dummy,
ChatCommand::Explosion => handle_explosion,
@ -2118,6 +2119,46 @@ spawn_rate {:?} "#,
}
}
/// Handler for the `/disconnect_all_players` admin command.
///
/// Requires the literal argument `confirm` as a safety check; when given,
/// sets `disconnect_all_clients_requested` so the server disconnects every
/// connected client (with persistence) on a subsequent tick.
fn handle_disconnect_all_players(
    server: &mut Server,
    client: EcsEntity,
    _target: EcsEntity,
    args: String,
    _action: &ChatCommand,
) -> CmdResult<()> {
    // Require explicit confirmation so the command cannot be run by accident.
    if args != "confirm" {
        return Err(
            "Please run the command again with the second argument of \"confirm\" to confirm that \
             you really want to disconnect all players from the server"
                .to_string(),
        );
    }

    let ecs = server.state.ecs();
    let players = &ecs.read_storage::<comp::Player>();

    // TODO: This logging and verification of admin commands would be better moved
    // to a more generic method used for auditing -all- admin commands.
    let player_name = if let Some(player) = players.get(client) {
        &*player.alias
    } else {
        warn!(
            "Failed to get player name for admin who used /disconnect_all_players - ignoring \
             command."
        );
        return Err("You do not exist, so you cannot use this command".to_string());
    };

    info!(
        "Disconnecting all clients due to admin command from {}",
        player_name
    );
    server.disconnect_all_clients_requested = true;

    Ok(())
}
fn handle_skill_point(
server: &mut Server,
_client: EcsEntity,

View File

@ -1,5 +1,5 @@
use crate::persistence::error::PersistenceError;
use network::{NetworkError, ParticipantError, StreamError};
use std::fmt::{self, Display};
#[derive(Debug)]
@ -7,7 +7,8 @@ pub enum Error {
NetworkErr(NetworkError),
ParticipantErr(ParticipantError),
StreamErr(StreamError),
DatabaseErr(diesel::result::Error),
DatabaseErr(rusqlite::Error),
PersistenceErr(PersistenceError),
Other(String),
}
@ -23,8 +24,13 @@ impl From<StreamError> for Error {
fn from(err: StreamError) -> Self { Error::StreamErr(err) }
}
impl From<diesel::result::Error> for Error {
fn from(err: diesel::result::Error) -> Self { Error::DatabaseErr(err) }
// TODO: Don't expose rusqlite::Error from persistence module
impl From<rusqlite::Error> for Error {
    /// Wraps a raw SQLite error as a server `Error::DatabaseErr`.
    fn from(err: rusqlite::Error) -> Self { Error::DatabaseErr(err) }
}
impl From<PersistenceError> for Error {
    /// Wraps a persistence-layer error as a server `Error::PersistenceErr`.
    fn from(err: PersistenceError) -> Self { Error::PersistenceErr(err) }
}
impl Display for Error {
@ -34,6 +40,7 @@ impl Display for Error {
Self::ParticipantErr(err) => write!(f, "Participant Error: {}", err),
Self::StreamErr(err) => write!(f, "Stream Error: {}", err),
Self::DatabaseErr(err) => write!(f, "Database Error: {}", err),
Self::PersistenceErr(err) => write!(f, "Persistence Error: {}", err),
Self::Other(err) => write!(f, "Error: {}", err),
}
}

View File

@ -174,7 +174,10 @@ impl Server {
} => handle_create_ship(self, pos, ship, mountable, agent, rtsim_entity),
ServerEvent::CreateWaypoint(pos) => handle_create_waypoint(self, pos),
ServerEvent::ClientDisconnect(entity) => {
frontend_events.push(handle_client_disconnect(self, entity))
frontend_events.push(handle_client_disconnect(self, entity, false))
},
ServerEvent::ClientDisconnectWithoutPersistence(entity) => {
frontend_events.push(handle_client_disconnect(self, entity, true))
},
ServerEvent::ChunkRequest(entity, key) => {

View File

@ -1,5 +1,8 @@
use super::Event;
use crate::{client::Client, persistence, presence::Presence, state_ext::StateExt, Server};
use crate::{
client::Client, persistence::character_updater::CharacterUpdater, presence::Presence,
state_ext::StateExt, Server,
};
use common::{
comp,
comp::group,
@ -93,7 +96,11 @@ pub fn handle_exit_ingame(server: &mut Server, entity: EcsEntity) {
}
}
pub fn handle_client_disconnect(server: &mut Server, entity: EcsEntity) -> Event {
pub fn handle_client_disconnect(
server: &mut Server,
mut entity: EcsEntity,
skip_persistence: bool,
) -> Event {
span!(_guard, "handle_client_disconnect");
if let Some(client) = server
.state()
@ -150,30 +157,45 @@ pub fn handle_client_disconnect(server: &mut Server, entity: EcsEntity) -> Event
}
// Sync the player's character data to the database
let entity = persist_entity(state, entity);
if !skip_persistence {
entity = persist_entity(state, entity);
}
// Delete client entity
if let Err(e) = state.delete_entity_recorded(entity) {
if let Err(e) = server.state.delete_entity_recorded(entity) {
error!(?e, ?entity, "Failed to delete disconnected client");
}
Event::ClientDisconnected { entity }
}
// When a player logs out, their data is queued for persistence in the next tick
// of the persistence batch update. The player will be
// temporarily unable to log in during this period to avoid
// the race condition of their login fetching their old data
// and overwriting the data saved here.
fn persist_entity(state: &mut State, entity: EcsEntity) -> EcsEntity {
if let (Some(presences), Some(stats), Some(inventory), updater) = (
if let (Some(presence), Some(stats), Some(inventory), mut character_updater) = (
state.read_storage::<Presence>().get(entity),
state.read_storage::<comp::Stats>().get(entity),
state.read_storage::<comp::Inventory>().get(entity),
state
.ecs()
.read_resource::<persistence::character_updater::CharacterUpdater>(),
state.ecs().fetch_mut::<CharacterUpdater>(),
) {
if let PresenceKind::Character(character_id) = presences.kind {
let waypoint_read = state.read_storage::<comp::Waypoint>();
let waypoint = waypoint_read.get(entity);
updater.update(character_id, stats, inventory, waypoint);
}
match presence.kind {
PresenceKind::Character(char_id) => {
let waypoint = state
.ecs()
.read_storage::<common::comp::Waypoint>()
.get(entity)
.cloned();
character_updater.add_pending_logout_update(
char_id,
(stats.clone(), inventory.clone(), waypoint),
);
},
PresenceKind::Spectator => { /* Do nothing, spectators do not need persisting */ },
};
}
entity

View File

@ -2,6 +2,7 @@
#![allow(clippy::option_map_unit_fn)]
#![deny(clippy::clone_on_ref_ptr)]
#![feature(
box_patterns,
label_break_value,
bool_to_option,
drain_filter,
@ -101,15 +102,17 @@ use tokio::{runtime::Runtime, sync::Notify};
use tracing::{debug, error, info, trace};
use vek::*;
use crate::{
persistence::{DatabaseSettings, SqlLogMode},
sys::terrain,
};
use std::sync::RwLock;
#[cfg(feature = "worldgen")]
use world::{
sim::{FileOpts, WorldOpts, DEFAULT_WORLD_MAP},
IndexOwned, World,
};
#[macro_use] extern crate diesel;
#[macro_use] extern crate diesel_migrations;
#[derive(Copy, Clone)]
struct SpawnPoint(Vec3<f32>);
@ -124,6 +127,12 @@ pub struct HwStats {
rayon_threads: u32,
}
#[derive(Clone, Copy, PartialEq)]
enum DisconnectType {
WithPersistence,
WithoutPersistence,
}
// Start of Tick, used for metrics
#[derive(Copy, Clone)]
pub struct TickStart(Instant);
@ -139,6 +148,8 @@ pub struct Server {
runtime: Arc<Runtime>,
metrics_shutdown: Arc<Notify>,
database_settings: Arc<RwLock<DatabaseSettings>>,
disconnect_all_clients_requested: bool,
}
impl Server {
@ -148,6 +159,7 @@ impl Server {
pub fn new(
settings: Settings,
editable_settings: EditableSettings,
database_settings: DatabaseSettings,
data_dir: &std::path::Path,
runtime: Arc<Runtime>,
) -> Result<Self, Error> {
@ -156,15 +168,11 @@ impl Server {
info!("Authentication is disabled");
}
// Relative to data_dir
const PERSISTENCE_DB_DIR: &str = "saves";
let persistence_db_dir = data_dir.join(PERSISTENCE_DB_DIR);
// Run pending DB migrations (if any)
debug!("Running DB migrations...");
if let Some(e) = persistence::run_migrations(&persistence_db_dir).err() {
panic!("Migration error: {:?}", e);
}
persistence::run_migrations(&database_settings);
let database_settings = Arc::new(RwLock::new(database_settings));
let registry = Arc::new(Registry::new());
let chunk_gen_metrics = metrics::ChunkGenMetrics::new(&registry).unwrap();
@ -203,9 +211,10 @@ impl Server {
state
.ecs_mut()
.insert(ChunkGenerator::new(chunk_gen_metrics));
state
.ecs_mut()
.insert(CharacterUpdater::new(&persistence_db_dir)?);
state.ecs_mut().insert(CharacterUpdater::new(
Arc::<RwLock<DatabaseSettings>>::clone(&database_settings),
)?);
let ability_map = comp::item::tool::AbilityMap::<CharacterAbility>::load_expect_cloned(
"common.abilities.weapon_ability_manifest",
@ -215,9 +224,9 @@ impl Server {
let msm = comp::inventory::item::MaterialStatManifest::default();
state.ecs_mut().insert(msm);
state
.ecs_mut()
.insert(CharacterLoader::new(&persistence_db_dir)?);
state.ecs_mut().insert(CharacterLoader::new(
Arc::<RwLock<DatabaseSettings>>::clone(&database_settings),
)?);
// System schedulers to control execution of systems
state
@ -386,6 +395,8 @@ impl Server {
runtime,
metrics_shutdown,
database_settings,
disconnect_all_clients_requested: false,
};
debug!(?settings, "created veloren server with");
@ -506,6 +517,10 @@ impl Server {
let before_handle_events = Instant::now();
// Process any pending request to disconnect all clients, the disconnections
// will be processed once handle_events() is called below
let disconnect_type = self.disconnect_all_clients_if_requested();
// Handle game events
frontend_events.append(&mut self.handle_events());
@ -530,6 +545,14 @@ impl Server {
let before_entity_cleanup = Instant::now();
// In the event of a request to disconnect all players without persistence, we
// must run the terrain system a second time after the messages to
// perform client disconnections have been processed. This ensures that any
// items on the ground are deleted.
if let Some(DisconnectType::WithoutPersistence) = disconnect_type {
run_now::<terrain::Sys>(self.state.ecs_mut());
}
// Remove NPCs that are outside the view distances of all players
// This is done by removing NPCs in unloaded chunks
let to_delete = {
@ -573,6 +596,17 @@ impl Server {
}
}
if let Some(DisconnectType::WithoutPersistence) = disconnect_type {
info!(
"Disconnection of all players without persistence complete, signalling to \
persistence thread that character updates may continue to be processed"
);
self.state
.ecs()
.fetch_mut::<CharacterUpdater>()
.disconnected_success();
}
// 7 Persistence updates
let before_persistence_updates = Instant::now();
@ -773,6 +807,58 @@ impl Server {
Ok(Some(entity))
}
/// Disconnects all clients if requested by either an admin command or
/// due to a persistence transaction failure and returns the processed
/// `DisconnectType` (or `None` if no disconnection was requested).
fn disconnect_all_clients_if_requested(&mut self) -> Option<DisconnectType> {
// The updater is fetched mutably because determining the request state
// is delegated to get_disconnect_all_clients_requested below.
let mut character_updater = self.state.ecs().fetch_mut::<CharacterUpdater>();
let disconnect_type = self.get_disconnect_all_clients_requested(&mut character_updater);
if let Some(disconnect_type) = disconnect_type {
let with_persistence = disconnect_type == DisconnectType::WithPersistence;
let clients = self.state.ecs().read_storage::<Client>();
let entities = self.state.ecs().entities();
info!(
"Disconnecting all clients ({} persistence) as requested",
if with_persistence { "with" } else { "without" }
);
// Emit one disconnect event per connected client; the events are
// consumed later in the tick when handle_events() runs.
for (_, entity) in (&clients, &entities).join() {
info!("Emitting client disconnect event for entity: {:?}", entity);
let event = if with_persistence {
ServerEvent::ClientDisconnect(entity)
} else {
ServerEvent::ClientDisconnectWithoutPersistence(entity)
};
self.state
.ecs()
.read_resource::<EventBus<ServerEvent>>()
.emitter()
.emit(event);
}
// Clear the admin-requested flag now that the request is processed.
self.disconnect_all_clients_requested = false;
}
disconnect_type
}
/// Determines whether a disconnect-all-clients request is pending and, if
/// so, whether character data should be persisted. A request from the
/// persistence layer (without persistence) takes priority over an
/// admin-initiated request (with persistence).
fn get_disconnect_all_clients_requested(
    &self,
    character_updater: &mut CharacterUpdater,
) -> Option<DisconnectType> {
    // Read both flags up front, in the same order as the original checks.
    let without_persistence = character_updater.disconnect_all_clients_requested();
    let with_persistence = self.disconnect_all_clients_requested;
    match (without_persistence, with_persistence) {
        (true, _) => Some(DisconnectType::WithoutPersistence),
        (false, true) => Some(DisconnectType::WithPersistence),
        (false, false) => None,
    }
}
/// Handle new client connections.
fn handle_new_connections(&mut self, frontend_events: &mut Vec<Event>) {
while let Ok(sender) = self.connection_handler.info_requester_receiver.try_recv() {
@ -1009,6 +1095,26 @@ impl Server {
.create_persister(pos, view_distance, &self.world, &self.index)
.build();
}
/// Changes the SQL log mode while the server is running.
pub fn set_sql_log_mode(&mut self, sql_log_mode: SqlLogMode) {
    // The write guard is confined to this block so the lock is released
    // before any further work (such as logging) happens. Unwrap is safe:
    // the only operation performed with the lock held is a plain field
    // assignment, which cannot panic, so the RwLock can never become
    // poisoned here — the same justification keeps the associated
    // read()/unwrap() calls on this RwLock safe, provided no panicking
    // code is ever introduced inside this scope.
    {
        let mut database_settings = self.database_settings.write().unwrap();
        database_settings.sql_log_mode = sql_log_mode;
    }
    info!("SQL log mode changed to {:?}", sql_log_mode);
}
/// Requests that all clients be disconnected (with persistence) on a
/// subsequent tick; the actual disconnection is performed by
/// `disconnect_all_clients_if_requested`, which also clears this flag.
pub fn disconnect_all_clients(&mut self) {
info!("Disconnecting all clients due to local console command");
self.disconnect_all_clients_requested = true;
}
}
impl Drop for Server {

View File

@ -1 +0,0 @@
DROP TABLE IF EXISTS "character";

View File

@ -1 +0,0 @@
DROP TABLE IF EXISTS "body";

View File

@ -1 +0,0 @@
DROP TABLE IF EXISTS "stats";

View File

@ -1,46 +0,0 @@
-- SQLITE v < 3.25 does not support renaming columns.
ALTER TABLE
body RENAME TO body_tmp;
CREATE TABLE IF NOT EXISTS body (
character_id INT NOT NULL PRIMARY KEY,
race SMALLINT NOT NULL,
body_type SMALLINT NOT NULL,
hair_style SMALLINT NOT NULL,
beard SMALLINT NOT NULL,
eyebrows SMALLINT NOT NULL,
accessory SMALLINT NOT NULL,
hair_color SMALLINT NOT NULL,
skin SMALLINT NOT NULL,
eye_color SMALLINT NOT NULL,
FOREIGN KEY(character_id) REFERENCES "character"(id) ON DELETE CASCADE
);
INSERT INTO
body(
character_id,
race,
body_type,
hair_style,
beard,
eyebrows,
accessory,
hair_color,
skin,
eye_color
)
SELECT
character_id,
species,
body_type,
hair_style,
beard,
eyes,
accessory,
hair_color,
skin,
eye_color
FROM
body_tmp;
DROP TABLE body_tmp;

View File

@ -1 +0,0 @@
DROP TABLE IF EXISTS "inventory";

View File

@ -1 +0,0 @@
DROP TABLE IF EXISTS "loadout";

View File

@ -1,41 +0,0 @@
-- This migration downgrades the capacity of existing player inventories from 36 to 18. ITEMS WILL BE REMOVED.
UPDATE
inventory
SET
items = json_object(
'amount',
(
SELECT
json_extract(items, '$.amount')
from
inventory
),
'slots',
json_remove(
(
SELECT
json_extract(items, '$.slots')
from
inventory
),
'$[35]',
'$[34]',
'$[33]',
'$[32]',
'$[31]',
'$[30]',
'$[29]',
'$[28]',
'$[27]',
'$[26]',
'$[25]',
'$[25]',
'$[24]',
'$[23]',
'$[22]',
'$[21]',
'$[20]',
'$[19]',
'$[18]'
)
);

View File

@ -1,22 +0,0 @@
PRAGMA foreign_keys=off;
-- SQLite does not support removing columns from tables so we must rename the current table,
-- recreate the previous version of the table, then copy over the data from the renamed table
ALTER TABLE stats RENAME TO _stats_old;
CREATE TABLE "stats" (
character_id INT NOT NULL PRIMARY KEY,
level INT NOT NULL DEFAULT 1,
exp INT NOT NULL DEFAULT 0,
endurance INT NOT NULL DEFAULT 0,
fitness INT NOT NULL DEFAULT 0,
willpower INT NOT NULL DEFAULT 0,
FOREIGN KEY(character_id) REFERENCES "character"(id) ON DELETE CASCADE
);
INSERT INTO "stats" (character_id, level, exp, endurance, fitness, willpower)
SELECT character_id, level, exp, endurance, fitness, willpower FROM _stats_old;
DROP TABLE _stats_old;
PRAGMA foreign_keys=on;

View File

@ -1 +0,0 @@
-- Nothing to undo since up.sql only creates missing inventory/loadout records

View File

@ -1 +0,0 @@
-- Nothing to undo since up.sql only fixes corrupt JSON in loadouts

View File

@ -1 +0,0 @@
-- No down action for this migration

View File

@ -1 +0,0 @@
-- This file should undo anything in `up.sql`

View File

@ -1 +0,0 @@
-- This file should undo anything in `up.sql`

View File

@ -1 +0,0 @@
-- This file should undo anything in `up.sql`

View File

@ -1 +0,0 @@
-- This file should undo anything in `up.sql`

View File

@ -1,11 +0,0 @@
DROP TABLE stats;
DROP TABLE character;
DROP TABLE body;
DROP TABLE item;
DROP TABLE entity;
ALTER TABLE _body_bak RENAME TO body;
ALTER TABLE _stats_bak RENAME TO stats;
ALTER TABLE _character_bak RENAME TO character;
ALTER TABLE _loadout_bak RENAME TO loadout;
ALTER TABLE _inventory_bak RENAME TO inventory;

View File

@ -1,6 +0,0 @@
-- This file should undo anything in `up.sql`
UPDATE item
SET item_definition_id = 'common.items.weapons.staff.sceptre_velorite_0' WHERE item_definition_id = 'common.items.weapons.sceptre.sceptre_velorite_0';
UPDATE item
SET item_definition_id = 'common.items.weapons.staff.staff_nature' WHERE item_definition_id = 'common.items.weapons.sceptre.staff_nature';

View File

@ -1,6 +0,0 @@
-- This file should undo anything in `up.sql`
-- Revert NPC weapon item definition IDs back to their old
-- common.items.npc_weapons.npcweapon.* asset paths.
UPDATE item
SET item_definition_id = 'common.items.npc_weapons.npcweapon.beast_claws' WHERE item_definition_id = 'common.items.npc_weapons.unique.beast_claws';
UPDATE item
SET item_definition_id = 'common.items.npc_weapons.npcweapon.stone_golems_fist' WHERE item_definition_id = 'common.items.npc_weapons.unique.stone_golems_fist';

View File

@ -1,38 +0,0 @@
-- Put waypoint data back into item table
-- Copy each character's waypoint JSON from stats back onto the character
-- pseudo-container row in item (only where a waypoint was recorded).
UPDATE item
SET position = ( SELECT s.waypoint
FROM stats s
WHERE s.stats_id = item.item_id
AND item.item_definition_id = 'veloren.core.pseudo_containers.character'
AND s.waypoint IS NOT NULL)
WHERE EXISTS ( SELECT s.waypoint
FROM stats s
WHERE s.stats_id = item.item_id
AND item.item_definition_id = 'veloren.core.pseudo_containers.character'
AND s.waypoint IS NOT NULL);
-- SQLite does not support dropping columns on tables so the entire table must be
-- dropped and recreated without the 'waypoint' column
CREATE TABLE stats_new
(
stats_id INT NOT NULL
PRIMARY KEY
REFERENCES entity,
level INT NOT NULL,
exp INT NOT NULL,
endurance INT NOT NULL,
fitness INT NOT NULL,
willpower INT NOT NULL
);
-- Copy every column except waypoint into the replacement table,
-- then swap it in under the original name.
INSERT INTO stats_new (stats_id, level, exp, endurance, fitness, willpower)
SELECT stats_id,
level,
exp,
endurance,
fitness,
willpower
FROM stats;
DROP TABLE stats;
ALTER TABLE stats_new RENAME TO stats;

View File

@ -1 +0,0 @@
-- This file should undo anything in `up.sql`

View File

@ -1 +0,0 @@
-- No down migration is required for this change.

View File

@ -1,2 +0,0 @@
-- Revert diamond items back to the old shiny_gem crafting item ID
UPDATE item
SET item_definition_id = 'common.items.weapons.crafting.shiny_gem' WHERE item_definition_id = 'common.items.crafting_ing.diamond';

View File

@ -1,10 +0,0 @@
-- Revert starter gear item definition IDs back to their pre-migration
-- common.items.armor.starter.* asset paths.
UPDATE item
SET item_definition_id = 'common.items.armor.starter.glider' WHERE item_definition_id = 'common.items.glider.glider_cloverleaf';
UPDATE item
SET item_definition_id = 'common.items.armor.starter.lantern' WHERE item_definition_id = 'common.items.lantern.black_0';
UPDATE item
SET item_definition_id = 'common.items.armor.starter.rugged_chest' WHERE item_definition_id = 'common.items.armor.chest.rugged';
UPDATE item
SET item_definition_id = 'common.items.armor.starter.rugged_pants' WHERE item_definition_id = 'common.items.armor.pants.rugged';
UPDATE item
SET item_definition_id = 'common.items.armor.starter.sandals_0' WHERE item_definition_id = 'common.items.armor.foot.sandals_0';

View File

@ -1 +0,0 @@
-- This file should undo anything in `up.sql`

View File

@ -1 +0,0 @@
-- This file should undo anything in `up.sql`

View File

@ -1 +0,0 @@
-- This file should undo anything in `up.sql`

View File

@ -1 +0,0 @@
-- This file should undo anything in `up.sql`

View File

@ -1 +0,0 @@
-- This file should undo anything in `up.sql`

View File

@ -1 +0,0 @@
-- This file should undo anything in `up.sql`

View File

@ -1 +0,0 @@
-- This file should undo anything in `up.sql`

View File

@ -1 +0,0 @@
-- This file should undo anything in `up.sql`

View File

@ -1 +0,0 @@
-- This file should undo anything in `up.sql`

View File

@ -1 +0,0 @@
-- This file should undo anything in `up.sql`

View File

@ -1 +0,0 @@
DATABASE_URL=../../../saves/db.sqlite

View File

@ -4,9 +4,9 @@
//! database updates and loading are communicated via requests to the
//! [`CharacterLoader`] and [`CharacterUpdater`] while results/responses are
//! polled and handled each server tick.
extern crate diesel;
extern crate rusqlite;
use super::{error::Error, models::*, schema, VelorenTransaction};
use super::{error::PersistenceError, models::*};
use crate::{
comp,
comp::{item::MaterialStatManifest, Inventory},
@ -20,14 +20,14 @@ use crate::{
convert_waypoint_to_database_json,
},
character_loader::{CharacterCreationResult, CharacterDataResult, CharacterListResult},
error::Error::DatabaseError,
error::PersistenceError::DatabaseError,
PersistedComponents,
},
};
use common::character::{CharacterId, CharacterItem, MAX_CHARACTERS_PER_PLAYER};
use core::ops::Range;
use diesel::{prelude::*, sql_query, sql_types::BigInt};
use std::{collections::VecDeque, sync::Arc};
use rusqlite::{types::Value, ToSql, Transaction, NO_PARAMS};
use std::{collections::VecDeque, rc::Rc};
use tracing::{error, trace, warn};
/// Private module for very tightly coupled database conversion methods. In
@ -53,15 +53,38 @@ struct CharacterContainers {
/// BFS the inventory/loadout to ensure that each is topologically sorted in the
/// sense required by convert_inventory_from_database_items to support recursive
/// items
pub fn load_items_bfs(connection: VelorenTransaction, root: i64) -> Result<Vec<Item>, Error> {
use schema::item::dsl::*;
pub fn load_items_bfs(
connection: &mut Transaction,
root: i64,
) -> Result<Vec<Item>, PersistenceError> {
let mut items = Vec::new();
let mut queue = VecDeque::new();
queue.push_front(root);
#[rustfmt::skip]
let mut stmt = connection.prepare_cached("
SELECT item_id,
parent_container_item_id,
item_definition_id,
stack_size,
position
FROM item
WHERE parent_container_item_id = ?1")?;
while let Some(id) = queue.pop_front() {
let frontier = item
.filter(parent_container_item_id.eq(id))
.load::<Item>(&*connection)?;
let frontier = stmt
.query_map(&[id], |row| {
Ok(Item {
item_id: row.get(0)?,
parent_container_item_id: row.get(1)?,
item_definition_id: row.get(2)?,
stack_size: row.get(3)?,
position: row.get(4)?,
})
})?
.filter_map(Result::ok)
.collect::<Vec<Item>>();
for i in frontier.iter() {
queue.push_back(i.item_id);
}
@ -77,34 +100,53 @@ pub fn load_items_bfs(connection: VelorenTransaction, root: i64) -> Result<Vec<I
pub fn load_character_data(
requesting_player_uuid: String,
char_id: CharacterId,
connection: VelorenTransaction,
connection: &mut Transaction,
msm: &MaterialStatManifest,
) -> CharacterDataResult {
use schema::{body::dsl::*, character::dsl::*, skill_group::dsl::*};
let character_containers = get_pseudo_containers(connection, char_id)?;
let inventory_items = load_items_bfs(connection, character_containers.inventory_container_id)?;
let loadout_items = load_items_bfs(connection, character_containers.loadout_container_id)?;
let character_data = character
.filter(
schema::character::dsl::character_id
.eq(char_id)
.and(player_uuid.eq(requesting_player_uuid)),
)
.first::<Character>(&*connection)?;
#[rustfmt::skip]
let mut stmt = connection.prepare_cached("
SELECT c.character_id,
c.alias,
c.waypoint,
b.variant,
b.body_data
FROM character c
JOIN body b ON (c.character_id = b.body_id)
WHERE c.player_uuid = ?1
AND c.character_id = ?2",
)?;
let char_body = body
.filter(schema::body::dsl::body_id.eq(char_id))
.first::<Body>(&*connection)?;
let (body_data, character_data) = stmt.query_row(
&[requesting_player_uuid.clone(), char_id.to_string()],
|row| {
let character_data = Character {
character_id: row.get(0)?,
player_uuid: requesting_player_uuid,
alias: row.get(1)?,
waypoint: row.get(2)?,
};
let body_data = Body {
body_id: row.get(0)?,
variant: row.get(3)?,
body_data: row.get(4)?,
};
Ok((body_data, character_data))
},
)?;
let char_waypoint = character_data.waypoint.as_ref().and_then(|x| {
match convert_waypoint_from_database_json(&x) {
Ok(w) => Some(w),
Err(e) => {
warn!(
"Error reading waypoint from database for character ID {}, error: {}",
"Error reading waypoint from database for character ID
{}, error: {}",
char_id, e
);
None
@ -112,16 +154,50 @@ pub fn load_character_data(
}
});
let skill_data = schema::skill::dsl::skill
.filter(schema::skill::dsl::entity_id.eq(char_id))
.load::<Skill>(&*connection)?;
#[rustfmt::skip]
let mut stmt = connection.prepare_cached("
SELECT skill,
level
FROM skill
WHERE entity_id = ?1",
)?;
let skill_group_data = skill_group
.filter(schema::skill_group::dsl::entity_id.eq(char_id))
.load::<SkillGroup>(&*connection)?;
let skill_data = stmt
.query_map(&[char_id], |row| {
Ok(Skill {
entity_id: char_id,
skill: row.get(0)?,
level: row.get(1)?,
})
})?
.filter_map(Result::ok)
.collect::<Vec<Skill>>();
#[rustfmt::skip]
let mut stmt = connection.prepare_cached("
SELECT skill_group_kind,
exp,
available_sp,
earned_sp
FROM skill_group
WHERE entity_id = ?1",
)?;
let skill_group_data = stmt
.query_map(&[char_id], |row| {
Ok(SkillGroup {
entity_id: char_id,
skill_group_kind: row.get(0)?,
exp: row.get(1)?,
available_sp: row.get(2)?,
earned_sp: row.get(3)?,
})
})?
.filter_map(Result::ok)
.collect::<Vec<SkillGroup>>();
Ok((
convert_body_from_database(&char_body)?,
convert_body_from_database(&body_data)?,
convert_stats_from_database(character_data.alias, &skill_data, &skill_group_data),
convert_inventory_from_database_items(
character_containers.inventory_container_id,
@ -143,24 +219,56 @@ pub fn load_character_data(
/// returned.
pub fn load_character_list(
player_uuid_: &str,
connection: VelorenTransaction,
connection: &mut Transaction,
msm: &MaterialStatManifest,
) -> CharacterListResult {
use schema::{body::dsl::*, character::dsl::*};
let characters;
{
#[rustfmt::skip]
let mut stmt = connection
.prepare_cached("
SELECT character_id,
alias
FROM character
WHERE player_uuid = ?1
ORDER BY character_id")?;
let result = character
.filter(player_uuid.eq(player_uuid_))
.order(schema::character::dsl::character_id.desc())
.load::<Character>(&*connection)?;
result
characters = stmt
.query_map(&[player_uuid_], |row| {
Ok(Character {
character_id: row.get(0)?,
alias: row.get(1)?,
player_uuid: player_uuid_.to_owned(),
waypoint: None, // Not used for character select
})
})?
.map(|x| x.unwrap())
.collect::<Vec<Character>>();
}
characters
.iter()
.map(|character_data| {
let char = convert_character_from_database(character_data);
let char = convert_character_from_database(&character_data);
let db_body = body
.filter(schema::body::dsl::body_id.eq(character_data.character_id))
.first::<Body>(&*connection)?;
let db_body;
{
#[rustfmt::skip]
let mut stmt = connection
.prepare_cached("\
SELECT body_id,\
variant,\
body_data \
FROM body \
WHERE body_id = ?1")?;
db_body = stmt.query_row(&[char.id], |row| {
Ok(Body {
body_id: row.get(0)?,
variant: row.get(1)?,
body_data: row.get(2)?,
})
})?;
}
let char_body = convert_body_from_database(&db_body)?;
@ -188,15 +296,11 @@ pub fn create_character(
uuid: &str,
character_alias: &str,
persisted_components: PersistedComponents,
connection: VelorenTransaction,
connection: &mut Transaction,
msm: &MaterialStatManifest,
) -> CharacterCreationResult {
use schema::item::dsl::*;
check_character_limit(uuid, connection)?;
use schema::{body, character, skill_group};
let (body, stats, inventory, waypoint) = persisted_components;
// Fetch new entity IDs for character, inventory and loadout
@ -230,66 +334,82 @@ pub fn create_character(
position: LOADOUT_PSEUDO_CONTAINER_POSITION.to_owned(),
},
];
let pseudo_container_count = diesel::insert_into(item)
.values(pseudo_containers)
.execute(&*connection)?;
if pseudo_container_count != 3 {
return Err(Error::OtherError(format!(
"Error inserting initial pseudo containers for character id {} (expected 3, actual {})",
character_id, pseudo_container_count
)));
#[rustfmt::skip]
let mut stmt = connection.prepare_cached("
INSERT INTO item (item_id,
parent_container_item_id,
item_definition_id,
stack_size,
position)
VALUES (?1, ?2, ?3, ?4, ?5)",
)?;
for pseudo_container in pseudo_containers {
stmt.execute(&[
&pseudo_container.item_id as &dyn ToSql,
&pseudo_container.parent_container_item_id,
&pseudo_container.item_definition_id,
&pseudo_container.stack_size,
&pseudo_container.position,
])?;
}
drop(stmt);
#[rustfmt::skip]
let mut stmt = connection.prepare_cached("
INSERT INTO body (body_id,
variant,
body_data)
VALUES (?1, ?2, ?3)")?;
stmt.execute(&[
&character_id as &dyn ToSql,
&"humanoid".to_string(),
&convert_body_to_database_json(&body)?,
])?;
drop(stmt);
#[rustfmt::skip]
let mut stmt = connection.prepare_cached("
INSERT INTO character (character_id,
player_uuid,
alias,
waypoint)
VALUES (?1, ?2, ?3, ?4)")?;
stmt.execute(&[
&character_id as &dyn ToSql,
&uuid,
&character_alias,
&convert_waypoint_to_database_json(waypoint),
])?;
drop(stmt);
let skill_set = stats.skill_set;
// Insert body record
let new_body = Body {
body_id: character_id,
body_data: convert_body_to_database_json(&body)?,
variant: "humanoid".to_string(),
};
let body_count = diesel::insert_into(body::table)
.values(&new_body)
.execute(&*connection)?;
if body_count != 1 {
return Err(Error::OtherError(format!(
"Error inserting into body table for char_id {}",
character_id
)));
}
// Insert character record
let new_character = NewCharacter {
character_id,
player_uuid: uuid,
alias: &character_alias,
waypoint: convert_waypoint_to_database_json(waypoint),
};
let character_count = diesel::insert_into(character::table)
.values(&new_character)
.execute(&*connection)?;
if character_count != 1 {
return Err(Error::OtherError(format!(
"Error inserting into character table for char_id {}",
character_id
)));
}
let db_skill_groups = convert_skill_groups_to_database(character_id, skill_set.skill_groups);
let skill_groups_count = diesel::insert_into(skill_group::table)
.values(&db_skill_groups)
.execute(&*connection)?;
if skill_groups_count != 1 {
return Err(Error::OtherError(format!(
"Error inserting into skill_group table for char_id {}",
character_id
)));
#[rustfmt::skip]
let mut stmt = connection.prepare_cached("
INSERT INTO skill_group (entity_id,
skill_group_kind,
exp,
available_sp,
earned_sp)
VALUES (?1, ?2, ?3, ?4, ?5)")?;
for skill_group in db_skill_groups {
stmt.execute(&[
&character_id as &dyn ToSql,
&skill_group.skill_group_kind,
&skill_group.exp,
&skill_group.available_sp,
&skill_group.earned_sp,
])?;
}
drop(stmt);
// Insert default inventory and loadout item records
let mut inserts = Vec::new();
@ -305,21 +425,26 @@ pub fn create_character(
next_id
})?;
let expected_inserted_count = inserts.len();
let inserted_items = inserts
.into_iter()
.map(|item_pair| item_pair.model)
.collect::<Vec<_>>();
let inserted_count = diesel::insert_into(item)
.values(&inserted_items)
.execute(&*connection)?;
#[rustfmt::skip]
let mut stmt = connection.prepare_cached("
INSERT INTO item (item_id,
parent_container_item_id,
item_definition_id,
stack_size,
position)
VALUES (?1, ?2, ?3, ?4, ?5)",
)?;
if expected_inserted_count != inserted_count {
return Err(Error::OtherError(format!(
"Expected insertions={}, actual={}, for char_id {}--unsafe to continue transaction.",
expected_inserted_count, inserted_count, character_id
)));
for item in inserts {
stmt.execute(&[
&item.model.item_id as &dyn ToSql,
&item.model.parent_container_item_id,
&item.model.item_definition_id,
&item.model.stack_size,
&item.model.position,
])?;
}
drop(stmt);
load_character_list(uuid, connection, msm).map(|list| (character_id, list))
}
@ -328,64 +453,81 @@ pub fn create_character(
pub fn delete_character(
requesting_player_uuid: &str,
char_id: CharacterId,
connection: VelorenTransaction,
connection: &mut Transaction,
msm: &MaterialStatManifest,
) -> CharacterListResult {
use schema::{body::dsl::*, character::dsl::*, skill::dsl::*, skill_group::dsl::*};
#[rustfmt::skip]
let mut stmt = connection.prepare_cached("
SELECT COUNT(1)
FROM character
WHERE character_id = ?1
AND player_uuid = ?2")?;
// Load the character to delete - ensures that the requesting player
// owns the character
let _character_data = character
.filter(
schema::character::dsl::character_id
.eq(char_id)
.and(player_uuid.eq(requesting_player_uuid)),
)
.first::<Character>(&*connection)?;
let result = stmt.query_row(&[&char_id as &dyn ToSql, &requesting_player_uuid], |row| {
let y: i64 = row.get(0)?;
Ok(y)
})?;
drop(stmt);
if result != 1 {
return Err(PersistenceError::OtherError(
"Requested character to delete does not belong to the requesting player".to_string(),
));
}
// Delete skills
diesel::delete(skill_group.filter(schema::skill_group::dsl::entity_id.eq(char_id)))
.execute(&*connection)?;
let mut stmt = connection.prepare_cached(
"
DELETE
FROM skill
WHERE entity_id = ?1",
)?;
diesel::delete(skill.filter(schema::skill::dsl::entity_id.eq(char_id)))
.execute(&*connection)?;
stmt.execute(&[&char_id])?;
drop(stmt);
// Delete skill groups
let mut stmt = connection.prepare_cached(
"
DELETE
FROM skill_group
WHERE entity_id = ?1",
)?;
stmt.execute(&[&char_id])?;
drop(stmt);
// Delete character
let character_count = diesel::delete(
character
.filter(schema::character::dsl::character_id.eq(char_id))
.filter(player_uuid.eq(requesting_player_uuid)),
)
.execute(&*connection)?;
let mut stmt = connection.prepare_cached(
"
DELETE
FROM character
WHERE character_id = ?1",
)?;
if character_count != 1 {
return Err(Error::OtherError(format!(
"Error deleting from character table for char_id {}",
char_id
)));
}
stmt.execute(&[&char_id])?;
drop(stmt);
// Delete body
let body_count = diesel::delete(body.filter(schema::body::dsl::body_id.eq(char_id)))
.execute(&*connection)?;
let mut stmt = connection.prepare_cached(
"
DELETE
FROM body
WHERE body_id = ?1",
)?;
if body_count != 1 {
return Err(Error::OtherError(format!(
"Error deleting from body table for char_id {}",
char_id
)));
}
stmt.execute(&[&char_id])?;
drop(stmt);
// Delete all items, recursively walking all containers starting from the
// "character" pseudo-container that is the root for all items owned by
// a character.
let item_count = diesel::sql_query(format!(
let mut stmt = connection.prepare_cached(
"
WITH RECURSIVE
parents AS (
SELECT item_id
FROM item
WHERE item.item_id = {} -- Item with character id is the character pseudo-container
WHERE item.item_id = ?1 -- Item with character id is the character pseudo-container
UNION ALL
SELECT item.item_id
FROM item,
@ -395,15 +537,16 @@ pub fn delete_character(
DELETE
FROM item
WHERE EXISTS (SELECT 1 FROM parents WHERE parents.item_id = item.item_id)",
char_id
))
.execute(&*connection)?;
)?;
if item_count < 3 {
return Err(Error::OtherError(format!(
let deleted_item_count = stmt.execute(&[&char_id])?;
drop(stmt);
if deleted_item_count < 3 {
return Err(PersistenceError::OtherError(format!(
"Error deleting from item table for char_id {} (expected at least 3 deletions, found \
{})",
char_id, item_count
char_id, deleted_item_count
)));
}
@ -412,24 +555,24 @@ pub fn delete_character(
/// Before creating a character, we ensure that the limit on the number of
/// characters has not been exceeded
pub fn check_character_limit(uuid: &str, connection: VelorenTransaction) -> Result<(), Error> {
use diesel::dsl::count_star;
use schema::character::dsl::*;
pub fn check_character_limit(
uuid: &str,
connection: &mut Transaction,
) -> Result<(), PersistenceError> {
#[rustfmt::skip]
let mut stmt = connection.prepare_cached("
SELECT COUNT(1)
FROM character
WHERE player_uuid = ?1")?;
let character_count = character
.select(count_star())
.filter(player_uuid.eq(uuid))
.load::<i64>(&*connection)?;
#[allow(clippy::needless_question_mark)]
let character_count: i64 = stmt.query_row(&[&uuid], |row| Ok(row.get(0)?))?;
drop(stmt);
match character_count.first() {
Some(count) => {
if count < &(MAX_CHARACTERS_PER_PLAYER as i64) {
if character_count < MAX_CHARACTERS_PER_PLAYER as i64 {
Ok(())
} else {
Err(Error::CharacterLimitReached)
}
},
_ => Ok(()),
Err(PersistenceError::CharacterLimitReached)
}
}
@ -440,48 +583,32 @@ pub fn check_character_limit(uuid: &str, connection: VelorenTransaction) -> Resu
///
/// These are then inserted into the entities table.
fn get_new_entity_ids(
conn: VelorenTransaction,
conn: &mut Transaction,
mut max: impl FnMut(i64) -> i64,
) -> Result<Range<EntityId>, Error> {
use super::schema::entity::dsl::*;
#[derive(QueryableByName)]
struct NextEntityId {
#[sql_type = "BigInt"]
entity_id: i64,
}
) -> Result<Range<EntityId>, PersistenceError> {
// The sqlite_sequence table is used here to avoid reusing entity IDs for
// deleted entities. This table always contains the highest used ID for each
// AUTOINCREMENT column in a SQLite database.
let next_entity_id = sql_query(
// deleted entities. This table always contains the highest used ID for
// each AUTOINCREMENT column in a SQLite database.
#[rustfmt::skip]
let mut stmt = conn.prepare_cached(
"
SELECT seq + 1 AS entity_id
FROM sqlite_sequence
WHERE name = 'entity'",
)
.load::<NextEntityId>(&*conn)?
.pop()
.ok_or_else(|| Error::OtherError("No rows returned for sqlite_sequence query ".to_string()))?
.entity_id;
)?;
#[allow(clippy::needless_question_mark)]
let next_entity_id = stmt.query_row(NO_PARAMS, |row| Ok(row.get(0)?))?;
let max_entity_id = max(next_entity_id);
// Create a new range of IDs and insert them into the entity table
let new_ids: Range<EntityId> = next_entity_id..max_entity_id;
let new_entities: Vec<Entity> = new_ids.clone().map(|x| Entity { entity_id: x }).collect();
let mut stmt = conn.prepare_cached("INSERT INTO entity (entity_id) VALUES (?1)")?;
let actual_count = diesel::insert_into(entity)
.values(&new_entities)
.execute(&*conn)?;
if actual_count != new_entities.len() {
return Err(Error::OtherError(format!(
"Error updating entity table: expected to add the range {:?}) to entities, but actual \
insertions={}",
new_ids, actual_count
)));
// TODO: bulk insert? rarray doesn't seem to work in VALUES clause
for i in new_ids.clone() {
stmt.execute(&[i])?;
}
trace!(
@ -498,9 +625,9 @@ fn get_new_entity_ids(
/// Fetches the pseudo_container IDs for a character
fn get_pseudo_containers(
connection: VelorenTransaction,
connection: &mut Transaction,
character_id: CharacterId,
) -> Result<CharacterContainers, Error> {
) -> Result<CharacterContainers, PersistenceError> {
let character_containers = CharacterContainers {
loadout_container_id: get_pseudo_container_id(
connection,
@ -518,20 +645,28 @@ fn get_pseudo_containers(
}
fn get_pseudo_container_id(
connection: VelorenTransaction,
connection: &mut Transaction,
character_id: CharacterId,
pseudo_container_position: &str,
) -> Result<EntityId, Error> {
use super::schema::item::dsl::*;
match item
.select(item_id)
.filter(
parent_container_item_id
.eq(character_id)
.and(position.eq(pseudo_container_position)),
)
.first::<EntityId>(&*connection)
{
) -> Result<EntityId, PersistenceError> {
#[rustfmt::skip]
let mut stmt = connection.prepare_cached("\
SELECT item_id
FROM item
WHERE parent_container_item_id = ?1
AND position = ?2",
)?;
#[allow(clippy::needless_question_mark)]
let res = stmt.query_row(
&[
character_id.to_string(),
pseudo_container_position.to_string(),
],
|row| Ok(row.get(0)?),
);
match res {
Ok(id) => Ok(id),
Err(e) => {
error!(
@ -550,16 +685,14 @@ pub fn update(
char_stats: comp::Stats,
inventory: comp::Inventory,
char_waypoint: Option<comp::Waypoint>,
connection: VelorenTransaction,
) -> Result<Vec<Arc<common::comp::item::ItemId>>, Error> {
use super::schema::{character::dsl::*, item::dsl::*, skill_group::dsl::*};
connection: &mut Transaction,
) -> Result<(), PersistenceError> {
let pseudo_containers = get_pseudo_containers(connection, char_id)?;
let mut upserts = Vec::new();
// First, get all the entity IDs for any new items, and identify which slots to
// upsert and which ones to delete.
// First, get all the entity IDs for any new items, and identify which
// slots to upsert and which ones to delete.
get_new_entity_ids(connection, |mut next_id| {
let upserts_ = convert_items_to_database_items(
pseudo_containers.loadout_container_id,
@ -573,33 +706,36 @@ pub fn update(
// Next, delete any slots we aren't upserting.
trace!("Deleting items for character_id {}", char_id);
let mut existing_item_ids: Vec<i64> = vec![
pseudo_containers.inventory_container_id,
pseudo_containers.loadout_container_id,
let mut existing_item_ids: Vec<_> = vec![
Value::from(pseudo_containers.inventory_container_id),
Value::from(pseudo_containers.loadout_container_id),
];
for it in load_items_bfs(connection, pseudo_containers.inventory_container_id)? {
existing_item_ids.push(it.item_id);
existing_item_ids.push(Value::from(it.item_id));
}
for it in load_items_bfs(connection, pseudo_containers.loadout_container_id)? {
existing_item_ids.push(it.item_id);
existing_item_ids.push(Value::from(it.item_id));
}
let existing_items = parent_container_item_id.eq_any(existing_item_ids);
let non_upserted_items = item_id.ne_all(
upserts
.iter()
.map(|item_pair| item_pair.model.item_id)
.collect::<Vec<_>>(),
);
let delete_count = diesel::delete(item.filter(existing_items.and(non_upserted_items)))
.execute(&*connection)?;
let non_upserted_items = upserts
.iter()
.map(|item_pair| Value::from(item_pair.model.item_id))
.collect::<Vec<Value>>();
#[rustfmt::skip]
let mut stmt = connection.prepare_cached("
DELETE
FROM item
WHERE parent_container_item_id
IN rarray(?1)
AND item_id NOT IN rarray(?2)")?;
let delete_count = stmt.execute(&[Rc::new(existing_item_ids), Rc::new(non_upserted_items)])?;
trace!("Deleted {} items", delete_count);
// Upsert items
let expected_upsert_count = upserts.len();
let mut upserted_comps = Vec::new();
if expected_upsert_count > 0 {
let (upserted_items, upserted_comps_): (Vec<_>, Vec<_>) = upserts
let (upserted_items, _): (Vec<_>, Vec<_>) = upserts
.into_iter()
.map(|model_pair| {
debug_assert_eq!(
@ -609,7 +745,6 @@ pub fn update(
(model_pair.model, model_pair.comp)
})
.unzip();
upserted_comps = upserted_comps_;
trace!(
"Upserting items {:?} for character_id {}",
upserted_items,
@ -617,22 +752,32 @@ pub fn update(
);
// When moving inventory items around, foreign key constraints on
// `parent_container_item_id` can be temporarily violated by one upsert, but
// restored by another upsert. Deferred constraints allow SQLite to check this
// when committing the transaction. The `defer_foreign_keys` pragma treats the
// foreign key constraints as deferred for the next transaction (it turns itself
// `parent_container_item_id` can be temporarily violated by one
// upsert, but restored by another upsert. Deferred constraints
// allow SQLite to check this when committing the transaction.
// The `defer_foreign_keys` pragma treats the foreign key
// constraints as deferred for the next transaction (it turns itself
// off at the commit boundary). https://sqlite.org/foreignkeys.html#fk_deferred
connection.execute("PRAGMA defer_foreign_keys = ON;")?;
let upsert_count = diesel::replace_into(item)
.values(&upserted_items)
.execute(&*connection)?;
trace!("upsert_count: {}", upsert_count);
if upsert_count != expected_upsert_count {
return Err(Error::OtherError(format!(
"Expected upsertions={}, actual={}, for char_id {}--unsafe to continue \
transaction.",
expected_upsert_count, upsert_count, char_id
)));
connection.pragma_update(None, "defer_foreign_keys", &"ON".to_string())?;
#[rustfmt::skip]
let mut stmt = connection.prepare_cached("
REPLACE
INTO item (item_id,
parent_container_item_id,
item_definition_id,
stack_size,
position)
VALUES (?1, ?2, ?3, ?4, ?5)")?;
for item in upserted_items.iter() {
stmt.execute(&[
&item.item_id as &dyn ToSql,
&item.parent_container_item_id,
&item.item_definition_id,
&item.stack_size,
&item.position,
])?;
}
}
@ -640,43 +785,74 @@ pub fn update(
let db_skill_groups = convert_skill_groups_to_database(char_id, char_skill_set.skill_groups);
diesel::replace_into(skill_group)
.values(&db_skill_groups)
.execute(&*connection)?;
#[rustfmt::skip]
let mut stmt = connection.prepare_cached("
REPLACE
INTO skill_group (entity_id,
skill_group_kind,
exp,
available_sp,
earned_sp)
VALUES (?1, ?2, ?3, ?4, ?5)")?;
for skill_group in db_skill_groups {
stmt.execute(&[
&skill_group.entity_id as &dyn ToSql,
&skill_group.skill_group_kind,
&skill_group.exp,
&skill_group.available_sp,
&skill_group.earned_sp,
])?;
}
let db_skills = convert_skills_to_database(char_id, char_skill_set.skills);
let delete_count = diesel::delete(
schema::skill::dsl::skill.filter(
schema::skill::dsl::entity_id.eq(char_id).and(
schema::skill::dsl::skill_type.ne_all(
let known_skills = Rc::new(
db_skills
.iter()
.map(|x| x.skill_type.clone())
.collect::<Vec<_>>(),
),
),
),
)
.execute(&*connection)?;
.map(|x| Value::from(x.skill.clone()))
.collect::<Vec<Value>>(),
);
#[rustfmt::skip]
let mut stmt = connection.prepare_cached("
DELETE
FROM skill
WHERE entity_id = ?1
AND skill NOT IN rarray(?2)")?;
let delete_count = stmt.execute(&[&char_id as &dyn ToSql, &known_skills])?;
trace!("Deleted {} skills", delete_count);
diesel::replace_into(schema::skill::dsl::skill)
.values(&db_skills)
.execute(&*connection)?;
#[rustfmt::skip]
let mut stmt = connection.prepare_cached("
REPLACE
INTO skill (entity_id,
skill,
level)
VALUES (?1, ?2, ?3)")?;
for skill in db_skills {
stmt.execute(&[&skill.entity_id as &dyn ToSql, &skill.skill, &skill.level])?;
}
let db_waypoint = convert_waypoint_to_database_json(char_waypoint);
let waypoint_count =
diesel::update(character.filter(schema::character::dsl::character_id.eq(char_id)))
.set(waypoint.eq(db_waypoint))
.execute(&*connection)?;
#[rustfmt::skip]
let mut stmt = connection.prepare_cached("
UPDATE character
SET waypoint = ?1
WHERE character_id = ?2
")?;
let waypoint_count = stmt.execute(&[&db_waypoint as &dyn ToSql, &char_id])?;
if waypoint_count != 1 {
return Err(Error::OtherError(format!(
return Err(PersistenceError::OtherError(format!(
"Error updating character table for char_id {}",
char_id
)));
}
Ok(upserted_comps)
Ok(())
}

View File

@ -4,7 +4,7 @@ use crate::persistence::{
};
use crate::persistence::{
error::Error,
error::PersistenceError,
json_models::{self, CharacterPosition, HumanoidBody},
};
use common::{
@ -23,6 +23,7 @@ use common::{
use core::{convert::TryFrom, num::NonZeroU64};
use hashbrown::HashMap;
use std::{collections::VecDeque, sync::Arc};
use tracing::trace;
#[derive(Debug)]
pub struct ItemModelPair {
@ -174,17 +175,17 @@ pub fn convert_items_to_database_items(
}
}
upserts.sort_by_key(|pair| (depth[&pair.model.item_id], pair.model.item_id));
tracing::debug!("upserts: {:#?}", upserts);
trace!("upserts: {:#?}", upserts);
upserts
}
pub fn convert_body_to_database_json(body: &CompBody) -> Result<String, Error> {
pub fn convert_body_to_database_json(body: &CompBody) -> Result<String, PersistenceError> {
let json_model = match body {
common::comp::Body::Humanoid(humanoid_body) => HumanoidBody::from(humanoid_body),
_ => unimplemented!("Only humanoid bodies are currently supported for persistence"),
};
serde_json::to_string(&json_model).map_err(Error::SerializationError)
serde_json::to_string(&json_model).map_err(PersistenceError::SerializationError)
}
pub fn convert_waypoint_to_database_json(waypoint: Option<Waypoint>) -> Option<String> {
@ -196,7 +197,10 @@ pub fn convert_waypoint_to_database_json(waypoint: Option<Waypoint>) -> Option<S
Some(
serde_json::to_string(&charpos)
.map_err(|err| {
Error::ConversionError(format!("Error encoding waypoint: {:?}", err))
PersistenceError::ConversionError(format!(
"Error encoding waypoint: {:?}",
err
))
})
.ok()?,
)
@ -205,10 +209,10 @@ pub fn convert_waypoint_to_database_json(waypoint: Option<Waypoint>) -> Option<S
}
}
pub fn convert_waypoint_from_database_json(position: &str) -> Result<Waypoint, Error> {
pub fn convert_waypoint_from_database_json(position: &str) -> Result<Waypoint, PersistenceError> {
let character_position =
serde_json::de::from_str::<CharacterPosition>(position).map_err(|err| {
Error::ConversionError(format!(
PersistenceError::ConversionError(format!(
"Error de-serializing waypoint: {} err: {}",
position, err
))
@ -227,7 +231,7 @@ pub fn convert_inventory_from_database_items(
loadout_container_id: i64,
loadout_items: &[Item],
msm: &MaterialStatManifest,
) -> Result<Inventory, Error> {
) -> Result<Inventory, PersistenceError> {
// Loadout items must be loaded before inventory items since loadout items
// provide inventory slots. Since items stored inside loadout items actually
// have their parent_container_item_id as the loadout pseudo-container we rely
@ -248,7 +252,7 @@ pub fn convert_inventory_from_database_items(
// Item ID
comp.store(Some(NonZeroU64::try_from(db_item.item_id as u64).map_err(
|_| Error::ConversionError("Item with zero item_id".to_owned()),
|_| PersistenceError::ConversionError("Item with zero item_id".to_owned()),
)?));
// Stack Size
@ -257,13 +261,15 @@ pub fn convert_inventory_from_database_items(
// (to be dropped next to the player) as this could be the result of
// a change in the max amount for that item.
item.set_amount(u32::try_from(db_item.stack_size).map_err(|_| {
Error::ConversionError(format!(
PersistenceError::ConversionError(format!(
"Invalid item stack size for stackable={}: {}",
item.is_stackable(),
&db_item.stack_size
))
})?)
.map_err(|_| Error::ConversionError("Error setting amount for item".to_owned()))?;
.map_err(|_| {
PersistenceError::ConversionError("Error setting amount for item".to_owned())
})?;
}
// Insert item into inventory
@ -271,7 +277,7 @@ pub fn convert_inventory_from_database_items(
// Slot position
let slot = |s: &str| {
serde_json::from_str::<InvSlotId>(s).map_err(|_| {
Error::ConversionError(format!(
PersistenceError::ConversionError(format!(
"Failed to parse item position: {:?}",
&db_item.position
))
@ -288,7 +294,7 @@ pub fn convert_inventory_from_database_items(
// (to be dropped next to the player) as this could be the
// result of a change in the slot capacity for an equipped bag
// (or a change in the inventory size).
Error::ConversionError(format!(
PersistenceError::ConversionError(format!(
"Error inserting item into inventory, position: {:?}",
slot
))
@ -298,7 +304,7 @@ pub fn convert_inventory_from_database_items(
// If inventory.insert returns an item, it means it was swapped for an item that
// already occupied the slot. Multiple items being stored in the database for
// the same slot is an error.
return Err(Error::ConversionError(
return Err(PersistenceError::ConversionError(
"Inserted an item into the same slot twice".to_string(),
));
}
@ -306,14 +312,14 @@ pub fn convert_inventory_from_database_items(
if let Some(Some(parent)) = inventory.slot_mut(slot(&inventory_items[j].position)?) {
parent.add_component(item, msm);
} else {
return Err(Error::ConversionError(format!(
return Err(PersistenceError::ConversionError(format!(
"Parent slot {} for component {} was empty even though it occurred earlier in \
the loop?",
db_item.parent_container_item_id, db_item.item_id
)));
}
} else {
return Err(Error::ConversionError(format!(
return Err(PersistenceError::ConversionError(format!(
"Couldn't find parent item {} before item {} in inventory",
db_item.parent_container_item_id, db_item.item_id
)));
@ -327,7 +333,7 @@ pub fn convert_loadout_from_database_items(
loadout_container_id: i64,
database_items: &[Item],
msm: &MaterialStatManifest,
) -> Result<Loadout, Error> {
) -> Result<Loadout, PersistenceError> {
let loadout_builder = LoadoutBuilder::new();
let mut loadout = loadout_builder.build();
let mut item_indices = HashMap::new();
@ -340,16 +346,18 @@ pub fn convert_loadout_from_database_items(
// NOTE: item id is currently *unique*, so we can store the ID safely.
let comp = item.get_item_id_for_database();
comp.store(Some(NonZeroU64::try_from(db_item.item_id as u64).map_err(
|_| Error::ConversionError("Item with zero item_id".to_owned()),
|_| PersistenceError::ConversionError("Item with zero item_id".to_owned()),
)?));
let convert_error = |err| match err {
LoadoutError::InvalidPersistenceKey => {
Error::ConversionError(format!("Invalid persistence key: {}", &db_item.position))
},
LoadoutError::NoParentAtSlot => {
Error::ConversionError(format!("No parent item at slot: {}", &db_item.position))
},
LoadoutError::InvalidPersistenceKey => PersistenceError::ConversionError(format!(
"Invalid persistence key: {}",
&db_item.position
)),
LoadoutError::NoParentAtSlot => PersistenceError::ConversionError(format!(
"No parent item at slot: {}",
&db_item.position
)),
};
if db_item.parent_container_item_id == loadout_container_id {
@ -363,7 +371,7 @@ pub fn convert_loadout_from_database_items(
})
.map_err(convert_error)?;
} else {
return Err(Error::ConversionError(format!(
return Err(PersistenceError::ConversionError(format!(
"Couldn't find parent item {} before item {} in loadout",
db_item.parent_container_item_id, db_item.item_id
)));
@ -373,7 +381,7 @@ pub fn convert_loadout_from_database_items(
Ok(loadout)
}
pub fn convert_body_from_database(body: &Body) -> Result<CompBody, Error> {
pub fn convert_body_from_database(body: &Body) -> Result<CompBody, PersistenceError> {
Ok(match body.variant.as_str() {
"humanoid" => {
let json_model = serde_json::de::from_str::<HumanoidBody>(&body.body_data)?;
@ -381,13 +389,16 @@ pub fn convert_body_from_database(body: &Body) -> Result<CompBody, Error> {
species: common::comp::humanoid::ALL_SPECIES
.get(json_model.species as usize)
.ok_or_else(|| {
Error::ConversionError(format!("Missing species: {}", json_model.species))
PersistenceError::ConversionError(format!(
"Missing species: {}",
json_model.species
))
})?
.to_owned(),
body_type: common::comp::humanoid::ALL_BODY_TYPES
.get(json_model.body_type as usize)
.ok_or_else(|| {
Error::ConversionError(format!(
PersistenceError::ConversionError(format!(
"Missing body_type: {}",
json_model.body_type
))
@ -403,7 +414,7 @@ pub fn convert_body_from_database(body: &Body) -> Result<CompBody, Error> {
})
},
_ => {
return Err(Error::ConversionError(
return Err(PersistenceError::ConversionError(
"Only humanoid bodies are supported for characters".to_string(),
));
},
@ -439,9 +450,9 @@ pub fn convert_stats_from_database(
new_stats
}
fn get_item_from_asset(item_definition_id: &str) -> Result<common::comp::Item, Error> {
fn get_item_from_asset(item_definition_id: &str) -> Result<common::comp::Item, PersistenceError> {
common::comp::Item::new_from_asset(item_definition_id).map_err(|err| {
Error::AssetError(format!(
PersistenceError::AssetError(format!(
"Error loading item asset: {} - {}",
item_definition_id,
err.to_string()
@ -467,7 +478,7 @@ fn convert_skill_groups_from_database(skill_groups: &[SkillGroup]) -> Vec<skills
fn convert_skills_from_database(skills: &[Skill]) -> HashMap<skills::Skill, Option<u16>> {
let mut new_skills = HashMap::new();
for skill in skills.iter() {
let new_skill = json_models::db_string_to_skill(&skill.skill_type);
let new_skill = json_models::db_string_to_skill(&skill.skill);
new_skills.insert(new_skill, skill.level.map(|l| l as u16));
}
new_skills
@ -497,7 +508,7 @@ pub fn convert_skills_to_database(
.iter()
.map(|(s, l)| Skill {
entity_id,
skill_type: json_models::skill_to_db_string(*s),
skill: json_models::skill_to_db_string(*s),
level: l.map(|l| l as i32),
})
.collect()

View File

@ -1,7 +1,7 @@
use crate::persistence::{
character::{create_character, delete_character, load_character_data, load_character_list},
error::Error,
establish_connection, PersistedComponents,
error::PersistenceError,
establish_connection, DatabaseSettings, PersistedComponents,
};
use common::{
character::{CharacterId, CharacterItem},
@ -9,12 +9,14 @@ use common::{
};
use crossbeam_channel::{self, TryIter};
use lazy_static::lazy_static;
use std::path::Path;
use tracing::error;
use rusqlite::Transaction;
use std::sync::{Arc, RwLock};
use tracing::{error, trace};
pub(crate) type CharacterListResult = Result<Vec<CharacterItem>, Error>;
pub(crate) type CharacterCreationResult = Result<(CharacterId, Vec<CharacterItem>), Error>;
pub(crate) type CharacterDataResult = Result<PersistedComponents, Error>;
pub(crate) type CharacterListResult = Result<Vec<CharacterItem>, PersistenceError>;
pub(crate) type CharacterCreationResult =
Result<(CharacterId, Vec<CharacterItem>), PersistenceError>;
pub(crate) type CharacterDataResult = Result<PersistedComponents, PersistenceError>;
type CharacterLoaderRequest = (specs::Entity, CharacterLoaderRequestKind);
/// Available database operations when modifying a player's character list
@ -53,6 +55,17 @@ pub struct CharacterLoaderResponse {
pub result: CharacterLoaderResponseKind,
}
impl CharacterLoaderResponse {
    /// Returns `true` if the wrapped result is an `Err`, regardless of which
    /// response kind it is.
    ///
    /// Used by the character loader thread to decide whether the enclosing
    /// database transaction should be committed (on success) or rolled back
    /// (on error).
    pub fn is_err(&self) -> bool {
        matches!(
            &self.result,
            CharacterLoaderResponseKind::CharacterData(box Err(_))
                | CharacterLoaderResponseKind::CharacterList(Err(_))
                | CharacterLoaderResponseKind::CharacterCreation(Err(_))
        )
    }
}
/// A bi-directional messaging resource for making requests to modify or load
/// character data in a background thread.
///
@ -78,84 +91,47 @@ lazy_static! {
}
impl CharacterLoader {
pub fn new(db_dir: &Path) -> diesel::QueryResult<Self> {
pub fn new(settings: Arc<RwLock<DatabaseSettings>>) -> Result<Self, PersistenceError> {
let (update_tx, internal_rx) = crossbeam_channel::unbounded::<CharacterLoaderRequest>();
let (internal_tx, update_rx) = crossbeam_channel::unbounded::<CharacterLoaderResponse>();
let mut conn = establish_connection(db_dir)?;
let builder = std::thread::Builder::new().name("persistence_loader".into());
builder
.spawn(move || {
for request in internal_rx {
let (entity, kind) = request;
// Unwrap here is safe as there is no code that can panic when the write lock is
// taken that could cause the RwLock to become poisoned.
let mut conn = establish_connection(&*settings.read().unwrap());
if let Err(e) = internal_tx.send(CharacterLoaderResponse {
entity,
result: match kind {
CharacterLoaderRequestKind::CreateCharacter {
player_uuid,
character_alias,
persisted_components,
} => CharacterLoaderResponseKind::CharacterCreation(conn.transaction(
|txn| {
create_character(
&player_uuid,
&character_alias,
persisted_components,
txn,
&MATERIAL_STATS_MANIFEST,
)
for request in internal_rx {
conn.update_log_mode(&settings);
match conn.connection.transaction() {
Ok(mut transaction) => {
let response =
CharacterLoader::process_request(request, &mut transaction);
if !response.is_err() {
match transaction.commit() {
Ok(()) => {
trace!("Commit for character loader completed");
},
)),
CharacterLoaderRequestKind::DeleteCharacter {
player_uuid,
character_id,
} => CharacterLoaderResponseKind::CharacterList(conn.transaction(
|txn| {
delete_character(
&player_uuid,
character_id,
txn,
&MATERIAL_STATS_MANIFEST,
)
},
)),
CharacterLoaderRequestKind::LoadCharacterList { player_uuid } => {
CharacterLoaderResponseKind::CharacterList(conn.transaction(
|txn| {
load_character_list(
&player_uuid,
txn,
&MATERIAL_STATS_MANIFEST,
)
},
))
},
CharacterLoaderRequestKind::LoadCharacterData {
player_uuid,
character_id,
} => {
let result = conn.transaction(|txn| {
load_character_data(
player_uuid,
character_id,
txn,
&MATERIAL_STATS_MANIFEST,
)
});
if result.is_err() {
error!(
?result,
"Error loading character data for character_id: {}",
character_id
);
Err(e) => error!(
"Failed to commit transaction for character loader, \
error: {:?}",
e
),
};
};
if let Err(e) = internal_tx.send(response) {
error!(?e, "Could not send character loader response");
}
CharacterLoaderResponseKind::CharacterData(Box::new(result))
},
Err(e) => {
error!(
"Failed to start transaction for character loader, error: {:?}",
e
)
},
}) {
error!(?e, "Could not send send persistence request");
}
}
})
@ -167,6 +143,66 @@ impl CharacterLoader {
})
}
// TODO: Refactor the way that we send errors to the client to not require a
// specific Result type per CharacterLoaderResponseKind, and remove
// CharacterLoaderResponse::is_err()
/// Dispatches a single [`CharacterLoaderRequest`] to the appropriate
/// persistence function, running every query inside the supplied
/// `transaction`. Commit/rollback of the transaction is the caller's
/// responsibility (see the loader thread loop).
fn process_request(
    request: CharacterLoaderRequest,
    mut transaction: &mut Transaction,
) -> CharacterLoaderResponse {
    let (entity, kind) = request;
    CharacterLoaderResponse {
        entity,
        result: match kind {
            CharacterLoaderRequestKind::CreateCharacter {
                player_uuid,
                character_alias,
                persisted_components,
            } => CharacterLoaderResponseKind::CharacterCreation(create_character(
                &player_uuid,
                &character_alias,
                persisted_components,
                &mut transaction,
                &MATERIAL_STATS_MANIFEST,
            )),
            CharacterLoaderRequestKind::DeleteCharacter {
                player_uuid,
                character_id,
            } => CharacterLoaderResponseKind::CharacterList(delete_character(
                &player_uuid,
                character_id,
                &mut transaction,
                &MATERIAL_STATS_MANIFEST,
            )),
            CharacterLoaderRequestKind::LoadCharacterList { player_uuid } => {
                CharacterLoaderResponseKind::CharacterList(load_character_list(
                    &player_uuid,
                    &mut transaction,
                    &MATERIAL_STATS_MANIFEST,
                ))
            },
            CharacterLoaderRequestKind::LoadCharacterData {
                player_uuid,
                character_id,
            } => {
                let result = load_character_data(
                    player_uuid,
                    character_id,
                    &mut transaction,
                    &MATERIAL_STATS_MANIFEST,
                );
                // Log here as well as returning the error, since load failures
                // for an individual character are worth surfacing server-side.
                if result.is_err() {
                    error!(
                        ?result,
                        "Error loading character data for character_id: {}", character_id
                    );
                }
                CharacterLoaderResponseKind::CharacterData(Box::new(result))
            },
        },
    }
}
/// Create a new character belonging to the player identified by
/// `player_uuid`
pub fn create_character(

View File

@ -1,36 +1,83 @@
use crate::comp;
use common::{character::CharacterId, comp::item::ItemId};
use common::character::CharacterId;
use crate::persistence::{establish_connection, VelorenConnection};
use std::{path::Path, sync::Arc};
use tracing::{error, trace};
use crate::persistence::{
error::PersistenceError, establish_connection, DatabaseSettings, VelorenConnection,
};
use rusqlite::DropBehavior;
use std::{
collections::HashMap,
sync::{
atomic::{AtomicBool, Ordering},
Arc, RwLock,
},
};
use tracing::{debug, error, info, trace, warn};
pub type CharacterUpdateData = (comp::Stats, comp::Inventory, Option<comp::Waypoint>);
pub enum CharacterUpdaterEvent {
BatchUpdate(Vec<(CharacterId, CharacterUpdateData)>),
DisconnectedSuccess,
}
/// A unidirectional messaging resource for saving characters in a
/// background thread.
///
/// This is used to make updates to a character and their persisted components,
/// such as inventory, loadout, etc...
pub struct CharacterUpdater {
update_tx: Option<crossbeam_channel::Sender<Vec<(CharacterId, CharacterUpdateData)>>>,
update_tx: Option<crossbeam_channel::Sender<CharacterUpdaterEvent>>,
handle: Option<std::thread::JoinHandle<()>>,
pending_logout_updates: HashMap<CharacterId, CharacterUpdateData>,
/// Will disconnect all characters (without persistence) on the next tick if
/// set to true
disconnect_all_clients_requested: Arc<AtomicBool>,
}
impl CharacterUpdater {
pub fn new(db_dir: &Path) -> diesel::QueryResult<Self> {
let (update_tx, update_rx) =
crossbeam_channel::unbounded::<Vec<(CharacterId, CharacterUpdateData)>>();
let mut conn = establish_connection(db_dir)?;
pub fn new(settings: Arc<RwLock<DatabaseSettings>>) -> rusqlite::Result<Self> {
let (update_tx, update_rx) = crossbeam_channel::unbounded::<CharacterUpdaterEvent>();
let disconnect_all_clients_requested = Arc::new(AtomicBool::new(false));
let disconnect_all_clients_requested_clone = Arc::clone(&disconnect_all_clients_requested);
let builder = std::thread::Builder::new().name("persistence_updater".into());
let handle = builder
.spawn(move || {
// Unwrap here is safe as there is no code that can panic when the write lock is
// taken that could cause the RwLock to become poisoned.
let mut conn = establish_connection(&*settings.read().unwrap());
while let Ok(updates) = update_rx.recv() {
trace!("Persistence batch update starting");
execute_batch_update(updates, &mut conn);
trace!("Persistence batch update finished");
match updates {
CharacterUpdaterEvent::BatchUpdate(updates) => {
if disconnect_all_clients_requested_clone.load(Ordering::Relaxed) {
debug!(
"Skipping persistence due to pending disconnection of all \
clients"
);
continue;
}
conn.update_log_mode(&settings);
if let Err(e) = execute_batch_update(updates, &mut conn) {
error!(
"Error during character batch update, disconnecting all \
clients to avoid loss of data integrity. Error: {:?}",
e
);
disconnect_all_clients_requested_clone
.store(true, Ordering::Relaxed);
};
},
CharacterUpdaterEvent::DisconnectedSuccess => {
info!(
"CharacterUpdater received DisconnectedSuccess event, resuming \
batch updates"
);
// Reset the disconnection request as we have had confirmation that all
// clients have been disconnected
disconnect_all_clients_requested_clone.store(false, Ordering::Relaxed);
},
}
}
})
.unwrap();
@ -38,12 +85,49 @@ impl CharacterUpdater {
Ok(Self {
update_tx: Some(update_tx),
handle: Some(handle),
pending_logout_updates: HashMap::new(),
disconnect_all_clients_requested,
})
}
/// Adds a character to the list of characters that have recently logged out
/// and will be persisted in the next batch update.
///
/// If a disconnect-all-clients request is in flight (set when a batch update
/// transaction fails), the update is dropped with a warning rather than
/// queued, to avoid persisting data of unknown integrity.
pub fn add_pending_logout_update(
    &mut self,
    character_id: CharacterId,
    update_data: CharacterUpdateData,
) {
    if !self
        .disconnect_all_clients_requested
        .load(Ordering::Relaxed)
    {
        // Overwrites any previous pending update for this character id;
        // the most recent state wins.
        self.pending_logout_updates
            .insert(character_id, update_data);
    } else {
        warn!(
            "Ignoring request to add pending logout update for character ID {} as there is a \
            disconnection of all clients in progress",
            character_id
        );
    }
}
/// Returns the character IDs of characters that have recently logged out
/// and are awaiting persistence in the next batch update.
pub fn characters_pending_logout(&self) -> impl Iterator<Item = CharacterId> + '_ {
self.pending_logout_updates.keys().copied()
}
/// Returns a value indicating whether there is a pending request to
/// disconnect all clients due to a batch update transaction failure
pub fn disconnect_all_clients_requested(&self) -> bool {
self.disconnect_all_clients_requested
.load(Ordering::Relaxed)
}
/// Updates a collection of characters based on their id and components
pub fn batch_update<'a>(
&self,
&mut self,
updates: impl Iterator<
Item = (
CharacterId,
@ -60,16 +144,22 @@ impl CharacterUpdater {
(stats.clone(), inventory.clone(), waypoint.cloned()),
)
})
.chain(self.pending_logout_updates.drain())
.collect::<Vec<_>>();
if let Err(e) = self.update_tx.as_ref().unwrap().send(updates) {
if let Err(e) = self
.update_tx
.as_ref()
.unwrap()
.send(CharacterUpdaterEvent::BatchUpdate(updates))
{
error!(?e, "Could not send stats updates");
}
}
/// Updates a single character based on their id and components
pub fn update(
&self,
&mut self,
character_id: CharacterId,
stats: &comp::Stats,
inventory: &comp::Inventory,
@ -77,32 +167,37 @@ impl CharacterUpdater {
) {
self.batch_update(std::iter::once((character_id, stats, inventory, waypoint)));
}
/// Indicates to the batch update thread that a requested disconnection of
/// all clients has been processed
pub fn disconnected_success(&mut self) {
self.update_tx
.as_ref()
.unwrap()
.send(CharacterUpdaterEvent::DisconnectedSuccess)
.expect(
"Failed to send DisconnectedSuccess event - not sending this event will prevent \
future persistence batches from running",
);
}
}
fn execute_batch_update(
updates: Vec<(CharacterId, CharacterUpdateData)>,
connection: &mut VelorenConnection,
) {
let mut inserted_items = Vec::<Arc<ItemId>>::new();
if let Err(e) = connection.transaction::<_, super::error::Error, _>(|txn| {
for (character_id, (stats, inventory, waypoint)) in updates {
inserted_items.append(&mut super::character::update(
character_id,
stats,
inventory,
waypoint,
txn,
)?);
}
) -> Result<(), PersistenceError> {
let mut transaction = connection.connection.transaction()?;
transaction.set_drop_behavior(DropBehavior::Rollback);
trace!("Transaction started for character batch update");
updates
.into_iter()
.try_for_each(|(character_id, (stats, inventory, waypoint))| {
super::character::update(character_id, stats, inventory, waypoint, &mut transaction)
})?;
transaction.commit()?;
trace!("Commit for character batch update completed");
Ok(())
}) {
error!(?e, "Error during character batch update transaction");
}
// NOTE: On success, updating the atomics is already taken care of
// internally.
}
impl Drop for CharacterUpdater {

View File

@ -1,5 +0,0 @@
# For documentation on how to configure this file,
# see diesel.rs/guides/configuring-diesel-cli
[print_schema]
file = "schema.rs"

View File

@ -0,0 +1,109 @@
use crate::persistence::{error::PersistenceError, VelorenConnection};
use rusqlite::NO_PARAMS;
use tracing::{debug, info};
/// Performs a one-time migration from diesel to refinery migrations. Copies
/// diesel's __diesel_schema_migrations table records to refinery_schema_history
/// and drops __diesel_schema_migrations.
// At some point in the future, when it is deemed no longer necessary to
// support migrations from pre-rusqlite databases this method should be deleted.
pub(crate) fn migrate_from_diesel(
    connection: &mut VelorenConnection,
) -> Result<(), PersistenceError> {
    // A failure to even begin the transaction is unrecoverable at startup.
    let transaction = connection
        .connection
        .transaction()
        .expect("failed to start transaction");
    // First, check whether the legacy diesel migrations table exists at all;
    // if it doesn't, this database never ran under diesel and there is
    // nothing to migrate.
    #[rustfmt::skip]
    let mut stmt = transaction.prepare("
        SELECT COUNT(1)
        FROM sqlite_master
        WHERE type='table'
        AND name='__diesel_schema_migrations';
        ",
    )?;
    let diesel_migrations_table_exists = stmt.query_row(NO_PARAMS, |row| {
        let row_count: i32 = row.get(0)?;
        Ok(row_count > 0)
    })?;
    // `stmt` borrows `transaction`; drop it explicitly so the batch below can
    // execute against the same transaction.
    drop(stmt);
    if !diesel_migrations_table_exists {
        debug!(
            "__diesel_schema_migrations table does not exist, skipping diesel to refinery \
            migration"
        );
        return Ok(());
    }
    // Map each historical diesel migration version to the refinery
    // (version, name, checksum) triple it corresponds to, then rewrite the
    // migration history into refinery's table and drop the diesel one.
    // The checksums must match those of the checked-in refinery migrations,
    // otherwise refinery would consider the history divergent.
    #[rustfmt::skip]
    transaction.execute_batch("
    -- Create temporary table to store Diesel > Refinery mapping data in
    CREATE TEMP TABLE IF NOT EXISTS _migration_map (
        diesel_version VARCHAR(50) NOT NULL,
        refinery_version INT4 NOT NULL,
        refinery_name VARCHAR(255) NOT NULL,
        refinery_checksum VARCHAR(255) NOT NULL
    );
    -- Insert mapping records to _migration_map
    INSERT INTO _migration_map VALUES ('20200411202519',1,'character','18301154882232874638');
    INSERT INTO _migration_map VALUES ('20200419025352',2,'body','6687048722955029379');
    INSERT INTO _migration_map VALUES ('20200420072214',3,'stats','2322064461300660230');
    INSERT INTO _migration_map VALUES ('20200524235534',4,'race_species','16440275012526526388');
    INSERT INTO _migration_map VALUES ('20200527145044',5,'inventory','13535458920968937266');
    INSERT INTO _migration_map VALUES ('20200528210610',6,'loadout','18209914188629128082');
    INSERT INTO _migration_map VALUES ('20200602210738',7,'inv_increase','3368217138206467823');
    INSERT INTO _migration_map VALUES ('20200703194516',8,'skills','9202176632428664476');
    INSERT INTO _migration_map VALUES ('20200707201052',9,'add_missing_inv_loadout','9127886123837666846');
    INSERT INTO _migration_map VALUES ('20200710162552',10,'dash_melee_energy_cost_fix','14010543160640061685');
    INSERT INTO _migration_map VALUES ('20200716044718',11,'migrate_armour_stats','1617484395098403184');
    INSERT INTO _migration_map VALUES ('20200719223917',12,'update_item_stats','12571040280459413049');
    INSERT INTO _migration_map VALUES ('20200724191205',13,'fix_projectile_stats','5178981757717265745');
    INSERT INTO _migration_map VALUES ('20200729204534',14,'power_stat_for_weapons','17299186713398844906');
    INSERT INTO _migration_map VALUES ('20200806212413',15,'fix_various_problems','17258097957115914749');
    INSERT INTO _migration_map VALUES ('20200816130513',16,'item_persistence','18222209741267759587');
    INSERT INTO _migration_map VALUES ('20200925200504',17,'move_sceptres','8956411670404874637');
    INSERT INTO _migration_map VALUES ('20201107182406',18,'rename_npcweapons','10703468376963165521');
    INSERT INTO _migration_map VALUES ('20201116173524',19,'move_waypoint_to_stats','10083555685813984571');
    INSERT INTO _migration_map VALUES ('20201128205542',20,'item_storage','11912657465469442777');
    INSERT INTO _migration_map VALUES ('20201213172324',21,'shinygem_to_diamond','7188502861698656149');
    INSERT INTO _migration_map VALUES ('20210124141845',22,'skills','1249519966980586191');
    INSERT INTO _migration_map VALUES ('20210125202618',23,'purge_duplicate_items','10597564860189510441');
    INSERT INTO _migration_map VALUES ('20210212054315',24,'remove_duplicate_possess_stick','10774303849135897742');
    INSERT INTO _migration_map VALUES ('20210220191847',25,'starter_gear','7937903884108396352');
    INSERT INTO _migration_map VALUES ('20210224230149',26,'weapon_replacements','16314806319051099277');
    INSERT INTO _migration_map VALUES ('20210301053817',27,'armor_reorganization','17623676960765703100');
    INSERT INTO _migration_map VALUES ('20210302023541',28,'fix_sturdy_red_backpack','10808562637001569925');
    INSERT INTO _migration_map VALUES ('20210302041950',29,'fix_other_backpacks','3143452502889073613');
    INSERT INTO _migration_map VALUES ('20210302182544',30,'fix_leather_set','5238543158379875836');
    INSERT INTO _migration_map VALUES ('20210303195917',31,'fix_debug_armor','13228825131487923091');
    INSERT INTO _migration_map VALUES ('20210306213310',32,'reset_sceptre_skills','626800208872263587');
    INSERT INTO _migration_map VALUES ('20210329012510',33,'fix_amethyst_staff','11008696478673746982');
    -- Create refinery_schema_history table
    CREATE TABLE refinery_schema_history (
        version INT4 PRIMARY KEY,
        name VARCHAR(255),
        applied_on VARCHAR(255),
        checksum VARCHAR(255)
    );
    -- Migrate diesel migration records to refinery migrations table
    INSERT INTO refinery_schema_history
    SELECT  m.refinery_version,
            m.refinery_name,
            '2021-03-27T00:00:00.000000000+00:00',
            m.refinery_checksum
    FROM    _migration_map m
    JOIN    __diesel_schema_migrations d ON (d.version = m.diesel_version);
    DROP TABLE __diesel_schema_migrations;"
    )?;
    transaction.commit()?;
    info!("Successfully performed one-time diesel to refinery migration");
    Ok(())
}

View File

@ -1,21 +1,19 @@
//! Consolidates Diesel and validation errors under a common error type
//! Consolidates rusqlite and validation errors under a common error type
extern crate diesel;
extern crate rusqlite;
use std::fmt;
#[derive(Debug)]
pub enum Error {
pub enum PersistenceError {
// An invalid asset was returned from the database
AssetError(String),
// The player has already reached the max character limit
CharacterLimitReached,
// An error occurred while establish a db connection
DatabaseConnectionError(diesel::ConnectionError),
// An error occurred while running migrations
DatabaseMigrationError(diesel_migrations::RunMigrationsError),
DatabaseConnectionError(rusqlite::Error),
// An error occurred when performing a database action
DatabaseError(diesel::result::Error),
DatabaseError(rusqlite::Error),
// Unable to load body or stats for a character
CharacterDataError,
SerializationError(serde_json::Error),
@ -23,14 +21,13 @@ pub enum Error {
OtherError(String),
}
impl fmt::Display for Error {
impl fmt::Display for PersistenceError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "{}", match self {
Self::AssetError(error) => error.to_string(),
Self::CharacterLimitReached => String::from("Character limit exceeded"),
Self::DatabaseError(error) => error.to_string(),
Self::DatabaseConnectionError(error) => error.to_string(),
Self::DatabaseMigrationError(error) => error.to_string(),
Self::CharacterDataError => String::from("Error while loading character data"),
Self::SerializationError(error) => error.to_string(),
Self::ConversionError(error) => error.to_string(),
@ -39,20 +36,12 @@ impl fmt::Display for Error {
}
}
impl From<diesel::result::Error> for Error {
fn from(error: diesel::result::Error) -> Error { Error::DatabaseError(error) }
impl From<rusqlite::Error> for PersistenceError {
fn from(error: rusqlite::Error) -> PersistenceError { PersistenceError::DatabaseError(error) }
}
impl From<diesel::ConnectionError> for Error {
fn from(error: diesel::ConnectionError) -> Error { Error::DatabaseConnectionError(error) }
}
impl From<serde_json::Error> for Error {
fn from(error: serde_json::Error) -> Error { Error::SerializationError(error) }
}
impl From<diesel_migrations::RunMigrationsError> for Error {
fn from(error: diesel_migrations::RunMigrationsError) -> Error {
Error::DatabaseMigrationError(error)
impl From<serde_json::Error> for PersistenceError {
fn from(error: serde_json::Error) -> PersistenceError {
PersistenceError::SerializationError(error)
}
}

View File

@ -1,23 +1,21 @@
//! DB operations and schema migrations
//!
//! This code uses several [`Diesel ORM`](http://diesel.rs/) tools for DB operations:
//! - [`diesel-migrations`](https://docs.rs/diesel_migrations/1.4.0/diesel_migrations/)
//! for managing table migrations
//! - [`diesel-cli`](https://github.com/diesel-rs/diesel/tree/master/diesel_cli/)
//! for generating and testing migrations
pub(in crate::persistence) mod character;
pub mod character_loader;
pub mod character_updater;
mod error;
mod diesel_to_rusqlite;
pub mod error;
mod json_models;
mod models;
mod schema;
use common::comp;
use diesel::{connection::SimpleConnection, prelude::*};
use diesel_migrations::embed_migrations;
use std::{fs, path::Path};
use refinery::Report;
use rusqlite::{Connection, OpenFlags};
use std::{
path::PathBuf,
sync::{Arc, RwLock},
time::Duration,
};
use tracing::info;
/// A tuple of the components that are persisted to the DB for each character
@ -28,92 +26,144 @@ pub type PersistedComponents = (
Option<comp::Waypoint>,
);
// See: https://docs.rs/diesel_migrations/1.4.0/diesel_migrations/macro.embed_migrations.html
// See: https://docs.rs/refinery/0.5.0/refinery/macro.embed_migrations.html
// This macro is called at build-time, and produces the necessary migration info
// for the `embedded_migrations` call below.
//
// NOTE: Adding a useless comment to trigger the migrations being run. Alter
// when needed.
embed_migrations!();
struct TracingOut;
impl std::io::Write for TracingOut {
fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
info!("{}", String::from_utf8_lossy(buf));
Ok(buf.len())
}
fn flush(&mut self) -> std::io::Result<()> { Ok(()) }
}
/// Runs any pending database migrations. This is executed during server startup
pub fn run_migrations(db_dir: &Path) -> Result<(), diesel_migrations::RunMigrationsError> {
let _ = fs::create_dir(format!("{}/", db_dir.display()));
embedded_migrations::run_with_output(
&establish_connection(db_dir)
.expect(
"If we cannot execute migrations, we should not be allowed to launch the server, \
so we don't populate it with bad data.",
)
.0,
&mut std::io::LineWriter::new(TracingOut),
)
// for the `run_migrations` call below.
mod embedded {
use refinery::embed_migrations;
embed_migrations!("./src/migrations");
}
/// A database connection blessed by Veloren.
pub struct VelorenConnection(SqliteConnection);
/// A transaction blessed by Veloren.
#[derive(Clone, Copy)]
pub struct VelorenTransaction<'a>(&'a SqliteConnection);
pub(crate) struct VelorenConnection {
connection: Connection,
sql_log_mode: SqlLogMode,
}
impl VelorenConnection {
/// Open a transaction in order to be able to run a set of queries against
/// the database. We require the use of a transaction, rather than
/// allowing direct session access, so that (1) we can control things
/// like the retry process (at a future date), and (2) to avoid
/// accidentally forgetting to open or reuse a transaction.
///
/// We could make things even more foolproof, but we restrict ourselves to
/// this for now.
pub fn transaction<T, E, F>(&mut self, f: F) -> Result<T, E>
where
F: for<'a> FnOnce(VelorenTransaction<'a>) -> Result<T, E>,
E: From<diesel::result::Error>,
{
self.0.transaction(|| f(VelorenTransaction(&self.0)))
fn new(connection: Connection) -> Self {
Self {
connection,
sql_log_mode: SqlLogMode::Disabled,
}
}
impl<'a> core::ops::Deref for VelorenTransaction<'a> {
type Target = SqliteConnection;
fn deref(&self) -> &Self::Target { &self.0 }
/// Updates the SQLite log mode if DatabaseSetting.sql_log_mode has changed
pub fn update_log_mode(&mut self, database_settings: &Arc<RwLock<DatabaseSettings>>) {
let settings = database_settings
.read()
.expect("DatabaseSettings RwLock was poisoned");
if self.sql_log_mode == (*settings).sql_log_mode {
return;
}
pub fn establish_connection(db_dir: &Path) -> QueryResult<VelorenConnection> {
let database_url = format!("{}/db.sqlite", db_dir.display());
set_log_mode(&mut self.connection, (*settings).sql_log_mode);
self.sql_log_mode = (*settings).sql_log_mode;
let connection = SqliteConnection::establish(&database_url)
.unwrap_or_else(|_| panic!("Error connecting to {}", database_url));
info!(
"SQL log mode for connection changed to {:?}",
settings.sql_log_mode
);
}
}
fn set_log_mode(connection: &mut Connection, sql_log_mode: SqlLogMode) {
// Rusqlite's trace and profile logging are mutually exclusive and cannot be
// used together
match sql_log_mode {
SqlLogMode::Trace => {
connection.trace(Some(rusqlite_trace_callback));
},
SqlLogMode::Profile => {
connection.profile(Some(rusqlite_profile_callback));
},
SqlLogMode::Disabled => {
connection.trace(None);
connection.profile(None);
},
};
}
#[derive(Clone)]
pub struct DatabaseSettings {
pub db_dir: PathBuf,
pub sql_log_mode: SqlLogMode,
}
/// Available SQL logging modes for database connections.
// Eq added alongside PartialEq: equality on this fieldless enum is total,
// and clippy flags PartialEq-without-Eq.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum SqlLogMode {
    /// Logging is disabled
    Disabled,
    /// Records timings for each SQL statement
    Profile,
    /// Prints all executed SQL statements
    Trace,
}
/// Runs any pending database migrations. This is executed during server startup
pub fn run_migrations(settings: &DatabaseSettings) {
    let mut connection = establish_connection(settings);

    // One-off transfer of migration bookkeeping from the legacy Diesel table.
    diesel_to_rusqlite::migrate_from_diesel(&mut connection)
        .expect("One-time migration from Diesel to Refinery failed");

    // If migrations fail to run, the server cannot start since the database will
    // not be in the required state.
    let report: Report = embedded::migrations::runner()
        .set_abort_divergent(false)
        .run(&mut connection.connection)
        .expect("Database migrations failed, server startup aborted");

    info!(
        "Applied {} database migrations",
        report.applied_migrations().len()
    );
}
// These callbacks use info logging because they are never enabled by default,
// only when explicitly turned on via CLI arguments or interactive CLI commands.
// Setting them to anything other than info would remove the ability to get SQL
// logging from a running server that wasn't started at higher than info.
/// Hook handed to `Connection::trace` (see `set_log_mode`); logs each executed
/// SQL statement. Logged at info level so tracing can be enabled at runtime
/// without raising the server's global log level.
fn rusqlite_trace_callback(log_message: &str) {
    info!("{}", log_message);
}
/// Hook handed to `Connection::profile` (see `set_log_mode`); logs each SQL
/// statement together with its measured duration, at info level for the same
/// reason as the trace callback.
fn rusqlite_profile_callback(log_message: &str, dur: Duration) {
    info!("{} Duration: {:?}", log_message, dur);
}
/// Opens the SQLite database at `<db_dir>/db.sqlite` and prepares it for use:
/// applies the configured SQL log mode, loads rusqlite's array vtab module,
/// sizes the prepared-statement cache, and sets the required PRAGMAs.
///
/// # Panics
/// Panics if the database cannot be opened or a PRAGMA cannot be applied,
/// since the server cannot operate without a correctly configured connection.
// NOTE(review): this block previously contained merged diff residue — the old
// diesel-era `.batch_execute(...)` PRAGMA block alongside the rusqlite
// `.pragma_update(...)` calls, and two competing return expressions. Only the
// rusqlite version is kept.
pub(crate) fn establish_connection(settings: &DatabaseSettings) -> VelorenConnection {
    let connection = Connection::open_with_flags(
        &settings.db_dir.join("db.sqlite"),
        OpenFlags::SQLITE_OPEN_PRIVATE_CACHE | OpenFlags::default(),
    )
    .unwrap_or_else(|err| {
        panic!(
            "Error connecting to {}, Error: {:?}",
            settings.db_dir.join("db.sqlite").display(),
            err
        )
    });

    let mut veloren_connection = VelorenConnection::new(connection);
    let connection = &mut veloren_connection.connection;

    set_log_mode(connection, settings.sql_log_mode);
    veloren_connection.sql_log_mode = settings.sql_log_mode;

    rusqlite::vtab::array::load_module(&connection).expect("Failed to load sqlite array module");
    connection.set_prepared_statement_cache_capacity(100);

    // Use Write-Ahead-Logging for improved concurrency: https://sqlite.org/wal.html
    // Set a busy timeout (in ms): https://sqlite.org/c3ref/busy_timeout.html
    connection
        .pragma_update(None, "foreign_keys", &"ON")
        .expect("Failed to set foreign_keys PRAGMA");
    connection
        .pragma_update(None, "journal_mode", &"WAL")
        .expect("Failed to set journal_mode PRAGMA");
    connection
        .pragma_update(None, "busy_timeout", &"250")
        .expect("Failed to set busy_timeout PRAGMA");

    veloren_connection
}

View File

@ -1,25 +1,3 @@
extern crate serde_json;
use super::schema::{body, character, entity, item, skill, skill_group};
/// Database representation of a row in the `entity` table.
// Diesel derives (`Insertable`) and `#[table_name]` removed: this codebase
// migrated from diesel to rusqlite, so those macros no longer resolve.
#[derive(Debug, PartialEq)]
pub struct Entity {
    pub entity_id: i64,
}
/// Borrowed data used when inserting a new row into the `character` table.
// Diesel `Insertable`/`#[table_name]` removed after the diesel-to-rusqlite
// migration; Debug added for diagnostics.
#[derive(Debug)]
pub struct NewCharacter<'a> {
    pub character_id: i64,
    pub player_uuid: &'a str,
    pub alias: &'a str,
    pub waypoint: Option<String>,
}
#[derive(Identifiable, Queryable, Debug)]
#[table_name = "character"]
#[primary_key(character_id)]
pub struct Character {
pub character_id: i64,
pub player_uuid: String,
@ -27,9 +5,7 @@ pub struct Character {
pub waypoint: Option<String>,
}
#[derive(Debug, Insertable, Queryable, AsChangeset)]
#[table_name = "item"]
#[primary_key(item_id)]
#[derive(Debug)]
pub struct Item {
pub item_id: i64,
pub parent_container_item_id: i64,
@ -38,27 +14,18 @@ pub struct Item {
pub position: String,
}
/// Database representation of a row in the `body` table.
// Diesel derives (`Associations`, `Identifiable`, `Insertable`, `Queryable`)
// and `#[primary_key]`/`#[table_name]` removed after the diesel-to-rusqlite
// migration; only Debug remains meaningful.
#[derive(Debug)]
pub struct Body {
    pub body_id: i64,
    pub variant: String,
    pub body_data: String,
}
/// Database representation of a row in the `skill` table.
// Diesel derives and `#[primary_key]`/`#[table_name]` removed after the
// diesel-to-rusqlite migration; only Debug remains meaningful.
#[derive(Debug)]
pub struct Skill {
    pub entity_id: i64,
    pub skill_type: String,
    pub skill: String,
    pub level: Option<i32>,
}
#[derive(Associations, Identifiable, Insertable, Queryable, Debug)]
#[primary_key(entity_id, skill_group_kind)]
#[table_name = "skill_group"]
pub struct SkillGroup {
pub entity_id: i64,
pub skill_group_kind: String,

View File

@ -1,55 +0,0 @@
// Diesel schema DSL: each `table!` invocation declares a table's name, its
// primary key (in parentheses) and the column-name -> SQL-type mapping used
// by diesel's query builder.

// Character body data, keyed by body_id.
table! {
    body (body_id) {
        body_id -> BigInt,
        variant -> Text,
        body_data -> Text,
    }
}

// Player characters; waypoint is nullable.
table! {
    character (character_id) {
        character_id -> BigInt,
        player_uuid -> Text,
        alias -> Text,
        waypoint -> Nullable<Text>,
    }
}

// Bare entity-id allocation table.
table! {
    entity (entity_id) {
        entity_id -> BigInt,
    }
}

// Items, linked to their container via parent_container_item_id.
table! {
    item (item_id) {
        item_id -> BigInt,
        parent_container_item_id -> BigInt,
        item_definition_id -> Text,
        stack_size -> Integer,
        position -> Text,
    }
}

// Per-entity skills; composite primary key. The SQL column is named `skill`
// but is exposed to Rust as `skill_type` via #[sql_name].
table! {
    skill (entity_id, skill_type) {
        entity_id -> BigInt,
        #[sql_name = "skill"]
        skill_type -> Text,
        level -> Nullable<Integer>,
    }
}

// Per-entity skill groups; composite primary key.
table! {
    skill_group (entity_id, skill_group_kind) {
        entity_id -> BigInt,
        skill_group_kind -> Text,
        exp -> Integer,
        available_sp -> Integer,
        earned_sp -> Integer,
    }
}

// Declare the character -> body join (on character_id) and which tables may
// appear together in a single diesel query.
joinable!(character -> body (character_id));

allow_tables_to_appear_in_same_query!(body, character, entity, item);

View File

@ -1,6 +1,10 @@
use crate::{
alias_validator::AliasValidator, character_creator, client::Client,
persistence::character_loader::CharacterLoader, presence::Presence, EditableSettings,
alias_validator::AliasValidator,
character_creator,
client::Client,
persistence::{character_loader::CharacterLoader, character_updater::CharacterUpdater},
presence::Presence,
EditableSettings,
};
use common::{
comp::{ChatType, Player, UnresolvedChatMsg},
@ -20,6 +24,7 @@ impl Sys {
entity: specs::Entity,
client: &Client,
character_loader: &ReadExpect<'_, CharacterLoader>,
character_updater: &ReadExpect<'_, CharacterUpdater>,
uids: &ReadStorage<'_, Uid>,
players: &ReadStorage<'_, Player>,
presences: &ReadStorage<'_, Presence>,
@ -40,6 +45,28 @@ impl Sys {
if let Some(player) = players.get(entity) {
if presences.contains(entity) {
debug!("player already ingame, aborting");
} else if character_updater
.characters_pending_logout()
.any(|x| x == character_id)
{
debug!("player recently logged out pending persistence, aborting");
client.send(ServerGeneral::CharacterDataLoadError(
"You have recently logged out, please wait a few seconds and try again"
.to_string(),
))?;
} else if character_updater.disconnect_all_clients_requested() {
// If we're in the middle of disconnecting all clients due to a persistence
// transaction failure, prevent new logins
// temporarily.
debug!(
"Rejecting player login while pending disconnection of all players is \
in progress"
);
client.send(ServerGeneral::CharacterDataLoadError(
"The server is currently recovering from an error, please wait a few \
seconds and try again"
.to_string(),
))?;
} else {
// Send a request to load the character's component data from the
// DB. Once loaded, persisted components such as stats and inventory
@ -127,6 +154,7 @@ impl<'a> System<'a> for Sys {
Entities<'a>,
Read<'a, EventBus<ServerEvent>>,
ReadExpect<'a, CharacterLoader>,
ReadExpect<'a, CharacterUpdater>,
ReadStorage<'a, Uid>,
ReadStorage<'a, Client>,
ReadStorage<'a, Player>,
@ -145,6 +173,7 @@ impl<'a> System<'a> for Sys {
entities,
server_event_bus,
character_loader,
character_updater,
uids,
clients,
players,
@ -162,6 +191,7 @@ impl<'a> System<'a> for Sys {
entity,
client,
&character_loader,
&character_updater,
&uids,
&players,
&presences,

View File

@ -2,7 +2,7 @@ use crate::{persistence::character_updater, presence::Presence, sys::SysSchedule
use common::comp::{Inventory, Stats, Waypoint};
use common_ecs::{Job, Origin, Phase, System};
use common_net::msg::PresenceKind;
use specs::{Join, ReadExpect, ReadStorage, Write};
use specs::{Join, ReadStorage, Write, WriteExpect};
#[derive(Default)]
pub struct Sys;
@ -14,7 +14,7 @@ impl<'a> System<'a> for Sys {
ReadStorage<'a, Stats>,
ReadStorage<'a, Inventory>,
ReadStorage<'a, Waypoint>,
ReadExpect<'a, character_updater::CharacterUpdater>,
WriteExpect<'a, character_updater::CharacterUpdater>,
Write<'a, SysScheduler<Self>>,
);
@ -29,7 +29,7 @@ impl<'a> System<'a> for Sys {
player_stats,
player_inventories,
player_waypoint,
updater,
mut updater,
mut scheduler,
): Self::SystemData,
) {

View File

@ -1,6 +1,9 @@
use common::clock::Clock;
use crossbeam::channel::{bounded, unbounded, Receiver, Sender, TryRecvError};
use server::{Error as ServerError, Event, Input, Server};
use server::{
persistence::{DatabaseSettings, SqlLogMode},
Error as ServerError, Event, Input, Server,
};
use std::{
sync::{
atomic::{AtomicBool, AtomicUsize, Ordering},
@ -95,6 +98,17 @@ impl Singleplayer {
let settings2 = settings.clone();
// Relative to data_dir
const PERSISTENCE_DB_DIR: &str = "saves";
let database_settings = DatabaseSettings {
db_dir: server_data_dir.join(PERSISTENCE_DB_DIR),
sql_log_mode: SqlLogMode::Disabled, /* Voxygen doesn't take in command-line arguments
* so SQL logging can't be enabled for
* singleplayer without changing this line
* manually */
};
let paused = Arc::new(AtomicBool::new(false));
let paused1 = Arc::clone(&paused);
@ -109,6 +123,7 @@ impl Singleplayer {
match Server::new(
settings2,
editable_settings,
database_settings,
&server_data_dir,
Arc::clone(&runtime),
) {