Protype subprocess management

This commit is contained in:
computergeek125 2021-09-25 14:29:28 -05:00
parent 4f320e69a5
commit 250b68ae51
4 changed files with 80 additions and 114 deletions

View File

@ -45,15 +45,16 @@ class Stats:
return data
@staticmethod
def _get_process_stats(process_pid: int):
if process_pid is None:
def _get_process_stats(process):
if process is None:
process_stats = {
'cpu_usage': 0,
'memory_usage': 0,
'mem_percentage': 0
}
return process_stats
else:
process_pid = process.pid
try:
p = psutil.Process(process_pid)
dummy = p.cpu_percent()
@ -217,7 +218,7 @@ class Stats:
world_path = os.path.join(server_data.get('path', None), world_name)
# process stats
p_stats = self._get_process_stats(server_obj.PID)
p_stats = self._get_process_stats(server_obj.process)
# TODO: search server properties file for possible override of 127.0.0.1
internal_ip = server['server_ip']
@ -275,7 +276,7 @@ class Stats:
world_path = os.path.join(server_data.get('path', None), world_name)
# process stats
p_stats = self._get_process_stats(server_obj.PID)
p_stats = self._get_process_stats(server_obj.process)
# TODO: search server properties file for possible override of 127.0.0.1
internal_ip = server['server_ip']

View File

@ -247,38 +247,9 @@ class Controller:
console.info("All Servers Stopped")
def stop_server(self, server_id):
# get object
svr_obj = self.get_server_obj(server_id)
svr_data = self.get_server_data(server_id)
server_name = svr_data['server_name']
running = svr_obj.check_running()
# issue the stop command
svr_obj.stop_threaded_server()
# while it's running, we wait
x = 0
while running:
logger.info("Server {} is still running - waiting 2s to see if it stops".format(server_name))
console.info("Server {} is still running - waiting 2s to see if it stops".format(server_name))
running = svr_obj.check_running()
# let's keep track of how long this is going on...
x = x + 1
# if we have been waiting more than 120 seconds. let's just kill the pid
if x >= 60:
logger.error("Server {} is taking way too long to stop. Killing this process".format(server_name))
console.error("Server {} is taking way too long to stop. Killing this process".format(server_name))
svr_obj.killpid(svr_obj.PID)
running = False
# if we killed the server, let's clean up the object
if not running:
svr_obj.cleanup_server_object()
def create_jar_server(self, server: str, version: str, name: str, min_mem: int, max_mem: int, port: int):
server_id = helper.create_uuid()
server_dir = os.path.join(helper.servers_dir, server_id)

View File

@ -3,15 +3,13 @@ import sys
import re
import json
import time
import psutil
import pexpect
import datetime
import threading
import schedule
import logging.config
import zipfile
from threading import Thread
import shutil
import subprocess
import zlib
import html
@ -26,7 +24,9 @@ logger = logging.getLogger(__name__)
try:
import pexpect
import psutil
#import pexpect
import schedule
except ModuleNotFoundError as e:
logger.critical("Import Error: Unable to load {} module".format(e.name), exc_info=True)
@ -37,8 +37,8 @@ except ModuleNotFoundError as e:
class ServerOutBuf:
lines = {}
def __init__(self, p, server_id):
self.p = p
def __init__(self, proc, server_id):
self.proc = proc
self.server_id = str(server_id)
# Buffers text for virtual_terminal_lines config number of lines
self.max_lines = helper.get_setting('virtual_terminal_lines')
@ -46,10 +46,12 @@ class ServerOutBuf:
ServerOutBuf.lines[self.server_id] = []
def check(self):
while self.p.isalive():
char = self.p.read(1)
while self.proc.poll() is None:
char = self.proc.stdout.read(1).decode('utf-8')
# TODO: we may want to benchmark reading in blocks and userspace processing it later, reads are kind of expensive as a syscall
if char == os.linesep:
ServerOutBuf.lines[self.server_id].append(self.line_buffer)
self.new_line_handler(self.line_buffer)
self.line_buffer = ''
# Limit list length to self.max_lines:
@ -84,7 +86,6 @@ class Server:
# holders for our process
self.process = None
self.line = False
self.PID = None
self.start_time = None
self.server_command = None
self.server_path = None
@ -141,7 +142,7 @@ class Server:
def setup_server_run_command(self):
# configure the server
server_exec_path = self.settings['executable']
self.server_command = self.settings['execution_command']
self.server_command = helper.cmdparse(self.settings['execution_command'])
self.server_path = self.settings['path']
# let's do some quick checking to make sure things actually exists
@ -178,13 +179,15 @@ class Server:
if os.name == "nt":
logger.info("Windows Detected")
creationflags=subprocess.CREATE_NEW_CONSOLE
self.server_command = self.server_command.replace('\\', '/')
else:
logger.info("Linux Detected")
logger.info("Unix Detected")
creationflags=None
logger.info("Starting server in {p} with command: {c}".format(p=self.server_path, c=self.server_command))
try:
self.process = pexpect.spawn(self.server_command, cwd=self.server_path, timeout=None, encoding=None)
self.process = subprocess.Popen(self.server_command, cwd=self.server_path, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
except Exception as ex:
msg = "Server {} failed to start with error code: {}".format(self.name, ex)
logger.error(msg)
@ -205,9 +208,7 @@ class Server:
websocket_helper.broadcast('send_start_error', {
'error': translation.translate('error', 'internet')
})
db_helper.set_waiting_start(self.server_id, False)
self.process = pexpect.spawn(self.server_command, cwd=self.server_path, timeout=None, encoding='utf-8')
out_buf = ServerOutBuf(self.process, self.server_id)
logger.debug('Starting virtual terminal listener for server {}'.format(self.name))
@ -217,15 +218,14 @@ class Server:
self.start_time = str(datetime.datetime.utcnow().strftime('%Y-%m-%d %H:%M:%S'))
if psutil.pid_exists(self.process.pid):
self.PID = self.process.pid
logger.info("Server {} running with PID {}".format(self.name, self.PID))
console.info("Server {} running with PID {}".format(self.name, self.PID))
if self.process.poll() is None:
logger.info("Server {} running with PID {}".format(self.name, self.process.pid))
console.info("Server {} running with PID {}".format(self.name, self.process.pid))
self.is_crashed = False
self.stats.record_stats()
else:
logger.warning("Server PID {} died right after starting - is this a server config issue?".format(self.PID))
console.warning("Server PID {} died right after starting - is this a server config issue?".format(self.PID))
logger.warning("Server PID {} died right after starting - is this a server config issue?".format(self.process.pid))
console.warning("Server PID {} died right after starting - is this a server config issue?".format(self.process.pid))
if self.settings['crash_detection']:
logger.info("Server {} has crash detection enabled - starting watcher task".format(self.name))
@ -242,33 +242,36 @@ class Server:
def stop_server(self):
if self.settings['stop_command']:
self.send_command(self.settings['stop_command'])
running = self.check_running()
x = 0
# caching the name and pid number
server_name = self.name
server_pid = self.PID
while running:
x = x+1
logger.info("Server {} is still running - waiting 2s to see if it stops".format(server_name))
console.info("Server {} is still running - waiting 2s to see if it stops".format(server_name))
console.info("Server has {} seconds to respond before we force it down".format(int(60-(x*2))))
running = self.check_running()
time.sleep(2)
# if we haven't closed in 60 seconds, let's just slam down on the PID
if x >= 30:
logger.info("Server {} is still running - Forcing the process down".format(server_name))
console.info("Server {} is still running - Forcing the process down".format(server_name))
self.process.terminate(force=True)
logger.info("Stopped Server {} with PID {}".format(server_name, server_pid))
console.info("Stopped Server {} with PID {}".format(server_name, server_pid))
else:
self.process.terminate(force=True)
#windows will need to be handled separately for Ctrl+C
self.process.terinate()
running = self.check_running()
if not running:
logger.info("Can't stop server {} if it's not running".format(self.name))
console.info("Can't stop server {} if it's not running".format(self.name))
return
x = 0
# caching the name and pid number
server_name = self.name
server_pid = self.process.pid
while running:
x = x+1
logstr = "Server {} is still running - waiting 2s to see if it stops ({} seconds until force close)".format(server_name, int(60-(x*2)))
logger.info(logstr)
console.info(logstr)
running = self.check_running()
time.sleep(2)
# if we haven't closed in 60 seconds, let's just slam down on the PID
if x >= 30:
logger.info("Server {} is still running - Forcing the process down".format(server_name))
console.info("Server {} is still running - Forcing the process down".format(server_name))
self.kill()
logger.info("Stopped Server {} with PID {}".format(server_name, server_pid))
console.info("Stopped Server {} with PID {}".format(server_name, server_pid))
# massive resetting of variables
self.cleanup_server_object()
@ -286,7 +289,6 @@ class Server:
self.run_threaded_server()
def cleanup_server_object(self):
self.PID = None
self.start_time = None
self.restart_count = 0
self.is_crashed = False
@ -294,35 +296,27 @@ class Server:
self.process = None
def check_running(self):
running = False
# if process is None, we never tried to start
if self.PID is None:
return running
try:
alive = self.process.isalive()
if type(alive) is not bool:
self.last_rc = alive
running = False
else:
running = alive
except Exception as e:
logger.error("Unable to find if server PID exists: {}".format(self.PID), exc_info=True)
pass
return running
if self.process is None:
return False
poll = self.process.poll()
if poll is None:
return True
else:
self.last_rc = poll
return False
def send_command(self, command):
console.info("COMMAND TIME: {}".format(command))
if not self.check_running() and command.lower() != 'start':
logger.warning("Server not running, unable to send command \"{}\"".format(command))
return False
logger.debug("Sending command {} to server via pexpect".format(command))
logger.debug("Sending command {} to server".format(command))
# send it
self.process.send(command + '\n')
self.process.stdin.write("{}\n".format(command).encode('utf-8'))
self.process.stdin.flush()
def crash_detected(self, name):
@ -344,18 +338,18 @@ class Server:
"The server {} has crashed, crash detection is disabled and it will not be restarted".format(name))
return False
def killpid(self, pid):
logger.info("Terminating PID {} and all child processes".format(pid))
process = psutil.Process(pid)
def kill(self):
logger.info("Terminating server {} and all child processes".format(self.server_id))
process = psutil.Process(self.process.pid)
# for every sub process...
for proc in process.children(recursive=True):
# kill all the child processes - it sounds too wrong saying kill all the children (kevdagoat: lol!)
logger.info("Sending SIGKILL to PID {}".format(proc.name))
logger.info("Sending SIGKILL to server {}".format(proc.name))
proc.kill()
# kill the main process we are after
logger.info('Sending SIGKILL to parent')
process.kill()
self.process.kill()
def get_start_time(self):
if self.check_running():
@ -364,7 +358,10 @@ class Server:
return False
def get_pid(self):
return self.PID
if self.process is not None:
return self.process.pid
else:
return None
def detect_crash(self):
@ -476,7 +473,7 @@ class Server:
#checks if server is running. Calls shutdown if it is running.
if self.check_running():
wasStarted = True
logger.info("Server with PID {} is running. Sending shutdown command".format(self.PID))
logger.info("Server with PID {} is running. Sending shutdown command".format(self.process.pid))
self.stop_threaded_server()
else:
wasStarted = False

View File

@ -203,12 +203,9 @@ class AjaxHandler(BaseHandler):
elif page == "kill":
server_id = self.get_argument('id', None)
svr = self.controller.get_server_obj(server_id)
if svr.get_pid():
try:
svr.killpid(svr.get_pid())
except Exception as e:
logger.error("Could not find PID for requested termsig. Full error: {}".format(e))
else:
try:
svr.kill()
except Exception as e:
logger.error("Could not find PID for requested termsig. Full error: {}".format(e))
return