Beef up security on API

This commit is contained in:
Andrew 2022-04-12 21:52:40 -04:00
parent 64d2e5fedd
commit 3711c94ffe
2 changed files with 97 additions and 7 deletions

View File

@ -258,8 +258,14 @@ class Permissions_Servers:
.where(Role_Servers.server_id == server_id)
.execute()
)
user_permissions_mask = role_server[0].permissions
key_permissions_mask = key.Permissions_Servers
try:
user_permissions_mask = role_server[0].permissions
except:
if user["superuser"]:
user_permissions_mask = "11111111"
else:
user_permissions_mask = "00000000"
key_permissions_mask = key.server_permissions
permissions_mask = PermissionHelper.combine_masks(
user_permissions_mask, key_permissions_mask
)

View File

@ -2,6 +2,8 @@ from datetime import datetime
import logging
import re
from platformdirs import user_cache_path
from app.classes.controllers.crafty_perms_controller import Enum_Permissions_Crafty
from app.classes.controllers.server_perms_controller import Enum_Permissions_Server
from app.classes.web.base_handler import BaseHandler
@ -135,16 +137,28 @@ class SendCommand(ApiHandler):
def post(self):
user = self.authenticate_user()
user_obj = self.controller.users.get_user_by_api_token(self.api_token)
if user is None:
self.access_denied("unknown")
return
server_id = self.get_argument("id")
if (
not user_obj["user_id"]
in self.controller.server_perms.get_server_user_list(server_id)
and not user_obj["superuser"]
):
self.access_denied("unknown")
return
if not self.permissions[
"Commands"
] in self.controller.server_perms.get_api_key_permissions_list(
self.controller.users.get_api_key_by_token(self.api_token), server_id
):
self.access_denied(user)
return
command = self.get_argument("command", default=None, strip=True)
server_id = self.get_argument("id")
@ -163,16 +177,28 @@ class ServerBackup(ApiHandler):
def post(self):
user = self.authenticate_user()
user_obj = self.controller.users.get_user_by_api_token(self.api_token)
if user is None:
self.access_denied("unknown")
return
server_id = self.get_argument("id")
if (
not user_obj["user_id"]
in self.controller.server_perms.get_server_user_list(server_id)
and not user_obj["superuser"]
):
self.access_denied("unknown")
return
if not self.permissions[
"Backup"
] in self.controller.server_perms.get_api_key_permissions_list(
self.controller.users.get_api_key_by_token(self.api_token), server_id
):
self.access_denied(user)
return
server = self.controller.get_server_obj(server_id)
@ -190,15 +216,23 @@ class StartServer(ApiHandler):
if user is None:
self.access_denied("unknown")
return
server_id = self.get_argument("id")
if not self.permissions[
if (
not user_obj["user_id"]
in self.controller.server_perms.get_server_user_list(server_id)
and not user_obj["superuser"]
):
self.access_denied("unknown")
return
elif not self.permissions[
"Commands"
] in self.controller.server_perms.get_api_key_permissions_list(
self.controller.users.get_api_key_by_token(self.api_token), server_id
):
self.access_denied(user)
self.access_denied("unknown")
return
server = self.controller.get_server_obj(server_id)
@ -216,16 +250,27 @@ class StopServer(ApiHandler):
user = self.authenticate_user()
remote_ip = self.get_remote_ip()
user_obj = self.controller.users.get_user_by_api_token(self.api_token)
if user is None:
self.access_denied("unknown")
return
server_id = self.get_argument("id")
if (
not user_obj["user_id"]
in self.controller.server_perms.get_server_user_list(server_id)
and not user_obj["superuser"]
):
self.access_denied("unknown")
if not self.permissions[
"Commands"
] in self.controller.server_perms.get_api_key_permissions_list(
self.controller.users.get_api_key_by_token(self.api_token), server_id
):
self.access_denied(user)
return
server = self.controller.get_server_obj(server_id)
@ -243,10 +288,17 @@ class RestartServer(ApiHandler):
def post(self):
user = self.authenticate_user()
remote_ip = self.get_remote_ip()
server_id = self.get_argument("id")
user_obj = self.controller.users.get_user_by_api_token(self.api_token)
if user is None:
self.access_denied("unknown")
return
server_id = self.get_argument("id")
if not user_obj["user_id"] in self.controller.server_perms.get_server_user_list(
server_id
):
self.access_denied("unknown")
if not self.permissions[
"Commands"
@ -264,9 +316,21 @@ class RestartServer(ApiHandler):
class CreateUser(ApiHandler):
def post(self):
user = self.authenticate_user()
user_obj = self.controller.users.get_user_by_api_token(self.api_token)
user_perms = self.controller.crafty_perms.get_crafty_permissions_list(
user_obj["user_id"]
)
if (
not self.permissions["User_Config"] in user_perms
and not user_obj["superuser"]
):
self.access_denied("unknown")
return
if user is None:
self.access_denied("unknown")
return
if not self.permissions[
"User_Config"
@ -274,6 +338,7 @@ class CreateUser(ApiHandler):
self.controller.users.get_api_key_by_token(self.api_token)
):
self.access_denied(user)
return
new_username = self.get_argument("username")
new_pass = self.get_argument("password")
@ -305,8 +370,22 @@ class DeleteUser(ApiHandler):
def post(self):
user = self.authenticate_user()
user_obj = self.controller.users.get_user_by_api_token(self.api_token)
user_perms = self.controller.crafty_perms.get_crafty_permissions_list(
user_obj["user_id"]
)
if (
not self.permissions["User_Config"] in user_perms
and not user_obj["superuser"]
):
self.access_denied("unknown")
return
if user is None:
self.access_denied("unknown")
return
if not self.permissions[
"User_Config"
@ -314,6 +393,7 @@ class DeleteUser(ApiHandler):
self.controller.users.get_api_key_by_token(self.api_token)
):
self.access_denied(user)
return
user_id = self.get_argument("user_id", None, True)
user_to_del = self.controller.users.get_user_by_id(user_id)
@ -336,15 +416,19 @@ class ListServers(ApiHandler):
if user is None:
self.access_denied("unknown")
return
if self.api_token is None:
self.access_denied("unknown")
return
if user_obj["superuser"]:
servers = self.controller.servers.get_all_defined_servers()
servers = [str(i) for i in servers]
else:
servers = self.controller.servers.get_all_defined_servers()
servers = self.controller.servers.get_authorized_servers(
user_obj["user_id"]
)
servers = [str(i) for i in servers]
self.return_response(