config log && run server

This commit is contained in:
appflowy 2021-08-19 14:08:24 +08:00
parent 7adb75e85d
commit eac2e562c3
26 changed files with 228 additions and 187 deletions

View File

@ -1,5 +1,5 @@
[package]
name = "server"
name = "backend"
version = "0.1.0"
edition = "2018"
@ -22,9 +22,12 @@ serde_json = "1.0"
serde = { version = "1.0", features = ["derive"] }
serde_repr = "0.1"
flowy-log = { path = "../rust-lib/flowy-log" }
[lib]
path = "src/lib.rs"
[[bin]]
name = "flowy_server"
name = "backend"
path = "src/main.rs"

View File

@ -1,4 +1,4 @@
use crate::{config::Config, ws::WSServer};
use crate::{config::Config, ws_service::WSServer};
use actix::Addr;
use std::sync::Arc;

View File

@ -3,4 +3,4 @@ mod context;
mod errors;
mod routers;
pub mod startup;
mod ws;
pub mod ws_service;

View File

@ -1,4 +1,4 @@
use server::startup::{init_app_context, run};
use backend::startup::{init_app_context, run};
use std::net::TcpListener;
#[actix_web::main]

30
backend/src/routers/ws.rs Normal file
View File

@ -0,0 +1,30 @@
use crate::ws_service::{entities::SessionId, WSClient, WSServer};
use actix::Addr;
use actix_http::{body::Body, Response};
use actix_web::{
get,
web::{Data, Path, Payload},
Error,
HttpRequest,
HttpResponse,
};
use actix_web_actors::ws;
#[get("/{token}")]
pub async fn start_connection(
request: HttpRequest,
payload: Payload,
Path(token): Path<String>,
server: Data<Addr<WSServer>>,
) -> Result<HttpResponse, Error> {
let client = WSClient::new(SessionId::new(token), server.get_ref().clone());
let result = ws::start(client, &request, payload);
match result {
Ok(response) => Ok(response.into()),
Err(e) => {
log::error!("ws connection error: {:?}", e);
Err(e)
},
}
}

View File

@ -1,4 +1,4 @@
use crate::{context::AppContext, routers::*, ws::WSServer};
use crate::{context::AppContext, routers::*, ws_service::WSServer};
use actix::Actor;
use actix_web::{dev::Server, middleware, web, App, HttpServer, Scope};
use std::{net::TcpListener, sync::Arc};
@ -19,6 +19,12 @@ pub fn run(app_ctx: Arc<AppContext>, listener: TcpListener) -> Result<Server, st
fn ws_scope() -> Scope { web::scope("/ws").service(ws::start_connection) }
pub async fn init_app_context() -> Arc<AppContext> {
let _ = flowy_log::Builder::new("flowy").env_filter("Debug").build();
// std::env::set_var("RUST_LOG", "info");
// env_logger::init();
// log::debug!("EnvTask initialization");
let ws_server = WSServer::new().start();
let ctx = AppContext::new(ws_server);
Arc::new(ctx)

View File

@ -1,13 +1,29 @@
use crate::{errors::ServerError, ws::Packet};
use crate::{errors::ServerError, ws_service::ClientMessage};
use actix::{Message, Recipient};
use serde::{Deserialize, Serialize};
use std::fmt::Formatter;
pub type Socket = Recipient<ClientMessage>;
#[derive(Serialize, Deserialize, Debug, Clone, Hash, PartialEq, Eq)]
pub struct SessionId {
pub id: String,
}
pub struct Session {
pub id: SessionId,
pub socket: Socket,
}
impl std::convert::From<Connect> for Session {
fn from(c: Connect) -> Self {
Self {
id: c.sid,
socket: c.socket,
}
}
}
impl SessionId {
pub fn new(id: String) -> Self { SessionId { id } }
}
@ -22,7 +38,7 @@ impl std::fmt::Display for SessionId {
#[derive(Debug, Message, Clone)]
#[rtype(result = "Result<(), ServerError>")]
pub struct Connect {
pub socket: Recipient<Packet>,
pub socket: Socket,
pub sid: SessionId,
}

View File

@ -0,0 +1,37 @@
use crate::ws_service::entities::SessionId;
use actix::Message;
use bytes::Bytes;
use std::fmt::Formatter;
#[derive(Debug, Clone)]
pub enum MessageData {
Text(String),
Binary(Bytes),
Connect(SessionId),
Disconnect(String),
}
#[derive(Debug, Message, Clone)]
#[rtype(result = "()")]
pub struct ClientMessage {
pub sid: SessionId,
pub data: MessageData,
}
impl ClientMessage {
pub fn new(sid: SessionId, data: MessageData) -> Self { ClientMessage { sid, data } }
}
impl std::fmt::Display for ClientMessage {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
let content = match &self.data {
MessageData::Text(t) => format!("[Text]: {}", t),
MessageData::Binary(_) => "[Binary message]".to_owned(),
MessageData::Connect(_) => "Connect".to_owned(),
MessageData::Disconnect(_) => "Disconnect".to_owned(),
};
let desc = format!("{}:{}", &self.sid, content);
f.write_str(&desc)
}
}

View File

@ -0,0 +1,5 @@
pub use connect::*;
pub use message::*;
mod connect;
pub mod message;

View File

@ -0,0 +1,7 @@
pub use entities::message::*;
pub use ws_client::*;
pub use ws_server::*;
pub(crate) mod entities;
mod ws_client;
mod ws_server;

View File

@ -1,9 +1,9 @@
use crate::{
config::{HEARTBEAT_INTERVAL, PING_TIMEOUT},
ws::{
ws_service::{
entities::{Connect, Disconnect, SessionId},
Frame,
Packet,
ClientMessage,
MessageData,
WSServer,
},
};
@ -16,6 +16,7 @@ use actix::{
AsyncContext,
ContextFutureSpawner,
Handler,
Recipient,
Running,
StreamHandler,
WrapFuture,
@ -24,13 +25,13 @@ use actix::{
use actix_web_actors::{ws, ws::Message::Text};
use std::time::Instant;
pub struct WSSession {
pub struct WSClient {
sid: SessionId,
server: Addr<WSServer>,
hb: Instant,
}
impl WSSession {
impl WSClient {
pub fn new(sid: SessionId, server: Addr<WSServer>) -> Self {
Self {
sid,
@ -52,7 +53,16 @@ impl WSSession {
});
}
fn connect(&self, ctx: &mut ws::WebsocketContext<Self>) {
fn send(&self, data: MessageData) {
let msg = ClientMessage::new(self.sid.clone(), data);
self.server.do_send(msg);
}
}
impl Actor for WSClient {
type Context = ws::WebsocketContext<Self>;
fn started(&mut self, ctx: &mut Self::Context) {
self.hb(ctx);
let socket = ctx.address().recipient();
let connect = Connect {
@ -75,17 +85,6 @@ impl WSSession {
.wait(ctx);
}
fn send(&self, frame: Frame) {
let msg = Packet::new(self.sid.clone(), frame);
self.server.do_send(msg);
}
}
impl Actor for WSSession {
type Context = ws::WebsocketContext<Self>;
fn started(&mut self, ctx: &mut Self::Context) { self.connect(ctx); }
fn stopping(&mut self, _: &mut Self::Context) -> Running {
self.server.do_send(Disconnect {
sid: self.sid.clone(),
@ -95,7 +94,7 @@ impl Actor for WSSession {
}
}
impl StreamHandler<Result<ws::Message, ws::ProtocolError>> for WSSession {
impl StreamHandler<Result<ws::Message, ws::ProtocolError>> for WSClient {
fn handle(&mut self, msg: Result<ws::Message, ws::ProtocolError>, ctx: &mut Self::Context) {
match msg {
Ok(ws::Message::Ping(msg)) => {
@ -105,12 +104,12 @@ impl StreamHandler<Result<ws::Message, ws::ProtocolError>> for WSSession {
},
Ok(ws::Message::Pong(msg)) => {
log::debug!("Receive {} pong {:?}", &self.sid, &msg);
self.send(Frame::Connect(self.sid.clone()));
self.send(MessageData::Connect(self.sid.clone()));
self.hb = Instant::now();
},
Ok(ws::Message::Binary(bin)) => {
log::debug!(" Receive {} binary", &self.sid);
self.send(Frame::Binary(bin));
self.send(MessageData::Binary(bin));
},
Ok(ws::Message::Close(reason)) => {
log::debug!("Receive {} close {:?}", &self.sid, &reason);
@ -125,7 +124,7 @@ impl StreamHandler<Result<ws::Message, ws::ProtocolError>> for WSSession {
},
Ok(Text(s)) => {
log::debug!("Receive {} text {:?}", &self.sid, &s);
self.send(Frame::Text(s));
self.send(MessageData::Text(s));
},
Err(e) => {
@ -138,22 +137,22 @@ impl StreamHandler<Result<ws::Message, ws::ProtocolError>> for WSSession {
}
}
impl Handler<Packet> for WSSession {
impl Handler<ClientMessage> for WSClient {
type Result = ();
fn handle(&mut self, msg: Packet, ctx: &mut Self::Context) {
match msg.frame {
Frame::Text(text) => {
fn handle(&mut self, msg: ClientMessage, ctx: &mut Self::Context) {
match msg.data {
MessageData::Text(text) => {
ctx.text(text);
},
Frame::Binary(binary) => {
MessageData::Binary(binary) => {
ctx.binary(binary);
},
Frame::Connect(sid) => {
MessageData::Connect(sid) => {
let connect_msg = format!("{} connect", &sid);
ctx.text(connect_msg);
},
Frame::Disconnect(text) => {
MessageData::Disconnect(text) => {
log::debug!("Session start disconnecting {}", self.sid);
ctx.text(text);
ctx.stop();

View File

@ -0,0 +1,59 @@
use crate::{
errors::ServerError,
ws_service::{
entities::{Connect, Disconnect, Session, SessionId},
ClientMessage,
WSClient,
},
};
use actix::{Actor, Context, Handler};
use dashmap::DashMap;
pub struct WSServer {
sessions: DashMap<SessionId, Session>,
}
impl WSServer {
pub fn new() -> Self {
Self {
sessions: DashMap::new(),
}
}
pub fn send(&self, _msg: ClientMessage) { unimplemented!() }
}
impl Actor for WSServer {
type Context = Context<Self>;
fn started(&mut self, _ctx: &mut Self::Context) {}
}
impl Handler<Connect> for WSServer {
type Result = Result<(), ServerError>;
fn handle(&mut self, msg: Connect, _ctx: &mut Context<Self>) -> Self::Result {
let session: Session = msg.into();
self.sessions.insert(session.id.clone(), session);
Ok(())
}
}
impl Handler<Disconnect> for WSServer {
type Result = Result<(), ServerError>;
fn handle(&mut self, msg: Disconnect, _: &mut Context<Self>) -> Self::Result {
self.sessions.remove(&msg.sid);
Ok(())
}
}
impl Handler<ClientMessage> for WSServer {
type Result = ();
fn handle(&mut self, msg: ClientMessage, _ctx: &mut Context<Self>) -> Self::Result {}
}
impl actix::Supervised for WSServer {
fn restarting(&mut self, _ctx: &mut Context<WSServer>) {
log::warn!("restarting");
}
}

View File

@ -6,12 +6,13 @@ edition = "2018"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
tracing = { version = "0.1" }
tracing-log = { version = "0.1.1"}
tracing-futures = "0.2.4"
tracing-subscriber = { version = "0.2.12", features = ["registry", "env-filter", "ansi", "json"] }
tracing-bunyan-formatter = "0.2.2"
tracing-appender = "0.1"
tracing = { version = "0.1", features = ["log"] }
log = "0.4.14"
[features]

View File

@ -6,20 +6,16 @@ use tracing_bunyan_formatter::{BunyanFormattingLayer, JsonStorageLayer};
use tracing_log::LogTracer;
use tracing_subscriber::{layer::SubscriberExt, EnvFilter};
pub struct FlowyLogBuilder {
pub struct Builder {
name: String,
env_filter: String,
directory: String,
}
impl FlowyLogBuilder {
pub fn new(name: &str, directory: impl AsRef<Path>) -> Self {
let directory = directory.as_ref().to_str().unwrap().to_owned();
FlowyLogBuilder {
impl Builder {
pub fn new(name: &str) -> Self {
Builder {
name: name.to_owned(),
env_filter: "Info".to_owned(),
directory,
}
}
@ -28,6 +24,15 @@ impl FlowyLogBuilder {
self
}
pub fn local(mut self, directory: impl AsRef<Path>) -> Self {
let directory = directory.as_ref().to_str().unwrap().to_owned();
let local_file_name = format!("{}.log", &self.name);
let file_appender = tracing_appender::rolling::daily(directory, local_file_name);
let (_non_blocking, _guard) = tracing_appender::non_blocking(file_appender);
self
}
pub fn build(self) -> std::result::Result<(), String> {
let env_filter = EnvFilter::new(self.env_filter);
@ -44,17 +49,12 @@ impl FlowyLogBuilder {
if cfg!(feature = "use_bunyan") {
let formatting_layer = BunyanFormattingLayer::new(self.name.clone(), std::io::stdout);
let local_file_name = format!("{}.log", &self.name);
let file_appender =
tracing_appender::rolling::daily(self.directory.clone(), local_file_name);
let (_non_blocking, _guard) = tracing_appender::non_blocking(file_appender);
let _ = set_global_default(subscriber.with(JsonStorageLayer).with(formatting_layer))
.map_err(|e| format!("{:?}", e))?;
} else {
let _ = set_global_default(subscriber).map_err(|e| format!("{:?}", e))?;
}
let _ = LogTracer::builder()
.with_max_level(LevelFilter::Trace)
.init()
@ -65,12 +65,6 @@ impl FlowyLogBuilder {
}
}
pub fn init_log(name: &str, directory: &str, env_filter: &str) -> std::result::Result<(), String> {
FlowyLogBuilder::new(name, directory)
.env_filter(env_filter)
.build()
}
#[cfg(test)]
mod tests {
use super::*;
@ -81,9 +75,10 @@ mod tests {
y: f32,
}
// run cargo test --features="use_bunyan" or cargo test
#[test]
fn test_log() {
init_log("flowy", ".", "Debug").unwrap();
let _ = Builder::new("flowy").env_filter("debug").build();
tracing::info!("😁 Tracing info log");
let pos = Position {
@ -93,5 +88,12 @@ mod tests {
tracing::debug!(?pos.x, ?pos.y);
log::debug!("😁 bridge 'log' to 'tracing'");
say("hello world");
}
#[tracing::instrument(name = "say")]
fn say(s: &str) {
log::info!("{}", s);
}
}

View File

@ -42,7 +42,11 @@ impl FlowySDK {
fn init_log(directory: &str) {
if !INIT_LOG.load(Ordering::SeqCst) {
INIT_LOG.store(true, Ordering::SeqCst);
flowy_log::init_log("flowy", directory, "Debug").unwrap();
let _ = flowy_log::Builder::new("flowy")
.local(directory)
.env_filter("Debug")
.build();
}
}

View File

@ -1,22 +0,0 @@
use crate::ws::{entities::SessionId, WSServer, WSSession};
use actix::Addr;
use actix_web::{
get,
web::{Data, Path, Payload},
Error,
HttpRequest,
HttpResponse,
};
use actix_web_actors::ws;
#[get("/{token}")]
pub async fn start_connection(
request: HttpRequest,
payload: Payload,
Path(token): Path<String>,
server: Data<Addr<WSServer>>,
) -> Result<HttpResponse, Error> {
let ws = WSSession::new(SessionId::new(token), server.get_ref().clone());
let response = ws::start(ws, &request, payload)?;
Ok(response.into())
}

View File

@ -1,5 +0,0 @@
pub use connect::*;
pub use packet::*;
mod connect;
pub mod packet;

View File

@ -1,37 +0,0 @@
use crate::ws::entities::SessionId;
use actix::Message;
use bytes::Bytes;
use std::fmt::Formatter;
#[derive(Debug, Clone)]
pub enum Frame {
Text(String),
Binary(Bytes),
Connect(SessionId),
Disconnect(String),
}
#[derive(Debug, Message, Clone)]
#[rtype(result = "()")]
pub struct Packet {
pub sid: SessionId,
pub frame: Frame,
}
impl Packet {
pub fn new(sid: SessionId, frame: Frame) -> Self { Packet { sid, frame } }
}
impl std::fmt::Display for Packet {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
let content = match &self.frame {
Frame::Text(t) => format!("[Text]: {}", t),
Frame::Binary(_) => "[Binary message]".to_owned(),
Frame::Connect(_) => "Connect".to_owned(),
Frame::Disconnect(_) => "Disconnect".to_owned(),
};
let desc = format!("{}:{}", &self.sid, content);
f.write_str(&desc)
}
}

View File

@ -1,7 +0,0 @@
pub use entities::packet::*;
pub use ws_server::*;
pub use ws_session::*;
pub(crate) mod entities;
mod ws_server;
mod ws_session;

View File

@ -1,57 +0,0 @@
use crate::{
errors::ServerError,
ws::{
entities::{Connect, Disconnect, SessionId},
Packet,
WSSession,
},
};
use actix::{Actor, Context, Handler};
use dashmap::DashMap;
pub struct WSServer {
session_map: DashMap<SessionId, WSSession>,
}
impl WSServer {
pub fn new() -> Self {
Self {
session_map: DashMap::new(),
}
}
pub fn send(&self, _packet: Packet) { unimplemented!() }
}
impl Actor for WSServer {
type Context = Context<Self>;
fn started(&mut self, _ctx: &mut Self::Context) {}
}
impl Handler<Connect> for WSServer {
type Result = Result<(), ServerError>;
fn handle(&mut self, _msg: Connect, _ctx: &mut Context<Self>) -> Self::Result {
unimplemented!()
}
}
impl Handler<Disconnect> for WSServer {
type Result = Result<(), ServerError>;
fn handle(&mut self, _msg: Disconnect, _: &mut Context<Self>) -> Self::Result {
unimplemented!()
}
}
impl Handler<Packet> for WSServer {
type Result = ();
fn handle(&mut self, _packet: Packet, _ctx: &mut Context<Self>) -> Self::Result {
unimplemented!()
}
}
impl actix::Supervised for WSServer {
fn restarting(&mut self, _ctx: &mut Context<WSServer>) {
log::warn!("restarting");
}
}