obs-websocket/src/WebSocketServer.cpp

284 lines
8.9 KiB
C++
Raw Normal View History

2021-04-28 17:27:32 +00:00
#include <chrono>
#include <thread>
2021-04-27 21:52:48 +00:00
#include <QtConcurrent>
#include <QDateTime>
#include <QTime>
2021-04-27 21:52:48 +00:00
#include "WebSocketServer.h"
2021-04-27 23:33:47 +00:00
#include "obs-websocket.h"
#include "Config.h"
#include "requesthandler/RequestHandler.h"
2021-04-28 17:27:32 +00:00
#include "utils/Utils.h"
2021-04-27 23:33:47 +00:00
2021-04-27 23:41:10 +00:00
#include "plugin-macros.generated.h"
2021-04-27 23:33:47 +00:00
// Constructs the server object without starting it: seeds qrand(), mutes
// all websocketpp log channels, initialises the ASIO transport and wires
// the open/close/message callbacks to this instance's member handlers.
WebSocketServer::WebSocketServer() :
	QObject(nullptr),
	_sessions()
{
	// Seed the qrand() generator from the millisecond part of the current time
	qsrand(QTime::currentTime().msec());

	// Start with every access and error log channel disabled
	_server.get_alog().clear_channels(websocketpp::log::alevel::all);
	_server.get_elog().clear_channels(websocketpp::log::elevel::all);

	_server.init_asio();

#ifndef _WIN32
	_server.set_reuse_addr(true);
#endif

	// Forward websocketpp callbacks to the matching member functions
	_server.set_open_handler(
		[this](websocketpp::connection_hdl hdl) {
			onOpen(hdl);
		}
	);
	_server.set_close_handler(
		[this](websocketpp::connection_hdl hdl) {
			onClose(hdl);
		}
	);
	_server.set_message_handler(
		[this](websocketpp::connection_hdl hdl, websocketpp::server<websocketpp::config::asio>::message_ptr message) {
			onMessage(hdl, message);
		}
	);
}
// Stops the server on destruction if it is still listening.
WebSocketServer::~WebSocketServer()
{
	const bool stillListening = _server.is_listening();
	if (stillListening) {
		Stop();
	}
}
2021-04-28 17:27:32 +00:00
void WebSocketServer::ServerRunner()
{
2021-04-28 18:43:39 +00:00
blog(LOG_INFO, "[ServerRunner] IO thread started.");
2021-04-28 17:27:32 +00:00
try {
_server.run();
} catch (websocketpp::exception const & e) {
2021-04-28 18:43:39 +00:00
blog(LOG_ERROR, "[ServerRunner] websocketpp instance returned an error: %s", e.what());
2021-04-28 17:27:32 +00:00
} catch (const std::exception & e) {
2021-04-28 18:43:39 +00:00
blog(LOG_ERROR, "[ServerRunner] websocketpp instance returned an error: %s", e.what());
2021-04-28 17:27:32 +00:00
} catch (...) {
2021-04-28 18:43:39 +00:00
blog(LOG_ERROR, "[ServerRunner] websocketpp instance returned an error");
2021-04-28 17:27:32 +00:00
}
2021-04-28 18:43:39 +00:00
blog(LOG_INFO, "[ServerRunner] IO thread exited.");
2021-04-28 17:27:32 +00:00
}
2021-04-27 23:33:47 +00:00
void WebSocketServer::Start()
{
2021-04-28 17:27:32 +00:00
if (_server.is_listening()) {
2021-04-28 18:43:39 +00:00
blog(LOG_WARNING, "[Start] Call to Start() but the server is already listening.");
2021-04-28 17:27:32 +00:00
return;
}
auto conf = GetConfig();
if (!conf) {
2021-04-28 18:43:39 +00:00
blog(LOG_ERROR, "[Start] Unable to retreive config!");
2021-04-28 17:27:32 +00:00
return;
}
_serverPort = conf->ServerPort;
2021-04-29 15:24:27 +00:00
_debugEnabled = conf->DebugEnabled;
2021-04-29 05:03:23 +00:00
_authenticationRequired = conf->AuthRequired;
2021-04-28 17:27:32 +00:00
_authenticationSalt = Utils::Crypto::GenerateSalt();
_authenticationSecret = Utils::Crypto::GenerateSecret(conf->ServerPassword.toStdString(), _authenticationSalt);
// Set log levels if debug is enabled
2021-04-29 15:24:27 +00:00
if (_debugEnabled) {
_server.get_alog().set_channels(websocketpp::log::alevel::all);
_server.get_alog().clear_channels(websocketpp::log::alevel::frame_header | websocketpp::log::alevel::frame_payload | websocketpp::log::alevel::control);
_server.get_elog().set_channels(websocketpp::log::elevel::all);
_server.get_alog().clear_channels(websocketpp::log::elevel::info);
} else {
_server.get_alog().clear_channels(websocketpp::log::alevel::all);
_server.get_elog().clear_channels(websocketpp::log::elevel::all);
}
2021-04-28 17:27:32 +00:00
_server.reset();
websocketpp::lib::error_code errorCode;
_server.listen(websocketpp::lib::asio::ip::tcp::v4(), _serverPort, errorCode);
2021-04-28 17:27:32 +00:00
if (errorCode) {
std::string errorCodeMessage = errorCode.message();
2021-04-28 18:43:39 +00:00
blog(LOG_INFO, "[Start] Listen failed: %s", errorCodeMessage.c_str());
2021-04-28 17:27:32 +00:00
return;
}
_server.start_accept();
_serverThread = std::thread(&WebSocketServer::ServerRunner, this);
2021-04-28 21:24:05 +00:00
blog(LOG_INFO, "[Start] Server started successfully on port %d. Possible connect address: %s", _serverPort, Utils::Platform::GetLocalAddress().c_str());
2021-04-27 23:33:47 +00:00
}
void WebSocketServer::Stop()
{
2021-04-28 17:27:32 +00:00
if (!_server.is_listening()) {
2021-04-28 18:43:39 +00:00
blog(LOG_WARNING, "[Stop] Call to Stop() but the server is not listening.");
2021-04-28 17:27:32 +00:00
return;
}
_server.stop_listening();
std::unique_lock<std::mutex> lock(_sessionMutex);
for (auto const& [hdl, session] : _sessions) {
websocketpp::lib::error_code errorCode;
_server.pause_reading(hdl, errorCode);
if (errorCode) {
blog(LOG_INFO, "[Stop] Error: %s", errorCode.message().c_str());
continue;
}
_server.close(hdl, websocketpp::close::status::going_away, "Server stopping.", errorCode);
if (errorCode) {
blog(LOG_INFO, "[Stop] Error: %s", errorCode.message().c_str());
continue;
}
2021-04-28 17:27:32 +00:00
}
lock.unlock();
_threadPool.waitForDone();
// This can delay the thread that it is running on. Bad but kinda required.
2021-04-28 17:27:32 +00:00
while (_sessions.size() > 0) {
std::this_thread::sleep_for(std::chrono::milliseconds(10));
}
_serverThread.join();
2021-04-28 18:43:39 +00:00
blog(LOG_INFO, "[Stop] Server stopped successfully");
2021-04-27 23:33:47 +00:00
}
// Forcibly closes the session identified by `hdl`.
//
// Pauses reading on the connection and then closes it with the
// SessionInvalidated close code. Both calls use the error_code overloads,
// so a failure (for example a handle that is no longer valid) is logged
// and the function returns instead of throwing.
void WebSocketServer::InvalidateSession(websocketpp::connection_hdl hdl)
{
	blog(LOG_INFO, "[InvalidateSession] Invalidating a session.");

	websocketpp::lib::error_code errorCode;
	_server.pause_reading(hdl, errorCode);
	if (errorCode) {
		blog(LOG_INFO, "[InvalidateSession] Error: %s", errorCode.message().c_str());
		return;
	}

	_server.close(hdl, WebSocketCloseCode::SessionInvalidated, "Your session has been invalidated.", errorCode);
	if (errorCode) {
		blog(LOG_INFO, "[InvalidateSession] Error: %s", errorCode.message().c_str());
		return;
	}
}
std::vector<WebSocketServer::WebSocketSessionState> WebSocketServer::GetWebSocketSessions()
2021-04-27 23:33:47 +00:00
{
std::vector<WebSocketServer::WebSocketSessionState> webSocketSessions;
std::unique_lock<std::mutex> lock(_sessionMutex);
for (auto & [hdl, session] : _sessions) {
uint64_t connectedAt = session.ConnectedAt();
uint64_t incomingMessages = session.IncomingMessages();
uint64_t outgoingMessages = session.OutgoingMessages();
std::string remoteAddress = session.RemoteAddress();
webSocketSessions.emplace_back(WebSocketSessionState{hdl, remoteAddress, connectedAt, incomingMessages, outgoingMessages});
}
lock.unlock();
2021-04-27 23:33:47 +00:00
return webSocketSessions;
}
std::string WebSocketServer::GetConnectUrl()
{
QString ret = QString("ws://%1:%2").arg(QString::fromStdString(Utils::Platform::GetLocalAddress())).arg(_serverPort);
return ret.toStdString();
2021-04-27 23:33:47 +00:00
}
void WebSocketServer::BroadcastEvent(uint64_t requiredIntent, std::string eventType, json eventData)
{
QtConcurrent::run(&_threadPool, [=]() {
json eventMessage;
eventMessage["messageType"] = "Event";
eventMessage["eventType"] = eventType;
if (eventData.is_object())
eventMessage["eventData"] = eventData;
// I hate to have to encode all supported types, but it's more efficient at scale than doing it per-session.
std::string messageJson = eventMessage.dump();
auto messageMsgPack = json::to_msgpack(eventMessage);
std::string messageMsgPackString(messageMsgPack.begin(), messageMsgPack.end());
std::unique_lock<std::mutex> lock(_sessionMutex);
for (auto & it : _sessions) {
if (!it.second.IsIdentified())
continue;
if ((it.second.EventSubscriptions() & requiredIntent) != 0) {
switch (it.second.Encoding()) {
case WebSocketEncoding::Json:
_server.send((websocketpp::connection_hdl)it.first, messageJson, websocketpp::frame::opcode::text);
break;
case WebSocketEncoding::MsgPack:
_server.send((websocketpp::connection_hdl)it.first, messageMsgPackString, websocketpp::frame::opcode::binary);
break;
}
}
}
lock.unlock();
});
2021-04-27 23:33:47 +00:00
}
// websocketpp open handler: registers a session for the new connection,
// selects its encoding from the HTTP `Content-Type` request header and
// sends the initial "Hello" message (including an authentication
// challenge and salt when authentication is required).
void WebSocketServer::onOpen(websocketpp::connection_hdl hdl)
{
	auto connection = _server.get_con_from_hdl(hdl);

	// Create (or fetch) the session entry under the session mutex
	std::unique_lock<std::mutex> sessionLock(_sessionMutex);
	auto &session = _sessions[hdl];
	sessionLock.unlock();

	session.SetRemoteAddress(connection->get_remote_endpoint());
	session.SetConnectedAt(QDateTime::currentSecsSinceEpoch());

	// An empty header leaves the session's existing encoding untouched;
	// any value other than the two recognised ones closes the connection.
	const std::string contentType = connection->get_request_header("Content-Type");
	if (contentType == "application/json") {
		session.SetEncoding(WebSocketEncoding::Json);
	} else if (contentType == "application/msgpack") {
		session.SetEncoding(WebSocketEncoding::MsgPack);
	} else if (contentType != "") {
		connection->close(WebSocketCloseCode::InvalidContentType, "Your HTTP `Content-Type` header specifies an invalid encoding type.");
		return;
	}

	json hello;
	hello["messageType"] = "Hello";
	hello["obsWebSocketVersion"] = OBS_WEBSOCKET_VERSION;
	hello["rpcVersion"] = OBS_WEBSOCKET_RPC_VERSION;
	// todo: Add request and event lists
	if (_authenticationRequired) {
		std::string challenge = Utils::Crypto::GenerateSalt();
		session.SetChallenge(challenge);
		hello["authentication"] = {};
		hello["authentication"]["challenge"] = challenge;
		hello["authentication"]["salt"] = _authenticationSalt;
	}

	// Serialise the hello message according to the session's encoding
	switch (session.Encoding()) {
		case WebSocketEncoding::Json:
			connection->send(hello.dump());
			break;
		case WebSocketEncoding::MsgPack: {
			auto packed = json::to_msgpack(hello);
			connection->send(packed.data(), sizeof(packed[0]) * packed.size());
			break;
		}
		default:
			break;
	}
}
// websocketpp close handler: removes the session entry for `hdl` while
// holding the session mutex.
void WebSocketServer::onClose(websocketpp::connection_hdl hdl)
{
	std::lock_guard<std::mutex> guard(_sessionMutex);
	_sessions.erase(hdl);
}
// websocketpp message handler. Currently a stub: incoming messages are
// accepted and ignored.
void WebSocketServer::onMessage(websocketpp::connection_hdl hdl, websocketpp::server<websocketpp::config::asio>::message_ptr message)
{
	// Intentionally empty
}