crafty-4/app/classes/shared/helpers.py

1224 lines
43 KiB
Python
Raw Normal View History

import contextlib
2020-08-12 00:36:09 +00:00
import os
import re
2020-08-12 00:36:09 +00:00
import sys
import json
import tempfile
2020-08-23 22:43:28 +00:00
import time
2020-08-12 00:36:09 +00:00
import uuid
import string
import base64
import socket
import secrets
2020-08-12 00:36:09 +00:00
import logging
import html
import zipfile
import pathlib
import ctypes
import shutil
2023-03-03 22:31:29 +00:00
import shlex
2022-06-20 17:05:39 +00:00
import subprocess
import itertools
from datetime import datetime, timezone
2020-08-12 00:36:09 +00:00
from socket import gethostname
from contextlib import redirect_stderr, suppress
2022-09-12 19:34:47 +00:00
import libgravatar
from packaging import version as pkg_version
2020-08-12 00:36:09 +00:00
from app.classes.shared.null_writer import NullWriter
from app.classes.shared.console import Console
2022-03-08 04:40:44 +00:00
from app.classes.shared.installer import installer
from app.classes.shared.translation import Translation
2020-08-12 00:36:09 +00:00
with redirect_stderr(NullWriter()):
import psutil
# winreg is only a package on windows-python. We will only import
# this on windows systems to avoid a module not found error
# this is only needed for windows java path shenanigans
if os.name == "nt":
import winreg
2022-03-08 05:44:26 +00:00
logger = logging.getLogger(__name__)
2020-08-12 00:36:09 +00:00
try:
import requests
2022-03-08 04:40:44 +00:00
from requests import get
2020-08-12 00:36:09 +00:00
from OpenSSL import crypto
from argon2 import PasswordHasher
2020-08-12 00:36:09 +00:00
except ModuleNotFoundError as err:
2022-03-08 05:44:26 +00:00
logger.critical(f"Import Error: Unable to load {err.name} module", exc_info=True)
print(f"Import Error: Unable to load {err.name} module")
installer.do_install()
2020-08-12 00:36:09 +00:00
2020-08-12 00:36:09 +00:00
class Helpers:
allowed_quotes = ['"', "'", "`"]
2020-08-12 00:36:09 +00:00
def __init__(self):
2020-08-12 00:36:09 +00:00
self.root_dir = os.path.abspath(os.path.curdir)
self.config_dir = os.path.join(self.root_dir, "app", "config")
self.webroot = os.path.join(self.root_dir, "app", "frontend")
self.servers_dir = os.path.join(self.root_dir, "servers")
self.backup_path = os.path.join(self.root_dir, "backups")
self.migration_dir = os.path.join(self.root_dir, "app", "migrations")
self.dir_migration = False
self.session_file = os.path.join(self.root_dir, "app", "config", "session.lock")
self.settings_file = os.path.join(self.root_dir, "app", "config", "config.json")
self.ensure_dir_exists(os.path.join(self.root_dir, "app", "config", "db"))
self.db_path = os.path.join(
self.root_dir, "app", "config", "db", "crafty.sqlite"
)
2024-05-04 19:15:43 +00:00
self.big_bucket_cache = os.path.join(self.config_dir, "bigbucket.json")
self.credits_cache = os.path.join(self.config_dir, "credits.json")
self.passhasher = PasswordHasher()
2020-08-12 00:36:09 +00:00
self.exiting = False
self.translation = Translation(self)
self.update_available = False
self.ignored_names = ["crafty_managed.txt", "db_stats"]
2023-11-03 20:41:23 +00:00
self.crafty_starting = False
2024-02-20 02:14:15 +00:00
self.minimum_password_length = 8
2022-03-08 04:40:44 +00:00
@staticmethod
def auto_installer_fix(ex):
logger.critical(f"Import Error: Unable to load {ex.name} module", exc_info=True)
print(f"Import Error: Unable to load {ex.name} module")
installer.do_install()
2022-07-20 22:53:31 +00:00
def check_remote_version(self):
"""
Check if the remote version is newer than the local version
Returning remote version if it is newer, otherwise False.
"""
try:
# Get tags from Gitlab, select the latest and parse the semver
response = get(
2023-02-15 23:40:20 +00:00
"https://gitlab.com/api/v4/projects/20430749/repository/tags", timeout=1
2022-07-20 22:53:31 +00:00
)
if response.status_code == 200:
remote_version = pkg_version.parse(json.loads(response.text)[0]["name"])
2022-07-20 22:53:31 +00:00
# Get local version data from the file and parse the semver
local_version = pkg_version.parse(self.get_version_string())
2022-07-20 22:53:31 +00:00
if remote_version > local_version:
return remote_version
except Exception as e:
logger.error(f"Unable to check for new crafty version! \n{e}")
return False
2022-08-31 18:04:04 +00:00
@staticmethod
def get_latest_bedrock_url():
"""
Get latest bedrock executable url \n\n
returns url if successful, False if not
"""
2024-03-02 21:53:17 +00:00
url = "https://www.minecraft.net/en-us/download/server/bedrock/"
2022-08-31 18:04:04 +00:00
headers = {
"Accept-Encoding": "identity",
"Accept-Language": "en",
"User-Agent": (
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
"AppleWebKit/537.36 (KHTML, like Gecko) "
"Chrome/104.0.0.0 Safari/537.36"
),
}
2024-03-02 22:00:52 +00:00
target_win = 'https://minecraft.azureedge.net/bin-win/[^"]*'
target_linux = 'https://minecraft.azureedge.net/bin-linux/[^"]*'
2022-08-31 18:04:04 +00:00
try:
# Get minecraft server download page
# (hopefully the don't change the structure)
2023-02-15 23:40:20 +00:00
download_page = get(url, headers=headers, timeout=1)
2022-08-31 18:04:04 +00:00
# Search for our string targets
win_download_url = re.search(target_win, download_page.text).group(0)
linux_download_url = re.search(target_linux, download_page.text).group(0)
if os.name == "nt":
return win_download_url
return linux_download_url
except Exception as e:
logger.error(f"Unable to resolve remote bedrock download url! \n{e}")
return False
2023-03-03 22:31:29 +00:00
def get_execution_java(self, value, execution_command):
if self.is_os_windows():
execution_list = shlex.split(execution_command, posix=False)
else:
execution_list = shlex.split(execution_command, posix=True)
if (
not any(value in path for path in self.find_java_installs())
and value != "java"
):
return
if value != "java":
if self.is_os_windows():
execution_list[0] = '"' + value + '/bin/java"'
else:
execution_list[0] = '"' + value + '"'
else:
execution_list[0] = "java"
execution_command = ""
for item in execution_list:
execution_command += item + " "
return execution_command
def detect_java(self):
if len(self.find_java_installs()) > 0:
return True
2023-02-17 00:36:18 +00:00
# We'll use this as a fallback for systems
# That do not properly setup reg keys or
# Update alternatives
if self.is_os_windows():
if shutil.which("java.exe"):
return True
else:
2023-02-17 00:36:18 +00:00
if shutil.which("java"):
return True
return False
2022-06-20 17:05:39 +00:00
@staticmethod
def find_java_installs():
# If we're windows return oracle java versions,
# otherwise java vers need to be manual.
if os.name == "nt":
# Adapted from LeeKamentsky >>>
# https://github.com/LeeKamentsky/python-javabridge/blob/master/javabridge/locate.py
jdk_key_paths = (
"SOFTWARE\\JavaSoft\\JDK",
"SOFTWARE\\JavaSoft\\Java Development Kit",
)
java_paths = []
for jdk_key_path in jdk_key_paths:
try:
with suppress(OSError), winreg.OpenKey(
winreg.HKEY_LOCAL_MACHINE, jdk_key_path
) as kjdk:
for i in itertools.count():
version = winreg.EnumKey(kjdk, i)
kjdk_current = winreg.OpenKey(
winreg.HKEY_LOCAL_MACHINE,
jdk_key_path,
)
kjdk_current = winreg.OpenKey(
winreg.HKEY_LOCAL_MACHINE,
jdk_key_path + "\\" + version,
)
kjdk_current_values = dict( # pylint: disable=consider-using-dict-comprehension
[
winreg.EnumValue(kjdk_current, i)[:2]
for i in range(winreg.QueryInfoKey(kjdk_current)[1])
]
)
java_paths.append(kjdk_current_values["JavaHome"])
except OSError as e:
if e.errno == 2:
continue
raise
return java_paths
# If we get here we're linux so we will use 'update-alternatives'
# (If distro does not have update-alternatives then manual input.)
# Sometimes u-a will be in /sbin on some distros (which is annoying.)
ua_path = "/usr/bin/update-alternatives"
if not os.path.exists(ua_path):
logger.warning("update-alternatives not found! Trying /sbin")
ua_path = "/usr/sbin/update-alternatives"
2022-06-20 17:05:39 +00:00
try:
paths = subprocess.check_output(
[ua_path, "--list", "java"], encoding="utf8"
2022-06-20 17:05:39 +00:00
)
if re.match("^(/[^/ ]*)+/?$", paths):
return paths.split("\n")
except Exception as e:
logger.error(f"Java Detect Error: {e}")
return []
2022-06-20 17:05:39 +00:00
@staticmethod
2022-05-05 11:02:23 +00:00
def float_to_string(gbs: float):
s = str(float(gbs) * 1000).rstrip("0").rstrip(".")
return s
@staticmethod
def check_file_perms(path):
try:
2022-06-17 22:27:55 +00:00
with open(path, "r", encoding="utf-8"):
pass
logger.info(f"{path} is readable")
return True
except PermissionError:
return False
@staticmethod
def is_file_older_than_x_days(file, days=1):
if Helpers.check_file_exists(file):
2020-08-23 22:43:28 +00:00
file_time = os.path.getmtime(file)
# Check against 24 hours
2022-06-14 12:40:57 +00:00
return (time.time() - file_time) / 3600 > 24 * days
logger.error(f"{file} does not exist")
return True
2020-08-23 22:43:28 +00:00
def get_servers_root_dir(self):
return self.servers_dir
2022-06-15 02:33:02 +00:00
@staticmethod
def which_java():
2022-06-15 03:11:40 +00:00
# Adapted from LeeKamentsky >>>
# https://github.com/LeeKamentsky/python-javabridge/blob/master/javabridge/locate.py
2022-06-15 02:33:02 +00:00
jdk_key_paths = (
"SOFTWARE\\JavaSoft\\JDK",
"SOFTWARE\\JavaSoft\\Java Development Kit",
)
for jdk_key_path in jdk_key_paths:
try:
with winreg.OpenKey(winreg.HKEY_LOCAL_MACHINE, jdk_key_path) as kjdk:
2022-06-15 03:52:19 +00:00
kjdk_values = (
dict( # pylint: disable=consider-using-dict-comprehension
[
winreg.EnumValue(kjdk, i)[:2]
for i in range(winreg.QueryInfoKey(kjdk)[1])
]
)
2022-06-15 02:33:02 +00:00
)
current_version = kjdk_values["CurrentVersion"]
kjdk_current = winreg.OpenKey(
winreg.HKEY_LOCAL_MACHINE, jdk_key_path + "\\" + current_version
)
2022-06-15 03:52:19 +00:00
kjdk_current_values = (
dict( # pylint: disable=consider-using-dict-comprehension
[
winreg.EnumValue(kjdk_current, i)[:2]
for i in range(winreg.QueryInfoKey(kjdk_current)[1])
]
)
2022-06-15 02:33:02 +00:00
)
return kjdk_current_values["JavaHome"]
except OSError as e:
2022-06-15 02:33:02 +00:00
if e.errno == 2:
continue
2022-06-15 03:15:46 +00:00
raise
2022-06-15 02:33:02 +00:00
@staticmethod
def check_internet():
try:
requests.get("https://ntp.org", timeout=1)
return True
except Exception:
2023-03-08 23:46:06 +00:00
try:
logger.error("ntp.org ping failed. Falling back to google")
requests.get("https://google.com", timeout=1)
return True
except Exception:
return False
@staticmethod
def check_address_status(address):
try:
2023-05-15 01:49:37 +00:00
response = requests.get(address, timeout=2)
return (
response.status_code // 100 == 2
) # Check if the status code starts with 2
except requests.RequestException:
return False
@staticmethod
def check_port(server_port):
try:
2023-02-15 23:40:20 +00:00
ip = get("https://api.ipify.org", timeout=1).content.decode("utf8")
except:
ip = "google.com"
2021-12-21 22:39:24 +00:00
a_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
a_socket.settimeout(20.0)
location = (ip, server_port)
result_of_check = a_socket.connect_ex(location)
2021-12-21 22:39:24 +00:00
a_socket.close()
2022-06-14 12:40:57 +00:00
return result_of_check == 0
@staticmethod
def check_server_conn(server_port):
a_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
a_socket.settimeout(10.0)
ip = "127.0.0.1"
2021-12-21 22:39:24 +00:00
location = (ip, server_port)
result_of_check = a_socket.connect_ex(location)
a_socket.close()
2021-12-21 22:39:24 +00:00
2022-06-14 12:40:57 +00:00
return result_of_check == 0
def create_pass(self):
# Maximum length of password needed
max_len = 64
# Declare string of the character that we need in our password
digits = string.digits
locase = string.ascii_lowercase
upcase = string.ascii_uppercase
2023-12-07 16:05:26 +00:00
symbols = "!@#$%^&*" # Reducing to avoid issues with ([]{}<>,'`) etc
# Combine all the character strings above to form one string
combo = digits + upcase + locase + symbols
# Randomly select at least one character from each character set above
rand_digit = secrets.choice(digits)
rand_upper = secrets.choice(upcase)
rand_lower = secrets.choice(locase)
rand_symbol = secrets.choice(symbols)
# Combine the character randomly selected above
temp_pass = rand_digit + rand_upper + rand_lower + rand_symbol
# Fill the rest of the password length by selecting randomly char list
for _ in range(max_len - 4):
temp_pass += secrets.choice(combo)
# Shuffle the temporary password to prevent predictable patterns
temp_pass_list = list(temp_pass)
secrets.SystemRandom().shuffle(temp_pass_list)
# Form the password by concatenating the characters
2023-11-25 20:47:45 +00:00
password = "".join(temp_pass_list)
# Return completed password
return password
2021-09-25 19:29:03 +00:00
@staticmethod
def cmdparse(cmd_in):
# Parse a string into arguments
cmd_out = [] # "argv" output array
cmd_index = (
-1
) # command index - pointer to the argument we're building in cmd_out
new_param = True # whether we're creating a new argument/parameter
2021-09-25 19:29:03 +00:00
esc = False # whether an escape character was encountered
quote_char = None # if we're dealing with a quote, save the quote type here.
# Nested quotes to be dealt with by the command
for char in cmd_in: # for character in string
if (
new_param
): # if set, begin a new argument and increment the command index.
# Continue the loop.
if char == " ":
2021-09-25 19:29:03 +00:00
continue
2022-06-14 12:40:57 +00:00
cmd_index += 1
cmd_out.append("")
new_param = False
if esc: # if we encountered an escape character on the last loop,
# append this char regardless of what it is
if char not in Helpers.allowed_quotes:
cmd_out[cmd_index] += "\\"
cmd_out[cmd_index] += char
2021-09-25 19:29:03 +00:00
esc = False
else:
if char == "\\": # if the current character is an escape character,
# set the esc flag and continue to next loop
2021-09-25 19:29:03 +00:00
esc = True
elif (
char == " " and quote_char is None
): # if we encounter a space and are not dealing with a quote,
# set the new argument flag and continue to next loop
new_param = True
elif (
char == quote_char
): # if we encounter the character that matches our start quote,
# end the quote and continue to next loop
quote_char = None
elif quote_char is None and (
char in Helpers.allowed_quotes
): # if we're not in the middle of a quote and we get a quotable
# character, start a quote and proceed to the next loop
quote_char = char
else: # else, just store the character in the current arg
cmd_out[cmd_index] += char
2021-09-25 19:29:03 +00:00
return cmd_out
def get_setting(self, key, default_return=False):
2020-08-12 00:36:09 +00:00
try:
with open(self.settings_file, "r", encoding="utf-8") as f:
data = json.load(f)
2020-08-12 00:36:09 +00:00
if key in data.keys():
return data.get(key)
2022-05-25 13:21:34 +00:00
logger.error(f'Config File Error: Setting "{key}" does not exist')
Console.error(f'Config File Error: Setting "{key}" does not exist')
2020-08-12 00:36:09 +00:00
except Exception as e:
logger.critical(
f"Config File Error: Unable to read {self.settings_file} due to {e}"
)
Console.critical(
f"Config File Error: Unable to read {self.settings_file} due to {e}"
)
2020-08-12 00:36:09 +00:00
return default_return
2020-08-12 00:36:09 +00:00
def set_settings(self, data):
try:
with open(self.settings_file, "w", encoding="utf-8") as f:
json.dump(data, f, indent=4)
except Exception as e:
logger.critical(
f"Config File Error: Unable to read {self.settings_file} due to {e}"
)
Console.critical(
f"Config File Error: Unable to read {self.settings_file} due to {e}"
)
return False
return True
@staticmethod
def get_master_config():
2023-02-25 19:39:29 +00:00
# Let's get the mounts and only show the first one by default
mounts = Helpers.get_all_mounts()
if len(mounts) != 0:
mounts = mounts[0]
# Make changes for users' local config.json files here. As of 4.0.20
# Config.json was removed from the repo to make it easier for users
# To make non-breaking changes to the file.
return {
"https_port": 8443,
"language": "en_EN",
"cookie_expire": 30,
"show_errors": True,
"history_max_age": 7,
2023-02-05 18:06:52 +00:00
"stats_update_frequency_seconds": 30,
"delete_default_json": False,
"show_contribute_link": True,
"virtual_terminal_lines": 70,
"max_log_lines": 700,
"max_audit_entries": 300,
2023-01-29 23:21:04 +00:00
"disabled_language_files": [],
"stream_size_GB": 1,
"keywords": ["help", "chunk"],
"allow_nsfw_profile_pictures": False,
"enable_user_self_delete": False,
2023-01-29 22:00:28 +00:00
"reset_secrets_on_next_boot": False,
2023-02-25 19:39:29 +00:00
"monitored_mounts": mounts,
2023-02-05 18:06:52 +00:00
"dir_size_poll_freq_minutes": 5,
2023-03-02 18:40:45 +00:00
"crafty_logs_delete_after_days": 0,
"big_bucket_repo": "https://jars.arcadiatech.org",
}
def get_all_settings(self):
try:
with open(self.settings_file, "r", encoding="utf-8") as f:
data = json.load(f)
except Exception as e:
data = {}
logger.critical(
f"Config File Error: Unable to read {self.settings_file} due to {e}"
)
Console.critical(
f"Config File Error: Unable to read {self.settings_file} due to {e}"
)
return data
2022-12-14 19:54:52 +00:00
@staticmethod
def get_all_mounts():
mounts = []
for item in psutil.disk_partitions(all=False):
mounts.append(item.mountpoint)
return mounts
def is_subdir(self, child_path, parent_path):
server_path = os.path.realpath(child_path)
root_dir = os.path.realpath(parent_path)
2022-12-14 19:54:52 +00:00
if self.is_os_windows():
try:
relative = os.path.relpath(server_path, root_dir)
except:
# Windows will crash out if two paths are on different
# Drives We can happily return false if this is the case.
# Since two different drives will not be relative to eachother.
return False
else:
relative = os.path.relpath(server_path, root_dir)
2022-12-14 19:54:52 +00:00
if relative.startswith(os.pardir):
return False
2022-12-16 23:47:36 +00:00
return True
2022-12-14 19:54:52 +00:00
2022-05-25 13:21:34 +00:00
def set_setting(self, key, new_value):
try:
with open(self.settings_file, "r", encoding="utf-8") as f:
data = json.load(f)
if key in data.keys():
data[key] = new_value
2022-05-25 13:21:34 +00:00
with open(self.settings_file, "w", encoding="utf-8") as f:
json.dump(data, f, indent=2)
return True
2022-05-25 13:21:34 +00:00
logger.error(f'Config File Error: Setting "{key}" does not exist')
Console.error(f'Config File Error: Setting "{key}" does not exist')
except Exception as e:
logger.critical(
f"Config File Error: Unable to read {self.settings_file} due to {e}"
)
Console.critical(
f"Config File Error: Unable to read {self.settings_file} due to {e}"
)
2022-05-25 13:21:34 +00:00
return False
2022-09-27 01:23:47 +00:00
@staticmethod
def get_themes():
return ["default", "dark", "light", "ronald"]
@staticmethod
def get_local_ip():
2020-08-12 00:36:09 +00:00
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
try:
# doesn't even have to be reachable
s.connect(("10.255.255.255", 1))
ip = s.getsockname()[0]
2020-08-12 00:36:09 +00:00
except Exception:
ip = "127.0.0.1"
2020-08-12 00:36:09 +00:00
finally:
s.close()
return ip
2020-08-12 00:36:09 +00:00
def get_version(self):
version_data = {}
try:
with open(
os.path.join(self.config_dir, "version.json"), "r", encoding="utf-8"
) as f:
2020-08-12 00:36:09 +00:00
version_data = json.load(f)
except Exception as e:
Console.critical(f"Unable to get version data! \n{e}")
2020-08-12 00:36:09 +00:00
return version_data
def get_announcements(self):
try:
data = []
2023-09-03 19:26:04 +00:00
response = requests.get("https://craftycontrol.com/notify", timeout=2)
data = json.loads(response.content)
if self.update_available:
data.append(self.update_available)
return data
except Exception as e:
logger.error(f"Failed to fetch notifications with error: {e}")
if self.update_available:
data = [self.update_available]
else:
return False
def get_version_string(self):
version_data = self.get_version()
major = version_data.get("major", "?")
minor = version_data.get("minor", "?")
sub = version_data.get("sub", "?")
# set some defaults if we don't get version_data from our helper
2022-10-08 17:34:38 +00:00
version = f"{major}.{minor}.{sub}"
return str(version)
@staticmethod
def get_utc_now() -> datetime:
return datetime.fromtimestamp(time.time(), tz=timezone.utc)
def encode_pass(self, password):
return self.passhasher.hash(password)
def verify_pass(self, password, currenthash):
try:
self.passhasher.verify(currenthash, password)
return True
except:
return False
def log_colors(self, line):
# our regex replacements
# note these are in a tuple
user_keywords = self.get_setting("keywords")
replacements = [
(r"(\[.+?/INFO\])", r'<span class="mc-log-info">\1</span>'),
(r"(\[.+?/WARN\])", r'<span class="mc-log-warn">\1</span>'),
(r"(\[.+?/ERROR\])", r'<span class="mc-log-error">\1</span>'),
(r"(\[.+?/FATAL\])", r'<span class="mc-log-fatal">\1</span>'),
(
r"(\w+?\[/\d+?\.\d+?\.\d+?\.\d+?\:\d+?\])",
r'<span class="mc-log-keyword">\1</span>',
),
(r"\[(\d\d:\d\d:\d\d)\]", r'<span class="mc-log-time">[\1]</span>'),
(r"(\[.+? INFO\])", r'<span class="mc-log-info">\1</span>'),
(r"(\[.+? WARN\])", r'<span class="mc-log-warn">\1</span>'),
(r"(\[.+? ERROR\])", r'<span class="mc-log-error">\1</span>'),
(r"(\[.+? FATAL\])", r'<span class="mc-log-fatal">\1</span>'),
]
# highlight users keywords
for keyword in user_keywords:
# pylint: disable=consider-using-f-string
search_replace = (
r"({})".format(keyword),
r'<span class="mc-log-keyword">\1</span>',
)
replacements.append(search_replace)
for old, new in replacements:
line = re.sub(old, new, line, flags=re.IGNORECASE)
return line
@staticmethod
def validate_traversal(base_path, filename):
logger.debug(f'Validating traversal ("{base_path}", "{filename}")')
base = pathlib.Path(base_path).resolve()
file = pathlib.Path(filename)
fileabs = base.joinpath(file).resolve()
common_path = pathlib.Path(os.path.commonpath([base, fileabs]))
if base == common_path:
return fileabs
2022-06-14 12:40:57 +00:00
raise ValueError("Path traversal detected")
@staticmethod
def tail_file(file_name, number_lines=20):
if not Helpers.check_file_exists(file_name):
logger.warning(f"Unable to find file to tail: {file_name}")
return [f"Unable to find file to tail: {file_name}"]
# length of lines is X char here
avg_line_length = 255
# create our buffer number - number of lines * avg_line_length
line_buffer = number_lines * avg_line_length
# open our file
with open(file_name, "r", encoding="utf-8") as f:
# seek
f.seek(0, 2)
# get file size
fsize = f.tell()
# set pos @ last n chars
# (buffer from above = number of lines * avg_line_length)
f.seek(max(fsize - line_buffer, 0), 0)
# read file til the end
try:
lines = f.readlines()
except Exception as e:
logger.warning(
f"Unable to read a line in the file:{file_name} - due to error: {e}"
)
# now we are done getting the lines, let's return it
return lines
2020-08-12 00:36:09 +00:00
@staticmethod
def check_writeable(path: str):
filename = os.path.join(path, "tempfile.txt")
try:
2022-06-17 22:27:55 +00:00
with open(filename, "w", encoding="utf-8"):
pass
2020-08-12 00:36:09 +00:00
os.remove(filename)
logger.info(f"{filename} is writable")
2020-08-12 00:36:09 +00:00
return True
except Exception as e:
logger.critical(f"Unable to write to {path} - Error: {e}")
2020-08-12 00:36:09 +00:00
return False
@staticmethod
def check_root():
if Helpers.is_os_windows():
2022-06-14 12:40:57 +00:00
return ctypes.windll.shell32.IsUserAnAdmin() == 1
return os.geteuid() == 0
2021-11-29 21:22:46 +00:00
2020-08-12 00:36:09 +00:00
def ensure_logging_setup(self):
log_file = os.path.join(os.path.curdir, "logs", "commander.log")
session_log_file = os.path.join(os.path.curdir, "logs", "session.log")
2020-08-12 00:36:09 +00:00
logger.info("Checking app directory writable")
writeable = Helpers.check_writeable(self.root_dir)
2020-08-12 00:36:09 +00:00
# if not writeable, let's bomb out
if not writeable:
logger.critical(f"Unable to write to {self.root_dir} directory!")
2020-08-12 00:36:09 +00:00
sys.exit(1)
# ensure the log directory is there
2020-08-12 00:36:09 +00:00
try:
with suppress(FileExistsError):
os.makedirs(os.path.join(self.root_dir, "logs"))
2020-08-12 00:36:09 +00:00
except Exception as e:
Console.error(f"Failed to make logs directory with error: {e} ")
2020-08-12 00:36:09 +00:00
# ensure the log file is there
try:
2022-06-17 22:27:55 +00:00
with open(log_file, "a", encoding="utf-8"):
pass
2020-08-12 00:36:09 +00:00
except Exception as e:
Console.critical(f"Unable to open log file! {e}")
2020-08-12 00:36:09 +00:00
sys.exit(1)
# del any old session.lock file as this is a new session
2020-08-12 00:36:09 +00:00
try:
with contextlib.suppress(FileNotFoundError):
os.remove(session_log_file)
except Exception as e:
Console.error(f"Deleting logs/session.log failed with error: {e}")
2020-08-12 00:36:09 +00:00
2020-08-13 14:38:36 +00:00
@staticmethod
def get_time_as_string():
now = datetime.now()
return now.strftime("%m/%d/%Y, %H:%M:%S")
2022-03-13 13:26:38 +00:00
2022-03-10 04:45:49 +00:00
@staticmethod
def calc_percent(source_path, dest_path):
# calculates percentable of zip from drive. Not with compression.
# (For backups and support logs)
2022-03-10 04:45:49 +00:00
source_size = 0
files_count = 0
2022-03-10 04:45:49 +00:00
for path, _dirs, files in os.walk(source_path):
for file in files:
full_path = os.path.join(path, file)
source_size += os.stat(full_path).st_size
files_count += 1
try:
dest_size = os.path.getsize(str(dest_path))
percent = round((dest_size / source_size) * 100, 1)
except:
percent = 0
if percent >= 0:
results = {"percent": percent, "total_files": files_count}
else:
results = {"percent": 0, "total_files": files_count}
return results
2020-08-13 14:38:36 +00:00
2020-08-12 00:36:09 +00:00
@staticmethod
def check_file_exists(path: str):
logger.debug(f"Looking for path: {path}")
2020-08-12 00:36:09 +00:00
if os.path.exists(path) and os.path.isfile(path):
logger.debug(f"Found path: {path}")
2020-08-12 00:36:09 +00:00
return True
2022-06-14 12:40:57 +00:00
return False
2020-08-12 00:36:09 +00:00
@staticmethod
def human_readable_file_size(num: int, suffix="B"):
for unit in ["", "K", "M", "G", "T", "P", "E", "Z"]:
2020-08-12 00:36:09 +00:00
if abs(num) < 1024.0:
# pylint: disable=consider-using-f-string
2020-08-12 00:36:09 +00:00
return "%3.1f%s%s" % (num, unit, suffix)
num /= 1024.0
# pylint: disable=consider-using-f-string
return "%.1f%s%s" % (num, "Y", suffix)
2020-08-12 00:36:09 +00:00
@staticmethod
2021-01-17 17:20:28 +00:00
def check_path_exists(path: str):
if not path:
return False
logger.debug(f"Looking for path: {path}")
2020-08-12 00:36:09 +00:00
if os.path.exists(path):
logger.debug(f"Found path: {path}")
2020-08-12 00:36:09 +00:00
return True
2022-06-14 12:40:57 +00:00
return False
2020-08-12 00:36:09 +00:00
2022-09-12 19:34:47 +00:00
def get_gravatar_image(self, email):
profile_url = "/static/assets/images/faces-clipart/pic-3.png"
# http://en.gravatar.com/site/implement/images/#rating
if self.get_setting("allow_nsfw_profile_pictures"):
rating = "x"
else:
rating = "g"
# Get grvatar hash for profile pictures
if self.check_internet() and email != "default@example.com" and email:
2022-09-12 19:34:47 +00:00
gravatar = libgravatar.Gravatar(libgravatar.sanitize_email(email))
url = gravatar.get_image(
size=80,
default="404",
force_default=False,
rating=rating,
filetype_extension=False,
use_ssl=True,
) # + "?d=404"
try:
2023-02-15 23:40:20 +00:00
if requests.head(url, timeout=1).status_code != 404:
2022-09-12 19:34:47 +00:00
profile_url = url
2022-09-12 19:36:13 +00:00
except Exception as e:
logger.debug(f"Could not pull resource from Gravatar with error {e}")
2022-09-12 19:34:47 +00:00
return profile_url
2020-08-12 00:36:09 +00:00
@staticmethod
def get_file_contents(path: str, lines=100):
contents = ""
2020-08-12 00:36:09 +00:00
if os.path.exists(path) and os.path.isfile(path):
try:
with open(path, "r", encoding="utf-8") as f:
for line in f.readlines()[-lines:]:
2020-08-12 00:36:09 +00:00
contents = contents + line
return contents
except Exception as e:
logger.error(f"Unable to read file: {path}. \n Error: {e}")
2020-08-12 00:36:09 +00:00
return False
else:
logger.error(
f"Unable to read file: {path}. File not found, or isn't a file."
)
2020-08-12 00:36:09 +00:00
return False
def create_session_file(self, ignore=False):
if ignore and os.path.exists(self.session_file):
os.remove(self.session_file)
if os.path.exists(self.session_file):
file_data = self.get_file_contents(self.session_file)
try:
data = json.loads(file_data)
pid = data.get("pid")
started = data.get("started")
2022-03-02 14:48:03 +00:00
if psutil.pid_exists(pid):
Console.critical(
f"Another Crafty Controller agent seems to be running..."
f"\npid: {pid} \nstarted on: {started}"
)
2022-03-02 14:48:03 +00:00
logger.critical("Found running crafty process. Exiting.")
sys.exit(1)
else:
logger.info(
"No process found for pid. Assuming "
"crafty crashed. Deleting stale session.lock"
)
2022-03-02 14:48:03 +00:00
os.remove(self.session_file)
2020-08-12 00:36:09 +00:00
except Exception as e:
logger.error(f"Failed to locate existing session.lock with error: {e} ")
Console.error(
f"Failed to locate existing session.lock with error: {e} "
)
2020-08-12 00:36:09 +00:00
2022-03-02 14:48:03 +00:00
sys.exit(1)
2020-08-12 00:36:09 +00:00
pid = os.getpid()
now = datetime.now()
session_data = {"pid": pid, "started": now.strftime("%d-%m-%Y, %H:%M:%S")}
with open(self.session_file, "w", encoding="utf-8") as f:
json.dump(session_data, f, indent=4)
2020-08-12 00:36:09 +00:00
# because this is a recursive function, we will return bytes,
# and set human readable later
@staticmethod
def get_dir_size(path: str):
2020-08-12 00:36:09 +00:00
total = 0
for entry in os.scandir(path):
if entry.is_dir(follow_symlinks=False):
total += Helpers.get_dir_size(entry.path)
2020-08-12 00:36:09 +00:00
else:
total += entry.stat(follow_symlinks=False).st_size
return total
@staticmethod
def list_dir_by_date(path: str, reverse=False):
return [
str(p)
for p in sorted(
pathlib.Path(path).iterdir(), key=os.path.getmtime, reverse=reverse
)
]
@staticmethod
def get_human_readable_files_sizes(paths: list):
sizes = []
for p in paths:
sizes.append(
2022-04-11 10:08:36 +00:00
{
"path": p,
"size": Helpers.human_readable_file_size(os.stat(p).st_size),
}
)
return sizes
2020-08-12 00:36:09 +00:00
@staticmethod
def base64_encode_string(fun_str: str):
s_bytes = str(fun_str).encode("utf-8")
2020-08-12 00:36:09 +00:00
b64_bytes = base64.encodebytes(s_bytes)
return b64_bytes.decode("utf-8")
2020-08-12 00:36:09 +00:00
@staticmethod
def base64_decode_string(fun_str: str):
s_bytes = str(fun_str).encode("utf-8")
2020-08-12 00:36:09 +00:00
b64_bytes = base64.decodebytes(s_bytes)
return b64_bytes.decode("utf-8")
@staticmethod
def create_uuid():
2020-08-23 22:43:28 +00:00
return str(uuid.uuid4())
2020-08-12 00:36:09 +00:00
@staticmethod
def ensure_dir_exists(path):
2020-08-12 00:36:09 +00:00
"""
ensures a directory exists
Checks for the existence of a directory, if the directory isn't there,
this function creates the directory
2020-08-12 00:36:09 +00:00
Args:
path (string): the path you are checking for
"""
try:
os.makedirs(path)
logger.debug(f"Created Directory : {path}")
return True
2020-08-12 00:36:09 +00:00
# directory already exists - non-blocking error
except FileExistsError:
return True
except PermissionError as e:
logger.critical(f"Check generated exception due to permssion error: {e}")
return False
2020-08-12 00:36:09 +00:00
def create_self_signed_cert(self, cert_dir=None):
if cert_dir is None:
cert_dir = os.path.join(self.config_dir, "web", "certs")
2020-08-12 00:36:09 +00:00
# create a directory if needed
Helpers.ensure_dir_exists(cert_dir)
2020-08-12 00:36:09 +00:00
cert_file = os.path.join(cert_dir, "commander.cert.pem")
key_file = os.path.join(cert_dir, "commander.key.pem")
2020-08-12 00:36:09 +00:00
logger.info(f"SSL Cert File is set to: {cert_file}")
logger.info(f"SSL Key File is set to: {key_file}")
2020-08-12 00:36:09 +00:00
# don't create new files if we already have them.
if Helpers.check_file_exists(cert_file) and Helpers.check_file_exists(key_file):
logger.info("Cert and Key files already exists, not creating them.")
2020-08-12 00:36:09 +00:00
return True
Console.info("Generating a self signed SSL")
2020-08-12 00:36:09 +00:00
logger.info("Generating a self signed SSL")
# create a key pair
logger.info("Generating a key pair. This might take a moment.")
Console.info("Generating a key pair. This might take a moment.")
2020-08-12 00:36:09 +00:00
k = crypto.PKey()
k.generate_key(crypto.TYPE_RSA, 4096)
# create a self-signed cert
cert = crypto.X509()
cert.get_subject().C = "US"
cert.get_subject().ST = "Michigan"
cert.get_subject().L = "Kent County"
2020-08-12 00:36:09 +00:00
cert.get_subject().O = "Crafty Controller"
cert.get_subject().OU = "Server Ops"
cert.get_subject().CN = gethostname()
alt_names = ",".join(
[
f"DNS:{socket.gethostname()}",
f"DNS:*.{socket.gethostname()}",
"DNS:localhost",
"DNS:*.localhost",
"DNS:127.0.0.1",
]
).encode()
subject_alt_names_ext = crypto.X509Extension(
b"subjectAltName", False, alt_names
)
basic_constraints_ext = crypto.X509Extension(
b"basicConstraints", True, b"CA:false"
)
cert.add_extensions([subject_alt_names_ext, basic_constraints_ext])
cert.set_serial_number(secrets.randbelow(254) + 1)
2020-08-12 00:36:09 +00:00
cert.gmtime_adj_notBefore(0)
cert.gmtime_adj_notAfter(365 * 24 * 60 * 60)
2020-08-12 00:36:09 +00:00
cert.set_issuer(cert.get_subject())
cert.set_pubkey(k)
cert.set_version(2)
cert.sign(k, "sha256")
2020-08-12 00:36:09 +00:00
2022-06-17 22:27:55 +00:00
with open(cert_file, "w", encoding="utf-8") as cert_file_handle:
cert_file_handle.write(
crypto.dump_certificate(crypto.FILETYPE_PEM, cert).decode()
)
2020-08-12 00:36:09 +00:00
2022-06-17 22:27:55 +00:00
with open(key_file, "w", encoding="utf-8") as key_file_handle:
key_file_handle.write(
crypto.dump_privatekey(crypto.FILETYPE_PEM, k).decode()
)
2020-08-12 00:36:09 +00:00
@staticmethod
def random_string_generator(size=6, chars=string.ascii_uppercase + string.digits):
"""
Example Usage
random_generator() = G8sjO2
random_generator(3, abcdef) = adf
"""
return "".join(secrets.choice(chars) for x in range(size))
2020-08-12 00:36:09 +00:00
@staticmethod
def is_os_windows():
2022-06-14 12:40:57 +00:00
return os.name == "nt"
2023-02-12 17:28:37 +00:00
@staticmethod
def is_env_docker():
path = "/proc/self/cgroup"
return (
os.path.exists("/.dockerenv")
or os.path.isfile(path)
and any("docker" in line for line in open(path, encoding="utf-8"))
)
@staticmethod
def wtol_path(w_path):
l_path = w_path.replace("\\", "/")
return l_path
@staticmethod
def ltow_path(l_path):
w_path = l_path.replace("/", "\\")
return w_path
@staticmethod
def get_os_understandable_path(path):
return os.path.normpath(path)
def find_default_password(self):
default_file = os.path.join(self.root_dir, "app", "config", "default.json")
data = {}
if Helpers.check_file_exists(default_file):
with open(default_file, "r", encoding="utf-8") as f:
data = json.load(f)
del_json = self.get_setting("delete_default_json")
if del_json:
os.remove(default_file)
return data
@staticmethod
def generate_zip_tree(folder, output=""):
file_list = os.listdir(folder)
file_list = sorted(file_list, key=str.casefold)
output += f"""<ul class="tree-nested d-block" id="{folder}ul">"""
for raw_filename in file_list:
filename = html.escape(raw_filename)
rel = os.path.join(folder, raw_filename)
dpath = os.path.join(folder, filename)
if os.path.isdir(rel):
output += f"""<li class="tree-item" data-path="{dpath}">
\n<div id="{dpath}" data-path="{dpath}" data-name="{filename}" class="tree-caret tree-ctx-item tree-folder">
<input type="radio" name="root_path" value="{dpath}">
<span id="{dpath}span" class="files-tree-title" data-path="{dpath}" data-name="{filename}" onclick="getDirView(event)">
2022-10-04 17:31:37 +00:00
<i style="color: var(--info);" class="far fa-folder"></i>
<i style="color: var(--info);" class="far fa-folder-open"></i>
{filename}
</span>
</input></div><li>
\n"""
return output
@staticmethod
def generate_zip_dir(folder, output=""):
file_list = os.listdir(folder)
file_list = sorted(file_list, key=str.casefold)
output += f"""<ul class="tree-nested d-block" id="{folder}ul">"""
for raw_filename in file_list:
filename = html.escape(raw_filename)
rel = os.path.join(folder, raw_filename)
dpath = os.path.join(folder, filename)
if os.path.isdir(rel):
output += f"""<li class="tree-item" data-path="{dpath}">
\n<div id="{dpath}" data-path="{dpath}" data-name="{filename}" class="tree-caret tree-ctx-item tree-folder">
<input type="radio" name="root_path" value="{dpath}">
<span id="{dpath}span" class="files-tree-title" data-path="{dpath}" data-name="{filename}" onclick="getDirView(event)">
2022-10-04 17:31:37 +00:00
<i style="color: var(--info);" class="far fa-folder"></i>
<i style="color: var(--info);" class="far fa-folder-open"></i>
{filename}
</span>
</input></div><li>"""
return output
@staticmethod
def unzip_backup_archive(backup_path, zip_name):
zip_path = os.path.join(backup_path, zip_name)
if Helpers.check_file_perms(zip_path):
temp_dir = tempfile.mkdtemp()
with zipfile.ZipFile(zip_path, "r") as zip_ref:
# extracts archive to temp directory
zip_ref.extractall(temp_dir)
return temp_dir
2022-06-14 12:40:57 +00:00
return False
@staticmethod
def remove_prefix(text, prefix):
if text.startswith(prefix):
return text[len(prefix) :]
return text
@staticmethod
2022-05-18 22:10:06 +00:00
def get_lang_page(text) -> str:
splitted = text.split("_")
if len(splitted) != 2:
return "en"
lang, region = splitted
if region == "EN":
return "en"
2022-05-18 22:10:06 +00:00
return lang + "-" + region
2023-04-15 18:47:35 +00:00
@staticmethod
2023-04-15 18:55:46 +00:00
def get_player_avatar(uuid_player):
2023-04-15 18:47:35 +00:00
mojang_response = requests.get(
2023-04-15 18:55:46 +00:00
f"https://sessionserver.mojang.com/session/minecraft/profile/{uuid_player}",
timeout=10,
2023-04-15 18:47:35 +00:00
)
if mojang_response.status_code == 200:
uuid_profile = mojang_response.json()
profile_properties = uuid_profile["properties"]
2023-04-15 18:55:46 +00:00
for prop in profile_properties:
if prop["name"] == "textures":
decoded_bytes = base64.b64decode(prop["value"])
decoded_str = decoded_bytes.decode("utf-8")
texture_json = json.loads(decoded_str)
skin_url = texture_json["textures"]["SKIN"]["url"]
skin_response = requests.get(skin_url, stream=True, timeout=10)
2023-04-15 18:47:35 +00:00
if skin_response.status_code == 200:
return base64.b64encode(skin_response.content)
else:
return