diff --git a/twitchdl/commands/clips.py b/twitchdl/commands/clips.py
index eb012f3..7fa68ff 100644
--- a/twitchdl/commands/clips.py
+++ b/twitchdl/commands/clips.py
@@ -7,6 +7,7 @@ from os import path
from twitchdl import twitch, utils
from twitchdl.commands.download import get_clip_authenticated_url
from twitchdl.download import download_file
+from twitchdl.models import Clip, ClipGenerator
from twitchdl.output import print_out, print_clip, print_json
@@ -17,13 +18,12 @@ def clips(args):
generator = twitch.channel_clips_generator(args.channel_name, args.period, limit)
if args.json:
- return print_json(list(generator))
+ return print_json([c.raw for c in generator])
if args.download:
return _download_clips(generator)
if args.pager:
- print(args)
return _print_paged(generator, args.pager)
return _print_all(generator, args)
@@ -40,38 +40,41 @@ def _continue():
return True
-def _target_filename(clip):
- url = clip["videoQualities"][0]["sourceURL"]
+def _target_filename(clip: Clip):
+ url = clip.video_qualities[0].source_url
_, ext = path.splitext(url)
ext = ext.lstrip(".")
- match = re.search(r"^(\d{4})-(\d{2})-(\d{2})T", clip["createdAt"])
+ match = re.search(r"^(\d{4})-(\d{2})-(\d{2})T", clip.created_at)
+ if not match:
+ raise ValueError(f"Invalid date: {clip.created_at}")
+
date = "".join(match.groups())
name = "_".join([
date,
- clip["id"],
- clip["broadcaster"]["login"],
- utils.slugify(clip["title"]),
+ clip.id,
+ clip.broadcaster.login,
+ utils.slugify(clip.title),
])
return "{}.{}".format(name, ext)
-def _download_clips(generator):
- for clip in generator:
+def _download_clips(clips: ClipGenerator):
+ for clip in clips:
target = _target_filename(clip)
if path.exists(target):
print_out("Already downloaded: {}".format(target))
else:
- url = get_clip_authenticated_url(clip["slug"], "source")
+ url = get_clip_authenticated_url(clip.slug, "source")
print_out("Downloading: {}".format(target))
download_file(url, target)
-def _print_all(generator, args):
- for clip in generator:
+def _print_all(clips: ClipGenerator, args):
+ for clip in clips:
print_out()
print_clip(clip)
@@ -82,8 +85,8 @@ def _print_all(generator, args):
)
-def _print_paged(generator, page_size):
- iterator = iter(generator)
+def _print_paged(clips: ClipGenerator, page_size: int):
+ iterator = iter(clips)
page = list(islice(iterator, page_size))
first = 1
diff --git a/twitchdl/commands/download.py b/twitchdl/commands/download.py
index 5c85320..cdc3a97 100644
--- a/twitchdl/commands/download.py
+++ b/twitchdl/commands/download.py
@@ -16,6 +16,7 @@ from twitchdl import twitch, utils
from twitchdl.download import download_file
from twitchdl.exceptions import ConsoleError
from twitchdl.http import download_all
+from twitchdl.models import Clip
from twitchdl.output import print_out
@@ -107,27 +108,27 @@ def _video_target_filename(video, args):
raise ConsoleError("Invalid key {} used in --output. Supported keys are: {}".format(e, supported))
-def _clip_target_filename(clip, args):
- date, time = clip["createdAt"].split("T")
- game = clip["game"]["name"] if clip["game"] else "Unknown"
+def _clip_target_filename(clip: Clip, args) -> str:
+ date, time = clip.created_at.split("T")
+ game = clip.game.name if clip.game else "Unknown"
- url = clip["videoQualities"][0]["sourceURL"]
+ url = clip.video_qualities[0].source_url
_, ext = path.splitext(url)
ext = ext.lstrip(".")
subs = {
- "channel": clip["broadcaster"]["displayName"],
- "channel_login": clip["broadcaster"]["login"],
+ "channel": clip.broadcaster.display_name,
+ "channel_login": clip.broadcaster.login,
"date": date,
- "datetime": clip["createdAt"],
+ "datetime": clip.created_at,
"format": ext,
"game": game,
"game_slug": utils.slugify(game),
- "id": clip["id"],
- "slug": clip["slug"],
+ "id": clip.id,
+ "slug": clip.slug,
"time": time,
- "title": utils.titlify(clip["title"]),
- "title_slug": utils.slugify(clip["title"]),
+ "title": utils.titlify(clip.title),
+ "title_slug": utils.slugify(clip.title),
}
try:
@@ -230,16 +231,16 @@ def get_clip_authenticated_url(slug, quality):
def _download_clip(slug: str, args) -> None:
print_out("Looking up clip...")
clip = twitch.get_clip(slug)
- game = clip["game"]["name"] if clip["game"] else "Unknown"
+ game = clip.game.name if clip.game else "Unknown"
if not clip:
raise ConsoleError("Clip '{}' not found".format(slug))
print_out("Found: {} by {}, playing {} ({})".format(
- clip["title"],
- clip["broadcaster"]["displayName"],
+ clip.title,
+ clip.broadcaster.display_name,
game,
- utils.format_duration(clip["durationSeconds"])
+ utils.format_duration(clip.duration_seconds)
))
target = _clip_target_filename(clip, args)
diff --git a/twitchdl/commands/info.py b/twitchdl/commands/info.py
index e52e117..7bd1d89 100644
--- a/twitchdl/commands/info.py
+++ b/twitchdl/commands/info.py
@@ -2,6 +2,7 @@ import m3u8
from twitchdl import utils, twitch
from twitchdl.exceptions import ConsoleError
+from twitchdl.models import Clip
from twitchdl.output import print_video, print_clip, print_json, print_out, print_log
@@ -35,7 +36,7 @@ def info(args):
raise ConsoleError("Clip {} not found".format(clip_slug))
if args.json:
- print_json(clip)
+ print_json(clip.raw)
else:
clip_info(clip)
return
@@ -69,11 +70,11 @@ def video_json(video, playlists):
print_json(video)
-def clip_info(clip):
+def clip_info(clip: Clip):
print_out()
print_clip(clip)
print_out()
print_out("Download links:")
- for q in clip["videoQualities"]:
- print_out("{quality}p{frameRate} {sourceURL}".format(**q))
+ for q in clip.video_qualities:
+ print_out(f"{q.quality}p{q.frame_rate} {q.source_url}")
diff --git a/twitchdl/models.py b/twitchdl/models.py
new file mode 100644
index 0000000..8b100b6
--- /dev/null
+++ b/twitchdl/models.py
@@ -0,0 +1,92 @@
+from typing import Any, Dict, List, Optional, Generator
+from dataclasses import dataclass
+
+
+Json = Dict[str, Any]
+GameID = str
+
+
+@dataclass(frozen=True)
+class Broadcaster():
+ login: str
+ display_name: str
+
+ @staticmethod
+ def from_json(data: Json) -> "Broadcaster":
+ return Broadcaster(data["login"], data["displayName"])
+
+
+@dataclass(frozen=True)
+class VideoQuality():
+ frame_rate: int
+ quality: str
+ source_url: str
+
+ @staticmethod
+ def from_json(data: Json) -> "VideoQuality":
+ return VideoQuality(data["frameRate"], data["quality"], data["sourceURL"])
+
+
+@dataclass(frozen=True)
+class Game():
+ id: str
+ name: str
+
+ @staticmethod
+ def from_json(data: Json) -> "Game":
+ return Game(data["id"], data["name"])
+
+
+@dataclass(frozen=True)
+class Clip():
+ id: str
+ slug: str
+ title: str
+ created_at: str
+ view_count: int
+ duration_seconds: int
+ url: str
+ game: Optional[Game]
+ broadcaster: Broadcaster
+ video_qualities: List[VideoQuality]
+ raw: Json
+
+ @staticmethod
+ def from_json(data: Json) -> "Clip":
+ game = Game.from_json(data["game"]) if data["game"] else None
+ broadcaster = Broadcaster.from_json(data["broadcaster"])
+ video_qualities = [VideoQuality.from_json(q) for q in data["videoQualities"]]
+
+ return Clip(
+ data["id"],
+ data["slug"],
+ data["title"],
+ data["createdAt"],
+ data["viewCount"],
+ data["durationSeconds"],
+ data["url"],
+ game,
+ broadcaster,
+ video_qualities,
+ data
+ )
+
+
+@dataclass(frozen=True)
+class ClipsPage():
+ cursor: str
+ has_next_page: bool
+ has_previous_page: bool
+ clips: List[Clip]
+
+ @staticmethod
+ def from_json(data: Json) -> "ClipsPage":
+ return ClipsPage(
+ data["edges"][-1]["cursor"],
+ data["pageInfo"]["hasNextPage"],
+ data["pageInfo"]["hasPreviousPage"],
+ [Clip.from_json(c["node"]) for c in data["edges"]]
+ )
+
+
+ClipGenerator = Generator[Clip, None, None]
diff --git a/twitchdl/output.py b/twitchdl/output.py
index d9c3028..6748b46 100644
--- a/twitchdl/output.py
+++ b/twitchdl/output.py
@@ -6,6 +6,7 @@ import re
from itertools import islice
from twitchdl import utils
+from twitchdl.models import Clip
from typing import Any, Match
@@ -133,23 +134,23 @@ def print_paged_videos(generator, page_size, total_count):
break
-def print_clip(clip):
- published_at = clip["createdAt"].replace("T", " @ ").replace("Z", "")
- length = utils.format_duration(clip["durationSeconds"])
- channel = clip["broadcaster"]["displayName"]
+def print_clip(clip: Clip):
+ published_at = clip.created_at.replace("T", " @ ").replace("Z", "")
+ length = utils.format_time(clip.duration_seconds)
+ channel = clip.broadcaster.display_name
playing = (
- "playing {}".format(clip["game"]["name"])
- if clip["game"] else ""
+ "playing {}".format(clip.game.name)
+ if clip.game else ""
)
- print_out("Clip {}".format(clip["slug"]))
- print_out("{}".format(clip["title"]))
+ print_out("Clip {}".format(clip.slug))
+ print_out("{}".format(clip.title))
print_out("{} {}".format(channel, playing))
print_out(
- "Published {}"
- " Length: {}"
- " Views: {}".format(published_at, length, clip["viewCount"]))
- print_out("{}".format(clip["url"]))
+ f"Published: {published_at}"
+ f" Length: {length}"
+ f" Views: {clip.view_count}")
+ print_out(f"{clip.url}")
def _continue():
diff --git a/twitchdl/twitch.py b/twitchdl/twitch.py
index 1683727..7deec44 100644
--- a/twitchdl/twitch.py
+++ b/twitchdl/twitch.py
@@ -4,9 +4,10 @@ Twitch API access.
import httpx
-from typing import Dict
from twitchdl import CLIENT_ID
from twitchdl.exceptions import ConsoleError
+from twitchdl.models import Clip, ClipsPage, ClipGenerator, GameID
+from typing import Dict, Optional
class GQLError(Exception):
@@ -82,8 +83,8 @@ CLIP_FIELDS = """
name
}
broadcaster {
- displayName
login
+ displayName
}
"""
@@ -103,7 +104,7 @@ def get_video(video_id):
return response["data"]["video"]
-def get_clip(slug):
+def get_clip(slug: str) -> Clip:
query = """
{{
clip(slug: "{}") {{
@@ -113,7 +114,7 @@ def get_clip(slug):
"""
response = gql_query(query.format(slug, fields=CLIP_FIELDS))
- return response["data"]["clip"]
+ return Clip.from_json(response["data"]["clip"])
def get_clip_access_token(slug):
@@ -136,7 +137,7 @@ def get_clip_access_token(slug):
return response["data"]["clip"]
-def get_channel_clips(channel_id, period, limit, after=None):
+def get_channel_clips(channel_id, period, limit, after=None) -> ClipsPage:
"""
List channel clips.
@@ -178,44 +179,41 @@ def get_channel_clips(channel_id, period, limit, after=None):
if not user:
raise ConsoleError("Channel {} not found".format(channel_id))
- return response["data"]["user"]["clips"]
+ return ClipsPage.from_json(response["data"]["user"]["clips"])
-def channel_clips_generator(channel_id, period, limit):
- def _generator(clips, limit):
- for clip in clips["edges"]:
+def channel_clips_generator(channel_id: str, period, limit: int) -> ClipGenerator:
+ def _generator(page: ClipsPage, limit: int):
+ for clip in page.clips:
if limit < 1:
return
- yield clip["node"]
+ yield clip
limit -= 1
- has_next = clips["pageInfo"]["hasNextPage"]
- if limit < 1 or not has_next:
+ if limit < 1 or not page.has_next_page:
return
req_limit = min(limit, 100)
- cursor = clips["edges"][-1]["cursor"]
- clips = get_channel_clips(channel_id, period, req_limit, cursor)
- yield from _generator(clips, limit)
+ next_page = get_channel_clips(channel_id, period, req_limit, page.cursor)
+ yield from _generator(next_page, limit)
req_limit = min(limit, 100)
- clips = get_channel_clips(channel_id, period, req_limit)
- return _generator(clips, limit)
+ page = get_channel_clips(channel_id, period, req_limit)
+ return _generator(page, limit)
def channel_clips_generator_old(channel_id, period, limit):
cursor = ""
while True:
- clips = get_channel_clips(
- channel_id, period, limit, after=cursor)
+ page = get_channel_clips(channel_id, period, limit, after=cursor)
- if not clips["edges"]:
+ if not page.clips:
break
- has_next = clips["pageInfo"]["hasNextPage"]
- cursor = clips["edges"][-1]["cursor"] if has_next else None
+ has_next = page.has_next_page
+ cursor = page.cursor if has_next else None
- yield clips, has_next
+ yield page.clips, has_next
if not cursor:
break