Compare commits

..

7 Commits

Author SHA1 Message Date
0e8e2e3f40 wip 2024-04-01 10:52:41 +02:00
3270d857b1 Extract print_paged, improve types 2024-04-01 09:40:54 +02:00
3ae99fe159 Replace custom coloring fns with click.echo and style 2024-03-31 21:41:05 +02:00
9cf3ec2f07 Replace print_out with click.echo 2024-03-30 15:36:53 +01:00
64b88249f2 Remove print_err, unused 2024-03-30 10:11:44 +01:00
5dcc868275 Imporve types 2024-03-30 07:52:43 +01:00
11fbfd35fc Print placeholders in twitch-dl info 2024-03-30 07:35:08 +01:00
12 changed files with 404 additions and 289 deletions

View File

@ -23,6 +23,7 @@ dependencies = [
"click>=8.0.0,<9.0.0",
"httpx>=0.17.0,<1.0.0",
"m3u8>=1.0.0,<4.0.0",
"python-dateutil>=2.8.0,<3.0.0",
]
[tool.setuptools]

View File

@ -5,8 +5,8 @@ import re
import sys
from twitchdl import __version__
from twitchdl.commands.clips import ClipsPeriod
from twitchdl.entities import DownloadOptions
from twitchdl.twitch import ClipsPeriod, VideosSort, VideosType
# Tweak the Click context
# https://click.palletsprojects.com/en/8.1.x/api/#context
@ -369,8 +369,8 @@ def videos(
json: bool,
limit: int | None,
pager: int | None,
sort: str,
type: str,
sort: VideosSort,
type: VideosType,
):
"""List or download clips for given CHANNEL_NAME."""
from twitchdl.commands.videos import videos

View File

@ -1,16 +1,17 @@
from typing import Generator
import click
import re
import sys
from typing import Literal
from itertools import islice
from os import path
from twitchdl import twitch, utils
from twitchdl.commands.download import get_clip_authenticated_url
from twitchdl.download import download_file
from twitchdl.output import print_out, print_clip, print_json
from twitchdl.entities import Data
from twitchdl.output import green, print_clip, print_json, print_paged, yellow
ClipsPeriod = Literal["last_day", "last_week", "last_month", "all_time"]
def clips(
@ -21,7 +22,7 @@ def clips(
json: bool = False,
limit: int = 10,
pager: int | None = None,
period: ClipsPeriod = "all_time",
period: twitch.ClipsPeriod = "all_time",
):
# Ignore --limit if --pager or --all are given
limit = sys.maxsize if all or pager else limit
@ -35,13 +36,15 @@ def clips(
return _download_clips(generator)
if pager:
return _print_paged(generator, pager)
return print_paged("Clips", generator, print_clip, pager)
return _print_all(generator, all)
def _continue():
print_out("Press <green><b>Enter</green> to continue, <yellow><b>Ctrl+C</yellow> to break.")
enter = click.style("Enter", bold=True, fg="green")
ctrl_c = click.style("Ctrl+C", bold=True, fg="yellow")
click.echo(f"Press {enter} to continue, {ctrl_c} to break.")
try:
input()
@ -51,7 +54,7 @@ def _continue():
return True
def _target_filename(clip):
def _target_filename(clip: Data):
url = clip["videoQualities"][0]["sourceURL"]
_, ext = path.splitext(url)
ext = ext.lstrip(".")
@ -71,53 +74,26 @@ def _target_filename(clip):
return f"{name}.{ext}"
def _download_clips(generator):
def _download_clips(generator: Generator[Data, None, None]):
for clip in generator:
target = _target_filename(clip)
if path.exists(target):
print_out(f"Already downloaded: <green>{target}</green>")
click.echo(f"Already downloaded: {green(target)}")
else:
url = get_clip_authenticated_url(clip["slug"], "source")
print_out(f"Downloading: <yellow>{target}</yellow>")
click.echo(f"Downloading: {yellow(target)}")
download_file(url, target)
def _print_all(generator, all: bool):
def _print_all(generator: Generator[Data, None, None], all: bool):
for clip in generator:
print_out()
click.echo()
print_clip(clip)
if not all:
print_out(
"\n<dim>There may be more clips. " +
"Increase the --limit, use --all or --pager to see the rest.</dim>"
click.secho(
"\nThere may be more clips. " +
"Increase the --limit, use --all or --pager to see the rest.",
dim=True
)
def _print_paged(generator, page_size):
iterator = iter(generator)
page = list(islice(iterator, page_size))
first = 1
last = first + len(page) - 1
while True:
print_out("-" * 80)
print_out()
for clip in page:
print_clip(clip)
print_out()
last = first + len(page) - 1
print_out("-" * 80)
print_out(f"<yellow>Clips {first}-{last}</yellow>")
first = first + len(page)
last = first + 1
page = list(islice(iterator, page_size))
if not page or not _continue():
break

View File

@ -1,5 +1,6 @@
import asyncio
import platform
import click
import httpx
import m3u8
import os
@ -14,11 +15,12 @@ from typing import List, Optional, OrderedDict
from urllib.parse import urlparse, urlencode
from twitchdl import twitch, utils
from twitchdl.conversion import from_dict
from twitchdl.download import download_file
from twitchdl.entities import Data, DownloadOptions
from twitchdl.entities import Data, DownloadOptions, Video
from twitchdl.exceptions import ConsoleError
from twitchdl.http import download_all
from twitchdl.output import print_out
from twitchdl.output import blue, bold, dim, green, print_log, yellow
def download(ids: list[str], args: DownloadOptions):
@ -67,28 +69,28 @@ def _get_playlist_by_name(playlists, quality):
def _select_playlist_interactive(playlists):
print_out("\nAvailable qualities:")
click.echo("\nAvailable qualities:")
for n, (name, resolution, uri) in enumerate(playlists):
if resolution:
print_out(f"{n + 1}) <b>{name}</b> <dim>({resolution})</dim>")
click.echo(f"{n + 1}) {bold(name)} {dim(f'({resolution})')}")
else:
print_out(f"{n + 1}) <b>{name}</b>")
click.echo(f"{n + 1}) {bold(name)}")
no = utils.read_int("Choose quality", min=1, max=len(playlists) + 1, default=1)
_, _, uri = playlists[no - 1]
return uri
def _join_vods(playlist_path: str, target: str, overwrite: bool, video):
description = video["description"] or ""
def _join_vods(playlist_path: str, target: str, overwrite: bool, video: Video):
description = video.description or ""
description = description.strip()
command = [
"ffmpeg",
"-i", playlist_path,
"-c", "copy",
"-metadata", f"artist={video['creator']['displayName']}",
"-metadata", f"title={video['title']}",
"-metadata", f"artist={video.creator.display_name}",
"-metadata", f"title={video.title}",
"-metadata", f"description={description}",
"-metadata", "encoded_by=twitch-dl",
"-stats",
@ -99,7 +101,7 @@ def _join_vods(playlist_path: str, target: str, overwrite: bool, video):
if overwrite:
command.append("-y")
print_out(f"<dim>{' '.join(command)}</dim>")
click.secho(f"{' '.join(command)}", dim = True)
result = subprocess.run(command)
if result.returncode != 0:
raise ConsoleError("Joining files failed")
@ -114,27 +116,29 @@ def _concat_vods(vod_paths: list[str], target: str):
raise ConsoleError(f"Joining files failed: {result.stderr}")
def get_video_substitutions(video: Data, format: str) -> Data:
date, time = video['publishedAt'].split("T")
game = video["game"]["name"] if video["game"] else "Unknown"
def get_video_placeholders(video: Video, format: str) -> Data:
date = video.published_at.date().isoformat()
time = video.published_at.time().isoformat()
datetime = video.published_at.isoformat().replace("+00:00", "Z")
game = video.game.name if video.game else "Unknown"
return {
"channel": video["creator"]["displayName"],
"channel_login": video["creator"]["login"],
"channel": video.creator.display_name,
"channel_login": video.creator.login,
"date": date,
"datetime": video["publishedAt"],
"datetime": datetime,
"format": format,
"game": game,
"game_slug": utils.slugify(game),
"id": video["id"],
"id": video.id,
"time": time,
"title": utils.titlify(video["title"]),
"title_slug": utils.slugify(video["title"]),
"title": utils.titlify(video.title),
"title_slug": utils.slugify(video.title),
}
def _video_target_filename(video: Data, args: DownloadOptions):
subs = get_video_substitutions(video, args.format)
def _video_target_filename(video: Video, args: DownloadOptions):
subs = get_video_placeholders(video, args.format)
try:
return args.output.format(**subs)
@ -219,10 +223,10 @@ def _get_clip_url(clip, quality):
raise ConsoleError(msg)
# Ask user to select quality
print_out("\nAvailable qualities:")
click.echo("\nAvailable qualities:")
for n, q in enumerate(qualities):
print_out(f"{n + 1}) {q['quality']} [{q['frameRate']} fps]")
print_out()
click.echo(f"{n + 1}) {bold(q['quality'])} [{q['frameRate']} fps]")
click.echo()
no = utils.read_int("Choose quality", min=1, max=len(qualities), default=1)
selected_quality = qualities[no - 1]
@ -230,7 +234,7 @@ def _get_clip_url(clip, quality):
def get_clip_authenticated_url(slug, quality):
print_out("<dim>Fetching access token...</dim>")
print_log("Fetching access token...")
access_token = twitch.get_clip_access_token(slug)
if not access_token:
@ -247,79 +251,73 @@ def get_clip_authenticated_url(slug, quality):
def _download_clip(slug: str, args: DownloadOptions) -> None:
print_out("<dim>Looking up clip...</dim>")
print_log("Looking up clip...")
clip = twitch.get_clip(slug)
if not clip:
raise ConsoleError(f"Clip '{slug}' not found")
title = clip["title"]
user = clip["broadcaster"]["displayName"]
game = clip["game"]["name"] if clip["game"] else "Unknown"
duration = utils.format_duration(clip["durationSeconds"])
print_out(
f"Found: <green>{title}</green> by <yellow>{user}</yellow>, "+
f"playing <blue>{game}</blue> ({duration})"
)
click.echo(f"Found: {green(title)} by {yellow(user)}, playing {blue(game)} ({duration})")
target = _clip_target_filename(clip, args)
print_out(f"Target: <blue>{target}</blue>")
click.echo(f"Target: {blue(target)}")
if not args.overwrite and path.exists(target):
response = input("File exists. Overwrite? [Y/n]: ")
if response.lower().strip() not in ["", "y"]:
raise ConsoleError("Aborted")
response = click.prompt("File exists. Overwrite? [Y/n]", default="Y", show_default=False)
if response.lower().strip() != "y":
raise click.Abort()
args.overwrite = True
url = get_clip_authenticated_url(slug, args.quality)
print_out(f"<dim>Selected URL: {url}</dim>")
print_log(f"Selected URL: {url}")
print_out("<dim>Downloading clip...</dim>")
if (args.dry_run is False):
if args.dry_run:
click.echo("Dry run, clip not downloaded.")
else:
print_log("Downloading clip...")
download_file(url, target)
print_out(f"Downloaded: <blue>{target}</blue>")
click.echo(f"Downloaded: {blue(target)}")
def _download_video(video_id, args: DownloadOptions) -> None:
def _download_video(video_id: str, args: DownloadOptions) -> None:
if args.start and args.end and args.end <= args.start:
raise ConsoleError("End time must be greater than start time")
print_out("<dim>Looking up video...</dim>")
video = twitch.get_video(video_id)
print_log("Looking up video...")
response = twitch.get_video(video_id)
if not video:
if not response:
raise ConsoleError(f"Video {video_id} not found")
title = video['title']
user = video['creator']['displayName']
print_out(f"Found: <blue>{title}</blue> by <yellow>{user}</yellow>")
video = from_dict(Video, response)
click.echo(f"Found: {blue(video.title)} by {yellow(video.creator.display_name)}")
target = _video_target_filename(video, args)
print_out(f"Output: <blue>{target}</blue>")
click.echo(f"Output: {blue(target)}")
if not args.overwrite and path.exists(target):
response = input("File exists. Overwrite? [Y/n]: ")
if response.lower().strip() not in ["", "y"]:
raise ConsoleError("Aborted")
response = click.prompt("File exists. Overwrite? [Y/n]: ", default="Y", show_default=False)
if response.lower().strip() != "y":
raise click.Abort()
args.overwrite = True
# Chapter select or manual offset
start, end = _determine_time_range(video_id, args)
print_out("<dim>Fetching access token...</dim>")
print_log("Fetching access token...")
access_token = twitch.get_access_token(video_id, auth_token=args.auth_token)
print_out("<dim>Fetching playlists...</dim>")
print_log("Fetching playlists...")
playlists_m3u8 = twitch.get_playlists(video_id, access_token)
playlists = list(_parse_playlists(playlists_m3u8))
playlist_uri = (_get_playlist_by_name(playlists, args.quality) if args.quality
else _select_playlist_interactive(playlists))
print_out("<dim>Fetching playlist...</dim>")
print_log("Fetching playlist...")
response = httpx.get(playlist_uri)
response.raise_for_status()
playlist = m3u8.loads(response.text)
@ -334,7 +332,7 @@ def _download_video(video_id, args: DownloadOptions) -> None:
with open(path.join(target_dir, "playlist.m3u8"), "w") as f:
f.write(response.text)
print_out(f"\nDownloading {len(vod_paths)} VODs using {args.max_workers} workers to {target_dir}")
click.echo(f"\nDownloading {len(vod_paths)} VODs using {args.max_workers} workers to {target_dir}")
sources = [base_uri + path for path in vod_paths]
targets = [os.path.join(target_dir, f"{k:05d}.ts") for k, _ in enumerate(vod_paths)]
asyncio.run(download_all(sources, targets, args.max_workers, rate_limit=args.rate_limit))
@ -353,27 +351,29 @@ def _download_video(video_id, args: DownloadOptions) -> None:
playlist_path = path.join(target_dir, "playlist_downloaded.m3u8")
playlist.dump(playlist_path)
print_out("")
click.echo()
if args.no_join:
print_out("<dim>Skipping joining files...</dim>")
print_out(f"VODs downloaded to:\n<blue>{target_dir}</blue>")
print_log("Skipping joining files...")
click.echo(f"VODs downloaded to:\n{blue(target_dir)}")
return
if args.concat:
print_out("<dim>Concating files...</dim>")
print_log("Concating files...")
_concat_vods(targets, target)
else:
print_out("<dim>Joining files...</dim>")
print_log("Joining files...")
_join_vods(playlist_path, target, args.overwrite, video)
click.echo()
if args.keep:
print_out(f"\n<dim>Temporary files not deleted: {target_dir}</dim>")
click.echo(f"Temporary files not deleted: {target_dir}")
else:
print_out("\n<dim>Deleting temporary files...</dim>")
print_log("Deleting temporary files...")
shutil.rmtree(target_dir)
print_out(f"\nDownloaded: <green>{target}</green>")
click.echo(f"\nDownloaded: {green(target)}")
def _determine_time_range(video_id, args: DownloadOptions):
@ -381,7 +381,7 @@ def _determine_time_range(video_id, args: DownloadOptions):
return args.start, args.end
if args.chapter is not None:
print_out("<dim>Fetching chapters...</dim>")
print_log("Fetching chapters...")
chapters = twitch.get_video_chapters(video_id)
if not chapters:
@ -395,7 +395,7 @@ def _determine_time_range(video_id, args: DownloadOptions):
except IndexError:
raise ConsoleError(f"Chapter {args.chapter} does not exist. This video has {len(chapters)} chapters.")
print_out(f'Chapter selected: <blue>{chapter["description"]}</blue>\n')
click.echo(f'Chapter selected: {blue(chapter["description"])}\n')
start = chapter["positionMilliseconds"] // 1000
duration = chapter["durationMilliseconds"] // 1000
return start, start + duration
@ -404,10 +404,10 @@ def _determine_time_range(video_id, args: DownloadOptions):
def _choose_chapter_interactive(chapters):
print_out("\nChapters:")
click.echo("\nChapters:")
for index, chapter in enumerate(chapters):
duration = utils.format_time(chapter["durationMilliseconds"] // 1000)
print_out(f'{index + 1}) <b>{chapter["description"]}</b> <dim>({duration})</dim>')
click.echo(f'{index + 1}) {bold(chapter["description"])} ({duration})')
index = utils.read_int("Select a chapter", 1, len(chapters))
chapter = chapters[index - 1]
return chapter

View File

@ -1,17 +1,21 @@
import click
import m3u8
from twitchdl import utils, twitch
from twitchdl.commands.download import get_video_substitutions
from twitchdl.commands.download import get_video_placeholders
from twitchdl.conversion import from_dict
from twitchdl.entities import Data, Video
from twitchdl.exceptions import ConsoleError
from twitchdl.output import print_video, print_clip, print_json, print_out, print_log
from twitchdl.output import bold, print_table, print_video, print_clip, print_json, print_log
def info(id: str, *, json: bool = False, format="mkv"):
def info(id: str, *, json: bool = False):
video_id = utils.parse_video_identifier(id)
if video_id:
print_log("Fetching video...")
video = twitch.get_video(video_id)
response = twitch.get_video(video_id)
if not video:
if not response:
raise ConsoleError(f"Video {video_id} not found")
print_log("Fetching access token...")
@ -23,16 +27,11 @@ def info(id: str, *, json: bool = False, format="mkv"):
print_log("Fetching chapters...")
chapters = twitch.get_video_chapters(video_id)
substitutions = get_video_substitutions(video, format)
if json:
video_json(video, playlists, chapters)
video_json(response, playlists, chapters)
else:
video = from_dict(Video, response)
video_info(video, playlists, chapters)
print_out("\nOutput format placeholders:")
for k, v in substitutions.items():
print(f" * {k} = {v}")
return
clip_slug = utils.parse_clip_identifier(id)
@ -51,22 +50,26 @@ def info(id: str, *, json: bool = False, format="mkv"):
raise ConsoleError(f"Invalid input: {id}")
def video_info(video, playlists, chapters):
print_out()
def video_info(video: Video, playlists, chapters):
click.echo()
print_video(video)
print_out()
print_out("Playlists:")
click.echo("Playlists:")
for p in m3u8.loads(playlists).playlists:
print_out(f"<b>{p.stream_info.video}</b> {p.uri}")
click.echo(f"{bold(p.stream_info.video)} {p.uri}")
if chapters:
print_out()
print_out("Chapters:")
click.echo()
click.echo("Chapters:")
for chapter in chapters:
start = utils.format_time(chapter["positionMilliseconds"] // 1000, force_hours=True)
duration = utils.format_time(chapter["durationMilliseconds"] // 1000)
print_out(f'{start} <b>{chapter["description"]}</b> ({duration})')
click.echo(f'{start} {bold(chapter["description"])} ({duration})')
placeholders = get_video_placeholders(video, format = "mkv")
placeholders = [[f"{{{k}}}", v] for k, v in placeholders.items()]
click.echo("")
print_table(["Placeholder", "Value"], placeholders)
def video_json(video, playlists, chapters):
@ -87,11 +90,11 @@ def video_json(video, playlists, chapters):
print_json(video)
def clip_info(clip):
print_out()
def clip_info(clip: Data):
click.echo()
print_clip(clip)
print_out()
print_out("Download links:")
click.echo()
click.echo("Download links:")
for q in clip["videoQualities"]:
print_out("<b>{quality}p{frameRate}</b> {sourceURL}".format(**q))
click.echo(f"{bold(q['quality'])} [{q['frameRate']} fps] {q['sourceURL']}")

View File

@ -1,8 +1,10 @@
import sys
import click
from twitchdl import twitch
from twitchdl.exceptions import ConsoleError
from twitchdl.output import print_out, print_paged_videos, print_video, print_json, print_video_compact
from twitchdl.output import print_log, print_paged, print_video, print_json, print_video_compact
def videos(
@ -14,8 +16,8 @@ def videos(
json: bool,
limit: int | None,
pager: int | None,
sort: str,
type: str,
sort: twitch.VideosSort,
type: twitch.VideosType,
):
game_ids = _get_game_ids(games)
@ -38,11 +40,12 @@ def videos(
return
if total_count == 0:
print_out("<yellow>No videos found</yellow>")
click.echo("No videos found")
return
if pager:
print_paged_videos(generator, pager, total_count)
print_fn = print_video_compact if compact else print_video
print_paged("Videos", generator, print_fn, pager, total_count)
return
count = 0
@ -50,28 +53,28 @@ def videos(
if compact:
print_video_compact(video)
else:
print_out()
click.echo()
print_video(video)
count += 1
print_out()
print_out("-" * 80)
print_out(f"<yellow>Videos 1-{count} of {total_count}</yellow>")
click.echo()
click.echo("-" * 80)
click.echo(f"Videos 1-{count} of {total_count}")
if total_count > count:
print_out()
print_out(
"<dim>There are more videos. Increase the --limit, use --all or --pager to see the rest.</dim>"
click.secho(
"\nThere are more videos. Increase the --limit, use --all or --pager to see the rest.",
dim=True
)
def _get_game_ids(names):
def _get_game_ids(names: list[str]) -> list[str]:
if not names:
return []
game_ids = []
for name in names:
print_out(f"<dim>Looking up game '{name}'...</dim>")
print_log(f"Looking up game '{name}'...")
game_id = twitch.get_game_id(name)
if not game_id:
raise ConsoleError(f"Game '{name}' not found")

87
twitchdl/conversion.py Normal file
View File

@ -0,0 +1,87 @@
import re
import dataclasses
from dataclasses import Field, is_dataclass
from datetime import date, datetime
from dateutil import parser
from typing import Any, Generator, Type, TypeVar, Union, get_args, get_origin, Callable
from typing import get_type_hints
# Generic data class instance
T = TypeVar("T")
# Dict of data decoded from JSON
Data = dict[str, Any]
def snake_to_camel(name: str):
def repl(match: re.Match[str]):
return match.group(1).upper()
return re.sub(r"_([a-z])", repl, name)
def from_dict(cls: Type[T], data: Data, key_fn: Callable[[str], str] = snake_to_camel) -> T:
"""Convert a nested dict into an instance of `cls`."""
def _fields() -> Generator[tuple[str, Any], None, None]:
hints = get_type_hints(cls)
for field in dataclasses.fields(cls):
field_type = _prune_optional(hints[field.name])
dict_field_name = key_fn(field.name)
if (value := data.get(dict_field_name)) is not None:
field_value = _convert(field_type, value)
else:
field_value = _get_default_value(field)
yield field.name, field_value
return cls(**dict(_fields()))
def from_dict_list(cls: Type[T], data: list[Data]) -> list[T]:
return [from_dict(cls, x) for x in data]
def _get_default_value(field: Field[Any]):
if field.default is not dataclasses.MISSING:
return field.default
if field.default_factory is not dataclasses.MISSING:
return field.default_factory()
return None
def _convert(field_type: Type[Any], value: Any) -> Any:
if value is None:
return None
if field_type in [str, int, bool, dict]:
return value
if field_type == datetime:
return parser.parse(value)
if field_type == date:
return date.fromisoformat(value)
if get_origin(field_type) == list:
(inner_type,) = get_args(field_type)
return [_convert(inner_type, x) for x in value]
if is_dataclass(field_type):
return from_dict(field_type, value)
raise ValueError(f"Not implemented for type '{field_type}'")
def _prune_optional(field_type: Type[Any]):
"""For `Optional[<type>]` returns the encapsulated `<type>`."""
if get_origin(field_type) == Union:
args = get_args(field_type)
if len(args) == 2 and args[1] == type(None): # noqa
return args[0]
return field_type

View File

@ -1,4 +1,5 @@
from dataclasses import dataclass
from datetime import datetime
from typing import Any
@ -23,3 +24,30 @@ class DownloadOptions:
# Type for annotating decoded JSON
# TODO: make data classes for common structs
Data = dict[str, Any]
@dataclass
class User:
login: str
display_name: str
@dataclass
class Game:
name: str
@dataclass
class Video:
id: str
title: str
description: str
published_at: datetime
broadcast_type: str
length_seconds: int
game: Game
creator: User
@dataclass
class AccessToken:
signature: str
value: str

View File

@ -1,50 +1,13 @@
import click
import json
import sys
import re
from itertools import islice
from twitchdl import utils
from typing import Any, Match
from typing import Any, Callable, Generator, TypeVar
from twitchdl.entities import Data, Video
START_CODES = {
'b': '\033[1m',
'dim': '\033[2m',
'i': '\033[3m',
'u': '\033[4m',
'red': '\033[91m',
'green': '\033[92m',
'yellow': '\033[93m',
'blue': '\033[94m',
'magenta': '\033[95m',
'cyan': '\033[96m',
}
END_CODE = '\033[0m'
START_PATTERN = "<(" + "|".join(START_CODES.keys()) + ")>"
END_PATTERN = "</(" + "|".join(START_CODES.keys()) + ")>"
USE_ANSI_COLOR = "--no-color" not in sys.argv
def start_code(match: Match[str]) -> str:
name = match.group(1)
return START_CODES[name]
def colorize(text: str) -> str:
text = re.sub(START_PATTERN, start_code, text)
text = re.sub(END_PATTERN, END_CODE, text)
return text
def strip_tags(text: str) -> str:
text = re.sub(START_PATTERN, '', text)
text = re.sub(END_PATTERN, '', text)
return text
T = TypeVar("T")
def truncate(string: str, length: int) -> str:
@ -54,59 +17,40 @@ def truncate(string: str, length: int) -> str:
return string
def print_out(*args, **kwargs):
args = [colorize(a) if USE_ANSI_COLOR else strip_tags(a) for a in args]
print(*args, **kwargs)
def print_json(data: Any):
print(json.dumps(data))
click.echo(json.dumps(data))
def print_err(*args, **kwargs):
args = [f"<red>{arg}</red>" for arg in args]
args = [colorize(a) if USE_ANSI_COLOR else strip_tags(a) for a in args]
print(*args, file=sys.stderr, **kwargs)
def print_log(message: Any):
click.secho(message, err=True, dim=True)
def print_log(*args, **kwargs):
args = [f"<dim>{a}</dim>" for a in args]
args = [colorize(a) if USE_ANSI_COLOR else strip_tags(a) for a in args]
print(*args, file=sys.stderr, **kwargs)
def print_table(headers: list[str], data: list[list[str]]):
widths = [[len(cell) for cell in row] for row in data + [headers]]
widths = [max(width) for width in zip(*widths)]
underlines = ["-" * width for width in widths]
def print_row(row: list[str]):
for idx, cell in enumerate(row):
width = widths[idx]
click.echo(cell.ljust(width), nl=False)
click.echo(" ", nl=False)
click.echo()
print_row(headers)
print_row(underlines)
for row in data:
print_row(row)
def print_video(video):
published_at = video["publishedAt"].replace("T", " @ ").replace("Z", "")
length = utils.format_duration(video["lengthSeconds"])
channel = f"<blue>{video['creator']['displayName']}</blue>" if video["creator"] else ""
playing = f"playing <blue>{video['game']['name']}</blue>" if video["game"] else ""
# Can't find URL in video object, strange
url = f"https://www.twitch.tv/videos/{video['id']}"
print_out(f"<b>Video {video['id']}</b>")
print_out(f"<green>{video['title']}</green>")
if channel or playing:
print_out(" ".join([channel, playing]))
if video["description"]:
print_out(f"Description: {video['description']}")
print_out(f"Published <blue>{published_at}</blue> Length: <blue>{length}</blue> ")
print_out(f"<i>{url}</i>")
def print_video_compact(video):
id = video["id"]
date = video["publishedAt"][:10]
game = video["game"]["name"] if video["game"] else ""
title = truncate(video["title"], 80).ljust(80)
print_out(f'<b>{id}</b> {date} <green>{title}</green> <blue>{game}</blue>')
def print_paged_videos(generator, page_size, total_count):
def print_paged(
label: str,
generator: Generator[T, Any, Any],
print_fn: Callable[[T], None],
page_size: int,
total_count: int | None = None,
):
iterator = iter(generator)
page = list(islice(iterator, page_size))
@ -114,48 +58,79 @@ def print_paged_videos(generator, page_size, total_count):
last = first + len(page) - 1
while True:
print_out("-" * 80)
click.echo("-" * 80)
print_out()
for video in page:
print_video(video)
print_out()
click.echo()
for item in page:
print_fn(item)
last = first + len(page) - 1
print_out("-" * 80)
print_out(f"<yellow>Videos {first}-{last} of {total_count}</yellow>")
click.echo("-" * 80)
click.echo(f"{label} {first}-{last} of {total_count or '???'}")
first = first + len(page)
last = first + 1
page = list(islice(iterator, page_size))
if not page or not _continue():
if not page or not prompt_continue():
break
def print_clip(clip):
def print_video(video: Video):
published_at = str(video.published_at.astimezone())
length = utils.format_duration(video.length_seconds)
channel = blue(video.creator.display_name) if video.creator else ""
playing = f"playing {blue(video.game.name)}" if video.game else ""
# Can't find URL in video object, strange
url = f"https://www.twitch.tv/videos/{video.id}"
click.secho(f"Video {video.id}", bold=True)
click.secho(f"{video.title}", fg="green")
if channel or playing:
click.echo(" ".join([channel, playing]))
if video.description:
click.echo(f"Description: {video.description}")
click.echo(f"Published {blue(published_at)} Length: {blue(length)} ")
click.secho(url, italic=True)
click.echo()
def print_video_compact(video: Data):
id = video["id"]
date = video["publishedAt"][:10]
game = video["game"]["name"] if video["game"] else ""
title = truncate(video["title"], 80).ljust(80)
click.echo(f"{bold(id)} {date} {green(title)} {blue(game)}")
def print_clip(clip: Data):
published_at = clip["createdAt"].replace("T", " @ ").replace("Z", "")
length = utils.format_duration(clip["durationSeconds"])
channel = clip["broadcaster"]["displayName"]
playing = (
f"playing <blue>{clip['game']['name']}</blue>"
if clip["game"] else ""
playing = f"playing {blue(clip['game']['name'])}" if clip["game"] else ""
click.echo(f"Clip {bold(clip['slug'])}")
click.secho(clip["title"], fg="green")
click.echo(f"{blue(channel)} {playing}")
click.echo(
f"Published {blue(published_at)}" +
f" Length: {blue(length)}" +
f" Views: {blue(clip['viewCount'])}"
)
print_out(f"Clip <b>{clip['slug']}</b>")
print_out(f"<green>{clip['title']}</green>")
print_out(f"<blue>{channel}</blue> {playing}")
print_out(
f"Published <blue>{published_at}</blue>" +
f" Length: <blue>{length}</blue>" +
f" Views: <blue>{clip["viewCount"]}</blue>"
)
print_out(f"<i>{clip['url']}</i>")
click.secho(clip["url"], italic=True)
def _continue():
print_out("Press <green><b>Enter</green> to continue, <yellow><b>Ctrl+C</yellow> to break.")
def prompt_continue():
enter = click.style("Enter", bold=True, fg="green")
ctrl_c = click.style("Ctrl+C", bold=True, fg="yellow")
click.echo(f"Press {enter} to continue, {ctrl_c} to break.")
try:
input()
@ -163,3 +138,29 @@ def _continue():
return False
return True
# Shorthand functions for coloring output
def blue(text: Any) -> str:
return click.style(text, fg="blue")
def cyan(text: Any) -> str:
return click.style(text, fg="cyan")
def green(text: Any) -> str:
return click.style(text, fg="green")
def yellow(text: Any) -> str:
return click.style(text, fg="yellow")
def bold(text: Any) -> str:
return click.style(text, bold=True)
def dim(text: Any) -> str:
return click.style(text, dim=True)

View File

@ -1,3 +1,4 @@
import click
import logging
import time
@ -6,7 +7,7 @@ from dataclasses import dataclass, field
from statistics import mean
from typing import Dict, NamedTuple, Optional, Deque
from twitchdl.output import print_out
from twitchdl.output import blue
from twitchdl.utils import format_size, format_time
logger = logging.getLogger(__name__)
@ -127,11 +128,11 @@ class Progress:
progress = " ".join([
f"Downloaded {self.vod_downloaded_count}/{self.vod_count} VODs",
f"<blue>{self.progress_perc}%</blue>",
f"of <blue>~{format_size(self.estimated_total)}</blue>" if self.estimated_total else "",
f"at <blue>{format_size(self.speed)}/s</blue>" if self.speed else "",
f"ETA <blue>{format_time(self.remaining_time)}</blue>" if self.remaining_time is not None else "",
blue(self.progress_perc),
f"of ~{blue(format_size(self.estimated_total))}" if self.estimated_total else "",
f"at {blue(format_size(self.speed))}/s" if self.speed else "",
f"ETA {blue(format_time(self.remaining_time))}" if self.remaining_time is not None else "",
])
print_out(f"\r{progress} ", end="")
click.echo(f"\r{progress} ", nl=False)
self.last_printed = now

View File

@ -6,11 +6,17 @@ import httpx
import json
import click
from typing import Dict
from typing import Dict, Generator, Literal
from twitchdl import CLIENT_ID
from twitchdl.entities import AccessToken, Data
from twitchdl.exceptions import ConsoleError
ClipsPeriod = Literal["last_day", "last_week", "last_month", "all_time"]
VideosSort = Literal["views", "time"]
VideosType = Literal["archive", "highlight", "upload"]
class GQLError(click.ClickException):
def __init__(self, errors: list[str]):
message = "GraphQL query failed."
@ -140,7 +146,7 @@ def get_clip_access_token(slug: str):
return response["data"]["clip"]
def get_channel_clips(channel_id: str, period: str, limit: int, after: str | None= None):
def get_channel_clips(channel_id: str, period: ClipsPeriod, limit: int, after: str | None= None):
"""
List channel clips.
@ -177,8 +183,8 @@ def get_channel_clips(channel_id: str, period: str, limit: int, after: str | Non
return response["data"]["user"]["clips"]
def channel_clips_generator(channel_id, period, limit):
def _generator(clips, limit):
def channel_clips_generator(channel_id: str, period: str, limit: int) -> Generator[Data, None, None]:
def _generator(clips: Data, limit: int) -> Generator[Data, None, None]:
for clip in clips["edges"]:
if limit < 1:
return
@ -262,8 +268,16 @@ def get_channel_videos(
return response["data"]["user"]["videos"]
def channel_videos_generator(channel_id, max_videos, sort, type, game_ids=[]):
def _generator(videos, max_videos):
def channel_videos_generator(
channel_id: str,
max_videos: int,
sort: VideosSort,
type: VideosType,
game_ids: list[str] | None = None
) -> tuple[int, Generator[Data, None, None]]:
game_ids = game_ids or []
def _generator(videos: Data, max_videos: int) -> Generator[Data, None, None]:
for video in videos["edges"]:
if max_videos < 1:
return
@ -284,7 +298,7 @@ def channel_videos_generator(channel_id, max_videos, sort, type, game_ids=[]):
return videos["totalCount"], _generator(videos, max_videos)
def get_access_token(video_id, auth_token=None):
def get_access_token(video_id: str, auth_token: str | None = None) -> AccessToken:
query = f"""
{{
videoPlaybackAccessToken(
@ -307,7 +321,11 @@ def get_access_token(video_id, auth_token=None):
try:
response = gql_query(query, headers=headers)
return response["data"]["videoPlaybackAccessToken"]
return AccessToken(
response["data"]["videoPlaybackAccessToken"]["signature"],
response["data"]["videoPlaybackAccessToken"]["value"],
)
except httpx.HTTPStatusError as error:
# Provide a more useful error message when server returns HTTP 401
# Unauthorized while using a user-provided auth token.
@ -323,15 +341,15 @@ def get_access_token(video_id, auth_token=None):
raise
def get_playlists(video_id, access_token):
def get_playlists(video_id: str, access_token: AccessToken):
"""
For a given video return a playlist which contains possible video qualities.
"""
url = f"https://usher.ttvnw.net/vod/{video_id}"
response = httpx.get(url, params={
"nauth": access_token['value'],
"nauthsig": access_token['signature'],
"nauth": access_token.value,
"nauthsig": access_token.signature,
"allow_audio_only": "true",
"allow_source": "true",
"player": "twitchweb",

View File

@ -1,6 +1,8 @@
import re
import unicodedata
import click
def _format_size(value: float, digits: int, unit: str):
if digits > 0:
@ -9,7 +11,7 @@ def _format_size(value: float, digits: int, unit: str):
return f"{int(value)}{unit}"
def format_size(bytes_: int, digits: int = 1):
def format_size(bytes_: int | float, digits: int = 1):
if bytes_ < 1024:
return _format_size(bytes_, digits, "B")
@ -54,14 +56,9 @@ def format_time(total_seconds: int | float, force_hours: bool = False) -> str:
def read_int(msg: str, min: int, max: int, default: int | None = None) -> int:
if default:
msg = msg + f" [default {default}]"
msg += ": "
while True:
try:
val = input(msg)
val = click.prompt(msg, default=default, type=int)
if default and not val:
return default
if min <= int(val) <= max: