Compare commits

..

11 Commits
2.0.0 ... types

Author SHA1 Message Date
c0a7ea1f27 wip 2022-08-27 15:26:54 +02:00
afe38b84cd Extract Game dataclass 2022-08-27 12:51:28 +02:00
aed0b993a7 Add dataclasses for clips 2022-08-27 12:01:23 +02:00
599b7783d0 Some more types 2022-08-27 11:59:50 +02:00
9cc7c05d8a Expand tests 2022-08-27 11:58:19 +02:00
98d2ce0bc7 Add dependency on pytest-cov 2022-08-27 11:57:57 +02:00
662ce72195 Accept newer versions of m3u8 lib 2022-08-20 13:25:00 +02:00
a4b2434735 Start adding types 2022-08-20 13:25:00 +02:00
280a284fb2 Expand clips tests 2022-08-20 11:16:47 +02:00
235b13c257 Remove unused function 2022-08-20 11:10:47 +02:00
8e3a41e415 Expand tests 2022-08-19 09:35:56 +02:00
15 changed files with 383 additions and 207 deletions

View File

@ -1,4 +1,5 @@
pytest
pytest-cov
twine
wheel
pyyaml

View File

@ -31,7 +31,7 @@ setup(
packages=find_packages(),
python_requires=">=3.7",
install_requires=[
"m3u8>=1.0.0,<2.0.0",
"m3u8>=1.0.0,<4.0.0",
"httpx>=0.17.0,<1.0.0",
],
entry_points={

View File

@ -2,29 +2,57 @@
These tests depend on the channel having some videos and clips published.
"""
import httpx
import m3u8
from twitchdl import twitch
from twitchdl.commands.download import _parse_playlists, get_clip_authenticated_url
from twitchdl.models import Game, VideosPage
TEST_CHANNEL = "bananasaurus_rex"
def test_get_videos():
videos = twitch.get_channel_videos(TEST_CHANNEL, 3, "time")
assert videos["pageInfo"]
assert len(videos["edges"]) > 0
page = twitch.get_channel_videos(TEST_CHANNEL, 3, "time")
assert isinstance(page, VideosPage)
assert len(page.videos) > 0
video_id = videos["edges"][0]["node"]["id"]
video_id = page.videos[0].id
video = twitch.get_video(video_id)
assert video["id"] == video_id
assert video and video.id == video_id
access_token = twitch.get_access_token(video_id)
assert "signature" in access_token
assert "value" in access_token
playlists = twitch.get_playlists(video_id, access_token)
assert playlists.startswith("#EXTM3U")
_, _, url = next(_parse_playlists(playlists))
playlist = httpx.get(url).text
assert playlist.startswith("#EXTM3U")
playlist = m3u8.loads(playlist)
vod_path = playlist.segments[0].uri
assert vod_path == "0.ts"
def test_get_clips():
"""
This test depends on the channel having some videos published.
"""
clips = twitch.get_channel_clips(TEST_CHANNEL, "all_time", 3)
assert clips["pageInfo"]
assert len(clips["edges"]) > 0
page = twitch.get_channel_clips(TEST_CHANNEL, "all_time", 3)
assert len(page.clips) > 0
clip_slug = clips["edges"][0]["node"]["slug"]
clip = twitch.get_clip(clip_slug)
assert clip["slug"] == clip_slug
slug = page.clips[0].slug
clip = twitch.get_clip(slug)
assert clip.slug == slug
assert get_clip_authenticated_url(slug, "source").startswith("https")
def test_get_game():
game = twitch.find_game("The Witness")
assert isinstance(game, Game)
assert game.id == "17324"
assert game.name == "The Witness"
assert game.description
game = twitch.find_game("Does Not Exist Hopefully")
assert game is None

5
tests/test_twitch.py Normal file
View File

@ -0,0 +1,5 @@
from twitchdl.twitch import channel_clips_generator
# def test_clips_generator():
# channel_clips_generator("foo", "bar", 100)

View File

@ -7,6 +7,7 @@ from os import path
from twitchdl import twitch, utils
from twitchdl.commands.download import get_clip_authenticated_url
from twitchdl.download import download_file
from twitchdl.models import Clip, ClipGenerator
from twitchdl.output import print_out, print_clip, print_json
@ -17,13 +18,12 @@ def clips(args):
generator = twitch.channel_clips_generator(args.channel_name, args.period, limit)
if args.json:
return print_json(list(generator))
return print_json([c.raw for c in generator])
if args.download:
return _download_clips(generator)
if args.pager:
print(args)
return _print_paged(generator, args.pager)
return _print_all(generator, args)
@ -40,38 +40,41 @@ def _continue():
return True
def _target_filename(clip):
url = clip["videoQualities"][0]["sourceURL"]
def _target_filename(clip: Clip):
url = clip.video_qualities[0].source_url
_, ext = path.splitext(url)
ext = ext.lstrip(".")
match = re.search(r"^(\d{4})-(\d{2})-(\d{2})T", clip["createdAt"])
match = re.search(r"^(\d{4})-(\d{2})-(\d{2})T", clip.created_at)
if not match:
raise ValueError(f"Invalid date: {clip.created_at}")
date = "".join(match.groups())
name = "_".join([
date,
clip["id"],
clip["broadcaster"]["login"],
utils.slugify(clip["title"]),
clip.id,
clip.broadcaster.login,
utils.slugify(clip.title),
])
return "{}.{}".format(name, ext)
def _download_clips(generator):
for clip in generator:
def _download_clips(clips: ClipGenerator):
for clip in clips:
target = _target_filename(clip)
if path.exists(target):
print_out("Already downloaded: <green>{}</green>".format(target))
else:
url = get_clip_authenticated_url(clip["slug"], "source")
url = get_clip_authenticated_url(clip.slug, "source")
print_out("Downloading: <yellow>{}</yellow>".format(target))
download_file(url, target)
def _print_all(generator, args):
for clip in generator:
def _print_all(clips: ClipGenerator, args):
for clip in clips:
print_out()
print_clip(clip)
@ -82,8 +85,8 @@ def _print_all(generator, args):
)
def _print_paged(generator, page_size):
iterator = iter(generator)
def _print_paged(clips: ClipGenerator, page_size: int):
iterator = iter(clips)
page = list(islice(iterator, page_size))
first = 1

View File

@ -9,13 +9,14 @@ import tempfile
from os import path
from pathlib import Path
from typing import OrderedDict
from typing import List, Optional, OrderedDict
from urllib.parse import urlparse, urlencode
from twitchdl import twitch, utils
from twitchdl.download import download_file
from twitchdl.exceptions import ConsoleError
from twitchdl.http import download_all
from twitchdl.models import Clip, Video
from twitchdl.output import print_out
@ -60,13 +61,13 @@ def _select_playlist_interactive(playlists):
return uri
def _join_vods(playlist_path, target, overwrite, video):
def _join_vods(playlist_path: str, target: str, overwrite: bool, video: Video):
command = [
"ffmpeg",
"-i", playlist_path,
"-c", "copy",
"-metadata", "artist={}".format(video["creator"]["displayName"]),
"-metadata", "title={}".format(video["title"]),
"-metadata", f"artist={video.creator.display_name}",
"-metadata", f"title={video.title}",
"-metadata", "encoded_by=twitch-dl",
"-stats",
"-loglevel", "warning",
@ -82,22 +83,22 @@ def _join_vods(playlist_path, target, overwrite, video):
raise ConsoleError("Joining files failed")
def _video_target_filename(video, args):
date, time = video['publishedAt'].split("T")
game = video["game"]["name"] if video["game"] else "Unknown"
def _video_target_filename(video: Video, args) -> str:
date, time = video.published_at.split("T")
game = video.game.name if video.game else "Unknown"
subs = {
"channel": video["creator"]["displayName"],
"channel_login": video["creator"]["login"],
"channel": video.creator.display_name,
"channel_login": video.creator.login,
"date": date,
"datetime": video["publishedAt"],
"datetime": video.published_at,
"format": args.format,
"game": game,
"game_slug": utils.slugify(game),
"id": video["id"],
"id": video.id,
"time": time,
"title": utils.titlify(video["title"]),
"title_slug": utils.slugify(video["title"]),
"title": utils.titlify(video.title),
"title_slug": utils.slugify(video.title),
}
try:
@ -107,27 +108,27 @@ def _video_target_filename(video, args):
raise ConsoleError("Invalid key {} used in --output. Supported keys are: {}".format(e, supported))
def _clip_target_filename(clip, args):
date, time = clip["createdAt"].split("T")
game = clip["game"]["name"] if clip["game"] else "Unknown"
def _clip_target_filename(clip: Clip, args) -> str:
date, time = clip.created_at.split("T")
game = clip.game.name if clip.game else "Unknown"
url = clip["videoQualities"][0]["sourceURL"]
url = clip.video_qualities[0].source_url
_, ext = path.splitext(url)
ext = ext.lstrip(".")
subs = {
"channel": clip["broadcaster"]["displayName"],
"channel_login": clip["broadcaster"]["login"],
"channel": clip.broadcaster.display_name,
"channel_login": clip.broadcaster.login,
"date": date,
"datetime": clip["createdAt"],
"datetime": clip.created_at,
"format": ext,
"game": game,
"game_slug": utils.slugify(game),
"id": clip["id"],
"slug": clip["slug"],
"id": clip.id,
"slug": clip.slug,
"time": time,
"title": utils.titlify(clip["title"]),
"title_slug": utils.slugify(clip["title"]),
"title": utils.titlify(clip.title),
"title_slug": utils.slugify(clip.title),
}
try:
@ -137,7 +138,7 @@ def _clip_target_filename(clip, args):
raise ConsoleError("Invalid key {} used in --output. Supported keys are: {}".format(e, supported))
def _get_vod_paths(playlist, start, end):
def _get_vod_paths(playlist, start: Optional[int], end: Optional[int]) -> List[str]:
"""Extract unique VOD paths for download from playlist."""
files = []
vod_start = 0
@ -157,7 +158,7 @@ def _get_vod_paths(playlist, start, end):
return files
def _crete_temp_dir(base_uri):
def _crete_temp_dir(base_uri: str) -> str:
"""Create a temp dir to store downloads if it doesn't exist."""
path = urlparse(base_uri).path.lstrip("/")
temp_dir = Path(tempfile.gettempdir(), "twitch-dl", path)
@ -166,11 +167,11 @@ def _crete_temp_dir(base_uri):
def download(args):
for video in args.videos:
download_one(video, args)
for video_id in args.videos:
download_one(video_id, args)
def download_one(video, args):
def download_one(video: str, args):
video_id = utils.parse_video_identifier(video)
if video_id:
return _download_video(video_id, args)
@ -182,7 +183,7 @@ def download_one(video, args):
raise ConsoleError("Invalid input: {}".format(video))
def _get_clip_url(clip, quality):
def _get_clip_url(clip, quality) -> str:
qualities = clip["videoQualities"]
# Quality given as an argument
@ -210,7 +211,7 @@ def _get_clip_url(clip, quality):
return selected_quality["sourceURL"]
def get_clip_authenticated_url(slug, quality):
def get_clip_authenticated_url(slug: str, quality: str) -> str:
print_out("<dim>Fetching access token...</dim>")
access_token = twitch.get_clip_access_token(slug)
@ -227,19 +228,19 @@ def get_clip_authenticated_url(slug, quality):
return "{}?{}".format(url, query)
def _download_clip(slug, args):
def _download_clip(slug: str, args):
print_out("<dim>Looking up clip...</dim>")
clip = twitch.get_clip(slug)
game = clip["game"]["name"] if clip["game"] else "Unknown"
game = clip.game.name if clip.game else "Unknown"
if not clip:
raise ConsoleError("Clip '{}' not found".format(slug))
print_out("Found: <green>{}</green> by <yellow>{}</yellow>, playing <blue>{}</blue> ({})".format(
clip["title"],
clip["broadcaster"]["displayName"],
clip.title,
clip.broadcaster.display_name,
game,
utils.format_duration(clip["durationSeconds"])
utils.format_duration(clip.duration_seconds)
))
target = _clip_target_filename(clip, args)
@ -260,7 +261,7 @@ def _download_clip(slug, args):
print_out("Downloaded: <blue>{}</blue>".format(target))
def _download_video(video_id, args):
def _download_video(video_id, args) -> None:
if args.start and args.end and args.end <= args.start:
raise ConsoleError("End time must be greater than start time")
@ -270,8 +271,8 @@ def _download_video(video_id, args):
if not video:
raise ConsoleError("Video {} not found".format(video_id))
print_out("Found: <blue>{}</blue> by <yellow>{}</yellow>".format(
video['title'], video['creator']['displayName']))
creator = f" by <yellow>{video.creator.display_name}" if video.creator else ""
print_out(f"Found: <blue>{video.title}</blue>{creator}")
target = _video_target_filename(video, args)
print_out("Output: <blue>{}</blue>".format(target))

View File

@ -2,6 +2,7 @@ import m3u8
from twitchdl import utils, twitch
from twitchdl.exceptions import ConsoleError
from twitchdl.models import Clip, Video
from twitchdl.output import print_video, print_clip, print_json, print_out, print_log
@ -35,7 +36,7 @@ def info(args):
raise ConsoleError("Clip {} not found".format(clip_slug))
if args.json:
print_json(clip)
print_json(clip.raw)
else:
clip_info(clip)
return
@ -43,7 +44,7 @@ def info(args):
raise ConsoleError("Invalid input: {}".format(args.video))
def video_info(video, playlists):
def video_info(video: Video, playlists):
print_out()
print_video(video)
@ -53,10 +54,11 @@ def video_info(video, playlists):
print_out("<b>{}</b> {}".format(p.stream_info.video, p.uri))
def video_json(video, playlists):
def video_json(video: Video, playlists):
playlists = m3u8.loads(playlists).playlists
json = video.raw
video["playlists"] = [
json["playlists"] = [
{
"bandwidth": p.stream_info.bandwidth,
"resolution": p.stream_info.resolution,
@ -66,14 +68,14 @@ def video_json(video, playlists):
} for p in playlists
]
print_json(video)
print_json(json)
def clip_info(clip):
def clip_info(clip: Clip):
print_out()
print_clip(clip)
print_out()
print_out("Download links:")
for q in clip["videoQualities"]:
print_out("<b>{quality}p{frameRate}</b> {sourceURL}".format(**q))
for q in clip.video_qualities:
print_out(f"<b>{q.quality}p{q.frame_rate}</b> {q.source_url}")

View File

@ -22,7 +22,7 @@ def videos(args):
print_json({
"count": len(videos),
"totalCount": total_count,
"videos": videos
"videos": [v.raw for v in videos]
})
return
@ -60,10 +60,10 @@ def _get_game_ids(names):
game_ids = []
for name in names:
print_out("<dim>Looking up game '{}'...</dim>".format(name))
game_id = twitch.get_game_id(name)
if not game_id:
raise ConsoleError("Game '{}' not found".format(name))
game_ids.append(int(game_id))
print_out(f"<dim>Looking up game '{name}'...</dim>")
game = twitch.find_game(name)
if not game:
raise ConsoleError(f"Game '{name}' not found")
game_ids.append(int(game.id))
return game_ids

View File

@ -5,7 +5,7 @@ import sys
import re
from argparse import ArgumentParser, ArgumentTypeError
from collections import namedtuple
from typing import NamedTuple, List, Tuple, Any, Dict
from twitchdl.exceptions import ConsoleError
from twitchdl.output import print_err
@ -13,12 +13,19 @@ from twitchdl.twitch import GQLError
from . import commands, __version__
Command = namedtuple("Command", ["name", "description", "arguments"])
Argument = Tuple[List[str], Dict[str, Any]]
class Command(NamedTuple):
name: str
description: str
arguments: List[Argument]
CLIENT_WEBSITE = 'https://github.com/ihabunek/twitch-dl'
def time(value):
def time(value: str) -> int:
"""Parse a time string (hh:mm or hh:mm:ss) to number of seconds."""
parts = [int(p) for p in value.split(":")]
@ -35,19 +42,19 @@ def time(value):
return hours * 3600 + minutes * 60 + seconds
def pos_integer(value):
def pos_integer(value: str) -> int:
try:
value = int(value)
parsed = int(value)
except ValueError:
raise ArgumentTypeError("must be an integer")
if value < 1:
if parsed < 1:
raise ArgumentTypeError("must be positive")
return value
return parsed
def rate(value):
def rate(value: str) -> int:
match = re.search(r"^([0-9]+)(k|m|)$", value, flags=re.IGNORECASE)
if not match:

View File

@ -10,7 +10,7 @@ class DownloadFailed(Exception):
pass
def _download(url, path):
def _download(url: str, path: str):
tmp_path = path + ".tmp"
size = 0
with httpx.stream("GET", url, timeout=CONNECT_TIMEOUT) as response:
@ -23,7 +23,7 @@ def _download(url, path):
return size
def download_file(url, path, retries=RETRY_COUNT):
def download_file(url: str, path: str, retries: int = RETRY_COUNT):
if os.path.exists(path):
from_disk = True
return (os.path.getsize(path), from_disk)

View File

@ -55,7 +55,7 @@ class TokenBucket:
class EndlessTokenBucket:
"""Used when download speed is not limited."""
def advance(self, size):
def advance(self, size: int):
pass

141
twitchdl/models.py Normal file
View File

@ -0,0 +1,141 @@
from typing import Any, Dict, List, Optional, Generator
from dataclasses import dataclass
Json = Dict[str, Any]
GameID = str
@dataclass(frozen=True)
class Broadcaster():
login: str
display_name: str
@staticmethod
def from_json(data: Json) -> "Broadcaster":
return Broadcaster(data["login"], data["displayName"])
@dataclass(frozen=True)
class VideoQuality():
frame_rate: int
quality: str
source_url: str
@staticmethod
def from_json(data: Json) -> "VideoQuality":
return VideoQuality(data["frameRate"], data["quality"], data["sourceURL"])
@dataclass(frozen=True)
class Game():
id: str
name: str
description: str
@staticmethod
def from_json(data: Json) -> "Game":
return Game(data["id"], data["name"], data["description"])
@dataclass(frozen=True)
class Clip():
id: str
slug: str
title: str
created_at: str
view_count: int
duration_seconds: int
url: str
game: Optional[Game]
broadcaster: Broadcaster
video_qualities: List[VideoQuality]
raw: Json
@staticmethod
def from_json(data: Json) -> "Clip":
game = Game.from_json(data["game"]) if data["game"] else None
broadcaster = Broadcaster.from_json(data["broadcaster"])
video_qualities = [VideoQuality.from_json(q) for q in data["videoQualities"]]
return Clip(
data["id"],
data["slug"],
data["title"],
data["createdAt"],
data["viewCount"],
data["durationSeconds"],
data["url"],
game,
broadcaster,
video_qualities,
data
)
@dataclass(frozen=True)
class ClipsPage():
cursor: str
has_next_page: bool
has_previous_page: bool
clips: List[Clip]
@staticmethod
def from_json(data: Json) -> "ClipsPage":
return ClipsPage(
data["edges"][-1]["cursor"],
data["pageInfo"]["hasNextPage"],
data["pageInfo"]["hasPreviousPage"],
[Clip.from_json(c["node"]) for c in data["edges"]]
)
@dataclass(frozen=True)
class Video():
id: str
title: str
published_at: str
broadcast_type: str
length_seconds: int
game: Optional[Game]
creator: Broadcaster
raw: Json
@staticmethod
def from_json(data: Json) -> "Video":
game = Game.from_json(data["game"]) if data["game"] else None
creator = Broadcaster.from_json(data["creator"])
return Video(
data["id"],
data["title"],
data["publishedAt"],
data["broadcastType"],
data["lengthSeconds"],
game,
creator,
data
)
@dataclass(frozen=True)
class VideosPage():
cursor: str
has_next_page: bool
has_previous_page: bool
total_count: int
videos: List[Video]
@staticmethod
def from_json(data: Json) -> "VideosPage":
return VideosPage(
data["edges"][-1]["cursor"],
data["pageInfo"]["hasNextPage"],
data["pageInfo"].get("hasPreviousPage"),
data["totalCount"],
[Video.from_json(c["node"]) for c in data["edges"]]
)
ClipGenerator = Generator[Clip, None, None]
VideoGenerator = Generator[Video, None, None]

View File

@ -6,6 +6,8 @@ import re
from itertools import islice
from twitchdl import utils
from twitchdl.models import Clip, Video
from typing import Any, Match
START_CODES = {
@ -29,26 +31,26 @@ END_PATTERN = "</(" + "|".join(START_CODES.keys()) + ")>"
USE_ANSI_COLOR = "--no-color" not in sys.argv
def start_code(match):
def start_code(match: Match[str]) -> str:
name = match.group(1)
return START_CODES[name]
def colorize(text):
def colorize(text: str) -> str:
text = re.sub(START_PATTERN, start_code, text)
text = re.sub(END_PATTERN, END_CODE, text)
return text
def strip_tags(text):
def strip_tags(text: str) -> str:
text = re.sub(START_PATTERN, '', text)
text = re.sub(END_PATTERN, '', text)
return text
def truncate(string, length):
def truncate(string: str, length: int) -> str:
if len(string) > length:
return string[:length - 1] + ""
@ -60,7 +62,7 @@ def print_out(*args, **kwargs):
print(*args, **kwargs)
def print_json(data):
def print_json(data: Any):
print(json.dumps(data))
@ -76,32 +78,31 @@ def print_log(*args, **kwargs):
print(*args, file=sys.stderr, **kwargs)
def print_video(video):
published_at = video["publishedAt"].replace("T", " @ ").replace("Z", "")
length = utils.format_duration(video["lengthSeconds"])
def print_video(video: Video):
published_at = video.published_at.replace("T", " @ ").replace("Z", "")
length = utils.format_duration(video.length_seconds)
channel = "<blue>{}</blue>".format(video["creator"]["displayName"]) if video["creator"] else ""
playing = "playing <blue>{}</blue>".format(video["game"]["name"]) if video["game"] else ""
channel = f"<blue>{video.creator.display_name}</blue>" if video.creator else ""
playing = f"playing <blue>{video.game.name}</blue>" if video.game else ""
# Can't find URL in video object, strange
url = "https://www.twitch.tv/videos/{}".format(video["id"])
url = f"https://www.twitch.tv/videos/{video.id}"
print_out("<b>Video {}</b>".format(video["id"]))
print_out("<green>{}</green>".format(video["title"]))
print_out(f"<b>Video {video.id}</b>")
print_out(f"<green>{video.title}</green>")
if channel or playing:
print_out(" ".join([channel, playing]))
print_out("Published <blue>{}</blue> Length: <blue>{}</blue> ".format(published_at, length))
print_out("<i>{}</i>".format(url))
print_out(f"Published <blue>{published_at}</blue> Length: <blue>{length}</blue>")
print_out(f"<i>{url}</i>")
def print_video_compact(video):
id = video["id"]
date = video["publishedAt"][:10]
game = video["game"]["name"] if video["game"] else ""
title = truncate(video["title"], 80).ljust(80)
print_out(f'<b>{id}</b> {date} <green>{title}</green> <blue>{game}</blue>')
date = video.published_at[:10]
game = video.game.name if video.game else ""
title = truncate(video.title, 80).ljust(80)
print_out(f"<b>{video.id}</b> {date} <green>{title}</green> <blue>{game}</blue>")
def print_paged_videos(generator, page_size, total_count):
@ -132,23 +133,23 @@ def print_paged_videos(generator, page_size, total_count):
break
def print_clip(clip):
published_at = clip["createdAt"].replace("T", " @ ").replace("Z", "")
length = utils.format_duration(clip["durationSeconds"])
channel = clip["broadcaster"]["displayName"]
def print_clip(clip: Clip):
published_at = clip.created_at.replace("T", " @ ").replace("Z", "")
length = utils.format_time(clip.duration_seconds)
channel = clip.broadcaster.display_name
playing = (
"playing <blue>{}</blue>".format(clip["game"]["name"])
if clip["game"] else ""
"playing <blue>{}</blue>".format(clip.game.name)
if clip.game else ""
)
print_out("Clip <b>{}</b>".format(clip["slug"]))
print_out("<green>{}</green>".format(clip["title"]))
print_out("Clip <b>{}</b>".format(clip.slug))
print_out("<green>{}</green>".format(clip.title))
print_out("<blue>{}</blue> {}".format(channel, playing))
print_out(
"Published <blue>{}</blue>"
" Length: <blue>{}</blue>"
" Views: <blue>{}</blue>".format(published_at, length, clip["viewCount"]))
print_out("<i>{}</i>".format(clip["url"]))
f"Published: <blue>{published_at}</blue>"
f" Length: <blue>{length}</blue>"
f" Views: <blue>{clip.view_count}</blue>")
print_out(f"<i>{clip.url}</i>")
def _continue():

View File

@ -6,6 +6,8 @@ import httpx
from twitchdl import CLIENT_ID
from twitchdl.exceptions import ConsoleError
from twitchdl.models import Clip, ClipsPage, ClipGenerator, Game, Video, VideoGenerator, VideosPage
from typing import Dict, Optional, Tuple
class GQLError(Exception):
@ -14,21 +16,6 @@ class GQLError(Exception):
self.errors = errors
def authenticated_get(url, params={}, headers={}):
headers['Client-ID'] = CLIENT_ID
response = httpx.get(url, params=params, headers=headers)
if 400 <= response.status_code < 500:
data = response.json()
# TODO: this does not look nice in the console since data["message"]
# can contain a JSON encoded object.
raise ConsoleError(data["message"])
response.raise_for_status()
return response
def authenticated_post(url, data=None, json=None, headers={}):
headers['Client-ID'] = CLIENT_ID
@ -52,7 +39,7 @@ def gql_post(query):
return response
def gql_query(query, headers={}):
def gql_query(query: str, headers: Dict[str, str] = {}):
url = "https://gql.twitch.tv/gql"
response = authenticated_post(url, json={"query": query}, headers=headers).json()
@ -62,23 +49,29 @@ def gql_query(query, headers={}):
return response
VIDEO_FIELDS = """
GAME_FIELDS = """
id
name
description
"""
VIDEO_FIELDS = f"""
id
title
publishedAt
broadcastType
lengthSeconds
game {
name
}
creator {
game {{
{GAME_FIELDS}
}}
creator {{
login
displayName
}
}}
"""
CLIP_FIELDS = """
CLIP_FIELDS = f"""
id
slug
title
@ -86,23 +79,22 @@ CLIP_FIELDS = """
viewCount
durationSeconds
url
videoQualities {
videoQualities {{
frameRate
quality
sourceURL
}
game {
id
name
}
broadcaster {
displayName
}}
game {{
{GAME_FIELDS}
}}
broadcaster {{
login
}
displayName
}}
"""
def get_video(video_id):
def get_video(video_id: str) -> Optional[Video]:
query = """
{{
video(id: "{video_id}") {{
@ -114,10 +106,11 @@ def get_video(video_id):
query = query.format(video_id=video_id, fields=VIDEO_FIELDS)
response = gql_query(query)
return response["data"]["video"]
if response["data"]["video"]:
return Video.from_json(response["data"]["video"])
def get_clip(slug):
def get_clip(slug: str) -> Clip:
query = """
{{
clip(slug: "{}") {{
@ -127,7 +120,7 @@ def get_clip(slug):
"""
response = gql_query(query.format(slug, fields=CLIP_FIELDS))
return response["data"]["clip"]
return Clip.from_json(response["data"]["clip"])
def get_clip_access_token(slug):
@ -150,7 +143,7 @@ def get_clip_access_token(slug):
return response["data"]["clip"]
def get_channel_clips(channel_id, period, limit, after=None):
def get_channel_clips(channel_id, period, limit, after=None) -> ClipsPage:
"""
List channel clips.
@ -192,50 +185,47 @@ def get_channel_clips(channel_id, period, limit, after=None):
if not user:
raise ConsoleError("Channel {} not found".format(channel_id))
return response["data"]["user"]["clips"]
return ClipsPage.from_json(response["data"]["user"]["clips"])
def channel_clips_generator(channel_id, period, limit):
def _generator(clips, limit):
for clip in clips["edges"]:
def channel_clips_generator(channel_id: str, period, limit: int) -> ClipGenerator:
def _generator(page: ClipsPage, limit: int):
for clip in page.clips:
if limit < 1:
return
yield clip["node"]
yield clip
limit -= 1
has_next = clips["pageInfo"]["hasNextPage"]
if limit < 1 or not has_next:
if limit < 1 or not page.has_next_page:
return
req_limit = min(limit, 100)
cursor = clips["edges"][-1]["cursor"]
clips = get_channel_clips(channel_id, period, req_limit, cursor)
yield from _generator(clips, limit)
next_page = get_channel_clips(channel_id, period, req_limit, page.cursor)
yield from _generator(next_page, limit)
req_limit = min(limit, 100)
clips = get_channel_clips(channel_id, period, req_limit)
return _generator(clips, limit)
page = get_channel_clips(channel_id, period, req_limit)
return _generator(page, limit)
def channel_clips_generator_old(channel_id, period, limit):
cursor = ""
while True:
clips = get_channel_clips(
channel_id, period, limit, after=cursor)
page = get_channel_clips(channel_id, period, limit, after=cursor)
if not clips["edges"]:
if not page.clips:
break
has_next = clips["pageInfo"]["hasNextPage"]
cursor = clips["edges"][-1]["cursor"] if has_next else None
has_next = page.has_next_page
cursor = page.cursor if has_next else None
yield clips, has_next
yield page.clips, has_next
if not cursor:
break
def get_channel_videos(channel_id, limit, sort, type="archive", game_ids=[], after=None):
def get_channel_videos(channel_id, limit, sort, type="archive", game_ids=[], after=None) -> VideosPage:
query = """
{{
user(login: "{channel_id}") {{
@ -278,29 +268,27 @@ def get_channel_videos(channel_id, limit, sort, type="archive", game_ids=[], aft
if not response["data"]["user"]:
raise ConsoleError("Channel {} not found".format(channel_id))
return response["data"]["user"]["videos"]
return VideosPage.from_json(response["data"]["user"]["videos"])
def channel_videos_generator(channel_id, max_videos, sort, type, game_ids=None):
def _generator(videos, max_videos):
for video in videos["edges"]:
def channel_videos_generator(channel_id, max_videos, sort, type, game_ids=None) -> Tuple[int, VideoGenerator]:
def _generator(page, max_videos):
for video in page.videos:
if max_videos < 1:
return
yield video["node"]
yield video
max_videos -= 1
has_next = videos["pageInfo"]["hasNextPage"]
if max_videos < 1 or not has_next:
if max_videos < 1 or not page.has_next_page:
return
limit = min(max_videos, 100)
cursor = videos["edges"][-1]["cursor"]
videos = get_channel_videos(channel_id, limit, sort, type, game_ids, cursor)
videos = get_channel_videos(channel_id, limit, sort, type, game_ids, page.cursor)
yield from _generator(videos, max_videos)
limit = min(max_videos, 100)
videos = get_channel_videos(channel_id, limit, sort, type, game_ids)
return videos["totalCount"], _generator(videos, max_videos)
page = get_channel_videos(channel_id, limit, sort, type, game_ids)
return page.total_count, _generator(page, max_videos)
def get_access_token(video_id, auth_token=None):
@ -361,16 +349,15 @@ def get_playlists(video_id, access_token):
return response.content.decode('utf-8')
def get_game_id(name):
query = """
def find_game(name: str) -> Optional[Game]:
query = f"""
{{
game(name: "{}") {{
id
game(name: "{name.strip()}") {{
{GAME_FIELDS}
}}
}}
"""
response = gql_query(query.format(name.strip()))
game = response["data"]["game"]
if game:
return game["id"]
response = gql_query(query)
if response["data"]["game"]:
return Game.from_json(response["data"]["game"])

View File

@ -24,7 +24,7 @@ def format_size(bytes_, digits=1):
return _format_size(mega / 1024, digits, "GB")
def format_duration(total_seconds):
def format_duration(total_seconds: int) -> str:
total_seconds = int(total_seconds)
hours = total_seconds // 3600
remainder = total_seconds % 3600
@ -40,7 +40,7 @@ def format_duration(total_seconds):
return "{} sec".format(seconds)
def format_time(total_seconds):
def format_time(total_seconds: int) -> str:
total_seconds = int(total_seconds)
hours = total_seconds // 3600
remainder = total_seconds % 3600
@ -67,14 +67,14 @@ def read_int(msg, min, max, default):
pass
def slugify(value):
def slugify(value: str) -> str:
value = unicodedata.normalize('NFKC', str(value))
value = re.sub(r'[^\w\s_-]', '', value)
value = re.sub(r'[\s_-]+', '_', value)
return value.strip("_").lower()
def titlify(value):
def titlify(value: str) -> str:
value = unicodedata.normalize('NFKC', str(value))
value = re.sub(r'[^\w\s\[\]().-]', '', value)
value = re.sub(r'\s+', ' ', value)
@ -93,7 +93,7 @@ CLIP_PATTERNS = [
]
def parse_video_identifier(identifier):
def parse_video_identifier(identifier: str) -> str:
"""Given a video ID or URL returns the video ID, or null if not matched"""
for pattern in VIDEO_PATTERNS:
match = re.match(pattern, identifier)
@ -101,7 +101,7 @@ def parse_video_identifier(identifier):
return match.group("id")
def parse_clip_identifier(identifier):
def parse_clip_identifier(identifier: str) -> str:
"""Given a clip slug or URL returns the clip slug, or null if not matched"""
for pattern in CLIP_PATTERNS:
match = re.match(pattern, identifier)