Client::new can now resolve DNS requests, better networking error messages

This commit is contained in:
Marcel Märtens 2021-02-22 00:48:30 +01:00
parent 1a7c179bbb
commit 3f5c64bec0
20 changed files with 394 additions and 187 deletions

2
Cargo.lock generated
View File

@ -5681,7 +5681,7 @@ dependencies = [
[[package]] [[package]]
name = "veloren-network-protocol" name = "veloren-network-protocol"
version = "0.5.0" version = "0.6.0"
dependencies = [ dependencies = [
"async-channel", "async-channel",
"async-trait", "async-trait",

View File

@ -48,6 +48,7 @@ https://veloren.net/account/."#,
"main.login.server_shut_down": "Server shut down", "main.login.server_shut_down": "Server shut down",
"main.login.already_logged_in": "You are already logged into the server.", "main.login.already_logged_in": "You are already logged into the server.",
"main.login.network_error": "Network error", "main.login.network_error": "Network error",
"main.login.network_wrong_version": "The server is running a different version than you are. Check your version and update your game.",
"main.login.failed_sending_request": "Request to Auth server failed", "main.login.failed_sending_request": "Request to Auth server failed",
"main.login.invalid_character": "The selected character is invalid", "main.login.invalid_character": "The selected character is invalid",
"main.login.client_crashed": "Client crashed", "main.login.client_crashed": "Client crashed",

View File

@ -5,14 +5,13 @@
use common::{clock::Clock, comp}; use common::{clock::Clock, comp};
use std::{ use std::{
io, io,
net::ToSocketAddrs,
sync::{mpsc, Arc}, sync::{mpsc, Arc},
thread, thread,
time::Duration, time::Duration,
}; };
use tokio::runtime::Runtime; use tokio::runtime::Runtime;
use tracing::{error, info}; use tracing::{error, info};
use veloren_client::{Client, Event}; use veloren_client::{addr::ConnectionArgs, Client, Event};
const TPS: u64 = 10; // Low value is okay, just reading messages. const TPS: u64 = 10; // Low value is okay, just reading messages.
@ -50,11 +49,7 @@ fn main() {
// Create a client. // Create a client.
let mut client = runtime let mut client = runtime
.block_on(Client::new( .block_on(Client::new(
server_addr ConnectionArgs::HostnameAndOptionalPort(server_addr, false),
.to_socket_addrs()
.expect("Invalid server address")
.next()
.unwrap(),
None, None,
runtime2, runtime2,
)) ))

169
client/src/addr.rs Normal file
View File

@ -0,0 +1,169 @@
use std::net::SocketAddr;
use tokio::net::lookup_host;
use tracing::trace;
#[derive(Clone, Debug)]
pub enum ConnectionArgs {
/// <hostname/ip>:[<port>] + preferIpv6 flag
HostnameAndOptionalPort(String, bool),
IpAndPort(Vec<SocketAddr>),
Mpsc(u64),
}
impl ConnectionArgs {
const DEFAULT_PORT: u16 = 14004;
/// Parse ip address or resolves hostname, moves HostnameAndOptionalPort to
/// IpAndPort state.
/// Note: If you use an ipv6 address, the number after
/// the last colon will be used as the port unless you use [] around the
/// address.
pub async fn resolve(&mut self) -> Result<(), std::io::Error> {
if let ConnectionArgs::HostnameAndOptionalPort(server_address, prefer_ipv6) = self {
// 1. Try if server_address already contains a port
if let Ok(addr) = server_address.parse::<SocketAddr>() {
trace!("Server address with port found");
*self = ConnectionArgs::IpAndPort(vec![addr]);
return Ok(());
}
// 2, Try server_address and port
if let Ok(addr) =
format!("{}:{}", server_address, Self::DEFAULT_PORT).parse::<SocketAddr>()
{
trace!("Server address without port found");
*self = ConnectionArgs::IpAndPort(vec![addr]);
return Ok(());
}
// 3. Assert it's a hostname + port
let new = match lookup_host(server_address.to_string()).await {
Ok(s) => {
trace!("Host lookup succeeded");
Ok(Self::sort_ipv6(s, *prefer_ipv6))
},
Err(e) => {
// 4. Assume its a hostname without port
match lookup_host((server_address.to_string(), Self::DEFAULT_PORT)).await {
Ok(s) => {
trace!("Host lookup without ports succeeded");
Ok(Self::sort_ipv6(s, *prefer_ipv6))
},
Err(_) => Err(e),
}
},
}?;
*self = new;
}
Ok(())
}
fn sort_ipv6(s: impl Iterator<Item = SocketAddr>, prefer_ipv6: bool) -> Self {
let (mut first_addrs, mut second_addrs) =
s.partition::<Vec<_>, _>(|a| a.is_ipv6() == prefer_ipv6);
let addr = std::iter::Iterator::chain(first_addrs.drain(..), second_addrs.drain(..))
.collect::<Vec<_>>();
ConnectionArgs::IpAndPort(addr)
}
}
#[cfg(test)]
mod tests {
use super::*;
use std::net::{IpAddr, Ipv4Addr, Ipv6Addr};
#[tokio::test]
async fn keep_mpcs() {
let mut args = ConnectionArgs::Mpsc(1337);
assert!(args.resolve().await.is_ok());
assert!(matches!(args, ConnectionArgs::Mpsc(1337)));
}
#[tokio::test]
async fn keep_ip() {
let mut args = ConnectionArgs::IpAndPort(vec![SocketAddr::new(
IpAddr::V4(Ipv4Addr::LOCALHOST),
ConnectionArgs::DEFAULT_PORT,
)]);
assert!(args.resolve().await.is_ok());
assert!(matches!(args, ConnectionArgs::IpAndPort(..)));
}
#[tokio::test]
async fn resolve_localhost() {
let mut args = ConnectionArgs::HostnameAndOptionalPort("localhost".to_string(), false);
assert!(args.resolve().await.is_ok());
if let ConnectionArgs::IpAndPort(args) = args {
assert!(args.len() == 1 || args.len() == 2);
assert_eq!(args[0].ip(), IpAddr::V4(Ipv4Addr::LOCALHOST));
assert_eq!(args[0].port(), 14004);
} else {
panic!("wrong resolution");
}
let mut args = ConnectionArgs::HostnameAndOptionalPort("localhost:666".to_string(), false);
assert!(args.resolve().await.is_ok());
if let ConnectionArgs::IpAndPort(args) = args {
assert!(args.len() == 1 || args.len() == 2);
assert_eq!(args[0].port(), 666);
} else {
panic!("wrong resolution");
}
}
#[tokio::test]
async fn resolve_ipv6() {
let mut args = ConnectionArgs::HostnameAndOptionalPort("localhost".to_string(), true);
assert!(args.resolve().await.is_ok());
if let ConnectionArgs::IpAndPort(args) = args {
assert!(args.len() == 1 || args.len() == 2);
assert_eq!(args[0].ip(), Ipv6Addr::LOCALHOST);
assert_eq!(args[0].port(), 14004);
} else {
panic!("wrong resolution");
}
}
#[tokio::test]
async fn resolve() {
let mut args = ConnectionArgs::HostnameAndOptionalPort("google.com".to_string(), false);
assert!(args.resolve().await.is_ok());
if let ConnectionArgs::IpAndPort(args) = args {
assert!(args.len() == 1 || args.len() == 2);
assert_eq!(args[0].port(), 14004);
} else {
panic!("wrong resolution");
}
let mut args = ConnectionArgs::HostnameAndOptionalPort("127.0.0.1".to_string(), false);
assert!(args.resolve().await.is_ok());
if let ConnectionArgs::IpAndPort(args) = args {
assert_eq!(args.len(), 1);
assert_eq!(args[0].port(), 14004);
assert_eq!(args[0].ip(), IpAddr::V4(Ipv4Addr::LOCALHOST));
} else {
panic!("wrong resolution");
}
let mut args = ConnectionArgs::HostnameAndOptionalPort("55.66.77.88".to_string(), false);
assert!(args.resolve().await.is_ok());
if let ConnectionArgs::IpAndPort(args) = args {
assert_eq!(args.len(), 1);
assert_eq!(args[0].port(), 14004);
assert_eq!(args[0].ip(), IpAddr::V4(Ipv4Addr::new(55, 66, 77, 88)));
} else {
panic!("wrong resolution");
}
let mut args = ConnectionArgs::HostnameAndOptionalPort("127.0.0.1:776".to_string(), false);
assert!(args.resolve().await.is_ok());
if let ConnectionArgs::IpAndPort(args) = args {
assert_eq!(args.len(), 1);
assert_eq!(args[0].port(), 776);
assert_eq!(args[0].ip(), IpAddr::V4(Ipv4Addr::LOCALHOST));
} else {
panic!("wrong resolution");
}
}
}

View File

@ -1,5 +1,5 @@
use authc::AuthClientError; use authc::AuthClientError;
pub use network::NetworkError; pub use network::{InitProtocolError, NetworkConnectError, NetworkError};
use network::{ParticipantError, StreamError}; use network::{ParticipantError, StreamError};
#[derive(Debug)] #[derive(Debug)]
@ -19,6 +19,8 @@ pub enum Error {
Banned(String), Banned(String),
/// Persisted character data is invalid or missing /// Persisted character data is invalid or missing
InvalidCharacter, InvalidCharacter,
/// is thrown when parsing the address or resolving Dns fails
DnsResolveFailed(String),
//TODO: InvalidAlias, //TODO: InvalidAlias,
Other(String), Other(String),
} }

View File

@ -1,7 +1,8 @@
#![deny(unsafe_code)] #![deny(unsafe_code)]
#![deny(clippy::clone_on_ref_ptr)] #![deny(clippy::clone_on_ref_ptr)]
#![feature(label_break_value, option_zip)] #![feature(label_break_value, option_zip, str_split_once)]
pub mod addr;
pub mod cmd; pub mod cmd;
pub mod error; pub mod error;
@ -14,6 +15,7 @@ pub use specs::{
Builder, DispatcherBuilder, Entity as EcsEntity, ReadStorage, WorldExt, Builder, DispatcherBuilder, Entity as EcsEntity, ReadStorage, WorldExt,
}; };
use crate::addr::ConnectionArgs;
use byteorder::{ByteOrder, LittleEndian}; use byteorder::{ByteOrder, LittleEndian};
use common::{ use common::{
character::{CharacterId, CharacterItem}, character::{CharacterId, CharacterItem},
@ -57,7 +59,6 @@ use rayon::prelude::*;
use specs::Component; use specs::Component;
use std::{ use std::{
collections::VecDeque, collections::VecDeque,
net::SocketAddr,
sync::Arc, sync::Arc,
time::{Duration, Instant}, time::{Duration, Instant},
}; };
@ -183,14 +184,40 @@ pub struct CharacterList {
impl Client { impl Client {
/// Create a new `Client`. /// Create a new `Client`.
pub async fn new<A: Into<SocketAddr>>( pub async fn new(
addr: A, mut addr: ConnectionArgs,
view_distance: Option<u32>, view_distance: Option<u32>,
runtime: Arc<Runtime>, runtime: Arc<Runtime>,
) -> Result<Self, Error> { ) -> Result<Self, Error> {
let network = Network::new(Pid::new(), Arc::clone(&runtime)); let network = Network::new(Pid::new(), Arc::clone(&runtime));
let participant = network.connect(ProtocolAddr::Tcp(addr.into())).await?; if let Err(e) = addr.resolve().await {
error!(?e, "Dns resolve failed");
return Err(Error::DnsResolveFailed(e.to_string()));
}
let participant = match addr {
ConnectionArgs::HostnameAndOptionalPort(..) => {
unreachable!(".resolve() should have switched that state")
},
ConnectionArgs::IpAndPort(addrs) => {
// Try to connect to all IP's and return the first that works
let mut participant = None;
for addr in addrs {
match network.connect(ProtocolAddr::Tcp(addr)).await {
Ok(p) => {
participant = Some(Ok(p));
break;
},
Err(e) => participant = Some(Err(Error::NetworkErr(e))),
}
}
participant
.unwrap_or_else(|| Err(Error::Other("No Ip Addr provided".to_string())))?
},
ConnectionArgs::Mpsc(id) => network.connect(ProtocolAddr::Mpsc(id)).await?,
};
let stream = participant.opened().await?; let stream = participant.opened().await?;
let mut ping_stream = participant.opened().await?; let mut ping_stream = participant.opened().await?;
let mut register_stream = participant.opened().await?; let mut register_stream = participant.opened().await?;
@ -2019,18 +2046,22 @@ impl Drop for Client {
} else { } else {
trace!("no disconnect msg necessary as client wasn't registered") trace!("no disconnect msg necessary as client wasn't registered")
} }
if let Err(e) = self
.runtime tokio::task::block_in_place(|| {
.block_on(self.participant.take().unwrap().disconnect()) if let Err(e) = self
{ .runtime
warn!(?e, "error when disconnecting, couldn't send all data"); .block_on(self.participant.take().unwrap().disconnect())
} {
warn!(?e, "error when disconnecting, couldn't send all data");
}
});
} }
} }
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use super::*; use super::*;
use std::net::SocketAddr;
#[test] #[test]
/// THIS TEST VERIFIES THE CONSTANT API. /// THIS TEST VERIFIES THE CONSTANT API.
@ -2048,8 +2079,11 @@ mod tests {
let view_distance: Option<u32> = None; let view_distance: Option<u32> = None;
let runtime = Arc::new(Runtime::new().unwrap()); let runtime = Arc::new(Runtime::new().unwrap());
let runtime2 = Arc::clone(&runtime); let runtime2 = Arc::clone(&runtime);
let veloren_client: Result<Client, Error> = let veloren_client: Result<Client, Error> = runtime.block_on(Client::new(
runtime.block_on(Client::new(socket, view_distance, runtime2)); ConnectionArgs::IpAndPort(vec![socket]),
view_distance,
runtime2,
));
let _ = veloren_client.map(|mut client| { let _ = veloren_client.map(|mut client| {
//register //register

View File

@ -1,7 +1,7 @@
[package] [package]
name = "veloren-network-protocol" name = "veloren-network-protocol"
description = "pure Protocol without any I/O itself" description = "pure Protocol without any I/O itself"
version = "0.5.0" version = "0.6.0"
authors = ["Marcel Märtens <marcel.cochem@googlemail.com>"] authors = ["Marcel Märtens <marcel.cochem@googlemail.com>"]
edition = "2018" edition = "2018"

View File

@ -0,0 +1,54 @@
/// All possible Errors that can happen during Handshake [`InitProtocol`]
///
/// [`InitProtocol`]: crate::InitProtocol
#[derive(Debug, PartialEq)]
pub enum InitProtocolError {
Closed,
WrongMagicNumber([u8; 7]),
WrongVersion([u32; 3]),
}
/// When you return closed you must stay closed!
#[derive(Debug, PartialEq)]
pub enum ProtocolError {
Closed,
}
impl From<ProtocolError> for InitProtocolError {
fn from(err: ProtocolError) -> Self {
match err {
ProtocolError::Closed => InitProtocolError::Closed,
}
}
}
impl core::fmt::Display for InitProtocolError {
fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
match self {
InitProtocolError::Closed => write!(f, "Channel closed"),
InitProtocolError::WrongMagicNumber(r) => write!(
f,
"Magic Number doesn't match, remote side send '{:?}' instead of '{:?}'",
&r,
&crate::types::VELOREN_MAGIC_NUMBER
),
InitProtocolError::WrongVersion(r) => write!(
f,
"Network doesn't match, remote side send '{:?}' we are on '{:?}'",
&r,
&crate::types::VELOREN_NETWORK_VERSION
),
}
}
}
impl core::fmt::Display for ProtocolError {
fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
match self {
ProtocolError::Closed => write!(f, "Channel closed"),
}
}
}
impl std::error::Error for InitProtocolError {}
impl std::error::Error for ProtocolError {}

View File

@ -1,10 +1,11 @@
use crate::{ use crate::{
error::{InitProtocolError, ProtocolError},
frame::InitFrame, frame::InitFrame,
types::{ types::{
Pid, Sid, STREAM_ID_OFFSET1, STREAM_ID_OFFSET2, VELOREN_MAGIC_NUMBER, Pid, Sid, STREAM_ID_OFFSET1, STREAM_ID_OFFSET2, VELOREN_MAGIC_NUMBER,
VELOREN_NETWORK_VERSION, VELOREN_NETWORK_VERSION,
}, },
InitProtocol, InitProtocolError, ProtocolError, InitProtocol,
}; };
use async_trait::async_trait; use async_trait::async_trait;
use tracing::{debug, error, info, trace}; use tracing::{debug, error, info, trace};

View File

@ -49,6 +49,7 @@
//! [`RecvProtocol`]: crate::RecvProtocol //! [`RecvProtocol`]: crate::RecvProtocol
//! [`InitProtocol`]: crate::InitProtocol //! [`InitProtocol`]: crate::InitProtocol
mod error;
mod event; mod event;
mod frame; mod frame;
mod handshake; mod handshake;
@ -59,6 +60,7 @@ mod prio;
mod tcp; mod tcp;
mod types; mod types;
pub use error::{InitProtocolError, ProtocolError};
pub use event::ProtocolEvent; pub use event::ProtocolEvent;
pub use metrics::ProtocolMetricCache; pub use metrics::ProtocolMetricCache;
#[cfg(feature = "metrics")] #[cfg(feature = "metrics")]
@ -150,27 +152,3 @@ pub trait UnreliableSink: Send {
type DataFormat; type DataFormat;
async fn recv(&mut self) -> Result<Self::DataFormat, ProtocolError>; async fn recv(&mut self) -> Result<Self::DataFormat, ProtocolError>;
} }
/// All possible Errors that can happen during Handshake [`InitProtocol`]
///
/// [`InitProtocol`]: crate::InitProtocol
#[derive(Debug, PartialEq)]
pub enum InitProtocolError {
Closed,
WrongMagicNumber([u8; 7]),
WrongVersion([u32; 3]),
}
/// When you return closed you must stay closed!
#[derive(Debug, PartialEq)]
pub enum ProtocolError {
Closed,
}
impl From<ProtocolError> for InitProtocolError {
fn from(err: ProtocolError) -> Self {
match err {
ProtocolError::Closed => InitProtocolError::Closed,
}
}
}

View File

@ -1,12 +1,13 @@
#[cfg(feature = "metrics")] #[cfg(feature = "metrics")]
use crate::metrics::RemoveReason; use crate::metrics::RemoveReason;
use crate::{ use crate::{
error::ProtocolError,
event::ProtocolEvent, event::ProtocolEvent,
frame::InitFrame, frame::InitFrame,
handshake::{ReliableDrain, ReliableSink}, handshake::{ReliableDrain, ReliableSink},
metrics::ProtocolMetricCache, metrics::ProtocolMetricCache,
types::Bandwidth, types::Bandwidth,
ProtocolError, RecvProtocol, SendProtocol, UnreliableDrain, UnreliableSink, RecvProtocol, SendProtocol, UnreliableDrain, UnreliableSink,
}; };
use async_trait::async_trait; use async_trait::async_trait;
use std::time::{Duration, Instant}; use std::time::{Duration, Instant};

View File

@ -1,4 +1,5 @@
use crate::{ use crate::{
error::ProtocolError,
event::ProtocolEvent, event::ProtocolEvent,
frame::{ITFrame, InitFrame, OTFrame}, frame::{ITFrame, InitFrame, OTFrame},
handshake::{ReliableDrain, ReliableSink}, handshake::{ReliableDrain, ReliableSink},
@ -6,7 +7,7 @@ use crate::{
metrics::{ProtocolMetricCache, RemoveReason}, metrics::{ProtocolMetricCache, RemoveReason},
prio::PrioManager, prio::PrioManager,
types::{Bandwidth, Mid, Sid}, types::{Bandwidth, Mid, Sid},
ProtocolError, RecvProtocol, SendProtocol, UnreliableDrain, UnreliableSink, RecvProtocol, SendProtocol, UnreliableDrain, UnreliableSink,
}; };
use async_trait::async_trait; use async_trait::async_trait;
use bytes::BytesMut; use bytes::BytesMut;
@ -377,11 +378,12 @@ mod test_utils {
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use crate::{ use crate::{
error::ProtocolError,
frame::OTFrame, frame::OTFrame,
metrics::{ProtocolMetricCache, ProtocolMetrics, RemoveReason}, metrics::{ProtocolMetricCache, ProtocolMetrics, RemoveReason},
tcp::test_utils::*, tcp::test_utils::*,
types::{Pid, Promises, Sid, STREAM_ID_OFFSET1, STREAM_ID_OFFSET2}, types::{Pid, Promises, Sid, STREAM_ID_OFFSET1, STREAM_ID_OFFSET2},
InitProtocol, ProtocolError, ProtocolEvent, RecvProtocol, SendProtocol, InitProtocol, ProtocolEvent, RecvProtocol, SendProtocol,
}; };
use bytes::{Bytes, BytesMut}; use bytes::{Bytes, BytesMut};
use std::{sync::Arc, time::Duration}; use std::{sync::Arc, time::Duration};

View File

@ -1,12 +1,12 @@
use crate::{ use crate::{
message::{partial_eq_bincode, Message}, message::{partial_eq_bincode, Message},
participant::{A2bStreamOpen, S2bShutdownBparticipant}, participant::{A2bStreamOpen, S2bShutdownBparticipant},
scheduler::Scheduler, scheduler::{A2sConnect, Scheduler},
}; };
use bytes::Bytes; use bytes::Bytes;
#[cfg(feature = "compression")] #[cfg(feature = "compression")]
use lz_fear::raw::DecodeError; use lz_fear::raw::DecodeError;
use network_protocol::{Bandwidth, Pid, Prio, Promises, Sid}; use network_protocol::{Bandwidth, InitProtocolError, Pid, Prio, Promises, Sid};
#[cfg(feature = "metrics")] #[cfg(feature = "metrics")]
use prometheus::Registry; use prometheus::Registry;
use serde::{de::DeserializeOwned, Serialize}; use serde::{de::DeserializeOwned, Serialize};
@ -83,7 +83,16 @@ pub struct Stream {
pub enum NetworkError { pub enum NetworkError {
NetworkClosed, NetworkClosed,
ListenFailed(std::io::Error), ListenFailed(std::io::Error),
ConnectFailed(std::io::Error), ConnectFailed(NetworkConnectError),
}
/// Error type thrown by [`Networks`](Network) connect
#[derive(Debug)]
pub enum NetworkConnectError {
/// Either a Pid UUID clash or you are trying to hijack a connection
InvalidSecret,
Handshake(InitProtocolError),
Io(std::io::Error),
} }
/// Error type thrown by [`Participants`](Participant) methods /// Error type thrown by [`Participants`](Participant) methods
@ -149,10 +158,8 @@ pub struct Network {
local_pid: Pid, local_pid: Pid,
runtime: Arc<Runtime>, runtime: Arc<Runtime>,
participant_disconnect_sender: Mutex<HashMap<Pid, A2sDisconnect>>, participant_disconnect_sender: Mutex<HashMap<Pid, A2sDisconnect>>,
listen_sender: listen_sender: Mutex<mpsc::UnboundedSender<(ProtocolAddr, oneshot::Sender<io::Result<()>>)>>,
Mutex<mpsc::UnboundedSender<(ProtocolAddr, oneshot::Sender<tokio::io::Result<()>>)>>, connect_sender: Mutex<mpsc::UnboundedSender<A2sConnect>>,
connect_sender:
Mutex<mpsc::UnboundedSender<(ProtocolAddr, oneshot::Sender<io::Result<Participant>>)>>,
connected_receiver: Mutex<mpsc::UnboundedReceiver<Participant>>, connected_receiver: Mutex<mpsc::UnboundedReceiver<Participant>>,
shutdown_sender: Option<oneshot::Sender<()>>, shutdown_sender: Option<oneshot::Sender<()>>,
} }
@ -348,7 +355,8 @@ impl Network {
/// [`ProtocolAddres`]: crate::api::ProtocolAddr /// [`ProtocolAddres`]: crate::api::ProtocolAddr
#[instrument(name="network", skip(self, address), fields(p = %self.local_pid))] #[instrument(name="network", skip(self, address), fields(p = %self.local_pid))]
pub async fn connect(&self, address: ProtocolAddr) -> Result<Participant, NetworkError> { pub async fn connect(&self, address: ProtocolAddr) -> Result<Participant, NetworkError> {
let (pid_sender, pid_receiver) = oneshot::channel::<io::Result<Participant>>(); let (pid_sender, pid_receiver) =
oneshot::channel::<Result<Participant, NetworkConnectError>>();
debug!(?address, "Connect to address"); debug!(?address, "Connect to address");
self.connect_sender self.connect_sender
.lock() .lock()
@ -1147,6 +1155,18 @@ impl core::fmt::Display for NetworkError {
} }
} }
impl core::fmt::Display for NetworkConnectError {
fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
match self {
NetworkConnectError::Io(e) => write!(f, "Io error: {}", e),
NetworkConnectError::Handshake(e) => write!(f, "Handshake error: {}", e),
NetworkConnectError::InvalidSecret => {
write!(f, "You specified the wrong secret on your second channel")
},
}
}
}
/// implementing PartialEq as it's super convenient in tests /// implementing PartialEq as it's super convenient in tests
impl core::cmp::PartialEq for StreamError { impl core::cmp::PartialEq for StreamError {
fn eq(&self, other: &Self) -> bool { fn eq(&self, other: &Self) -> bool {
@ -1177,3 +1197,4 @@ impl core::cmp::PartialEq for StreamError {
impl std::error::Error for StreamError {} impl std::error::Error for StreamError {}
impl std::error::Error for ParticipantError {} impl std::error::Error for ParticipantError {}
impl std::error::Error for NetworkError {} impl std::error::Error for NetworkError {}
impl std::error::Error for NetworkConnectError {}

View File

@ -107,7 +107,8 @@ mod participant;
mod scheduler; mod scheduler;
pub use api::{ pub use api::{
Network, NetworkError, Participant, ParticipantError, ProtocolAddr, Stream, StreamError, Network, NetworkConnectError, NetworkError, Participant, ParticipantError, ProtocolAddr,
Stream, StreamError,
}; };
pub use message::Message; pub use message::Message;
pub use network_protocol::{Pid, Promises}; pub use network_protocol::{InitProtocolError, Pid, Promises};

View File

@ -1,5 +1,5 @@
use crate::{ use crate::{
api::{Participant, ProtocolAddr}, api::{NetworkConnectError, Participant, ProtocolAddr},
channel::Protocols, channel::Protocols,
metrics::NetworkMetrics, metrics::NetworkMetrics,
participant::{B2sPrioStatistic, BParticipant, S2bCreateChannel, S2bShutdownBparticipant}, participant::{B2sPrioStatistic, BParticipant, S2bCreateChannel, S2bShutdownBparticipant},
@ -47,7 +47,10 @@ struct ParticipantInfo {
} }
type A2sListen = (ProtocolAddr, oneshot::Sender<io::Result<()>>); type A2sListen = (ProtocolAddr, oneshot::Sender<io::Result<()>>);
type A2sConnect = (ProtocolAddr, oneshot::Sender<io::Result<Participant>>); pub(crate) type A2sConnect = (
ProtocolAddr,
oneshot::Sender<Result<Participant, NetworkConnectError>>,
);
type A2sDisconnect = (Pid, S2bShutdownBparticipant); type A2sDisconnect = (Pid, S2bShutdownBparticipant);
type S2sMpscConnect = ( type S2sMpscConnect = (
mpsc::Sender<MpscMsg>, mpsc::Sender<MpscMsg>,
@ -196,13 +199,7 @@ impl Scheduler {
trace!("Stop listen_mgr"); trace!("Stop listen_mgr");
} }
async fn connect_mgr( async fn connect_mgr(&self, mut a2s_connect_r: mpsc::UnboundedReceiver<A2sConnect>) {
&self,
mut a2s_connect_r: mpsc::UnboundedReceiver<(
ProtocolAddr,
oneshot::Sender<io::Result<Participant>>,
)>,
) {
trace!("Start connect_mgr"); trace!("Start connect_mgr");
while let Some((addr, pid_sender)) = a2s_connect_r.recv().await { while let Some((addr, pid_sender)) = a2s_connect_r.recv().await {
let (protocol, cid, handshake) = match addr { let (protocol, cid, handshake) = match addr {
@ -215,7 +212,7 @@ impl Scheduler {
let stream = match net::TcpStream::connect(addr).await { let stream = match net::TcpStream::connect(addr).await {
Ok(stream) => stream, Ok(stream) => stream,
Err(e) => { Err(e) => {
pid_sender.send(Err(e)).unwrap(); pid_sender.send(Err(NetworkConnectError::Io(e))).unwrap();
continue; continue;
}, },
}; };
@ -232,10 +229,10 @@ impl Scheduler {
Some(s) => s.clone(), Some(s) => s.clone(),
None => { None => {
pid_sender pid_sender
.send(Err(std::io::Error::new( .send(Err(NetworkConnectError::Io(std::io::Error::new(
std::io::ErrorKind::NotConnected, std::io::ErrorKind::NotConnected,
"no mpsc listen on this addr", "no mpsc listen on this addr",
))) ))))
.unwrap(); .unwrap();
continue; continue;
}, },
@ -543,7 +540,7 @@ impl Scheduler {
&self, &self,
mut protocol: Protocols, mut protocol: Protocols,
cid: Cid, cid: Cid,
s2a_return_pid_s: Option<oneshot::Sender<io::Result<Participant>>>, s2a_return_pid_s: Option<oneshot::Sender<Result<Participant, NetworkConnectError>>>,
send_handshake: bool, send_handshake: bool,
) { ) {
//channels are unknown till PID is known! //channels are unknown till PID is known!
@ -647,10 +644,7 @@ impl Scheduler {
if let Some(pid_oneshot) = s2a_return_pid_s { if let Some(pid_oneshot) = s2a_return_pid_s {
// someone is waiting with `connect`, so give them their Error // someone is waiting with `connect`, so give them their Error
pid_oneshot pid_oneshot
.send(Err(std::io::Error::new( .send(Err(NetworkConnectError::InvalidSecret))
std::io::ErrorKind::PermissionDenied,
"invalid secret, denying connection",
)))
.unwrap(); .unwrap();
} }
return; return;
@ -670,10 +664,7 @@ impl Scheduler {
// someone is waiting with `connect`, so give them their Error // someone is waiting with `connect`, so give them their Error
trace!(?cid, "returning the Err to api who requested the connect"); trace!(?cid, "returning the Err to api who requested the connect");
pid_oneshot pid_oneshot
.send(Err(std::io::Error::new( .send(Err(NetworkConnectError::Handshake(e)))
std::io::ErrorKind::PermissionDenied,
"Handshake failed, denying connection",
)))
.unwrap(); .unwrap();
} }
}, },

View File

@ -40,7 +40,7 @@ impl ChunkGenerator {
&mut self, &mut self,
entity: Option<EcsEntity>, entity: Option<EcsEntity>,
key: Vec2<i32>, key: Vec2<i32>,
runtime: &mut Arc<Runtime>, runtime: &Runtime,
world: Arc<World>, world: Arc<World>,
index: IndexOwned, index: IndexOwned,
) { ) {

View File

@ -12,7 +12,7 @@ use common::{
use common_net::msg::{PlayerListUpdate, PresenceKind, ServerGeneral}; use common_net::msg::{PlayerListUpdate, PresenceKind, ServerGeneral};
use common_sys::state::State; use common_sys::state::State;
use specs::{saveload::MarkerAllocator, Builder, Entity as EcsEntity, WorldExt}; use specs::{saveload::MarkerAllocator, Builder, Entity as EcsEntity, WorldExt};
use tracing::*; use tracing::{debug, error, trace, warn, Instrument};
pub fn handle_exit_ingame(server: &mut Server, entity: EcsEntity) { pub fn handle_exit_ingame(server: &mut Server, entity: EcsEntity) {
span!(_guard, "handle_exit_ingame"); span!(_guard, "handle_exit_ingame");

View File

@ -1008,7 +1008,7 @@ impl Server {
.generate_chunk( .generate_chunk(
Some(entity), Some(entity),
key, key,
&mut self.runtime, &self.runtime,
Arc::clone(&self.world), Arc::clone(&self.world),
self.index.clone(), self.index.clone(),
); );

View File

@ -1,27 +1,23 @@
use client::{ use client::{
error::{Error as ClientError, NetworkError}, addr::ConnectionArgs,
error::{Error as ClientError, NetworkConnectError, NetworkError},
Client, Client,
}; };
use crossbeam::channel::{unbounded, Receiver, Sender, TryRecvError}; use crossbeam::channel::{unbounded, Receiver, Sender, TryRecvError};
use std::{ use std::{
net::SocketAddr,
sync::{ sync::{
atomic::{AtomicBool, AtomicUsize, Ordering}, atomic::{AtomicBool, AtomicUsize, Ordering},
Arc, Arc,
}, },
time::Duration, time::Duration,
}; };
use tokio::{net::lookup_host, runtime}; use tokio::runtime;
use tracing::{trace, warn}; use tracing::{trace, warn};
#[derive(Debug)] #[derive(Debug)]
pub enum Error { pub enum Error {
// Error parsing input string or error resolving host name.
BadAddress(std::io::Error),
// Parsing/host name resolution successful but there was an error within the client.
ClientError(ClientError),
// Parsing yielded an empty iterator (specifically to_socket_addrs()).
NoAddress, NoAddress,
ClientError(ClientError),
ClientCrashed, ClientCrashed,
} }
@ -40,20 +36,17 @@ pub struct ClientInit {
rx: Receiver<Msg>, rx: Receiver<Msg>,
trust_tx: Sender<AuthTrust>, trust_tx: Sender<AuthTrust>,
cancel: Arc<AtomicBool>, cancel: Arc<AtomicBool>,
_runtime: Arc<runtime::Runtime>,
} }
impl ClientInit { impl ClientInit {
#[allow(clippy::op_ref)] // TODO: Pending review in #587 #[allow(clippy::op_ref)] // TODO: Pending review in #587
#[allow(clippy::or_fun_call)] // TODO: Pending review in #587 #[allow(clippy::or_fun_call)] // TODO: Pending review in #587
pub fn new( pub fn new(
connection_args: (String, u16, bool), connection_args: ConnectionArgs,
username: String, username: String,
view_distance: Option<u32>, view_distance: Option<u32>,
password: String, password: String,
runtime: Option<Arc<runtime::Runtime>>, runtime: Option<Arc<runtime::Runtime>>,
) -> Self { ) -> Self {
let (server_address, port, prefer_ipv6) = connection_args;
let (tx, rx) = unbounded(); let (tx, rx) = unbounded();
let (trust_tx, trust_rx) = unbounded(); let (trust_tx, trust_rx) = unbounded();
let cancel = Arc::new(AtomicBool::new(false)); let cancel = Arc::new(AtomicBool::new(false));
@ -77,13 +70,14 @@ impl ClientInit {
let runtime2 = Arc::clone(&runtime); let runtime2 = Arc::clone(&runtime);
runtime.spawn(async move { runtime.spawn(async move {
let addresses = match Self::resolve(server_address, port, prefer_ipv6).await { let trust_fn = |auth_server: &str| {
Ok(a) => a, let _ = tx.send(Msg::IsAuthTrusted(auth_server.to_string()));
Err(e) => { trust_rx
let _ = tx.send(Msg::Done(Err(Error::BadAddress(e)))); .recv()
return; .map(|AuthTrust(server, trust)| trust && &server == auth_server)
}, .unwrap_or(false)
}; };
let mut last_err = None; let mut last_err = None;
const FOUR_MINUTES_RETRIES: u64 = 48; const FOUR_MINUTES_RETRIES: u64 = 48;
@ -91,97 +85,51 @@ impl ClientInit {
if cancel2.load(Ordering::Relaxed) { if cancel2.load(Ordering::Relaxed) {
break; break;
} }
for socket_addr in &addresses { match Client::new(
match Client::new(*socket_addr, view_distance, Arc::clone(&runtime2)).await { connection_args.clone(),
Ok(mut client) => { view_distance,
if let Err(e) = client Arc::clone(&runtime2),
.register(username, password, |auth_server| { )
let _ = tx.send(Msg::IsAuthTrusted(auth_server.to_string())); .await
trust_rx {
.recv() Ok(mut client) => {
.map(|AuthTrust(server, trust)| { if let Err(e) = client.register(username, password, trust_fn).await {
trust && &server == auth_server
})
.unwrap_or(false)
})
.await
{
last_err = Some(Error::ClientError(e));
break 'tries;
}
let _ = tx.send(Msg::Done(Ok(client)));
return;
},
Err(ClientError::NetworkErr(NetworkError::ConnectFailed(e))) => {
if e.kind() == std::io::ErrorKind::PermissionDenied {
warn!(?e, "Cannot connect to server: Incompatible version");
last_err = Some(Error::ClientError(ClientError::NetworkErr(
NetworkError::ConnectFailed(e),
)));
break 'tries;
} else {
warn!(?e, "Failed to connect to the server. Retrying...");
}
},
Err(e) => {
trace!(?e, "Aborting server connection attempt");
last_err = Some(Error::ClientError(e)); last_err = Some(Error::ClientError(e));
break 'tries; break 'tries;
}, }
} let _ = tx.send(Msg::Done(Ok(client)));
return;
},
Err(ClientError::NetworkErr(NetworkError::ConnectFailed(
NetworkConnectError::Io(e),
))) => {
warn!(?e, "Failed to connect to the server. Retrying...");
},
Err(e) => {
trace!(?e, "Aborting server connection attempt");
last_err = Some(Error::ClientError(e));
break 'tries;
},
} }
tokio::time::sleep(Duration::from_secs(5)).await; tokio::time::sleep(Duration::from_secs(5)).await;
} }
// Parsing/host name resolution successful but no connection succeeded. // Parsing/host name resolution successful but no connection succeeded.
let _ = tx.send(Msg::Done(Err(last_err.unwrap_or(Error::NoAddress)))); let _ = tx.send(Msg::Done(Err(last_err.unwrap_or(Error::NoAddress))));
//Safe drop runtime
tokio::task::block_in_place(move || {
drop(runtime2);
});
}); });
ClientInit { ClientInit {
rx, rx,
trust_tx, trust_tx,
cancel, cancel,
_runtime: runtime,
} }
} }
/// Parse ip address or resolves hostname.
/// Note: if you use an ipv6 address, the number after the last colon will
/// be used as the port unless you use [] around the address.
async fn resolve(
server_address: String,
port: u16,
prefer_ipv6: bool,
) -> Result<Vec<SocketAddr>, std::io::Error> {
// 1. try if server_address already contains a port
if let Ok(addr) = server_address.parse::<SocketAddr>() {
warn!("please don't add port directly to server_address");
return Ok(vec![addr]);
}
// 2, try server_address and port
if let Ok(addr) = format!("{}:{}", server_address, port).parse::<SocketAddr>() {
return Ok(vec![addr]);
}
// 3. do DNS call
let (mut first_addrs, mut second_addrs) = match lookup_host(server_address).await {
Ok(s) => s.partition::<Vec<_>, _>(|a| a.is_ipv6() == prefer_ipv6),
Err(e) => {
return Err(e);
},
};
Ok(
std::iter::Iterator::chain(first_addrs.drain(..), second_addrs.drain(..))
.map(|mut addr| {
addr.set_port(port);
addr
})
.collect(),
)
}
/// Poll if the thread is complete. /// Poll if the thread is complete.
/// Returns None if the thread is still running, otherwise returns the /// Returns None if the thread is still running, otherwise returns the
/// Result of client creation. /// Result of client creation.

View File

@ -11,6 +11,10 @@ use crate::{
window::Event, window::Event,
Direction, GlobalState, PlayState, PlayStateResult, Direction, GlobalState, PlayState, PlayStateResult,
}; };
use client::{
addr::ConnectionArgs,
error::{InitProtocolError, NetworkConnectError, NetworkError},
};
use client_init::{ClientInit, Error as InitError, Msg as InitMsg}; use client_init::{ClientInit, Error as InitError, Msg as InitMsg};
use common::{assets::AssetExt, comp, span}; use common::{assets::AssetExt, comp, span};
use std::sync::Arc; use std::sync::Arc;
@ -34,8 +38,6 @@ impl MainMenuState {
} }
} }
const DEFAULT_PORT: u16 = 14004;
impl PlayState for MainMenuState { impl PlayState for MainMenuState {
fn enter(&mut self, global_state: &mut GlobalState, _: Direction) { fn enter(&mut self, global_state: &mut GlobalState, _: Direction) {
// Kick off title music // Kick off title music
@ -74,8 +76,7 @@ impl PlayState for MainMenuState {
&mut global_state.info_message, &mut global_state.info_message,
"singleplayer".to_owned(), "singleplayer".to_owned(),
"".to_owned(), "".to_owned(),
server_settings.gameserver_address.ip().to_string(), ConnectionArgs::IpAndPort(vec![server_settings.gameserver_address]),
server_settings.gameserver_address.port(),
&mut self.client_init, &mut self.client_init,
Some(runtime), Some(runtime),
); );
@ -122,10 +123,15 @@ impl PlayState for MainMenuState {
self.client_init = None; self.client_init = None;
global_state.info_message = Some({ global_state.info_message = Some({
let err = match err { let err = match err {
InitError::BadAddress(_) | InitError::NoAddress => { InitError::NoAddress => {
localized_strings.get("main.login.server_not_found").into() localized_strings.get("main.login.server_not_found").into()
}, },
InitError::ClientError(err) => match err { InitError::ClientError(err) => match err {
client::Error::DnsResolveFailed(reason) => format!(
"{}: {}",
localized_strings.get("main.login.server_not_found"),
reason
),
client::Error::AuthErr(e) => format!( client::Error::AuthErr(e) => format!(
"{}: {}", "{}: {}",
localized_strings.get("main.login.authentication_error"), localized_strings.get("main.login.authentication_error"),
@ -160,6 +166,11 @@ impl PlayState for MainMenuState {
client::Error::InvalidCharacter => { client::Error::InvalidCharacter => {
localized_strings.get("main.login.invalid_character").into() localized_strings.get("main.login.invalid_character").into()
}, },
client::Error::NetworkErr(NetworkError::ConnectFailed(
NetworkConnectError::Handshake(InitProtocolError::WrongVersion(_)),
)) => localized_strings
.get("main.login.network_wrong_version")
.into(),
client::Error::NetworkErr(e) => format!( client::Error::NetworkErr(e) => format!(
"{}: {:?}", "{}: {:?}",
localized_strings.get("main.login.network_error"), localized_strings.get("main.login.network_error"),
@ -247,8 +258,7 @@ impl PlayState for MainMenuState {
&mut global_state.info_message, &mut global_state.info_message,
username, username,
password, password,
server_address, ConnectionArgs::HostnameAndOptionalPort(server_address, false),
DEFAULT_PORT,
&mut self.client_init, &mut self.client_init,
None, None,
); );
@ -321,8 +331,7 @@ fn attempt_login(
info_message: &mut Option<String>, info_message: &mut Option<String>,
username: String, username: String,
password: String, password: String,
server_address: String, connection_args: ConnectionArgs,
server_port: u16,
client_init: &mut Option<ClientInit>, client_init: &mut Option<ClientInit>,
runtime: Option<Arc<runtime::Runtime>>, runtime: Option<Arc<runtime::Runtime>>,
) { ) {
@ -330,7 +339,7 @@ fn attempt_login(
// Don't try to connect if there is already a connection in progress. // Don't try to connect if there is already a connection in progress.
if client_init.is_none() { if client_init.is_none() {
*client_init = Some(ClientInit::new( *client_init = Some(ClientInit::new(
(server_address, server_port, false), connection_args,
username, username,
Some(settings.graphics.view_distance), Some(settings.graphics.view_distance),
password, password,