import os
import logging
import tornado.web
import tornado.escape
import bleach

from app.classes.shared.console import console
from app.classes.shared.helpers import helper
from app.classes.shared.file_helpers import file_helper
from app.classes.web.base_handler import BaseHandler
from app.classes.models.server_permissions import Enum_Permissions_Server

logger = logging.getLogger(__name__)


class FileHandler(BaseHandler):
    """Ajax endpoints for browsing and modifying a server's files.

    Each HTTP verb dispatches on the ``page`` URL component.  Every
    operation requires the caller to hold the ``Files`` server permission
    (or be a superuser), requires the ``id`` request argument to name an
    existing server, and validates the requested path against that server's
    configured directory through the shared ``helper`` before touching disk.
    """

    def render_page(self, template, page_data):
        """Render ``template`` with ``page_data`` and the translator."""
        self.render(
            template,
            data=page_data,
            translate=self.translator.translate,
        )

    # ------------------------------------------------------------------
    # Shared helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _warn(message):
        """Send ``message`` to both the module logger and the console."""
        logger.warning(message)
        console.warning(message)

    def _request_context(self):
        """Return ``(server_id, superuser, user_perms)`` for this request.

        When the request was authenticated with an API key, superuser
        rights are granted only if both the user and the key carry them.
        """
        api_key, _, exec_user = self.current_user
        superuser = exec_user['superuser']
        if api_key is not None:
            superuser = superuser and api_key.superuser

        server_id = self.get_argument('id', None)
        user_perms = self.controller.server_perms.get_user_id_permissions_list(
            exec_user['user_id'], server_id)
        return server_id, superuser, user_perms

    def _lacks_files_access(self, superuser, user_perms):
        """Redirect to the error page and return True if access is denied.

        Access is allowed when the ``Files`` permission is present in
        ``user_perms`` or when the caller is a superuser.
        """
        if Enum_Permissions_Server.Files not in user_perms and not superuser:
            self.redirect("/panel/error?error=Unauthorized access to Files")
            return True
        return False

    def _server_root(self, server_id):
        """Return the server's configured ``path`` in OS-native form."""
        server_data = self.controller.servers.get_server_data_by_id(server_id)
        return helper.get_os_understandable_path(server_data['path'])

    def _write_listing(self, server_id, page_name, generate):
        """Write the requested path plus ``generate(path)`` to the response.

        ``generate`` is ``helper.generate_tree`` or ``helper.generate_dir``.
        Nothing is written when the server id is invalid or
        ``helper.validate_traversal`` rejects the path.
        """
        path = self.get_argument('path', None)

        if not self.check_server_id(server_id, page_name):
            return
        server_id = bleach.clean(server_id)

        # NOTE: validate_traversal is given the raw (not OS-normalised)
        # server path here, exactly as the original code did.
        server_path = self.controller.servers.get_server_data_by_id(server_id)['path']
        if helper.validate_traversal(server_path, path):
            self.write(helper.get_os_understandable_path(path) + '\n' + generate(path))
        self.finish()

    def _rename_item(self, server_id):
        """Rename a file or directory inside the server directory.

        Reads ``item_path`` and ``new_item_name`` from the request body.
        The existing item must exist inside the server directory, and the
        new path must be inside it as well and must not already exist.
        """
        item_path = helper.get_os_understandable_path(
            self.get_body_argument('item_path', default=None, strip=True))
        new_item_name = self.get_body_argument('new_item_name', default=None, strip=True)

        if not self.check_server_id(server_id, 'rename_file'):
            return
        server_id = bleach.clean(server_id)

        if item_path is None or new_item_name is None:
            self._warn("Invalid path(s) in rename_file file ajax call")
            return

        server_root = self._server_root(server_id)

        if not helper.in_path(server_root, item_path) \
                or not helper.check_path_exists(os.path.abspath(item_path)):
            self._warn(f"Invalid old name path in rename_file file ajax call ({server_id})")
            return

        # The new name is resolved relative to the item's current parent.
        new_item_path = os.path.join(os.path.split(item_path)[0], new_item_name)

        if not helper.in_path(server_root, new_item_path) \
                or helper.check_path_exists(os.path.abspath(new_item_path)):
            self._warn(f"Invalid new name path in rename_file file ajax call ({server_id})")
            return

        # RENAME
        os.rename(item_path, new_item_path)

    # ------------------------------------------------------------------
    # HTTP verbs
    # ------------------------------------------------------------------

    @tornado.web.authenticated
    def get(self, page):
        """Handle ``get_file``, ``get_tree`` and ``get_dir``."""
        server_id, superuser, user_perms = self._request_context()

        if page == "get_file":
            if self._lacks_files_access(superuser, user_perms):
                return
            file_path = helper.get_os_understandable_path(self.get_argument('file_path', None))

            if not self.check_server_id(server_id, 'get_file'):
                return
            server_id = bleach.clean(server_id)

            if not helper.in_path(self._server_root(server_id), file_path) \
                    or not helper.check_file_exists(os.path.abspath(file_path)):
                self._warn(f"Invalid path in get_file file file ajax call ({file_path})")
                return

            error = None
            try:
                with open(file_path, encoding='utf-8') as file:
                    file_contents = file.read()
            except UnicodeDecodeError:
                # Not valid UTF-8: report the error instead of the contents.
                file_contents = ''
                error = 'UnicodeDecodeError'

            self.write({
                'content': file_contents,
                'error': error
            })
            self.finish()

        elif page == "get_tree":
            if self._lacks_files_access(superuser, user_perms):
                return
            self._write_listing(server_id, 'get_tree', helper.generate_tree)

        elif page == "get_dir":
            if self._lacks_files_access(superuser, user_perms):
                return
            # Previously logged server-id failures under the 'get_tree' label.
            self._write_listing(server_id, 'get_dir', helper.generate_dir)

    @tornado.web.authenticated
    def post(self, page):
        """Handle ``create_file``, ``create_dir`` and ``unzip_file``."""
        server_id, superuser, user_perms = self._request_context()

        if page == "create_file":
            if self._lacks_files_access(superuser, user_perms):
                return
            file_parent = helper.get_os_understandable_path(
                self.get_body_argument('file_parent', default=None, strip=True))
            file_name = self.get_body_argument('file_name', default=None, strip=True)
            file_path = os.path.join(file_parent, file_name)

            if not self.check_server_id(server_id, 'create_file'):
                return
            server_id = bleach.clean(server_id)

            # The new file must live inside the server dir and not exist yet.
            if not helper.in_path(self._server_root(server_id), file_path) \
                    or helper.check_file_exists(os.path.abspath(file_path)):
                self._warn(f"Invalid path in create_file file ajax call ({file_path})")
                return

            # Create the file by opening it; the context manager closes it.
            with open(file_path, 'w', encoding='utf-8'):
                pass

        elif page == "create_dir":
            if self._lacks_files_access(superuser, user_perms):
                return
            dir_parent = helper.get_os_understandable_path(
                self.get_body_argument('dir_parent', default=None, strip=True))
            dir_name = self.get_body_argument('dir_name', default=None, strip=True)
            dir_path = os.path.join(dir_parent, dir_name)

            if not self.check_server_id(server_id, 'create_dir'):
                return
            server_id = bleach.clean(server_id)

            # The new directory must live inside the server dir and not exist.
            if not helper.in_path(self._server_root(server_id), dir_path) \
                    or helper.check_path_exists(os.path.abspath(dir_path)):
                self._warn(f"Invalid path in create_dir file ajax call ({dir_path})")
                return

            # Create the directory
            os.mkdir(dir_path)

        elif page == "unzip_file":
            if self._lacks_files_access(superuser, user_perms):
                return
            path = helper.get_os_understandable_path(self.get_argument('path', None))

            # Security fix: this branch used to unzip whatever path the
            # client sent.  Apply the same server-id and containment checks
            # as the other file operations before extracting anything.
            if not self.check_server_id(server_id, 'unzip_file'):
                return
            server_id = bleach.clean(server_id)

            if helper.in_path(self._server_root(server_id), path):
                helper.unzipFile(path)
            else:
                self._warn(f"Invalid path in unzip_file file ajax call ({path})")

            self.redirect(f"/panel/server_detail?id={server_id}&subpage=files")
            return

    @tornado.web.authenticated
    def delete(self, page):
        """Handle ``del_file`` and ``del_dir``."""
        server_id, superuser, user_perms = self._request_context()

        if page == "del_file":
            if self._lacks_files_access(superuser, user_perms):
                return
            file_path = helper.get_os_understandable_path(
                self.get_body_argument('file_path', default=None, strip=True))

            console.warning(f"Delete {file_path} for server {server_id}")

            if not self.check_server_id(server_id, 'del_file'):
                return
            server_id = bleach.clean(server_id)

            server_info = self.controller.servers.get_server_data_by_id(server_id)
            server_root = helper.get_os_understandable_path(server_info['path'])
            backup_root = helper.get_os_understandable_path(server_info['backup_path'])

            # Files may be deleted from the server dir or its backup dir.
            inside_allowed_root = (helper.in_path(server_root, file_path)
                                   or helper.in_path(backup_root, file_path))
            if not inside_allowed_root \
                    or not helper.check_file_exists(os.path.abspath(file_path)):
                self._warn(f"Invalid path in del_file file ajax call ({file_path})")
                return

            # Delete the file
            file_helper.del_file(file_path)

        elif page == "del_dir":
            if self._lacks_files_access(superuser, user_perms):
                return
            dir_path = helper.get_os_understandable_path(
                self.get_body_argument('dir_path', default=None, strip=True))

            console.warning(f"Delete {dir_path} for server {server_id}")

            if not self.check_server_id(server_id, 'del_dir'):
                return
            server_id = bleach.clean(server_id)

            server_info = self.controller.servers.get_server_data_by_id(server_id)
            server_root = helper.get_os_understandable_path(server_info['path'])

            if not helper.in_path(server_root, dir_path) \
                    or not helper.check_path_exists(os.path.abspath(dir_path)):
                # Previously mislabelled as a del_file failure.
                self._warn(f"Invalid path in del_dir file ajax call ({dir_path})")
                return

            # Delete the directory.  os.rmdir would only remove empty
            # directories; del_dirs removes the contents as well.
            if helper.validate_traversal(server_root, dir_path):
                file_helper.del_dirs(dir_path)

    @tornado.web.authenticated
    def put(self, page):
        """Handle ``save_file`` and ``rename_file``."""
        server_id, superuser, user_perms = self._request_context()

        if page == "save_file":
            if self._lacks_files_access(superuser, user_perms):
                return
            file_contents = self.get_body_argument('file_contents', default=None, strip=True)
            file_path = helper.get_os_understandable_path(
                self.get_body_argument('file_path', default=None, strip=True))

            if not self.check_server_id(server_id, 'save_file'):
                return
            server_id = bleach.clean(server_id)

            if not helper.in_path(self._server_root(server_id), file_path) \
                    or not helper.check_file_exists(os.path.abspath(file_path)):
                self._warn(f"Invalid path in save_file file ajax call ({file_path})")
                return

            # Opening in 'w' mode truncates the file, so reject a missing
            # body argument first; otherwise write(None) would raise after
            # the existing contents had already been wiped.
            if file_contents is None:
                self._warn(f"Missing file contents in save_file file ajax call ({file_path})")
                return

            # Open the file in write mode and store the new content.
            with open(file_path, 'w', encoding='utf-8') as file_object:
                file_object.write(file_contents)

        elif page == "rename_file":
            if self._lacks_files_access(superuser, user_perms):
                return
            self._rename_item(server_id)

    @tornado.web.authenticated
    def patch(self, page):
        """Handle ``rename_file`` (same behaviour as the PUT variant)."""
        server_id, superuser, user_perms = self._request_context()

        if page == "rename_file":
            if self._lacks_files_access(superuser, user_perms):
                return
            self._rename_item(server_id)

    def check_server_id(self, server_id, page_name):
        """Return True when ``server_id`` names an existing server.

        Logs a warning tagged with ``page_name`` and returns False when the
        id is missing or unknown.  (The failure value used to be an implicit
        ``None``; ``False`` is equivalent for truthiness checks.)
        """
        if server_id is None:
            self._warn(f"Server ID not defined in {page_name} file ajax call ({server_id})")
            return False

        server_id = bleach.clean(server_id)

        # does this server id exist?
        if not self.controller.servers.server_id_exists(server_id):
            self._warn(f"Server ID not found in {page_name} file ajax call ({server_id})")
            return False
        return True