crafty-4/app/classes/web/panel_handler.py
2022-03-08 04:40:44 +00:00

1745 lines
82 KiB
Python

import time
import datetime
import os
from typing import Dict, Any, Tuple
import json
import logging
import threading
from app.classes.models.server_permissions import Enum_Permissions_Server
from app.classes.models.crafty_permissions import Enum_Permissions_Crafty
from app.classes.models.management import management_helper
from app.classes.shared.authentication import authentication
from app.classes.shared.helpers import helper
from app.classes.web.base_handler import BaseHandler
try:
import bleach
import libgravatar
import requests
import tornado.web
import tornado.escape
from tornado import iostream
#TZLocal is set as a hidden import on win pipeline
from tzlocal import get_localzone
from cron_validator import CronValidator
except ModuleNotFoundError as ex:
helper.auto_installer_fix(ex)
logger = logging.getLogger(__name__)
class PanelHandler(BaseHandler):
def get_user_roles(self) -> Dict[str, list]:
    """Build a lookup of role names for every known user.

    Returns:
        A dict keyed by each user's ``user_id``; each value is whatever
        ``controller.users.get_user_roles_names`` returns for that ID.
    """
    users_ctl = self.controller.users
    role_names_by_user = {}
    for account in users_ctl.get_all_users():
        uid = account.user_id
        role_names_by_user[uid] = users_ctl.get_user_roles_names(uid)
    return role_names_by_user
def get_role_servers(self) -> set:
    """Collect the IDs of the servers flagged in the current request.

    For every server returned by ``controller.list_defined_servers`` the
    request argument ``server_<id>_access`` (default ``'0'``) is cleaned
    with ``bleach`` and converted through ``float`` to ``int``; a non-zero
    result selects the server.

    Returns:
        The set of selected ``server_id`` values.
    """
    selected = set()
    for entry in self.controller.list_defined_servers():
        raw_flag = self.get_argument(f"server_{entry['server_id']}_access", '0')
        flag = int(float(bleach.clean(raw_flag)))
        if not flag:
            continue
        selected.add(entry['server_id'])
    return selected
def get_perms_quantity(self) -> Tuple[str, dict]:
    """Read the Crafty permission mask and per-permission quantities.

    For each permission from
    ``controller.crafty_perms.list_defined_crafty_permissions`` two request
    arguments are read (both default to ``'0'`` and are cleaned with
    ``bleach`` then converted through ``float`` to ``int``):

    * ``permission_<name>`` - when non-zero, it is applied to the mask via
      ``controller.crafty_perms.set_permission``.
    * ``quantity_<name>`` - stored in the quantity dict (``0`` when falsy).

    Returns:
        A ``(permissions_mask, server_quantity)`` tuple; the mask starts
        as ``"000"`` and the dict is keyed by permission name.
    """
    perms_ctl = self.controller.crafty_perms
    mask: str = "000"
    quantities: dict = {}
    for perm in perms_ctl.list_defined_crafty_permissions():
        raw_grant = self.get_argument(f'permission_{perm.name}', '0')
        grant = int(float(bleach.clean(raw_grant)))
        if grant:
            mask = perms_ctl.set_permission(mask, perm, grant)

        raw_limit = self.get_argument(f'quantity_{perm.name}', '0')
        limit = int(float(bleach.clean(raw_limit)))
        quantities[perm.name] = limit if limit else 0
    return mask, quantities
def get_perms(self) -> str:
    """Build the Crafty permission mask from the request arguments.

    Each permission from
    ``controller.crafty_perms.list_defined_crafty_permissions`` is looked
    up as the request argument ``permission_<name>``. Permissions that are
    absent from the request leave the mask untouched; present ones are set
    to ``1`` when the value is exactly ``'1'`` and to ``0`` otherwise.

    Returns:
        The resulting mask string, starting from ``"000"``.
    """
    perms_ctl = self.controller.crafty_perms
    mask: str = "000"
    for perm in perms_ctl.list_defined_crafty_permissions():
        submitted = self.get_argument(f'permission_{perm.name}', None)
        if submitted is None:
            continue
        if submitted == '1':
            bit = 1
        else:
            bit = 0
        mask = perms_ctl.set_permission(mask, perm, bit)
    return mask
def get_perms_server(self) -> str:
    """Build the server permission mask from the request arguments.

    Each permission from ``controller.server_perms.list_defined_permissions``
    is looked up as the request argument ``permission_<name>``. Permissions
    missing from the request are skipped; present ones are set to ``1``
    when the value is exactly ``'1'`` and to ``0`` otherwise.

    Returns:
        The resulting mask string, starting from ``"00000000"``.
    """
    perms_ctl = self.controller.server_perms
    mask = "00000000"
    for perm in perms_ctl.list_defined_permissions():
        submitted = self.get_argument(f'permission_{perm.name}', None)
        if submitted is None:
            continue
        if submitted == '1':
            bit = 1
        else:
            bit = 0
        mask = perms_ctl.set_permission(mask, perm, bit)
    return mask
def get_user_role_memberships(self) -> set:
    """Collect the role IDs marked as memberships in the request.

    A role from ``controller.roles.get_all_roles`` is included when the
    request argument ``role_<role_id>_membership`` equals ``'1'``.

    Returns:
        The set of selected ``role_id`` values.
    """
    return {
        candidate.role_id
        for candidate in self.controller.roles.get_all_roles()
        if self.get_argument(f'role_{candidate.role_id}_membership', None) == '1'
    }
def download_file(self, name: str, file: str):
    """Stream a file from disk to the client as an attachment.

    The file is read and written to the response in 4 MiB chunks so the
    whole file is never held in memory at once.

    Args:
        name: File name advertised to the client in the
            ``Content-Disposition`` header.
        file: Path of the file to open and send.
    """
    # Send the name as a quoted-string (escaping backslashes and double
    # quotes). An unquoted value is malformed as soon as the name contains
    # a space, comma or semicolon, and clients then truncate the file name.
    quoted_name = name.replace('\\', '\\\\').replace('"', '\\"')
    self.set_header('Content-Type', 'application/octet-stream')
    self.set_header('Content-Disposition', f'attachment; filename="{quoted_name}"')
    chunk_size = 1024 * 1024 * 4  # 4 MiB
    with open(file, 'rb') as f:
        while True:
            chunk = f.read(chunk_size)
            if not chunk:
                break
            try:
                self.write(chunk)  # write the chunk to response
                self.flush()  # send the chunk to client
            except iostream.StreamClosedError:
                # The client has closed the connection, so stop sending.
                break
            finally:
                # Drop the reference to the chunk right away: with many
                # simultaneous downloads, lingering chunks would keep
                # piling up in memory.
                del chunk
def check_server_id(self):
    """Validate the ``id`` request argument and the caller's access to it.

    The request is redirected to the panel error page when the argument
    is missing, when ``controller.servers.server_id_exists`` rejects it,
    or when a non-superuser caller is not authorized for the server
    (checked against the API key when one is in use, otherwise against
    the user ID).

    Returns:
        The server ID taken from the request when every check passes,
        otherwise ``None`` (a redirect has already been issued).
    """
    # Single redirect target for every failure. The unauthorized-user
    # branch previously pointed at the misspelled "/pandel/error" route.
    invalid_url = "/panel/error?error=Invalid Server ID"

    server_id = self.get_argument('id', None)
    api_key, _, exec_user = self.current_user
    # The API key's own superuser flag is intentionally not considered:
    # there is no server access control for API keys, they just inherit
    # from the host user.
    superuser = exec_user['superuser']

    if server_id is None:
        self.redirect(invalid_url)
        return None

    # Does this server exist?
    if not self.controller.servers.server_id_exists(server_id):
        self.redirect(invalid_url)
        return None

    # Superusers skip the per-server permission checks.
    # TODO: Figure out a better solution
    if superuser:
        return server_id

    # Does the caller have permission?
    if api_key is not None:
        if not self.controller.servers.server_id_authorized_api_key(server_id, api_key):
            print(f'API key {api_key.name} (id: {api_key.token_id}) does not have permission')
            self.redirect(invalid_url)
            return None
    elif not self.controller.servers.server_id_authorized(server_id, exec_user["user_id"]):
        print(f'User {exec_user["user_id"]} does not have permission')
        self.redirect(invalid_url)
        return None

    return server_id
# Server fetching, spawned asynchronously
# TODO: Make the related front-end elements update with AJAX
def fetch_server_data(self, page_data):
    """Fill in the player count and decode ping results on ``page_data``.

    ``page_data['num_players']`` is set to the combined length of
    ``controller.stats.get_server_players`` over every entry in
    ``page_data['servers']``. Each entry's ``int_ping_results`` value is
    then replaced with its JSON-decoded form; entries that fail to decode
    are logged and left as they were.

    Args:
        page_data: Dict with a ``'servers'`` list; it is updated in place.

    Returns:
        The same ``page_data`` object.
    """
    page_data['num_players'] = sum(
        len(self.controller.stats.get_server_players(entry['server_data']['server_id']))
        for entry in page_data['servers']
    )

    for entry in page_data['servers']:
        try:
            entry['int_ping_results'] = json.loads(entry['int_ping_results'])
        except Exception as e:
            logger.error(f"Failed server data for page with error: {e}")

    return page_data
@tornado.web.authenticated
async def get(self, page):
error = self.get_argument('error', "WTF Error!")
template = "panel/denied.html"
now = time.time()
formatted_time = str(datetime.datetime.fromtimestamp(now).strftime('%Y-%m-%d %H:%M:%S'))
# pylint: disable=unused-variable
api_key, token_data, exec_user = self.current_user
superuser = exec_user['superuser']
if api_key is not None:
superuser = superuser and api_key.superuser
exec_user_role = set()
if superuser: # TODO: Figure out a better solution
defined_servers = self.controller.list_defined_servers()
exec_user_role.add("Super User")
exec_user_crafty_permissions = self.controller.crafty_perms.list_defined_crafty_permissions()
else:
if api_key is not None:
exec_user_crafty_permissions = self.controller.crafty_perms.get_api_key_permissions_list(api_key)
else:
exec_user_crafty_permissions = self.controller.crafty_perms.get_crafty_permissions_list(
exec_user["user_id"])
logger.debug(exec_user['roles'])
for r in exec_user['roles']:
role = self.controller.roles.get_role(r)
exec_user_role.add(role['role_name'])
defined_servers = self.controller.servers.get_authorized_servers(exec_user["user_id"])
user_order = self.controller.users.get_user_by_id(exec_user['user_id'])
user_order = user_order['server_order'].split(',')
page_servers = []
server_ids = []
for server_id in user_order[:]:
for server in defined_servers[:]:
if str(server['server_id']) == str(server_id):
page_servers.append(server)
user_order.remove(server_id)
defined_servers.remove(server)
for server in defined_servers:
server_ids.append(str(server['server_id']))
if server not in page_servers:
page_servers.append(server)
for server_id in user_order[:]:
#remove IDs in list that user no longer has access to
if str(server_id) not in server_ids:
user_order.remove(server_id)
defined_servers = page_servers
page_data: Dict[str, Any] = {
# todo: make this actually pull and compare version data
'update_available': False,
'serverTZ':get_localzone(),
'version_data': helper.get_version_string(),
'user_data': exec_user,
'user_role' : exec_user_role,
'user_crafty_permissions' : exec_user_crafty_permissions,
'crafty_permissions': {
'Server_Creation': Enum_Permissions_Crafty.Server_Creation,
'User_Config': Enum_Permissions_Crafty.User_Config,
'Roles_Config': Enum_Permissions_Crafty.Roles_Config,
},
'server_stats': {
'total': len(defined_servers),
'running': len(self.controller.list_running_servers()),
'stopped': (len(self.controller.list_defined_servers()) - len(self.controller.list_running_servers()))
},
'menu_servers': defined_servers,
'hosts_data': self.controller.management.get_latest_hosts_stats(),
'show_contribute': helper.get_setting("show_contribute_link", True),
'error': error,
'time': formatted_time,
'lang': self.controller.users.get_user_lang_by_id(exec_user["user_id"]),
'lang_page': helper.getLangPage(self.controller.users.get_user_lang_by_id(exec_user["user_id"])),
'super_user': superuser,
'api_key': {
'name': api_key.name,
'created': api_key.created,
'server_permissions': api_key.server_permissions,
'crafty_permissions': api_key.crafty_permissions,
'superuser': api_key.superuser
} if api_key is not None else None,
'superuser': superuser
}
if helper.get_setting("allow_nsfw_profile_pictures"):
rating = "x"
else:
rating = "g"
# Get gravatar hash for profile pictures
if exec_user['email'] != 'default@example.com' or "":
g = libgravatar.Gravatar(libgravatar.sanitize_email(exec_user['email']))
url = g.get_image(size=80, default="404", force_default=False, rating=rating, filetype_extension=False, use_ssl=True) # + "?d=404"
if requests.head(url).status_code != 404:
profile_url = url
else:
profile_url = "/static/assets/images/faces-clipart/pic-3.png"
else:
profile_url = "/static/assets/images/faces-clipart/pic-3.png"
page_data['user_image'] = profile_url
if page == 'unauthorized':
template = "panel/denied.html"
elif page == "error":
template = "public/error.html"
elif page == 'credits':
with open(helper.credits_cache, encoding='utf-8') as credits_default_local:
try:
remote = requests.get('https://craftycontrol.com/credits', allow_redirects=True)
credits_dict: dict = remote.json()
if not credits_dict["staff"]:
logger.error("Issue with upstream Staff, using local.")
credits_dict: dict = json.load(credits_default_local)
except:
logger.error("Request to credits bucket failed, using local.")
credits_dict: dict = json.load(credits_default_local)
timestamp = credits_dict["lastUpdate"] / 1000.0
page_data["patrons"] = credits_dict["patrons"]
page_data["staff"] = credits_dict["staff"]
# Filter Language keys to exclude joke prefix '*'
clean_dict = {user.replace('*', ''): translation for user, translation in credits_dict['translations'].items()}
page_data["translations"] = clean_dict
# 0 defines if we are using the local credits file and displays sadcat.
if timestamp == 0:
page_data["lastUpdate"] = '😿'
else:
page_data["lastUpdate"] = str(datetime.datetime.fromtimestamp(timestamp).strftime('%Y-%m-%d %H:%M:%S'))
template = "panel/credits.html"
elif page == 'contribute':
template = "panel/contribute.html"
elif page == 'dashboard':
if superuser: # TODO: Figure out a better solution
try:
page_data['servers'] = self.controller.servers.get_all_servers_stats()
except IndexError:
self.controller.stats.record_stats()
page_data['servers'] = self.controller.servers.get_all_servers_stats()
else:
try:
user_auth = self.controller.servers.get_authorized_servers_stats(exec_user["user_id"])
except IndexError:
self.controller.stats.record_stats()
user_auth = self.controller.servers.get_authorized_servers_stats(exec_user["user_id"])
logger.debug(f"ASFR: {user_auth}")
page_data['servers'] = user_auth
page_data['server_stats']['running'] = len(
list(filter(lambda x: x['stats']['running'], page_data['servers'])))
page_data['server_stats']['stopped'] = len(page_data['servers']) - page_data['server_stats']['running']
#set user server order
user_order = self.controller.users.get_user_by_id(exec_user['user_id'])
user_order = user_order['server_order'].split(',')
page_servers = []
server_ids = []
un_used_servers = page_data['servers']
flag = 0
for server_id in user_order[:]:
for server in un_used_servers[:]:
if flag == 0:
server['stats']['downloading'] = self.controller.servers.get_download_status(
str(server['stats']['server_id']['server_id']))
server['stats']['crashed'] = self.controller.servers.is_crashed(
str(server['stats']['server_id']['server_id']))
try:
server['stats']['waiting_start'] = self.controller.servers.get_waiting_start(
str(server['stats']['server_id']['server_id']))
except Exception as e:
logger.error(f"Failed to get server waiting to start: {e}")
server['stats']['waiting_start'] = False
if str(server['server_data']['server_id']) == str(server_id):
page_servers.append(server)
un_used_servers.remove(server)
user_order.remove(server_id)
#we only want to set these server stats values once. We need to update the flag so it only hits that if once.
flag += 1
for server in un_used_servers:
server_ids.append(str(server['server_data']['server_id']))
if server not in page_servers:
page_servers.append(server)
for server_id in user_order:
#remove IDs in list that user no longer has access to
if str(server_id) not in server_ids[:]:
user_order.remove(server_id)
page_data['servers'] = page_servers
#num players is set to zero here. If we poll all servers while dashboard is loading it takes FOREVER. We leave this to the
#async polling once dashboard is served.
page_data['num_players'] = 0
template = "panel/dashboard.html"
elif page == 'server_detail':
subpage = bleach.clean(self.get_argument('subpage', ""))
server_id = self.check_server_id()
if server_id is None:
return
valid_subpages = ['term', 'logs', 'backup', 'config', 'files', 'admin_controls', 'schedules']
if subpage not in valid_subpages:
logger.debug('not a valid subpage')
subpage = 'term'
logger.debug(f'Subpage: "{subpage}"')
server = self.controller.get_server_obj(server_id)
# server_data isn't needed since the server_stats also pulls server data
page_data['server_data'] = self.controller.servers.get_server_data_by_id(server_id)
page_data['server_stats'] = self.controller.servers.get_server_stats_by_id(server_id)
page_data['downloading'] = self.controller.servers.get_download_status(
server_id)
try:
page_data['waiting_start'] = self.controller.servers.get_waiting_start(server_id)
except Exception as e:
logger.error(f"Failed to get server waiting to start: {e}")
page_data['waiting_start'] = False
page_data['get_players'] = lambda: self.controller.stats.get_server_players(server_id)
page_data['active_link'] = subpage
page_data['permissions'] = {
'Commands': Enum_Permissions_Server.Commands,
'Terminal': Enum_Permissions_Server.Terminal,
'Logs': Enum_Permissions_Server.Logs,
'Schedule': Enum_Permissions_Server.Schedule,
'Backup': Enum_Permissions_Server.Backup,
'Files': Enum_Permissions_Server.Files,
'Config': Enum_Permissions_Server.Config,
'Players': Enum_Permissions_Server.Players,
}
page_data['user_permissions'] = self.controller.server_perms.get_user_id_permissions_list(exec_user["user_id"], server_id)
page_data['server_stats']['crashed'] = self.controller.servers.is_crashed(server_id)
page_data['server_stats']['server_type'] = self.controller.servers.get_server_type_by_id(server_id)
if subpage == 'term':
if not page_data['permissions']['Terminal'] in page_data['user_permissions']:
if not superuser:
self.redirect("/panel/error?error=Unauthorized access to Terminal")
return
if subpage == 'logs':
if not page_data['permissions']['Logs'] in page_data['user_permissions']:
if not superuser:
self.redirect("/panel/error?error=Unauthorized access to Logs")
return
if subpage == 'schedules':
if not page_data['permissions']['Schedule'] in page_data['user_permissions']:
if not superuser:
self.redirect("/panel/error?error=Unauthorized access To Schedules")
return
page_data['schedules'] = management_helper.get_schedules_by_server(server_id)
if subpage == 'config':
if not page_data['permissions']['Config'] in page_data['user_permissions']:
if not superuser:
self.redirect("/panel/error?error=Unauthorized access Server Config")
return
if subpage == 'files':
if not page_data['permissions']['Files'] in page_data['user_permissions']:
if not superuser:
self.redirect("/panel/error?error=Unauthorized access Files")
return
if subpage == "backup":
if not page_data['permissions']['Backup'] in page_data['user_permissions']:
if not superuser:
self.redirect("/panel/error?error=Unauthorized access to Backups")
return
server_info = self.controller.servers.get_server_data_by_id(server_id)
page_data['backup_config'] = self.controller.management.get_backup_config(server_id)
exclusions = []
page_data['exclusions'] = self.controller.management.get_excluded_backup_dirs(server_id)
#makes it so relative path is the only thing shown
for file in page_data['exclusions']:
if helper.is_os_windows():
exclusions.append(file.replace(server_info['path']+'\\', ""))
else:
exclusions.append(file.replace(server_info['path']+'/', ""))
page_data['exclusions'] = exclusions
self.controller.refresh_server_settings(server_id)
try:
page_data['backup_list'] = server.list_backups()
except:
page_data['backup_list'] = []
page_data['backup_path'] = helper.wtol_path(server_info["backup_path"])
def get_banned_players_html():
banned_players = self.controller.servers.get_banned_players(server_id)
if banned_players is None:
return """
<li class="playerItem banned">
<h3>Error while reading banned-players.json</h3>
</li>
"""
html = ""
for player in banned_players:
html += f"""
<li class="playerItem banned">
<h3>{player['name']}</h3>
<span>Banned by {player['source']} for reason: {player['reason']}</span>
<button onclick="send_command_to_server('pardon {player['name']}')" type="button" class="btn btn-danger">Unban</button>
</li>
"""
return html
if subpage == "admin_controls":
if not page_data['permissions']['Players'] in page_data['user_permissions']:
if not superuser:
self.redirect("/panel/error?error=Unauthorized access")
page_data['banned_players'] = get_banned_players_html()
template = f"panel/server_{subpage}.html"
elif page == 'download_backup':
file = self.get_argument('file', "")
server_id = self.check_server_id()
if server_id is None:
return
server_info = self.controller.servers.get_server_data_by_id(server_id)
backup_file = os.path.abspath(os.path.join(helper.get_os_understandable_path(server_info["backup_path"]), file))
if not helper.in_path(helper.get_os_understandable_path(server_info["backup_path"]), backup_file) \
or not os.path.isfile(backup_file):
self.redirect("/panel/error?error=Invalid path detected")
return
self.download_file(file, backup_file)
self.redirect(f"/panel/server_detail?id={server_id}&subpage=backup")
elif page == 'backup_now':
server_id = self.check_server_id()
if server_id is None:
return
server = self.controller.get_server_obj(server_id)
management_helper.add_to_audit_log_raw(
self.controller.users.get_user_by_id(exec_user['user_id'])['username'], exec_user['user_id'], server_id,
f"Backup now executed for server {server_id} ",
source_ip=self.get_remote_ip())
server.backup_server()
self.redirect(f"/panel/server_detail?id={server_id}&subpage=backup")
elif page == 'panel_config':
auth_servers = {}
auth_role_servers = {}
users_list = []
role_users = {}
roles = self.controller.roles.get_all_roles()
user_roles = {}
for user in self.controller.users.get_all_users():
user_roles_list = self.controller.users.get_user_roles_names(user.user_id)
user_servers = self.controller.servers.get_authorized_servers(user.user_id)
servers = []
for server in user_servers:
if server['server_name'] not in servers:
servers.append(server['server_name'])
new_item = {user.user_id: servers}
auth_servers.update(new_item)
data = {user.user_id: user_roles_list}
user_roles.update(data)
for role in roles:
role_servers = []
role = self.controller.roles.get_role_with_servers(role.role_id)
for serv_id in role['servers']:
role_servers.append(self.controller.servers.get_server_data_by_id(serv_id)['server_name'])
data = {role['role_id']: role_servers}
auth_role_servers.update(data)
page_data['auth-servers'] = auth_servers
page_data['role-servers'] = auth_role_servers
page_data['user-roles'] = user_roles
page_data['users'] = self.controller.users.user_query(exec_user['user_id'])
page_data['roles'] = self.controller.users.user_role_query(exec_user['user_id'])
for user in page_data['users']:
if user.user_id != exec_user['user_id']:
user.api_token = "********"
if superuser:
for user in self.controller.users.get_all_users():
if user.superuser:
super_auth_servers = ["Super User Access To All Servers"]
page_data['users'] = self.controller.users.get_all_users()
page_data['roles'] = self.controller.roles.get_all_roles()
page_data['auth-servers'][user.user_id] = super_auth_servers
template = "panel/panel_config.html"
elif page == "add_user":
page_data['new_user'] = True
page_data['user'] = {}
page_data['user']['username'] = ""
page_data['user']['user_id'] = -1
page_data['user']['email'] = ""
page_data['user']['enabled'] = True
page_data['user']['superuser'] = False
page_data['user']['created'] = "N/A"
page_data['user']['last_login'] = "N/A"
page_data['user']['last_ip'] = "N/A"
page_data['user']['last_update'] = "N/A"
page_data['user']['roles'] = set()
if Enum_Permissions_Crafty.User_Config not in exec_user_crafty_permissions:
self.redirect("/panel/error?error=Unauthorized access: not a user editor")
return
page_data['roles_all'] = self.controller.roles.get_all_roles()
page_data['servers'] = []
page_data['servers_all'] = self.controller.list_defined_servers()
page_data['role-servers'] = []
page_data['permissions_all'] = self.controller.crafty_perms.list_defined_crafty_permissions()
page_data['permissions_list'] = set()
page_data['quantity_server'] = self.controller.crafty_perms.list_all_crafty_permissions_quantity_limits()
page_data['languages'] = []
page_data['languages'].append(self.controller.users.get_user_lang_by_id(exec_user["user_id"]))
if superuser:
page_data['super-disabled'] = ''
else:
page_data['super-disabled'] = 'disabled'
for file in sorted(os.listdir(os.path.join(helper.root_dir, 'app', 'translations'))):
if file.endswith('.json'):
if file not in helper.get_setting('disabled_language_files'):
if file != str(page_data['languages'][0] + '.json'):
page_data['languages'].append(file.split('.')[0])
template = "panel/panel_edit_user.html"
elif page == "add_schedule":
server_id = self.get_argument('id', None)
page_data['schedules'] = management_helper.get_schedules_by_server(server_id)
page_data['get_players'] = lambda: self.controller.stats.get_server_players(server_id)
page_data['active_link'] = 'schedules'
page_data['permissions'] = {
'Commands': Enum_Permissions_Server.Commands,
'Terminal': Enum_Permissions_Server.Terminal,
'Logs': Enum_Permissions_Server.Logs,
'Schedule': Enum_Permissions_Server.Schedule,
'Backup': Enum_Permissions_Server.Backup,
'Files': Enum_Permissions_Server.Files,
'Config': Enum_Permissions_Server.Config,
'Players': Enum_Permissions_Server.Players,
}
page_data['user_permissions'] = self.controller.server_perms.get_user_id_permissions_list(exec_user["user_id"], server_id)
page_data['server_data'] = self.controller.servers.get_server_data_by_id(server_id)
page_data['server_stats'] = self.controller.servers.get_server_stats_by_id(server_id)
page_data['server_stats']['server_type'] = self.controller.servers.get_server_type_by_id(server_id)
page_data['new_schedule'] = True
page_data['schedule'] = {}
page_data['schedule']['children'] = []
page_data['schedule']['server_id'] = server_id
page_data['schedule']['schedule_id'] = ''
page_data['schedule']['action'] = ""
page_data['schedule']['enabled'] = True
page_data['schedule']['command'] = ''
page_data['schedule']['one_time'] = False
page_data['schedule']['cron_string'] = ""
page_data['schedule']['time'] = ""
page_data['schedule']['interval'] = ""
#we don't need to check difficulty here. We'll just default to basic for new schedules
page_data['schedule']['difficulty'] = "basic"
page_data['schedule']['interval_type'] = 'days'
if not Enum_Permissions_Server.Schedule in page_data['user_permissions']:
if not superuser:
self.redirect("/panel/error?error=Unauthorized access To Schedules")
return
template = "panel/server_schedule_edit.html"
elif page == "edit_schedule":
server_id = self.get_argument('id', None)
page_data['schedules'] = management_helper.get_schedules_by_server(server_id)
sch_id = self.get_argument('sch_id', None)
schedule = self.controller.management.get_scheduled_task_model(sch_id)
page_data['get_players'] = lambda: self.controller.stats.get_server_players(server_id)
page_data['active_link'] = 'schedules'
page_data['permissions'] = {
'Commands': Enum_Permissions_Server.Commands,
'Terminal': Enum_Permissions_Server.Terminal,
'Logs': Enum_Permissions_Server.Logs,
'Schedule': Enum_Permissions_Server.Schedule,
'Backup': Enum_Permissions_Server.Backup,
'Files': Enum_Permissions_Server.Files,
'Config': Enum_Permissions_Server.Config,
'Players': Enum_Permissions_Server.Players,
}
page_data['user_permissions'] = self.controller.server_perms.get_user_id_permissions_list(exec_user["user_id"], server_id)
page_data['server_data'] = self.controller.servers.get_server_data_by_id(server_id)
page_data['server_stats'] = self.controller.servers.get_server_stats_by_id(server_id)
page_data['server_stats']['server_type'] = self.controller.servers.get_server_type_by_id(server_id)
page_data['new_schedule'] = False
page_data['schedule'] = {}
page_data['schedule']['server_id'] = server_id
page_data['schedule']['schedule_id'] = schedule.schedule_id
page_data['schedule']['action'] = schedule.action
page_data['schedule']['children'] = self.controller.management.get_child_schedules(sch_id)
# We check here to see if the command is any of the default ones.
# We do not want a user changing to a custom command and seeing our command there.
if schedule.action != 'start' or schedule.action != 'stop' or schedule.action != 'restart' or schedule.action != 'backup':
page_data['schedule']['command'] = schedule.command
else:
page_data['schedule']['command'] = ''
page_data['schedule']['enabled'] = schedule.enabled
page_data['schedule']['one_time'] = schedule.one_time
page_data['schedule']['cron_string'] = schedule.cron_string
page_data['schedule']['time'] = schedule.start_time
page_data['schedule']['interval'] = schedule.interval
page_data['schedule']['interval_type'] = schedule.interval_type
if schedule.interval_type == 'reaction':
difficulty = 'reaction'
elif schedule.cron_string == '':
difficulty = 'basic'
else:
difficulty = 'advanced'
page_data['schedule']['difficulty'] = difficulty
if sch_id is None or server_id is None:
self.redirect("/panel/error?error=Invalid server ID or Schedule ID")
if not Enum_Permissions_Server.Schedule in page_data['user_permissions']:
if not superuser:
self.redirect("/panel/error?error=Unauthorized access To Schedules")
return
template = "panel/server_schedule_edit.html"
elif page == "edit_user":
user_id = self.get_argument('id', None)
role_servers = self.controller.servers.get_authorized_servers(user_id)
page_role_servers = []
servers = set()
for server in role_servers:
page_role_servers.append(server['server_id'])
page_data['new_user'] = False
page_data['user'] = self.controller.users.get_user_by_id(user_id)
page_data['servers'] = servers
page_data['role-servers'] = page_role_servers
page_data['roles_all'] = self.controller.roles.get_all_roles()
page_data['servers_all'] = self.controller.list_defined_servers()
page_data['permissions_all'] = self.controller.crafty_perms.list_defined_crafty_permissions()
page_data['permissions_list'] = self.controller.crafty_perms.get_crafty_permissions_list(user_id)
page_data['quantity_server'] = self.controller.crafty_perms.list_crafty_permissions_quantity_limits(user_id)
page_data['languages'] = []
page_data['languages'].append(self.controller.users.get_user_lang_by_id(user_id))
#checks if super user. If not we disable the button.
if superuser and str(exec_user['user_id']) != str(user_id):
page_data['super-disabled'] = ''
else:
page_data['super-disabled'] = 'disabled'
for file in sorted(os.listdir(os.path.join(helper.root_dir, 'app', 'translations'))):
if file.endswith('.json'):
if file not in helper.get_setting('disabled_language_files'):
if file != str(page_data['languages'][0] + '.json'):
page_data['languages'].append(file.split('.')[0])
if user_id is None:
self.redirect("/panel/error?error=Invalid User ID")
return
elif Enum_Permissions_Crafty.User_Config not in exec_user_crafty_permissions:
if str(user_id) != str(exec_user["user_id"]):
self.redirect("/panel/error?error=Unauthorized access: not a user editor")
return
page_data['servers'] = []
page_data['role-servers'] = []
page_data['roles_all'] = []
page_data['servers_all'] = []
if exec_user['user_id'] != page_data['user']['user_id']:
page_data['user']['api_token'] = "********"
if exec_user['email'] == 'default@example.com':
page_data['user']['email'] = ""
template = "panel/panel_edit_user.html"
elif page == "edit_user_apikeys":
user_id = self.get_argument('id', None)
page_data['user'] = self.controller.users.get_user_by_id(user_id)
page_data['api_keys'] = self.controller.users.get_user_api_keys(user_id)
# self.controller.crafty_perms.list_defined_crafty_permissions()
page_data['server_permissions_all'] = self.controller.server_perms.list_defined_permissions()
page_data['crafty_permissions_all'] = self.controller.crafty_perms.list_defined_crafty_permissions()
if user_id is None:
self.redirect("/panel/error?error=Invalid User ID")
return
template = "panel/panel_edit_user_apikeys.html"
elif page == "remove_user":
user_id = bleach.clean(self.get_argument('id', None))
if not superuser and Enum_Permissions_Crafty.User_Config not in exec_user_crafty_permissions:
self.redirect("/panel/error?error=Unauthorized access: not superuser")
return
elif str(exec_user["user_id"]) == str(user_id):
self.redirect("/panel/error?error=Unauthorized access: you cannot delete yourself")
return
elif user_id is None:
self.redirect("/panel/error?error=Invalid User ID")
return
else:
# does this user id exist?
target_user = self.controller.users.get_user_by_id(user_id)
if not target_user:
self.redirect("/panel/error?error=Invalid User ID")
return
elif target_user['superuser']:
self.redirect("/panel/error?error=Cannot remove a superuser")
return
self.controller.users.remove_user(user_id)
self.controller.management.add_to_audit_log(exec_user['user_id'],
f"Removed user {target_user['username']} (UID:{user_id})",
server_id=0,
source_ip=self.get_remote_ip())
self.redirect("/panel/panel_config")
elif page == "add_role":
user_roles = self.get_user_roles()
page_data['new_role'] = True
page_data['role'] = {}
page_data['role']['role_name'] = ""
page_data['role']['role_id'] = -1
page_data['role']['created'] = "N/A"
page_data['role']['last_update'] = "N/A"
page_data['role']['servers'] = set()
page_data['user-roles'] = user_roles
page_data['users'] = self.controller.users.get_all_users()
if Enum_Permissions_Crafty.Roles_Config not in exec_user_crafty_permissions:
self.redirect("/panel/error?error=Unauthorized access: not a role editor")
return
page_data['servers_all'] = self.controller.list_defined_servers()
page_data['permissions_all'] = self.controller.server_perms.list_defined_permissions()
page_data['permissions_list'] = set()
template = "panel/panel_edit_role.html"
elif page == "edit_role":
user_roles = self.get_user_roles()
page_data['new_role'] = False
role_id = self.get_argument('id', None)
page_data['role'] = self.controller.roles.get_role_with_servers(role_id)
page_data['servers_all'] = self.controller.list_defined_servers()
page_data['permissions_all'] = self.controller.server_perms.list_defined_permissions()
page_data['permissions_list'] = self.controller.server_perms.get_role_permissions(role_id)
page_data['user-roles'] = user_roles
page_data['users'] = self.controller.users.get_all_users()
if Enum_Permissions_Crafty.Roles_Config not in exec_user_crafty_permissions:
self.redirect("/panel/error?error=Unauthorized access: not a role editor")
return
elif role_id is None:
self.redirect("/panel/error?error=Invalid Role ID")
return
template = "panel/panel_edit_role.html"
elif page == "remove_role":
role_id = bleach.clean(self.get_argument('id', None))
if not superuser:
self.redirect("/panel/error?error=Unauthorized access: not superuser")
return
elif role_id is None:
self.redirect("/panel/error?error=Invalid Role ID")
return
else:
# does this user id exist?
target_role = self.controller.roles.get_role(role_id)
if not target_role:
self.redirect("/panel/error?error=Invalid Role ID")
return
self.controller.roles.remove_role(role_id)
self.controller.management.add_to_audit_log(exec_user['user_id'],
f"Removed role {target_role['role_name']} (RID:{role_id})",
server_id=0,
source_ip=self.get_remote_ip())
self.redirect("/panel/panel_config")
elif page == "activity_logs":
page_data['audit_logs'] = self.controller.management.get_actity_log()
template = "panel/activity_logs.html"
elif page == 'download_file':
file = helper.get_os_understandable_path(self.get_argument('path', ""))
name = self.get_argument('name', "")
server_id = self.check_server_id()
if server_id is None:
return
server_info = self.controller.servers.get_server_data_by_id(server_id)
if not helper.in_path(helper.get_os_understandable_path(server_info["path"]), file) \
or not os.path.isfile(file):
self.redirect("/panel/error?error=Invalid path detected")
return
self.download_file(name, file)
self.redirect(f"/panel/server_detail?id={server_id}&subpage=files")
elif page == 'download_support_package':
tempZipStorage = exec_user['support_logs']
#We'll reset the support path for this user now.
self.controller.users.set_support_path(exec_user["user_id"], "")
self.set_header('Content-Type', 'application/octet-stream')
self.set_header('Content-Disposition', 'attachment; filename=' + "support_logs.zip")
chunk_size = 1024 * 1024 * 4 # 4 MiB
if tempZipStorage != '':
with open(tempZipStorage, 'rb') as f:
while True:
chunk = f.read(chunk_size)
if not chunk:
break
try:
self.write(chunk) # write the chunk to response
self.flush() # send the chunk to client
except iostream.StreamClosedError:
# this means the client has closed the connection
# so break the loop
break
finally:
# deleting the chunk is very important because
# if many clients are downloading files at the
# same time, the chunks in memory will keep
# increasing and will eat up the RAM
del chunk
self.redirect('/panel/dashboard')
else:
self.redirect('/panel/error?error=No path found for support logs')
return
elif page == "support_logs":
logger.info(f"Support logs requested. Packinging logs for user with ID: {exec_user['user_id']}")
logs_thread = threading.Thread(target=self.controller.package_support_logs,
daemon=True,
args=(exec_user,),
name=f"{exec_user['user_id']}_logs_thread")
logs_thread.start()
self.redirect('/panel/dashboard')
return
self.render(
template,
data=page_data,
time=time,
utc_offset=(time.timezone * -1 / 60 / 60),
translate=self.translator.translate,
)
@tornado.web.authenticated
def post(self, page):
    """Process a form submission for the panel page named *page*.

    Handled pages: ``server_detail``, ``server_backup``, ``new_schedule``,
    ``edit_schedule``, ``edit_user``, ``edit_user_apikeys``, ``get_token``,
    ``add_user``, ``edit_role`` and ``add_role``. Each of those branches
    ends the request itself (a redirect, or the written token for
    ``get_token``); any other page is answered with the 404 template.

    The branches form a single ``if``/``elif`` chain so that a branch that
    has already redirected can never fall through to the 404 render.
    """
    api_key, _token_data, exec_user = self.current_user
    superuser = exec_user['superuser']
    if api_key is not None:
        # A request made with an API key is only superuser when both the
        # user and the key are.
        superuser = superuser and api_key.superuser

    server_id = self.get_argument('id', None)

    if superuser:
        exec_user_crafty_permissions = self.controller.crafty_perms.list_defined_crafty_permissions()
    else:
        exec_user_crafty_permissions = self.controller.crafty_perms.get_crafty_permissions_list(exec_user["user_id"])

    if page == 'server_detail':
        granted = self.controller.server_perms.get_user_id_permissions_list(exec_user["user_id"], server_id)
        if Enum_Permissions_Server.Config not in granted and not superuser:
            self.redirect("/panel/error?error=Unauthorized access to Config")
            return

        # Validate the id before touching the server record.
        # NOTE(review): presumably check_server_id has already sent the
        # error response when it returns None - confirm.
        server_id = self.check_server_id()
        if server_id is None:
            return
        server_obj = self.controller.servers.get_server_obj(server_id)

        # Read and convert every submitted value before mutating the record.
        server_name = self.get_argument('server_name', None)
        stop_command = self.get_argument('stop_command', None)
        auto_start_delay = self.get_argument('auto_start_delay', '10')
        auto_start = int(float(self.get_argument('auto_start', '0')))
        crash_detection = int(float(self.get_argument('crash_detection', '0')))
        logs_delete_after = int(float(self.get_argument('logs_delete_after', '0')))

        server_obj.server_name = server_name
        if superuser:
            # Only superusers may change paths, executable and network
            # settings; for everybody else the stored values stay as-is.
            server_path = self._to_stored_path(self.get_argument('server_path', None))
            log_path = self._to_stored_path(self.get_argument('log_path', None))
            executable = self.get_argument('executable', None)
            execution_command = self.get_argument('execution_command', None)

            # When the executable name changed, replace the old name inside
            # the execution command as well.
            stale_executable = server_obj.executable
            if str(stale_executable) != str(executable):
                execution_command = execution_command.replace(str(stale_executable), str(executable))

            servers_root = helper.get_servers_root_dir()
            if helper.validate_traversal(servers_root, server_path):
                server_obj.path = server_path
                server_obj.log_path = log_path
            if helper.validate_traversal(servers_root, executable):
                server_obj.executable = executable
            server_obj.execution_command = execution_command
            server_obj.server_ip = self.get_argument('server_ip', None)
            server_obj.server_port = self.get_argument('server_port', None)
            server_obj.executable_update_url = self.get_argument('executable_update_url', None)

        server_obj.stop_command = stop_command
        server_obj.auto_start_delay = auto_start_delay
        server_obj.auto_start = auto_start
        server_obj.crash_detection = crash_detection
        server_obj.logs_delete_after = logs_delete_after
        self.controller.servers.update_server(server_obj)
        self.controller.crash_detection(server_obj)
        self.controller.refresh_server_settings(server_id)
        self.controller.management.add_to_audit_log(exec_user['user_id'],
                                                    f"Edited server {server_id} named {server_name}",
                                                    server_id,
                                                    self.get_remote_ip())
        self.redirect(f"/panel/server_detail?id={server_id}&subpage=config")

    elif page == "server_backup":
        logger.debug(self.request.arguments)
        server_id = self.get_argument('id', None)
        server_obj = self.controller.servers.get_server_obj(server_id)
        compress = self.get_argument('compress', False)
        check_changed = self.get_argument('changed')
        if str(check_changed) == str(1):
            # The exclusion list was changed in the form: take it from the body.
            checked = self.get_body_arguments('root_path')
        else:
            checked = self.controller.management.get_excluded_backup_dirs(server_id)
        if superuser:
            backup_path = self._to_stored_path(bleach.clean(self.get_argument('backup_path', None)))
        else:
            # Non-superusers cannot move the backup directory.
            backup_path = server_obj.backup_path
        max_backups = bleach.clean(self.get_argument('max_backups', None))

        granted = self.controller.server_perms.get_user_id_permissions_list(exec_user["user_id"], server_id)
        if Enum_Permissions_Server.Backup not in granted:
            if not superuser:
                self.redirect("/panel/error?error=Unauthorized access: User not authorized")
                return
        elif server_id is None:
            self.redirect("/panel/error?error=Invalid Server ID")
            return
        elif not self.controller.servers.server_id_exists(server_id):
            self.redirect("/panel/error?error=Invalid Server ID")
            return

        server_obj = self.controller.servers.get_server_obj(server_id)
        server_obj.backup_path = backup_path
        self.controller.servers.update_server(server_obj)
        self.controller.management.set_backup_config(server_id,
                                                     max_backups=max_backups,
                                                     excluded_dirs=checked,
                                                     compress=bool(compress))
        self.controller.management.add_to_audit_log(exec_user['user_id'],
                                                    f"Edited server {server_id}: updated backups",
                                                    server_id,
                                                    self.get_remote_ip())
        self.tasks_manager.reload_schedule_from_db()
        self.redirect(f"/panel/server_detail?id={server_id}&subpage=backup")

    elif page == "new_schedule":
        server_id = bleach.clean(self.get_argument('id', None))
        job_data = self._schedule_job_from_request(exec_user, superuser, server_id,
                                                   unset_parent=None, unset_delay=0)
        if job_data is None:
            # The helper has already redirected to an error page.
            return
        self.tasks_manager.schedule_job(job_data)
        self.controller.management.add_to_audit_log(exec_user['user_id'],
                                                    f"Edited server {server_id}: added scheduled job",
                                                    server_id,
                                                    self.get_remote_ip())
        self.tasks_manager.reload_schedule_from_db()
        self.redirect(f"/panel/server_detail?id={server_id}&subpage=schedules")

    elif page == "edit_schedule":
        server_id = bleach.clean(self.get_argument('id', None))
        # Edited interval/cron jobs store '' (not None/0) for the unused
        # parent and delay fields; this mirrors the values the handler
        # has always written for edits.
        job_data = self._schedule_job_from_request(exec_user, superuser, server_id,
                                                   unset_parent='', unset_delay='')
        if job_data is None:
            # The helper has already redirected to an error page.
            return
        sch_id = self.get_argument('sch_id', None)
        self.tasks_manager.update_job(sch_id, job_data)
        self.controller.management.add_to_audit_log(exec_user['user_id'],
                                                    f"Edited server {server_id}: updated schedule",
                                                    server_id,
                                                    self.get_remote_ip())
        self.tasks_manager.reload_schedule_from_db()
        self.redirect(f"/panel/server_detail?id={server_id}&subpage=schedules")

    elif page == "edit_user":
        if bleach.clean(self.get_argument('username', None)) == 'system':
            self.redirect("/panel/error?error=Unauthorized access: system user is not editable")
            # Stop here: without this return the edit below was still
            # applied and a second redirect was attempted.
            return
        user_id = bleach.clean(self.get_argument('id', None))
        username = bleach.clean(self.get_argument('username', None))
        password0 = bleach.clean(self.get_argument('password0', None))
        password1 = bleach.clean(self.get_argument('password1', None))
        email = bleach.clean(self.get_argument('email', "default@example.com"))
        enabled = int(float(self.get_argument('enabled', '0')))
        # The configured language is the default of get_argument (it used
        # to be handed to bleach.clean as its second argument by mistake).
        lang = bleach.clean(self.get_argument('language', helper.get_setting('language')))

        if not superuser:
            # Only a superuser can grant superuser status.
            make_superuser = False
        elif str(exec_user['user_id']) == str(user_id):
            # A superuser editing their own account always stays superuser.
            make_superuser = True
        else:
            make_superuser = bleach.clean(self.get_argument('superuser', '0')) == '1'

        if Enum_Permissions_Crafty.User_Config not in exec_user_crafty_permissions:
            if str(user_id) != str(exec_user["user_id"]):
                self.redirect("/panel/error?error=Unauthorized access: not a user editor")
                return
            # Users without the User_Config permission may still edit their
            # own account, limited to username, password and language.
            user_data = {
                "username": username,
                "password": password0,
                "lang": lang,
            }
            self.controller.users.update_user(user_id, user_data=user_data)
            self.controller.management.add_to_audit_log(exec_user['user_id'],
                                                        f"Edited user {username} (UID:{user_id}) password",
                                                        server_id=0,
                                                        source_ip=self.get_remote_ip())
            self.redirect("/panel/panel_config")
            return

        if username is None or username == "":
            self.redirect("/panel/error?error=Invalid username")
            return
        if user_id is None:
            self.redirect("/panel/error?error=Invalid User ID")
            return
        if not self.controller.users.user_id_exists(user_id):
            self.redirect("/panel/error?error=Invalid User ID")
            return
        if password0 != password1:
            self.redirect("/panel/error?error=Passwords must match")
            return

        roles = self.get_user_role_memberships()
        permissions_mask, server_quantity = self.get_perms_quantity()
        user_data = {
            "username": username,
            "password": password0,
            "email": email,
            "enabled": enabled,
            "roles": roles,
            "lang": lang,
            "superuser": make_superuser,
        }
        user_crafty_data = {
            "permissions_mask": permissions_mask,
            "server_quantity": server_quantity
        }
        self.controller.users.update_user(user_id, user_data=user_data, user_crafty_data=user_crafty_data)
        self.controller.management.add_to_audit_log(
            exec_user['user_id'],
            f"Edited user {username} (UID:{user_id}) with roles {roles} and permissions {permissions_mask}",
            server_id=0,
            source_ip=self.get_remote_ip())
        self.redirect("/panel/panel_config")

    elif page == "edit_user_apikeys":
        user_id = self.get_argument('id', None)
        name = self.get_argument('name', None)
        key_superuser = self.get_argument('superuser', None) == '1'

        if name is None or name == "":
            self.redirect("/panel/error?error=Invalid API key name")
            return
        if user_id is None:
            self.redirect("/panel/error?error=Invalid User ID")
            return
        if not self.controller.users.user_id_exists(user_id):
            self.redirect("/panel/error?error=Invalid User ID")
            return

        crafty_permissions_mask = self.get_perms()
        server_permissions_mask = self.get_perms_server()
        self.controller.users.add_user_api_key(name, user_id, key_superuser,
                                               crafty_permissions_mask, server_permissions_mask)
        self.controller.management.add_to_audit_log(
            exec_user['user_id'],
            f"Added API key {name} with crafty permissions {crafty_permissions_mask}" +
            f" and {server_permissions_mask} for user with UID: {user_id}",
            server_id=0,
            source_ip=self.get_remote_ip())
        self.redirect(f"/panel/edit_user_apikeys?id={user_id}")

    elif page == "get_token":
        key_id = self.get_argument('id', None)
        if key_id is None:
            self.redirect("/panel/error?error=Invalid Key ID")
            return
        key = self.controller.users.get_user_api_key(key_id)
        if key is None:
            self.redirect("/panel/error?error=Invalid Key ID")
            return

        self.controller.management.add_to_audit_log(
            exec_user['user_id'],
            f"Generated a new API token for the key {key.name} from user with UID: {key.user.user_id}",
            server_id=0,
            source_ip=self.get_remote_ip())
        # The response body is the generated token itself, not a page.
        self.write(authentication.generate(key.user.user_id, {
            'token_id': key.token_id
        }))
        self.finish()

    elif page == "add_user":
        if bleach.clean(self.get_argument('username', None)).lower() == 'system':
            self.redirect("/panel/error?error=Unauthorized access: username system is reserved for the Crafty system." +
                          " Please choose a different username.")
            return
        username = bleach.clean(self.get_argument('username', None))
        password0 = bleach.clean(self.get_argument('password0', None))
        password1 = bleach.clean(self.get_argument('password1', None))
        email = bleach.clean(self.get_argument('email', "default@example.com"))
        enabled = int(float(self.get_argument('enabled', '0')))
        lang = bleach.clean(self.get_argument('lang', helper.get_setting('language')))
        # Only a superuser can create another superuser.
        make_superuser = bool(superuser) and bleach.clean(self.get_argument('superuser', '0')) == '1'

        if Enum_Permissions_Crafty.User_Config not in exec_user_crafty_permissions:
            self.redirect("/panel/error?error=Unauthorized access: not a user editor")
            return
        if username is None or username == "":
            self.redirect("/panel/error?error=Invalid username")
            return
        if self.controller.users.get_id_by_name(username) is not None:
            self.redirect("/panel/error?error=User exists")
            return
        if password0 != password1:
            self.redirect("/panel/error?error=Passwords must match")
            return

        roles = self.get_user_role_memberships()
        permissions_mask, server_quantity = self.get_perms_quantity()
        user_id = self.controller.users.add_user(username,
                                                 password=password0,
                                                 email=email,
                                                 enabled=enabled,
                                                 superuser=make_superuser)
        # Roles, language and crafty permissions are applied with a
        # follow-up update on the freshly created user.
        user_data = {
            "roles": roles,
            'lang': lang,
            'lang_page': helper.getLangPage(lang),
        }
        user_crafty_data = {
            "permissions_mask": permissions_mask,
            "server_quantity": server_quantity
        }
        self.controller.users.update_user(user_id, user_data=user_data, user_crafty_data=user_crafty_data)
        self.controller.management.add_to_audit_log(exec_user['user_id'],
                                                    f"Added user {username} (UID:{user_id})",
                                                    server_id=0,
                                                    source_ip=self.get_remote_ip())
        self.controller.management.add_to_audit_log(exec_user['user_id'],
                                                    f"Edited user {username} (UID:{user_id}) with roles {roles}",
                                                    server_id=0,
                                                    source_ip=self.get_remote_ip())
        self.redirect("/panel/panel_config")

    elif page == "edit_role":
        role_id = bleach.clean(self.get_argument('id', None))
        role_name = bleach.clean(self.get_argument('role_name', None))

        if Enum_Permissions_Crafty.Roles_Config not in exec_user_crafty_permissions:
            self.redirect("/panel/error?error=Unauthorized access: not a role editor")
            return
        if role_name is None or role_name == "":
            # Same wording as add_role (this used to say "Invalid username").
            self.redirect("/panel/error?error=Invalid role name")
            return
        if role_id is None:
            self.redirect("/panel/error?error=Invalid Role ID")
            return
        if not self.controller.roles.role_id_exists(role_id):
            self.redirect("/panel/error?error=Invalid Role ID")
            return

        servers = self.get_role_servers()
        permissions_mask = self.get_perms_server()
        role_data = {
            "role_name": role_name,
            "servers": servers
        }
        self.controller.roles.update_role(role_id, role_data=role_data, permissions_mask=permissions_mask)
        self.controller.management.add_to_audit_log(exec_user['user_id'],
                                                    f"Edited role {role_name} (RID:{role_id}) with servers {servers}",
                                                    server_id=0,
                                                    source_ip=self.get_remote_ip())
        self.redirect("/panel/panel_config")

    elif page == "add_role":
        role_name = bleach.clean(self.get_argument('role_name', None))

        if Enum_Permissions_Crafty.Roles_Config not in exec_user_crafty_permissions:
            self.redirect("/panel/error?error=Unauthorized access: not a role editor")
            return
        if role_name is None or role_name == "":
            self.redirect("/panel/error?error=Invalid role name")
            return
        if self.controller.roles.get_roleid_by_name(role_name) is not None:
            self.redirect("/panel/error?error=Role exists")
            return

        servers = self.get_role_servers()
        permissions_mask = self.get_perms_server()
        role_id = self.controller.roles.add_role(role_name)
        self.controller.roles.update_role(role_id, {"servers": servers}, permissions_mask)
        self.controller.management.add_to_audit_log(exec_user['user_id'],
                                                    f"Added role {role_name} (RID:{role_id})",
                                                    server_id=0,
                                                    source_ip=self.get_remote_ip())
        self.controller.management.add_to_audit_log(exec_user['user_id'],
                                                    f"Edited role {role_name} (RID:{role_id}) with servers {servers}",
                                                    server_id=0,
                                                    source_ip=self.get_remote_ip())
        self.redirect("/panel/panel_config")

    else:
        self.set_status(404)
        page_data = {
            'lang': helper.get_setting('language'),
            'lang_page': helper.getLangPage(helper.get_setting('language')),
        }
        self.render(
            "public/404.html",
            translate=self.translator.translate,
            data=page_data
        )

def _to_stored_path(self, path):
    """Return a submitted filesystem *path* in the form it is saved in.

    On Windows the value goes through ``helper.wtol_path``; elsewhere it is
    returned unchanged.
    """
    # NOTE: earlier revisions also called path.replace(' ', '^ ') here but
    # discarded the result, so no escaping was ever applied. The dead call
    # is dropped rather than "fixed" so stored paths do not change.
    if helper.is_os_windows():
        return helper.wtol_path(path)
    return path

def _parse_schedule_form(self, difficulty):
    """Read the schedule fields submitted for the given *difficulty*.

    Returns a dict with the keys ``action``, ``command``, ``interval_type``,
    ``interval``, ``sch_time``, ``cron_string``, ``parent`` and ``delay``
    (fields that do not apply to the difficulty keep their '' / None
    placeholder), or ``None`` after redirecting to the error page when the
    difficulty, the cron string or the action is not valid.
    """
    form = {
        'interval_type': '',
        'interval': '',
        'sch_time': '',
        'cron_string': '',
        'parent': None,
        'delay': None,
    }
    if difficulty == 'basic':
        form['action'] = bleach.clean(self.get_argument('action', None))
        form['interval'] = bleach.clean(self.get_argument('interval', None))
        form['interval_type'] = bleach.clean(self.get_argument('interval_type', None))
        # A time of day is only submitted for intervals counted in days.
        if form['interval_type'] == "days":
            form['sch_time'] = bleach.clean(self.get_argument('time', None))
    elif difficulty == 'reaction':
        form['interval_type'] = 'reaction'
        form['action'] = bleach.clean(self.get_argument('action', None))
        form['delay'] = bleach.clean(self.get_argument('delay', None))
        form['parent'] = bleach.clean(self.get_argument('parent', None))
    elif difficulty == 'advanced':
        form['cron_string'] = bleach.clean(self.get_argument('cron', ''))
        try:
            CronValidator.parse(form['cron_string'])
        except Exception as e:
            self.redirect(f"/panel/error?error=INVALID FORMAT: Invalid Cron Format. {e}")
            return None
        form['action'] = bleach.clean(self.get_argument('action', None))
    else:
        # Any other value previously crashed later on an unbound variable.
        self.redirect("/panel/error?error=Invalid schedule difficulty")
        return None

    builtin_commands = {
        "start": "start_server",
        "stop": "stop_server",
        "restart": "restart_server",
        "backup": "backup_server",
    }
    action = form['action']
    if action == "command":
        form['command'] = bleach.clean(self.get_argument('command', None))
    elif action in builtin_commands:
        form['command'] = builtin_commands[action]
    else:
        # An unknown action previously left the command unbound and crashed.
        self.redirect("/panel/error?error=Invalid schedule action")
        return None
    return form

def _schedule_job_from_request(self, exec_user, superuser, server_id, unset_parent, unset_delay):
    """Build the job description for a new or edited schedule.

    *unset_parent* / *unset_delay* are the values stored in ``parent`` and
    ``delay`` for cron jobs and for basic jobs not counted in days (daily
    jobs always store ``None`` / ``0``; reactions store the submitted
    values).

    Returns the ``job_data`` dict, or ``None`` when the request was rejected
    and a redirect to the error page has already been sent.
    """
    difficulty = bleach.clean(self.get_argument('difficulty', None))
    form = self._parse_schedule_form(difficulty)
    if form is None:
        return None
    enabled = bleach.clean(self.get_argument('enabled', '0')) == '1'
    one_time = bleach.clean(self.get_argument('one_time', '0')) == '1'

    # NOTE(review): schedule changes are authorised against the Backup
    # permission, not Schedule - looks like a copy/paste; confirm intent.
    if not superuser and Enum_Permissions_Server.Backup not in \
            self.controller.server_perms.get_user_id_permissions_list(exec_user["user_id"], server_id):
        self.redirect("/panel/error?error=Unauthorized access: User not authorized")
        return None
    if server_id is None:
        self.redirect("/panel/error?error=Invalid Server ID")
        return None
    if not self.controller.servers.server_id_exists(server_id):
        self.redirect("/panel/error?error=Invalid Server ID")
        return None

    job_data = {
        "server_id": server_id,
        "action": form['action'],
        "interval_type": form['interval_type'],
        "interval": '',
        "command": form['command'],
        "start_time": '',
        "enabled": enabled,
        "one_time": one_time,
        "cron_string": '',
        "parent": unset_parent,
        "delay": unset_delay,
    }
    if difficulty == 'reaction':
        job_data.update(parent=form['parent'], delay=form['delay'])
    elif difficulty == 'advanced':
        job_data['cron_string'] = form['cron_string']
    elif form['interval_type'] == "days":
        job_data.update(interval=form['interval'], start_time=form['sch_time'], parent=None, delay=0)
    else:
        # We'll base every other interval off of a midnight start time.
        job_data.update(interval=form['interval'], start_time='00:00')
    return job_data
@tornado.web.authenticated
def delete(self, page):
    """Handle DELETE requests for the panel page named *page*.

    ``remove_apikey`` deletes the API key whose id is passed as ``id``
    (superusers only), writes an audit log entry and redirects to the panel
    configuration page. Any other page is answered with the 404 template.
    """
    api_key, _token_data, exec_user = self.current_user
    superuser = exec_user['superuser']
    if api_key is not None:
        # A request made with an API key is only superuser when both the
        # user and the key are.
        superuser = superuser and api_key.superuser

    if page == "remove_apikey":
        if not superuser:
            self.redirect("/panel/error?error=Unauthorized access: not superuser")
            return

        # Check for a missing id before sanitising it: bleach.clean() does
        # not accept None, so the old order crashed instead of reaching the
        # "Invalid Key ID" redirect.
        raw_key_id = self.get_argument('id', None)
        key_id = None if raw_key_id is None else bleach.clean(raw_key_id)
        target_key = None
        if key_id is not None:
            # Single lookup; the result is reused for the audit message.
            target_key = self.controller.users.get_user_api_key(key_id)
        if not target_key:
            self.redirect("/panel/error?error=Invalid Key ID")
            return

        # Read what the audit entry needs before the key is deleted.
        key_name = target_key.name
        owner_id = target_key.user.user_id
        self.controller.users.delete_user_api_key(key_id)
        # Log the key's owner; the acting user is already the first
        # argument of the audit entry.
        self.controller.management.add_to_audit_log(exec_user['user_id'],
                                                    f"Removed API key {key_name} (ID: {key_id}) " +
                                                    f"from user with UID: {owner_id}",
                                                    server_id=0,
                                                    source_ip=self.get_remote_ip())
        self.redirect("/panel/panel_config")
    else:
        # The page data is only needed by the 404 template, so it is built
        # here instead of on every request.
        user_lang = self.controller.users.get_user_lang_by_id(exec_user["user_id"])
        page_data = {
            # todo: make this actually pull and compare version data
            'update_available': False,
            'version_data': helper.get_version_string(),
            'user_data': exec_user,
            'hosts_data': self.controller.management.get_latest_hosts_stats(),
            'show_contribute': helper.get_setting("show_contribute_link", True),
            'lang': user_lang,
            'lang_page': helper.getLangPage(user_lang),
        }
        self.set_status(404)
        self.render(
            "public/404.html",
            data=page_data,
            translate=self.translator.translate,
        )