Base: Large plugin refactor

- Merge WebSocketProtocol into WebSocketServer
  - Having them separated was not doing anything productive
- Request: Move SessionPtr to RequestHandler
  - Less copying to do for batch requests
- Fully modularize EventHandler
  - Make BroadcastEvent a stored callback that WebSocketServer sets
- Return early on high volume events to avoid unnecessary compute
  - These events will only generate a json object when it is actually
needed
This commit is contained in:
tt2468 2021-09-04 10:04:00 -07:00
parent 537595658d
commit 7e1e1bc33c
22 changed files with 486 additions and 407 deletions

View File

@ -86,7 +86,7 @@ set(obs-websocket_SOURCES
src/obs-websocket.cpp
src/Config.cpp
src/WebSocketServer.cpp
src/WebSocketProtocol.cpp
src/WebSocketServer_Protocol.cpp
src/WebSocketSession.cpp
src/eventhandler/EventHandler.cpp
src/eventhandler/EventHandler_General.cpp
@ -121,7 +121,6 @@ set(obs-websocket_HEADERS
src/obs-websocket.h
src/Config.h
src/WebSocketServer.h
src/WebSocketProtocol.h
src/WebSocketSession.h
src/eventhandler/EventHandler.h
src/eventhandler/types/EventSubscription.h

View File

@ -1,23 +0,0 @@
#pragma once
#include <vector>
#include <string>
#include "WebSocketServer.h"
class WebSocketSession;
typedef std::shared_ptr<WebSocketSession> SessionPtr;
namespace WebSocketProtocol {
const std::vector<uint8_t> SupportedRpcVersions{
1
};
struct ProcessResult {
WebSocketServer::WebSocketCloseCode closeCode = WebSocketServer::WebSocketCloseCode::DontClose;
std::string closeReason;
json result;
};
void ProcessMessage(SessionPtr session, ProcessResult &ret, uint8_t opCode, json incomingMessage);
}

View File

@ -6,8 +6,7 @@
#include <obs-frontend-api.h>
#include "WebSocketServer.h"
#include "WebSocketProtocol.h"
#include "eventhandler/types/EventSubscription.h"
#include "eventhandler/EventHandler.h"
#include "obs-websocket.h"
#include "Config.h"
#include "utils/Crypto.h"
@ -46,6 +45,13 @@ WebSocketServer::WebSocketServer() :
&WebSocketServer::onMessage, this, websocketpp::lib::placeholders::_1, websocketpp::lib::placeholders::_2
)
);
auto eventHandler = GetEventHandler();
eventHandler->SetBroadcastCallback(
std::bind(
&WebSocketServer::BroadcastEvent, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3, std::placeholders::_4
)
);
}
WebSocketServer::~WebSocketServer()
@ -194,61 +200,6 @@ std::vector<WebSocketServer::WebSocketSessionState> WebSocketServer::GetWebSocke
}
// It isn't consistent to directly call the WebSocketServer from the events system, but it would also be dumb to make it unnecessarily complicated.
void WebSocketServer::BroadcastEvent(uint64_t requiredIntent, std::string eventType, json eventData, uint8_t rpcVersion)
{
if (!_server.is_listening())
return;
QtConcurrent::run(&_threadPool, [=]() {
// Populate message object
json eventMessage;
eventMessage["op"] = 5;
eventMessage["d"]["eventType"] = eventType;
eventMessage["d"]["eventIntent"] = requiredIntent;
if (eventData.is_object())
eventMessage["d"]["eventData"] = eventData;
// Initialize objects. The broadcast process only dumps the data when its needed.
std::string messageJson;
std::string messageMsgPack;
// Recurse connected sessions and send the event to suitable sessions.
std::unique_lock<std::mutex> lock(_sessionMutex);
for (auto & it : _sessions) {
if (!it.second->IsIdentified()) {
continue;
}
if (rpcVersion && it.second->RpcVersion() != rpcVersion) {
continue;
}
if ((it.second->EventSubscriptions() & requiredIntent) != 0) {
websocketpp::lib::error_code errorCode;
switch (it.second->Encoding()) {
case WebSocketEncoding::Json:
if (messageJson.empty()) {
messageJson = eventMessage.dump();
}
_server.send((websocketpp::connection_hdl)it.first, messageJson, websocketpp::frame::opcode::text, errorCode);
it.second->IncrementOutgoingMessages();
break;
case WebSocketEncoding::MsgPack:
if (messageMsgPack.empty()) {
auto msgPackData = json::to_msgpack(eventMessage);
messageMsgPack = std::string(msgPackData.begin(), msgPackData.end());
}
_server.send((websocketpp::connection_hdl)it.first, messageMsgPack, websocketpp::frame::opcode::binary, errorCode);
it.second->IncrementOutgoingMessages();
break;
}
if (errorCode)
blog(LOG_ERROR, "[WebSocketServer::BroadcastEvent] Error sending event message: %s", errorCode.message().c_str());
}
}
lock.unlock();
if (_debugEnabled && (EventSubscription::All & requiredIntent) != 0) // Don't log high volume events
blog(LOG_INFO, "[WebSocketServer::BroadcastEvent] Outgoing event:\n%s", eventMessage.dump(2).c_str());
});
}
bool WebSocketServer::onValidate(websocketpp::connection_hdl hdl)
{
@ -340,6 +291,7 @@ void WebSocketServer::onClose(websocketpp::connection_hdl hdl)
// Get info from the session and then delete it
std::unique_lock<std::mutex> lock(_sessionMutex);
SessionPtr session = _sessions[hdl];
uint64_t eventSubscriptions = session->EventSubscriptions();
bool isIdentified = session->IsIdentified();
uint64_t connectedAt = session->ConnectedAt();
uint64_t incomingMessages = session->IncomingMessages();
@ -348,6 +300,12 @@ void WebSocketServer::onClose(websocketpp::connection_hdl hdl)
_sessions.erase(hdl);
lock.unlock();
// If client was identified, decrement appropriate refs in eventhandler.
if (isIdentified) {
auto eventHandler = GetEventHandler();
eventHandler->ProcessUnsubscription(eventSubscriptions);
}
// Build SessionState object for signal
WebSocketSessionState state;
state.remoteAddress = remoteAddress;
@ -431,12 +389,12 @@ void WebSocketServer::onMessage(websocketpp::connection_hdl hdl, websocketpp::se
if (_debugEnabled)
blog(LOG_INFO, "[WebSocketServer::onMessage] Incoming message (decoded):\n%s", incomingMessage.dump(2).c_str());
WebSocketProtocol::ProcessResult ret;
ProcessResult ret;
// Verify incoming message is an object
if (!incomingMessage.is_object()) {
if (!session->IgnoreInvalidMessages()) {
ret.closeCode = WebSocketServer::WebSocketCloseCode::MessageDecodeError;
ret.closeCode = WebSocketCloseCode::MessageDecodeError;
ret.closeReason = "You sent a non-object payload.";
goto skipProcessing;
}
@ -445,8 +403,8 @@ void WebSocketServer::onMessage(websocketpp::connection_hdl hdl, websocketpp::se
// Disconnect client if 4.x protocol is detected
if (!session->IsIdentified() && incomingMessage.contains("request-type")) {
blog(LOG_WARNING, "[WebSocketProtocol::ProcessMessage] Client %s appears to be running a pre-5.0.0 protocol.", session->RemoteAddress().c_str());
ret.closeCode = WebSocketServer::WebSocketCloseCode::UnsupportedRpcVersion;
blog(LOG_WARNING, "[WebSocketServer::onMessage] Client %s appears to be running a pre-5.0.0 protocol.", session->RemoteAddress().c_str());
ret.closeCode = WebSocketCloseCode::UnsupportedRpcVersion;
ret.closeReason = "You appear to be attempting to connect with the pre-5.0.0 plugin protocol. Check to make sure your client is updated.";
goto skipProcessing;
}
@ -454,14 +412,14 @@ void WebSocketServer::onMessage(websocketpp::connection_hdl hdl, websocketpp::se
// Validate op code
if (!incomingMessage.contains("op")) {
if (!session->IgnoreInvalidMessages()) {
ret.closeCode = WebSocketServer::WebSocketCloseCode::UnknownOpCode;
ret.closeCode = WebSocketCloseCode::UnknownOpCode;
ret.closeReason = "Your request is missing an `op`.";
goto skipProcessing;
}
return;
}
WebSocketProtocol::ProcessMessage(session, ret, incomingMessage["op"], incomingMessage["d"]);
ProcessMessage(session, ret, incomingMessage["op"], incomingMessage["d"]);
skipProcessing:
if (ret.closeCode != WebSocketCloseCode::DontClose) {

View File

@ -29,18 +29,6 @@ class WebSocketServer : QObject
bool isIdentified;
};
enum WebSocketOpCode {
Hello = 0,
Identify = 1,
Identified = 2,
Reidentify = 3,
Event = 5,
Request = 6,
RequestResponse = 7,
RequestBatch = 8,
RequestBatchResponse = 9,
};
enum WebSocketCloseCode {
// Internal only
DontClose = 0,
@ -72,6 +60,7 @@ class WebSocketServer : QObject
void Start();
void Stop();
void InvalidateSession(websocketpp::connection_hdl hdl);
void BroadcastEvent(uint64_t requiredIntent, std::string eventType, json eventData = nullptr, uint8_t rpcVersion = 0);
bool IsListening() {
return _server.is_listening();
@ -87,14 +76,17 @@ class WebSocketServer : QObject
std::string AuthenticationSecret;
std::string AuthenticationSalt;
public Q_SLOTS:
void BroadcastEvent(uint64_t requiredIntent, std::string eventType, json eventData = nullptr, uint8_t rpcVersion = 0);
signals:
void ClientConnected(const WebSocketSessionState state);
void ClientDisconnected(const WebSocketSessionState state, const uint16_t closeCode);
private:
struct ProcessResult {
WebSocketCloseCode closeCode = WebSocketCloseCode::DontClose;
std::string closeReason;
json result;
};
void ServerRunner();
bool onValidate(websocketpp::connection_hdl hdl);
@ -102,6 +94,9 @@ class WebSocketServer : QObject
void onClose(websocketpp::connection_hdl hdl);
void onMessage(websocketpp::connection_hdl hdl, websocketpp::server<websocketpp::config::asio>::message_ptr message);
void SetSessionParameters(SessionPtr session, WebSocketServer::ProcessResult &ret, json payloadData);
void ProcessMessage(SessionPtr session, ProcessResult &ret, const uint8_t opCode, json incomingMessage);
std::thread _serverThread;
websocketpp::server<websocketpp::config::asio> _server;
QThreadPool _threadPool;

View File

@ -1,245 +1,324 @@
#include <obs-module.h>
#include "WebSocketProtocol.h"
#include "WebSocketSession.h"
#include "requesthandler/RequestHandler.h"
#include "requesthandler/rpc/RequestStatus.h"
#include "obs-websocket.h"
#include "Config.h"
#include "plugin-macros.generated.h"
#include "utils/Crypto.h"
#include "utils/Json.h"
#include "utils/Platform.h"
bool IsSupportedRpcVersion(uint8_t requestedVersion)
{
for (auto version : WebSocketProtocol::SupportedRpcVersions) {
if (requestedVersion == version)
return true;
}
return false;
}
void SetSessionParameters(SessionPtr session, WebSocketProtocol::ProcessResult &ret, json payloadData)
{
if (payloadData.contains("ignoreInvalidMessages")) {
if (!payloadData["ignoreInvalidMessages"].is_boolean()) {
ret.closeCode = WebSocketServer::WebSocketCloseCode::InvalidDataKeyType;
ret.closeReason = "Your `ignoreInvalidMessages` is not a boolean.";
return;
}
session->SetIgnoreInvalidMessages(payloadData["ignoreInvalidMessages"]);
}
if (payloadData.contains("ignoreNonFatalRequestChecks")) {
if (!payloadData["ignoreNonFatalRequestChecks"].is_boolean()) {
ret.closeCode = WebSocketServer::WebSocketCloseCode::InvalidDataKeyType;
ret.closeReason = "Your `ignoreNonFatalRequestChecks` is not a boolean.";
return;
}
session->SetIgnoreNonFatalRequestChecks(payloadData["ignoreNonFatalRequestChecks"]);
}
if (payloadData.contains("eventSubscriptions")) {
if (!payloadData["eventSubscriptions"].is_number_unsigned()) {
ret.closeCode = WebSocketServer::WebSocketCloseCode::InvalidDataKeyType;
ret.closeReason = "Your `eventSubscriptions` is not an unsigned number.";
return;
}
session->SetEventSubscriptions(payloadData["eventSubscriptions"]);
}
}
void WebSocketProtocol::ProcessMessage(SessionPtr session, WebSocketProtocol::ProcessResult &ret, uint8_t opCode, json payloadData)
{
if (!payloadData.is_object()) {
if (payloadData.is_null()) {
ret.closeCode = WebSocketServer::WebSocketCloseCode::MissingDataKey;
ret.closeReason = "Your payload is missing data (`d`).";
} else {
ret.closeCode = WebSocketServer::WebSocketCloseCode::InvalidDataKeyType;
ret.closeReason = "Your payload's data (`d`) is not an object.";
}
return;
}
// Only `Identify` is allowed when not identified
if (!session->IsIdentified() && opCode != 1) {
ret.closeCode = WebSocketServer::WebSocketCloseCode::NotIdentified;
ret.closeReason = "You attempted to send a non-Identify message while not identified.";
return;
}
switch (opCode) {
case 1: { // Identify
std::unique_lock<std::mutex> sessionLock(session->OperationMutex);
if (session->IsIdentified()) {
if (!session->IgnoreInvalidMessages()) {
ret.closeCode = WebSocketServer::WebSocketCloseCode::AlreadyIdentified;
ret.closeReason = "You are already Identified with the obs-websocket server.";
}
return;
}
if (session->AuthenticationRequired()) {
if (!payloadData.contains("authentication")) {
ret.closeCode = WebSocketServer::WebSocketCloseCode::AuthenticationFailed;
ret.closeReason = "Your payload's data is missing an `authentication` string, however authentication is required.";
return;
}
if (!Utils::Crypto::CheckAuthenticationString(session->Secret(), session->Challenge(), payloadData["authentication"])) {
auto conf = GetConfig();
if (conf && conf->AlertsEnabled) {
QString title = obs_module_text("OBSWebSocket.TrayNotification.AuthenticationFailed.Title");
QString body = QString(obs_module_text("OBSWebSocket.TrayNotification.AuthenticationFailed.Body")).arg(QString::fromStdString(session->RemoteAddress()));
Utils::Platform::SendTrayNotification(QSystemTrayIcon::Warning, title, body);
}
ret.closeCode = WebSocketServer::WebSocketCloseCode::AuthenticationFailed;
ret.closeReason = "Authentication failed.";
return;
}
}
if (!payloadData.contains("rpcVersion")) {
ret.closeCode = WebSocketServer::WebSocketCloseCode::MissingDataKey;
ret.closeReason = "Your payload's data is missing an `rpcVersion`.";
return;
}
if (!payloadData["rpcVersion"].is_number_unsigned()) {
ret.closeCode = WebSocketServer::WebSocketCloseCode::InvalidDataKeyType;
ret.closeReason = "Your `rpcVersion` is not an unsigned number.";
}
uint8_t requestedRpcVersion = payloadData["rpcVersion"];
if (!IsSupportedRpcVersion(requestedRpcVersion)) {
ret.closeCode = WebSocketServer::WebSocketCloseCode::UnsupportedRpcVersion;
ret.closeReason = "Your requested RPC version is not supported by this server.";
return;
}
session->SetRpcVersion(requestedRpcVersion);
SetSessionParameters(session, ret, payloadData);
if (ret.closeCode != WebSocketServer::WebSocketCloseCode::DontClose) {
return;
}
session->SetIsIdentified(true);
auto conf = GetConfig();
if (conf && conf->AlertsEnabled) {
QString title = obs_module_text("OBSWebSocket.TrayNotification.Identified.Title");
QString body = QString(obs_module_text("OBSWebSocket.TrayNotification.Identified.Body")).arg(QString::fromStdString(session->RemoteAddress()));
Utils::Platform::SendTrayNotification(QSystemTrayIcon::Information, title, body);
}
ret.result["op"] = 2;
ret.result["d"]["negotiatedRpcVersion"] = session->RpcVersion();
} return;
case 3: { // Reidentify
std::unique_lock<std::mutex> sessionLock(session->OperationMutex);
SetSessionParameters(session, ret, payloadData);
if (ret.closeCode != WebSocketServer::WebSocketCloseCode::DontClose) {
return;
}
ret.result["op"] = 2;
ret.result["d"]["negotiatedRpcVersion"] = session->RpcVersion();
} return;
case 6: { // Request
// RequestID checking has to be done here where we are able to close the connection.
if (!payloadData.contains("requestId")) {
if (!session->IgnoreInvalidMessages()) {
ret.closeCode = WebSocketServer::WebSocketCloseCode::MissingDataKey;
ret.closeReason = "Your payload data is missing a `requestId`.";
}
return;
}
RequestHandler requestHandler;
Request request(session, payloadData["requestType"], payloadData["requestData"]);
RequestResult requestResult = requestHandler.ProcessRequest(request);
json resultPayloadData;
resultPayloadData["requestType"] = payloadData["requestType"];
resultPayloadData["requestId"] = payloadData["requestId"];
resultPayloadData["requestStatus"] = {
{"result", requestResult.StatusCode == RequestStatus::Success},
{"code", requestResult.StatusCode}
};
if (!requestResult.Comment.empty())
resultPayloadData["requestStatus"]["comment"] = requestResult.Comment;
if (requestResult.ResponseData.is_object())
resultPayloadData["responseData"] = requestResult.ResponseData;
ret.result["op"] = 7;
ret.result["d"] = resultPayloadData;
} return;
case 8: { // RequestBatch
// RequestID checking has to be done here where we are able to close the connection.
if (!payloadData.contains("requestId")) {
if (!session->IgnoreInvalidMessages()) {
ret.closeCode = WebSocketServer::WebSocketCloseCode::MissingDataKey;
ret.closeReason = "Your payload data is missing a `requestId`.";
}
return;
}
if (!payloadData.contains("requests")) {
if (!session->IgnoreInvalidMessages()) {
ret.closeCode = WebSocketServer::WebSocketCloseCode::MissingDataKey;
ret.closeReason = "Your payload data is missing a `requests`.";
}
return;
}
if (!payloadData["requests"].is_array()) {
if (!session->IgnoreInvalidMessages()) {
ret.closeCode = WebSocketServer::WebSocketCloseCode::InvalidDataKeyType;
ret.closeReason = "Your `requests` is not an array.";
}
return;
}
std::vector<json> requests = payloadData["requests"];
json results = json::array();
RequestHandler requestHandler;
for (auto requestJson : requests) {
Request request(session, requestJson["requestType"], requestJson["requestData"]);
RequestResult requestResult = requestHandler.ProcessRequest(request);
json result;
result["requestType"] = requestJson["requestType"];
if (requestJson.contains("requestId"))
result["requestId"] = requestJson["requestId"];
result["requestStatus"] = {
{"result", requestResult.StatusCode == RequestStatus::Success},
{"code", requestResult.StatusCode}
};
if (!requestResult.Comment.empty())
result["requestStatus"]["comment"] = requestResult.Comment;
if (requestResult.ResponseData.is_object())
result["responseData"] = requestResult.ResponseData;
results.push_back(result);
}
ret.result["op"] = 9;
ret.result["d"]["requestId"] = payloadData["requestId"];
ret.result["d"]["results"] = results;
} return;
default:
if (!session->IgnoreInvalidMessages()) {
ret.closeCode = WebSocketServer::WebSocketCloseCode::UnknownOpCode;
ret.closeReason = std::string("Unknown OpCode: %s") + std::to_string(opCode);
}
return;
}
}
#include <QtConcurrent>
#include <obs-module.h>
#include "WebSocketServer.h"
#include "requesthandler/RequestHandler.h"
#include "eventhandler/EventHandler.h"
#include "obs-websocket.h"
#include "Config.h"
#include "plugin-macros.generated.h"
#include "utils/Crypto.h"
#include "utils/Json.h"
#include "utils/Platform.h"
namespace WebSocketOpCode {
enum WebSocketOpCode: uint8_t {
Hello = 0,
Identify = 1,
Identified = 2,
Reidentify = 3,
Event = 5,
Request = 6,
RequestResponse = 7,
RequestBatch = 8,
RequestBatchResponse = 9,
};
};
bool IsSupportedRpcVersion(uint8_t requestedVersion)
{
return (requestedVersion == 1);
}
void WebSocketServer::SetSessionParameters(SessionPtr session, ProcessResult &ret, json payloadData)
{
if (payloadData.contains("ignoreInvalidMessages")) {
if (!payloadData["ignoreInvalidMessages"].is_boolean()) {
ret.closeCode = WebSocketCloseCode::InvalidDataKeyType;
ret.closeReason = "Your `ignoreInvalidMessages` is not a boolean.";
return;
}
session->SetIgnoreInvalidMessages(payloadData["ignoreInvalidMessages"]);
}
if (payloadData.contains("ignoreNonFatalRequestChecks")) {
if (!payloadData["ignoreNonFatalRequestChecks"].is_boolean()) {
ret.closeCode = WebSocketCloseCode::InvalidDataKeyType;
ret.closeReason = "Your `ignoreNonFatalRequestChecks` is not a boolean.";
return;
}
session->SetIgnoreNonFatalRequestChecks(payloadData["ignoreNonFatalRequestChecks"]);
}
if (payloadData.contains("eventSubscriptions")) {
if (!payloadData["eventSubscriptions"].is_number_unsigned()) {
ret.closeCode = WebSocketCloseCode::InvalidDataKeyType;
ret.closeReason = "Your `eventSubscriptions` is not an unsigned number.";
return;
}
session->SetEventSubscriptions(payloadData["eventSubscriptions"]);
}
}
void WebSocketServer::ProcessMessage(SessionPtr session, WebSocketServer::ProcessResult &ret, uint8_t opCode, json payloadData)
{
if (!payloadData.is_object()) {
if (payloadData.is_null()) {
ret.closeCode = WebSocketCloseCode::MissingDataKey;
ret.closeReason = "Your payload is missing data (`d`).";
} else {
ret.closeCode = WebSocketCloseCode::InvalidDataKeyType;
ret.closeReason = "Your payload's data (`d`) is not an object.";
}
return;
}
// Only `Identify` is allowed when not identified
if (!session->IsIdentified() && opCode != 1) {
ret.closeCode = WebSocketCloseCode::NotIdentified;
ret.closeReason = "You attempted to send a non-Identify message while not identified.";
return;
}
switch (opCode) {
case WebSocketOpCode::Identify: { // Identify
std::unique_lock<std::mutex> sessionLock(session->OperationMutex);
if (session->IsIdentified()) {
if (!session->IgnoreInvalidMessages()) {
ret.closeCode = WebSocketCloseCode::AlreadyIdentified;
ret.closeReason = "You are already Identified with the obs-websocket server.";
}
return;
}
if (session->AuthenticationRequired()) {
if (!payloadData.contains("authentication")) {
ret.closeCode = WebSocketCloseCode::AuthenticationFailed;
ret.closeReason = "Your payload's data is missing an `authentication` string, however authentication is required.";
return;
}
if (!Utils::Crypto::CheckAuthenticationString(session->Secret(), session->Challenge(), payloadData["authentication"])) {
auto conf = GetConfig();
if (conf && conf->AlertsEnabled) {
QString title = obs_module_text("OBSWebSocket.TrayNotification.AuthenticationFailed.Title");
QString body = QString(obs_module_text("OBSWebSocket.TrayNotification.AuthenticationFailed.Body")).arg(QString::fromStdString(session->RemoteAddress()));
Utils::Platform::SendTrayNotification(QSystemTrayIcon::Warning, title, body);
}
ret.closeCode = WebSocketCloseCode::AuthenticationFailed;
ret.closeReason = "Authentication failed.";
return;
}
}
if (!payloadData.contains("rpcVersion")) {
ret.closeCode = WebSocketCloseCode::MissingDataKey;
ret.closeReason = "Your payload's data is missing an `rpcVersion`.";
return;
}
if (!payloadData["rpcVersion"].is_number_unsigned()) {
ret.closeCode = WebSocketCloseCode::InvalidDataKeyType;
ret.closeReason = "Your `rpcVersion` is not an unsigned number.";
}
uint8_t requestedRpcVersion = payloadData["rpcVersion"];
if (!IsSupportedRpcVersion(requestedRpcVersion)) {
ret.closeCode = WebSocketCloseCode::UnsupportedRpcVersion;
ret.closeReason = "Your requested RPC version is not supported by this server.";
return;
}
session->SetRpcVersion(requestedRpcVersion);
SetSessionParameters(session, ret, payloadData);
if (ret.closeCode != WebSocketCloseCode::DontClose) {
return;
}
// Increment refs for event subscriptions
auto eventHandler = GetEventHandler();
eventHandler->ProcessSubscription(session->EventSubscriptions());
// Mark session as identified
session->SetIsIdentified(true);
// Send desktop notification. TODO: Move to UI code
auto conf = GetConfig();
if (conf && conf->AlertsEnabled) {
QString title = obs_module_text("OBSWebSocket.TrayNotification.Identified.Title");
QString body = QString(obs_module_text("OBSWebSocket.TrayNotification.Identified.Body")).arg(QString::fromStdString(session->RemoteAddress()));
Utils::Platform::SendTrayNotification(QSystemTrayIcon::Information, title, body);
}
ret.result["op"] = WebSocketOpCode::Identified;
ret.result["d"]["negotiatedRpcVersion"] = session->RpcVersion();
} return;
case WebSocketOpCode::Reidentify: { // Reidentify
std::unique_lock<std::mutex> sessionLock(session->OperationMutex);
// Decrement refs for current subscriptions
auto eventHandler = GetEventHandler();
eventHandler->ProcessUnsubscription(session->EventSubscriptions());
SetSessionParameters(session, ret, payloadData);
if (ret.closeCode != WebSocketCloseCode::DontClose) {
return;
}
// Increment refs for new subscriptions
eventHandler->ProcessSubscription(session->EventSubscriptions());
ret.result["op"] = WebSocketOpCode::Identified;
ret.result["d"]["negotiatedRpcVersion"] = session->RpcVersion();
} return;
case WebSocketOpCode::Request: { // Request
// RequestID checking has to be done here where we are able to close the connection.
if (!payloadData.contains("requestId")) {
if (!session->IgnoreInvalidMessages()) {
ret.closeCode = WebSocketCloseCode::MissingDataKey;
ret.closeReason = "Your payload data is missing a `requestId`.";
}
return;
}
RequestHandler requestHandler(session);
Request request(payloadData["requestType"], payloadData["requestData"]);
RequestResult requestResult = requestHandler.ProcessRequest(request);
json resultPayloadData;
resultPayloadData["requestType"] = payloadData["requestType"];
resultPayloadData["requestId"] = payloadData["requestId"];
resultPayloadData["requestStatus"] = {
{"result", requestResult.StatusCode == RequestStatus::Success},
{"code", requestResult.StatusCode}
};
if (!requestResult.Comment.empty())
resultPayloadData["requestStatus"]["comment"] = requestResult.Comment;
if (requestResult.ResponseData.is_object())
resultPayloadData["responseData"] = requestResult.ResponseData;
ret.result["op"] = WebSocketOpCode::RequestResponse;
ret.result["d"] = resultPayloadData;
} return;
case WebSocketOpCode::RequestBatch: { // RequestBatch
// RequestID checking has to be done here where we are able to close the connection.
if (!payloadData.contains("requestId")) {
if (!session->IgnoreInvalidMessages()) {
ret.closeCode = WebSocketCloseCode::MissingDataKey;
ret.closeReason = "Your payload data is missing a `requestId`.";
}
return;
}
if (!payloadData.contains("requests")) {
if (!session->IgnoreInvalidMessages()) {
ret.closeCode = WebSocketCloseCode::MissingDataKey;
ret.closeReason = "Your payload data is missing a `requests`.";
}
return;
}
if (!payloadData["requests"].is_array()) {
if (!session->IgnoreInvalidMessages()) {
ret.closeCode = WebSocketCloseCode::InvalidDataKeyType;
ret.closeReason = "Your `requests` is not an array.";
}
return;
}
std::vector<json> requests = payloadData["requests"];
json results = json::array();
RequestHandler requestHandler(session);
for (auto requestJson : requests) {
Request request(requestJson["requestType"], requestJson["requestData"]);
RequestResult requestResult = requestHandler.ProcessRequest(request);
json result;
result["requestType"] = requestJson["requestType"];
if (requestJson.contains("requestId"))
result["requestId"] = requestJson["requestId"];
result["requestStatus"] = {
{"result", requestResult.StatusCode == RequestStatus::Success},
{"code", requestResult.StatusCode}
};
if (!requestResult.Comment.empty())
result["requestStatus"]["comment"] = requestResult.Comment;
if (requestResult.ResponseData.is_object())
result["responseData"] = requestResult.ResponseData;
results.push_back(result);
}
ret.result["op"] = WebSocketOpCode::RequestBatchResponse;
ret.result["d"]["requestId"] = payloadData["requestId"];
ret.result["d"]["results"] = results;
} return;
default:
if (!session->IgnoreInvalidMessages()) {
ret.closeCode = WebSocketCloseCode::UnknownOpCode;
ret.closeReason = std::string("Unknown OpCode: %s") + std::to_string(opCode);
}
return;
}
}
void WebSocketServer::BroadcastEvent(uint64_t requiredIntent, std::string eventType, json eventData, uint8_t rpcVersion)
{
if (!_server.is_listening())
return;
QtConcurrent::run(&_threadPool, [=]() {
// Populate message object
json eventMessage;
eventMessage["op"] = 5;
eventMessage["d"]["eventType"] = eventType;
eventMessage["d"]["eventIntent"] = requiredIntent;
if (eventData.is_object())
eventMessage["d"]["eventData"] = eventData;
// Initialize objects. The broadcast process only dumps the data when its needed.
std::string messageJson;
std::string messageMsgPack;
// Recurse connected sessions and send the event to suitable sessions.
std::unique_lock<std::mutex> lock(_sessionMutex);
for (auto & it : _sessions) {
if (!it.second->IsIdentified()) {
continue;
}
if (rpcVersion && it.second->RpcVersion() != rpcVersion) {
continue;
}
if ((it.second->EventSubscriptions() & requiredIntent) != 0) {
websocketpp::lib::error_code errorCode;
switch (it.second->Encoding()) {
case WebSocketEncoding::Json:
if (messageJson.empty()) {
messageJson = eventMessage.dump();
}
_server.send((websocketpp::connection_hdl)it.first, messageJson, websocketpp::frame::opcode::text, errorCode);
it.second->IncrementOutgoingMessages();
break;
case WebSocketEncoding::MsgPack:
if (messageMsgPack.empty()) {
auto msgPackData = json::to_msgpack(eventMessage);
messageMsgPack = std::string(msgPackData.begin(), msgPackData.end());
}
_server.send((websocketpp::connection_hdl)it.first, messageMsgPack, websocketpp::frame::opcode::binary, errorCode);
it.second->IncrementOutgoingMessages();
break;
}
if (errorCode)
blog(LOG_ERROR, "[WebSocketServer::BroadcastEvent] Error sending event message: %s", errorCode.message().c_str());
}
}
lock.unlock();
if (_debugEnabled && (EventSubscription::All & requiredIntent) != 0) // Don't log high volume events
blog(LOG_INFO, "[WebSocketServer::BroadcastEvent] Outgoing event:\n%s", eventMessage.dump(2).c_str());
});
}

View File

@ -1,9 +1,12 @@
#include "EventHandler.h"
#include "../plugin-macros.generated.h"
EventHandler::EventHandler(WebSocketServerPtr webSocketServer) :
_webSocketServer(webSocketServer),
_obsLoaded(false)
EventHandler::EventHandler() :
_obsLoaded(false),
_inputVolumeMetersRef(0),
_inputActiveStateChangedRef(0),
_inputShowStateChangedRef(0),
_sceneItemTransformChangedRef(0)
{
blog(LOG_INFO, "[EventHandler::EventHandler] Setting up event handlers...");
@ -41,6 +44,46 @@ EventHandler::~EventHandler()
blog(LOG_INFO, "[EventHandler::~EventHandler] Finished.");
}
void EventHandler::SetBroadcastCallback(EventHandler::BroadcastCallback cb)
{
_broadcastCallback = cb;
}
// Function to increment refcounts for high volume event subscriptions
void EventHandler::ProcessSubscription(uint64_t eventSubscriptions)
{
if ((eventSubscriptions & EventSubscription::InputVolumeMeters) != 0)
_inputVolumeMetersRef++;
if ((eventSubscriptions & EventSubscription::InputActiveStateChanged) != 0)
_inputActiveStateChangedRef++;
if ((eventSubscriptions & EventSubscription::InputShowStateChanged) != 0)
_inputShowStateChangedRef++;
if ((eventSubscriptions & EventSubscription::SceneItemTransformChanged) != 0)
_sceneItemTransformChangedRef++;
}
// Function to decrement refcounts for high volume event subscriptions
void EventHandler::ProcessUnsubscription(uint64_t eventSubscriptions)
{
if ((eventSubscriptions & EventSubscription::InputVolumeMeters) != 0)
_inputVolumeMetersRef--;
if ((eventSubscriptions & EventSubscription::InputActiveStateChanged) != 0)
_inputActiveStateChangedRef--;
if ((eventSubscriptions & EventSubscription::InputShowStateChanged) != 0)
_inputShowStateChangedRef--;
if ((eventSubscriptions & EventSubscription::SceneItemTransformChanged) != 0)
_sceneItemTransformChangedRef--;
}
// Function required in order to use default arguments
void EventHandler::BroadcastEvent(uint64_t requiredIntent, std::string eventType, json eventData, uint8_t rpcVersion)
{
if (!_broadcastCallback)
return;
_broadcastCallback(requiredIntent, eventType, eventData, rpcVersion);
}
void EventHandler::ConnectSourceSignals(obs_source_t *source) // Applies to inputs and scenes
{
if (!source || obs_source_removed(source))

View File

@ -1,12 +1,12 @@
#pragma once
#include <atomic>
#include <obs.hpp>
#include <obs-frontend-api.h>
#include <util/platform.h>
#include "types/EventSubscription.h"
#include "../obs-websocket.h"
#include "../WebSocketServer.h"
#include "../utils/Obs.h"
template <typename T> T* GetCalldataPointer(const calldata_t *data, const char* name) {
@ -18,17 +18,30 @@ template <typename T> T* GetCalldataPointer(const calldata_t *data, const char*
class EventHandler
{
public:
EventHandler(WebSocketServerPtr webSocketServer);
EventHandler();
~EventHandler();
typedef std::function<void(uint64_t, std::string, json, uint8_t)> BroadcastCallback;
void SetBroadcastCallback(BroadcastCallback cb);
void ProcessSubscription(uint64_t eventSubscriptions);
void ProcessUnsubscription(uint64_t eventSubscriptions);
private:
WebSocketServerPtr _webSocketServer;
BroadcastCallback _broadcastCallback;
std::atomic<bool> _obsLoaded;
std::atomic<uint64_t> _inputVolumeMetersRef;
std::atomic<uint64_t> _inputActiveStateChangedRef;
std::atomic<uint64_t> _inputShowStateChangedRef;
std::atomic<uint64_t> _sceneItemTransformChangedRef;
void ConnectSourceSignals(obs_source_t *source);
void DisconnectSourceSignals(obs_source_t *source);
void BroadcastEvent(uint64_t requiredIntent, std::string eventType, json eventData = nullptr, uint8_t rpcVersion = 1);
// Signal handler: frontend
static void OnFrontendEvent(enum obs_frontend_event event, void *private_data);

View File

@ -5,26 +5,26 @@ void EventHandler::HandleCurrentSceneCollectionChanged()
{
json eventData;
eventData["sceneCollectionName"] = Utils::Obs::StringHelper::GetCurrentSceneCollection();
_webSocketServer->BroadcastEvent(EventSubscription::Config, "CurrentSceneCollectionChanged", eventData);
BroadcastEvent(EventSubscription::Config, "CurrentSceneCollectionChanged", eventData);
}
void EventHandler::HandleSceneCollectionListChanged()
{
json eventData;
eventData["sceneCollections"] = Utils::Obs::ListHelper::GetSceneCollectionList();
_webSocketServer->BroadcastEvent(EventSubscription::Config, "SceneCollectionListChanged", eventData);
BroadcastEvent(EventSubscription::Config, "SceneCollectionListChanged", eventData);
}
void EventHandler::HandleCurrentProfileChanged()
{
json eventData;
eventData["profileName"] = Utils::Obs::StringHelper::GetCurrentProfile();
_webSocketServer->BroadcastEvent(EventSubscription::Config, "CurrentProfileChanged", eventData);
BroadcastEvent(EventSubscription::Config, "CurrentProfileChanged", eventData);
}
void EventHandler::HandleProfileListChanged()
{
json eventData;
eventData["profiles"] = Utils::Obs::ListHelper::GetProfileList();
_webSocketServer->BroadcastEvent(EventSubscription::Config, "ProfileListChanged", eventData);
BroadcastEvent(EventSubscription::Config, "ProfileListChanged", eventData);
}

View File

@ -3,12 +3,12 @@
void EventHandler::HandleExitStarted()
{
_webSocketServer->BroadcastEvent(EventSubscription::General, "ExitStarted");
BroadcastEvent(EventSubscription::General, "ExitStarted");
}
void EventHandler::HandleStudioModeStateChanged(bool enabled)
{
json eventData;
eventData["studioModeEnabled"] = enabled;
_webSocketServer->BroadcastEvent(EventSubscription::General, "StudioModeStateChanged", eventData);
BroadcastEvent(EventSubscription::General, "StudioModeStateChanged", eventData);
}

View File

@ -13,14 +13,14 @@ void EventHandler::HandleInputCreated(obs_source_t *source)
eventData["unversionedInputKind"] = obs_source_get_unversioned_id(source);
eventData["inputSettings"] = Utils::Json::ObsDataToJson(inputSettings);
eventData["defaultInputSettings"] = Utils::Json::ObsDataToJson(defaultInputSettings, true);
_webSocketServer->BroadcastEvent(EventSubscription::Inputs, "InputCreated", eventData);
BroadcastEvent(EventSubscription::Inputs, "InputCreated", eventData);
}
void EventHandler::HandleInputRemoved(obs_source_t *source)
{
json eventData;
eventData["inputName"] = obs_source_get_name(source);
_webSocketServer->BroadcastEvent(EventSubscription::Inputs, "InputRemoved", eventData);
BroadcastEvent(EventSubscription::Inputs, "InputRemoved", eventData);
}
void EventHandler::HandleInputNameChanged(obs_source_t *source, std::string oldInputName, std::string inputName)
@ -28,13 +28,16 @@ void EventHandler::HandleInputNameChanged(obs_source_t *source, std::string oldI
json eventData;
eventData["oldInputName"] = oldInputName;
eventData["inputName"] = inputName;
_webSocketServer->BroadcastEvent(EventSubscription::Inputs, "InputNameChanged", eventData);
BroadcastEvent(EventSubscription::Inputs, "InputNameChanged", eventData);
}
void EventHandler::HandleInputActiveStateChanged(void *param, calldata_t *data)
{
auto eventHandler = reinterpret_cast<EventHandler*>(param);
if (!eventHandler->_inputActiveStateChangedRef.load())
return;
obs_source_t *source = GetCalldataPointer<obs_source_t>(data, "source");
if (!source)
return;
@ -45,13 +48,16 @@ void EventHandler::HandleInputActiveStateChanged(void *param, calldata_t *data)
json eventData;
eventData["inputName"] = obs_source_get_name(source);
eventData["videoActive"] = obs_source_active(source);
eventHandler->_webSocketServer->BroadcastEvent(EventSubscription::InputActiveStateChanged, "InputActiveStateChanged", eventData);
eventHandler->BroadcastEvent(EventSubscription::InputActiveStateChanged, "InputActiveStateChanged", eventData);
}
void EventHandler::HandleInputShowStateChanged(void *param, calldata_t *data)
{
auto eventHandler = reinterpret_cast<EventHandler*>(param);
if (!eventHandler->_inputShowStateChangedRef.load())
return;
obs_source_t *source = GetCalldataPointer<obs_source_t>(data, "source");
if (!source)
return;
@ -62,7 +68,7 @@ void EventHandler::HandleInputShowStateChanged(void *param, calldata_t *data)
json eventData;
eventData["inputName"] = obs_source_get_name(source);
eventData["videoShowing"] = obs_source_showing(source);
eventHandler->_webSocketServer->BroadcastEvent(EventSubscription::InputShowStateChanged, "InputShowStateChanged", eventData);
eventHandler->BroadcastEvent(EventSubscription::InputShowStateChanged, "InputShowStateChanged", eventData);
}
void EventHandler::HandleInputMuteStateChanged(void *param, calldata_t *data)
@ -79,7 +85,7 @@ void EventHandler::HandleInputMuteStateChanged(void *param, calldata_t *data)
json eventData;
eventData["inputName"] = obs_source_get_name(source);
eventData["inputMuted"] = obs_source_muted(source);
eventHandler->_webSocketServer->BroadcastEvent(EventSubscription::Inputs, "InputMuteStateChanged", eventData);
eventHandler->BroadcastEvent(EventSubscription::Inputs, "InputMuteStateChanged", eventData);
}
void EventHandler::HandleInputVolumeChanged(void *param, calldata_t *data)
@ -104,7 +110,7 @@ void EventHandler::HandleInputVolumeChanged(void *param, calldata_t *data)
eventData["inputName"] = obs_source_get_name(source);
eventData["inputVolumeMul"] = inputVolumeMul;
eventData["inputVolumeDb"] = inputVolumeDb;
eventHandler->_webSocketServer->BroadcastEvent(EventSubscription::Inputs, "InputVolumeChanged", eventData);
eventHandler->BroadcastEvent(EventSubscription::Inputs, "InputVolumeChanged", eventData);
}
void EventHandler::HandleInputAudioSyncOffsetChanged(void *param, calldata_t *data)
@ -123,7 +129,7 @@ void EventHandler::HandleInputAudioSyncOffsetChanged(void *param, calldata_t *da
json eventData;
eventData["inputName"] = obs_source_get_name(source);
eventData["inputAudioSyncOffset"] = inputAudioSyncOffset / 1000000;
eventHandler->_webSocketServer->BroadcastEvent(EventSubscription::Inputs, "InputAudioSyncOffsetChanged", eventData);
eventHandler->BroadcastEvent(EventSubscription::Inputs, "InputAudioSyncOffsetChanged", eventData);
}
void EventHandler::HandleInputAudioTracksChanged(void *param, calldata_t *data)
@ -147,7 +153,7 @@ void EventHandler::HandleInputAudioTracksChanged(void *param, calldata_t *data)
json eventData;
eventData["inputName"] = obs_source_get_name(source);
eventData["inputAudioTracks"] = inputAudioTracks;
eventHandler->_webSocketServer->BroadcastEvent(EventSubscription::Inputs, "InputAudioTracksChanged", eventData);
eventHandler->BroadcastEvent(EventSubscription::Inputs, "InputAudioTracksChanged", eventData);
}
void EventHandler::HandleInputAudioMonitorTypeChanged(void *param, calldata_t *data)
@ -180,5 +186,5 @@ void EventHandler::HandleInputAudioMonitorTypeChanged(void *param, calldata_t *d
json eventData;
eventData["inputName"] = obs_source_get_name(source);
eventData["monitorType"] = monitorTypeString;
eventHandler->_webSocketServer->BroadcastEvent(EventSubscription::Inputs, "InputAudioMonitorTypeChanged", eventData);
eventHandler->BroadcastEvent(EventSubscription::Inputs, "InputAudioMonitorTypeChanged", eventData);
}

View File

@ -112,7 +112,7 @@ void EventHandler::HandleMediaInputPlaybackStarted(void *param, calldata_t *data
json eventData;
eventData["inputName"] = obs_source_get_name(source);
eventHandler->_webSocketServer->BroadcastEvent(EventSubscription::MediaInputs, "MediaInputPlaybackStarted", eventData);
eventHandler->BroadcastEvent(EventSubscription::MediaInputs, "MediaInputPlaybackStarted", eventData);
}
void EventHandler::HandleMediaInputPlaybackEnded(void *param, calldata_t *data)
@ -128,7 +128,7 @@ void EventHandler::HandleMediaInputPlaybackEnded(void *param, calldata_t *data)
json eventData;
eventData["inputName"] = obs_source_get_name(source);
eventHandler->_webSocketServer->BroadcastEvent(EventSubscription::MediaInputs, "MediaInputPlaybackEnded", eventData);
eventHandler->BroadcastEvent(EventSubscription::MediaInputs, "MediaInputPlaybackEnded", eventData);
}
void EventHandler::HandleMediaInputActionTriggered(obs_source_t *source, ObsMediaInputAction action)
@ -136,5 +136,5 @@ void EventHandler::HandleMediaInputActionTriggered(obs_source_t *source, ObsMedi
json eventData;
eventData["inputName"] = obs_source_get_name(source);
eventData["mediaAction"] = GetMediaInputActionString(action);
_webSocketServer->BroadcastEvent(EventSubscription::MediaInputs, "MediaInputActionTriggered", eventData);
BroadcastEvent(EventSubscription::MediaInputs, "MediaInputActionTriggered", eventData);
}

View File

@ -35,7 +35,7 @@ void EventHandler::HandleStreamStateChanged(ObsOutputState state)
json eventData;
eventData["outputActive"] = GetOutputStateActive(state);
eventData["outputState"] = GetOutputStateString(state);
_webSocketServer->BroadcastEvent(EventSubscription::Outputs, "StreamStateChanged", eventData);
BroadcastEvent(EventSubscription::Outputs, "StreamStateChanged", eventData);
}
void EventHandler::HandleRecordStateChanged(ObsOutputState state)
@ -43,7 +43,7 @@ void EventHandler::HandleRecordStateChanged(ObsOutputState state)
json eventData;
eventData["outputActive"] = GetOutputStateActive(state);
eventData["outputState"] = GetOutputStateString(state);
_webSocketServer->BroadcastEvent(EventSubscription::Outputs, "RecordStateChanged", eventData);
BroadcastEvent(EventSubscription::Outputs, "RecordStateChanged", eventData);
}
void EventHandler::HandleReplayBufferStateChanged(ObsOutputState state)
@ -51,7 +51,7 @@ void EventHandler::HandleReplayBufferStateChanged(ObsOutputState state)
json eventData;
eventData["outputActive"] = GetOutputStateActive(state);
eventData["outputState"] = GetOutputStateString(state);
_webSocketServer->BroadcastEvent(EventSubscription::Outputs, "ReplayBufferStateChanged", eventData);
BroadcastEvent(EventSubscription::Outputs, "ReplayBufferStateChanged", eventData);
}
void EventHandler::HandleVirtualcamStateChanged(ObsOutputState state)
@ -59,12 +59,12 @@ void EventHandler::HandleVirtualcamStateChanged(ObsOutputState state)
json eventData;
eventData["outputActive"] = GetOutputStateActive(state);
eventData["outputState"] = GetOutputStateString(state);
_webSocketServer->BroadcastEvent(EventSubscription::Outputs, "VirtualcamStateChanged", eventData);
BroadcastEvent(EventSubscription::Outputs, "VirtualcamStateChanged", eventData);
}
void EventHandler::HandleReplayBufferSaved()
{
json eventData;
eventData["savedReplayPath"] = Utils::Obs::StringHelper::GetLastReplayBufferFilePath();
_webSocketServer->BroadcastEvent(EventSubscription::Outputs, "ReplayBufferSaved", eventData);
BroadcastEvent(EventSubscription::Outputs, "ReplayBufferSaved", eventData);
}

View File

@ -18,7 +18,7 @@ void EventHandler::HandleSceneItemCreated(void *param, calldata_t *data)
eventData["inputName"] = obs_source_get_name(obs_sceneitem_get_source(sceneItem));
eventData["sceneItemId"] = obs_sceneitem_get_id(sceneItem);
eventData["sceneItemIndex"] = obs_sceneitem_get_order_position(sceneItem);
eventHandler->_webSocketServer->BroadcastEvent(EventSubscription::SceneItems, "SceneItemCreated", eventData);
eventHandler->BroadcastEvent(EventSubscription::SceneItems, "SceneItemCreated", eventData);
}
// Will not be emitted if an item is removed due to the parent scene being removed.
@ -39,7 +39,7 @@ void EventHandler::HandleSceneItemRemoved(void *param, calldata_t *data)
eventData["inputName"] = obs_source_get_name(obs_sceneitem_get_source(sceneItem));
eventData["sceneItemId"] = obs_sceneitem_get_id(sceneItem);
eventData["sceneItemIndex"] = obs_sceneitem_get_order_position(sceneItem);
eventHandler->_webSocketServer->BroadcastEvent(EventSubscription::SceneItems, "SceneItemRemoved", eventData);
eventHandler->BroadcastEvent(EventSubscription::SceneItems, "SceneItemRemoved", eventData);
}
void EventHandler::HandleSceneItemListReindexed(void *param, calldata_t *data)
@ -53,7 +53,7 @@ void EventHandler::HandleSceneItemListReindexed(void *param, calldata_t *data)
json eventData;
eventData["sceneName"] = obs_source_get_name(obs_scene_get_source(scene));
eventData["sceneItems"] = Utils::Obs::ListHelper::GetSceneItemList(scene, true);
eventHandler->_webSocketServer->BroadcastEvent(EventSubscription::SceneItems, "SceneItemListReindexed", eventData);
eventHandler->BroadcastEvent(EventSubscription::SceneItems, "SceneItemListReindexed", eventData);
}
void EventHandler::HandleSceneItemEnableStateChanged(void *param, calldata_t *data)
@ -74,7 +74,7 @@ void EventHandler::HandleSceneItemEnableStateChanged(void *param, calldata_t *da
eventData["sceneName"] = obs_source_get_name(obs_scene_get_source(scene));
eventData["sceneItemId"] = obs_sceneitem_get_id(sceneItem);
eventData["sceneItemEnabled"] = sceneItemEnabled;
eventHandler->_webSocketServer->BroadcastEvent(EventSubscription::SceneItems, "SceneItemEnableStateChanged", eventData);
eventHandler->BroadcastEvent(EventSubscription::SceneItems, "SceneItemEnableStateChanged", eventData);
}
void EventHandler::HandleSceneItemLockStateChanged(void *param, calldata_t *data)
@ -95,13 +95,16 @@ void EventHandler::HandleSceneItemLockStateChanged(void *param, calldata_t *data
eventData["sceneName"] = obs_source_get_name(obs_scene_get_source(scene));
eventData["sceneItemId"] = obs_sceneitem_get_id(sceneItem);
eventData["sceneItemLocked"] = sceneItemLocked;
eventHandler->_webSocketServer->BroadcastEvent(EventSubscription::SceneItems, "SceneItemLockStateChanged", eventData);
eventHandler->BroadcastEvent(EventSubscription::SceneItems, "SceneItemLockStateChanged", eventData);
}
void EventHandler::HandleSceneItemTransformChanged(void *param, calldata_t *data)
{
auto eventHandler = reinterpret_cast<EventHandler*>(param);
if (!eventHandler->_sceneItemTransformChangedRef.load())
return;
obs_scene_t *scene = GetCalldataPointer<obs_scene_t>(data, "scene");
if (!scene)
return;
@ -114,5 +117,5 @@ void EventHandler::HandleSceneItemTransformChanged(void *param, calldata_t *data
eventData["sceneName"] = obs_source_get_name(obs_scene_get_source(scene));
eventData["sceneItemId"] = obs_sceneitem_get_id(sceneItem);
eventData["sceneItemTransform"] = Utils::Obs::DataHelper::GetSceneItemTransform(sceneItem);
eventHandler->_webSocketServer->BroadcastEvent(EventSubscription::SceneItems, "SceneItemTransformChanged", eventData);
eventHandler->BroadcastEvent(EventSubscription::SceneItemTransformChanged, "SceneItemTransformChanged", eventData);
}

View File

@ -6,7 +6,7 @@ void EventHandler::HandleSceneCreated(obs_source_t *source)
json eventData;
eventData["sceneName"] = obs_source_get_name(source);
eventData["isGroup"] = obs_source_is_group(source);
_webSocketServer->BroadcastEvent(EventSubscription::Scenes, "SceneCreated", eventData);
BroadcastEvent(EventSubscription::Scenes, "SceneCreated", eventData);
}
void EventHandler::HandleSceneRemoved(obs_source_t *source)
@ -14,7 +14,7 @@ void EventHandler::HandleSceneRemoved(obs_source_t *source)
json eventData;
eventData["sceneName"] = obs_source_get_name(source);
eventData["isGroup"] = obs_source_is_group(source);
_webSocketServer->BroadcastEvent(EventSubscription::Scenes, "SceneRemoved", eventData);
BroadcastEvent(EventSubscription::Scenes, "SceneRemoved", eventData);
}
void EventHandler::HandleSceneNameChanged(obs_source_t *source, std::string oldSceneName, std::string sceneName)
@ -22,7 +22,7 @@ void EventHandler::HandleSceneNameChanged(obs_source_t *source, std::string oldS
json eventData;
eventData["oldSceneName"] = oldSceneName;
eventData["sceneName"] = sceneName;
_webSocketServer->BroadcastEvent(EventSubscription::Scenes, "SceneNameChanged", eventData);
BroadcastEvent(EventSubscription::Scenes, "SceneNameChanged", eventData);
}
void EventHandler::HandleCurrentSceneChanged()
@ -31,7 +31,7 @@ void EventHandler::HandleCurrentSceneChanged()
json eventData;
eventData["sceneName"] = obs_source_get_name(currentScene);
_webSocketServer->BroadcastEvent(EventSubscription::Scenes, "CurrentSceneChanged", eventData);
BroadcastEvent(EventSubscription::Scenes, "CurrentSceneChanged", eventData);
}
void EventHandler::HandleCurrentPreviewSceneChanged()
@ -44,12 +44,12 @@ void EventHandler::HandleCurrentPreviewSceneChanged()
json eventData;
eventData["sceneName"] = obs_source_get_name(currentPreviewScene);
_webSocketServer->BroadcastEvent(EventSubscription::Scenes, "CurrentPreviewSceneChanged", eventData);
BroadcastEvent(EventSubscription::Scenes, "CurrentPreviewSceneChanged", eventData);
}
void EventHandler::HandleSceneListChanged()
{
json eventData;
eventData["scenes"] = Utils::Obs::ListHelper::GetSceneList();
_webSocketServer->BroadcastEvent(EventSubscription::Scenes, "SceneListChanged", eventData);
BroadcastEvent(EventSubscription::Scenes, "SceneListChanged", eventData);
}

View File

@ -7,14 +7,14 @@ void EventHandler::HandleTransitionCreated(obs_source_t *source)
eventData["transitionName"] = obs_source_get_name(source);
eventData["transitionKind"] = obs_source_get_id(source);
eventData["transitionFixed"] = obs_transition_fixed(source);
_webSocketServer->BroadcastEvent(EventSubscription::Transitions, "TransitionCreated", eventData);
BroadcastEvent(EventSubscription::Transitions, "TransitionCreated", eventData);
}
void EventHandler::HandleTransitionRemoved(obs_source_t *source)
{
json eventData;
eventData["transitionName"] = obs_source_get_name(source);
_webSocketServer->BroadcastEvent(EventSubscription::Transitions, "TransitionRemoved", eventData);
BroadcastEvent(EventSubscription::Transitions, "TransitionRemoved", eventData);
}
void EventHandler::HandleTransitionNameChanged(obs_source_t *source, std::string oldTransitionName, std::string transitionName)
@ -22,5 +22,5 @@ void EventHandler::HandleTransitionNameChanged(obs_source_t *source, std::string
json eventData;
eventData["oldTransitionName"] = oldTransitionName;
eventData["transitionName"] = transitionName;
_webSocketServer->BroadcastEvent(EventSubscription::Transitions, "TransitionNameChanged", eventData);
BroadcastEvent(EventSubscription::Transitions, "TransitionNameChanged", eventData);
}

View File

@ -30,5 +30,7 @@ namespace EventSubscription {
InputActiveStateChanged = (1 << 10),
// InputShowStateChanged event (high-volume)
InputShowStateChanged = (1 << 11),
// SceneItemTransformChanged event (high-volume)
SceneItemTransformChanged = (1 << 12),
};
};

View File

@ -44,9 +44,10 @@ bool obs_module_load(void)
_config = ConfigPtr(new Config());
_config->Load();
_webSocketServer = WebSocketServerPtr(new WebSocketServer());
// Initialize event handler before server, as the server configures the event handler.
_eventHandler = EventHandlerPtr(new EventHandler());
_eventHandler = EventHandlerPtr(new EventHandler(_webSocketServer));
_webSocketServer = WebSocketServerPtr(new WebSocketServer());
obs_frontend_push_ui_translation(obs_module_get_string);
QMainWindow* mainWindow = reinterpret_cast<QMainWindow*>(obs_frontend_get_main_window());

View File

@ -86,6 +86,11 @@ const std::map<std::string, RequestMethodHandler> RequestHandler::_handlerMap
{"StopStream", &RequestHandler::StopStream},
};
RequestHandler::RequestHandler(SessionPtr session) :
_session(session)
{
}
RequestResult RequestHandler::ProcessRequest(const Request& request)
{
if (!request.RequestData.is_object() && !request.RequestData.is_null())

View File

@ -4,8 +4,10 @@
#include <obs.hpp>
#include <obs-frontend-api.h>
#include "rpc/RequestStatus.h"
#include "rpc/Request.h"
#include "rpc/RequestResult.h"
#include "../WebSocketSession.h"
#include "../obs-websocket.h"
#include "../utils/Obs.h"
#include "../plugin-macros.generated.h"
@ -15,6 +17,8 @@ typedef RequestResult(RequestHandler::*RequestMethodHandler)(const Request&);
class RequestHandler {
public:
RequestHandler(SessionPtr session);
RequestResult ProcessRequest(const Request& request);
std::vector<std::string> GetRequestList();
@ -101,5 +105,6 @@ class RequestHandler {
RequestResult StartStream(const Request&);
RequestResult StopStream(const Request&);
SessionPtr _session;
static const std::map<std::string, RequestMethodHandler> _handlerMap;
};

View File

@ -43,8 +43,8 @@ RequestResult RequestHandler::GetStats(const Request& request)
{
json responseData = Utils::Obs::DataHelper::GetStats();
responseData["webSocketSessionIncomingMessages"] = request.Session->IncomingMessages();
responseData["webSocketSessionOutgoingMessages"] = request.Session->OutgoingMessages();
responseData["webSocketSessionIncomingMessages"] = _session->IncomingMessages();
responseData["webSocketSessionOutgoingMessages"] = _session->OutgoingMessages();
return RequestResult::Success(responseData);
}

View File

@ -10,11 +10,8 @@ json GetDefaultJsonObject(json requestData)
return requestData;
}
Request::Request(SessionPtr session, std::string requestType, json requestData) :
Session(session),
Request::Request(std::string requestType, json requestData) :
HasRequestData(requestData.is_object()),
RpcVersion(session->RpcVersion()),
IgnoreNonFatalRequestChecks(session->IgnoreNonFatalRequestChecks()),
RequestType(requestType),
RequestData(GetDefaultJsonObject(requestData))
{

View File

@ -1,7 +1,6 @@
#pragma once
#include "RequestStatus.h"
#include "../../WebSocketSession.h"
#include "../../utils/Json.h"
enum ObsWebSocketSceneFilter {
@ -12,7 +11,7 @@ enum ObsWebSocketSceneFilter {
struct Request
{
Request(SessionPtr session, const std::string requestType, const json requestData = nullptr);
Request(const std::string requestType, const json requestData = nullptr);
// Contains the key and is not null
const bool Contains(const std::string keyName) const;
@ -35,10 +34,7 @@ struct Request
obs_source_t *ValidateInput(const std::string keyName, RequestStatus::RequestStatus &statusCode, std::string &comment) const;
obs_sceneitem_t *ValidateSceneItem(const std::string sceneKeyName, const std::string sceneItemIdKeyName, RequestStatus::RequestStatus &statusCode, std::string &comment, const ObsWebSocketSceneFilter filter = OBS_WEBSOCKET_SCENE_FILTER_SCENE_ONLY) const;
SessionPtr Session;
const bool HasRequestData;
const uint8_t RpcVersion;
const bool IgnoreNonFatalRequestChecks;
const std::string RequestType;
const json RequestData;
};
};