From efd3bef0299d0f4341d8cdaee2d87b82af914c21 Mon Sep 17 00:00:00 2001 From: amcmanu3 Date: Thu, 19 Jan 2023 11:27:33 -0500 Subject: [PATCH] Use queues for scheduled task disapatch --- .../controllers/management_controller.py | 9 +- app/classes/shared/tasks.py | 182 +++++++++++------- 2 files changed, 117 insertions(+), 74 deletions(-) diff --git a/app/classes/controllers/management_controller.py b/app/classes/controllers/management_controller.py index 47860fe1..5caaa48a 100644 --- a/app/classes/controllers/management_controller.py +++ b/app/classes/controllers/management_controller.py @@ -1,4 +1,5 @@ import logging +import queue from app.classes.models.management import HelpersManagement from app.classes.models.servers import HelperServers @@ -9,6 +10,7 @@ logger = logging.getLogger(__name__) class ManagementController: def __init__(self, management_helper): self.management_helper = management_helper + self.command_queue = queue.Queue() # ********************************************************************************** # Host_Stats Methods @@ -42,7 +44,12 @@ class ManagementController: server_id, remote_ip, ) - HelpersManagement.add_command(server_id, user_id, remote_ip, command) + self.queue_command( + {"server_id": server_id, "user_id": user_id, "command": command} + ) + + def queue_command(self, command_data): + self.command_queue.put(command_data) @staticmethod def mark_command_complete(command_id=None): diff --git a/app/classes/shared/tasks.py b/app/classes/shared/tasks.py index 17940181..38d057b8 100644 --- a/app/classes/shared/tasks.py +++ b/app/classes/shared/tasks.py @@ -91,22 +91,21 @@ class TasksManager: def command_watcher(self): while True: # select any commands waiting to be processed - commands = HelpersManagement.get_unactioned_commands() - for cmd in commands: + if not self.controller.management.command_queue.empty() > 0: + cmd = self.controller.management.command_queue.get() try: svr = self.controller.servers.get_server_instance_by_id( - cmd.server_id.server_id + cmd["server_id"] ) except: logger.error( "Server value requested does not exist! " "Purging item from waiting commands." ) - HelpersManagement.mark_command_complete(cmd.command_id) continue - user_id = cmd.user_id - command = cmd.command + user_id = cmd["user_id"] + command = cmd["command"] if command == "start_server": svr.run_threaded_server(user_id) @@ -136,8 +135,6 @@ class TasksManager: else: svr.send_command(command) - HelpersManagement.mark_command_complete(cmd.command_id) - time.sleep(1) def _main_graceful_exit(self): @@ -212,16 +209,19 @@ class TasksManager: if schedule.cron_string != "": try: new_job = self.scheduler.add_job( - HelpersManagement.add_command, + self.controller.management.queue_command, CronTrigger.from_crontab( schedule.cron_string, timezone=str(self.tz) ), id=str(schedule.schedule_id), args=[ - schedule.server_id, - self.users_controller.get_id_by_name("system"), - "127.0.0.1", - schedule.command, + { + "server_id": schedule.server_id.server_id, + "user_id": self.users_controller.get_id_by_name( + "system" + ), + "command": schedule.command, + } ], ) except Exception as e: @@ -237,45 +237,54 @@ class TasksManager: else: if schedule.interval_type == "hours": new_job = self.scheduler.add_job( - HelpersManagement.add_command, + self.controller.management.queue_command, "cron", minute=0, hour="*/" + str(schedule.interval), id=str(schedule.schedule_id), args=[ - schedule.server_id, - self.users_controller.get_id_by_name("system"), - "127.0.0.1", - schedule.command, + { + "server_id": schedule.server_id.server_id, + "user_id": self.users_controller.get_id_by_name( + "system" + ), + "command": schedule.command, + } ], ) elif schedule.interval_type == "minutes": new_job = self.scheduler.add_job( - HelpersManagement.add_command, + self.controller.management.queue_command, "cron", minute="*/" + str(schedule.interval), id=str(schedule.schedule_id), args=[ - schedule.server_id, - self.users_controller.get_id_by_name("system"), - "127.0.0.1", - schedule.command, + { + "server_id": schedule.server_id.server_id, + "user_id": self.users_controller.get_id_by_name( + "system" + ), + "command": schedule.command, + } ], ) elif schedule.interval_type == "days": curr_time = schedule.start_time.split(":") new_job = self.scheduler.add_job( - HelpersManagement.add_command, + self.controller.management.queue_command, "cron", day="*/" + str(schedule.interval), hour=curr_time[0], minute=curr_time[1], id=str(schedule.schedule_id), args=[ - schedule.server_id, - self.users_controller.get_id_by_name("system"), - "127.0.0.1", - schedule.command, + { + "server_id": schedule.server_id.server_id, + "user_id": self.users_controller.get_id_by_name( + "system" + ), + "command": schedule.command, + } ], ) if new_job != "error": @@ -322,16 +331,19 @@ class TasksManager: if job_data["cron_string"] != "": try: new_job = self.scheduler.add_job( - HelpersManagement.add_command, + self.controller.management.queue_command, CronTrigger.from_crontab( job_data["cron_string"], timezone=str(self.tz) ), id=str(sch_id), args=[ - job_data["server_id"], - self.users_controller.get_id_by_name("system"), - "127.0.0.1", - job_data["command"], + { + "server_id": job_data["server_id"], + "user_id": self.users_controller.get_id_by_name( + "system" + ), + "command": job_data["command"], + } ], ) except Exception as e: @@ -345,45 +357,54 @@ class TasksManager: else: if job_data["interval_type"] == "hours": new_job = self.scheduler.add_job( - HelpersManagement.add_command, + self.controller.management.queue_command, "cron", minute=0, hour="*/" + str(job_data["interval"]), id=str(sch_id), args=[ - job_data["server_id"], - self.users_controller.get_id_by_name("system"), - "127.0.0.1", - job_data["command"], + { + "server_id": job_data["server_id"], + "user_id": self.users_controller.get_id_by_name( + "system" + ), + "command": job_data["command"], + } ], ) elif job_data["interval_type"] == "minutes": new_job = self.scheduler.add_job( - HelpersManagement.add_command, + self.controller.management.queue_command, "cron", minute="*/" + str(job_data["interval"]), id=str(sch_id), args=[ - job_data["server_id"], - self.users_controller.get_id_by_name("system"), - "127.0.0.1", - job_data["command"], + { + "server_id": job_data["server_id"], + "user_id": self.users_controller.get_id_by_name( + "system" + ), + "command": job_data["command"], + } ], ) elif job_data["interval_type"] == "days": curr_time = job_data["start_time"].split(":") new_job = self.scheduler.add_job( - HelpersManagement.add_command, + self.controller.management.queue_command, "cron", day="*/" + str(job_data["interval"]), hour=curr_time[0], minute=curr_time[1], id=str(sch_id), args=[ - job_data["server_id"], - self.users_controller.get_id_by_name("system"), - "127.0.0.1", - job_data["command"], + { + "server_id": job_data["server_id"], + "user_id": self.users_controller.get_id_by_name( + "system" + ), + "command": job_data["command"], + } ], ) logger.info("Added job. Current enabled schedules: ") @@ -460,16 +481,19 @@ class TasksManager: if job_data["cron_string"] != "": try: new_job = self.scheduler.add_job( - HelpersManagement.add_command, + self.controller.management.queue_command, CronTrigger.from_crontab( job_data["cron_string"], timezone=str(self.tz) ), id=str(sch_id), args=[ - job_data["server_id"], - self.users_controller.get_id_by_name("system"), - "127.0.0.1", - job_data["command"], + { + "server_id": job_data["server_id"], + "user_id": self.users_controller.get_id_by_name( + "system" + ), + "command": job_data["command"], + } ], ) except Exception as e: @@ -480,45 +504,54 @@ class TasksManager: else: if job_data["interval_type"] == "hours": new_job = self.scheduler.add_job( - HelpersManagement.add_command, + self.controller.management.queue_command, "cron", minute=0, hour="*/" + str(job_data["interval"]), id=str(sch_id), args=[ - job_data["server_id"], - self.users_controller.get_id_by_name("system"), - "127.0.0.1", - job_data["command"], + { + "server_id": job_data["server_id"], + "user_id": self.users_controller.get_id_by_name( + "system" + ), + "command": job_data["command"], + } ], ) elif job_data["interval_type"] == "minutes": new_job = self.scheduler.add_job( - HelpersManagement.add_command, + self.controller.management.queue_command, "cron", minute="*/" + str(job_data["interval"]), id=str(sch_id), args=[ - job_data["server_id"], - self.users_controller.get_id_by_name("system"), - "127.0.0.1", - job_data["command"], + { + "server_id": job_data["server_id"], + "user_id": self.users_controller.get_id_by_name( + "system" + ), + "command": job_data["command"], + } ], ) elif job_data["interval_type"] == "days": curr_time = job_data["start_time"].split(":") new_job = self.scheduler.add_job( - HelpersManagement.add_command, + self.controller.management.queue_command, "cron", day="*/" + str(job_data["interval"]), hour=curr_time[0], minute=curr_time[1], id=str(sch_id), args=[ - job_data["server_id"], - self.users_controller.get_id_by_name("system"), - "127.0.0.1", - job_data["command"], + { + "server_id": job_data["server_id"], + "user_id": self.users_controller.get_id_by_name( + "system" + ), + "command": job_data["command"], + } ], ) if new_job != "error": @@ -579,15 +612,18 @@ class TasksManager: seconds=schedule.delay ) self.scheduler.add_job( - HelpersManagement.add_command, + self.controller.management.queue_command, "date", run_date=delaytime, id=str(schedule.schedule_id), args=[ - schedule.server_id, - self.users_controller.get_id_by_name("system"), - "127.0.0.1", - schedule.command, + { + "server_id": schedule.server_id.server_id, + "user_id": self.users_controller.get_id_by_name( + "system" + ), + "command": schedule.command, + } ], ) else: