diff --git a/app/classes/steamcmd/SteamCMD.py b/app/classes/steamcmd/SteamCMD.py index 18364967..5fcf7d4e 100644 --- a/app/classes/steamcmd/SteamCMD.py +++ b/app/classes/steamcmd/SteamCMD.py @@ -3,8 +3,7 @@ import platform import zipfile import subprocess import urllib.request -from pysteamcmdwrapper.SteamCMD_command import SteamCMD_command -from pysteamcmdwrapper.exceptions import SteamCMDException, SteamCMDDownloadException, SteamCMDInstallException +from app.classes.steamcmd.Steam_CMD_command import SteamCMD_command from getpass import getpass @@ -12,13 +11,13 @@ package_links = { "Windows": { "url": "https://steamcdn-a.akamaihd.net/client/installer/steamcmd.zip", "extension": ".exe", - "d_extension": ".zip" + "d_extension": ".zip", }, "Linux": { "url": "https://steamcdn-a.akamaihd.net/client/installer/steamcmd_linux.tar.gz", "extension": ".sh", - "d_extension": ".tar.gz" - } + "d_extension": ".tar.gz", + }, } @@ -36,9 +35,10 @@ class SteamCMD: self._installation_path = installation_path if not os.path.isdir(self._installation_path): - raise SteamCMDInstallException(message= - f'No valid directory found at {self._installation_path}' - 'Please make sure that the directory is correct.') + raise NotADirectoryError( + message=f"No valid directory found at {self._installation_path}" + "Please make sure that the directory is correct." + ) self._prepare_installation() @@ -49,13 +49,15 @@ class SteamCMD: self.platform = platform.system() if self.platform not in ["Windows", "Linux"]: - raise SteamCMDException(message=f"Non supported operating system. Expected Windows or Linux, got {self.platform}") + raise NotImplementedError( + message=f"Non supported operating system. Expected Windows, or Linux, got {self.platform}" + ) self.steamcmd_url = package_links[self.platform]["url"] self.zip = "steamcmd" + package_links[self.platform]["d_extension"] self.exe = os.path.join( self._installation_path, - "steamcmd" + package_links[self.platform]["extension"] + "steamcmd" + package_links[self.platform]["extension"], ) def _download(self): @@ -70,33 +72,35 @@ class SteamCMD: else: raise ValueError from None with urllib.request.urlopen(req) as resp: - data = resp.read() with open(self.zip, "wb") as f: f.write(data) return data except Exception as e: - raise SteamCMDException(message=f"An unknown exception occurred during downloading. {e}") + raise Exception( + message=f"An unknown exception occurred during downloading. {e}" + ) def _extract_steamcmd(self): """ Internal method for extracting downloaded zip file. Works on both windows and linux. """ - if self.platform == 'Windows': - with zipfile.ZipFile(self.zip, 'r') as f: + if self.platform == "Windows": + with zipfile.ZipFile(self.zip, "r") as f: f.extractall(self._installation_path) - elif self.platform == 'Linux': + elif self.platform == "Linux": import tarfile - with tarfile.open(self.zip, 'r:gz') as f: + + with tarfile.open(self.zip, "r:gz") as f: f.extractall(self._installation_path) else: # This should never happen, but let's just throw it just in case. - raise SteamCMDException(message= - 'The operating system is not supported.' - f'Expected Linux or Windows, received: {self.platform}' + raise NotImplementedError( + message="The operating system is not supported." + f"Expected Linux or Windows, received: {self.platform}" ) os.remove(self.zip) @@ -128,9 +132,9 @@ class SteamCMD: self._extract_steamcmd() else: - raise SteamCMDException(message= - 'Steamcmd is already installed. Reinstall is not necessary.' - 'Use force=True to override.' + raise FileExistsError( + message="Steamcmd is already installed. Reinstall is not necessary." + "Use force=True to override." ) try: subprocess.check_call((self.exe, "+quit")) @@ -141,10 +145,13 @@ class SteamCMD: "", "Not sure why this crashed,", "long live steamcmd and it's non existent documentation..", - "It should be fine nevertheless") + "It should be fine nevertheless", + ) return else: - raise SteamCMDInstallException(message=f"Failed to install, check error code {e.returncode}") + raise Exception( + message=f"Failed to install, check error code {e.returncode}" + ) return @@ -162,8 +169,14 @@ class SteamCMD: sc = SteamCMD_command() return self.execute(sc) - def app_update(self, app_id: int, install_dir: str = None, validate: bool = None, beta: str = None, - betapassword: str = None): + def app_update( + self, + app_id: int, + install_dir: str = None, + validate: bool = None, + beta: str = None, + betapassword: str = None, + ): """ Installer function for apps. :param app_id: The Steam ID for the app you want to install @@ -179,11 +192,18 @@ class SteamCMD: sc.app_update(app_id, validate, beta, betapassword) self._print_log( f"Downloading item {app_id}", - f"into {install_dir} with validate set to {validate}") + f"into {install_dir} with validate set to {validate}", + ) return self.execute(sc) - def workshop_update(self, app_id: int, workshop_id: int, install_dir: str = None, validate: bool = None, - n_tries: int = 5): + def workshop_update( + self, + app_id: int, + workshop_id: int, + install_dir: str = None, + validate: bool = None, + n_tries: int = 5, + ): """ Installer function for workshop content. Retries multiple times on timeout due to valves' stupid timeout on large downloads. @@ -210,8 +230,8 @@ class SteamCMD: :return: Status code of child process. """ if n_tries == 0: - raise SteamCMDDownloadException(message= - """Error executing command, max number of timeout tries exceeded! + raise TimeoutError( + message="""Error executing command, max number of timeout tries exceeded! Consider increasing the n_tries parameter if the download is particularly large""" ) @@ -229,13 +249,19 @@ class SteamCMD: except subprocess.CalledProcessError as e: # SteamCMD has a habit of timing out large downloads, so retry on timeout for the remainder of n_tries. if e.returncode == 10: - self._print_log(f"Download timeout! Tries remaining: {n_tries}. Retrying...") + self._print_log( + f"Download timeout! Tries remaining: {n_tries}. Retrying..." + ) return self.execute(cmd, n_tries - 1) # SteamCMD sometimes crashes when timing out downloads, due to # an assert checking that the download actually finished. # If this happens, retry. elif e.returncode == 134: - self._print_log(f"SteamCMD errored! Tries remaining: {n_tries}. Retrying...") + self._print_log( + f"SteamCMD errored! Tries remaining: {n_tries}. Retrying..." + ) return self.execute(cmd, n_tries - 1) - raise SteamCMDException(message=f"Steamcmd was unable to run. exit code was {e.returncode}") + raise SystemError( + message=f"Steamcmd was unable to run. exit code was {e.returncode}" + )