diff --git a/tests/test_api.py b/tests/test_api.py index 4c6f6ce..6453a28 100644 --- a/tests/test_api.py +++ b/tests/test_api.py @@ -6,19 +6,19 @@ import httpx import m3u8 from twitchdl import twitch from twitchdl.commands.download import _parse_playlists, get_clip_authenticated_url -from twitchdl.models import Game +from twitchdl.models import Game, VideosPage TEST_CHANNEL = "bananasaurus_rex" def test_get_videos(): - videos = twitch.get_channel_videos(TEST_CHANNEL, 3, "time") - assert videos["pageInfo"] - assert len(videos["edges"]) > 0 + page = twitch.get_channel_videos(TEST_CHANNEL, 3, "time") + assert isinstance(page, VideosPage) + assert len(page.videos) > 0 - video_id = videos["edges"][0]["node"]["id"] + video_id = page.videos[0].id video = twitch.get_video(video_id) - assert video["id"] == video_id + assert video and video.id == video_id access_token = twitch.get_access_token(video_id) assert "signature" in access_token @@ -27,7 +27,7 @@ def test_get_videos(): playlists = twitch.get_playlists(video_id, access_token) assert playlists.startswith("#EXTM3U") - name, res, url = next(_parse_playlists(playlists)) + _, _, url = next(_parse_playlists(playlists)) playlist = httpx.get(url).text assert playlist.startswith("#EXTM3U") diff --git a/twitchdl/commands/download.py b/twitchdl/commands/download.py index cdc3a97..d590634 100644 --- a/twitchdl/commands/download.py +++ b/twitchdl/commands/download.py @@ -16,7 +16,7 @@ from twitchdl import twitch, utils from twitchdl.download import download_file from twitchdl.exceptions import ConsoleError from twitchdl.http import download_all -from twitchdl.models import Clip +from twitchdl.models import Clip, Video from twitchdl.output import print_out @@ -61,13 +61,13 @@ def _select_playlist_interactive(playlists): return uri -def _join_vods(playlist_path, target, overwrite, video): +def _join_vods(playlist_path: str, target: str, overwrite: bool, video: Video): command = [ "ffmpeg", "-i", playlist_path, "-c", "copy", - "-metadata", "artist={}".format(video["creator"]["displayName"]), - "-metadata", "title={}".format(video["title"]), + "-metadata", f"artist={video.creator.display_name}", + "-metadata", f"title={video.title}", "-metadata", "encoded_by=twitch-dl", "-stats", "-loglevel", "warning", @@ -83,22 +83,22 @@ def _join_vods(playlist_path, target, overwrite, video): raise ConsoleError("Joining files failed") -def _video_target_filename(video, args): - date, time = video['publishedAt'].split("T") - game = video["game"]["name"] if video["game"] else "Unknown" +def _video_target_filename(video: Video, args) -> str: + date, time = video.published_at.split("T") + game = video.game.name if video.game else "Unknown" subs = { - "channel": video["creator"]["displayName"], - "channel_login": video["creator"]["login"], + "channel": video.creator.display_name, + "channel_login": video.creator.login, "date": date, - "datetime": video["publishedAt"], + "datetime": video.published_at, "format": args.format, "game": game, "game_slug": utils.slugify(game), - "id": video["id"], + "id": video.id, "time": time, - "title": utils.titlify(video["title"]), - "title_slug": utils.slugify(video["title"]), + "title": utils.titlify(video.title), + "title_slug": utils.slugify(video.title), } try: @@ -183,7 +183,7 @@ def download_one(video: str, args): raise ConsoleError("Invalid input: {}".format(video)) -def _get_clip_url(clip, quality): +def _get_clip_url(clip, quality) -> str: qualities = clip["videoQualities"] # Quality given as an argument @@ -211,7 +211,7 @@ def _get_clip_url(clip, quality): return selected_quality["sourceURL"] -def get_clip_authenticated_url(slug, quality): +def get_clip_authenticated_url(slug: str, quality: str) -> str: print_out("Fetching access token...") access_token = twitch.get_clip_access_token(slug) @@ -228,7 +228,7 @@ def get_clip_authenticated_url(slug, quality): return "{}?{}".format(url, query) -def _download_clip(slug: str, args) -> None: +def _download_clip(slug: str, args): print_out("Looking up clip...") clip = twitch.get_clip(slug) game = clip.game.name if clip.game else "Unknown" @@ -271,8 +271,8 @@ def _download_video(video_id, args) -> None: if not video: raise ConsoleError("Video {} not found".format(video_id)) - print_out("Found: {} by {}".format( - video['title'], video['creator']['displayName'])) + creator = f" by {video.creator.display_name}" if video.creator else "" + print_out(f"Found: {video.title}{creator}") target = _video_target_filename(video, args) print_out("Output: {}".format(target)) diff --git a/twitchdl/commands/info.py b/twitchdl/commands/info.py index 7bd1d89..0df1596 100644 --- a/twitchdl/commands/info.py +++ b/twitchdl/commands/info.py @@ -2,7 +2,7 @@ import m3u8 from twitchdl import utils, twitch from twitchdl.exceptions import ConsoleError -from twitchdl.models import Clip +from twitchdl.models import Clip, Video from twitchdl.output import print_video, print_clip, print_json, print_out, print_log @@ -44,7 +44,7 @@ def info(args): raise ConsoleError("Invalid input: {}".format(args.video)) -def video_info(video, playlists): +def video_info(video: Video, playlists): print_out() print_video(video) @@ -54,10 +54,11 @@ def video_info(video, playlists): print_out("{} {}".format(p.stream_info.video, p.uri)) -def video_json(video, playlists): +def video_json(video: Video, playlists): playlists = m3u8.loads(playlists).playlists + json = video.raw - video["playlists"] = [ + json["playlists"] = [ { "bandwidth": p.stream_info.bandwidth, "resolution": p.stream_info.resolution, @@ -67,7 +68,7 @@ def video_json(video, playlists): } for p in playlists ] - print_json(video) + print_json(json) def clip_info(clip: Clip): diff --git a/twitchdl/commands/videos.py b/twitchdl/commands/videos.py index bbe1121..ea5ee50 100644 --- a/twitchdl/commands/videos.py +++ b/twitchdl/commands/videos.py @@ -22,7 +22,7 @@ def videos(args): print_json({ "count": len(videos), "totalCount": total_count, - "videos": videos + "videos": [v.raw for v in videos] }) return @@ -60,10 +60,10 @@ def _get_game_ids(names): game_ids = [] for name in names: - print_out("Looking up game '{}'...".format(name)) - game_id = twitch.get_game_id(name) - if not game_id: - raise ConsoleError("Game '{}' not found".format(name)) - game_ids.append(int(game_id)) + print_out(f"Looking up game '{name}'...") + game = twitch.find_game(name) + if not game: + raise ConsoleError(f"Game '{name}' not found") + game_ids.append(int(game.id)) return game_ids diff --git a/twitchdl/models.py b/twitchdl/models.py index 2cefa3f..d339185 100644 --- a/twitchdl/models.py +++ b/twitchdl/models.py @@ -90,4 +90,52 @@ class ClipsPage(): ) +@dataclass(frozen=True) +class Video(): + id: str + title: str + published_at: str + broadcast_type: str + length_seconds: int + game: Optional[Game] + creator: Broadcaster + raw: Json + + @staticmethod + def from_json(data: Json) -> "Video": + game = Game.from_json(data["game"]) if data["game"] else None + creator = Broadcaster.from_json(data["creator"]) + + return Video( + data["id"], + data["title"], + data["publishedAt"], + data["broadcastType"], + data["lengthSeconds"], + game, + creator, + data + ) + + +@dataclass(frozen=True) +class VideosPage(): + cursor: str + has_next_page: bool + has_previous_page: bool + total_count: int + videos: List[Video] + + @staticmethod + def from_json(data: Json) -> "VideosPage": + return VideosPage( + data["edges"][-1]["cursor"], + data["pageInfo"]["hasNextPage"], + data["pageInfo"].get("hasPreviousPage"), + data["totalCount"], + [Video.from_json(c["node"]) for c in data["edges"]] + ) + + ClipGenerator = Generator[Clip, None, None] +VideoGenerator = Generator[Video, None, None] diff --git a/twitchdl/output.py b/twitchdl/output.py index 6748b46..1127d17 100644 --- a/twitchdl/output.py +++ b/twitchdl/output.py @@ -6,7 +6,7 @@ import re from itertools import islice from twitchdl import utils -from twitchdl.models import Clip +from twitchdl.models import Clip, Video from typing import Any, Match @@ -78,32 +78,31 @@ def print_log(*args, **kwargs): print(*args, file=sys.stderr, **kwargs) -def print_video(video): - published_at = video["publishedAt"].replace("T", " @ ").replace("Z", "") - length = utils.format_duration(video["lengthSeconds"]) +def print_video(video: Video): + published_at = video.published_at.replace("T", " @ ").replace("Z", "") + length = utils.format_duration(video.length_seconds) - channel = "{}".format(video["creator"]["displayName"]) if video["creator"] else "" - playing = "playing {}".format(video["game"]["name"]) if video["game"] else "" + channel = f"{video.creator.display_name}" if video.creator else "" + playing = f"playing {video.game.name}" if video.game else "" # Can't find URL in video object, strange - url = "https://www.twitch.tv/videos/{}".format(video["id"]) + url = f"https://www.twitch.tv/videos/{video.id}" - print_out("Video {}".format(video["id"])) - print_out("{}".format(video["title"])) + print_out(f"Video {video.id}") + print_out(f"{video.title}") if channel or playing: print_out(" ".join([channel, playing])) - print_out("Published {} Length: {} ".format(published_at, length)) - print_out("{}".format(url)) + print_out(f"Published {published_at} Length: {length}") + print_out(f"{url}") def print_video_compact(video): - id = video["id"] - date = video["publishedAt"][:10] - game = video["game"]["name"] if video["game"] else "" - title = truncate(video["title"], 80).ljust(80) - print_out(f'{id} {date} {title} {game}') + date = video.published_at[:10] + game = video.game.name if video.game else "" + title = truncate(video.title, 80).ljust(80) + print_out(f"{video.id} {date} {title} {game}") def print_paged_videos(generator, page_size, total_count): diff --git a/twitchdl/twitch.py b/twitchdl/twitch.py index 806d2ed..614c870 100644 --- a/twitchdl/twitch.py +++ b/twitchdl/twitch.py @@ -6,8 +6,8 @@ import httpx from twitchdl import CLIENT_ID from twitchdl.exceptions import ConsoleError -from twitchdl.models import Clip, ClipsPage, ClipGenerator, Game -from typing import Dict, Optional +from twitchdl.models import Clip, ClipsPage, ClipGenerator, Game, Video, VideoGenerator, VideosPage +from typing import Dict, Optional, Tuple class GQLError(Exception): @@ -94,7 +94,7 @@ CLIP_FIELDS = f""" """ -def get_video(video_id): +def get_video(video_id: str) -> Optional[Video]: query = """ {{ video(id: "{video_id}") {{ @@ -106,7 +106,8 @@ def get_video(video_id): query = query.format(video_id=video_id, fields=VIDEO_FIELDS) response = gql_query(query) - return response["data"]["video"] + if response["data"]["video"]: + return Video.from_json(response["data"]["video"]) def get_clip(slug: str) -> Clip: @@ -224,7 +225,7 @@ def channel_clips_generator_old(channel_id, period, limit): break -def get_channel_videos(channel_id, limit, sort, type="archive", game_ids=[], after=None): +def get_channel_videos(channel_id, limit, sort, type="archive", game_ids=[], after=None) -> VideosPage: query = """ {{ user(login: "{channel_id}") {{ @@ -267,29 +268,27 @@ def get_channel_videos(channel_id, limit, sort, type="archive", game_ids=[], aft if not response["data"]["user"]: raise ConsoleError("Channel {} not found".format(channel_id)) - return response["data"]["user"]["videos"] + return VideosPage.from_json(response["data"]["user"]["videos"]) -def channel_videos_generator(channel_id, max_videos, sort, type, game_ids=None): - def _generator(videos, max_videos): - for video in videos["edges"]: +def channel_videos_generator(channel_id, max_videos, sort, type, game_ids=None) -> Tuple[int, VideoGenerator]: + def _generator(page, max_videos): + for video in page.videos: if max_videos < 1: return - yield video["node"] + yield video max_videos -= 1 - has_next = videos["pageInfo"]["hasNextPage"] - if max_videos < 1 or not has_next: + if max_videos < 1 or not page.has_next_page: return limit = min(max_videos, 100) - cursor = videos["edges"][-1]["cursor"] - videos = get_channel_videos(channel_id, limit, sort, type, game_ids, cursor) + videos = get_channel_videos(channel_id, limit, sort, type, game_ids, page.cursor) yield from _generator(videos, max_videos) limit = min(max_videos, 100) - videos = get_channel_videos(channel_id, limit, sort, type, game_ids) - return videos["totalCount"], _generator(videos, max_videos) + page = get_channel_videos(channel_id, limit, sort, type, game_ids) + return page.total_count, _generator(page, max_videos) def get_access_token(video_id, auth_token=None):