2021-04-30 04:13:34 +00:00
# include <chrono>
# include <thread>
# include <QtConcurrent>
# include <QDateTime>
2021-06-13 10:58:15 +00:00
# include <obs-module.h>
# include <obs-frontend-api.h>
2021-05-03 20:31:22 +00:00
# include "obs-websocket.h"
2021-04-30 04:13:34 +00:00
# include "WebSocketServer.h"
# include "WebSocketProtocol.h"
# include "Config.h"
2021-05-14 08:13:27 +00:00
# include "eventhandler/types/EventSubscription.h"
2021-04-30 04:13:34 +00:00
# include "plugin-macros.generated.h"
// Constructs the server: silences websocketpp logging, initializes asio and
// registers the open/close/message handlers. Listening is not started here.
WebSocketServer::WebSocketServer() :
	QObject(nullptr),
	_sessions()
{
	namespace wslog = websocketpp::log;

	// Start with every access/error log channel disabled.
	_server.get_alog().clear_channels(wslog::alevel::all);
	_server.get_elog().clear_channels(wslog::elevel::all);

	_server.init_asio();

#ifndef _WIN32
	_server.set_reuse_addr(true);
#endif

	// Route websocketpp callbacks to the matching member functions.
	_server.set_open_handler([this](websocketpp::connection_hdl hdl) {
		onOpen(hdl);
	});
	_server.set_close_handler([this](websocketpp::connection_hdl hdl) {
		onClose(hdl);
	});
	_server.set_message_handler(
		[this](websocketpp::connection_hdl hdl, websocketpp::server<websocketpp::config::asio>::message_ptr message) {
			onMessage(hdl, message);
		});
}
// Stops the server on destruction if it is still listening.
WebSocketServer::~WebSocketServer()
{
	if (!_server.is_listening())
		return;

	Stop();
}
void WebSocketServer : : ServerRunner ( )
{
blog ( LOG_INFO , " [WebSocketServer::ServerRunner] IO thread started. " ) ;
try {
_server . run ( ) ;
} catch ( websocketpp : : exception const & e ) {
blog ( LOG_ERROR , " [WebSocketServer::ServerRunner] websocketpp instance returned an error: %s " , e . what ( ) ) ;
} catch ( const std : : exception & e ) {
blog ( LOG_ERROR , " [WebSocketServer::ServerRunner] websocketpp instance returned an error: %s " , e . what ( ) ) ;
} catch ( . . . ) {
blog ( LOG_ERROR , " [WebSocketServer::ServerRunner] websocketpp instance returned an error " ) ;
}
blog ( LOG_INFO , " [WebSocketServer::ServerRunner] IO thread exited. " ) ;
}
void WebSocketServer : : Start ( )
{
if ( _server . is_listening ( ) ) {
blog ( LOG_WARNING , " [WebSocketServer::Start] Call to Start() but the server is already listening. " ) ;
return ;
}
auto conf = GetConfig ( ) ;
if ( ! conf ) {
blog ( LOG_ERROR , " [WebSocketServer::Start] Unable to retreive config! " ) ;
return ;
}
_serverPort = conf - > ServerPort ;
_serverPassword = conf - > ServerPassword ;
_debugEnabled = conf - > DebugEnabled ;
AuthenticationRequired = conf - > AuthRequired ;
AuthenticationSalt = Utils : : Crypto : : GenerateSalt ( ) ;
AuthenticationSecret = Utils : : Crypto : : GenerateSecret ( conf - > ServerPassword . toStdString ( ) , AuthenticationSalt ) ;
// Set log levels if debug is enabled
if ( _debugEnabled ) {
_server . get_alog ( ) . set_channels ( websocketpp : : log : : alevel : : all ) ;
_server . get_alog ( ) . clear_channels ( websocketpp : : log : : alevel : : frame_header | websocketpp : : log : : alevel : : frame_payload | websocketpp : : log : : alevel : : control ) ;
_server . get_elog ( ) . set_channels ( websocketpp : : log : : elevel : : all ) ;
_server . get_alog ( ) . clear_channels ( websocketpp : : log : : elevel : : devel | websocketpp : : log : : elevel : : library ) ;
} else {
_server . get_alog ( ) . clear_channels ( websocketpp : : log : : alevel : : all ) ;
_server . get_elog ( ) . clear_channels ( websocketpp : : log : : elevel : : all ) ;
}
_server . reset ( ) ;
websocketpp : : lib : : error_code errorCode ;
_server . listen ( websocketpp : : lib : : asio : : ip : : tcp : : v4 ( ) , _serverPort , errorCode ) ;
if ( errorCode ) {
std : : string errorCodeMessage = errorCode . message ( ) ;
blog ( LOG_INFO , " [WebSocketServer::Start] Listen failed: %s " , errorCodeMessage . c_str ( ) ) ;
return ;
}
_server . start_accept ( ) ;
_serverThread = std : : thread ( & WebSocketServer : : ServerRunner , this ) ;
blog ( LOG_INFO , " [WebSocketServer::Start] Server started successfully on port %d. Possible connect address: %s " , _serverPort , Utils : : Platform : : GetLocalAddress ( ) . c_str ( ) ) ;
}
void WebSocketServer : : Stop ( )
{
if ( ! _server . is_listening ( ) ) {
blog ( LOG_WARNING , " [WebSocketServer::Stop] Call to Stop() but the server is not listening. " ) ;
return ;
}
_server . stop_listening ( ) ;
std : : unique_lock < std : : mutex > lock ( _sessionMutex ) ;
for ( auto const & [ hdl , session ] : _sessions ) {
websocketpp : : lib : : error_code errorCode ;
_server . pause_reading ( hdl , errorCode ) ;
if ( errorCode ) {
blog ( LOG_INFO , " [WebSocketServer::Stop] Error: %s " , errorCode . message ( ) . c_str ( ) ) ;
continue ;
}
_server . close ( hdl , websocketpp : : close : : status : : going_away , " Server stopping. " , errorCode ) ;
if ( errorCode ) {
blog ( LOG_INFO , " [WebSocketServer::Stop] Error: %s " , errorCode . message ( ) . c_str ( ) ) ;
continue ;
}
}
lock . unlock ( ) ;
_threadPool . waitForDone ( ) ;
// This can delay the thread that it is running on. Bad but kinda required.
while ( _sessions . size ( ) > 0 ) {
std : : this_thread : : sleep_for ( std : : chrono : : milliseconds ( 10 ) ) ;
}
_serverThread . join ( ) ;
blog ( LOG_INFO , " [WebSocketServer::Stop] Server stopped successfully " ) ;
}
void WebSocketServer : : InvalidateSession ( websocketpp : : connection_hdl hdl )
{
blog ( LOG_INFO , " [WebSocketServer::InvalidateSession] Invalidating a session. " ) ;
websocketpp : : lib : : error_code errorCode ;
_server . pause_reading ( hdl , errorCode ) ;
if ( errorCode ) {
blog ( LOG_INFO , " [WebSocketServer::InvalidateSession] Error: %s " , errorCode . message ( ) . c_str ( ) ) ;
return ;
}
_server . close ( hdl , WebSocketCloseCode : : SessionInvalidated , " Your session has been invalidated. " , errorCode ) ;
if ( errorCode ) {
blog ( LOG_INFO , " [WebSocketServer::InvalidateSession] Error: %s " , errorCode . message ( ) . c_str ( ) ) ;
return ;
}
}
std : : vector < WebSocketServer : : WebSocketSessionState > WebSocketServer : : GetWebSocketSessions ( )
{
std : : vector < WebSocketServer : : WebSocketSessionState > webSocketSessions ;
std : : unique_lock < std : : mutex > lock ( _sessionMutex ) ;
for ( auto & [ hdl , session ] : _sessions ) {
2021-04-30 15:45:34 +00:00
uint64_t connectedAt = session - > ConnectedAt ( ) ;
uint64_t incomingMessages = session - > IncomingMessages ( ) ;
uint64_t outgoingMessages = session - > OutgoingMessages ( ) ;
std : : string remoteAddress = session - > RemoteAddress ( ) ;
2021-04-30 18:24:09 +00:00
bool isIdentified = session - > IsIdentified ( ) ;
2021-04-30 04:13:34 +00:00
2021-04-30 18:24:09 +00:00
webSocketSessions . emplace_back ( WebSocketSessionState { hdl , remoteAddress , connectedAt , incomingMessages , outgoingMessages , isIdentified } ) ;
2021-04-30 04:13:34 +00:00
}
lock . unlock ( ) ;
return webSocketSessions ;
}
// It isn't consistent to directly call the WebSocketServer from the events system, but it would also be dumb to make it unnecessarily complicated.
2021-05-11 05:53:51 +00:00
void WebSocketServer : : BroadcastEvent ( uint64_t requiredIntent , std : : string eventType , json eventData , uint8_t rpcVersion )
2021-04-30 04:13:34 +00:00
{
2021-05-11 01:26:28 +00:00
if ( ! _server . is_listening ( ) )
return ;
2021-04-30 04:13:34 +00:00
QtConcurrent : : run ( & _threadPool , [ = ] ( ) {
// Populate message object
json eventMessage ;
eventMessage [ " messageType " ] = " Event " ;
eventMessage [ " eventType " ] = eventType ;
if ( eventData . is_object ( ) )
eventMessage [ " eventData " ] = eventData ;
// Initialize objects. The broadcast process only dumps the data when its needed.
std : : string messageJson ;
std : : string messageMsgPack ;
// Recurse connected sessions and send the event to suitable sessions.
std : : unique_lock < std : : mutex > lock ( _sessionMutex ) ;
for ( auto & it : _sessions ) {
2021-04-30 15:45:34 +00:00
if ( ! it . second - > IsIdentified ( ) ) {
2021-04-30 04:13:34 +00:00
continue ;
2021-04-30 05:11:24 +00:00
}
2021-05-11 05:53:51 +00:00
if ( rpcVersion & & it . second - > RpcVersion ( ) ! = rpcVersion ) {
continue ;
}
2021-04-30 15:45:34 +00:00
if ( ( it . second - > EventSubscriptions ( ) & requiredIntent ) ! = 0 ) {
2021-04-30 04:13:34 +00:00
websocketpp : : lib : : error_code errorCode ;
2021-04-30 15:45:34 +00:00
switch ( it . second - > Encoding ( ) ) {
2021-04-30 04:13:34 +00:00
case WebSocketEncoding : : Json :
if ( messageJson . empty ( ) ) {
messageJson = eventMessage . dump ( ) ;
}
_server . send ( ( websocketpp : : connection_hdl ) it . first , messageJson , websocketpp : : frame : : opcode : : text , errorCode ) ;
2021-04-30 15:45:34 +00:00
it . second - > IncrementOutgoingMessages ( ) ;
2021-04-30 04:13:34 +00:00
break ;
case WebSocketEncoding : : MsgPack :
if ( messageMsgPack . empty ( ) ) {
auto msgPackData = json : : to_msgpack ( eventMessage ) ;
messageMsgPack = std : : string ( msgPackData . begin ( ) , msgPackData . end ( ) ) ;
}
_server . send ( ( websocketpp : : connection_hdl ) it . first , messageMsgPack , websocketpp : : frame : : opcode : : binary , errorCode ) ;
2021-04-30 15:45:34 +00:00
it . second - > IncrementOutgoingMessages ( ) ;
2021-04-30 04:13:34 +00:00
break ;
}
2021-05-03 23:55:30 +00:00
if ( errorCode )
blog ( LOG_ERROR , " [WebSocketServer::BroadcastEvent] Error sending event message: %s " , errorCode . message ( ) . c_str ( ) ) ;
2021-04-30 04:13:34 +00:00
}
}
lock . unlock ( ) ;
2021-05-14 08:13:27 +00:00
if ( _debugEnabled & & ( EventSubscription : : All & requiredIntent ) ! = 0 ) // Don't log high volume events
2021-04-30 04:13:34 +00:00
blog ( LOG_INFO , " [WebSocketServer::BroadcastEvent] Outgoing event: \n %s " , eventMessage . dump ( 2 ) . c_str ( ) ) ;
} ) ;
}
void WebSocketServer : : onOpen ( websocketpp : : connection_hdl hdl )
{
auto conn = _server . get_con_from_hdl ( hdl ) ;
// Build new session
std : : unique_lock < std : : mutex > lock ( _sessionMutex ) ;
2021-04-30 15:45:34 +00:00
SessionPtr session = _sessions [ hdl ] = std : : make_shared < WebSocketSession > ( ) ;
2021-04-30 15:56:08 +00:00
std : : unique_lock < std : : mutex > sessionLock ( session - > OperationMutex ) ;
2021-04-30 04:13:34 +00:00
lock . unlock ( ) ;
// Configure session details
2021-04-30 15:45:34 +00:00
session - > SetRemoteAddress ( conn - > get_remote_endpoint ( ) ) ;
session - > SetConnectedAt ( QDateTime : : currentSecsSinceEpoch ( ) ) ;
2021-05-03 20:31:22 +00:00
session - > SetAuthenticationRequired ( AuthenticationRequired ) ;
2021-04-30 04:13:34 +00:00
std : : string contentType = conn - > get_request_header ( " Content-Type " ) ;
if ( contentType = = " " ) {
;
} else if ( contentType = = " application/json " ) {
2021-04-30 15:45:34 +00:00
session - > SetEncoding ( WebSocketEncoding : : Json ) ;
2021-04-30 04:13:34 +00:00
} else if ( contentType = = " application/msgpack " ) {
2021-04-30 15:45:34 +00:00
session - > SetEncoding ( WebSocketEncoding : : MsgPack ) ;
2021-04-30 04:13:34 +00:00
} else {
conn - > close ( WebSocketCloseCode : : InvalidContentType , " Your HTTP `Content-Type` header specifies an invalid encoding type. " ) ;
return ;
}
// Build `Hello`
json helloMessage ;
helloMessage [ " messageType " ] = " Hello " ;
helloMessage [ " obsWebSocketVersion " ] = OBS_WEBSOCKET_VERSION ;
helloMessage [ " rpcVersion " ] = OBS_WEBSOCKET_RPC_VERSION ;
if ( AuthenticationRequired ) {
2021-05-03 20:31:22 +00:00
session - > SetSecret ( AuthenticationSecret ) ;
2021-04-30 04:13:34 +00:00
std : : string sessionChallenge = Utils : : Crypto : : GenerateSalt ( ) ;
2021-04-30 15:45:34 +00:00
session - > SetChallenge ( sessionChallenge ) ;
2021-04-30 04:13:34 +00:00
helloMessage [ " authentication " ] = { } ;
helloMessage [ " authentication " ] [ " challenge " ] = sessionChallenge ;
helloMessage [ " authentication " ] [ " salt " ] = AuthenticationSalt ;
}
2021-04-30 15:56:08 +00:00
sessionLock . unlock ( ) ;
2021-06-13 10:58:15 +00:00
// Build SessionState object for signal
WebSocketSessionState state ;
state . remoteAddress = session - > RemoteAddress ( ) ;
state . connectedAt = session - > ConnectedAt ( ) ;
state . incomingMessages = session - > IncomingMessages ( ) ;
state . outgoingMessages = session - > OutgoingMessages ( ) ;
state . isIdentified = session - > IsIdentified ( ) ;
// Emit signals
emit ClientConnected ( state ) ;
2021-04-30 04:13:34 +00:00
// Send object to client
websocketpp : : lib : : error_code errorCode ;
2021-04-30 15:45:34 +00:00
auto sessionEncoding = session - > Encoding ( ) ;
2021-04-30 04:13:34 +00:00
if ( sessionEncoding = = WebSocketEncoding : : Json ) {
std : : string helloMessageJson = helloMessage . dump ( ) ;
_server . send ( hdl , helloMessageJson , websocketpp : : frame : : opcode : : text , errorCode ) ;
} else if ( sessionEncoding = = WebSocketEncoding : : MsgPack ) {
auto msgPackData = json : : to_msgpack ( helloMessage ) ;
std : : string messageMsgPack ( msgPackData . begin ( ) , msgPackData . end ( ) ) ;
_server . send ( hdl , messageMsgPack , websocketpp : : frame : : opcode : : binary , errorCode ) ;
}
2021-04-30 15:45:34 +00:00
session - > IncrementOutgoingMessages ( ) ;
2021-04-30 04:13:34 +00:00
}
void WebSocketServer : : onClose ( websocketpp : : connection_hdl hdl )
{
auto conn = _server . get_con_from_hdl ( hdl ) ;
// Get info from the session and then delete it
std : : unique_lock < std : : mutex > lock ( _sessionMutex ) ;
2021-04-30 15:45:34 +00:00
SessionPtr session = _sessions [ hdl ] ;
bool isIdentified = session - > IsIdentified ( ) ;
uint64_t connectedAt = session - > ConnectedAt ( ) ;
uint64_t incomingMessages = session - > IncomingMessages ( ) ;
uint64_t outgoingMessages = session - > OutgoingMessages ( ) ;
std : : string remoteAddress = session - > RemoteAddress ( ) ;
2021-04-30 04:13:34 +00:00
_sessions . erase ( hdl ) ;
lock . unlock ( ) ;
// Build SessionState object for signal
WebSocketSessionState state ;
state . remoteAddress = remoteAddress ;
state . connectedAt = connectedAt ;
state . incomingMessages = incomingMessages ;
state . outgoingMessages = outgoingMessages ;
2021-04-30 18:24:09 +00:00
state . isIdentified = isIdentified ;
2021-04-30 04:13:34 +00:00
// Emit signals
emit ClientDisconnected ( state , conn - > get_local_close_code ( ) ) ;
2021-06-13 10:58:15 +00:00
// Get config for tray notification
auto conf = GetConfig ( ) ;
if ( ! conf ) {
blog ( LOG_ERROR , " [WebSocketServer::onClose] Unable to retreive config! " ) ;
return ;
}
// If previously identified, not going away, and notifications enabled, send a tray notification
if ( isIdentified & & ( conn - > get_local_close_code ( ) ! = websocketpp : : close : : status : : going_away ) & & conf - > AlertsEnabled ) {
obs_frontend_push_ui_translation ( obs_module_get_string ) ;
QString title = QObject : : tr ( " OBSWebSocket.TrayNotification.Disconnected.Title " ) ;
QString body = QObject : : tr ( " OBSWebSocket.TrayNotification.Disconnected.Body " ) ;
obs_frontend_pop_ui_translation ( ) ;
Utils : : Platform : : SendTrayNotification ( QSystemTrayIcon : : Information , title , body ) ;
}
2021-04-30 04:13:34 +00:00
}
void WebSocketServer : : onMessage ( websocketpp : : connection_hdl hdl , websocketpp : : server < websocketpp : : config : : asio > : : message_ptr message )
{
auto opcode = message - > get_opcode ( ) ;
std : : string payload = message - > get_payload ( ) ;
QtConcurrent : : run ( & _threadPool , [ = ] ( ) {
std : : unique_lock < std : : mutex > lock ( _sessionMutex ) ;
2021-04-30 15:45:34 +00:00
SessionPtr session ;
try {
session = _sessions . at ( hdl ) ;
} catch ( const std : : out_of_range & oor ) {
2021-04-30 05:11:24 +00:00
return ;
2021-04-30 15:45:34 +00:00
}
2021-04-30 04:13:34 +00:00
lock . unlock ( ) ;
2021-04-30 15:45:34 +00:00
session - > IncrementIncomingMessages ( ) ;
2021-04-30 04:13:34 +00:00
json incomingMessage ;
// Check for invalid opcode and decode
websocketpp : : lib : : error_code errorCode ;
2021-04-30 15:45:34 +00:00
uint8_t sessionEncoding = session - > Encoding ( ) ;
2021-04-30 04:13:34 +00:00
if ( sessionEncoding = = WebSocketEncoding : : Json ) {
if ( opcode ! = websocketpp : : frame : : opcode : : text ) {
2021-04-30 15:45:34 +00:00
if ( ! session - > IgnoreInvalidMessages ( ) )
2021-04-30 04:13:34 +00:00
_server . close ( hdl , WebSocketCloseCode : : MessageDecodeError , " Your session encoding is set to Json, but a binary message was received. " , errorCode ) ;
return ;
}
try {
incomingMessage = json : : parse ( payload ) ;
} catch ( json : : parse_error & e ) {
2021-04-30 15:45:34 +00:00
if ( ! session - > IgnoreInvalidMessages ( ) )
2021-04-30 04:13:34 +00:00
_server . close ( hdl , WebSocketCloseCode : : MessageDecodeError , std : : string ( " Unable to decode Json: " ) + e . what ( ) , errorCode ) ;
return ;
}
} else if ( sessionEncoding = = WebSocketEncoding : : MsgPack ) {
if ( opcode ! = websocketpp : : frame : : opcode : : binary ) {
2021-04-30 15:45:34 +00:00
if ( ! session - > IgnoreInvalidMessages ( ) )
2021-04-30 04:13:34 +00:00
_server . close ( hdl , WebSocketCloseCode : : MessageDecodeError , " Your session encoding is set to MsgPack, but a text message was received. " , errorCode ) ;
return ;
}
try {
incomingMessage = json : : from_msgpack ( payload ) ;
} catch ( json : : parse_error & e ) {
2021-04-30 15:45:34 +00:00
if ( ! session - > IgnoreInvalidMessages ( ) )
2021-04-30 04:13:34 +00:00
_server . close ( hdl , WebSocketCloseCode : : MessageDecodeError , std : : string ( " Unable to decode MsgPack: " ) + e . what ( ) , errorCode ) ;
return ;
}
}
if ( _debugEnabled )
blog ( LOG_INFO , " [WebSocketServer::onMessage] Incoming message (decoded): \n %s " , incomingMessage . dump ( 2 ) . c_str ( ) ) ;
2021-04-30 15:45:34 +00:00
WebSocketProtocol : : ProcessResult ret = WebSocketProtocol : : ProcessMessage ( session , incomingMessage ) ;
2021-04-30 04:13:34 +00:00
if ( ret . closeCode ! = WebSocketCloseCode : : DontClose ) {
websocketpp : : lib : : error_code errorCode ;
_server . close ( hdl , ret . closeCode , ret . closeReason , errorCode ) ;
return ;
}
if ( ! ret . result . is_null ( ) ) {
websocketpp : : lib : : error_code errorCode ;
if ( sessionEncoding = = WebSocketEncoding : : Json ) {
std : : string helloMessageJson = ret . result . dump ( ) ;
_server . send ( hdl , helloMessageJson , websocketpp : : frame : : opcode : : text , errorCode ) ;
} else if ( sessionEncoding = = WebSocketEncoding : : MsgPack ) {
auto msgPackData = json : : to_msgpack ( ret . result ) ;
std : : string messageMsgPack ( msgPackData . begin ( ) , msgPackData . end ( ) ) ;
_server . send ( hdl , messageMsgPack , websocketpp : : frame : : opcode : : binary , errorCode ) ;
}
2021-04-30 15:45:34 +00:00
session - > IncrementOutgoingMessages ( ) ;
2021-04-30 04:13:34 +00:00
if ( _debugEnabled )
blog ( LOG_INFO , " [WebSocketServer::onMessage] Outgoing message: \n %s " , ret . result . dump ( 2 ) . c_str ( ) ) ;
if ( errorCode )
blog ( LOG_WARNING , " [WebSocketServer::onMessage] Sending message to client failed: %s " , errorCode . message ( ) . c_str ( ) ) ;
}
} ) ;
}