mirror of
https://github.com/ihabunek/twitch-dl
synced 2024-08-30 18:32:25 +00:00
Compare commits
2 Commits
Author | SHA1 | Date | |
---|---|---|---|
33b5cc0a01 | |||
c9ab6237e8 |
@ -3,6 +3,11 @@ twitch-dl changelog
|
||||
|
||||
<!-- Do not edit. This file is automatically generated from changelog.yaml.-->
|
||||
|
||||
### [2.0.1 (2022-09-09)](https://github.com/ihabunek/twitch-dl/releases/tag/2.0.1)
|
||||
|
||||
* Fix an issue where a temp vod file would be renamed while still being open,
|
||||
which caused an exception on Windows (#111)
|
||||
|
||||
### [2.0.0 (2022-08-18)](https://github.com/ihabunek/twitch-dl/releases/tag/2.0.0)
|
||||
|
||||
This release switches from using `requests` to `httpx` for making http requests,
|
||||
|
@ -1,3 +1,8 @@
|
||||
2.0.1:
|
||||
date: 2022-09-09
|
||||
changes:
|
||||
- "Fix an issue where a temp vod file would be renamed while still being open, which caused an exception on Windows (#111)"
|
||||
|
||||
2.0.0:
|
||||
date: 2022-08-18
|
||||
description: |
|
||||
|
@ -3,6 +3,11 @@ twitch-dl changelog
|
||||
|
||||
<!-- Do not edit. This file is automatically generated from changelog.yaml.-->
|
||||
|
||||
### [2.0.1 (2022-09-09)](https://github.com/ihabunek/twitch-dl/releases/tag/2.0.1)
|
||||
|
||||
* Fix an issue where a temp vod file would be renamed while still being open,
|
||||
which caused an exception on Windows (#111)
|
||||
|
||||
### [2.0.0 (2022-08-18)](https://github.com/ihabunek/twitch-dl/releases/tag/2.0.0)
|
||||
|
||||
This release switches from using `requests` to `httpx` for making http requests,
|
||||
|
@ -1,5 +1,4 @@
|
||||
pytest
|
||||
pytest-cov
|
||||
twine
|
||||
wheel
|
||||
pyyaml
|
2
setup.py
2
setup.py
@ -11,7 +11,7 @@ makes it faster.
|
||||
|
||||
setup(
|
||||
name="twitch-dl",
|
||||
version="2.0.0",
|
||||
version="2.0.1",
|
||||
description="Twitch downloader",
|
||||
long_description=long_description.strip(),
|
||||
author="Ivan Habunek",
|
||||
|
@ -6,19 +6,18 @@ import httpx
|
||||
import m3u8
|
||||
from twitchdl import twitch
|
||||
from twitchdl.commands.download import _parse_playlists, get_clip_authenticated_url
|
||||
from twitchdl.models import Game, VideosPage
|
||||
|
||||
TEST_CHANNEL = "bananasaurus_rex"
|
||||
|
||||
|
||||
def test_get_videos():
|
||||
page = twitch.get_channel_videos(TEST_CHANNEL, 3, "time")
|
||||
assert isinstance(page, VideosPage)
|
||||
assert len(page.videos) > 0
|
||||
videos = twitch.get_channel_videos(TEST_CHANNEL, 3, "time")
|
||||
assert videos["pageInfo"]
|
||||
assert len(videos["edges"]) > 0
|
||||
|
||||
video_id = page.videos[0].id
|
||||
video_id = videos["edges"][0]["node"]["id"]
|
||||
video = twitch.get_video(video_id)
|
||||
assert video and video.id == video_id
|
||||
assert video["id"] == video_id
|
||||
|
||||
access_token = twitch.get_access_token(video_id)
|
||||
assert "signature" in access_token
|
||||
@ -27,7 +26,7 @@ def test_get_videos():
|
||||
playlists = twitch.get_playlists(video_id, access_token)
|
||||
assert playlists.startswith("#EXTM3U")
|
||||
|
||||
_, _, url = next(_parse_playlists(playlists))
|
||||
name, res, url = next(_parse_playlists(playlists))
|
||||
playlist = httpx.get(url).text
|
||||
assert playlist.startswith("#EXTM3U")
|
||||
|
||||
@ -37,22 +36,15 @@ def test_get_videos():
|
||||
|
||||
|
||||
def test_get_clips():
|
||||
page = twitch.get_channel_clips(TEST_CHANNEL, "all_time", 3)
|
||||
assert len(page.clips) > 0
|
||||
"""
|
||||
This test depends on the channel having some videos published.
|
||||
"""
|
||||
clips = twitch.get_channel_clips(TEST_CHANNEL, "all_time", 3)
|
||||
assert clips["pageInfo"]
|
||||
assert len(clips["edges"]) > 0
|
||||
|
||||
slug = page.clips[0].slug
|
||||
slug = clips["edges"][0]["node"]["slug"]
|
||||
clip = twitch.get_clip(slug)
|
||||
assert clip.slug == slug
|
||||
assert clip["slug"] == slug
|
||||
|
||||
assert get_clip_authenticated_url(slug, "source").startswith("https")
|
||||
|
||||
|
||||
def test_get_game():
|
||||
game = twitch.find_game("The Witness")
|
||||
assert isinstance(game, Game)
|
||||
assert game.id == "17324"
|
||||
assert game.name == "The Witness"
|
||||
assert game.description
|
||||
|
||||
game = twitch.find_game("Does Not Exist Hopefully")
|
||||
assert game is None
|
||||
assert get_clip_authenticated_url(slug, "source")
|
||||
|
@ -1,5 +0,0 @@
|
||||
from twitchdl.twitch import channel_clips_generator
|
||||
|
||||
|
||||
# def test_clips_generator():
|
||||
# channel_clips_generator("foo", "bar", 100)
|
@ -1,3 +1,3 @@
|
||||
__version__ = "2.0.0"
|
||||
__version__ = "2.0.1"
|
||||
|
||||
CLIENT_ID = "kimne78kx3ncx6brgo4mv6wki5h1ko"
|
||||
|
@ -7,7 +7,6 @@ from os import path
|
||||
from twitchdl import twitch, utils
|
||||
from twitchdl.commands.download import get_clip_authenticated_url
|
||||
from twitchdl.download import download_file
|
||||
from twitchdl.models import Clip, ClipGenerator
|
||||
from twitchdl.output import print_out, print_clip, print_json
|
||||
|
||||
|
||||
@ -18,12 +17,13 @@ def clips(args):
|
||||
generator = twitch.channel_clips_generator(args.channel_name, args.period, limit)
|
||||
|
||||
if args.json:
|
||||
return print_json([c.raw for c in generator])
|
||||
return print_json(list(generator))
|
||||
|
||||
if args.download:
|
||||
return _download_clips(generator)
|
||||
|
||||
if args.pager:
|
||||
print(args)
|
||||
return _print_paged(generator, args.pager)
|
||||
|
||||
return _print_all(generator, args)
|
||||
@ -40,41 +40,38 @@ def _continue():
|
||||
return True
|
||||
|
||||
|
||||
def _target_filename(clip: Clip):
|
||||
url = clip.video_qualities[0].source_url
|
||||
def _target_filename(clip):
|
||||
url = clip["videoQualities"][0]["sourceURL"]
|
||||
_, ext = path.splitext(url)
|
||||
ext = ext.lstrip(".")
|
||||
|
||||
match = re.search(r"^(\d{4})-(\d{2})-(\d{2})T", clip.created_at)
|
||||
if not match:
|
||||
raise ValueError(f"Invalid date: {clip.created_at}")
|
||||
|
||||
match = re.search(r"^(\d{4})-(\d{2})-(\d{2})T", clip["createdAt"])
|
||||
date = "".join(match.groups())
|
||||
|
||||
name = "_".join([
|
||||
date,
|
||||
clip.id,
|
||||
clip.broadcaster.login,
|
||||
utils.slugify(clip.title),
|
||||
clip["id"],
|
||||
clip["broadcaster"]["login"],
|
||||
utils.slugify(clip["title"]),
|
||||
])
|
||||
|
||||
return "{}.{}".format(name, ext)
|
||||
|
||||
|
||||
def _download_clips(clips: ClipGenerator):
|
||||
for clip in clips:
|
||||
def _download_clips(generator):
|
||||
for clip in generator:
|
||||
target = _target_filename(clip)
|
||||
|
||||
if path.exists(target):
|
||||
print_out("Already downloaded: <green>{}</green>".format(target))
|
||||
else:
|
||||
url = get_clip_authenticated_url(clip.slug, "source")
|
||||
url = get_clip_authenticated_url(clip["slug"], "source")
|
||||
print_out("Downloading: <yellow>{}</yellow>".format(target))
|
||||
download_file(url, target)
|
||||
|
||||
|
||||
def _print_all(clips: ClipGenerator, args):
|
||||
for clip in clips:
|
||||
def _print_all(generator, args):
|
||||
for clip in generator:
|
||||
print_out()
|
||||
print_clip(clip)
|
||||
|
||||
@ -85,8 +82,8 @@ def _print_all(clips: ClipGenerator, args):
|
||||
)
|
||||
|
||||
|
||||
def _print_paged(clips: ClipGenerator, page_size: int):
|
||||
iterator = iter(clips)
|
||||
def _print_paged(generator, page_size):
|
||||
iterator = iter(generator)
|
||||
page = list(islice(iterator, page_size))
|
||||
|
||||
first = 1
|
||||
|
@ -16,7 +16,6 @@ from twitchdl import twitch, utils
|
||||
from twitchdl.download import download_file
|
||||
from twitchdl.exceptions import ConsoleError
|
||||
from twitchdl.http import download_all
|
||||
from twitchdl.models import Clip, Video
|
||||
from twitchdl.output import print_out
|
||||
|
||||
|
||||
@ -61,13 +60,13 @@ def _select_playlist_interactive(playlists):
|
||||
return uri
|
||||
|
||||
|
||||
def _join_vods(playlist_path: str, target: str, overwrite: bool, video: Video):
|
||||
def _join_vods(playlist_path, target, overwrite, video):
|
||||
command = [
|
||||
"ffmpeg",
|
||||
"-i", playlist_path,
|
||||
"-c", "copy",
|
||||
"-metadata", f"artist={video.creator.display_name}",
|
||||
"-metadata", f"title={video.title}",
|
||||
"-metadata", "artist={}".format(video["creator"]["displayName"]),
|
||||
"-metadata", "title={}".format(video["title"]),
|
||||
"-metadata", "encoded_by=twitch-dl",
|
||||
"-stats",
|
||||
"-loglevel", "warning",
|
||||
@ -83,22 +82,22 @@ def _join_vods(playlist_path: str, target: str, overwrite: bool, video: Video):
|
||||
raise ConsoleError("Joining files failed")
|
||||
|
||||
|
||||
def _video_target_filename(video: Video, args) -> str:
|
||||
date, time = video.published_at.split("T")
|
||||
game = video.game.name if video.game else "Unknown"
|
||||
def _video_target_filename(video, args):
|
||||
date, time = video['publishedAt'].split("T")
|
||||
game = video["game"]["name"] if video["game"] else "Unknown"
|
||||
|
||||
subs = {
|
||||
"channel": video.creator.display_name,
|
||||
"channel_login": video.creator.login,
|
||||
"channel": video["creator"]["displayName"],
|
||||
"channel_login": video["creator"]["login"],
|
||||
"date": date,
|
||||
"datetime": video.published_at,
|
||||
"datetime": video["publishedAt"],
|
||||
"format": args.format,
|
||||
"game": game,
|
||||
"game_slug": utils.slugify(game),
|
||||
"id": video.id,
|
||||
"id": video["id"],
|
||||
"time": time,
|
||||
"title": utils.titlify(video.title),
|
||||
"title_slug": utils.slugify(video.title),
|
||||
"title": utils.titlify(video["title"]),
|
||||
"title_slug": utils.slugify(video["title"]),
|
||||
}
|
||||
|
||||
try:
|
||||
@ -108,27 +107,27 @@ def _video_target_filename(video: Video, args) -> str:
|
||||
raise ConsoleError("Invalid key {} used in --output. Supported keys are: {}".format(e, supported))
|
||||
|
||||
|
||||
def _clip_target_filename(clip: Clip, args) -> str:
|
||||
date, time = clip.created_at.split("T")
|
||||
game = clip.game.name if clip.game else "Unknown"
|
||||
def _clip_target_filename(clip, args):
|
||||
date, time = clip["createdAt"].split("T")
|
||||
game = clip["game"]["name"] if clip["game"] else "Unknown"
|
||||
|
||||
url = clip.video_qualities[0].source_url
|
||||
url = clip["videoQualities"][0]["sourceURL"]
|
||||
_, ext = path.splitext(url)
|
||||
ext = ext.lstrip(".")
|
||||
|
||||
subs = {
|
||||
"channel": clip.broadcaster.display_name,
|
||||
"channel_login": clip.broadcaster.login,
|
||||
"channel": clip["broadcaster"]["displayName"],
|
||||
"channel_login": clip["broadcaster"]["login"],
|
||||
"date": date,
|
||||
"datetime": clip.created_at,
|
||||
"datetime": clip["createdAt"],
|
||||
"format": ext,
|
||||
"game": game,
|
||||
"game_slug": utils.slugify(game),
|
||||
"id": clip.id,
|
||||
"slug": clip.slug,
|
||||
"id": clip["id"],
|
||||
"slug": clip["slug"],
|
||||
"time": time,
|
||||
"title": utils.titlify(clip.title),
|
||||
"title_slug": utils.slugify(clip.title),
|
||||
"title": utils.titlify(clip["title"]),
|
||||
"title_slug": utils.slugify(clip["title"]),
|
||||
}
|
||||
|
||||
try:
|
||||
@ -183,7 +182,7 @@ def download_one(video: str, args):
|
||||
raise ConsoleError("Invalid input: {}".format(video))
|
||||
|
||||
|
||||
def _get_clip_url(clip, quality) -> str:
|
||||
def _get_clip_url(clip, quality):
|
||||
qualities = clip["videoQualities"]
|
||||
|
||||
# Quality given as an argument
|
||||
@ -211,7 +210,7 @@ def _get_clip_url(clip, quality) -> str:
|
||||
return selected_quality["sourceURL"]
|
||||
|
||||
|
||||
def get_clip_authenticated_url(slug: str, quality: str) -> str:
|
||||
def get_clip_authenticated_url(slug, quality):
|
||||
print_out("<dim>Fetching access token...</dim>")
|
||||
access_token = twitch.get_clip_access_token(slug)
|
||||
|
||||
@ -228,19 +227,19 @@ def get_clip_authenticated_url(slug: str, quality: str) -> str:
|
||||
return "{}?{}".format(url, query)
|
||||
|
||||
|
||||
def _download_clip(slug: str, args):
|
||||
def _download_clip(slug: str, args) -> None:
|
||||
print_out("<dim>Looking up clip...</dim>")
|
||||
clip = twitch.get_clip(slug)
|
||||
game = clip.game.name if clip.game else "Unknown"
|
||||
game = clip["game"]["name"] if clip["game"] else "Unknown"
|
||||
|
||||
if not clip:
|
||||
raise ConsoleError("Clip '{}' not found".format(slug))
|
||||
|
||||
print_out("Found: <green>{}</green> by <yellow>{}</yellow>, playing <blue>{}</blue> ({})".format(
|
||||
clip.title,
|
||||
clip.broadcaster.display_name,
|
||||
clip["title"],
|
||||
clip["broadcaster"]["displayName"],
|
||||
game,
|
||||
utils.format_duration(clip.duration_seconds)
|
||||
utils.format_duration(clip["durationSeconds"])
|
||||
))
|
||||
|
||||
target = _clip_target_filename(clip, args)
|
||||
@ -271,8 +270,8 @@ def _download_video(video_id, args) -> None:
|
||||
if not video:
|
||||
raise ConsoleError("Video {} not found".format(video_id))
|
||||
|
||||
creator = f" by <yellow>{video.creator.display_name}" if video.creator else ""
|
||||
print_out(f"Found: <blue>{video.title}</blue>{creator}")
|
||||
print_out("Found: <blue>{}</blue> by <yellow>{}</yellow>".format(
|
||||
video['title'], video['creator']['displayName']))
|
||||
|
||||
target = _video_target_filename(video, args)
|
||||
print_out("Output: <blue>{}</blue>".format(target))
|
||||
|
@ -2,7 +2,6 @@ import m3u8
|
||||
|
||||
from twitchdl import utils, twitch
|
||||
from twitchdl.exceptions import ConsoleError
|
||||
from twitchdl.models import Clip, Video
|
||||
from twitchdl.output import print_video, print_clip, print_json, print_out, print_log
|
||||
|
||||
|
||||
@ -36,7 +35,7 @@ def info(args):
|
||||
raise ConsoleError("Clip {} not found".format(clip_slug))
|
||||
|
||||
if args.json:
|
||||
print_json(clip.raw)
|
||||
print_json(clip)
|
||||
else:
|
||||
clip_info(clip)
|
||||
return
|
||||
@ -44,7 +43,7 @@ def info(args):
|
||||
raise ConsoleError("Invalid input: {}".format(args.video))
|
||||
|
||||
|
||||
def video_info(video: Video, playlists):
|
||||
def video_info(video, playlists):
|
||||
print_out()
|
||||
print_video(video)
|
||||
|
||||
@ -54,11 +53,10 @@ def video_info(video: Video, playlists):
|
||||
print_out("<b>{}</b> {}".format(p.stream_info.video, p.uri))
|
||||
|
||||
|
||||
def video_json(video: Video, playlists):
|
||||
def video_json(video, playlists):
|
||||
playlists = m3u8.loads(playlists).playlists
|
||||
json = video.raw
|
||||
|
||||
json["playlists"] = [
|
||||
video["playlists"] = [
|
||||
{
|
||||
"bandwidth": p.stream_info.bandwidth,
|
||||
"resolution": p.stream_info.resolution,
|
||||
@ -68,14 +66,14 @@ def video_json(video: Video, playlists):
|
||||
} for p in playlists
|
||||
]
|
||||
|
||||
print_json(json)
|
||||
print_json(video)
|
||||
|
||||
|
||||
def clip_info(clip: Clip):
|
||||
def clip_info(clip):
|
||||
print_out()
|
||||
print_clip(clip)
|
||||
print_out()
|
||||
print_out("Download links:")
|
||||
|
||||
for q in clip.video_qualities:
|
||||
print_out(f"<b>{q.quality}p{q.frame_rate}</b> {q.source_url}")
|
||||
for q in clip["videoQualities"]:
|
||||
print_out("<b>{quality}p{frameRate}</b> {sourceURL}".format(**q))
|
||||
|
@ -22,7 +22,7 @@ def videos(args):
|
||||
print_json({
|
||||
"count": len(videos),
|
||||
"totalCount": total_count,
|
||||
"videos": [v.raw for v in videos]
|
||||
"videos": videos
|
||||
})
|
||||
return
|
||||
|
||||
@ -60,10 +60,10 @@ def _get_game_ids(names):
|
||||
|
||||
game_ids = []
|
||||
for name in names:
|
||||
print_out(f"<dim>Looking up game '{name}'...</dim>")
|
||||
game = twitch.find_game(name)
|
||||
if not game:
|
||||
raise ConsoleError(f"Game '{name}' not found")
|
||||
game_ids.append(int(game.id))
|
||||
print_out("<dim>Looking up game '{}'...</dim>".format(name))
|
||||
game_id = twitch.get_game_id(name)
|
||||
if not game_id:
|
||||
raise ConsoleError("Game '{}' not found".format(name))
|
||||
game_ids.append(int(game_id))
|
||||
|
||||
return game_ids
|
||||
|
@ -83,7 +83,7 @@ async def download(
|
||||
token_bucket.advance(size)
|
||||
progress.advance(task_id, size)
|
||||
progress.end(task_id)
|
||||
os.rename(tmp_target, target)
|
||||
os.rename(tmp_target, target)
|
||||
|
||||
|
||||
async def download_with_retries(
|
||||
|
@ -1,141 +0,0 @@
|
||||
from typing import Any, Dict, List, Optional, Generator
|
||||
from dataclasses import dataclass
|
||||
|
||||
|
||||
Json = Dict[str, Any]
|
||||
GameID = str
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class Broadcaster():
|
||||
login: str
|
||||
display_name: str
|
||||
|
||||
@staticmethod
|
||||
def from_json(data: Json) -> "Broadcaster":
|
||||
return Broadcaster(data["login"], data["displayName"])
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class VideoQuality():
|
||||
frame_rate: int
|
||||
quality: str
|
||||
source_url: str
|
||||
|
||||
@staticmethod
|
||||
def from_json(data: Json) -> "VideoQuality":
|
||||
return VideoQuality(data["frameRate"], data["quality"], data["sourceURL"])
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class Game():
|
||||
id: str
|
||||
name: str
|
||||
description: str
|
||||
|
||||
@staticmethod
|
||||
def from_json(data: Json) -> "Game":
|
||||
return Game(data["id"], data["name"], data["description"])
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class Clip():
|
||||
id: str
|
||||
slug: str
|
||||
title: str
|
||||
created_at: str
|
||||
view_count: int
|
||||
duration_seconds: int
|
||||
url: str
|
||||
game: Optional[Game]
|
||||
broadcaster: Broadcaster
|
||||
video_qualities: List[VideoQuality]
|
||||
raw: Json
|
||||
|
||||
@staticmethod
|
||||
def from_json(data: Json) -> "Clip":
|
||||
game = Game.from_json(data["game"]) if data["game"] else None
|
||||
broadcaster = Broadcaster.from_json(data["broadcaster"])
|
||||
video_qualities = [VideoQuality.from_json(q) for q in data["videoQualities"]]
|
||||
|
||||
return Clip(
|
||||
data["id"],
|
||||
data["slug"],
|
||||
data["title"],
|
||||
data["createdAt"],
|
||||
data["viewCount"],
|
||||
data["durationSeconds"],
|
||||
data["url"],
|
||||
game,
|
||||
broadcaster,
|
||||
video_qualities,
|
||||
data
|
||||
)
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class ClipsPage():
|
||||
cursor: str
|
||||
has_next_page: bool
|
||||
has_previous_page: bool
|
||||
clips: List[Clip]
|
||||
|
||||
@staticmethod
|
||||
def from_json(data: Json) -> "ClipsPage":
|
||||
return ClipsPage(
|
||||
data["edges"][-1]["cursor"],
|
||||
data["pageInfo"]["hasNextPage"],
|
||||
data["pageInfo"]["hasPreviousPage"],
|
||||
[Clip.from_json(c["node"]) for c in data["edges"]]
|
||||
)
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class Video():
|
||||
id: str
|
||||
title: str
|
||||
published_at: str
|
||||
broadcast_type: str
|
||||
length_seconds: int
|
||||
game: Optional[Game]
|
||||
creator: Broadcaster
|
||||
raw: Json
|
||||
|
||||
@staticmethod
|
||||
def from_json(data: Json) -> "Video":
|
||||
game = Game.from_json(data["game"]) if data["game"] else None
|
||||
creator = Broadcaster.from_json(data["creator"])
|
||||
|
||||
return Video(
|
||||
data["id"],
|
||||
data["title"],
|
||||
data["publishedAt"],
|
||||
data["broadcastType"],
|
||||
data["lengthSeconds"],
|
||||
game,
|
||||
creator,
|
||||
data
|
||||
)
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class VideosPage():
|
||||
cursor: str
|
||||
has_next_page: bool
|
||||
has_previous_page: bool
|
||||
total_count: int
|
||||
videos: List[Video]
|
||||
|
||||
@staticmethod
|
||||
def from_json(data: Json) -> "VideosPage":
|
||||
return VideosPage(
|
||||
data["edges"][-1]["cursor"],
|
||||
data["pageInfo"]["hasNextPage"],
|
||||
data["pageInfo"].get("hasPreviousPage"),
|
||||
data["totalCount"],
|
||||
[Video.from_json(c["node"]) for c in data["edges"]]
|
||||
)
|
||||
|
||||
|
||||
ClipGenerator = Generator[Clip, None, None]
|
||||
VideoGenerator = Generator[Video, None, None]
|
@ -6,7 +6,6 @@ import re
|
||||
|
||||
from itertools import islice
|
||||
from twitchdl import utils
|
||||
from twitchdl.models import Clip, Video
|
||||
from typing import Any, Match
|
||||
|
||||
|
||||
@ -78,31 +77,32 @@ def print_log(*args, **kwargs):
|
||||
print(*args, file=sys.stderr, **kwargs)
|
||||
|
||||
|
||||
def print_video(video: Video):
|
||||
published_at = video.published_at.replace("T", " @ ").replace("Z", "")
|
||||
length = utils.format_duration(video.length_seconds)
|
||||
def print_video(video):
|
||||
published_at = video["publishedAt"].replace("T", " @ ").replace("Z", "")
|
||||
length = utils.format_duration(video["lengthSeconds"])
|
||||
|
||||
channel = f"<blue>{video.creator.display_name}</blue>" if video.creator else ""
|
||||
playing = f"playing <blue>{video.game.name}</blue>" if video.game else ""
|
||||
channel = "<blue>{}</blue>".format(video["creator"]["displayName"]) if video["creator"] else ""
|
||||
playing = "playing <blue>{}</blue>".format(video["game"]["name"]) if video["game"] else ""
|
||||
|
||||
# Can't find URL in video object, strange
|
||||
url = f"https://www.twitch.tv/videos/{video.id}"
|
||||
url = "https://www.twitch.tv/videos/{}".format(video["id"])
|
||||
|
||||
print_out(f"<b>Video {video.id}</b>")
|
||||
print_out(f"<green>{video.title}</green>")
|
||||
print_out("<b>Video {}</b>".format(video["id"]))
|
||||
print_out("<green>{}</green>".format(video["title"]))
|
||||
|
||||
if channel or playing:
|
||||
print_out(" ".join([channel, playing]))
|
||||
|
||||
print_out(f"Published <blue>{published_at}</blue> Length: <blue>{length}</blue>")
|
||||
print_out(f"<i>{url}</i>")
|
||||
print_out("Published <blue>{}</blue> Length: <blue>{}</blue> ".format(published_at, length))
|
||||
print_out("<i>{}</i>".format(url))
|
||||
|
||||
|
||||
def print_video_compact(video):
|
||||
date = video.published_at[:10]
|
||||
game = video.game.name if video.game else ""
|
||||
title = truncate(video.title, 80).ljust(80)
|
||||
print_out(f"<b>{video.id}</b> {date} <green>{title}</green> <blue>{game}</blue>")
|
||||
id = video["id"]
|
||||
date = video["publishedAt"][:10]
|
||||
game = video["game"]["name"] if video["game"] else ""
|
||||
title = truncate(video["title"], 80).ljust(80)
|
||||
print_out(f'<b>{id}</b> {date} <green>{title}</green> <blue>{game}</blue>')
|
||||
|
||||
|
||||
def print_paged_videos(generator, page_size, total_count):
|
||||
@ -133,23 +133,23 @@ def print_paged_videos(generator, page_size, total_count):
|
||||
break
|
||||
|
||||
|
||||
def print_clip(clip: Clip):
|
||||
published_at = clip.created_at.replace("T", " @ ").replace("Z", "")
|
||||
length = utils.format_time(clip.duration_seconds)
|
||||
channel = clip.broadcaster.display_name
|
||||
def print_clip(clip):
|
||||
published_at = clip["createdAt"].replace("T", " @ ").replace("Z", "")
|
||||
length = utils.format_duration(clip["durationSeconds"])
|
||||
channel = clip["broadcaster"]["displayName"]
|
||||
playing = (
|
||||
"playing <blue>{}</blue>".format(clip.game.name)
|
||||
if clip.game else ""
|
||||
"playing <blue>{}</blue>".format(clip["game"]["name"])
|
||||
if clip["game"] else ""
|
||||
)
|
||||
|
||||
print_out("Clip <b>{}</b>".format(clip.slug))
|
||||
print_out("<green>{}</green>".format(clip.title))
|
||||
print_out("Clip <b>{}</b>".format(clip["slug"]))
|
||||
print_out("<green>{}</green>".format(clip["title"]))
|
||||
print_out("<blue>{}</blue> {}".format(channel, playing))
|
||||
print_out(
|
||||
f"Published: <blue>{published_at}</blue>"
|
||||
f" Length: <blue>{length}</blue>"
|
||||
f" Views: <blue>{clip.view_count}</blue>")
|
||||
print_out(f"<i>{clip.url}</i>")
|
||||
"Published <blue>{}</blue>"
|
||||
" Length: <blue>{}</blue>"
|
||||
" Views: <blue>{}</blue>".format(published_at, length, clip["viewCount"]))
|
||||
print_out("<i>{}</i>".format(clip["url"]))
|
||||
|
||||
|
||||
def _continue():
|
||||
|
@ -4,10 +4,9 @@ Twitch API access.
|
||||
|
||||
import httpx
|
||||
|
||||
from typing import Dict
|
||||
from twitchdl import CLIENT_ID
|
||||
from twitchdl.exceptions import ConsoleError
|
||||
from twitchdl.models import Clip, ClipsPage, ClipGenerator, Game, Video, VideoGenerator, VideosPage
|
||||
from typing import Dict, Optional, Tuple
|
||||
|
||||
|
||||
class GQLError(Exception):
|
||||
@ -49,29 +48,23 @@ def gql_query(query: str, headers: Dict[str, str] = {}):
|
||||
return response
|
||||
|
||||
|
||||
GAME_FIELDS = """
|
||||
id
|
||||
name
|
||||
description
|
||||
"""
|
||||
|
||||
VIDEO_FIELDS = f"""
|
||||
VIDEO_FIELDS = """
|
||||
id
|
||||
title
|
||||
publishedAt
|
||||
broadcastType
|
||||
lengthSeconds
|
||||
game {{
|
||||
{GAME_FIELDS}
|
||||
}}
|
||||
creator {{
|
||||
game {
|
||||
name
|
||||
}
|
||||
creator {
|
||||
login
|
||||
displayName
|
||||
}}
|
||||
}
|
||||
"""
|
||||
|
||||
|
||||
CLIP_FIELDS = f"""
|
||||
CLIP_FIELDS = """
|
||||
id
|
||||
slug
|
||||
title
|
||||
@ -79,22 +72,23 @@ CLIP_FIELDS = f"""
|
||||
viewCount
|
||||
durationSeconds
|
||||
url
|
||||
videoQualities {{
|
||||
videoQualities {
|
||||
frameRate
|
||||
quality
|
||||
sourceURL
|
||||
}}
|
||||
game {{
|
||||
{GAME_FIELDS}
|
||||
}}
|
||||
broadcaster {{
|
||||
login
|
||||
}
|
||||
game {
|
||||
id
|
||||
name
|
||||
}
|
||||
broadcaster {
|
||||
displayName
|
||||
}}
|
||||
login
|
||||
}
|
||||
"""
|
||||
|
||||
|
||||
def get_video(video_id: str) -> Optional[Video]:
|
||||
def get_video(video_id):
|
||||
query = """
|
||||
{{
|
||||
video(id: "{video_id}") {{
|
||||
@ -106,11 +100,10 @@ def get_video(video_id: str) -> Optional[Video]:
|
||||
query = query.format(video_id=video_id, fields=VIDEO_FIELDS)
|
||||
|
||||
response = gql_query(query)
|
||||
if response["data"]["video"]:
|
||||
return Video.from_json(response["data"]["video"])
|
||||
return response["data"]["video"]
|
||||
|
||||
|
||||
def get_clip(slug: str) -> Clip:
|
||||
def get_clip(slug):
|
||||
query = """
|
||||
{{
|
||||
clip(slug: "{}") {{
|
||||
@ -120,7 +113,7 @@ def get_clip(slug: str) -> Clip:
|
||||
"""
|
||||
|
||||
response = gql_query(query.format(slug, fields=CLIP_FIELDS))
|
||||
return Clip.from_json(response["data"]["clip"])
|
||||
return response["data"]["clip"]
|
||||
|
||||
|
||||
def get_clip_access_token(slug):
|
||||
@ -143,7 +136,7 @@ def get_clip_access_token(slug):
|
||||
return response["data"]["clip"]
|
||||
|
||||
|
||||
def get_channel_clips(channel_id, period, limit, after=None) -> ClipsPage:
|
||||
def get_channel_clips(channel_id, period, limit, after=None):
|
||||
"""
|
||||
List channel clips.
|
||||
|
||||
@ -185,47 +178,50 @@ def get_channel_clips(channel_id, period, limit, after=None) -> ClipsPage:
|
||||
if not user:
|
||||
raise ConsoleError("Channel {} not found".format(channel_id))
|
||||
|
||||
return ClipsPage.from_json(response["data"]["user"]["clips"])
|
||||
return response["data"]["user"]["clips"]
|
||||
|
||||
|
||||
def channel_clips_generator(channel_id: str, period, limit: int) -> ClipGenerator:
|
||||
def _generator(page: ClipsPage, limit: int):
|
||||
for clip in page.clips:
|
||||
def channel_clips_generator(channel_id, period, limit):
|
||||
def _generator(clips, limit):
|
||||
for clip in clips["edges"]:
|
||||
if limit < 1:
|
||||
return
|
||||
yield clip
|
||||
yield clip["node"]
|
||||
limit -= 1
|
||||
|
||||
if limit < 1 or not page.has_next_page:
|
||||
has_next = clips["pageInfo"]["hasNextPage"]
|
||||
if limit < 1 or not has_next:
|
||||
return
|
||||
|
||||
req_limit = min(limit, 100)
|
||||
next_page = get_channel_clips(channel_id, period, req_limit, page.cursor)
|
||||
yield from _generator(next_page, limit)
|
||||
cursor = clips["edges"][-1]["cursor"]
|
||||
clips = get_channel_clips(channel_id, period, req_limit, cursor)
|
||||
yield from _generator(clips, limit)
|
||||
|
||||
req_limit = min(limit, 100)
|
||||
page = get_channel_clips(channel_id, period, req_limit)
|
||||
return _generator(page, limit)
|
||||
clips = get_channel_clips(channel_id, period, req_limit)
|
||||
return _generator(clips, limit)
|
||||
|
||||
|
||||
def channel_clips_generator_old(channel_id, period, limit):
|
||||
cursor = ""
|
||||
while True:
|
||||
page = get_channel_clips(channel_id, period, limit, after=cursor)
|
||||
clips = get_channel_clips(
|
||||
channel_id, period, limit, after=cursor)
|
||||
|
||||
if not page.clips:
|
||||
if not clips["edges"]:
|
||||
break
|
||||
|
||||
has_next = page.has_next_page
|
||||
cursor = page.cursor if has_next else None
|
||||
has_next = clips["pageInfo"]["hasNextPage"]
|
||||
cursor = clips["edges"][-1]["cursor"] if has_next else None
|
||||
|
||||
yield page.clips, has_next
|
||||
yield clips, has_next
|
||||
|
||||
if not cursor:
|
||||
break
|
||||
|
||||
|
||||
def get_channel_videos(channel_id, limit, sort, type="archive", game_ids=[], after=None) -> VideosPage:
|
||||
def get_channel_videos(channel_id, limit, sort, type="archive", game_ids=[], after=None):
|
||||
query = """
|
||||
{{
|
||||
user(login: "{channel_id}") {{
|
||||
@ -268,27 +264,29 @@ def get_channel_videos(channel_id, limit, sort, type="archive", game_ids=[], aft
|
||||
if not response["data"]["user"]:
|
||||
raise ConsoleError("Channel {} not found".format(channel_id))
|
||||
|
||||
return VideosPage.from_json(response["data"]["user"]["videos"])
|
||||
return response["data"]["user"]["videos"]
|
||||
|
||||
|
||||
def channel_videos_generator(channel_id, max_videos, sort, type, game_ids=None) -> Tuple[int, VideoGenerator]:
|
||||
def _generator(page, max_videos):
|
||||
for video in page.videos:
|
||||
def channel_videos_generator(channel_id, max_videos, sort, type, game_ids=None):
|
||||
def _generator(videos, max_videos):
|
||||
for video in videos["edges"]:
|
||||
if max_videos < 1:
|
||||
return
|
||||
yield video
|
||||
yield video["node"]
|
||||
max_videos -= 1
|
||||
|
||||
if max_videos < 1 or not page.has_next_page:
|
||||
has_next = videos["pageInfo"]["hasNextPage"]
|
||||
if max_videos < 1 or not has_next:
|
||||
return
|
||||
|
||||
limit = min(max_videos, 100)
|
||||
videos = get_channel_videos(channel_id, limit, sort, type, game_ids, page.cursor)
|
||||
cursor = videos["edges"][-1]["cursor"]
|
||||
videos = get_channel_videos(channel_id, limit, sort, type, game_ids, cursor)
|
||||
yield from _generator(videos, max_videos)
|
||||
|
||||
limit = min(max_videos, 100)
|
||||
page = get_channel_videos(channel_id, limit, sort, type, game_ids)
|
||||
return page.total_count, _generator(page, max_videos)
|
||||
videos = get_channel_videos(channel_id, limit, sort, type, game_ids)
|
||||
return videos["totalCount"], _generator(videos, max_videos)
|
||||
|
||||
|
||||
def get_access_token(video_id, auth_token=None):
|
||||
@ -349,15 +347,16 @@ def get_playlists(video_id, access_token):
|
||||
return response.content.decode('utf-8')
|
||||
|
||||
|
||||
def find_game(name: str) -> Optional[Game]:
|
||||
query = f"""
|
||||
def get_game_id(name):
|
||||
query = """
|
||||
{{
|
||||
game(name: "{name.strip()}") {{
|
||||
{GAME_FIELDS}
|
||||
game(name: "{}") {{
|
||||
id
|
||||
}}
|
||||
}}
|
||||
"""
|
||||
|
||||
response = gql_query(query)
|
||||
if response["data"]["game"]:
|
||||
return Game.from_json(response["data"]["game"])
|
||||
response = gql_query(query.format(name.strip()))
|
||||
game = response["data"]["game"]
|
||||
if game:
|
||||
return game["id"]
|
||||
|
@ -24,7 +24,7 @@ def format_size(bytes_, digits=1):
|
||||
return _format_size(mega / 1024, digits, "GB")
|
||||
|
||||
|
||||
def format_duration(total_seconds: int) -> str:
|
||||
def format_duration(total_seconds):
|
||||
total_seconds = int(total_seconds)
|
||||
hours = total_seconds // 3600
|
||||
remainder = total_seconds % 3600
|
||||
@ -40,7 +40,7 @@ def format_duration(total_seconds: int) -> str:
|
||||
return "{} sec".format(seconds)
|
||||
|
||||
|
||||
def format_time(total_seconds: int) -> str:
|
||||
def format_time(total_seconds):
|
||||
total_seconds = int(total_seconds)
|
||||
hours = total_seconds // 3600
|
||||
remainder = total_seconds % 3600
|
||||
@ -67,14 +67,14 @@ def read_int(msg, min, max, default):
|
||||
pass
|
||||
|
||||
|
||||
def slugify(value: str) -> str:
|
||||
def slugify(value):
|
||||
value = unicodedata.normalize('NFKC', str(value))
|
||||
value = re.sub(r'[^\w\s_-]', '', value)
|
||||
value = re.sub(r'[\s_-]+', '_', value)
|
||||
return value.strip("_").lower()
|
||||
|
||||
|
||||
def titlify(value: str) -> str:
|
||||
def titlify(value):
|
||||
value = unicodedata.normalize('NFKC', str(value))
|
||||
value = re.sub(r'[^\w\s\[\]().-]', '', value)
|
||||
value = re.sub(r'\s+', ' ', value)
|
||||
@ -93,7 +93,7 @@ CLIP_PATTERNS = [
|
||||
]
|
||||
|
||||
|
||||
def parse_video_identifier(identifier: str) -> str:
|
||||
def parse_video_identifier(identifier):
|
||||
"""Given a video ID or URL returns the video ID, or null if not matched"""
|
||||
for pattern in VIDEO_PATTERNS:
|
||||
match = re.match(pattern, identifier)
|
||||
@ -101,7 +101,7 @@ def parse_video_identifier(identifier: str) -> str:
|
||||
return match.group("id")
|
||||
|
||||
|
||||
def parse_clip_identifier(identifier: str) -> str:
|
||||
def parse_clip_identifier(identifier):
|
||||
"""Given a clip slug or URL returns the clip slug, or null if not matched"""
|
||||
for pattern in CLIP_PATTERNS:
|
||||
match = re.match(pattern, identifier)
|
||||
|
Reference in New Issue
Block a user