Merge branch 'refactor/scheduled-task-queue' into 'dev'

Move Schedules to from DB to Queue Datatype

See merge request crafty-controller/crafty-4!535
This commit is contained in:
Iain Powrie 2023-01-27 19:35:23 +00:00
commit ba53e19d14
9 changed files with 153 additions and 157 deletions

View File

@ -7,8 +7,8 @@ TBD
- Fix Schedule Restore | Add Backup Config Preservation. ([Merge Request](https://gitlab.com/crafty-controller/crafty-4/-/merge_requests/533))
### Tweaks
- Added further login screen customisation settings. ([Merge Request](https://gitlab.com/crafty-controller/crafty-4/-/merge_requests/531))
- Fix Schedule Restore | Add Backup Config Preservation. ([Merge Request](https://gitlab.com/crafty-controller/crafty-4/-/merge_requests/533))
- Set backup filename to use same time as schedule. ([Merge Request](https://gitlab.com/crafty-controller/crafty-4/-/merge_requests/534))
- Move Schedules to from DB to Queue Datatype. ([Merge Request](https://gitlab.com/crafty-controller/crafty-4/-/merge_requests/535))
### Lang
TBD
<br><br>

View File

@ -1,4 +1,5 @@
import logging
import queue
from app.classes.models.management import HelpersManagement
from app.classes.models.servers import HelperServers
@ -9,6 +10,7 @@ logger = logging.getLogger(__name__)
class ManagementController:
def __init__(self, management_helper):
self.management_helper = management_helper
self.command_queue = queue.Queue()
# **********************************************************************************
# Config Methods
@ -47,9 +49,6 @@ class ManagementController:
# **********************************************************************************
# Commands Methods
# **********************************************************************************
@staticmethod
def get_unactioned_commands():
return HelpersManagement.get_unactioned_commands()
def send_command(self, user_id, server_id, remote_ip, command):
server_name = HelperServers.get_server_friendly_name(server_id)
@ -61,11 +60,12 @@ class ManagementController:
server_id,
remote_ip,
)
HelpersManagement.add_command(server_id, user_id, remote_ip, command)
self.queue_command(
{"server_id": server_id, "user_id": user_id, "command": command}
)
@staticmethod
def mark_command_complete(command_id=None):
return HelpersManagement.mark_command_complete(command_id)
def queue_command(self, command_data):
self.command_queue.put(command_data)
# **********************************************************************************
# Audit_Log Methods

View File

@ -13,7 +13,7 @@ from peewee import (
from playhouse.shortcuts import model_to_dict
from app.classes.models.base_model import BaseModel
from app.classes.models.users import Users, HelperUsers
from app.classes.models.users import HelperUsers
from app.classes.models.servers import Servers
from app.classes.models.server_permissions import PermissionsServers
from app.classes.shared.main_models import DatabaseShortcuts
@ -69,22 +69,6 @@ class HostStats(BaseModel):
table_name = "host_stats"
# **********************************************************************************
# Commands Class
# **********************************************************************************
class Commands(BaseModel):
command_id = AutoField()
created = DateTimeField(default=datetime.datetime.now)
server_id = ForeignKeyField(Servers, backref="server", index=True)
user = ForeignKeyField(Users, backref="user", index=True)
source_ip = CharField(default="127.0.0.1")
command = CharField(default="")
executed = BooleanField(default=False)
class Meta:
table_name = "commands"
# **********************************************************************************
# Webhooks Class
# **********************************************************************************
@ -151,33 +135,6 @@ class HelpersManagement:
query = HostStats.select().order_by(HostStats.id.desc()).get()
return model_to_dict(query)
# **********************************************************************************
# Commands Methods
# **********************************************************************************
@staticmethod
def add_command(server_id, user_id, remote_ip, command):
Commands.insert(
{
Commands.server_id: server_id,
Commands.user: user_id,
Commands.source_ip: remote_ip,
Commands.command: command,
}
).execute()
@staticmethod
def get_unactioned_commands():
query = Commands.select().where(Commands.executed == 0)
return query
@staticmethod
def mark_command_complete(command_id=None):
if command_id is not None:
logger.debug(f"Marking Command {command_id} completed")
Commands.update({Commands.executed: True}).where(
Commands.command_id == command_id
).execute()
# **********************************************************************************
# Audit_Log Methods
# **********************************************************************************
@ -490,9 +447,3 @@ class HelpersManagement:
f"Not removing {dir_to_del} from excluded directories - "
f"not in the excluded directory list for server ID {server_id}"
)
@staticmethod
def clear_unexecuted_commands():
Commands.update({Commands.executed: True}).where(
Commands.executed == False # pylint: disable=singleton-comparison
).execute()

View File

@ -965,10 +965,6 @@ class Controller:
# remove the server from the DB
self.servers.remove_server(server_id)
@staticmethod
def clear_unexecuted_commands():
HelpersManagement.clear_unexecuted_commands()
@staticmethod
def clear_support_status():
HelperUsers.clear_support_status()

View File

@ -91,22 +91,21 @@ class TasksManager:
def command_watcher(self):
while True:
# select any commands waiting to be processed
commands = HelpersManagement.get_unactioned_commands()
for cmd in commands:
if not self.controller.management.command_queue.empty():
cmd = self.controller.management.command_queue.get()
try:
svr = self.controller.servers.get_server_instance_by_id(
cmd.server_id.server_id
cmd["server_id"]
)
except:
logger.error(
"Server value requested does not exist! "
"Purging item from waiting commands."
)
HelpersManagement.mark_command_complete(cmd.command_id)
continue
user_id = cmd.user_id
command = cmd.command
user_id = cmd["user_id"]
command = cmd["command"]
if command == "start_server":
svr.run_threaded_server(user_id)
@ -136,8 +135,6 @@ class TasksManager:
else:
svr.send_command(command)
HelpersManagement.mark_command_complete(cmd.command_id)
time.sleep(1)
def _main_graceful_exit(self):
@ -212,16 +209,19 @@ class TasksManager:
if schedule.cron_string != "":
try:
new_job = self.scheduler.add_job(
HelpersManagement.add_command,
self.controller.management.queue_command,
CronTrigger.from_crontab(
schedule.cron_string, timezone=str(self.tz)
),
id=str(schedule.schedule_id),
args=[
schedule.server_id,
self.users_controller.get_id_by_name("system"),
"127.0.0.1",
schedule.command,
{
"server_id": schedule.server_id.server_id,
"user_id": self.users_controller.get_id_by_name(
"system"
),
"command": schedule.command,
}
],
)
except Exception as e:
@ -237,45 +237,54 @@ class TasksManager:
else:
if schedule.interval_type == "hours":
new_job = self.scheduler.add_job(
HelpersManagement.add_command,
self.controller.management.queue_command,
"cron",
minute=0,
hour="*/" + str(schedule.interval),
id=str(schedule.schedule_id),
args=[
schedule.server_id,
self.users_controller.get_id_by_name("system"),
"127.0.0.1",
schedule.command,
{
"server_id": schedule.server_id.server_id,
"user_id": self.users_controller.get_id_by_name(
"system"
),
"command": schedule.command,
}
],
)
elif schedule.interval_type == "minutes":
new_job = self.scheduler.add_job(
HelpersManagement.add_command,
self.controller.management.queue_command,
"cron",
minute="*/" + str(schedule.interval),
id=str(schedule.schedule_id),
args=[
schedule.server_id,
self.users_controller.get_id_by_name("system"),
"127.0.0.1",
schedule.command,
{
"server_id": schedule.server_id.server_id,
"user_id": self.users_controller.get_id_by_name(
"system"
),
"command": schedule.command,
}
],
)
elif schedule.interval_type == "days":
curr_time = schedule.start_time.split(":")
new_job = self.scheduler.add_job(
HelpersManagement.add_command,
self.controller.management.queue_command,
"cron",
day="*/" + str(schedule.interval),
hour=curr_time[0],
minute=curr_time[1],
id=str(schedule.schedule_id),
args=[
schedule.server_id,
self.users_controller.get_id_by_name("system"),
"127.0.0.1",
schedule.command,
{
"server_id": schedule.server_id.server_id,
"user_id": self.users_controller.get_id_by_name(
"system"
),
"command": schedule.command,
}
],
)
if new_job != "error":
@ -322,16 +331,19 @@ class TasksManager:
if job_data["cron_string"] != "":
try:
new_job = self.scheduler.add_job(
HelpersManagement.add_command,
self.controller.management.queue_command,
CronTrigger.from_crontab(
job_data["cron_string"], timezone=str(self.tz)
),
id=str(sch_id),
args=[
job_data["server_id"],
self.users_controller.get_id_by_name("system"),
"127.0.0.1",
job_data["command"],
{
"server_id": job_data["server_id"],
"user_id": self.users_controller.get_id_by_name(
"system"
),
"command": job_data["command"],
}
],
)
except Exception as e:
@ -345,45 +357,54 @@ class TasksManager:
else:
if job_data["interval_type"] == "hours":
new_job = self.scheduler.add_job(
HelpersManagement.add_command,
self.controller.management.queue_command,
"cron",
minute=0,
hour="*/" + str(job_data["interval"]),
id=str(sch_id),
args=[
job_data["server_id"],
self.users_controller.get_id_by_name("system"),
"127.0.0.1",
job_data["command"],
{
"server_id": job_data["server_id"],
"user_id": self.users_controller.get_id_by_name(
"system"
),
"command": job_data["command"],
}
],
)
elif job_data["interval_type"] == "minutes":
new_job = self.scheduler.add_job(
HelpersManagement.add_command,
self.controller.management.queue_command,
"cron",
minute="*/" + str(job_data["interval"]),
id=str(sch_id),
args=[
job_data["server_id"],
self.users_controller.get_id_by_name("system"),
"127.0.0.1",
job_data["command"],
{
"server_id": job_data["server_id"],
"user_id": self.users_controller.get_id_by_name(
"system"
),
"command": job_data["command"],
}
],
)
elif job_data["interval_type"] == "days":
curr_time = job_data["start_time"].split(":")
new_job = self.scheduler.add_job(
HelpersManagement.add_command,
self.controller.management.queue_command,
"cron",
day="*/" + str(job_data["interval"]),
hour=curr_time[0],
minute=curr_time[1],
id=str(sch_id),
args=[
job_data["server_id"],
self.users_controller.get_id_by_name("system"),
"127.0.0.1",
job_data["command"],
{
"server_id": job_data["server_id"],
"user_id": self.users_controller.get_id_by_name(
"system"
),
"command": job_data["command"],
}
],
)
logger.info("Added job. Current enabled schedules: ")
@ -460,16 +481,19 @@ class TasksManager:
if job_data["cron_string"] != "":
try:
new_job = self.scheduler.add_job(
HelpersManagement.add_command,
self.controller.management.queue_command,
CronTrigger.from_crontab(
job_data["cron_string"], timezone=str(self.tz)
),
id=str(sch_id),
args=[
job_data["server_id"],
self.users_controller.get_id_by_name("system"),
"127.0.0.1",
job_data["command"],
{
"server_id": job_data["server_id"],
"user_id": self.users_controller.get_id_by_name(
"system"
),
"command": job_data["command"],
}
],
)
except Exception as e:
@ -480,45 +504,54 @@ class TasksManager:
else:
if job_data["interval_type"] == "hours":
new_job = self.scheduler.add_job(
HelpersManagement.add_command,
self.controller.management.queue_command,
"cron",
minute=0,
hour="*/" + str(job_data["interval"]),
id=str(sch_id),
args=[
job_data["server_id"],
self.users_controller.get_id_by_name("system"),
"127.0.0.1",
job_data["command"],
{
"server_id": job_data["server_id"],
"user_id": self.users_controller.get_id_by_name(
"system"
),
"command": job_data["command"],
}
],
)
elif job_data["interval_type"] == "minutes":
new_job = self.scheduler.add_job(
HelpersManagement.add_command,
self.controller.management.queue_command,
"cron",
minute="*/" + str(job_data["interval"]),
id=str(sch_id),
args=[
job_data["server_id"],
self.users_controller.get_id_by_name("system"),
"127.0.0.1",
job_data["command"],
{
"server_id": job_data["server_id"],
"user_id": self.users_controller.get_id_by_name(
"system"
),
"command": job_data["command"],
}
],
)
elif job_data["interval_type"] == "days":
curr_time = job_data["start_time"].split(":")
new_job = self.scheduler.add_job(
HelpersManagement.add_command,
self.controller.management.queue_command,
"cron",
day="*/" + str(job_data["interval"]),
hour=curr_time[0],
minute=curr_time[1],
id=str(sch_id),
args=[
job_data["server_id"],
self.users_controller.get_id_by_name("system"),
"127.0.0.1",
job_data["command"],
{
"server_id": job_data["server_id"],
"user_id": self.users_controller.get_id_by_name(
"system"
),
"command": job_data["command"],
}
],
)
if new_job != "error":
@ -579,15 +612,18 @@ class TasksManager:
seconds=schedule.delay
)
self.scheduler.add_job(
HelpersManagement.add_command,
self.controller.management.queue_command,
"date",
run_date=delaytime,
id=str(schedule.schedule_id),
args=[
schedule.server_id,
self.users_controller.get_id_by_name("system"),
"127.0.0.1",
schedule.command,
{
"server_id": schedule.server_id.server_id,
"user_id": self.users_controller.get_id_by_name(
"system"
),
"command": schedule.command,
}
],
)
else:

View File

@ -348,11 +348,6 @@ class AjaxHandler(BaseHandler):
server.backup_server()
elif page == "clear_comms":
if exec_user["superuser"]:
self.controller.clear_unexecuted_commands()
return
elif page == "select_photo":
if exec_user["superuser"]:
photo = urllib.parse.unquote(self.get_argument("photo", ""))

View File

@ -221,11 +221,6 @@
<h4 class="card-title"><i class="fas fa-user-tag"></i> {{ translate('panelConfig', 'adminControls',
data['lang']) }}</h4>
</div>
<div class="card-body">
<button type="button" class="btn btn-outline-danger clear-comm">{{ translate('panelConfig',
'clearComms', data['lang']) }}</button>
</div>
</div>
</div>
</div>
@ -483,17 +478,6 @@
});
});
$('.clear-comm').click(function () {
var token = getCookie("_xsrf")
$.ajax({
type: "POST",
headers: { 'X-XSRFToken': token },
url: '/ajax/clear_comm',
success: function (data) {
},
});
})
$('.delete-photo').click(function () {
var token = getCookie("_xsrf")
let photo = $('#photo').find(":selected").val();

View File

@ -0,0 +1,35 @@
# Generated by database migrator
import datetime
from peewee import *
from app.classes.models.users import Users
from app.classes.models.servers import Servers
def migrate(migrator, database, **kwargs):
migrator.drop_table("commands")
"""
Write your migrations here.
"""
def rollback(migrator, database, **kwargs):
db = database
class Commands(Model):
command_id = AutoField()
created = DateTimeField(default=datetime.datetime.now)
server_id = ForeignKeyField(Servers, backref="server", index=True)
user = ForeignKeyField(Users, backref="user", index=True)
source_ip = CharField(default="127.0.0.1")
command = CharField(default="")
executed = BooleanField(default=False)
class Meta:
table_name = "commands"
database = db
migrator.create_table(Commands)
"""
Write your rollback migrations here.
"""

View File

@ -213,7 +213,6 @@ if __name__ == "__main__":
Console.debug(f"Execution Mode: {running_mode}")
Console.debug(f"Application path : '{application_path}'")
controller.clear_unexecuted_commands()
controller.clear_support_status()
crafty_prompt = MainPrompt(