This commit is contained in:
Ivan Habunek 2024-04-01 10:52:41 +02:00
parent 3270d857b1
commit 0e8e2e3f40
No known key found for this signature in database
GPG Key ID: F5F0623FF5EBCB3D
7 changed files with 168 additions and 43 deletions

View File

@ -23,6 +23,7 @@ dependencies = [
"click>=8.0.0,<9.0.0",
"httpx>=0.17.0,<1.0.0",
"m3u8>=1.0.0,<4.0.0",
"python-dateutil>=2.8.0,<3.0.0",
]
[tool.setuptools]

View File

@ -15,8 +15,9 @@ from typing import List, Optional, OrderedDict
from urllib.parse import urlparse, urlencode
from twitchdl import twitch, utils
from twitchdl.conversion import from_dict
from twitchdl.download import download_file
from twitchdl.entities import Data, DownloadOptions
from twitchdl.entities import Data, DownloadOptions, Video
from twitchdl.exceptions import ConsoleError
from twitchdl.http import download_all
from twitchdl.output import blue, bold, dim, green, print_log, yellow
@ -80,16 +81,16 @@ def _select_playlist_interactive(playlists):
return uri
def _join_vods(playlist_path: str, target: str, overwrite: bool, video):
description = video["description"] or ""
def _join_vods(playlist_path: str, target: str, overwrite: bool, video: Video):
description = video.description or ""
description = description.strip()
command = [
"ffmpeg",
"-i", playlist_path,
"-c", "copy",
"-metadata", f"artist={video['creator']['displayName']}",
"-metadata", f"title={video['title']}",
"-metadata", f"artist={video.creator.display_name}",
"-metadata", f"title={video.title}",
"-metadata", f"description={description}",
"-metadata", "encoded_by=twitch-dl",
"-stats",
@ -115,26 +116,28 @@ def _concat_vods(vod_paths: list[str], target: str):
raise ConsoleError(f"Joining files failed: {result.stderr}")
def get_video_placeholders(video: Data, format: str) -> Data:
date, time = video['publishedAt'].split("T")
game = video["game"]["name"] if video["game"] else "Unknown"
def get_video_placeholders(video: Video, format: str) -> Data:
date = video.published_at.date().isoformat()
time = video.published_at.time().isoformat()
datetime = video.published_at.isoformat().replace("+00:00", "Z")
game = video.game.name if video.game else "Unknown"
return {
"channel": video["creator"]["displayName"],
"channel_login": video["creator"]["login"],
"channel": video.creator.display_name,
"channel_login": video.creator.login,
"date": date,
"datetime": video["publishedAt"],
"datetime": datetime,
"format": format,
"game": game,
"game_slug": utils.slugify(game),
"id": video["id"],
"id": video.id,
"time": time,
"title": utils.titlify(video["title"]),
"title_slug": utils.slugify(video["title"]),
"title": utils.titlify(video.title),
"title_slug": utils.slugify(video.title),
}
def _video_target_filename(video: Data, args: DownloadOptions):
def _video_target_filename(video: Video, args: DownloadOptions):
subs = get_video_placeholders(video, args.format)
try:
@ -280,19 +283,18 @@ def _download_clip(slug: str, args: DownloadOptions) -> None:
click.echo(f"Downloaded: {blue(target)}")
def _download_video(video_id, args: DownloadOptions) -> None:
def _download_video(video_id: str, args: DownloadOptions) -> None:
if args.start and args.end and args.end <= args.start:
raise ConsoleError("End time must be greater than start time")
print_log("Looking up video...")
video = twitch.get_video(video_id)
response = twitch.get_video(video_id)
if not video:
if not response:
raise ConsoleError(f"Video {video_id} not found")
title = video["title"]
user = video["creator"]["displayName"]
click.echo(f"Found: {blue(title)} by {yellow(user)}")
video = from_dict(Video, response)
click.echo(f"Found: {blue(video.title)} by {yellow(video.creator.display_name)}")
target = _video_target_filename(video, args)
click.echo(f"Output: {blue(target)}")

View File

@ -3,17 +3,19 @@ import m3u8
from twitchdl import utils, twitch
from twitchdl.commands.download import get_video_placeholders
from twitchdl.entities import Data
from twitchdl.conversion import from_dict
from twitchdl.entities import Data, Video
from twitchdl.exceptions import ConsoleError
from twitchdl.output import bold, print_table, print_video, print_clip, print_json, print_log
def info(id: str, *, json: bool = False):
video_id = utils.parse_video_identifier(id)
if video_id:
print_log("Fetching video...")
video = twitch.get_video(video_id)
response = twitch.get_video(video_id)
if not video:
if not response:
raise ConsoleError(f"Video {video_id} not found")
print_log("Fetching access token...")
@ -26,8 +28,9 @@ def info(id: str, *, json: bool = False):
chapters = twitch.get_video_chapters(video_id)
if json:
video_json(video, playlists, chapters)
video_json(response, playlists, chapters)
else:
video = from_dict(Video, response)
video_info(video, playlists, chapters)
return
@ -47,7 +50,7 @@ def info(id: str, *, json: bool = False):
raise ConsoleError(f"Invalid input: {id}")
def video_info(video, playlists, chapters):
def video_info(video: Video, playlists, chapters):
click.echo()
print_video(video)

87
twitchdl/conversion.py Normal file
View File

@ -0,0 +1,87 @@
import re
import dataclasses
from dataclasses import Field, is_dataclass
from datetime import date, datetime
from dateutil import parser
from typing import Any, Generator, Type, TypeVar, Union, get_args, get_origin, Callable
from typing import get_type_hints
# Generic data class instance
T = TypeVar("T")
# Dict of data decoded from JSON
Data = dict[str, Any]
def snake_to_camel(name: str):
def repl(match: re.Match[str]):
return match.group(1).upper()
return re.sub(r"_([a-z])", repl, name)
def from_dict(cls: Type[T], data: Data, key_fn: Callable[[str], str] = snake_to_camel) -> T:
"""Convert a nested dict into an instance of `cls`."""
def _fields() -> Generator[tuple[str, Any], None, None]:
hints = get_type_hints(cls)
for field in dataclasses.fields(cls):
field_type = _prune_optional(hints[field.name])
dict_field_name = key_fn(field.name)
if (value := data.get(dict_field_name)) is not None:
field_value = _convert(field_type, value)
else:
field_value = _get_default_value(field)
yield field.name, field_value
return cls(**dict(_fields()))
def from_dict_list(cls: Type[T], data: list[Data]) -> list[T]:
return [from_dict(cls, x) for x in data]
def _get_default_value(field: Field[Any]):
if field.default is not dataclasses.MISSING:
return field.default
if field.default_factory is not dataclasses.MISSING:
return field.default_factory()
return None
def _convert(field_type: Type[Any], value: Any) -> Any:
if value is None:
return None
if field_type in [str, int, bool, dict]:
return value
if field_type == datetime:
return parser.parse(value)
if field_type == date:
return date.fromisoformat(value)
if get_origin(field_type) == list:
(inner_type,) = get_args(field_type)
return [_convert(inner_type, x) for x in value]
if is_dataclass(field_type):
return from_dict(field_type, value)
raise ValueError(f"Not implemented for type '{field_type}'")
def _prune_optional(field_type: Type[Any]):
"""For `Optional[<type>]` returns the encapsulated `<type>`."""
if get_origin(field_type) == Union:
args = get_args(field_type)
if len(args) == 2 and args[1] == type(None): # noqa
return args[0]
return field_type

View File

@ -1,4 +1,5 @@
from dataclasses import dataclass
from datetime import datetime
from typing import Any
@ -23,3 +24,30 @@ class DownloadOptions:
# Type for annotating decoded JSON
# TODO: make data classes for common structs
Data = dict[str, Any]
@dataclass
class User:
login: str
display_name: str
@dataclass
class Game:
name: str
@dataclass
class Video:
id: str
title: str
description: str
published_at: datetime
broadcast_type: str
length_seconds: int
game: Game
creator: User
@dataclass
class AccessToken:
signature: str
value: str

View File

@ -5,7 +5,7 @@ from itertools import islice
from twitchdl import utils
from typing import Any, Callable, Generator, TypeVar
from twitchdl.entities import Data
from twitchdl.entities import Data, Video
T = TypeVar("T")
@ -78,24 +78,24 @@ def print_paged(
def print_video(video: Data):
published_at = video["publishedAt"].replace("T", " @ ").replace("Z", "")
length = utils.format_duration(video["lengthSeconds"])
def print_video(video: Video):
published_at = str(video.published_at.astimezone())
length = utils.format_duration(video.length_seconds)
channel = blue(video['creator']['displayName']) if video["creator"] else ""
playing = f"playing {blue(video['game']['name'])}" if video["game"] else ""
channel = blue(video.creator.display_name) if video.creator else ""
playing = f"playing {blue(video.game.name)}" if video.game else ""
# Can't find URL in video object, strange
url = f"https://www.twitch.tv/videos/{video['id']}"
url = f"https://www.twitch.tv/videos/{video.id}"
click.secho(f"Video {video['id']}", bold=True)
click.secho(f"{video['title']}", fg="green")
click.secho(f"Video {video.id}", bold=True)
click.secho(f"{video.title}", fg="green")
if channel or playing:
click.echo(" ".join([channel, playing]))
if video["description"]:
click.echo(f"Description: {video['description']}")
if video.description:
click.echo(f"Description: {video.description}")
click.echo(f"Published {blue(published_at)} Length: {blue(length)} ")
click.secho(url, italic=True)

View File

@ -8,7 +8,7 @@ import click
from typing import Dict, Generator, Literal
from twitchdl import CLIENT_ID
from twitchdl.entities import Data
from twitchdl.entities import AccessToken, Data
from twitchdl.exceptions import ConsoleError
@ -298,7 +298,7 @@ def channel_videos_generator(
return videos["totalCount"], _generator(videos, max_videos)
def get_access_token(video_id, auth_token=None):
def get_access_token(video_id: str, auth_token: str | None = None) -> AccessToken:
query = f"""
{{
videoPlaybackAccessToken(
@ -321,7 +321,11 @@ def get_access_token(video_id, auth_token=None):
try:
response = gql_query(query, headers=headers)
return response["data"]["videoPlaybackAccessToken"]
return AccessToken(
response["data"]["videoPlaybackAccessToken"]["signature"],
response["data"]["videoPlaybackAccessToken"]["value"],
)
except httpx.HTTPStatusError as error:
# Provide a more useful error message when server returns HTTP 401
# Unauthorized while using a user-provided auth token.
@ -337,15 +341,15 @@ def get_access_token(video_id, auth_token=None):
raise
def get_playlists(video_id, access_token):
def get_playlists(video_id: str, access_token: AccessToken):
"""
For a given video return a playlist which contains possible video qualities.
"""
url = f"https://usher.ttvnw.net/vod/{video_id}"
response = httpx.get(url, params={
"nauth": access_token['value'],
"nauthsig": access_token['signature'],
"nauth": access_token.value,
"nauthsig": access_token.signature,
"allow_audio_only": "true",
"allow_source": "true",
"player": "twitchweb",