mirror of
https://gitlab.com/crafty-controller/crafty-4.git
synced 2024-08-30 18:23:09 +00:00
339 lines
14 KiB
Python
339 lines
14 KiB
Python
import os
|
|
import shutil
|
|
import logging
|
|
import pathlib
|
|
import tempfile
|
|
import zipfile
|
|
from zipfile import ZipFile, ZIP_DEFLATED
|
|
|
|
from app.classes.shared.helpers import Helpers
|
|
from app.classes.shared.console import Console
|
|
from app.classes.shared.websocket_manager import WebSocketManager
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class FileHelpers:
|
|
allowed_quotes = ['"', "'", "`"]
|
|
|
|
def __init__(self, helper):
|
|
self.helper: Helpers = helper
|
|
|
|
@staticmethod
|
|
def del_dirs(path):
|
|
path = pathlib.Path(path)
|
|
for sub in path.iterdir():
|
|
if sub.is_dir():
|
|
# Delete folder if it is a folder
|
|
FileHelpers.del_dirs(sub)
|
|
else:
|
|
# Delete file if it is a file:
|
|
try:
|
|
sub.unlink()
|
|
except:
|
|
logger.error(f"Unable to delete file {sub}")
|
|
try:
|
|
# This removes the top-level folder:
|
|
path.rmdir()
|
|
except Exception as e:
|
|
logger.error("Unable to remove top level")
|
|
return e
|
|
return True
|
|
|
|
@staticmethod
|
|
def del_file(path):
|
|
path = pathlib.Path(path)
|
|
try:
|
|
logger.debug(f"Deleting file: {path}")
|
|
# Remove the file
|
|
os.remove(path)
|
|
return True
|
|
except (FileNotFoundError, PermissionError) as e:
|
|
logger.error(f"Path specified is not a file or does not exist. {path}")
|
|
return e
|
|
|
|
@staticmethod
|
|
def copy_dir(src_path, dest_path, dirs_exist_ok=False):
|
|
# pylint: disable=unexpected-keyword-arg
|
|
shutil.copytree(src_path, dest_path, dirs_exist_ok=dirs_exist_ok)
|
|
|
|
@staticmethod
|
|
def copy_file(src_path, dest_path):
|
|
shutil.copy(src_path, dest_path)
|
|
|
|
@staticmethod
|
|
def move_dir(src_path, dest_path):
|
|
FileHelpers.copy_dir(src_path, dest_path)
|
|
FileHelpers.del_dirs(src_path)
|
|
|
|
@staticmethod
|
|
def move_dir_exist(src_path, dest_path):
|
|
FileHelpers.copy_dir(src_path, dest_path, True)
|
|
FileHelpers.del_dirs(src_path)
|
|
|
|
@staticmethod
|
|
def move_file(src_path, dest_path):
|
|
FileHelpers.copy_file(src_path, dest_path)
|
|
FileHelpers.del_file(src_path)
|
|
|
|
@staticmethod
|
|
def make_archive(path_to_destination, path_to_zip, comment=""):
|
|
# create a ZipFile object
|
|
path_to_destination += ".zip"
|
|
with ZipFile(path_to_destination, "w") as zip_file:
|
|
zip_file.comment = bytes(
|
|
comment, "utf-8"
|
|
) # comments over 65535 bytes will be truncated
|
|
for root, _dirs, files in os.walk(path_to_zip, topdown=True):
|
|
ziproot = path_to_zip
|
|
for file in files:
|
|
try:
|
|
logger.info(f"backing up: {os.path.join(root, file)}")
|
|
if os.name == "nt":
|
|
zip_file.write(
|
|
os.path.join(root, file),
|
|
os.path.join(root.replace(ziproot, ""), file),
|
|
)
|
|
else:
|
|
zip_file.write(
|
|
os.path.join(root, file),
|
|
os.path.join(root.replace(ziproot, "/"), file),
|
|
)
|
|
|
|
except Exception as e:
|
|
logger.warning(
|
|
f"Error backing up: {os.path.join(root, file)}!"
|
|
f" - Error was: {e}"
|
|
)
|
|
return True
|
|
|
|
@staticmethod
|
|
def make_compressed_archive(path_to_destination, path_to_zip, comment=""):
|
|
# create a ZipFile object
|
|
path_to_destination += ".zip"
|
|
with ZipFile(path_to_destination, "w", ZIP_DEFLATED) as zip_file:
|
|
zip_file.comment = bytes(
|
|
comment, "utf-8"
|
|
) # comments over 65535 bytes will be truncated
|
|
for root, _dirs, files in os.walk(path_to_zip, topdown=True):
|
|
ziproot = path_to_zip
|
|
for file in files:
|
|
try:
|
|
logger.info(f"packaging: {os.path.join(root, file)}")
|
|
if os.name == "nt":
|
|
zip_file.write(
|
|
os.path.join(root, file),
|
|
os.path.join(root.replace(ziproot, ""), file),
|
|
)
|
|
else:
|
|
zip_file.write(
|
|
os.path.join(root, file),
|
|
os.path.join(root.replace(ziproot, "/"), file),
|
|
)
|
|
|
|
except Exception as e:
|
|
logger.warning(
|
|
f"Error packaging: {os.path.join(root, file)}!"
|
|
f" - Error was: {e}"
|
|
)
|
|
|
|
return True
|
|
|
|
def make_compressed_backup(
    self, path_to_destination, path_to_zip, excluded_dirs, server_id, comment=""
):
    """Write a ``ZIP_DEFLATED`` backup of ``path_to_zip`` with progress updates.

    Excluded directories are pruned from the walk, excluded files and any
    file named ``crafty.sqlite`` are skipped, and after every file a
    ``backup_status`` message is broadcast to the ``/panel/server_detail``
    page for ``server_id``. Files that fail to be added are logged and
    skipped.

    Args:
        path_to_destination: Archive path without extension; ``".zip"`` is
            appended.
        path_to_zip: Directory to back up.
        excluded_dirs: Paths to leave out; compared after converting
            backslashes to forward slashes.
        server_id: Identifier sent (as a string) with each status broadcast.
        comment: Text stored (UTF-8 encoded) as the archive comment.

    Returns:
        ``True`` once the walk has finished.
    """
    # create a ZipFile object
    path_to_destination += ".zip"
    # Set of normalised exclusions for constant-time membership tests.
    ex_replace = {p.replace("\\", "/") for p in excluded_dirs}
    total_bytes = 0
    # presumably the total size in bytes of path_to_zip — verify in Helpers
    dir_bytes = Helpers.get_dir_size(path_to_zip)
    results = {
        "percent": 0,
        "total_files": self.helper.human_readable_file_size(dir_bytes),
    }
    WebSocketManager().broadcast_page_params(
        "/panel/server_detail",
        {"id": str(server_id)},
        "backup_status",
        results,
    )
    with ZipFile(path_to_destination, "w", ZIP_DEFLATED) as zip_file:
        zip_file.comment = bytes(
            comment, "utf-8"
        )  # comments over 65535 bytes will be truncated
        for root, dirs, files in os.walk(path_to_zip, topdown=True):
            # Prune excluded directories in place so os.walk does not descend
            # into them. Slice assignment avoids removing items from the list
            # while iterating it, which skipped the entry after each removal.
            dirs[:] = [
                l_dir
                for l_dir in dirs
                if os.path.join(root, l_dir).replace("\\", "/") not in ex_replace
            ]
            for file in files:
                full_path = os.path.join(root, file)
                if (
                    full_path.replace("\\", "/") not in ex_replace
                    and file != "crafty.sqlite"
                ):
                    try:
                        logger.info(f"backing up: {full_path}")
                        # relpath strips only the leading root; str.replace()
                        # rewrote every occurrence of the root string.
                        zip_file.write(
                            full_path, os.path.relpath(full_path, path_to_zip)
                        )
                    except Exception as e:
                        logger.warning(
                            f"Error backing up: {full_path}!"
                            f" - Error was: {e}"
                        )
                total_bytes += os.path.getsize(full_path)
                # A directory holding only empty files reports size 0; treat
                # that as complete instead of dividing by zero.
                if dir_bytes:
                    percent = round((total_bytes / dir_bytes) * 100, 2)
                else:
                    percent = 100
                results = {
                    "percent": percent,
                    "total_files": self.helper.human_readable_file_size(dir_bytes),
                }
                WebSocketManager().broadcast_page_params(
                    "/panel/server_detail",
                    {"id": str(server_id)},
                    "backup_status",
                    results,
                )

    return True
|
|
|
|
def make_backup(
    self, path_to_destination, path_to_zip, excluded_dirs, server_id, comment=""
):
    """Write a backup zip of ``path_to_zip`` (default mode) with progress updates.

    Excluded directories are pruned from the walk, excluded files and any
    file named ``crafty.sqlite`` are skipped, and after every file a
    ``backup_status`` message is broadcast to the ``/panel/server_detail``
    page for ``server_id``. Files that fail to be added are logged and
    skipped.

    Args:
        path_to_destination: Archive path without extension; ``".zip"`` is
            appended.
        path_to_zip: Directory to back up.
        excluded_dirs: Paths to leave out; compared after converting
            backslashes to forward slashes.
        server_id: Identifier sent (as a string) with each status broadcast.
        comment: Text stored (UTF-8 encoded) as the archive comment.

    Returns:
        ``True`` once the walk has finished.
    """
    # create a ZipFile object
    path_to_destination += ".zip"
    # make all paths in exclusions use a unix style slash so they can be
    # matched against the normalised walk paths; a set gives fast lookups.
    ex_replace = {p.replace("\\", "/") for p in excluded_dirs}
    total_bytes = 0
    # presumably the total size in bytes of path_to_zip — verify in Helpers
    dir_bytes = Helpers.get_dir_size(path_to_zip)
    results = {
        "percent": 0,
        "total_files": self.helper.human_readable_file_size(dir_bytes),
    }
    WebSocketManager().broadcast_page_params(
        "/panel/server_detail",
        {"id": str(server_id)},
        "backup_status",
        results,
    )
    with ZipFile(path_to_destination, "w") as zip_file:
        zip_file.comment = bytes(
            comment, "utf-8"
        )  # comments over 65535 bytes will be truncated
        for root, dirs, files in os.walk(path_to_zip, topdown=True):
            # Prune excluded directories in place so os.walk does not
            # descend into them.
            dirs[:] = [
                l_dir
                for l_dir in dirs
                if os.path.join(root, l_dir).replace("\\", "/") not in ex_replace
            ]
            # iterate through list of files
            for file in files:
                full_path = os.path.join(root, file)
                # check if file is in the exclusions list.
                # Only proceed if not excluded.
                if (
                    full_path.replace("\\", "/") not in ex_replace
                    and file != "crafty.sqlite"
                ):
                    try:
                        logger.debug(f"backing up: {full_path}")
                        # relpath strips only the leading root; str.replace()
                        # rewrote every occurrence of the root string.
                        zip_file.write(
                            full_path, os.path.relpath(full_path, path_to_zip)
                        )
                    except Exception as e:
                        logger.warning(
                            f"Error backing up: {full_path}!"
                            f" - Error was: {e}"
                        )
                # debug logging for exclusions list
                else:
                    logger.debug(f"Found {file} in exclusion list. Skipping...")

                # add current file bytes to total bytes.
                total_bytes += os.path.getsize(full_path)
                # calculate percentage based off total size and current
                # archive size. A directory holding only empty files reports
                # size 0; treat that as complete instead of dividing by zero.
                if dir_bytes:
                    percent = round((total_bytes / dir_bytes) * 100, 2)
                else:
                    percent = 100
                # package results
                results = {
                    "percent": percent,
                    "total_files": self.helper.human_readable_file_size(dir_bytes),
                }
                # send status results to page.
                WebSocketManager().broadcast_page_params(
                    "/panel/server_detail",
                    {"id": str(server_id)},
                    "backup_status",
                    results,
                )
    return True
|
|
|
|
@staticmethod
def unzip_file(zip_path, server_update=False):
    """Extract ``zip_path`` into the directory that contains it.

    The archive is first extracted to a temporary directory. Each top-level
    entry is then moved next to the zip file, merging into directories that
    already exist. The temporary directory is always removed afterwards.

    Args:
        zip_path: Path of the zip archive to extract.
        server_update: When true, top-level entries named
            ``server.properties``, ``permissions.json`` or ``allowlist.json``
            are not moved.

    Returns:
        The string ``"false"`` when the permission check fails or
        ``zip_path`` is not a file; otherwise ``None``. Errors during
        extraction or moving are logged or reported, not raised.
    """
    ignored_names = ["server.properties", "permissions.json", "allowlist.json"]
    # Get directory without zipfile name
    new_dir = pathlib.Path(zip_path).parents[0]
    # make sure we're able to access the zip file
    if not (Helpers.check_file_perms(zip_path) and os.path.isfile(zip_path)):
        return "false"
    # make sure the directory we're unzipping this to exists
    Helpers.ensure_dir_exists(new_dir)
    # we'll make a temporary directory to unzip this to.
    temp_dir = tempfile.mkdtemp()
    try:
        with zipfile.ZipFile(zip_path, "r") as zip_ref:
            # we'll extract this to the temp dir using zipfile module
            zip_ref.extractall(temp_dir)
        # we'll iterate through the top level directory moving everything
        # out of the temp directory and into its final home.
        for item in os.listdir(temp_dir):
            # if the file is one of our ignored names we'll skip it
            if item in ignored_names and server_update:
                continue
            source = os.path.join(temp_dir, item)
            target = os.path.join(new_dir, item)
            # we handle files and dirs differently or we'll crash out.
            try:
                if os.path.isdir(source):
                    FileHelpers.move_dir_exist(source, target)
                else:
                    FileHelpers.move_file(source, target)
            except Exception as ex:
                logger.error(f"ERROR IN ZIP IMPORT: {ex}")
    except Exception as ex:
        Console.error(ex)
    finally:
        # mkdtemp() never cleans up after itself; without this the temp dir
        # (and any skipped or unmoved files in it) would be left behind.
        shutil.rmtree(temp_dir, ignore_errors=True)
    return None
|
|
|
|
def unzip_server(self, zip_path, user_id):
|
|
if Helpers.check_file_perms(zip_path):
|
|
temp_dir = tempfile.mkdtemp()
|
|
with zipfile.ZipFile(zip_path, "r") as zip_ref:
|
|
# extracts archive to temp directory
|
|
zip_ref.extractall(temp_dir)
|
|
if user_id:
|
|
return temp_dir
|