Replace print_out with click.echo

This commit is contained in:
Ivan Habunek 2024-03-30 15:36:53 +01:00
parent 64b88249f2
commit 9cf3ec2f07
No known key found for this signature in database
GPG Key ID: F5F0623FF5EBCB3D
4 changed files with 81 additions and 62 deletions

View File

@ -1,5 +1,6 @@
import asyncio import asyncio
import platform import platform
import click
import httpx import httpx
import m3u8 import m3u8
import os import os
@ -18,7 +19,7 @@ from twitchdl.download import download_file
from twitchdl.entities import Data, DownloadOptions from twitchdl.entities import Data, DownloadOptions
from twitchdl.exceptions import ConsoleError from twitchdl.exceptions import ConsoleError
from twitchdl.http import download_all from twitchdl.http import download_all
from twitchdl.output import print_out from twitchdl.output import blue, bold, dim, green, print_log, print_out, yellow
def download(ids: list[str], args: DownloadOptions): def download(ids: list[str], args: DownloadOptions):
@ -67,12 +68,12 @@ def _get_playlist_by_name(playlists, quality):
def _select_playlist_interactive(playlists): def _select_playlist_interactive(playlists):
print_out("\nAvailable qualities:") click.echo("\nAvailable qualities:")
for n, (name, resolution, uri) in enumerate(playlists): for n, (name, resolution, uri) in enumerate(playlists):
if resolution: if resolution:
print_out(f"{n + 1}) <b>{name}</b> <dim>({resolution})</dim>") click.echo(f"{n + 1}) {bold(name)} {dim(f'({resolution})')}")
else: else:
print_out(f"{n + 1}) <b>{name}</b>") click.echo(f"{n + 1}) {bold(name)}")
no = utils.read_int("Choose quality", min=1, max=len(playlists) + 1, default=1) no = utils.read_int("Choose quality", min=1, max=len(playlists) + 1, default=1)
_, _, uri = playlists[no - 1] _, _, uri = playlists[no - 1]
@ -99,7 +100,7 @@ def _join_vods(playlist_path: str, target: str, overwrite: bool, video):
if overwrite: if overwrite:
command.append("-y") command.append("-y")
print_out(f"<dim>{' '.join(command)}</dim>") click.secho(f"{' '.join(command)}", dim = True)
result = subprocess.run(command) result = subprocess.run(command)
if result.returncode != 0: if result.returncode != 0:
raise ConsoleError("Joining files failed") raise ConsoleError("Joining files failed")
@ -219,10 +220,10 @@ def _get_clip_url(clip, quality):
raise ConsoleError(msg) raise ConsoleError(msg)
# Ask user to select quality # Ask user to select quality
print_out("\nAvailable qualities:") click.echo("\nAvailable qualities:")
for n, q in enumerate(qualities): for n, q in enumerate(qualities):
print_out(f"{n + 1}) {q['quality']} [{q['frameRate']} fps]") click.echo(f"{n + 1}) {bold(q['quality'])} [{q['frameRate']} fps]")
print_out() click.echo()
no = utils.read_int("Choose quality", min=1, max=len(qualities), default=1) no = utils.read_int("Choose quality", min=1, max=len(qualities), default=1)
selected_quality = qualities[no - 1] selected_quality = qualities[no - 1]
@ -230,7 +231,7 @@ def _get_clip_url(clip, quality):
def get_clip_authenticated_url(slug, quality): def get_clip_authenticated_url(slug, quality):
print_out("<dim>Fetching access token...</dim>") print_log("Fetching access token...")
access_token = twitch.get_clip_access_token(slug) access_token = twitch.get_clip_access_token(slug)
if not access_token: if not access_token:
@ -247,79 +248,74 @@ def get_clip_authenticated_url(slug, quality):
def _download_clip(slug: str, args: DownloadOptions) -> None: def _download_clip(slug: str, args: DownloadOptions) -> None:
print_out("<dim>Looking up clip...</dim>") print_log("Looking up clip...")
clip = twitch.get_clip(slug) clip = twitch.get_clip(slug)
if not clip: if not clip:
raise ConsoleError(f"Clip '{slug}' not found") raise ConsoleError(f"Clip '{slug}' not found")
title = clip["title"] title = clip["title"]
user = clip["broadcaster"]["displayName"] user = clip["broadcaster"]["displayName"]
game = clip["game"]["name"] if clip["game"] else "Unknown" game = clip["game"]["name"] if clip["game"] else "Unknown"
duration = utils.format_duration(clip["durationSeconds"]) duration = utils.format_duration(clip["durationSeconds"])
click.echo(f"Found: {green(title)} by {yellow(user)}, playing {blue(game)} ({duration})")
print_out(
f"Found: <green>{title}</green> by <yellow>{user}</yellow>, "+
f"playing <blue>{game}</blue> ({duration})"
)
target = _clip_target_filename(clip, args) target = _clip_target_filename(clip, args)
print_out(f"Target: <blue>{target}</blue>") click.echo(f"Target: {blue(target)}")
if not args.overwrite and path.exists(target): if not args.overwrite and path.exists(target):
response = input("File exists. Overwrite? [Y/n]: ") response = click.prompt("File exists. Overwrite? [Y/n]", default="Y", show_default=False)
if response.lower().strip() not in ["", "y"]: if response.lower().strip() != "y":
raise ConsoleError("Aborted") raise click.Abort()
args.overwrite = True args.overwrite = True
url = get_clip_authenticated_url(slug, args.quality) url = get_clip_authenticated_url(slug, args.quality)
print_out(f"<dim>Selected URL: {url}</dim>") print_log(f"Selected URL: {url}")
print_out("<dim>Downloading clip...</dim>") if args.dry_run:
click.echo("Dry run, clip not downloaded.")
if (args.dry_run is False): else:
print_log("Downloading clip...")
download_file(url, target) download_file(url, target)
click.echo(f"Downloaded: {blue(target)}")
print_out(f"Downloaded: <blue>{target}</blue>")
def _download_video(video_id, args: DownloadOptions) -> None: def _download_video(video_id, args: DownloadOptions) -> None:
if args.start and args.end and args.end <= args.start: if args.start and args.end and args.end <= args.start:
raise ConsoleError("End time must be greater than start time") raise ConsoleError("End time must be greater than start time")
print_out("<dim>Looking up video...</dim>") print_log("Looking up video...")
video = twitch.get_video(video_id) video = twitch.get_video(video_id)
if not video: if not video:
raise ConsoleError(f"Video {video_id} not found") raise ConsoleError(f"Video {video_id} not found")
title = video['title'] title = video["title"]
user = video['creator']['displayName'] user = video["creator"]["displayName"]
print_out(f"Found: <blue>{title}</blue> by <yellow>{user}</yellow>") click.echo(f"Found: {blue(title)} by {yellow(user)}")
target = _video_target_filename(video, args) target = _video_target_filename(video, args)
print_out(f"Output: <blue>{target}</blue>") click.echo(f"Output: {blue(target)}")
if not args.overwrite and path.exists(target): if not args.overwrite and path.exists(target):
response = input("File exists. Overwrite? [Y/n]: ") response = click.prompt("File exists. Overwrite? [Y/n]: ", default="Y", show_default=False)
if response.lower().strip() not in ["", "y"]: if response.lower().strip() != "y":
raise ConsoleError("Aborted") raise click.Abort()
args.overwrite = True args.overwrite = True
# Chapter select or manual offset # Chapter select or manual offset
start, end = _determine_time_range(video_id, args) start, end = _determine_time_range(video_id, args)
print_out("<dim>Fetching access token...</dim>") print_log("Fetching access token...")
access_token = twitch.get_access_token(video_id, auth_token=args.auth_token) access_token = twitch.get_access_token(video_id, auth_token=args.auth_token)
print_out("<dim>Fetching playlists...</dim>") print_log("Fetching playlists...")
playlists_m3u8 = twitch.get_playlists(video_id, access_token) playlists_m3u8 = twitch.get_playlists(video_id, access_token)
playlists = list(_parse_playlists(playlists_m3u8)) playlists = list(_parse_playlists(playlists_m3u8))
playlist_uri = (_get_playlist_by_name(playlists, args.quality) if args.quality playlist_uri = (_get_playlist_by_name(playlists, args.quality) if args.quality
else _select_playlist_interactive(playlists)) else _select_playlist_interactive(playlists))
print_out("<dim>Fetching playlist...</dim>") print_log("Fetching playlist...")
response = httpx.get(playlist_uri) response = httpx.get(playlist_uri)
response.raise_for_status() response.raise_for_status()
playlist = m3u8.loads(response.text) playlist = m3u8.loads(response.text)
@ -334,7 +330,7 @@ def _download_video(video_id, args: DownloadOptions) -> None:
with open(path.join(target_dir, "playlist.m3u8"), "w") as f: with open(path.join(target_dir, "playlist.m3u8"), "w") as f:
f.write(response.text) f.write(response.text)
print_out(f"\nDownloading {len(vod_paths)} VODs using {args.max_workers} workers to {target_dir}") click.echo(f"\nDownloading {len(vod_paths)} VODs using {args.max_workers} workers to {target_dir}")
sources = [base_uri + path for path in vod_paths] sources = [base_uri + path for path in vod_paths]
targets = [os.path.join(target_dir, f"{k:05d}.ts") for k, _ in enumerate(vod_paths)] targets = [os.path.join(target_dir, f"{k:05d}.ts") for k, _ in enumerate(vod_paths)]
asyncio.run(download_all(sources, targets, args.max_workers, rate_limit=args.rate_limit)) asyncio.run(download_all(sources, targets, args.max_workers, rate_limit=args.rate_limit))
@ -353,27 +349,29 @@ def _download_video(video_id, args: DownloadOptions) -> None:
playlist_path = path.join(target_dir, "playlist_downloaded.m3u8") playlist_path = path.join(target_dir, "playlist_downloaded.m3u8")
playlist.dump(playlist_path) playlist.dump(playlist_path)
print_out("") click.echo()
if args.no_join: if args.no_join:
print_out("<dim>Skipping joining files...</dim>") print_log("Skipping joining files...")
print_out(f"VODs downloaded to:\n<blue>{target_dir}</blue>") click.echo(f"VODs downloaded to:\n{blue(target_dir)}")
return return
if args.concat: if args.concat:
print_out("<dim>Concating files...</dim>") print_log("Concating files...")
_concat_vods(targets, target) _concat_vods(targets, target)
else: else:
print_out("<dim>Joining files...</dim>") print_log("Joining files...")
_join_vods(playlist_path, target, args.overwrite, video) _join_vods(playlist_path, target, args.overwrite, video)
click.echo()
if args.keep: if args.keep:
print_out(f"\n<dim>Temporary files not deleted: {target_dir}</dim>") click.echo(f"Temporary files not deleted: {target_dir}")
else: else:
print_out("\n<dim>Deleting temporary files...</dim>") print_log("Deleting temporary files...")
shutil.rmtree(target_dir) shutil.rmtree(target_dir)
print_out(f"\nDownloaded: <green>{target}</green>") click.echo(f"\nDownloaded: {green(target)}")
def _determine_time_range(video_id, args: DownloadOptions): def _determine_time_range(video_id, args: DownloadOptions):
@ -381,7 +379,7 @@ def _determine_time_range(video_id, args: DownloadOptions):
return args.start, args.end return args.start, args.end
if args.chapter is not None: if args.chapter is not None:
print_out("<dim>Fetching chapters...</dim>") print_log("Fetching chapters...")
chapters = twitch.get_video_chapters(video_id) chapters = twitch.get_video_chapters(video_id)
if not chapters: if not chapters:
@ -395,7 +393,7 @@ def _determine_time_range(video_id, args: DownloadOptions):
except IndexError: except IndexError:
raise ConsoleError(f"Chapter {args.chapter} does not exist. This video has {len(chapters)} chapters.") raise ConsoleError(f"Chapter {args.chapter} does not exist. This video has {len(chapters)} chapters.")
print_out(f'Chapter selected: <blue>{chapter["description"]}</blue>\n') click.echo(f'Chapter selected: {blue(chapter["description"])}\n')
start = chapter["positionMilliseconds"] // 1000 start = chapter["positionMilliseconds"] // 1000
duration = chapter["durationMilliseconds"] // 1000 duration = chapter["durationMilliseconds"] // 1000
return start, start + duration return start, start + duration
@ -404,10 +402,10 @@ def _determine_time_range(video_id, args: DownloadOptions):
def _choose_chapter_interactive(chapters): def _choose_chapter_interactive(chapters):
print_out("\nChapters:") click.echo("\nChapters:")
for index, chapter in enumerate(chapters): for index, chapter in enumerate(chapters):
duration = utils.format_time(chapter["durationMilliseconds"] // 1000) duration = utils.format_time(chapter["durationMilliseconds"] // 1000)
print_out(f'{index + 1}) <b>{chapter["description"]}</b> <dim>({duration})</dim>') click.echo(f'{index + 1}) {bold(chapter["description"])} ({duration})')
index = utils.read_int("Select a chapter", 1, len(chapters)) index = utils.read_int("Select a chapter", 1, len(chapters))
chapter = chapters[index - 1] chapter = chapters[index - 1]
return chapter return chapter

View File

@ -2,7 +2,7 @@ import sys
from twitchdl import twitch from twitchdl import twitch
from twitchdl.exceptions import ConsoleError from twitchdl.exceptions import ConsoleError
from twitchdl.output import print_out, print_paged_videos, print_video, print_json, print_video_compact from twitchdl.output import print_log, print_out, print_paged_videos, print_video, print_json, print_video_compact
def videos( def videos(
@ -71,7 +71,7 @@ def _get_game_ids(names: list[str]) -> list[str]:
game_ids = [] game_ids = []
for name in names: for name in names:
print_out(f"<dim>Looking up game '{name}'...</dim>") print_log(f"Looking up game '{name}'...")
game_id = twitch.get_game_id(name) game_id = twitch.get_game_id(name)
if not game_id: if not game_id:
raise ConsoleError(f"Game '{name}' not found") raise ConsoleError(f"Game '{name}' not found")

View File

@ -64,10 +64,8 @@ def print_json(data: Any):
click.echo(json.dumps(data)) click.echo(json.dumps(data))
def print_log(*args, **kwargs): def print_log(message: Any):
args = [f"<dim>{a}</dim>" for a in args] click.secho(message, err=True, dim=True)
args = [colorize(a) if USE_ANSI_COLOR else strip_tags(a) for a in args]
print(*args, file=sys.stderr, **kwargs)
def print_table(headers: list[str], data: list[list[str]]): def print_table(headers: list[str], data: list[list[str]]):
@ -177,3 +175,29 @@ def _continue():
return False return False
return True return True
# Shorthand functions for coloring output
def blue(text: Any) -> str:
return click.style(text, fg="blue")
def cyan(text: Any) -> str:
return click.style(text, fg="cyan")
def green(text: Any) -> str:
return click.style(text, fg="green")
def yellow(text: Any) -> str:
return click.style(text, fg="yellow")
def bold(text: Any) -> str:
return click.style(text, bold=True)
def dim(text: Any) -> str:
return click.style(text, dim=True)

View File

@ -1,6 +1,8 @@
import re import re
import unicodedata import unicodedata
import click
def _format_size(value: float, digits: int, unit: str): def _format_size(value: float, digits: int, unit: str):
if digits > 0: if digits > 0:
@ -54,14 +56,9 @@ def format_time(total_seconds: int | float, force_hours: bool = False) -> str:
def read_int(msg: str, min: int, max: int, default: int | None = None) -> int: def read_int(msg: str, min: int, max: int, default: int | None = None) -> int:
if default:
msg = msg + f" [default {default}]"
msg += ": "
while True: while True:
try: try:
val = input(msg) val = click.prompt(msg, default=default, type=int)
if default and not val: if default and not val:
return default return default
if min <= int(val) <= max: if min <= int(val) <= max: