mirror of
https://gitlab.com/crafty-controller/crafty-4.git
synced 2024-08-30 18:23:09 +00:00
465 lines
18 KiB
Python
465 lines
18 KiB
Python
import os
|
|
import logging
|
|
import bleach
|
|
import tornado.web
|
|
import tornado.escape
|
|
|
|
from app.classes.models.server_permissions import EnumPermissionsServer
|
|
from app.classes.shared.console import Console
|
|
from app.classes.shared.helpers import Helpers
|
|
from app.classes.shared.file_helpers import FileHelpers
|
|
from app.classes.web.base_handler import BaseHandler
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class FileHandler(BaseHandler):
|
|
def render_page(self, template, page_data):
|
|
self.render(
|
|
template,
|
|
data=page_data,
|
|
translate=self.translator.translate,
|
|
)
|
|
|
|
@tornado.web.authenticated
|
|
def get(self, page):
|
|
api_key, _, exec_user = self.current_user
|
|
superuser = exec_user["superuser"]
|
|
if api_key is not None:
|
|
superuser = superuser and api_key.superuser
|
|
|
|
server_id = self.get_argument("id", None)
|
|
|
|
permissions = {
|
|
"Commands": EnumPermissionsServer.COMMANDS,
|
|
"Terminal": EnumPermissionsServer.TERMINAL,
|
|
"Logs": EnumPermissionsServer.LOGS,
|
|
"Schedule": EnumPermissionsServer.SCHEDULE,
|
|
"Backup": EnumPermissionsServer.BACKUP,
|
|
"Files": EnumPermissionsServer.FILES,
|
|
"Config": EnumPermissionsServer.CONFIG,
|
|
"Players": EnumPermissionsServer.PLAYERS,
|
|
}
|
|
user_perms = self.controller.server_perms.get_user_id_permissions_list(
|
|
exec_user["user_id"], server_id
|
|
)
|
|
|
|
if page == "get_file":
|
|
if not permissions["Files"] in user_perms:
|
|
if not superuser:
|
|
self.redirect("/panel/error?error=Unauthorized access to Files")
|
|
return
|
|
file_path = Helpers.get_os_understandable_path(
|
|
self.get_argument("file_path", None)
|
|
)
|
|
|
|
if not self.check_server_id(server_id, "get_file"):
|
|
return
|
|
server_id = bleach.clean(server_id)
|
|
|
|
if not self.helper.is_subdir(
|
|
file_path,
|
|
Helpers.get_os_understandable_path(
|
|
self.controller.servers.get_server_data_by_id(server_id)["path"]
|
|
),
|
|
) or not Helpers.check_file_exists(os.path.abspath(file_path)):
|
|
logger.warning(
|
|
f"Invalid path in get_file file file ajax call ({file_path})"
|
|
)
|
|
Console.warning(
|
|
f"Invalid path in get_file file file ajax call ({file_path})"
|
|
)
|
|
return
|
|
|
|
error = None
|
|
|
|
try:
|
|
with open(file_path, encoding="utf-8") as file:
|
|
file_contents = file.read()
|
|
except UnicodeDecodeError:
|
|
file_contents = ""
|
|
error = "UnicodeDecodeError"
|
|
|
|
self.write({"content": file_contents, "error": error})
|
|
self.finish()
|
|
|
|
elif page == "get_tree":
|
|
if not permissions["Files"] in user_perms:
|
|
if not superuser:
|
|
self.redirect("/panel/error?error=Unauthorized access to Files")
|
|
return
|
|
path = self.get_argument("path", None)
|
|
|
|
if not self.check_server_id(server_id, "get_tree"):
|
|
return
|
|
server_id = bleach.clean(server_id)
|
|
|
|
if Helpers.validate_traversal(
|
|
self.controller.servers.get_server_data_by_id(server_id)["path"], path
|
|
):
|
|
self.write(
|
|
Helpers.get_os_understandable_path(path)
|
|
+ "\n"
|
|
+ self.helper.generate_tree(path)
|
|
)
|
|
self.finish()
|
|
|
|
elif page == "get_dir":
|
|
if not permissions["Files"] in user_perms:
|
|
if not superuser:
|
|
self.redirect("/panel/error?error=Unauthorized access to Files")
|
|
return
|
|
path = self.get_argument("path", None)
|
|
|
|
if not self.check_server_id(server_id, "get_tree"):
|
|
return
|
|
server_id = bleach.clean(server_id)
|
|
|
|
if Helpers.validate_traversal(
|
|
self.controller.servers.get_server_data_by_id(server_id)["path"], path
|
|
):
|
|
self.write(
|
|
Helpers.get_os_understandable_path(path)
|
|
+ "\n"
|
|
+ self.helper.generate_dir(path)
|
|
)
|
|
self.finish()
|
|
|
|
@tornado.web.authenticated
|
|
def post(self, page):
|
|
api_key, _, exec_user = self.current_user
|
|
superuser = exec_user["superuser"]
|
|
if api_key is not None:
|
|
superuser = superuser and api_key.superuser
|
|
|
|
server_id = self.get_argument("id", None)
|
|
|
|
permissions = {
|
|
"Commands": EnumPermissionsServer.COMMANDS,
|
|
"Terminal": EnumPermissionsServer.TERMINAL,
|
|
"Logs": EnumPermissionsServer.LOGS,
|
|
"Schedule": EnumPermissionsServer.SCHEDULE,
|
|
"Backup": EnumPermissionsServer.BACKUP,
|
|
"Files": EnumPermissionsServer.FILES,
|
|
"Config": EnumPermissionsServer.CONFIG,
|
|
"Players": EnumPermissionsServer.PLAYERS,
|
|
}
|
|
user_perms = self.controller.server_perms.get_user_id_permissions_list(
|
|
exec_user["user_id"], server_id
|
|
)
|
|
|
|
if page == "create_file":
|
|
if not permissions["Files"] in user_perms:
|
|
if not superuser:
|
|
self.redirect("/panel/error?error=Unauthorized access to Files")
|
|
return
|
|
file_parent = Helpers.get_os_understandable_path(
|
|
self.get_body_argument("file_parent", default=None, strip=True)
|
|
)
|
|
file_name = self.get_body_argument("file_name", default=None, strip=True)
|
|
file_path = os.path.join(file_parent, file_name)
|
|
|
|
if not self.check_server_id(server_id, "create_file"):
|
|
return
|
|
server_id = bleach.clean(server_id)
|
|
|
|
if not self.helper.is_subdir(
|
|
file_path,
|
|
Helpers.get_os_understandable_path(
|
|
self.controller.servers.get_server_data_by_id(server_id)["path"]
|
|
),
|
|
) or Helpers.check_file_exists(os.path.abspath(file_path)):
|
|
logger.warning(
|
|
f"Invalid path in create_file file ajax call ({file_path})"
|
|
)
|
|
Console.warning(
|
|
f"Invalid path in create_file file ajax call ({file_path})"
|
|
)
|
|
return
|
|
|
|
# Create the file by opening it
|
|
with open(file_path, "w", encoding="utf-8") as file_object:
|
|
file_object.close()
|
|
|
|
elif page == "create_dir":
|
|
if not permissions["Files"] in user_perms:
|
|
if not superuser:
|
|
self.redirect("/panel/error?error=Unauthorized access to Files")
|
|
return
|
|
dir_parent = Helpers.get_os_understandable_path(
|
|
self.get_body_argument("dir_parent", default=None, strip=True)
|
|
)
|
|
dir_name = self.get_body_argument("dir_name", default=None, strip=True)
|
|
dir_path = os.path.join(dir_parent, dir_name)
|
|
|
|
if not self.check_server_id(server_id, "create_dir"):
|
|
return
|
|
server_id = bleach.clean(server_id)
|
|
|
|
if not self.helper.is_subdir(
|
|
dir_path,
|
|
Helpers.get_os_understandable_path(
|
|
self.controller.servers.get_server_data_by_id(server_id)["path"]
|
|
),
|
|
) or Helpers.check_path_exists(os.path.abspath(dir_path)):
|
|
logger.warning(
|
|
f"Invalid path in create_dir file ajax call ({dir_path})"
|
|
)
|
|
Console.warning(
|
|
f"Invalid path in create_dir file ajax call ({dir_path})"
|
|
)
|
|
return
|
|
# Create the directory
|
|
os.mkdir(dir_path)
|
|
|
|
elif page == "unzip_file":
|
|
if not permissions["Files"] in user_perms:
|
|
if not superuser:
|
|
self.redirect("/panel/error?error=Unauthorized access to Files")
|
|
return
|
|
path = Helpers.get_os_understandable_path(self.get_argument("path", None))
|
|
if Helpers.is_os_windows():
|
|
path = Helpers.wtol_path(path)
|
|
FileHelpers.unzip_file(path)
|
|
self.redirect(f"/panel/server_detail?id={server_id}&subpage=files")
|
|
return
|
|
|
|
@tornado.web.authenticated
|
|
def delete(self, page):
|
|
api_key, _, exec_user = self.current_user
|
|
superuser = exec_user["superuser"]
|
|
if api_key is not None:
|
|
superuser = superuser and api_key.superuser
|
|
|
|
server_id = self.get_argument("id", None)
|
|
|
|
permissions = {
|
|
"Commands": EnumPermissionsServer.COMMANDS,
|
|
"Terminal": EnumPermissionsServer.TERMINAL,
|
|
"Logs": EnumPermissionsServer.LOGS,
|
|
"Schedule": EnumPermissionsServer.SCHEDULE,
|
|
"Backup": EnumPermissionsServer.BACKUP,
|
|
"Files": EnumPermissionsServer.FILES,
|
|
"Config": EnumPermissionsServer.CONFIG,
|
|
"Players": EnumPermissionsServer.PLAYERS,
|
|
}
|
|
user_perms = self.controller.server_perms.get_user_id_permissions_list(
|
|
exec_user["user_id"], server_id
|
|
)
|
|
if page == "del_file":
|
|
if not permissions["Files"] in user_perms:
|
|
if not superuser:
|
|
self.redirect("/panel/error?error=Unauthorized access to Files")
|
|
return
|
|
file_path = Helpers.get_os_understandable_path(
|
|
self.get_body_argument("file_path", default=None, strip=True)
|
|
)
|
|
|
|
Console.warning(f"Delete {file_path} for server {server_id}")
|
|
|
|
if not self.check_server_id(server_id, "del_file"):
|
|
return
|
|
server_id = bleach.clean(server_id)
|
|
|
|
server_info = self.controller.servers.get_server_data_by_id(server_id)
|
|
if not (
|
|
self.helper.is_subdir(
|
|
file_path, Helpers.get_os_understandable_path(server_info["path"])
|
|
)
|
|
or self.helper.is_subdir(
|
|
file_path,
|
|
Helpers.get_os_understandable_path(server_info["backup_path"]),
|
|
)
|
|
) or not Helpers.check_file_exists(os.path.abspath(file_path)):
|
|
logger.warning(f"Invalid path in del_file file ajax call ({file_path})")
|
|
Console.warning(
|
|
f"Invalid path in del_file file ajax call ({file_path})"
|
|
)
|
|
return
|
|
|
|
# Delete the file
|
|
FileHelpers.del_file(file_path)
|
|
|
|
elif page == "del_dir":
|
|
if not permissions["Files"] in user_perms:
|
|
if not superuser:
|
|
self.redirect("/panel/error?error=Unauthorized access to Files")
|
|
return
|
|
dir_path = Helpers.get_os_understandable_path(
|
|
self.get_body_argument("dir_path", default=None, strip=True)
|
|
)
|
|
|
|
Console.warning(f"Delete {dir_path} for server {server_id}")
|
|
|
|
if not self.check_server_id(server_id, "del_dir"):
|
|
return
|
|
server_id = bleach.clean(server_id)
|
|
|
|
server_info = self.controller.servers.get_server_data_by_id(server_id)
|
|
if not self.helper.is_subdir(
|
|
dir_path, Helpers.get_os_understandable_path(server_info["path"])
|
|
) or not Helpers.check_path_exists(os.path.abspath(dir_path)):
|
|
logger.warning(f"Invalid path in del_file file ajax call ({dir_path})")
|
|
Console.warning(f"Invalid path in del_file file ajax call ({dir_path})")
|
|
return
|
|
|
|
# Delete the directory
|
|
# os.rmdir(dir_path) # Would only remove empty directories
|
|
if Helpers.validate_traversal(
|
|
Helpers.get_os_understandable_path(server_info["path"]), dir_path
|
|
):
|
|
# Removes also when there are contents
|
|
FileHelpers.del_dirs(dir_path)
|
|
|
|
@tornado.web.authenticated
|
|
def put(self, page):
|
|
api_key, _, exec_user = self.current_user
|
|
superuser = exec_user["superuser"]
|
|
if api_key is not None:
|
|
superuser = superuser and api_key.superuser
|
|
|
|
server_id = self.get_argument("id", None)
|
|
permissions = {
|
|
"Commands": EnumPermissionsServer.COMMANDS,
|
|
"Terminal": EnumPermissionsServer.TERMINAL,
|
|
"Logs": EnumPermissionsServer.LOGS,
|
|
"Schedule": EnumPermissionsServer.SCHEDULE,
|
|
"Backup": EnumPermissionsServer.BACKUP,
|
|
"Files": EnumPermissionsServer.FILES,
|
|
"Config": EnumPermissionsServer.CONFIG,
|
|
"Players": EnumPermissionsServer.PLAYERS,
|
|
}
|
|
user_perms = self.controller.server_perms.get_user_id_permissions_list(
|
|
exec_user["user_id"], server_id
|
|
)
|
|
if page == "save_file":
|
|
if not permissions["Files"] in user_perms:
|
|
if not superuser:
|
|
self.redirect("/panel/error?error=Unauthorized access to Files")
|
|
return
|
|
file_contents = self.get_body_argument(
|
|
"file_contents", default=None, strip=True
|
|
)
|
|
file_path = Helpers.get_os_understandable_path(
|
|
self.get_body_argument("file_path", default=None, strip=True)
|
|
)
|
|
|
|
if not self.check_server_id(server_id, "save_file"):
|
|
return
|
|
server_id = bleach.clean(server_id)
|
|
|
|
if not self.helper.is_subdir(
|
|
file_path,
|
|
Helpers.get_os_understandable_path(
|
|
self.controller.servers.get_server_data_by_id(server_id)["path"]
|
|
),
|
|
) or not Helpers.check_file_exists(os.path.abspath(file_path)):
|
|
logger.warning(
|
|
f"Invalid path in save_file file ajax call ({file_path})"
|
|
)
|
|
Console.warning(
|
|
f"Invalid path in save_file file ajax call ({file_path})"
|
|
)
|
|
return
|
|
|
|
# Open the file in write mode and store the content in file_object
|
|
with open(file_path, "w", encoding="utf-8") as file_object:
|
|
file_object.write(file_contents)
|
|
|
|
@tornado.web.authenticated
|
|
def patch(self, page):
|
|
api_key, _, exec_user = self.current_user
|
|
superuser = exec_user["superuser"]
|
|
if api_key is not None:
|
|
superuser = superuser and api_key.superuser
|
|
|
|
server_id = self.get_argument("id", None)
|
|
permissions = {
|
|
"Commands": EnumPermissionsServer.COMMANDS,
|
|
"Terminal": EnumPermissionsServer.TERMINAL,
|
|
"Logs": EnumPermissionsServer.LOGS,
|
|
"Schedule": EnumPermissionsServer.SCHEDULE,
|
|
"Backup": EnumPermissionsServer.BACKUP,
|
|
"Files": EnumPermissionsServer.FILES,
|
|
"Config": EnumPermissionsServer.CONFIG,
|
|
"Players": EnumPermissionsServer.PLAYERS,
|
|
}
|
|
user_perms = self.controller.server_perms.get_user_id_permissions_list(
|
|
exec_user["user_id"], server_id
|
|
)
|
|
if page == "rename_file":
|
|
if not permissions["Files"] in user_perms:
|
|
if not superuser:
|
|
self.redirect("/panel/error?error=Unauthorized access to Files")
|
|
return
|
|
item_path = Helpers.get_os_understandable_path(
|
|
self.get_body_argument("item_path", default=None, strip=True)
|
|
)
|
|
new_item_name = self.get_body_argument(
|
|
"new_item_name", default=None, strip=True
|
|
)
|
|
|
|
if not self.check_server_id(server_id, "rename_file"):
|
|
return
|
|
server_id = bleach.clean(server_id)
|
|
|
|
if item_path is None or new_item_name is None:
|
|
logger.warning("Invalid path(s) in rename_file file ajax call")
|
|
Console.warning("Invalid path(s) in rename_file file ajax call")
|
|
return
|
|
|
|
if not self.helper.is_subdir(
|
|
item_path,
|
|
Helpers.get_os_understandable_path(
|
|
self.controller.servers.get_server_data_by_id(server_id)["path"]
|
|
),
|
|
) or not Helpers.check_path_exists(os.path.abspath(item_path)):
|
|
logger.warning(
|
|
f"Invalid old name path in rename_file file ajax call ({server_id})"
|
|
)
|
|
Console.warning(
|
|
f"Invalid old name path in rename_file file ajax call ({server_id})"
|
|
)
|
|
return
|
|
|
|
new_item_path = os.path.join(os.path.split(item_path)[0], new_item_name)
|
|
|
|
if not self.helper.is_subdir(
|
|
new_item_path,
|
|
Helpers.get_os_understandable_path(
|
|
self.controller.servers.get_server_data_by_id(server_id)["path"]
|
|
),
|
|
) or Helpers.check_path_exists(os.path.abspath(new_item_path)):
|
|
logger.warning(
|
|
f"Invalid new name path in rename_file file ajax call ({server_id})"
|
|
)
|
|
Console.warning(
|
|
f"Invalid new name path in rename_file file ajax call ({server_id})"
|
|
)
|
|
return
|
|
|
|
# RENAME
|
|
os.rename(item_path, new_item_path)
|
|
|
|
def check_server_id(self, server_id, page_name):
|
|
if server_id is None:
|
|
logger.warning(
|
|
f"Server ID not defined in {page_name} file ajax call ({server_id})"
|
|
)
|
|
Console.warning(
|
|
f"Server ID not defined in {page_name} file ajax call ({server_id})"
|
|
)
|
|
return
|
|
server_id = bleach.clean(server_id)
|
|
|
|
# does this server id exist?
|
|
if not self.controller.servers.server_id_exists(server_id):
|
|
logger.warning(
|
|
f"Server ID not found in {page_name} file ajax call ({server_id})"
|
|
)
|
|
Console.warning(
|
|
f"Server ID not found in {page_name} file ajax call ({server_id})"
|
|
)
|
|
return
|
|
return True
|