# crafty-4/app/classes/helpers/file_helpers.py

import os
import shutil
import logging
import pathlib
import tempfile
import zipfile
import zlib
import hashlib
from typing import BinaryIO
import mimetypes
from zipfile import ZipFile, ZIP_DEFLATED, ZIP_STORED
import urllib.request
import urllib.error
import ssl
import time
import certifi

from app.classes.helpers.helpers import Helpers
from app.classes.shared.console import Console
from app.classes.shared.websocket_manager import WebSocketManager

logger = logging.getLogger(__name__)


class FileHelpers:
    allowed_quotes = ['"', "'", "`"]

    def __init__(self, helper):
        self.helper: Helpers = helper
        self.mime_types = mimetypes.MimeTypes()

    @staticmethod
    def ssl_get_file(
        url, out_path, out_file, max_retries=3, backoff_factor=2, headers=None
    ):
        """
        Downloads a file from a given URL over HTTPS with SSL certificate
        verification, retrying with exponential backoff and logging download
        progress.

        Parameters:
        - url (str): The URL of the file to download. Must start with "https".
        - out_path (str): The local path where the file will be saved.
        - out_file (str): The name of the file to save the downloaded content as.
        - max_retries (int, optional): The maximum number of retry attempts
          in case of download failure. Defaults to 3.
        - backoff_factor (int, optional): The factor by which the wait time
          increases after each failed attempt. Defaults to 2.
        - headers (dict, optional): A dictionary of HTTP headers to send with
          the request.

        Returns:
        - bool: True if the download was successful, False otherwise. URL,
          SSL, and unexpected errors are caught and logged rather than raised.

        Note:
        This method logs critical errors and download progress information.
        Ensure that the logger is properly configured to capture this information.
        """
        if not url.lower().startswith("https"):
            logger.error("SSL File Get - Error: URL must start with https.")
            return False

        ssl_context = ssl.create_default_context(cafile=certifi.where())
        if not headers:
            headers = {
                "User-Agent": (
                    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
                    "AppleWebKit/537.36 (KHTML, like Gecko) "
                    "Chrome/58.0.3029.110 Safari/537.3"
                )
            }
        req = urllib.request.Request(url, headers=headers)
        write_path = os.path.join(out_path, out_file)
        attempt = 0

        logger.info(f"SSL File Get - Requesting remote: {url}")
        logger.info(f"SSL File Get - Download Destination: {write_path}")

        while attempt < max_retries:
            try:
                with urllib.request.urlopen(req, context=ssl_context) as response:
                    total_size = response.getheader("Content-Length")
                    if total_size:
                        total_size = int(total_size)
                    downloaded = 0
                    with open(write_path, "wb") as file:
                        while True:
                            chunk = response.read(1024 * 1024)  # 1 MB
                            if not chunk:
                                break
                            file.write(chunk)
                            downloaded += len(chunk)
                            if total_size:
                                progress = (downloaded / total_size) * 100
                                logger.info(
                                    f"SSL File Get - Download progress: {progress:.2f}%"
                                )
                return True
            except (urllib.error.URLError, ssl.SSLError) as e:
                logger.warning(f"SSL File Get - Attempt {attempt + 1} failed: {e}")
                time.sleep(backoff_factor**attempt)
            except Exception as e:
                logger.critical(f"SSL File Get - Unexpected error: {e}")
                return False
            finally:
                attempt += 1

        logger.error("SSL File Get - Maximum retries reached. Download failed.")
        return False

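    # Example usage (an illustrative sketch; the URL and paths below are
    # hypothetical):
    #
    #     ok = FileHelpers.ssl_get_file(
    #         "https://example.com/files/server.jar",
    #         "/var/opt/downloads",
    #         "server.jar",
    #         max_retries=5,
    #     )
    #     if not ok:
    #         logger.error("Download failed after all retries")
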
    @staticmethod
    def del_dirs(path):
        path = pathlib.Path(path)
        for sub in path.iterdir():
            if sub.is_dir():
                # Recurse into the subdirectory and delete it
                FileHelpers.del_dirs(sub)
            else:
                # Delete the entry if it is a file
                try:
                    sub.unlink()
                except OSError:
                    logger.error(f"Unable to delete file {sub}")
        try:
            # This removes the top-level folder:
            path.rmdir()
        except Exception as e:
            logger.error(f"Unable to remove top-level directory {path}")
            return e
        return True

    @staticmethod
    def del_file(path):
        path = pathlib.Path(path)
        try:
            logger.debug(f"Deleting file: {path}")
            # Remove the file
            os.remove(path)
            return True
        except (FileNotFoundError, PermissionError) as e:
            logger.error(
                f"Unable to delete {path}."
                f" It may not exist or permissions may be insufficient: {e}"
            )
            return e

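    # Example usage (illustrative; the path is hypothetical). Both deletion
    # helpers return True on success and the caught exception object on
    # failure, so check the result with `is True` rather than plain truthiness:
    #
    #     result = FileHelpers.del_dirs("/var/opt/old_server")
    #     if result is not True:
    #         logger.warning(f"Cleanup incomplete: {result}")
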
    def check_mime_types(self, file_path):
        m_type, _value = self.mime_types.guess_type(file_path)
        return m_type

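    # Example usage (illustrative): guess a MIME type from a file name.
    #
    #     file_helpers.check_mime_types("icon.png")  # -> "image/png"
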
    @staticmethod
    def calculate_file_hash_blake2b(value_to_hash: pathlib.Path) -> bytes:
        """
        Calculates a blake2b hash of the file at the specified Path.

        :param value_to_hash: Path to file to hash.
        :return: blake2b hash bytes.
        """
        blake2 = hashlib.blake2b()
        # Read the file and hash it. Catches file-not-found and permission errors.
        try:
            with value_to_hash.open("rb") as f:
                while True:
                    # Read 2^16 byte chunks, 64 KiB.
                    data = f.read(65536)
                    # Break at end of file
                    if not data:
                        break
                    blake2.update(data)
        except FileNotFoundError as why:
            raise FileNotFoundError(f"File at {value_to_hash} not found.") from why
        except PermissionError as why:
            raise PermissionError(
                f"Insufficient permissions to read {value_to_hash}."
            ) from why
        return blake2.digest()

    @staticmethod
    def calculate_file_hash_sha256(file_path: str) -> str:
        """
        Takes one parameter of file path. Generates a SHA256 hash of the
        file's contents and returns it as a hex string.
        """
        sha256_hash = hashlib.sha256()
        with open(file_path, "rb") as f:
            # Read 4 KiB blocks until EOF (the iter() sentinel is b"")
            for byte_block in iter(lambda: f.read(4096), b""):
                sha256_hash.update(byte_block)
        return sha256_hash.hexdigest()

    @staticmethod
    def calculate_buffer_hash(buffer: BinaryIO) -> str:
        """
        Takes one argument of a stream buffer. Returns a
        sha256 hash of the buffer's contents.
        """
        sha256_hash = hashlib.sha256()
        # hashlib's update() takes bytes, not a file-like object,
        # so read the buffer first.
        sha256_hash.update(buffer.read())
        return sha256_hash.hexdigest()

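    # Example usage of the hash helpers (illustrative; the path is
    # hypothetical). The blake2b helper returns raw bytes, while the sha256
    # helpers return hex strings:
    #
    #     digest = FileHelpers.calculate_file_hash_blake2b(pathlib.Path("world.zip"))
    #     hex_digest = FileHelpers.calculate_file_hash_sha256("world.zip")
    #     with open("world.zip", "rb") as f:
    #         buf_digest = FileHelpers.calculate_buffer_hash(f)
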
    @staticmethod
    def copy_dir(src_path, dest_path, dirs_exist_ok=False):
        # pylint: disable=unexpected-keyword-arg
        shutil.copytree(src_path, dest_path, dirs_exist_ok=dirs_exist_ok)

    @staticmethod
    def copy_file(src_path, dest_path):
        shutil.copy(src_path, dest_path)

    @staticmethod
    def move_dir(src_path, dest_path):
        shutil.move(src_path, dest_path)

    @staticmethod
    def move_dir_exist(src_path, dest_path):
        # Merge into a possibly existing destination: copy the tree over it,
        # then delete the source.
        FileHelpers.copy_dir(src_path, dest_path, True)
        FileHelpers.del_dirs(src_path)

    @staticmethod
    def move_file(src_path, dest_path):
        shutil.move(src_path, dest_path)

    @staticmethod
    def make_archive(path_to_destination, path_to_zip, comment=""):
        # create a ZipFile object
        path_to_destination += ".zip"
        with ZipFile(path_to_destination, "w") as zip_file:
            zip_file.comment = bytes(
                comment, "utf-8"
            )  # comments over 65535 bytes will be truncated
            for root, _dirs, files in os.walk(path_to_zip, topdown=True):
                ziproot = path_to_zip
                for file in files:
                    try:
                        logger.info(f"backing up: {os.path.join(root, file)}")
                        if os.name == "nt":
                            zip_file.write(
                                os.path.join(root, file),
                                os.path.join(root.replace(ziproot, ""), file),
                            )
                        else:
                            zip_file.write(
                                os.path.join(root, file),
                                os.path.join(root.replace(ziproot, "/"), file),
                            )
                    except Exception as e:
                        logger.warning(
                            f"Error backing up: {os.path.join(root, file)}!"
                            f" - Error was: {e}"
                        )
        return True

    @staticmethod
    def make_compressed_archive(path_to_destination, path_to_zip, comment=""):
        # create a ZipFile object
        path_to_destination += ".zip"
        with ZipFile(path_to_destination, "w", ZIP_DEFLATED) as zip_file:
            zip_file.comment = bytes(
                comment, "utf-8"
            )  # comments over 65535 bytes will be truncated
            for root, _dirs, files in os.walk(path_to_zip, topdown=True):
                ziproot = path_to_zip
                for file in files:
                    try:
                        logger.info(f"packaging: {os.path.join(root, file)}")
                        if os.name == "nt":
                            zip_file.write(
                                os.path.join(root, file),
                                os.path.join(root.replace(ziproot, ""), file),
                            )
                        else:
                            zip_file.write(
                                os.path.join(root, file),
                                os.path.join(root.replace(ziproot, "/"), file),
                            )
                    except Exception as e:
                        logger.warning(
                            f"Error packaging: {os.path.join(root, file)}!"
                            f" - Error was: {e}"
                        )
        return True

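    # Example usage (illustrative; paths are hypothetical). make_archive stores
    # files without compression, while make_compressed_archive uses
    # ZIP_DEFLATED; both append ".zip" to the destination path:
    #
    #     FileHelpers.make_compressed_archive(
    #         "/var/opt/backups/world", "/var/opt/servers/world", comment="nightly"
    #     )
    #     # -> creates /var/opt/backups/world.zip
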
    def make_backup(
        self,
        path_to_destination,
        path_to_zip,
        excluded_dirs,
        server_id,
        backup_id,
        comment="",
        compressed=None,
    ):
        # create a ZipFile object
        path_to_destination += ".zip"
        ex_replace = [p.replace("\\", "/") for p in excluded_dirs]
        total_bytes = 0
        dir_bytes = Helpers.get_dir_size(path_to_zip)
        results = {
            "percent": 0,
            "total_files": self.helper.human_readable_file_size(dir_bytes),
        }
        WebSocketManager().broadcast_page_params(
            "/panel/server_detail",
            {"id": str(server_id)},
            "backup_status",
            results,
        )
        WebSocketManager().broadcast_page_params(
            "/panel/edit_backup",
            {"id": str(server_id)},
            "backup_status",
            results,
        )
        # Set the compression mode based on the `compressed` parameter
        compression_mode = ZIP_DEFLATED if compressed else ZIP_STORED
        with ZipFile(path_to_destination, "w", compression_mode) as zip_file:
            zip_file.comment = bytes(
                comment, "utf-8"
            )  # comments over 65535 bytes will be truncated
            for root, dirs, files in os.walk(path_to_zip, topdown=True):
                for l_dir in dirs[:]:
                    # normalize paths to unix-style slashes so they
                    # match the exclusions list.
                    if str(os.path.join(root, l_dir)).replace("\\", "/") in ex_replace:
                        dirs.remove(l_dir)
                ziproot = path_to_zip
                # iterate through the list of files
                for file in files:
                    # check if the file/dir is in the exclusions list.
                    # Only proceed if not excluded.
                    if (
                        str(os.path.join(root, file)).replace("\\", "/")
                        not in ex_replace
                        and file != "crafty.sqlite"
                    ):
                        try:
                            logger.debug(f"backing up: {os.path.join(root, file)}")
                            # add a trailing slash to the zip root dir
                            # if not on Windows.
                            if os.name == "nt":
                                zip_file.write(
                                    os.path.join(root, file),
                                    os.path.join(root.replace(ziproot, ""), file),
                                )
                            else:
                                zip_file.write(
                                    os.path.join(root, file),
                                    os.path.join(root.replace(ziproot, "/"), file),
                                )
                        except Exception as e:
                            logger.warning(
                                f"Error backing up: {os.path.join(root, file)}!"
                                f" - Error was: {e}"
                            )
                    # debug logging for the exclusions list
                    else:
                        logger.debug(f"Found {file} in exclusion list. Skipping...")
                    # add the current file's bytes to the total bytes.
                    total_bytes += os.path.getsize(os.path.join(root, file))
                    # calculate the percentage based on the total size
                    # and the bytes archived so far
                    percent = round((total_bytes / dir_bytes) * 100, 2)
                    # package results
                    results = {
                        "percent": percent,
                        "total_files": self.helper.human_readable_file_size(dir_bytes),
                        "backup_id": backup_id,
                    }
                    # send status results to the page.
                    WebSocketManager().broadcast_page_params(
                        "/panel/server_detail",
                        {"id": str(server_id)},
                        "backup_status",
                        results,
                    )
                    WebSocketManager().broadcast_page_params(
                        "/panel/edit_backup",
                        {"id": str(server_id)},
                        "backup_status",
                        results,
                    )
        return True

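    # Example usage (an illustrative caller-side sketch; the helper instance,
    # IDs, and paths are hypothetical):
    #
    #     file_helpers.make_backup(
    #         "/var/opt/backups/world-2024-08-16",  # ".zip" is appended
    #         "/var/opt/servers/world",
    #         excluded_dirs=["/var/opt/servers/world/cache"],
    #         server_id=1,
    #         backup_id=42,
    #         compressed=True,
    #     )
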
    @staticmethod
    def move_item_file_or_dir(old_dir, new_dir, item) -> None:
        """
        Move an item to a new location, whether it is a file or a directory.
        Raises RuntimeError for any shutil.Error encountered.

        Args:
            old_dir: Old location.
            new_dir: New location.
            item: File or directory name.

        Returns: None
        """
        try:
            # Check if the source item is a directory or a file.
            if os.path.isdir(os.path.join(old_dir, item)):
                # Source item is a directory
                FileHelpers.move_dir_exist(
                    os.path.join(old_dir, item),
                    os.path.join(new_dir, item),
                )
            else:
                # Source item is a file.
                FileHelpers.move_file(
                    os.path.join(old_dir, item),
                    os.path.join(new_dir, item),
                )
        # shutil raises shutil.Error on failure; re-raise it as a
        # RuntimeError with added context.
        except shutil.Error as why:
            raise RuntimeError(
                f"Error moving {old_dir} to {new_dir} with information: {why}"
            ) from why

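    # Example usage (illustrative; paths are hypothetical). Moves one top-level
    # entry, merging directories into an existing destination:
    #
    #     FileHelpers.move_item_file_or_dir(
    #         "/tmp/extracted", "/var/opt/servers/world", "plugins"
    #     )
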
    def unzip_file(self, zip_path, server_update: bool = False) -> None:
        """
        Unzips the zip file at zip_path to a location generated at new_dir
        based on the zip contents.

        Args:
            zip_path: Path to zip file to unzip.
            server_update: When True, items in the ignored-names list are
                skipped. Used for updating Bedrock servers.

        Returns: None
        """
        ignored_names: list = [
            "server.properties",
            "permissions.json",
            "allowlist.json",
        ]
        # Get the directory without the zipfile name
        new_dir = pathlib.Path(zip_path).parents[0]
        # make sure the directory we're unzipping this to exists
        Helpers.ensure_dir_exists(new_dir)
        # we'll make a temporary directory to unzip this to.
        temp_dir = tempfile.mkdtemp()
        try:
            with zipfile.ZipFile(zip_path, "r") as zip_ref:
                # we'll extract this to the temp dir using the zipfile module
                zip_ref.extractall(temp_dir)
        # Catch zipfile extractall errors or file open errors.
        except (
            zipfile.BadZipFile,
            ValueError,
            FileNotFoundError,
            PermissionError,
        ) as why:
            Console.error(f"Unzip failed with information: {why}")
            raise RuntimeError(f"Unzip failed for path: {zip_path}") from why
        # we'll iterate through the top-level directory, moving everything
        # out of the temp directory and into its final home.
        for item in os.listdir(temp_dir):
            # if the file is one of our ignored names we'll skip it
            if item in ignored_names and server_update:
                continue
            # we handle files and dirs differently or we'll crash out.
            try:
                self.move_item_file_or_dir(temp_dir, new_dir, item)
            # move_item_file_or_dir re-raises shutil.Error as RuntimeError
            except RuntimeError as ex:
                logger.error(f"ERROR IN ZIP IMPORT: {ex}")

    @staticmethod
    def unzip_server(zip_path, user_id):
        if Helpers.check_file_perms(zip_path):
            temp_dir = tempfile.mkdtemp()
            with zipfile.ZipFile(zip_path, "r") as zip_ref:
                # extracts the archive to the temp directory
                zip_ref.extractall(temp_dir)
            if user_id:
                return temp_dir

    @staticmethod
    def zlib_compress_bytes(bytes_to_compress: bytes) -> bytes:
        """Compress the provided bytes using zlib's default compression
        settings. Returns the compressed bytes.

        Args:
            bytes_to_compress (bytes): Bytes to compress.

        Returns:
            bytes: Compressed bytes.
        """
        return zlib.compress(bytes_to_compress)

    @staticmethod
    def zlib_decompress_bytes(bytes_to_decompress: bytes) -> bytes:
        """Decompress the provided bytes using zlib's default settings.
        Returns the decompressed bytes.

        Args:
            bytes_to_decompress (bytes): Compressed bytes to decompress.

        Returns:
            bytes: Decompressed bytes.
        """
        return zlib.decompress(bytes_to_decompress)
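
    # Example round trip (illustrative):
    #
    #     packed = FileHelpers.zlib_compress_bytes(b"hello world")
    #     assert FileHelpers.zlib_decompress_bytes(packed) == b"hello world"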