crafty-4/app/classes/steamcmd/steamcmd.py
2024-02-14 22:58:29 +00:00

274 lines
9.5 KiB
Python

import os
import platform
import zipfile
import tarfile
import subprocess
import urllib.request
import logging
from getpass import getpass
from app.classes.steamcmd.steamcmd_command import SteamCMDcommand
logger = logging.getLogger(__name__)
package_links = {
"Windows": {
"url": "https://steamcdn-a.akamaihd.net/client/installer/steamcmd.zip",
"extension": ".exe",
"d_extension": ".zip",
},
"Linux": {
"url": "https://steamcdn-a.akamaihd.net/client/installer/steamcmd_linux.tar.gz",
"extension": ".sh",
"d_extension": ".tar.gz",
},
}
class SteamCMD:
"""
Wrapper for SteamCMD
Will install from source depending on OS.
"""
_installation_path = ""
_uname = "anonymous"
_passw = ""
def __init__(self, installation_path):
self._installation_path = installation_path
if not os.path.isdir(self._installation_path):
raise NotADirectoryError(
f"No valid directory found at {self._installation_path}. "
"Please make sure that the directory is correct."
)
self._prepare_installation()
def _prepare_installation(self):
"""
Sets internal configuration according to parameters and OS
"""
self.platform = platform.system()
if self.platform not in ["Windows", "Linux"]:
raise NotImplementedError(
f"Non supported operating system. "
f"Expected Windows, or Linux, got {self.platform}"
)
self.steamcmd_url = package_links[self.platform]["url"]
self.zip = "steamcmd" + package_links[self.platform]["d_extension"]
self.exe = os.path.join(
self._installation_path,
"steamcmd" + package_links[self.platform]["extension"],
)
def _download(self):
"""
Internal method to download the SteamCMD Binaries from steams' servers.
:return: downloaded data for debug purposes
"""
try:
if self.steamcmd_url.lower().startswith("http"):
req = urllib.request.Request(self.steamcmd_url)
else:
raise ValueError from None
with urllib.request.urlopen(req) as resp:
data = resp.read()
with open(self.zip, "wb") as f:
f.write(data)
return data
except Exception as e:
raise FileNotFoundError(
f"An unknown exception occurred during downloading. {e}"
) from e
def _extract_steamcmd(self):
"""
Internal method for extracting downloaded zip file. Works on both
windows and linux.
"""
if self.platform == "Windows":
with zipfile.ZipFile(self.zip, "r") as f:
f.extractall(self._installation_path)
elif self.platform == "Linux":
with tarfile.open(self.zip, "r:gz") as f:
f.extractall(self._installation_path)
else:
# This should never happen, but let's just throw it just in case.
raise NotImplementedError(
"The operating system is not supported."
f"Expected Linux or Windows, received: {self.platform}"
)
os.remove(self.zip)
# @staticmethod
# def _print_log(*message):
# """
# Small helper function for printing log entries.
# Helps with output of subprocess.check_call not always having newlines
# :param *message: Accepts multiple messages, each will be printed on a
# new line
# """
# # TODO: Handle logs better
# print("")
# print("")
# for msg in message:
# print(msg)
# print("")
def install(self, force: bool = False):
"""
Installs steamcmd if it is not already installed to self.install_path.
:param force: forces steamcmd install regardless of its presence
:return:
"""
if not os.path.isfile(self.exe) or force:
# Steamcmd isn't installed. Go ahead and install it.
self._download()
self._extract_steamcmd()
else:
raise FileExistsError(
"Steamcmd is already installed. Reinstall is not necessary."
"Use force=True to override."
)
try:
subprocess.check_call((self.exe, "+quit"))
except subprocess.CalledProcessError as e:
if e.returncode == 7:
logger.error("SteamCMD has returned error code 7 on fresh installation")
return
raise SystemError(
f"Failed to install, check error code {e.returncode}"
) from e
return
def login(self, uname: str = None, passw: str = None):
"""
Login function in order to do a persistent login on the steam servers.
Prompts users for their credentials and spawns a child process.
:param uname: Steam Username
:param passw: Steam Password
:return: status code of child process
"""
self._uname = uname if uname else input("Please enter steam username: ")
self._passw = passw if passw else getpass("Please enter steam password: ")
steam_command = SteamCMDcommand()
return self.execute(steam_command)
def app_update(
self,
app_id: int,
install_dir: str = None,
validate: bool = None,
beta: str = None,
betapassword: str = None,
):
"""
Installer function for apps.
:param app_id: The Steam ID for the app you want to install
:param install_dir: Optional custom installation directory.
:param validate: Optional. Turn this on when updating something.
:param beta: Optional parameter for running a beta branch.
:param betapassword: Optional parameter for entering beta password.
:return: Status code of child process.
"""
steam_command = SteamCMDcommand()
if install_dir:
steam_command.force_install_dir(install_dir)
steam_command.app_update(app_id, validate, beta, betapassword)
logger.debug(
f"Downloading item {app_id}\n"
f"into {install_dir} with validate set to {validate}"
)
return self.execute(steam_command)
def workshop_update(
self,
app_id: int,
workshop_id: int,
install_dir: str = None,
validate: bool = None,
n_tries: int = 5,
):
"""
Installer function for workshop content. Retries multiple times on timeout
due to valves' timeout on large downloads.
:param app_id: The parent application ID
:param workshop_id: The ID for workshop content. Can be found in the url.
:param install_dir: Optional custom installation directory.
:param validate: Optional. Turn this on when updating something.
:param n_tries: Counter for how many redownloads it can make before timing out.
:return: Status code of child process.
"""
steam_command = SteamCMDcommand()
if install_dir:
steam_command.force_install_dir(install_dir)
steam_command.workshop_download_item(app_id, workshop_id, validate)
return self.execute(steam_command, n_tries)
def execute(self, cmd: SteamCMDcommand, n_tries: int = 1):
"""
Executes a SteamCMD_command, with added actions occurring sequentially.
May retry multiple times on timeout due to valves' timeout on large downloads.
:param cmd: Sequence of commands to execute
:param n_tries: Number of times the command will be tried.
:return: Status code of child process.
"""
if n_tries == 0:
raise TimeoutError(
"""Error executing command, max number of retries exceeded!
Consider increasing the n_tries parameter if the download is
particularly large"""
)
params = (
f'"{self.exe}"',
f"+login {self._uname} {self._passw}",
cmd.get_cmd(),
"+quit",
)
logger.debug("Parameters used: ".join(params))
try:
return subprocess.check_call(" ".join(params), shell=True)
except subprocess.CalledProcessError as e:
# SteamCMD has a habit of timing out large downloads,
# so retry on timeout for the remainder of n_tries.
if e.returncode == 10:
logger.warning(
f"Download timeout! Tries remaining: {n_tries}. Retrying..."
)
return self.execute(cmd, n_tries - 1)
# SteamCMD sometimes crashes when timing out downloads, due to
# an assert checking that the download actually finished.
# If this happens, retry.
if e.returncode == 134:
logger.error(
f"SteamCMD errored! Tries remaining: {n_tries}. Retrying..."
)
return self.execute(cmd, n_tries - 1)
# Specifically handle the case of exit code 8, insufficient disk space.
if e.returncode == 8:
raise SystemError(
"""SteamCMD was unable to run due to
insufficient disk space. Exit code was 8."""
) from e
# Handle other non-zero exit codes with a general error message.
raise SystemError(
f"SteamCMD was unable to run. Exit code was {e.returncode}."
) from e