From a0ffe16e911a2cd4de22ed1bd44c80e8adc5fa49 Mon Sep 17 00:00:00 2001 From: tt2468 Date: Tue, 9 May 2023 22:48:07 -0700 Subject: [PATCH] base: Pause requests and events during start, SC change, and shutdown This implements the functionality described by the new NotReady request status. Behavior should now be *much* more reliable. --- src/eventhandler/EventHandler.cpp | 93 ++++--------------- src/eventhandler/EventHandler.h | 19 ++-- src/obs-websocket.cpp | 33 +++++-- src/requesthandler/RequestBatchHandler.cpp | 10 +- src/websocketserver/WebSocketServer.cpp | 17 +--- src/websocketserver/WebSocketServer.h | 4 +- .../WebSocketServer_Protocol.cpp | 54 +++++++---- 7 files changed, 98 insertions(+), 132 deletions(-) diff --git a/src/eventhandler/EventHandler.cpp b/src/eventhandler/EventHandler.cpp index ac8e4f5c..007ea957 100644 --- a/src/eventhandler/EventHandler.cpp +++ b/src/eventhandler/EventHandler.cpp @@ -20,11 +20,6 @@ with this program. If not, see #include "EventHandler.h" EventHandler::EventHandler() - : _obsLoaded(false), - _inputVolumeMetersRef(0), - _inputActiveStateChangedRef(0), - _inputShowStateChangedRef(0), - _sceneItemTransformChangedRef(0) { blog_debug("[EventHandler::EventHandler] Setting up..."); @@ -67,9 +62,9 @@ void EventHandler::SetBroadcastCallback(EventHandler::BroadcastCallback cb) _broadcastCallback = cb; } -void EventHandler::SetObsLoadedCallback(EventHandler::ObsLoadedCallback cb) +void EventHandler::SetObsReadyCallback(EventHandler::ObsReadyCallback cb) { - _obsLoadedCallback = cb; + _obsReadyCallback = cb; } // Function to increment refcounts for high volume event subscriptions @@ -261,9 +256,6 @@ void EventHandler::OnFrontendEvent(enum obs_frontend_event event, void *private_ { auto eventHandler = static_cast(private_data); - if (!eventHandler->_obsLoaded.load() && event != OBS_FRONTEND_EVENT_FINISHED_LOADING) - return; - switch (event) { // General case OBS_FRONTEND_EVENT_FINISHED_LOADING: @@ -283,7 +275,11 @@ void EventHandler::OnFrontendEvent(enum obs_frontend_event event, void *private_ } obs_frontend_source_list_free(&transitions); } + // Before ready update to allow event to broadcast eventHandler->HandleCurrentSceneCollectionChanging(); + eventHandler->_obsReady = false; + if (eventHandler->_obsReadyCallback) + eventHandler->_obsReadyCallback(false); break; case OBS_FRONTEND_EVENT_SCENE_COLLECTION_CHANGED: { obs_frontend_source_list transitions = {}; @@ -294,6 +290,9 @@ void EventHandler::OnFrontendEvent(enum obs_frontend_event event, void *private_ } obs_frontend_source_list_free(&transitions); } + eventHandler->_obsReady = true; + if (eventHandler->_obsReadyCallback) + eventHandler->_obsReadyCallback(true); eventHandler->HandleCurrentSceneCollectionChanged(); break; case OBS_FRONTEND_EVENT_SCENE_COLLECTION_LIST_CHANGED: @@ -430,30 +429,6 @@ void EventHandler::FrontendFinishedLoadingMultiHandler() blog_debug( "[EventHandler::FrontendFinishedLoadingMultiHandler] OBS has finished loading. Connecting final handlers and enabling events..."); - // Connect source signals and enable events only after OBS has fully loaded (to reduce extra logging). - _obsLoaded.store(true); - - // In the case that plugins become hotloadable, this will have to go back into `EventHandler::EventHandler()` - // Enumerate inputs and connect each one - { - auto enumInputs = [](void *param, obs_source_t *source) { - auto eventHandler = static_cast(param); - eventHandler->ConnectSourceSignals(source); - return true; - }; - obs_enum_sources(enumInputs, this); - } - - // Enumerate scenes and connect each one - { - auto enumScenes = [](void *param, obs_source_t *source) { - auto eventHandler = static_cast(param); - eventHandler->ConnectSourceSignals(source); - return true; - }; - obs_enum_scenes(enumScenes, this); - } - // Enumerate all scene transitions and connect each one { obs_frontend_source_list transitions = {}; @@ -465,41 +440,23 @@ void EventHandler::FrontendFinishedLoadingMultiHandler() obs_frontend_source_list_free(&transitions); } - blog_debug("[EventHandler::FrontendFinishedLoadingMultiHandler] Finished."); + _obsReady = true; + if (_obsReadyCallback) + _obsReadyCallback(true); - if (_obsLoadedCallback) - _obsLoadedCallback(); + blog_debug("[EventHandler::FrontendFinishedLoadingMultiHandler] Finished."); } void EventHandler::FrontendExitMultiHandler() { - HandleExitStarted(); - blog_debug("[EventHandler::FrontendExitMultiHandler] OBS is unloading. Disabling events..."); + HandleExitStarted(); + // Disconnect source signals and disable events when OBS starts unloading (to reduce extra logging). - _obsLoaded.store(false); - - // In the case that plugins become hotloadable, this will have to go back into `EventHandler::~EventHandler()` - // Enumerate inputs and disconnect each one - { - auto enumInputs = [](void *param, obs_source_t *source) { - auto eventHandler = static_cast(param); - eventHandler->DisconnectSourceSignals(source); - return true; - }; - obs_enum_sources(enumInputs, this); - } - - // Enumerate scenes and disconnect each one - { - auto enumScenes = [](void *param, obs_source_t *source) { - auto eventHandler = static_cast(param); - eventHandler->DisconnectSourceSignals(source); - return true; - }; - obs_enum_scenes(enumScenes, this); - } + _obsReady = false; + if (_obsReadyCallback) + _obsReadyCallback(false); // Enumerate all scene transitions and disconnect each one { @@ -520,10 +477,6 @@ void EventHandler::SourceCreatedMultiHandler(void *param, calldata_t *data) { auto eventHandler = static_cast(param); - // Don't react to signals until OBS has finished loading - if (!eventHandler->_obsLoaded.load()) - return; - obs_source_t *source = GetCalldataPointer(data, "source"); if (!source) return; @@ -556,10 +509,6 @@ void EventHandler::SourceDestroyedMultiHandler(void *param, calldata_t *data) // Disconnect all signals from the source eventHandler->DisconnectSourceSignals(source); - // Don't react to signals if OBS is unloading - if (!eventHandler->_obsLoaded.load()) - return; - switch (obs_source_get_type(source)) { case OBS_SOURCE_TYPE_INPUT: // Only emit removed if the input has not already been removed. This is the case when removing the last scene item of an input. @@ -582,9 +531,6 @@ void EventHandler::SourceRemovedMultiHandler(void *param, calldata_t *data) { auto eventHandler = static_cast(param); - if (!eventHandler->_obsLoaded.load()) - return; - obs_source_t *source = GetCalldataPointer(data, "source"); if (!source) return; @@ -605,9 +551,6 @@ void EventHandler::SourceRenamedMultiHandler(void *param, calldata_t *data) { auto eventHandler = static_cast(param); - if (!eventHandler->_obsLoaded.load()) - return; - obs_source_t *source = GetCalldataPointer(data, "source"); if (!source) return; diff --git a/src/eventhandler/EventHandler.h b/src/eventhandler/EventHandler.h index 542f2965..071f8071 100644 --- a/src/eventhandler/EventHandler.h +++ b/src/eventhandler/EventHandler.h @@ -34,25 +34,26 @@ public: EventHandler(); ~EventHandler(); - typedef std::function BroadcastCallback; + typedef std::function + BroadcastCallback; // uint64_t requiredIntent, std::string eventType, json eventData, uint8_t rpcVersion void SetBroadcastCallback(BroadcastCallback cb); - typedef std::function ObsLoadedCallback; - void SetObsLoadedCallback(ObsLoadedCallback cb); + typedef std::function ObsReadyCallback; // bool ready + void SetObsReadyCallback(ObsReadyCallback cb); void ProcessSubscription(uint64_t eventSubscriptions); void ProcessUnsubscription(uint64_t eventSubscriptions); private: BroadcastCallback _broadcastCallback; - ObsLoadedCallback _obsLoadedCallback; + ObsReadyCallback _obsReadyCallback; - std::atomic _obsLoaded; + std::atomic _obsReady = false; std::unique_ptr _inputVolumeMetersHandler; - std::atomic _inputVolumeMetersRef; - std::atomic _inputActiveStateChangedRef; - std::atomic _inputShowStateChangedRef; - std::atomic _sceneItemTransformChangedRef; + std::atomic _inputVolumeMetersRef = 0; + std::atomic _inputActiveStateChangedRef = 0; + std::atomic _inputShowStateChangedRef = 0; + std::atomic _sceneItemTransformChangedRef = 0; void ConnectSourceSignals(obs_source_t *source); void DisconnectSourceSignals(obs_source_t *source); diff --git a/src/obs-websocket.cpp b/src/obs-websocket.cpp index 48d92a9e..24fdbe29 100644 --- a/src/obs-websocket.cpp +++ b/src/obs-websocket.cpp @@ -89,7 +89,24 @@ bool obs_module_load(void) return true; } -void obs_module_unload() +#ifdef PLUGIN_TESTS +void test_register_vendor(); +#endif + +void obs_module_post_load(void) +{ +#ifdef PLUGIN_TESTS + test_register_vendor(); +#endif + + // Server will accept clients, but requests and events will not be served until FINISHED_LOADING occurs + if (_config->ServerEnabled) { + blog(LOG_INFO, "[obs_module_post_load] WebSocket server is enabled, starting..."); + _webSocketServer->Start(); + } +} + +void obs_module_unload(void) { blog(LOG_INFO, "[obs_module_unload] Shutting down..."); @@ -193,18 +210,18 @@ static void test_vendor_request_cb(obs_data_t *requestData, obs_data_t *response obs_websocket_vendor_emit_event(priv_data, "TestEvent", requestData); } -void obs_module_post_load() +void test_register_vendor() { - blog(LOG_INFO, "[obs_module_post_load] Post load started."); + blog(LOG_INFO, "[test_register_vendor] Registering test vendor..."); // Test plugin API version fetch uint apiVersion = obs_websocket_get_api_version(); - blog(LOG_INFO, "[obs_module_post_load] obs-websocket plugin API version: %u", apiVersion); + blog(LOG_INFO, "[test_register_vendor] obs-websocket plugin API version: %u", apiVersion); // Test calling obs-websocket requests struct obs_websocket_request_response *response = obs_websocket_call_request("GetVersion"); if (response) { - blog(LOG_INFO, "[obs_module_post_load] Called GetVersion. Status Code: %u | Comment: %s | Response Data: %s", + blog(LOG_INFO, "[test_register_vendor] Called GetVersion. Status Code: %u | Comment: %s | Response Data: %s", response->status_code, response->comment, response->response_data); obs_websocket_request_response_free(response); } @@ -212,17 +229,17 @@ void obs_module_post_load() // Test vendor creation auto vendor = obs_websocket_register_vendor("obs-websocket-test"); if (!vendor) { - blog(LOG_WARNING, "[obs_module_post_load] Failed to create vendor!"); + blog(LOG_WARNING, "[test_register_vendor] Failed to create vendor!"); return; } // Test vendor request registration if (!obs_websocket_vendor_register_request(vendor, "TestRequest", test_vendor_request_cb, vendor)) { - blog(LOG_WARNING, "[obs_module_post_load] Failed to register vendor request!"); + blog(LOG_WARNING, "[test_register_vendor] Failed to register vendor request!"); return; } - blog(LOG_INFO, "[obs_module_post_load] Post load completed."); + blog(LOG_INFO, "[test_register_vendor] Post load completed."); } #endif diff --git a/src/requesthandler/RequestBatchHandler.cpp b/src/requesthandler/RequestBatchHandler.cpp index 8a622b78..21e28c4f 100644 --- a/src/requesthandler/RequestBatchHandler.cpp +++ b/src/requesthandler/RequestBatchHandler.cpp @@ -31,17 +31,13 @@ struct SerialFrameBatch { json &variables; bool haltOnFailure; - size_t frameCount; - size_t sleepUntilFrame; + size_t frameCount = 0; + size_t sleepUntilFrame = 0; std::mutex conditionMutex; std::condition_variable condition; SerialFrameBatch(RequestHandler &requestHandler, json &variables, bool haltOnFailure) - : requestHandler(requestHandler), - variables(variables), - haltOnFailure(haltOnFailure), - frameCount(0), - sleepUntilFrame(0) + : requestHandler(requestHandler), variables(variables), haltOnFailure(haltOnFailure) { } }; diff --git a/src/websocketserver/WebSocketServer.cpp b/src/websocketserver/WebSocketServer.cpp index 605ec307..836fca0a 100644 --- a/src/websocketserver/WebSocketServer.cpp +++ b/src/websocketserver/WebSocketServer.cpp @@ -31,7 +31,7 @@ with this program. If not, see #include "../utils/Platform.h" #include "../utils/Compat.h" -WebSocketServer::WebSocketServer() : QObject(nullptr), _sessions() +WebSocketServer::WebSocketServer() : QObject(nullptr) { _server.get_alog().clear_channels(websocketpp::log::alevel::all); _server.get_elog().clear_channels(websocketpp::log::elevel::all); @@ -52,7 +52,7 @@ WebSocketServer::WebSocketServer() : QObject(nullptr), _sessions() eventHandler->SetBroadcastCallback(std::bind(&WebSocketServer::BroadcastEvent, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3, std::placeholders::_4)); - eventHandler->SetObsLoadedCallback(std::bind(&WebSocketServer::onObsLoaded, this)); + eventHandler->SetObsReadyCallback(std::bind(&WebSocketServer::onObsReady, this, std::placeholders::_1)); } WebSocketServer::~WebSocketServer() @@ -205,18 +205,9 @@ std::vector WebSocketServer::GetWebSocke return webSocketSessions; } -void WebSocketServer::onObsLoaded() +void WebSocketServer::onObsReady(bool ready) { - auto conf = GetConfig(); - if (!conf) { - blog(LOG_ERROR, "[WebSocketServer::onObsLoaded] Unable to retreive config!"); - return; - } - - if (conf->ServerEnabled) { - blog(LOG_INFO, "[WebSocketServer::onObsLoaded] WebSocket server is enabled, starting..."); - Start(); - } + _obsReady = ready; } bool WebSocketServer::onValidate(websocketpp::connection_hdl hdl) diff --git a/src/websocketserver/WebSocketServer.h b/src/websocketserver/WebSocketServer.h index f742d680..a0d35edd 100644 --- a/src/websocketserver/WebSocketServer.h +++ b/src/websocketserver/WebSocketServer.h @@ -77,7 +77,7 @@ private: void ServerRunner(); - void onObsLoaded(); + void onObsReady(bool loaded); bool onValidate(websocketpp::connection_hdl hdl); void onOpen(websocketpp::connection_hdl hdl); void onClose(websocketpp::connection_hdl hdl); @@ -96,4 +96,6 @@ private: std::mutex _sessionMutex; std::map> _sessions; + + std::atomic _obsReady = false; }; diff --git a/src/websocketserver/WebSocketServer_Protocol.cpp b/src/websocketserver/WebSocketServer_Protocol.cpp index eedc4625..7772d7bb 100644 --- a/src/websocketserver/WebSocketServer_Protocol.cpp +++ b/src/websocketserver/WebSocketServer_Protocol.cpp @@ -209,13 +209,17 @@ void WebSocketServer::ProcessMessage(SessionPtr session, WebSocketServer::Proces return; } - RequestHandler requestHandler(session); - std::string requestType = payloadData["requestType"]; - json requestData = payloadData["requestData"]; - Request request(requestType, requestData); + RequestResult requestResult; + if (_obsReady) { + json requestData = payloadData["requestData"]; + Request request(requestType, requestData); - RequestResult requestResult = requestHandler.ProcessRequest(request); + RequestHandler requestHandler(session); + requestResult = requestHandler.ProcessRequest(request); + } else { + requestResult = RequestResult::Error(RequestStatus::NotReady, "OBS is not ready to perform the request."); + } json resultPayloadData; resultPayloadData["requestType"] = requestType; @@ -303,22 +307,34 @@ void WebSocketServer::ProcessMessage(SessionPtr session, WebSocketServer::Proces } std::vector requests = payloadData["requests"]; + std::vector resultsVector; + if (_obsReady) { + std::vector requestsVector; + for (auto &requestJson : requests) { + if (!requestJson["requestType"].is_string()) + requestJson["requestType"] = + ""; // Workaround for what would otherwise be extensive additional logic for a rare edge case + std::string requestType = requestJson["requestType"]; + json requestData = requestJson["requestData"]; + json inputVariables = requestJson["inputVariables"]; + json outputVariables = requestJson["outputVariables"]; + requestsVector.emplace_back(requestType, requestData, executionType, inputVariables, + outputVariables); + } - std::vector requestsVector; - for (auto &requestJson : requests) { - if (!requestJson["requestType"].is_string()) - requestJson["requestType"] = - ""; // Workaround for what would otherwise be extensive additional logic for a rare edge case - std::string requestType = requestJson["requestType"]; - json requestData = requestJson["requestData"]; - json inputVariables = requestJson["inputVariables"]; - json outputVariables = requestJson["outputVariables"]; - requestsVector.emplace_back(requestType, requestData, executionType, inputVariables, outputVariables); + resultsVector = RequestBatchHandler::ProcessRequestBatch( + _threadPool, session, executionType, requestsVector, payloadData["variables"], haltOnFailure); + } else { + // I lowkey hate this, but whatever + if (haltOnFailure) { + resultsVector.emplace_back(RequestStatus::NotReady, "OBS is not ready to perform the request."); + } else { + for (size_t i = 0; i < requests.size(); i++) + resultsVector.emplace_back(RequestStatus::NotReady, + "OBS is not ready to perform the request."); + } } - auto resultsVector = RequestBatchHandler::ProcessRequestBatch(_threadPool, session, executionType, requestsVector, - payloadData["variables"], haltOnFailure); - size_t i = 0; std::vector results; for (auto &requestResult : resultsVector) { @@ -342,7 +358,7 @@ void WebSocketServer::ProcessMessage(SessionPtr session, WebSocketServer::Proces void WebSocketServer::BroadcastEvent(uint64_t requiredIntent, const std::string &eventType, const json &eventData, uint8_t rpcVersion) { - if (!_server.is_listening()) + if (!_server.is_listening() || !_obsReady) return; _threadPool.start(Utils::Compat::CreateFunctionRunnable([=]() {