switch from serde to manually for speed, remove async_serde

- removing async_serde as it seems to be not usefull
  the idea was because deserialising is slow parallising it could speed up.
  Whoever we need to keep the order of frames, (at least for controlframes) so serialising in threads would be quite complicated.
  Also serialisation is quite fast, about 1 Gbit/s such speed is enough for messaging, it's more important to serve parallel streams better.
  Thats why i am removing async serde coding for now
- frames are no longer serialized by serde, by byte by byte manually, increadible speed upgrade
- more metrics
- switch channel_creator into for_each_concurrent
- removing some pool.spwan_ok() as they dont allow me to use self
- reduce features needed
This commit is contained in:
Marcel Märtens 2020-04-24 12:56:04 +02:00
parent 2ee18b1fd8
commit 661060808d
18 changed files with 971 additions and 503 deletions

38
Cargo.lock generated
View File

@ -4836,21 +4836,9 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a7c6b59d116d218cb2d990eb06b77b64043e0268ef7323aae63d8b30ae462923"
dependencies = [
"cfg-if",
"tracing-attributes",
"tracing-core",
]
[[package]]
name = "tracing-attributes"
version = "0.1.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "99bbad0de3fd923c9c3232ead88510b783e5a4d16a6154adffa3d53308de984c"
dependencies = [
"proc-macro2 1.0.17",
"quote 1.0.6",
"syn 1.0.27",
]
[[package]]
name = "tracing-core"
version = "0.1.10"
@ -4870,27 +4858,6 @@ dependencies = [
"tracing",
]
[[package]]
name = "tracing-log"
version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5e0f8c7178e13481ff6765bd169b33e8d554c5d2bbede5e32c356194be02b9b9"
dependencies = [
"lazy_static",
"log 0.4.8",
"tracing-core",
]
[[package]]
name = "tracing-serde"
version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b6ccba2f8f16e0ed268fc765d9b7ff22e965e7185d32f8f1ec8294fe17d86e79"
dependencies = [
"serde",
"tracing-core",
]
[[package]]
name = "tracing-subscriber"
version = "0.2.5"
@ -4902,13 +4869,9 @@ dependencies = [
"lazy_static",
"matchers",
"regex",
"serde",
"serde_json",
"sharded-slab",
"smallvec 1.4.0",
"tracing-core",
"tracing-log",
"tracing-serde",
]
[[package]]
@ -5292,7 +5255,6 @@ version = "0.1.0"
dependencies = [
"async-std",
"bincode",
"byteorder 1.3.4",
"futures 0.3.5",
"lazy_static",
"prometheus",

15
network/Cargo.lock generated
View File

@ -590,19 +590,6 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
name = "serde"
version = "1.0.106"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"serde_derive 1.0.106 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "serde_derive"
version = "1.0.106"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"proc-macro2 1.0.10 (registry+https://github.com/rust-lang/crates.io-index)",
"quote 1.0.3 (registry+https://github.com/rust-lang/crates.io-index)",
"syn 1.0.17 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "serde_json"
@ -760,7 +747,6 @@ version = "0.1.0"
dependencies = [
"async-std 1.5.0 (registry+https://github.com/rust-lang/crates.io-index)",
"bincode 1.2.1 (registry+https://github.com/rust-lang/crates.io-index)",
"byteorder 1.3.4 (registry+https://github.com/rust-lang/crates.io-index)",
"futures 0.3.4 (registry+https://github.com/rust-lang/crates.io-index)",
"lazy_static 1.4.0 (registry+https://github.com/rust-lang/crates.io-index)",
"prometheus 0.7.0 (registry+https://github.com/rust-lang/crates.io-index)",
@ -889,7 +875,6 @@ dependencies = [
"checksum ryu 1.0.3 (registry+https://github.com/rust-lang/crates.io-index)" = "535622e6be132bccd223f4bb2b8ac8d53cda3c7a6394944d3b2b33fb974f9d76"
"checksum scopeguard 1.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "d29ab0c6d3fc0ee92fe66e2d99f700eab17a8d57d1c1d3b748380fb20baa78cd"
"checksum serde 1.0.106 (registry+https://github.com/rust-lang/crates.io-index)" = "36df6ac6412072f67cf767ebbde4133a5b2e88e76dc6187fa7104cd16f783399"
"checksum serde_derive 1.0.106 (registry+https://github.com/rust-lang/crates.io-index)" = "9e549e3abf4fb8621bd1609f11dfc9f5e50320802273b12f3811a67e6716ea6c"
"checksum serde_json 1.0.51 (registry+https://github.com/rust-lang/crates.io-index)" = "da07b57ee2623368351e9a0488bb0b261322a15a6e0ae53e243cbdc0f4208da9"
"checksum sharded-slab 0.0.8 (registry+https://github.com/rust-lang/crates.io-index)" = "ae75d0445b5d3778c9da3d1f840faa16d0627c8607f78a74daf69e5b988c39a1"
"checksum slab 0.4.2 (registry+https://github.com/rust-lang/crates.io-index)" = "c111b5bd5695e56cffe5129854aa230b39c93a305372fdbb2668ca2394eea9f8"

View File

@ -12,19 +12,18 @@ edition = "2018"
uvth = "3.1"
#serialisation
bincode = "1.2"
serde = { version = "1.0", features = ["derive"] }
byteorder = "1.3"
serde = { version = "1.0" }
#sending
async-std = { version = "1.5", features = ["std"] }
async-std = { version = "~1.5", features = ["std"] }
#tracing and metrics
tracing = "0.1"
tracing = { version = "0.1", default-features = false }
tracing-futures = "0.2"
prometheus = "0.7"
#async
futures = { version = "0.3", features = ["thread-pool"] }
#mpsc channel registry
lazy_static = "1.4"
rand = "0.7"
lazy_static = { version = "1.4", default-features = false }
rand = { version = "0.7" }
[dev-dependencies]
tracing-subscriber = "0.2.3"
tracing-subscriber = { version = "0.2.3", default-features = false, features = ["env-filter", "fmt", "chrono", "ansi", "smallvec"] }

View File

@ -16,4 +16,6 @@ futures = "0.3"
tracing = "0.1"
tracing-subscriber = "0.2.3"
bincode = "1.2"
serde = "1.0"
prometheus = "0.7"
rouille = "3.0.0"
serde = { version = "1.0", features = ["derive"] }

View File

@ -1,10 +1,13 @@
mod metrics;
use clap::{App, Arg};
use futures::executor::block_on;
use network::{Address, Network, Pid, PROMISES_CONSISTENCY, PROMISES_ORDERED};
use network::{Address, Network, Pid, PROMISES_CONSISTENCY, PROMISES_ORDERED, MessageBuffer};
use serde::{Deserialize, Serialize};
use std::{
thread,
time::{Duration, Instant},
sync::Arc,
};
use tracing::*;
use tracing_subscriber::EnvFilter;
@ -55,7 +58,7 @@ fn main() {
.long("protocol")
.takes_value(true)
.default_value("tcp")
.possible_values(&["tcp", "upd", "mpsc"])
.possible_values(&["tcp", "udp", "mpsc"])
.help(
"underlying protocol used for this test, mpsc can only combined with mode=both",
),
@ -72,9 +75,10 @@ fn main() {
.get_matches();
let trace = matches.value_of("trace").unwrap();
let filter = EnvFilter::from_default_env().add_directive(trace.parse().unwrap()).add_directive("veloren_network::participant=debug".parse().unwrap()).add_directive("veloren_network::api=debug".parse().unwrap());
let filter = EnvFilter::from_default_env().add_directive(trace.parse().unwrap())/*
.add_directive("veloren_network::participant=debug".parse().unwrap()).add_directive("veloren_network::api=debug".parse().unwrap())*/;
tracing_subscriber::FmtSubscriber::builder()
.with_max_level(Level::TRACE)
.with_max_level(Level::ERROR)
.with_env_filter(filter)
.init();
@ -86,15 +90,16 @@ fn main() {
_ => panic!("invalid mode, run --help!"),
};
let mut m = metrics::SimpleMetrics::new();
let mut background = None;
match matches.value_of("mode") {
Some("server") => server(address),
Some("client") => client(address),
Some("client") => client(address, &mut m),
Some("both") => {
let address1 = address.clone();
background = Some(thread::spawn(|| server(address1)));
thread::sleep(Duration::from_millis(200)); //start client after server
client(address)
client(address, &mut m);
},
_ => panic!("invalid mode, run --help!"),
};
@ -105,7 +110,7 @@ fn main() {
fn server(address: Address) {
let thread_pool = ThreadPoolBuilder::new().build();
let server = Network::new(Pid::new(), &thread_pool);
let server = Network::new(Pid::new(), &thread_pool, None);
block_on(server.listen(address)).unwrap();
loop {
@ -115,7 +120,7 @@ fn server(address: Address) {
block_on(async {
let mut last = Instant::now();
let mut id = 0u64;
while let Ok(_msg) = s1.recv::<Msg>().await {
while let Ok(_msg) = s1.recv_raw().await {
id += 1;
if id.rem_euclid(1000000) == 0 {
let new = Instant::now();
@ -129,20 +134,23 @@ fn server(address: Address) {
}
}
fn client(address: Address) {
fn client(address: Address, metrics: &mut metrics::SimpleMetrics) {
let thread_pool = ThreadPoolBuilder::new().build();
let client = Network::new(Pid::new(), &thread_pool);
let client = Network::new(Pid::new(), &thread_pool, Some(metrics.registry()));
metrics.run("0.0.0.0:59111".parse().unwrap()).unwrap();
let p1 = block_on(client.connect(address.clone())).unwrap(); //remote representation of p1
let mut s1 = block_on(p1.open(16, PROMISES_ORDERED | PROMISES_CONSISTENCY)).unwrap(); //remote representation of s1
let mut last = Instant::now();
let mut id = 0u64;
loop {
s1.send(Msg::Ping {
let raw_msg = Arc::new(MessageBuffer{
data: bincode::serialize(&Msg::Ping {
id,
data: vec![0; 1000],
})
.unwrap();
}).unwrap(),
});
loop {
s1.send_raw(raw_msg.clone()).unwrap();
id += 1;
if id.rem_euclid(1000000) == 0 {
let new = Instant::now();

View File

@ -0,0 +1,83 @@
use prometheus::{Encoder, Gauge, IntGauge, IntGaugeVec, Opts, Registry, TextEncoder};
use rouille::{router, Server};
use std::{
convert::TryInto,
error::Error,
net::SocketAddr,
sync::{
atomic::{AtomicBool, AtomicU64, Ordering},
Arc,
},
thread,
time::{Duration, SystemTime, UNIX_EPOCH},
};
pub struct SimpleMetrics {
running: Arc<AtomicBool>,
handle: Option<thread::JoinHandle<()>>,
registry: Option<Registry>,
}
impl SimpleMetrics {
pub fn new() -> Self {
let running = Arc::new(AtomicBool::new(false));
let registry = Some(Registry::new());
Self {
running,
handle: None,
registry,
}
}
pub fn registry(&self) -> &Registry {
match self.registry {
Some(ref r) => r,
None => panic!("You cannot longer register new metrics after the server has started!"),
}
}
pub fn run(&mut self, addr: SocketAddr) -> Result<(), Box<dyn Error>> {
self.running.store(true, Ordering::Relaxed);
let running2 = self.running.clone();
let registry = self
.registry
.take()
.expect("ServerMetrics must be already started");
//TODO: make this a job
self.handle = Some(thread::spawn(move || {
let server = Server::new(addr, move |request| {
router!(request,
(GET) (/metrics) => {
let encoder = TextEncoder::new();
let mut buffer = vec![];
let mf = registry.gather();
encoder.encode(&mf, &mut buffer).expect("Failed to encoder metrics text.");
rouille::Response::text(String::from_utf8(buffer).expect("Failed to parse bytes as a string."))
},
_ => rouille::Response::empty_404()
)
})
.expect("Failed to start server");
while running2.load(Ordering::Relaxed) {
server.poll();
// Poll at 10Hz
thread::sleep(Duration::from_millis(100));
}
}));
Ok(())
}
}
impl Drop for SimpleMetrics {
fn drop(&mut self) {
self.running.store(false, Ordering::Relaxed);
let handle = self.handle.take();
handle
.expect("ServerMetrics worker handle does not exist.")
.join()
.expect("Error shutting down prometheus metric exporter");
}
}

View File

@ -1,5 +1,5 @@
use crate::{
message::{self, InCommingMessage, OutGoingMessage},
message::{self, InCommingMessage, MessageBuffer, OutGoingMessage},
scheduler::Scheduler,
types::{Mid, Pid, Prio, Promises, Requestor::User, Sid},
};
@ -9,6 +9,7 @@ use futures::{
sink::SinkExt,
stream::StreamExt,
};
use prometheus::Registry;
use serde::{de::DeserializeOwned, Serialize};
use std::{
collections::HashMap,
@ -78,12 +79,12 @@ pub struct Network {
}
impl Network {
pub fn new(participant_id: Pid, thread_pool: &ThreadPool) -> Self {
pub fn new(participant_id: Pid, thread_pool: &ThreadPool, registry: Option<&Registry>) -> Self {
//let participants = RwLock::new(vec![]);
let p = participant_id;
debug!(?p, ?User, "starting Network");
let (scheduler, listen_sender, connect_sender, connected_receiver, shutdown_sender) =
Scheduler::new(participant_id);
Scheduler::new(participant_id, registry);
thread_pool.execute(move || {
trace!(?p, ?User, "starting sheduler in own thread");
let _handle = task::block_on(
@ -272,10 +273,14 @@ impl Stream {
}
pub fn send<M: Serialize>(&mut self, msg: M) -> Result<(), StreamError> {
let messagebuffer = Arc::new(message::serialize(&msg));
self.send_raw(Arc::new(message::serialize(&msg)))
}
pub fn send_raw(&mut self, messagebuffer: Arc<MessageBuffer>) -> Result<(), StreamError> {
if self.closed.load(Ordering::Relaxed) {
return Err(StreamError::StreamClosed);
}
debug!(?messagebuffer, ?User, "sending a message");
self.msg_send_sender
.send((self.prio, self.pid, self.sid, OutGoingMessage {
buffer: messagebuffer,
@ -288,13 +293,16 @@ impl Stream {
}
pub async fn recv<M: DeserializeOwned>(&mut self) -> Result<M, StreamError> {
Ok(message::deserialize(self.recv_raw().await?))
}
pub async fn recv_raw(&mut self) -> Result<MessageBuffer, StreamError> {
//no need to access self.closed here, as when this stream is closed the Channel
// is closed which will trigger a None
let msg = self.msg_recv_receiver.next().await?;
info!(?msg, "delivering a message");
Ok(message::deserialize(msg.buffer))
info!(?msg, ?User, "delivering a message");
Ok(msg.buffer)
}
//Todo: ERROR: TODO: implement me and the disconnecting!
}
impl Drop for Network {

View File

@ -1,178 +0,0 @@
/*
use ::uvth::ThreadPool;
use bincode;
use serde::{de::DeserializeOwned, Serialize};
use std::{
future::Future,
pin::Pin,
sync::{Arc, Mutex},
task::{Context, Poll, Waker},
};
pub struct SerializeFuture {
shared_state: Arc<Mutex<SerializeSharedState>>,
}
struct SerializeSharedState {
result: Option<Vec<u8>>,
waker: Option<Waker>,
}
pub struct DeserializeFuture<M: 'static + Send + DeserializeOwned> {
shared_state: Arc<Mutex<DeserializeSharedState<M>>>,
}
struct DeserializeSharedState<M: 'static + Send + DeserializeOwned> {
result: Option<M>,
waker: Option<Waker>,
}
impl Future for SerializeFuture {
type Output = Vec<u8>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let mut shared_state = self.shared_state.lock().unwrap();
if shared_state.result.is_some() {
Poll::Ready(shared_state.result.take().unwrap())
} else {
shared_state.waker = Some(cx.waker().clone());
Poll::Pending
}
}
}
impl SerializeFuture {
pub fn new<M: 'static + Send + Serialize>(message: M, pool: &ThreadPool) -> Self {
let shared_state = Arc::new(Mutex::new(SerializeSharedState {
result: None,
waker: None,
}));
// Spawn the new thread
let thread_shared_state = shared_state.clone();
pool.execute(move || {
let mut writer = {
let actual_size = bincode::serialized_size(&message).unwrap();
Vec::<u8>::with_capacity(actual_size as usize)
};
if let Err(e) = bincode::serialize_into(&mut writer, &message) {
panic!(
"bincode serialize error, probably undefined behavior somewhere else, check \
the possible error types of `bincode::serialize_into`: {}",
e
);
};
let mut shared_state = thread_shared_state.lock().unwrap();
shared_state.result = Some(writer);
if let Some(waker) = shared_state.waker.take() {
waker.wake()
}
});
Self { shared_state }
}
}
impl<M: 'static + Send + DeserializeOwned> Future for DeserializeFuture<M> {
type Output = M;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let mut shared_state = self.shared_state.lock().unwrap();
if shared_state.result.is_some() {
Poll::Ready(shared_state.result.take().unwrap())
} else {
shared_state.waker = Some(cx.waker().clone());
Poll::Pending
}
}
}
impl<M: 'static + Send + DeserializeOwned> DeserializeFuture<M> {
pub fn new(data: Vec<u8>, pool: &ThreadPool) -> Self {
let shared_state = Arc::new(Mutex::new(DeserializeSharedState {
result: None,
waker: None,
}));
// Spawn the new thread
let thread_shared_state = shared_state.clone();
pool.execute(move || {
let decoded: M = bincode::deserialize(data.as_slice()).unwrap();
let mut shared_state = thread_shared_state.lock().unwrap();
shared_state.result = Some(decoded);
if let Some(waker) = shared_state.waker.take() {
waker.wake()
}
});
Self { shared_state }
}
}
*/
/*
#[cfg(test)]
mod tests {
use crate::{
async_serde::*,
message::{MessageBuffer, OutGoingMessage},
types::{Frame, Sid},
};
use std::{collections::VecDeque, sync::Arc};
use uvth::ThreadPoolBuilder;
use async_std::{
io::BufReader,
net::{TcpListener, TcpStream, ToSocketAddrs},
prelude::*,
task,
};
#[macro_use] use futures;
async fn tick_tock(msg: String, pool: &ThreadPool) {
let serialized = SerializeFuture::new(msg.clone(), pool).await;
let deserialized = DeserializeFuture::<String>::new(serialized, pool).await;
assert_eq!(msg, deserialized)
}
#[test]
fn multiple_serialize() {
let msg = "ThisMessageisexactly100charactersLongToPrecislyMeassureSerialisation_SoYoucanSimplyCountThe123inhere".to_string();
let pool = ThreadPoolBuilder::new().build();
let (r1, r2, r3) = task::block_on(async {
let s1 = SerializeFuture::new(msg.clone(), &pool);
let s2 = SerializeFuture::new(msg.clone(), &pool);
let s3 = SerializeFuture::new(msg.clone(), &pool);
futures::join!(s1, s2, s3)
});
assert_eq!(r1.len(), 108);
assert_eq!(r2.len(), 108);
assert_eq!(r3.len(), 108);
}
#[test]
fn await_serialize() {
let msg = "ThisMessageisexactly100charactersLongToPrecislyMeassureSerialisation_SoYoucanSimplyCountThe123inhere".to_string();
let pool = ThreadPoolBuilder::new().build();
task::block_on(async {
let r1 = SerializeFuture::new(msg.clone(), &pool).await;
let r2 = SerializeFuture::new(msg.clone(), &pool).await;
let r3 = SerializeFuture::new(msg.clone(), &pool).await;
assert_eq!(r1.len(), 108);
assert_eq!(r2.len(), 108);
assert_eq!(r3.len(), 108);
});
}
#[test]
fn multiple_serialize_deserialize() {
let msg = "ThisMessageisexactly100charactersLongToPrecislyMeassureSerialisation_SoYoucanSimplyCountThe123inhere".to_string();
let pool = ThreadPoolBuilder::new().build();
task::block_on(async {
let s1 = tick_tock(msg.clone(), &pool);
let s2 = tick_tock(msg.clone(), &pool);
let s3 = tick_tock(msg.clone(), &pool);
futures::join!(s1, s2, s3)
});
}
}
*/

View File

@ -1,4 +1,5 @@
use crate::{
metrics::NetworkMetrics,
protocols::Protocols,
types::{
Cid, Frame, Pid, Sid, STREAM_ID_OFFSET1, STREAM_ID_OFFSET2, VELOREN_MAGIC_NUMBER,
@ -11,12 +12,14 @@ use futures::{
sink::SinkExt,
stream::StreamExt,
};
use std::sync::Arc;
use tracing::*;
//use futures::prelude::*;
pub(crate) struct Channel {
cid: Cid,
local_pid: Pid,
metrics: Arc<NetworkMetrics>,
remote_pid: RwLock<Option<Pid>>,
send_state: RwLock<ChannelState>,
recv_state: RwLock<ChannelState>,
@ -41,10 +44,11 @@ impl Channel {
invalid version.\nWe don't know how to communicate with \
you.\nClosing the connection";
pub fn new(cid: u64, local_pid: Pid) -> Self {
pub fn new(cid: u64, local_pid: Pid, metrics: Arc<NetworkMetrics>) -> Self {
Self {
cid,
local_pid,
metrics,
remote_pid: RwLock::new(None),
send_state: RwLock::new(ChannelState::None),
recv_state: RwLock::new(ChannelState::None),
@ -103,6 +107,8 @@ impl Channel {
) {
const ERR_S: &str = "Got A Raw Message, these are usually Debug Messages indicating that \
something went wrong on network layer and connection will be closed";
let mut pid_string = "".to_string();
let cid_string = self.cid.to_string();
while let Some(frame) = frames.next().await {
match frame {
Frame::Handshake {
@ -110,6 +116,10 @@ impl Channel {
version,
} => {
trace!(?magic_number, ?version, "recv handshake");
self.metrics
.frames_in_total
.with_label_values(&["", &cid_string, "Handshake"])
.inc();
if self
.verify_handshake(magic_number, version, &mut frame_sender)
.await
@ -132,6 +142,12 @@ impl Channel {
*self.remote_pid.write().await = Some(pid);
*self.recv_state.write().await = ChannelState::Pid;
debug!(?pid, "Participant send their ID");
let pid_u128: u128 = pid.into();
pid_string = pid_u128.to_string();
self.metrics
.frames_in_total
.with_label_values(&[&pid_string, &cid_string, "ParticipantId"])
.inc();
let stream_id_offset = if *self.send_state.read().await != ChannelState::Pid {
self.send_pid(&mut frame_sender).await;
STREAM_ID_OFFSET2
@ -139,6 +155,11 @@ impl Channel {
STREAM_ID_OFFSET1
};
info!(?pid, "this channel is now configured!");
let pid_u128: u128 = pid.into();
self.metrics
.channels_connected_total
.with_label_values(&[&pid_u128.to_string()])
.inc();
let (sender, receiver) = oneshot::channel();
configured_sender
.send((self.cid, pid, stream_id_offset, sender))
@ -156,12 +177,26 @@ impl Channel {
Frame::Shutdown => {
info!("shutdown signal received");
*self.recv_state.write().await = ChannelState::Shutdown;
self.metrics
.channels_disconnected_total
.with_label_values(&[&pid_string])
.inc();
self.metrics
.frames_in_total
.with_label_values(&[&pid_string, &cid_string, "Shutdown"])
.inc();
},
/* Sending RAW is only used for debug purposes in case someone write a
* new API against veloren Server! */
Frame::Raw(bytes) => match std::str::from_utf8(bytes.as_slice()) {
Ok(string) => error!(?string, ERR_S),
_ => error!(?bytes, ERR_S),
Frame::Raw(bytes) => {
self.metrics
.frames_in_total
.with_label_values(&[&pid_string, &cid_string, "Raw"])
.inc();
match std::str::from_utf8(bytes.as_slice()) {
Ok(string) => error!(?string, ERR_S),
_ => error!(?bytes, ERR_S),
}
},
_ => {
trace!("forward frame");
@ -173,7 +208,7 @@ impl Channel {
async fn verify_handshake(
&self,
magic_number: String,
magic_number: [u8; 7],
version: [u32; 3],
#[cfg(debug_assertions)] frame_sender: &mut mpsc::UnboundedSender<Frame>,
#[cfg(not(debug_assertions))] _: &mut mpsc::UnboundedSender<Frame>,
@ -221,7 +256,7 @@ impl Channel {
pub(crate) async fn send_handshake(&self, part_in_sender: &mut mpsc::UnboundedSender<Frame>) {
part_in_sender
.send(Frame::Handshake {
magic_number: VELOREN_MAGIC_NUMBER.to_string(),
magic_number: VELOREN_MAGIC_NUMBER,
version: VELOREN_NETWORK_VERSION,
})
.await

View File

@ -1,6 +1,6 @@
#![feature(trait_alias, try_trait)]
#![feature(trait_alias, try_trait, async_closure)]
mod api;
mod async_serde;
mod channel;
mod message;
mod metrics;
@ -11,6 +11,7 @@ mod scheduler;
mod types;
pub use api::{Address, Network, NetworkError, Participant, ParticipantError, Stream, StreamError};
pub use message::MessageBuffer;
pub use types::{
Pid, Promises, PROMISES_COMPRESSED, PROMISES_CONSISTENCY, PROMISES_ENCRYPTED,
PROMISES_GUARANTEED_DELIVERY, PROMISES_NONE, PROMISES_ORDERED,

View File

@ -2,10 +2,9 @@ use bincode;
use serde::{de::DeserializeOwned, Serialize};
//use std::collections::VecDeque;
use crate::types::{Mid, Sid};
use byteorder::{NetworkEndian, ReadBytesExt};
use std::sync::Arc;
pub(crate) struct MessageBuffer {
pub struct MessageBuffer {
// use VecDeque for msg storage, because it allows to quickly remove data from front.
//however VecDeque needs custom bincode code, but it's possible
pub data: Vec<u8>,
@ -44,16 +43,13 @@ impl std::fmt::Debug for MessageBuffer {
//TODO: small messages!
let len = self.data.len();
if len > 20 {
let n1 = (&self.data[0..4]).read_u32::<NetworkEndian>().unwrap();
let n2 = (&self.data[4..8]).read_u32::<NetworkEndian>().unwrap();
let n3 = (&self.data[8..12]).read_u32::<NetworkEndian>().unwrap();
write!(
f,
"MessageBuffer(len: {}, {}, {}, {}, {:?}..{:?})",
len,
n1,
n2,
n3,
u32::from_le_bytes([self.data[0], self.data[1], self.data[2], self.data[3]]),
u32::from_le_bytes([self.data[4], self.data[5], self.data[6], self.data[7]]),
u32::from_le_bytes([self.data[8], self.data[9], self.data[10], self.data[11]]),
&self.data[13..16],
&self.data[len - 8..len]
)

View File

@ -1,23 +1,24 @@
use prometheus::{IntGauge, IntGaugeVec, Opts, Registry};
use std::{
error::Error,
sync::{
atomic::{AtomicU64, Ordering},
Arc,
},
};
use prometheus::{IntCounter, IntCounterVec, IntGauge, IntGaugeVec, Opts, Registry};
use std::error::Error;
//TODO: switch over to Counter for frames_count, message_count, bytes_send,
// frames_message_count 1 NetworkMetrics per Network
#[allow(dead_code)]
pub struct NetworkMetrics {
pub participants_connected: IntGauge,
pub listen_requests_total: IntCounterVec,
pub connect_requests_total: IntCounterVec,
pub participants_connected_total: IntCounter,
pub participants_disconnected_total: IntCounter,
// opened Channels, seperated by PARTICIPANT
pub channels_connected: IntGauge,
pub channels_connected_total: IntCounterVec,
pub channels_disconnected_total: IntCounterVec,
// opened streams, seperated by PARTICIPANT
pub streams_open: IntGauge,
pub streams_opened_total: IntCounterVec,
pub streams_closed_total: IntCounterVec,
pub network_info: IntGauge,
// Frames, seperated by CHANNEL (and PARTICIPANT) AND FRAME TYPE,
pub frames_out_total: IntCounterVec,
pub frames_in_total: IntCounterVec,
pub frames_count: IntGaugeVec,
// send Messages, seperated by STREAM (and PARTICIPANT, CHANNEL),
pub message_count: IntGaugeVec,
@ -33,24 +34,61 @@ pub struct NetworkMetrics {
pub queued_bytes: IntGaugeVec,
// ping calculated based on last msg seperated by PARTICIPANT
pub participants_ping: IntGaugeVec,
tick: Arc<AtomicU64>,
}
impl NetworkMetrics {
#[allow(dead_code)]
pub fn new(registry: &Registry, tick: Arc<AtomicU64>) -> Result<Self, Box<dyn Error>> {
let participants_connected = IntGauge::with_opts(Opts::new(
"participants_connected",
pub fn new() -> Result<Self, Box<dyn Error>> {
let listen_requests_total = IntCounterVec::new(
Opts::new(
"listen_requests_total",
"shows the number of listen requests to the scheduler",
),
&["protocol"],
)?;
let connect_requests_total = IntCounterVec::new(
Opts::new(
"connect_requests_total",
"shows the number of connect requests to the scheduler",
),
&["protocol"],
)?;
let participants_connected_total = IntCounter::with_opts(Opts::new(
"participants_connected_total",
"shows the number of participants connected to the network",
))?;
let channels_connected = IntGauge::with_opts(Opts::new(
"channels_connected",
"number of all channels currently connected on the network",
))?;
let streams_open = IntGauge::with_opts(Opts::new(
"streams_open",
"number of all streams currently open on the network",
let participants_disconnected_total = IntCounter::with_opts(Opts::new(
"participants_disconnected_total",
"shows the number of participants disconnected to the network",
))?;
let channels_connected_total = IntCounterVec::new(
Opts::new(
"channels_connected_total",
"number of all channels currently connected on the network",
),
&["participant"],
)?;
let channels_disconnected_total = IntCounterVec::new(
Opts::new(
"channels_disconnected_total",
"number of all channels currently disconnected on the network",
),
&["participant"],
)?;
let streams_opened_total = IntCounterVec::new(
Opts::new(
"streams_opened_total",
"number of all streams currently open on the network",
),
&["participant"],
)?;
let streams_closed_total = IntCounterVec::new(
Opts::new(
"streams_closed_total",
"number of all streams currently open on the network",
),
&["participant"],
)?;
let opts = Opts::new("network_info", "Static Network information").const_label(
"version",
&format!(
@ -61,71 +99,77 @@ impl NetworkMetrics {
),
);
let network_info = IntGauge::with_opts(opts)?;
let frames_out_total = IntCounterVec::new(
Opts::new("frames_out_total", "number of all frames send per channel"),
&["participant", "channel", "frametype"],
)?;
let frames_in_total = IntCounterVec::new(
Opts::new(
"frames_in_total",
"number of all frames received per channel",
),
&["participant", "channel", "frametype"],
)?;
let frames_count = IntGaugeVec::from(IntGaugeVec::new(
let frames_count = IntGaugeVec::new(
Opts::new(
"frames_count",
"number of all frames send by streams on the network",
),
&["channel"],
)?);
let message_count = IntGaugeVec::from(IntGaugeVec::new(
)?;
let message_count = IntGaugeVec::new(
Opts::new(
"message_count",
"number of messages send by streams on the network",
),
&["channel"],
)?);
let bytes_send = IntGaugeVec::from(IntGaugeVec::new(
)?;
let bytes_send = IntGaugeVec::new(
Opts::new("bytes_send", "bytes send by streams on the network"),
&["channel"],
)?);
let frames_message_count = IntGaugeVec::from(IntGaugeVec::new(
)?;
let frames_message_count = IntGaugeVec::new(
Opts::new(
"frames_message_count",
"bytes sends per message on the network",
),
&["channel"],
)?);
let queued_count = IntGaugeVec::from(IntGaugeVec::new(
)?;
let queued_count = IntGaugeVec::new(
Opts::new(
"queued_count",
"queued number of messages by participant on the network",
),
&["channel"],
)?);
let queued_bytes = IntGaugeVec::from(IntGaugeVec::new(
)?;
let queued_bytes = IntGaugeVec::new(
Opts::new(
"queued_bytes",
"queued bytes of messages by participant on the network",
),
&["channel"],
)?);
let participants_ping = IntGaugeVec::from(IntGaugeVec::new(
)?;
let participants_ping = IntGaugeVec::new(
Opts::new(
"participants_ping",
"ping time to participants on the network",
),
&["channel"],
)?);
registry.register(Box::new(participants_connected.clone()))?;
registry.register(Box::new(channels_connected.clone()))?;
registry.register(Box::new(streams_open.clone()))?;
registry.register(Box::new(network_info.clone()))?;
registry.register(Box::new(frames_count.clone()))?;
registry.register(Box::new(message_count.clone()))?;
registry.register(Box::new(bytes_send.clone()))?;
registry.register(Box::new(frames_message_count.clone()))?;
registry.register(Box::new(queued_count.clone()))?;
registry.register(Box::new(queued_bytes.clone()))?;
registry.register(Box::new(participants_ping.clone()))?;
)?;
Ok(Self {
participants_connected,
channels_connected,
streams_open,
listen_requests_total,
connect_requests_total,
participants_connected_total,
participants_disconnected_total,
channels_connected_total,
channels_disconnected_total,
streams_opened_total,
streams_closed_total,
network_info,
frames_out_total,
frames_in_total,
frames_count,
message_count,
bytes_send,
@ -133,9 +177,38 @@ impl NetworkMetrics {
queued_count,
queued_bytes,
participants_ping,
tick,
})
}
pub fn _is_100th_tick(&self) -> bool { self.tick.load(Ordering::Relaxed).rem_euclid(100) == 0 }
pub fn register(&self, registry: &Registry) -> Result<(), Box<dyn Error>> {
registry.register(Box::new(self.listen_requests_total.clone()))?;
registry.register(Box::new(self.connect_requests_total.clone()))?;
registry.register(Box::new(self.participants_connected_total.clone()))?;
registry.register(Box::new(self.participants_disconnected_total.clone()))?;
registry.register(Box::new(self.channels_connected_total.clone()))?;
registry.register(Box::new(self.channels_disconnected_total.clone()))?;
registry.register(Box::new(self.streams_opened_total.clone()))?;
registry.register(Box::new(self.streams_closed_total.clone()))?;
registry.register(Box::new(self.network_info.clone()))?;
registry.register(Box::new(self.frames_out_total.clone()))?;
registry.register(Box::new(self.frames_in_total.clone()))?;
registry.register(Box::new(self.frames_count.clone()))?;
registry.register(Box::new(self.message_count.clone()))?;
registry.register(Box::new(self.bytes_send.clone()))?;
registry.register(Box::new(self.frames_message_count.clone()))?;
registry.register(Box::new(self.queued_count.clone()))?;
registry.register(Box::new(self.queued_bytes.clone()))?;
registry.register(Box::new(self.participants_ping.clone()))?;
Ok(())
}
//pub fn _is_100th_tick(&self) -> bool {
// self.tick.load(Ordering::Relaxed).rem_euclid(100) == 0 }
}
impl std::fmt::Debug for NetworkMetrics {
#[inline]
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "NetworkMetrics()")
}
}

View File

@ -1,6 +1,7 @@
use crate::{
api::Stream,
message::{InCommingMessage, MessageBuffer, OutGoingMessage},
metrics::NetworkMetrics,
types::{Cid, Frame, Pid, Prio, Promises, Sid},
};
use async_std::sync::RwLock;
@ -51,12 +52,14 @@ pub struct BParticipant {
>,
>,
run_channels: Option<ControlChannels>,
metrics: Arc<NetworkMetrics>,
}
impl BParticipant {
pub(crate) fn new(
remote_pid: Pid,
offset_sid: Sid,
metrics: Arc<NetworkMetrics>,
send_outgoing: std::sync::mpsc::Sender<(Prio, Pid, Sid, OutGoingMessage)>,
stream_finished_request_sender: mpsc::UnboundedSender<(Pid, Sid, oneshot::Sender<()>)>,
) -> (
@ -98,6 +101,7 @@ impl BParticipant {
channels: RwLock::new(vec![]),
streams: RwLock::new(HashMap::new()),
run_channels,
metrics,
},
stream_open_sender,
stream_opened_receiver,
@ -166,6 +170,8 @@ impl BParticipant {
trace!("start handle_frames");
let send_outgoing = { send_outgoing.lock().unwrap().clone() };
let mut messages = HashMap::new();
let pid_u128: u128 = self.remote_pid.into();
let pid_string = pid_u128.to_string();
while let Some(frame) = frame_recv_receiver.next().await {
debug!("handling frame");
match frame {
@ -179,6 +185,9 @@ impl BParticipant {
.create_stream(sid, prio, promises, send_outgoing, &shutdown_api_sender)
.await;
stream_opened_sender.send(stream).await.unwrap();
//TODO: Metrics
//self.metrics.frames_in_total.with_label_values(&[&pid_string, &cid_string,
// "Raw"]).inc();
trace!("opened frame from remote");
},
Frame::CloseStream { sid } => {
@ -188,6 +197,11 @@ impl BParticipant {
// is dropped, so i need a way to notify the Stream that it's send messages will
// be dropped... from remote, notify local
if let Some((_, _, _, closed)) = self.streams.write().await.remove(&sid) {
let pid_u128: u128 = self.remote_pid.into();
self.metrics
.streams_closed_total
.with_label_values(&[&pid_u128.to_string()])
.inc();
closed.store(true, Ordering::Relaxed);
} else {
error!(
@ -207,19 +221,19 @@ impl BParticipant {
messages.insert(mid, imsg);
},
Frame::Data {
id,
mid,
start: _,
mut data,
} => {
let finished = if let Some(imsg) = messages.get_mut(&id) {
let finished = if let Some(imsg) = messages.get_mut(&mid) {
imsg.buffer.data.append(&mut data);
imsg.buffer.data.len() as u64 == imsg.length
} else {
false
};
if finished {
debug!(?id, "finished receiving message");
let imsg = messages.remove(&id).unwrap();
debug!(?mid, "finished receiving message");
let imsg = messages.remove(&mid).unwrap();
if let Some((_, _, sender, _)) =
self.streams.write().await.get_mut(&imsg.sid)
{
@ -318,6 +332,7 @@ impl BParticipant {
trace!(?sid, "shutting down Stream");
closing.store(true, Ordering::Relaxed);
}
self.metrics.participants_disconnected_total.inc();
trace!("stop shutdown_manager");
}
@ -354,6 +369,11 @@ impl BParticipant {
.unwrap();
receiver.await.unwrap();
trace!(?sid, "stream was successfully flushed");
let pid_u128: u128 = self.remote_pid.into();
self.metrics
.streams_closed_total
.with_label_values(&[&pid_u128.to_string()])
.inc();
self.streams.write().await.remove(&sid);
//from local, notify remote
@ -376,6 +396,11 @@ impl BParticipant {
.write()
.await
.insert(sid, (prio, promises, msg_recv_sender, closed.clone()));
let pid_u128: u128 = self.remote_pid.into();
self.metrics
.streams_opened_total
.with_label_values(&[&pid_u128.to_string()])
.inc();
Stream::new(
self.remote_pid,
sid,

View File

@ -174,7 +174,7 @@ impl PrioManager {
})));
}
frames.extend(std::iter::once((msg_pid, msg_sid, Frame::Data {
id: msg.mid,
mid: msg.mid,
start: msg.cursor,
data: msg.buffer.data[msg.cursor as usize..(msg.cursor + to_send) as usize]
.to_vec(),
@ -316,8 +316,8 @@ mod tests {
.pop_front()
.expect("frames vecdeque doesn't contain enough frames!")
.2;
if let Frame::Data { id, start, data } = frame {
assert_eq!(id, 1);
if let Frame::Data { mid, start, data } = frame {
assert_eq!(mid, 1);
assert_eq!(start, f_start);
assert_eq!(data, f_data);
} else {

View File

@ -1,4 +1,7 @@
use crate::types::Frame;
use crate::{
metrics::NetworkMetrics,
types::{Frame, Mid, Pid, Sid},
};
use async_std::{
net::{TcpStream, UdpSocket},
prelude::*,
@ -8,6 +11,20 @@ use futures::{channel::mpsc, future::FutureExt, select, sink::SinkExt, stream::S
use std::{net::SocketAddr, sync::Arc};
use tracing::*;
// Reserving bytes 0, 10, 13 as i have enough space and want to make it easy to
// detect a invalid client, e.g. sending an empty line would make 10 first char
// const FRAME_RESERVED_1: u8 = 0;
const FRAME_HANDSHAKE: u8 = 1;
const FRAME_PARTICIPANT_ID: u8 = 2;
const FRAME_SHUTDOWN: u8 = 3;
const FRAME_OPEN_STREAM: u8 = 4;
const FRAME_CLOSE_STREAM: u8 = 5;
const FRAME_DATA_HEADER: u8 = 6;
const FRAME_DATA: u8 = 7;
const FRAME_RAW: u8 = 8;
//const FRAME_RESERVED_2: u8 = 10;
//const FRAME_RESERVED_3: u8 = 13;
#[derive(Debug)]
pub(crate) enum Protocols {
Tcp(TcpProtocol),
@ -18,64 +35,130 @@ pub(crate) enum Protocols {
#[derive(Debug)]
pub(crate) struct TcpProtocol {
stream: TcpStream,
metrics: Arc<NetworkMetrics>,
}
#[derive(Debug)]
pub(crate) struct UdpProtocol {
socket: Arc<UdpSocket>,
remote_addr: SocketAddr,
metrics: Arc<NetworkMetrics>,
data_in: RwLock<mpsc::UnboundedReceiver<Vec<u8>>>,
}
impl TcpProtocol {
pub(crate) fn new(stream: TcpStream) -> Self { Self { stream } }
pub(crate) fn new(stream: TcpStream, metrics: Arc<NetworkMetrics>) -> Self {
Self { stream, metrics }
}
pub async fn read(&self, mut frame_handler: mpsc::UnboundedSender<Frame>) {
let mut stream = self.stream.clone();
let mut buffer = NetworkBuffer::new();
loop {
match stream.read(buffer.get_write_slice(2048)).await {
Ok(0) => {
debug!(?buffer, "shutdown of tcp channel detected");
frame_handler.send(Frame::Shutdown).await.unwrap();
break;
},
Ok(n) => {
buffer.actually_written(n);
trace!("incomming message with len: {}", n);
let slice = buffer.get_read_slice();
let mut cur = std::io::Cursor::new(slice);
let mut read_ok = 0;
while cur.position() < n as u64 {
let round_start = cur.position() as usize;
let r: Result<Frame, _> = bincode::deserialize_from(&mut cur);
match r {
Ok(frame) => {
frame_handler.send(frame).await.unwrap();
read_ok = cur.position() as usize;
},
Err(e) => {
// Probably we have to wait for moare data!
let first_bytes_of_msg =
&slice[round_start..std::cmp::min(n, round_start + 16)];
trace!(
?buffer,
?e,
?n,
?round_start,
?first_bytes_of_msg,
"message cant be parsed, probably because we need to wait for \
more data"
);
break;
},
}
}
buffer.actually_read(read_ok);
},
Err(e) => panic!("{}", e),
let mut bytes = [0u8; 1];
if stream.read_exact(&mut bytes).await.is_err() {
info!("tcp channel closed, shutting down read");
break;
}
let frame_no = bytes[0];
let frame = match frame_no {
FRAME_HANDSHAKE => {
let mut bytes = [0u8; 19];
stream.read_exact(&mut bytes).await.unwrap();
let magic_number = [
bytes[0], bytes[1], bytes[2], bytes[3], bytes[4], bytes[5], bytes[6],
];
Frame::Handshake {
magic_number,
version: [
u32::from_le_bytes([bytes[7], bytes[8], bytes[9], bytes[10]]),
u32::from_le_bytes([bytes[11], bytes[12], bytes[13], bytes[14]]),
u32::from_le_bytes([bytes[15], bytes[16], bytes[17], bytes[18]]),
],
}
},
FRAME_PARTICIPANT_ID => {
let mut bytes = [0u8; 16];
stream.read_exact(&mut bytes).await.unwrap();
let pid = Pid::from_le_bytes(bytes);
Frame::ParticipantId { pid }
},
FRAME_SHUTDOWN => Frame::Shutdown,
FRAME_OPEN_STREAM => {
let mut bytes = [0u8; 10];
stream.read_exact(&mut bytes).await.unwrap();
let sid = Sid::from_le_bytes([
bytes[0], bytes[1], bytes[2], bytes[3], bytes[4], bytes[5], bytes[6],
bytes[7],
]);
let prio = bytes[8];
let promises = bytes[9];
Frame::OpenStream {
sid,
prio,
promises,
}
},
FRAME_CLOSE_STREAM => {
let mut bytes = [0u8; 8];
stream.read_exact(&mut bytes).await.unwrap();
let sid = Sid::from_le_bytes([
bytes[0], bytes[1], bytes[2], bytes[3], bytes[4], bytes[5], bytes[6],
bytes[7],
]);
Frame::CloseStream { sid }
},
FRAME_DATA_HEADER => {
let mut bytes = [0u8; 24];
stream.read_exact(&mut bytes).await.unwrap();
let mid = Mid::from_le_bytes([
bytes[0], bytes[1], bytes[2], bytes[3], bytes[4], bytes[5], bytes[6],
bytes[7],
]);
let sid = Sid::from_le_bytes([
bytes[8], bytes[9], bytes[10], bytes[11], bytes[12], bytes[13], bytes[14],
bytes[15],
]);
let length = u64::from_le_bytes([
bytes[16], bytes[17], bytes[18], bytes[19], bytes[20], bytes[21],
bytes[22], bytes[23],
]);
Frame::DataHeader { mid, sid, length }
},
FRAME_DATA => {
let mut bytes = [0u8; 18];
stream.read_exact(&mut bytes).await.unwrap();
let mid = Mid::from_le_bytes([
bytes[0], bytes[1], bytes[2], bytes[3], bytes[4], bytes[5], bytes[6],
bytes[7],
]);
let start = u64::from_le_bytes([
bytes[8], bytes[9], bytes[10], bytes[11], bytes[12], bytes[13], bytes[14],
bytes[15],
]);
let length = u16::from_le_bytes([bytes[16], bytes[17]]);
let mut data = vec![0; length as usize];
stream.read_exact(&mut data).await.unwrap();
Frame::Data { mid, start, data }
},
FRAME_RAW => {
let mut bytes = [0u8; 2];
stream.read_exact(&mut bytes).await.unwrap();
let length = u16::from_le_bytes([bytes[0], bytes[1]]);
let mut data = vec![0; length as usize];
stream.read_exact(&mut data).await.unwrap();
Frame::Raw(data)
},
_ => {
// report a RAW frame, but cannot rely on the next 2 bytes to be a size.
// guessing 256 bytes, which might help to sort down issues
let mut data = vec![0; 256];
stream.read(&mut data).await.unwrap();
Frame::Raw(data)
},
};
frame_handler.send(frame).await.unwrap();
}
trace!("shutting down tcp read()");
}
//dezerialize here as this is executed in a seperate thread PER channel.
@ -91,11 +174,83 @@ impl TcpProtocol {
next = internal_frame_receiver.next().fuse() => next,
next = external_frame_receiver.next().fuse() => next,
} {
let data = bincode::serialize(&frame).unwrap();
let len = data.len();
trace!(?len, "going to send frame via Tcp");
stream.write_all(data.as_slice()).await.unwrap();
match frame {
Frame::Handshake {
magic_number,
version,
} => {
stream
.write_all(&FRAME_HANDSHAKE.to_be_bytes())
.await
.unwrap();
stream.write_all(&magic_number).await.unwrap();
stream.write_all(&version[0].to_le_bytes()).await.unwrap();
stream.write_all(&version[1].to_le_bytes()).await.unwrap();
stream.write_all(&version[2].to_le_bytes()).await.unwrap();
},
Frame::ParticipantId { pid } => {
stream
.write_all(&FRAME_PARTICIPANT_ID.to_be_bytes())
.await
.unwrap();
stream.write_all(&pid.to_le_bytes()).await.unwrap();
},
Frame::Shutdown => {
stream
.write_all(&FRAME_SHUTDOWN.to_be_bytes())
.await
.unwrap();
},
Frame::OpenStream {
sid,
prio,
promises,
} => {
stream
.write_all(&FRAME_OPEN_STREAM.to_be_bytes())
.await
.unwrap();
stream.write_all(&sid.to_le_bytes()).await.unwrap();
stream.write_all(&prio.to_le_bytes()).await.unwrap();
stream.write_all(&promises.to_le_bytes()).await.unwrap();
},
Frame::CloseStream { sid } => {
stream
.write_all(&FRAME_CLOSE_STREAM.to_be_bytes())
.await
.unwrap();
stream.write_all(&sid.to_le_bytes()).await.unwrap();
},
Frame::DataHeader { mid, sid, length } => {
stream
.write_all(&FRAME_DATA_HEADER.to_be_bytes())
.await
.unwrap();
stream.write_all(&mid.to_le_bytes()).await.unwrap();
stream.write_all(&sid.to_le_bytes()).await.unwrap();
stream.write_all(&length.to_le_bytes()).await.unwrap();
},
Frame::Data { mid, start, data } => {
stream.write_all(&FRAME_DATA.to_be_bytes()).await.unwrap();
stream.write_all(&mid.to_le_bytes()).await.unwrap();
stream.write_all(&start.to_le_bytes()).await.unwrap();
stream
.write_all(&(data.len() as u16).to_le_bytes())
.await
.unwrap();
stream.write_all(&data).await.unwrap();
},
Frame::Raw(data) => {
stream.write_all(&FRAME_RAW.to_be_bytes()).await.unwrap();
stream
.write_all(&(data.len() as u16).to_le_bytes())
.await
.unwrap();
stream.write_all(&data).await.unwrap();
},
}
}
trace!("shutting down tcp write()");
}
}
@ -103,16 +258,110 @@ impl UdpProtocol {
pub(crate) fn new(
socket: Arc<UdpSocket>,
remote_addr: SocketAddr,
metrics: Arc<NetworkMetrics>,
data_in: mpsc::UnboundedReceiver<Vec<u8>>,
) -> Self {
Self {
socket,
remote_addr,
metrics,
data_in: RwLock::new(data_in),
}
}
pub async fn read(&self, mut frame_handler: mpsc::UnboundedSender<Frame>) {
let mut data_in = self.data_in.write().await;
while let Some(bytes) = data_in.next().await {
trace!("got raw UDP message with len: {}", bytes.len());
let frame_no = bytes[0];
let frame = match frame_no {
FRAME_HANDSHAKE => {
let bytes = &bytes[1..20];
let magic_number = [
bytes[0], bytes[1], bytes[2], bytes[3], bytes[4], bytes[5], bytes[6],
];
Frame::Handshake {
magic_number,
version: [
u32::from_le_bytes([bytes[7], bytes[8], bytes[9], bytes[10]]),
u32::from_le_bytes([bytes[11], bytes[12], bytes[13], bytes[14]]),
u32::from_le_bytes([bytes[15], bytes[16], bytes[17], bytes[18]]),
],
}
},
FRAME_PARTICIPANT_ID => {
let pid = Pid::from_le_bytes([
bytes[1], bytes[2], bytes[3], bytes[4], bytes[5], bytes[6], bytes[7],
bytes[8], bytes[9], bytes[10], bytes[11], bytes[12], bytes[13], bytes[14],
bytes[15], bytes[16],
]);
Frame::ParticipantId { pid }
},
FRAME_SHUTDOWN => Frame::Shutdown,
FRAME_OPEN_STREAM => {
let bytes = &bytes[1..11];
let sid = Sid::from_le_bytes([
bytes[0], bytes[1], bytes[2], bytes[3], bytes[4], bytes[5], bytes[6],
bytes[7],
]);
let prio = bytes[8];
let promises = bytes[9];
Frame::OpenStream {
sid,
prio,
promises,
}
},
FRAME_CLOSE_STREAM => {
let bytes = &bytes[1..9];
let sid = Sid::from_le_bytes([
bytes[0], bytes[1], bytes[2], bytes[3], bytes[4], bytes[5], bytes[6],
bytes[7],
]);
Frame::CloseStream { sid }
},
FRAME_DATA_HEADER => {
let bytes = &bytes[1..25];
let mid = Mid::from_le_bytes([
bytes[0], bytes[1], bytes[2], bytes[3], bytes[4], bytes[5], bytes[6],
bytes[7],
]);
let sid = Sid::from_le_bytes([
bytes[8], bytes[9], bytes[10], bytes[11], bytes[12], bytes[13], bytes[14],
bytes[15],
]);
let length = u64::from_le_bytes([
bytes[16], bytes[17], bytes[18], bytes[19], bytes[20], bytes[21],
bytes[22], bytes[23],
]);
Frame::DataHeader { mid, sid, length }
},
FRAME_DATA => {
let mid = Mid::from_le_bytes([
bytes[1], bytes[2], bytes[3], bytes[4], bytes[5], bytes[6], bytes[7],
bytes[8],
]);
let start = u64::from_le_bytes([
bytes[9], bytes[10], bytes[11], bytes[12], bytes[13], bytes[14], bytes[15],
bytes[16],
]);
let length = u16::from_le_bytes([bytes[17], bytes[18]]);
let mut data = vec![0; length as usize];
data.copy_from_slice(&bytes[19..]);
Frame::Data { mid, start, data }
},
FRAME_RAW => {
error!("Uffff");
let length = u16::from_le_bytes([bytes[1], bytes[2]]);
let mut data = vec![0; length as usize];
data.copy_from_slice(&bytes[3..]);
Frame::Raw(data)
},
_ => Frame::Raw(bytes),
};
frame_handler.send(frame).await.unwrap();
}
/*
let mut data_in = self.data_in.write().await;
let mut buffer = NetworkBuffer::new();
while let Some(data) = data_in.next().await {
@ -150,7 +399,8 @@ impl UdpProtocol {
}
}
buffer.actually_read(read_ok);
}
}*/
trace!("shutting down udp read()");
}
pub async fn write(
@ -158,6 +408,201 @@ impl UdpProtocol {
mut internal_frame_receiver: mpsc::UnboundedReceiver<Frame>,
mut external_frame_receiver: mpsc::UnboundedReceiver<Frame>,
) {
let mut buffer = [0u8; 2000];
while let Some(frame) = select! {
next = internal_frame_receiver.next().fuse() => next,
next = external_frame_receiver.next().fuse() => next,
} {
let len = match frame {
Frame::Handshake {
magic_number,
version,
} => {
let x = FRAME_HANDSHAKE.to_be_bytes();
buffer[0] = x[0];
buffer[1] = magic_number[0];
buffer[2] = magic_number[1];
buffer[3] = magic_number[2];
buffer[4] = magic_number[3];
buffer[5] = magic_number[4];
buffer[6] = magic_number[5];
buffer[7] = magic_number[6];
let x = version[0].to_le_bytes();
buffer[8] = x[0];
buffer[9] = x[1];
buffer[10] = x[2];
buffer[11] = x[3];
let x = version[1].to_le_bytes();
buffer[12] = x[0];
buffer[13] = x[1];
buffer[14] = x[2];
buffer[15] = x[3];
let x = version[2].to_le_bytes();
buffer[16] = x[0];
buffer[17] = x[1];
buffer[18] = x[2];
buffer[19] = x[3];
20
},
Frame::ParticipantId { pid } => {
let x = FRAME_PARTICIPANT_ID.to_be_bytes();
buffer[0] = x[0];
let x = pid.to_le_bytes();
buffer[1] = x[0];
buffer[2] = x[1];
buffer[3] = x[2];
buffer[4] = x[3];
buffer[5] = x[4];
buffer[6] = x[5];
buffer[7] = x[6];
buffer[8] = x[7];
buffer[9] = x[8];
buffer[10] = x[9];
buffer[11] = x[10];
buffer[12] = x[11];
buffer[13] = x[12];
buffer[14] = x[13];
buffer[15] = x[14];
buffer[16] = x[15];
17
},
Frame::Shutdown => {
let x = FRAME_SHUTDOWN.to_be_bytes();
buffer[0] = x[0];
1
},
Frame::OpenStream {
sid,
prio,
promises,
} => {
let x = FRAME_OPEN_STREAM.to_be_bytes();
buffer[0] = x[0];
let x = sid.to_le_bytes();
buffer[1] = x[0];
buffer[2] = x[1];
buffer[3] = x[2];
buffer[4] = x[3];
buffer[5] = x[4];
buffer[6] = x[5];
buffer[7] = x[6];
buffer[8] = x[7];
let x = prio.to_le_bytes();
buffer[9] = x[0];
let x = promises.to_le_bytes();
buffer[10] = x[0];
11
},
Frame::CloseStream { sid } => {
let x = FRAME_CLOSE_STREAM.to_be_bytes();
buffer[0] = x[0];
let x = sid.to_le_bytes();
buffer[1] = x[0];
buffer[2] = x[1];
buffer[3] = x[2];
buffer[4] = x[3];
buffer[5] = x[4];
buffer[6] = x[5];
buffer[7] = x[6];
buffer[8] = x[7];
9
},
Frame::DataHeader { mid, sid, length } => {
let x = FRAME_DATA_HEADER.to_be_bytes();
buffer[0] = x[0];
let x = mid.to_le_bytes();
buffer[1] = x[0];
buffer[2] = x[1];
buffer[3] = x[2];
buffer[4] = x[3];
buffer[5] = x[4];
buffer[6] = x[5];
buffer[7] = x[6];
buffer[8] = x[7];
let x = sid.to_le_bytes();
buffer[9] = x[0];
buffer[10] = x[1];
buffer[11] = x[2];
buffer[12] = x[3];
buffer[13] = x[4];
buffer[14] = x[5];
buffer[15] = x[6];
buffer[16] = x[7];
let x = length.to_le_bytes();
buffer[17] = x[0];
buffer[18] = x[1];
buffer[19] = x[2];
buffer[20] = x[3];
buffer[21] = x[4];
buffer[22] = x[5];
buffer[23] = x[6];
buffer[24] = x[7];
25
},
Frame::Data { mid, start, data } => {
let x = FRAME_DATA.to_be_bytes();
buffer[0] = x[0];
let x = mid.to_le_bytes();
buffer[1] = x[0];
buffer[2] = x[1];
buffer[3] = x[2];
buffer[4] = x[3];
buffer[5] = x[4];
buffer[6] = x[5];
buffer[7] = x[6];
buffer[8] = x[7];
let x = start.to_le_bytes();
buffer[9] = x[0];
buffer[10] = x[1];
buffer[11] = x[2];
buffer[12] = x[3];
buffer[13] = x[4];
buffer[14] = x[5];
buffer[15] = x[6];
buffer[16] = x[7];
let x = (data.len() as u16).to_le_bytes();
buffer[17] = x[0];
buffer[18] = x[1];
for i in 0..data.len() {
buffer[19 + i] = data[i];
}
19 + data.len()
},
Frame::Raw(data) => {
let x = FRAME_RAW.to_be_bytes();
buffer[0] = x[0];
let x = (data.len() as u16).to_le_bytes();
buffer[1] = x[0];
buffer[2] = x[1];
for i in 0..data.len() {
buffer[3 + i] = data[i];
}
3 + data.len()
},
};
let mut start = 0;
while start < len {
trace!(?start, ?len, "splitting up udp frame in multiple packages");
match self
.socket
.send_to(&buffer[start..len], self.remote_addr)
.await
{
Ok(n) => {
start += n;
if n != len {
error!(
"THIS DOESNT WORK, as RECEIVER CURRENLTY ONLY HANDLES 1 FRAME per \
UDP message. splitting up will fail!"
);
}
},
Err(e) => error!(?e, "need to handle that error!"),
}
}
}
trace!("shutting down udp write()");
/*
let mut buffer = NetworkBuffer::new();
while let Some(frame) = select! {
next = internal_frame_receiver.next().fuse() => next,
@ -178,11 +623,12 @@ impl UdpProtocol {
to_send = buffer.get_read_slice();
}
}
*/
}
}
// INTERNAL NetworkBuffer
/*
struct NetworkBuffer {
pub(crate) data: Vec<u8>,
pub(crate) read_idx: usize,
@ -267,3 +713,5 @@ impl std::fmt::Debug for NetworkBuffer {
)
}
}
*/

View File

@ -2,6 +2,7 @@ use crate::{
api::{Address, Participant},
channel::Channel,
message::OutGoingMessage,
metrics::NetworkMetrics,
participant::BParticipant,
prios::PrioManager,
protocols::{Protocols, TcpProtocol, UdpProtocol},
@ -19,6 +20,7 @@ use futures::{
sink::SinkExt,
stream::StreamExt,
};
use prometheus::Registry;
use std::{
collections::{HashMap, VecDeque},
sync::{
@ -62,11 +64,13 @@ pub struct Scheduler {
channel_listener: RwLock<HashMap<Address, oneshot::Sender<()>>>,
unknown_channels: Arc<RwLock<HashMap<Cid, UnknownChannelInfo>>>,
prios: Arc<Mutex<PrioManager>>,
metrics: Arc<NetworkMetrics>,
}
impl Scheduler {
pub fn new(
local_pid: Pid,
registry: Option<&Registry>,
) -> (
Self,
mpsc::UnboundedSender<(Address, oneshot::Sender<io::Result<()>>)>,
@ -90,6 +94,11 @@ impl Scheduler {
prios_sender,
});
let metrics = Arc::new(NetworkMetrics::new().unwrap());
if let Some(registry) = registry {
metrics.register(registry).unwrap();
}
(
Self {
local_pid,
@ -102,6 +111,7 @@ impl Scheduler {
channel_listener: RwLock::new(HashMap::new()),
unknown_channels: Arc::new(RwLock::new(HashMap::new())),
prios: Arc::new(Mutex::new(prios)),
metrics,
},
listen_sender,
connect_sender,
@ -146,30 +156,43 @@ impl Scheduler {
async fn listen_manager(
&self,
mut listen_receiver: mpsc::UnboundedReceiver<(Address, oneshot::Sender<io::Result<()>>)>,
listen_receiver: mpsc::UnboundedReceiver<(Address, oneshot::Sender<io::Result<()>>)>,
part_out_sender: mpsc::UnboundedSender<(Cid, Frame)>,
configured_sender: mpsc::UnboundedSender<(Cid, Pid, Sid, oneshot::Sender<()>)>,
) {
trace!("start listen_manager");
while let Some((address, result_sender)) = listen_receiver.next().await {
debug!(?address, "got request to open a channel_creator");
let (end_sender, end_receiver) = oneshot::channel::<()>();
self.channel_listener
.write()
.await
.insert(address.clone(), end_sender);
self.pool.spawn_ok(Self::channel_creator(
self.channel_ids.clone(),
self.local_pid,
address.clone(),
end_receiver,
self.pool.clone(),
part_out_sender.clone(),
configured_sender.clone(),
self.unknown_channels.clone(),
result_sender,
));
}
listen_receiver
.for_each_concurrent(None, |(address, result_sender)| {
let address = address.clone();
let part_out_sender = part_out_sender.clone();
let configured_sender = configured_sender.clone();
async move {
debug!(?address, "got request to open a channel_creator");
self.metrics
.listen_requests_total
.with_label_values(&[match address {
Address::Tcp(_) => "tcp",
Address::Udp(_) => "udp",
Address::Mpsc(_) => "mpsc",
}])
.inc();
let (end_sender, end_receiver) = oneshot::channel::<()>();
self.channel_listener
.write()
.await
.insert(address.clone(), end_sender);
self.channel_creator(
address,
end_receiver,
part_out_sender.clone(),
configured_sender.clone(),
result_sender,
)
.await;
}
})
.await;
trace!("stop listen_manager");
}
@ -184,8 +207,12 @@ impl Scheduler {
) {
trace!("start connect_manager");
while let Some((addr, pid_sender)) = connect_receiver.next().await {
match addr {
let (addr, protocol, handshake) = match addr {
Address::Tcp(addr) => {
self.metrics
.connect_requests_total
.with_label_values(&["tcp"])
.inc();
let stream = match net::TcpStream::connect(addr).await {
Ok(stream) => stream,
Err(e) => {
@ -194,21 +221,14 @@ impl Scheduler {
},
};
info!("Connecting Tcp to: {}", stream.peer_addr().unwrap());
Self::init_protocol(
&self.channel_ids,
self.local_pid,
addr,
&self.pool,
&part_out_sender,
&configured_sender,
&self.unknown_channels,
Protocols::Tcp(TcpProtocol::new(stream)),
Some(pid_sender),
false,
)
.await;
let protocol = Protocols::Tcp(TcpProtocol::new(stream, self.metrics.clone()));
(addr, protocol, false)
},
Address::Udp(addr) => {
self.metrics
.connect_requests_total
.with_label_values(&["udp"])
.inc();
let socket = match net::UdpSocket::bind("0.0.0.0:0").await {
Ok(socket) => Arc::new(socket),
Err(e) => {
@ -222,28 +242,29 @@ impl Scheduler {
};
info!("Connecting Udp to: {}", addr);
let (udp_data_sender, udp_data_receiver) = mpsc::unbounded::<Vec<u8>>();
let protocol =
Protocols::Udp(UdpProtocol::new(socket.clone(), addr, udp_data_receiver));
let protocol = Protocols::Udp(UdpProtocol::new(
socket.clone(),
addr,
self.metrics.clone(),
udp_data_receiver,
));
self.pool.spawn_ok(
Self::udp_single_channel_connect(socket.clone(), udp_data_sender)
.instrument(tracing::info_span!("udp", ?addr)),
);
Self::init_protocol(
&self.channel_ids,
self.local_pid,
addr,
&self.pool,
&part_out_sender,
&configured_sender,
&self.unknown_channels,
protocol,
Some(pid_sender),
true,
)
.await;
(addr, protocol, true)
},
_ => unimplemented!(),
}
};
self.init_protocol(
addr,
&part_out_sender,
&configured_sender,
protocol,
Some(pid_sender),
handshake,
)
.await;
}
trace!("stop connect_manager");
}
@ -286,6 +307,8 @@ impl Scheduler {
trace!("stop send_outgoing");
}
//TODO Why is this done in scheduler when it just redirecty everything to
// participant?
async fn handle_frames(&self, mut part_out_receiver: mpsc::UnboundedReceiver<(Cid, Frame)>) {
trace!("start handle_frames");
while let Some((cid, frame)) = part_out_receiver.next().await {
@ -301,7 +324,9 @@ impl Scheduler {
trace!("stop handle_frames");
}
//
//TODO: //ERROR CHECK IF THIS SHOULD BE PUT IN A ASYNC FUNC WHICH IS SEND OVER
// TO CHANNEL OR NOT FOR RETURN VALUE!
async fn channel_configurer(
&self,
mut connected_sender: mpsc::UnboundedSender<Participant>,
@ -334,6 +359,7 @@ impl Scheduler {
) = BParticipant::new(
pid,
offset_sid,
self.metrics.clone(),
prios_sender.clone(),
stream_finished_request_sender.clone(),
);
@ -352,6 +378,7 @@ impl Scheduler {
// noone is waiting on this Participant, return in to Network
connected_sender.send(participant).await.unwrap();
}
self.metrics.participants_connected_total.inc();
transfer_channel_receiver
.send((cid, frame_sender))
.await
@ -387,31 +414,22 @@ impl Scheduler {
// more msg is in prio and return
pub(crate) async fn stream_finished_manager(
&self,
mut stream_finished_request_receiver: mpsc::UnboundedReceiver<(
Pid,
Sid,
oneshot::Sender<()>,
)>,
stream_finished_request_receiver: mpsc::UnboundedReceiver<(Pid, Sid, oneshot::Sender<()>)>,
) {
trace!("start stream_finished_manager");
while let Some((pid, sid, sender)) = stream_finished_request_receiver.next().await {
//TODO: THERE MUST BE A MORE CLEVER METHOD THAN SPIN LOCKING! LIKE REGISTERING
// DIRECTLY IN PRIO AS A FUTURE WERE PRIO IS WAKER! TODO: also this
// has a great potential for handing network, if you create a network, send
// gigabytes close it then. Also i need a Mutex, which really adds
// to cost if alot strems want to close
let prios = self.prios.clone();
self.pool
.spawn_ok(Self::stream_finished_waiter(pid, sid, sender, prios));
}
stream_finished_request_receiver
.for_each_concurrent(None, async move |(pid, sid, sender)| {
//TODO: THERE MUST BE A MORE CLEVER METHOD THAN SPIN LOCKING! LIKE REGISTERING
// DIRECTLY IN PRIO AS A FUTURE WERE PRIO IS WAKER! TODO: also this
// has a great potential for handing network, if you create a network, send
// gigabytes close it then. Also i need a Mutex, which really adds
// to cost if alot strems want to close
self.stream_finished_waiter(pid, sid, sender).await;
})
.await;
}
async fn stream_finished_waiter(
pid: Pid,
sid: Sid,
sender: oneshot::Sender<()>,
prios: Arc<Mutex<PrioManager>>,
) {
async fn stream_finished_waiter(&self, pid: Pid, sid: Sid, sender: oneshot::Sender<()>) {
const TICK_TIME: std::time::Duration = std::time::Duration::from_millis(5);
//TODO: ARRRG, i need to wait for AT LEAST 1 TICK, because i am lazy i just
// wait 15mn and tick count is 10ms because recv is only done with a
@ -419,24 +437,21 @@ impl Scheduler {
async_std::task::sleep(TICK_TIME * 3).await;
let mut n = 0u64;
loop {
if !prios.lock().await.contains_pid_sid(pid, sid) {
if !self.prios.lock().await.contains_pid_sid(pid, sid) {
trace!("prio is clear, go to close stream as requested from api");
sender.send(()).unwrap();
break;
}
n += 1;
if n > 200 {
warn!(
?pid,
?sid,
?n,
"cant close stream, as it still queued, even after 1000ms, this starts to \
take long"
);
async_std::task::sleep(TICK_TIME * 50).await;
} else {
async_std::task::sleep(TICK_TIME).await;
}
async_std::task::sleep(match n {
0..=199 => TICK_TIME,
n if n.rem_euclid(100) == 0 => {
warn!(?pid, ?sid, ?n, "cant close stream, as it still queued");
TICK_TIME * (n as f32 * (n as f32).sqrt() / 100.0) as u32
},
n => TICK_TIME * (n as f32 * (n as f32).sqrt() / 100.0) as u32,
})
.await;
}
}
@ -454,14 +469,11 @@ impl Scheduler {
}
pub(crate) async fn channel_creator(
channel_ids: Arc<AtomicU64>,
local_pid: Pid,
&self,
addr: Address,
end_receiver: oneshot::Receiver<()>,
pool: Arc<ThreadPool>,
part_out_sender: mpsc::UnboundedSender<(Cid, Frame)>,
configured_sender: mpsc::UnboundedSender<(Cid, Pid, Sid, oneshot::Sender<()>)>,
unknown_channels: Arc<RwLock<HashMap<Cid, UnknownChannelInfo>>>,
result_sender: oneshot::Sender<io::Result<()>>,
) {
info!(?addr, "start up channel creator");
@ -491,15 +503,11 @@ impl Scheduler {
} {
let stream = stream.unwrap();
info!("Accepting Tcp from: {}", stream.peer_addr().unwrap());
Self::init_protocol(
&channel_ids,
local_pid,
self.init_protocol(
addr,
&pool,
&part_out_sender,
&configured_sender,
&unknown_channels,
Protocols::Tcp(TcpProtocol::new(stream)),
Protocols::Tcp(TcpProtocol::new(stream, self.metrics.clone())),
None,
true,
)
@ -541,16 +549,13 @@ impl Scheduler {
let protocol = Protocols::Udp(UdpProtocol::new(
socket.clone(),
remote_addr,
self.metrics.clone(),
udp_data_receiver,
));
Self::init_protocol(
&channel_ids,
local_pid,
self.init_protocol(
addr,
&pool,
&part_out_sender,
&configured_sender,
&unknown_channels,
protocol,
None,
true,
@ -591,13 +596,10 @@ impl Scheduler {
}
async fn init_protocol(
channel_ids: &Arc<AtomicU64>,
local_pid: Pid,
&self,
addr: std::net::SocketAddr,
pool: &Arc<ThreadPool>,
part_out_sender: &mpsc::UnboundedSender<(Cid, Frame)>,
configured_sender: &mpsc::UnboundedSender<(Cid, Pid, Sid, oneshot::Sender<()>)>,
unknown_channels: &Arc<RwLock<HashMap<Cid, UnknownChannelInfo>>>,
protocol: Protocols,
pid_sender: Option<oneshot::Sender<io::Result<Participant>>>,
send_handshake: bool,
@ -609,12 +611,12 @@ impl Scheduler {
Contra: - DOS posibility because we answer fist
- Speed, because otherwise the message can be send with the creation
*/
let cid = channel_ids.fetch_add(1, Ordering::Relaxed);
let channel = Channel::new(cid, local_pid);
let cid = self.channel_ids.fetch_add(1, Ordering::Relaxed);
let channel = Channel::new(cid, self.local_pid, self.metrics.clone());
if send_handshake {
channel.send_handshake(&mut part_in_sender).await;
}
pool.spawn_ok(
self.pool.spawn_ok(
channel
.run(
protocol,
@ -624,7 +626,7 @@ impl Scheduler {
)
.instrument(tracing::info_span!("channel", ?addr)),
);
unknown_channels
self.unknown_channels
.write()
.await
.insert(cid, (part_in_sender, pid_sender));

View File

@ -1,5 +1,4 @@
use rand::Rng;
use serde::{Deserialize, Serialize};
pub type Mid = u64;
pub type Cid = u64;
@ -13,26 +12,26 @@ pub const PROMISES_GUARANTEED_DELIVERY: Promises = 4;
pub const PROMISES_COMPRESSED: Promises = 8;
pub const PROMISES_ENCRYPTED: Promises = 16;
pub(crate) const VELOREN_MAGIC_NUMBER: &str = "VELOREN";
pub(crate) const VELOREN_MAGIC_NUMBER: [u8; 7] = [86, 69, 76, 79, 82, 69, 78]; //VELOREN
pub const VELOREN_NETWORK_VERSION: [u32; 3] = [0, 2, 0];
pub(crate) const STREAM_ID_OFFSET1: Sid = Sid::new(0);
pub(crate) const STREAM_ID_OFFSET2: Sid = Sid::new(u64::MAX / 2);
#[derive(PartialEq, Eq, Hash, Clone, Copy, Serialize, Deserialize)]
#[derive(PartialEq, Eq, Hash, Clone, Copy)]
pub struct Pid {
internal: u128,
}
#[derive(PartialEq, Eq, Hash, Clone, Copy, Serialize, Deserialize)]
#[derive(PartialEq, Eq, Hash, Clone, Copy)]
pub(crate) struct Sid {
internal: u64,
}
// Used for Communication between Channel <----(TCP/UDP)----> Channel
#[derive(Serialize, Deserialize, Debug)]
#[derive(Debug)]
pub(crate) enum Frame {
Handshake {
magic_number: String,
magic_number: [u8; 7],
version: [u32; 3],
},
ParticipantId {
@ -54,7 +53,7 @@ pub(crate) enum Frame {
length: u64,
},
Data {
id: Mid,
mid: Mid,
start: u64,
data: Vec<u8>,
},
@ -63,7 +62,7 @@ pub(crate) enum Frame {
Raw(Vec<u8>),
}
#[derive(Serialize, Deserialize, Debug)]
#[derive(Debug)]
pub(crate) enum Requestor {
User,
Api,
@ -87,10 +86,26 @@ impl Pid {
internal: pid as u128,
}
}
pub(crate) fn to_le_bytes(&self) -> [u8; 16] { self.internal.to_le_bytes() }
pub(crate) fn from_le_bytes(bytes: [u8; 16]) -> Self {
Self {
internal: u128::from_le_bytes(bytes),
}
}
}
impl Sid {
pub const fn new(internal: u64) -> Self { Self { internal } }
pub(crate) fn to_le_bytes(&self) -> [u8; 8] { self.internal.to_le_bytes() }
pub(crate) fn from_le_bytes(bytes: [u8; 8]) -> Self {
Self {
internal: u64::from_le_bytes(bytes),
}
}
}
impl std::fmt::Debug for Pid {
@ -101,6 +116,10 @@ impl std::fmt::Debug for Pid {
}
}
impl From<Pid> for u128 {
fn from(pid: Pid) -> Self { pid.internal }
}
impl std::ops::AddAssign for Sid {
fn add_assign(&mut self, other: Self) {
*self = Self {

View File

@ -59,8 +59,8 @@ pub async fn network_participant_stream(
Stream,
) {
let pool = ThreadPoolBuilder::new().num_threads(2).build();
let n_a = Network::new(Pid::fake(1), &pool);
let n_b = Network::new(Pid::fake(2), &pool);
let n_a = Network::new(Pid::fake(1), &pool, None);
let n_b = Network::new(Pid::fake(2), &pool, None);
n_a.listen(addr.clone()).await.unwrap();
let p1_b = n_b.connect(addr).await.unwrap();