crafty-4/app/classes/web/panel_handler.py

2115 lines
88 KiB
Python

# pylint: disable=too-many-lines
import time
import datetime
import os
from typing import Dict, Any, Tuple
import json
import logging
import threading
import bleach
import libgravatar
import requests
import tornado.web
import tornado.escape
from tornado import iostream
# TZLocal is set as a hidden import on win pipeline
from tzlocal import get_localzone
from cron_validator import CronValidator
from app.classes.models.server_permissions import EnumPermissionsServer
from app.classes.models.crafty_permissions import EnumPermissionsCrafty
from app.classes.models.management import HelpersManagement
from app.classes.shared.helpers import Helpers
from app.classes.web.base_handler import BaseHandler
logger = logging.getLogger(__name__)
class PanelHandler(BaseHandler):
def get_user_roles(self) -> Dict[str, list]:
user_roles = {}
for user in self.controller.users.get_all_users():
user_roles_list = self.controller.users.get_user_roles_names(user.user_id)
# user_servers =
# self.controller.servers.get_authorized_servers(user.user_id)
user_roles[user.user_id] = user_roles_list
return user_roles
def get_role_servers(self) -> set:
servers = set()
for server in self.controller.list_defined_servers():
argument = int(
float(
bleach.clean(
self.get_argument(f"server_{server['server_id']}_access", "0")
)
)
)
if argument:
servers.add(server["server_id"])
return servers
def get_perms_quantity(self) -> Tuple[str, dict]:
permissions_mask: str = "000"
server_quantity: dict = {}
for (
permission
) in self.controller.crafty_perms.list_defined_crafty_permissions():
argument = int(
float(
bleach.clean(
self.get_argument(f"permission_{permission.name}", "0")
)
)
)
if argument:
permissions_mask = self.controller.crafty_perms.set_permission(
permissions_mask, permission, argument
)
q_argument = int(
float(
bleach.clean(self.get_argument(f"quantity_{permission.name}", "0"))
)
)
if q_argument:
server_quantity[permission.name] = q_argument
else:
server_quantity[permission.name] = 0
return permissions_mask, server_quantity
def get_perms(self) -> str:
permissions_mask: str = "000"
for (
permission
) in self.controller.crafty_perms.list_defined_crafty_permissions():
argument = self.get_argument(f"permission_{permission.name}", None)
if argument is not None:
permissions_mask = self.controller.crafty_perms.set_permission(
permissions_mask, permission, 1 if argument == "1" else 0
)
return permissions_mask
def get_perms_server(self) -> str:
permissions_mask = "00000000"
for permission in self.controller.server_perms.list_defined_permissions():
argument = self.get_argument(f"permission_{permission.name}", None)
if argument is not None:
permissions_mask = self.controller.server_perms.set_permission(
permissions_mask, permission, 1 if argument == "1" else 0
)
return permissions_mask
def get_user_role_memberships(self) -> set:
roles = set()
for role in self.controller.roles.get_all_roles():
if self.get_argument(f"role_{role.role_id}_membership", None) == "1":
roles.add(role.role_id)
return roles
def download_file(self, name: str, file: str):
self.set_header("Content-Type", "application/octet-stream")
self.set_header("Content-Disposition", f"attachment; filename={name}")
chunk_size = 1024 * 1024 * 4 # 4 MiB
with open(file, "rb") as f:
while True:
chunk = f.read(chunk_size)
if not chunk:
break
try:
self.write(chunk) # write the chunk to response
self.flush() # send the chunk to client
except iostream.StreamClosedError:
# this means the client has closed the connection
# so break the loop
break
finally:
# deleting the chunk is very important because
# if many clients are downloading files at the
# same time, the chunks in memory will keep
# increasing and will eat up the RAM
del chunk
def check_server_id(self):
server_id = self.get_argument("id", None)
api_key, _, exec_user = self.current_user
superuser = exec_user["superuser"]
# Commented out because there is no server access control for API keys,
# they just inherit from the host user
# if api_key is not None:
# superuser = superuser and api_key.superuser
if server_id is None:
self.redirect("/panel/error?error=Invalid Server ID")
return None
else:
# Does this server exist?
if not self.controller.servers.server_id_exists(server_id):
self.redirect("/panel/error?error=Invalid Server ID")
return None
# Does the user have permission?
if not superuser: # TODO: Figure out a better solution
if api_key is not None:
if not self.controller.servers.server_id_authorized_api_key(
server_id, api_key
):
print(
f"API key {api_key.name} (id: {api_key.token_id}) "
f"does not have permission"
)
self.redirect("/panel/error?error=Invalid Server ID")
return None
else:
if not self.controller.servers.server_id_authorized(
server_id, exec_user["user_id"]
):
print(f'User {exec_user["user_id"]} does not have permission')
self.redirect("/panel/error?error=Invalid Server ID")
return None
return server_id
# Server fetching, spawned asynchronously
# TODO: Make the related front-end elements update with AJAX
def fetch_server_data(self, page_data):
total_players = 0
for server in page_data["servers"]:
total_players += len(
self.controller.stats.get_server_players(
server["server_data"]["server_id"]
)
)
page_data["num_players"] = total_players
for server in page_data["servers"]:
try:
data = json.loads(server["int_ping_results"])
server["int_ping_results"] = data
except Exception as e:
logger.error(f"Failed server data for page with error: {e}")
return page_data
@tornado.web.authenticated
async def get(self, page):
error = self.get_argument("error", "WTF Error!")
template = "panel/denied.html"
now = time.time()
formatted_time = str(
datetime.datetime.fromtimestamp(now).strftime("%Y-%m-%d %H:%M:%S")
)
api_key, _token_data, exec_user = self.current_user
superuser = exec_user["superuser"]
if api_key is not None:
superuser = superuser and api_key.superuser
if superuser: # TODO: Figure out a better solution
defined_servers = self.controller.list_defined_servers()
exec_user_role = {"Super User"}
exec_user_crafty_permissions = (
self.controller.crafty_perms.list_defined_crafty_permissions()
)
else:
if api_key is not None:
exec_user_crafty_permissions = (
self.controller.crafty_perms.get_api_key_permissions_list(api_key)
)
else:
exec_user_crafty_permissions = (
self.controller.crafty_perms.get_crafty_permissions_list(
exec_user["user_id"]
)
)
logger.debug(exec_user["roles"])
exec_user_role = set()
for r in exec_user["roles"]:
role = self.controller.roles.get_role(r)
exec_user_role.add(role["role_name"])
defined_servers = self.controller.servers.get_authorized_servers(
exec_user["user_id"]
)
user_order = self.controller.users.get_user_by_id(exec_user["user_id"])
user_order = user_order["server_order"].split(",")
page_servers = []
server_ids = []
for server_id in user_order[:]:
for server in defined_servers[:]:
if str(server["server_id"]) == str(server_id):
page_servers.append(server)
user_order.remove(server_id)
defined_servers.remove(server)
for server in defined_servers:
server_ids.append(str(server["server_id"]))
if server not in page_servers:
page_servers.append(server)
for server_id in user_order[:]:
# remove IDs in list that user no longer has access to
if str(server_id) not in server_ids:
user_order.remove(server_id)
defined_servers = page_servers
page_data: Dict[str, Any] = {
# todo: make this actually pull and compare version data
"update_available": False,
"serverTZ": get_localzone(),
"version_data": self.helper.get_version_string(),
"user_data": exec_user,
"user_role": exec_user_role,
"user_crafty_permissions": exec_user_crafty_permissions,
"crafty_permissions": {
"Server_Creation": EnumPermissionsCrafty.SERVER_CREATION,
"User_Config": EnumPermissionsCrafty.USER_CONFIG,
"Roles_Config": EnumPermissionsCrafty.ROLES_CONFIG,
},
"server_stats": {
"total": len(defined_servers),
"running": len(self.controller.list_running_servers()),
"stopped": (
len(self.controller.list_defined_servers())
- len(self.controller.list_running_servers())
),
},
"menu_servers": defined_servers,
"hosts_data": self.controller.management.get_latest_hosts_stats(),
"show_contribute": self.helper.get_setting("show_contribute_link", True),
"error": error,
"time": formatted_time,
"lang": self.controller.users.get_user_lang_by_id(exec_user["user_id"]),
"lang_page": Helpers.get_lang_page(
self.controller.users.get_user_lang_by_id(exec_user["user_id"])
),
"super_user": superuser,
"api_key": {
"name": api_key.name,
"created": api_key.created,
"server_permissions": api_key.server_permissions,
"crafty_permissions": api_key.crafty_permissions,
"superuser": api_key.superuser,
}
if api_key is not None
else None,
"superuser": superuser,
}
if self.helper.get_setting("allow_nsfw_profile_pictures"):
rating = "x"
else:
rating = "g"
# Get grvatar hash for profile pictures
if exec_user["email"] != "default@example.com" or "":
gravatar = libgravatar.Gravatar(
libgravatar.sanitize_email(exec_user["email"])
)
url = gravatar.get_image(
size=80,
default="404",
force_default=False,
rating=rating,
filetype_extension=False,
use_ssl=True,
) # + "?d=404"
try:
if requests.head(url).status_code != 404:
profile_url = url
else:
profile_url = "/static/assets/images/faces-clipart/pic-3.png"
except:
profile_url = "/static/assets/images/faces-clipart/pic-3.png"
else:
profile_url = "/static/assets/images/faces-clipart/pic-3.png"
page_data["user_image"] = profile_url
if page == "unauthorized":
template = "panel/denied.html"
elif page == "error":
template = "public/error.html"
elif page == "credits":
with open(
self.helper.credits_cache, encoding="utf-8"
) as credits_default_local:
try:
remote = requests.get(
"https://craftycontrol.com/credits", allow_redirects=True
)
credits_dict: dict = remote.json()
if not credits_dict["staff"]:
logger.error("Issue with upstream Staff, using local.")
credits_dict: dict = json.load(credits_default_local)
except:
logger.error("Request to credits bucket failed, using local.")
credits_dict: dict = json.load(credits_default_local)
timestamp = credits_dict["lastUpdate"] / 1000.0
page_data["patrons"] = credits_dict["patrons"]
page_data["staff"] = credits_dict["staff"]
# Filter Language keys to exclude joke prefix '*'
clean_dict = {
user.replace("*", ""): translation
for user, translation in credits_dict["translations"].items()
}
page_data["translations"] = clean_dict
# 0 Defines if we are using local credits file andd displays sadcat.
if timestamp == 0:
page_data["lastUpdate"] = "😿"
else:
page_data["lastUpdate"] = str(
datetime.datetime.fromtimestamp(timestamp).strftime(
"%Y-%m-%d %H:%M:%S"
)
)
template = "panel/credits.html"
elif page == "contribute":
template = "panel/contribute.html"
elif page == "dashboard":
if superuser: # TODO: Figure out a better solution
try:
page_data[
"servers"
] = self.controller.servers.get_all_servers_stats()
except IndexError:
self.controller.stats.record_stats()
page_data[
"servers"
] = self.controller.servers.get_all_servers_stats()
else:
try:
user_auth = self.controller.servers.get_authorized_servers_stats(
exec_user["user_id"]
)
except IndexError:
self.controller.stats.record_stats()
user_auth = self.controller.servers.get_authorized_servers_stats(
exec_user["user_id"]
)
logger.debug(f"ASFR: {user_auth}")
page_data["servers"] = user_auth
page_data["server_stats"]["running"] = len(
list(filter(lambda x: x["stats"]["running"], page_data["servers"]))
)
page_data["server_stats"]["stopped"] = (
len(page_data["servers"]) - page_data["server_stats"]["running"]
)
# set user server order
user_order = self.controller.users.get_user_by_id(exec_user["user_id"])
user_order = user_order["server_order"].split(",")
page_servers = []
server_ids = []
un_used_servers = page_data["servers"]
flag = 0
for server_id in user_order[:]:
for server in un_used_servers[:]:
if flag == 0:
server["stats"][
"downloading"
] = self.controller.servers.get_download_status(
str(server["stats"]["server_id"]["server_id"])
)
server["stats"]["crashed"] = self.controller.servers.is_crashed(
str(server["stats"]["server_id"]["server_id"])
)
try:
server["stats"][
"waiting_start"
] = self.controller.servers.get_waiting_start(
str(server["stats"]["server_id"]["server_id"])
)
except Exception as e:
logger.error(f"Failed to get server waiting to start: {e}")
server["stats"]["waiting_start"] = False
if str(server["server_data"]["server_id"]) == str(server_id):
page_servers.append(server)
un_used_servers.remove(server)
user_order.remove(server_id)
# we only want to set these server stats values once.
# We need to update the flag so it only hits that if once.
flag += 1
for server in un_used_servers:
server_ids.append(str(server["server_data"]["server_id"]))
if server not in page_servers:
page_servers.append(server)
for server_id in user_order:
# remove IDs in list that user no longer has access to
if str(server_id) not in server_ids[:]:
user_order.remove(server_id)
page_data["servers"] = page_servers
# num players is set to zero here. If we poll all servers while
# dashboard is loading it takes FOREVER. We leave this to the
# async polling once dashboard is served.
page_data["num_players"] = 0
template = "panel/dashboard.html"
elif page == "server_detail":
subpage = bleach.clean(self.get_argument("subpage", ""))
server_id = self.check_server_id()
if server_id is None:
return
valid_subpages = [
"term",
"logs",
"backup",
"config",
"files",
"admin_controls",
"schedules",
]
server = self.controller.get_server_obj(server_id)
# server_data isn't needed since the server_stats also pulls server data
page_data["server_data"] = self.controller.servers.get_server_data_by_id(
server_id
)
page_data["server_stats"] = self.controller.servers.get_server_stats_by_id(
server_id
)
page_data["downloading"] = self.controller.servers.get_download_status(
server_id
)
try:
page_data["waiting_start"] = self.controller.servers.get_waiting_start(
server_id
)
except Exception as e:
logger.error(f"Failed to get server waiting to start: {e}")
page_data["waiting_start"] = False
page_data["get_players"] = lambda: self.controller.stats.get_server_players(
server_id
)
page_data["active_link"] = subpage
page_data["permissions"] = {
"Commands": EnumPermissionsServer.COMMANDS,
"Terminal": EnumPermissionsServer.TERMINAL,
"Logs": EnumPermissionsServer.LOGS,
"Schedule": EnumPermissionsServer.SCHEDULE,
"Backup": EnumPermissionsServer.BACKUP,
"Files": EnumPermissionsServer.FILES,
"Config": EnumPermissionsServer.CONFIG,
"Players": EnumPermissionsServer.PLAYERS,
}
page_data[
"user_permissions"
] = self.controller.server_perms.get_user_id_permissions_list(
exec_user["user_id"], server_id
)
page_data["server_stats"]["crashed"] = self.controller.servers.is_crashed(
server_id
)
page_data["server_stats"][
"server_type"
] = self.controller.servers.get_server_type_by_id(server_id)
if subpage not in valid_subpages:
logger.debug("not a valid subpage")
if not subpage:
if (
page_data["permissions"]["Terminal"]
in page_data["user_permissions"]
):
subpage = "term"
elif page_data["permissions"]["Logs"] in page_data["user_permissions"]:
subpage = "logs"
elif (
page_data["permissions"]["Schedule"]
in page_data["user_permissions"]
):
subpage = "schedules"
elif (
page_data["permissions"]["Backup"] in page_data["user_permissions"]
):
subpage = "backup"
elif page_data["permissions"]["Files"] in page_data["user_permissions"]:
subpage = "files"
elif (
page_data["permissions"]["Config"] in page_data["user_permissions"]
):
subpage = "config"
elif (
page_data["permissions"]["Players"] in page_data["user_permissions"]
):
subpage = "admin_controls"
else:
self.redirect("/panel/error?error=Unauthorized access to Server")
logger.debug(f'Subpage: "{subpage}"')
if subpage == "term":
if (
not page_data["permissions"]["Terminal"]
in page_data["user_permissions"]
):
if not superuser:
self.redirect(
"/panel/error?error=Unauthorized access to Terminal"
)
return
if subpage == "logs":
if (
not page_data["permissions"]["Logs"]
in page_data["user_permissions"]
):
if not superuser:
self.redirect("/panel/error?error=Unauthorized access to Logs")
return
if subpage == "schedules":
if (
not page_data["permissions"]["Schedule"]
in page_data["user_permissions"]
):
if not superuser:
self.redirect(
"/panel/error?error=Unauthorized access To Schedules"
)
return
page_data["schedules"] = HelpersManagement.get_schedules_by_server(
server_id
)
if subpage == "config":
if (
not page_data["permissions"]["Config"]
in page_data["user_permissions"]
):
if not superuser:
self.redirect(
"/panel/error?error=Unauthorized access Server Config"
)
return
if subpage == "files":
if (
not page_data["permissions"]["Files"]
in page_data["user_permissions"]
):
if not superuser:
self.redirect("/panel/error?error=Unauthorized access Files")
return
if subpage == "backup":
if (
not page_data["permissions"]["Backup"]
in page_data["user_permissions"]
):
if not superuser:
self.redirect(
"/panel/error?error=Unauthorized access to Backups"
)
return
server_info = self.controller.servers.get_server_data_by_id(server_id)
page_data[
"backup_config"
] = self.controller.management.get_backup_config(server_id)
exclusions = []
page_data[
"exclusions"
] = self.controller.management.get_excluded_backup_dirs(server_id)
page_data["backing_up"] = self.controller.get_server_obj(
server_id
).is_backingup
page_data["backup_stats"] = self.controller.get_server_obj(
server_id
).send_backup_status()
# makes it so relative path is the only thing shown
for file in page_data["exclusions"]:
if Helpers.is_os_windows():
exclusions.append(file.replace(server_info["path"] + "\\", ""))
else:
exclusions.append(file.replace(server_info["path"] + "/", ""))
page_data["exclusions"] = exclusions
self.controller.refresh_server_settings(server_id)
try:
page_data["backup_list"] = server.list_backups()
except:
page_data["backup_list"] = []
page_data["backup_path"] = Helpers.wtol_path(server_info["backup_path"])
def get_banned_players_html():
banned_players = self.controller.servers.get_banned_players(server_id)
if banned_players is None:
return """
<li class="playerItem banned">
<h3>Error while reading banned-players.json</h3>
</li>
"""
html = ""
for player in banned_players:
html += f"""
<li class="playerItem banned">
<h3>{player['name']}</h3>
<span>Banned by {player['source']} for reason: {player['reason']}</span>
<button onclick="send_command_to_server('pardon {player['name']}')" type="button" class="btn btn-danger">Unban</button>
</li>
"""
return html
if subpage == "admin_controls":
if (
not page_data["permissions"]["Players"]
in page_data["user_permissions"]
):
if not superuser:
self.redirect("/panel/error?error=Unauthorized access")
page_data["banned_players"] = get_banned_players_html()
template = f"panel/server_{subpage}.html"
elif page == "download_backup":
file = self.get_argument("file", "")
server_id = self.check_server_id()
if server_id is None:
return
server_info = self.controller.servers.get_server_data_by_id(server_id)
backup_file = os.path.abspath(
os.path.join(
Helpers.get_os_understandable_path(server_info["backup_path"]), file
)
)
if not Helpers.in_path(
Helpers.get_os_understandable_path(server_info["backup_path"]),
backup_file,
) or not os.path.isfile(backup_file):
self.redirect("/panel/error?error=Invalid path detected")
return
self.download_file(file, backup_file)
self.redirect(f"/panel/server_detail?id={server_id}&subpage=backup")
elif page == "panel_config":
auth_servers = {}
auth_role_servers = {}
roles = self.controller.roles.get_all_roles()
user_roles = {}
for user in self.controller.users.get_all_users():
user_roles_list = self.controller.users.get_user_roles_names(
user.user_id
)
user_servers = self.controller.servers.get_authorized_servers(
user.user_id
)
servers = []
for server in user_servers:
if server["server_name"] not in servers:
servers.append(server["server_name"])
new_item = {user.user_id: servers}
auth_servers.update(new_item)
data = {user.user_id: user_roles_list}
user_roles.update(data)
for role in roles:
role_servers = []
role = self.controller.roles.get_role_with_servers(role.role_id)
for serv_id in role["servers"]:
role_servers.append(
self.controller.servers.get_server_data_by_id(serv_id)[
"server_name"
]
)
data = {role["role_id"]: role_servers}
auth_role_servers.update(data)
page_data["auth-servers"] = auth_servers
page_data["role-servers"] = auth_role_servers
page_data["user-roles"] = user_roles
page_data["users"] = self.controller.users.user_query(exec_user["user_id"])
page_data["roles"] = self.controller.users.user_role_query(
exec_user["user_id"]
)
for user in page_data["users"]:
if user.user_id != exec_user["user_id"]:
user.api_token = "********"
if superuser:
for user in self.controller.users.get_all_users():
if user.superuser:
super_auth_servers = ["Super User Access To All Servers"]
page_data["users"] = self.controller.users.get_all_users()
page_data["roles"] = self.controller.roles.get_all_roles()
page_data["auth-servers"][user.user_id] = super_auth_servers
template = "panel/panel_config.html"
elif page == "add_user":
page_data["new_user"] = True
page_data["user"] = {}
page_data["user"]["username"] = ""
page_data["user"]["user_id"] = -1
page_data["user"]["email"] = ""
page_data["user"]["enabled"] = True
page_data["user"]["superuser"] = False
page_data["user"]["created"] = "N/A"
page_data["user"]["last_login"] = "N/A"
page_data["user"]["last_ip"] = "N/A"
page_data["user"]["last_update"] = "N/A"
page_data["user"]["roles"] = set()
page_data["user"]["hints"] = True
if EnumPermissionsCrafty.USER_CONFIG not in exec_user_crafty_permissions:
self.redirect(
"/panel/error?error=Unauthorized access: not a user editor"
)
return
page_data["roles_all"] = self.controller.roles.get_all_roles()
page_data["servers"] = []
page_data["servers_all"] = self.controller.list_defined_servers()
page_data["role-servers"] = []
page_data[
"permissions_all"
] = self.controller.crafty_perms.list_defined_crafty_permissions()
page_data["permissions_list"] = set()
page_data[
"quantity_server"
] = (
self.controller.crafty_perms.list_all_crafty_permissions_quantity_limits() # pylint: disable=line-too-long
)
page_data["languages"] = []
page_data["languages"].append(
self.controller.users.get_user_lang_by_id(exec_user["user_id"])
)
if superuser:
page_data["super-disabled"] = ""
else:
page_data["super-disabled"] = "disabled"
for file in sorted(
os.listdir(os.path.join(self.helper.root_dir, "app", "translations"))
):
if file.endswith(".json"):
if file not in self.helper.get_setting("disabled_language_files"):
if file != str(page_data["languages"][0] + ".json"):
page_data["languages"].append(file.split(".")[0])
template = "panel/panel_edit_user.html"
elif page == "add_schedule":
server_id = self.get_argument("id", None)
page_data["schedules"] = HelpersManagement.get_schedules_by_server(
server_id
)
page_data["get_players"] = lambda: self.controller.stats.get_server_players(
server_id
)
page_data["active_link"] = "schedules"
page_data["permissions"] = {
"Commands": EnumPermissionsServer.COMMANDS,
"Terminal": EnumPermissionsServer.TERMINAL,
"Logs": EnumPermissionsServer.LOGS,
"Schedule": EnumPermissionsServer.SCHEDULE,
"Backup": EnumPermissionsServer.BACKUP,
"Files": EnumPermissionsServer.FILES,
"Config": EnumPermissionsServer.CONFIG,
"Players": EnumPermissionsServer.PLAYERS,
}
page_data[
"user_permissions"
] = self.controller.server_perms.get_user_id_permissions_list(
exec_user["user_id"], server_id
)
page_data["server_data"] = self.controller.servers.get_server_data_by_id(
server_id
)
page_data["server_stats"] = self.controller.servers.get_server_stats_by_id(
server_id
)
page_data["server_stats"][
"server_type"
] = self.controller.servers.get_server_type_by_id(server_id)
page_data["new_schedule"] = True
page_data["schedule"] = {}
page_data["schedule"]["children"] = []
page_data["schedule"]["server_id"] = server_id
page_data["schedule"]["schedule_id"] = ""
page_data["schedule"]["action"] = ""
page_data["schedule"]["enabled"] = True
page_data["schedule"]["command"] = ""
page_data["schedule"]["one_time"] = False
page_data["schedule"]["cron_string"] = ""
page_data["schedule"]["time"] = ""
page_data["schedule"]["interval"] = ""
# we don't need to check difficulty here.
# We'll just default to basic for new schedules
page_data["schedule"]["difficulty"] = "basic"
page_data["schedule"]["interval_type"] = "days"
if not EnumPermissionsServer.SCHEDULE in page_data["user_permissions"]:
if not superuser:
self.redirect("/panel/error?error=Unauthorized access To Schedules")
return
template = "panel/server_schedule_edit.html"
elif page == "edit_schedule":
server_id = self.check_server_id()
if not server_id:
return
page_data["schedules"] = HelpersManagement.get_schedules_by_server(
server_id
)
sch_id = self.get_argument("sch_id", None)
if sch_id is None:
self.redirect("/panel/error?error=Invalid Schedule ID")
schedule = self.controller.management.get_scheduled_task_model(sch_id)
page_data["get_players"] = lambda: self.controller.stats.get_server_players(
server_id
)
page_data["active_link"] = "schedules"
page_data["permissions"] = {
"Commands": EnumPermissionsServer.COMMANDS,
"Terminal": EnumPermissionsServer.TERMINAL,
"Logs": EnumPermissionsServer.LOGS,
"Schedule": EnumPermissionsServer.SCHEDULE,
"Backup": EnumPermissionsServer.BACKUP,
"Files": EnumPermissionsServer.FILES,
"Config": EnumPermissionsServer.CONFIG,
"Players": EnumPermissionsServer.PLAYERS,
}
page_data[
"user_permissions"
] = self.controller.server_perms.get_user_id_permissions_list(
exec_user["user_id"], server_id
)
page_data["server_data"] = self.controller.servers.get_server_data_by_id(
server_id
)
page_data["server_stats"] = self.controller.servers.get_server_stats_by_id(
server_id
)
page_data["server_stats"][
"server_type"
] = self.controller.servers.get_server_type_by_id(server_id)
page_data["new_schedule"] = False
page_data["schedule"] = {}
page_data["schedule"]["server_id"] = server_id
page_data["schedule"]["schedule_id"] = schedule.schedule_id
page_data["schedule"]["action"] = schedule.action
page_data["schedule"][
"children"
] = self.controller.management.get_child_schedules(sch_id)
# We check here to see if the command is any of the default ones.
# We do not want a user changing to a custom command
# and seeing our command there.
if (
schedule.action != "start"
or schedule.action != "stop"
or schedule.action != "restart"
or schedule.action != "backup"
):
page_data["schedule"]["command"] = schedule.command
else:
page_data["schedule"]["command"] = ""
page_data["schedule"]["enabled"] = schedule.enabled
page_data["schedule"]["one_time"] = schedule.one_time
page_data["schedule"]["cron_string"] = schedule.cron_string
page_data["schedule"]["time"] = schedule.start_time
page_data["schedule"]["interval"] = schedule.interval
page_data["schedule"]["interval_type"] = schedule.interval_type
if schedule.interval_type == "reaction":
difficulty = "reaction"
elif schedule.cron_string == "":
difficulty = "basic"
else:
difficulty = "advanced"
page_data["schedule"]["difficulty"] = difficulty
if not EnumPermissionsServer.SCHEDULE in page_data["user_permissions"]:
if not superuser:
self.redirect("/panel/error?error=Unauthorized access To Schedules")
return
template = "panel/server_schedule_edit.html"
elif page == "edit_user":
user_id = self.get_argument("id", None)
role_servers = self.controller.servers.get_authorized_servers(user_id)
page_role_servers = []
for server in role_servers:
page_role_servers.append(server["server_id"])
page_data["new_user"] = False
page_data["user"] = self.controller.users.get_user_by_id(user_id)
page_data["servers"] = set()
page_data["role-servers"] = page_role_servers
page_data["roles_all"] = self.controller.roles.get_all_roles()
page_data["servers_all"] = self.controller.list_defined_servers()
page_data[
"permissions_all"
] = self.controller.crafty_perms.list_defined_crafty_permissions()
page_data[
"permissions_list"
] = self.controller.crafty_perms.get_crafty_permissions_list(user_id)
page_data[
"quantity_server"
] = self.controller.crafty_perms.list_crafty_permissions_quantity_limits(
user_id
)
page_data["languages"] = []
page_data["languages"].append(
self.controller.users.get_user_lang_by_id(user_id)
)
# checks if super user. If not we disable the button.
if superuser and str(exec_user["user_id"]) != str(user_id):
page_data["super-disabled"] = ""
else:
page_data["super-disabled"] = "disabled"
for file in sorted(
os.listdir(os.path.join(self.helper.root_dir, "app", "translations"))
):
if file.endswith(".json"):
if file not in self.helper.get_setting("disabled_language_files"):
if file != str(page_data["languages"][0] + ".json"):
page_data["languages"].append(file.split(".")[0])
if user_id is None:
self.redirect("/panel/error?error=Invalid User ID")
return
elif EnumPermissionsCrafty.USER_CONFIG not in exec_user_crafty_permissions:
if str(user_id) != str(exec_user["user_id"]):
self.redirect(
"/panel/error?error=Unauthorized access: not a user editor"
)
return
page_data["servers"] = []
page_data["role-servers"] = []
page_data["roles_all"] = []
page_data["servers_all"] = []
if exec_user["user_id"] != page_data["user"]["user_id"]:
page_data["user"]["api_token"] = "********"
if exec_user["email"] == "default@example.com":
page_data["user"]["email"] = ""
template = "panel/panel_edit_user.html"
elif page == "edit_user_apikeys":
user_id = self.get_argument("id", None)
page_data["user"] = self.controller.users.get_user_by_id(user_id)
page_data["api_keys"] = self.controller.users.get_user_api_keys(user_id)
# self.controller.crafty_perms.list_defined_crafty_permissions()
page_data[
"server_permissions_all"
] = self.controller.server_perms.list_defined_permissions()
page_data[
"crafty_permissions_all"
] = self.controller.crafty_perms.list_defined_crafty_permissions()
if user_id is None:
self.redirect("/panel/error?error=Invalid User ID")
return
template = "panel/panel_edit_user_apikeys.html"
elif page == "remove_user":
user_id = bleach.clean(self.get_argument("id", None))
if (
not superuser
and EnumPermissionsCrafty.USER_CONFIG
not in exec_user_crafty_permissions
):
self.redirect("/panel/error?error=Unauthorized access: not superuser")
return
elif str(exec_user["user_id"]) == str(user_id):
self.redirect(
"/panel/error?error=Unauthorized access: you cannot delete yourself"
)
return
elif user_id is None:
self.redirect("/panel/error?error=Invalid User ID")
return
else:
# does this user id exist?
target_user = self.controller.users.get_user_by_id(user_id)
if not target_user:
self.redirect("/panel/error?error=Invalid User ID")
return
elif target_user["superuser"]:
self.redirect("/panel/error?error=Cannot remove a superuser")
return
self.controller.users.remove_user(user_id)
self.controller.management.add_to_audit_log(
exec_user["user_id"],
f"Removed user {target_user['username']} (UID:{user_id})",
server_id=0,
source_ip=self.get_remote_ip(),
)
self.redirect("/panel/panel_config")
elif page == "add_role":
user_roles = self.get_user_roles()
page_data["new_role"] = True
page_data["role"] = {}
page_data["role"]["role_name"] = ""
page_data["role"]["role_id"] = -1
page_data["role"]["created"] = "N/A"
page_data["role"]["last_update"] = "N/A"
page_data["role"]["servers"] = set()
page_data["user-roles"] = user_roles
page_data["users"] = self.controller.users.get_all_users()
if EnumPermissionsCrafty.ROLES_CONFIG not in exec_user_crafty_permissions:
self.redirect(
"/panel/error?error=Unauthorized access: not a role editor"
)
return
page_data["servers_all"] = self.controller.list_defined_servers()
page_data[
"permissions_all"
] = self.controller.server_perms.list_defined_permissions()
page_data["permissions_list"] = set()
template = "panel/panel_edit_role.html"
elif page == "edit_role":
user_roles = self.get_user_roles()
page_data["new_role"] = False
role_id = self.get_argument("id", None)
page_data["role"] = self.controller.roles.get_role_with_servers(role_id)
page_data["servers_all"] = self.controller.list_defined_servers()
page_data[
"permissions_all"
] = self.controller.server_perms.list_defined_permissions()
page_data[
"permissions_list"
] = self.controller.server_perms.get_role_permissions(role_id)
page_data["user-roles"] = user_roles
page_data["users"] = self.controller.users.get_all_users()
if EnumPermissionsCrafty.ROLES_CONFIG not in exec_user_crafty_permissions:
self.redirect(
"/panel/error?error=Unauthorized access: not a role editor"
)
return
elif role_id is None:
self.redirect("/panel/error?error=Invalid Role ID")
return
template = "panel/panel_edit_role.html"
elif page == "remove_role":
role_id = bleach.clean(self.get_argument("id", None))
if not superuser:
self.redirect("/panel/error?error=Unauthorized access: not superuser")
return
elif role_id is None:
self.redirect("/panel/error?error=Invalid Role ID")
return
else:
# does this user id exist?
target_role = self.controller.roles.get_role(role_id)
if not target_role:
self.redirect("/panel/error?error=Invalid Role ID")
return
self.controller.roles.remove_role(role_id)
self.controller.management.add_to_audit_log(
exec_user["user_id"],
f"Removed role {target_role['role_name']} (RID:{role_id})",
server_id=0,
source_ip=self.get_remote_ip(),
)
self.redirect("/panel/panel_config")
elif page == "activity_logs":
page_data["audit_logs"] = self.controller.management.get_actity_log()
template = "panel/activity_logs.html"
elif page == "download_file":
file = Helpers.get_os_understandable_path(self.get_argument("path", ""))
name = self.get_argument("name", "")
server_id = self.check_server_id()
if server_id is None:
return
server_info = self.controller.servers.get_server_data_by_id(server_id)
if not Helpers.in_path(
Helpers.get_os_understandable_path(server_info["path"]), file
) or not os.path.isfile(file):
self.redirect("/panel/error?error=Invalid path detected")
return
self.download_file(name, file)
self.redirect(f"/panel/server_detail?id={server_id}&subpage=files")
elif page == "download_support_package":
temp_zip_storage = exec_user["support_logs"]
# We'll reset the support path for this user now.
self.controller.users.set_support_path(exec_user["user_id"], "")
self.set_header("Content-Type", "application/octet-stream")
self.set_header(
"Content-Disposition", "attachment; filename=" + "support_logs.zip"
)
chunk_size = 1024 * 1024 * 4 # 4 MiB
if temp_zip_storage != "":
with open(temp_zip_storage, "rb") as f:
while True:
chunk = f.read(chunk_size)
if not chunk:
break
try:
self.write(chunk) # write the chunk to response
self.flush() # send the chunk to client
except iostream.StreamClosedError:
# this means the client has closed the connection
# so break the loop
break
finally:
# deleting the chunk is very important because
# if many clients are downloading files at the
# same time, the chunks in memory will keep
# increasing and will eat up the RAM
del chunk
self.redirect("/panel/dashboard")
else:
self.redirect("/panel/error?error=No path found for support logs")
return
elif page == "support_logs":
logger.info(
f"Support logs requested. "
f"Packinging logs for user with ID: {exec_user['user_id']}"
)
logs_thread = threading.Thread(
target=self.controller.package_support_logs,
daemon=True,
args=(exec_user,),
name=f"{exec_user['user_id']}_logs_thread",
)
logs_thread.start()
self.redirect("/panel/dashboard")
return
self.render(
template,
data=page_data,
time=time,
utc_offset=(time.timezone * -1 / 60 / 60),
translate=self.translator.translate,
)
@tornado.web.authenticated
def post(self, page):
api_key, _token_data, exec_user = self.current_user
superuser = exec_user["superuser"]
if api_key is not None:
superuser = superuser and api_key.superuser
server_id = self.get_argument("id", None)
permissions = {
"Commands": EnumPermissionsServer.COMMANDS,
"Terminal": EnumPermissionsServer.TERMINAL,
"Logs": EnumPermissionsServer.LOGS,
"Schedule": EnumPermissionsServer.SCHEDULE,
"Backup": EnumPermissionsServer.BACKUP,
"Files": EnumPermissionsServer.FILES,
"Config": EnumPermissionsServer.CONFIG,
"Players": EnumPermissionsServer.PLAYERS,
}
if superuser:
# defined_servers = self.controller.list_defined_servers()
exec_user_role = {"Super User"}
exec_user_crafty_permissions = (
self.controller.crafty_perms.list_defined_crafty_permissions()
)
else:
exec_user_crafty_permissions = (
self.controller.crafty_perms.get_crafty_permissions_list(
exec_user["user_id"]
)
)
# defined_servers =
# self.controller.servers.get_authorized_servers(exec_user["user_id"])
exec_user_role = set()
for r in exec_user["roles"]:
role = self.controller.roles.get_role(r)
exec_user_role.add(role["role_name"])
if page == "server_detail":
if not permissions[
"Config"
] in self.controller.server_perms.get_user_id_permissions_list(
exec_user["user_id"], server_id
):
if not superuser:
self.redirect("/panel/error?error=Unauthorized access to Config")
return
server_name = self.get_argument("server_name", None)
server_obj = self.controller.servers.get_server_obj(server_id)
if superuser:
server_path = self.get_argument("server_path", None)
if Helpers.is_os_windows():
server_path.replace(" ", "^ ")
server_path = Helpers.wtol_path(server_path)
log_path = self.get_argument("log_path", None)
if Helpers.is_os_windows():
log_path.replace(" ", "^ ")
log_path = Helpers.wtol_path(log_path)
executable = self.get_argument("executable", None)
execution_command = self.get_argument("execution_command", None)
server_ip = self.get_argument("server_ip", None)
server_port = self.get_argument("server_port", None)
executable_update_url = self.get_argument("executable_update_url", None)
else:
execution_command = server_obj.execution_command
executable = server_obj.executable
stop_command = self.get_argument("stop_command", None)
auto_start_delay = self.get_argument("auto_start_delay", "10")
auto_start = int(float(self.get_argument("auto_start", "0")))
crash_detection = int(float(self.get_argument("crash_detection", "0")))
logs_delete_after = int(float(self.get_argument("logs_delete_after", "0")))
# subpage = self.get_argument('subpage', None)
server_id = self.check_server_id()
if server_id is None:
return
server_obj = self.controller.servers.get_server_obj(server_id)
stale_executable = server_obj.executable
# Compares old jar name to page data being passed.
# If they are different we replace the executable name in the
if str(stale_executable) != str(executable):
execution_command = execution_command.replace(
str(stale_executable), str(executable)
)
server_obj.server_name = server_name
if superuser:
if Helpers.validate_traversal(
self.helper.get_servers_root_dir(), server_path
):
server_obj.path = server_path
server_obj.log_path = log_path
if Helpers.validate_traversal(
self.helper.get_servers_root_dir(), executable
):
server_obj.executable = executable
server_obj.execution_command = execution_command
server_obj.server_ip = server_ip
server_obj.server_port = server_port
server_obj.executable_update_url = executable_update_url
else:
server_obj.path = server_obj.path
server_obj.log_path = server_obj.log_path
server_obj.executable = server_obj.executable
server_obj.execution_command = server_obj.execution_command
server_obj.server_ip = server_obj.server_ip
server_obj.server_port = server_obj.server_port
server_obj.executable_update_url = server_obj.executable_update_url
server_obj.stop_command = stop_command
server_obj.auto_start_delay = auto_start_delay
server_obj.auto_start = auto_start
server_obj.crash_detection = crash_detection
server_obj.logs_delete_after = logs_delete_after
self.controller.servers.update_server(server_obj)
self.controller.crash_detection(server_obj)
self.controller.refresh_server_settings(server_id)
self.controller.management.add_to_audit_log(
exec_user["user_id"],
f"Edited server {server_id} named {server_name}",
server_id,
self.get_remote_ip(),
)
self.redirect(f"/panel/server_detail?id={server_id}&subpage=config")
if page == "server_backup":
logger.debug(self.request.arguments)
server_id = self.check_server_id()
if not server_id:
return
if (
not permissions["Backup"]
in self.controller.server_perms.get_user_id_permissions_list(
exec_user["user_id"], server_id
)
and not superuser
):
self.redirect(
"/panel/error?error=Unauthorized access: User not authorized"
)
return
server_obj = self.controller.servers.get_server_obj(server_id)
compress = self.get_argument("compress", False)
check_changed = self.get_argument("changed")
if str(check_changed) == str(1):
checked = self.get_body_arguments("root_path")
else:
checked = self.controller.management.get_excluded_backup_dirs(server_id)
if superuser:
backup_path = bleach.clean(self.get_argument("backup_path", None))
if Helpers.is_os_windows():
backup_path.replace(" ", "^ ")
backup_path = Helpers.wtol_path(backup_path)
else:
backup_path = server_obj.backup_path
max_backups = bleach.clean(self.get_argument("max_backups", None))
server_obj = self.controller.servers.get_server_obj(server_id)
server_obj.backup_path = backup_path
self.controller.servers.update_server(server_obj)
self.controller.management.set_backup_config(
server_id,
max_backups=max_backups,
excluded_dirs=checked,
compress=bool(compress),
)
self.controller.management.add_to_audit_log(
exec_user["user_id"],
f"Edited server {server_id}: updated backups",
server_id,
self.get_remote_ip(),
)
self.tasks_manager.reload_schedule_from_db()
self.redirect(f"/panel/server_detail?id={server_id}&subpage=backup")
if page == "new_schedule":
server_id = self.check_server_id()
if not server_id:
return
if (
not permissions["Schedule"]
in self.controller.server_perms.get_user_id_permissions_list(
exec_user["user_id"], server_id
)
and not superuser
):
self.redirect(
"/panel/error?error=Unauthorized access: User not authorized"
)
return
difficulty = bleach.clean(self.get_argument("difficulty", None))
server_obj = self.controller.servers.get_server_obj(server_id)
enabled = bleach.clean(self.get_argument("enabled", "0"))
if difficulty == "basic":
action = bleach.clean(self.get_argument("action", None))
interval = bleach.clean(self.get_argument("interval", None))
interval_type = bleach.clean(self.get_argument("interval_type", None))
# only check for time if it's number of days
if interval_type == "days":
sch_time = bleach.clean(self.get_argument("time", None))
if action == "command":
command = bleach.clean(self.get_argument("command", None))
elif action == "start":
command = "start_server"
elif action == "stop":
command = "stop_server"
elif action == "restart":
command = "restart_server"
elif action == "backup":
command = "backup_server"
elif difficulty == "reaction":
interval_type = "reaction"
action = bleach.clean(self.get_argument("action", None))
delay = bleach.clean(self.get_argument("delay", None))
parent = bleach.clean(self.get_argument("parent", None))
if action == "command":
command = bleach.clean(self.get_argument("command", None))
elif action == "start":
command = "start_server"
elif action == "stop":
command = "stop_server"
elif action == "restart":
command = "restart_server"
elif action == "backup":
command = "backup_server"
else:
interval_type = ""
cron_string = bleach.clean(self.get_argument("cron", ""))
try:
CronValidator.parse(cron_string)
except Exception as e:
self.redirect(
f"/panel/error?error=INVALID FORMAT: Invalid Cron Format. {e}"
)
return
action = bleach.clean(self.get_argument("action", None))
if action == "command":
command = bleach.clean(self.get_argument("command", None))
elif action == "start":
command = "start_server"
elif action == "stop":
command = "stop_server"
elif action == "restart":
command = "restart_server"
elif action == "backup":
command = "backup_server"
if bleach.clean(self.get_argument("enabled", "0")) == "1":
enabled = True
else:
enabled = False
if bleach.clean(self.get_argument("one_time", "0")) == "1":
one_time = True
else:
one_time = False
if interval_type == "days":
job_data = {
"server_id": server_id,
"action": action,
"interval_type": interval_type,
"interval": interval,
"command": command,
"start_time": sch_time,
"enabled": enabled,
"one_time": one_time,
"cron_string": "",
"parent": None,
"delay": 0,
}
elif difficulty == "reaction":
job_data = {
"server_id": server_id,
"action": action,
"interval_type": interval_type,
"interval": "",
# We'll base every interval off of a midnight start time.
"start_time": "",
"command": command,
"cron_string": "",
"enabled": enabled,
"one_time": one_time,
"parent": parent,
"delay": delay,
}
elif difficulty == "advanced":
job_data = {
"server_id": server_id,
"action": action,
"interval_type": "",
"interval": "",
# We'll base every interval off of a midnight start time.
"start_time": "",
"command": command,
"cron_string": cron_string,
"enabled": enabled,
"one_time": one_time,
"parent": None,
"delay": 0,
}
else:
job_data = {
"server_id": server_id,
"action": action,
"interval_type": interval_type,
"interval": interval,
"command": command,
"enabled": enabled,
# We'll base every interval off of a midnight start time.
"start_time": "00:00",
"one_time": one_time,
"cron_string": "",
"parent": None,
"delay": 0,
}
self.tasks_manager.schedule_job(job_data)
self.controller.management.add_to_audit_log(
exec_user["user_id"],
f"Edited server {server_id}: added scheduled job",
server_id,
self.get_remote_ip(),
)
self.tasks_manager.reload_schedule_from_db()
self.redirect(f"/panel/server_detail?id={server_id}&subpage=schedules")
if page == "edit_schedule":
server_id = self.check_server_id()
if not server_id:
return
if (
not permissions["Schedule"]
in self.controller.server_perms.get_user_id_permissions_list(
exec_user["user_id"], server_id
)
and not superuser
):
self.redirect(
"/panel/error?error=Unauthorized access: User not authorized"
)
return
sch_id = self.get_argument("sch_id", None)
if sch_id is None:
self.redirect("/panel/error?error=Invalid Schedule ID")
difficulty = bleach.clean(self.get_argument("difficulty", None))
server_obj = self.controller.servers.get_server_obj(server_id)
enabled = bleach.clean(self.get_argument("enabled", "0"))
if difficulty == "basic":
action = bleach.clean(self.get_argument("action", None))
interval = bleach.clean(self.get_argument("interval", None))
interval_type = bleach.clean(self.get_argument("interval_type", None))
# only check for time if it's number of days
if interval_type == "days":
sch_time = bleach.clean(self.get_argument("time", None))
if action == "command":
command = bleach.clean(self.get_argument("command", None))
elif action == "start":
command = "start_server"
elif action == "stop":
command = "stop_server"
elif action == "restart":
command = "restart_server"
elif action == "backup":
command = "backup_server"
elif difficulty == "reaction":
interval_type = "reaction"
action = bleach.clean(self.get_argument("action", None))
delay = bleach.clean(self.get_argument("delay", None))
parent = bleach.clean(self.get_argument("parent", None))
if action == "command":
command = bleach.clean(self.get_argument("command", None))
elif action == "start":
command = "start_server"
elif action == "stop":
command = "stop_server"
elif action == "restart":
command = "restart_server"
elif action == "backup":
command = "backup_server"
parent = bleach.clean(self.get_argument("parent", None))
else:
interval_type = ""
cron_string = bleach.clean(self.get_argument("cron", ""))
try:
CronValidator.parse(cron_string)
except Exception as e:
self.redirect(
f"/panel/error?error=INVALID FORMAT: Invalid Cron Format. {e}"
)
return
action = bleach.clean(self.get_argument("action", None))
if action == "command":
command = bleach.clean(self.get_argument("command", None))
elif action == "start":
command = "start_server"
elif action == "stop":
command = "stop_server"
elif action == "restart":
command = "restart_server"
elif action == "backup":
command = "backup_server"
if bleach.clean(self.get_argument("enabled", "0")) == "1":
enabled = True
else:
enabled = False
if bleach.clean(self.get_argument("one_time", "0")) == "1":
one_time = True
else:
one_time = False
if interval_type == "days":
job_data = {
"server_id": server_id,
"action": action,
"interval_type": interval_type,
"interval": interval,
"command": command,
"start_time": sch_time,
"enabled": enabled,
"one_time": one_time,
"cron_string": "",
"parent": None,
"delay": 0,
}
elif difficulty == "advanced":
job_data = {
"server_id": server_id,
"action": action,
"interval_type": "",
"interval": "",
# We'll base every interval off of a midnight start time.
"start_time": "",
"command": command,
"cron_string": cron_string,
"delay": "",
"parent": "",
"enabled": enabled,
"one_time": one_time,
}
elif difficulty == "reaction":
job_data = {
"server_id": server_id,
"action": action,
"interval_type": interval_type,
"interval": "",
# We'll base every interval off of a midnight start time.
"start_time": "",
"command": command,
"cron_string": "",
"enabled": enabled,
"one_time": one_time,
"parent": parent,
"delay": delay,
}
else:
job_data = {
"server_id": server_id,
"action": action,
"interval_type": interval_type,
"interval": interval,
"command": command,
"enabled": enabled,
# We'll base every interval off of a midnight start time.
"start_time": "00:00",
"delay": "",
"parent": "",
"one_time": one_time,
"cron_string": "",
}
self.tasks_manager.update_job(sch_id, job_data)
self.controller.management.add_to_audit_log(
exec_user["user_id"],
f"Edited server {server_id}: updated schedule",
server_id,
self.get_remote_ip(),
)
self.tasks_manager.reload_schedule_from_db()
self.redirect(f"/panel/server_detail?id={server_id}&subpage=schedules")
elif page == "edit_user":
if bleach.clean(self.get_argument("username", None)) == "system":
self.redirect(
"/panel/error?error=Unauthorized access: "
"system user is not editable"
)
user_id = bleach.clean(self.get_argument("id", None))
username = bleach.clean(self.get_argument("username", None))
password0 = bleach.clean(self.get_argument("password0", None))
password1 = bleach.clean(self.get_argument("password1", None))
email = bleach.clean(self.get_argument("email", "default@example.com"))
enabled = int(float(self.get_argument("enabled", "0")))
try:
hints = int(bleach.clean(self.get_argument("hints")))
hints = True
except:
hints = False
lang = bleach.clean(
self.get_argument("language"), self.helper.get_setting("language")
)
if superuser:
# Checks if user is trying to change super user status of self.
# We don't want that. Automatically make them stay super user
# since we know they are.
if str(exec_user["user_id"]) != str(user_id):
superuser = bleach.clean(self.get_argument("superuser", "0"))
else:
superuser = "1"
else:
superuser = "0"
if superuser == "1":
superuser = True
else:
superuser = False
if not exec_user["superuser"]:
if (
EnumPermissionsCrafty.USER_CONFIG
not in exec_user_crafty_permissions
):
if str(user_id) != str(exec_user["user_id"]):
self.redirect(
"/panel/error?error=Unauthorized access: not a user editor"
)
return
user_data = {
"username": username,
"password": password0,
"email": email,
"lang": lang,
"hints": hints,
}
self.controller.users.update_user(user_id, user_data=user_data)
self.controller.management.add_to_audit_log(
exec_user["user_id"],
f"Edited user {username} (UID:{user_id}) password",
server_id=0,
source_ip=self.get_remote_ip(),
)
self.redirect("/panel/panel_config")
return
elif username is None or username == "":
self.redirect("/panel/error?error=Invalid username")
return
elif user_id is None:
self.redirect("/panel/error?error=Invalid User ID")
return
else:
# does this user id exist?
if not self.controller.users.user_id_exists(user_id):
self.redirect("/panel/error?error=Invalid User ID")
return
else:
if password0 != password1:
self.redirect("/panel/error?error=Passwords must match")
return
roles = self.get_user_role_memberships()
permissions_mask, server_quantity = self.get_perms_quantity()
# if email is None or "":
# email = "default@example.com"
user_data = {
"username": username,
"password": password0,
"email": email,
"enabled": enabled,
"roles": roles,
"lang": lang,
"superuser": superuser,
"hints": hints,
}
user_crafty_data = {
"permissions_mask": permissions_mask,
"server_quantity": server_quantity,
}
self.controller.users.update_user(
user_id, user_data=user_data, user_crafty_data=user_crafty_data
)
self.controller.management.add_to_audit_log(
exec_user["user_id"],
f"Edited user {username} (UID:{user_id}) with roles {roles} "
f"and permissions {permissions_mask}",
server_id=0,
source_ip=self.get_remote_ip(),
)
self.redirect("/panel/panel_config")
elif page == "edit_user_apikeys":
user_id = self.get_argument("id", None)
name = self.get_argument("name", None)
superuser = self.get_argument("superuser", None) == "1"
if name is None or name == "":
self.redirect("/panel/error?error=Invalid API key name")
return
elif user_id is None:
self.redirect("/panel/error?error=Invalid User ID")
return
else:
# does this user id exist?
if not self.controller.users.user_id_exists(user_id):
self.redirect("/panel/error?error=Invalid User ID")
return
crafty_permissions_mask = self.get_perms()
server_permissions_mask = self.get_perms_server()
self.controller.users.add_user_api_key(
name,
user_id,
superuser,
server_permissions_mask,
crafty_permissions_mask,
)
self.controller.management.add_to_audit_log(
exec_user["user_id"],
f"Added API key {name} with crafty permissions "
f"{crafty_permissions_mask}"
f" and {server_permissions_mask} for user with UID: {user_id}",
server_id=0,
source_ip=self.get_remote_ip(),
)
self.redirect(f"/panel/edit_user_apikeys?id={user_id}")
elif page == "get_token":
key_id = self.get_argument("id", None)
if key_id is None:
self.redirect("/panel/error?error=Invalid Key ID")
return
else:
key = self.controller.users.get_user_api_key(key_id)
# does this user id exist?
if key is None:
self.redirect("/panel/error?error=Invalid Key ID")
return
self.controller.management.add_to_audit_log(
exec_user["user_id"],
f"Generated a new API token for the key {key.name} "
f"from user with UID: {key.user_id}",
server_id=0,
source_ip=self.get_remote_ip(),
)
self.write(
self.controller.authentication.generate(
key.user_id_id, {"token_id": key.token_id}
)
)
self.finish()
elif page == "add_user":
if bleach.clean(self.get_argument("username", None)).lower() == "system":
self.redirect(
"/panel/error?error=Unauthorized access: "
"username system is reserved for the Crafty system."
" Please choose a different username."
)
return
username = bleach.clean(self.get_argument("username", None))
password0 = bleach.clean(self.get_argument("password0", None))
password1 = bleach.clean(self.get_argument("password1", None))
email = bleach.clean(self.get_argument("email", "default@example.com"))
enabled = int(float(self.get_argument("enabled", "0")))
hints = True
lang = bleach.clean(
self.get_argument("lang", self.helper.get_setting("language"))
)
# We don't want a non-super user to be able to create a super user.
if superuser:
new_superuser = bleach.clean(self.get_argument("superuser", "0"))
else:
new_superuser = "0"
if superuser == "1":
new_superuser = True
else:
new_superuser = False
if EnumPermissionsCrafty.USER_CONFIG not in exec_user_crafty_permissions:
self.redirect(
"/panel/error?error=Unauthorized access: not a user editor"
)
return
elif username is None or username == "":
self.redirect("/panel/error?error=Invalid username")
return
else:
# does this user id exist?
if self.controller.users.get_id_by_name(username) is not None:
self.redirect("/panel/error?error=User exists")
return
if password0 != password1:
self.redirect("/panel/error?error=Passwords must match")
return
roles = self.get_user_role_memberships()
permissions_mask, server_quantity = self.get_perms_quantity()
user_id = self.controller.users.add_user(
username,
password=password0,
email=email,
enabled=enabled,
superuser=new_superuser,
)
user_data = {"roles": roles, "lang": lang, "hints": True}
user_crafty_data = {
"permissions_mask": permissions_mask,
"server_quantity": server_quantity,
}
self.controller.users.update_user(
user_id, user_data=user_data, user_crafty_data=user_crafty_data
)
self.controller.management.add_to_audit_log(
exec_user["user_id"],
f"Added user {username} (UID:{user_id})",
server_id=0,
source_ip=self.get_remote_ip(),
)
self.controller.management.add_to_audit_log(
exec_user["user_id"],
f"Edited user {username} (UID:{user_id}) with roles {roles}",
server_id=0,
source_ip=self.get_remote_ip(),
)
self.redirect("/panel/panel_config")
elif page == "edit_role":
role_id = bleach.clean(self.get_argument("id", None))
role_name = bleach.clean(self.get_argument("role_name", None))
if EnumPermissionsCrafty.ROLES_CONFIG not in exec_user_crafty_permissions:
self.redirect(
"/panel/error?error=Unauthorized access: not a role editor"
)
return
elif role_name is None or role_name == "":
self.redirect("/panel/error?error=Invalid username")
return
elif role_id is None:
self.redirect("/panel/error?error=Invalid Role ID")
return
else:
# does this user id exist?
if not self.controller.roles.role_id_exists(role_id):
self.redirect("/panel/error?error=Invalid Role ID")
return
servers = self.get_role_servers()
permissions_mask = self.get_perms_server()
role_data = {"role_name": role_name, "servers": servers}
self.controller.roles.update_role(
role_id, role_data=role_data, permissions_mask=permissions_mask
)
self.controller.management.add_to_audit_log(
exec_user["user_id"],
f"Edited role {role_name} (RID:{role_id}) with servers {servers}",
server_id=0,
source_ip=self.get_remote_ip(),
)
self.redirect("/panel/panel_config")
elif page == "add_role":
role_name = bleach.clean(self.get_argument("role_name", None))
if EnumPermissionsCrafty.ROLES_CONFIG not in exec_user_crafty_permissions:
self.redirect(
"/panel/error?error=Unauthorized access: not a role editor"
)
return
elif role_name is None or role_name == "":
self.redirect("/panel/error?error=Invalid role name")
return
else:
# does this user id exist?
if self.controller.roles.get_roleid_by_name(role_name) is not None:
self.redirect("/panel/error?error=Role exists")
return
servers = self.get_role_servers()
permissions_mask = self.get_perms_server()
role_id = self.controller.roles.add_role(role_name)
self.controller.roles.update_role(
role_id, {"servers": servers}, permissions_mask
)
self.controller.management.add_to_audit_log(
exec_user["user_id"],
f"Added role {role_name} (RID:{role_id})",
server_id=0,
source_ip=self.get_remote_ip(),
)
self.controller.management.add_to_audit_log(
exec_user["user_id"],
f"Edited role {role_name} (RID:{role_id}) with servers {servers}",
server_id=0,
source_ip=self.get_remote_ip(),
)
self.redirect("/panel/panel_config")
else:
self.set_status(404)
page_data = {
"lang": self.helper.get_setting("language"),
"lang_page": Helpers.get_lang_page(self.helper.get_setting("language")),
}
self.render(
"public/404.html", translate=self.translator.translate, data=page_data
)
@tornado.web.authenticated
def delete(self, page):
api_key, _token_data, exec_user = self.current_user
superuser = exec_user["superuser"]
if api_key is not None:
superuser = superuser and api_key.superuser
page_data = {
# todo: make this actually pull and compare version data
"update_available": False,
"version_data": self.helper.get_version_string(),
"user_data": exec_user,
"hosts_data": self.controller.management.get_latest_hosts_stats(),
"show_contribute": self.helper.get_setting("show_contribute_link", True),
"lang": self.controller.users.get_user_lang_by_id(exec_user["user_id"]),
"lang_page": Helpers.get_lang_page(
self.controller.users.get_user_lang_by_id(exec_user["user_id"])
),
}
if page == "remove_apikey":
key_id = bleach.clean(self.get_argument("id", None))
if not superuser:
self.redirect("/panel/error?error=Unauthorized access: not superuser")
return
elif (
key_id is None or self.controller.users.get_user_api_key(key_id) is None
):
self.redirect("/panel/error?error=Invalid Key ID")
return
else:
# does this user id exist?
target_key = self.controller.users.get_user_api_key(key_id)
if not target_key:
self.redirect("/panel/error?error=Invalid Key ID")
return
self.controller.users.delete_user_api_key(key_id)
self.controller.management.add_to_audit_log(
exec_user["user_id"],
f"Removed API key {target_key} "
f"(ID: {key_id}) from user {exec_user['user_id']}",
server_id=0,
source_ip=self.get_remote_ip(),
)
self.redirect("/panel/panel_config")
else:
self.set_status(404)
self.render(
"public/404.html",
data=page_data,
translate=self.translator.translate,
)