Use queues for scheduled task disapatch

This commit is contained in:
amcmanu3 2023-01-19 11:27:33 -05:00
parent d7d27b00c4
commit efd3bef029
2 changed files with 117 additions and 74 deletions

View File

@ -1,4 +1,5 @@
import logging
import queue
from app.classes.models.management import HelpersManagement
from app.classes.models.servers import HelperServers
@ -9,6 +10,7 @@ logger = logging.getLogger(__name__)
class ManagementController:
def __init__(self, management_helper):
self.management_helper = management_helper
self.command_queue = queue.Queue()
# **********************************************************************************
# Host_Stats Methods
@ -42,7 +44,12 @@ class ManagementController:
server_id,
remote_ip,
)
HelpersManagement.add_command(server_id, user_id, remote_ip, command)
self.queue_command(
{"server_id": server_id, "user_id": user_id, "command": command}
)
def queue_command(self, command_data):
self.command_queue.put(command_data)
@staticmethod
def mark_command_complete(command_id=None):

View File

@ -91,22 +91,21 @@ class TasksManager:
def command_watcher(self):
while True:
# select any commands waiting to be processed
commands = HelpersManagement.get_unactioned_commands()
for cmd in commands:
if not self.controller.management.command_queue.empty() > 0:
cmd = self.controller.management.command_queue.get()
try:
svr = self.controller.servers.get_server_instance_by_id(
cmd.server_id.server_id
cmd["server_id"]
)
except:
logger.error(
"Server value requested does not exist! "
"Purging item from waiting commands."
)
HelpersManagement.mark_command_complete(cmd.command_id)
continue
user_id = cmd.user_id
command = cmd.command
user_id = cmd["user_id"]
command = cmd["command"]
if command == "start_server":
svr.run_threaded_server(user_id)
@ -136,8 +135,6 @@ class TasksManager:
else:
svr.send_command(command)
HelpersManagement.mark_command_complete(cmd.command_id)
time.sleep(1)
def _main_graceful_exit(self):
@ -212,16 +209,19 @@ class TasksManager:
if schedule.cron_string != "":
try:
new_job = self.scheduler.add_job(
HelpersManagement.add_command,
self.controller.management.queue_command,
CronTrigger.from_crontab(
schedule.cron_string, timezone=str(self.tz)
),
id=str(schedule.schedule_id),
args=[
schedule.server_id,
self.users_controller.get_id_by_name("system"),
"127.0.0.1",
schedule.command,
{
"server_id": schedule.server_id.server_id,
"user_id": self.users_controller.get_id_by_name(
"system"
),
"command": schedule.command,
}
],
)
except Exception as e:
@ -237,45 +237,54 @@ class TasksManager:
else:
if schedule.interval_type == "hours":
new_job = self.scheduler.add_job(
HelpersManagement.add_command,
self.controller.management.queue_command,
"cron",
minute=0,
hour="*/" + str(schedule.interval),
id=str(schedule.schedule_id),
args=[
schedule.server_id,
self.users_controller.get_id_by_name("system"),
"127.0.0.1",
schedule.command,
{
"server_id": schedule.server_id.server_id,
"user_id": self.users_controller.get_id_by_name(
"system"
),
"command": schedule.command,
}
],
)
elif schedule.interval_type == "minutes":
new_job = self.scheduler.add_job(
HelpersManagement.add_command,
self.controller.management.queue_command,
"cron",
minute="*/" + str(schedule.interval),
id=str(schedule.schedule_id),
args=[
schedule.server_id,
self.users_controller.get_id_by_name("system"),
"127.0.0.1",
schedule.command,
{
"server_id": schedule.server_id.server_id,
"user_id": self.users_controller.get_id_by_name(
"system"
),
"command": schedule.command,
}
],
)
elif schedule.interval_type == "days":
curr_time = schedule.start_time.split(":")
new_job = self.scheduler.add_job(
HelpersManagement.add_command,
self.controller.management.queue_command,
"cron",
day="*/" + str(schedule.interval),
hour=curr_time[0],
minute=curr_time[1],
id=str(schedule.schedule_id),
args=[
schedule.server_id,
self.users_controller.get_id_by_name("system"),
"127.0.0.1",
schedule.command,
{
"server_id": schedule.server_id.server_id,
"user_id": self.users_controller.get_id_by_name(
"system"
),
"command": schedule.command,
}
],
)
if new_job != "error":
@ -322,16 +331,19 @@ class TasksManager:
if job_data["cron_string"] != "":
try:
new_job = self.scheduler.add_job(
HelpersManagement.add_command,
self.controller.management.queue_command,
CronTrigger.from_crontab(
job_data["cron_string"], timezone=str(self.tz)
),
id=str(sch_id),
args=[
job_data["server_id"],
self.users_controller.get_id_by_name("system"),
"127.0.0.1",
job_data["command"],
{
"server_id": job_data["server_id"],
"user_id": self.users_controller.get_id_by_name(
"system"
),
"command": job_data["command"],
}
],
)
except Exception as e:
@ -345,45 +357,54 @@ class TasksManager:
else:
if job_data["interval_type"] == "hours":
new_job = self.scheduler.add_job(
HelpersManagement.add_command,
self.controller.management.queue_command,
"cron",
minute=0,
hour="*/" + str(job_data["interval"]),
id=str(sch_id),
args=[
job_data["server_id"],
self.users_controller.get_id_by_name("system"),
"127.0.0.1",
job_data["command"],
{
"server_id": job_data["server_id"],
"user_id": self.users_controller.get_id_by_name(
"system"
),
"command": job_data["command"],
}
],
)
elif job_data["interval_type"] == "minutes":
new_job = self.scheduler.add_job(
HelpersManagement.add_command,
self.controller.management.queue_command,
"cron",
minute="*/" + str(job_data["interval"]),
id=str(sch_id),
args=[
job_data["server_id"],
self.users_controller.get_id_by_name("system"),
"127.0.0.1",
job_data["command"],
{
"server_id": job_data["server_id"],
"user_id": self.users_controller.get_id_by_name(
"system"
),
"command": job_data["command"],
}
],
)
elif job_data["interval_type"] == "days":
curr_time = job_data["start_time"].split(":")
new_job = self.scheduler.add_job(
HelpersManagement.add_command,
self.controller.management.queue_command,
"cron",
day="*/" + str(job_data["interval"]),
hour=curr_time[0],
minute=curr_time[1],
id=str(sch_id),
args=[
job_data["server_id"],
self.users_controller.get_id_by_name("system"),
"127.0.0.1",
job_data["command"],
{
"server_id": job_data["server_id"],
"user_id": self.users_controller.get_id_by_name(
"system"
),
"command": job_data["command"],
}
],
)
logger.info("Added job. Current enabled schedules: ")
@ -460,16 +481,19 @@ class TasksManager:
if job_data["cron_string"] != "":
try:
new_job = self.scheduler.add_job(
HelpersManagement.add_command,
self.controller.management.queue_command,
CronTrigger.from_crontab(
job_data["cron_string"], timezone=str(self.tz)
),
id=str(sch_id),
args=[
job_data["server_id"],
self.users_controller.get_id_by_name("system"),
"127.0.0.1",
job_data["command"],
{
"server_id": job_data["server_id"],
"user_id": self.users_controller.get_id_by_name(
"system"
),
"command": job_data["command"],
}
],
)
except Exception as e:
@ -480,45 +504,54 @@ class TasksManager:
else:
if job_data["interval_type"] == "hours":
new_job = self.scheduler.add_job(
HelpersManagement.add_command,
self.controller.management.queue_command,
"cron",
minute=0,
hour="*/" + str(job_data["interval"]),
id=str(sch_id),
args=[
job_data["server_id"],
self.users_controller.get_id_by_name("system"),
"127.0.0.1",
job_data["command"],
{
"server_id": job_data["server_id"],
"user_id": self.users_controller.get_id_by_name(
"system"
),
"command": job_data["command"],
}
],
)
elif job_data["interval_type"] == "minutes":
new_job = self.scheduler.add_job(
HelpersManagement.add_command,
self.controller.management.queue_command,
"cron",
minute="*/" + str(job_data["interval"]),
id=str(sch_id),
args=[
job_data["server_id"],
self.users_controller.get_id_by_name("system"),
"127.0.0.1",
job_data["command"],
{
"server_id": job_data["server_id"],
"user_id": self.users_controller.get_id_by_name(
"system"
),
"command": job_data["command"],
}
],
)
elif job_data["interval_type"] == "days":
curr_time = job_data["start_time"].split(":")
new_job = self.scheduler.add_job(
HelpersManagement.add_command,
self.controller.management.queue_command,
"cron",
day="*/" + str(job_data["interval"]),
hour=curr_time[0],
minute=curr_time[1],
id=str(sch_id),
args=[
job_data["server_id"],
self.users_controller.get_id_by_name("system"),
"127.0.0.1",
job_data["command"],
{
"server_id": job_data["server_id"],
"user_id": self.users_controller.get_id_by_name(
"system"
),
"command": job_data["command"],
}
],
)
if new_job != "error":
@ -579,15 +612,18 @@ class TasksManager:
seconds=schedule.delay
)
self.scheduler.add_job(
HelpersManagement.add_command,
self.controller.management.queue_command,
"date",
run_date=delaytime,
id=str(schedule.schedule_id),
args=[
schedule.server_id,
self.users_controller.get_id_by_name("system"),
"127.0.0.1",
schedule.command,
{
"server_id": schedule.server_id.server_id,
"user_id": self.users_controller.get_id_by_name(
"system"
),
"command": schedule.command,
}
],
)
else: