From 28f1977d1c43d7263c498d728754010f2b26a4bf Mon Sep 17 00:00:00 2001 From: Ivan Habunek Date: Thu, 4 Apr 2024 08:20:10 +0200 Subject: [PATCH] Apply ruff formatting --- pyproject.toml | 1 + twitchdl/commands/clips.py | 24 +++++++------ twitchdl/commands/download.py | 65 ++++++++++++++++++++++------------- twitchdl/commands/info.py | 15 ++++---- twitchdl/commands/videos.py | 16 ++++----- twitchdl/exceptions.py | 2 ++ twitchdl/http.py | 21 ++++++++--- twitchdl/output.py | 12 +++---- twitchdl/progress.py | 42 ++++++++++++++-------- twitchdl/twitch.py | 48 +++++++++++++------------- twitchdl/utils.py | 12 +++---- 11 files changed, 152 insertions(+), 106 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index 94baf06..691e40b 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -56,3 +56,4 @@ typeCheckingMode = "strict" [tool.ruff] line-length = 100 +target-version = "py38" diff --git a/twitchdl/commands/clips.py b/twitchdl/commands/clips.py index 2af105c..0f9d420 100644 --- a/twitchdl/commands/clips.py +++ b/twitchdl/commands/clips.py @@ -1,10 +1,10 @@ -import click import re import sys - from os import path from typing import Generator +import click + from twitchdl import twitch, utils from twitchdl.commands.download import get_clip_authenticated_url from twitchdl.download import download_file @@ -62,12 +62,14 @@ def _target_filename(clip: Clip): raise ValueError(f"Failed parsing date from: {clip['createdAt']}") date = "".join(match.groups()) - name = "_".join([ - date, - clip["id"], - clip["broadcaster"]["login"], - utils.slugify(clip["title"]), - ]) + name = "_".join( + [ + date, + clip["id"], + clip["broadcaster"]["login"], + utils.slugify(clip["title"]), + ] + ) return f"{name}.{ext}" @@ -91,7 +93,7 @@ def _print_all(generator: Generator[Clip, None, None], all: bool): if not all: click.secho( - "\nThere may be more clips. " + - "Increase the --limit, use --all or --pager to see the rest.", - dim=True + "\nThere may be more clips. " + + "Increase the --limit, use --all or --pager to see the rest.", + dim=True, ) diff --git a/twitchdl/commands/download.py b/twitchdl/commands/download.py index b61152c..24b3ed3 100644 --- a/twitchdl/commands/download.py +++ b/twitchdl/commands/download.py @@ -1,18 +1,18 @@ import asyncio -import platform -import click -import httpx -import m3u8 import os +import platform import re import shutil import subprocess import tempfile - from os import path from pathlib import Path from typing import List, Optional, OrderedDict -from urllib.parse import urlparse, urlencode +from urllib.parse import urlencode, urlparse + +import click +import httpx +import m3u8 from twitchdl import twitch, utils from twitchdl.download import download_file @@ -87,25 +87,33 @@ def _join_vods(playlist_path: str, target: str, overwrite: bool, video): command = [ "ffmpeg", - "-i", playlist_path, - "-c", "copy", - "-metadata", f"artist={video['creator']['displayName']}", - "-metadata", f"title={video['title']}", - "-metadata", f"description={description}", - "-metadata", "encoded_by=twitch-dl", + "-i", + playlist_path, + "-c", + "copy", + "-metadata", + f"artist={video['creator']['displayName']}", + "-metadata", + f"title={video['title']}", + "-metadata", + f"description={description}", + "-metadata", + "encoded_by=twitch-dl", "-stats", - "-loglevel", "warning", + "-loglevel", + "warning", f"file:{target}", ] if overwrite: command.append("-y") - click.secho(f"{' '.join(command)}", dim = True) + click.secho(f"{' '.join(command)}", dim=True) result = subprocess.run(command) if result.returncode != 0: raise ConsoleError("Joining files failed") + def _concat_vods(vod_paths: list[str], target: str): tool = "type" if platform.system() == "Windows" else "cat" command = [tool] + vod_paths @@ -117,7 +125,7 @@ def _concat_vods(vod_paths: list[str], target: str): def get_video_placeholders(video: Video, format: str) -> dict[str, str]: - date, time = video['publishedAt'].split("T") + date, time = video["publishedAt"].split("T") game = video["game"]["name"] if video["game"] else "Unknown" return { @@ -145,7 +153,7 @@ def _video_target_filename(video: Video, args: DownloadOptions): raise ConsoleError(f"Invalid key {e} used in --output. Supported keys are: {supported}") -def _clip_target_filename(clip, args: DownloadOptions): +def _clip_target_filename(clip: Clip, args: DownloadOptions): date, time = clip["createdAt"].split("T") game = clip["game"]["name"] if clip["game"] else "Unknown" @@ -240,10 +248,12 @@ def get_clip_authenticated_url(slug: str, quality: str): url = _get_clip_url(access_token, quality) - query = urlencode({ - "sig": access_token["signature"], - "token": access_token["value"], - }) + query = urlencode( + { + "sig": access_token["signature"], + "token": access_token["value"], + } + ) return f"{url}?{query}" @@ -313,8 +323,11 @@ def _download_video(video_id, args: DownloadOptions) -> None: print_log("Fetching playlists...") playlists_m3u8 = twitch.get_playlists(video_id, access_token) playlists = list(_parse_playlists(playlists_m3u8)) - playlist_uri = (_get_playlist_by_name(playlists, args.quality) if args.quality - else _select_playlist_interactive(playlists)) + playlist_uri = ( + _get_playlist_by_name(playlists, args.quality) + if args.quality + else _select_playlist_interactive(playlists) + ) print_log("Fetching playlist...") response = httpx.get(playlist_uri) @@ -331,7 +344,9 @@ def _download_video(video_id, args: DownloadOptions) -> None: with open(path.join(target_dir, "playlist.m3u8"), "w") as f: f.write(response.text) - click.echo(f"\nDownloading {len(vod_paths)} VODs using {args.max_workers} workers to {target_dir}") + click.echo( + f"\nDownloading {len(vod_paths)} VODs using {args.max_workers} workers to {target_dir}" + ) sources = [base_uri + path for path in vod_paths] targets = [os.path.join(target_dir, f"{k:05d}.ts") for k, _ in enumerate(vod_paths)] asyncio.run(download_all(sources, targets, args.max_workers, rate_limit=args.rate_limit)) @@ -392,7 +407,9 @@ def _determine_time_range(video_id: str, args: DownloadOptions): try: chapter = chapters[args.chapter - 1] except IndexError: - raise ConsoleError(f"Chapter {args.chapter} does not exist. This video has {len(chapters)} chapters.") + raise ConsoleError( + f"Chapter {args.chapter} does not exist. This video has {len(chapters)} chapters." + ) click.echo(f'Chapter selected: {blue(chapter["description"])}\n') start = chapter["positionMilliseconds"] // 1000 diff --git a/twitchdl/commands/info.py b/twitchdl/commands/info.py index 85e166e..ae15ffe 100644 --- a/twitchdl/commands/info.py +++ b/twitchdl/commands/info.py @@ -1,11 +1,11 @@ import click import m3u8 -from twitchdl import utils, twitch +from twitchdl import twitch, utils from twitchdl.commands.download import get_video_placeholders from twitchdl.exceptions import ConsoleError -from twitchdl.output import bold, print_table, print_video, print_clip, print_json, print_log -from twitchdl.twitch import Clip, Video +from twitchdl.output import bold, print_clip, print_json, print_log, print_table, print_video +from twitchdl.twitch import Chapter, Clip, Video def info(id: str, *, json: bool = False): @@ -48,7 +48,7 @@ def info(id: str, *, json: bool = False): raise ConsoleError(f"Invalid input: {id}") -def video_info(video: Video, playlists, chapters): +def video_info(video: Video, playlists, chapters: list[Chapter]): click.echo() print_video(video) @@ -64,7 +64,7 @@ def video_info(video: Video, playlists, chapters): duration = utils.format_time(chapter["durationMilliseconds"] // 1000) click.echo(f'{start} {bold(chapter["description"])} ({duration})') - placeholders = get_video_placeholders(video, format = "mkv") + placeholders = get_video_placeholders(video, format="mkv") placeholders = [[f"{{{k}}}", v] for k, v in placeholders.items()] click.echo("") print_table(["Placeholder", "Value"], placeholders) @@ -79,8 +79,9 @@ def video_json(video, playlists, chapters): "resolution": p.stream_info.resolution, "codecs": p.stream_info.codecs, "video": p.stream_info.video, - "uri": p.uri - } for p in playlists + "uri": p.uri, + } + for p in playlists ] video["chapters"] = chapters diff --git a/twitchdl/commands/videos.py b/twitchdl/commands/videos.py index 2314d78..f93048b 100644 --- a/twitchdl/commands/videos.py +++ b/twitchdl/commands/videos.py @@ -4,7 +4,7 @@ import click from twitchdl import twitch from twitchdl.exceptions import ConsoleError -from twitchdl.output import print_log, print_paged, print_video, print_json, print_video_compact +from twitchdl.output import print_json, print_log, print_paged, print_video, print_video_compact def videos( @@ -28,15 +28,12 @@ def videos( max_videos = sys.maxsize if all or pager else limit total_count, generator = twitch.channel_videos_generator( - channel_name, max_videos, sort, type, game_ids=game_ids) + channel_name, max_videos, sort, type, game_ids=game_ids + ) if json: videos = list(generator) - print_json({ - "count": len(videos), - "totalCount": total_count, - "videos": videos - }) + print_json({"count": len(videos), "totalCount": total_count, "videos": videos}) return if total_count == 0: @@ -63,8 +60,9 @@ def videos( if total_count > count: click.secho( - "\nThere are more videos. Increase the --limit, use --all or --pager to see the rest.", - dim=True + "\nThere are more videos. " + + "Increase the --limit, use --all or --pager to see the rest.", + dim=True, ) diff --git a/twitchdl/exceptions.py b/twitchdl/exceptions.py index d5fe940..804451c 100644 --- a/twitchdl/exceptions.py +++ b/twitchdl/exceptions.py @@ -1,5 +1,7 @@ import click + class ConsoleError(click.ClickException): """Raised when an error occurs and script exectuion should halt.""" + pass diff --git a/twitchdl/http.py b/twitchdl/http.py index 515c837..3450145 100644 --- a/twitchdl/http.py +++ b/twitchdl/http.py @@ -1,12 +1,12 @@ import asyncio -import httpx import logging import os import time - from abc import ABC, abstractmethod from typing import List, Optional +import httpx + from twitchdl.progress import Progress logger = logging.getLogger(__name__) @@ -62,6 +62,7 @@ class LimitingTokenBucket(TokenBucket): class EndlessTokenBucket(TokenBucket): """Used when download speed is not limited.""" + def advance(self, size: int): pass @@ -122,12 +123,22 @@ async def download_all( targets: List[str], workers: int, *, - rate_limit: Optional[int] = None + rate_limit: Optional[int] = None, ): progress = Progress(len(sources)) token_bucket = LimitingTokenBucket(rate_limit) if rate_limit else EndlessTokenBucket() async with httpx.AsyncClient(timeout=TIMEOUT) as client: semaphore = asyncio.Semaphore(workers) - tasks = [download_with_retries(client, semaphore, task_id, source, target, progress, token_bucket) - for task_id, (source, target) in enumerate(zip(sources, targets))] + tasks = [ + download_with_retries( + client, + semaphore, + task_id, + source, + target, + progress, + token_bucket, + ) + for task_id, (source, target) in enumerate(zip(sources, targets)) + ] await asyncio.gather(*tasks) diff --git a/twitchdl/output.py b/twitchdl/output.py index 879e31e..6695393 100644 --- a/twitchdl/output.py +++ b/twitchdl/output.py @@ -12,7 +12,7 @@ T = TypeVar("T") def truncate(string: str, length: int) -> str: if len(string) > length: - return string[:length - 1] + "…" + return string[: length - 1] + "…" return string @@ -77,12 +77,11 @@ def print_paged( break - def print_video(video: Video): published_at = video["publishedAt"].replace("T", " @ ").replace("Z", "") length = utils.format_duration(video["lengthSeconds"]) - channel = blue(video['creator']['displayName']) if video["creator"] else "" + channel = blue(video["creator"]["displayName"]) if video["creator"] else "" playing = f"playing {blue(video['game']['name'])}" if video["game"] else "" # Can't find URL in video object, strange @@ -120,9 +119,9 @@ def print_clip(clip: Clip): click.secho(clip["title"], fg="green") click.echo(f"{blue(channel)} {playing}") click.echo( - f"Published {blue(published_at)}" + - f" Length: {blue(length)}" + - f" Views: {blue(clip['viewCount'])}" + f"Published {blue(published_at)}" + + f" Length: {blue(length)}" + + f" Views: {blue(clip['viewCount'])}" ) click.secho(clip["url"], italic=True) @@ -142,6 +141,7 @@ def prompt_continue(): # Shorthand functions for coloring output + def blue(text: Any) -> str: return click.style(text, fg="blue") diff --git a/twitchdl/progress.py b/twitchdl/progress.py index 470072d..a8b54d5 100644 --- a/twitchdl/progress.py +++ b/twitchdl/progress.py @@ -1,11 +1,11 @@ -import click import logging import time - from collections import deque from dataclasses import dataclass, field from statistics import mean -from typing import Dict, NamedTuple, Optional, Deque +from typing import Deque, Dict, NamedTuple, Optional + +import click from twitchdl.output import blue from twitchdl.utils import format_size, format_time @@ -94,18 +94,28 @@ class Progress: task = self.tasks[task_id] if task.size != task.downloaded: - logger.warn(f"Taks {task_id} ended with {task.downloaded}b downloaded, expected {task.size}b.") + logger.warn( + f"Taks {task_id} ended with {task.downloaded}b downloaded, expected {task.size}b." + ) self.vod_downloaded_count += 1 self.print() def _calculate_total(self): - self.estimated_total = int(mean(t.size for t in self.tasks.values()) * self.vod_count) if self.tasks else None + self.estimated_total = ( + int(mean(t.size for t in self.tasks.values()) * self.vod_count) if self.tasks else None + ) def _calculate_progress(self): self.speed = self._calculate_speed() - self.progress_perc = int(100 * self.progress_bytes / self.estimated_total) if self.estimated_total else 0 - self.remaining_time = int((self.estimated_total - self.progress_bytes) / self.speed) if self.estimated_total and self.speed else None + self.progress_perc = ( + int(100 * self.progress_bytes / self.estimated_total) if self.estimated_total else 0 + ) + self.remaining_time = ( + int((self.estimated_total - self.progress_bytes) / self.speed) + if self.estimated_total and self.speed + else None + ) def _calculate_speed(self): if len(self.samples) < 2: @@ -126,13 +136,17 @@ class Progress: if now - self.last_printed < 0.1: return - progress = " ".join([ - f"Downloaded {self.vod_downloaded_count}/{self.vod_count} VODs", - blue(self.progress_perc), - f"of ~{blue(format_size(self.estimated_total))}" if self.estimated_total else "", - f"at {blue(format_size(self.speed))}/s" if self.speed else "", - f"ETA {blue(format_time(self.remaining_time))}" if self.remaining_time is not None else "", - ]) + progress = " ".join( + [ + f"Downloaded {self.vod_downloaded_count}/{self.vod_count} VODs", + f"{blue(self.progress_perc)}%", + f"of ~{blue(format_size(self.estimated_total))}" if self.estimated_total else "", + f"at {blue(format_size(self.speed))}/s" if self.speed else "", + f"ETA {blue(format_time(self.remaining_time))}" + if self.remaining_time is not None + else "", + ] + ) click.echo(f"\r{progress} ", nl=False) self.last_printed = now diff --git a/twitchdl/twitch.py b/twitchdl/twitch.py index 0107c21..6349c41 100644 --- a/twitchdl/twitch.py +++ b/twitchdl/twitch.py @@ -2,16 +2,16 @@ Twitch API access. """ -import httpx import json -import click - from typing import Dict, Generator, Literal, TypedDict + +import click +import httpx + from twitchdl import CLIENT_ID from twitchdl.entities import Data from twitchdl.exceptions import ConsoleError - ClipsPeriod = Literal["last_day", "last_week", "last_month", "all_time"] VideosSort = Literal["views", "time"] VideosType = Literal["archive", "highlight", "upload"] @@ -203,7 +203,7 @@ def get_clip_access_token(slug: str) -> AccessToken: return response["data"]["clip"]["playbackAccessToken"] -def get_channel_clips(channel_id: str, period: ClipsPeriod, limit: int, after: str | None= None): +def get_channel_clips(channel_id: str, period: ClipsPeriod, limit: int, after: str | None = None): """ List channel clips. @@ -243,7 +243,7 @@ def get_channel_clips(channel_id: str, period: ClipsPeriod, limit: int, after: s def channel_clips_generator( channel_id: str, period: ClipsPeriod, - limit: int + limit: int, ) -> Generator[Clip, None, None]: def _generator(clips: Data, limit: int) -> Generator[Clip, None, None]: for clip in clips["edges"]: @@ -289,7 +289,7 @@ def get_channel_videos( sort: str, type: str = "archive", game_ids: list[str] | None = None, - after: str | None = None + after: str | None = None, ): game_ids = game_ids or [] @@ -333,7 +333,7 @@ def channel_videos_generator( max_videos: int, sort: VideosSort, type: VideosType, - game_ids: list[str] | None = None + game_ids: list[str] | None = None, ) -> tuple[int, Generator[Video, None, None]]: game_ids = game_ids or [] @@ -403,13 +403,16 @@ def get_playlists(video_id: str, access_token: AccessToken): """ url = f"https://usher.ttvnw.net/vod/{video_id}" - response = httpx.get(url, params={ - "nauth": access_token["value"], - "nauthsig": access_token["signature"], - "allow_audio_only": "true", - "allow_source": "true", - "player": "twitchweb", - }) + response = httpx.get( + url, + params={ + "nauth": access_token["value"], + "nauthsig": access_token["signature"], + "allow_audio_only": "true", + "allow_source": "true", + "player": "twitchweb", + }, + ) response.raise_for_status() return response.content.decode("utf-8") @@ -432,19 +435,16 @@ def get_game_id(name: str): def get_video_chapters(video_id: str) -> list[Chapter]: query = { "operationName": "VideoPlayer_ChapterSelectButtonVideo", - "variables": - { + "variables": { "includePrivate": False, - "videoID": video_id + "videoID": video_id, }, - "extensions": - { - "persistedQuery": - { + "extensions": { + "persistedQuery": { "version": 1, - "sha256Hash": "8d2793384aac3773beab5e59bd5d6f585aedb923d292800119e03d40cd0f9b41" + "sha256Hash": "8d2793384aac3773beab5e59bd5d6f585aedb923d292800119e03d40cd0f9b41", } - } + }, } response = gql_post(json.dumps(query)) diff --git a/twitchdl/utils.py b/twitchdl/utils.py index 1f8114e..77a93e5 100644 --- a/twitchdl/utils.py +++ b/twitchdl/utils.py @@ -68,16 +68,16 @@ def read_int(msg: str, min: int, max: int, default: int | None = None) -> int: def slugify(value: str) -> str: - value = unicodedata.normalize('NFKC', str(value)) - value = re.sub(r'[^\w\s_-]', '', value) - value = re.sub(r'[\s_-]+', '_', value) + value = unicodedata.normalize("NFKC", str(value)) + value = re.sub(r"[^\w\s_-]", "", value) + value = re.sub(r"[\s_-]+", "_", value) return value.strip("_").lower() def titlify(value: str) -> str: - value = unicodedata.normalize('NFKC', str(value)) - value = re.sub(r'[^\w\s\[\]().-]', '', value) - value = re.sub(r'\s+', ' ', value) + value = unicodedata.normalize("NFKC", str(value)) + value = re.sub(r"[^\w\s\[\]().-]", "", value) + value = re.sub(r"\s+", " ", value) return value.strip()