mirror of
synced 2024-08-30 18:23:09 +00:00
Also change order of login to remove "Please use force_install_dir before logon!" SteamCMD warning
315 lines
11 KiB
315 lines
11 KiB
import os
import re
import platform
import zipfile
import tarfile
import subprocess
import urllib.request
import logging
from getpass import getpass
from app.classes.steamcmd.steamcmd_command import SteamCMDcommand
logger = logging.getLogger(__name__)
package_links = {
"Windows": {
"url": "https://steamcdn-a.akamaihd.net/client/installer/steamcmd.zip",
"extension": ".exe",
"d_extension": ".zip",
"Linux": {
"url": "https://steamcdn-a.akamaihd.net/client/installer/steamcmd_linux.tar.gz",
"extension": ".sh",
"d_extension": ".tar.gz",
class SteamCMD:
Wrapper for SteamCMD
Will install from source depending on OS.
_installation_path = ""
_uname = "anonymous"
_passw = ""
def __init__(self, installation_path):
self._installation_path = installation_path
if not os.path.isdir(self._installation_path):
raise NotADirectoryError(
f"No valid directory found at {self._installation_path}. "
"Please make sure that the directory is correct."
def _prepare_installation(self):
Sets internal configuration according to parameters and OS
self.platform = platform.system()
if self.platform not in ["Windows", "Linux"]:
raise NotImplementedError(
f"Non supported operating system. "
f"Expected Windows, or Linux, got {self.platform}"
self.steamcmd_url = package_links[self.platform]["url"]
self.zip = "steamcmd" + package_links[self.platform]["d_extension"]
self.exe = os.path.join(
"steamcmd" + package_links[self.platform]["extension"],
def _download(self):
Internal method to download the SteamCMD Binaries from steams' servers.
:return: downloaded data for debug purposes
if self.steamcmd_url.lower().startswith("http"):
req = urllib.request.Request(self.steamcmd_url)
raise ValueError from None
with urllib.request.urlopen(req) as resp:
data = resp.read()
with open(self.zip, "wb") as f:
return data
except Exception as e:
raise FileNotFoundError(
f"An unknown exception occurred during downloading. {e}"
) from e
def _extract_steamcmd(self):
Internal method for extracting downloaded zip file. Works on both
windows and linux.
if self.platform == "Windows":
with zipfile.ZipFile(self.zip, "r") as f:
elif self.platform == "Linux":
with tarfile.open(self.zip, "r:gz") as f:
# This should never happen, but let's just throw it just in case.
raise NotImplementedError(
"The operating system is not supported."
f"Expected Linux or Windows, received: {self.platform}"
# @staticmethod
# def _print_log(*message):
# """
# Small helper function for printing log entries.
# Helps with output of subprocess.check_call not always having newlines
# :param *message: Accepts multiple messages, each will be printed on a
# new line
# """
# # TODO: Handle logs better
# print("")
# print("")
# for msg in message:
# print(msg)
# print("")
def install(self, force: bool = False):
Installs steamcmd if it is not already installed to self.install_path.
:param force: forces steamcmd install regardless of its presence
if not os.path.isfile(self.exe) or force:
# Steamcmd isn't installed. Go ahead and install it.
raise FileExistsError(
"Steamcmd is already installed. Reinstall is not necessary."
"Use force=True to override."
subprocess.check_call((self.exe, "+quit"))
except subprocess.CalledProcessError as e:
if e.returncode == 7:
logger.error("SteamCMD has returned error code 7 on fresh installation")
raise SystemError(
f"Failed to install, check error code {e.returncode}"
) from e
def login(self, uname: str = None, passw: str = None):
Login function in order to do a persistent login on the steam servers.
Prompts users for their credentials and spawns a child process.
:param uname: Steam Username
:param passw: Steam Password
:return: status code of child process
self._uname = uname if uname else input("Please enter steam username: ")
self._passw = passw if passw else getpass("Please enter steam password: ")
steam_command = SteamCMDcommand()
return self.execute(steam_command)
def app_update(
app_id: int,
install_dir: str = None,
validate: bool = None,
beta: str = None,
betapassword: str = None,
Installer function for apps.
:param app_id: The Steam ID for the app you want to install
:param install_dir: Optional custom installation directory.
:param validate: Optional. Turn this on when updating something.
:param beta: Optional parameter for running a beta branch.
:param betapassword: Optional parameter for entering beta password.
:return: Status code of child process.
steam_command = SteamCMDcommand()
if install_dir:
steam_command.custom(f"+login {self._uname} {self._passw}")
steam_command.app_update(app_id, validate, beta, betapassword)
f"Downloading item {app_id}\n"
f"into {install_dir} with validate set to {validate}"
return self.execute(steam_command)
def workshop_update(
app_id: int,
workshop_id: int,
install_dir: str = None,
validate: bool = None,
n_tries: int = 5,
Installer function for workshop content. Retries multiple times on timeout
due to valves' timeout on large downloads.
:param app_id: The parent application ID
:param workshop_id: The ID for workshop content. Can be found in the url.
:param install_dir: Optional custom installation directory.
:param validate: Optional. Turn this on when updating something.
:param n_tries: Counter for how many redownloads it can make before timing out.
:return: Status code of child process.
steam_command = SteamCMDcommand()
if install_dir:
steam_command.custom(f"+login {self._uname} {self._passw}")
steam_command.workshop_download_item(app_id, workshop_id, validate)
return self.execute(steam_command, n_tries)
def execute(self, cmd: SteamCMDcommand, n_tries: int = 1):
Executes a SteamCMD_command, with added actions occurring sequentially.
May retry multiple times on timeout due to valves' timeout on large downloads.
:param cmd: Sequence of commands to execute
:param n_tries: Number of times the command will be tried.
:return: Status code of child process.
if n_tries == 0:
raise TimeoutError(
"""Error executing command, max number of retries exceeded!
Consider increasing the n_tries parameter if the download is
particularly large"""
params = (
# f"+login {self._uname} {self._passw}", # steam isnt happy with cmd order
# if this is here
logger.debug("Parameters used: ".join(params))
return subprocess.check_call(" ".join(params), shell=True)
except subprocess.CalledProcessError as e:
# SteamCMD has a habit of timing out large downloads,
# so retry on timeout for the remainder of n_tries.
if e.returncode == 10:
f"Download timeout! Tries remaining: {n_tries}. Retrying..."
return self.execute(cmd, n_tries - 1)
# SteamCMD sometimes crashes when timing out downloads, due to
# an assert checking that the download actually finished.
# If this happens, retry.
if e.returncode == 134:
f"SteamCMD errored! Tries remaining: {n_tries}. Retrying..."
return self.execute(cmd, n_tries - 1)
# Specifically handle the case of exit code 8, insufficient disk space.
if e.returncode == 8:
raise SystemError(
"SteamCMD was unable to run due to"
"insufficient disk space. Exit code was 8."
) from e
# Handle other non-zero exit codes with a general error message.
raise SystemError(
f"SteamCMD was unable to run. Exit code was {e.returncode}."
) from e
def find_app_id(gameserver_files_path):
Searches for appmanifest file in the given directory and extracts the app ID.
This function looks for files matching the pattern 'appmanifest_*.acf' within
the specified path. It reads content of the found appmanifest file to extract
the 'appid' using a regular expression.
:param gameserver_files_path: The path to the directory containing the steamapps
folder where appmanifest file is located. It's expected to be a part of the
gameserver files directory structure.
:return: The extracted app ID as a string if found.
:raises ValueError: If the app ID could not be found in the specified directory.
app_id = ""
steamapps_path = os.path.join(gameserver_files_path, "steamapps")
# Search for appmanifest_*.acf files in the directory
for file in os.listdir(steamapps_path):
if re.match(r"appmanifest_\d+\.acf", file):
appmanifest_path = os.path.join(steamapps_path, file)
with open(appmanifest_path, "r", encoding="utf-8") as f:
content = f.read()
# Use regex to extract the appid
match = re.search(r'"appid"\s+"(\d+)"', content)
if match:
app_id = match.group(1) # Return the found appid
if app_id is None:
raise ValueError(
f"App ID could not be found in directory: {steamapps_path}"
return app_id