diff --git a/network/Cargo.toml b/network/Cargo.toml index cbe271cd3f..3c791e32b7 100644 --- a/network/Cargo.toml +++ b/network/Cargo.toml @@ -8,8 +8,6 @@ edition = "2018" [dependencies] -#threadpool -uvth = "3.1" #serialisation bincode = "1.2" serde = { version = "1.0" } @@ -18,7 +16,7 @@ async-std = { version = "~1.5", features = ["std"] } #tracing and metrics tracing = { version = "0.1", default-features = false } tracing-futures = "0.2" -prometheus = "0.7" +prometheus = { version = "0.7", default-features = false } #async futures = { version = "0.3", features = ["thread-pool"] } #mpsc channel registry @@ -26,4 +24,5 @@ lazy_static = { version = "1.4", default-features = false } rand = { version = "0.7" } [dev-dependencies] -tracing-subscriber = { version = "0.2.3", default-features = false, features = ["env-filter", "fmt", "chrono", "ansi", "smallvec"] } \ No newline at end of file +tracing-subscriber = { version = "0.2.3", default-features = false, features = ["env-filter", "fmt", "chrono", "ansi", "smallvec"] } +uvth = { version = "3.1", default-features = false } \ No newline at end of file diff --git a/network/examples/async_recv/Cargo.lock b/network/examples/async_recv/Cargo.lock deleted file mode 100644 index d015d8bb72..0000000000 --- a/network/examples/async_recv/Cargo.lock +++ /dev/null @@ -1,978 +0,0 @@ -# This file is automatically @generated by Cargo. -# It is not intended for manual editing. -[[package]] -name = "aho-corasick" -version = "0.7.10" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8716408b8bc624ed7f65d223ddb9ac2d044c0547b6fa4b0d554f3a9540496ada" -dependencies = [ - "memchr", -] - -[[package]] -name = "ansi_term" -version = "0.11.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ee49baf6cb617b853aa8d93bf420db2383fab46d314482ca2803b40d5fde979b" -dependencies = [ - "winapi 0.3.8", -] - -[[package]] -name = "async-recv" -version = "0.1.0" -dependencies = [ - "bincode", - "chrono", - "clap", - "futures", - "serde", - "tracing", - "tracing-subscriber", - "uvth", - "veloren_network", -] - -[[package]] -name = "async-std" -version = "1.5.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "538ecb01eb64eecd772087e5b6f7540cbc917f047727339a472dafed2185b267" -dependencies = [ - "async-task", - "crossbeam-channel 0.4.2", - "crossbeam-deque", - "crossbeam-utils 0.7.2", - "futures-core", - "futures-io", - "futures-timer", - "kv-log-macro", - "log", - "memchr", - "mio", - "mio-uds", - "num_cpus", - "once_cell", - "pin-project-lite", - "pin-utils", - "slab", -] - -[[package]] -name = "async-task" -version = "1.3.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0ac2c016b079e771204030951c366db398864f5026f84a44dafb0ff20f02085d" -dependencies = [ - "libc", - "winapi 0.3.8", -] - -[[package]] -name = "atty" -version = "0.2.14" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d9b39be18770d11421cdb1b9947a45dd3f37e93092cbf377614828a319d5fee8" -dependencies = [ - "hermit-abi", - "libc", - "winapi 0.3.8", -] - -[[package]] -name = "autocfg" -version = "1.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f8aac770f1885fd7e387acedd76065302551364496e46b3dd00860b2f8359b9d" - -[[package]] -name = "bincode" -version = "1.2.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5753e2a71534719bf3f4e57006c3a4f0d2c672a4b676eec84161f763eca87dbf" -dependencies = [ - "byteorder", - "serde", -] - -[[package]] -name = "bitflags" -version = "1.2.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cf1de2fe8c75bc145a2f577add951f8134889b4795d47466a54a5c846d691693" - -[[package]] -name = "byteorder" -version = "1.3.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "08c48aae112d48ed9f069b33538ea9e3e90aa263cfa3d1c24309612b1f7472de" - -[[package]] -name = "cfg-if" -version = "0.1.10" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4785bdd1c96b2a846b2bd7cc02e86b6b3dbf14e7e53446c4f54c92a361040822" - -[[package]] -name = "chrono" -version = "0.4.11" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "80094f509cf8b5ae86a4966a39b3ff66cd7e2a3e594accec3743ff3fabeab5b2" -dependencies = [ - "num-integer", - "num-traits", - "time", -] - -[[package]] -name = "clap" -version = "2.33.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bdfa80d47f954d53a35a64987ca1422f495b8d6483c0fe9f7117b36c2a792129" -dependencies = [ - "ansi_term", - "atty", - "bitflags", - "strsim", - "textwrap", - "unicode-width", - "vec_map", -] - -[[package]] -name = "crossbeam-channel" -version = "0.3.9" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c8ec7fcd21571dc78f96cc96243cab8d8f035247c3efd16c687be154c3fa9efa" -dependencies = [ - "crossbeam-utils 0.6.6", -] - -[[package]] -name = "crossbeam-channel" -version = "0.4.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cced8691919c02aac3cb0a1bc2e9b73d89e832bf9a06fc579d4e71b68a2da061" -dependencies = [ - "crossbeam-utils 0.7.2", - "maybe-uninit", -] - -[[package]] -name = "crossbeam-deque" -version = "0.7.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9f02af974daeee82218205558e51ec8768b48cf524bd01d550abe5573a608285" -dependencies = [ - "crossbeam-epoch", - "crossbeam-utils 0.7.2", - "maybe-uninit", -] - -[[package]] -name = "crossbeam-epoch" -version = "0.8.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "058ed274caafc1f60c4997b5fc07bf7dc7cca454af7c6e81edffe5f33f70dace" -dependencies = [ - "autocfg", - "cfg-if", - "crossbeam-utils 0.7.2", - "lazy_static", - "maybe-uninit", - "memoffset", - "scopeguard", -] - -[[package]] -name = "crossbeam-utils" -version = "0.6.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "04973fa96e96579258a5091af6003abde64af786b860f18622b82e026cca60e6" -dependencies = [ - "cfg-if", - "lazy_static", -] - -[[package]] -name = "crossbeam-utils" -version = "0.7.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c3c7c73a2d1e9fc0886a08b93e98eb643461230d5f1925e4036204d5f2e261a8" -dependencies = [ - "autocfg", - "cfg-if", - "lazy_static", -] - -[[package]] -name = "fnv" -version = "1.0.7" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3f9eec918d3f24069decb9af1554cad7c880e2da24a9afd88aca000531ab82c1" - -[[package]] -name = "fuchsia-zircon" -version = "0.3.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2e9763c69ebaae630ba35f74888db465e49e259ba1bc0eda7d06f4a067615d82" -dependencies = [ - "bitflags", - "fuchsia-zircon-sys", -] - -[[package]] -name = "fuchsia-zircon-sys" -version = "0.3.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3dcaa9ae7725d12cdb85b3ad99a434db70b468c09ded17e012d86b5c1010f7a7" - -[[package]] -name = "futures" -version = "0.3.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1e05b85ec287aac0dc34db7d4a569323df697f9c55b99b15d6b4ef8cde49f613" -dependencies = [ - "futures-channel", - "futures-core", - "futures-executor", - "futures-io", - "futures-sink", - "futures-task", - "futures-util", -] - -[[package]] -name = "futures-channel" -version = "0.3.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f366ad74c28cca6ba456d95e6422883cfb4b252a83bed929c83abfdbbf2967d5" -dependencies = [ - "futures-core", - "futures-sink", -] - -[[package]] -name = "futures-core" -version = "0.3.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "59f5fff90fd5d971f936ad674802482ba441b6f09ba5e15fd8b39145582ca399" - -[[package]] -name = "futures-executor" -version = "0.3.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "10d6bb888be1153d3abeb9006b11b02cf5e9b209fda28693c31ae1e4e012e314" -dependencies = [ - "futures-core", - "futures-task", - "futures-util", - "num_cpus", -] - -[[package]] -name = "futures-io" -version = "0.3.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "de27142b013a8e869c14957e6d2edeef89e97c289e69d042ee3a49acd8b51789" - -[[package]] -name = "futures-macro" -version = "0.3.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d0b5a30a4328ab5473878237c447333c093297bded83a4983d10f4deea240d39" -dependencies = [ - "proc-macro-hack", - "proc-macro2", - "quote", - "syn", -] - -[[package]] -name = "futures-sink" -version = "0.3.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3f2032893cb734c7a05d85ce0cc8b8c4075278e93b24b66f9de99d6eb0fa8acc" - -[[package]] -name = "futures-task" -version = "0.3.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bdb66b5f09e22019b1ab0830f7785bcea8e7a42148683f99214f73f8ec21a626" -dependencies = [ - "once_cell", -] - -[[package]] -name = "futures-timer" -version = "2.0.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a1de7508b218029b0f01662ed8f61b1c964b3ae99d6f25462d0f55a595109df6" - -[[package]] -name = "futures-util" -version = "0.3.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8764574ff08b701a084482c3c7031349104b07ac897393010494beaa18ce32c6" -dependencies = [ - "futures-channel", - "futures-core", - "futures-io", - "futures-macro", - "futures-sink", - "futures-task", - "memchr", - "pin-project", - "pin-utils", - "proc-macro-hack", - "proc-macro-nested", - "slab", -] - -[[package]] -name = "getrandom" -version = "0.1.14" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7abc8dd8451921606d809ba32e95b6111925cd2906060d2dcc29c070220503eb" -dependencies = [ - "cfg-if", - "libc", - "wasi", -] - -[[package]] -name = "hermit-abi" -version = "0.1.13" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "91780f809e750b0a89f5544be56617ff6b1227ee485bcb06ebe10cdf89bd3b71" -dependencies = [ - "libc", -] - -[[package]] -name = "iovec" -version = "0.1.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b2b3ea6ff95e175473f8ffe6a7eb7c00d054240321b84c57051175fe3c1e075e" -dependencies = [ - "libc", -] - -[[package]] -name = "itoa" -version = "0.4.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b8b7a7c0c47db5545ed3fef7468ee7bb5b74691498139e4b3f6a20685dc6dd8e" - -[[package]] -name = "kernel32-sys" -version = "0.2.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7507624b29483431c0ba2d82aece8ca6cdba9382bff4ddd0f7490560c056098d" -dependencies = [ - "winapi 0.2.8", - "winapi-build", -] - -[[package]] -name = "kv-log-macro" -version = "1.0.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4ff57d6d215f7ca7eb35a9a64d656ba4d9d2bef114d741dc08048e75e2f5d418" -dependencies = [ - "log", -] - -[[package]] -name = "lazy_static" -version = "1.4.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e2abad23fbc42b3700f2f279844dc832adb2b2eb069b2df918f455c4e18cc646" - -[[package]] -name = "libc" -version = "0.2.71" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9457b06509d27052635f90d6466700c65095fdf75409b3fbdd903e988b886f49" - -[[package]] -name = "log" -version = "0.4.8" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "14b6052be84e6b71ab17edffc2eeabf5c2c3ae1fdb464aae35ac50c67a44e1f7" -dependencies = [ - "cfg-if", -] - -[[package]] -name = "matchers" -version = "0.0.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f099785f7595cc4b4553a174ce30dd7589ef93391ff414dbb67f62392b9e0ce1" -dependencies = [ - "regex-automata", -] - -[[package]] -name = "maybe-uninit" -version = "2.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "60302e4db3a61da70c0cb7991976248362f30319e88850c487b9b95bbf059e00" - -[[package]] -name = "memchr" -version = "2.3.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3728d817d99e5ac407411fa471ff9800a778d88a24685968b36824eaf4bee400" - -[[package]] -name = "memoffset" -version = "0.5.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b4fc2c02a7e374099d4ee95a193111f72d2110197fe200272371758f6c3643d8" -dependencies = [ - "autocfg", -] - -[[package]] -name = "mio" -version = "0.6.22" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fce347092656428bc8eaf6201042cb551b8d67855af7374542a92a0fbfcac430" -dependencies = [ - "cfg-if", - "fuchsia-zircon", - "fuchsia-zircon-sys", - "iovec", - "kernel32-sys", - "libc", - "log", - "miow", - "net2", - "slab", - "winapi 0.2.8", -] - -[[package]] -name = "mio-uds" -version = "0.6.8" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "afcb699eb26d4332647cc848492bbc15eafb26f08d0304550d5aa1f612e066f0" -dependencies = [ - "iovec", - "libc", - "mio", -] - -[[package]] -name = "miow" -version = "0.2.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8c1f2f3b1cf331de6896aabf6e9d55dca90356cc9960cca7eaaf408a355ae919" -dependencies = [ - "kernel32-sys", - "net2", - "winapi 0.2.8", - "ws2_32-sys", -] - -[[package]] -name = "net2" -version = "0.2.34" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2ba7c918ac76704fb42afcbbb43891e72731f3dcca3bef2a19786297baf14af7" -dependencies = [ - "cfg-if", - "libc", - "winapi 0.3.8", -] - -[[package]] -name = "num-integer" -version = "0.1.42" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3f6ea62e9d81a77cd3ee9a2a5b9b609447857f3d358704331e4ef39eb247fcba" -dependencies = [ - "autocfg", - "num-traits", -] - -[[package]] -name = "num-traits" -version = "0.2.11" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c62be47e61d1842b9170f0fdeec8eba98e60e90e5446449a0545e5152acd7096" -dependencies = [ - "autocfg", -] - -[[package]] -name = "num_cpus" -version = "1.13.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "05499f3756671c15885fee9034446956fff3f243d6077b91e5767df161f766b3" -dependencies = [ - "hermit-abi", - "libc", -] - -[[package]] -name = "once_cell" -version = "1.4.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0b631f7e854af39a1739f401cf34a8a013dfe09eac4fa4dba91e9768bd28168d" - -[[package]] -name = "pin-project" -version = "0.4.17" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "edc93aeee735e60ecb40cf740eb319ff23eab1c5748abfdb5c180e4ce49f7791" -dependencies = [ - "pin-project-internal", -] - -[[package]] -name = "pin-project-internal" -version = "0.4.17" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e58db2081ba5b4c93bd6be09c40fd36cb9193a8336c384f3b40012e531aa7e40" -dependencies = [ - "proc-macro2", - "quote", - "syn", -] - -[[package]] -name = "pin-project-lite" -version = "0.1.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9df32da11d84f3a7d70205549562966279adb900e080fad3dccd8e64afccf0ad" - -[[package]] -name = "pin-utils" -version = "0.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184" - -[[package]] -name = "ppv-lite86" -version = "0.2.8" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "237a5ed80e274dbc66f86bd59c1e25edc039660be53194b5fe0a482e0f2612ea" - -[[package]] -name = "proc-macro-hack" -version = "0.5.16" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7e0456befd48169b9f13ef0f0ad46d492cf9d2dbb918bcf38e01eed4ce3ec5e4" - -[[package]] -name = "proc-macro-nested" -version = "0.1.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8e946095f9d3ed29ec38de908c22f95d9ac008e424c7bcae54c75a79c527c694" - -[[package]] -name = "proc-macro2" -version = "1.0.18" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "beae6331a816b1f65d04c45b078fd8e6c93e8071771f41b8163255bbd8d7c8fa" -dependencies = [ - "unicode-xid", -] - -[[package]] -name = "prometheus" -version = "0.7.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5567486d5778e2c6455b1b90ff1c558f29e751fc018130fa182e15828e728af1" -dependencies = [ - "cfg-if", - "fnv", - "lazy_static", - "protobuf", - "quick-error", - "spin", -] - -[[package]] -name = "protobuf" -version = "2.14.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8e86d370532557ae7573551a1ec8235a0f8d6cb276c7c9e6aa490b511c447485" - -[[package]] -name = "quick-error" -version = "1.2.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a1d01941d82fa2ab50be1e79e6714289dd7cde78eba4c074bc5a4374f650dfe0" - -[[package]] -name = "quote" -version = "1.0.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "54a21852a652ad6f610c9510194f398ff6f8692e334fd1145fed931f7fbe44ea" -dependencies = [ - "proc-macro2", -] - -[[package]] -name = "rand" -version = "0.7.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6a6b1679d49b24bbfe0c803429aa1874472f50d9b363131f0e89fc356b544d03" -dependencies = [ - "getrandom", - "libc", - "rand_chacha", - "rand_core", - "rand_hc", -] - -[[package]] -name = "rand_chacha" -version = "0.2.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f4c8ed856279c9737206bf725bf36935d8666ead7aa69b52be55af369d193402" -dependencies = [ - "ppv-lite86", - "rand_core", -] - -[[package]] -name = "rand_core" -version = "0.5.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "90bde5296fc891b0cef12a6d03ddccc162ce7b2aff54160af9338f8d40df6d19" -dependencies = [ - "getrandom", -] - -[[package]] -name = "rand_hc" -version = "0.2.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ca3129af7b92a17112d59ad498c6f81eaf463253766b90396d39ea7a39d6613c" -dependencies = [ - "rand_core", -] - -[[package]] -name = "regex" -version = "1.3.9" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9c3780fcf44b193bc4d09f36d2a3c87b251da4a046c87795a0d35f4f927ad8e6" -dependencies = [ - "aho-corasick", - "memchr", - "regex-syntax", - "thread_local", -] - -[[package]] -name = "regex-automata" -version = "0.1.9" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ae1ded71d66a4a97f5e961fd0cb25a5f366a42a41570d16a763a69c092c26ae4" -dependencies = [ - "byteorder", - "regex-syntax", -] - -[[package]] -name = "regex-syntax" -version = "0.6.18" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "26412eb97c6b088a6997e05f69403a802a92d520de2f8e63c2b65f9e0f47c4e8" - -[[package]] -name = "ryu" -version = "1.0.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "71d301d4193d031abdd79ff7e3dd721168a9572ef3fe51a1517aba235bd8f86e" - -[[package]] -name = "scopeguard" -version = "1.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d29ab0c6d3fc0ee92fe66e2d99f700eab17a8d57d1c1d3b748380fb20baa78cd" - -[[package]] -name = "serde" -version = "1.0.111" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c9124df5b40cbd380080b2cc6ab894c040a3070d995f5c9dc77e18c34a8ae37d" -dependencies = [ - "serde_derive", -] - -[[package]] -name = "serde_derive" -version = "1.0.111" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3f2c3ac8e6ca1e9c80b8be1023940162bf81ae3cffbb1809474152f2ce1eb250" -dependencies = [ - "proc-macro2", - "quote", - "syn", -] - -[[package]] -name = "serde_json" -version = "1.0.53" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "993948e75b189211a9b31a7528f950c6adc21f9720b6438ff80a7fa2f864cea2" -dependencies = [ - "itoa", - "ryu", - "serde", -] - -[[package]] -name = "sharded-slab" -version = "0.0.9" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "06d5a3f5166fb5b42a5439f2eee8b9de149e235961e3eb21c5808fc3ea17ff3e" -dependencies = [ - "lazy_static", -] - -[[package]] -name = "slab" -version = "0.4.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c111b5bd5695e56cffe5129854aa230b39c93a305372fdbb2668ca2394eea9f8" - -[[package]] -name = "smallvec" -version = "1.4.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c7cb5678e1615754284ec264d9bb5b4c27d2018577fd90ac0ceb578591ed5ee4" - -[[package]] -name = "spin" -version = "0.5.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6e63cff320ae2c57904679ba7cb63280a3dc4613885beafb148ee7bf9aa9042d" - -[[package]] -name = "strsim" -version = "0.8.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8ea5119cdb4c55b55d432abb513a0429384878c15dde60cc77b1c99de1a95a6a" - -[[package]] -name = "syn" -version = "1.0.30" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "93a56fabc59dce20fe48b6c832cc249c713e7ed88fa28b0ee0a3bfcaae5fe4e2" -dependencies = [ - "proc-macro2", - "quote", - "unicode-xid", -] - -[[package]] -name = "textwrap" -version = "0.11.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d326610f408c7a4eb6f51c37c330e496b08506c9457c9d34287ecc38809fb060" -dependencies = [ - "unicode-width", -] - -[[package]] -name = "thread_local" -version = "1.0.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d40c6d1b69745a6ec6fb1ca717914848da4b44ae29d9b3080cbee91d72a69b14" -dependencies = [ - "lazy_static", -] - -[[package]] -name = "time" -version = "0.1.43" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ca8a50ef2360fbd1eeb0ecd46795a87a19024eb4b53c5dc916ca1fd95fe62438" -dependencies = [ - "libc", - "winapi 0.3.8", -] - -[[package]] -name = "tracing" -version = "0.1.14" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a7c6b59d116d218cb2d990eb06b77b64043e0268ef7323aae63d8b30ae462923" -dependencies = [ - "cfg-if", - "tracing-attributes", - "tracing-core", -] - -[[package]] -name = "tracing-attributes" -version = "0.1.8" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "99bbad0de3fd923c9c3232ead88510b783e5a4d16a6154adffa3d53308de984c" -dependencies = [ - "proc-macro2", - "quote", - "syn", -] - -[[package]] -name = "tracing-core" -version = "0.1.10" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0aa83a9a47081cd522c09c81b31aec2c9273424976f922ad61c053b58350b715" -dependencies = [ - "lazy_static", -] - -[[package]] -name = "tracing-futures" -version = "0.2.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ab7bb6f14721aa00656086e9335d363c5c8747bae02ebe32ea2c7dece5689b4c" -dependencies = [ - "pin-project", - "tracing", -] - -[[package]] -name = "tracing-log" -version = "0.1.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5e0f8c7178e13481ff6765bd169b33e8d554c5d2bbede5e32c356194be02b9b9" -dependencies = [ - "lazy_static", - "log", - "tracing-core", -] - -[[package]] -name = "tracing-serde" -version = "0.1.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b6ccba2f8f16e0ed268fc765d9b7ff22e965e7185d32f8f1ec8294fe17d86e79" -dependencies = [ - "serde", - "tracing-core", -] - -[[package]] -name = "tracing-subscriber" -version = "0.2.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1d53c40489aa69c9aed21ff483f26886ca8403df33bdc2d2f87c60c1617826d2" -dependencies = [ - "ansi_term", - "chrono", - "lazy_static", - "matchers", - "regex", - "serde", - "serde_json", - "sharded-slab", - "smallvec", - "tracing-core", - "tracing-log", - "tracing-serde", -] - -[[package]] -name = "unicode-width" -version = "0.1.7" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "caaa9d531767d1ff2150b9332433f32a24622147e5ebb1f26409d5da67afd479" - -[[package]] -name = "unicode-xid" -version = "0.2.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "826e7639553986605ec5979c7dd957c7895e93eabed50ab2ffa7f6128a75097c" - -[[package]] -name = "uvth" -version = "3.1.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e59a167890d173eb0fcd7a1b99b84dc05c521ae8d76599130b8e19bef287abbf" -dependencies = [ - "crossbeam-channel 0.3.9", - "log", - "num_cpus", -] - -[[package]] -name = "vec_map" -version = "0.8.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f1bddf1187be692e79c5ffeab891132dfb0f236ed36a43c7ed39f1165ee20191" - -[[package]] -name = "veloren_network" -version = "0.1.0" -dependencies = [ - "async-std", - "bincode", - "futures", - "lazy_static", - "prometheus", - "rand", - "serde", - "tracing", - "tracing-futures", - "uvth", -] - -[[package]] -name = "wasi" -version = "0.9.0+wasi-snapshot-preview1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cccddf32554fecc6acb585f82a32a72e28b48f8c4c1883ddfeeeaa96f7d8e519" - -[[package]] -name = "winapi" -version = "0.2.8" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "167dc9d6949a9b857f3451275e911c3f44255842c1f7a76f33c55103a909087a" - -[[package]] -name = "winapi" -version = "0.3.8" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8093091eeb260906a183e6ae1abdba2ef5ef2257a21801128899c3fc699229c6" -dependencies = [ - "winapi-i686-pc-windows-gnu", - "winapi-x86_64-pc-windows-gnu", -] - -[[package]] -name = "winapi-build" -version = "0.1.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2d315eee3b34aca4797b2da6b13ed88266e6d612562a0c46390af8299fc699bc" - -[[package]] -name = "winapi-i686-pc-windows-gnu" -version = "0.4.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ac3b87c63620426dd9b991e5ce0329eff545bccbbb34f3be09ff6fb6ab51b7b6" - -[[package]] -name = "winapi-x86_64-pc-windows-gnu" -version = "0.4.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f" - -[[package]] -name = "ws2_32-sys" -version = "0.2.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d59cefebd0c892fa2dd6de581e937301d8552cb44489cdff035c6187cb63fa5e" -dependencies = [ - "winapi 0.2.8", - "winapi-build", -] diff --git a/network/examples/async_recv/Cargo.toml b/network/examples/async_recv/Cargo.toml deleted file mode 100644 index ceb362c679..0000000000 --- a/network/examples/async_recv/Cargo.toml +++ /dev/null @@ -1,20 +0,0 @@ -[workspace] - -[package] -name = "async-recv" -version = "0.1.0" -authors = ["Marcel Märtens "] -edition = "2018" - -# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html - -[dependencies] -uvth = "3.1" -network = { package = "veloren_network", path = "../../../network" } -clap = "2.33" -futures = "0.3" -tracing = "0.1" -chrono = "0.4" -tracing-subscriber = "0.2.3" -bincode = "1.2" -serde = { version = "1.0", features = ["derive"] } \ No newline at end of file diff --git a/network/examples/async_recv/src/main.rs b/network/examples/async_recv/src/main.rs deleted file mode 100644 index 2c547e35c5..0000000000 --- a/network/examples/async_recv/src/main.rs +++ /dev/null @@ -1,202 +0,0 @@ -//!run with -//! ```bash -//! (cd network/examples/async_recv && RUST_BACKTRACE=1 cargo run) -//! ``` -use chrono::prelude::*; -use clap::{App, Arg}; -use futures::executor::block_on; -use network::{Address, Network, Pid, Stream, PROMISES_NONE}; -use serde::{Deserialize, Serialize}; -use std::{ - thread, - time::{Duration, Instant}, -}; -use tracing::*; -use tracing_subscriber::EnvFilter; -use uvth::ThreadPoolBuilder; - -#[derive(Serialize, Deserialize, Debug)] -enum Msg { - Ping(u64), - Pong(u64), -} - -/// This utility checks if async functionality of veloren-network works -/// correctly and outputs it at the end -fn main() { - let matches = App::new("Veloren Async Prove Utility") - .version("0.1.0") - .author("Marcel Märtens ") - .about("proves that veloren-network runs async") - .arg( - Arg::with_name("mode") - .short("m") - .long("mode") - .takes_value(true) - .possible_values(&["server", "client", "both"]) - .default_value("both") - .help( - "choose whether you want to start the server or client or both needed for \ - this program", - ), - ) - .arg( - Arg::with_name("port") - .short("p") - .long("port") - .takes_value(true) - .default_value("52000") - .help("port to listen on"), - ) - .arg( - Arg::with_name("ip") - .long("ip") - .takes_value(true) - .default_value("127.0.0.1") - .help("ip to listen and connect to"), - ) - .arg( - Arg::with_name("protocol") - .long("protocol") - .takes_value(true) - .default_value("tcp") - .possible_values(&["tcp", "upd", "mpsc"]) - .help( - "underlying protocol used for this test, mpsc can only combined with mode=both", - ), - ) - .arg( - Arg::with_name("trace") - .short("t") - .long("trace") - .takes_value(true) - .default_value("warn") - .possible_values(&["trace", "debug", "info", "warn", "error"]) - .help("set trace level, not this has a performance impact!"), - ) - .get_matches(); - - if let Some(trace) = matches.value_of("trace") { - let filter = EnvFilter::from_default_env().add_directive(trace.parse().unwrap()); - tracing_subscriber::FmtSubscriber::builder() - .with_max_level(Level::TRACE) - .with_env_filter(filter) - .init(); - }; - let port: u16 = matches.value_of("port").unwrap().parse().unwrap(); - let ip: &str = matches.value_of("ip").unwrap(); - let address = match matches.value_of("protocol") { - Some("tcp") => Address::Tcp(format!("{}:{}", ip, port).parse().unwrap()), - Some("udp") => Address::Udp(format!("{}:{}", ip, port).parse().unwrap()), - _ => panic!("invalid mode, run --help!"), - }; - - let mut background = None; - match matches.value_of("mode") { - Some("server") => server(address), - Some("client") => client(address), - Some("both") => { - let address1 = address.clone(); - background = Some(thread::spawn(|| server(address1))); - thread::sleep(Duration::from_millis(200)); //start client after server - client(address) - }, - _ => panic!("invalid mode, run --help!"), - }; - if let Some(background) = background { - background.join().unwrap(); - } -} - -fn server(address: Address) { - let thread_pool = ThreadPoolBuilder::new().build(); - let server = Network::new(Pid::new(), &thread_pool, None); - block_on(server.listen(address.clone())).unwrap(); //await - println!("waiting for client"); - - let p1 = block_on(server.connected()).unwrap(); //remote representation of p1 - let mut s1 = block_on(p1.opened()).unwrap(); //remote representation of s1 - let mut s2 = block_on(p1.opened()).unwrap(); //remote representation of s2 - let t1 = thread::spawn(move || { - if let Ok(Msg::Ping(id)) = block_on(s1.recv()) { - thread::sleep(Duration::from_millis(3000)); - s1.send(Msg::Pong(id)).unwrap(); - println!("[{}], send s1_1", Utc::now().time()); - } - if let Ok(Msg::Ping(id)) = block_on(s1.recv()) { - thread::sleep(Duration::from_millis(3000)); - s1.send(Msg::Pong(id)).unwrap(); - println!("[{}], send s1_2", Utc::now().time()); - } - thread::sleep(Duration::from_millis(10000)); - }); - let t2 = thread::spawn(move || { - if let Ok(Msg::Ping(id)) = block_on(s2.recv()) { - thread::sleep(Duration::from_millis(1000)); - s2.send(Msg::Pong(id)).unwrap(); - println!("[{}], send s2_1", Utc::now().time()); - } - if let Ok(Msg::Ping(id)) = block_on(s2.recv()) { - thread::sleep(Duration::from_millis(1000)); - s2.send(Msg::Pong(id)).unwrap(); - println!("[{}], send s2_2", Utc::now().time()); - } - thread::sleep(Duration::from_millis(10000)); - }); - t1.join().unwrap(); - t2.join().unwrap(); - thread::sleep(Duration::from_millis(50)); -} - -async fn async_task1(mut s: Stream) -> u64 { - s.send(Msg::Ping(100)).unwrap(); - println!("[{}], s1_1...", Utc::now().time()); - let m1: Result = s.recv().await; - println!("[{}], s1_1: {:?}", Utc::now().time(), m1); - thread::sleep(Duration::from_millis(1000)); - s.send(Msg::Ping(101)).unwrap(); - println!("[{}], s1_2...", Utc::now().time()); - let m2: Result = s.recv().await; - println!("[{}], s1_2: {:?}", Utc::now().time(), m2); - match m2.unwrap() { - Msg::Pong(id) => id, - _ => panic!("wrong answer"), - } -} - -async fn async_task2(mut s: Stream) -> u64 { - s.send(Msg::Ping(200)).unwrap(); - println!("[{}], s2_1...", Utc::now().time()); - let m1: Result = s.recv().await; - println!("[{}], s2_1: {:?}", Utc::now().time(), m1); - thread::sleep(Duration::from_millis(5000)); - s.send(Msg::Ping(201)).unwrap(); - println!("[{}], s2_2...", Utc::now().time()); - let m2: Result = s.recv().await; - println!("[{}], s2_2: {:?}", Utc::now().time(), m2); - match m2.unwrap() { - Msg::Pong(id) => id, - _ => panic!("wrong answer"), - } -} - -fn client(address: Address) { - let thread_pool = ThreadPoolBuilder::new().build(); - let client = Network::new(Pid::new(), &thread_pool, None); - - let p1 = block_on(client.connect(address.clone())).unwrap(); //remote representation of p1 - let s1 = block_on(p1.open(16, PROMISES_NONE)).unwrap(); //remote representation of s1 - let s2 = block_on(p1.open(16, PROMISES_NONE)).unwrap(); //remote representation of s2 - let before = Instant::now(); - block_on(async { - let f1 = async_task1(s1); - let f2 = async_task2(s2); - let _ = futures::join!(f1, f2); - }); - if before.elapsed() < Duration::from_secs(13) { - println!("IT WORKS!"); - } else { - println!("doesn't seem to work :/") - } - thread::sleep(Duration::from_millis(50)); -} diff --git a/network/examples/chat/Cargo.lock b/network/examples/chat/Cargo.lock index 148709ed50..8839dcce09 100644 --- a/network/examples/chat/Cargo.lock +++ b/network/examples/chat/Cargo.lock @@ -25,9 +25,9 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "538ecb01eb64eecd772087e5b6f7540cbc917f047727339a472dafed2185b267" dependencies = [ "async-task", - "crossbeam-channel 0.4.2", + "crossbeam-channel", "crossbeam-deque", - "crossbeam-utils 0.7.2", + "crossbeam-utils", "futures-core", "futures-io", "futures-timer", @@ -53,17 +53,6 @@ dependencies = [ "winapi 0.3.8", ] -[[package]] -name = "atty" -version = "0.2.14" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d9b39be18770d11421cdb1b9947a45dd3f37e93092cbf377614828a319d5fee8" -dependencies = [ - "hermit-abi", - "libc", - "winapi 0.3.8", -] - [[package]] name = "autocfg" version = "1.0.0" @@ -115,22 +104,9 @@ version = "2.33.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5067f5bb2d80ef5d68b4c87db81601f0b75bca627bc2ef76b141d7b846a3c6d9" dependencies = [ - "ansi_term", - "atty", "bitflags", - "strsim", "textwrap", "unicode-width", - "vec_map", -] - -[[package]] -name = "crossbeam-channel" -version = "0.3.9" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c8ec7fcd21571dc78f96cc96243cab8d8f035247c3efd16c687be154c3fa9efa" -dependencies = [ - "crossbeam-utils 0.6.6", ] [[package]] @@ -139,7 +115,7 @@ version = "0.4.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cced8691919c02aac3cb0a1bc2e9b73d89e832bf9a06fc579d4e71b68a2da061" dependencies = [ - "crossbeam-utils 0.7.2", + "crossbeam-utils", "maybe-uninit", ] @@ -150,7 +126,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9f02af974daeee82218205558e51ec8768b48cf524bd01d550abe5573a608285" dependencies = [ "crossbeam-epoch", - "crossbeam-utils 0.7.2", + "crossbeam-utils", "maybe-uninit", ] @@ -162,23 +138,13 @@ checksum = "058ed274caafc1f60c4997b5fc07bf7dc7cca454af7c6e81edffe5f33f70dace" dependencies = [ "autocfg", "cfg-if", - "crossbeam-utils 0.7.2", + "crossbeam-utils", "lazy_static", "maybe-uninit", "memoffset", "scopeguard", ] -[[package]] -name = "crossbeam-utils" -version = "0.6.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "04973fa96e96579258a5091af6003abde64af786b860f18622b82e026cca60e6" -dependencies = [ - "cfg-if", - "lazy_static", -] - [[package]] name = "crossbeam-utils" version = "0.7.2" @@ -339,12 +305,6 @@ dependencies = [ "libc", ] -[[package]] -name = "itoa" -version = "0.4.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b8b7a7c0c47db5545ed3fef7468ee7bb5b74691498139e4b3f6a20685dc6dd8e" - [[package]] name = "kernel32-sys" version = "0.2.2" @@ -479,7 +439,6 @@ dependencies = [ "serde", "tracing", "tracing-subscriber", - "uvth", "veloren_network", ] @@ -586,17 +545,10 @@ dependencies = [ "cfg-if", "fnv", "lazy_static", - "protobuf", "quick-error", "spin", ] -[[package]] -name = "protobuf" -version = "2.14.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8e86d370532557ae7573551a1ec8235a0f8d6cb276c7c9e6aa490b511c447485" - [[package]] name = "quick-error" version = "1.2.3" @@ -687,12 +639,6 @@ version = "0.6.17" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7fe5bd57d1d7414c6b5ed48563a2c855d995ff777729dcd91c369ec7fea395ae" -[[package]] -name = "ryu" -version = "1.0.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "535622e6be132bccd223f4bb2b8ac8d53cda3c7a6394944d3b2b33fb974f9d76" - [[package]] name = "scopeguard" version = "1.1.0" @@ -704,16 +650,19 @@ name = "serde" version = "1.0.106" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "36df6ac6412072f67cf767ebbde4133a5b2e88e76dc6187fa7104cd16f783399" +dependencies = [ + "serde_derive", +] [[package]] -name = "serde_json" -version = "1.0.51" +name = "serde_derive" +version = "1.0.106" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "da07b57ee2623368351e9a0488bb0b261322a15a6e0ae53e243cbdc0f4208da9" +checksum = "9e549e3abf4fb8621bd1609f11dfc9f5e50320802273b12f3811a67e6716ea6c" dependencies = [ - "itoa", - "ryu", - "serde", + "proc-macro2", + "quote", + "syn", ] [[package]] @@ -743,12 +692,6 @@ version = "0.5.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6e63cff320ae2c57904679ba7cb63280a3dc4613885beafb148ee7bf9aa9042d" -[[package]] -name = "strsim" -version = "0.8.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8ea5119cdb4c55b55d432abb513a0429384878c15dde60cc77b1c99de1a95a6a" - [[package]] name = "syn" version = "1.0.17" @@ -796,20 +739,9 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1721cc8cf7d770cc4257872507180f35a4797272f5962f24c806af9e7faf52ab" dependencies = [ "cfg-if", - "tracing-attributes", "tracing-core", ] -[[package]] -name = "tracing-attributes" -version = "0.1.7" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7fbad39da2f9af1cae3016339ad7f2c7a9e870f12e8fd04c4fd7ef35b30c0d2b" -dependencies = [ - "quote", - "syn", -] - [[package]] name = "tracing-core" version = "0.1.10" @@ -829,27 +761,6 @@ dependencies = [ "tracing", ] -[[package]] -name = "tracing-log" -version = "0.1.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5e0f8c7178e13481ff6765bd169b33e8d554c5d2bbede5e32c356194be02b9b9" -dependencies = [ - "lazy_static", - "log", - "tracing-core", -] - -[[package]] -name = "tracing-serde" -version = "0.1.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b6ccba2f8f16e0ed268fc765d9b7ff22e965e7185d32f8f1ec8294fe17d86e79" -dependencies = [ - "serde", - "tracing-core", -] - [[package]] name = "tracing-subscriber" version = "0.2.4" @@ -861,13 +772,9 @@ dependencies = [ "lazy_static", "matchers", "regex", - "serde", - "serde_json", "sharded-slab", "smallvec", "tracing-core", - "tracing-log", - "tracing-serde", ] [[package]] @@ -882,23 +789,6 @@ version = "0.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "826e7639553986605ec5979c7dd957c7895e93eabed50ab2ffa7f6128a75097c" -[[package]] -name = "uvth" -version = "3.1.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e59a167890d173eb0fcd7a1b99b84dc05c521ae8d76599130b8e19bef287abbf" -dependencies = [ - "crossbeam-channel 0.3.9", - "log", - "num_cpus", -] - -[[package]] -name = "vec_map" -version = "0.8.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "05c78687fb1a80548ae3250346c3db86a80a7cdd77bda190189f2d0a0987c81a" - [[package]] name = "veloren_network" version = "0.1.0" @@ -912,7 +802,6 @@ dependencies = [ "serde", "tracing", "tracing-futures", - "uvth", ] [[package]] diff --git a/network/examples/chat/Cargo.toml b/network/examples/chat/Cargo.toml index cc86dbc2b4..a5291966cf 100644 --- a/network/examples/chat/Cargo.toml +++ b/network/examples/chat/Cargo.toml @@ -9,12 +9,11 @@ edition = "2018" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -uvth = "3.1" network = { package = "veloren_network", path = "../../../network" } -clap = "2.33" +clap = { version = "2.33", default-features = false } async-std = { version = "1.5", default-features = false } -futures = "0.3" -tracing = "0.1" -tracing-subscriber = "0.2.3" +futures = { version = "0.3", default-features = false } +tracing = { version = "0.1", default-features = false } +tracing-subscriber = { version = "0.2.3", default-features = false, features = ["env-filter", "fmt", "chrono", "ansi", "smallvec"] } bincode = "1.2" -serde = "1.0" \ No newline at end of file +serde = { version = "1.0", features = ["derive"] } \ No newline at end of file diff --git a/network/examples/chat/src/main.rs b/network/examples/chat/src/main.rs index 1ddb0fca0a..a3a4fabcab 100644 --- a/network/examples/chat/src/main.rs +++ b/network/examples/chat/src/main.rs @@ -10,10 +10,9 @@ use network::{Address, Network, Participant, Pid, PROMISES_CONSISTENCY, PROMISES use std::{sync::Arc, thread, time::Duration}; use tracing::*; use tracing_subscriber::EnvFilter; -use uvth::ThreadPoolBuilder; ///This example contains a simple chatserver, that allows to send messages -/// between participants +/// between participants, it's neither pretty nor perfect, but it should show how to integrate network fn main() { let matches = App::new("Chat example") .version("0.1.0") @@ -100,8 +99,9 @@ fn main() { } fn server(address: Address) { - let thread_pool = ThreadPoolBuilder::new().build(); - let server = Arc::new(Network::new(Pid::new(), &thread_pool, None)); + let (server, f) = Network::new(Pid::new(), None); + let server = Arc::new(server); + std::thread::spawn(f); let pool = ThreadPool::new().unwrap(); block_on(async { server.listen(address).await.unwrap(); @@ -124,13 +124,17 @@ async fn client_connection(network: Arc, participant: Arc) }, Ok(msg) => { println!("[{}]: {}", username, msg); - let parts = network.participants().await; - for p in parts.values() { - let mut s = p + let mut parts = network.participants().await; + for (_, p) in parts.drain() { + match p .open(32, PROMISES_ORDERED | PROMISES_CONSISTENCY) - .await - .unwrap(); - s.send((username.clone(), msg.clone())).unwrap(); + .await { + Err(_) => { + //probably disconnected, remove it + network.disconnect(p).await.unwrap(); + }, + Ok(mut s) => s.send((username.clone(), msg.clone())).unwrap(), + }; } }, } @@ -139,8 +143,8 @@ async fn client_connection(network: Arc, participant: Arc) } fn client(address: Address) { - let thread_pool = ThreadPoolBuilder::new().build(); - let client = Network::new(Pid::new(), &thread_pool, None); + let (client, f) = Network::new(Pid::new(), None); + std::thread::spawn(f); let pool = ThreadPool::new().unwrap(); block_on(async { diff --git a/network/examples/fileshare/Cargo.lock b/network/examples/fileshare/Cargo.lock index 4bf8e8870b..de5da54e7e 100644 --- a/network/examples/fileshare/Cargo.lock +++ b/network/examples/fileshare/Cargo.lock @@ -37,9 +37,9 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "538ecb01eb64eecd772087e5b6f7540cbc917f047727339a472dafed2185b267" dependencies = [ "async-task", - "crossbeam-channel 0.4.2", + "crossbeam-channel", "crossbeam-deque", - "crossbeam-utils 0.7.2", + "crossbeam-utils", "futures-core", "futures-io", "futures-timer", @@ -65,17 +65,6 @@ dependencies = [ "winapi 0.3.8", ] -[[package]] -name = "atty" -version = "0.2.14" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d9b39be18770d11421cdb1b9947a45dd3f37e93092cbf377614828a319d5fee8" -dependencies = [ - "hermit-abi", - "libc", - "winapi 0.3.8", -] - [[package]] name = "autocfg" version = "1.0.0" @@ -144,13 +133,9 @@ version = "2.33.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5067f5bb2d80ef5d68b4c87db81601f0b75bca627bc2ef76b141d7b846a3c6d9" dependencies = [ - "ansi_term", - "atty", "bitflags", - "strsim", "textwrap", "unicode-width", - "vec_map", ] [[package]] @@ -159,22 +144,13 @@ version = "0.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "245097e9a4535ee1e3e3931fcfcd55a796a44c643e8596ff6566d68f09b87bbc" -[[package]] -name = "crossbeam-channel" -version = "0.3.9" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c8ec7fcd21571dc78f96cc96243cab8d8f035247c3efd16c687be154c3fa9efa" -dependencies = [ - "crossbeam-utils 0.6.6", -] - [[package]] name = "crossbeam-channel" version = "0.4.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cced8691919c02aac3cb0a1bc2e9b73d89e832bf9a06fc579d4e71b68a2da061" dependencies = [ - "crossbeam-utils 0.7.2", + "crossbeam-utils", "maybe-uninit", ] @@ -185,7 +161,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9f02af974daeee82218205558e51ec8768b48cf524bd01d550abe5573a608285" dependencies = [ "crossbeam-epoch", - "crossbeam-utils 0.7.2", + "crossbeam-utils", "maybe-uninit", ] @@ -197,23 +173,13 @@ checksum = "058ed274caafc1f60c4997b5fc07bf7dc7cca454af7c6e81edffe5f33f70dace" dependencies = [ "autocfg", "cfg-if", - "crossbeam-utils 0.7.2", + "crossbeam-utils", "lazy_static", "maybe-uninit", "memoffset", "scopeguard", ] -[[package]] -name = "crossbeam-utils" -version = "0.6.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "04973fa96e96579258a5091af6003abde64af786b860f18622b82e026cca60e6" -dependencies = [ - "cfg-if", - "lazy_static", -] - [[package]] name = "crossbeam-utils" version = "0.7.2" @@ -260,7 +226,6 @@ dependencies = [ "shellexpand", "tracing", "tracing-subscriber", - "uvth", "veloren_network", ] @@ -413,12 +378,6 @@ dependencies = [ "libc", ] -[[package]] -name = "itoa" -version = "0.4.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b8b7a7c0c47db5545ed3fef7468ee7bb5b74691498139e4b3f6a20685dc6dd8e" - [[package]] name = "kernel32-sys" version = "0.2.2" @@ -645,17 +604,10 @@ dependencies = [ "cfg-if", "fnv", "lazy_static", - "protobuf", "quick-error", "spin", ] -[[package]] -name = "protobuf" -version = "2.14.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8e86d370532557ae7573551a1ec8235a0f8d6cb276c7c9e6aa490b511c447485" - [[package]] name = "quick-error" version = "1.2.3" @@ -766,15 +718,9 @@ dependencies = [ "base64", "blake2b_simd", "constant_time_eq", - "crossbeam-utils 0.7.2", + "crossbeam-utils", ] -[[package]] -name = "ryu" -version = "1.0.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "535622e6be132bccd223f4bb2b8ac8d53cda3c7a6394944d3b2b33fb974f9d76" - [[package]] name = "scopeguard" version = "1.1.0" @@ -801,17 +747,6 @@ dependencies = [ "syn", ] -[[package]] -name = "serde_json" -version = "1.0.51" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "da07b57ee2623368351e9a0488bb0b261322a15a6e0ae53e243cbdc0f4208da9" -dependencies = [ - "itoa", - "ryu", - "serde", -] - [[package]] name = "sharded-slab" version = "0.0.8" @@ -848,12 +783,6 @@ version = "0.5.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6e63cff320ae2c57904679ba7cb63280a3dc4613885beafb148ee7bf9aa9042d" -[[package]] -name = "strsim" -version = "0.8.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8ea5119cdb4c55b55d432abb513a0429384878c15dde60cc77b1c99de1a95a6a" - [[package]] name = "syn" version = "1.0.17" @@ -901,20 +830,9 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1721cc8cf7d770cc4257872507180f35a4797272f5962f24c806af9e7faf52ab" dependencies = [ "cfg-if", - "tracing-attributes", "tracing-core", ] -[[package]] -name = "tracing-attributes" -version = "0.1.7" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7fbad39da2f9af1cae3016339ad7f2c7a9e870f12e8fd04c4fd7ef35b30c0d2b" -dependencies = [ - "quote", - "syn", -] - [[package]] name = "tracing-core" version = "0.1.10" @@ -934,27 +852,6 @@ dependencies = [ "tracing", ] -[[package]] -name = "tracing-log" -version = "0.1.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5e0f8c7178e13481ff6765bd169b33e8d554c5d2bbede5e32c356194be02b9b9" -dependencies = [ - "lazy_static", - "log", - "tracing-core", -] - -[[package]] -name = "tracing-serde" -version = "0.1.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b6ccba2f8f16e0ed268fc765d9b7ff22e965e7185d32f8f1ec8294fe17d86e79" -dependencies = [ - "serde", - "tracing-core", -] - [[package]] name = "tracing-subscriber" version = "0.2.4" @@ -966,13 +863,9 @@ dependencies = [ "lazy_static", "matchers", "regex", - "serde", - "serde_json", "sharded-slab", "smallvec", "tracing-core", - "tracing-log", - "tracing-serde", ] [[package]] @@ -987,23 +880,6 @@ version = "0.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "826e7639553986605ec5979c7dd957c7895e93eabed50ab2ffa7f6128a75097c" -[[package]] -name = "uvth" -version = "3.1.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e59a167890d173eb0fcd7a1b99b84dc05c521ae8d76599130b8e19bef287abbf" -dependencies = [ - "crossbeam-channel 0.3.9", - "log", - "num_cpus", -] - -[[package]] -name = "vec_map" -version = "0.8.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "05c78687fb1a80548ae3250346c3db86a80a7cdd77bda190189f2d0a0987c81a" - [[package]] name = "veloren_network" version = "0.1.0" @@ -1017,7 +893,6 @@ dependencies = [ "serde", "tracing", "tracing-futures", - "uvth", ] [[package]] diff --git a/network/examples/fileshare/Cargo.toml b/network/examples/fileshare/Cargo.toml index f175a55f1b..492985e51a 100644 --- a/network/examples/fileshare/Cargo.toml +++ b/network/examples/fileshare/Cargo.toml @@ -9,13 +9,12 @@ edition = "2018" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -uvth = "3.1" network = { package = "veloren_network", path = "../../../network" } -clap = "2.33" +clap = { version = "2.33", default-features = false } async-std = { version = "1.5", default-features = false } -futures = "0.3" -tracing = "0.1" -tracing-subscriber = "0.2.3" +futures = { version = "0.3", default-features = false } +tracing = { version = "0.1", default-features = false } +tracing-subscriber = { version = "0.2.3", default-features = false, features = ["env-filter", "fmt", "chrono", "ansi", "smallvec"] } bincode = "1.2" serde = { version = "1.0", features = ["derive"] } rand = "0.7.3" diff --git a/network/examples/fileshare/src/server.rs b/network/examples/fileshare/src/server.rs index 9628e4f384..f6312a58b1 100644 --- a/network/examples/fileshare/src/server.rs +++ b/network/examples/fileshare/src/server.rs @@ -8,7 +8,6 @@ use futures::{channel::mpsc, future::FutureExt, stream::StreamExt}; use network::{Address, Network, Participant, Pid, Stream, PROMISES_CONSISTENCY, PROMISES_ORDERED}; use std::{collections::HashMap, sync::Arc}; use tracing::*; -use uvth::ThreadPoolBuilder; #[derive(Debug)] struct ControlChannels { @@ -27,8 +26,8 @@ impl Server { pub fn new() -> (Self, mpsc::UnboundedSender) { let (command_sender, command_receiver) = mpsc::unbounded(); - let thread_pool = ThreadPoolBuilder::new().build(); - let network = Network::new(Pid::new(), &thread_pool, None); + let (network, f) = Network::new(Pid::new(), None); + std::thread::spawn(f); let run_channels = Some(ControlChannels { command_receiver }); ( diff --git a/network/examples/network-speed/Cargo.lock b/network/examples/network-speed/Cargo.lock index 2fcebd2eb7..58b125e281 100644 --- a/network/examples/network-speed/Cargo.lock +++ b/network/examples/network-speed/Cargo.lock @@ -31,9 +31,9 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "538ecb01eb64eecd772087e5b6f7540cbc917f047727339a472dafed2185b267" dependencies = [ "async-task", - "crossbeam-channel 0.4.2", + "crossbeam-channel", "crossbeam-deque", - "crossbeam-utils 0.7.2", + "crossbeam-utils", "futures-core", "futures-io", "futures-timer", @@ -59,17 +59,6 @@ dependencies = [ "winapi 0.3.8", ] -[[package]] -name = "atty" -version = "0.2.14" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d9b39be18770d11421cdb1b9947a45dd3f37e93092cbf377614828a319d5fee8" -dependencies = [ - "hermit-abi", - "libc", - "winapi 0.3.8", -] - [[package]] name = "autocfg" version = "1.0.0" @@ -127,22 +116,9 @@ version = "2.33.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bdfa80d47f954d53a35a64987ca1422f495b8d6483c0fe9f7117b36c2a792129" dependencies = [ - "ansi_term", - "atty", "bitflags", - "strsim", "textwrap", "unicode-width", - "vec_map", -] - -[[package]] -name = "crossbeam-channel" -version = "0.3.9" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c8ec7fcd21571dc78f96cc96243cab8d8f035247c3efd16c687be154c3fa9efa" -dependencies = [ - "crossbeam-utils 0.6.6", ] [[package]] @@ -151,7 +127,7 @@ version = "0.4.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cced8691919c02aac3cb0a1bc2e9b73d89e832bf9a06fc579d4e71b68a2da061" dependencies = [ - "crossbeam-utils 0.7.2", + "crossbeam-utils", "maybe-uninit", ] @@ -162,7 +138,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9f02af974daeee82218205558e51ec8768b48cf524bd01d550abe5573a608285" dependencies = [ "crossbeam-epoch", - "crossbeam-utils 0.7.2", + "crossbeam-utils", "maybe-uninit", ] @@ -174,23 +150,13 @@ checksum = "058ed274caafc1f60c4997b5fc07bf7dc7cca454af7c6e81edffe5f33f70dace" dependencies = [ "autocfg", "cfg-if", - "crossbeam-utils 0.7.2", + "crossbeam-utils", "lazy_static", "maybe-uninit", "memoffset", "scopeguard", ] -[[package]] -name = "crossbeam-utils" -version = "0.6.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "04973fa96e96579258a5091af6003abde64af786b860f18622b82e026cca60e6" -dependencies = [ - "cfg-if", - "lazy_static", -] - [[package]] name = "crossbeam-utils" version = "0.7.2" @@ -366,12 +332,6 @@ dependencies = [ "libc", ] -[[package]] -name = "itoa" -version = "0.4.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b8b7a7c0c47db5545ed3fef7468ee7bb5b74691498139e4b3f6a20685dc6dd8e" - [[package]] name = "kernel32-sys" version = "0.2.2" @@ -513,7 +473,6 @@ dependencies = [ "tiny_http", "tracing", "tracing-subscriber", - "uvth", "veloren_network", ] @@ -721,12 +680,6 @@ version = "0.6.18" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "26412eb97c6b088a6997e05f69403a802a92d520de2f8e63c2b65f9e0f47c4e8" -[[package]] -name = "ryu" -version = "1.0.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "71d301d4193d031abdd79ff7e3dd721168a9572ef3fe51a1517aba235bd8f86e" - [[package]] name = "scopeguard" version = "1.1.0" @@ -753,17 +706,6 @@ dependencies = [ "syn", ] -[[package]] -name = "serde_json" -version = "1.0.53" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "993948e75b189211a9b31a7528f950c6adc21f9720b6438ff80a7fa2f864cea2" -dependencies = [ - "itoa", - "ryu", - "serde", -] - [[package]] name = "sharded-slab" version = "0.0.9" @@ -791,12 +733,6 @@ version = "0.5.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6e63cff320ae2c57904679ba7cb63280a3dc4613885beafb148ee7bf9aa9042d" -[[package]] -name = "strsim" -version = "0.8.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8ea5119cdb4c55b55d432abb513a0429384878c15dde60cc77b1c99de1a95a6a" - [[package]] name = "syn" version = "1.0.30" @@ -856,21 +792,9 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a7c6b59d116d218cb2d990eb06b77b64043e0268ef7323aae63d8b30ae462923" dependencies = [ "cfg-if", - "tracing-attributes", "tracing-core", ] -[[package]] -name = "tracing-attributes" -version = "0.1.8" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "99bbad0de3fd923c9c3232ead88510b783e5a4d16a6154adffa3d53308de984c" -dependencies = [ - "proc-macro2", - "quote", - "syn", -] - [[package]] name = "tracing-core" version = "0.1.10" @@ -890,27 +814,6 @@ dependencies = [ "tracing", ] -[[package]] -name = "tracing-log" -version = "0.1.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5e0f8c7178e13481ff6765bd169b33e8d554c5d2bbede5e32c356194be02b9b9" -dependencies = [ - "lazy_static", - "log", - "tracing-core", -] - -[[package]] -name = "tracing-serde" -version = "0.1.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b6ccba2f8f16e0ed268fc765d9b7ff22e965e7185d32f8f1ec8294fe17d86e79" -dependencies = [ - "serde", - "tracing-core", -] - [[package]] name = "tracing-subscriber" version = "0.2.5" @@ -922,13 +825,9 @@ dependencies = [ "lazy_static", "matchers", "regex", - "serde", - "serde_json", "sharded-slab", "smallvec", "tracing-core", - "tracing-log", - "tracing-serde", ] [[package]] @@ -972,23 +871,6 @@ dependencies = [ "percent-encoding", ] -[[package]] -name = "uvth" -version = "3.1.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e59a167890d173eb0fcd7a1b99b84dc05c521ae8d76599130b8e19bef287abbf" -dependencies = [ - "crossbeam-channel 0.3.9", - "log", - "num_cpus", -] - -[[package]] -name = "vec_map" -version = "0.8.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f1bddf1187be692e79c5ffeab891132dfb0f236ed36a43c7ed39f1165ee20191" - [[package]] name = "veloren_network" version = "0.1.0" @@ -1002,7 +884,6 @@ dependencies = [ "serde", "tracing", "tracing-futures", - "uvth", ] [[package]] diff --git a/network/examples/network-speed/Cargo.toml b/network/examples/network-speed/Cargo.toml index 40d7c22395..10ec82e375 100644 --- a/network/examples/network-speed/Cargo.toml +++ b/network/examples/network-speed/Cargo.toml @@ -9,12 +9,11 @@ edition = "2018" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -uvth = "3.1" network = { package = "veloren_network", path = "../../../network" } -clap = "2.33" -futures = "0.3" -tracing = "0.1" -tracing-subscriber = "0.2.3" +clap = { version = "2.33", default-features = false } +futures = { version = "0.3", default-features = false } +tracing = { version = "0.1", default-features = false } +tracing-subscriber = { version = "0.2.3", default-features = false, features = ["env-filter", "fmt", "chrono", "ansi", "smallvec"] } bincode = "1.2" prometheus = "0.7" tiny_http = "0.7.0" diff --git a/network/examples/network-speed/src/main.rs b/network/examples/network-speed/src/main.rs index 77410c1499..3e702ae2ce 100644 --- a/network/examples/network-speed/src/main.rs +++ b/network/examples/network-speed/src/main.rs @@ -16,7 +16,6 @@ use std::{ }; use tracing::*; use tracing_subscriber::EnvFilter; -use uvth::ThreadPoolBuilder; #[derive(Serialize, Deserialize, Debug)] enum Msg { @@ -120,9 +119,9 @@ fn main() { } fn server(address: Address) { - let thread_pool = ThreadPoolBuilder::new().num_threads(1).build(); let mut metrics = metrics::SimpleMetrics::new(); - let server = Network::new(Pid::new(), &thread_pool, Some(metrics.registry())); + let (server, f) = Network::new(Pid::new(), Some(metrics.registry())); + std::thread::spawn(f); metrics.run("0.0.0.0:59112".parse().unwrap()).unwrap(); block_on(server.listen(address)).unwrap(); @@ -148,9 +147,9 @@ fn server(address: Address) { } fn client(address: Address) { - let thread_pool = ThreadPoolBuilder::new().num_threads(1).build(); let mut metrics = metrics::SimpleMetrics::new(); - let client = Network::new(Pid::new(), &thread_pool, Some(metrics.registry())); + let (client, f) = Network::new(Pid::new(), Some(metrics.registry())); + std::thread::spawn(f); metrics.run("0.0.0.0:59111".parse().unwrap()).unwrap(); let p1 = block_on(client.connect(address.clone())).unwrap(); //remote representation of p1 diff --git a/network/src/api.rs b/network/src/api.rs index a07a7c86f6..05be3c3f3b 100644 --- a/network/src/api.rs +++ b/network/src/api.rs @@ -3,7 +3,7 @@ //! //! (cd network/examples/async_recv && RUST_BACKTRACE=1 cargo run) use crate::{ - message::{self, IncomingMessage, MessageBuffer, OutgoingMessage}, + message::{self, partial_eq_bincode, IncomingMessage, MessageBuffer, OutgoingMessage}, scheduler::Scheduler, types::{Mid, Pid, Prio, Promises, Sid}, }; @@ -25,7 +25,6 @@ use std::{ }; use tracing::*; use tracing_futures::Instrument; -use uvth::ThreadPool; /// Represents a Tcp or Udp or Mpsc address #[derive(Clone, Debug, Hash, PartialEq, Eq)] @@ -96,9 +95,10 @@ pub enum ParticipantError { } /// Error type thrown by [`Streams`](Stream) methods -#[derive(Debug, PartialEq)] +#[derive(Debug)] pub enum StreamError { StreamClosed, + DeserializeError(Box), } /// Use the `Network` to create connections to other [`Participants`] @@ -115,15 +115,16 @@ pub enum StreamError { /// # Examples /// ```rust /// use veloren_network::{Network, Address, Pid}; -/// use uvth::ThreadPoolBuilder; /// use futures::executor::block_on; /// /// # fn main() -> std::result::Result<(), Box> { /// // Create a Network, listen on port `2999` to accept connections and connect to port `8080` to connect to a (pseudo) database Application -/// let network = Network::new(Pid::new(), &ThreadPoolBuilder::new().build(), None); +/// let (network, f) = Network::new(Pid::new(), None); +/// std::thread::spawn(f); /// block_on(async{ /// # //setup pseudo database! -/// # let database = Network::new(Pid::new(), &ThreadPoolBuilder::new().build(), None); +/// # let (database, fd) = Network::new(Pid::new(), None); +/// # std::thread::spawn(fd); /// # database.listen(Address::Tcp("127.0.0.1:8080".parse().unwrap())).await?; /// network.listen(Address::Tcp("127.0.0.1:2999".parse().unwrap())).await?; /// let database = network.connect(Address::Tcp("127.0.0.1:8080".parse().unwrap())).await?; @@ -152,49 +153,75 @@ impl Network { /// # Arguments /// * `participant_id` - provide it by calling [`Pid::new()`], usually you /// don't want to reuse a Pid for 2 `Networks` - /// * `thread_pool` - you need to provide a [`ThreadPool`] where exactly 1 - /// thread will be created to handle all `Network` internals. Additional - /// threads will be allocated on an internal async-aware threadpool /// * `registry` - Provide a Registy in order to collect Prometheus metrics /// by this `Network`, `None` will deactivate Tracing. Tracing is done via /// [`prometheus`] /// + /// # Result + /// * `Self` - returns a `Network` which can be `Send` to multiple areas of + /// your code, including multiple threads. This is the base strct of this + /// crate. + /// * `FnOnce` - you need to run the returning FnOnce exactly once, probably + /// in it's own thread. this is NOT done internally, so that you are free + /// to choose the threadpool implementation of your choice. We recommend + /// using [`ThreadPool`] from [`uvth`] crate. This fn will runn the + /// Scheduler to handle all `Network` internals. Additional threads will + /// be allocated on an internal async-aware threadpool + /// /// # Examples /// ```rust + /// //Example with uvth /// use uvth::ThreadPoolBuilder; /// use veloren_network::{Address, Network, Pid}; /// - /// let network = Network::new(Pid::new(), &ThreadPoolBuilder::new().build(), None); + /// let pool = ThreadPoolBuilder::new().build(); + /// let (network, f) = Network::new(Pid::new(), None); + /// pool.execute(f); /// ``` /// - /// Usually you only create a single `Network` for an application, except - /// when client and server are in the same application, then you will want - /// 2. However there are no technical limitations from creating more. + /// ```rust + /// //Example with std::thread + /// use veloren_network::{Address, Network, Pid}; + /// + /// let (network, f) = Network::new(Pid::new(), None); + /// std::thread::spawn(f); + /// ``` + /// + /// Usually you only create a single `Network` for an appliregistrycation, + /// except when client and server are in the same application, then you + /// will want 2. However there are no technical limitations from + /// creating more. /// /// [`Pid::new()`]: crate::types::Pid::new - /// [`ThreadPool`]: uvth::ThreadPool - pub fn new(participant_id: Pid, thread_pool: &ThreadPool, registry: Option<&Registry>) -> Self { + /// [`ThreadPool`]: https://docs.rs/uvth/newest/uvth/struct.ThreadPool.html + /// [`uvth`]: https://docs.rs/uvth + pub fn new( + participant_id: Pid, + registry: Option<&Registry>, + ) -> (Self, impl std::ops::FnOnce()) { let p = participant_id; debug!(?p, "starting Network"); let (scheduler, listen_sender, connect_sender, connected_receiver, shutdown_sender) = Scheduler::new(participant_id, registry); - thread_pool.execute(move || { - trace!(?p, "starting sheduler in own thread"); - let _handle = task::block_on( - scheduler - .run() - .instrument(tracing::info_span!("scheduler", ?p)), - ); - trace!(?p, "stopping sheduler and his own thread"); - }); - Self { - local_pid: participant_id, - participants: RwLock::new(HashMap::new()), - listen_sender: RwLock::new(listen_sender), - connect_sender: RwLock::new(connect_sender), - connected_receiver: RwLock::new(connected_receiver), - shutdown_sender: Some(shutdown_sender), - } + ( + Self { + local_pid: participant_id, + participants: RwLock::new(HashMap::new()), + listen_sender: RwLock::new(listen_sender), + connect_sender: RwLock::new(connect_sender), + connected_receiver: RwLock::new(connected_receiver), + shutdown_sender: Some(shutdown_sender), + }, + move || { + trace!(?p, "starting sheduler in own thread"); + let _handle = task::block_on( + scheduler + .run() + .instrument(tracing::info_span!("scheduler", ?p)), + ); + trace!(?p, "stopping sheduler and his own thread"); + }, + ) } /// starts listening on an [`Address`]. @@ -207,12 +234,12 @@ impl Network { /// # Examples /// ```rust /// use futures::executor::block_on; - /// use uvth::ThreadPoolBuilder; /// use veloren_network::{Address, Network, Pid}; /// /// # fn main() -> std::result::Result<(), Box> { /// // Create a Network, listen on port `2000` TCP on all NICs and `2001` UDP locally - /// let network = Network::new(Pid::new(), &ThreadPoolBuilder::new().build(), None); + /// let (network, f) = Network::new(Pid::new(), None); + /// std::thread::spawn(f); /// block_on(async { /// network /// .listen(Address::Tcp("0.0.0.0:2000".parse().unwrap())) @@ -248,13 +275,14 @@ impl Network { /// can't connect, or invalid Handshake) # Examples /// ```rust /// use futures::executor::block_on; - /// use uvth::ThreadPoolBuilder; /// use veloren_network::{Address, Network, Pid}; /// /// # fn main() -> std::result::Result<(), Box> { /// // Create a Network, connect on port `2010` TCP and `2011` UDP like listening above - /// let network = Network::new(Pid::new(), &ThreadPoolBuilder::new().build(), None); - /// # let remote = Network::new(Pid::new(), &ThreadPoolBuilder::new().build(), None); + /// let (network, f) = Network::new(Pid::new(), None); + /// std::thread::spawn(f); + /// # let (remote, fr) = Network::new(Pid::new(), None); + /// # std::thread::spawn(fr); /// block_on(async { /// # remote.listen(Address::Tcp("0.0.0.0:2010".parse().unwrap())).await?; /// # remote.listen(Address::Udp("0.0.0.0:2011".parse().unwrap())).await?; @@ -311,13 +339,14 @@ impl Network { /// # Examples /// ```rust /// use futures::executor::block_on; - /// use uvth::ThreadPoolBuilder; /// use veloren_network::{Address, Network, Pid}; /// /// # fn main() -> std::result::Result<(), Box> { /// // Create a Network, listen on port `2020` TCP and opens returns their Pid - /// let network = Network::new(Pid::new(), &ThreadPoolBuilder::new().build(), None); - /// # let remote = Network::new(Pid::new(), &ThreadPoolBuilder::new().build(), None); + /// let (network, f) = Network::new(Pid::new(), None); + /// std::thread::spawn(f); + /// # let (remote, fr) = Network::new(Pid::new(), None); + /// # std::thread::spawn(fr); /// block_on(async { /// network /// .listen(Address::Tcp("0.0.0.0:2020".parse().unwrap())) @@ -358,16 +387,22 @@ impl Network { /// Except if the remote side already dropped the [`Participant`] /// simultaneously, then messages won't be sended /// + /// There is NO `disconnected` function in `Network`, if a [`Participant`] + /// is no longer reachable (e.g. as the network cable was unplugged) the + /// [`Participant`] will fail all action, but needs to be manually + /// disconected, using this function. + /// /// # Examples /// ```rust /// use futures::executor::block_on; - /// use uvth::ThreadPoolBuilder; /// use veloren_network::{Address, Network, Pid}; /// /// # fn main() -> std::result::Result<(), Box> { /// // Create a Network, listen on port `2030` TCP and opens returns their Pid and close connection. - /// let network = Network::new(Pid::new(), &ThreadPoolBuilder::new().build(), None); - /// # let remote = Network::new(Pid::new(), &ThreadPoolBuilder::new().build(), None); + /// let (network, f) = Network::new(Pid::new(), None); + /// std::thread::spawn(f); + /// # let (remote, fr) = Network::new(Pid::new(), None); + /// # std::thread::spawn(fr); /// block_on(async { /// network /// .listen(Address::Tcp("0.0.0.0:2030".parse().unwrap())) @@ -425,9 +460,12 @@ impl Network { Ok(()) } - /// returns a copy of all current connected [`Participants`] + /// returns a copy of all current connected [`Participants`], + /// including ones, which can't send data anymore as the underlying sockets + /// are closed already but haven't been [`disconnected`] yet. /// /// [`Participants`]: crate::api::Participant + /// [`disconnected`]: Network::disconnect pub async fn participants(&self) -> HashMap> { self.participants.read().await.clone() } @@ -471,13 +509,14 @@ impl Participant { /// # Examples /// ```rust /// use futures::executor::block_on; - /// use uvth::ThreadPoolBuilder; /// use veloren_network::{Address, Network, Pid, PROMISES_CONSISTENCY, PROMISES_ORDERED}; /// /// # fn main() -> std::result::Result<(), Box> { /// // Create a Network, connect on port 2100 and open a stream - /// let network = Network::new(Pid::new(), &ThreadPoolBuilder::new().build(), None); - /// # let remote = Network::new(Pid::new(), &ThreadPoolBuilder::new().build(), None); + /// let (network, f) = Network::new(Pid::new(), None); + /// std::thread::spawn(f); + /// # let (remote, fr) = Network::new(Pid::new(), None); + /// # std::thread::spawn(fr); /// block_on(async { /// # remote.listen(Address::Tcp("0.0.0.0:2100".parse().unwrap())).await?; /// let p1 = network @@ -532,14 +571,15 @@ impl Participant { /// # Examples /// ```rust /// use veloren_network::{Network, Pid, Address, PROMISES_ORDERED, PROMISES_CONSISTENCY}; - /// use uvth::ThreadPoolBuilder; /// use futures::executor::block_on; /// /// # fn main() -> std::result::Result<(), Box> { /// // Create a Network, connect on port 2110 and wait for the other side to open a stream /// // Note: It's quite unusal to activly connect, but then wait on a stream to be connected, usually the Appication taking initiative want's to also create the first Stream. - /// let network = Network::new(Pid::new(), &ThreadPoolBuilder::new().build(), None); - /// # let remote = Network::new(Pid::new(), &ThreadPoolBuilder::new().build(), None); + /// let (network, f) = Network::new(Pid::new(), None); + /// std::thread::spawn(f); + /// # let (remote, fr) = Network::new(Pid::new(), None); + /// # std::thread::spawn(fr); /// block_on(async { /// # remote.listen(Address::Tcp("0.0.0.0:2110".parse().unwrap())).await?; /// let p1 = network.connect(Address::Tcp("127.0.0.1:2110".parse().unwrap())).await?; @@ -581,6 +621,7 @@ impl Participant { } impl Stream { + #[allow(clippy::too_many_arguments)] pub(crate) fn new( pid: Pid, sid: Sid, @@ -632,13 +673,14 @@ impl Stream { /// ``` /// use veloren_network::{Network, Address, Pid}; /// # use veloren_network::{PROMISES_ORDERED, PROMISES_CONSISTENCY}; - /// use uvth::ThreadPoolBuilder; /// use futures::executor::block_on; /// /// # fn main() -> std::result::Result<(), Box> { /// // Create a Network, listen on Port `2200` and wait for a Stream to be opened, then answer `Hello World` - /// let network = Network::new(Pid::new(), &ThreadPoolBuilder::new().build(), None); - /// # let remote = Network::new(Pid::new(), &ThreadPoolBuilder::new().build(), None); + /// let (network, f) = Network::new(Pid::new(), None); + /// std::thread::spawn(f); + /// # let (remote, fr) = Network::new(Pid::new(), None); + /// # std::thread::spawn(fr); /// block_on(async { /// network.listen(Address::Tcp("127.0.0.1:2200".parse().unwrap())).await?; /// # let remote_p = remote.connect(Address::Tcp("127.0.0.1:2200".parse().unwrap())).await?; @@ -671,14 +713,16 @@ impl Stream { /// use veloren_network::{Network, Address, Pid, MessageBuffer}; /// # use veloren_network::{PROMISES_ORDERED, PROMISES_CONSISTENCY}; /// use futures::executor::block_on; - /// use uvth::ThreadPoolBuilder; /// use bincode; /// use std::sync::Arc; /// /// # fn main() -> std::result::Result<(), Box> { - /// let network = Network::new(Pid::new(), &ThreadPoolBuilder::new().build(), None); - /// # let remote1 = Network::new(Pid::new(), &ThreadPoolBuilder::new().build(), None); - /// # let remote2 = Network::new(Pid::new(), &ThreadPoolBuilder::new().build(), None); + /// let (network, f) = Network::new(Pid::new(), None); + /// std::thread::spawn(f); + /// # let (remote1, fr1) = Network::new(Pid::new(), None); + /// # std::thread::spawn(fr1); + /// # let (remote2, fr2) = Network::new(Pid::new(), None); + /// # std::thread::spawn(fr2); /// block_on(async { /// network.listen(Address::Tcp("127.0.0.1:2210".parse().unwrap())).await?; /// # let remote1_p = remote1.connect(Address::Tcp("127.0.0.1:2210".parse().unwrap())).await?; @@ -734,13 +778,14 @@ impl Stream { /// ``` /// use veloren_network::{Network, Address, Pid}; /// # use veloren_network::{PROMISES_ORDERED, PROMISES_CONSISTENCY}; - /// use uvth::ThreadPoolBuilder; /// use futures::executor::block_on; /// /// # fn main() -> std::result::Result<(), Box> { /// // Create a Network, listen on Port `2220` and wait for a Stream to be opened, then listen on it - /// let network = Network::new(Pid::new(), &ThreadPoolBuilder::new().build(), None); - /// # let remote = Network::new(Pid::new(), &ThreadPoolBuilder::new().build(), None); + /// let (network, f) = Network::new(Pid::new(), None); + /// std::thread::spawn(f); + /// # let (remote, fr) = Network::new(Pid::new(), None); + /// # std::thread::spawn(fr); /// block_on(async { /// network.listen(Address::Tcp("127.0.0.1:2220".parse().unwrap())).await?; /// # let remote_p = remote.connect(Address::Tcp("127.0.0.1:2220".parse().unwrap())).await?; @@ -756,7 +801,7 @@ impl Stream { /// ``` #[inline] pub async fn recv(&mut self) -> Result { - Ok(message::deserialize(self.recv_raw().await?)) + Ok(message::deserialize(self.recv_raw().await?)?) } /// the equivalent like [`send_raw`] but for [`recv`], no [`bincode`] is @@ -788,15 +833,12 @@ impl Drop for Network { // `self.participants` as the `disconnect` fn needs it. let mut participant_clone = self.participants().await; for (_, p) in participant_clone.drain() { - match self.disconnect(p).await { - Err(e) => { - error!( - ?e, - "error while dropping network, the error occured when dropping a \ - participant but can't be notified to the user any more" - ); - }, - _ => (), + if let Err(e) = self.disconnect(p).await { + error!( + ?e, + "error while dropping network, the error occured when dropping a \ + participant but can't be notified to the user any more" + ); } } self.participants.write().await.clear(); @@ -936,16 +978,23 @@ impl From for NetworkError { fn from(_err: oneshot::Canceled) -> Self { NetworkError::NetworkClosed } } +impl From> for StreamError { + fn from(err: Box) -> Self { StreamError::DeserializeError(err) } +} + impl core::fmt::Display for StreamError { - fn fmt(&self, f: &mut core::fmt::Formatter) -> core::fmt::Result { + fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { match self { StreamError::StreamClosed => write!(f, "stream closed"), + StreamError::DeserializeError(err) => { + write!(f, "deserialize error on message: {}", err) + }, } } } impl core::fmt::Display for ParticipantError { - fn fmt(&self, f: &mut core::fmt::Formatter) -> core::fmt::Result { + fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { match self { ParticipantError::ParticipantClosed => write!(f, "participant closed"), } @@ -953,7 +1002,7 @@ impl core::fmt::Display for ParticipantError { } impl core::fmt::Display for NetworkError { - fn fmt(&self, f: &mut core::fmt::Formatter) -> core::fmt::Result { + fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { match self { NetworkError::NetworkClosed => write!(f, "network closed"), NetworkError::ListenFailed(_) => write!(f, "listening failed"), @@ -961,6 +1010,22 @@ impl core::fmt::Display for NetworkError { } } +/// implementing PartialEq as it's super convenient in tests +impl core::cmp::PartialEq for StreamError { + fn eq(&self, other: &Self) -> bool { + match self { + StreamError::StreamClosed => match other { + StreamError::StreamClosed => true, + StreamError::DeserializeError(_) => false, + }, + StreamError::DeserializeError(err) => match other { + StreamError::StreamClosed => false, + StreamError::DeserializeError(other_err) => partial_eq_bincode(err, other_err), + }, + } + } +} + impl std::error::Error for StreamError {} impl std::error::Error for ParticipantError {} impl std::error::Error for NetworkError {} diff --git a/network/src/channel.rs b/network/src/channel.rs index fa9729d42a..b62f08938a 100644 --- a/network/src/channel.rs +++ b/network/src/channel.rs @@ -139,6 +139,7 @@ impl Handshake { }, }; + #[allow(clippy::unit_arg)] match res { Ok(res) => { let mut leftover_frames = vec![]; @@ -278,7 +279,7 @@ impl Handshake { STREAM_ID_OFFSET2 }; info!(?pid, "this Handshake is now configured!"); - return Ok((pid, stream_id_offset, secret)); + Ok((pid, stream_id_offset, secret)) }, Some((_, Frame::Shutdown)) => { info!("shutdown signal received"); @@ -286,7 +287,7 @@ impl Handshake { .frames_in_total .with_label_values(&[&pid_string, &cid_string, "Shutdown"]) .inc(); - return Err(()); + Err(()) }, Some((_, Frame::Raw(bytes))) => { self.metrics @@ -297,17 +298,17 @@ impl Handshake { Ok(string) => error!(?string, ERR_S), _ => error!(?bytes, ERR_S), } - return Err(()); + Err(()) }, Some((_, frame)) => { self.metrics .frames_in_total .with_label_values(&[&pid_string, &cid_string, frame.get_string()]) .inc(); - return Err(()); + Err(()) }, - None => return Err(()), - }; + None => Err(()), + } } async fn send_handshake(&self, c2w_frame_s: &mut mpsc::UnboundedSender) { diff --git a/network/src/lib.rs b/network/src/lib.rs index c5ff2e87c4..36568f2dc7 100644 --- a/network/src/lib.rs +++ b/network/src/lib.rs @@ -1,4 +1,6 @@ #![deny(unsafe_code)] +#![cfg_attr(test, deny(rust_2018_idioms))] +#![cfg_attr(test, deny(warnings))] #![feature(try_trait, const_if_match)] //! Crate to handle high level networking of messages with different @@ -38,13 +40,13 @@ //! ```rust //! use async_std::task::sleep; //! use futures::{executor::block_on, join}; -//! use uvth::ThreadPoolBuilder; //! use veloren_network::{Address, Network, Pid, PROMISES_CONSISTENCY, PROMISES_ORDERED}; //! //! // Client //! async fn client() -> std::result::Result<(), Box> { //! sleep(std::time::Duration::from_secs(1)).await; // `connect` MUST be after `listen` -//! let client_network = Network::new(Pid::new(), &ThreadPoolBuilder::new().build(), None); +//! let (client_network, f) = Network::new(Pid::new(), None); +//! std::thread::spawn(f); //! let server = client_network //! .connect(Address::Tcp("127.0.0.1:12345".parse().unwrap())) //! .await?; @@ -57,7 +59,8 @@ //! //! // Server //! async fn server() -> std::result::Result<(), Box> { -//! let server_network = Network::new(Pid::new(), &ThreadPoolBuilder::new().build(), None); +//! let (server_network, f) = Network::new(Pid::new(), None); +//! std::thread::spawn(f); //! server_network //! .listen(Address::Tcp("127.0.0.1:12345".parse().unwrap())) //! .await?; diff --git a/network/src/message.rs b/network/src/message.rs index 56de80910d..e1460eaaab 100644 --- a/network/src/message.rs +++ b/network/src/message.rs @@ -1,8 +1,7 @@ -use bincode; use serde::{de::DeserializeOwned, Serialize}; //use std::collections::VecDeque; use crate::types::{Mid, Sid}; -use std::sync::Arc; +use std::{io, sync::Arc}; //Todo: Evaluate switching to VecDeque for quickly adding and removing data // from front, back. @@ -40,16 +39,69 @@ pub(crate) fn serialize(message: &M) -> MessageBuffer { MessageBuffer { data: writer } } -pub(crate) fn deserialize(buffer: MessageBuffer) -> M { +//pub(crate) fn deserialize(buffer: MessageBuffer) -> +// std::Result> { +pub(crate) fn deserialize(buffer: MessageBuffer) -> bincode::Result { let span = buffer.data; //this might fail if you choose the wrong type for M. in that case probably X // got transfered while you assume Y. probably this means your application // logic is wrong. E.g. You expect a String, but just get a u8. - let decoded: M = bincode::deserialize(span.as_slice()).expect( - "deserialisation failed, this is probably due to a programming error on YOUR side, \ - probably the type send by remote isn't what you are expecting. change the type of `M`", - ); - decoded + bincode::deserialize(span.as_slice()) +} + +///wouldn't trust this aaaassss much, fine for tests +pub(crate) fn partial_eq_io_error(first: &io::Error, second: &io::Error) -> bool { + if let Some(f) = first.raw_os_error() { + if let Some(s) = second.raw_os_error() { + f == s + } else { + false + } + } else { + let fk = first.kind(); + fk == second.kind() && fk != io::ErrorKind::Other + } +} + +pub(crate) fn partial_eq_bincode(first: &bincode::ErrorKind, second: &bincode::ErrorKind) -> bool { + match *first { + bincode::ErrorKind::Io(ref f) => match *second { + bincode::ErrorKind::Io(ref s) => partial_eq_io_error(f, s), + _ => false, + }, + bincode::ErrorKind::InvalidUtf8Encoding(f) => match *second { + bincode::ErrorKind::InvalidUtf8Encoding(s) => f == s, + _ => false, + }, + bincode::ErrorKind::InvalidBoolEncoding(f) => match *second { + bincode::ErrorKind::InvalidBoolEncoding(s) => f == s, + _ => false, + }, + bincode::ErrorKind::InvalidCharEncoding => match *second { + bincode::ErrorKind::InvalidCharEncoding => true, + _ => false, + }, + bincode::ErrorKind::InvalidTagEncoding(f) => match *second { + bincode::ErrorKind::InvalidTagEncoding(s) => f == s, + _ => false, + }, + bincode::ErrorKind::DeserializeAnyNotSupported => match *second { + bincode::ErrorKind::DeserializeAnyNotSupported => true, + _ => false, + }, + bincode::ErrorKind::SizeLimit => match *second { + bincode::ErrorKind::SizeLimit => true, + _ => false, + }, + bincode::ErrorKind::SequenceMustHaveLength => match *second { + bincode::ErrorKind::SequenceMustHaveLength => true, + _ => false, + }, + bincode::ErrorKind::Custom(ref f) => match *second { + bincode::ErrorKind::Custom(ref s) => f == s, + _ => false, + }, + } } impl std::fmt::Debug for MessageBuffer { diff --git a/network/src/participant.rs b/network/src/participant.rs index c3161a248b..0dec87fd70 100644 --- a/network/src/participant.rs +++ b/network/src/participant.rs @@ -42,6 +42,7 @@ struct StreamInfo { } #[derive(Debug)] +#[allow(clippy::type_complexity)] struct ControlChannels { a2b_steam_open_r: mpsc::UnboundedReceiver<(Prio, Promises, oneshot::Sender)>, b2a_stream_opened_s: mpsc::UnboundedSender, @@ -65,6 +66,7 @@ pub struct BParticipant { } impl BParticipant { + #[allow(clippy::type_complexity)] pub(crate) fn new( remote_pid: Pid, offset_sid: Sid, @@ -208,7 +210,14 @@ impl BParticipant { self.running_mgr.fetch_sub(1, Ordering::Relaxed); } - async fn send_frame(&self, frame: Frame, frames_out_total_cache: &mut PidCidFrameCache) { + //retruns false if sending isn't possible. In that case we have to render the + // Participant `closed` + #[must_use = "You need to check if the send was successful and report to client!"] + async fn send_frame( + &self, + frame: Frame, + frames_out_total_cache: &mut PidCidFrameCache, + ) -> bool { // find out ideal channel here //TODO: just take first let mut lock = self.channels.write().await; @@ -232,9 +241,18 @@ impl BParticipant { longer work in the first place" ); }; + //TODO + warn!( + "FIXME: the frame is actually drop. which is fine for now as the participant \ + will be closed, but not if we do channel-takeover" + ); + false + } else { + true } } else { error!("participant has no channel to communicate on"); + false } } @@ -365,6 +383,7 @@ impl BParticipant { self.running_mgr.fetch_sub(1, Ordering::Relaxed); } + #[allow(clippy::type_complexity)] async fn create_channel_mgr( &self, s2b_create_channel_r: mpsc::UnboundedReceiver<( @@ -440,17 +459,22 @@ impl BParticipant { let stream = self .create_stream(sid, prio, promises, a2p_msg_s, &a2b_close_stream_s) .await; - self.send_frame( - Frame::OpenStream { - sid, - prio, - promises, - }, - &mut send_cache, - ) - .await; - p2a_return_stream.send(stream).unwrap(); - stream_ids += Sid::from(1); + if self + .send_frame( + Frame::OpenStream { + sid, + prio, + promises, + }, + &mut send_cache, + ) + .await + { + //On error, we drop this, so it gets closed and client will handle this as an + // Err any way (: + p2a_return_stream.send(stream).unwrap(); + stream_ids += Sid::from(1); + } } trace!("stop open_mgr"); self.running_mgr.fetch_sub(1, Ordering::Relaxed); diff --git a/network/src/prios.rs b/network/src/prios.rs index ed5206246a..dac46270ee 100644 --- a/network/src/prios.rs +++ b/network/src/prios.rs @@ -50,6 +50,7 @@ impl PrioManager { 310419, 356578, 409600, 470507, 540470, 620838, ]; + #[allow(clippy::type_complexity)] pub fn new( metrics: Arc, pid: String, @@ -275,8 +276,9 @@ impl PrioManager { cnt.len -= 1; if cnt.len == 0 { let cnt = self.sid_owned.remove(&sid).unwrap(); - cnt.empty_notify - .map(|empty_notify| empty_notify.send(()).unwrap()); + if let Some(empty_notify) = cnt.empty_notify { + empty_notify.send(()).unwrap(); + } } } else { error!(?msg.mid, "repush message"); diff --git a/network/src/protocols.rs b/network/src/protocols.rs index 8e6043b0b0..9cd0db19cb 100644 --- a/network/src/protocols.rs +++ b/network/src/protocols.rs @@ -104,7 +104,7 @@ impl TcpProtocol { let frame = match frame_no { FRAME_HANDSHAKE => { let mut bytes = [0u8; 19]; - Self::read_except_or_close(cid, &mut stream, &mut bytes, w2c_cid_frame_s).await; + Self::read_except_or_close(cid, &stream, &mut bytes, w2c_cid_frame_s).await; let magic_number = [ bytes[0], bytes[1], bytes[2], bytes[3], bytes[4], bytes[5], bytes[6], ]; @@ -119,7 +119,7 @@ impl TcpProtocol { }, FRAME_INIT => { let mut bytes = [0u8; 16]; - Self::read_except_or_close(cid, &mut stream, &mut bytes, w2c_cid_frame_s).await; + Self::read_except_or_close(cid, &stream, &mut bytes, w2c_cid_frame_s).await; let pid = Pid::from_le_bytes(bytes); stream.read_exact(&mut bytes).await.unwrap(); let secret = u128::from_le_bytes(bytes); @@ -128,7 +128,7 @@ impl TcpProtocol { FRAME_SHUTDOWN => Frame::Shutdown, FRAME_OPEN_STREAM => { let mut bytes = [0u8; 10]; - Self::read_except_or_close(cid, &mut stream, &mut bytes, w2c_cid_frame_s).await; + Self::read_except_or_close(cid, &stream, &mut bytes, w2c_cid_frame_s).await; let sid = Sid::from_le_bytes([ bytes[0], bytes[1], bytes[2], bytes[3], bytes[4], bytes[5], bytes[6], bytes[7], @@ -143,7 +143,7 @@ impl TcpProtocol { }, FRAME_CLOSE_STREAM => { let mut bytes = [0u8; 8]; - Self::read_except_or_close(cid, &mut stream, &mut bytes, w2c_cid_frame_s).await; + Self::read_except_or_close(cid, &stream, &mut bytes, w2c_cid_frame_s).await; let sid = Sid::from_le_bytes([ bytes[0], bytes[1], bytes[2], bytes[3], bytes[4], bytes[5], bytes[6], bytes[7], @@ -152,7 +152,7 @@ impl TcpProtocol { }, FRAME_DATA_HEADER => { let mut bytes = [0u8; 24]; - Self::read_except_or_close(cid, &mut stream, &mut bytes, w2c_cid_frame_s).await; + Self::read_except_or_close(cid, &stream, &mut bytes, w2c_cid_frame_s).await; let mid = Mid::from_le_bytes([ bytes[0], bytes[1], bytes[2], bytes[3], bytes[4], bytes[5], bytes[6], bytes[7], @@ -169,7 +169,7 @@ impl TcpProtocol { }, FRAME_DATA => { let mut bytes = [0u8; 18]; - Self::read_except_or_close(cid, &mut stream, &mut bytes, w2c_cid_frame_s).await; + Self::read_except_or_close(cid, &stream, &mut bytes, w2c_cid_frame_s).await; let mid = Mid::from_le_bytes([ bytes[0], bytes[1], bytes[2], bytes[3], bytes[4], bytes[5], bytes[6], bytes[7], @@ -181,22 +181,22 @@ impl TcpProtocol { let length = u16::from_le_bytes([bytes[16], bytes[17]]); let mut data = vec![0; length as usize]; throughput_cache.inc_by(length as i64); - Self::read_except_or_close(cid, &mut stream, &mut data, w2c_cid_frame_s).await; + Self::read_except_or_close(cid, &stream, &mut data, w2c_cid_frame_s).await; Frame::Data { mid, start, data } }, FRAME_RAW => { let mut bytes = [0u8; 2]; - Self::read_except_or_close(cid, &mut stream, &mut bytes, w2c_cid_frame_s).await; + Self::read_except_or_close(cid, &stream, &mut bytes, w2c_cid_frame_s).await; let length = u16::from_le_bytes([bytes[0], bytes[1]]); let mut data = vec![0; length as usize]; - Self::read_except_or_close(cid, &mut stream, &mut data, w2c_cid_frame_s).await; + Self::read_except_or_close(cid, &stream, &mut data, w2c_cid_frame_s).await; Frame::Raw(data) }, _ => { // report a RAW frame, but cannot rely on the next 2 bytes to be a size. // guessing 256 bytes, which might help to sort down issues let mut data = vec![0; 256]; - Self::read_except_or_close(cid, &mut stream, &mut data, w2c_cid_frame_s).await; + Self::read_except_or_close(cid, &stream, &mut data, w2c_cid_frame_s).await; Frame::Raw(data) }, }; @@ -683,9 +683,7 @@ impl UdpProtocol { let x = (data.len() as u16).to_le_bytes(); buffer[17] = x[0]; buffer[18] = x[1]; - for i in 0..data.len() { - buffer[19 + i] = data[i]; - } + buffer[19..(data.len() + 19)].clone_from_slice(&data[..]); throughput_cache.inc_by(data.len() as i64); 19 + data.len() }, @@ -695,9 +693,7 @@ impl UdpProtocol { let x = (data.len() as u16).to_le_bytes(); buffer[1] = x[0]; buffer[2] = x[1]; - for i in 0..data.len() { - buffer[3 + i] = data[i]; - } + buffer[3..(data.len() + 3)].clone_from_slice(&data[..]); 3 + data.len() }, }; diff --git a/network/src/scheduler.rs b/network/src/scheduler.rs index 1483388656..7179a8491d 100644 --- a/network/src/scheduler.rs +++ b/network/src/scheduler.rs @@ -31,6 +31,7 @@ use tracing::*; use tracing_futures::Instrument; #[derive(Debug)] +#[allow(clippy::type_complexity)] struct ParticipantInfo { secret: u128, s2b_create_channel_s: @@ -78,6 +79,7 @@ pub struct Scheduler { } impl Scheduler { + #[allow(clippy::type_complexity)] pub fn new( local_pid: Pid, registry: Option<&Registry>, @@ -159,7 +161,7 @@ impl Scheduler { trace!("start listen_mgr"); a2s_listen_r .for_each_concurrent(None, |(address, s2a_listen_result_s)| { - let address = address.clone(); + let address = address; async move { debug!(?address, "got request to open a channel_creator"); @@ -397,13 +399,16 @@ impl Scheduler { } { let mut datavec = Vec::with_capacity(size); datavec.extend_from_slice(&data[0..size]); + //Due to the async nature i cannot make of .entry() as it would lead to a still + // borrowed in another branch situation + #[allow(clippy::map_entry)] if !listeners.contains_key(&remote_addr) { info!("Accepting Udp from: {}", &remote_addr); let (udp_data_sender, udp_data_receiver) = mpsc::unbounded::>(); - listeners.insert(remote_addr.clone(), udp_data_sender); + listeners.insert(remote_addr, udp_data_sender); let protocol = UdpProtocol::new( socket.clone(), - remote_addr.clone(), + remote_addr, self.metrics.clone(), udp_data_receiver, ); diff --git a/network/src/types.rs b/network/src/types.rs index 2d7855b7bc..fc0fb8c698 100644 --- a/network/src/types.rs +++ b/network/src/types.rs @@ -142,11 +142,10 @@ impl Pid { /// /// # Example /// ```rust - /// use uvth::ThreadPoolBuilder; /// use veloren_network::{Network, Pid}; /// /// let pid = Pid::new(); - /// let _network = Network::new(pid, &ThreadPoolBuilder::new().build(), None); + /// let _ = Network::new(pid, None); /// ``` pub fn new() -> Self { Self { @@ -196,28 +195,20 @@ impl std::fmt::Debug for Pid { write!( f, "{}", - sixlet_to_str((self.internal >> i * BITS_PER_SIXLET) & 0x3F) + sixlet_to_str((self.internal >> (i * BITS_PER_SIXLET)) & 0x3F) )?; } Ok(()) } } +impl Default for Pid { + fn default() -> Self { Pid::new() } +} + impl std::fmt::Display for Pid { #[inline] - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - const BITS_PER_SIXLET: usize = 6; - //only print last 6 chars of number as full u128 logs are unreadable - const CHAR_COUNT: usize = 6; - for i in 0..CHAR_COUNT { - write!( - f, - "{}", - sixlet_to_str((self.internal >> i * BITS_PER_SIXLET) & 0x3F) - )?; - } - Ok(()) - } + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { write!(f, "{:?}", self) } } impl std::ops::AddAssign for Sid { diff --git a/network/tests/helper.rs b/network/tests/helper.rs index 3970601aba..f043074e8e 100644 --- a/network/tests/helper.rs +++ b/network/tests/helper.rs @@ -10,7 +10,6 @@ use std::{ }; use tracing::*; use tracing_subscriber::EnvFilter; -use uvth::ThreadPoolBuilder; use veloren_network::{Address, Network, Participant, Pid, Stream, PROMISES_NONE}; #[allow(dead_code)] @@ -60,9 +59,10 @@ pub async fn network_participant_stream( Arc, Stream, ) { - let pool = ThreadPoolBuilder::new().num_threads(2).build(); - let n_a = Network::new(Pid::fake(1), &pool, None); - let n_b = Network::new(Pid::fake(2), &pool, None); + let (n_a, f_a) = Network::new(Pid::fake(1), None); + std::thread::spawn(f_a); + let (n_b, f_b) = Network::new(Pid::fake(2), None); + std::thread::spawn(f_b); n_a.listen(addr.clone()).await.unwrap(); let p1_b = n_b.connect(addr).await.unwrap(); diff --git a/network/tests/integration.rs b/network/tests/integration.rs index 2514dd32bb..fe40810eb3 100644 --- a/network/tests/integration.rs +++ b/network/tests/integration.rs @@ -1,10 +1,9 @@ use async_std::task; use task::block_on; -use veloren_network::NetworkError; +use veloren_network::{NetworkError, StreamError}; mod helper; use helper::{network_participant_stream, tcp, udp}; use std::io::ErrorKind; -use uvth::ThreadPoolBuilder; use veloren_network::{Address, Network, Pid, PROMISES_CONSISTENCY, PROMISES_ORDERED}; #[test] @@ -63,8 +62,10 @@ fn stream_simple_udp_3msg() { #[ignore] fn tcp_and_udp_2_connections() -> std::result::Result<(), Box> { let (_, _) = helper::setup(false, 0); - let network = Network::new(Pid::new(), &ThreadPoolBuilder::new().build(), None); - let remote = Network::new(Pid::new(), &ThreadPoolBuilder::new().build(), None); + let (network, f) = Network::new(Pid::new(), None); + let (remote, fr) = Network::new(Pid::new(), None); + std::thread::spawn(f); + std::thread::spawn(fr); block_on(async { remote .listen(Address::Tcp("0.0.0.0:2000".parse().unwrap())) @@ -86,14 +87,16 @@ fn tcp_and_udp_2_connections() -> std::result::Result<(), Box std::result::Result<(), Box> { let (_, _) = helper::setup(false, 0); - let network = Network::new(Pid::new(), &ThreadPoolBuilder::new().build(), None); + let (network, f) = Network::new(Pid::new(), None); + std::thread::spawn(f); let udp1 = udp(); let tcp1 = tcp(); block_on(network.listen(udp1.clone()))?; block_on(network.listen(tcp1.clone()))?; std::thread::sleep(std::time::Duration::from_millis(200)); - let network2 = Network::new(Pid::new(), &ThreadPoolBuilder::new().build(), None); + let (network2, f2) = Network::new(Pid::new(), None); + std::thread::spawn(f2); let e1 = block_on(network2.listen(udp1)); let e2 = block_on(network2.listen(tcp1)); match e1 { @@ -117,8 +120,10 @@ fn api_stream_send_main() -> std::result::Result<(), Box> let (_, _) = helper::setup(false, 0); // Create a Network, listen on Port `1200` and wait for a Stream to be opened, // then answer `Hello World` - let network = Network::new(Pid::new(), &ThreadPoolBuilder::new().build(), None); - let remote = Network::new(Pid::new(), &ThreadPoolBuilder::new().build(), None); + let (network, f) = Network::new(Pid::new(), None); + let (remote, fr) = Network::new(Pid::new(), None); + std::thread::spawn(f); + std::thread::spawn(fr); block_on(async { network .listen(Address::Tcp("127.0.0.1:1200".parse().unwrap())) @@ -143,8 +148,10 @@ fn api_stream_recv_main() -> std::result::Result<(), Box> let (_, _) = helper::setup(false, 0); // Create a Network, listen on Port `1220` and wait for a Stream to be opened, // then listen on it - let network = Network::new(Pid::new(), &ThreadPoolBuilder::new().build(), None); - let remote = Network::new(Pid::new(), &ThreadPoolBuilder::new().build(), None); + let (network, f) = Network::new(Pid::new(), None); + let (remote, fr) = Network::new(Pid::new(), None); + std::thread::spawn(f); + std::thread::spawn(fr); block_on(async { network .listen(Address::Tcp("127.0.0.1:1220".parse().unwrap())) @@ -165,11 +172,13 @@ fn api_stream_recv_main() -> std::result::Result<(), Box> } #[test] -#[should_panic] fn wrong_parse() { let (_, _) = helper::setup(false, 0); let (_n_a, _, mut s1_a, _n_b, _, mut s1_b) = block_on(network_participant_stream(tcp())); s1_a.send(1337).unwrap(); - assert_eq!(block_on(s1_b.recv()), Ok("Hello World".to_string())); + match block_on(s1_b.recv::()) { + Err(StreamError::DeserializeError(_)) => assert!(true), + _ => assert!(false, "this should fail, but it doesnt!"), + } }