From 7e1e1bc33cb975cfc61d4f7f9e1ccaa61787846b Mon Sep 17 00:00:00 2001 From: tt2468 Date: Sat, 4 Sep 2021 10:04:00 -0700 Subject: [PATCH] Base: Large plugin refactor - Merge WebSocketProtocol into WebSocketServer - Having them separated was not doing anything productive - Request: Move SessionPtr to RequestHandler - Less copying to do for batch requests - Fully modularize EventHandler - Make BroadcastEvent a stored callback that WebSocketServer sets - Return early on high volume events to avoid unnecessary compute - These events will only generate a json object when it is actually needed --- CMakeLists.txt | 3 +- src/WebSocketProtocol.h | 23 - src/WebSocketServer.cpp | 84 +-- src/WebSocketServer.h | 25 +- ...tocol.cpp => WebSocketServer_Protocol.cpp} | 569 ++++++++++-------- src/eventhandler/EventHandler.cpp | 49 +- src/eventhandler/EventHandler.h | 19 +- src/eventhandler/EventHandler_Config.cpp | 8 +- src/eventhandler/EventHandler_General.cpp | 4 +- src/eventhandler/EventHandler_Inputs.cpp | 26 +- src/eventhandler/EventHandler_MediaInputs.cpp | 6 +- src/eventhandler/EventHandler_Outputs.cpp | 10 +- src/eventhandler/EventHandler_SceneItems.cpp | 15 +- src/eventhandler/EventHandler_Scenes.cpp | 12 +- src/eventhandler/EventHandler_Transitions.cpp | 6 +- src/eventhandler/types/EventSubscription.h | 2 + src/obs-websocket.cpp | 5 +- src/requesthandler/RequestHandler.cpp | 5 + src/requesthandler/RequestHandler.h | 5 + src/requesthandler/RequestHandler_General.cpp | 4 +- src/requesthandler/rpc/Request.cpp | 5 +- src/requesthandler/rpc/Request.h | 8 +- 22 files changed, 486 insertions(+), 407 deletions(-) delete mode 100644 src/WebSocketProtocol.h rename src/{WebSocketProtocol.cpp => WebSocketServer_Protocol.cpp} (55%) diff --git a/CMakeLists.txt b/CMakeLists.txt index e0671802..a68b491a 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -86,7 +86,7 @@ set(obs-websocket_SOURCES src/obs-websocket.cpp src/Config.cpp src/WebSocketServer.cpp - src/WebSocketProtocol.cpp + src/WebSocketServer_Protocol.cpp src/WebSocketSession.cpp src/eventhandler/EventHandler.cpp src/eventhandler/EventHandler_General.cpp @@ -121,7 +121,6 @@ set(obs-websocket_HEADERS src/obs-websocket.h src/Config.h src/WebSocketServer.h - src/WebSocketProtocol.h src/WebSocketSession.h src/eventhandler/EventHandler.h src/eventhandler/types/EventSubscription.h diff --git a/src/WebSocketProtocol.h b/src/WebSocketProtocol.h deleted file mode 100644 index 7905803b..00000000 --- a/src/WebSocketProtocol.h +++ /dev/null @@ -1,23 +0,0 @@ -#pragma once - -#include -#include - -#include "WebSocketServer.h" - -class WebSocketSession; -typedef std::shared_ptr SessionPtr; - -namespace WebSocketProtocol { - const std::vector SupportedRpcVersions{ - 1 - }; - - struct ProcessResult { - WebSocketServer::WebSocketCloseCode closeCode = WebSocketServer::WebSocketCloseCode::DontClose; - std::string closeReason; - json result; - }; - - void ProcessMessage(SessionPtr session, ProcessResult &ret, uint8_t opCode, json incomingMessage); -} diff --git a/src/WebSocketServer.cpp b/src/WebSocketServer.cpp index 0cb70c07..397a1cba 100644 --- a/src/WebSocketServer.cpp +++ b/src/WebSocketServer.cpp @@ -6,8 +6,7 @@ #include #include "WebSocketServer.h" -#include "WebSocketProtocol.h" -#include "eventhandler/types/EventSubscription.h" +#include "eventhandler/EventHandler.h" #include "obs-websocket.h" #include "Config.h" #include "utils/Crypto.h" @@ -46,6 +45,13 @@ WebSocketServer::WebSocketServer() : &WebSocketServer::onMessage, this, websocketpp::lib::placeholders::_1, websocketpp::lib::placeholders::_2 ) ); + + auto eventHandler = GetEventHandler(); + eventHandler->SetBroadcastCallback( + std::bind( + &WebSocketServer::BroadcastEvent, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3, std::placeholders::_4 + ) + ); } WebSocketServer::~WebSocketServer() @@ -194,61 +200,6 @@ std::vector WebSocketServer::GetWebSocke } // It isn't consistent to directly call the WebSocketServer from the events system, but it would also be dumb to make it unnecessarily complicated. -void WebSocketServer::BroadcastEvent(uint64_t requiredIntent, std::string eventType, json eventData, uint8_t rpcVersion) -{ - if (!_server.is_listening()) - return; - - QtConcurrent::run(&_threadPool, [=]() { - // Populate message object - json eventMessage; - eventMessage["op"] = 5; - eventMessage["d"]["eventType"] = eventType; - eventMessage["d"]["eventIntent"] = requiredIntent; - if (eventData.is_object()) - eventMessage["d"]["eventData"] = eventData; - - // Initialize objects. The broadcast process only dumps the data when its needed. - std::string messageJson; - std::string messageMsgPack; - - // Recurse connected sessions and send the event to suitable sessions. - std::unique_lock lock(_sessionMutex); - for (auto & it : _sessions) { - if (!it.second->IsIdentified()) { - continue; - } - if (rpcVersion && it.second->RpcVersion() != rpcVersion) { - continue; - } - if ((it.second->EventSubscriptions() & requiredIntent) != 0) { - websocketpp::lib::error_code errorCode; - switch (it.second->Encoding()) { - case WebSocketEncoding::Json: - if (messageJson.empty()) { - messageJson = eventMessage.dump(); - } - _server.send((websocketpp::connection_hdl)it.first, messageJson, websocketpp::frame::opcode::text, errorCode); - it.second->IncrementOutgoingMessages(); - break; - case WebSocketEncoding::MsgPack: - if (messageMsgPack.empty()) { - auto msgPackData = json::to_msgpack(eventMessage); - messageMsgPack = std::string(msgPackData.begin(), msgPackData.end()); - } - _server.send((websocketpp::connection_hdl)it.first, messageMsgPack, websocketpp::frame::opcode::binary, errorCode); - it.second->IncrementOutgoingMessages(); - break; - } - if (errorCode) - blog(LOG_ERROR, "[WebSocketServer::BroadcastEvent] Error sending event message: %s", errorCode.message().c_str()); - } - } - lock.unlock(); - if (_debugEnabled && (EventSubscription::All & requiredIntent) != 0) // Don't log high volume events - blog(LOG_INFO, "[WebSocketServer::BroadcastEvent] Outgoing event:\n%s", eventMessage.dump(2).c_str()); - }); -} bool WebSocketServer::onValidate(websocketpp::connection_hdl hdl) { @@ -340,6 +291,7 @@ void WebSocketServer::onClose(websocketpp::connection_hdl hdl) // Get info from the session and then delete it std::unique_lock lock(_sessionMutex); SessionPtr session = _sessions[hdl]; + uint64_t eventSubscriptions = session->EventSubscriptions(); bool isIdentified = session->IsIdentified(); uint64_t connectedAt = session->ConnectedAt(); uint64_t incomingMessages = session->IncomingMessages(); @@ -348,6 +300,12 @@ void WebSocketServer::onClose(websocketpp::connection_hdl hdl) _sessions.erase(hdl); lock.unlock(); + // If client was identified, decrement appropriate refs in eventhandler. + if (isIdentified) { + auto eventHandler = GetEventHandler(); + eventHandler->ProcessUnsubscription(eventSubscriptions); + } + // Build SessionState object for signal WebSocketSessionState state; state.remoteAddress = remoteAddress; @@ -431,12 +389,12 @@ void WebSocketServer::onMessage(websocketpp::connection_hdl hdl, websocketpp::se if (_debugEnabled) blog(LOG_INFO, "[WebSocketServer::onMessage] Incoming message (decoded):\n%s", incomingMessage.dump(2).c_str()); - WebSocketProtocol::ProcessResult ret; + ProcessResult ret; // Verify incoming message is an object if (!incomingMessage.is_object()) { if (!session->IgnoreInvalidMessages()) { - ret.closeCode = WebSocketServer::WebSocketCloseCode::MessageDecodeError; + ret.closeCode = WebSocketCloseCode::MessageDecodeError; ret.closeReason = "You sent a non-object payload."; goto skipProcessing; } @@ -445,8 +403,8 @@ void WebSocketServer::onMessage(websocketpp::connection_hdl hdl, websocketpp::se // Disconnect client if 4.x protocol is detected if (!session->IsIdentified() && incomingMessage.contains("request-type")) { - blog(LOG_WARNING, "[WebSocketProtocol::ProcessMessage] Client %s appears to be running a pre-5.0.0 protocol.", session->RemoteAddress().c_str()); - ret.closeCode = WebSocketServer::WebSocketCloseCode::UnsupportedRpcVersion; + blog(LOG_WARNING, "[WebSocketServer::onMessage] Client %s appears to be running a pre-5.0.0 protocol.", session->RemoteAddress().c_str()); + ret.closeCode = WebSocketCloseCode::UnsupportedRpcVersion; ret.closeReason = "You appear to be attempting to connect with the pre-5.0.0 plugin protocol. Check to make sure your client is updated."; goto skipProcessing; } @@ -454,14 +412,14 @@ void WebSocketServer::onMessage(websocketpp::connection_hdl hdl, websocketpp::se // Validate op code if (!incomingMessage.contains("op")) { if (!session->IgnoreInvalidMessages()) { - ret.closeCode = WebSocketServer::WebSocketCloseCode::UnknownOpCode; + ret.closeCode = WebSocketCloseCode::UnknownOpCode; ret.closeReason = "Your request is missing an `op`."; goto skipProcessing; } return; } - WebSocketProtocol::ProcessMessage(session, ret, incomingMessage["op"], incomingMessage["d"]); + ProcessMessage(session, ret, incomingMessage["op"], incomingMessage["d"]); skipProcessing: if (ret.closeCode != WebSocketCloseCode::DontClose) { diff --git a/src/WebSocketServer.h b/src/WebSocketServer.h index 2d24bdc0..af06ee9a 100644 --- a/src/WebSocketServer.h +++ b/src/WebSocketServer.h @@ -29,18 +29,6 @@ class WebSocketServer : QObject bool isIdentified; }; - enum WebSocketOpCode { - Hello = 0, - Identify = 1, - Identified = 2, - Reidentify = 3, - Event = 5, - Request = 6, - RequestResponse = 7, - RequestBatch = 8, - RequestBatchResponse = 9, - }; - enum WebSocketCloseCode { // Internal only DontClose = 0, @@ -72,6 +60,7 @@ class WebSocketServer : QObject void Start(); void Stop(); void InvalidateSession(websocketpp::connection_hdl hdl); + void BroadcastEvent(uint64_t requiredIntent, std::string eventType, json eventData = nullptr, uint8_t rpcVersion = 0); bool IsListening() { return _server.is_listening(); @@ -87,14 +76,17 @@ class WebSocketServer : QObject std::string AuthenticationSecret; std::string AuthenticationSalt; - public Q_SLOTS: - void BroadcastEvent(uint64_t requiredIntent, std::string eventType, json eventData = nullptr, uint8_t rpcVersion = 0); - signals: void ClientConnected(const WebSocketSessionState state); void ClientDisconnected(const WebSocketSessionState state, const uint16_t closeCode); private: + struct ProcessResult { + WebSocketCloseCode closeCode = WebSocketCloseCode::DontClose; + std::string closeReason; + json result; + }; + void ServerRunner(); bool onValidate(websocketpp::connection_hdl hdl); @@ -102,6 +94,9 @@ class WebSocketServer : QObject void onClose(websocketpp::connection_hdl hdl); void onMessage(websocketpp::connection_hdl hdl, websocketpp::server::message_ptr message); + void SetSessionParameters(SessionPtr session, WebSocketServer::ProcessResult &ret, json payloadData); + void ProcessMessage(SessionPtr session, ProcessResult &ret, const uint8_t opCode, json incomingMessage); + std::thread _serverThread; websocketpp::server _server; QThreadPool _threadPool; diff --git a/src/WebSocketProtocol.cpp b/src/WebSocketServer_Protocol.cpp similarity index 55% rename from src/WebSocketProtocol.cpp rename to src/WebSocketServer_Protocol.cpp index 94600e44..fd90fa8d 100644 --- a/src/WebSocketProtocol.cpp +++ b/src/WebSocketServer_Protocol.cpp @@ -1,245 +1,324 @@ -#include - -#include "WebSocketProtocol.h" -#include "WebSocketSession.h" -#include "requesthandler/RequestHandler.h" -#include "requesthandler/rpc/RequestStatus.h" -#include "obs-websocket.h" -#include "Config.h" -#include "plugin-macros.generated.h" -#include "utils/Crypto.h" -#include "utils/Json.h" -#include "utils/Platform.h" - -bool IsSupportedRpcVersion(uint8_t requestedVersion) -{ - for (auto version : WebSocketProtocol::SupportedRpcVersions) { - if (requestedVersion == version) - return true; - } - return false; -} - -void SetSessionParameters(SessionPtr session, WebSocketProtocol::ProcessResult &ret, json payloadData) -{ - if (payloadData.contains("ignoreInvalidMessages")) { - if (!payloadData["ignoreInvalidMessages"].is_boolean()) { - ret.closeCode = WebSocketServer::WebSocketCloseCode::InvalidDataKeyType; - ret.closeReason = "Your `ignoreInvalidMessages` is not a boolean."; - return; - } - session->SetIgnoreInvalidMessages(payloadData["ignoreInvalidMessages"]); - } - - if (payloadData.contains("ignoreNonFatalRequestChecks")) { - if (!payloadData["ignoreNonFatalRequestChecks"].is_boolean()) { - ret.closeCode = WebSocketServer::WebSocketCloseCode::InvalidDataKeyType; - ret.closeReason = "Your `ignoreNonFatalRequestChecks` is not a boolean."; - return; - } - session->SetIgnoreNonFatalRequestChecks(payloadData["ignoreNonFatalRequestChecks"]); - } - - if (payloadData.contains("eventSubscriptions")) { - if (!payloadData["eventSubscriptions"].is_number_unsigned()) { - ret.closeCode = WebSocketServer::WebSocketCloseCode::InvalidDataKeyType; - ret.closeReason = "Your `eventSubscriptions` is not an unsigned number."; - return; - } - session->SetEventSubscriptions(payloadData["eventSubscriptions"]); - } -} - -void WebSocketProtocol::ProcessMessage(SessionPtr session, WebSocketProtocol::ProcessResult &ret, uint8_t opCode, json payloadData) -{ - if (!payloadData.is_object()) { - if (payloadData.is_null()) { - ret.closeCode = WebSocketServer::WebSocketCloseCode::MissingDataKey; - ret.closeReason = "Your payload is missing data (`d`)."; - } else { - ret.closeCode = WebSocketServer::WebSocketCloseCode::InvalidDataKeyType; - ret.closeReason = "Your payload's data (`d`) is not an object."; - } - return; - } - - // Only `Identify` is allowed when not identified - if (!session->IsIdentified() && opCode != 1) { - ret.closeCode = WebSocketServer::WebSocketCloseCode::NotIdentified; - ret.closeReason = "You attempted to send a non-Identify message while not identified."; - return; - } - - switch (opCode) { - case 1: { // Identify - std::unique_lock sessionLock(session->OperationMutex); - if (session->IsIdentified()) { - if (!session->IgnoreInvalidMessages()) { - ret.closeCode = WebSocketServer::WebSocketCloseCode::AlreadyIdentified; - ret.closeReason = "You are already Identified with the obs-websocket server."; - } - return; - } - - if (session->AuthenticationRequired()) { - if (!payloadData.contains("authentication")) { - ret.closeCode = WebSocketServer::WebSocketCloseCode::AuthenticationFailed; - ret.closeReason = "Your payload's data is missing an `authentication` string, however authentication is required."; - return; - } - if (!Utils::Crypto::CheckAuthenticationString(session->Secret(), session->Challenge(), payloadData["authentication"])) { - auto conf = GetConfig(); - if (conf && conf->AlertsEnabled) { - QString title = obs_module_text("OBSWebSocket.TrayNotification.AuthenticationFailed.Title"); - QString body = QString(obs_module_text("OBSWebSocket.TrayNotification.AuthenticationFailed.Body")).arg(QString::fromStdString(session->RemoteAddress())); - Utils::Platform::SendTrayNotification(QSystemTrayIcon::Warning, title, body); - } - ret.closeCode = WebSocketServer::WebSocketCloseCode::AuthenticationFailed; - ret.closeReason = "Authentication failed."; - return; - } - } - - if (!payloadData.contains("rpcVersion")) { - ret.closeCode = WebSocketServer::WebSocketCloseCode::MissingDataKey; - ret.closeReason = "Your payload's data is missing an `rpcVersion`."; - return; - } - - if (!payloadData["rpcVersion"].is_number_unsigned()) { - ret.closeCode = WebSocketServer::WebSocketCloseCode::InvalidDataKeyType; - ret.closeReason = "Your `rpcVersion` is not an unsigned number."; - } - - uint8_t requestedRpcVersion = payloadData["rpcVersion"]; - if (!IsSupportedRpcVersion(requestedRpcVersion)) { - ret.closeCode = WebSocketServer::WebSocketCloseCode::UnsupportedRpcVersion; - ret.closeReason = "Your requested RPC version is not supported by this server."; - return; - } - session->SetRpcVersion(requestedRpcVersion); - - SetSessionParameters(session, ret, payloadData); - if (ret.closeCode != WebSocketServer::WebSocketCloseCode::DontClose) { - return; - } - - session->SetIsIdentified(true); - - auto conf = GetConfig(); - if (conf && conf->AlertsEnabled) { - QString title = obs_module_text("OBSWebSocket.TrayNotification.Identified.Title"); - QString body = QString(obs_module_text("OBSWebSocket.TrayNotification.Identified.Body")).arg(QString::fromStdString(session->RemoteAddress())); - Utils::Platform::SendTrayNotification(QSystemTrayIcon::Information, title, body); - } - - ret.result["op"] = 2; - ret.result["d"]["negotiatedRpcVersion"] = session->RpcVersion(); - } return; - case 3: { // Reidentify - std::unique_lock sessionLock(session->OperationMutex); - - SetSessionParameters(session, ret, payloadData); - if (ret.closeCode != WebSocketServer::WebSocketCloseCode::DontClose) { - return; - } - - ret.result["op"] = 2; - ret.result["d"]["negotiatedRpcVersion"] = session->RpcVersion(); - } return; - case 6: { // Request - // RequestID checking has to be done here where we are able to close the connection. - if (!payloadData.contains("requestId")) { - if (!session->IgnoreInvalidMessages()) { - ret.closeCode = WebSocketServer::WebSocketCloseCode::MissingDataKey; - ret.closeReason = "Your payload data is missing a `requestId`."; - } - return; - } - - RequestHandler requestHandler; - Request request(session, payloadData["requestType"], payloadData["requestData"]); - - RequestResult requestResult = requestHandler.ProcessRequest(request); - - json resultPayloadData; - resultPayloadData["requestType"] = payloadData["requestType"]; - resultPayloadData["requestId"] = payloadData["requestId"]; - resultPayloadData["requestStatus"] = { - {"result", requestResult.StatusCode == RequestStatus::Success}, - {"code", requestResult.StatusCode} - }; - if (!requestResult.Comment.empty()) - resultPayloadData["requestStatus"]["comment"] = requestResult.Comment; - if (requestResult.ResponseData.is_object()) - resultPayloadData["responseData"] = requestResult.ResponseData; - ret.result["op"] = 7; - ret.result["d"] = resultPayloadData; - } return; - case 8: { // RequestBatch - // RequestID checking has to be done here where we are able to close the connection. - if (!payloadData.contains("requestId")) { - if (!session->IgnoreInvalidMessages()) { - ret.closeCode = WebSocketServer::WebSocketCloseCode::MissingDataKey; - ret.closeReason = "Your payload data is missing a `requestId`."; - } - return; - } - - if (!payloadData.contains("requests")) { - if (!session->IgnoreInvalidMessages()) { - ret.closeCode = WebSocketServer::WebSocketCloseCode::MissingDataKey; - ret.closeReason = "Your payload data is missing a `requests`."; - } - return; - } - - if (!payloadData["requests"].is_array()) { - if (!session->IgnoreInvalidMessages()) { - ret.closeCode = WebSocketServer::WebSocketCloseCode::InvalidDataKeyType; - ret.closeReason = "Your `requests` is not an array."; - } - return; - } - - std::vector requests = payloadData["requests"]; - json results = json::array(); - - RequestHandler requestHandler; - for (auto requestJson : requests) { - Request request(session, requestJson["requestType"], requestJson["requestData"]); - - RequestResult requestResult = requestHandler.ProcessRequest(request); - - json result; - result["requestType"] = requestJson["requestType"]; - - if (requestJson.contains("requestId")) - result["requestId"] = requestJson["requestId"]; - - result["requestStatus"] = { - {"result", requestResult.StatusCode == RequestStatus::Success}, - {"code", requestResult.StatusCode} - }; - - if (!requestResult.Comment.empty()) - result["requestStatus"]["comment"] = requestResult.Comment; - - if (requestResult.ResponseData.is_object()) - result["responseData"] = requestResult.ResponseData; - - results.push_back(result); - } - - ret.result["op"] = 9; - ret.result["d"]["requestId"] = payloadData["requestId"]; - ret.result["d"]["results"] = results; - } return; - default: - if (!session->IgnoreInvalidMessages()) { - ret.closeCode = WebSocketServer::WebSocketCloseCode::UnknownOpCode; - ret.closeReason = std::string("Unknown OpCode: %s") + std::to_string(opCode); - } - return; - } -} +#include +#include + +#include "WebSocketServer.h" +#include "requesthandler/RequestHandler.h" +#include "eventhandler/EventHandler.h" +#include "obs-websocket.h" +#include "Config.h" +#include "plugin-macros.generated.h" +#include "utils/Crypto.h" +#include "utils/Json.h" +#include "utils/Platform.h" + +namespace WebSocketOpCode { + enum WebSocketOpCode: uint8_t { + Hello = 0, + Identify = 1, + Identified = 2, + Reidentify = 3, + Event = 5, + Request = 6, + RequestResponse = 7, + RequestBatch = 8, + RequestBatchResponse = 9, + }; +}; + +bool IsSupportedRpcVersion(uint8_t requestedVersion) +{ + return (requestedVersion == 1); +} + +void WebSocketServer::SetSessionParameters(SessionPtr session, ProcessResult &ret, json payloadData) +{ + if (payloadData.contains("ignoreInvalidMessages")) { + if (!payloadData["ignoreInvalidMessages"].is_boolean()) { + ret.closeCode = WebSocketCloseCode::InvalidDataKeyType; + ret.closeReason = "Your `ignoreInvalidMessages` is not a boolean."; + return; + } + session->SetIgnoreInvalidMessages(payloadData["ignoreInvalidMessages"]); + } + + if (payloadData.contains("ignoreNonFatalRequestChecks")) { + if (!payloadData["ignoreNonFatalRequestChecks"].is_boolean()) { + ret.closeCode = WebSocketCloseCode::InvalidDataKeyType; + ret.closeReason = "Your `ignoreNonFatalRequestChecks` is not a boolean."; + return; + } + session->SetIgnoreNonFatalRequestChecks(payloadData["ignoreNonFatalRequestChecks"]); + } + + if (payloadData.contains("eventSubscriptions")) { + if (!payloadData["eventSubscriptions"].is_number_unsigned()) { + ret.closeCode = WebSocketCloseCode::InvalidDataKeyType; + ret.closeReason = "Your `eventSubscriptions` is not an unsigned number."; + return; + } + session->SetEventSubscriptions(payloadData["eventSubscriptions"]); + } +} + +void WebSocketServer::ProcessMessage(SessionPtr session, WebSocketServer::ProcessResult &ret, uint8_t opCode, json payloadData) +{ + if (!payloadData.is_object()) { + if (payloadData.is_null()) { + ret.closeCode = WebSocketCloseCode::MissingDataKey; + ret.closeReason = "Your payload is missing data (`d`)."; + } else { + ret.closeCode = WebSocketCloseCode::InvalidDataKeyType; + ret.closeReason = "Your payload's data (`d`) is not an object."; + } + return; + } + + // Only `Identify` is allowed when not identified + if (!session->IsIdentified() && opCode != 1) { + ret.closeCode = WebSocketCloseCode::NotIdentified; + ret.closeReason = "You attempted to send a non-Identify message while not identified."; + return; + } + + switch (opCode) { + case WebSocketOpCode::Identify: { // Identify + std::unique_lock sessionLock(session->OperationMutex); + if (session->IsIdentified()) { + if (!session->IgnoreInvalidMessages()) { + ret.closeCode = WebSocketCloseCode::AlreadyIdentified; + ret.closeReason = "You are already Identified with the obs-websocket server."; + } + return; + } + + if (session->AuthenticationRequired()) { + if (!payloadData.contains("authentication")) { + ret.closeCode = WebSocketCloseCode::AuthenticationFailed; + ret.closeReason = "Your payload's data is missing an `authentication` string, however authentication is required."; + return; + } + if (!Utils::Crypto::CheckAuthenticationString(session->Secret(), session->Challenge(), payloadData["authentication"])) { + auto conf = GetConfig(); + if (conf && conf->AlertsEnabled) { + QString title = obs_module_text("OBSWebSocket.TrayNotification.AuthenticationFailed.Title"); + QString body = QString(obs_module_text("OBSWebSocket.TrayNotification.AuthenticationFailed.Body")).arg(QString::fromStdString(session->RemoteAddress())); + Utils::Platform::SendTrayNotification(QSystemTrayIcon::Warning, title, body); + } + ret.closeCode = WebSocketCloseCode::AuthenticationFailed; + ret.closeReason = "Authentication failed."; + return; + } + } + + if (!payloadData.contains("rpcVersion")) { + ret.closeCode = WebSocketCloseCode::MissingDataKey; + ret.closeReason = "Your payload's data is missing an `rpcVersion`."; + return; + } + + if (!payloadData["rpcVersion"].is_number_unsigned()) { + ret.closeCode = WebSocketCloseCode::InvalidDataKeyType; + ret.closeReason = "Your `rpcVersion` is not an unsigned number."; + } + + uint8_t requestedRpcVersion = payloadData["rpcVersion"]; + if (!IsSupportedRpcVersion(requestedRpcVersion)) { + ret.closeCode = WebSocketCloseCode::UnsupportedRpcVersion; + ret.closeReason = "Your requested RPC version is not supported by this server."; + return; + } + session->SetRpcVersion(requestedRpcVersion); + + SetSessionParameters(session, ret, payloadData); + if (ret.closeCode != WebSocketCloseCode::DontClose) { + return; + } + + // Increment refs for event subscriptions + auto eventHandler = GetEventHandler(); + eventHandler->ProcessSubscription(session->EventSubscriptions()); + + // Mark session as identified + session->SetIsIdentified(true); + + // Send desktop notification. TODO: Move to UI code + auto conf = GetConfig(); + if (conf && conf->AlertsEnabled) { + QString title = obs_module_text("OBSWebSocket.TrayNotification.Identified.Title"); + QString body = QString(obs_module_text("OBSWebSocket.TrayNotification.Identified.Body")).arg(QString::fromStdString(session->RemoteAddress())); + Utils::Platform::SendTrayNotification(QSystemTrayIcon::Information, title, body); + } + + ret.result["op"] = WebSocketOpCode::Identified; + ret.result["d"]["negotiatedRpcVersion"] = session->RpcVersion(); + } return; + case WebSocketOpCode::Reidentify: { // Reidentify + std::unique_lock sessionLock(session->OperationMutex); + + // Decrement refs for current subscriptions + auto eventHandler = GetEventHandler(); + eventHandler->ProcessUnsubscription(session->EventSubscriptions()); + + SetSessionParameters(session, ret, payloadData); + if (ret.closeCode != WebSocketCloseCode::DontClose) { + return; + } + + // Increment refs for new subscriptions + eventHandler->ProcessSubscription(session->EventSubscriptions()); + + ret.result["op"] = WebSocketOpCode::Identified; + ret.result["d"]["negotiatedRpcVersion"] = session->RpcVersion(); + } return; + case WebSocketOpCode::Request: { // Request + // RequestID checking has to be done here where we are able to close the connection. + if (!payloadData.contains("requestId")) { + if (!session->IgnoreInvalidMessages()) { + ret.closeCode = WebSocketCloseCode::MissingDataKey; + ret.closeReason = "Your payload data is missing a `requestId`."; + } + return; + } + + RequestHandler requestHandler(session); + Request request(payloadData["requestType"], payloadData["requestData"]); + + RequestResult requestResult = requestHandler.ProcessRequest(request); + + json resultPayloadData; + resultPayloadData["requestType"] = payloadData["requestType"]; + resultPayloadData["requestId"] = payloadData["requestId"]; + resultPayloadData["requestStatus"] = { + {"result", requestResult.StatusCode == RequestStatus::Success}, + {"code", requestResult.StatusCode} + }; + if (!requestResult.Comment.empty()) + resultPayloadData["requestStatus"]["comment"] = requestResult.Comment; + if (requestResult.ResponseData.is_object()) + resultPayloadData["responseData"] = requestResult.ResponseData; + ret.result["op"] = WebSocketOpCode::RequestResponse; + ret.result["d"] = resultPayloadData; + } return; + case WebSocketOpCode::RequestBatch: { // RequestBatch + // RequestID checking has to be done here where we are able to close the connection. + if (!payloadData.contains("requestId")) { + if (!session->IgnoreInvalidMessages()) { + ret.closeCode = WebSocketCloseCode::MissingDataKey; + ret.closeReason = "Your payload data is missing a `requestId`."; + } + return; + } + + if (!payloadData.contains("requests")) { + if (!session->IgnoreInvalidMessages()) { + ret.closeCode = WebSocketCloseCode::MissingDataKey; + ret.closeReason = "Your payload data is missing a `requests`."; + } + return; + } + + if (!payloadData["requests"].is_array()) { + if (!session->IgnoreInvalidMessages()) { + ret.closeCode = WebSocketCloseCode::InvalidDataKeyType; + ret.closeReason = "Your `requests` is not an array."; + } + return; + } + + std::vector requests = payloadData["requests"]; + json results = json::array(); + + RequestHandler requestHandler(session); + for (auto requestJson : requests) { + Request request(requestJson["requestType"], requestJson["requestData"]); + + RequestResult requestResult = requestHandler.ProcessRequest(request); + + json result; + result["requestType"] = requestJson["requestType"]; + + if (requestJson.contains("requestId")) + result["requestId"] = requestJson["requestId"]; + + result["requestStatus"] = { + {"result", requestResult.StatusCode == RequestStatus::Success}, + {"code", requestResult.StatusCode} + }; + + if (!requestResult.Comment.empty()) + result["requestStatus"]["comment"] = requestResult.Comment; + + if (requestResult.ResponseData.is_object()) + result["responseData"] = requestResult.ResponseData; + + results.push_back(result); + } + + ret.result["op"] = WebSocketOpCode::RequestBatchResponse; + ret.result["d"]["requestId"] = payloadData["requestId"]; + ret.result["d"]["results"] = results; + } return; + default: + if (!session->IgnoreInvalidMessages()) { + ret.closeCode = WebSocketCloseCode::UnknownOpCode; + ret.closeReason = std::string("Unknown OpCode: %s") + std::to_string(opCode); + } + return; + } +} + +void WebSocketServer::BroadcastEvent(uint64_t requiredIntent, std::string eventType, json eventData, uint8_t rpcVersion) +{ + if (!_server.is_listening()) + return; + + QtConcurrent::run(&_threadPool, [=]() { + // Populate message object + json eventMessage; + eventMessage["op"] = 5; + eventMessage["d"]["eventType"] = eventType; + eventMessage["d"]["eventIntent"] = requiredIntent; + if (eventData.is_object()) + eventMessage["d"]["eventData"] = eventData; + + // Initialize objects. The broadcast process only dumps the data when its needed. + std::string messageJson; + std::string messageMsgPack; + + // Recurse connected sessions and send the event to suitable sessions. + std::unique_lock lock(_sessionMutex); + for (auto & it : _sessions) { + if (!it.second->IsIdentified()) { + continue; + } + if (rpcVersion && it.second->RpcVersion() != rpcVersion) { + continue; + } + if ((it.second->EventSubscriptions() & requiredIntent) != 0) { + websocketpp::lib::error_code errorCode; + switch (it.second->Encoding()) { + case WebSocketEncoding::Json: + if (messageJson.empty()) { + messageJson = eventMessage.dump(); + } + _server.send((websocketpp::connection_hdl)it.first, messageJson, websocketpp::frame::opcode::text, errorCode); + it.second->IncrementOutgoingMessages(); + break; + case WebSocketEncoding::MsgPack: + if (messageMsgPack.empty()) { + auto msgPackData = json::to_msgpack(eventMessage); + messageMsgPack = std::string(msgPackData.begin(), msgPackData.end()); + } + _server.send((websocketpp::connection_hdl)it.first, messageMsgPack, websocketpp::frame::opcode::binary, errorCode); + it.second->IncrementOutgoingMessages(); + break; + } + if (errorCode) + blog(LOG_ERROR, "[WebSocketServer::BroadcastEvent] Error sending event message: %s", errorCode.message().c_str()); + } + } + lock.unlock(); + if (_debugEnabled && (EventSubscription::All & requiredIntent) != 0) // Don't log high volume events + blog(LOG_INFO, "[WebSocketServer::BroadcastEvent] Outgoing event:\n%s", eventMessage.dump(2).c_str()); + }); +} diff --git a/src/eventhandler/EventHandler.cpp b/src/eventhandler/EventHandler.cpp index 167939c5..0a9e7368 100644 --- a/src/eventhandler/EventHandler.cpp +++ b/src/eventhandler/EventHandler.cpp @@ -1,9 +1,12 @@ #include "EventHandler.h" #include "../plugin-macros.generated.h" -EventHandler::EventHandler(WebSocketServerPtr webSocketServer) : - _webSocketServer(webSocketServer), - _obsLoaded(false) +EventHandler::EventHandler() : + _obsLoaded(false), + _inputVolumeMetersRef(0), + _inputActiveStateChangedRef(0), + _inputShowStateChangedRef(0), + _sceneItemTransformChangedRef(0) { blog(LOG_INFO, "[EventHandler::EventHandler] Setting up event handlers..."); @@ -41,6 +44,46 @@ EventHandler::~EventHandler() blog(LOG_INFO, "[EventHandler::~EventHandler] Finished."); } +void EventHandler::SetBroadcastCallback(EventHandler::BroadcastCallback cb) +{ + _broadcastCallback = cb; +} + +// Function to increment refcounts for high volume event subscriptions +void EventHandler::ProcessSubscription(uint64_t eventSubscriptions) +{ + if ((eventSubscriptions & EventSubscription::InputVolumeMeters) != 0) + _inputVolumeMetersRef++; + if ((eventSubscriptions & EventSubscription::InputActiveStateChanged) != 0) + _inputActiveStateChangedRef++; + if ((eventSubscriptions & EventSubscription::InputShowStateChanged) != 0) + _inputShowStateChangedRef++; + if ((eventSubscriptions & EventSubscription::SceneItemTransformChanged) != 0) + _sceneItemTransformChangedRef++; +} + +// Function to decrement refcounts for high volume event subscriptions +void EventHandler::ProcessUnsubscription(uint64_t eventSubscriptions) +{ + if ((eventSubscriptions & EventSubscription::InputVolumeMeters) != 0) + _inputVolumeMetersRef--; + if ((eventSubscriptions & EventSubscription::InputActiveStateChanged) != 0) + _inputActiveStateChangedRef--; + if ((eventSubscriptions & EventSubscription::InputShowStateChanged) != 0) + _inputShowStateChangedRef--; + if ((eventSubscriptions & EventSubscription::SceneItemTransformChanged) != 0) + _sceneItemTransformChangedRef--; +} + +// Function required in order to use default arguments +void EventHandler::BroadcastEvent(uint64_t requiredIntent, std::string eventType, json eventData, uint8_t rpcVersion) +{ + if (!_broadcastCallback) + return; + + _broadcastCallback(requiredIntent, eventType, eventData, rpcVersion); +} + void EventHandler::ConnectSourceSignals(obs_source_t *source) // Applies to inputs and scenes { if (!source || obs_source_removed(source)) diff --git a/src/eventhandler/EventHandler.h b/src/eventhandler/EventHandler.h index 996409fd..a57b237e 100644 --- a/src/eventhandler/EventHandler.h +++ b/src/eventhandler/EventHandler.h @@ -1,12 +1,12 @@ #pragma once +#include #include #include #include #include "types/EventSubscription.h" #include "../obs-websocket.h" -#include "../WebSocketServer.h" #include "../utils/Obs.h" template T* GetCalldataPointer(const calldata_t *data, const char* name) { @@ -18,17 +18,30 @@ template T* GetCalldataPointer(const calldata_t *data, const char* class EventHandler { public: - EventHandler(WebSocketServerPtr webSocketServer); + EventHandler(); ~EventHandler(); + typedef std::function BroadcastCallback; + void SetBroadcastCallback(BroadcastCallback cb); + + void ProcessSubscription(uint64_t eventSubscriptions); + void ProcessUnsubscription(uint64_t eventSubscriptions); + private: - WebSocketServerPtr _webSocketServer; + BroadcastCallback _broadcastCallback; std::atomic _obsLoaded; + std::atomic _inputVolumeMetersRef; + std::atomic _inputActiveStateChangedRef; + std::atomic _inputShowStateChangedRef; + std::atomic _sceneItemTransformChangedRef; + void ConnectSourceSignals(obs_source_t *source); void DisconnectSourceSignals(obs_source_t *source); + void BroadcastEvent(uint64_t requiredIntent, std::string eventType, json eventData = nullptr, uint8_t rpcVersion = 1); + // Signal handler: frontend static void OnFrontendEvent(enum obs_frontend_event event, void *private_data); diff --git a/src/eventhandler/EventHandler_Config.cpp b/src/eventhandler/EventHandler_Config.cpp index 4d376ff5..d3e4fc8b 100644 --- a/src/eventhandler/EventHandler_Config.cpp +++ b/src/eventhandler/EventHandler_Config.cpp @@ -5,26 +5,26 @@ void EventHandler::HandleCurrentSceneCollectionChanged() { json eventData; eventData["sceneCollectionName"] = Utils::Obs::StringHelper::GetCurrentSceneCollection(); - _webSocketServer->BroadcastEvent(EventSubscription::Config, "CurrentSceneCollectionChanged", eventData); + BroadcastEvent(EventSubscription::Config, "CurrentSceneCollectionChanged", eventData); } void EventHandler::HandleSceneCollectionListChanged() { json eventData; eventData["sceneCollections"] = Utils::Obs::ListHelper::GetSceneCollectionList(); - _webSocketServer->BroadcastEvent(EventSubscription::Config, "SceneCollectionListChanged", eventData); + BroadcastEvent(EventSubscription::Config, "SceneCollectionListChanged", eventData); } void EventHandler::HandleCurrentProfileChanged() { json eventData; eventData["profileName"] = Utils::Obs::StringHelper::GetCurrentProfile(); - _webSocketServer->BroadcastEvent(EventSubscription::Config, "CurrentProfileChanged", eventData); + BroadcastEvent(EventSubscription::Config, "CurrentProfileChanged", eventData); } void EventHandler::HandleProfileListChanged() { json eventData; eventData["profiles"] = Utils::Obs::ListHelper::GetProfileList(); - _webSocketServer->BroadcastEvent(EventSubscription::Config, "ProfileListChanged", eventData); + BroadcastEvent(EventSubscription::Config, "ProfileListChanged", eventData); } diff --git a/src/eventhandler/EventHandler_General.cpp b/src/eventhandler/EventHandler_General.cpp index ad57635a..0fb1d96b 100644 --- a/src/eventhandler/EventHandler_General.cpp +++ b/src/eventhandler/EventHandler_General.cpp @@ -3,12 +3,12 @@ void EventHandler::HandleExitStarted() { - _webSocketServer->BroadcastEvent(EventSubscription::General, "ExitStarted"); + BroadcastEvent(EventSubscription::General, "ExitStarted"); } void EventHandler::HandleStudioModeStateChanged(bool enabled) { json eventData; eventData["studioModeEnabled"] = enabled; - _webSocketServer->BroadcastEvent(EventSubscription::General, "StudioModeStateChanged", eventData); + BroadcastEvent(EventSubscription::General, "StudioModeStateChanged", eventData); } diff --git a/src/eventhandler/EventHandler_Inputs.cpp b/src/eventhandler/EventHandler_Inputs.cpp index d557e97e..12186201 100644 --- a/src/eventhandler/EventHandler_Inputs.cpp +++ b/src/eventhandler/EventHandler_Inputs.cpp @@ -13,14 +13,14 @@ void EventHandler::HandleInputCreated(obs_source_t *source) eventData["unversionedInputKind"] = obs_source_get_unversioned_id(source); eventData["inputSettings"] = Utils::Json::ObsDataToJson(inputSettings); eventData["defaultInputSettings"] = Utils::Json::ObsDataToJson(defaultInputSettings, true); - _webSocketServer->BroadcastEvent(EventSubscription::Inputs, "InputCreated", eventData); + BroadcastEvent(EventSubscription::Inputs, "InputCreated", eventData); } void EventHandler::HandleInputRemoved(obs_source_t *source) { json eventData; eventData["inputName"] = obs_source_get_name(source); - _webSocketServer->BroadcastEvent(EventSubscription::Inputs, "InputRemoved", eventData); + BroadcastEvent(EventSubscription::Inputs, "InputRemoved", eventData); } void EventHandler::HandleInputNameChanged(obs_source_t *source, std::string oldInputName, std::string inputName) @@ -28,13 +28,16 @@ void EventHandler::HandleInputNameChanged(obs_source_t *source, std::string oldI json eventData; eventData["oldInputName"] = oldInputName; eventData["inputName"] = inputName; - _webSocketServer->BroadcastEvent(EventSubscription::Inputs, "InputNameChanged", eventData); + BroadcastEvent(EventSubscription::Inputs, "InputNameChanged", eventData); } void EventHandler::HandleInputActiveStateChanged(void *param, calldata_t *data) { auto eventHandler = reinterpret_cast(param); + if (!eventHandler->_inputActiveStateChangedRef.load()) + return; + obs_source_t *source = GetCalldataPointer(data, "source"); if (!source) return; @@ -45,13 +48,16 @@ void EventHandler::HandleInputActiveStateChanged(void *param, calldata_t *data) json eventData; eventData["inputName"] = obs_source_get_name(source); eventData["videoActive"] = obs_source_active(source); - eventHandler->_webSocketServer->BroadcastEvent(EventSubscription::InputActiveStateChanged, "InputActiveStateChanged", eventData); + eventHandler->BroadcastEvent(EventSubscription::InputActiveStateChanged, "InputActiveStateChanged", eventData); } void EventHandler::HandleInputShowStateChanged(void *param, calldata_t *data) { auto eventHandler = reinterpret_cast(param); + if (!eventHandler->_inputShowStateChangedRef.load()) + return; + obs_source_t *source = GetCalldataPointer(data, "source"); if (!source) return; @@ -62,7 +68,7 @@ void EventHandler::HandleInputShowStateChanged(void *param, calldata_t *data) json eventData; eventData["inputName"] = obs_source_get_name(source); eventData["videoShowing"] = obs_source_showing(source); - eventHandler->_webSocketServer->BroadcastEvent(EventSubscription::InputShowStateChanged, "InputShowStateChanged", eventData); + eventHandler->BroadcastEvent(EventSubscription::InputShowStateChanged, "InputShowStateChanged", eventData); } void EventHandler::HandleInputMuteStateChanged(void *param, calldata_t *data) @@ -79,7 +85,7 @@ void EventHandler::HandleInputMuteStateChanged(void *param, calldata_t *data) json eventData; eventData["inputName"] = obs_source_get_name(source); eventData["inputMuted"] = obs_source_muted(source); - eventHandler->_webSocketServer->BroadcastEvent(EventSubscription::Inputs, "InputMuteStateChanged", eventData); + eventHandler->BroadcastEvent(EventSubscription::Inputs, "InputMuteStateChanged", eventData); } void EventHandler::HandleInputVolumeChanged(void *param, calldata_t *data) @@ -104,7 +110,7 @@ void EventHandler::HandleInputVolumeChanged(void *param, calldata_t *data) eventData["inputName"] = obs_source_get_name(source); eventData["inputVolumeMul"] = inputVolumeMul; eventData["inputVolumeDb"] = inputVolumeDb; - eventHandler->_webSocketServer->BroadcastEvent(EventSubscription::Inputs, "InputVolumeChanged", eventData); + eventHandler->BroadcastEvent(EventSubscription::Inputs, "InputVolumeChanged", eventData); } void EventHandler::HandleInputAudioSyncOffsetChanged(void *param, calldata_t *data) @@ -123,7 +129,7 @@ void EventHandler::HandleInputAudioSyncOffsetChanged(void *param, calldata_t *da json eventData; eventData["inputName"] = obs_source_get_name(source); eventData["inputAudioSyncOffset"] = inputAudioSyncOffset / 1000000; - eventHandler->_webSocketServer->BroadcastEvent(EventSubscription::Inputs, "InputAudioSyncOffsetChanged", eventData); + eventHandler->BroadcastEvent(EventSubscription::Inputs, "InputAudioSyncOffsetChanged", eventData); } void EventHandler::HandleInputAudioTracksChanged(void *param, calldata_t *data) @@ -147,7 +153,7 @@ void EventHandler::HandleInputAudioTracksChanged(void *param, calldata_t *data) json eventData; eventData["inputName"] = obs_source_get_name(source); eventData["inputAudioTracks"] = inputAudioTracks; - eventHandler->_webSocketServer->BroadcastEvent(EventSubscription::Inputs, "InputAudioTracksChanged", eventData); + eventHandler->BroadcastEvent(EventSubscription::Inputs, "InputAudioTracksChanged", eventData); } void EventHandler::HandleInputAudioMonitorTypeChanged(void *param, calldata_t *data) @@ -180,5 +186,5 @@ void EventHandler::HandleInputAudioMonitorTypeChanged(void *param, calldata_t *d json eventData; eventData["inputName"] = obs_source_get_name(source); eventData["monitorType"] = monitorTypeString; - eventHandler->_webSocketServer->BroadcastEvent(EventSubscription::Inputs, "InputAudioMonitorTypeChanged", eventData); + eventHandler->BroadcastEvent(EventSubscription::Inputs, "InputAudioMonitorTypeChanged", eventData); } diff --git a/src/eventhandler/EventHandler_MediaInputs.cpp b/src/eventhandler/EventHandler_MediaInputs.cpp index 136940b1..70237337 100644 --- a/src/eventhandler/EventHandler_MediaInputs.cpp +++ b/src/eventhandler/EventHandler_MediaInputs.cpp @@ -112,7 +112,7 @@ void EventHandler::HandleMediaInputPlaybackStarted(void *param, calldata_t *data json eventData; eventData["inputName"] = obs_source_get_name(source); - eventHandler->_webSocketServer->BroadcastEvent(EventSubscription::MediaInputs, "MediaInputPlaybackStarted", eventData); + eventHandler->BroadcastEvent(EventSubscription::MediaInputs, "MediaInputPlaybackStarted", eventData); } void EventHandler::HandleMediaInputPlaybackEnded(void *param, calldata_t *data) @@ -128,7 +128,7 @@ void EventHandler::HandleMediaInputPlaybackEnded(void *param, calldata_t *data) json eventData; eventData["inputName"] = obs_source_get_name(source); - eventHandler->_webSocketServer->BroadcastEvent(EventSubscription::MediaInputs, "MediaInputPlaybackEnded", eventData); + eventHandler->BroadcastEvent(EventSubscription::MediaInputs, "MediaInputPlaybackEnded", eventData); } void EventHandler::HandleMediaInputActionTriggered(obs_source_t *source, ObsMediaInputAction action) @@ -136,5 +136,5 @@ void EventHandler::HandleMediaInputActionTriggered(obs_source_t *source, ObsMedi json eventData; eventData["inputName"] = obs_source_get_name(source); eventData["mediaAction"] = GetMediaInputActionString(action); - _webSocketServer->BroadcastEvent(EventSubscription::MediaInputs, "MediaInputActionTriggered", eventData); + BroadcastEvent(EventSubscription::MediaInputs, "MediaInputActionTriggered", eventData); } \ No newline at end of file diff --git a/src/eventhandler/EventHandler_Outputs.cpp b/src/eventhandler/EventHandler_Outputs.cpp index b2cd26f9..fceb21ba 100644 --- a/src/eventhandler/EventHandler_Outputs.cpp +++ b/src/eventhandler/EventHandler_Outputs.cpp @@ -35,7 +35,7 @@ void EventHandler::HandleStreamStateChanged(ObsOutputState state) json eventData; eventData["outputActive"] = GetOutputStateActive(state); eventData["outputState"] = GetOutputStateString(state); - _webSocketServer->BroadcastEvent(EventSubscription::Outputs, "StreamStateChanged", eventData); + BroadcastEvent(EventSubscription::Outputs, "StreamStateChanged", eventData); } void EventHandler::HandleRecordStateChanged(ObsOutputState state) @@ -43,7 +43,7 @@ void EventHandler::HandleRecordStateChanged(ObsOutputState state) json eventData; eventData["outputActive"] = GetOutputStateActive(state); eventData["outputState"] = GetOutputStateString(state); - _webSocketServer->BroadcastEvent(EventSubscription::Outputs, "RecordStateChanged", eventData); + BroadcastEvent(EventSubscription::Outputs, "RecordStateChanged", eventData); } void EventHandler::HandleReplayBufferStateChanged(ObsOutputState state) @@ -51,7 +51,7 @@ void EventHandler::HandleReplayBufferStateChanged(ObsOutputState state) json eventData; eventData["outputActive"] = GetOutputStateActive(state); eventData["outputState"] = GetOutputStateString(state); - _webSocketServer->BroadcastEvent(EventSubscription::Outputs, "ReplayBufferStateChanged", eventData); + BroadcastEvent(EventSubscription::Outputs, "ReplayBufferStateChanged", eventData); } void EventHandler::HandleVirtualcamStateChanged(ObsOutputState state) @@ -59,12 +59,12 @@ void EventHandler::HandleVirtualcamStateChanged(ObsOutputState state) json eventData; eventData["outputActive"] = GetOutputStateActive(state); eventData["outputState"] = GetOutputStateString(state); - _webSocketServer->BroadcastEvent(EventSubscription::Outputs, "VirtualcamStateChanged", eventData); + BroadcastEvent(EventSubscription::Outputs, "VirtualcamStateChanged", eventData); } void EventHandler::HandleReplayBufferSaved() { json eventData; eventData["savedReplayPath"] = Utils::Obs::StringHelper::GetLastReplayBufferFilePath(); - _webSocketServer->BroadcastEvent(EventSubscription::Outputs, "ReplayBufferSaved", eventData); + BroadcastEvent(EventSubscription::Outputs, "ReplayBufferSaved", eventData); } diff --git a/src/eventhandler/EventHandler_SceneItems.cpp b/src/eventhandler/EventHandler_SceneItems.cpp index d42a5cf7..87e28913 100644 --- a/src/eventhandler/EventHandler_SceneItems.cpp +++ b/src/eventhandler/EventHandler_SceneItems.cpp @@ -18,7 +18,7 @@ void EventHandler::HandleSceneItemCreated(void *param, calldata_t *data) eventData["inputName"] = obs_source_get_name(obs_sceneitem_get_source(sceneItem)); eventData["sceneItemId"] = obs_sceneitem_get_id(sceneItem); eventData["sceneItemIndex"] = obs_sceneitem_get_order_position(sceneItem); - eventHandler->_webSocketServer->BroadcastEvent(EventSubscription::SceneItems, "SceneItemCreated", eventData); + eventHandler->BroadcastEvent(EventSubscription::SceneItems, "SceneItemCreated", eventData); } // Will not be emitted if an item is removed due to the parent scene being removed. @@ -39,7 +39,7 @@ void EventHandler::HandleSceneItemRemoved(void *param, calldata_t *data) eventData["inputName"] = obs_source_get_name(obs_sceneitem_get_source(sceneItem)); eventData["sceneItemId"] = obs_sceneitem_get_id(sceneItem); eventData["sceneItemIndex"] = obs_sceneitem_get_order_position(sceneItem); - eventHandler->_webSocketServer->BroadcastEvent(EventSubscription::SceneItems, "SceneItemRemoved", eventData); + eventHandler->BroadcastEvent(EventSubscription::SceneItems, "SceneItemRemoved", eventData); } void EventHandler::HandleSceneItemListReindexed(void *param, calldata_t *data) @@ -53,7 +53,7 @@ void EventHandler::HandleSceneItemListReindexed(void *param, calldata_t *data) json eventData; eventData["sceneName"] = obs_source_get_name(obs_scene_get_source(scene)); eventData["sceneItems"] = Utils::Obs::ListHelper::GetSceneItemList(scene, true); - eventHandler->_webSocketServer->BroadcastEvent(EventSubscription::SceneItems, "SceneItemListReindexed", eventData); + eventHandler->BroadcastEvent(EventSubscription::SceneItems, "SceneItemListReindexed", eventData); } void EventHandler::HandleSceneItemEnableStateChanged(void *param, calldata_t *data) @@ -74,7 +74,7 @@ void EventHandler::HandleSceneItemEnableStateChanged(void *param, calldata_t *da eventData["sceneName"] = obs_source_get_name(obs_scene_get_source(scene)); eventData["sceneItemId"] = obs_sceneitem_get_id(sceneItem); eventData["sceneItemEnabled"] = sceneItemEnabled; - eventHandler->_webSocketServer->BroadcastEvent(EventSubscription::SceneItems, "SceneItemEnableStateChanged", eventData); + eventHandler->BroadcastEvent(EventSubscription::SceneItems, "SceneItemEnableStateChanged", eventData); } void EventHandler::HandleSceneItemLockStateChanged(void *param, calldata_t *data) @@ -95,13 +95,16 @@ void EventHandler::HandleSceneItemLockStateChanged(void *param, calldata_t *data eventData["sceneName"] = obs_source_get_name(obs_scene_get_source(scene)); eventData["sceneItemId"] = obs_sceneitem_get_id(sceneItem); eventData["sceneItemLocked"] = sceneItemLocked; - eventHandler->_webSocketServer->BroadcastEvent(EventSubscription::SceneItems, "SceneItemLockStateChanged", eventData); + eventHandler->BroadcastEvent(EventSubscription::SceneItems, "SceneItemLockStateChanged", eventData); } void EventHandler::HandleSceneItemTransformChanged(void *param, calldata_t *data) { auto eventHandler = reinterpret_cast(param); + if (!eventHandler->_sceneItemTransformChangedRef.load()) + return; + obs_scene_t *scene = GetCalldataPointer(data, "scene"); if (!scene) return; @@ -114,5 +117,5 @@ void EventHandler::HandleSceneItemTransformChanged(void *param, calldata_t *data eventData["sceneName"] = obs_source_get_name(obs_scene_get_source(scene)); eventData["sceneItemId"] = obs_sceneitem_get_id(sceneItem); eventData["sceneItemTransform"] = Utils::Obs::DataHelper::GetSceneItemTransform(sceneItem); - eventHandler->_webSocketServer->BroadcastEvent(EventSubscription::SceneItems, "SceneItemTransformChanged", eventData); + eventHandler->BroadcastEvent(EventSubscription::SceneItemTransformChanged, "SceneItemTransformChanged", eventData); } diff --git a/src/eventhandler/EventHandler_Scenes.cpp b/src/eventhandler/EventHandler_Scenes.cpp index 37fcca75..409e4c4f 100644 --- a/src/eventhandler/EventHandler_Scenes.cpp +++ b/src/eventhandler/EventHandler_Scenes.cpp @@ -6,7 +6,7 @@ void EventHandler::HandleSceneCreated(obs_source_t *source) json eventData; eventData["sceneName"] = obs_source_get_name(source); eventData["isGroup"] = obs_source_is_group(source); - _webSocketServer->BroadcastEvent(EventSubscription::Scenes, "SceneCreated", eventData); + BroadcastEvent(EventSubscription::Scenes, "SceneCreated", eventData); } void EventHandler::HandleSceneRemoved(obs_source_t *source) @@ -14,7 +14,7 @@ void EventHandler::HandleSceneRemoved(obs_source_t *source) json eventData; eventData["sceneName"] = obs_source_get_name(source); eventData["isGroup"] = obs_source_is_group(source); - _webSocketServer->BroadcastEvent(EventSubscription::Scenes, "SceneRemoved", eventData); + BroadcastEvent(EventSubscription::Scenes, "SceneRemoved", eventData); } void EventHandler::HandleSceneNameChanged(obs_source_t *source, std::string oldSceneName, std::string sceneName) @@ -22,7 +22,7 @@ void EventHandler::HandleSceneNameChanged(obs_source_t *source, std::string oldS json eventData; eventData["oldSceneName"] = oldSceneName; eventData["sceneName"] = sceneName; - _webSocketServer->BroadcastEvent(EventSubscription::Scenes, "SceneNameChanged", eventData); + BroadcastEvent(EventSubscription::Scenes, "SceneNameChanged", eventData); } void EventHandler::HandleCurrentSceneChanged() @@ -31,7 +31,7 @@ void EventHandler::HandleCurrentSceneChanged() json eventData; eventData["sceneName"] = obs_source_get_name(currentScene); - _webSocketServer->BroadcastEvent(EventSubscription::Scenes, "CurrentSceneChanged", eventData); + BroadcastEvent(EventSubscription::Scenes, "CurrentSceneChanged", eventData); } void EventHandler::HandleCurrentPreviewSceneChanged() @@ -44,12 +44,12 @@ void EventHandler::HandleCurrentPreviewSceneChanged() json eventData; eventData["sceneName"] = obs_source_get_name(currentPreviewScene); - _webSocketServer->BroadcastEvent(EventSubscription::Scenes, "CurrentPreviewSceneChanged", eventData); + BroadcastEvent(EventSubscription::Scenes, "CurrentPreviewSceneChanged", eventData); } void EventHandler::HandleSceneListChanged() { json eventData; eventData["scenes"] = Utils::Obs::ListHelper::GetSceneList(); - _webSocketServer->BroadcastEvent(EventSubscription::Scenes, "SceneListChanged", eventData); + BroadcastEvent(EventSubscription::Scenes, "SceneListChanged", eventData); } diff --git a/src/eventhandler/EventHandler_Transitions.cpp b/src/eventhandler/EventHandler_Transitions.cpp index 9dcc9b63..00bdfe59 100644 --- a/src/eventhandler/EventHandler_Transitions.cpp +++ b/src/eventhandler/EventHandler_Transitions.cpp @@ -7,14 +7,14 @@ void EventHandler::HandleTransitionCreated(obs_source_t *source) eventData["transitionName"] = obs_source_get_name(source); eventData["transitionKind"] = obs_source_get_id(source); eventData["transitionFixed"] = obs_transition_fixed(source); - _webSocketServer->BroadcastEvent(EventSubscription::Transitions, "TransitionCreated", eventData); + BroadcastEvent(EventSubscription::Transitions, "TransitionCreated", eventData); } void EventHandler::HandleTransitionRemoved(obs_source_t *source) { json eventData; eventData["transitionName"] = obs_source_get_name(source); - _webSocketServer->BroadcastEvent(EventSubscription::Transitions, "TransitionRemoved", eventData); + BroadcastEvent(EventSubscription::Transitions, "TransitionRemoved", eventData); } void EventHandler::HandleTransitionNameChanged(obs_source_t *source, std::string oldTransitionName, std::string transitionName) @@ -22,5 +22,5 @@ void EventHandler::HandleTransitionNameChanged(obs_source_t *source, std::string json eventData; eventData["oldTransitionName"] = oldTransitionName; eventData["transitionName"] = transitionName; - _webSocketServer->BroadcastEvent(EventSubscription::Transitions, "TransitionNameChanged", eventData); + BroadcastEvent(EventSubscription::Transitions, "TransitionNameChanged", eventData); } diff --git a/src/eventhandler/types/EventSubscription.h b/src/eventhandler/types/EventSubscription.h index cc619408..aed68177 100644 --- a/src/eventhandler/types/EventSubscription.h +++ b/src/eventhandler/types/EventSubscription.h @@ -30,5 +30,7 @@ namespace EventSubscription { InputActiveStateChanged = (1 << 10), // InputShowStateChanged event (high-volume) InputShowStateChanged = (1 << 11), + // SceneItemTransformChanged event (high-volume) + SceneItemTransformChanged = (1 << 12), }; }; diff --git a/src/obs-websocket.cpp b/src/obs-websocket.cpp index 9668d6dc..33a1b841 100644 --- a/src/obs-websocket.cpp +++ b/src/obs-websocket.cpp @@ -44,9 +44,10 @@ bool obs_module_load(void) _config = ConfigPtr(new Config()); _config->Load(); - _webSocketServer = WebSocketServerPtr(new WebSocketServer()); + // Initialize event handler before server, as the server configures the event handler. + _eventHandler = EventHandlerPtr(new EventHandler()); - _eventHandler = EventHandlerPtr(new EventHandler(_webSocketServer)); + _webSocketServer = WebSocketServerPtr(new WebSocketServer()); obs_frontend_push_ui_translation(obs_module_get_string); QMainWindow* mainWindow = reinterpret_cast(obs_frontend_get_main_window()); diff --git a/src/requesthandler/RequestHandler.cpp b/src/requesthandler/RequestHandler.cpp index e981b339..461a42d2 100644 --- a/src/requesthandler/RequestHandler.cpp +++ b/src/requesthandler/RequestHandler.cpp @@ -86,6 +86,11 @@ const std::map RequestHandler::_handlerMap {"StopStream", &RequestHandler::StopStream}, }; +RequestHandler::RequestHandler(SessionPtr session) : + _session(session) +{ +} + RequestResult RequestHandler::ProcessRequest(const Request& request) { if (!request.RequestData.is_object() && !request.RequestData.is_null()) diff --git a/src/requesthandler/RequestHandler.h b/src/requesthandler/RequestHandler.h index 1ead4cb9..aeab5621 100644 --- a/src/requesthandler/RequestHandler.h +++ b/src/requesthandler/RequestHandler.h @@ -4,8 +4,10 @@ #include #include +#include "rpc/RequestStatus.h" #include "rpc/Request.h" #include "rpc/RequestResult.h" +#include "../WebSocketSession.h" #include "../obs-websocket.h" #include "../utils/Obs.h" #include "../plugin-macros.generated.h" @@ -15,6 +17,8 @@ typedef RequestResult(RequestHandler::*RequestMethodHandler)(const Request&); class RequestHandler { public: + RequestHandler(SessionPtr session); + RequestResult ProcessRequest(const Request& request); std::vector GetRequestList(); @@ -101,5 +105,6 @@ class RequestHandler { RequestResult StartStream(const Request&); RequestResult StopStream(const Request&); + SessionPtr _session; static const std::map _handlerMap; }; diff --git a/src/requesthandler/RequestHandler_General.cpp b/src/requesthandler/RequestHandler_General.cpp index b8507afd..cbfb644b 100644 --- a/src/requesthandler/RequestHandler_General.cpp +++ b/src/requesthandler/RequestHandler_General.cpp @@ -43,8 +43,8 @@ RequestResult RequestHandler::GetStats(const Request& request) { json responseData = Utils::Obs::DataHelper::GetStats(); - responseData["webSocketSessionIncomingMessages"] = request.Session->IncomingMessages(); - responseData["webSocketSessionOutgoingMessages"] = request.Session->OutgoingMessages(); + responseData["webSocketSessionIncomingMessages"] = _session->IncomingMessages(); + responseData["webSocketSessionOutgoingMessages"] = _session->OutgoingMessages(); return RequestResult::Success(responseData); } diff --git a/src/requesthandler/rpc/Request.cpp b/src/requesthandler/rpc/Request.cpp index cdf8810f..cc326b50 100644 --- a/src/requesthandler/rpc/Request.cpp +++ b/src/requesthandler/rpc/Request.cpp @@ -10,11 +10,8 @@ json GetDefaultJsonObject(json requestData) return requestData; } -Request::Request(SessionPtr session, std::string requestType, json requestData) : - Session(session), +Request::Request(std::string requestType, json requestData) : HasRequestData(requestData.is_object()), - RpcVersion(session->RpcVersion()), - IgnoreNonFatalRequestChecks(session->IgnoreNonFatalRequestChecks()), RequestType(requestType), RequestData(GetDefaultJsonObject(requestData)) { diff --git a/src/requesthandler/rpc/Request.h b/src/requesthandler/rpc/Request.h index d6e50605..26f6f638 100644 --- a/src/requesthandler/rpc/Request.h +++ b/src/requesthandler/rpc/Request.h @@ -1,7 +1,6 @@ #pragma once #include "RequestStatus.h" -#include "../../WebSocketSession.h" #include "../../utils/Json.h" enum ObsWebSocketSceneFilter { @@ -12,7 +11,7 @@ enum ObsWebSocketSceneFilter { struct Request { - Request(SessionPtr session, const std::string requestType, const json requestData = nullptr); + Request(const std::string requestType, const json requestData = nullptr); // Contains the key and is not null const bool Contains(const std::string keyName) const; @@ -35,10 +34,7 @@ struct Request obs_source_t *ValidateInput(const std::string keyName, RequestStatus::RequestStatus &statusCode, std::string &comment) const; obs_sceneitem_t *ValidateSceneItem(const std::string sceneKeyName, const std::string sceneItemIdKeyName, RequestStatus::RequestStatus &statusCode, std::string &comment, const ObsWebSocketSceneFilter filter = OBS_WEBSOCKET_SCENE_FILTER_SCENE_ONLY) const; - SessionPtr Session; const bool HasRequestData; - const uint8_t RpcVersion; - const bool IgnoreNonFatalRequestChecks; const std::string RequestType; const json RequestData; -}; \ No newline at end of file +};