This commit is contained in:
Ivan Habunek 2022-08-27 15:26:54 +02:00
parent afe38b84cd
commit c0a7ea1f27
No known key found for this signature in database
GPG Key ID: F5F0623FF5EBCB3D
7 changed files with 115 additions and 68 deletions

View File

@ -6,19 +6,19 @@ import httpx
import m3u8
from twitchdl import twitch
from twitchdl.commands.download import _parse_playlists, get_clip_authenticated_url
from twitchdl.models import Game
from twitchdl.models import Game, VideosPage
TEST_CHANNEL = "bananasaurus_rex"
def test_get_videos():
videos = twitch.get_channel_videos(TEST_CHANNEL, 3, "time")
assert videos["pageInfo"]
assert len(videos["edges"]) > 0
page = twitch.get_channel_videos(TEST_CHANNEL, 3, "time")
assert isinstance(page, VideosPage)
assert len(page.videos) > 0
video_id = videos["edges"][0]["node"]["id"]
video_id = page.videos[0].id
video = twitch.get_video(video_id)
assert video["id"] == video_id
assert video and video.id == video_id
access_token = twitch.get_access_token(video_id)
assert "signature" in access_token
@ -27,7 +27,7 @@ def test_get_videos():
playlists = twitch.get_playlists(video_id, access_token)
assert playlists.startswith("#EXTM3U")
name, res, url = next(_parse_playlists(playlists))
_, _, url = next(_parse_playlists(playlists))
playlist = httpx.get(url).text
assert playlist.startswith("#EXTM3U")

View File

@ -16,7 +16,7 @@ from twitchdl import twitch, utils
from twitchdl.download import download_file
from twitchdl.exceptions import ConsoleError
from twitchdl.http import download_all
from twitchdl.models import Clip
from twitchdl.models import Clip, Video
from twitchdl.output import print_out
@ -61,13 +61,13 @@ def _select_playlist_interactive(playlists):
return uri
def _join_vods(playlist_path, target, overwrite, video):
def _join_vods(playlist_path: str, target: str, overwrite: bool, video: Video):
command = [
"ffmpeg",
"-i", playlist_path,
"-c", "copy",
"-metadata", "artist={}".format(video["creator"]["displayName"]),
"-metadata", "title={}".format(video["title"]),
"-metadata", f"artist={video.creator.display_name}",
"-metadata", f"title={video.title}",
"-metadata", "encoded_by=twitch-dl",
"-stats",
"-loglevel", "warning",
@ -83,22 +83,22 @@ def _join_vods(playlist_path, target, overwrite, video):
raise ConsoleError("Joining files failed")
def _video_target_filename(video, args):
date, time = video['publishedAt'].split("T")
game = video["game"]["name"] if video["game"] else "Unknown"
def _video_target_filename(video: Video, args) -> str:
date, time = video.published_at.split("T")
game = video.game.name if video.game else "Unknown"
subs = {
"channel": video["creator"]["displayName"],
"channel_login": video["creator"]["login"],
"channel": video.creator.display_name,
"channel_login": video.creator.login,
"date": date,
"datetime": video["publishedAt"],
"datetime": video.published_at,
"format": args.format,
"game": game,
"game_slug": utils.slugify(game),
"id": video["id"],
"id": video.id,
"time": time,
"title": utils.titlify(video["title"]),
"title_slug": utils.slugify(video["title"]),
"title": utils.titlify(video.title),
"title_slug": utils.slugify(video.title),
}
try:
@ -183,7 +183,7 @@ def download_one(video: str, args):
raise ConsoleError("Invalid input: {}".format(video))
def _get_clip_url(clip, quality):
def _get_clip_url(clip, quality) -> str:
qualities = clip["videoQualities"]
# Quality given as an argument
@ -211,7 +211,7 @@ def _get_clip_url(clip, quality):
return selected_quality["sourceURL"]
def get_clip_authenticated_url(slug, quality):
def get_clip_authenticated_url(slug: str, quality: str) -> str:
print_out("<dim>Fetching access token...</dim>")
access_token = twitch.get_clip_access_token(slug)
@ -228,7 +228,7 @@ def get_clip_authenticated_url(slug, quality):
return "{}?{}".format(url, query)
def _download_clip(slug: str, args) -> None:
def _download_clip(slug: str, args):
print_out("<dim>Looking up clip...</dim>")
clip = twitch.get_clip(slug)
game = clip.game.name if clip.game else "Unknown"
@ -271,8 +271,8 @@ def _download_video(video_id, args) -> None:
if not video:
raise ConsoleError("Video {} not found".format(video_id))
print_out("Found: <blue>{}</blue> by <yellow>{}</yellow>".format(
video['title'], video['creator']['displayName']))
creator = f" by <yellow>{video.creator.display_name}" if video.creator else ""
print_out(f"Found: <blue>{video.title}</blue>{creator}")
target = _video_target_filename(video, args)
print_out("Output: <blue>{}</blue>".format(target))

View File

@ -2,7 +2,7 @@ import m3u8
from twitchdl import utils, twitch
from twitchdl.exceptions import ConsoleError
from twitchdl.models import Clip
from twitchdl.models import Clip, Video
from twitchdl.output import print_video, print_clip, print_json, print_out, print_log
@ -44,7 +44,7 @@ def info(args):
raise ConsoleError("Invalid input: {}".format(args.video))
def video_info(video, playlists):
def video_info(video: Video, playlists):
print_out()
print_video(video)
@ -54,10 +54,11 @@ def video_info(video, playlists):
print_out("<b>{}</b> {}".format(p.stream_info.video, p.uri))
def video_json(video, playlists):
def video_json(video: Video, playlists):
playlists = m3u8.loads(playlists).playlists
json = video.raw
video["playlists"] = [
json["playlists"] = [
{
"bandwidth": p.stream_info.bandwidth,
"resolution": p.stream_info.resolution,
@ -67,7 +68,7 @@ def video_json(video, playlists):
} for p in playlists
]
print_json(video)
print_json(json)
def clip_info(clip: Clip):

View File

@ -22,7 +22,7 @@ def videos(args):
print_json({
"count": len(videos),
"totalCount": total_count,
"videos": videos
"videos": [v.raw for v in videos]
})
return
@ -60,10 +60,10 @@ def _get_game_ids(names):
game_ids = []
for name in names:
print_out("<dim>Looking up game '{}'...</dim>".format(name))
game_id = twitch.get_game_id(name)
if not game_id:
raise ConsoleError("Game '{}' not found".format(name))
game_ids.append(int(game_id))
print_out(f"<dim>Looking up game '{name}'...</dim>")
game = twitch.find_game(name)
if not game:
raise ConsoleError(f"Game '{name}' not found")
game_ids.append(int(game.id))
return game_ids

View File

@ -90,4 +90,52 @@ class ClipsPage():
)
@dataclass(frozen=True)
class Video():
id: str
title: str
published_at: str
broadcast_type: str
length_seconds: int
game: Optional[Game]
creator: Broadcaster
raw: Json
@staticmethod
def from_json(data: Json) -> "Video":
game = Game.from_json(data["game"]) if data["game"] else None
creator = Broadcaster.from_json(data["creator"])
return Video(
data["id"],
data["title"],
data["publishedAt"],
data["broadcastType"],
data["lengthSeconds"],
game,
creator,
data
)
@dataclass(frozen=True)
class VideosPage():
cursor: str
has_next_page: bool
has_previous_page: bool
total_count: int
videos: List[Video]
@staticmethod
def from_json(data: Json) -> "VideosPage":
return VideosPage(
data["edges"][-1]["cursor"],
data["pageInfo"]["hasNextPage"],
data["pageInfo"].get("hasPreviousPage"),
data["totalCount"],
[Video.from_json(c["node"]) for c in data["edges"]]
)
ClipGenerator = Generator[Clip, None, None]
VideoGenerator = Generator[Video, None, None]

View File

@ -6,7 +6,7 @@ import re
from itertools import islice
from twitchdl import utils
from twitchdl.models import Clip
from twitchdl.models import Clip, Video
from typing import Any, Match
@ -78,32 +78,31 @@ def print_log(*args, **kwargs):
print(*args, file=sys.stderr, **kwargs)
def print_video(video):
published_at = video["publishedAt"].replace("T", " @ ").replace("Z", "")
length = utils.format_duration(video["lengthSeconds"])
def print_video(video: Video):
published_at = video.published_at.replace("T", " @ ").replace("Z", "")
length = utils.format_duration(video.length_seconds)
channel = "<blue>{}</blue>".format(video["creator"]["displayName"]) if video["creator"] else ""
playing = "playing <blue>{}</blue>".format(video["game"]["name"]) if video["game"] else ""
channel = f"<blue>{video.creator.display_name}</blue>" if video.creator else ""
playing = f"playing <blue>{video.game.name}</blue>" if video.game else ""
# Can't find URL in video object, strange
url = "https://www.twitch.tv/videos/{}".format(video["id"])
url = f"https://www.twitch.tv/videos/{video.id}"
print_out("<b>Video {}</b>".format(video["id"]))
print_out("<green>{}</green>".format(video["title"]))
print_out(f"<b>Video {video.id}</b>")
print_out(f"<green>{video.title}</green>")
if channel or playing:
print_out(" ".join([channel, playing]))
print_out("Published <blue>{}</blue> Length: <blue>{}</blue> ".format(published_at, length))
print_out("<i>{}</i>".format(url))
print_out(f"Published <blue>{published_at}</blue> Length: <blue>{length}</blue>")
print_out(f"<i>{url}</i>")
def print_video_compact(video):
id = video["id"]
date = video["publishedAt"][:10]
game = video["game"]["name"] if video["game"] else ""
title = truncate(video["title"], 80).ljust(80)
print_out(f'<b>{id}</b> {date} <green>{title}</green> <blue>{game}</blue>')
date = video.published_at[:10]
game = video.game.name if video.game else ""
title = truncate(video.title, 80).ljust(80)
print_out(f"<b>{video.id}</b> {date} <green>{title}</green> <blue>{game}</blue>")
def print_paged_videos(generator, page_size, total_count):

View File

@ -6,8 +6,8 @@ import httpx
from twitchdl import CLIENT_ID
from twitchdl.exceptions import ConsoleError
from twitchdl.models import Clip, ClipsPage, ClipGenerator, Game
from typing import Dict, Optional
from twitchdl.models import Clip, ClipsPage, ClipGenerator, Game, Video, VideoGenerator, VideosPage
from typing import Dict, Optional, Tuple
class GQLError(Exception):
@ -94,7 +94,7 @@ CLIP_FIELDS = f"""
"""
def get_video(video_id):
def get_video(video_id: str) -> Optional[Video]:
query = """
{{
video(id: "{video_id}") {{
@ -106,7 +106,8 @@ def get_video(video_id):
query = query.format(video_id=video_id, fields=VIDEO_FIELDS)
response = gql_query(query)
return response["data"]["video"]
if response["data"]["video"]:
return Video.from_json(response["data"]["video"])
def get_clip(slug: str) -> Clip:
@ -224,7 +225,7 @@ def channel_clips_generator_old(channel_id, period, limit):
break
def get_channel_videos(channel_id, limit, sort, type="archive", game_ids=[], after=None):
def get_channel_videos(channel_id, limit, sort, type="archive", game_ids=[], after=None) -> VideosPage:
query = """
{{
user(login: "{channel_id}") {{
@ -267,29 +268,27 @@ def get_channel_videos(channel_id, limit, sort, type="archive", game_ids=[], aft
if not response["data"]["user"]:
raise ConsoleError("Channel {} not found".format(channel_id))
return response["data"]["user"]["videos"]
return VideosPage.from_json(response["data"]["user"]["videos"])
def channel_videos_generator(channel_id, max_videos, sort, type, game_ids=None):
def _generator(videos, max_videos):
for video in videos["edges"]:
def channel_videos_generator(channel_id, max_videos, sort, type, game_ids=None) -> Tuple[int, VideoGenerator]:
def _generator(page, max_videos):
for video in page.videos:
if max_videos < 1:
return
yield video["node"]
yield video
max_videos -= 1
has_next = videos["pageInfo"]["hasNextPage"]
if max_videos < 1 or not has_next:
if max_videos < 1 or not page.has_next_page:
return
limit = min(max_videos, 100)
cursor = videos["edges"][-1]["cursor"]
videos = get_channel_videos(channel_id, limit, sort, type, game_ids, cursor)
videos = get_channel_videos(channel_id, limit, sort, type, game_ids, page.cursor)
yield from _generator(videos, max_videos)
limit = min(max_videos, 100)
videos = get_channel_videos(channel_id, limit, sort, type, game_ids)
return videos["totalCount"], _generator(videos, max_videos)
page = get_channel_videos(channel_id, limit, sort, type, game_ids)
return page.total_count, _generator(page, max_videos)
def get_access_token(video_id, auth_token=None):