Compare commits


49 Commits
2.2.3 ... chat

Author SHA1 Message Date
8a10b2bba5 wip 2024-08-30 17:44:45 +02:00
ec985efcd1 Add chat command 2024-08-30 12:21:01 +02:00
dc99ee51bc Improve logging a bit when downloading 2024-08-30 11:58:15 +02:00
2c9420c43d Update changelog 2024-08-30 11:47:14 +02:00
4a86cb16c8 Use relative paths in generated m3u8 playlist
Since we're using relative paths for init segments, and paths are
relative to the path where the playlist is located, this seems sensible.
It allows the folder to be moved, and the playlist will still work.
2024-08-30 11:44:07 +02:00
cfefae1e69 Fix query to fetch HD qualities 2024-08-30 11:43:48 +02:00
ac7cdba28e Download init segments
These seem to occur in hd quality playlists like 1440p.
2024-08-30 11:42:54 +02:00
2feef136ca Ignore video files 2024-08-30 11:42:21 +02:00
d85aeade86 Update changelog 2024-08-30 08:25:52 +02:00
4db4285d9a Handle videoQualities being null 2024-08-30 08:06:31 +02:00
707b2da934 Improve types 2024-08-30 08:05:16 +02:00
2846925c91 Allow newer versions of m3u8 2024-08-30 08:05:03 +02:00
8f5d7f022f Reinstate "VODs" in progress output 2024-08-30 07:59:51 +02:00
ff4a514b8a Handle video URLs containing account name
fixes #162
2024-08-30 07:55:58 +02:00
f57612ffcc Refactor download_all to work when count is unknown 2024-08-30 07:37:50 +02:00
5be97b9e13 Fix tests 2024-08-28 15:21:22 +02:00
b6e7f8b36c Make Progress work when file count is not known 2024-08-28 15:18:43 +02:00
42f7a9a1a5 Fix a crash when downloading clips 2024-08-28 14:21:22 +02:00
789d3d1939 Add --target-dir option to clips command 2024-08-28 13:15:01 +02:00
07efac1ae7 Add --verbose flag for verbose logging
Don't log HTTP payloads unless --verbose flag is given. Always logging
HTTP payloads tends to make the log unreadable.
2024-08-28 12:58:36 +02:00
a808b7d8ec Don't check if file exists in download_file
This is done outside the function.
2024-08-28 12:39:56 +02:00
936c6a9da1 Don't stop downloading if one download fails 2024-08-28 12:33:05 +02:00
7184feacee Move download_file to http 2024-08-28 11:07:25 +02:00
5679e66270 Improve naming 2024-08-28 11:02:12 +02:00
1658aba124 Embrace pathlib 2024-08-28 10:59:23 +02:00
29d3f7fec2 Extract file naming 2024-08-28 10:32:40 +02:00
91e0447681 Remove unnecessary braces 2024-08-28 09:53:06 +02:00
f9e60082ba Improve downloading logic
Follow redirects to avoid saving an intermediate response, and raise an
error on invalid status to avoid saving an error response.
2024-08-28 09:52:15 +02:00
141ecb7f29 Move entities to entites.py 2024-08-24 20:23:34 +02:00
52b96aab15 Simplify persisted queries 2024-08-24 20:08:01 +02:00
1bdf6a2c02 Remove unused function 2024-08-24 20:05:32 +02:00
38636e2b21 Use ansi escape code to clear line 2024-05-31 12:57:51 +02:00
3fe1faa18e Update changelog 2024-05-19 09:11:45 +02:00
a99a472ad3 add quotes to gql query 2024-05-19 09:09:02 +02:00
47d62bc471 Improve types 2024-04-29 09:08:19 +02:00
de95384e6b Fix tests 2024-04-28 10:16:33 +02:00
aac450a5bc Calculate progress only when printing progress 2024-04-28 10:13:47 +02:00
9549679679 Make Progress not a dataclass 2024-04-28 10:09:53 +02:00
35e974bb45 Upgrade m3u8 dependency 2024-04-28 09:30:24 +02:00
b8e3809810 Print a note if no ids given 2024-04-28 08:02:01 +02:00
cf580fde09 Update changelog 2024-04-27 20:23:35 +02:00
68c9e644a8 Sort playlists 2024-04-27 20:22:53 +02:00
ace4427caa Print playlists in a table 2024-04-27 20:22:53 +02:00
97f48f7108 Improve playlist parsing
Better support for "enhanced broadcast" streams

issue #154
2024-04-27 20:22:53 +02:00
f9e553c61f Support styling in print_table 2024-04-27 20:22:53 +02:00
4fac6c11c5 Delete generated website when cleaning 2024-04-27 20:22:52 +02:00
125bc693f8 Update changelog 2024-04-25 07:33:05 +02:00
8a7fdad22f Test m patterns 2024-04-25 07:31:52 +02:00
c00a9c3597 Add support for m dot urls 2024-04-25 07:29:59 +02:00
24 changed files with 1224 additions and 337 deletions

.gitignore (2 lines added)
View File

@ -15,3 +15,5 @@ tmp/
/*.pyz
/pyrightconfig.json
/book
*.mp4
*.mkv

View File

@ -3,6 +3,32 @@ twitch-dl changelog
<!-- Do not edit. This file is automatically generated from changelog.yaml.-->
### [2.5.0 (2024-08-30)](https://github.com/ihabunek/twitch-dl/releases/tag/2.5.0)
* Add support for HD video qualities (#163)
### [2.4.0 (2024-08-30)](https://github.com/ihabunek/twitch-dl/releases/tag/2.4.0)
* Add `clips --target-dir` option. Use in conjunction with `--download` to
specify target directory.
* Fix a crash when downloading clips (#160)
* Handle video URLs which contain the channel name (#162)
* Don't stop downloading clips if one download fails
### [2.3.1 (2024-05-19)](https://github.com/ihabunek/twitch-dl/releases/tag/2.3.1)
* Fix fetching access token (#155, thanks @KryptonicDragon)
### [2.3.0 (2024-04-27)](https://github.com/ihabunek/twitch-dl/releases/tag/2.3.0)
* Show more playlist data when choosing quality
* Improve detection of 'source' quality for Twitch Enhanced Broadcast Streams
(#154)
### [2.2.4 (2024-04-25)](https://github.com/ihabunek/twitch-dl/releases/tag/2.2.4)
* Add m dot url support to video and clip regexes (thanks @localnerve)
### [2.2.3 (2024-04-24)](https://github.com/ihabunek/twitch-dl/releases/tag/2.2.3)
* Respect --dry-run option when downloading videos

View File

@ -7,7 +7,7 @@ dist:
clean :
find . -name "*pyc" | xargs rm -rf $1
rm -rf build dist bundle MANIFEST htmlcov deb_dist twitch-dl.*.pyz twitch-dl.1.man twitch_dl.egg-info
rm -rf build dist book bundle MANIFEST htmlcov deb_dist twitch-dl.*.pyz twitch-dl.1.man twitch_dl.egg-info
bundle:
mkdir bundle

View File

@ -1,3 +1,32 @@
2.5.0:
date: 2024-08-30
changes:
- "Add support for HD video qualities (#163)"
2.4.0:
date: 2024-08-30
changes:
- "Add `clips --target-dir` option. Use in conjunction with `--download` to specify target directory."
- "Fix a crash when downloading clips (#160)"
- "Handle video URLs which contain the channel name (#162)"
- "Don't stop downloading clips if one download fails"
2.3.1:
date: 2024-05-19
changes:
- "Fix fetching access token (#155, thanks @KryptonicDragon)"
2.3.0:
date: 2024-04-27
changes:
- "Show more playlist data when choosing quality"
- "Improve detection of 'source' quality for Twitch Enhanced Broadcast Streams (#154)"
2.2.4:
date: 2024-04-25
changes:
- "Add m dot url support to video and clip regexes (thanks @localnerve)"
2.2.3:
date: 2024-04-24
changes:

View File

@ -3,6 +3,32 @@ twitch-dl changelog
<!-- Do not edit. This file is automatically generated from changelog.yaml.-->
### [2.5.0 (2024-08-30)](https://github.com/ihabunek/twitch-dl/releases/tag/2.5.0)
* Add support for HD video qualities (#163)
### [2.4.0 (2024-08-30)](https://github.com/ihabunek/twitch-dl/releases/tag/2.4.0)
* Add `clips --target-dir` option. Use in conjunction with `--download` to
specify target directory.
* Fix a crash when downloading clips (#160)
* Handle video URLs which contain the channel name (#162)
* Don't stop downloading clips if one download fails
### [2.3.1 (2024-05-19)](https://github.com/ihabunek/twitch-dl/releases/tag/2.3.1)
* Fix fetching access token (#155, thanks @KryptonicDragon)
### [2.3.0 (2024-04-27)](https://github.com/ihabunek/twitch-dl/releases/tag/2.3.0)
* Show more playlist data when choosing quality
* Improve detection of 'source' quality for Twitch Enhanced Broadcast Streams
(#154)
### [2.2.4 (2024-04-25)](https://github.com/ihabunek/twitch-dl/releases/tag/2.2.4)
* Add m dot url support to video and clip regexes (thanks @localnerve)
### [2.2.3 (2024-04-24)](https://github.com/ihabunek/twitch-dl/releases/tag/2.2.3)
* Respect --dry-run option when downloading videos

View File

@ -43,6 +43,11 @@ twitch-dl clips [OPTIONS] CHANNEL_NAME
<td>Period from which to return clips Possible values: <code>last_day</code>, <code>last_week</code>, <code>last_month</code>, <code>all_time</code>. [default: <code>all_time</code>]</td>
</tr>
<tr>
<td class="code">-t, --target-dir</td>
<td>Target directory when downloading clips [default: <code>.</code>]</td>
</tr>
<tr>
<td class="code">--json</td>
<td>Print data as JSON rather than human readable text</td>

View File

@ -22,7 +22,7 @@ classifiers = [
dependencies = [
"click>=8.0.0,<9.0.0",
"httpx>=0.17.0,<1.0.0",
"m3u8>=1.0.0,<4.0.0",
"m3u8>=3.0.0,<7.0.0",
]
[tool.setuptools]
@ -34,6 +34,13 @@ packages = [
[tool.setuptools_scm]
[project.optional-dependencies]
# This is made optional because it is not pure Python and, when used, prevents
# distributing twitch-dl as a cross-platform pyz archive.
chat = [
"pillow>=9",
"fonttools>=4,<5",
]
dev = [
"build",
"pytest",

View File

@ -1,35 +1,39 @@
import pytest
from twitchdl.utils import parse_video_identifier, parse_clip_identifier
from twitchdl.utils import parse_clip_identifier, parse_video_identifier
TEST_VIDEO_PATTERNS = [
("702689313", "702689313"),
("702689313", "https://twitch.tv/videos/702689313"),
("702689313", "https://www.twitch.tv/videos/702689313"),
("702689313", "https://m.twitch.tv/videos/702689313"),
("2223719525", "https://www.twitch.tv/r0dn3y/video/2223719525"),
]
TEST_CLIP_PATTERNS = {
("AbrasivePlayfulMangoMau5", "AbrasivePlayfulMangoMau5"),
("AbrasivePlayfulMangoMau5", "https://clips.twitch.tv/AbrasivePlayfulMangoMau5"),
("AbrasivePlayfulMangoMau5", "https://www.twitch.tv/dracul1nx/clip/AbrasivePlayfulMangoMau5"),
("AbrasivePlayfulMangoMau5", "https://m.twitch.tv/dracul1nx/clip/AbrasivePlayfulMangoMau5"),
("AbrasivePlayfulMangoMau5", "https://twitch.tv/dracul1nx/clip/AbrasivePlayfulMangoMau5"),
("HungryProudRadicchioDoggo", "HungryProudRadicchioDoggo"),
("HungryProudRadicchioDoggo", "https://clips.twitch.tv/HungryProudRadicchioDoggo"),
("HungryProudRadicchioDoggo", "https://www.twitch.tv/bananasaurus_rex/clip/HungryProudRadicchioDoggo?filter=clips&range=7d&sort=time"),
("HungryProudRadicchioDoggo", "https://m.twitch.tv/bananasaurus_rex/clip/HungryProudRadicchioDoggo?filter=clips&range=7d&sort=time"),
("HungryProudRadicchioDoggo", "https://twitch.tv/bananasaurus_rex/clip/HungryProudRadicchioDoggo?filter=clips&range=7d&sort=time"),
("GloriousColdbloodedTortoiseRuleFive-E017utJ4DZmHVpfQ", "GloriousColdbloodedTortoiseRuleFive-E017utJ4DZmHVpfQ"),
("GloriousColdbloodedTortoiseRuleFive-E017utJ4DZmHVpfQ", "https://twitch.tv/dracul1nx/clip/GloriousColdbloodedTortoiseRuleFive-E017utJ4DZmHVpfQ"),
("GloriousColdbloodedTortoiseRuleFive-E017utJ4DZmHVpfQ", "https://twitch.tv/dracul1nx/clip/GloriousColdbloodedTortoiseRuleFive-E017utJ4DZmHVpfQ?filter=clips&range=7d&sort=time"),
("GloriousColdbloodedTortoiseRuleFive-E017utJ4DZmHVpfQ", "https://www.twitch.tv/dracul1nx/clip/GloriousColdbloodedTortoiseRuleFive-E017utJ4DZmHVpfQ?filter=clips&range=7d&sort=time"),
("GloriousColdbloodedTortoiseRuleFive-E017utJ4DZmHVpfQ", "https://m.twitch.tv/dracul1nx/clip/GloriousColdbloodedTortoiseRuleFive-E017utJ4DZmHVpfQ?filter=clips&range=7d&sort=time"),
}
@pytest.mark.parametrize("expected,input", TEST_VIDEO_PATTERNS)
def test_video_patterns(expected, input):
def test_video_patterns(expected: str, input: str):
assert parse_video_identifier(input) == expected
@pytest.mark.parametrize("expected,input", TEST_CLIP_PATTERNS)
def test_clip_patterns(expected, input):
def test_clip_patterns(expected: str, input: str):
assert parse_clip_identifier(input) == expected

View File

@ -8,8 +8,8 @@ def test_initial_values():
assert progress.progress_perc == 0
assert progress.remaining_time is None
assert progress.speed is None
assert progress.vod_count == 10
assert progress.vod_downloaded_count == 0
assert progress.file_count == 10
assert progress.downloaded_count == 0
def test_downloaded():
@ -23,26 +23,31 @@ def test_downloaded():
assert progress.progress_perc == 0
progress.advance(1, 100)
progress._recalculate()
assert progress.downloaded == 100
assert progress.progress_bytes == 100
assert progress.progress_perc == 11
progress.advance(2, 200)
progress._recalculate()
assert progress.downloaded == 300
assert progress.progress_bytes == 300
assert progress.progress_perc == 33
progress.advance(3, 150)
progress._recalculate()
assert progress.downloaded == 450
assert progress.progress_bytes == 450
assert progress.progress_perc == 50
progress.advance(1, 50)
progress._recalculate()
assert progress.downloaded == 500
assert progress.progress_bytes == 500
assert progress.progress_perc == 55
progress.abort(2)
progress._recalculate()
assert progress.downloaded == 500
assert progress.progress_bytes == 300
assert progress.progress_perc == 33
@ -52,6 +57,7 @@ def test_downloaded():
progress.advance(1, 150)
progress.advance(2, 300)
progress.advance(3, 150)
progress._recalculate()
assert progress.downloaded == 1100
assert progress.progress_bytes == 900
@ -71,12 +77,15 @@ def test_estimated_total():
assert progress.estimated_total is None
progress.start(1, 12000)
progress._recalculate()
assert progress.estimated_total == 12000 * 3
progress.start(2, 11000)
progress._recalculate()
assert progress.estimated_total == 11500 * 3
progress.start(3, 10000)
progress._recalculate()
assert progress.estimated_total == 11000 * 3
@ -87,16 +96,16 @@ def test_vod_downloaded_count():
progress.start(2, 100)
progress.start(3, 100)
assert progress.vod_downloaded_count == 0
assert progress.downloaded_count == 0
progress.advance(1, 100)
progress.end(1)
assert progress.vod_downloaded_count == 1
assert progress.downloaded_count == 1
progress.advance(2, 100)
progress.end(2)
assert progress.vod_downloaded_count == 2
assert progress.downloaded_count == 2
progress.advance(3, 100)
progress.end(3)
assert progress.vod_downloaded_count == 3
assert progress.downloaded_count == 3

twitchdl/cache.py (new file, 74 lines)
View File

@ -0,0 +1,74 @@
import hashlib
import logging
import os
import sys
from pathlib import Path
from typing import Optional
from twitchdl.http import download_file
CACHE_SUBFOLDER = "twitch-dl"
logger = logging.getLogger(__name__)
def download_cached(
url: str,
*,
filename: Optional[str] = None,
subfolder: Optional[str] = None,
) -> Optional[Path]:
cache_dir = get_cache_dir()
target_dir = cache_dir / subfolder if subfolder else cache_dir
target_dir.mkdir(parents=True, exist_ok=True)
if not filename:
filename = hashlib.sha256(url.encode()).hexdigest()
target = target_dir / filename
if not target.exists():
download_file(url, target)
return target
def get_text_font() -> Path:
url = "https://cdn.jsdelivr.net/gh/notofonts/notofonts.github.io/fonts/NotoSans/full/ttf/NotoSans-Light.ttf"
path = download_cached(url, subfolder="fonts", filename="NotoSans-Light.ttf")
if not path:
raise ValueError(f"Failed downloading font from {url}")
return path
def get_noto_color_emoji_font() -> Path:
url = "https://github.com/googlefonts/noto-emoji/raw/main/fonts/NotoColorEmoji.ttf"
path = download_cached(url, subfolder="fonts", filename="NotoColorEmoji.ttf")
if not path:
raise ValueError(f"Failed downloading font from {url}")
return path
def get_cache_dir() -> Path:
path = _cache_dir_path()
path.mkdir(parents=True, exist_ok=True)
return path
def _cache_dir_path() -> Path:
"""Returns the path to the cache directory"""
# Windows
if sys.platform == "win32" and "APPDATA" in os.environ:
return Path(os.environ["APPDATA"], CACHE_SUBFOLDER, "cache")
# Mac OS
if sys.platform == "darwin":
return Path.home() / "Library" / "Caches" / CACHE_SUBFOLDER
# Respect XDG_CACHE_HOME env variable if set
# https://specifications.freedesktop.org/basedir-spec/basedir-spec-latest.html
if "XDG_CACHE_HOME" in os.environ:
return Path(os.environ["XDG_CACHE_HOME"], CACHE_SUBFOLDER)
return Path.home() / ".cache" / CACHE_SUBFOLDER
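
The new cache module is what the chat renderer uses to fetch fonts, badges and emotes. A minimal sketch of how download_cached is meant to be called (the URL and subfolder below are illustrative; download_badge and download_emote in twitchdl/chat.py follow the same pattern):

from twitchdl import cache

# Hypothetical asset URL; the real callers pass Twitch CDN URLs.
url = "https://static-cdn.jtvnw.net/badges/v1/example/1"
path = cache.download_cached(url, subfolder="badges")
# With no explicit filename the file is stored under a sha256 of the URL,
# e.g. ~/.cache/twitch-dl/badges/<sha256(url)> on Linux.
print(path)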

twitchdl/chat.py (new file, 391 lines)
View File

@ -0,0 +1,391 @@
"""
Generate a video containing the twitch chat.
TODO:
- support clips
- use fonttools to find which characters are supported by a font
"""
from __future__ import annotations
import math
import re
import shutil
import subprocess
import time
from itertools import groupby
from pathlib import Path
from typing import Dict, Generator, List, Optional, Tuple
import click
from PIL import Image, ImageDraw, ImageFont
from twitchdl import cache
from twitchdl.entities import Badge, Comment, Emote
from twitchdl.exceptions import ConsoleError
from twitchdl.output import clear_line, cursor_previous_line, green, print_log
from twitchdl.twitch import get_comments, get_video, get_video_comments
from twitchdl.utils import format_time, iterate_with_next, parse_video_identifier
emoji_pattern = re.compile(
r"["
r"\U0001F600-\U0001F64F" # Emoticons
r"\U0001F300-\U0001F5FF" # Symbols & Pictographs
r"\U0001F680-\U0001F6FF" # Transport & Map Symbols
r"\U0001F700-\U0001F77F" # Alchemical Symbols
r"\U0001F780-\U0001F7FF" # Geometric Shapes Extended
r"\U0001F800-\U0001F8FF" # Supplemental Arrows-C
r"\U0001F900-\U0001F9FF" # Supplemental Symbols and Pictographs
r"\U0001FA00-\U0001FA6F" # Chess Symbols
r"\U0001FA70-\U0001FAFF" # Symbols and Pictographs Extended-A
r"\U00002702-\U000027B0" # Dingbats
r"\U0001F1E6-\U0001F1FF" # Flags (iOS)
r"\U00002500-\U00002BEF" # Various Symbols
r"\U0001F900-\U0001F9FF" # Additional Emoji in Unicode 10.0
r"\U0001F1F2-\U0001F1F4" # Enclosed characters
r"\U0001F1E6-\U0001F1FF" # Regional indicator symbols
r"\U0001F004" # Mahjong Tile Red Dragon
r"\U0001F0CF" # Playing Card Black Joker
r"\U0001F18E" # Negative Squared AB
r"\U0001F191-\U0001F251" # Squared CJK Unified Ideographs
r"\U00002600-\U000026FF" # Miscellaneous Symbols
r"]+",
flags=re.UNICODE,
)
def render_chat(id: str, width: int, height: int, font_size: int, dark: bool):
foreground = "#ffffff" if dark else "#000000"
background = "#000000" if dark else "#ffffff"
screen = Screen(width, height, font_size, foreground, background)
frames: List[Tuple[Path, int]] = []
video_id = parse_video_identifier(id)
if not video_id:
raise ConsoleError("Invalid video ID")
print_log("Looking up video...")
video = get_video(video_id)
if not video:
raise ConsoleError(f"Video {video_id} not found")
total_duration = video["lengthSeconds"]
video_comments = get_video_comments(video_id)
badges_by_id = {badge["id"]: badge for badge in video_comments["badges"]}
cache_dir = cache.get_cache_dir() / "chat" / video_id
cache_dir.mkdir(parents=True, exist_ok=True)
first = True
start = time.monotonic()
for index, offset, duration, comments in group_comments(video_id, total_duration):
if first:
# Save the initial empty frame
frame_path = cache_dir / f"chat_{0:05d}.bmp"
screen.image.save(frame_path)
frames.append((frame_path, offset))
first = False
for comment in comments:
draw_comment(screen, comment, dark, badges_by_id)
screen.next_line()
frame_path = cache_dir / f"chat_{offset:05d}.bmp"
screen.image.save(frame_path)
frames.append((frame_path, duration))
_print_progress(index, offset, start, total_duration)
spec_path = cache_dir / "concat.txt"
with open(spec_path, "w") as f:
for path, duration in frames:
f.write(f"file '{path.resolve()}'\n")
f.write(f"duration {duration}\n")
# TODO
output_path = Path(f"chat_{video_id}.mp4")
print_status("Generating chat video...", dim=True)
generate_video(spec_path, output_path)
print_status("Deleting cache...", dim=True)
shutil.rmtree(cache_dir)
def _print_progress(index: int, offset: int, start: float, total_duration: int):
perc = 100 * offset / total_duration
duration = time.monotonic() - start
print_status(
f"Rendering chat frame {index} at {index / duration:.1f}fps, "
+ f"{format_time(offset)}/{format_time(total_duration)} ({perc:.0f}%)",
transient=True,
)
def add_frame_to_spec(concat_spec: str, frame_path: Path, duration: int) -> str:
concat_spec += f"file '{frame_path.resolve()}'\n"
concat_spec += f"duration {duration}\n"
return concat_spec
def draw_comment(screen: Screen, comment: Comment, dark: bool, badges_by_id: Dict[str, Badge]):
time = format_time(comment["contentOffsetSeconds"])
screen.draw_text(time + " ", "gray")
for message_badge in comment["message"]["userBadges"]:
# Skip 'empty' badges
if message_badge["id"] == "Ozs=":
continue
badge = badges_by_id.get(message_badge["id"])
if not badge:
print_status(f"Badge not found: {message_badge}")
continue
badge_path = download_badge(badge)
if not badge_path:
print_status(f"Failed downloading badge {message_badge}")
continue
badge_image = Image.open(badge_path)
screen.draw_image(badge_image)
if comment["message"]["userBadges"]:
screen.draw_text(" ")
user = comment["commenter"]["displayName"] if comment["commenter"] else "UNKNOWN"
user_color = comment["message"]["userColor"]
screen.draw_text(user, user_color)
screen.draw_text(": ")
for fragment in comment["message"]["fragments"]:
if fragment["emote"]:
emote_path = download_emote(fragment["emote"], dark)
if emote_path:
emote_image = Image.open(emote_path)
screen.draw_image(emote_image)
else:
print_status(f"Failed downloading emote {fragment['emote']}")
screen.draw_text(" " + fragment["text"])
else:
text_blocks = emoji_pattern.split(fragment["text"])
emoji_blocks = emoji_pattern.findall(fragment["text"])
for block_index, text in enumerate(text_blocks):
for word in re.split(r"\s", text):
if word:
screen.draw_text(" " + word)
if len(emoji_blocks) > block_index:
emoji_block = emoji_blocks[block_index]
for emoji in emoji_block:
screen.draw_emoji(emoji)
class Screen:
def __init__(self, width: int, height: int, font_size: int, foreground: str, background: str):
self.foreground = foreground
self.background = background
self.x: int = 0
self.y: int = 0
self.text_font = ImageFont.truetype(cache.get_text_font(), font_size)
self.emoji_font = ImageFont.truetype(cache.get_noto_color_emoji_font(), 109)
ascent, descent = self.text_font.getmetrics()
self.ascent = ascent
self.descent = descent
self.line_height = ascent + descent
left, _, right, _ = self.text_font.getbbox(" ")
self.space_size = int(right - left)
self._image = Image.new("RGBA", (width, height), self.background)
self._draw = ImageDraw.Draw(self._image)
self._draw.font = self.text_font
@property
def image(self):
return self._image
@image.setter
def image(self, image: Image.Image):
self._image = image
self._draw = ImageDraw.Draw(self._image)
self._draw.font = self.text_font
@property
def draw(self) -> ImageDraw.ImageDraw:
return self._draw
def draw_text(self, text: str, color: Optional[str] = None):
length = math.ceil(self.draw.textlength(text)) # type: ignore
if self.image.width < self.x + length:
self.next_line()
self.draw.text((self.x, self.y), text, fill=color or self.foreground) # type: ignore
self.x += length
def draw_image(self, image: Image.Image):
if self.image.width < self.x + image.width:
self.next_line()
x = self.x + self.space_size
y = self.y
if image.height < self.line_height:
y += self.line_height - image.height - 2 # baseline align (ish)
if image.mode != self.image.mode:
image = image.convert(self.image.mode)
self.image.alpha_composite(image, (x, y))
self.x += image.width + self.space_size
def draw_emoji(self, emoji: str):
left, top, right, bottom = self.emoji_font.getbbox(emoji)
source_width = int(right - left)
source_height = int(bottom - top)
source_size = (source_width, source_height)
if source_width == 0 or source_height == 0:
print_status(f"Emoji '{emoji}' not renderable in emoji font, falling back to text font")
self.draw_text(emoji)
return
aspect_ratio = source_width / source_height
target_height = self.line_height
target_width = int(target_height * aspect_ratio)
target_size = (target_width, target_height)
if self.image.width < self.x + target_width:
self.next_line()
emoji_image = Image.new("RGBA", source_size)
emoji_draw = ImageDraw.Draw(emoji_image)
emoji_draw.text((0, 0), emoji, font=self.emoji_font, embedded_color=True) # type: ignore
resized = emoji_image.resize(target_size)
self.image.alpha_composite(resized, (self.x + self.space_size, self.y))
self.x += target_width + self.space_size
def next_line(self):
required_height = self.y + self.line_height * 2
if self.image.height < required_height:
self.shift(required_height - self.image.height)
self.x = 0
self.y += self.line_height
def shift(self, dy: int):
cropped_image = self.image.crop((0, dy, self.image.width, self.image.height))
shifted_image = Image.new(self.image.mode, self.image.size, color=self.background)
shifted_image.paste(cropped_image, (0, 0))
self.image = shifted_image
self.y -= dy
def pad(self, px: int, py: int):
width = self.image.width + 2 * px
height = self.image.height + 2 * py
padded_image = Image.new(self.image.mode, (width, height), color=self.background)
padded_image.paste(self.image, (px, py))
return padded_image
def generate_video(spec_path: Path, output: Path):
print_status("Generating chat video...")
command = [
"ffmpeg",
"-f",
"concat",
"-safe",
"0",
"-i",
spec_path,
"-fps_mode",
"vfr",
"-pix_fmt",
"yuv420p",
"-stats",
"-loglevel",
"warning",
output,
"-y",
]
result = subprocess.run(command)
if result.returncode != 0:
raise ConsoleError("Joining files failed")
print_status(f"Saved: {green(output)}")
def shift(image: Image.Image, dy: int, background: str):
cropped_image = image.crop((0, dy, image.width, image.height))
shifted_image = Image.new(image.mode, image.size, color=background)
shifted_image.paste(cropped_image, (0, 0))
return shifted_image
def pad(image: Image.Image, px: int, py: int, background: str):
width = image.width + 2 * px
height = image.height + 2 * py
padded_image = Image.new(image.mode, (width, height), color=background)
padded_image.paste(image, (px, py))
return padded_image
def download_badge(badge: Badge) -> Optional[Path]:
# TODO: make badge size configurable?
url = badge["image1x"]
return cache.download_cached(url, subfolder="badges")
def download_emote(emote: Emote, dark: bool) -> Optional[Path]:
# TODO: make emote size customizable
emote_id = emote["emoteID"]
variant = "dark" if dark else "light"
url = f"https://static-cdn.jtvnw.net/emoticons/v2/{emote_id}/default/{variant}/1.0"
return cache.download_cached(url, subfolder="emotes")
def group_comments(video_id: str, total_duration: int):
g1 = generate_comments(video_id)
g2 = groupby(g1, lambda x: x["contentOffsetSeconds"])
# Delazify the comments list; without this, they are consumed before we get to them
g3 = ((offset, list(comments)) for offset, comments in g2)
g4 = iterate_with_next(g3)
g5 = enumerate(g4)
# We need to go deeper? ^^;
for index, ((offset, comments), next_pair) in g5:
next_offset = next_pair[0] if next_pair else total_duration
duration = next_offset - offset
yield index, offset, duration, comments
def generate_comments(video_id: str) -> Generator[Comment, None, None]:
page = 1
has_next = True
cursor = None
while has_next:
video = get_comments(video_id, cursor=cursor)
for comment in video["comments"]["edges"]:
yield comment["node"]
has_next = video["comments"]["pageInfo"]["hasNextPage"]
cursor = video["comments"]["edges"][-1]["cursor"]
page += 1
_prev_transient = False
def print_status(message: str, transient: bool = False, dim: bool = False):
global _prev_transient
if _prev_transient:
cursor_previous_line()
clear_line()
click.secho(message, err=True, dim=dim)
_prev_transient = transient
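
One non-obvious part of draw_comment above is how it interleaves plain text with emoji: emoji_pattern.split() yields the text blocks and emoji_pattern.findall() yields the emoji runs that sit between them, and the loop walks both lists in lockstep. A rough illustration with a reduced pattern and a made-up message:

import re

# Reduced pattern covering only the Emoticons block; chat.py uses a much larger set of ranges.
pattern = re.compile(r"[\U0001F600-\U0001F64F]+")
text = "gg 😀😀 well played 🙌"

print(pattern.split(text))    # ['gg ', ' well played ', '']
print(pattern.findall(text))  # ['😀😀', '🙌']
# text_blocks[i] is drawn word by word, then emoji_blocks[i] (if any) is drawn
# glyph by glyph with the emoji font.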

View File

@ -2,12 +2,17 @@ import logging
import platform
import re
import sys
from pathlib import Path
from textwrap import dedent
from typing import Optional, Tuple
import click
from twitchdl import __version__
from twitchdl.entities import DownloadOptions
from twitchdl.exceptions import ConsoleError
from twitchdl.naming import DEFAULT_OUTPUT_TEMPLATE
from twitchdl.output import print_log
from twitchdl.twitch import ClipsPeriod, VideosSort, VideosType
# Tweak the Click context
@ -79,11 +84,12 @@ def validate_rate(_ctx: click.Context, _param: click.Parameter, value: str) -> O
@click.group(context_settings=CONTEXT)
@click.option("--debug/--no-debug", default=False, help="Log debug info to stderr")
@click.option("--debug/--no-debug", default=False, help="Enable debug logging to stderr")
@click.option("--verbose/--no-verbose", default=False, help="More verbose debug logging")
@click.option("--color/--no-color", default=sys.stdout.isatty(), help="Use ANSI color in output")
@click.version_option(package_name="twitch-dl")
@click.pass_context
def cli(ctx: click.Context, color: bool, debug: bool):
def cli(ctx: click.Context, color: bool, debug: bool, verbose: bool):
"""twitch-dl - twitch.tv downloader
https://twitch-dl.bezdomni.net/
@ -91,9 +97,10 @@ def cli(ctx: click.Context, color: bool, debug: bool):
ctx.color = color
if debug:
logging.basicConfig(level=logging.DEBUG)
logging.basicConfig(level=logging.DEBUG if verbose else logging.INFO)
logging.getLogger("httpx").setLevel(logging.WARN)
logging.getLogger("httpcore").setLevel(logging.WARN)
logging.getLogger("PIL").setLevel(logging.WARN)
@cli.command()
@ -139,6 +146,18 @@ def cli(ctx: click.Context, color: bool, debug: bool):
default="all_time",
type=click.Choice(["last_day", "last_week", "last_month", "all_time"]),
)
@click.option(
"-t",
"--target-dir",
help="Target directory when downloading clips",
type=click.Path(
file_okay=False,
readable=False,
writable=True,
path_type=Path,
),
default=Path(),
)
@json_option
def clips(
channel_name: str,
@ -149,10 +168,14 @@ def clips(
limit: Optional[int],
pager: Optional[int],
period: ClipsPeriod,
target_dir: Path,
):
"""List or download clips for given CHANNEL_NAME."""
from twitchdl.commands.clips import clips
if not target_dir.exists():
target_dir.mkdir(parents=True, exist_ok=True)
clips(
channel_name,
all=all,
@ -162,6 +185,7 @@ def clips(
limit=limit,
pager=pager,
period=period,
target_dir=target_dir,
)
@ -229,7 +253,7 @@ def clips(
"-o",
"--output",
help="Output file name template. See docs for details.",
default="{date}_{id}_{channel_login}_{title_slug}.{format}",
default=DEFAULT_OUTPUT_TEMPLATE,
)
@click.option(
"-q",
@ -400,3 +424,55 @@ def videos(
sort=sort,
type=type,
)
@cli.command()
@click.argument("id")
@click.option(
"-w",
"--width",
help="Chat width in pixels",
type=int,
default=400,
callback=validate_positive,
)
@click.option(
"-h",
"--height",
help="Chat height in pixels",
type=int,
default=600,
callback=validate_positive,
)
@click.option(
"--font-size",
help="Font size",
type=int,
default=18,
callback=validate_positive,
)
@click.option(
"--dark",
help="Dark mode",
is_flag=True,
)
def chat(id: str, width: int, height: int, font_size: int, dark: bool):
"""Render chat for a given video"""
print_log("Chat command is still experimental, try it out and report any bugs.")
try:
from twitchdl.chat import render_chat
render_chat(id, width, height, font_size, dark)
except ModuleNotFoundError as ex:
raise ConsoleError(
dedent(f"""
{ex}
This command requires twitch-dl to be installed with optional "chat" dependencies:
pipx install "twitch-dl[chat]"
See documentation for more info:
https://twitch-dl.bezdomni.net/commands/chat.html
""")
)

View File

@ -1,13 +1,15 @@
import re
import sys
from os import path
from typing import Callable, Generator, Optional
from pathlib import Path
from typing import Callable, Generator, List, Optional
import click
from twitchdl import twitch, utils
from twitchdl.commands.download import get_clip_authenticated_url
from twitchdl.download import download_file
from twitchdl.entities import VideoQuality
from twitchdl.http import download_file
from twitchdl.output import green, print_clip, print_clip_compact, print_json, print_paged, yellow
from twitchdl.twitch import Clip, ClipsPeriod
@ -22,6 +24,7 @@ def clips(
limit: Optional[int] = None,
pager: Optional[int] = None,
period: ClipsPeriod = "all_time",
target_dir: Path = Path(),
):
# Set different defaults for limit for compact display
default_limit = 40 if compact else 10
@ -35,7 +38,7 @@ def clips(
return print_json(list(generator))
if download:
return _download_clips(generator)
return _download_clips(target_dir, generator)
print_fn = print_clip_compact if compact else print_clip
@ -45,8 +48,8 @@ def clips(
return _print_all(generator, print_fn, all)
def _target_filename(clip: Clip):
url = clip["videoQualities"][0]["sourceURL"]
def _target_filename(clip: Clip, video_qualities: List[VideoQuality]):
url = video_qualities[0]["sourceURL"]
_, ext = path.splitext(url)
ext = ext.lstrip(".")
@ -67,16 +70,27 @@ def _target_filename(clip: Clip):
return f"{name}.{ext}"
def _download_clips(generator: Generator[Clip, None, None]):
for clip in generator:
target = _target_filename(clip)
def _download_clips(target_dir: Path, generator: Generator[Clip, None, None]):
if not target_dir.exists():
target_dir.mkdir(parents=True, exist_ok=True)
if path.exists(target):
for clip in generator:
# videoQualities can be null in some circumstances, see:
# https://github.com/ihabunek/twitch-dl/issues/160
if not clip["videoQualities"]:
continue
target = target_dir / _target_filename(clip, clip["videoQualities"])
if target.exists():
click.echo(f"Already downloaded: {green(target)}")
else:
url = get_clip_authenticated_url(clip["slug"], "source")
click.echo(f"Downloading: {yellow(target)}")
download_file(url, target)
try:
url = get_clip_authenticated_url(clip["slug"], "source")
click.echo(f"Downloading: {yellow(target)}")
download_file(url, target)
except Exception as ex:
click.secho(ex, err=True, fg="red")
def _print_all(

View File

@ -1,35 +1,39 @@
import asyncio
import os
import platform
import re
import shlex
import shutil
import subprocess
import tempfile
from os import path
from pathlib import Path
from typing import Dict, List
from typing import List, Optional
from urllib.parse import urlencode, urlparse
import click
import httpx
from twitchdl import twitch, utils
from twitchdl.download import download_file
from twitchdl.entities import DownloadOptions
from twitchdl.exceptions import ConsoleError
from twitchdl.http import download_all
from twitchdl.http import download_all, download_file
from twitchdl.naming import clip_filename, video_filename
from twitchdl.output import blue, bold, green, print_log, yellow
from twitchdl.playlists import (
enumerate_vods,
get_init_sections,
load_m3u8,
make_join_playlist,
parse_playlists,
select_playlist,
)
from twitchdl.twitch import Chapter, Clip, ClipAccessToken, Video
from twitchdl.twitch import Chapter, ClipAccessToken, Video
def download(ids: List[str], args: DownloadOptions):
if not ids:
print_log("No IDs to downlad given")
return
for video_id in ids:
download_one(video_id, args)
@ -46,14 +50,14 @@ def download_one(video: str, args: DownloadOptions):
raise ConsoleError(f"Invalid input: {video}")
def _join_vods(playlist_path: str, target: str, overwrite: bool, video: Video):
def _join_vods(playlist_path: Path, target: Path, overwrite: bool, video: Video):
description = video["description"] or ""
description = description.strip()
command = [
command: List[str] = [
"ffmpeg",
"-i",
playlist_path,
str(playlist_path),
"-c",
"copy",
"-metadata",
@ -73,15 +77,15 @@ def _join_vods(playlist_path: str, target: str, overwrite: bool, video: Video):
if overwrite:
command.append("-y")
click.secho(f"{' '.join(command)}", dim=True)
click.secho(f"{shlex.join(command)}", dim=True)
result = subprocess.run(command)
if result.returncode != 0:
raise ConsoleError("Joining files failed")
def _concat_vods(vod_paths: List[str], target: str):
def _concat_vods(vod_paths: List[Path], target: Path):
tool = "type" if platform.system() == "Windows" else "cat"
command = [tool] + vod_paths
command = [tool] + [str(p) for p in vod_paths]
with open(target, "wb") as target_file:
result = subprocess.run(command, stdout=target_file)
@ -89,74 +93,15 @@ def _concat_vods(vod_paths: List[str], target: str):
raise ConsoleError(f"Joining files failed: {result.stderr}")
def get_video_placeholders(video: Video, format: str) -> Dict[str, str]:
date, time = video["publishedAt"].split("T")
game = video["game"]["name"] if video["game"] else "Unknown"
return {
"channel": video["creator"]["displayName"],
"channel_login": video["creator"]["login"],
"date": date,
"datetime": video["publishedAt"],
"format": format,
"game": game,
"game_slug": utils.slugify(game),
"id": video["id"],
"time": time,
"title": utils.titlify(video["title"]),
"title_slug": utils.slugify(video["title"]),
}
def _video_target_filename(video: Video, args: DownloadOptions):
subs = get_video_placeholders(video, args.format)
try:
return args.output.format(**subs)
except KeyError as e:
supported = ", ".join(subs.keys())
raise ConsoleError(f"Invalid key {e} used in --output. Supported keys are: {supported}")
def _clip_target_filename(clip: Clip, args: DownloadOptions):
date, time = clip["createdAt"].split("T")
game = clip["game"]["name"] if clip["game"] else "Unknown"
url = clip["videoQualities"][0]["sourceURL"]
_, ext = path.splitext(url)
ext = ext.lstrip(".")
subs = {
"channel": clip["broadcaster"]["displayName"],
"channel_login": clip["broadcaster"]["login"],
"date": date,
"datetime": clip["createdAt"],
"format": ext,
"game": game,
"game_slug": utils.slugify(game),
"id": clip["id"],
"slug": clip["slug"],
"time": time,
"title": utils.titlify(clip["title"]),
"title_slug": utils.slugify(clip["title"]),
}
try:
return args.output.format(**subs)
except KeyError as e:
supported = ", ".join(subs.keys())
raise ConsoleError(f"Invalid key {e} used in --output. Supported keys are: {supported}")
def _crete_temp_dir(base_uri: str) -> str:
def _crete_temp_dir(base_uri: str) -> Path:
"""Create a temp dir to store downloads if it doesn't exist."""
path = urlparse(base_uri).path.lstrip("/")
temp_dir = Path(tempfile.gettempdir(), "twitch-dl", path)
temp_dir.mkdir(parents=True, exist_ok=True)
return str(temp_dir)
return temp_dir
def _get_clip_url(access_token: ClipAccessToken, quality: str) -> str:
def _get_clip_url(access_token: ClipAccessToken, quality: Optional[str]) -> str:
qualities = access_token["videoQualities"]
# Quality given as an argument
@ -184,7 +129,7 @@ def _get_clip_url(access_token: ClipAccessToken, quality: str) -> str:
return selected_quality["sourceURL"]
def get_clip_authenticated_url(slug: str, quality: str):
def get_clip_authenticated_url(slug: str, quality: Optional[str]):
print_log("Fetching access token...")
access_token = twitch.get_clip_access_token(slug)
@ -216,10 +161,10 @@ def _download_clip(slug: str, args: DownloadOptions) -> None:
duration = utils.format_duration(clip["durationSeconds"])
click.echo(f"Found: {green(title)} by {yellow(user)}, playing {blue(game)} ({duration})")
target = _clip_target_filename(clip, args)
target = Path(clip_filename(clip, args.output))
click.echo(f"Target: {blue(target)}")
if not args.overwrite and path.exists(target):
if not args.overwrite and target.exists():
response = click.prompt("File exists. Overwrite? [Y/n]", default="Y", show_default=False)
if response.lower().strip() != "y":
raise click.Abort()
@ -248,10 +193,10 @@ def _download_video(video_id: str, args: DownloadOptions) -> None:
click.echo(f"Found: {blue(video['title'])} by {yellow(video['creator']['displayName'])}")
target = _video_target_filename(video, args)
target = Path(video_filename(video, args.format, args.output))
click.echo(f"Output: {blue(target)}")
if not args.overwrite and path.exists(target):
if not args.overwrite and target.exists():
response = click.prompt("File exists. Overwrite? [Y/n]", default="Y", show_default=False)
if response.lower().strip() != "y":
raise click.Abort()
@ -281,19 +226,32 @@ def _download_video(video_id: str, args: DownloadOptions) -> None:
target_dir = _crete_temp_dir(base_uri)
# Save playlists for debugging purposes
with open(path.join(target_dir, "playlists.m3u8"), "w") as f:
with open(target_dir / "playlists.m3u8", "w") as f:
f.write(playlists_text)
with open(path.join(target_dir, "playlist.m3u8"), "w") as f:
with open(target_dir / "playlist.m3u8", "w") as f:
f.write(vods_text)
click.echo(f"\nDownloading {len(vods)} VODs using {args.max_workers} workers to {target_dir}")
init_sections = get_init_sections(vods_m3u8)
for uri in init_sections:
print_log(f"Downloading init section {uri}...")
download_file(f"{base_uri}{uri}", target_dir / uri)
print_log(f"Downloading {len(vods)} VODs using {args.max_workers} workers to {target_dir}")
sources = [base_uri + vod.path for vod in vods]
targets = [os.path.join(target_dir, f"{vod.index:05d}.ts") for vod in vods]
asyncio.run(download_all(sources, targets, args.max_workers, rate_limit=args.rate_limit))
targets = [target_dir / f"{vod.index:05d}.ts" for vod in vods]
asyncio.run(
download_all(
zip(sources, targets),
args.max_workers,
rate_limit=args.rate_limit,
count=len(vods),
)
)
join_playlist = make_join_playlist(vods_m3u8, vods, targets)
join_playlist_path = path.join(target_dir, "playlist_downloaded.m3u8")
join_playlist_path = target_dir / "playlist_downloaded.m3u8"
join_playlist.dump(join_playlist_path) # type: ignore
click.echo()
@ -312,12 +270,12 @@ def _download_video(video_id: str, args: DownloadOptions) -> None:
click.echo()
if args.keep:
click.echo(f"Temporary files not deleted: {target_dir}")
click.echo(f"Temporary files not deleted: {yellow(target_dir)}")
else:
print_log("Deleting temporary files...")
shutil.rmtree(target_dir)
click.echo(f"\nDownloaded: {green(target)}")
click.echo(f"Downloaded: {green(target)}")
def http_get(url: str) -> str:

View File

@ -4,8 +4,8 @@ import click
import m3u8
from twitchdl import twitch, utils
from twitchdl.commands.download import get_video_placeholders
from twitchdl.exceptions import ConsoleError
from twitchdl.naming import video_placeholders
from twitchdl.output import bold, print_clip, print_json, print_log, print_table, print_video
from twitchdl.playlists import parse_playlists
from twitchdl.twitch import Chapter, Clip, Video
@ -67,7 +67,7 @@ def video_info(video: Video, playlists: str, chapters: List[Chapter]):
duration = utils.format_time(chapter["durationMilliseconds"] // 1000)
click.echo(f'{start} {bold(chapter["description"])} ({duration})')
placeholders = get_video_placeholders(video, format="mkv")
placeholders = video_placeholders(video, format="mkv")
placeholders = [[f"{{{k}}}", v] for k, v in placeholders.items()]
click.echo("")
print_table(["Placeholder", "Value"], placeholders)
@ -98,5 +98,8 @@ def clip_info(clip: Clip):
click.echo()
click.echo("Download links:")
for q in clip["videoQualities"]:
click.echo(f"{bold(q['quality'])} [{q['frameRate']} fps] {q['sourceURL']}")
if clip["videoQualities"]:
for q in clip["videoQualities"]:
click.echo(f"{bold(q['quality'])} [{q['frameRate']} fps] {q['sourceURL']}")
else:
click.echo("No download URLs found")

View File

@ -1,37 +0,0 @@
import os
import httpx
from twitchdl.exceptions import ConsoleError
CHUNK_SIZE = 1024
CONNECT_TIMEOUT = 5
RETRY_COUNT = 5
def _download(url: str, path: str):
tmp_path = path + ".tmp"
size = 0
with httpx.stream("GET", url, timeout=CONNECT_TIMEOUT) as response:
with open(tmp_path, "wb") as target:
for chunk in response.iter_bytes(chunk_size=CHUNK_SIZE):
target.write(chunk)
size += len(chunk)
os.rename(tmp_path, path)
return size
def download_file(url: str, path: str, retries: int = RETRY_COUNT):
if os.path.exists(path):
from_disk = True
return (os.path.getsize(path), from_disk)
from_disk = False
for _ in range(retries):
try:
return (_download(url, path), from_disk)
except httpx.RequestError:
pass
raise ConsoleError(f"Failed downloading after {retries} attempts: {url}")

View File

@ -1,5 +1,5 @@
from dataclasses import dataclass
from typing import Any, Mapping, Optional
from typing import Any, List, Literal, Mapping, Optional, TypedDict
@dataclass
@ -20,6 +20,144 @@ class DownloadOptions:
max_workers: int
ClipsPeriod = Literal["last_day", "last_week", "last_month", "all_time"]
VideosSort = Literal["views", "time"]
VideosType = Literal["archive", "highlight", "upload"]
class AccessToken(TypedDict):
signature: str
value: str
class User(TypedDict):
login: str
displayName: str
class Game(TypedDict):
id: str
name: str
class VideoQuality(TypedDict):
frameRate: str
quality: str
sourceURL: str
class ClipAccessToken(TypedDict):
id: str
playbackAccessToken: AccessToken
videoQualities: List[VideoQuality]
class Clip(TypedDict):
id: str
slug: str
title: str
createdAt: str
viewCount: int
durationSeconds: int
url: str
videoQualities: Optional[List[VideoQuality]]
game: Game
broadcaster: User
class Video(TypedDict):
id: str
title: str
description: str
publishedAt: str
broadcastType: str
lengthSeconds: int
game: Game
creator: User
class Chapter(TypedDict):
id: str
durationMilliseconds: int
positionMilliseconds: int
type: str
description: str
subDescription: str
thumbnailURL: str
game: Game
# Type for annotating decoded JSON
# TODO: make data classes for common structs
Data = Mapping[str, Any]
class Commenter(TypedDict):
id: str
login: str
displayName: str
Emote = TypedDict(
"Emote",
{
"id": str,
"emoteID": str,
"from": int,
},
)
class Message_Fragment(TypedDict):
emote: Optional[Emote]
text: str
class Message_Badge(TypedDict):
id: str
setID: str
version: str
class Message(TypedDict):
fragments: List[Message_Fragment]
userBadges: List[Message_Badge]
userColor: str
class Comment(TypedDict):
id: str
commenter: Commenter
contentOffsetSeconds: int
createdAt: str
message: Message
class Badge(TypedDict):
id: str
setID: str
version: str
title: str
image1x: str
image2x: str
image4x: str
clickAction: str
clickURL: str
class VideoComments_Owner(TypedDict):
id: str
login: str
broadcastBadges: List[Badge]
class VideoComments_Video(TypedDict):
id: str
broadcastType: str
lengthSeconds: int
owner: VideoComments_Owner
class VideoComments(TypedDict):
video: VideoComments_Video
badges: List[Badge]
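
A side note on the Emote type above: it uses the functional TypedDict form because one of its keys, from, is a Python keyword and cannot be written as a class-body annotation. A minimal standalone illustration (the values are made up):

from typing import TypedDict

# class Emote(TypedDict):
#     from: int          # SyntaxError: "from" is a keyword
Emote = TypedDict("Emote", {"id": str, "emoteID": str, "from": int})

emote: Emote = {"id": "abc", "emoteID": "25", "from": 0}
print(emote["from"])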

View File

@ -3,10 +3,12 @@ import logging
import os
import time
from abc import ABC, abstractmethod
from typing import List, Optional
from pathlib import Path
from typing import Iterable, Optional, Tuple
import httpx
from twitchdl.exceptions import ConsoleError
from twitchdl.progress import Progress
logger = logging.getLogger(__name__)
@ -71,7 +73,7 @@ async def download(
client: httpx.AsyncClient,
task_id: int,
source: str,
target: str,
target: Path,
progress: Progress,
token_bucket: TokenBucket,
):
@ -96,12 +98,12 @@ async def download_with_retries(
semaphore: asyncio.Semaphore,
task_id: int,
source: str,
target: str,
target: Path,
progress: Progress,
token_bucket: TokenBucket,
):
async with semaphore:
if os.path.exists(target):
if target.exists():
size = os.path.getsize(target)
progress.already_downloaded(task_id, size)
return
@ -119,13 +121,13 @@ async def download_with_retries(
async def download_all(
sources: List[str],
targets: List[str],
source_targets: Iterable[Tuple[str, Path]],
workers: int,
*,
count: Optional[int] = None,
rate_limit: Optional[int] = None,
):
progress = Progress(len(sources))
progress = Progress(count)
token_bucket = LimitingTokenBucket(rate_limit) if rate_limit else EndlessTokenBucket()
async with httpx.AsyncClient(timeout=TIMEOUT) as client:
semaphore = asyncio.Semaphore(workers)
@ -139,6 +141,36 @@ async def download_all(
progress,
token_bucket,
)
for task_id, (source, target) in enumerate(zip(sources, targets))
for task_id, (source, target) in enumerate(source_targets)
]
await asyncio.gather(*tasks)
def download_file(url: str, target: Path, retries: int = RETRY_COUNT) -> None:
"""Download URL to given target path with retries"""
error_message = ""
for r in range(retries):
try:
retry_info = f" (retry {r})" if r > 0 else ""
logger.info(f"Downloading {url} to {target}{retry_info}")
return _do_download_file(url, target)
except httpx.HTTPStatusError as ex:
logger.error(ex)
error_message = f"Server responded with HTTP {ex.response.status_code}"
except httpx.RequestError as ex:
logger.error(ex)
error_message = str(ex)
raise ConsoleError(f"Failed downloading after {retries} attempts: {error_message}")
def _do_download_file(url: str, target: Path) -> None:
tmp_path = Path(str(target) + ".tmp")
with httpx.stream("GET", url, timeout=TIMEOUT, follow_redirects=True) as response:
response.raise_for_status()
with open(tmp_path, "wb") as f:
for chunk in response.iter_bytes(chunk_size=CHUNK_SIZE):
f.write(chunk)
os.rename(tmp_path, target)
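
download_all now takes an iterable of (source, target) pairs plus an optional count instead of two parallel lists, which lets callers pass a stream of pairs whose length is not known up front. A sketch mirroring the call site in commands/download.py (URLs and paths below are hypothetical):

import asyncio
from pathlib import Path

from twitchdl.http import download_all

sources = [f"https://example.com/vod/{i:05d}.ts" for i in range(3)]   # hypothetical
targets = [Path(f"/tmp/twitch-dl/{i:05d}.ts") for i in range(3)]

asyncio.run(
    download_all(
        zip(sources, targets),
        workers=4,
        rate_limit=None,
        count=len(sources),
    )
)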

twitchdl/naming.py (new file, 72 lines)
View File

@ -0,0 +1,72 @@
import os
from typing import Dict
from twitchdl import utils
from twitchdl.entities import Clip, Video
from twitchdl.exceptions import ConsoleError
DEFAULT_OUTPUT_TEMPLATE = "{date}_{id}_{channel_login}_{title_slug}.{format}"
def video_filename(video: Video, format: str, output: str) -> str:
subs = video_placeholders(video, format)
return _format(output, subs)
def video_placeholders(video: Video, format: str) -> Dict[str, str]:
date, time = video["publishedAt"].split("T")
game = video["game"]["name"] if video["game"] else "Unknown"
return {
"channel": video["creator"]["displayName"],
"channel_login": video["creator"]["login"],
"date": date,
"datetime": video["publishedAt"],
"format": format,
"game": game,
"game_slug": utils.slugify(game),
"id": video["id"],
"time": time,
"title": utils.titlify(video["title"]),
"title_slug": utils.slugify(video["title"]),
}
def clip_filename(clip: Clip, output: str):
subs = clip_placeholders(clip)
return _format(output, subs)
def clip_placeholders(clip: Clip) -> Dict[str, str]:
date, time = clip["createdAt"].split("T")
game = clip["game"]["name"] if clip["game"] else "Unknown"
if clip["videoQualities"]:
url = clip["videoQualities"][0]["sourceURL"]
_, ext = os.path.splitext(url)
ext = ext.lstrip(".")
else:
ext = "mp4"
return {
"channel": clip["broadcaster"]["displayName"],
"channel_login": clip["broadcaster"]["login"],
"date": date,
"datetime": clip["createdAt"],
"format": ext,
"game": game,
"game_slug": utils.slugify(game),
"id": clip["id"],
"slug": clip["slug"],
"time": time,
"title": utils.titlify(clip["title"]),
"title_slug": utils.slugify(clip["title"]),
}
def _format(output: str, subs: Dict[str, str]) -> str:
try:
return output.format(**subs)
except KeyError as e:
supported = ", ".join(subs.keys())
raise ConsoleError(f"Invalid key {e} used in --output. Supported keys are: {supported}")

View File

@ -1,15 +1,25 @@
import json
import sys
from itertools import islice
from typing import Any, Callable, Generator, List, Optional, TypeVar
import click
from twitchdl import utils
from twitchdl.twitch import Clip, Video
from twitchdl.entities import Clip, Video
T = TypeVar("T")
def cursor_previous_line():
sys.stdout.write("\033[1F")
def clear_line():
sys.stdout.write("\033[2K")
sys.stdout.write("\r")
def truncate(string: str, length: int) -> str:
if len(string) > length:
return string[: length - 1] + "…"
@ -21,19 +31,28 @@ def print_json(data: Any):
click.echo(json.dumps(data))
def print_log(message: Any):
click.secho(message, err=True, dim=True)
def print_log(message: Any, *, nl: bool = True):
click.secho(message, err=True, dim=True, nl=nl)
def visual_len(text: str):
return len(click.unstyle(text))
def ljust(text: str, width: int):
diff = width - visual_len(text)
return text + (" " * diff) if diff > 0 else text
def print_table(headers: List[str], data: List[List[str]]):
widths = [[len(cell) for cell in row] for row in data + [headers]]
widths = [[visual_len(cell) for cell in row] for row in data + [headers]]
widths = [max(width) for width in zip(*widths)]
underlines = ["-" * width for width in widths]
def print_row(row: List[str]):
for idx, cell in enumerate(row):
width = widths[idx]
click.echo(cell.ljust(width), nl=False)
click.echo(ljust(cell, width), nl=False)
click.echo(" ", nl=False)
click.echo()
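
The reason print_table switches from str.ljust to the custom ljust is that len() counts ANSI escape codes, so styled cells look wider than they render and columns drift out of alignment. A small illustration:

import click

styled = click.style("1080p60", bold=True)

print(len(styled))                 # 15 – includes the ANSI escape codes
print(len(click.unstyle(styled)))  # 7  – what the terminal actually renders

# visual_len/ljust in output.py pad based on the unstyle()d length,
# so bold or dim cells line up with plain ones.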

View File

@ -3,20 +3,23 @@ Parse and manipulate m3u8 playlists.
"""
from dataclasses import dataclass
from typing import Generator, List, Optional, OrderedDict
from pathlib import Path
from typing import Generator, List, Optional, OrderedDict, Set
import click
import m3u8
from twitchdl import utils
from twitchdl.output import bold, dim
from twitchdl.output import bold, dim, print_table
@dataclass
class Playlist:
name: str
group_id: str
resolution: Optional[str]
url: str
is_source: bool
@dataclass
@ -34,17 +37,17 @@ def parse_playlists(playlists_m3u8: str) -> List[Playlist]:
document = load_m3u8(source)
for p in document.playlists:
if p.stream_info.resolution:
name = p.media[0].name
resolution = "x".join(str(r) for r in p.stream_info.resolution)
else:
name = p.media[0].group_id
resolution = None
resolution = (
"x".join(str(r) for r in p.stream_info.resolution)
if p.stream_info.resolution
else None
)
yield Playlist(name, resolution, p.uri)
media = p.media[0]
is_source = media.group_id == "chunked"
yield Playlist(media.name, media.group_id, resolution, p.uri, is_source)
# Move audio to bottom, it has no resolution
return sorted(_parse(playlists_m3u8), key=lambda p: p.resolution is None)
return list(_parse(playlists_m3u8))
def load_m3u8(playlist_m3u8: str) -> m3u8.M3U8:
@ -79,7 +82,7 @@ def enumerate_vods(
def make_join_playlist(
playlist: m3u8.M3U8,
vods: List[Vod],
targets: List[str],
targets: List[Path],
) -> m3u8.Playlist:
"""
Make a modified playlist which references downloaded VODs
@ -91,7 +94,7 @@ def make_join_playlist(
playlist.segments.clear()
for segment in org_segments:
if segment.uri in path_map:
segment.uri = path_map[segment.uri]
segment.uri = str(path_map[segment.uri].name)
playlist.segments.append(segment)
return playlist
@ -107,10 +110,13 @@ def select_playlist(playlists: List[Playlist], quality: Optional[str]) -> Playli
def select_playlist_by_name(playlists: List[Playlist], quality: str) -> Playlist:
if quality == "source":
return playlists[0]
for playlist in playlists:
if playlist.is_source:
return playlist
raise click.ClickException("Source quality not found, please report an issue on github.")
for playlist in playlists:
if playlist.name == quality:
if playlist.name == quality or playlist.group_id == quality:
return playlist
available = ", ".join([p.name for p in playlists])
@ -119,13 +125,56 @@ def select_playlist_by_name(playlists: List[Playlist], quality: str) -> Playlist
def select_playlist_interactive(playlists: List[Playlist]) -> Playlist:
click.echo("\nAvailable qualities:")
for n, playlist in enumerate(playlists):
if playlist.resolution:
click.echo(f"{n + 1}) {bold(playlist.name)} {dim(f'({playlist.resolution})')}")
else:
click.echo(f"{n + 1}) {bold(playlist.name)}")
playlists = sorted(playlists, key=_playlist_key)
headers = ["#", "Name", "Group ID", "Resolution"]
no = utils.read_int("Choose quality", min=1, max=len(playlists) + 1, default=1)
rows = [
[
f"{n + 1})",
bold(playlist.name),
dim(playlist.group_id),
dim(playlist.resolution or ""),
]
for n, playlist in enumerate(playlists)
]
click.echo()
print_table(headers, rows)
default = 1
for index, playlist in enumerate(playlists):
if playlist.is_source:
default = index + 1
no = utils.read_int("\nChoose quality", min=1, max=len(playlists) + 1, default=default)
playlist = playlists[no - 1]
return playlist
MAX = 1_000_000
def _playlist_key(playlist: Playlist) -> int:
"""Attempt to sort playlists so that source quality is on top, audio only
is on bottom and others are sorted descending by resolution."""
if playlist.is_source:
return 0
if playlist.group_id == "audio_only":
return MAX
try:
return MAX - int(playlist.name.split("p")[0])
except Exception:
pass
return MAX
def get_init_sections(playlist: m3u8.M3U8) -> Set[str]:
# TODO: we're ignoring init_section.base_uri and bytes
return set(
segment.init_section.uri
for segment in playlist.segments
if segment.init_section is not None
)
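
The interactive quality picker earlier in this file now orders playlists with _playlist_key: source ("chunked") first, audio_only last, and everything else descending by the resolution parsed from the name. A sketch with hypothetical playlists:

from twitchdl.playlists import Playlist, _playlist_key

playlists = [
    Playlist("audio_only", "audio_only", None, "https://example.com/audio.m3u8", False),
    Playlist("720p60", "720p60", "1280x720", "https://example.com/720p60.m3u8", False),
    Playlist("1080p60 (source)", "chunked", "1920x1080", "https://example.com/chunked.m3u8", True),
]

print([p.name for p in sorted(playlists, key=_playlist_key)])
# ['1080p60 (source)', '720p60', 'audio_only']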

View File

@ -1,13 +1,13 @@
import logging
import time
from collections import deque
from dataclasses import dataclass, field
from dataclasses import dataclass
from statistics import mean
from typing import Deque, Dict, NamedTuple, Optional
import click
from twitchdl.output import blue
from twitchdl.output import blue, clear_line
from twitchdl.utils import format_size, format_time
logger = logging.getLogger(__name__)
@ -31,28 +31,25 @@ class Sample(NamedTuple):
timestamp: float
@dataclass
class Progress:
vod_count: int
downloaded: int = 0
estimated_total: Optional[int] = None
last_printed: float = field(default_factory=time.time)
progress_bytes: int = 0
progress_perc: int = 0
remaining_time: Optional[int] = None
speed: Optional[float] = None
start_time: float = field(default_factory=time.time)
tasks: Dict[TaskId, Task] = field(default_factory=dict)
vod_downloaded_count: int = 0
samples: Deque[Sample] = field(default_factory=lambda: deque(maxlen=100))
def __init__(self, file_count: Optional[int] = None):
self.downloaded: int = 0
self.estimated_total: Optional[int] = None
self.last_printed: Optional[float] = None
self.progress_bytes: int = 0
self.progress_perc: int = 0
self.remaining_time: Optional[int] = None
self.samples: Deque[Sample] = deque(maxlen=1000)
self.speed: Optional[float] = None
self.tasks: Dict[TaskId, Task] = {}
self.file_count = file_count
self.downloaded_count: int = 0
def start(self, task_id: int, size: int):
if task_id in self.tasks:
raise ValueError(f"Task {task_id}: cannot start, already started")
self.tasks[task_id] = Task(task_id, size)
self._calculate_total()
self._calculate_progress()
self.print()
def advance(self, task_id: int, size: int):
@ -63,7 +60,6 @@ class Progress:
self.progress_bytes += size
self.tasks[task_id].advance(size)
self.samples.append(Sample(self.downloaded, time.time()))
self._calculate_progress()
self.print()
def already_downloaded(self, task_id: int, size: int):
@@ -72,9 +68,7 @@ class Progress:
self.tasks[task_id] = Task(task_id, size)
self.progress_bytes += size
self.vod_downloaded_count += 1
self._calculate_total()
self._calculate_progress()
self.downloaded_count += 1
self.print()
def abort(self, task_id: int):
@@ -83,9 +77,6 @@ class Progress:
del self.tasks[task_id]
self.progress_bytes = sum(t.downloaded for t in self.tasks.values())
self._calculate_total()
self._calculate_progress()
self.print()
def end(self, task_id: int):
@@ -98,15 +89,15 @@ class Progress:
f"Taks {task_id} ended with {task.downloaded}b downloaded, expected {task.size}b."
)
self.vod_downloaded_count += 1
self.downloaded_count += 1
self.print()
def _calculate_total(self):
self.estimated_total = (
int(mean(t.size for t in self.tasks.values()) * self.vod_count) if self.tasks else None
)
def _recalculate(self):
if self.tasks and self.file_count:
self.estimated_total = int(mean(t.size for t in self.tasks.values()) * self.file_count)
else:
self.estimated_total = None
def _calculate_progress(self):
self.speed = self._calculate_speed()
self.progress_perc = (
int(100 * self.progress_bytes / self.estimated_total) if self.estimated_total else 0
@@ -133,10 +124,14 @@ class Progress:
now = time.time()
# Don't print more often than 10 times per second
if now - self.last_printed < 0.1:
if self.last_printed and now - self.last_printed < 0.1:
return
click.echo(f"\rDownloaded {self.vod_downloaded_count}/{self.vod_count} VODs", nl=False)
self._recalculate()
clear_line()
total_label = f"/{self.file_count}" if self.file_count else ""
click.echo(f"Downloaded {self.downloaded_count}{total_label} VODs", nl=False)
click.secho(f" {self.progress_perc}%", fg="blue", nl=False)
if self.estimated_total is not None:
@@ -150,6 +145,4 @@ class Progress:
if self.remaining_time is not None:
click.echo(f" ETA {blue(format_time(self.remaining_time))}", nl=False)
click.echo(" ", nl=False)
self.last_printed = now
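Roughly how the refactored class is driven when the file count is not known up front; a sketch only, with made-up task ids and sizes, not the actual download loop:

progress = Progress()  # no file_count given, so no total estimate or ETA is shown

sizes = [1_000_000, 950_000]  # made-up segment sizes in bytes
for task_id, size in enumerate(sizes):
    progress.start(task_id, size)
    for _ in range(10):
        progress.advance(task_id, size // 10)  # e.g. one call per downloaded chunk
    progress.end(task_id)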

View File

@@ -2,83 +2,29 @@
Twitch API access.
"""
import json
import logging
import random
import time
from typing import Any, Dict, Generator, List, Literal, Mapping, Optional, Tuple, TypedDict, Union
from typing import Any, Dict, Generator, List, Mapping, Optional, Tuple, Union
import click
import httpx
from twitchdl import CLIENT_ID
from twitchdl.entities import Data
from twitchdl.entities import (
AccessToken,
Chapter,
Clip,
ClipAccessToken,
ClipsPeriod,
Data,
Video,
VideoComments,
VideosSort,
VideosType,
)
from twitchdl.exceptions import ConsoleError
ClipsPeriod = Literal["last_day", "last_week", "last_month", "all_time"]
VideosSort = Literal["views", "time"]
VideosType = Literal["archive", "highlight", "upload"]
class AccessToken(TypedDict):
signature: str
value: str
class User(TypedDict):
login: str
displayName: str
class Game(TypedDict):
id: str
name: str
class VideoQuality(TypedDict):
frameRate: str
quality: str
sourceURL: str
class ClipAccessToken(TypedDict):
id: str
playbackAccessToken: AccessToken
videoQualities: List[VideoQuality]
class Clip(TypedDict):
id: str
slug: str
title: str
createdAt: str
viewCount: int
durationSeconds: int
url: str
videoQualities: List[VideoQuality]
game: Game
broadcaster: User
class Video(TypedDict):
id: str
title: str
description: str
publishedAt: str
broadcastType: str
lengthSeconds: int
game: Game
creator: User
class Chapter(TypedDict):
id: str
durationMilliseconds: int
positionMilliseconds: int
type: str
description: str
subDescription: str
thumbnailURL: str
game: Game
from twitchdl.utils import format_size, remove_null_values
class GQLError(click.ClickException):
@@ -135,22 +81,23 @@ logger = logging.getLogger(__name__)
def log_request(request: httpx.Request):
logger.debug(f"--> {request.method} {request.url}")
logger.info(f"--> {request.method} {request.url}")
if request.content:
logger.debug(f"--> {request.content}")
def log_response(response: httpx.Response, duration: float):
def log_response(response: httpx.Response, duration_seconds: float):
request = response.request
duration_ms = int(1000 * duration)
logger.debug(f"<-- {request.method} {request.url} HTTP {response.status_code} {duration_ms}ms")
duration = f"{int(1000 * duration_seconds)}ms"
size = format_size(len(response.content))
logger.info(f"<-- {request.method} {request.url} HTTP {response.status_code} {duration} {size}")
if response.content:
logger.debug(f"<-- {response.content}")
def gql_post(query: str):
def gql_persisted_query(query: Data):
url = "https://gql.twitch.tv/gql"
response = authenticated_post(url, content=query)
response = authenticated_post(url, json=query)
gql_raise_on_error(response)
return response.json()
@@ -238,22 +185,18 @@ def get_clip(slug: str) -> Optional[Clip]:
def get_clip_access_token(slug: str) -> ClipAccessToken:
query = f"""
{{
query = {
"operationName": "VideoAccessToken_Clip",
"variables": {{
"slug": "{slug}"
}},
"extensions": {{
"persistedQuery": {{
"variables": {"slug": slug},
"extensions": {
"persistedQuery": {
"version": 1,
"sha256Hash": "36b89d2507fce29e5ca551df756d27c1cfe079e2609642b4390aa4c35796eb11"
}}
}}
}}
"""
"sha256Hash": "36b89d2507fce29e5ca551df756d27c1cfe079e2609642b4390aa4c35796eb11",
}
},
}
response = gql_post(query.strip())
response = gql_persisted_query(query)
return response["data"]["clip"]
@@ -325,23 +268,6 @@ def channel_clips_generator(
return _generator(clips, limit)
def channel_clips_generator_old(channel_id: str, period: ClipsPeriod, limit: int):
cursor = ""
while True:
clips = get_channel_clips(channel_id, period, limit, after=cursor)
if not clips["edges"]:
break
has_next = clips["pageInfo"]["hasNextPage"]
cursor = clips["edges"][-1]["cursor"] if has_next else None
yield clips, has_next
if not cursor:
break
def get_channel_videos(
channel_id: str,
limit: int,
@@ -422,7 +348,7 @@ def get_access_token(video_id: str, auth_token: Optional[str] = None) -> AccessT
query = f"""
{{
videoPlaybackAccessToken(
id: {video_id},
id: "{video_id}",
params: {{
platform: "web",
playerBackend: "mediaplayer",
@@ -467,8 +393,12 @@ def get_playlists(video_id: str, access_token: AccessToken) -> str:
"allow_audio_only": "true",
"allow_source": "true",
"player": "twitchweb",
"platform": "web",
"supported_codecs": "av1,h265,h264",
"p": random.randint(1000000, 10000000),
},
)
response.raise_for_status()
return response.content.decode("utf-8")
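Putting the token and playlist calls together, fetching and listing the available variants for a video might look roughly like this (the video ID is made up; error handling omitted):

import m3u8

video_id = "123456789"  # made-up ID
access_token = get_access_token(video_id)
playlists_text = get_playlists(video_id, access_token)

for variant in m3u8.loads(playlists_text).playlists:
    print(variant.stream_info.resolution, variant.uri)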
@@ -503,7 +433,7 @@ def get_video_chapters(video_id: str) -> List[Chapter]:
},
}
response = gql_post(json.dumps(query))
response = gql_persisted_query(query)
return list(_chapter_nodes(response["data"]["video"]["moments"]))
@@ -514,3 +444,51 @@ def _chapter_nodes(moments: Data) -> Generator[Chapter, None, None]:
del node["details"]
del node["moments"]
yield node
def get_comments(
video_id: str,
*,
cursor: Optional[str] = None,
offset_seconds: Optional[int] = None,
):
variables = remove_null_values(
{
"videoID": video_id,
"cursor": cursor,
"contentOffsetSeconds": offset_seconds,
}
)
query = {
"operationName": "VideoCommentsByOffsetOrCursor",
"variables": variables,
"extensions": {
"persistedQuery": {
"version": 1,
"sha256Hash": "b70a3591ff0f4e0313d126c6a1502d79a1c02baebb288227c582044aa76adf6a",
}
},
}
response = gql_persisted_query(query)
return response["data"]["video"]
def get_video_comments(video_id: str) -> VideoComments:
query = {
"operationName": "VideoComments",
"variables": {
"videoID": video_id,
"hasVideoID": True,
},
"extensions": {
"persistedQuery": {
"version": 1,
"sha256Hash": "be06407e8d7cda72f2ee086ebb11abb6b062a7deb8985738e648090904d2f0eb",
}
},
}
response = gql_persisted_query(query)
return response["data"]

View File

@@ -1,9 +1,14 @@
import re
import unicodedata
from typing import Optional, Union
from itertools import chain, islice, tee
from typing import Dict, Iterable, Optional, Tuple, TypeVar, Union
import click
T = TypeVar("T")
K = TypeVar("K")
V = TypeVar("V")
def _format_size(value: float, digits: int, unit: str):
if digits > 0:
@@ -84,13 +89,14 @@ def titlify(value: str) -> str:
VIDEO_PATTERNS = [
r"^(?P<id>\d+)?$",
r"^https://(www.)?twitch.tv/videos/(?P<id>\d+)(\?.+)?$",
r"^https://(www\.|m\.)?twitch\.tv/videos/(?P<id>\d+)(\?.+)?$",
r"^https://(www\.|m\.)?twitch\.tv/\w+/video/(?P<id>\d+)(\?.+)?$",
]
CLIP_PATTERNS = [
r"^(?P<slug>[A-Za-z0-9]+(?:-[A-Za-z0-9_-]{16})?)$",
r"^https://(www.)?twitch.tv/\w+/clip/(?P<slug>[A-Za-z0-9]+(?:-[A-Za-z0-9_-]{16})?)(\?.+)?$",
r"^https://clips.twitch.tv/(?P<slug>[A-Za-z0-9]+(?:-[A-Za-z0-9_-]{16})?)(\?.+)?$",
r"^https://(www\.|m\.)?twitch\.tv/\w+/clip/(?P<slug>[A-Za-z0-9]+(?:-[A-Za-z0-9_-]{16})?)(\?.+)?$",
r"^https://clips\.twitch\.tv/(?P<slug>[A-Za-z0-9]+(?:-[A-Za-z0-9_-]{16})?)(\?.+)?$",
]
@@ -108,3 +114,16 @@ def parse_clip_identifier(identifier: str) -> Optional[str]:
match = re.match(pattern, identifier)
if match:
return match.group("slug")
def remove_null_values(adict: Dict[K, V]) -> Dict[K, V]:
return {k: v for k, v in adict.items() if v is not None}
def iterate_with_next(iterable: Iterable[T]) -> Iterable[Tuple[T, Optional[T]]]:
"""
Creates an iterator which provides the current and next item.
"""
items, nexts = tee(iterable, 2)
nexts = chain(islice(nexts, 1, None), [None])
return zip(items, nexts)
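For example, each item is paired with its successor, and the last item is paired with None:

print(list(iterate_with_next([1, 2, 3])))
# [(1, 2), (2, 3), (3, None)]

This is handy wherever handling the current item requires a peek at the one that follows it.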