Add dataclasses for clips

This commit is contained in:
Ivan Habunek 2022-08-27 12:01:23 +02:00
parent 599b7783d0
commit aed0b993a7
No known key found for this signature in database
GPG Key ID: F5F0623FF5EBCB3D
6 changed files with 165 additions and 69 deletions

View File

@ -7,6 +7,7 @@ from os import path
from twitchdl import twitch, utils
from twitchdl.commands.download import get_clip_authenticated_url
from twitchdl.download import download_file
from twitchdl.models import Clip, ClipGenerator
from twitchdl.output import print_out, print_clip, print_json
@ -17,13 +18,12 @@ def clips(args):
generator = twitch.channel_clips_generator(args.channel_name, args.period, limit)
if args.json:
return print_json(list(generator))
return print_json([c.raw for c in generator])
if args.download:
return _download_clips(generator)
if args.pager:
print(args)
return _print_paged(generator, args.pager)
return _print_all(generator, args)
@ -40,38 +40,41 @@ def _continue():
return True
def _target_filename(clip):
url = clip["videoQualities"][0]["sourceURL"]
def _target_filename(clip: Clip):
url = clip.video_qualities[0].source_url
_, ext = path.splitext(url)
ext = ext.lstrip(".")
match = re.search(r"^(\d{4})-(\d{2})-(\d{2})T", clip["createdAt"])
match = re.search(r"^(\d{4})-(\d{2})-(\d{2})T", clip.created_at)
if not match:
raise ValueError(f"Invalid date: {clip.created_at}")
date = "".join(match.groups())
name = "_".join([
date,
clip["id"],
clip["broadcaster"]["login"],
utils.slugify(clip["title"]),
clip.id,
clip.broadcaster.login,
utils.slugify(clip.title),
])
return "{}.{}".format(name, ext)
def _download_clips(generator):
for clip in generator:
def _download_clips(clips: ClipGenerator):
for clip in clips:
target = _target_filename(clip)
if path.exists(target):
print_out("Already downloaded: <green>{}</green>".format(target))
else:
url = get_clip_authenticated_url(clip["slug"], "source")
url = get_clip_authenticated_url(clip.slug, "source")
print_out("Downloading: <yellow>{}</yellow>".format(target))
download_file(url, target)
def _print_all(generator, args):
for clip in generator:
def _print_all(clips: ClipGenerator, args):
for clip in clips:
print_out()
print_clip(clip)
@ -82,8 +85,8 @@ def _print_all(generator, args):
)
def _print_paged(generator, page_size):
iterator = iter(generator)
def _print_paged(clips: ClipGenerator, page_size: int):
iterator = iter(clips)
page = list(islice(iterator, page_size))
first = 1

View File

@ -16,6 +16,7 @@ from twitchdl import twitch, utils
from twitchdl.download import download_file
from twitchdl.exceptions import ConsoleError
from twitchdl.http import download_all
from twitchdl.models import Clip
from twitchdl.output import print_out
@ -107,27 +108,27 @@ def _video_target_filename(video, args):
raise ConsoleError("Invalid key {} used in --output. Supported keys are: {}".format(e, supported))
def _clip_target_filename(clip, args):
date, time = clip["createdAt"].split("T")
game = clip["game"]["name"] if clip["game"] else "Unknown"
def _clip_target_filename(clip: Clip, args) -> str:
date, time = clip.created_at.split("T")
game = clip.game.name if clip.game else "Unknown"
url = clip["videoQualities"][0]["sourceURL"]
url = clip.video_qualities[0].source_url
_, ext = path.splitext(url)
ext = ext.lstrip(".")
subs = {
"channel": clip["broadcaster"]["displayName"],
"channel_login": clip["broadcaster"]["login"],
"channel": clip.broadcaster.display_name,
"channel_login": clip.broadcaster.login,
"date": date,
"datetime": clip["createdAt"],
"datetime": clip.created_at,
"format": ext,
"game": game,
"game_slug": utils.slugify(game),
"id": clip["id"],
"slug": clip["slug"],
"id": clip.id,
"slug": clip.slug,
"time": time,
"title": utils.titlify(clip["title"]),
"title_slug": utils.slugify(clip["title"]),
"title": utils.titlify(clip.title),
"title_slug": utils.slugify(clip.title),
}
try:
@ -230,16 +231,16 @@ def get_clip_authenticated_url(slug, quality):
def _download_clip(slug: str, args) -> None:
print_out("<dim>Looking up clip...</dim>")
clip = twitch.get_clip(slug)
game = clip["game"]["name"] if clip["game"] else "Unknown"
game = clip.game.name if clip.game else "Unknown"
if not clip:
raise ConsoleError("Clip '{}' not found".format(slug))
print_out("Found: <green>{}</green> by <yellow>{}</yellow>, playing <blue>{}</blue> ({})".format(
clip["title"],
clip["broadcaster"]["displayName"],
clip.title,
clip.broadcaster.display_name,
game,
utils.format_duration(clip["durationSeconds"])
utils.format_duration(clip.duration_seconds)
))
target = _clip_target_filename(clip, args)

View File

@ -2,6 +2,7 @@ import m3u8
from twitchdl import utils, twitch
from twitchdl.exceptions import ConsoleError
from twitchdl.models import Clip
from twitchdl.output import print_video, print_clip, print_json, print_out, print_log
@ -35,7 +36,7 @@ def info(args):
raise ConsoleError("Clip {} not found".format(clip_slug))
if args.json:
print_json(clip)
print_json(clip.raw)
else:
clip_info(clip)
return
@ -69,11 +70,11 @@ def video_json(video, playlists):
print_json(video)
def clip_info(clip):
def clip_info(clip: Clip):
print_out()
print_clip(clip)
print_out()
print_out("Download links:")
for q in clip["videoQualities"]:
print_out("<b>{quality}p{frameRate}</b> {sourceURL}".format(**q))
for q in clip.video_qualities:
print_out(f"<b>{q.quality}p{q.frame_rate}</b> {q.source_url}")

92
twitchdl/models.py Normal file
View File

@ -0,0 +1,92 @@
from typing import Any, Dict, List, Optional, Generator
from dataclasses import dataclass
Json = Dict[str, Any]
GameID = str
@dataclass(frozen=True)
class Broadcaster():
login: str
display_name: str
@staticmethod
def from_json(data: Json) -> "Broadcaster":
return Broadcaster(data["login"], data["displayName"])
@dataclass(frozen=True)
class VideoQuality():
frame_rate: int
quality: str
source_url: str
@staticmethod
def from_json(data: Json) -> "VideoQuality":
return VideoQuality(data["frameRate"], data["quality"], data["sourceURL"])
@dataclass(frozen=True)
class Game():
id: str
name: str
@staticmethod
def from_json(data: Json) -> "Game":
return Game(data["id"], data["name"])
@dataclass(frozen=True)
class Clip():
id: str
slug: str
title: str
created_at: str
view_count: int
duration_seconds: int
url: str
game: Optional[Game]
broadcaster: Broadcaster
video_qualities: List[VideoQuality]
raw: Json
@staticmethod
def from_json(data: Json) -> "Clip":
game = Game.from_json(data["game"]) if data["game"] else None
broadcaster = Broadcaster.from_json(data["broadcaster"])
video_qualities = [VideoQuality.from_json(q) for q in data["videoQualities"]]
return Clip(
data["id"],
data["slug"],
data["title"],
data["createdAt"],
data["viewCount"],
data["durationSeconds"],
data["url"],
game,
broadcaster,
video_qualities,
data
)
@dataclass(frozen=True)
class ClipsPage():
cursor: str
has_next_page: bool
has_previous_page: bool
clips: List[Clip]
@staticmethod
def from_json(data: Json) -> "ClipsPage":
return ClipsPage(
data["edges"][-1]["cursor"],
data["pageInfo"]["hasNextPage"],
data["pageInfo"]["hasPreviousPage"],
[Clip.from_json(c["node"]) for c in data["edges"]]
)
ClipGenerator = Generator[Clip, None, None]

View File

@ -6,6 +6,7 @@ import re
from itertools import islice
from twitchdl import utils
from twitchdl.models import Clip
from typing import Any, Match
@ -133,23 +134,23 @@ def print_paged_videos(generator, page_size, total_count):
break
def print_clip(clip):
published_at = clip["createdAt"].replace("T", " @ ").replace("Z", "")
length = utils.format_duration(clip["durationSeconds"])
channel = clip["broadcaster"]["displayName"]
def print_clip(clip: Clip):
published_at = clip.created_at.replace("T", " @ ").replace("Z", "")
length = utils.format_time(clip.duration_seconds)
channel = clip.broadcaster.display_name
playing = (
"playing <blue>{}</blue>".format(clip["game"]["name"])
if clip["game"] else ""
"playing <blue>{}</blue>".format(clip.game.name)
if clip.game else ""
)
print_out("Clip <b>{}</b>".format(clip["slug"]))
print_out("<green>{}</green>".format(clip["title"]))
print_out("Clip <b>{}</b>".format(clip.slug))
print_out("<green>{}</green>".format(clip.title))
print_out("<blue>{}</blue> {}".format(channel, playing))
print_out(
"Published <blue>{}</blue>"
" Length: <blue>{}</blue>"
" Views: <blue>{}</blue>".format(published_at, length, clip["viewCount"]))
print_out("<i>{}</i>".format(clip["url"]))
f"Published: <blue>{published_at}</blue>"
f" Length: <blue>{length}</blue>"
f" Views: <blue>{clip.view_count}</blue>")
print_out(f"<i>{clip.url}</i>")
def _continue():

View File

@ -4,9 +4,10 @@ Twitch API access.
import httpx
from typing import Dict
from twitchdl import CLIENT_ID
from twitchdl.exceptions import ConsoleError
from twitchdl.models import Clip, ClipsPage, ClipGenerator, GameID
from typing import Dict, Optional
class GQLError(Exception):
@ -82,8 +83,8 @@ CLIP_FIELDS = """
name
}
broadcaster {
displayName
login
displayName
}
"""
@ -103,7 +104,7 @@ def get_video(video_id):
return response["data"]["video"]
def get_clip(slug):
def get_clip(slug: str) -> Clip:
query = """
{{
clip(slug: "{}") {{
@ -113,7 +114,7 @@ def get_clip(slug):
"""
response = gql_query(query.format(slug, fields=CLIP_FIELDS))
return response["data"]["clip"]
return Clip.from_json(response["data"]["clip"])
def get_clip_access_token(slug):
@ -136,7 +137,7 @@ def get_clip_access_token(slug):
return response["data"]["clip"]
def get_channel_clips(channel_id, period, limit, after=None):
def get_channel_clips(channel_id, period, limit, after=None) -> ClipsPage:
"""
List channel clips.
@ -178,44 +179,41 @@ def get_channel_clips(channel_id, period, limit, after=None):
if not user:
raise ConsoleError("Channel {} not found".format(channel_id))
return response["data"]["user"]["clips"]
return ClipsPage.from_json(response["data"]["user"]["clips"])
def channel_clips_generator(channel_id, period, limit):
def _generator(clips, limit):
for clip in clips["edges"]:
def channel_clips_generator(channel_id: str, period, limit: int) -> ClipGenerator:
def _generator(page: ClipsPage, limit: int):
for clip in page.clips:
if limit < 1:
return
yield clip["node"]
yield clip
limit -= 1
has_next = clips["pageInfo"]["hasNextPage"]
if limit < 1 or not has_next:
if limit < 1 or not page.has_next_page:
return
req_limit = min(limit, 100)
cursor = clips["edges"][-1]["cursor"]
clips = get_channel_clips(channel_id, period, req_limit, cursor)
yield from _generator(clips, limit)
next_page = get_channel_clips(channel_id, period, req_limit, page.cursor)
yield from _generator(next_page, limit)
req_limit = min(limit, 100)
clips = get_channel_clips(channel_id, period, req_limit)
return _generator(clips, limit)
page = get_channel_clips(channel_id, period, req_limit)
return _generator(page, limit)
def channel_clips_generator_old(channel_id, period, limit):
cursor = ""
while True:
clips = get_channel_clips(
channel_id, period, limit, after=cursor)
page = get_channel_clips(channel_id, period, limit, after=cursor)
if not clips["edges"]:
if not page.clips:
break
has_next = clips["pageInfo"]["hasNextPage"]
cursor = clips["edges"][-1]["cursor"] if has_next else None
has_next = page.has_next_page
cursor = page.cursor if has_next else None
yield clips, has_next
yield page.clips, has_next
if not cursor:
break