Compare commits

..

6 Commits
2.1.2 ... types

Author SHA1 Message Date
c0a7ea1f27 wip 2022-08-27 15:26:54 +02:00
afe38b84cd Extract Game dataclass 2022-08-27 12:51:28 +02:00
aed0b993a7 Add dataclasses for clips 2022-08-27 12:01:23 +02:00
599b7783d0 Some more types 2022-08-27 11:59:50 +02:00
9cc7c05d8a Expand tests 2022-08-27 11:58:19 +02:00
98d2ce0bc7 Add dependency on pytest-cov 2022-08-27 11:57:57 +02:00
19 changed files with 350 additions and 345 deletions

View File

@ -3,25 +3,6 @@ twitch-dl changelog
<!-- Do not edit. This file is automatically generated from changelog.yaml.-->
### [2.1.2 (2023-04-18)](https://github.com/ihabunek/twitch-dl/releases/tag/2.1.2)
* Fix error caused by twitch changing the Usher domain (thanks @adsa95)
### [2.1.1 (2022-11-20)](https://github.com/ihabunek/twitch-dl/releases/tag/2.1.1)
* Fix Python 3.7 compatibility (#117, thanks @eliduvid)
* Fix default value for game_ids (#102, thanks @FunnyPocketBook)
### [2.1.0 (2022-11-20)](https://github.com/ihabunek/twitch-dl/releases/tag/2.1.0)
* Add chapter list to `info` command
* Add `download --chapter` option for downloading a single chapter
### [2.0.1 (2022-09-09)](https://github.com/ihabunek/twitch-dl/releases/tag/2.0.1)
* Fix an issue where a temp vod file would be renamed while still being open,
which caused an exception on Windows (#111)
### [2.0.0 (2022-08-18)](https://github.com/ihabunek/twitch-dl/releases/tag/2.0.0)
This release switches from using `requests` to `httpx` for making http requests,

View File

@ -1,25 +1,3 @@
2.1.2:
date: 2023-04-18
changes:
- "Fix error caused by twitch changing the Usher domain (thanks @adsa95)"
2.1.1:
date: 2022-11-20
changes:
- "Fix Python 3.7 compatibility (#117, thanks @eliduvid)"
- "Fix default value for game_ids (#102, thanks @FunnyPocketBook)"
2.1.0:
date: 2022-11-20
changes:
- "Add chapter list to `info` command"
- "Add `download --chapter` option for downloading a single chapter"
2.0.1:
date: 2022-09-09
changes:
- "Fix an issue where a temp vod file would be renamed while still being open, which caused an exception on Windows (#111)"
2.0.0:
date: 2022-08-18
description: |

View File

@ -3,25 +3,6 @@ twitch-dl changelog
<!-- Do not edit. This file is automatically generated from changelog.yaml.-->
### [2.1.2 (2023-04-18)](https://github.com/ihabunek/twitch-dl/releases/tag/2.1.2)
* Fix error caused by twitch changing the Usher domain (thanks @adsa95)
### [2.1.1 (2022-11-20)](https://github.com/ihabunek/twitch-dl/releases/tag/2.1.1)
* Fix Python 3.7 compatibility (#117, thanks @eliduvid)
* Fix default value for game_ids (#102, thanks @FunnyPocketBook)
### [2.1.0 (2022-11-20)](https://github.com/ihabunek/twitch-dl/releases/tag/2.1.0)
* Add chapter list to `info` command
* Add `download --chapter` option for downloading a single chapter
### [2.0.1 (2022-09-09)](https://github.com/ihabunek/twitch-dl/releases/tag/2.0.1)
* Fix an issue where a temp vod file would be renamed while still being open,
which caused an exception on Windows (#111)
### [2.0.0 (2022-08-18)](https://github.com/ihabunek/twitch-dl/releases/tag/2.0.0)
This release switches from using `requests` to `httpx` for making http requests,

View File

@ -84,11 +84,6 @@ twitch-dl download <videos> [FLAGS] [OPTIONS]
<td class="code">-r, --rate-limit</td>
<td>Limit the maximum download speed in bytes per second. Use &#x27;k&#x27; and &#x27;m&#x27; suffixes for kbps and mbps.</td>
</tr>
<tr>
<td class="code">-c, --chapter</td>
<td>Download a single chapter of the video. Specify the chapter number or use the flag without a number to display a chapter select prompt.</td>
</tr>
</tbody>
</table>

View File

@ -1,4 +1,5 @@
pytest
pytest-cov
twine
wheel
pyyaml

View File

@ -11,7 +11,7 @@ makes it faster.
setup(
name="twitch-dl",
version="2.1.2",
version="2.0.0",
description="Twitch downloader",
long_description=long_description.strip(),
author="Ivan Habunek",

View File

@ -6,18 +6,19 @@ import httpx
import m3u8
from twitchdl import twitch
from twitchdl.commands.download import _parse_playlists, get_clip_authenticated_url
from twitchdl.models import Game, VideosPage
TEST_CHANNEL = "bananasaurus_rex"
def test_get_videos():
videos = twitch.get_channel_videos(TEST_CHANNEL, 3, "time")
assert videos["pageInfo"]
assert len(videos["edges"]) > 0
page = twitch.get_channel_videos(TEST_CHANNEL, 3, "time")
assert isinstance(page, VideosPage)
assert len(page.videos) > 0
video_id = videos["edges"][0]["node"]["id"]
video_id = page.videos[0].id
video = twitch.get_video(video_id)
assert video["id"] == video_id
assert video and video.id == video_id
access_token = twitch.get_access_token(video_id)
assert "signature" in access_token
@ -26,7 +27,7 @@ def test_get_videos():
playlists = twitch.get_playlists(video_id, access_token)
assert playlists.startswith("#EXTM3U")
name, res, url = next(_parse_playlists(playlists))
_, _, url = next(_parse_playlists(playlists))
playlist = httpx.get(url).text
assert playlist.startswith("#EXTM3U")
@ -36,15 +37,22 @@ def test_get_videos():
def test_get_clips():
"""
This test depends on the channel having some videos published.
"""
clips = twitch.get_channel_clips(TEST_CHANNEL, "all_time", 3)
assert clips["pageInfo"]
assert len(clips["edges"]) > 0
page = twitch.get_channel_clips(TEST_CHANNEL, "all_time", 3)
assert len(page.clips) > 0
slug = clips["edges"][0]["node"]["slug"]
slug = page.clips[0].slug
clip = twitch.get_clip(slug)
assert clip["slug"] == slug
assert clip.slug == slug
assert get_clip_authenticated_url(slug, "source")
assert get_clip_authenticated_url(slug, "source").startswith("https")
def test_get_game():
game = twitch.find_game("The Witness")
assert isinstance(game, Game)
assert game.id == "17324"
assert game.name == "The Witness"
assert game.description
game = twitch.find_game("Does Not Exist Hopefully")
assert game is None

5
tests/test_twitch.py Normal file
View File

@ -0,0 +1,5 @@
from twitchdl.twitch import channel_clips_generator
# def test_clips_generator():
# channel_clips_generator("foo", "bar", 100)

View File

@ -1,3 +1,3 @@
__version__ = "2.1.2"
__version__ = "2.0.0"
CLIENT_ID = "kimne78kx3ncx6brgo4mv6wki5h1ko"

View File

@ -7,6 +7,7 @@ from os import path
from twitchdl import twitch, utils
from twitchdl.commands.download import get_clip_authenticated_url
from twitchdl.download import download_file
from twitchdl.models import Clip, ClipGenerator
from twitchdl.output import print_out, print_clip, print_json
@ -17,13 +18,12 @@ def clips(args):
generator = twitch.channel_clips_generator(args.channel_name, args.period, limit)
if args.json:
return print_json(list(generator))
return print_json([c.raw for c in generator])
if args.download:
return _download_clips(generator)
if args.pager:
print(args)
return _print_paged(generator, args.pager)
return _print_all(generator, args)
@ -40,38 +40,41 @@ def _continue():
return True
def _target_filename(clip):
url = clip["videoQualities"][0]["sourceURL"]
def _target_filename(clip: Clip):
url = clip.video_qualities[0].source_url
_, ext = path.splitext(url)
ext = ext.lstrip(".")
match = re.search(r"^(\d{4})-(\d{2})-(\d{2})T", clip["createdAt"])
match = re.search(r"^(\d{4})-(\d{2})-(\d{2})T", clip.created_at)
if not match:
raise ValueError(f"Invalid date: {clip.created_at}")
date = "".join(match.groups())
name = "_".join([
date,
clip["id"],
clip["broadcaster"]["login"],
utils.slugify(clip["title"]),
clip.id,
clip.broadcaster.login,
utils.slugify(clip.title),
])
return "{}.{}".format(name, ext)
def _download_clips(generator):
for clip in generator:
def _download_clips(clips: ClipGenerator):
for clip in clips:
target = _target_filename(clip)
if path.exists(target):
print_out("Already downloaded: <green>{}</green>".format(target))
else:
url = get_clip_authenticated_url(clip["slug"], "source")
url = get_clip_authenticated_url(clip.slug, "source")
print_out("Downloading: <yellow>{}</yellow>".format(target))
download_file(url, target)
def _print_all(generator, args):
for clip in generator:
def _print_all(clips: ClipGenerator, args):
for clip in clips:
print_out()
print_clip(clip)
@ -82,8 +85,8 @@ def _print_all(generator, args):
)
def _print_paged(generator, page_size):
iterator = iter(generator)
def _print_paged(clips: ClipGenerator, page_size: int):
iterator = iter(clips)
page = list(islice(iterator, page_size))
first = 1

View File

@ -16,6 +16,7 @@ from twitchdl import twitch, utils
from twitchdl.download import download_file
from twitchdl.exceptions import ConsoleError
from twitchdl.http import download_all
from twitchdl.models import Clip, Video
from twitchdl.output import print_out
@ -51,22 +52,22 @@ def _select_playlist_interactive(playlists):
print_out("\nAvailable qualities:")
for n, (name, resolution, uri) in enumerate(playlists):
if resolution:
print_out("{}) <b>{}</b> <dim>({})</dim>".format(n + 1, name, resolution))
print_out("{}) {} [{}]".format(n + 1, name, resolution))
else:
print_out("{}) <b>{}</b>".format(n + 1, name))
print_out("{}) {}".format(n + 1, name))
no = utils.read_int("Choose quality", min=1, max=len(playlists) + 1, default=1)
_, _, uri = playlists[no - 1]
return uri
def _join_vods(playlist_path, target, overwrite, video):
def _join_vods(playlist_path: str, target: str, overwrite: bool, video: Video):
command = [
"ffmpeg",
"-i", playlist_path,
"-c", "copy",
"-metadata", "artist={}".format(video["creator"]["displayName"]),
"-metadata", "title={}".format(video["title"]),
"-metadata", f"artist={video.creator.display_name}",
"-metadata", f"title={video.title}",
"-metadata", "encoded_by=twitch-dl",
"-stats",
"-loglevel", "warning",
@ -82,22 +83,22 @@ def _join_vods(playlist_path, target, overwrite, video):
raise ConsoleError("Joining files failed")
def _video_target_filename(video, args):
date, time = video['publishedAt'].split("T")
game = video["game"]["name"] if video["game"] else "Unknown"
def _video_target_filename(video: Video, args) -> str:
date, time = video.published_at.split("T")
game = video.game.name if video.game else "Unknown"
subs = {
"channel": video["creator"]["displayName"],
"channel_login": video["creator"]["login"],
"channel": video.creator.display_name,
"channel_login": video.creator.login,
"date": date,
"datetime": video["publishedAt"],
"datetime": video.published_at,
"format": args.format,
"game": game,
"game_slug": utils.slugify(game),
"id": video["id"],
"id": video.id,
"time": time,
"title": utils.titlify(video["title"]),
"title_slug": utils.slugify(video["title"]),
"title": utils.titlify(video.title),
"title_slug": utils.slugify(video.title),
}
try:
@ -107,27 +108,27 @@ def _video_target_filename(video, args):
raise ConsoleError("Invalid key {} used in --output. Supported keys are: {}".format(e, supported))
def _clip_target_filename(clip, args):
date, time = clip["createdAt"].split("T")
game = clip["game"]["name"] if clip["game"] else "Unknown"
def _clip_target_filename(clip: Clip, args) -> str:
date, time = clip.created_at.split("T")
game = clip.game.name if clip.game else "Unknown"
url = clip["videoQualities"][0]["sourceURL"]
url = clip.video_qualities[0].source_url
_, ext = path.splitext(url)
ext = ext.lstrip(".")
subs = {
"channel": clip["broadcaster"]["displayName"],
"channel_login": clip["broadcaster"]["login"],
"channel": clip.broadcaster.display_name,
"channel_login": clip.broadcaster.login,
"date": date,
"datetime": clip["createdAt"],
"datetime": clip.created_at,
"format": ext,
"game": game,
"game_slug": utils.slugify(game),
"id": clip["id"],
"slug": clip["slug"],
"id": clip.id,
"slug": clip.slug,
"time": time,
"title": utils.titlify(clip["title"]),
"title_slug": utils.slugify(clip["title"]),
"title": utils.titlify(clip.title),
"title_slug": utils.slugify(clip.title),
}
try:
@ -182,7 +183,7 @@ def download_one(video: str, args):
raise ConsoleError("Invalid input: {}".format(video))
def _get_clip_url(clip, quality):
def _get_clip_url(clip, quality) -> str:
qualities = clip["videoQualities"]
# Quality given as an argument
@ -210,7 +211,7 @@ def _get_clip_url(clip, quality):
return selected_quality["sourceURL"]
def get_clip_authenticated_url(slug, quality):
def get_clip_authenticated_url(slug: str, quality: str) -> str:
print_out("<dim>Fetching access token...</dim>")
access_token = twitch.get_clip_access_token(slug)
@ -227,19 +228,19 @@ def get_clip_authenticated_url(slug, quality):
return "{}?{}".format(url, query)
def _download_clip(slug: str, args) -> None:
def _download_clip(slug: str, args):
print_out("<dim>Looking up clip...</dim>")
clip = twitch.get_clip(slug)
game = clip["game"]["name"] if clip["game"] else "Unknown"
game = clip.game.name if clip.game else "Unknown"
if not clip:
raise ConsoleError("Clip '{}' not found".format(slug))
print_out("Found: <green>{}</green> by <yellow>{}</yellow>, playing <blue>{}</blue> ({})".format(
clip["title"],
clip["broadcaster"]["displayName"],
clip.title,
clip.broadcaster.display_name,
game,
utils.format_duration(clip["durationSeconds"])
utils.format_duration(clip.duration_seconds)
))
target = _clip_target_filename(clip, args)
@ -270,8 +271,8 @@ def _download_video(video_id, args) -> None:
if not video:
raise ConsoleError("Video {} not found".format(video_id))
print_out("Found: <blue>{}</blue> by <yellow>{}</yellow>".format(
video['title'], video['creator']['displayName']))
creator = f" by <yellow>{video.creator.display_name}" if video.creator else ""
print_out(f"Found: <blue>{video.title}</blue>{creator}")
target = _video_target_filename(video, args)
print_out("Output: <blue>{}</blue>".format(target))
@ -282,9 +283,6 @@ def _download_video(video_id, args) -> None:
raise ConsoleError("Aborted")
args.overwrite = True
# Chapter select or manual offset
start, end = _determine_time_range(video_id, args)
print_out("<dim>Fetching access token...</dim>")
access_token = twitch.get_access_token(video_id, auth_token=args.auth_token)
@ -301,7 +299,7 @@ def _download_video(video_id, args) -> None:
base_uri = re.sub("/[^/]+$", "/", playlist_uri)
target_dir = _crete_temp_dir(base_uri)
vod_paths = _get_vod_paths(playlist, start, end)
vod_paths = _get_vod_paths(playlist, args.start, args.end)
# Save playlists for debugging purposes
with open(path.join(target_dir, "playlists.m3u8"), "w") as f:
@ -344,40 +342,3 @@ def _download_video(video_id, args) -> None:
shutil.rmtree(target_dir)
print_out("\nDownloaded: <green>{}</green>".format(target))
def _determine_time_range(video_id, args):
if args.start or args.end:
return args.start, args.end
if args.chapter is not None:
print_out("<dim>Fetching chapters...</dim>")
chapters = twitch.get_video_chapters(video_id)
if not chapters:
raise ConsoleError("This video has no chapters")
if args.chapter == 0:
chapter = _choose_chapter_interactive(chapters)
else:
try:
chapter = chapters[args.chapter - 1]
except IndexError:
raise ConsoleError(f"Chapter {args.chapter} does not exist. This video has {len(chapters)} chapters.")
print_out(f'Chapter selected: <blue>{chapter["description"]}</blue>\n')
start = chapter["positionMilliseconds"] // 1000
duration = chapter["durationMilliseconds"] // 1000
return start, start + duration
return None, None
def _choose_chapter_interactive(chapters):
print_out("\nChapters:")
for index, chapter in enumerate(chapters):
duration = utils.format_time(chapter["durationMilliseconds"] // 1000)
print_out(f'{index + 1}) <b>{chapter["description"]}</b> <dim>({duration})</dim>')
index = utils.read_int("Select a chapter", 1, len(chapters))
chapter = chapters[index - 1]
return chapter

View File

@ -2,6 +2,7 @@ import m3u8
from twitchdl import utils, twitch
from twitchdl.exceptions import ConsoleError
from twitchdl.models import Clip, Video
from twitchdl.output import print_video, print_clip, print_json, print_out, print_log
@ -20,14 +21,12 @@ def info(args):
print_log("Fetching playlists...")
playlists = twitch.get_playlists(video_id, access_token)
print_log("Fetching chapters...")
chapters = twitch.get_video_chapters(video_id)
if args.json:
video_json(video, playlists, chapters)
else:
video_info(video, playlists, chapters)
return
if video:
if args.json:
video_json(video, playlists)
else:
video_info(video, playlists)
return
clip_slug = utils.parse_clip_identifier(args.video)
if clip_slug:
@ -37,7 +36,7 @@ def info(args):
raise ConsoleError("Clip {} not found".format(clip_slug))
if args.json:
print_json(clip)
print_json(clip.raw)
else:
clip_info(clip)
return
@ -45,7 +44,7 @@ def info(args):
raise ConsoleError("Invalid input: {}".format(args.video))
def video_info(video, playlists, chapters):
def video_info(video: Video, playlists):
print_out()
print_video(video)
@ -54,19 +53,12 @@ def video_info(video, playlists, chapters):
for p in m3u8.loads(playlists).playlists:
print_out("<b>{}</b> {}".format(p.stream_info.video, p.uri))
if chapters:
print_out()
print_out("Chapters:")
for chapter in chapters:
start = utils.format_time(chapter["positionMilliseconds"] // 1000, force_hours=True)
duration = utils.format_time(chapter["durationMilliseconds"] // 1000)
print_out(f'{start} <b>{chapter["description"]}</b> ({duration})')
def video_json(video, playlists, chapters):
def video_json(video: Video, playlists):
playlists = m3u8.loads(playlists).playlists
json = video.raw
video["playlists"] = [
json["playlists"] = [
{
"bandwidth": p.stream_info.bandwidth,
"resolution": p.stream_info.resolution,
@ -76,16 +68,14 @@ def video_json(video, playlists, chapters):
} for p in playlists
]
video["chapters"] = chapters
print_json(video)
print_json(json)
def clip_info(clip):
def clip_info(clip: Clip):
print_out()
print_clip(clip)
print_out()
print_out("Download links:")
for q in clip["videoQualities"]:
print_out("<b>{quality}p{frameRate}</b> {sourceURL}".format(**q))
for q in clip.video_qualities:
print_out(f"<b>{q.quality}p{q.frame_rate}</b> {q.source_url}")

View File

@ -22,7 +22,7 @@ def videos(args):
print_json({
"count": len(videos),
"totalCount": total_count,
"videos": videos
"videos": [v.raw for v in videos]
})
return
@ -60,10 +60,10 @@ def _get_game_ids(names):
game_ids = []
for name in names:
print_out("<dim>Looking up game '{}'...</dim>".format(name))
game_id = twitch.get_game_id(name)
if not game_id:
raise ConsoleError("Game '{}' not found".format(name))
game_ids.append(int(game_id))
print_out(f"<dim>Looking up game '{name}'...</dim>")
game = twitch.find_game(name)
if not game:
raise ConsoleError(f"Game '{name}' not found")
game_ids.append(int(game.id))
return game_ids

View File

@ -22,7 +22,7 @@ class Command(NamedTuple):
arguments: List[Argument]
CLIENT_WEBSITE = "https://twitch-dl.bezdomni.net/"
CLIENT_WEBSITE = 'https://github.com/ihabunek/twitch-dl'
def time(value: str) -> int:
@ -233,13 +233,6 @@ COMMANDS = [
"Use 'k' and 'm' suffixes for kbps and mbps.",
"type": rate,
}),
(["-c", "--chapter"], {
"help": "Download a single chapter of the video. Specify the chapter number or "
"use the flag without a number to display a chapter select prompt.",
"type": int,
"nargs": "?",
"const": 0
}),
],
),
Command(

View File

@ -83,7 +83,7 @@ async def download(
token_bucket.advance(size)
progress.advance(task_id, size)
progress.end(task_id)
os.rename(tmp_target, target)
os.rename(tmp_target, target)
async def download_with_retries(
@ -117,7 +117,7 @@ async def download_all(
sources: List[str],
targets: List[str],
workers: int,
*,
/, *,
rate_limit: Optional[int] = None
):
progress = Progress(len(sources))

141
twitchdl/models.py Normal file
View File

@ -0,0 +1,141 @@
from typing import Any, Dict, List, Optional, Generator
from dataclasses import dataclass
Json = Dict[str, Any]
GameID = str
@dataclass(frozen=True)
class Broadcaster():
login: str
display_name: str
@staticmethod
def from_json(data: Json) -> "Broadcaster":
return Broadcaster(data["login"], data["displayName"])
@dataclass(frozen=True)
class VideoQuality():
frame_rate: int
quality: str
source_url: str
@staticmethod
def from_json(data: Json) -> "VideoQuality":
return VideoQuality(data["frameRate"], data["quality"], data["sourceURL"])
@dataclass(frozen=True)
class Game():
id: str
name: str
description: str
@staticmethod
def from_json(data: Json) -> "Game":
return Game(data["id"], data["name"], data["description"])
@dataclass(frozen=True)
class Clip():
id: str
slug: str
title: str
created_at: str
view_count: int
duration_seconds: int
url: str
game: Optional[Game]
broadcaster: Broadcaster
video_qualities: List[VideoQuality]
raw: Json
@staticmethod
def from_json(data: Json) -> "Clip":
game = Game.from_json(data["game"]) if data["game"] else None
broadcaster = Broadcaster.from_json(data["broadcaster"])
video_qualities = [VideoQuality.from_json(q) for q in data["videoQualities"]]
return Clip(
data["id"],
data["slug"],
data["title"],
data["createdAt"],
data["viewCount"],
data["durationSeconds"],
data["url"],
game,
broadcaster,
video_qualities,
data
)
@dataclass(frozen=True)
class ClipsPage():
cursor: str
has_next_page: bool
has_previous_page: bool
clips: List[Clip]
@staticmethod
def from_json(data: Json) -> "ClipsPage":
return ClipsPage(
data["edges"][-1]["cursor"],
data["pageInfo"]["hasNextPage"],
data["pageInfo"]["hasPreviousPage"],
[Clip.from_json(c["node"]) for c in data["edges"]]
)
@dataclass(frozen=True)
class Video():
id: str
title: str
published_at: str
broadcast_type: str
length_seconds: int
game: Optional[Game]
creator: Broadcaster
raw: Json
@staticmethod
def from_json(data: Json) -> "Video":
game = Game.from_json(data["game"]) if data["game"] else None
creator = Broadcaster.from_json(data["creator"])
return Video(
data["id"],
data["title"],
data["publishedAt"],
data["broadcastType"],
data["lengthSeconds"],
game,
creator,
data
)
@dataclass(frozen=True)
class VideosPage():
cursor: str
has_next_page: bool
has_previous_page: bool
total_count: int
videos: List[Video]
@staticmethod
def from_json(data: Json) -> "VideosPage":
return VideosPage(
data["edges"][-1]["cursor"],
data["pageInfo"]["hasNextPage"],
data["pageInfo"].get("hasPreviousPage"),
data["totalCount"],
[Video.from_json(c["node"]) for c in data["edges"]]
)
ClipGenerator = Generator[Clip, None, None]
VideoGenerator = Generator[Video, None, None]

View File

@ -6,6 +6,7 @@ import re
from itertools import islice
from twitchdl import utils
from twitchdl.models import Clip, Video
from typing import Any, Match
@ -77,32 +78,31 @@ def print_log(*args, **kwargs):
print(*args, file=sys.stderr, **kwargs)
def print_video(video):
published_at = video["publishedAt"].replace("T", " @ ").replace("Z", "")
length = utils.format_duration(video["lengthSeconds"])
def print_video(video: Video):
published_at = video.published_at.replace("T", " @ ").replace("Z", "")
length = utils.format_duration(video.length_seconds)
channel = "<blue>{}</blue>".format(video["creator"]["displayName"]) if video["creator"] else ""
playing = "playing <blue>{}</blue>".format(video["game"]["name"]) if video["game"] else ""
channel = f"<blue>{video.creator.display_name}</blue>" if video.creator else ""
playing = f"playing <blue>{video.game.name}</blue>" if video.game else ""
# Can't find URL in video object, strange
url = "https://www.twitch.tv/videos/{}".format(video["id"])
url = f"https://www.twitch.tv/videos/{video.id}"
print_out("<b>Video {}</b>".format(video["id"]))
print_out("<green>{}</green>".format(video["title"]))
print_out(f"<b>Video {video.id}</b>")
print_out(f"<green>{video.title}</green>")
if channel or playing:
print_out(" ".join([channel, playing]))
print_out("Published <blue>{}</blue> Length: <blue>{}</blue> ".format(published_at, length))
print_out("<i>{}</i>".format(url))
print_out(f"Published <blue>{published_at}</blue> Length: <blue>{length}</blue>")
print_out(f"<i>{url}</i>")
def print_video_compact(video):
id = video["id"]
date = video["publishedAt"][:10]
game = video["game"]["name"] if video["game"] else ""
title = truncate(video["title"], 80).ljust(80)
print_out(f'<b>{id}</b> {date} <green>{title}</green> <blue>{game}</blue>')
date = video.published_at[:10]
game = video.game.name if video.game else ""
title = truncate(video.title, 80).ljust(80)
print_out(f"<b>{video.id}</b> {date} <green>{title}</green> <blue>{game}</blue>")
def print_paged_videos(generator, page_size, total_count):
@ -133,23 +133,23 @@ def print_paged_videos(generator, page_size, total_count):
break
def print_clip(clip):
published_at = clip["createdAt"].replace("T", " @ ").replace("Z", "")
length = utils.format_duration(clip["durationSeconds"])
channel = clip["broadcaster"]["displayName"]
def print_clip(clip: Clip):
published_at = clip.created_at.replace("T", " @ ").replace("Z", "")
length = utils.format_time(clip.duration_seconds)
channel = clip.broadcaster.display_name
playing = (
"playing <blue>{}</blue>".format(clip["game"]["name"])
if clip["game"] else ""
"playing <blue>{}</blue>".format(clip.game.name)
if clip.game else ""
)
print_out("Clip <b>{}</b>".format(clip["slug"]))
print_out("<green>{}</green>".format(clip["title"]))
print_out("Clip <b>{}</b>".format(clip.slug))
print_out("<green>{}</green>".format(clip.title))
print_out("<blue>{}</blue> {}".format(channel, playing))
print_out(
"Published <blue>{}</blue>"
" Length: <blue>{}</blue>"
" Views: <blue>{}</blue>".format(published_at, length, clip["viewCount"]))
print_out("<i>{}</i>".format(clip["url"]))
f"Published: <blue>{published_at}</blue>"
f" Length: <blue>{length}</blue>"
f" Views: <blue>{clip.view_count}</blue>")
print_out(f"<i>{clip.url}</i>")
def _continue():

View File

@ -3,11 +3,11 @@ Twitch API access.
"""
import httpx
import json
from typing import Dict
from twitchdl import CLIENT_ID
from twitchdl.exceptions import ConsoleError
from twitchdl.models import Clip, ClipsPage, ClipGenerator, Game, Video, VideoGenerator, VideosPage
from typing import Dict, Optional, Tuple
class GQLError(Exception):
@ -49,23 +49,29 @@ def gql_query(query: str, headers: Dict[str, str] = {}):
return response
VIDEO_FIELDS = """
GAME_FIELDS = """
id
name
description
"""
VIDEO_FIELDS = f"""
id
title
publishedAt
broadcastType
lengthSeconds
game {
name
}
creator {
game {{
{GAME_FIELDS}
}}
creator {{
login
displayName
}
}}
"""
CLIP_FIELDS = """
CLIP_FIELDS = f"""
id
slug
title
@ -73,23 +79,22 @@ CLIP_FIELDS = """
viewCount
durationSeconds
url
videoQualities {
videoQualities {{
frameRate
quality
sourceURL
}
game {
id
name
}
broadcaster {
displayName
}}
game {{
{GAME_FIELDS}
}}
broadcaster {{
login
}
displayName
}}
"""
def get_video(video_id):
def get_video(video_id: str) -> Optional[Video]:
query = """
{{
video(id: "{video_id}") {{
@ -101,10 +106,11 @@ def get_video(video_id):
query = query.format(video_id=video_id, fields=VIDEO_FIELDS)
response = gql_query(query)
return response["data"]["video"]
if response["data"]["video"]:
return Video.from_json(response["data"]["video"])
def get_clip(slug):
def get_clip(slug: str) -> Clip:
query = """
{{
clip(slug: "{}") {{
@ -114,7 +120,7 @@ def get_clip(slug):
"""
response = gql_query(query.format(slug, fields=CLIP_FIELDS))
return response["data"]["clip"]
return Clip.from_json(response["data"]["clip"])
def get_clip_access_token(slug):
@ -137,7 +143,7 @@ def get_clip_access_token(slug):
return response["data"]["clip"]
def get_channel_clips(channel_id, period, limit, after=None):
def get_channel_clips(channel_id, period, limit, after=None) -> ClipsPage:
"""
List channel clips.
@ -179,50 +185,47 @@ def get_channel_clips(channel_id, period, limit, after=None):
if not user:
raise ConsoleError("Channel {} not found".format(channel_id))
return response["data"]["user"]["clips"]
return ClipsPage.from_json(response["data"]["user"]["clips"])
def channel_clips_generator(channel_id, period, limit):
def _generator(clips, limit):
for clip in clips["edges"]:
def channel_clips_generator(channel_id: str, period, limit: int) -> ClipGenerator:
def _generator(page: ClipsPage, limit: int):
for clip in page.clips:
if limit < 1:
return
yield clip["node"]
yield clip
limit -= 1
has_next = clips["pageInfo"]["hasNextPage"]
if limit < 1 or not has_next:
if limit < 1 or not page.has_next_page:
return
req_limit = min(limit, 100)
cursor = clips["edges"][-1]["cursor"]
clips = get_channel_clips(channel_id, period, req_limit, cursor)
yield from _generator(clips, limit)
next_page = get_channel_clips(channel_id, period, req_limit, page.cursor)
yield from _generator(next_page, limit)
req_limit = min(limit, 100)
clips = get_channel_clips(channel_id, period, req_limit)
return _generator(clips, limit)
page = get_channel_clips(channel_id, period, req_limit)
return _generator(page, limit)
def channel_clips_generator_old(channel_id, period, limit):
cursor = ""
while True:
clips = get_channel_clips(
channel_id, period, limit, after=cursor)
page = get_channel_clips(channel_id, period, limit, after=cursor)
if not clips["edges"]:
if not page.clips:
break
has_next = clips["pageInfo"]["hasNextPage"]
cursor = clips["edges"][-1]["cursor"] if has_next else None
has_next = page.has_next_page
cursor = page.cursor if has_next else None
yield clips, has_next
yield page.clips, has_next
if not cursor:
break
def get_channel_videos(channel_id, limit, sort, type="archive", game_ids=[], after=None):
def get_channel_videos(channel_id, limit, sort, type="archive", game_ids=[], after=None) -> VideosPage:
query = """
{{
user(login: "{channel_id}") {{
@ -265,29 +268,27 @@ def get_channel_videos(channel_id, limit, sort, type="archive", game_ids=[], aft
if not response["data"]["user"]:
raise ConsoleError("Channel {} not found".format(channel_id))
return response["data"]["user"]["videos"]
return VideosPage.from_json(response["data"]["user"]["videos"])
def channel_videos_generator(channel_id, max_videos, sort, type, game_ids=[]):
def _generator(videos, max_videos):
for video in videos["edges"]:
def channel_videos_generator(channel_id, max_videos, sort, type, game_ids=None) -> Tuple[int, VideoGenerator]:
def _generator(page, max_videos):
for video in page.videos:
if max_videos < 1:
return
yield video["node"]
yield video
max_videos -= 1
has_next = videos["pageInfo"]["hasNextPage"]
if max_videos < 1 or not has_next:
if max_videos < 1 or not page.has_next_page:
return
limit = min(max_videos, 100)
cursor = videos["edges"][-1]["cursor"]
videos = get_channel_videos(channel_id, limit, sort, type, game_ids, cursor)
videos = get_channel_videos(channel_id, limit, sort, type, game_ids, page.cursor)
yield from _generator(videos, max_videos)
limit = min(max_videos, 100)
videos = get_channel_videos(channel_id, limit, sort, type, game_ids)
return videos["totalCount"], _generator(videos, max_videos)
page = get_channel_videos(channel_id, limit, sort, type, game_ids)
return page.total_count, _generator(page, max_videos)
def get_access_token(video_id, auth_token=None):
@ -335,7 +336,7 @@ def get_playlists(video_id, access_token):
"""
For a given video return a playlist which contains possible video qualities.
"""
url = "http://usher.ttvnw.net/vod/{}".format(video_id)
url = "http://usher.twitch.tv/vod/{}".format(video_id)
response = httpx.get(url, params={
"nauth": access_token['value'],
@ -348,45 +349,15 @@ def get_playlists(video_id, access_token):
return response.content.decode('utf-8')
def get_game_id(name):
query = """
def find_game(name: str) -> Optional[Game]:
query = f"""
{{
game(name: "{}") {{
id
game(name: "{name.strip()}") {{
{GAME_FIELDS}
}}
}}
"""
response = gql_query(query.format(name.strip()))
game = response["data"]["game"]
if game:
return game["id"]
def get_video_chapters(video_id):
query = {
"operationName": "VideoPlayer_ChapterSelectButtonVideo",
"variables":
{
"includePrivate": False,
"videoID": video_id
},
"extensions":
{
"persistedQuery":
{
"version": 1,
"sha256Hash": "8d2793384aac3773beab5e59bd5d6f585aedb923d292800119e03d40cd0f9b41"
}
}
}
response = gql_post(json.dumps(query))
return list(_chapter_nodes(response["data"]["video"]["moments"]))
def _chapter_nodes(collection):
for edge in collection["edges"]:
node = edge["node"]
del node["moments"]
yield node
response = gql_query(query)
if response["data"]["game"]:
return Game.from_json(response["data"]["game"])

View File

@ -24,7 +24,7 @@ def format_size(bytes_, digits=1):
return _format_size(mega / 1024, digits, "GB")
def format_duration(total_seconds):
def format_duration(total_seconds: int) -> str:
total_seconds = int(total_seconds)
hours = total_seconds // 3600
remainder = total_seconds % 3600
@ -40,29 +40,26 @@ def format_duration(total_seconds):
return "{} sec".format(seconds)
def format_time(total_seconds, force_hours=False):
def format_time(total_seconds: int) -> str:
total_seconds = int(total_seconds)
hours = total_seconds // 3600
remainder = total_seconds % 3600
minutes = remainder // 60
seconds = total_seconds % 60
if hours or force_hours:
if hours:
return f"{hours:02}:{minutes:02}:{seconds:02}"
return f"{minutes:02}:{seconds:02}"
def read_int(msg, min, max, default=None):
if default:
msg = msg + f" [default {default}]"
msg += ": "
def read_int(msg, min, max, default):
msg = msg + " [default {}]: ".format(default)
while True:
try:
val = input(msg)
if default and not val:
if not val:
return default
if min <= int(val) <= max:
return int(val)
@ -70,14 +67,14 @@ def read_int(msg, min, max, default=None):
pass
def slugify(value):
def slugify(value: str) -> str:
value = unicodedata.normalize('NFKC', str(value))
value = re.sub(r'[^\w\s_-]', '', value)
value = re.sub(r'[\s_-]+', '_', value)
return value.strip("_").lower()
def titlify(value):
def titlify(value: str) -> str:
value = unicodedata.normalize('NFKC', str(value))
value = re.sub(r'[^\w\s\[\]().-]', '', value)
value = re.sub(r'\s+', ' ', value)
@ -96,7 +93,7 @@ CLIP_PATTERNS = [
]
def parse_video_identifier(identifier):
def parse_video_identifier(identifier: str) -> str:
"""Given a video ID or URL returns the video ID, or null if not matched"""
for pattern in VIDEO_PATTERNS:
match = re.match(pattern, identifier)
@ -104,7 +101,7 @@ def parse_video_identifier(identifier):
return match.group("id")
def parse_clip_identifier(identifier):
def parse_clip_identifier(identifier: str) -> str:
"""Given a clip slug or URL returns the clip slug, or null if not matched"""
for pattern in CLIP_PATTERNS:
match = re.match(pattern, identifier)