Base: Move request batch processing to requesthandler directory

Request batch processing had less to do with the protocol/server and
more to do with the actual request handler, so it felt better to move
it.
This commit is contained in:
tt2468 2021-12-13 19:05:50 -08:00
parent 0ed3c9b367
commit eb8d69dca5
9 changed files with 178 additions and 118 deletions

View File

@ -88,7 +88,6 @@ set(obs-websocket_SOURCES
src/WebSocketApi.cpp src/WebSocketApi.cpp
src/websocketserver/WebSocketServer.cpp src/websocketserver/WebSocketServer.cpp
src/websocketserver/WebSocketServer_Protocol.cpp src/websocketserver/WebSocketServer_Protocol.cpp
src/websocketserver/WebSocketServer_RequestBatchProcessing.cpp
src/websocketserver/rpc/WebSocketSession.cpp src/websocketserver/rpc/WebSocketSession.cpp
src/eventhandler/EventHandler.cpp src/eventhandler/EventHandler.cpp
src/eventhandler/EventHandler_General.cpp src/eventhandler/EventHandler_General.cpp
@ -101,6 +100,7 @@ set(obs-websocket_SOURCES
src/eventhandler/EventHandler_SceneItems.cpp src/eventhandler/EventHandler_SceneItems.cpp
src/eventhandler/EventHandler_MediaInputs.cpp src/eventhandler/EventHandler_MediaInputs.cpp
src/requesthandler/RequestHandler.cpp src/requesthandler/RequestHandler.cpp
src/requesthandler/RequestBatchHandler.cpp
src/requesthandler/RequestHandler_General.cpp src/requesthandler/RequestHandler_General.cpp
src/requesthandler/RequestHandler_Config.cpp src/requesthandler/RequestHandler_Config.cpp
src/requesthandler/RequestHandler_Sources.cpp src/requesthandler/RequestHandler_Sources.cpp
@ -111,6 +111,7 @@ set(obs-websocket_SOURCES
src/requesthandler/RequestHandler_Record.cpp src/requesthandler/RequestHandler_Record.cpp
src/requesthandler/RequestHandler_MediaInputs.cpp src/requesthandler/RequestHandler_MediaInputs.cpp
src/requesthandler/rpc/Request.cpp src/requesthandler/rpc/Request.cpp
src/requesthandler/rpc/RequestBatchRequest.cpp
src/requesthandler/rpc/RequestResult.cpp src/requesthandler/rpc/RequestResult.cpp
src/forms/SettingsDialog.cpp src/forms/SettingsDialog.cpp
src/forms/ConnectInfo.cpp src/forms/ConnectInfo.cpp
@ -134,9 +135,11 @@ set(obs-websocket_HEADERS
src/eventhandler/EventHandler.h src/eventhandler/EventHandler.h
src/eventhandler/types/EventSubscription.h src/eventhandler/types/EventSubscription.h
src/requesthandler/RequestHandler.h src/requesthandler/RequestHandler.h
src/requesthandler/RequestBatchHandler.h
src/requesthandler/types/RequestStatus.h src/requesthandler/types/RequestStatus.h
src/requesthandler/types/RequestBatchExecutionType.h src/requesthandler/types/RequestBatchExecutionType.h
src/requesthandler/rpc/Request.h src/requesthandler/rpc/Request.h
src/requesthandler/rpc/RequestBatchRequest.h
src/requesthandler/rpc/RequestResult.h src/requesthandler/rpc/RequestResult.h
src/forms/SettingsDialog.h src/forms/SettingsDialog.h
src/forms/ConnectInfo.h src/forms/ConnectInfo.h

View File

@ -1,6 +1,5 @@
/* /*
obs-websocket obs-websocket
Copyright (C) 2016-2021 Stephane Lepin <stephane.lepin@gmail.com>
Copyright (C) 2020-2021 Kyle Manning <tt2468@gmail.com> Copyright (C) 2020-2021 Kyle Manning <tt2468@gmail.com>
This program is free software; you can redistribute it and/or modify This program is free software; you can redistribute it and/or modify
@ -17,30 +16,18 @@ You should have received a copy of the GNU General Public License along
with this program. If not, see <https://www.gnu.org/licenses/> with this program. If not, see <https://www.gnu.org/licenses/>
*/ */
#include <queue>
#include <condition_variable>
#include <util/profiler.hpp> #include <util/profiler.hpp>
#include "WebSocketServer.h" #include "RequestBatchHandler.h"
#include "../requesthandler/RequestHandler.h"
#include "../obs-websocket.h"
#include "../utils/Compat.h" #include "../utils/Compat.h"
#include "../obs-websocket.h"
struct SerialFrameRequest
{
Request request;
const json inputVariables;
const json outputVariables;
SerialFrameRequest(const std::string &requestType, const json &requestData, const json &inputVariables, const json &outputVariables) :
request(requestType, requestData, RequestBatchExecutionType::SerialFrame),
inputVariables(inputVariables),
outputVariables(outputVariables)
{}
};
struct SerialFrameBatch struct SerialFrameBatch
{ {
RequestHandler &requestHandler; RequestHandler &requestHandler;
std::queue<SerialFrameRequest> requests; std::queue<RequestBatchRequest> requests;
std::vector<RequestResult> results; std::vector<RequestResult> results;
json &variables; json &variables;
bool haltOnFailure; bool haltOnFailure;
@ -62,25 +49,23 @@ struct SerialFrameBatch
struct ParallelBatchResults struct ParallelBatchResults
{ {
RequestHandler &requestHandler; RequestHandler &requestHandler;
size_t requestCount; std::vector<RequestResult> results;
std::mutex resultsMutex;
std::vector<json> results; std::mutex conditionMutex;
std::condition_variable condition; std::condition_variable condition;
ParallelBatchResults(RequestHandler &requestHandler, size_t requestCount) : ParallelBatchResults(RequestHandler &requestHandler) :
requestHandler(requestHandler), requestHandler(requestHandler)
requestCount(requestCount)
{} {}
}; };
// `{"inputName": "inputNameVariable"}` is essentially `inputName = inputNameVariable` // `{"inputName": "inputNameVariable"}` is essentially `inputName = inputNameVariable`
static void PreProcessVariables(const json &variables, const json &inputVariables, json &requestData) static void PreProcessVariables(const json &variables, RequestBatchRequest &request)
{ {
if (variables.empty() || !inputVariables.is_object() || inputVariables.empty() || !requestData.is_object()) if (variables.empty() || !request.InputVariables.is_object() || request.InputVariables.empty() || !request.RequestData.is_object())
return; return;
for (auto& [key, value] : inputVariables.items()) { for (auto& [key, value] : request.InputVariables.items()) {
if (!value.is_string()) { if (!value.is_string()) {
blog_debug("[WebSocketServer::ProcessRequestBatch] Value of field `%s` in `inputVariables `is not a string. Skipping!", key.c_str()); blog_debug("[WebSocketServer::ProcessRequestBatch] Value of field `%s` in `inputVariables `is not a string. Skipping!", key.c_str());
continue; continue;
@ -92,55 +77,34 @@ static void PreProcessVariables(const json &variables, const json &inputVariable
continue; continue;
} }
requestData[key] = variables[valueString]; request.RequestData[key] = variables[valueString];
} }
request.HasRequestData = !request.RequestData.empty();
} }
// `{"sceneItemIdVariable": "sceneItemId"}` is essentially `sceneItemIdVariable = sceneItemId` // `{"sceneItemIdVariable": "sceneItemId"}` is essentially `sceneItemIdVariable = sceneItemId`
static void PostProcessVariables(json &variables, const json &outputVariables, const json &responseData) static void PostProcessVariables(json &variables, const RequestBatchRequest &request, const RequestResult &requestResult)
{ {
if (!outputVariables.is_object() || outputVariables.empty() || responseData.empty()) if (!request.OutputVariables.is_object() || request.OutputVariables.empty() || requestResult.ResponseData.empty())
return; return;
for (auto& [key, value] : outputVariables.items()) { for (auto& [key, value] : request.OutputVariables.items()) {
if (!value.is_string()) { if (!value.is_string()) {
blog_debug("[WebSocketServer::ProcessRequestBatch] Value of field `%s` in `outputVariables` is not a string. Skipping!", key.c_str()); blog_debug("[WebSocketServer::ProcessRequestBatch] Value of field `%s` in `outputVariables` is not a string. Skipping!", key.c_str());
continue; continue;
} }
std::string valueString = value; std::string valueString = value;
if (!responseData.contains(valueString)) { if (!requestResult.ResponseData.contains(valueString)) {
blog_debug("[WebSocketServer::ProcessRequestBatch] `outputVariables` requested responseData field `%s`, but it does not exist. Skipping!", valueString.c_str()); blog_debug("[WebSocketServer::ProcessRequestBatch] `outputVariables` requested responseData field `%s`, but it does not exist. Skipping!", valueString.c_str());
continue; continue;
} }
variables[key] = responseData[valueString]; variables[key] = requestResult.ResponseData[valueString];
} }
} }
static json ConstructRequestResult(RequestResult requestResult, const json &requestJson)
{
json ret;
ret["requestType"] = requestJson["requestType"];
if (requestJson.contains("requestId") && !requestJson["requestId"].is_null())
ret["requestId"] = requestJson["requestId"];
ret["requestStatus"] = {
{"result", requestResult.StatusCode == RequestStatus::Success},
{"code", requestResult.StatusCode}
};
if (!requestResult.Comment.empty())
ret["requestStatus"]["comment"] = requestResult.Comment;
if (requestResult.ResponseData.is_object())
ret["responseData"] = requestResult.ResponseData;
return ret;
}
static void ObsTickCallback(void *param, float) static void ObsTickCallback(void *param, float)
{ {
ScopeProfiler prof{"obs_websocket_request_batch_frame_tick"}; ScopeProfiler prof{"obs_websocket_request_batch_frame_tick"};
@ -162,15 +126,13 @@ static void ObsTickCallback(void *param, float)
// Begin recursing any unprocessed requests // Begin recursing any unprocessed requests
while (!serialFrameBatch->requests.empty()) { while (!serialFrameBatch->requests.empty()) {
// Fetch first in queue // Fetch first in queue
SerialFrameRequest frameRequest = serialFrameBatch->requests.front(); RequestBatchRequest request = serialFrameBatch->requests.front();
// Pre-process batch variables // Pre-process batch variables
PreProcessVariables(serialFrameBatch->variables, frameRequest.inputVariables, frameRequest.request.RequestData); PreProcessVariables(serialFrameBatch->variables, request);
// Determine if there is request data
frameRequest.request.HasRequestData = !frameRequest.request.RequestData.empty();
// Process request and get result // Process request and get result
RequestResult requestResult = serialFrameBatch->requestHandler.ProcessRequest(frameRequest.request); RequestResult requestResult = serialFrameBatch->requestHandler.ProcessRequest(request);
// Post-process batch variables // Post-process batch variables
PostProcessVariables(serialFrameBatch->variables, frameRequest.outputVariables, requestResult.ResponseData); PostProcessVariables(serialFrameBatch->variables, request, requestResult);
// Add to results vector // Add to results vector
serialFrameBatch->results.push_back(requestResult); serialFrameBatch->results.push_back(requestResult);
// Remove from front of queue // Remove from front of queue
@ -178,7 +140,7 @@ static void ObsTickCallback(void *param, float)
// If haltOnFailure and the request failed, clear the queue to make the batch return early. // If haltOnFailure and the request failed, clear the queue to make the batch return early.
if (serialFrameBatch->haltOnFailure && requestResult.StatusCode != RequestStatus::Success) { if (serialFrameBatch->haltOnFailure && requestResult.StatusCode != RequestStatus::Success) {
serialFrameBatch->requests = std::queue<SerialFrameRequest>(); serialFrameBatch->requests = std::queue<RequestBatchRequest>();
break; break;
} }
@ -194,37 +156,33 @@ static void ObsTickCallback(void *param, float)
serialFrameBatch->condition.notify_one(); serialFrameBatch->condition.notify_one();
} }
void WebSocketServer::ProcessRequestBatch(SessionPtr session, RequestBatchExecutionType::RequestBatchExecutionType executionType, const std::vector<json> &requests, std::vector<json> &results, json &variables, bool haltOnFailure) std::vector<RequestResult> RequestBatchHandler::ProcessRequestBatch(QThreadPool &threadPool, SessionPtr session, RequestBatchExecutionType::RequestBatchExecutionType executionType, std::vector<RequestBatchRequest> &requests, json &variables, bool haltOnFailure)
{ {
RequestHandler requestHandler(session); RequestHandler requestHandler(session);
if (executionType == RequestBatchExecutionType::SerialRealtime) { if (executionType == RequestBatchExecutionType::SerialRealtime) {
std::vector<RequestResult> ret;
// Recurse all requests in batch serially, processing the request then moving to the next one // Recurse all requests in batch serially, processing the request then moving to the next one
for (auto requestJson : requests) { for (auto &request : requests) {
Request request(requestJson["requestType"], requestJson["requestData"], RequestBatchExecutionType::SerialRealtime); PreProcessVariables(variables, request);
PreProcessVariables(variables, requestJson["inputVariables"], request.RequestData);
request.HasRequestData = !request.RequestData.empty();
RequestResult requestResult = requestHandler.ProcessRequest(request); RequestResult requestResult = requestHandler.ProcessRequest(request);
PostProcessVariables(variables, requestJson["outputVariables"], requestResult.ResponseData); PostProcessVariables(variables, request, requestResult);
json result = ConstructRequestResult(requestResult, requestJson); ret.push_back(requestResult);
results.push_back(result);
if (haltOnFailure && requestResult.StatusCode != RequestStatus::Success) if (haltOnFailure && requestResult.StatusCode != RequestStatus::Success)
break; break;
} }
return ret;
} else if (executionType == RequestBatchExecutionType::SerialFrame) { } else if (executionType == RequestBatchExecutionType::SerialFrame) {
SerialFrameBatch serialFrameBatch(requestHandler, variables, haltOnFailure); SerialFrameBatch serialFrameBatch(requestHandler, variables, haltOnFailure);
// Create Request objects in the worker thread (avoid unnecessary processing in graphics thread) // Create Request objects in the worker thread (avoid unnecessary processing in graphics thread)
for (auto requestJson : requests) { for (auto &request : requests)
SerialFrameRequest frameRequest(requestJson["requestType"], requestJson["requestData"], requestJson["inputVariables"], requestJson["outputVariables"]); serialFrameBatch.requests.push(request);
serialFrameBatch.requests.push(frameRequest);
}
// Create a callback entry for the graphics thread to execute on each video frame // Create a callback entry for the graphics thread to execute on each video frame
obs_add_tick_callback(ObsTickCallback, &serialFrameBatch); obs_add_tick_callback(ObsTickCallback, &serialFrameBatch);
@ -236,38 +194,32 @@ void WebSocketServer::ProcessRequestBatch(SessionPtr session, RequestBatchExecut
// Remove the created callback entry since we don't need it anymore // Remove the created callback entry since we don't need it anymore
obs_remove_tick_callback(ObsTickCallback, &serialFrameBatch); obs_remove_tick_callback(ObsTickCallback, &serialFrameBatch);
// Create Request objects in the worker thread (avoid unnecessary processing in graphics thread) return serialFrameBatch.results;
size_t i = 0;
for (auto requestResult : serialFrameBatch.results) {
results.push_back(ConstructRequestResult(requestResult, requests[i]));
i++;
}
} else if (executionType == RequestBatchExecutionType::Parallel) { } else if (executionType == RequestBatchExecutionType::Parallel) {
ParallelBatchResults parallelResults(requestHandler, requests.size()); ParallelBatchResults parallelResults(requestHandler);
// Acquire the lock early to prevent the batch from finishing before we're ready
std::unique_lock<std::mutex> lock(parallelResults.conditionMutex);
// Submit each request as a task to the thread pool to be processed ASAP // Submit each request as a task to the thread pool to be processed ASAP
for (auto requestJson : requests) { for (auto &request : requests) {
_threadPool.start(Utils::Compat::CreateFunctionRunnable([&parallelResults, &executionType, requestJson]() { threadPool.start(Utils::Compat::CreateFunctionRunnable([&parallelResults, &request]() {
Request request(requestJson["requestType"], requestJson["requestData"], RequestBatchExecutionType::Parallel);
RequestResult requestResult = parallelResults.requestHandler.ProcessRequest(request); RequestResult requestResult = parallelResults.requestHandler.ProcessRequest(request);
json result = ConstructRequestResult(requestResult, requestJson); std::unique_lock<std::mutex> lock(parallelResults.conditionMutex);
parallelResults.results.push_back(requestResult);
std::unique_lock<std::mutex> lock(parallelResults.resultsMutex);
parallelResults.results.push_back(result);
lock.unlock(); lock.unlock();
parallelResults.condition.notify_one(); parallelResults.condition.notify_one();
})); }));
} }
// Wait for the last request to finish processing // Wait for the last request to finish processing
std::unique_lock<std::mutex> lock(parallelResults.resultsMutex); size_t requestCount = requests.size();
auto cb = [&parallelResults]{return parallelResults.results.size() == parallelResults.requestCount;}; parallelResults.condition.wait(lock, [&parallelResults, requestCount]{return parallelResults.results.size() == requestCount;});
// A check just in case all requests managed to complete before we started waiting for the condition to be notified
if (!cb())
parallelResults.condition.wait(lock, cb);
results = parallelResults.results; return parallelResults.results;
} }
// Return empty vector if not a batch somehow
return std::vector<RequestResult>();
} }

View File

@ -0,0 +1,28 @@
/*
obs-websocket
Copyright (C) 2020-2021 Kyle Manning <tt2468@gmail.com>
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; either version 2 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License along
with this program. If not, see <https://www.gnu.org/licenses/>
*/
#pragma once
#include <QThreadPool>
#include "RequestHandler.h"
#include "rpc/RequestBatchRequest.h"
namespace RequestBatchHandler {
std::vector<RequestResult> ProcessRequestBatch(QThreadPool &threadPool, SessionPtr session, RequestBatchExecutionType::RequestBatchExecutionType executionType, std::vector<RequestBatchRequest> &requests, json &variables, bool haltOnFailure);
}

View File

@ -19,7 +19,6 @@ with this program. If not, see <https://www.gnu.org/licenses/>
#include "Request.h" #include "Request.h"
#include "../../obs-websocket.h" #include "../../obs-websocket.h"
#include "../../plugin-macros.generated.h"
json GetDefaultJsonObject(const json &requestData) json GetDefaultJsonObject(const json &requestData)
{ {
@ -30,15 +29,7 @@ json GetDefaultJsonObject(const json &requestData)
return requestData; return requestData;
} }
Request::Request(const std::string &requestType, const json &requestData) : Request::Request(const std::string &requestType, const json &requestData, const RequestBatchExecutionType::RequestBatchExecutionType executionType) :
RequestType(requestType),
HasRequestData(requestData.is_object()),
RequestData(GetDefaultJsonObject(requestData)),
ExecutionType(RequestBatchExecutionType::None)
{
}
Request::Request(const std::string &requestType, const json &requestData, RequestBatchExecutionType::RequestBatchExecutionType executionType) :
RequestType(requestType), RequestType(requestType),
HasRequestData(requestData.is_object()), HasRequestData(requestData.is_object()),
RequestData(GetDefaultJsonObject(requestData)), RequestData(GetDefaultJsonObject(requestData)),

View File

@ -31,8 +31,7 @@ enum ObsWebSocketSceneFilter {
struct Request struct Request
{ {
Request(const std::string &requestType, const json &requestData = nullptr); Request(const std::string &requestType, const json &requestData = nullptr, const RequestBatchExecutionType::RequestBatchExecutionType executionType = RequestBatchExecutionType::None);
Request(const std::string &requestType, const json &requestData, RequestBatchExecutionType::RequestBatchExecutionType executionType);
// Contains the key and is not null // Contains the key and is not null
bool Contains(const std::string &keyName) const; bool Contains(const std::string &keyName) const;

View File

@ -0,0 +1,26 @@
/*
obs-websocket
Copyright (C) 2020-2021 Kyle Manning <tt2468@gmail.com>
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; either version 2 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License along
with this program. If not, see <https://www.gnu.org/licenses/>
*/
#include "RequestBatchRequest.h"
RequestBatchRequest::RequestBatchRequest(const std::string &requestType, const json &requestData, RequestBatchExecutionType::RequestBatchExecutionType executionType, const json &inputVariables, const json &outputVariables) :
Request(requestType, requestData, executionType),
InputVariables(inputVariables),
OutputVariables(outputVariables)
{
}

View File

@ -0,0 +1,28 @@
/*
obs-websocket
Copyright (C) 2020-2021 Kyle Manning <tt2468@gmail.com>
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; either version 2 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License along
with this program. If not, see <https://www.gnu.org/licenses/>
*/
#pragma once
#include "Request.h"
struct RequestBatchRequest : Request {
RequestBatchRequest(const std::string &requestType, const json &requestData, RequestBatchExecutionType::RequestBatchExecutionType executionType, const json &inputVariables = nullptr, const json &outputVariables = nullptr);
json InputVariables;
json OutputVariables;
};

View File

@ -89,11 +89,9 @@ class WebSocketServer : QObject
void onClose(websocketpp::connection_hdl hdl); void onClose(websocketpp::connection_hdl hdl);
void onMessage(websocketpp::connection_hdl hdl, websocketpp::server<websocketpp::config::asio>::message_ptr message); void onMessage(websocketpp::connection_hdl hdl, websocketpp::server<websocketpp::config::asio>::message_ptr message);
void SetSessionParameters(SessionPtr session, WebSocketServer::ProcessResult &ret, const json &payloadData); static void SetSessionParameters(SessionPtr session, WebSocketServer::ProcessResult &ret, const json &payloadData);
void ProcessMessage(SessionPtr session, ProcessResult &ret, WebSocketOpCode::WebSocketOpCode opCode, json &payloadData); void ProcessMessage(SessionPtr session, ProcessResult &ret, WebSocketOpCode::WebSocketOpCode opCode, json &payloadData);
void ProcessRequestBatch(SessionPtr session, RequestBatchExecutionType::RequestBatchExecutionType executionType, const std::vector<json> &requests, std::vector<json> &results, json &variables, bool haltOnFailure);
QThreadPool _threadPool; QThreadPool _threadPool;
std::thread _serverThread; std::thread _serverThread;

View File

@ -18,9 +18,11 @@ with this program. If not, see <https://www.gnu.org/licenses/>
*/ */
#include <obs-module.h> #include <obs-module.h>
#include <util/profiler.hpp>
#include "WebSocketServer.h" #include "WebSocketServer.h"
#include "../requesthandler/RequestHandler.h" #include "../requesthandler/RequestHandler.h"
#include "../requesthandler/RequestBatchHandler.h"
#include "../eventhandler/EventHandler.h" #include "../eventhandler/EventHandler.h"
#include "../obs-websocket.h" #include "../obs-websocket.h"
#include "../Config.h" #include "../Config.h"
@ -33,6 +35,29 @@ static bool IsSupportedRpcVersion(uint8_t requestedVersion)
return (requestedVersion == 1); return (requestedVersion == 1);
} }
static json ConstructRequestResult(RequestResult requestResult, const json &requestJson)
{
json ret;
ret["requestType"] = requestJson["requestType"];
if (requestJson.contains("requestId") && !requestJson["requestId"].is_null())
ret["requestId"] = requestJson["requestId"];
ret["requestStatus"] = {
{"result", requestResult.StatusCode == RequestStatus::Success},
{"code", requestResult.StatusCode}
};
if (!requestResult.Comment.empty())
ret["requestStatus"]["comment"] = requestResult.Comment;
if (requestResult.ResponseData.is_object())
ret["responseData"] = requestResult.ResponseData;
return ret;
}
void WebSocketServer::SetSessionParameters(SessionPtr session, ProcessResult &ret, const json &payloadData) void WebSocketServer::SetSessionParameters(SessionPtr session, ProcessResult &ret, const json &payloadData)
{ {
if (payloadData.contains("ignoreInvalidMessages")) { if (payloadData.contains("ignoreInvalidMessages")) {
@ -281,9 +306,19 @@ void WebSocketServer::ProcessMessage(SessionPtr session, WebSocketServer::Proces
} }
std::vector<json> requests = payloadData["requests"]; std::vector<json> requests = payloadData["requests"];
json variables = payloadData["variables"];
std::vector<RequestBatchRequest> requestsVector;
for (auto &requestJson : requests)
requestsVector.emplace_back(requestJson["requestType"], requestJson["requestData"], executionType, requestJson["inputVariables"], requestJson["outputVariables"]);
auto resultsVector = RequestBatchHandler::ProcessRequestBatch(_threadPool, session, executionType, requestsVector, payloadData["variables"], haltOnFailure);
size_t i = 0;
std::vector<json> results; std::vector<json> results;
ProcessRequestBatch(session, executionType, requests, results, variables, haltOnFailure); for (auto &requestResult : resultsVector) {
results.push_back(ConstructRequestResult(requestResult, requests[i]));
i++;
}
ret.result["op"] = WebSocketOpCode::RequestBatchResponse; ret.result["op"] = WebSocketOpCode::RequestBatchResponse;
ret.result["d"]["requestId"] = payloadData["requestId"]; ret.result["d"]["requestId"] = payloadData["requestId"];