Apply ruff formatting

This commit is contained in:
Ivan Habunek 2024-04-04 08:20:10 +02:00
parent be69e79b28
commit 28f1977d1c
No known key found for this signature in database
GPG Key ID: F5F0623FF5EBCB3D
11 changed files with 152 additions and 106 deletions

View File

@ -56,3 +56,4 @@ typeCheckingMode = "strict"
[tool.ruff] [tool.ruff]
line-length = 100 line-length = 100
target-version = "py38"

View File

@ -1,10 +1,10 @@
import click
import re import re
import sys import sys
from os import path from os import path
from typing import Generator from typing import Generator
import click
from twitchdl import twitch, utils from twitchdl import twitch, utils
from twitchdl.commands.download import get_clip_authenticated_url from twitchdl.commands.download import get_clip_authenticated_url
from twitchdl.download import download_file from twitchdl.download import download_file
@ -62,12 +62,14 @@ def _target_filename(clip: Clip):
raise ValueError(f"Failed parsing date from: {clip['createdAt']}") raise ValueError(f"Failed parsing date from: {clip['createdAt']}")
date = "".join(match.groups()) date = "".join(match.groups())
name = "_".join([ name = "_".join(
date, [
clip["id"], date,
clip["broadcaster"]["login"], clip["id"],
utils.slugify(clip["title"]), clip["broadcaster"]["login"],
]) utils.slugify(clip["title"]),
]
)
return f"{name}.{ext}" return f"{name}.{ext}"
@ -91,7 +93,7 @@ def _print_all(generator: Generator[Clip, None, None], all: bool):
if not all: if not all:
click.secho( click.secho(
"\nThere may be more clips. " + "\nThere may be more clips. "
"Increase the --limit, use --all or --pager to see the rest.", + "Increase the --limit, use --all or --pager to see the rest.",
dim=True dim=True,
) )

View File

@ -1,18 +1,18 @@
import asyncio import asyncio
import platform
import click
import httpx
import m3u8
import os import os
import platform
import re import re
import shutil import shutil
import subprocess import subprocess
import tempfile import tempfile
from os import path from os import path
from pathlib import Path from pathlib import Path
from typing import List, Optional, OrderedDict from typing import List, Optional, OrderedDict
from urllib.parse import urlparse, urlencode from urllib.parse import urlencode, urlparse
import click
import httpx
import m3u8
from twitchdl import twitch, utils from twitchdl import twitch, utils
from twitchdl.download import download_file from twitchdl.download import download_file
@ -87,25 +87,33 @@ def _join_vods(playlist_path: str, target: str, overwrite: bool, video):
command = [ command = [
"ffmpeg", "ffmpeg",
"-i", playlist_path, "-i",
"-c", "copy", playlist_path,
"-metadata", f"artist={video['creator']['displayName']}", "-c",
"-metadata", f"title={video['title']}", "copy",
"-metadata", f"description={description}", "-metadata",
"-metadata", "encoded_by=twitch-dl", f"artist={video['creator']['displayName']}",
"-metadata",
f"title={video['title']}",
"-metadata",
f"description={description}",
"-metadata",
"encoded_by=twitch-dl",
"-stats", "-stats",
"-loglevel", "warning", "-loglevel",
"warning",
f"file:{target}", f"file:{target}",
] ]
if overwrite: if overwrite:
command.append("-y") command.append("-y")
click.secho(f"{' '.join(command)}", dim = True) click.secho(f"{' '.join(command)}", dim=True)
result = subprocess.run(command) result = subprocess.run(command)
if result.returncode != 0: if result.returncode != 0:
raise ConsoleError("Joining files failed") raise ConsoleError("Joining files failed")
def _concat_vods(vod_paths: list[str], target: str): def _concat_vods(vod_paths: list[str], target: str):
tool = "type" if platform.system() == "Windows" else "cat" tool = "type" if platform.system() == "Windows" else "cat"
command = [tool] + vod_paths command = [tool] + vod_paths
@ -117,7 +125,7 @@ def _concat_vods(vod_paths: list[str], target: str):
def get_video_placeholders(video: Video, format: str) -> dict[str, str]: def get_video_placeholders(video: Video, format: str) -> dict[str, str]:
date, time = video['publishedAt'].split("T") date, time = video["publishedAt"].split("T")
game = video["game"]["name"] if video["game"] else "Unknown" game = video["game"]["name"] if video["game"] else "Unknown"
return { return {
@ -145,7 +153,7 @@ def _video_target_filename(video: Video, args: DownloadOptions):
raise ConsoleError(f"Invalid key {e} used in --output. Supported keys are: {supported}") raise ConsoleError(f"Invalid key {e} used in --output. Supported keys are: {supported}")
def _clip_target_filename(clip, args: DownloadOptions): def _clip_target_filename(clip: Clip, args: DownloadOptions):
date, time = clip["createdAt"].split("T") date, time = clip["createdAt"].split("T")
game = clip["game"]["name"] if clip["game"] else "Unknown" game = clip["game"]["name"] if clip["game"] else "Unknown"
@ -240,10 +248,12 @@ def get_clip_authenticated_url(slug: str, quality: str):
url = _get_clip_url(access_token, quality) url = _get_clip_url(access_token, quality)
query = urlencode({ query = urlencode(
"sig": access_token["signature"], {
"token": access_token["value"], "sig": access_token["signature"],
}) "token": access_token["value"],
}
)
return f"{url}?{query}" return f"{url}?{query}"
@ -313,8 +323,11 @@ def _download_video(video_id, args: DownloadOptions) -> None:
print_log("Fetching playlists...") print_log("Fetching playlists...")
playlists_m3u8 = twitch.get_playlists(video_id, access_token) playlists_m3u8 = twitch.get_playlists(video_id, access_token)
playlists = list(_parse_playlists(playlists_m3u8)) playlists = list(_parse_playlists(playlists_m3u8))
playlist_uri = (_get_playlist_by_name(playlists, args.quality) if args.quality playlist_uri = (
else _select_playlist_interactive(playlists)) _get_playlist_by_name(playlists, args.quality)
if args.quality
else _select_playlist_interactive(playlists)
)
print_log("Fetching playlist...") print_log("Fetching playlist...")
response = httpx.get(playlist_uri) response = httpx.get(playlist_uri)
@ -331,7 +344,9 @@ def _download_video(video_id, args: DownloadOptions) -> None:
with open(path.join(target_dir, "playlist.m3u8"), "w") as f: with open(path.join(target_dir, "playlist.m3u8"), "w") as f:
f.write(response.text) f.write(response.text)
click.echo(f"\nDownloading {len(vod_paths)} VODs using {args.max_workers} workers to {target_dir}") click.echo(
f"\nDownloading {len(vod_paths)} VODs using {args.max_workers} workers to {target_dir}"
)
sources = [base_uri + path for path in vod_paths] sources = [base_uri + path for path in vod_paths]
targets = [os.path.join(target_dir, f"{k:05d}.ts") for k, _ in enumerate(vod_paths)] targets = [os.path.join(target_dir, f"{k:05d}.ts") for k, _ in enumerate(vod_paths)]
asyncio.run(download_all(sources, targets, args.max_workers, rate_limit=args.rate_limit)) asyncio.run(download_all(sources, targets, args.max_workers, rate_limit=args.rate_limit))
@ -392,7 +407,9 @@ def _determine_time_range(video_id: str, args: DownloadOptions):
try: try:
chapter = chapters[args.chapter - 1] chapter = chapters[args.chapter - 1]
except IndexError: except IndexError:
raise ConsoleError(f"Chapter {args.chapter} does not exist. This video has {len(chapters)} chapters.") raise ConsoleError(
f"Chapter {args.chapter} does not exist. This video has {len(chapters)} chapters."
)
click.echo(f'Chapter selected: {blue(chapter["description"])}\n') click.echo(f'Chapter selected: {blue(chapter["description"])}\n')
start = chapter["positionMilliseconds"] // 1000 start = chapter["positionMilliseconds"] // 1000

View File

@ -1,11 +1,11 @@
import click import click
import m3u8 import m3u8
from twitchdl import utils, twitch from twitchdl import twitch, utils
from twitchdl.commands.download import get_video_placeholders from twitchdl.commands.download import get_video_placeholders
from twitchdl.exceptions import ConsoleError from twitchdl.exceptions import ConsoleError
from twitchdl.output import bold, print_table, print_video, print_clip, print_json, print_log from twitchdl.output import bold, print_clip, print_json, print_log, print_table, print_video
from twitchdl.twitch import Clip, Video from twitchdl.twitch import Chapter, Clip, Video
def info(id: str, *, json: bool = False): def info(id: str, *, json: bool = False):
@ -48,7 +48,7 @@ def info(id: str, *, json: bool = False):
raise ConsoleError(f"Invalid input: {id}") raise ConsoleError(f"Invalid input: {id}")
def video_info(video: Video, playlists, chapters): def video_info(video: Video, playlists, chapters: list[Chapter]):
click.echo() click.echo()
print_video(video) print_video(video)
@ -64,7 +64,7 @@ def video_info(video: Video, playlists, chapters):
duration = utils.format_time(chapter["durationMilliseconds"] // 1000) duration = utils.format_time(chapter["durationMilliseconds"] // 1000)
click.echo(f'{start} {bold(chapter["description"])} ({duration})') click.echo(f'{start} {bold(chapter["description"])} ({duration})')
placeholders = get_video_placeholders(video, format = "mkv") placeholders = get_video_placeholders(video, format="mkv")
placeholders = [[f"{{{k}}}", v] for k, v in placeholders.items()] placeholders = [[f"{{{k}}}", v] for k, v in placeholders.items()]
click.echo("") click.echo("")
print_table(["Placeholder", "Value"], placeholders) print_table(["Placeholder", "Value"], placeholders)
@ -79,8 +79,9 @@ def video_json(video, playlists, chapters):
"resolution": p.stream_info.resolution, "resolution": p.stream_info.resolution,
"codecs": p.stream_info.codecs, "codecs": p.stream_info.codecs,
"video": p.stream_info.video, "video": p.stream_info.video,
"uri": p.uri "uri": p.uri,
} for p in playlists }
for p in playlists
] ]
video["chapters"] = chapters video["chapters"] = chapters

View File

@ -4,7 +4,7 @@ import click
from twitchdl import twitch from twitchdl import twitch
from twitchdl.exceptions import ConsoleError from twitchdl.exceptions import ConsoleError
from twitchdl.output import print_log, print_paged, print_video, print_json, print_video_compact from twitchdl.output import print_json, print_log, print_paged, print_video, print_video_compact
def videos( def videos(
@ -28,15 +28,12 @@ def videos(
max_videos = sys.maxsize if all or pager else limit max_videos = sys.maxsize if all or pager else limit
total_count, generator = twitch.channel_videos_generator( total_count, generator = twitch.channel_videos_generator(
channel_name, max_videos, sort, type, game_ids=game_ids) channel_name, max_videos, sort, type, game_ids=game_ids
)
if json: if json:
videos = list(generator) videos = list(generator)
print_json({ print_json({"count": len(videos), "totalCount": total_count, "videos": videos})
"count": len(videos),
"totalCount": total_count,
"videos": videos
})
return return
if total_count == 0: if total_count == 0:
@ -63,8 +60,9 @@ def videos(
if total_count > count: if total_count > count:
click.secho( click.secho(
"\nThere are more videos. Increase the --limit, use --all or --pager to see the rest.", "\nThere are more videos. "
dim=True + "Increase the --limit, use --all or --pager to see the rest.",
dim=True,
) )

View File

@ -1,5 +1,7 @@
import click import click
class ConsoleError(click.ClickException): class ConsoleError(click.ClickException):
"""Raised when an error occurs and script exectuion should halt.""" """Raised when an error occurs and script exectuion should halt."""
pass pass

View File

@ -1,12 +1,12 @@
import asyncio import asyncio
import httpx
import logging import logging
import os import os
import time import time
from abc import ABC, abstractmethod from abc import ABC, abstractmethod
from typing import List, Optional from typing import List, Optional
import httpx
from twitchdl.progress import Progress from twitchdl.progress import Progress
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -62,6 +62,7 @@ class LimitingTokenBucket(TokenBucket):
class EndlessTokenBucket(TokenBucket): class EndlessTokenBucket(TokenBucket):
"""Used when download speed is not limited.""" """Used when download speed is not limited."""
def advance(self, size: int): def advance(self, size: int):
pass pass
@ -122,12 +123,22 @@ async def download_all(
targets: List[str], targets: List[str],
workers: int, workers: int,
*, *,
rate_limit: Optional[int] = None rate_limit: Optional[int] = None,
): ):
progress = Progress(len(sources)) progress = Progress(len(sources))
token_bucket = LimitingTokenBucket(rate_limit) if rate_limit else EndlessTokenBucket() token_bucket = LimitingTokenBucket(rate_limit) if rate_limit else EndlessTokenBucket()
async with httpx.AsyncClient(timeout=TIMEOUT) as client: async with httpx.AsyncClient(timeout=TIMEOUT) as client:
semaphore = asyncio.Semaphore(workers) semaphore = asyncio.Semaphore(workers)
tasks = [download_with_retries(client, semaphore, task_id, source, target, progress, token_bucket) tasks = [
for task_id, (source, target) in enumerate(zip(sources, targets))] download_with_retries(
client,
semaphore,
task_id,
source,
target,
progress,
token_bucket,
)
for task_id, (source, target) in enumerate(zip(sources, targets))
]
await asyncio.gather(*tasks) await asyncio.gather(*tasks)

View File

@ -12,7 +12,7 @@ T = TypeVar("T")
def truncate(string: str, length: int) -> str: def truncate(string: str, length: int) -> str:
if len(string) > length: if len(string) > length:
return string[:length - 1] + "" return string[: length - 1] + ""
return string return string
@ -77,12 +77,11 @@ def print_paged(
break break
def print_video(video: Video): def print_video(video: Video):
published_at = video["publishedAt"].replace("T", " @ ").replace("Z", "") published_at = video["publishedAt"].replace("T", " @ ").replace("Z", "")
length = utils.format_duration(video["lengthSeconds"]) length = utils.format_duration(video["lengthSeconds"])
channel = blue(video['creator']['displayName']) if video["creator"] else "" channel = blue(video["creator"]["displayName"]) if video["creator"] else ""
playing = f"playing {blue(video['game']['name'])}" if video["game"] else "" playing = f"playing {blue(video['game']['name'])}" if video["game"] else ""
# Can't find URL in video object, strange # Can't find URL in video object, strange
@ -120,9 +119,9 @@ def print_clip(clip: Clip):
click.secho(clip["title"], fg="green") click.secho(clip["title"], fg="green")
click.echo(f"{blue(channel)} {playing}") click.echo(f"{blue(channel)} {playing}")
click.echo( click.echo(
f"Published {blue(published_at)}" + f"Published {blue(published_at)}"
f" Length: {blue(length)}" + + f" Length: {blue(length)}"
f" Views: {blue(clip['viewCount'])}" + f" Views: {blue(clip['viewCount'])}"
) )
click.secho(clip["url"], italic=True) click.secho(clip["url"], italic=True)
@ -142,6 +141,7 @@ def prompt_continue():
# Shorthand functions for coloring output # Shorthand functions for coloring output
def blue(text: Any) -> str: def blue(text: Any) -> str:
return click.style(text, fg="blue") return click.style(text, fg="blue")

View File

@ -1,11 +1,11 @@
import click
import logging import logging
import time import time
from collections import deque from collections import deque
from dataclasses import dataclass, field from dataclasses import dataclass, field
from statistics import mean from statistics import mean
from typing import Dict, NamedTuple, Optional, Deque from typing import Deque, Dict, NamedTuple, Optional
import click
from twitchdl.output import blue from twitchdl.output import blue
from twitchdl.utils import format_size, format_time from twitchdl.utils import format_size, format_time
@ -94,18 +94,28 @@ class Progress:
task = self.tasks[task_id] task = self.tasks[task_id]
if task.size != task.downloaded: if task.size != task.downloaded:
logger.warn(f"Taks {task_id} ended with {task.downloaded}b downloaded, expected {task.size}b.") logger.warn(
f"Taks {task_id} ended with {task.downloaded}b downloaded, expected {task.size}b."
)
self.vod_downloaded_count += 1 self.vod_downloaded_count += 1
self.print() self.print()
def _calculate_total(self): def _calculate_total(self):
self.estimated_total = int(mean(t.size for t in self.tasks.values()) * self.vod_count) if self.tasks else None self.estimated_total = (
int(mean(t.size for t in self.tasks.values()) * self.vod_count) if self.tasks else None
)
def _calculate_progress(self): def _calculate_progress(self):
self.speed = self._calculate_speed() self.speed = self._calculate_speed()
self.progress_perc = int(100 * self.progress_bytes / self.estimated_total) if self.estimated_total else 0 self.progress_perc = (
self.remaining_time = int((self.estimated_total - self.progress_bytes) / self.speed) if self.estimated_total and self.speed else None int(100 * self.progress_bytes / self.estimated_total) if self.estimated_total else 0
)
self.remaining_time = (
int((self.estimated_total - self.progress_bytes) / self.speed)
if self.estimated_total and self.speed
else None
)
def _calculate_speed(self): def _calculate_speed(self):
if len(self.samples) < 2: if len(self.samples) < 2:
@ -126,13 +136,17 @@ class Progress:
if now - self.last_printed < 0.1: if now - self.last_printed < 0.1:
return return
progress = " ".join([ progress = " ".join(
f"Downloaded {self.vod_downloaded_count}/{self.vod_count} VODs", [
blue(self.progress_perc), f"Downloaded {self.vod_downloaded_count}/{self.vod_count} VODs",
f"of ~{blue(format_size(self.estimated_total))}" if self.estimated_total else "", f"{blue(self.progress_perc)}%",
f"at {blue(format_size(self.speed))}/s" if self.speed else "", f"of ~{blue(format_size(self.estimated_total))}" if self.estimated_total else "",
f"ETA {blue(format_time(self.remaining_time))}" if self.remaining_time is not None else "", f"at {blue(format_size(self.speed))}/s" if self.speed else "",
]) f"ETA {blue(format_time(self.remaining_time))}"
if self.remaining_time is not None
else "",
]
)
click.echo(f"\r{progress} ", nl=False) click.echo(f"\r{progress} ", nl=False)
self.last_printed = now self.last_printed = now

View File

@ -2,16 +2,16 @@
Twitch API access. Twitch API access.
""" """
import httpx
import json import json
import click
from typing import Dict, Generator, Literal, TypedDict from typing import Dict, Generator, Literal, TypedDict
import click
import httpx
from twitchdl import CLIENT_ID from twitchdl import CLIENT_ID
from twitchdl.entities import Data from twitchdl.entities import Data
from twitchdl.exceptions import ConsoleError from twitchdl.exceptions import ConsoleError
ClipsPeriod = Literal["last_day", "last_week", "last_month", "all_time"] ClipsPeriod = Literal["last_day", "last_week", "last_month", "all_time"]
VideosSort = Literal["views", "time"] VideosSort = Literal["views", "time"]
VideosType = Literal["archive", "highlight", "upload"] VideosType = Literal["archive", "highlight", "upload"]
@ -203,7 +203,7 @@ def get_clip_access_token(slug: str) -> AccessToken:
return response["data"]["clip"]["playbackAccessToken"] return response["data"]["clip"]["playbackAccessToken"]
def get_channel_clips(channel_id: str, period: ClipsPeriod, limit: int, after: str | None= None): def get_channel_clips(channel_id: str, period: ClipsPeriod, limit: int, after: str | None = None):
""" """
List channel clips. List channel clips.
@ -243,7 +243,7 @@ def get_channel_clips(channel_id: str, period: ClipsPeriod, limit: int, after: s
def channel_clips_generator( def channel_clips_generator(
channel_id: str, channel_id: str,
period: ClipsPeriod, period: ClipsPeriod,
limit: int limit: int,
) -> Generator[Clip, None, None]: ) -> Generator[Clip, None, None]:
def _generator(clips: Data, limit: int) -> Generator[Clip, None, None]: def _generator(clips: Data, limit: int) -> Generator[Clip, None, None]:
for clip in clips["edges"]: for clip in clips["edges"]:
@ -289,7 +289,7 @@ def get_channel_videos(
sort: str, sort: str,
type: str = "archive", type: str = "archive",
game_ids: list[str] | None = None, game_ids: list[str] | None = None,
after: str | None = None after: str | None = None,
): ):
game_ids = game_ids or [] game_ids = game_ids or []
@ -333,7 +333,7 @@ def channel_videos_generator(
max_videos: int, max_videos: int,
sort: VideosSort, sort: VideosSort,
type: VideosType, type: VideosType,
game_ids: list[str] | None = None game_ids: list[str] | None = None,
) -> tuple[int, Generator[Video, None, None]]: ) -> tuple[int, Generator[Video, None, None]]:
game_ids = game_ids or [] game_ids = game_ids or []
@ -403,13 +403,16 @@ def get_playlists(video_id: str, access_token: AccessToken):
""" """
url = f"https://usher.ttvnw.net/vod/{video_id}" url = f"https://usher.ttvnw.net/vod/{video_id}"
response = httpx.get(url, params={ response = httpx.get(
"nauth": access_token["value"], url,
"nauthsig": access_token["signature"], params={
"allow_audio_only": "true", "nauth": access_token["value"],
"allow_source": "true", "nauthsig": access_token["signature"],
"player": "twitchweb", "allow_audio_only": "true",
}) "allow_source": "true",
"player": "twitchweb",
},
)
response.raise_for_status() response.raise_for_status()
return response.content.decode("utf-8") return response.content.decode("utf-8")
@ -432,19 +435,16 @@ def get_game_id(name: str):
def get_video_chapters(video_id: str) -> list[Chapter]: def get_video_chapters(video_id: str) -> list[Chapter]:
query = { query = {
"operationName": "VideoPlayer_ChapterSelectButtonVideo", "operationName": "VideoPlayer_ChapterSelectButtonVideo",
"variables": "variables": {
{
"includePrivate": False, "includePrivate": False,
"videoID": video_id "videoID": video_id,
}, },
"extensions": "extensions": {
{ "persistedQuery": {
"persistedQuery":
{
"version": 1, "version": 1,
"sha256Hash": "8d2793384aac3773beab5e59bd5d6f585aedb923d292800119e03d40cd0f9b41" "sha256Hash": "8d2793384aac3773beab5e59bd5d6f585aedb923d292800119e03d40cd0f9b41",
} }
} },
} }
response = gql_post(json.dumps(query)) response = gql_post(json.dumps(query))

View File

@ -68,16 +68,16 @@ def read_int(msg: str, min: int, max: int, default: int | None = None) -> int:
def slugify(value: str) -> str: def slugify(value: str) -> str:
value = unicodedata.normalize('NFKC', str(value)) value = unicodedata.normalize("NFKC", str(value))
value = re.sub(r'[^\w\s_-]', '', value) value = re.sub(r"[^\w\s_-]", "", value)
value = re.sub(r'[\s_-]+', '_', value) value = re.sub(r"[\s_-]+", "_", value)
return value.strip("_").lower() return value.strip("_").lower()
def titlify(value: str) -> str: def titlify(value: str) -> str:
value = unicodedata.normalize('NFKC', str(value)) value = unicodedata.normalize("NFKC", str(value))
value = re.sub(r'[^\w\s\[\]().-]', '', value) value = re.sub(r"[^\w\s\[\]().-]", "", value)
value = re.sub(r'\s+', ' ', value) value = re.sub(r"\s+", " ", value)
return value.strip() return value.strip()