mirror of
https://gitlab.com/crafty-controller/crafty-4.git
synced 2024-08-30 18:23:09 +00:00
536 lines
28 KiB
Python
536 lines
28 KiB
Python
import os
|
|
import sys
|
|
import time
|
|
import logging
|
|
import threading
|
|
import asyncio
|
|
import datetime
|
|
from tzlocal import get_localzone
|
|
|
|
from app.classes.shared.helpers import helper
|
|
from app.classes.shared.console import console
|
|
|
|
from app.classes.web.tornado_handler import Webserver
|
|
from app.classes.web.websocket_helper import websocket_helper
|
|
|
|
from app.classes.minecraft.serverjars import server_jar_obj
|
|
from app.classes.models.management import management_helper
|
|
from app.classes.controllers.users_controller import Users_Controller
|
|
from app.classes.controllers.servers_controller import Servers_Controller
|
|
from app.classes.models.servers import servers_helper
|
|
from app.classes.models.users import users_helper
|
|
|
|
logger = logging.getLogger('apscheduler')
|
|
|
|
try:
|
|
from apscheduler.events import EVENT_JOB_EXECUTED
|
|
from apscheduler.schedulers.background import BackgroundScheduler
|
|
from apscheduler.triggers.cron import CronTrigger
|
|
|
|
except ModuleNotFoundError as err:
|
|
logger.critical(f"Import Error: Unable to load {err.name} module", exc_info=True)
|
|
console.critical(f"Import Error: Unable to load {err.name} module")
|
|
sys.exit(1)
|
|
|
|
scheduler_intervals = { 'seconds',
|
|
'minutes',
|
|
'hours',
|
|
'days',
|
|
'weeks',
|
|
'monday',
|
|
'tuesday',
|
|
'wednesday',
|
|
'thursday',
|
|
'friday',
|
|
'saturday',
|
|
'sunday'
|
|
}
|
|
|
|
class TasksManager:
|
|
|
|
def __init__(self, controller):
|
|
self.controller = controller
|
|
self.tornado = Webserver(controller, self)
|
|
|
|
self.tz = get_localzone()
|
|
self.scheduler = BackgroundScheduler(timezone=str(self.tz))
|
|
|
|
self.users_controller = Users_Controller()
|
|
|
|
self.webserver_thread = threading.Thread(target=self.tornado.run_tornado, daemon=True, name='tornado_thread')
|
|
|
|
self.main_thread_exiting = False
|
|
|
|
self.schedule_thread = threading.Thread(target=self.scheduler_thread, daemon=True, name="scheduler")
|
|
|
|
self.log_watcher_thread = threading.Thread(target=self.log_watcher, daemon=True, name="log_watcher")
|
|
|
|
self.command_thread = threading.Thread(target=self.command_watcher, daemon=True, name="command_watcher")
|
|
|
|
self.realtime_thread = threading.Thread(target=self.realtime, daemon=True, name="realtime")
|
|
|
|
self.reload_schedule_from_db()
|
|
|
|
|
|
def get_main_thread_run_status(self):
|
|
return self.main_thread_exiting
|
|
|
|
def reload_schedule_from_db(self):
|
|
jobs = management_helper.get_schedules_enabled()
|
|
logger.info("Reload from DB called. Current enabled schedules: ")
|
|
for item in jobs:
|
|
logger.info(f"JOB: {item}")
|
|
|
|
def command_watcher(self):
|
|
while True:
|
|
# select any commands waiting to be processed
|
|
commands = management_helper.get_unactioned_commands()
|
|
for c in commands:
|
|
try:
|
|
svr = self.controller.get_server_obj(c.server_id)
|
|
except:
|
|
logger.error("Server value requested does note exist purging item from waiting commands.")
|
|
management_helper.mark_command_complete(c.command_id)
|
|
|
|
user_id = c.user_id
|
|
command = c.command
|
|
|
|
if command == 'start_server':
|
|
svr.run_threaded_server(user_id)
|
|
|
|
elif command == 'stop_server':
|
|
svr.stop_threaded_server()
|
|
|
|
elif command == "restart_server":
|
|
svr.restart_threaded_server(user_id)
|
|
|
|
elif command == "backup_server":
|
|
svr.backup_server()
|
|
|
|
elif command == "update_executable":
|
|
svr.jar_update()
|
|
else:
|
|
svr.send_command(command)
|
|
management_helper.mark_command_complete(c.command_id)
|
|
|
|
time.sleep(1)
|
|
|
|
def _main_graceful_exit(self):
|
|
try:
|
|
os.remove(helper.session_file)
|
|
os.remove(os.path.join(helper.root_dir, '.header'))
|
|
self.controller.stop_all_servers()
|
|
except:
|
|
logger.info("Caught error during shutdown", exc_info=True)
|
|
|
|
logger.info("***** Crafty Shutting Down *****\n\n")
|
|
console.info("***** Crafty Shutting Down *****\n\n")
|
|
self.main_thread_exiting = True
|
|
|
|
def start_webserver(self):
|
|
self.webserver_thread.start()
|
|
|
|
def reload_webserver(self):
|
|
self.tornado.stop_web_server()
|
|
console.info("Waiting 3 seconds")
|
|
time.sleep(3)
|
|
self.webserver_thread = threading.Thread(target=self.tornado.run_tornado, daemon=True, name='tornado_thread')
|
|
self.start_webserver()
|
|
|
|
def stop_webserver(self):
|
|
self.tornado.stop_web_server()
|
|
|
|
def start_scheduler(self):
|
|
logger.info("Launching Scheduler Thread...")
|
|
console.info("Launching Scheduler Thread...")
|
|
self.schedule_thread.start()
|
|
logger.info("Launching command thread...")
|
|
console.info("Launching command thread...")
|
|
self.command_thread.start()
|
|
logger.info("Launching log watcher...")
|
|
console.info("Launching log watcher...")
|
|
self.log_watcher_thread.start()
|
|
logger.info("Launching realtime thread...")
|
|
console.info("Launching realtime thread...")
|
|
self.realtime_thread.start()
|
|
|
|
def scheduler_thread(self):
|
|
schedules = management_helper.get_schedules_enabled()
|
|
self.scheduler.add_listener(self.schedule_watcher, mask=EVENT_JOB_EXECUTED)
|
|
#self.scheduler.add_job(self.scheduler.print_jobs, 'interval', seconds=10, id='-1')
|
|
|
|
#load schedules from DB
|
|
for schedule in schedules:
|
|
if schedule.interval != 'reaction':
|
|
if schedule.cron_string != "":
|
|
try:
|
|
self.scheduler.add_job(management_helper.add_command,
|
|
CronTrigger.from_crontab(schedule.cron_string,
|
|
timezone=str(self.tz)),
|
|
id = str(schedule.schedule_id),
|
|
args = [schedule.server_id,
|
|
self.users_controller.get_id_by_name('system'),
|
|
'127.0.0.1',
|
|
schedule.command]
|
|
)
|
|
except Exception as e:
|
|
console.error(f"Failed to schedule task with error: {e}.")
|
|
console.warning("Removing failed task from DB.")
|
|
logger.error(f"Failed to schedule task with error: {e}.")
|
|
logger.warning("Removing failed task from DB.")
|
|
#remove items from DB if task fails to add to apscheduler
|
|
management_helper.delete_scheduled_task(schedule.schedule_id)
|
|
else:
|
|
if schedule.interval_type == 'hours':
|
|
self.scheduler.add_job(management_helper.add_command,
|
|
'cron',
|
|
minute = 0,
|
|
hour = '*/'+str(schedule.interval),
|
|
id = str(schedule.schedule_id),
|
|
args = [schedule.server_id,
|
|
self.users_controller.get_id_by_name('system'),
|
|
'127.0.0.1',
|
|
schedule.command]
|
|
)
|
|
elif schedule.interval_type == 'minutes':
|
|
self.scheduler.add_job(management_helper.add_command,
|
|
'cron',
|
|
minute = '*/'+str(schedule.interval),
|
|
id = str(schedule.schedule_id),
|
|
args = [schedule.server_id,
|
|
self.users_controller.get_id_by_name('system'),
|
|
'127.0.0.1',
|
|
schedule.command]
|
|
)
|
|
elif schedule.interval_type == 'days':
|
|
curr_time = schedule.start_time.split(':')
|
|
self.scheduler.add_job(management_helper.add_command,
|
|
'cron',
|
|
day = '*/'+str(schedule.interval),
|
|
hour=curr_time[0],
|
|
minute=curr_time[1],
|
|
id=str(schedule.schedule_id),
|
|
args=[schedule.server_id,
|
|
self.users_controller.get_id_by_name('system'),
|
|
'127.0.0.1',
|
|
schedule.command]
|
|
)
|
|
self.scheduler.start()
|
|
jobs = self.scheduler.get_jobs()
|
|
logger.info("Loaded schedules. Current enabled schedules: ")
|
|
for item in jobs:
|
|
logger.info(f"JOB: {item}")
|
|
|
|
def schedule_job(self, job_data):
|
|
sch_id = management_helper.create_scheduled_task(
|
|
job_data['server_id'],
|
|
job_data['action'],
|
|
job_data['interval'],
|
|
job_data['interval_type'],
|
|
job_data['start_time'],
|
|
job_data['command'],
|
|
"None",
|
|
job_data['enabled'],
|
|
job_data['one_time'],
|
|
job_data['cron_string'],
|
|
job_data['parent'],
|
|
job_data['delay'])
|
|
#Checks to make sure some doofus didn't actually make the newly created task a child of itself.
|
|
if str(job_data['parent']) == str(sch_id):
|
|
management_helper.update_scheduled_task(sch_id, {'parent':None})
|
|
#Check to see if it's enabled and is not a chain reaction.
|
|
if job_data['enabled'] and job_data['interval_type'] != 'reaction':
|
|
if job_data['cron_string'] != "":
|
|
try:
|
|
self.scheduler.add_job(management_helper.add_command,
|
|
CronTrigger.from_crontab(job_data['cron_string'],
|
|
timezone=str(self.tz)),
|
|
id=str(sch_id),
|
|
args=[job_data['server_id'],
|
|
self.users_controller.get_id_by_name('system'),
|
|
'127.0.0.1',
|
|
job_data['command']]
|
|
)
|
|
except Exception as e:
|
|
console.error(f"Failed to schedule task with error: {e}.")
|
|
console.warning("Removing failed task from DB.")
|
|
logger.error(f"Failed to schedule task with error: {e}.")
|
|
logger.warning("Removing failed task from DB.")
|
|
#remove items from DB if task fails to add to apscheduler
|
|
management_helper.delete_scheduled_task(sch_id)
|
|
else:
|
|
if job_data['interval_type'] == 'hours':
|
|
self.scheduler.add_job(management_helper.add_command,
|
|
'cron',
|
|
minute = 0,
|
|
hour = '*/'+str(job_data['interval']),
|
|
id=str(sch_id),
|
|
args=[job_data['server_id'],
|
|
self.users_controller.get_id_by_name('system'),
|
|
'127.0.0.1',
|
|
job_data['command']]
|
|
)
|
|
elif job_data['interval_type'] == 'minutes':
|
|
self.scheduler.add_job(management_helper.add_command,
|
|
'cron',
|
|
minute = '*/'+str(job_data['interval']),
|
|
id=str(sch_id),
|
|
args=[job_data['server_id'],
|
|
self.users_controller.get_id_by_name('system'),
|
|
'127.0.0.1',
|
|
job_data['command']]
|
|
)
|
|
elif job_data['interval_type'] == 'days':
|
|
curr_time = job_data['start_time'].split(':')
|
|
self.scheduler.add_job(management_helper.add_command,
|
|
'cron',
|
|
day = '*/'+str(job_data['interval']),
|
|
hour = curr_time[0],
|
|
minute = curr_time[1],
|
|
id=str(sch_id),
|
|
args=[job_data['server_id'],
|
|
self.users_controller.get_id_by_name('system'),
|
|
'127.0.0.1',
|
|
job_data['command']],
|
|
)
|
|
logger.info("Added job. Current enabled schedules: ")
|
|
jobs = self.scheduler.get_jobs()
|
|
for item in jobs:
|
|
logger.info(f"JOB: {item}")
|
|
|
|
def remove_all_server_tasks(self, server_id):
|
|
schedules = management_helper.get_schedules_by_server(server_id)
|
|
for schedule in schedules:
|
|
if schedule.interval != 'reaction':
|
|
self.remove_job(schedule.schedule_id)
|
|
|
|
def remove_job(self, sch_id):
|
|
job = management_helper.get_scheduled_task_model(sch_id)
|
|
for schedule in management_helper.get_child_schedules(sch_id):
|
|
management_helper.update_scheduled_task(schedule.schedule_id, {'parent':None})
|
|
management_helper.delete_scheduled_task(sch_id)
|
|
if job.enabled and job.interval_type != 'reaction':
|
|
self.scheduler.remove_job(str(sch_id))
|
|
logger.info(f"Job with ID {sch_id} was deleted.")
|
|
else:
|
|
logger.info(f"Job with ID {sch_id} was deleted from DB, but was not enabled."
|
|
+ "Not going to try removing something that doesn't exist from active schedules.")
|
|
|
|
def update_job(self, sch_id, job_data):
|
|
management_helper.update_scheduled_task(sch_id, job_data)
|
|
#Checks to make sure some doofus didn't actually make the newly created task a child of itself.
|
|
if str(job_data['parent']) == str(sch_id):
|
|
management_helper.update_scheduled_task(sch_id, {'parent':None})
|
|
try:
|
|
if job_data['interval'] != 'reaction':
|
|
self.scheduler.remove_job(str(sch_id))
|
|
except:
|
|
logger.info("No job found in update job. Assuming it was previously disabled. Starting new job.")
|
|
|
|
if job_data['enabled']:
|
|
if job_data['interval'] != 'reaction':
|
|
if job_data['cron_string'] != "":
|
|
try:
|
|
self.scheduler.add_job(management_helper.add_command,
|
|
CronTrigger.from_crontab(job_data['cron_string'],
|
|
timezone=str(self.tz)),
|
|
id=str(sch_id),
|
|
args=[job_data['server_id'],
|
|
self.users_controller.get_id_by_name('system'),
|
|
'127.0.0.1',
|
|
job_data['command']]
|
|
)
|
|
except Exception as e:
|
|
console.error(f"Failed to schedule task with error: {e}.")
|
|
console.info("Removing failed task from DB.")
|
|
management_helper.delete_scheduled_task(sch_id)
|
|
else:
|
|
if job_data['interval_type'] == 'hours':
|
|
self.scheduler.add_job(management_helper.add_command,
|
|
'cron',
|
|
minute = 0,
|
|
hour = '*/'+str(job_data['interval']),
|
|
id=str(sch_id),
|
|
args=[job_data['server_id'],
|
|
self.users_controller.get_id_by_name('system'),
|
|
'127.0.0.1',
|
|
job_data['command']]
|
|
)
|
|
elif job_data['interval_type'] == 'minutes':
|
|
self.scheduler.add_job(management_helper.add_command,
|
|
'cron',
|
|
minute = '*/'+str(job_data['interval']),
|
|
id=str(sch_id),
|
|
args=[job_data['server_id'],
|
|
self.users_controller.get_id_by_name('system'),
|
|
'127.0.0.1',
|
|
job_data['command']]
|
|
)
|
|
elif job_data['interval_type'] == 'days':
|
|
curr_time = job_data['start_time'].split(':')
|
|
self.scheduler.add_job(management_helper.add_command,
|
|
'cron',
|
|
day = '*/'+str(job_data['interval']),
|
|
hour = curr_time[0],
|
|
minute = curr_time[1],
|
|
id=str(sch_id),
|
|
args=[job_data['server_id'],
|
|
self.users_controller.get_id_by_name('system'),
|
|
'127.0.0.1',
|
|
job_data['command']]
|
|
)
|
|
else:
|
|
try:
|
|
self.scheduler.get_job(str(sch_id))
|
|
self.scheduler.remove_job(str(sch_id))
|
|
except:
|
|
logger.info(f"APScheduler found no scheduled job on schedule update for schedule with id: {sch_id} Assuming it was already disabled.")
|
|
|
|
def schedule_watcher(self, event):
|
|
if not event.exception:
|
|
if str(event.job_id).isnumeric():
|
|
task = management_helper.get_scheduled_task_model(int(event.job_id))
|
|
management_helper.add_to_audit_log_raw('system', users_helper.get_user_id_by_name('system'), task.server_id,
|
|
f"Task with id {task.schedule_id} completed successfully", '127.0.0.1')
|
|
#check if the task is a single run.
|
|
if task.one_time:
|
|
self.remove_job(task.schedule_id)
|
|
logger.info("one time task detected. Deleting...")
|
|
#check for any child tasks for this. It's kind of backward, but this makes DB management a lot easier. One to one instead of one to many.
|
|
for schedule in management_helper.get_child_schedules_by_server(task.schedule_id, task.server_id):
|
|
#event job ID's are strings so we need to look at this as the same data type.
|
|
if str(schedule.parent) == str(event.job_id):
|
|
if schedule.enabled:
|
|
delaytime = datetime.datetime.now() + datetime.timedelta(seconds=schedule.delay)
|
|
self.scheduler.add_job(management_helper.add_command, 'date', run_date=delaytime, id=str(schedule.schedule_id),
|
|
args=[schedule.server_id,
|
|
self.users_controller.get_id_by_name('system'),
|
|
'127.0.0.1',
|
|
schedule.command])
|
|
else:
|
|
logger.info("Event job ID is not numerical. Assuming it's stats - not stored in DB. Moving on.")
|
|
else:
|
|
logger.error(f"Task failed with error: {event.exception}")
|
|
|
|
def start_stats_recording(self):
|
|
stats_update_frequency = helper.get_setting('stats_update_frequency')
|
|
logger.info(f"Stats collection frequency set to {stats_update_frequency} seconds")
|
|
console.info(f"Stats collection frequency set to {stats_update_frequency} seconds")
|
|
|
|
# one for now,
|
|
self.controller.stats.record_stats()
|
|
# one for later
|
|
self.scheduler.add_job(self.controller.stats.record_stats, 'interval', seconds=stats_update_frequency, id="stats")
|
|
|
|
|
|
def serverjar_cache_refresher(self):
|
|
logger.info("Refreshing serverjars.com cache on start")
|
|
server_jar_obj.refresh_cache()
|
|
|
|
logger.info("Scheduling Serverjars.com cache refresh service every 12 hours")
|
|
self.scheduler.add_job(server_jar_obj.refresh_cache, 'interval', hours=12, id="serverjars")
|
|
|
|
def realtime(self):
|
|
loop = asyncio.new_event_loop()
|
|
asyncio.set_event_loop(loop)
|
|
|
|
host_stats = management_helper.get_latest_hosts_stats()
|
|
|
|
while True:
|
|
|
|
if host_stats.get('cpu_usage') != \
|
|
management_helper.get_latest_hosts_stats().get('cpu_usage') or \
|
|
host_stats.get('mem_percent') != \
|
|
management_helper.get_latest_hosts_stats().get('mem_percent'):
|
|
# Stats are different
|
|
|
|
host_stats = management_helper.get_latest_hosts_stats()
|
|
if len(websocket_helper.clients) > 0:
|
|
# There are clients
|
|
websocket_helper.broadcast_page('/panel/dashboard', 'update_host_stats', {
|
|
'cpu_usage': host_stats.get('cpu_usage'),
|
|
'cpu_cores': host_stats.get('cpu_cores'),
|
|
'cpu_cur_freq': host_stats.get('cpu_cur_freq'),
|
|
'cpu_max_freq': host_stats.get('cpu_max_freq'),
|
|
'mem_percent': host_stats.get('mem_percent'),
|
|
'mem_usage': host_stats.get('mem_usage')
|
|
})
|
|
|
|
for user in Users_Controller.get_all_users():
|
|
total_players = 0
|
|
max_players = 0
|
|
servers_ping = []
|
|
players_ping = {}
|
|
if user.superuser:
|
|
servers = Servers_Controller.get_all_servers_stats()
|
|
else:
|
|
servers = Servers_Controller.get_authorized_servers_stats(user.user_id)
|
|
for srv in servers:
|
|
if srv:
|
|
server_id = srv['server_data']['server_id']
|
|
srv['raw_ping_result'] = self.controller.stats.get_raw_server_stats(server_id)
|
|
|
|
servers_ping.append({
|
|
'id': srv['raw_ping_result'].get('id'),
|
|
'started': srv['raw_ping_result'].get('started'),
|
|
'running': srv['raw_ping_result'].get('running'),
|
|
'cpu': srv['raw_ping_result'].get('cpu'),
|
|
'mem': srv['raw_ping_result'].get('mem'),
|
|
'mem_percent': srv['raw_ping_result'].get('mem_percent'),
|
|
'world_name': srv['raw_ping_result'].get('world_name'),
|
|
'world_size': srv['raw_ping_result'].get('world_size'),
|
|
'server_port': srv['raw_ping_result'].get('server_port'),
|
|
'int_ping_results': srv['raw_ping_result'].get('int_ping_results'),
|
|
'online': srv['raw_ping_result'].get('online'),
|
|
'max': srv['raw_ping_result'].get('max'),
|
|
'players': srv['raw_ping_result'].get('players'),
|
|
'desc': srv['raw_ping_result'].get('desc'),
|
|
'version': srv['raw_ping_result'].get('version'),
|
|
'crashed': servers_helper.is_crashed(server_id),
|
|
})
|
|
if len(websocket_helper.clients) > 0:
|
|
websocket_helper.broadcast_user_page_params(
|
|
'/panel/server_detail',
|
|
{
|
|
'id': str(server_id)
|
|
}, user.user_id,
|
|
'update_server_details',
|
|
{
|
|
'id': srv['raw_ping_result'].get('id'),
|
|
'started': srv['raw_ping_result'].get('started'),
|
|
'running': srv['raw_ping_result'].get('running'),
|
|
'cpu': srv['raw_ping_result'].get('cpu'),
|
|
'mem': srv['raw_ping_result'].get('mem'),
|
|
'mem_percent': srv['raw_ping_result'].get('mem_percent'),
|
|
'world_name': srv['raw_ping_result'].get('world_name'),
|
|
'world_size': srv['raw_ping_result'].get('world_size'),
|
|
'server_port': srv['raw_ping_result'].get('server_port'),
|
|
'int_ping_results': srv['raw_ping_result'].get('int_ping_results'),
|
|
'online': srv['raw_ping_result'].get('online'),
|
|
'max': srv['raw_ping_result'].get('max'),
|
|
'players': srv['raw_ping_result'].get('players'),
|
|
'desc': srv['raw_ping_result'].get('desc'),
|
|
'version': srv['raw_ping_result'].get('version'),
|
|
'crashed': servers_helper.is_crashed(server_id),
|
|
}
|
|
)
|
|
total_players += int(srv['raw_ping_result'].get('online'))
|
|
max_players += int(srv['raw_ping_result'].get('max'))
|
|
players_ping = {
|
|
'total_players': total_players,
|
|
'max_players': max_players
|
|
}
|
|
websocket_helper.broadcast_user_page('/panel/dashboard', user.user_id, 'update_player_status', players_ping)
|
|
|
|
if (len(servers_ping) > 0) & (len(websocket_helper.clients) > 0):
|
|
try:
|
|
websocket_helper.broadcast_user_page('/panel/dashboard', user.user_id, 'update_server_status', servers_ping)
|
|
websocket_helper.broadcast_page('/status', 'update_server_status', servers_ping)
|
|
except:
|
|
console.warning("Can't broadcast server status to websocket")
|
|
time.sleep(5)
|
|
|
|
def log_watcher(self):
|
|
self.controller.servers.check_for_old_logs()
|
|
self.scheduler.add_job(self.controller.servers.check_for_old_logs, 'interval', hours=6, id="log-mgmt")
|