Add SteamCMD code

Would import wrapper as package but releases do not look steadily maintained.
https://github.com/wmellema/Py-SteamCMD-Wrapper
Origional Author: wmellema <https://github.com/wmellema>
This commit is contained in:
Zedifus 2023-01-09 05:04:10 +00:00
parent 1680ce18f5
commit 4df984f562
2 changed files with 319 additions and 0 deletions

View File

@ -0,0 +1,241 @@
import os
import platform
import zipfile
import subprocess
import urllib.request
from pysteamcmdwrapper.SteamCMD_command import SteamCMD_command
from pysteamcmdwrapper.exceptions import SteamCMDException, SteamCMDDownloadException, SteamCMDInstallException
from getpass import getpass
package_links = {
"Windows": {
"url": "https://steamcdn-a.akamaihd.net/client/installer/steamcmd.zip",
"extension": ".exe",
"d_extension": ".zip"
},
"Linux": {
"url": "https://steamcdn-a.akamaihd.net/client/installer/steamcmd_linux.tar.gz",
"extension": ".sh",
"d_extension": ".tar.gz"
}
}
class SteamCMD:
"""
Wrapper for SteamCMD
Will install from source depending on OS.
"""
_installation_path = ""
_uname = "anonymous"
_passw = ""
def __init__(self, installation_path):
self._installation_path = installation_path
if not os.path.isdir(self._installation_path):
raise SteamCMDInstallException(message=
f'No valid directory found at {self._installation_path}'
'Please make sure that the directory is correct.')
self._prepare_installation()
def _prepare_installation(self):
"""
Sets internal configuration according to parameters and OS
"""
self.platform = platform.system()
if self.platform not in ["Windows", "Linux"]:
raise SteamCMDException(message=f"Non supported operating system. Expected Windows or Linux, got {self.platform}")
self.steamcmd_url = package_links[self.platform]["url"]
self.zip = "steamcmd" + package_links[self.platform]["d_extension"]
self.exe = os.path.join(
self._installation_path,
"steamcmd" + package_links[self.platform]["extension"]
)
def _download(self):
"""
Internal method to download the SteamCMD Binaries from steams' servers.
:return: downloaded data for debug purposes
"""
try:
if self.steamcmd_url.lower().startswith("http"):
req = urllib.request.Request(self.steamcmd_url)
else:
raise ValueError from None
with urllib.request.urlopen(req) as resp:
data = resp.read()
with open(self.zip, "wb") as f:
f.write(data)
return data
except Exception as e:
raise SteamCMDException(message=f"An unknown exception occurred during downloading. {e}")
def _extract_steamcmd(self):
"""
Internal method for extracting downloaded zip file. Works on both
windows and linux.
"""
if self.platform == 'Windows':
with zipfile.ZipFile(self.zip, 'r') as f:
f.extractall(self._installation_path)
elif self.platform == 'Linux':
import tarfile
with tarfile.open(self.zip, 'r:gz') as f:
f.extractall(self._installation_path)
else:
# This should never happen, but let's just throw it just in case.
raise SteamCMDException(message=
'The operating system is not supported.'
f'Expected Linux or Windows, received: {self.platform}'
)
os.remove(self.zip)
@staticmethod
def _print_log(*message):
"""
Small helper function for printing log entries.
Helps with output of subprocess.check_call not always having newlines
:param *message: Accepts multiple messages, each will be printed on a
new line
"""
# TODO: Handle logs better
print("")
print("")
for msg in message:
print(msg)
print("")
def install(self, force: bool = False):
"""
Installs steamcmd if it is not already installed to self.install_path.
:param force: forces steamcmd install regardless of its presence
:return:
"""
if not os.path.isfile(self.exe) or force:
# Steamcmd isn't installed. Go ahead and install it.
self._download()
self._extract_steamcmd()
else:
raise SteamCMDException(message=
'Steamcmd is already installed. Reinstall is not necessary.'
'Use force=True to override.'
)
try:
subprocess.check_call((self.exe, "+quit"))
except subprocess.CalledProcessError as e:
if e.returncode == 7:
self._print_log(
"SteamCMD has returned error code 7 on fresh installation",
"",
"Not sure why this crashed,",
"long live steamcmd and it's non existent documentation..",
"It should be fine nevertheless")
return
else:
raise SteamCMDInstallException(message=f"Failed to install, check error code {e.returncode}")
return
def login(self, uname: str = None, passw: str = None):
"""
Login function in order to do a persistent login on the steam servers.
Prompts users for their credentials and spawns a child process.
:param uname: Steam Username
:param passw: Steam Password
:return: status code of child process
"""
self._uname = uname if uname else input("Please enter steam username: ")
self._passw = passw if passw else getpass("Please enter steam password: ")
sc = SteamCMD_command()
return self.execute(sc)
def app_update(self, app_id: int, install_dir: str = None, validate: bool = None, beta: str = None,
betapassword: str = None):
"""
Installer function for apps.
:param app_id: The Steam ID for the app you want to install
:param install_dir: Optional custom installation directory.
:param validate: Optional parameter for validation. Turn this on when updating something.
:param beta: Optional parameter for running a beta branch.
:param betapassword: Optional parameter for entering beta password.
:return: Status code of child process.
"""
sc = SteamCMD_command()
if install_dir:
sc.force_install_dir(install_dir)
sc.app_update(app_id, validate, beta, betapassword)
self._print_log(
f"Downloading item {app_id}",
f"into {install_dir} with validate set to {validate}")
return self.execute(sc)
def workshop_update(self, app_id: int, workshop_id: int, install_dir: str = None, validate: bool = None,
n_tries: int = 5):
"""
Installer function for workshop content. Retries multiple times on timeout due to valves'
stupid timeout on large downloads.
:param app_id: The parent application ID
:param workshop_id: The ID for workshop content. Can be found in the url.
:param install_dir: Optional custom installation directory.
:param validate: Optional parameter for validation. Turn this on when updating something.
:param n_tries: Counter for how many redownloads it can make before officially timing out.
:return: Status code of child process.
"""
sc = SteamCMD_command()
if install_dir:
sc.force_install_dir(install_dir)
sc.workshop_download_item(app_id, workshop_id, validate)
return self.execute(sc, n_tries)
def execute(self, cmd: SteamCMD_command, n_tries: int = 1):
"""
Executes a SteamCMD_command, with added actions occurring sequentially.
May retry multiple times on timeout due to valves' stupid timeout on large downloads.
:param cmd: Sequence of commands to execute
:param n_tries: Number of times the command will be tried.
:return: Status code of child process.
"""
if n_tries == 0:
raise SteamCMDDownloadException(message=
"""Error executing command, max number of timeout tries exceeded!
Consider increasing the n_tries parameter if the download is
particularly large"""
)
params = (
self.exe,
f"+login {self._uname} {self._passw}",
cmd.get_cmd(),
"+quit",
)
self._print_log("Parameters used:", " ".join(params))
try:
return subprocess.check_call(" ".join(params), shell=True)
except subprocess.CalledProcessError as e:
# SteamCMD has a habit of timing out large downloads, so retry on timeout for the remainder of n_tries.
if e.returncode == 10:
self._print_log(f"Download timeout! Tries remaining: {n_tries}. Retrying...")
return self.execute(cmd, n_tries - 1)
# SteamCMD sometimes crashes when timing out downloads, due to
# an assert checking that the download actually finished.
# If this happens, retry.
elif e.returncode == 134:
self._print_log(f"SteamCMD errored! Tries remaining: {n_tries}. Retrying...")
return self.execute(cmd, n_tries - 1)
raise SteamCMDException(message=f"Steamcmd was unable to run. exit code was {e.returncode}")

View File

@ -0,0 +1,78 @@
class SteamCMD_command:
"""
Used to construct a sequence of commands to sequentially be executed by SteamCMD.
This reduces the number of required logins, which when using the other provided
methods may result in getting rate limited by Steam.
To be used with the SteamCMD.execute() method.
"""
_commands = []
def __init__(self):
self._commands = []
def force_install_dir(self, install_dir: str):
"""
Sets the install directory for following app_update and workshop_download_item commands
:param install_dir: Directory to install to
:return: Index command was added at
"""
self._commands.append('+force_install_dir "{}"'.format(install_dir))
return len(self._commands) - 1
def app_update(self, app_id: int, validate: bool = False, beta: str = '', beta_pass: str = ''):
"""
Updates/installs an app
:param app_id: The Steam ID for the app you want to install
:param validate: Optional parameter for validation. Turn this on when updating something
:param beta: Optional parameter for running a beta branch.
:param beta_pass: Optional parameter for entering beta password.
:return: Index command was added at
"""
self._commands.append('+app_update {}{}{}{}'.format(
app_id,
' validate' if validate else '',
' -beta {}'.format(beta) if beta else '',
' -betapassword {}'.format(beta_pass) if beta_pass else '',
))
return len(self._commands) - 1
def workshop_download_item(self, app_id: int, workshop_id: int, validate: bool = False):
"""
Updates/installs workshop content
:param app_id: The parent application ID
:param workshop_id: The ID for workshop content. Can be found in the url.
:param validate: Optional parameter for validation. Turn this on when updating something
:return: Index command was added at
"""
self._commands.append('+workshop_download_item {} {}{}'.format(
app_id,
workshop_id,
' validate' if validate else ''
))
return len(self._commands) - 1
def custom(self, cmd: str):
"""
Custom SteamCMD command
:param cmd: Command to execute
:return: Index command was added at
"""
self._commands.append(cmd)
return len(self._commands) - 1
def remove(self, idx):
"""
Removes a command at the stated index
:param idx: Index of command to remove
:return: Whether command was removed
"""
if 0 <= idx < len(self._commands) and self._commands[idx]:
# Replacing with None to keep indexes intact
self._commands[idx] = None
return True
else:
return False
def get_cmd(self):
params = filter(None, self._commands)
return " ".join(params)