base: Use Unix EOL

This commit is contained in:
tt2468 2021-04-29 21:13:34 -07:00
parent 904e866a07
commit e151a9a8db
10 changed files with 614 additions and 614 deletions

View File

@ -15,4 +15,4 @@ namespace WebSocketProtocol {
}; };
ProcessResult ProcessMessage(websocketpp::connection_hdl hdl, WebSocketSession *session, json incomingMessage); ProcessResult ProcessMessage(websocketpp::connection_hdl hdl, WebSocketSession *session, json incomingMessage);
} }

View File

@ -1,404 +1,404 @@
#include <chrono> #include <chrono>
#include <thread> #include <thread>
#include <QtConcurrent> #include <QtConcurrent>
#include <QDateTime> #include <QDateTime>
#include <QTime> #include <QTime>
#include "WebSocketServer.h" #include "WebSocketServer.h"
#include "WebSocketProtocol.h" #include "WebSocketProtocol.h"
#include "obs-websocket.h" #include "obs-websocket.h"
#include "Config.h" #include "Config.h"
#include "utils/Utils.h" #include "utils/Utils.h"
#include "plugin-macros.generated.h" #include "plugin-macros.generated.h"
WebSocketServer::WebSocketServer() : WebSocketServer::WebSocketServer() :
QObject(nullptr), QObject(nullptr),
_sessions() _sessions()
{ {
// Randomize the random number generator // Randomize the random number generator
qsrand(QTime::currentTime().msec()); qsrand(QTime::currentTime().msec());
_server.get_alog().clear_channels(websocketpp::log::alevel::all); _server.get_alog().clear_channels(websocketpp::log::alevel::all);
_server.get_elog().clear_channels(websocketpp::log::elevel::all); _server.get_elog().clear_channels(websocketpp::log::elevel::all);
_server.init_asio(); _server.init_asio();
#ifndef _WIN32 #ifndef _WIN32
_server.set_reuse_addr(true); _server.set_reuse_addr(true);
#endif #endif
_server.set_open_handler( _server.set_open_handler(
websocketpp::lib::bind( websocketpp::lib::bind(
&WebSocketServer::onOpen, this, websocketpp::lib::placeholders::_1 &WebSocketServer::onOpen, this, websocketpp::lib::placeholders::_1
) )
); );
_server.set_close_handler( _server.set_close_handler(
websocketpp::lib::bind( websocketpp::lib::bind(
&WebSocketServer::onClose, this, websocketpp::lib::placeholders::_1 &WebSocketServer::onClose, this, websocketpp::lib::placeholders::_1
) )
); );
_server.set_message_handler( _server.set_message_handler(
websocketpp::lib::bind( websocketpp::lib::bind(
&WebSocketServer::onMessage, this, websocketpp::lib::placeholders::_1, websocketpp::lib::placeholders::_2 &WebSocketServer::onMessage, this, websocketpp::lib::placeholders::_1, websocketpp::lib::placeholders::_2
) )
); );
} }
WebSocketServer::~WebSocketServer() WebSocketServer::~WebSocketServer()
{ {
if (_server.is_listening()) if (_server.is_listening())
Stop(); Stop();
} }
void WebSocketServer::ServerRunner() void WebSocketServer::ServerRunner()
{ {
blog(LOG_INFO, "[WebSocketServer::ServerRunner] IO thread started."); blog(LOG_INFO, "[WebSocketServer::ServerRunner] IO thread started.");
try { try {
_server.run(); _server.run();
} catch (websocketpp::exception const & e) { } catch (websocketpp::exception const & e) {
blog(LOG_ERROR, "[WebSocketServer::ServerRunner] websocketpp instance returned an error: %s", e.what()); blog(LOG_ERROR, "[WebSocketServer::ServerRunner] websocketpp instance returned an error: %s", e.what());
} catch (const std::exception & e) { } catch (const std::exception & e) {
blog(LOG_ERROR, "[WebSocketServer::ServerRunner] websocketpp instance returned an error: %s", e.what()); blog(LOG_ERROR, "[WebSocketServer::ServerRunner] websocketpp instance returned an error: %s", e.what());
} catch (...) { } catch (...) {
blog(LOG_ERROR, "[WebSocketServer::ServerRunner] websocketpp instance returned an error"); blog(LOG_ERROR, "[WebSocketServer::ServerRunner] websocketpp instance returned an error");
} }
blog(LOG_INFO, "[WebSocketServer::ServerRunner] IO thread exited."); blog(LOG_INFO, "[WebSocketServer::ServerRunner] IO thread exited.");
} }
void WebSocketServer::Start() void WebSocketServer::Start()
{ {
if (_server.is_listening()) { if (_server.is_listening()) {
blog(LOG_WARNING, "[WebSocketServer::Start] Call to Start() but the server is already listening."); blog(LOG_WARNING, "[WebSocketServer::Start] Call to Start() but the server is already listening.");
return; return;
} }
auto conf = GetConfig(); auto conf = GetConfig();
if (!conf) { if (!conf) {
blog(LOG_ERROR, "[WebSocketServer::Start] Unable to retreive config!"); blog(LOG_ERROR, "[WebSocketServer::Start] Unable to retreive config!");
return; return;
} }
_serverPort = conf->ServerPort; _serverPort = conf->ServerPort;
_serverPassword = conf->ServerPassword; _serverPassword = conf->ServerPassword;
_debugEnabled = conf->DebugEnabled; _debugEnabled = conf->DebugEnabled;
AuthenticationRequired = conf->AuthRequired; AuthenticationRequired = conf->AuthRequired;
AuthenticationSalt = Utils::Crypto::GenerateSalt(); AuthenticationSalt = Utils::Crypto::GenerateSalt();
AuthenticationSecret = Utils::Crypto::GenerateSecret(conf->ServerPassword.toStdString(), AuthenticationSalt); AuthenticationSecret = Utils::Crypto::GenerateSecret(conf->ServerPassword.toStdString(), AuthenticationSalt);
// Set log levels if debug is enabled // Set log levels if debug is enabled
if (_debugEnabled) { if (_debugEnabled) {
_server.get_alog().set_channels(websocketpp::log::alevel::all); _server.get_alog().set_channels(websocketpp::log::alevel::all);
_server.get_alog().clear_channels(websocketpp::log::alevel::frame_header | websocketpp::log::alevel::frame_payload | websocketpp::log::alevel::control); _server.get_alog().clear_channels(websocketpp::log::alevel::frame_header | websocketpp::log::alevel::frame_payload | websocketpp::log::alevel::control);
_server.get_elog().set_channels(websocketpp::log::elevel::all); _server.get_elog().set_channels(websocketpp::log::elevel::all);
_server.get_alog().clear_channels(websocketpp::log::elevel::devel | websocketpp::log::elevel::library); _server.get_alog().clear_channels(websocketpp::log::elevel::devel | websocketpp::log::elevel::library);
} else { } else {
_server.get_alog().clear_channels(websocketpp::log::alevel::all); _server.get_alog().clear_channels(websocketpp::log::alevel::all);
_server.get_elog().clear_channels(websocketpp::log::elevel::all); _server.get_elog().clear_channels(websocketpp::log::elevel::all);
} }
_server.reset(); _server.reset();
websocketpp::lib::error_code errorCode; websocketpp::lib::error_code errorCode;
_server.listen(websocketpp::lib::asio::ip::tcp::v4(), _serverPort, errorCode); _server.listen(websocketpp::lib::asio::ip::tcp::v4(), _serverPort, errorCode);
if (errorCode) { if (errorCode) {
std::string errorCodeMessage = errorCode.message(); std::string errorCodeMessage = errorCode.message();
blog(LOG_INFO, "[WebSocketServer::Start] Listen failed: %s", errorCodeMessage.c_str()); blog(LOG_INFO, "[WebSocketServer::Start] Listen failed: %s", errorCodeMessage.c_str());
return; return;
} }
_server.start_accept(); _server.start_accept();
_serverThread = std::thread(&WebSocketServer::ServerRunner, this); _serverThread = std::thread(&WebSocketServer::ServerRunner, this);
blog(LOG_INFO, "[WebSocketServer::Start] Server started successfully on port %d. Possible connect address: %s", _serverPort, Utils::Platform::GetLocalAddress().c_str()); blog(LOG_INFO, "[WebSocketServer::Start] Server started successfully on port %d. Possible connect address: %s", _serverPort, Utils::Platform::GetLocalAddress().c_str());
} }
void WebSocketServer::Stop() void WebSocketServer::Stop()
{ {
if (!_server.is_listening()) { if (!_server.is_listening()) {
blog(LOG_WARNING, "[WebSocketServer::Stop] Call to Stop() but the server is not listening."); blog(LOG_WARNING, "[WebSocketServer::Stop] Call to Stop() but the server is not listening.");
return; return;
} }
_server.stop_listening(); _server.stop_listening();
std::unique_lock<std::mutex> lock(_sessionMutex); std::unique_lock<std::mutex> lock(_sessionMutex);
for (auto const& [hdl, session] : _sessions) { for (auto const& [hdl, session] : _sessions) {
websocketpp::lib::error_code errorCode; websocketpp::lib::error_code errorCode;
_server.pause_reading(hdl, errorCode); _server.pause_reading(hdl, errorCode);
if (errorCode) { if (errorCode) {
blog(LOG_INFO, "[WebSocketServer::Stop] Error: %s", errorCode.message().c_str()); blog(LOG_INFO, "[WebSocketServer::Stop] Error: %s", errorCode.message().c_str());
continue; continue;
} }
_server.close(hdl, websocketpp::close::status::going_away, "Server stopping.", errorCode); _server.close(hdl, websocketpp::close::status::going_away, "Server stopping.", errorCode);
if (errorCode) { if (errorCode) {
blog(LOG_INFO, "[WebSocketServer::Stop] Error: %s", errorCode.message().c_str()); blog(LOG_INFO, "[WebSocketServer::Stop] Error: %s", errorCode.message().c_str());
continue; continue;
} }
} }
lock.unlock(); lock.unlock();
_threadPool.waitForDone(); _threadPool.waitForDone();
// This can delay the thread that it is running on. Bad but kinda required. // This can delay the thread that it is running on. Bad but kinda required.
while (_sessions.size() > 0) { while (_sessions.size() > 0) {
std::this_thread::sleep_for(std::chrono::milliseconds(10)); std::this_thread::sleep_for(std::chrono::milliseconds(10));
} }
_serverThread.join(); _serverThread.join();
blog(LOG_INFO, "[WebSocketServer::Stop] Server stopped successfully"); blog(LOG_INFO, "[WebSocketServer::Stop] Server stopped successfully");
} }
void WebSocketServer::InvalidateSession(websocketpp::connection_hdl hdl) void WebSocketServer::InvalidateSession(websocketpp::connection_hdl hdl)
{ {
blog(LOG_INFO, "[WebSocketServer::InvalidateSession] Invalidating a session."); blog(LOG_INFO, "[WebSocketServer::InvalidateSession] Invalidating a session.");
websocketpp::lib::error_code errorCode; websocketpp::lib::error_code errorCode;
_server.pause_reading(hdl, errorCode); _server.pause_reading(hdl, errorCode);
if (errorCode) { if (errorCode) {
blog(LOG_INFO, "[WebSocketServer::InvalidateSession] Error: %s", errorCode.message().c_str()); blog(LOG_INFO, "[WebSocketServer::InvalidateSession] Error: %s", errorCode.message().c_str());
return; return;
} }
_server.close(hdl, WebSocketCloseCode::SessionInvalidated, "Your session has been invalidated.", errorCode); _server.close(hdl, WebSocketCloseCode::SessionInvalidated, "Your session has been invalidated.", errorCode);
if (errorCode) { if (errorCode) {
blog(LOG_INFO, "[WebSocketServer::InvalidateSession] Error: %s", errorCode.message().c_str()); blog(LOG_INFO, "[WebSocketServer::InvalidateSession] Error: %s", errorCode.message().c_str());
return; return;
} }
} }
std::vector<WebSocketServer::WebSocketSessionState> WebSocketServer::GetWebSocketSessions() std::vector<WebSocketServer::WebSocketSessionState> WebSocketServer::GetWebSocketSessions()
{ {
std::vector<WebSocketServer::WebSocketSessionState> webSocketSessions; std::vector<WebSocketServer::WebSocketSessionState> webSocketSessions;
std::unique_lock<std::mutex> lock(_sessionMutex); std::unique_lock<std::mutex> lock(_sessionMutex);
for (auto & [hdl, session] : _sessions) { for (auto & [hdl, session] : _sessions) {
uint64_t connectedAt = session.ConnectedAt(); uint64_t connectedAt = session.ConnectedAt();
uint64_t incomingMessages = session.IncomingMessages(); uint64_t incomingMessages = session.IncomingMessages();
uint64_t outgoingMessages = session.OutgoingMessages(); uint64_t outgoingMessages = session.OutgoingMessages();
std::string remoteAddress = session.RemoteAddress(); std::string remoteAddress = session.RemoteAddress();
webSocketSessions.emplace_back(WebSocketSessionState{hdl, remoteAddress, connectedAt, incomingMessages, outgoingMessages}); webSocketSessions.emplace_back(WebSocketSessionState{hdl, remoteAddress, connectedAt, incomingMessages, outgoingMessages});
} }
lock.unlock(); lock.unlock();
return webSocketSessions; return webSocketSessions;
} }
QString WebSocketServer::GetConnectString() QString WebSocketServer::GetConnectString()
{ {
QString ret; QString ret;
if (AuthenticationRequired) if (AuthenticationRequired)
ret = QString("obswebsocket|%1:%2|%3").arg(QString::fromStdString(Utils::Platform::GetLocalAddress())).arg(_serverPort).arg(_serverPassword); ret = QString("obswebsocket|%1:%2|%3").arg(QString::fromStdString(Utils::Platform::GetLocalAddress())).arg(_serverPort).arg(_serverPassword);
else else
ret = QString("obswebsocket|%1:%2").arg(QString::fromStdString(Utils::Platform::GetLocalAddress())).arg(_serverPort); ret = QString("obswebsocket|%1:%2").arg(QString::fromStdString(Utils::Platform::GetLocalAddress())).arg(_serverPort);
return ret; return ret;
} }
// It isn't consistent to directly call the WebSocketServer from the events system, but it would also be dumb to make it unnecessarily complicated. // It isn't consistent to directly call the WebSocketServer from the events system, but it would also be dumb to make it unnecessarily complicated.
void WebSocketServer::BroadcastEvent(uint64_t requiredIntent, std::string eventType, json eventData) void WebSocketServer::BroadcastEvent(uint64_t requiredIntent, std::string eventType, json eventData)
{ {
QtConcurrent::run(&_threadPool, [=]() { QtConcurrent::run(&_threadPool, [=]() {
// Populate message object // Populate message object
json eventMessage; json eventMessage;
eventMessage["messageType"] = "Event"; eventMessage["messageType"] = "Event";
eventMessage["eventType"] = eventType; eventMessage["eventType"] = eventType;
if (eventData.is_object()) if (eventData.is_object())
eventMessage["eventData"] = eventData; eventMessage["eventData"] = eventData;
// Initialize objects. The broadcast process only dumps the data when its needed. // Initialize objects. The broadcast process only dumps the data when its needed.
std::string messageJson; std::string messageJson;
std::string messageMsgPack; std::string messageMsgPack;
// Recurse connected sessions and send the event to suitable sessions. // Recurse connected sessions and send the event to suitable sessions.
std::unique_lock<std::mutex> lock(_sessionMutex); std::unique_lock<std::mutex> lock(_sessionMutex);
for (auto & it : _sessions) { for (auto & it : _sessions) {
if (!it.second.IsIdentified()) if (!it.second.IsIdentified())
continue; continue;
if ((it.second.EventSubscriptions() & requiredIntent) != 0) { if ((it.second.EventSubscriptions() & requiredIntent) != 0) {
websocketpp::lib::error_code errorCode; websocketpp::lib::error_code errorCode;
switch (it.second.Encoding()) { switch (it.second.Encoding()) {
case WebSocketEncoding::Json: case WebSocketEncoding::Json:
if (messageJson.empty()) { if (messageJson.empty()) {
messageJson = eventMessage.dump(); messageJson = eventMessage.dump();
} }
_server.send((websocketpp::connection_hdl)it.first, messageJson, websocketpp::frame::opcode::text, errorCode); _server.send((websocketpp::connection_hdl)it.first, messageJson, websocketpp::frame::opcode::text, errorCode);
it.second.IncrementOutgoingMessages(); it.second.IncrementOutgoingMessages();
break; break;
case WebSocketEncoding::MsgPack: case WebSocketEncoding::MsgPack:
if (messageMsgPack.empty()) { if (messageMsgPack.empty()) {
auto msgPackData = json::to_msgpack(eventMessage); auto msgPackData = json::to_msgpack(eventMessage);
messageMsgPack = std::string(msgPackData.begin(), msgPackData.end()); messageMsgPack = std::string(msgPackData.begin(), msgPackData.end());
} }
_server.send((websocketpp::connection_hdl)it.first, messageMsgPack, websocketpp::frame::opcode::binary, errorCode); _server.send((websocketpp::connection_hdl)it.first, messageMsgPack, websocketpp::frame::opcode::binary, errorCode);
it.second.IncrementOutgoingMessages(); it.second.IncrementOutgoingMessages();
break; break;
} }
} }
} }
lock.unlock(); lock.unlock();
if (_debugEnabled) if (_debugEnabled)
blog(LOG_INFO, "[WebSocketServer::BroadcastEvent] Outgoing event:\n%s", eventMessage.dump(2).c_str()); blog(LOG_INFO, "[WebSocketServer::BroadcastEvent] Outgoing event:\n%s", eventMessage.dump(2).c_str());
}); });
} }
void WebSocketServer::onOpen(websocketpp::connection_hdl hdl) void WebSocketServer::onOpen(websocketpp::connection_hdl hdl)
{ {
auto conn = _server.get_con_from_hdl(hdl); auto conn = _server.get_con_from_hdl(hdl);
// Build new session // Build new session
std::unique_lock<std::mutex> lock(_sessionMutex); std::unique_lock<std::mutex> lock(_sessionMutex);
auto &session = _sessions[hdl]; auto &session = _sessions[hdl];
lock.unlock(); lock.unlock();
// Configure session details // Configure session details
session.SetRemoteAddress(conn->get_remote_endpoint()); session.SetRemoteAddress(conn->get_remote_endpoint());
session.SetConnectedAt(QDateTime::currentSecsSinceEpoch()); session.SetConnectedAt(QDateTime::currentSecsSinceEpoch());
std::string contentType = conn->get_request_header("Content-Type"); std::string contentType = conn->get_request_header("Content-Type");
if (contentType == "") { if (contentType == "") {
; ;
} else if (contentType == "application/json") { } else if (contentType == "application/json") {
session.SetEncoding(WebSocketEncoding::Json); session.SetEncoding(WebSocketEncoding::Json);
} else if (contentType == "application/msgpack") { } else if (contentType == "application/msgpack") {
session.SetEncoding(WebSocketEncoding::MsgPack); session.SetEncoding(WebSocketEncoding::MsgPack);
} else { } else {
conn->close(WebSocketCloseCode::InvalidContentType, "Your HTTP `Content-Type` header specifies an invalid encoding type."); conn->close(WebSocketCloseCode::InvalidContentType, "Your HTTP `Content-Type` header specifies an invalid encoding type.");
return; return;
} }
// Build `Hello` // Build `Hello`
json helloMessage; json helloMessage;
helloMessage["messageType"] = "Hello"; helloMessage["messageType"] = "Hello";
helloMessage["obsWebSocketVersion"] = OBS_WEBSOCKET_VERSION; helloMessage["obsWebSocketVersion"] = OBS_WEBSOCKET_VERSION;
helloMessage["rpcVersion"] = OBS_WEBSOCKET_RPC_VERSION; helloMessage["rpcVersion"] = OBS_WEBSOCKET_RPC_VERSION;
// todo: Add request and event lists // todo: Add request and event lists
if (AuthenticationRequired) { if (AuthenticationRequired) {
std::string sessionChallenge = Utils::Crypto::GenerateSalt(); std::string sessionChallenge = Utils::Crypto::GenerateSalt();
session.SetChallenge(sessionChallenge); session.SetChallenge(sessionChallenge);
helloMessage["authentication"] = {}; helloMessage["authentication"] = {};
helloMessage["authentication"]["challenge"] = sessionChallenge; helloMessage["authentication"]["challenge"] = sessionChallenge;
helloMessage["authentication"]["salt"] = AuthenticationSalt; helloMessage["authentication"]["salt"] = AuthenticationSalt;
} }
// Send object to client // Send object to client
websocketpp::lib::error_code errorCode; websocketpp::lib::error_code errorCode;
auto sessionEncoding = session.Encoding(); auto sessionEncoding = session.Encoding();
if (sessionEncoding == WebSocketEncoding::Json) { if (sessionEncoding == WebSocketEncoding::Json) {
std::string helloMessageJson = helloMessage.dump(); std::string helloMessageJson = helloMessage.dump();
_server.send(hdl, helloMessageJson, websocketpp::frame::opcode::text, errorCode); _server.send(hdl, helloMessageJson, websocketpp::frame::opcode::text, errorCode);
} else if (sessionEncoding == WebSocketEncoding::MsgPack) { } else if (sessionEncoding == WebSocketEncoding::MsgPack) {
auto msgPackData = json::to_msgpack(helloMessage); auto msgPackData = json::to_msgpack(helloMessage);
std::string messageMsgPack(msgPackData.begin(), msgPackData.end()); std::string messageMsgPack(msgPackData.begin(), msgPackData.end());
_server.send(hdl, messageMsgPack, websocketpp::frame::opcode::binary, errorCode); _server.send(hdl, messageMsgPack, websocketpp::frame::opcode::binary, errorCode);
} }
session.IncrementOutgoingMessages(); session.IncrementOutgoingMessages();
} }
void WebSocketServer::onClose(websocketpp::connection_hdl hdl) void WebSocketServer::onClose(websocketpp::connection_hdl hdl)
{ {
auto conn = _server.get_con_from_hdl(hdl); auto conn = _server.get_con_from_hdl(hdl);
// Get info from the session and then delete it // Get info from the session and then delete it
std::unique_lock<std::mutex> lock(_sessionMutex); std::unique_lock<std::mutex> lock(_sessionMutex);
auto &session = _sessions[hdl]; auto &session = _sessions[hdl];
bool isIdentified = session.IsIdentified(); bool isIdentified = session.IsIdentified();
uint64_t connectedAt = session.ConnectedAt(); uint64_t connectedAt = session.ConnectedAt();
uint64_t incomingMessages = session.IncomingMessages(); uint64_t incomingMessages = session.IncomingMessages();
uint64_t outgoingMessages = session.OutgoingMessages(); uint64_t outgoingMessages = session.OutgoingMessages();
std::string remoteAddress = session.RemoteAddress(); std::string remoteAddress = session.RemoteAddress();
_sessions.erase(hdl); _sessions.erase(hdl);
lock.unlock(); lock.unlock();
// Build SessionState object for signal // Build SessionState object for signal
WebSocketSessionState state; WebSocketSessionState state;
state.remoteAddress = remoteAddress; state.remoteAddress = remoteAddress;
state.connectedAt = connectedAt; state.connectedAt = connectedAt;
state.incomingMessages = incomingMessages; state.incomingMessages = incomingMessages;
state.outgoingMessages = outgoingMessages; state.outgoingMessages = outgoingMessages;
// Emit signals // Emit signals
emit ClientDisconnected(state, conn->get_local_close_code()); emit ClientDisconnected(state, conn->get_local_close_code());
if (isIdentified) if (isIdentified)
emit IdentifiedClientDisconnected(state, conn->get_local_close_code()); emit IdentifiedClientDisconnected(state, conn->get_local_close_code());
} }
void WebSocketServer::onMessage(websocketpp::connection_hdl hdl, websocketpp::server<websocketpp::config::asio>::message_ptr message) void WebSocketServer::onMessage(websocketpp::connection_hdl hdl, websocketpp::server<websocketpp::config::asio>::message_ptr message)
{ {
auto opcode = message->get_opcode(); auto opcode = message->get_opcode();
std::string payload = message->get_payload(); std::string payload = message->get_payload();
QtConcurrent::run(&_threadPool, [=]() { QtConcurrent::run(&_threadPool, [=]() {
std::unique_lock<std::mutex> lock(_sessionMutex); std::unique_lock<std::mutex> lock(_sessionMutex);
auto &session = _sessions[hdl]; auto &session = _sessions[hdl];
lock.unlock(); lock.unlock();
session.IncrementIncomingMessages(); session.IncrementIncomingMessages();
json incomingMessage; json incomingMessage;
// Check for invalid opcode and decode // Check for invalid opcode and decode
websocketpp::lib::error_code errorCode; websocketpp::lib::error_code errorCode;
uint8_t sessionEncoding = session.Encoding(); uint8_t sessionEncoding = session.Encoding();
if (sessionEncoding == WebSocketEncoding::Json) { if (sessionEncoding == WebSocketEncoding::Json) {
if (opcode != websocketpp::frame::opcode::text) { if (opcode != websocketpp::frame::opcode::text) {
if (!session.IgnoreInvalidMessages()) if (!session.IgnoreInvalidMessages())
_server.close(hdl, WebSocketCloseCode::MessageDecodeError, "Your session encoding is set to Json, but a binary message was received.", errorCode); _server.close(hdl, WebSocketCloseCode::MessageDecodeError, "Your session encoding is set to Json, but a binary message was received.", errorCode);
return; return;
} }
try { try {
incomingMessage = json::parse(payload); incomingMessage = json::parse(payload);
} catch (json::parse_error& e) { } catch (json::parse_error& e) {
if (!session.IgnoreInvalidMessages()) if (!session.IgnoreInvalidMessages())
_server.close(hdl, WebSocketCloseCode::MessageDecodeError, std::string("Unable to decode Json: ") + e.what(), errorCode); _server.close(hdl, WebSocketCloseCode::MessageDecodeError, std::string("Unable to decode Json: ") + e.what(), errorCode);
return; return;
} }
} else if (sessionEncoding == WebSocketEncoding::MsgPack) { } else if (sessionEncoding == WebSocketEncoding::MsgPack) {
if (opcode != websocketpp::frame::opcode::binary) { if (opcode != websocketpp::frame::opcode::binary) {
if (!session.IgnoreInvalidMessages()) if (!session.IgnoreInvalidMessages())
_server.close(hdl, WebSocketCloseCode::MessageDecodeError, "Your session encoding is set to MsgPack, but a text message was received.", errorCode); _server.close(hdl, WebSocketCloseCode::MessageDecodeError, "Your session encoding is set to MsgPack, but a text message was received.", errorCode);
return; return;
} }
try { try {
incomingMessage = json::from_msgpack(payload); incomingMessage = json::from_msgpack(payload);
} catch (json::parse_error& e) { } catch (json::parse_error& e) {
if (!session.IgnoreInvalidMessages()) if (!session.IgnoreInvalidMessages())
_server.close(hdl, WebSocketCloseCode::MessageDecodeError, std::string("Unable to decode MsgPack: ") + e.what(), errorCode); _server.close(hdl, WebSocketCloseCode::MessageDecodeError, std::string("Unable to decode MsgPack: ") + e.what(), errorCode);
return; return;
} }
} }
if (_debugEnabled) if (_debugEnabled)
blog(LOG_INFO, "[WebSocketServer::onMessage] Incoming message (decoded):\n%s", incomingMessage.dump(2).c_str()); blog(LOG_INFO, "[WebSocketServer::onMessage] Incoming message (decoded):\n%s", incomingMessage.dump(2).c_str());
WebSocketProtocol::ProcessResult ret = WebSocketProtocol::ProcessMessage(hdl, &session, incomingMessage); WebSocketProtocol::ProcessResult ret = WebSocketProtocol::ProcessMessage(hdl, &session, incomingMessage);
if (ret.closeCode != WebSocketCloseCode::DontClose) { if (ret.closeCode != WebSocketCloseCode::DontClose) {
websocketpp::lib::error_code errorCode; websocketpp::lib::error_code errorCode;
_server.close(hdl, ret.closeCode, ret.closeReason, errorCode); _server.close(hdl, ret.closeCode, ret.closeReason, errorCode);
return; return;
} }
if (!ret.result.is_null()) { if (!ret.result.is_null()) {
websocketpp::lib::error_code errorCode; websocketpp::lib::error_code errorCode;
if (sessionEncoding == WebSocketEncoding::Json) { if (sessionEncoding == WebSocketEncoding::Json) {
std::string helloMessageJson = ret.result.dump(); std::string helloMessageJson = ret.result.dump();
_server.send(hdl, helloMessageJson, websocketpp::frame::opcode::text, errorCode); _server.send(hdl, helloMessageJson, websocketpp::frame::opcode::text, errorCode);
} else if (sessionEncoding == WebSocketEncoding::MsgPack) { } else if (sessionEncoding == WebSocketEncoding::MsgPack) {
auto msgPackData = json::to_msgpack(ret.result); auto msgPackData = json::to_msgpack(ret.result);
std::string messageMsgPack(msgPackData.begin(), msgPackData.end()); std::string messageMsgPack(msgPackData.begin(), msgPackData.end());
_server.send(hdl, messageMsgPack, websocketpp::frame::opcode::binary, errorCode); _server.send(hdl, messageMsgPack, websocketpp::frame::opcode::binary, errorCode);
} }
session.IncrementOutgoingMessages(); session.IncrementOutgoingMessages();
if (_debugEnabled) if (_debugEnabled)
blog(LOG_INFO, "[WebSocketServer::onMessage] Outgoing message:\n%s", ret.result.dump(2).c_str()); blog(LOG_INFO, "[WebSocketServer::onMessage] Outgoing message:\n%s", ret.result.dump(2).c_str());
if (errorCode) if (errorCode)
blog(LOG_WARNING, "[WebSocketServer::onMessage] Sending message to client failed: %s", errorCode.message().c_str()); blog(LOG_WARNING, "[WebSocketServer::onMessage] Sending message to client failed: %s", errorCode.message().c_str());
} }
}); });
} }

View File

@ -1,134 +1,134 @@
#include "WebSocketSession.h" #include "WebSocketSession.h"
#include "plugin-macros.generated.h" #include "plugin-macros.generated.h"
WebSocketSession::WebSocketSession() : WebSocketSession::WebSocketSession() :
_remoteAddress(""), _remoteAddress(""),
_connectedAt(0), _connectedAt(0),
_incomingMessages(0), _incomingMessages(0),
_outgoingMessages(0), _outgoingMessages(0),
_encoding(0), _encoding(0),
_challenge(""), _challenge(""),
_rpcVersion(OBS_WEBSOCKET_RPC_VERSION), _rpcVersion(OBS_WEBSOCKET_RPC_VERSION),
_isIdentified(false), _isIdentified(false),
_ignoreInvalidMessages(false), _ignoreInvalidMessages(false),
_ignoreNonFatalRequestChecks(false), _ignoreNonFatalRequestChecks(false),
_eventSubscriptions(0) _eventSubscriptions(0)
{ {
} }
std::string WebSocketSession::RemoteAddress() std::string WebSocketSession::RemoteAddress()
{ {
std::lock_guard<std::mutex> lock(_remoteAddressMutex); std::lock_guard<std::mutex> lock(_remoteAddressMutex);
std::string ret(_remoteAddress); std::string ret(_remoteAddress);
return ret; return ret;
} }
void WebSocketSession::SetRemoteAddress(std::string address) void WebSocketSession::SetRemoteAddress(std::string address)
{ {
std::lock_guard<std::mutex> lock(_remoteAddressMutex); std::lock_guard<std::mutex> lock(_remoteAddressMutex);
_remoteAddress = address; _remoteAddress = address;
} }
uint64_t WebSocketSession::ConnectedAt() uint64_t WebSocketSession::ConnectedAt()
{ {
return _connectedAt.load(); return _connectedAt.load();
} }
void WebSocketSession::SetConnectedAt(uint64_t at) void WebSocketSession::SetConnectedAt(uint64_t at)
{ {
_connectedAt.store(at); _connectedAt.store(at);
} }
uint64_t WebSocketSession::IncomingMessages() uint64_t WebSocketSession::IncomingMessages()
{ {
return _incomingMessages.load(); return _incomingMessages.load();
} }
void WebSocketSession::IncrementIncomingMessages() void WebSocketSession::IncrementIncomingMessages()
{ {
_incomingMessages++; _incomingMessages++;
} }
uint64_t WebSocketSession::OutgoingMessages() uint64_t WebSocketSession::OutgoingMessages()
{ {
return _outgoingMessages.load(); return _outgoingMessages.load();
} }
void WebSocketSession::IncrementOutgoingMessages() void WebSocketSession::IncrementOutgoingMessages()
{ {
_outgoingMessages++; _outgoingMessages++;
} }
uint8_t WebSocketSession::Encoding() uint8_t WebSocketSession::Encoding()
{ {
return _encoding.load(); return _encoding.load();
} }
void WebSocketSession::SetEncoding(uint8_t encoding) void WebSocketSession::SetEncoding(uint8_t encoding)
{ {
_encoding.store(encoding); _encoding.store(encoding);
} }
std::string WebSocketSession::Challenge() std::string WebSocketSession::Challenge()
{ {
std::lock_guard<std::mutex> lock(_challengeMutex); std::lock_guard<std::mutex> lock(_challengeMutex);
std::string ret(_challenge); std::string ret(_challenge);
return ret; return ret;
} }
void WebSocketSession::SetChallenge(std::string challengeString) void WebSocketSession::SetChallenge(std::string challengeString)
{ {
std::lock_guard<std::mutex> lock(_challengeMutex); std::lock_guard<std::mutex> lock(_challengeMutex);
_challenge = challengeString; _challenge = challengeString;
} }
uint8_t WebSocketSession::RpcVersion() uint8_t WebSocketSession::RpcVersion()
{ {
return _rpcVersion.load(); return _rpcVersion.load();
} }
void WebSocketSession::SetRpcVersion(uint8_t version) void WebSocketSession::SetRpcVersion(uint8_t version)
{ {
_rpcVersion.store(version); _rpcVersion.store(version);
} }
bool WebSocketSession::IsIdentified() bool WebSocketSession::IsIdentified()
{ {
return _isIdentified.load(); return _isIdentified.load();
} }
void WebSocketSession::SetIsIdentified(bool identified) void WebSocketSession::SetIsIdentified(bool identified)
{ {
_isIdentified.store(identified); _isIdentified.store(identified);
} }
bool WebSocketSession::IgnoreInvalidMessages() bool WebSocketSession::IgnoreInvalidMessages()
{ {
return _ignoreInvalidMessages.load(); return _ignoreInvalidMessages.load();
} }
void WebSocketSession::SetIgnoreInvalidMessages(bool ignore) void WebSocketSession::SetIgnoreInvalidMessages(bool ignore)
{ {
_ignoreInvalidMessages.store(ignore); _ignoreInvalidMessages.store(ignore);
} }
bool WebSocketSession::IgnoreNonFatalRequestChecks() bool WebSocketSession::IgnoreNonFatalRequestChecks()
{ {
return _ignoreNonFatalRequestChecks.load(); return _ignoreNonFatalRequestChecks.load();
} }
void WebSocketSession::SetIgnoreNonFatalRequestChecks(bool ignore) void WebSocketSession::SetIgnoreNonFatalRequestChecks(bool ignore)
{ {
_ignoreNonFatalRequestChecks.store(ignore); _ignoreNonFatalRequestChecks.store(ignore);
} }
uint64_t WebSocketSession::EventSubscriptions() uint64_t WebSocketSession::EventSubscriptions()
{ {
return _eventSubscriptions.load(); return _eventSubscriptions.load();
} }
void WebSocketSession::SetEventSubscriptions(uint64_t subscriptions) void WebSocketSession::SetEventSubscriptions(uint64_t subscriptions)
{ {
_eventSubscriptions.store(subscriptions); _eventSubscriptions.store(subscriptions);
} }

View File

@ -1,59 +1,59 @@
#pragma once #pragma once
#include <mutex> #include <mutex>
#include <string> #include <string>
#include <atomic> #include <atomic>
class WebSocketSession class WebSocketSession
{ {
public: public:
WebSocketSession(); WebSocketSession();
std::string RemoteAddress(); std::string RemoteAddress();
void SetRemoteAddress(std::string address); void SetRemoteAddress(std::string address);
uint64_t ConnectedAt(); uint64_t ConnectedAt();
void SetConnectedAt(uint64_t at); void SetConnectedAt(uint64_t at);
uint64_t IncomingMessages(); uint64_t IncomingMessages();
void IncrementIncomingMessages(); void IncrementIncomingMessages();
uint64_t OutgoingMessages(); uint64_t OutgoingMessages();
void IncrementOutgoingMessages(); void IncrementOutgoingMessages();
uint8_t Encoding(); uint8_t Encoding();
void SetEncoding(uint8_t encoding); void SetEncoding(uint8_t encoding);
std::string Challenge(); std::string Challenge();
void SetChallenge(std::string challenge); void SetChallenge(std::string challenge);
uint8_t RpcVersion(); uint8_t RpcVersion();
void SetRpcVersion(uint8_t version); void SetRpcVersion(uint8_t version);
bool IsIdentified(); bool IsIdentified();
void SetIsIdentified(bool identified); void SetIsIdentified(bool identified);
bool IgnoreInvalidMessages(); bool IgnoreInvalidMessages();
void SetIgnoreInvalidMessages(bool ignore); void SetIgnoreInvalidMessages(bool ignore);
bool IgnoreNonFatalRequestChecks(); bool IgnoreNonFatalRequestChecks();
void SetIgnoreNonFatalRequestChecks(bool ignore); void SetIgnoreNonFatalRequestChecks(bool ignore);
uint64_t EventSubscriptions(); uint64_t EventSubscriptions();
void SetEventSubscriptions(uint64_t subscriptions); void SetEventSubscriptions(uint64_t subscriptions);
private: private:
std::mutex _remoteAddressMutex; std::mutex _remoteAddressMutex;
std::string _remoteAddress; std::string _remoteAddress;
std::atomic<uint64_t> _connectedAt; std::atomic<uint64_t> _connectedAt;
std::atomic<uint64_t> _incomingMessages; std::atomic<uint64_t> _incomingMessages;
std::atomic<uint64_t> _outgoingMessages; std::atomic<uint64_t> _outgoingMessages;
std::atomic<uint8_t> _encoding; std::atomic<uint8_t> _encoding;
std::mutex _challengeMutex; std::mutex _challengeMutex;
std::string _challenge; std::string _challenge;
std::atomic<uint8_t> _rpcVersion; std::atomic<uint8_t> _rpcVersion;
std::atomic<bool> _isIdentified; std::atomic<bool> _isIdentified;
std::atomic<bool> _ignoreInvalidMessages; std::atomic<bool> _ignoreInvalidMessages;
std::atomic<bool> _ignoreNonFatalRequestChecks; std::atomic<bool> _ignoreNonFatalRequestChecks;
std::atomic<uint64_t> _eventSubscriptions; std::atomic<uint64_t> _eventSubscriptions;
}; };

View File

@ -1,3 +1,3 @@
#include "RequestHandler.h" #include "RequestHandler.h"
#include "../plugin-macros.generated.h" #include "../plugin-macros.generated.h"

View File

@ -1,3 +1,3 @@
#include "Request.h" #include "Request.h"
#include "../../plugin-macros.generated.h" #include "../../plugin-macros.generated.h"

View File

@ -1,3 +1,3 @@
#pragma once #pragma once
#include <nlohmann/json.hpp> #include <nlohmann/json.hpp>

View File

@ -1,3 +1,3 @@
#include "RequestResult.h" #include "RequestResult.h"
#include "../../plugin-macros.generated.h" #include "../../plugin-macros.generated.h"

View File

@ -1,3 +1,3 @@
#pragma once #pragma once
#include <nlohmann/json.hpp> #include <nlohmann/json.hpp>

View File

@ -54,4 +54,4 @@ QString Utils::Platform::GetCommandLineArgument(QString arg)
return ""; return "";
return parser.value(cmdlineOption); return parser.value(cmdlineOption);
} }