Use fstrings where possible

This commit is contained in:
Ivan Habunek 2024-03-28 12:06:50 +01:00
parent 77e75b5dad
commit 9bf0f1b425
No known key found for this signature in database
GPG Key ID: F5F0623FF5EBCB3D
7 changed files with 113 additions and 124 deletions

View File

@ -68,7 +68,7 @@ def _target_filename(clip):
utils.slugify(clip["title"]),
])
return "{}.{}".format(name, ext)
return f"{name}.{ext}"
def _download_clips(generator):
@ -76,10 +76,10 @@ def _download_clips(generator):
target = _target_filename(clip)
if path.exists(target):
print_out("Already downloaded: <green>{}</green>".format(target))
print_out(f"Already downloaded: <green>{target}</green>")
else:
url = get_clip_authenticated_url(clip["slug"], "source")
print_out("Downloading: <yellow>{}</yellow>".format(target))
print_out(f"Downloading: <yellow>{target}</yellow>")
download_file(url, target)
@ -113,7 +113,7 @@ def _print_paged(generator, page_size):
last = first + len(page) - 1
print_out("-" * 80)
print_out("<yellow>Clips {}-{}</yellow>".format(first, last))
print_out(f"<yellow>Clips {first}-{last}</yellow>")
first = first + len(page)
last = first + 1

View File

@ -34,7 +34,7 @@ def download_one(video: str, args: DownloadOptions):
if clip_slug:
return _download_clip(clip_slug, args)
raise ConsoleError("Invalid input: {}".format(video))
raise ConsoleError(f"Invalid input: {video}")
def _parse_playlists(playlists_m3u8):
@ -61,7 +61,7 @@ def _get_playlist_by_name(playlists, quality):
return uri
available = ", ".join([name for (name, _, _) in playlists])
msg = "Quality '{}' not found. Available qualities are: {}".format(quality, available)
msg = f"Quality '{quality}' not found. Available qualities are: {available}"
raise ConsoleError(msg)
@ -69,9 +69,9 @@ def _select_playlist_interactive(playlists):
print_out("\nAvailable qualities:")
for n, (name, resolution, uri) in enumerate(playlists):
if resolution:
print_out("{}) <b>{}</b> <dim>({})</dim>".format(n + 1, name, resolution))
print_out(f"{n + 1}) <b>{name}</b> <dim>({resolution})</dim>")
else:
print_out("{}) <b>{}</b>".format(n + 1, name))
print_out(f"{n + 1}) <b>{name}</b>")
no = utils.read_int("Choose quality", min=1, max=len(playlists) + 1, default=1)
_, _, uri = playlists[no - 1]
@ -83,18 +83,18 @@ def _join_vods(playlist_path, target, overwrite, video):
"ffmpeg",
"-i", playlist_path,
"-c", "copy",
"-metadata", "artist={}".format(video["creator"]["displayName"]),
"-metadata", "title={}".format(video["title"]),
"-metadata", f"artist={video['creator']['displayName']}",
"-metadata", f"title={video['title']}",
"-metadata", "encoded_by=twitch-dl",
"-stats",
"-loglevel", "warning",
"file:{}".format(target),
f"file:{target}",
]
if overwrite:
command.append("-y")
print_out("<dim>{}</dim>".format(" ".join(command)))
print_out(f"<dim>{' '.join(command)}</dim>")
result = subprocess.run(command)
if result.returncode != 0:
raise ConsoleError("Joining files failed")
@ -122,7 +122,7 @@ def _video_target_filename(video, args: DownloadOptions):
return args.output.format(**subs)
except KeyError as e:
supported = ", ".join(subs.keys())
raise ConsoleError("Invalid key {} used in --output. Supported keys are: {}".format(e, supported))
raise ConsoleError(f"Invalid key {e} used in --output. Supported keys are: {supported}")
def _clip_target_filename(clip, args: DownloadOptions):
@ -152,7 +152,7 @@ def _clip_target_filename(clip, args: DownloadOptions):
return args.output.format(**subs)
except KeyError as e:
supported = ", ".join(subs.keys())
raise ConsoleError("Invalid key {} used in --output. Supported keys are: {}".format(e, supported))
raise ConsoleError(f"Invalid key {e} used in --output. Supported keys are: {supported}")
def _get_vod_paths(playlist, start: Optional[int], end: Optional[int]) -> List[str]:
@ -197,13 +197,13 @@ def _get_clip_url(clip, quality):
return q["sourceURL"]
available = ", ".join([str(q["quality"]) for q in qualities])
msg = "Quality '{}' not found. Available qualities are: {}".format(quality, available)
msg = f"Quality '{quality}' not found. Available qualities are: {available}"
raise ConsoleError(msg)
# Ask user to select quality
print_out("\nAvailable qualities:")
for n, q in enumerate(qualities):
print_out("{}) {} [{} fps]".format(n + 1, q["quality"], q["frameRate"]))
print_out(f"{n + 1}) {q['quality']} [{q['frameRate']} fps]")
print_out()
no = utils.read_int("Choose quality", min=1, max=len(qualities), default=1)
@ -216,7 +216,7 @@ def get_clip_authenticated_url(slug, quality):
access_token = twitch.get_clip_access_token(slug)
if not access_token:
raise ConsoleError("Access token not found for slug '{}'".format(slug))
raise ConsoleError(f"Access token not found for slug '{slug}'")
url = _get_clip_url(access_token, quality)
@ -225,26 +225,29 @@ def get_clip_authenticated_url(slug, quality):
"token": access_token["playbackAccessToken"]["value"],
})
return "{}?{}".format(url, query)
return f"{url}?{query}"
def _download_clip(slug: str, args: DownloadOptions) -> None:
print_out("<dim>Looking up clip...</dim>")
clip = twitch.get_clip(slug)
game = clip["game"]["name"] if clip["game"] else "Unknown"
if not clip:
raise ConsoleError("Clip '{}' not found".format(slug))
raise ConsoleError(f"Clip '{slug}' not found")
print_out("Found: <green>{}</green> by <yellow>{}</yellow>, playing <blue>{}</blue> ({})".format(
clip["title"],
clip["broadcaster"]["displayName"],
game,
utils.format_duration(clip["durationSeconds"])
))
title = clip["title"]
user = clip["broadcaster"]["displayName"]
game = clip["game"]["name"] if clip["game"] else "Unknown"
duration = utils.format_duration(clip["durationSeconds"])
print_out(
f"Found: <green>{title}</green> by <yellow>{user}</yellow>, "+
f"playing <blue>{game}</blue> ({duration})"
)
target = _clip_target_filename(clip, args)
print_out("Target: <blue>{}</blue>".format(target))
print_out(f"Target: <blue>{target}</blue>")
if not args.overwrite and path.exists(target):
response = input("File exists. Overwrite? [Y/n]: ")
@ -253,17 +256,17 @@ def _download_clip(slug: str, args: DownloadOptions) -> None:
args.overwrite = True
url = get_clip_authenticated_url(slug, args.quality)
print_out("<dim>Selected URL: {}</dim>".format(url))
print_out(f"<dim>Selected URL: {url}</dim>")
print_out("<dim>Downloading clip...</dim>")
if (args.dry_run is False):
download_file(url, target)
print_out("Downloaded: <blue>{}</blue>".format(target))
print_out(f"Downloaded: <blue>{target}</blue>")
def _download_video(video_id, args) -> None:
def _download_video(video_id, args: DownloadOptions) -> None:
if args.start and args.end and args.end <= args.start:
raise ConsoleError("End time must be greater than start time")
@ -271,13 +274,14 @@ def _download_video(video_id, args) -> None:
video = twitch.get_video(video_id)
if not video:
raise ConsoleError("Video {} not found".format(video_id))
raise ConsoleError(f"Video {video_id} not found")
print_out("Found: <blue>{}</blue> by <yellow>{}</yellow>".format(
video['title'], video['creator']['displayName']))
title = video['title']
user = video['creator']['displayName']
print_out(f"Found: <blue>{title}</blue> by <yellow>{user}</yellow>")
target = _video_target_filename(video, args)
print_out("Output: <blue>{}</blue>".format(target))
print_out(f"Output: <blue>{target}</blue>")
if not args.overwrite and path.exists(target):
response = input("File exists. Overwrite? [Y/n]: ")
@ -312,10 +316,9 @@ def _download_video(video_id, args) -> None:
with open(path.join(target_dir, "playlist.m3u8"), "w") as f:
f.write(response.text)
print_out("\nDownloading {} VODs using {} workers to {}".format(
len(vod_paths), args.max_workers, target_dir))
print_out(f"\nDownloading {len(vod_paths)} VODs using {args.max_workers} workers to {target_dir}")
sources = [base_uri + path for path in vod_paths]
targets = [os.path.join(target_dir, "{:05d}.ts".format(k)) for k, _ in enumerate(vod_paths)]
targets = [os.path.join(target_dir, f"{k:05d}.ts") for k, _ in enumerate(vod_paths)]
asyncio.run(download_all(sources, targets, args.max_workers, rate_limit=args.rate_limit))
# Make a modified playlist which references downloaded VODs
@ -334,22 +337,22 @@ def _download_video(video_id, args) -> None:
if args.no_join:
print_out("\n\n<dim>Skipping joining files...</dim>")
print_out("VODs downloaded to:\n<blue>{}</blue>".format(target_dir))
print_out(f"VODs downloaded to:\n<blue>{target_dir}</blue>")
return
print_out("\n\nJoining files...")
_join_vods(playlist_path, target, args.overwrite, video)
if args.keep:
print_out("\n<dim>Temporary files not deleted: {}</dim>".format(target_dir))
print_out(f"\n<dim>Temporary files not deleted: {target_dir}</dim>")
else:
print_out("\n<dim>Deleting temporary files...</dim>")
shutil.rmtree(target_dir)
print_out("\nDownloaded: <green>{}</green>".format(target))
print_out(f"\nDownloaded: <green>{target}</green>")
def _determine_time_range(video_id, args):
def _determine_time_range(video_id, args: DownloadOptions):
if args.start or args.end:
return args.start, args.end

View File

@ -11,7 +11,7 @@ def info(id: str, *, json: bool = False):
video = twitch.get_video(video_id)
if not video:
raise ConsoleError("Video {} not found".format(video_id))
raise ConsoleError(f"Video {video_id} not found")
print_log("Fetching access token...")
access_token = twitch.get_access_token(video_id)
@ -33,7 +33,7 @@ def info(id: str, *, json: bool = False):
print_log("Fetching clip...")
clip = twitch.get_clip(clip_slug)
if not clip:
raise ConsoleError("Clip {} not found".format(clip_slug))
raise ConsoleError(f"Clip {clip_slug} not found")
if json:
print_json(clip)
@ -41,7 +41,7 @@ def info(id: str, *, json: bool = False):
clip_info(clip)
return
raise ConsoleError("Invalid input: {}".format(id))
raise ConsoleError(f"Invalid input: {id}")
def video_info(video, playlists, chapters):
@ -51,7 +51,7 @@ def video_info(video, playlists, chapters):
print_out()
print_out("Playlists:")
for p in m3u8.loads(playlists).playlists:
print_out("<b>{}</b> {}".format(p.stream_info.video, p.uri))
print_out(f"<b>{p.stream_info.video}</b> {p.uri}")
if chapters:
print_out()

View File

@ -56,7 +56,7 @@ def videos(
print_out()
print_out("-" * 80)
print_out("<yellow>Videos {}-{} of {}</yellow>".format(1, count, total_count))
print_out(f"<yellow>Videos 1-{count} of {total_count}</yellow>")
if total_count > count:
print_out()
@ -71,10 +71,10 @@ def _get_game_ids(names):
game_ids = []
for name in names:
print_out("<dim>Looking up game '{}'...</dim>".format(name))
print_out(f"<dim>Looking up game '{name}'...</dim>")
game_id = twitch.get_game_id(name)
if not game_id:
raise ConsoleError("Game '{}' not found".format(name))
raise ConsoleError(f"Game '{name}' not found")
game_ids.append(int(game_id))
return game_ids

View File

@ -1,5 +1,3 @@
# -*- coding: utf-8 -*-
import json
import sys
import re
@ -66,13 +64,13 @@ def print_json(data: Any):
def print_err(*args, **kwargs):
args = ["<red>{}</red>".format(a) for a in args]
args = [f"<red>{arg}</red>" for arg in args]
args = [colorize(a) if USE_ANSI_COLOR else strip_tags(a) for a in args]
print(*args, file=sys.stderr, **kwargs)
def print_log(*args, **kwargs):
args = ["<dim>{}</dim>".format(a) for a in args]
args = [f"<dim>{a}</dim>" for a in args]
args = [colorize(a) if USE_ANSI_COLOR else strip_tags(a) for a in args]
print(*args, file=sys.stderr, **kwargs)
@ -81,20 +79,20 @@ def print_video(video):
published_at = video["publishedAt"].replace("T", " @ ").replace("Z", "")
length = utils.format_duration(video["lengthSeconds"])
channel = "<blue>{}</blue>".format(video["creator"]["displayName"]) if video["creator"] else ""
playing = "playing <blue>{}</blue>".format(video["game"]["name"]) if video["game"] else ""
channel = f"<blue>{video['creator']['displayName']}</blue>" if video["creator"] else ""
playing = f"playing <blue>{video['game']['name']}</blue>" if video["game"] else ""
# Can't find URL in video object, strange
url = "https://www.twitch.tv/videos/{}".format(video["id"])
url = f"https://www.twitch.tv/videos/{video['id']}"
print_out("<b>Video {}</b>".format(video["id"]))
print_out("<green>{}</green>".format(video["title"]))
print_out(f"<b>Video {video['id']}</b>")
print_out(f"<green>{video['title']}</green>")
if channel or playing:
print_out(" ".join([channel, playing]))
print_out("Published <blue>{}</blue> Length: <blue>{}</blue> ".format(published_at, length))
print_out("<i>{}</i>".format(url))
print_out(f"Published <blue>{published_at}</blue> Length: <blue>{length}</blue> ")
print_out(f"<i>{url}</i>")
def print_video_compact(video):
@ -123,7 +121,7 @@ def print_paged_videos(generator, page_size, total_count):
last = first + len(page) - 1
print_out("-" * 80)
print_out("<yellow>Videos {}-{} of {}</yellow>".format(first, last, total_count))
print_out(f"<yellow>Videos {first}-{last} of {total_count}</yellow>")
first = first + len(page)
last = first + 1
@ -138,18 +136,19 @@ def print_clip(clip):
length = utils.format_duration(clip["durationSeconds"])
channel = clip["broadcaster"]["displayName"]
playing = (
"playing <blue>{}</blue>".format(clip["game"]["name"])
f"playing <blue>{clip['game']['name']}</blue>"
if clip["game"] else ""
)
print_out("Clip <b>{}</b>".format(clip["slug"]))
print_out("<green>{}</green>".format(clip["title"]))
print_out("<blue>{}</blue> {}".format(channel, playing))
print_out(f"Clip <b>{clip['slug']}</b>")
print_out(f"<green>{clip['title']}</green>")
print_out(f"<blue>{channel}</blue> {playing}")
print_out(
"Published <blue>{}</blue>"
" Length: <blue>{}</blue>"
" Views: <blue>{}</blue>".format(published_at, length, clip["viewCount"]))
print_out("<i>{}</i>".format(clip["url"]))
f"Published <blue>{published_at}</blue>" +
f" Length: <blue>{length}</blue>" +
f" Views: <blue>{clip["viewCount"]}</blue>"
)
print_out(f"<i>{clip['url']}</i>")
def _continue():

View File

@ -89,36 +89,34 @@ CLIP_FIELDS = """
"""
def get_video(video_id):
query = """
def get_video(video_id: str):
query = f"""
{{
video(id: "{video_id}") {{
{fields}
{VIDEO_FIELDS}
}}
}}
"""
query = query.format(video_id=video_id, fields=VIDEO_FIELDS)
response = gql_query(query)
return response["data"]["video"]
def get_clip(slug):
query = """
def get_clip(slug: str):
query = f"""
{{
clip(slug: "{}") {{
{fields}
clip(slug: "{slug}") {{
{CLIP_FIELDS}
}}
}}
"""
response = gql_query(query.format(slug, fields=CLIP_FIELDS))
response = gql_query(query)
return response["data"]["clip"]
def get_clip_access_token(slug):
query = """
def get_clip_access_token(slug: str):
query = f"""
{{
"operationName": "VideoAccessToken_Clip",
"variables": {{
@ -133,11 +131,11 @@ def get_clip_access_token(slug):
}}
"""
response = gql_post(query.format(slug=slug).strip())
response = gql_post(query.strip())
return response["data"]["clip"]
def get_channel_clips(channel_id, period, limit, after=None):
def get_channel_clips(channel_id: str, period: str, limit: int, after: str | None= None):
"""
List channel clips.
@ -147,10 +145,10 @@ def get_channel_clips(channel_id, period, limit, after=None):
* sorting by VIEWS_DESC and TRENDING returns the same results
* there is no totalCount
"""
query = """
query = f"""
{{
user(login: "{channel_id}") {{
clips(first: {limit}, after: "{after}", criteria: {{ period: {period}, sort: VIEWS_DESC }}) {{
clips(first: {limit}, after: "{after or ''}", criteria: {{ period: {period.upper()}, sort: VIEWS_DESC }}) {{
pageInfo {{
hasNextPage
hasPreviousPage
@ -158,7 +156,7 @@ def get_channel_clips(channel_id, period, limit, after=None):
edges {{
cursor
node {{
{fields}
{CLIP_FIELDS}
}}
}}
}}
@ -166,18 +164,10 @@ def get_channel_clips(channel_id, period, limit, after=None):
}}
"""
query = query.format(
channel_id=channel_id,
after=after if after else "",
limit=limit,
period=period.upper(),
fields=CLIP_FIELDS
)
response = gql_query(query)
user = response["data"]["user"]
if not user:
raise ConsoleError("Channel {} not found".format(channel_id))
raise ConsoleError(f"Channel {channel_id} not found")
return response["data"]["user"]["clips"]
@ -222,15 +212,24 @@ def channel_clips_generator_old(channel_id, period, limit):
break
def get_channel_videos(channel_id, limit, sort, type="archive", game_ids=[], after=None):
query = """
def get_channel_videos(
channel_id: str,
limit: int,
sort: str,
type: str = "archive",
game_ids: list[str] | None = None,
after: str | None = None
):
game_ids = game_ids or []
query = f"""
{{
user(login: "{channel_id}") {{
videos(
first: {limit},
type: {type},
sort: {sort},
after: "{after}",
type: {type.upper()},
sort: {sort.upper()},
after: "{after or ''}",
options: {{
gameIDs: {game_ids}
}}
@ -242,7 +241,7 @@ def get_channel_videos(channel_id, limit, sort, type="archive", game_ids=[], aft
edges {{
cursor
node {{
{fields}
{VIDEO_FIELDS}
}}
}}
}}
@ -250,20 +249,10 @@ def get_channel_videos(channel_id, limit, sort, type="archive", game_ids=[], aft
}}
"""
query = query.format(
channel_id=channel_id,
game_ids=game_ids,
after=after if after else "",
limit=limit,
sort=sort.upper(),
type=type.upper(),
fields=VIDEO_FIELDS
)
response = gql_query(query)
if not response["data"]["user"]:
raise ConsoleError("Channel {} not found".format(channel_id))
raise ConsoleError(f"Channel {channel_id} not found")
return response["data"]["user"]["videos"]
@ -291,7 +280,7 @@ def channel_videos_generator(channel_id, max_videos, sort, type, game_ids=[]):
def get_access_token(video_id, auth_token=None):
query = """
query = f"""
{{
videoPlaybackAccessToken(
id: {video_id},
@ -307,8 +296,6 @@ def get_access_token(video_id, auth_token=None):
}}
"""
query = query.format(video_id=video_id)
headers = {}
if auth_token is not None:
headers['authorization'] = f'OAuth {auth_token}'
@ -335,7 +322,7 @@ def get_playlists(video_id, access_token):
"""
For a given video return a playlist which contains possible video qualities.
"""
url = "https://usher.ttvnw.net/vod/{}".format(video_id)
url = f"https://usher.ttvnw.net/vod/{video_id}"
response = httpx.get(url, params={
"nauth": access_token['value'],
@ -349,21 +336,21 @@ def get_playlists(video_id, access_token):
def get_game_id(name):
query = """
query = f"""
{{
game(name: "{}") {{
game(name: "{name.strip()}") {{
id
}}
}}
"""
response = gql_query(query.format(name.strip()))
response = gql_query(query)
game = response["data"]["game"]
if game:
return game["id"]
def get_video_chapters(video_id):
def get_video_chapters(video_id: str):
query = {
"operationName": "VideoPlayer_ChapterSelectButtonVideo",
"variables":

View File

@ -4,9 +4,9 @@ import unicodedata
def _format_size(value: float, digits: int, unit: str):
if digits > 0:
return "{{:.{}f}}{}".format(digits, unit).format(value)
return f"{{:.{digits}f}}{unit}".format(value)
else:
return "{{:d}}{}".format(unit).format(value)
return f"{int(value)}{unit}"
def format_size(bytes_: int, digits: int = 1):
@ -32,12 +32,12 @@ def format_duration(total_seconds: int | float) -> str:
seconds = total_seconds % 60
if hours:
return "{} h {} min".format(hours, minutes)
return f"{hours} h {minutes} min"
if minutes:
return "{} min {} sec".format(minutes, seconds)
return f"{minutes} min {seconds} sec"
return "{} sec".format(seconds)
return f"{seconds} sec"
def format_time(total_seconds: int | float, force_hours: bool = False) -> str: