Merge branch 'subprocess' into 'dev'

Subprocess Merge to Dev

See merge request crafty-controller/crafty-commander!84
This commit is contained in:
xithical 2021-11-16 21:57:43 +00:00
commit 055051fc2f
5 changed files with 163 additions and 123 deletions

View File

@ -45,15 +45,16 @@ class Stats:
return data
@staticmethod
def _get_process_stats(process_pid: int):
if process_pid is None:
def _get_process_stats(process):
if process is None:
process_stats = {
'cpu_usage': 0,
'memory_usage': 0,
'mem_percentage': 0
}
return process_stats
else:
process_pid = process.pid
try:
p = psutil.Process(process_pid)
dummy = p.cpu_percent()
@ -217,7 +218,7 @@ class Stats:
world_path = os.path.join(server_data.get('path', None), world_name)
# process stats
p_stats = self._get_process_stats(server_obj.PID)
p_stats = self._get_process_stats(server_obj.process)
# TODO: search server properties file for possible override of 127.0.0.1
internal_ip = server['server_ip']
@ -275,7 +276,7 @@ class Stats:
world_path = os.path.join(server_data.get('path', None), world_name)
# process stats
p_stats = self._get_process_stats(server_obj.PID)
p_stats = self._get_process_stats(server_obj.process)
# TODO: search server properties file for possible override of 127.0.0.1
#internal_ip = server['server_ip']

View File

@ -34,6 +34,11 @@ except ModuleNotFoundError as e:
sys.exit(1)
class Helpers:
allowed_quotes = [
"\"",
"'",
"`"
]
def __init__(self):
self.root_dir = os.path.abspath(os.path.curdir)
@ -99,6 +104,59 @@ class Helpers:
except Exception as err:
return False
@staticmethod
def cmdparse(cmd_in):
# Parse a string into arguments
cmd_out = [] # "argv" output array
ci = -1 # command index - pointer to the argument we're building in cmd_out
np = True # whether we're creating a new argument/parameter
esc = False # whether an escape character was encountered
stch = None # if we're dealing with a quote, save the quote type here. Nested quotes to be dealt with by the command
for c in cmd_in: # for character in string
if np == True: # if set, begin a new argument and increment the command index. Continue the loop.
if c == ' ':
continue
else:
ci += 1
cmd_out.append("")
np = False
if esc: # if we encountered an escape character on the last loop, append this char regardless of what it is
if c not in Helpers.allowed_quotes:
cmd_out[ci] += '\\'
cmd_out[ci] += c
esc = False
else:
if c == '\\': # if the current character is an escape character, set the esc flag and continue to next loop
esc = True
elif c == ' ' and stch is None: # if we encounter a space and are not dealing with a quote, set the new argument flag and continue to next loop
np = True
elif c == stch: # if we encounter the character that matches our start quote, end the quote and continue to next loop
stch = None
elif stch is None and (c in Helpers.allowed_quotes): # if we're not in the middle of a quote and we get a quotable character, start a quote and proceed to the next loop
stch = c
else: # else, just store the character in the current arg
cmd_out[ci] += c
return cmd_out
def check_for_old_logs(self, db_helper):
servers = db_helper.get_all_defined_servers()
for server in servers:
logs_path = os.path.split(server['log_path'])[0]
latest_log_file = os.path.split(server['log_path'])[1]
logs_delete_after = int(server['logs_delete_after'])
if logs_delete_after == 0:
continue
log_files = list(filter(
lambda val: val != latest_log_file,
os.listdir(logs_path)
))
for log_file in log_files:
log_file_path = os.path.join(logs_path, log_file)
if self.check_file_exists(log_file_path) and \
self.is_file_older_than_x_days(log_file_path, logs_delete_after):
os.remove(log_file_path)
def get_setting(self, key, default_return=False):
try:

View File

@ -184,38 +184,9 @@ class Controller:
console.info("All Servers Stopped")
def stop_server(self, server_id):
# get object
svr_obj = self.get_server_obj(server_id)
svr_data = self.get_server_data(server_id)
server_name = svr_data['server_name']
running = svr_obj.check_running()
# issue the stop command
svr_obj.stop_threaded_server()
# while it's running, we wait
x = 0
while running:
logger.info("Server {} is still running - waiting 2s to see if it stops".format(server_name))
console.info("Server {} is still running - waiting 2s to see if it stops".format(server_name))
running = svr_obj.check_running()
# let's keep track of how long this is going on...
x = x + 1
# if we have been waiting more than 120 seconds. let's just kill the pid
if x >= 60:
logger.error("Server {} is taking way too long to stop. Killing this process".format(server_name))
console.error("Server {} is taking way too long to stop. Killing this process".format(server_name))
svr_obj.killpid(svr_obj.PID)
running = False
# if we killed the server, let's clean up the object
if not running:
svr_obj.cleanup_server_object()
def create_jar_server(self, server: str, version: str, name: str, min_mem: int, max_mem: int, port: int):
server_id = helper.create_uuid()
server_dir = os.path.join(helper.servers_dir, server_id)

View File

@ -3,15 +3,13 @@ import sys
import re
import json
import time
import psutil
import pexpect
import datetime
import threading
import schedule
import logging.config
import zipfile
from threading import Thread
import shutil
import subprocess
import zlib
import html
@ -27,7 +25,9 @@ logger = logging.getLogger(__name__)
try:
import pexpect
import psutil
#import pexpect
import schedule
except ModuleNotFoundError as e:
logger.critical("Import Error: Unable to load {} module".format(e.name), exc_info=True)
@ -38,26 +38,42 @@ except ModuleNotFoundError as e:
class ServerOutBuf:
lines = {}
def __init__(self, p, server_id):
self.p = p
def __init__(self, proc, server_id):
self.proc = proc
self.server_id = str(server_id)
# Buffers text for virtual_terminal_lines config number of lines
self.max_lines = helper.get_setting('virtual_terminal_lines')
self.line_buffer = ''
ServerOutBuf.lines[self.server_id] = []
self.lsi = 0
def process_byte(self, char):
if char == os.linesep[self.lsi]:
self.lsi += 1
else:
self.lsi = 0
self.line_buffer += char
if self.lsi >= len(os.linesep):
self.lsi = 0
ServerOutBuf.lines[self.server_id].append(self.line_buffer)
self.new_line_handler(self.line_buffer)
self.line_buffer = ''
# Limit list length to self.max_lines:
if len(ServerOutBuf.lines[self.server_id]) > self.max_lines:
ServerOutBuf.lines[self.server_id].pop(0)
def check(self):
while self.p.isalive():
char = self.p.read(1)
if char == os.linesep:
ServerOutBuf.lines[self.server_id].append(self.line_buffer)
self.new_line_handler(self.line_buffer)
self.line_buffer = ''
# Limit list length to self.max_lines:
if len(ServerOutBuf.lines[self.server_id]) > self.max_lines:
ServerOutBuf.lines[self.server_id].pop(0)
while True:
if self.proc.poll() is None:
char = self.proc.stdout.read(1).decode('utf-8')
# TODO: we may want to benchmark reading in blocks and userspace processing it later, reads are kind of expensive as a syscall
self.process_byte(char)
else:
self.line_buffer += char
flush = self.proc.stdout.read().decode('utf-8')
for char in flush:
self.process_byte(char)
def new_line_handler(self, new_line):
new_line = re.sub('(\033\\[(0;)?[0-9]*[A-z]?(;[0-9])?m?)|(> )', '', new_line)
@ -85,7 +101,6 @@ class Server:
# holders for our process
self.process = None
self.line = False
self.PID = None
self.start_time = None
self.server_command = None
self.server_path = None
@ -142,7 +157,7 @@ class Server:
def setup_server_run_command(self):
# configure the server
server_exec_path = self.settings['executable']
self.server_command = self.settings['execution_command']
self.server_command = helper.cmdparse(self.settings['execution_command'])
self.server_path = self.settings['path']
# let's do some quick checking to make sure things actually exists
@ -179,16 +194,16 @@ class Server:
if os.name == "nt":
logger.info("Windows Detected")
self.server_command = self.server_command.replace('\\', '/')
creationflags=subprocess.CREATE_NEW_CONSOLE
else:
logger.info("Linux Detected")
logger.info("Unix Detected")
creationflags=None
logger.info("Starting server in {p} with command: {c}".format(p=self.server_path, c=self.server_command))
servers_helper.set_waiting_start(self.server_id, False)
try:
self.process = pexpect.spawn(self.server_command, cwd=self.server_path, timeout=None, encoding='utf-8')
out_buf = ServerOutBuf(self.process, self.server_id)
self.process = subprocess.Popen(self.server_command, cwd=self.server_path, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
except Exception as ex:
msg = "Server {} failed to start with error code: {}".format(self.name, ex)
logger.error(msg)
@ -209,6 +224,8 @@ class Server:
websocket_helper.broadcast('send_start_error', {
'error': translation.translate('error', 'internet', user_lang)
})
servers_helper.set_waiting_start(self.server_id, False)
out_buf = ServerOutBuf(self.process, self.server_id)
logger.debug('Starting virtual terminal listener for server {}'.format(self.name))
threading.Thread(target=out_buf.check, daemon=True, name='{}_virtual_terminal'.format(self.server_id)).start()
@ -217,15 +234,14 @@ class Server:
self.start_time = str(datetime.datetime.utcnow().strftime('%Y-%m-%d %H:%M:%S'))
if psutil.pid_exists(self.process.pid):
self.PID = self.process.pid
logger.info("Server {} running with PID {}".format(self.name, self.PID))
console.info("Server {} running with PID {}".format(self.name, self.PID))
if self.process.poll() is None:
logger.info("Server {} running with PID {}".format(self.name, self.process.pid))
console.info("Server {} running with PID {}".format(self.name, self.process.pid))
self.is_crashed = False
self.stats.record_stats()
else:
logger.warning("Server PID {} died right after starting - is this a server config issue?".format(self.PID))
console.warning("Server PID {} died right after starting - is this a server config issue?".format(self.PID))
logger.warning("Server PID {} died right after starting - is this a server config issue?".format(self.process.pid))
console.warning("Server PID {} died right after starting - is this a server config issue?".format(self.process.pid))
if self.settings['crash_detection']:
logger.info("Server {} has crash detection enabled - starting watcher task".format(self.name))
@ -242,33 +258,36 @@ class Server:
def stop_server(self):
if self.settings['stop_command']:
self.send_command(self.settings['stop_command'])
running = self.check_running()
x = 0
# caching the name and pid number
server_name = self.name
server_pid = self.PID
while running:
x = x+1
logger.info("Server {} is still running - waiting 2s to see if it stops".format(server_name))
console.info("Server {} is still running - waiting 2s to see if it stops".format(server_name))
console.info("Server has {} seconds to respond before we force it down".format(int(60-(x*2))))
running = self.check_running()
time.sleep(2)
# if we haven't closed in 60 seconds, let's just slam down on the PID
if x >= 30:
logger.info("Server {} is still running - Forcing the process down".format(server_name))
console.info("Server {} is still running - Forcing the process down".format(server_name))
self.process.terminate(force=True)
logger.info("Stopped Server {} with PID {}".format(server_name, server_pid))
console.info("Stopped Server {} with PID {}".format(server_name, server_pid))
else:
self.process.terminate(force=True)
#windows will need to be handled separately for Ctrl+C
self.process.terinate()
running = self.check_running()
if not running:
logger.info("Can't stop server {} if it's not running".format(self.name))
console.info("Can't stop server {} if it's not running".format(self.name))
return
x = 0
# caching the name and pid number
server_name = self.name
server_pid = self.process.pid
while running:
x = x+1
logstr = "Server {} is still running - waiting 2s to see if it stops ({} seconds until force close)".format(server_name, int(60-(x*2)))
logger.info(logstr)
console.info(logstr)
running = self.check_running()
time.sleep(2)
# if we haven't closed in 60 seconds, let's just slam down on the PID
if x >= 30:
logger.info("Server {} is still running - Forcing the process down".format(server_name))
console.info("Server {} is still running - Forcing the process down".format(server_name))
self.kill()
logger.info("Stopped Server {} with PID {}".format(server_name, server_pid))
console.info("Stopped Server {} with PID {}".format(server_name, server_pid))
# massive resetting of variables
self.cleanup_server_object()
@ -286,7 +305,6 @@ class Server:
self.run_threaded_server(lang)
def cleanup_server_object(self):
self.PID = None
self.start_time = None
self.restart_count = 0
self.is_crashed = False
@ -294,35 +312,27 @@ class Server:
self.process = None
def check_running(self):
running = False
# if process is None, we never tried to start
if self.PID is None:
return running
try:
alive = self.process.isalive()
if type(alive) is not bool:
self.last_rc = alive
running = False
else:
running = alive
except Exception as e:
logger.error("Unable to find if server PID exists: {}".format(self.PID), exc_info=True)
pass
return running
if self.process is None:
return False
poll = self.process.poll()
if poll is None:
return True
else:
self.last_rc = poll
return False
def send_command(self, command):
console.info("COMMAND TIME: {}".format(command))
if not self.check_running() and command.lower() != 'start':
logger.warning("Server not running, unable to send command \"{}\"".format(command))
return False
logger.debug("Sending command {} to server via pexpect".format(command))
logger.debug("Sending command {} to server".format(command))
# send it
self.process.send(command + '\n')
self.process.stdin.write("{}\n".format(command).encode('utf-8'))
self.process.stdin.flush()
def crash_detected(self, name):
@ -344,18 +354,18 @@ class Server:
"The server {} has crashed, crash detection is disabled and it will not be restarted".format(name))
return False
def killpid(self, pid):
logger.info("Terminating PID {} and all child processes".format(pid))
process = psutil.Process(pid)
def kill(self):
logger.info("Terminating server {} and all child processes".format(self.server_id))
process = psutil.Process(self.process.pid)
# for every sub process...
for proc in process.children(recursive=True):
# kill all the child processes - it sounds too wrong saying kill all the children (kevdagoat: lol!)
logger.info("Sending SIGKILL to PID {}".format(proc.name))
logger.info("Sending SIGKILL to server {}".format(proc.name))
proc.kill()
# kill the main process we are after
logger.info('Sending SIGKILL to parent')
process.kill()
self.process.kill()
def get_start_time(self):
if self.check_running():
@ -364,7 +374,10 @@ class Server:
return False
def get_pid(self):
return self.PID
if self.process is not None:
return self.process.pid
else:
return None
def detect_crash(self):
@ -476,7 +489,7 @@ class Server:
#checks if server is running. Calls shutdown if it is running.
if self.check_running():
wasStarted = True
logger.info("Server with PID {} is running. Sending shutdown command".format(self.PID))
logger.info("Server with PID {} is running. Sending shutdown command".format(self.process.pid))
self.stop_threaded_server()
else:
wasStarted = False

View File

@ -202,13 +202,10 @@ class AjaxHandler(BaseHandler):
elif page == "kill":
server_id = self.get_argument('id', None)
svr = self.controller.get_server_obj(server_id)
if svr.get_pid():
try:
svr.killpid(svr.get_pid())
except Exception as e:
logger.error("Could not find PID for requested termsig. Full error: {}".format(e))
else:
logger.error("Could not find PID for requested termsig. Full error: svr.get_pid() = FALSE")
try:
svr.kill()
except Exception as e:
logger.error("Could not find PID for requested termsig. Full error: {}".format(e))
return
@tornado.web.authenticated