mirror of
https://gitlab.com/crafty-controller/crafty-4.git
synced 2024-08-30 18:23:09 +00:00
252 lines
9.2 KiB
Python
252 lines
9.2 KiB
Python
import os
|
|
import html
|
|
import pathlib
|
|
import re
|
|
import logging
|
|
import time
|
|
import urllib.parse
|
|
import bleach
|
|
import tornado.web
|
|
import tornado.escape
|
|
|
|
from app.classes.models.server_permissions import EnumPermissionsServer
|
|
from app.classes.shared.console import Console
|
|
from app.classes.shared.helpers import Helpers
|
|
from app.classes.shared.server import ServerOutBuf
|
|
from app.classes.web.base_handler import BaseHandler
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class AjaxHandler(BaseHandler):
|
|
def render_page(self, template, page_data):
|
|
self.render(
|
|
template,
|
|
data=page_data,
|
|
translate=self.translator.translate,
|
|
)
|
|
|
|
@tornado.web.authenticated
|
|
def get(self, page):
|
|
_, _, exec_user = self.current_user
|
|
error = bleach.clean(self.get_argument("error", "WTF Error!"))
|
|
|
|
template = "panel/denied.html"
|
|
|
|
page_data = {"user_data": exec_user, "error": error}
|
|
|
|
if page == "error":
|
|
template = "public/error.html"
|
|
self.render_page(template, page_data)
|
|
|
|
elif page == "server_log":
|
|
server_id = self.get_argument("id", None)
|
|
full_log = self.get_argument("full", False)
|
|
|
|
if server_id is None:
|
|
logger.warning("Server ID not found in server_log ajax call")
|
|
self.redirect("/panel/error?error=Server ID Not Found")
|
|
return
|
|
|
|
server_id = bleach.clean(server_id)
|
|
|
|
server_data = self.controller.servers.get_server_data_by_id(server_id)
|
|
if not server_data:
|
|
logger.warning("Server Data not found in server_log ajax call")
|
|
self.redirect("/panel/error?error=Server ID Not Found")
|
|
return
|
|
|
|
if not server_data["log_path"]:
|
|
logger.warning(
|
|
f"Log path not found in server_log ajax call ({server_id})"
|
|
)
|
|
|
|
if full_log:
|
|
log_lines = self.helper.get_setting("max_log_lines")
|
|
data = Helpers.tail_file(
|
|
# If the log path is absolute it returns it as is
|
|
# If it is relative it joins the paths below like normal
|
|
pathlib.Path(server_data["path"], server_data["log_path"]),
|
|
log_lines,
|
|
)
|
|
else:
|
|
data = ServerOutBuf.lines.get(server_id, [])
|
|
|
|
for line in data:
|
|
try:
|
|
line = re.sub("(\033\\[(0;)?[0-9]*[A-z]?(;[0-9])?m?)", "", line)
|
|
line = re.sub("[A-z]{2}\b\b", "", line)
|
|
line = self.helper.log_colors(html.escape(line))
|
|
self.write(f"<span class='box'>{line}<br /></span>")
|
|
# self.write(d.encode("utf-8"))
|
|
|
|
except Exception as e:
|
|
logger.warning(f"Skipping Log Line due to error: {e}")
|
|
|
|
elif page == "announcements":
|
|
data = Helpers.get_announcements()
|
|
page_data["notify_data"] = data
|
|
self.render_page("ajax/notify.html", page_data)
|
|
|
|
elif page == "get_zip_tree":
|
|
path = self.get_argument("path", None)
|
|
|
|
self.write(
|
|
Helpers.get_os_understandable_path(path)
|
|
+ "\n"
|
|
+ Helpers.generate_zip_tree(path)
|
|
)
|
|
self.finish()
|
|
|
|
elif page == "get_zip_dir":
|
|
path = self.get_argument("path", None)
|
|
|
|
self.write(
|
|
Helpers.get_os_understandable_path(path)
|
|
+ "\n"
|
|
+ Helpers.generate_zip_dir(path)
|
|
)
|
|
self.finish()
|
|
|
|
@tornado.web.authenticated
|
|
def post(self, page):
|
|
api_key, _, exec_user = self.current_user
|
|
superuser = exec_user["superuser"]
|
|
if api_key is not None:
|
|
superuser = superuser and api_key.superuser
|
|
|
|
server_id = self.get_argument("id", None)
|
|
|
|
if page == "send_order":
|
|
self.controller.users.update_server_order(
|
|
exec_user["user_id"], bleach.clean(self.get_argument("order"))
|
|
)
|
|
return
|
|
|
|
elif page == "select_photo":
|
|
if exec_user["superuser"]:
|
|
photo = urllib.parse.unquote(self.get_argument("photo", ""))
|
|
opacity = self.get_argument("opacity", 100)
|
|
self.controller.management.set_login_opacity(int(opacity))
|
|
if photo == "login_1.jpg":
|
|
self.controller.management.set_login_image("login_1.jpg")
|
|
self.controller.cached_login = f"{photo}"
|
|
else:
|
|
self.controller.management.set_login_image(f"custom/{photo}")
|
|
self.controller.cached_login = f"custom/{photo}"
|
|
return
|
|
|
|
elif page == "delete_photo":
|
|
if exec_user["superuser"]:
|
|
photo = urllib.parse.unquote(self.get_argument("photo", None))
|
|
if photo and photo != "login_1.jpg":
|
|
os.remove(
|
|
os.path.join(
|
|
self.controller.project_root,
|
|
f"app/frontend/static/assets/images/auth/custom/{photo}",
|
|
)
|
|
)
|
|
current = self.controller.cached_login
|
|
split = current.split("/")
|
|
if len(split) == 1:
|
|
current_photo = current
|
|
else:
|
|
current_photo = split[1]
|
|
if current_photo == photo:
|
|
self.controller.management.set_login_image("login_1.jpg")
|
|
self.controller.cached_login = "login_1.jpg"
|
|
return
|
|
|
|
elif page == "eula":
|
|
server_id = self.get_argument("id", None)
|
|
svr = self.controller.servers.get_server_instance_by_id(server_id)
|
|
svr.agree_eula(exec_user["user_id"])
|
|
|
|
elif page == "unzip_server":
|
|
path = urllib.parse.unquote(self.get_argument("path", ""))
|
|
if not path:
|
|
path = os.path.join(
|
|
self.controller.project_root,
|
|
"imports",
|
|
urllib.parse.unquote(self.get_argument("file", "")),
|
|
)
|
|
if Helpers.check_file_exists(path):
|
|
self.helper.unzip_server(path, exec_user["user_id"])
|
|
else:
|
|
user_id = exec_user["user_id"]
|
|
if user_id:
|
|
time.sleep(5)
|
|
user_lang = self.controller.users.get_user_lang_by_id(user_id)
|
|
self.helper.websocket_helper.broadcast_user(
|
|
user_id,
|
|
"send_start_error",
|
|
{
|
|
"error": self.helper.translation.translate(
|
|
"error", "no-file", user_lang
|
|
)
|
|
},
|
|
)
|
|
return
|
|
|
|
elif page == "backup_select":
|
|
path = self.get_argument("path", None)
|
|
self.helper.backup_select(path, exec_user["user_id"])
|
|
return
|
|
|
|
elif page == "jar_cache":
|
|
if not superuser:
|
|
self.redirect("/panel/error?error=Not a super user")
|
|
return
|
|
|
|
self.controller.server_jars.manual_refresh_cache()
|
|
return
|
|
|
|
elif page == "update_server_dir":
|
|
if self.helper.dir_migration:
|
|
return
|
|
for server in self.controller.servers.get_all_servers_stats():
|
|
if server["stats"]["running"]:
|
|
self.helper.websocket_helper.broadcast_user(
|
|
exec_user["user_id"],
|
|
"send_start_error",
|
|
{
|
|
"error": "You must stop all servers before "
|
|
"starting a storage migration."
|
|
},
|
|
)
|
|
return
|
|
if not superuser:
|
|
self.redirect("/panel/error?error=Not a super user")
|
|
return
|
|
if self.helper.is_env_docker():
|
|
self.redirect(
|
|
"/panel/error?error=This feature is not"
|
|
" supported on docker environments"
|
|
)
|
|
return
|
|
new_dir = urllib.parse.unquote(self.get_argument("server_dir"))
|
|
self.controller.update_master_server_dir(new_dir, exec_user["user_id"])
|
|
return
|
|
|
|
def check_server_id(self, server_id, page_name):
|
|
if server_id is None:
|
|
logger.warning(
|
|
f"Server ID not defined in {page_name} ajax call ({server_id})"
|
|
)
|
|
Console.warning(
|
|
f"Server ID not defined in {page_name} ajax call ({server_id})"
|
|
)
|
|
return
|
|
server_id = bleach.clean(server_id)
|
|
|
|
# does this server id exist?
|
|
if not self.controller.servers.server_id_exists(server_id):
|
|
logger.warning(
|
|
f"Server ID not found in {page_name} ajax call ({server_id})"
|
|
)
|
|
Console.warning(
|
|
f"Server ID not found in {page_name} ajax call ({server_id})"
|
|
)
|
|
return
|
|
return True
|