mirror of
https://github.com/ihabunek/twitch-dl
synced 2024-08-30 18:32:25 +00:00
Compare commits
7 Commits
Author | SHA1 | Date | |
---|---|---|---|
0e8e2e3f40 | |||
3270d857b1 | |||
3ae99fe159 | |||
9cf3ec2f07 | |||
64b88249f2 | |||
5dcc868275 | |||
11fbfd35fc |
@ -23,6 +23,7 @@ dependencies = [
|
||||
"click>=8.0.0,<9.0.0",
|
||||
"httpx>=0.17.0,<1.0.0",
|
||||
"m3u8>=1.0.0,<4.0.0",
|
||||
"python-dateutil>=2.8.0,<3.0.0",
|
||||
]
|
||||
|
||||
[tool.setuptools]
|
||||
|
@ -5,8 +5,8 @@ import re
|
||||
import sys
|
||||
|
||||
from twitchdl import __version__
|
||||
from twitchdl.commands.clips import ClipsPeriod
|
||||
from twitchdl.entities import DownloadOptions
|
||||
from twitchdl.twitch import ClipsPeriod, VideosSort, VideosType
|
||||
|
||||
# Tweak the Click context
|
||||
# https://click.palletsprojects.com/en/8.1.x/api/#context
|
||||
@ -369,8 +369,8 @@ def videos(
|
||||
json: bool,
|
||||
limit: int | None,
|
||||
pager: int | None,
|
||||
sort: str,
|
||||
type: str,
|
||||
sort: VideosSort,
|
||||
type: VideosType,
|
||||
):
|
||||
"""List or download clips for given CHANNEL_NAME."""
|
||||
from twitchdl.commands.videos import videos
|
||||
|
@ -1,16 +1,17 @@
|
||||
from typing import Generator
|
||||
import click
|
||||
import re
|
||||
import sys
|
||||
|
||||
from typing import Literal
|
||||
from itertools import islice
|
||||
from os import path
|
||||
|
||||
from twitchdl import twitch, utils
|
||||
from twitchdl.commands.download import get_clip_authenticated_url
|
||||
from twitchdl.download import download_file
|
||||
from twitchdl.output import print_out, print_clip, print_json
|
||||
from twitchdl.entities import Data
|
||||
from twitchdl.output import green, print_clip, print_json, print_paged, yellow
|
||||
|
||||
ClipsPeriod = Literal["last_day", "last_week", "last_month", "all_time"]
|
||||
|
||||
|
||||
def clips(
|
||||
@ -21,7 +22,7 @@ def clips(
|
||||
json: bool = False,
|
||||
limit: int = 10,
|
||||
pager: int | None = None,
|
||||
period: ClipsPeriod = "all_time",
|
||||
period: twitch.ClipsPeriod = "all_time",
|
||||
):
|
||||
# Ignore --limit if --pager or --all are given
|
||||
limit = sys.maxsize if all or pager else limit
|
||||
@ -35,13 +36,15 @@ def clips(
|
||||
return _download_clips(generator)
|
||||
|
||||
if pager:
|
||||
return _print_paged(generator, pager)
|
||||
return print_paged("Clips", generator, print_clip, pager)
|
||||
|
||||
return _print_all(generator, all)
|
||||
|
||||
|
||||
def _continue():
|
||||
print_out("Press <green><b>Enter</green> to continue, <yellow><b>Ctrl+C</yellow> to break.")
|
||||
enter = click.style("Enter", bold=True, fg="green")
|
||||
ctrl_c = click.style("Ctrl+C", bold=True, fg="yellow")
|
||||
click.echo(f"Press {enter} to continue, {ctrl_c} to break.")
|
||||
|
||||
try:
|
||||
input()
|
||||
@ -51,7 +54,7 @@ def _continue():
|
||||
return True
|
||||
|
||||
|
||||
def _target_filename(clip):
|
||||
def _target_filename(clip: Data):
|
||||
url = clip["videoQualities"][0]["sourceURL"]
|
||||
_, ext = path.splitext(url)
|
||||
ext = ext.lstrip(".")
|
||||
@ -71,53 +74,26 @@ def _target_filename(clip):
|
||||
return f"{name}.{ext}"
|
||||
|
||||
|
||||
def _download_clips(generator):
|
||||
def _download_clips(generator: Generator[Data, None, None]):
|
||||
for clip in generator:
|
||||
target = _target_filename(clip)
|
||||
|
||||
if path.exists(target):
|
||||
print_out(f"Already downloaded: <green>{target}</green>")
|
||||
click.echo(f"Already downloaded: {green(target)}")
|
||||
else:
|
||||
url = get_clip_authenticated_url(clip["slug"], "source")
|
||||
print_out(f"Downloading: <yellow>{target}</yellow>")
|
||||
click.echo(f"Downloading: {yellow(target)}")
|
||||
download_file(url, target)
|
||||
|
||||
|
||||
def _print_all(generator, all: bool):
|
||||
def _print_all(generator: Generator[Data, None, None], all: bool):
|
||||
for clip in generator:
|
||||
print_out()
|
||||
click.echo()
|
||||
print_clip(clip)
|
||||
|
||||
if not all:
|
||||
print_out(
|
||||
"\n<dim>There may be more clips. " +
|
||||
"Increase the --limit, use --all or --pager to see the rest.</dim>"
|
||||
click.secho(
|
||||
"\nThere may be more clips. " +
|
||||
"Increase the --limit, use --all or --pager to see the rest.",
|
||||
dim=True
|
||||
)
|
||||
|
||||
|
||||
def _print_paged(generator, page_size):
|
||||
iterator = iter(generator)
|
||||
page = list(islice(iterator, page_size))
|
||||
|
||||
first = 1
|
||||
last = first + len(page) - 1
|
||||
|
||||
while True:
|
||||
print_out("-" * 80)
|
||||
|
||||
print_out()
|
||||
for clip in page:
|
||||
print_clip(clip)
|
||||
print_out()
|
||||
|
||||
last = first + len(page) - 1
|
||||
|
||||
print_out("-" * 80)
|
||||
print_out(f"<yellow>Clips {first}-{last}</yellow>")
|
||||
|
||||
first = first + len(page)
|
||||
last = first + 1
|
||||
|
||||
page = list(islice(iterator, page_size))
|
||||
if not page or not _continue():
|
||||
break
|
||||
|
@ -1,5 +1,6 @@
|
||||
import asyncio
|
||||
import platform
|
||||
import click
|
||||
import httpx
|
||||
import m3u8
|
||||
import os
|
||||
@ -14,11 +15,12 @@ from typing import List, Optional, OrderedDict
|
||||
from urllib.parse import urlparse, urlencode
|
||||
|
||||
from twitchdl import twitch, utils
|
||||
from twitchdl.conversion import from_dict
|
||||
from twitchdl.download import download_file
|
||||
from twitchdl.entities import Data, DownloadOptions
|
||||
from twitchdl.entities import Data, DownloadOptions, Video
|
||||
from twitchdl.exceptions import ConsoleError
|
||||
from twitchdl.http import download_all
|
||||
from twitchdl.output import print_out
|
||||
from twitchdl.output import blue, bold, dim, green, print_log, yellow
|
||||
|
||||
|
||||
def download(ids: list[str], args: DownloadOptions):
|
||||
@ -67,28 +69,28 @@ def _get_playlist_by_name(playlists, quality):
|
||||
|
||||
|
||||
def _select_playlist_interactive(playlists):
|
||||
print_out("\nAvailable qualities:")
|
||||
click.echo("\nAvailable qualities:")
|
||||
for n, (name, resolution, uri) in enumerate(playlists):
|
||||
if resolution:
|
||||
print_out(f"{n + 1}) <b>{name}</b> <dim>({resolution})</dim>")
|
||||
click.echo(f"{n + 1}) {bold(name)} {dim(f'({resolution})')}")
|
||||
else:
|
||||
print_out(f"{n + 1}) <b>{name}</b>")
|
||||
click.echo(f"{n + 1}) {bold(name)}")
|
||||
|
||||
no = utils.read_int("Choose quality", min=1, max=len(playlists) + 1, default=1)
|
||||
_, _, uri = playlists[no - 1]
|
||||
return uri
|
||||
|
||||
|
||||
def _join_vods(playlist_path: str, target: str, overwrite: bool, video):
|
||||
description = video["description"] or ""
|
||||
def _join_vods(playlist_path: str, target: str, overwrite: bool, video: Video):
|
||||
description = video.description or ""
|
||||
description = description.strip()
|
||||
|
||||
command = [
|
||||
"ffmpeg",
|
||||
"-i", playlist_path,
|
||||
"-c", "copy",
|
||||
"-metadata", f"artist={video['creator']['displayName']}",
|
||||
"-metadata", f"title={video['title']}",
|
||||
"-metadata", f"artist={video.creator.display_name}",
|
||||
"-metadata", f"title={video.title}",
|
||||
"-metadata", f"description={description}",
|
||||
"-metadata", "encoded_by=twitch-dl",
|
||||
"-stats",
|
||||
@ -99,7 +101,7 @@ def _join_vods(playlist_path: str, target: str, overwrite: bool, video):
|
||||
if overwrite:
|
||||
command.append("-y")
|
||||
|
||||
print_out(f"<dim>{' '.join(command)}</dim>")
|
||||
click.secho(f"{' '.join(command)}", dim = True)
|
||||
result = subprocess.run(command)
|
||||
if result.returncode != 0:
|
||||
raise ConsoleError("Joining files failed")
|
||||
@ -114,27 +116,29 @@ def _concat_vods(vod_paths: list[str], target: str):
|
||||
raise ConsoleError(f"Joining files failed: {result.stderr}")
|
||||
|
||||
|
||||
def get_video_substitutions(video: Data, format: str) -> Data:
|
||||
date, time = video['publishedAt'].split("T")
|
||||
game = video["game"]["name"] if video["game"] else "Unknown"
|
||||
def get_video_placeholders(video: Video, format: str) -> Data:
|
||||
date = video.published_at.date().isoformat()
|
||||
time = video.published_at.time().isoformat()
|
||||
datetime = video.published_at.isoformat().replace("+00:00", "Z")
|
||||
game = video.game.name if video.game else "Unknown"
|
||||
|
||||
return {
|
||||
"channel": video["creator"]["displayName"],
|
||||
"channel_login": video["creator"]["login"],
|
||||
"channel": video.creator.display_name,
|
||||
"channel_login": video.creator.login,
|
||||
"date": date,
|
||||
"datetime": video["publishedAt"],
|
||||
"datetime": datetime,
|
||||
"format": format,
|
||||
"game": game,
|
||||
"game_slug": utils.slugify(game),
|
||||
"id": video["id"],
|
||||
"id": video.id,
|
||||
"time": time,
|
||||
"title": utils.titlify(video["title"]),
|
||||
"title_slug": utils.slugify(video["title"]),
|
||||
"title": utils.titlify(video.title),
|
||||
"title_slug": utils.slugify(video.title),
|
||||
}
|
||||
|
||||
|
||||
def _video_target_filename(video: Data, args: DownloadOptions):
|
||||
subs = get_video_substitutions(video, args.format)
|
||||
def _video_target_filename(video: Video, args: DownloadOptions):
|
||||
subs = get_video_placeholders(video, args.format)
|
||||
|
||||
try:
|
||||
return args.output.format(**subs)
|
||||
@ -219,10 +223,10 @@ def _get_clip_url(clip, quality):
|
||||
raise ConsoleError(msg)
|
||||
|
||||
# Ask user to select quality
|
||||
print_out("\nAvailable qualities:")
|
||||
click.echo("\nAvailable qualities:")
|
||||
for n, q in enumerate(qualities):
|
||||
print_out(f"{n + 1}) {q['quality']} [{q['frameRate']} fps]")
|
||||
print_out()
|
||||
click.echo(f"{n + 1}) {bold(q['quality'])} [{q['frameRate']} fps]")
|
||||
click.echo()
|
||||
|
||||
no = utils.read_int("Choose quality", min=1, max=len(qualities), default=1)
|
||||
selected_quality = qualities[no - 1]
|
||||
@ -230,7 +234,7 @@ def _get_clip_url(clip, quality):
|
||||
|
||||
|
||||
def get_clip_authenticated_url(slug, quality):
|
||||
print_out("<dim>Fetching access token...</dim>")
|
||||
print_log("Fetching access token...")
|
||||
access_token = twitch.get_clip_access_token(slug)
|
||||
|
||||
if not access_token:
|
||||
@ -247,79 +251,73 @@ def get_clip_authenticated_url(slug, quality):
|
||||
|
||||
|
||||
def _download_clip(slug: str, args: DownloadOptions) -> None:
|
||||
print_out("<dim>Looking up clip...</dim>")
|
||||
print_log("Looking up clip...")
|
||||
clip = twitch.get_clip(slug)
|
||||
|
||||
if not clip:
|
||||
raise ConsoleError(f"Clip '{slug}' not found")
|
||||
|
||||
|
||||
title = clip["title"]
|
||||
user = clip["broadcaster"]["displayName"]
|
||||
game = clip["game"]["name"] if clip["game"] else "Unknown"
|
||||
duration = utils.format_duration(clip["durationSeconds"])
|
||||
|
||||
print_out(
|
||||
f"Found: <green>{title}</green> by <yellow>{user}</yellow>, "+
|
||||
f"playing <blue>{game}</blue> ({duration})"
|
||||
)
|
||||
click.echo(f"Found: {green(title)} by {yellow(user)}, playing {blue(game)} ({duration})")
|
||||
|
||||
target = _clip_target_filename(clip, args)
|
||||
print_out(f"Target: <blue>{target}</blue>")
|
||||
click.echo(f"Target: {blue(target)}")
|
||||
|
||||
if not args.overwrite and path.exists(target):
|
||||
response = input("File exists. Overwrite? [Y/n]: ")
|
||||
if response.lower().strip() not in ["", "y"]:
|
||||
raise ConsoleError("Aborted")
|
||||
response = click.prompt("File exists. Overwrite? [Y/n]", default="Y", show_default=False)
|
||||
if response.lower().strip() != "y":
|
||||
raise click.Abort()
|
||||
args.overwrite = True
|
||||
|
||||
url = get_clip_authenticated_url(slug, args.quality)
|
||||
print_out(f"<dim>Selected URL: {url}</dim>")
|
||||
print_log(f"Selected URL: {url}")
|
||||
|
||||
print_out("<dim>Downloading clip...</dim>")
|
||||
|
||||
if (args.dry_run is False):
|
||||
if args.dry_run:
|
||||
click.echo("Dry run, clip not downloaded.")
|
||||
else:
|
||||
print_log("Downloading clip...")
|
||||
download_file(url, target)
|
||||
|
||||
print_out(f"Downloaded: <blue>{target}</blue>")
|
||||
click.echo(f"Downloaded: {blue(target)}")
|
||||
|
||||
|
||||
def _download_video(video_id, args: DownloadOptions) -> None:
|
||||
def _download_video(video_id: str, args: DownloadOptions) -> None:
|
||||
if args.start and args.end and args.end <= args.start:
|
||||
raise ConsoleError("End time must be greater than start time")
|
||||
|
||||
print_out("<dim>Looking up video...</dim>")
|
||||
video = twitch.get_video(video_id)
|
||||
print_log("Looking up video...")
|
||||
response = twitch.get_video(video_id)
|
||||
|
||||
if not video:
|
||||
if not response:
|
||||
raise ConsoleError(f"Video {video_id} not found")
|
||||
|
||||
title = video['title']
|
||||
user = video['creator']['displayName']
|
||||
print_out(f"Found: <blue>{title}</blue> by <yellow>{user}</yellow>")
|
||||
video = from_dict(Video, response)
|
||||
click.echo(f"Found: {blue(video.title)} by {yellow(video.creator.display_name)}")
|
||||
|
||||
target = _video_target_filename(video, args)
|
||||
print_out(f"Output: <blue>{target}</blue>")
|
||||
click.echo(f"Output: {blue(target)}")
|
||||
|
||||
if not args.overwrite and path.exists(target):
|
||||
response = input("File exists. Overwrite? [Y/n]: ")
|
||||
if response.lower().strip() not in ["", "y"]:
|
||||
raise ConsoleError("Aborted")
|
||||
response = click.prompt("File exists. Overwrite? [Y/n]: ", default="Y", show_default=False)
|
||||
if response.lower().strip() != "y":
|
||||
raise click.Abort()
|
||||
args.overwrite = True
|
||||
|
||||
# Chapter select or manual offset
|
||||
start, end = _determine_time_range(video_id, args)
|
||||
|
||||
print_out("<dim>Fetching access token...</dim>")
|
||||
print_log("Fetching access token...")
|
||||
access_token = twitch.get_access_token(video_id, auth_token=args.auth_token)
|
||||
|
||||
print_out("<dim>Fetching playlists...</dim>")
|
||||
print_log("Fetching playlists...")
|
||||
playlists_m3u8 = twitch.get_playlists(video_id, access_token)
|
||||
playlists = list(_parse_playlists(playlists_m3u8))
|
||||
playlist_uri = (_get_playlist_by_name(playlists, args.quality) if args.quality
|
||||
else _select_playlist_interactive(playlists))
|
||||
|
||||
print_out("<dim>Fetching playlist...</dim>")
|
||||
print_log("Fetching playlist...")
|
||||
response = httpx.get(playlist_uri)
|
||||
response.raise_for_status()
|
||||
playlist = m3u8.loads(response.text)
|
||||
@ -334,7 +332,7 @@ def _download_video(video_id, args: DownloadOptions) -> None:
|
||||
with open(path.join(target_dir, "playlist.m3u8"), "w") as f:
|
||||
f.write(response.text)
|
||||
|
||||
print_out(f"\nDownloading {len(vod_paths)} VODs using {args.max_workers} workers to {target_dir}")
|
||||
click.echo(f"\nDownloading {len(vod_paths)} VODs using {args.max_workers} workers to {target_dir}")
|
||||
sources = [base_uri + path for path in vod_paths]
|
||||
targets = [os.path.join(target_dir, f"{k:05d}.ts") for k, _ in enumerate(vod_paths)]
|
||||
asyncio.run(download_all(sources, targets, args.max_workers, rate_limit=args.rate_limit))
|
||||
@ -353,27 +351,29 @@ def _download_video(video_id, args: DownloadOptions) -> None:
|
||||
playlist_path = path.join(target_dir, "playlist_downloaded.m3u8")
|
||||
playlist.dump(playlist_path)
|
||||
|
||||
print_out("")
|
||||
click.echo()
|
||||
|
||||
if args.no_join:
|
||||
print_out("<dim>Skipping joining files...</dim>")
|
||||
print_out(f"VODs downloaded to:\n<blue>{target_dir}</blue>")
|
||||
print_log("Skipping joining files...")
|
||||
click.echo(f"VODs downloaded to:\n{blue(target_dir)}")
|
||||
return
|
||||
|
||||
if args.concat:
|
||||
print_out("<dim>Concating files...</dim>")
|
||||
print_log("Concating files...")
|
||||
_concat_vods(targets, target)
|
||||
else:
|
||||
print_out("<dim>Joining files...</dim>")
|
||||
print_log("Joining files...")
|
||||
_join_vods(playlist_path, target, args.overwrite, video)
|
||||
|
||||
click.echo()
|
||||
|
||||
if args.keep:
|
||||
print_out(f"\n<dim>Temporary files not deleted: {target_dir}</dim>")
|
||||
click.echo(f"Temporary files not deleted: {target_dir}")
|
||||
else:
|
||||
print_out("\n<dim>Deleting temporary files...</dim>")
|
||||
print_log("Deleting temporary files...")
|
||||
shutil.rmtree(target_dir)
|
||||
|
||||
print_out(f"\nDownloaded: <green>{target}</green>")
|
||||
click.echo(f"\nDownloaded: {green(target)}")
|
||||
|
||||
|
||||
def _determine_time_range(video_id, args: DownloadOptions):
|
||||
@ -381,7 +381,7 @@ def _determine_time_range(video_id, args: DownloadOptions):
|
||||
return args.start, args.end
|
||||
|
||||
if args.chapter is not None:
|
||||
print_out("<dim>Fetching chapters...</dim>")
|
||||
print_log("Fetching chapters...")
|
||||
chapters = twitch.get_video_chapters(video_id)
|
||||
|
||||
if not chapters:
|
||||
@ -395,7 +395,7 @@ def _determine_time_range(video_id, args: DownloadOptions):
|
||||
except IndexError:
|
||||
raise ConsoleError(f"Chapter {args.chapter} does not exist. This video has {len(chapters)} chapters.")
|
||||
|
||||
print_out(f'Chapter selected: <blue>{chapter["description"]}</blue>\n')
|
||||
click.echo(f'Chapter selected: {blue(chapter["description"])}\n')
|
||||
start = chapter["positionMilliseconds"] // 1000
|
||||
duration = chapter["durationMilliseconds"] // 1000
|
||||
return start, start + duration
|
||||
@ -404,10 +404,10 @@ def _determine_time_range(video_id, args: DownloadOptions):
|
||||
|
||||
|
||||
def _choose_chapter_interactive(chapters):
|
||||
print_out("\nChapters:")
|
||||
click.echo("\nChapters:")
|
||||
for index, chapter in enumerate(chapters):
|
||||
duration = utils.format_time(chapter["durationMilliseconds"] // 1000)
|
||||
print_out(f'{index + 1}) <b>{chapter["description"]}</b> <dim>({duration})</dim>')
|
||||
click.echo(f'{index + 1}) {bold(chapter["description"])} ({duration})')
|
||||
index = utils.read_int("Select a chapter", 1, len(chapters))
|
||||
chapter = chapters[index - 1]
|
||||
return chapter
|
||||
|
@ -1,17 +1,21 @@
|
||||
import click
|
||||
import m3u8
|
||||
|
||||
from twitchdl import utils, twitch
|
||||
from twitchdl.commands.download import get_video_substitutions
|
||||
from twitchdl.commands.download import get_video_placeholders
|
||||
from twitchdl.conversion import from_dict
|
||||
from twitchdl.entities import Data, Video
|
||||
from twitchdl.exceptions import ConsoleError
|
||||
from twitchdl.output import print_video, print_clip, print_json, print_out, print_log
|
||||
from twitchdl.output import bold, print_table, print_video, print_clip, print_json, print_log
|
||||
|
||||
def info(id: str, *, json: bool = False, format="mkv"):
|
||||
|
||||
def info(id: str, *, json: bool = False):
|
||||
video_id = utils.parse_video_identifier(id)
|
||||
if video_id:
|
||||
print_log("Fetching video...")
|
||||
video = twitch.get_video(video_id)
|
||||
response = twitch.get_video(video_id)
|
||||
|
||||
if not video:
|
||||
if not response:
|
||||
raise ConsoleError(f"Video {video_id} not found")
|
||||
|
||||
print_log("Fetching access token...")
|
||||
@ -23,16 +27,11 @@ def info(id: str, *, json: bool = False, format="mkv"):
|
||||
print_log("Fetching chapters...")
|
||||
chapters = twitch.get_video_chapters(video_id)
|
||||
|
||||
substitutions = get_video_substitutions(video, format)
|
||||
|
||||
if json:
|
||||
video_json(video, playlists, chapters)
|
||||
video_json(response, playlists, chapters)
|
||||
else:
|
||||
video = from_dict(Video, response)
|
||||
video_info(video, playlists, chapters)
|
||||
|
||||
print_out("\nOutput format placeholders:")
|
||||
for k, v in substitutions.items():
|
||||
print(f" * {k} = {v}")
|
||||
return
|
||||
|
||||
clip_slug = utils.parse_clip_identifier(id)
|
||||
@ -51,22 +50,26 @@ def info(id: str, *, json: bool = False, format="mkv"):
|
||||
raise ConsoleError(f"Invalid input: {id}")
|
||||
|
||||
|
||||
def video_info(video, playlists, chapters):
|
||||
print_out()
|
||||
def video_info(video: Video, playlists, chapters):
|
||||
click.echo()
|
||||
print_video(video)
|
||||
|
||||
print_out()
|
||||
print_out("Playlists:")
|
||||
click.echo("Playlists:")
|
||||
for p in m3u8.loads(playlists).playlists:
|
||||
print_out(f"<b>{p.stream_info.video}</b> {p.uri}")
|
||||
click.echo(f"{bold(p.stream_info.video)} {p.uri}")
|
||||
|
||||
if chapters:
|
||||
print_out()
|
||||
print_out("Chapters:")
|
||||
click.echo()
|
||||
click.echo("Chapters:")
|
||||
for chapter in chapters:
|
||||
start = utils.format_time(chapter["positionMilliseconds"] // 1000, force_hours=True)
|
||||
duration = utils.format_time(chapter["durationMilliseconds"] // 1000)
|
||||
print_out(f'{start} <b>{chapter["description"]}</b> ({duration})')
|
||||
click.echo(f'{start} {bold(chapter["description"])} ({duration})')
|
||||
|
||||
placeholders = get_video_placeholders(video, format = "mkv")
|
||||
placeholders = [[f"{{{k}}}", v] for k, v in placeholders.items()]
|
||||
click.echo("")
|
||||
print_table(["Placeholder", "Value"], placeholders)
|
||||
|
||||
|
||||
def video_json(video, playlists, chapters):
|
||||
@ -87,11 +90,11 @@ def video_json(video, playlists, chapters):
|
||||
print_json(video)
|
||||
|
||||
|
||||
def clip_info(clip):
|
||||
print_out()
|
||||
def clip_info(clip: Data):
|
||||
click.echo()
|
||||
print_clip(clip)
|
||||
print_out()
|
||||
print_out("Download links:")
|
||||
click.echo()
|
||||
click.echo("Download links:")
|
||||
|
||||
for q in clip["videoQualities"]:
|
||||
print_out("<b>{quality}p{frameRate}</b> {sourceURL}".format(**q))
|
||||
click.echo(f"{bold(q['quality'])} [{q['frameRate']} fps] {q['sourceURL']}")
|
||||
|
@ -1,8 +1,10 @@
|
||||
import sys
|
||||
|
||||
import click
|
||||
|
||||
from twitchdl import twitch
|
||||
from twitchdl.exceptions import ConsoleError
|
||||
from twitchdl.output import print_out, print_paged_videos, print_video, print_json, print_video_compact
|
||||
from twitchdl.output import print_log, print_paged, print_video, print_json, print_video_compact
|
||||
|
||||
|
||||
def videos(
|
||||
@ -14,8 +16,8 @@ def videos(
|
||||
json: bool,
|
||||
limit: int | None,
|
||||
pager: int | None,
|
||||
sort: str,
|
||||
type: str,
|
||||
sort: twitch.VideosSort,
|
||||
type: twitch.VideosType,
|
||||
):
|
||||
game_ids = _get_game_ids(games)
|
||||
|
||||
@ -38,11 +40,12 @@ def videos(
|
||||
return
|
||||
|
||||
if total_count == 0:
|
||||
print_out("<yellow>No videos found</yellow>")
|
||||
click.echo("No videos found")
|
||||
return
|
||||
|
||||
if pager:
|
||||
print_paged_videos(generator, pager, total_count)
|
||||
print_fn = print_video_compact if compact else print_video
|
||||
print_paged("Videos", generator, print_fn, pager, total_count)
|
||||
return
|
||||
|
||||
count = 0
|
||||
@ -50,28 +53,28 @@ def videos(
|
||||
if compact:
|
||||
print_video_compact(video)
|
||||
else:
|
||||
print_out()
|
||||
click.echo()
|
||||
print_video(video)
|
||||
count += 1
|
||||
|
||||
print_out()
|
||||
print_out("-" * 80)
|
||||
print_out(f"<yellow>Videos 1-{count} of {total_count}</yellow>")
|
||||
click.echo()
|
||||
click.echo("-" * 80)
|
||||
click.echo(f"Videos 1-{count} of {total_count}")
|
||||
|
||||
if total_count > count:
|
||||
print_out()
|
||||
print_out(
|
||||
"<dim>There are more videos. Increase the --limit, use --all or --pager to see the rest.</dim>"
|
||||
click.secho(
|
||||
"\nThere are more videos. Increase the --limit, use --all or --pager to see the rest.",
|
||||
dim=True
|
||||
)
|
||||
|
||||
|
||||
def _get_game_ids(names):
|
||||
def _get_game_ids(names: list[str]) -> list[str]:
|
||||
if not names:
|
||||
return []
|
||||
|
||||
game_ids = []
|
||||
for name in names:
|
||||
print_out(f"<dim>Looking up game '{name}'...</dim>")
|
||||
print_log(f"Looking up game '{name}'...")
|
||||
game_id = twitch.get_game_id(name)
|
||||
if not game_id:
|
||||
raise ConsoleError(f"Game '{name}' not found")
|
||||
|
87
twitchdl/conversion.py
Normal file
87
twitchdl/conversion.py
Normal file
@ -0,0 +1,87 @@
|
||||
import re
|
||||
import dataclasses
|
||||
|
||||
from dataclasses import Field, is_dataclass
|
||||
from datetime import date, datetime
|
||||
from dateutil import parser
|
||||
from typing import Any, Generator, Type, TypeVar, Union, get_args, get_origin, Callable
|
||||
from typing import get_type_hints
|
||||
|
||||
# Generic data class instance
|
||||
T = TypeVar("T")
|
||||
|
||||
# Dict of data decoded from JSON
|
||||
Data = dict[str, Any]
|
||||
|
||||
|
||||
def snake_to_camel(name: str):
|
||||
def repl(match: re.Match[str]):
|
||||
return match.group(1).upper()
|
||||
|
||||
return re.sub(r"_([a-z])", repl, name)
|
||||
|
||||
|
||||
def from_dict(cls: Type[T], data: Data, key_fn: Callable[[str], str] = snake_to_camel) -> T:
|
||||
"""Convert a nested dict into an instance of `cls`."""
|
||||
|
||||
def _fields() -> Generator[tuple[str, Any], None, None]:
|
||||
hints = get_type_hints(cls)
|
||||
for field in dataclasses.fields(cls):
|
||||
field_type = _prune_optional(hints[field.name])
|
||||
|
||||
dict_field_name = key_fn(field.name)
|
||||
if (value := data.get(dict_field_name)) is not None:
|
||||
field_value = _convert(field_type, value)
|
||||
else:
|
||||
field_value = _get_default_value(field)
|
||||
|
||||
yield field.name, field_value
|
||||
|
||||
return cls(**dict(_fields()))
|
||||
|
||||
|
||||
def from_dict_list(cls: Type[T], data: list[Data]) -> list[T]:
|
||||
return [from_dict(cls, x) for x in data]
|
||||
|
||||
|
||||
def _get_default_value(field: Field[Any]):
|
||||
if field.default is not dataclasses.MISSING:
|
||||
return field.default
|
||||
|
||||
if field.default_factory is not dataclasses.MISSING:
|
||||
return field.default_factory()
|
||||
|
||||
return None
|
||||
|
||||
|
||||
def _convert(field_type: Type[Any], value: Any) -> Any:
|
||||
if value is None:
|
||||
return None
|
||||
|
||||
if field_type in [str, int, bool, dict]:
|
||||
return value
|
||||
|
||||
if field_type == datetime:
|
||||
return parser.parse(value)
|
||||
|
||||
if field_type == date:
|
||||
return date.fromisoformat(value)
|
||||
|
||||
if get_origin(field_type) == list:
|
||||
(inner_type,) = get_args(field_type)
|
||||
return [_convert(inner_type, x) for x in value]
|
||||
|
||||
if is_dataclass(field_type):
|
||||
return from_dict(field_type, value)
|
||||
|
||||
raise ValueError(f"Not implemented for type '{field_type}'")
|
||||
|
||||
|
||||
def _prune_optional(field_type: Type[Any]):
|
||||
"""For `Optional[<type>]` returns the encapsulated `<type>`."""
|
||||
if get_origin(field_type) == Union:
|
||||
args = get_args(field_type)
|
||||
if len(args) == 2 and args[1] == type(None): # noqa
|
||||
return args[0]
|
||||
|
||||
return field_type
|
@ -1,4 +1,5 @@
|
||||
from dataclasses import dataclass
|
||||
from datetime import datetime
|
||||
from typing import Any
|
||||
|
||||
|
||||
@ -23,3 +24,30 @@ class DownloadOptions:
|
||||
# Type for annotating decoded JSON
|
||||
# TODO: make data classes for common structs
|
||||
Data = dict[str, Any]
|
||||
|
||||
|
||||
@dataclass
|
||||
class User:
|
||||
login: str
|
||||
display_name: str
|
||||
|
||||
@dataclass
|
||||
class Game:
|
||||
name: str
|
||||
|
||||
@dataclass
|
||||
class Video:
|
||||
id: str
|
||||
title: str
|
||||
description: str
|
||||
published_at: datetime
|
||||
broadcast_type: str
|
||||
length_seconds: int
|
||||
game: Game
|
||||
creator: User
|
||||
|
||||
|
||||
@dataclass
|
||||
class AccessToken:
|
||||
signature: str
|
||||
value: str
|
||||
|
@ -1,50 +1,13 @@
|
||||
import click
|
||||
import json
|
||||
import sys
|
||||
import re
|
||||
|
||||
from itertools import islice
|
||||
from twitchdl import utils
|
||||
from typing import Any, Match
|
||||
from typing import Any, Callable, Generator, TypeVar
|
||||
|
||||
from twitchdl.entities import Data, Video
|
||||
|
||||
START_CODES = {
|
||||
'b': '\033[1m',
|
||||
'dim': '\033[2m',
|
||||
'i': '\033[3m',
|
||||
'u': '\033[4m',
|
||||
'red': '\033[91m',
|
||||
'green': '\033[92m',
|
||||
'yellow': '\033[93m',
|
||||
'blue': '\033[94m',
|
||||
'magenta': '\033[95m',
|
||||
'cyan': '\033[96m',
|
||||
}
|
||||
|
||||
END_CODE = '\033[0m'
|
||||
|
||||
START_PATTERN = "<(" + "|".join(START_CODES.keys()) + ")>"
|
||||
END_PATTERN = "</(" + "|".join(START_CODES.keys()) + ")>"
|
||||
|
||||
USE_ANSI_COLOR = "--no-color" not in sys.argv
|
||||
|
||||
|
||||
def start_code(match: Match[str]) -> str:
|
||||
name = match.group(1)
|
||||
return START_CODES[name]
|
||||
|
||||
|
||||
def colorize(text: str) -> str:
|
||||
text = re.sub(START_PATTERN, start_code, text)
|
||||
text = re.sub(END_PATTERN, END_CODE, text)
|
||||
|
||||
return text
|
||||
|
||||
|
||||
def strip_tags(text: str) -> str:
|
||||
text = re.sub(START_PATTERN, '', text)
|
||||
text = re.sub(END_PATTERN, '', text)
|
||||
|
||||
return text
|
||||
T = TypeVar("T")
|
||||
|
||||
|
||||
def truncate(string: str, length: int) -> str:
|
||||
@ -54,59 +17,40 @@ def truncate(string: str, length: int) -> str:
|
||||
return string
|
||||
|
||||
|
||||
def print_out(*args, **kwargs):
|
||||
args = [colorize(a) if USE_ANSI_COLOR else strip_tags(a) for a in args]
|
||||
print(*args, **kwargs)
|
||||
|
||||
|
||||
def print_json(data: Any):
|
||||
print(json.dumps(data))
|
||||
click.echo(json.dumps(data))
|
||||
|
||||
|
||||
def print_err(*args, **kwargs):
|
||||
args = [f"<red>{arg}</red>" for arg in args]
|
||||
args = [colorize(a) if USE_ANSI_COLOR else strip_tags(a) for a in args]
|
||||
print(*args, file=sys.stderr, **kwargs)
|
||||
def print_log(message: Any):
|
||||
click.secho(message, err=True, dim=True)
|
||||
|
||||
|
||||
def print_log(*args, **kwargs):
|
||||
args = [f"<dim>{a}</dim>" for a in args]
|
||||
args = [colorize(a) if USE_ANSI_COLOR else strip_tags(a) for a in args]
|
||||
print(*args, file=sys.stderr, **kwargs)
|
||||
def print_table(headers: list[str], data: list[list[str]]):
|
||||
widths = [[len(cell) for cell in row] for row in data + [headers]]
|
||||
widths = [max(width) for width in zip(*widths)]
|
||||
underlines = ["-" * width for width in widths]
|
||||
|
||||
def print_row(row: list[str]):
|
||||
for idx, cell in enumerate(row):
|
||||
width = widths[idx]
|
||||
click.echo(cell.ljust(width), nl=False)
|
||||
click.echo(" ", nl=False)
|
||||
click.echo()
|
||||
|
||||
print_row(headers)
|
||||
print_row(underlines)
|
||||
|
||||
for row in data:
|
||||
print_row(row)
|
||||
|
||||
|
||||
def print_video(video):
|
||||
published_at = video["publishedAt"].replace("T", " @ ").replace("Z", "")
|
||||
length = utils.format_duration(video["lengthSeconds"])
|
||||
|
||||
channel = f"<blue>{video['creator']['displayName']}</blue>" if video["creator"] else ""
|
||||
playing = f"playing <blue>{video['game']['name']}</blue>" if video["game"] else ""
|
||||
|
||||
# Can't find URL in video object, strange
|
||||
url = f"https://www.twitch.tv/videos/{video['id']}"
|
||||
|
||||
print_out(f"<b>Video {video['id']}</b>")
|
||||
print_out(f"<green>{video['title']}</green>")
|
||||
|
||||
if channel or playing:
|
||||
print_out(" ".join([channel, playing]))
|
||||
|
||||
if video["description"]:
|
||||
print_out(f"Description: {video['description']}")
|
||||
|
||||
print_out(f"Published <blue>{published_at}</blue> Length: <blue>{length}</blue> ")
|
||||
print_out(f"<i>{url}</i>")
|
||||
|
||||
|
||||
def print_video_compact(video):
|
||||
id = video["id"]
|
||||
date = video["publishedAt"][:10]
|
||||
game = video["game"]["name"] if video["game"] else ""
|
||||
title = truncate(video["title"], 80).ljust(80)
|
||||
print_out(f'<b>{id}</b> {date} <green>{title}</green> <blue>{game}</blue>')
|
||||
|
||||
|
||||
def print_paged_videos(generator, page_size, total_count):
|
||||
def print_paged(
|
||||
label: str,
|
||||
generator: Generator[T, Any, Any],
|
||||
print_fn: Callable[[T], None],
|
||||
page_size: int,
|
||||
total_count: int | None = None,
|
||||
):
|
||||
iterator = iter(generator)
|
||||
page = list(islice(iterator, page_size))
|
||||
|
||||
@ -114,48 +58,79 @@ def print_paged_videos(generator, page_size, total_count):
|
||||
last = first + len(page) - 1
|
||||
|
||||
while True:
|
||||
print_out("-" * 80)
|
||||
click.echo("-" * 80)
|
||||
|
||||
print_out()
|
||||
for video in page:
|
||||
print_video(video)
|
||||
print_out()
|
||||
click.echo()
|
||||
for item in page:
|
||||
print_fn(item)
|
||||
|
||||
last = first + len(page) - 1
|
||||
|
||||
print_out("-" * 80)
|
||||
print_out(f"<yellow>Videos {first}-{last} of {total_count}</yellow>")
|
||||
click.echo("-" * 80)
|
||||
click.echo(f"{label} {first}-{last} of {total_count or '???'}")
|
||||
|
||||
first = first + len(page)
|
||||
last = first + 1
|
||||
|
||||
page = list(islice(iterator, page_size))
|
||||
if not page or not _continue():
|
||||
if not page or not prompt_continue():
|
||||
break
|
||||
|
||||
|
||||
def print_clip(clip):
|
||||
|
||||
def print_video(video: Video):
|
||||
published_at = str(video.published_at.astimezone())
|
||||
length = utils.format_duration(video.length_seconds)
|
||||
|
||||
channel = blue(video.creator.display_name) if video.creator else ""
|
||||
playing = f"playing {blue(video.game.name)}" if video.game else ""
|
||||
|
||||
# Can't find URL in video object, strange
|
||||
url = f"https://www.twitch.tv/videos/{video.id}"
|
||||
|
||||
click.secho(f"Video {video.id}", bold=True)
|
||||
click.secho(f"{video.title}", fg="green")
|
||||
|
||||
if channel or playing:
|
||||
click.echo(" ".join([channel, playing]))
|
||||
|
||||
if video.description:
|
||||
click.echo(f"Description: {video.description}")
|
||||
|
||||
click.echo(f"Published {blue(published_at)} Length: {blue(length)} ")
|
||||
click.secho(url, italic=True)
|
||||
click.echo()
|
||||
|
||||
|
||||
def print_video_compact(video: Data):
|
||||
id = video["id"]
|
||||
date = video["publishedAt"][:10]
|
||||
game = video["game"]["name"] if video["game"] else ""
|
||||
title = truncate(video["title"], 80).ljust(80)
|
||||
click.echo(f"{bold(id)} {date} {green(title)} {blue(game)}")
|
||||
|
||||
|
||||
def print_clip(clip: Data):
|
||||
published_at = clip["createdAt"].replace("T", " @ ").replace("Z", "")
|
||||
length = utils.format_duration(clip["durationSeconds"])
|
||||
channel = clip["broadcaster"]["displayName"]
|
||||
playing = (
|
||||
f"playing <blue>{clip['game']['name']}</blue>"
|
||||
if clip["game"] else ""
|
||||
playing = f"playing {blue(clip['game']['name'])}" if clip["game"] else ""
|
||||
|
||||
click.echo(f"Clip {bold(clip['slug'])}")
|
||||
click.secho(clip["title"], fg="green")
|
||||
click.echo(f"{blue(channel)} {playing}")
|
||||
click.echo(
|
||||
f"Published {blue(published_at)}" +
|
||||
f" Length: {blue(length)}" +
|
||||
f" Views: {blue(clip['viewCount'])}"
|
||||
)
|
||||
|
||||
print_out(f"Clip <b>{clip['slug']}</b>")
|
||||
print_out(f"<green>{clip['title']}</green>")
|
||||
print_out(f"<blue>{channel}</blue> {playing}")
|
||||
print_out(
|
||||
f"Published <blue>{published_at}</blue>" +
|
||||
f" Length: <blue>{length}</blue>" +
|
||||
f" Views: <blue>{clip["viewCount"]}</blue>"
|
||||
)
|
||||
print_out(f"<i>{clip['url']}</i>")
|
||||
click.secho(clip["url"], italic=True)
|
||||
|
||||
|
||||
def _continue():
|
||||
print_out("Press <green><b>Enter</green> to continue, <yellow><b>Ctrl+C</yellow> to break.")
|
||||
def prompt_continue():
|
||||
enter = click.style("Enter", bold=True, fg="green")
|
||||
ctrl_c = click.style("Ctrl+C", bold=True, fg="yellow")
|
||||
click.echo(f"Press {enter} to continue, {ctrl_c} to break.")
|
||||
|
||||
try:
|
||||
input()
|
||||
@ -163,3 +138,29 @@ def _continue():
|
||||
return False
|
||||
|
||||
return True
|
||||
|
||||
|
||||
# Shorthand functions for coloring output
|
||||
|
||||
def blue(text: Any) -> str:
|
||||
return click.style(text, fg="blue")
|
||||
|
||||
|
||||
def cyan(text: Any) -> str:
|
||||
return click.style(text, fg="cyan")
|
||||
|
||||
|
||||
def green(text: Any) -> str:
|
||||
return click.style(text, fg="green")
|
||||
|
||||
|
||||
def yellow(text: Any) -> str:
|
||||
return click.style(text, fg="yellow")
|
||||
|
||||
|
||||
def bold(text: Any) -> str:
|
||||
return click.style(text, bold=True)
|
||||
|
||||
|
||||
def dim(text: Any) -> str:
|
||||
return click.style(text, dim=True)
|
||||
|
@ -1,3 +1,4 @@
|
||||
import click
|
||||
import logging
|
||||
import time
|
||||
|
||||
@ -6,7 +7,7 @@ from dataclasses import dataclass, field
|
||||
from statistics import mean
|
||||
from typing import Dict, NamedTuple, Optional, Deque
|
||||
|
||||
from twitchdl.output import print_out
|
||||
from twitchdl.output import blue
|
||||
from twitchdl.utils import format_size, format_time
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
@ -127,11 +128,11 @@ class Progress:
|
||||
|
||||
progress = " ".join([
|
||||
f"Downloaded {self.vod_downloaded_count}/{self.vod_count} VODs",
|
||||
f"<blue>{self.progress_perc}%</blue>",
|
||||
f"of <blue>~{format_size(self.estimated_total)}</blue>" if self.estimated_total else "",
|
||||
f"at <blue>{format_size(self.speed)}/s</blue>" if self.speed else "",
|
||||
f"ETA <blue>{format_time(self.remaining_time)}</blue>" if self.remaining_time is not None else "",
|
||||
blue(self.progress_perc),
|
||||
f"of ~{blue(format_size(self.estimated_total))}" if self.estimated_total else "",
|
||||
f"at {blue(format_size(self.speed))}/s" if self.speed else "",
|
||||
f"ETA {blue(format_time(self.remaining_time))}" if self.remaining_time is not None else "",
|
||||
])
|
||||
|
||||
print_out(f"\r{progress} ", end="")
|
||||
click.echo(f"\r{progress} ", nl=False)
|
||||
self.last_printed = now
|
||||
|
@ -6,11 +6,17 @@ import httpx
|
||||
import json
|
||||
import click
|
||||
|
||||
from typing import Dict
|
||||
from typing import Dict, Generator, Literal
|
||||
from twitchdl import CLIENT_ID
|
||||
from twitchdl.entities import AccessToken, Data
|
||||
from twitchdl.exceptions import ConsoleError
|
||||
|
||||
|
||||
ClipsPeriod = Literal["last_day", "last_week", "last_month", "all_time"]
|
||||
VideosSort = Literal["views", "time"]
|
||||
VideosType = Literal["archive", "highlight", "upload"]
|
||||
|
||||
|
||||
class GQLError(click.ClickException):
|
||||
def __init__(self, errors: list[str]):
|
||||
message = "GraphQL query failed."
|
||||
@ -140,7 +146,7 @@ def get_clip_access_token(slug: str):
|
||||
return response["data"]["clip"]
|
||||
|
||||
|
||||
def get_channel_clips(channel_id: str, period: str, limit: int, after: str | None= None):
|
||||
def get_channel_clips(channel_id: str, period: ClipsPeriod, limit: int, after: str | None= None):
|
||||
"""
|
||||
List channel clips.
|
||||
|
||||
@ -177,8 +183,8 @@ def get_channel_clips(channel_id: str, period: str, limit: int, after: str | Non
|
||||
return response["data"]["user"]["clips"]
|
||||
|
||||
|
||||
def channel_clips_generator(channel_id, period, limit):
|
||||
def _generator(clips, limit):
|
||||
def channel_clips_generator(channel_id: str, period: str, limit: int) -> Generator[Data, None, None]:
|
||||
def _generator(clips: Data, limit: int) -> Generator[Data, None, None]:
|
||||
for clip in clips["edges"]:
|
||||
if limit < 1:
|
||||
return
|
||||
@ -262,8 +268,16 @@ def get_channel_videos(
|
||||
return response["data"]["user"]["videos"]
|
||||
|
||||
|
||||
def channel_videos_generator(channel_id, max_videos, sort, type, game_ids=[]):
|
||||
def _generator(videos, max_videos):
|
||||
def channel_videos_generator(
|
||||
channel_id: str,
|
||||
max_videos: int,
|
||||
sort: VideosSort,
|
||||
type: VideosType,
|
||||
game_ids: list[str] | None = None
|
||||
) -> tuple[int, Generator[Data, None, None]]:
|
||||
game_ids = game_ids or []
|
||||
|
||||
def _generator(videos: Data, max_videos: int) -> Generator[Data, None, None]:
|
||||
for video in videos["edges"]:
|
||||
if max_videos < 1:
|
||||
return
|
||||
@ -284,7 +298,7 @@ def channel_videos_generator(channel_id, max_videos, sort, type, game_ids=[]):
|
||||
return videos["totalCount"], _generator(videos, max_videos)
|
||||
|
||||
|
||||
def get_access_token(video_id, auth_token=None):
|
||||
def get_access_token(video_id: str, auth_token: str | None = None) -> AccessToken:
|
||||
query = f"""
|
||||
{{
|
||||
videoPlaybackAccessToken(
|
||||
@ -307,7 +321,11 @@ def get_access_token(video_id, auth_token=None):
|
||||
|
||||
try:
|
||||
response = gql_query(query, headers=headers)
|
||||
return response["data"]["videoPlaybackAccessToken"]
|
||||
return AccessToken(
|
||||
response["data"]["videoPlaybackAccessToken"]["signature"],
|
||||
response["data"]["videoPlaybackAccessToken"]["value"],
|
||||
)
|
||||
|
||||
except httpx.HTTPStatusError as error:
|
||||
# Provide a more useful error message when server returns HTTP 401
|
||||
# Unauthorized while using a user-provided auth token.
|
||||
@ -323,15 +341,15 @@ def get_access_token(video_id, auth_token=None):
|
||||
raise
|
||||
|
||||
|
||||
def get_playlists(video_id, access_token):
|
||||
def get_playlists(video_id: str, access_token: AccessToken):
|
||||
"""
|
||||
For a given video return a playlist which contains possible video qualities.
|
||||
"""
|
||||
url = f"https://usher.ttvnw.net/vod/{video_id}"
|
||||
|
||||
response = httpx.get(url, params={
|
||||
"nauth": access_token['value'],
|
||||
"nauthsig": access_token['signature'],
|
||||
"nauth": access_token.value,
|
||||
"nauthsig": access_token.signature,
|
||||
"allow_audio_only": "true",
|
||||
"allow_source": "true",
|
||||
"player": "twitchweb",
|
||||
|
@ -1,6 +1,8 @@
|
||||
import re
|
||||
import unicodedata
|
||||
|
||||
import click
|
||||
|
||||
|
||||
def _format_size(value: float, digits: int, unit: str):
|
||||
if digits > 0:
|
||||
@ -9,7 +11,7 @@ def _format_size(value: float, digits: int, unit: str):
|
||||
return f"{int(value)}{unit}"
|
||||
|
||||
|
||||
def format_size(bytes_: int, digits: int = 1):
|
||||
def format_size(bytes_: int | float, digits: int = 1):
|
||||
if bytes_ < 1024:
|
||||
return _format_size(bytes_, digits, "B")
|
||||
|
||||
@ -54,14 +56,9 @@ def format_time(total_seconds: int | float, force_hours: bool = False) -> str:
|
||||
|
||||
|
||||
def read_int(msg: str, min: int, max: int, default: int | None = None) -> int:
|
||||
if default:
|
||||
msg = msg + f" [default {default}]"
|
||||
|
||||
msg += ": "
|
||||
|
||||
while True:
|
||||
try:
|
||||
val = input(msg)
|
||||
val = click.prompt(msg, default=default, type=int)
|
||||
if default and not val:
|
||||
return default
|
||||
if min <= int(val) <= max:
|
||||
|
Reference in New Issue
Block a user