diff --git a/pyproject.toml b/pyproject.toml index 94baf06..2e0418f 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -23,6 +23,7 @@ dependencies = [ "click>=8.0.0,<9.0.0", "httpx>=0.17.0,<1.0.0", "m3u8>=1.0.0,<4.0.0", + "python-dateutil>=2.8.0,<3.0.0", ] [tool.setuptools] diff --git a/twitchdl/commands/download.py b/twitchdl/commands/download.py index acd98af..a0e0de7 100644 --- a/twitchdl/commands/download.py +++ b/twitchdl/commands/download.py @@ -15,8 +15,9 @@ from typing import List, Optional, OrderedDict from urllib.parse import urlparse, urlencode from twitchdl import twitch, utils +from twitchdl.conversion import from_dict from twitchdl.download import download_file -from twitchdl.entities import Data, DownloadOptions +from twitchdl.entities import Data, DownloadOptions, Video from twitchdl.exceptions import ConsoleError from twitchdl.http import download_all from twitchdl.output import blue, bold, dim, green, print_log, yellow @@ -80,16 +81,16 @@ def _select_playlist_interactive(playlists): return uri -def _join_vods(playlist_path: str, target: str, overwrite: bool, video): - description = video["description"] or "" +def _join_vods(playlist_path: str, target: str, overwrite: bool, video: Video): + description = video.description or "" description = description.strip() command = [ "ffmpeg", "-i", playlist_path, "-c", "copy", - "-metadata", f"artist={video['creator']['displayName']}", - "-metadata", f"title={video['title']}", + "-metadata", f"artist={video.creator.display_name}", + "-metadata", f"title={video.title}", "-metadata", f"description={description}", "-metadata", "encoded_by=twitch-dl", "-stats", @@ -115,26 +116,28 @@ def _concat_vods(vod_paths: list[str], target: str): raise ConsoleError(f"Joining files failed: {result.stderr}") -def get_video_placeholders(video: Data, format: str) -> Data: - date, time = video['publishedAt'].split("T") - game = video["game"]["name"] if video["game"] else "Unknown" +def get_video_placeholders(video: Video, format: str) -> Data: + date = video.published_at.date().isoformat() + time = video.published_at.time().isoformat() + datetime = video.published_at.isoformat().replace("+00:00", "Z") + game = video.game.name if video.game else "Unknown" return { - "channel": video["creator"]["displayName"], - "channel_login": video["creator"]["login"], + "channel": video.creator.display_name, + "channel_login": video.creator.login, "date": date, - "datetime": video["publishedAt"], + "datetime": datetime, "format": format, "game": game, "game_slug": utils.slugify(game), - "id": video["id"], + "id": video.id, "time": time, - "title": utils.titlify(video["title"]), - "title_slug": utils.slugify(video["title"]), + "title": utils.titlify(video.title), + "title_slug": utils.slugify(video.title), } -def _video_target_filename(video: Data, args: DownloadOptions): +def _video_target_filename(video: Video, args: DownloadOptions): subs = get_video_placeholders(video, args.format) try: @@ -280,19 +283,18 @@ def _download_clip(slug: str, args: DownloadOptions) -> None: click.echo(f"Downloaded: {blue(target)}") -def _download_video(video_id, args: DownloadOptions) -> None: +def _download_video(video_id: str, args: DownloadOptions) -> None: if args.start and args.end and args.end <= args.start: raise ConsoleError("End time must be greater than start time") print_log("Looking up video...") - video = twitch.get_video(video_id) + response = twitch.get_video(video_id) - if not video: + if not response: raise ConsoleError(f"Video {video_id} not found") - title = video["title"] - user = video["creator"]["displayName"] - click.echo(f"Found: {blue(title)} by {yellow(user)}") + video = from_dict(Video, response) + click.echo(f"Found: {blue(video.title)} by {yellow(video.creator.display_name)}") target = _video_target_filename(video, args) click.echo(f"Output: {blue(target)}") diff --git a/twitchdl/commands/info.py b/twitchdl/commands/info.py index 6e8bc87..1a3a0d1 100644 --- a/twitchdl/commands/info.py +++ b/twitchdl/commands/info.py @@ -3,17 +3,19 @@ import m3u8 from twitchdl import utils, twitch from twitchdl.commands.download import get_video_placeholders -from twitchdl.entities import Data +from twitchdl.conversion import from_dict +from twitchdl.entities import Data, Video from twitchdl.exceptions import ConsoleError from twitchdl.output import bold, print_table, print_video, print_clip, print_json, print_log + def info(id: str, *, json: bool = False): video_id = utils.parse_video_identifier(id) if video_id: print_log("Fetching video...") - video = twitch.get_video(video_id) + response = twitch.get_video(video_id) - if not video: + if not response: raise ConsoleError(f"Video {video_id} not found") print_log("Fetching access token...") @@ -26,8 +28,9 @@ def info(id: str, *, json: bool = False): chapters = twitch.get_video_chapters(video_id) if json: - video_json(video, playlists, chapters) + video_json(response, playlists, chapters) else: + video = from_dict(Video, response) video_info(video, playlists, chapters) return @@ -47,7 +50,7 @@ def info(id: str, *, json: bool = False): raise ConsoleError(f"Invalid input: {id}") -def video_info(video, playlists, chapters): +def video_info(video: Video, playlists, chapters): click.echo() print_video(video) diff --git a/twitchdl/conversion.py b/twitchdl/conversion.py new file mode 100644 index 0000000..d275105 --- /dev/null +++ b/twitchdl/conversion.py @@ -0,0 +1,87 @@ +import re +import dataclasses + +from dataclasses import Field, is_dataclass +from datetime import date, datetime +from dateutil import parser +from typing import Any, Generator, Type, TypeVar, Union, get_args, get_origin, Callable +from typing import get_type_hints + +# Generic data class instance +T = TypeVar("T") + +# Dict of data decoded from JSON +Data = dict[str, Any] + + +def snake_to_camel(name: str): + def repl(match: re.Match[str]): + return match.group(1).upper() + + return re.sub(r"_([a-z])", repl, name) + + +def from_dict(cls: Type[T], data: Data, key_fn: Callable[[str], str] = snake_to_camel) -> T: + """Convert a nested dict into an instance of `cls`.""" + + def _fields() -> Generator[tuple[str, Any], None, None]: + hints = get_type_hints(cls) + for field in dataclasses.fields(cls): + field_type = _prune_optional(hints[field.name]) + + dict_field_name = key_fn(field.name) + if (value := data.get(dict_field_name)) is not None: + field_value = _convert(field_type, value) + else: + field_value = _get_default_value(field) + + yield field.name, field_value + + return cls(**dict(_fields())) + + +def from_dict_list(cls: Type[T], data: list[Data]) -> list[T]: + return [from_dict(cls, x) for x in data] + + +def _get_default_value(field: Field[Any]): + if field.default is not dataclasses.MISSING: + return field.default + + if field.default_factory is not dataclasses.MISSING: + return field.default_factory() + + return None + + +def _convert(field_type: Type[Any], value: Any) -> Any: + if value is None: + return None + + if field_type in [str, int, bool, dict]: + return value + + if field_type == datetime: + return parser.parse(value) + + if field_type == date: + return date.fromisoformat(value) + + if get_origin(field_type) == list: + (inner_type,) = get_args(field_type) + return [_convert(inner_type, x) for x in value] + + if is_dataclass(field_type): + return from_dict(field_type, value) + + raise ValueError(f"Not implemented for type '{field_type}'") + + +def _prune_optional(field_type: Type[Any]): + """For `Optional[]` returns the encapsulated ``.""" + if get_origin(field_type) == Union: + args = get_args(field_type) + if len(args) == 2 and args[1] == type(None): # noqa + return args[0] + + return field_type diff --git a/twitchdl/entities.py b/twitchdl/entities.py index 78c744e..87cd165 100644 --- a/twitchdl/entities.py +++ b/twitchdl/entities.py @@ -1,4 +1,5 @@ from dataclasses import dataclass +from datetime import datetime from typing import Any @@ -23,3 +24,30 @@ class DownloadOptions: # Type for annotating decoded JSON # TODO: make data classes for common structs Data = dict[str, Any] + + +@dataclass +class User: + login: str + display_name: str + +@dataclass +class Game: + name: str + +@dataclass +class Video: + id: str + title: str + description: str + published_at: datetime + broadcast_type: str + length_seconds: int + game: Game + creator: User + + +@dataclass +class AccessToken: + signature: str + value: str diff --git a/twitchdl/output.py b/twitchdl/output.py index 0e94a68..a67929d 100644 --- a/twitchdl/output.py +++ b/twitchdl/output.py @@ -5,7 +5,7 @@ from itertools import islice from twitchdl import utils from typing import Any, Callable, Generator, TypeVar -from twitchdl.entities import Data +from twitchdl.entities import Data, Video T = TypeVar("T") @@ -78,24 +78,24 @@ def print_paged( -def print_video(video: Data): - published_at = video["publishedAt"].replace("T", " @ ").replace("Z", "") - length = utils.format_duration(video["lengthSeconds"]) +def print_video(video: Video): + published_at = str(video.published_at.astimezone()) + length = utils.format_duration(video.length_seconds) - channel = blue(video['creator']['displayName']) if video["creator"] else "" - playing = f"playing {blue(video['game']['name'])}" if video["game"] else "" + channel = blue(video.creator.display_name) if video.creator else "" + playing = f"playing {blue(video.game.name)}" if video.game else "" # Can't find URL in video object, strange - url = f"https://www.twitch.tv/videos/{video['id']}" + url = f"https://www.twitch.tv/videos/{video.id}" - click.secho(f"Video {video['id']}", bold=True) - click.secho(f"{video['title']}", fg="green") + click.secho(f"Video {video.id}", bold=True) + click.secho(f"{video.title}", fg="green") if channel or playing: click.echo(" ".join([channel, playing])) - if video["description"]: - click.echo(f"Description: {video['description']}") + if video.description: + click.echo(f"Description: {video.description}") click.echo(f"Published {blue(published_at)} Length: {blue(length)} ") click.secho(url, italic=True) diff --git a/twitchdl/twitch.py b/twitchdl/twitch.py index 4054466..1b615c9 100644 --- a/twitchdl/twitch.py +++ b/twitchdl/twitch.py @@ -8,7 +8,7 @@ import click from typing import Dict, Generator, Literal from twitchdl import CLIENT_ID -from twitchdl.entities import Data +from twitchdl.entities import AccessToken, Data from twitchdl.exceptions import ConsoleError @@ -298,7 +298,7 @@ def channel_videos_generator( return videos["totalCount"], _generator(videos, max_videos) -def get_access_token(video_id, auth_token=None): +def get_access_token(video_id: str, auth_token: str | None = None) -> AccessToken: query = f""" {{ videoPlaybackAccessToken( @@ -321,7 +321,11 @@ def get_access_token(video_id, auth_token=None): try: response = gql_query(query, headers=headers) - return response["data"]["videoPlaybackAccessToken"] + return AccessToken( + response["data"]["videoPlaybackAccessToken"]["signature"], + response["data"]["videoPlaybackAccessToken"]["value"], + ) + except httpx.HTTPStatusError as error: # Provide a more useful error message when server returns HTTP 401 # Unauthorized while using a user-provided auth token. @@ -337,15 +341,15 @@ def get_access_token(video_id, auth_token=None): raise -def get_playlists(video_id, access_token): +def get_playlists(video_id: str, access_token: AccessToken): """ For a given video return a playlist which contains possible video qualities. """ url = f"https://usher.ttvnw.net/vod/{video_id}" response = httpx.get(url, params={ - "nauth": access_token['value'], - "nauthsig": access_token['signature'], + "nauth": access_token.value, + "nauthsig": access_token.signature, "allow_audio_only": "true", "allow_source": "true", "player": "twitchweb",