WebSocketServer: [BREAKING] Remove ignoreInvalidMessages identify param

This parameter is a weird one. With the abstraction of requests from
the underlying websocket protocol, there theoretically should be no
need to ignore invalid messages, because the implementation of the
low-level protocol on clients should be solid, with the requests
themselves not being fatal to the session.

As such, I consider this to be feature bloat, with lots of messy code
attributed to it.
This commit is contained in:
tt2468 2021-12-15 02:37:14 -08:00
parent 9c8f056d3e
commit 0f6ee87f99
5 changed files with 35 additions and 95 deletions

View File

@ -162,12 +162,10 @@ Authentication is not required
{ {
"rpcVersion": number, "rpcVersion": number,
"authentication": string(optional), "authentication": string(optional),
"ignoreInvalidMessages": bool(optional) = false,
"eventSubscriptions": number(optional) = (EventSubscription::All) "eventSubscriptions": number(optional) = (EventSubscription::All)
} }
``` ```
- `rpcVersion` is the version number that the client would like the obs-websocket server to use. - `rpcVersion` is the version number that the client would like the obs-websocket server to use.
- When `ignoreInvalidMessages` is true, the socket will not be closed for `WebSocketCloseCode`: `MessageDecodeError`, `UnknownOpCode`, or `MissingDataKey`. Instead, the message will be logged and ignored.
- `eventSubscriptions` is a bitmask of `EventSubscriptions` items to subscribe to events and event categories at will. By default, all event categories are subscribed, except for events marked as high volume. High volume events must be explicitly subscribed to. - `eventSubscriptions` is a bitmask of `EventSubscriptions` items to subscribe to events and event categories at will. By default, all event categories are subscribed, except for events marked as high volume. High volume events must be explicitly subscribed to.
**Example Message:** **Example Message:**
@ -217,7 +215,6 @@ Authentication is not required
**Data Keys:** **Data Keys:**
``` ```
{ {
"ignoreInvalidMessages": bool(optional) = false,
"eventSubscriptions": number(optional) = (EventSubscription::All) "eventSubscriptions": number(optional) = (EventSubscription::All)
} }
``` ```

View File

@ -398,30 +398,26 @@ void WebSocketServer::onMessage(websocketpp::connection_hdl hdl, websocketpp::se
uint8_t sessionEncoding = session->Encoding(); uint8_t sessionEncoding = session->Encoding();
if (sessionEncoding == WebSocketEncoding::Json) { if (sessionEncoding == WebSocketEncoding::Json) {
if (opCode != websocketpp::frame::opcode::text) { if (opCode != websocketpp::frame::opcode::text) {
if (!session->IgnoreInvalidMessages()) _server.close(hdl, WebSocketCloseCode::MessageDecodeError, "Your session encoding is set to Json, but a binary message was received.", errorCode);
_server.close(hdl, WebSocketCloseCode::MessageDecodeError, "Your session encoding is set to Json, but a binary message was received.", errorCode);
return; return;
} }
try { try {
incomingMessage = json::parse(payload); incomingMessage = json::parse(payload);
} catch (json::parse_error& e) { } catch (json::parse_error& e) {
if (!session->IgnoreInvalidMessages()) _server.close(hdl, WebSocketCloseCode::MessageDecodeError, std::string("Unable to decode Json: ") + e.what(), errorCode);
_server.close(hdl, WebSocketCloseCode::MessageDecodeError, std::string("Unable to decode Json: ") + e.what(), errorCode);
return; return;
} }
} else if (sessionEncoding == WebSocketEncoding::MsgPack) { } else if (sessionEncoding == WebSocketEncoding::MsgPack) {
if (opCode != websocketpp::frame::opcode::binary) { if (opCode != websocketpp::frame::opcode::binary) {
if (!session->IgnoreInvalidMessages()) _server.close(hdl, WebSocketCloseCode::MessageDecodeError, "Your session encoding is set to MsgPack, but a text message was received.", errorCode);
_server.close(hdl, WebSocketCloseCode::MessageDecodeError, "Your session encoding is set to MsgPack, but a text message was received.", errorCode);
return; return;
} }
try { try {
incomingMessage = json::from_msgpack(payload); incomingMessage = json::from_msgpack(payload);
} catch (json::parse_error& e) { } catch (json::parse_error& e) {
if (!session->IgnoreInvalidMessages()) _server.close(hdl, WebSocketCloseCode::MessageDecodeError, std::string("Unable to decode MsgPack: ") + e.what(), errorCode);
_server.close(hdl, WebSocketCloseCode::MessageDecodeError, std::string("Unable to decode MsgPack: ") + e.what(), errorCode);
return; return;
} }
} }
@ -432,12 +428,9 @@ void WebSocketServer::onMessage(websocketpp::connection_hdl hdl, websocketpp::se
// Verify incoming message is an object // Verify incoming message is an object
if (!incomingMessage.is_object()) { if (!incomingMessage.is_object()) {
if (!session->IgnoreInvalidMessages()) { ret.closeCode = WebSocketCloseCode::MessageDecodeError;
ret.closeCode = WebSocketCloseCode::MessageDecodeError; ret.closeReason = "You sent a non-object payload.";
ret.closeReason = "You sent a non-object payload."; goto skipProcessing;
goto skipProcessing;
}
return;
} }
// Disconnect client if 4.x protocol is detected // Disconnect client if 4.x protocol is detected
@ -450,12 +443,9 @@ void WebSocketServer::onMessage(websocketpp::connection_hdl hdl, websocketpp::se
// Validate op code // Validate op code
if (!incomingMessage.contains("op")) { if (!incomingMessage.contains("op")) {
if (!session->IgnoreInvalidMessages()) { ret.closeCode = WebSocketCloseCode::UnknownOpCode;
ret.closeCode = WebSocketCloseCode::UnknownOpCode; ret.closeReason = "Your request is missing an `op`.";
ret.closeReason = "Your request is missing an `op`."; goto skipProcessing;
goto skipProcessing;
}
return;
} }
ProcessMessage(session, ret, incomingMessage["op"], incomingMessage["d"]); ProcessMessage(session, ret, incomingMessage["op"], incomingMessage["d"]);

View File

@ -60,15 +60,6 @@ static json ConstructRequestResult(RequestResult requestResult, const json &requ
void WebSocketServer::SetSessionParameters(SessionPtr session, ProcessResult &ret, const json &payloadData) void WebSocketServer::SetSessionParameters(SessionPtr session, ProcessResult &ret, const json &payloadData)
{ {
if (payloadData.contains("ignoreInvalidMessages")) {
if (!payloadData["ignoreInvalidMessages"].is_boolean()) {
ret.closeCode = WebSocketCloseCode::InvalidDataFieldType;
ret.closeReason = "Your `ignoreInvalidMessages` is not a boolean.";
return;
}
session->SetIgnoreInvalidMessages(payloadData["ignoreInvalidMessages"]);
}
if (payloadData.contains("eventSubscriptions")) { if (payloadData.contains("eventSubscriptions")) {
if (!payloadData["eventSubscriptions"].is_number_unsigned()) { if (!payloadData["eventSubscriptions"].is_number_unsigned()) {
ret.closeCode = WebSocketCloseCode::InvalidDataFieldType; ret.closeCode = WebSocketCloseCode::InvalidDataFieldType;
@ -103,10 +94,8 @@ void WebSocketServer::ProcessMessage(SessionPtr session, WebSocketServer::Proces
case WebSocketOpCode::Identify: { // Identify case WebSocketOpCode::Identify: { // Identify
std::unique_lock<std::mutex> sessionLock(session->OperationMutex); std::unique_lock<std::mutex> sessionLock(session->OperationMutex);
if (session->IsIdentified()) { if (session->IsIdentified()) {
if (!session->IgnoreInvalidMessages()) { ret.closeCode = WebSocketCloseCode::AlreadyIdentified;
ret.closeCode = WebSocketCloseCode::AlreadyIdentified; ret.closeReason = "You are already Identified with the obs-websocket server.";
ret.closeReason = "You are already Identified with the obs-websocket server.";
}
return; return;
} }
@ -192,10 +181,8 @@ void WebSocketServer::ProcessMessage(SessionPtr session, WebSocketServer::Proces
case WebSocketOpCode::Request: { // Request case WebSocketOpCode::Request: { // Request
// RequestID checking has to be done here where we are able to close the connection. // RequestID checking has to be done here where we are able to close the connection.
if (!payloadData.contains("requestId")) { if (!payloadData.contains("requestId")) {
if (!session->IgnoreInvalidMessages()) { ret.closeCode = WebSocketCloseCode::MissingDataField;
ret.closeCode = WebSocketCloseCode::MissingDataField; ret.closeReason = "Your payload data is missing a `requestId`.";
ret.closeReason = "Your payload data is missing a `requestId`.";
}
return; return;
} }
@ -221,53 +208,42 @@ void WebSocketServer::ProcessMessage(SessionPtr session, WebSocketServer::Proces
case WebSocketOpCode::RequestBatch: { // RequestBatch case WebSocketOpCode::RequestBatch: { // RequestBatch
// RequestID checking has to be done here where we are able to close the connection. // RequestID checking has to be done here where we are able to close the connection.
if (!payloadData.contains("requestId")) { if (!payloadData.contains("requestId")) {
if (!session->IgnoreInvalidMessages()) { ret.closeCode = WebSocketCloseCode::MissingDataField;
ret.closeCode = WebSocketCloseCode::MissingDataField; ret.closeReason = "Your payload data is missing a `requestId`.";
ret.closeReason = "Your payload data is missing a `requestId`.";
}
return; return;
} }
if (!payloadData.contains("requests")) { if (!payloadData.contains("requests")) {
if (!session->IgnoreInvalidMessages()) { ret.closeCode = WebSocketCloseCode::MissingDataField;
ret.closeCode = WebSocketCloseCode::MissingDataField; ret.closeReason = "Your payload data is missing a `requests`.";
ret.closeReason = "Your payload data is missing a `requests`.";
}
return; return;
} }
if (!payloadData["requests"].is_array()) { if (!payloadData["requests"].is_array()) {
if (!session->IgnoreInvalidMessages()) { ret.closeCode = WebSocketCloseCode::InvalidDataFieldType;
ret.closeCode = WebSocketCloseCode::InvalidDataFieldType; ret.closeReason = "Your `requests` is not an array.";
ret.closeReason = "Your `requests` is not an array.";
}
return; return;
} }
RequestBatchExecutionType::RequestBatchExecutionType executionType = RequestBatchExecutionType::SerialRealtime; RequestBatchExecutionType::RequestBatchExecutionType executionType = RequestBatchExecutionType::SerialRealtime;
if (payloadData.contains("executionType") && !payloadData["executionType"].is_null()) { if (payloadData.contains("executionType") && !payloadData["executionType"].is_null()) {
if (!payloadData["executionType"].is_number_unsigned()) { if (!payloadData["executionType"].is_number_unsigned()) {
if (!session->IgnoreInvalidMessages()) { ret.closeCode = WebSocketCloseCode::InvalidDataFieldType;
ret.closeCode = WebSocketCloseCode::InvalidDataFieldType; ret.closeReason = "Your `executionType` is not a number.";
ret.closeReason = "Your `executionType` is not a number.";
}
return; return;
} }
uint8_t requestedExecutionType = payloadData["executionType"]; uint8_t requestedExecutionType = payloadData["executionType"];
if (!RequestBatchExecutionType::IsValid(requestedExecutionType) || requestedExecutionType == RequestBatchExecutionType::None) { if (!RequestBatchExecutionType::IsValid(requestedExecutionType) || requestedExecutionType == RequestBatchExecutionType::None) {
if (!session->IgnoreInvalidMessages()) { ret.closeCode = WebSocketCloseCode::InvalidDataFieldValue;
ret.closeCode = WebSocketCloseCode::InvalidDataFieldValue; ret.closeReason = "Your `executionType` has an invalid value.";
ret.closeReason = "Your `executionType` has an invalid value."; return;
}
} }
// The thread pool must support 2 or more threads else parallel requests will deadlock. // The thread pool must support 2 or more threads else parallel requests will deadlock.
if (requestedExecutionType == RequestBatchExecutionType::Parallel && _threadPool.maxThreadCount() < 2) { if (requestedExecutionType == RequestBatchExecutionType::Parallel && _threadPool.maxThreadCount() < 2) {
if (!session->IgnoreInvalidMessages()) { ret.closeCode = WebSocketCloseCode::UnsupportedFeature;
ret.closeCode = WebSocketCloseCode::UnsupportedFeature; ret.closeReason = "Parallel request batch processing is not available on this system due to limited core count.";
ret.closeReason = "Parallel request batch processing is not available on this system due to limited core count.";
}
return; return;
} }
@ -276,18 +252,14 @@ void WebSocketServer::ProcessMessage(SessionPtr session, WebSocketServer::Proces
if (payloadData.contains("variables") && !payloadData["variables"].is_null()) { if (payloadData.contains("variables") && !payloadData["variables"].is_null()) {
if (!payloadData.is_object()) { if (!payloadData.is_object()) {
if (!session->IgnoreInvalidMessages()) { ret.closeCode = WebSocketCloseCode::InvalidDataFieldType;
ret.closeCode = WebSocketCloseCode::InvalidDataFieldType; ret.closeReason = "Your `variables` is not an object.";
ret.closeReason = "Your `variables` is not an object.";
}
return; return;
} }
if (executionType == RequestBatchExecutionType::Parallel) { if (executionType == RequestBatchExecutionType::Parallel) {
if (!session->IgnoreInvalidMessages()) { ret.closeCode = WebSocketCloseCode::UnsupportedFeature;
ret.closeCode = WebSocketCloseCode::UnsupportedFeature; ret.closeReason = "Variables are not supported in Parallel mode.";
ret.closeReason = "Variables are not supported in Parallel mode.";
}
return; return;
} }
} }
@ -295,10 +267,8 @@ void WebSocketServer::ProcessMessage(SessionPtr session, WebSocketServer::Proces
bool haltOnFailure = false; bool haltOnFailure = false;
if (payloadData.contains("haltOnFailure") && !payloadData["haltOnFailure"].is_null()) { if (payloadData.contains("haltOnFailure") && !payloadData["haltOnFailure"].is_null()) {
if (!payloadData["haltOnFailure"].is_boolean()) { if (!payloadData["haltOnFailure"].is_boolean()) {
if (!session->IgnoreInvalidMessages()) { ret.closeCode = WebSocketCloseCode::InvalidDataFieldType;
ret.closeCode = WebSocketCloseCode::InvalidDataFieldType; ret.closeReason = "Your `haltOnFailure` is not a boolean.";
ret.closeReason = "Your `haltOnFailure` is not a boolean.";
}
return; return;
} }
@ -325,10 +295,8 @@ void WebSocketServer::ProcessMessage(SessionPtr session, WebSocketServer::Proces
ret.result["d"]["results"] = results; ret.result["d"]["results"] = results;
} return; } return;
default: default:
if (!session->IgnoreInvalidMessages()) { ret.closeCode = WebSocketCloseCode::UnknownOpCode;
ret.closeCode = WebSocketCloseCode::UnknownOpCode; ret.closeReason = std::string("Unknown OpCode: %s") + std::to_string(opCode);
ret.closeReason = std::string("Unknown OpCode: %s") + std::to_string(opCode);
}
return; return;
} }
} }

View File

@ -29,7 +29,6 @@ WebSocketSession::WebSocketSession() :
_challenge(""), _challenge(""),
_rpcVersion(OBS_WEBSOCKET_RPC_VERSION), _rpcVersion(OBS_WEBSOCKET_RPC_VERSION),
_isIdentified(false), _isIdentified(false),
_ignoreInvalidMessages(false),
_eventSubscriptions(EventSubscription::All) _eventSubscriptions(EventSubscription::All)
{ {
} }
@ -143,16 +142,6 @@ void WebSocketSession::SetIsIdentified(bool identified)
_isIdentified.store(identified); _isIdentified.store(identified);
} }
bool WebSocketSession::IgnoreInvalidMessages()
{
return _ignoreInvalidMessages.load();
}
void WebSocketSession::SetIgnoreInvalidMessages(bool ignore)
{
_ignoreInvalidMessages.store(ignore);
}
uint64_t WebSocketSession::EventSubscriptions() uint64_t WebSocketSession::EventSubscriptions()
{ {
return _eventSubscriptions.load(); return _eventSubscriptions.load();

View File

@ -64,9 +64,6 @@ class WebSocketSession
bool IsIdentified(); bool IsIdentified();
void SetIsIdentified(bool identified); void SetIsIdentified(bool identified);
bool IgnoreInvalidMessages();
void SetIgnoreInvalidMessages(bool ignore);
uint64_t EventSubscriptions(); uint64_t EventSubscriptions();
void SetEventSubscriptions(uint64_t subscriptions); void SetEventSubscriptions(uint64_t subscriptions);
@ -86,6 +83,5 @@ class WebSocketSession
std::string _challenge; std::string _challenge;
std::atomic<uint8_t> _rpcVersion; std::atomic<uint8_t> _rpcVersion;
std::atomic<bool> _isIdentified; std::atomic<bool> _isIdentified;
std::atomic<bool> _ignoreInvalidMessages;
std::atomic<uint64_t> _eventSubscriptions; std::atomic<uint64_t> _eventSubscriptions;
}; };