From 4df984f5629fc8e85cf13c21041d22be5c311138 Mon Sep 17 00:00:00 2001 From: Zedifus Date: Mon, 9 Jan 2023 05:04:10 +0000 Subject: [PATCH] Add SteamCMD code Would import wrapper as package but releases do not look steadily maintained. https://github.com/wmellema/Py-SteamCMD-Wrapper Origional Author: wmellema --- app/classes/steamcmd/SteamCMD.py | 241 ++++++++++++++++++++++ app/classes/steamcmd/Steam_CMD_command.py | 78 +++++++ 2 files changed, 319 insertions(+) create mode 100644 app/classes/steamcmd/SteamCMD.py create mode 100644 app/classes/steamcmd/Steam_CMD_command.py diff --git a/app/classes/steamcmd/SteamCMD.py b/app/classes/steamcmd/SteamCMD.py new file mode 100644 index 00000000..18364967 --- /dev/null +++ b/app/classes/steamcmd/SteamCMD.py @@ -0,0 +1,241 @@ +import os +import platform +import zipfile +import subprocess +import urllib.request +from pysteamcmdwrapper.SteamCMD_command import SteamCMD_command +from pysteamcmdwrapper.exceptions import SteamCMDException, SteamCMDDownloadException, SteamCMDInstallException + +from getpass import getpass + +package_links = { + "Windows": { + "url": "https://steamcdn-a.akamaihd.net/client/installer/steamcmd.zip", + "extension": ".exe", + "d_extension": ".zip" + }, + "Linux": { + "url": "https://steamcdn-a.akamaihd.net/client/installer/steamcmd_linux.tar.gz", + "extension": ".sh", + "d_extension": ".tar.gz" + } +} + + +class SteamCMD: + """ + Wrapper for SteamCMD + Will install from source depending on OS. + """ + + _installation_path = "" + _uname = "anonymous" + _passw = "" + + def __init__(self, installation_path): + self._installation_path = installation_path + + if not os.path.isdir(self._installation_path): + raise SteamCMDInstallException(message= + f'No valid directory found at {self._installation_path}' + 'Please make sure that the directory is correct.') + + self._prepare_installation() + + def _prepare_installation(self): + """ + Sets internal configuration according to parameters and OS + """ + + self.platform = platform.system() + if self.platform not in ["Windows", "Linux"]: + raise SteamCMDException(message=f"Non supported operating system. Expected Windows or Linux, got {self.platform}") + + self.steamcmd_url = package_links[self.platform]["url"] + self.zip = "steamcmd" + package_links[self.platform]["d_extension"] + self.exe = os.path.join( + self._installation_path, + "steamcmd" + package_links[self.platform]["extension"] + ) + + def _download(self): + """ + Internal method to download the SteamCMD Binaries from steams' servers. + :return: downloaded data for debug purposes + """ + + try: + if self.steamcmd_url.lower().startswith("http"): + req = urllib.request.Request(self.steamcmd_url) + else: + raise ValueError from None + with urllib.request.urlopen(req) as resp: + + data = resp.read() + with open(self.zip, "wb") as f: + f.write(data) + return data + except Exception as e: + raise SteamCMDException(message=f"An unknown exception occurred during downloading. {e}") + + def _extract_steamcmd(self): + """ + Internal method for extracting downloaded zip file. Works on both + windows and linux. + """ + if self.platform == 'Windows': + with zipfile.ZipFile(self.zip, 'r') as f: + f.extractall(self._installation_path) + + elif self.platform == 'Linux': + import tarfile + with tarfile.open(self.zip, 'r:gz') as f: + f.extractall(self._installation_path) + + else: + # This should never happen, but let's just throw it just in case. + raise SteamCMDException(message= + 'The operating system is not supported.' + f'Expected Linux or Windows, received: {self.platform}' + ) + + os.remove(self.zip) + + @staticmethod + def _print_log(*message): + """ + Small helper function for printing log entries. + Helps with output of subprocess.check_call not always having newlines + :param *message: Accepts multiple messages, each will be printed on a + new line + """ + # TODO: Handle logs better + print("") + print("") + for msg in message: + print(msg) + print("") + + def install(self, force: bool = False): + """ + Installs steamcmd if it is not already installed to self.install_path. + :param force: forces steamcmd install regardless of its presence + :return: + """ + if not os.path.isfile(self.exe) or force: + # Steamcmd isn't installed. Go ahead and install it. + self._download() + self._extract_steamcmd() + + else: + raise SteamCMDException(message= + 'Steamcmd is already installed. Reinstall is not necessary.' + 'Use force=True to override.' + ) + try: + subprocess.check_call((self.exe, "+quit")) + except subprocess.CalledProcessError as e: + if e.returncode == 7: + self._print_log( + "SteamCMD has returned error code 7 on fresh installation", + "", + "Not sure why this crashed,", + "long live steamcmd and it's non existent documentation..", + "It should be fine nevertheless") + return + else: + raise SteamCMDInstallException(message=f"Failed to install, check error code {e.returncode}") + + return + + def login(self, uname: str = None, passw: str = None): + """ + Login function in order to do a persistent login on the steam servers. + Prompts users for their credentials and spawns a child process. + :param uname: Steam Username + :param passw: Steam Password + :return: status code of child process + """ + self._uname = uname if uname else input("Please enter steam username: ") + self._passw = passw if passw else getpass("Please enter steam password: ") + + sc = SteamCMD_command() + return self.execute(sc) + + def app_update(self, app_id: int, install_dir: str = None, validate: bool = None, beta: str = None, + betapassword: str = None): + """ + Installer function for apps. + :param app_id: The Steam ID for the app you want to install + :param install_dir: Optional custom installation directory. + :param validate: Optional parameter for validation. Turn this on when updating something. + :param beta: Optional parameter for running a beta branch. + :param betapassword: Optional parameter for entering beta password. + :return: Status code of child process. + """ + sc = SteamCMD_command() + if install_dir: + sc.force_install_dir(install_dir) + sc.app_update(app_id, validate, beta, betapassword) + self._print_log( + f"Downloading item {app_id}", + f"into {install_dir} with validate set to {validate}") + return self.execute(sc) + + def workshop_update(self, app_id: int, workshop_id: int, install_dir: str = None, validate: bool = None, + n_tries: int = 5): + """ + Installer function for workshop content. Retries multiple times on timeout due to valves' + stupid timeout on large downloads. + :param app_id: The parent application ID + :param workshop_id: The ID for workshop content. Can be found in the url. + :param install_dir: Optional custom installation directory. + :param validate: Optional parameter for validation. Turn this on when updating something. + :param n_tries: Counter for how many redownloads it can make before officially timing out. + :return: Status code of child process. + """ + + sc = SteamCMD_command() + if install_dir: + sc.force_install_dir(install_dir) + sc.workshop_download_item(app_id, workshop_id, validate) + return self.execute(sc, n_tries) + + def execute(self, cmd: SteamCMD_command, n_tries: int = 1): + """ + Executes a SteamCMD_command, with added actions occurring sequentially. + May retry multiple times on timeout due to valves' stupid timeout on large downloads. + :param cmd: Sequence of commands to execute + :param n_tries: Number of times the command will be tried. + :return: Status code of child process. + """ + if n_tries == 0: + raise SteamCMDDownloadException(message= + """Error executing command, max number of timeout tries exceeded! + Consider increasing the n_tries parameter if the download is + particularly large""" + ) + + params = ( + self.exe, + f"+login {self._uname} {self._passw}", + cmd.get_cmd(), + "+quit", + ) + self._print_log("Parameters used:", " ".join(params)) + try: + return subprocess.check_call(" ".join(params), shell=True) + + except subprocess.CalledProcessError as e: + # SteamCMD has a habit of timing out large downloads, so retry on timeout for the remainder of n_tries. + if e.returncode == 10: + self._print_log(f"Download timeout! Tries remaining: {n_tries}. Retrying...") + return self.execute(cmd, n_tries - 1) + # SteamCMD sometimes crashes when timing out downloads, due to + # an assert checking that the download actually finished. + # If this happens, retry. + elif e.returncode == 134: + self._print_log(f"SteamCMD errored! Tries remaining: {n_tries}. Retrying...") + return self.execute(cmd, n_tries - 1) + + raise SteamCMDException(message=f"Steamcmd was unable to run. exit code was {e.returncode}") diff --git a/app/classes/steamcmd/Steam_CMD_command.py b/app/classes/steamcmd/Steam_CMD_command.py new file mode 100644 index 00000000..e3f23844 --- /dev/null +++ b/app/classes/steamcmd/Steam_CMD_command.py @@ -0,0 +1,78 @@ +class SteamCMD_command: + """ + Used to construct a sequence of commands to sequentially be executed by SteamCMD. + This reduces the number of required logins, which when using the other provided + methods may result in getting rate limited by Steam. + To be used with the SteamCMD.execute() method. + """ + _commands = [] + + def __init__(self): + self._commands = [] + + def force_install_dir(self, install_dir: str): + """ + Sets the install directory for following app_update and workshop_download_item commands + :param install_dir: Directory to install to + :return: Index command was added at + """ + self._commands.append('+force_install_dir "{}"'.format(install_dir)) + return len(self._commands) - 1 + + def app_update(self, app_id: int, validate: bool = False, beta: str = '', beta_pass: str = ''): + """ + Updates/installs an app + :param app_id: The Steam ID for the app you want to install + :param validate: Optional parameter for validation. Turn this on when updating something + :param beta: Optional parameter for running a beta branch. + :param beta_pass: Optional parameter for entering beta password. + :return: Index command was added at + """ + self._commands.append('+app_update {}{}{}{}'.format( + app_id, + ' validate' if validate else '', + ' -beta {}'.format(beta) if beta else '', + ' -betapassword {}'.format(beta_pass) if beta_pass else '', + )) + return len(self._commands) - 1 + + def workshop_download_item(self, app_id: int, workshop_id: int, validate: bool = False): + """ + Updates/installs workshop content + :param app_id: The parent application ID + :param workshop_id: The ID for workshop content. Can be found in the url. + :param validate: Optional parameter for validation. Turn this on when updating something + :return: Index command was added at + """ + self._commands.append('+workshop_download_item {} {}{}'.format( + app_id, + workshop_id, + ' validate' if validate else '' + )) + return len(self._commands) - 1 + + def custom(self, cmd: str): + """ + Custom SteamCMD command + :param cmd: Command to execute + :return: Index command was added at + """ + self._commands.append(cmd) + return len(self._commands) - 1 + + def remove(self, idx): + """ + Removes a command at the stated index + :param idx: Index of command to remove + :return: Whether command was removed + """ + if 0 <= idx < len(self._commands) and self._commands[idx]: + # Replacing with None to keep indexes intact + self._commands[idx] = None + return True + else: + return False + + def get_cmd(self): + params = filter(None, self._commands) + return " ".join(params)