Compare commits

..

1 Commits
2.5.0 ... test

Author SHA1 Message Date
900914377b Add testing on github 2024-04-24 08:09:26 +02:00
27 changed files with 559 additions and 802 deletions

View File

@ -10,9 +10,9 @@ jobs:
python-version: ["3.8", "3.9", "3.10", "3.11", "3.12"]
steps:
- uses: actions/checkout@v4
- uses: actions/checkout@v3
- name: Set up Python ${{ matrix.python-version }}
uses: actions/setup-python@v5
uses: actions/setup-python@v4
with:
python-version: ${{ matrix.python-version }}
- name: Install dependencies

2
.gitignore vendored
View File

@ -15,5 +15,3 @@ tmp/
/*.pyz
/pyrightconfig.json
/book
*.mp4
*.mkv

View File

@ -3,37 +3,6 @@ twitch-dl changelog
<!-- Do not edit. This file is automatically generated from changelog.yaml.-->
### [2.5.0 (2024-08-30)](https://github.com/ihabunek/twitch-dl/releases/tag/2.5.0)
* Add support for HD video qualities (#163)
### [2.4.0 (2024-08-30)](https://github.com/ihabunek/twitch-dl/releases/tag/2.4.0)
* Add `clips --target-dir` option. Use in conjunction with `--download` to
specify target directory.
* Fix a crash when downloading clips (#160)
* Handle video URLs which contain the channel name (#162)
* Don't stop downloading clips if one download fails
### [2.3.1 (2024-05-19)](https://github.com/ihabunek/twitch-dl/releases/tag/2.3.1)
* Fix fetching access token (#155, thanks @KryptonicDragon)
### [2.3.0 (2024-04-27)](https://github.com/ihabunek/twitch-dl/releases/tag/2.3.0)
* Show more playlist data when choosing quality
* Improve detection of 'source' quality for Twitch Enhanced Broadcast Streams
(#154)
### [2.2.4 (2024-04-25)](https://github.com/ihabunek/twitch-dl/releases/tag/2.2.4)
* Add m dot url support to video and clip regexes (thanks @localnerve)
### [2.2.3 (2024-04-24)](https://github.com/ihabunek/twitch-dl/releases/tag/2.2.3)
* Respect --dry-run option when downloading videos
* Add automated tests on github actions
### [2.2.2 (2024-04-23)](https://github.com/ihabunek/twitch-dl/releases/tag/2.2.2)
* Fix more compat issues Python < 3.10 (#152)

View File

@ -7,7 +7,7 @@ dist:
clean :
find . -name "*pyc" | xargs rm -rf $1
rm -rf build dist book bundle MANIFEST htmlcov deb_dist twitch-dl.*.pyz twitch-dl.1.man twitch_dl.egg-info
rm -rf build dist bundle MANIFEST htmlcov deb_dist twitch-dl.*.pyz twitch-dl.1.man twitch_dl.egg-info
bundle:
mkdir bundle
@ -24,7 +24,7 @@ publish :
twine upload dist/*.tar.gz dist/*.whl
coverage:
pytest --cov=twitchdl --cov-report html tests/
py.test --cov=toot --cov-report html tests/
man:
scdoc < twitch-dl.1.scd > twitch-dl.1.man

View File

@ -1,38 +1,3 @@
2.5.0:
date: 2024-08-30
changes:
- "Add support for HD video qualities (#163)"
2.4.0:
date: 2024-08-30
changes:
- "Add `clips --target-dir` option. Use in conjunction with `--download` to specify target directory."
- "Fix a crash when downloading clips (#160)"
- "Handle video URLs which contain the channel name (#162)"
- "Don't stop downloading clips if one download fails"
2.3.1:
date: 2024-05-19
changes:
- "Fix fetching access token (#155, thanks @KryptonicDragon)"
2.3.0:
date: 2024-04-27
changes:
- "Show more playlist data when choosing quality"
- "Improve detection of 'source' quality for Twitch Enhanced Broadcast Streams (#154)"
2.2.4:
date: 2024-04-25
changes:
- "Add m dot url support to video and clip regexes (thanks @localnerve)"
2.2.3:
date: 2024-04-24
changes:
- "Respect --dry-run option when downloading videos"
- "Add automated tests on github actions"
2.2.2:
date: 2024-04-23
changes:

View File

@ -3,37 +3,6 @@ twitch-dl changelog
<!-- Do not edit. This file is automatically generated from changelog.yaml.-->
### [2.5.0 (2024-08-30)](https://github.com/ihabunek/twitch-dl/releases/tag/2.5.0)
* Add support for HD video qualities (#163)
### [2.4.0 (2024-08-30)](https://github.com/ihabunek/twitch-dl/releases/tag/2.4.0)
* Add `clips --target-dir` option. Use in conjunction with `--download` to
specify target directory.
* Fix a crash when downloading clips (#160)
* Handle video URLs which contain the channel name (#162)
* Don't stop downloading clips if one download fails
### [2.3.1 (2024-05-19)](https://github.com/ihabunek/twitch-dl/releases/tag/2.3.1)
* Fix fetching access token (#155, thanks @KryptonicDragon)
### [2.3.0 (2024-04-27)](https://github.com/ihabunek/twitch-dl/releases/tag/2.3.0)
* Show more playlist data when choosing quality
* Improve detection of 'source' quality for Twitch Enhanced Broadcast Streams
(#154)
### [2.2.4 (2024-04-25)](https://github.com/ihabunek/twitch-dl/releases/tag/2.2.4)
* Add m dot url support to video and clip regexes (thanks @localnerve)
### [2.2.3 (2024-04-24)](https://github.com/ihabunek/twitch-dl/releases/tag/2.2.3)
* Respect --dry-run option when downloading videos
* Add automated tests on github actions
### [2.2.2 (2024-04-23)](https://github.com/ihabunek/twitch-dl/releases/tag/2.2.2)
* Fix more compat issues Python < 3.10 (#152)

View File

@ -43,11 +43,6 @@ twitch-dl clips [OPTIONS] CHANNEL_NAME
<td>Period from which to return clips Possible values: <code>last_day</code>, <code>last_week</code>, <code>last_month</code>, <code>all_time</code>. [default: <code>all_time</code>]</td>
</tr>
<tr>
<td class="code">-t, --target-dir</td>
<td>Target directory when downloading clips [default: <code>.</code>]</td>
</tr>
<tr>
<td class="code">--json</td>
<td>Print data as JSON rather than human readable text</td>

View File

@ -22,7 +22,7 @@ classifiers = [
dependencies = [
"click>=8.0.0,<9.0.0",
"httpx>=0.17.0,<1.0.0",
"m3u8>=3.0.0,<7.0.0",
"m3u8>=1.0.0,<4.0.0",
]
[tool.setuptools]
@ -56,6 +56,7 @@ test = [
twitch-dl = "twitchdl.cli:cli"
[tool.pyright]
include = ["twitchdl"]
typeCheckingMode = "strict"
pythonVersion = "3.8"

View File

@ -3,12 +3,9 @@ These tests depend on the channel having some videos and clips published.
"""
import httpx
import pytest
from twitchdl import twitch
from twitchdl.commands.download import get_clip_authenticated_url
from twitchdl.commands.videos import get_game_ids
from twitchdl.exceptions import ConsoleError
from twitchdl.playlists import enumerate_vods, load_m3u8, parse_playlists
TEST_CHANNEL = "bananasaurus_rex"
@ -56,15 +53,3 @@ def test_get_clips():
assert clip["slug"] == slug
assert get_clip_authenticated_url(slug, "source")
def test_get_games():
assert get_game_ids([]) == []
assert get_game_ids(["Bioshock"]) == ["15866"]
assert get_game_ids(["Bioshock", "Portal"]) == ["15866", "6187"]
def test_get_games_not_found():
with pytest.raises(ConsoleError) as ex:
get_game_ids(["the game which does not exist"])
assert str(ex.value) == "Game 'the game which does not exist' not found"

View File

@ -1,156 +0,0 @@
import json
import pytest
from click.testing import CliRunner, Result
from twitchdl import cli
@pytest.fixture(scope="session")
def runner():
return CliRunner(mix_stderr=False)
def assert_ok(result: Result):
if result.exit_code != 0:
raise AssertionError(
f"Command failed with exit code {result.exit_code}\nStderr: {result.stderr}"
)
def test_info_video(runner: CliRunner):
result = runner.invoke(cli.info, ["2090131595"])
assert_ok(result)
assert "Frost Fatales 2024 Day 1" in result.stdout
assert "frozenflygone playing Tomb Raider" in result.stdout
def test_info_video_json(runner: CliRunner):
result = runner.invoke(cli.info, ["2090131595", "--json"])
assert_ok(result)
video = json.loads(result.stdout)
assert video["title"] == "Frost Fatales 2024 Day 1"
assert video["game"] == {"id": "2770", "name": "Tomb Raider"}
assert video["creator"] == {"login": "frozenflygone", "displayName": "frozenflygone"}
def test_info_clip(runner: CliRunner):
result = runner.invoke(cli.info, ["PoisedTalentedPuddingChefFrank"])
assert_ok(result)
assert "AGDQ Crashes during Bioshock run" in result.stdout
assert "GamesDoneQuick playing BioShock" in result.stdout
def test_info_clip_json(runner: CliRunner):
result = runner.invoke(cli.info, ["PoisedTalentedPuddingChefFrank", "--json"])
assert_ok(result)
clip = json.loads(result.stdout)
assert clip["slug"] == "PoisedTalentedPuddingChefFrank"
assert clip["title"] == "AGDQ Crashes during Bioshock run"
assert clip["game"] == {"id": "15866", "name": "BioShock"}
assert clip["broadcaster"] == {"displayName": "GamesDoneQuick", "login": "gamesdonequick"}
def test_info_not_found(runner: CliRunner):
result = runner.invoke(cli.info, ["banana"])
assert result.exit_code == 1
assert "Clip banana not found" in result.stderr
result = runner.invoke(cli.info, ["12345"])
assert result.exit_code == 1
assert "Video 12345 not found" in result.stderr
result = runner.invoke(cli.info, [""])
assert result.exit_code == 1
assert "Invalid input" in result.stderr
def test_download_clip(runner: CliRunner):
result = runner.invoke(
cli.download,
[
"PoisedTalentedPuddingChefFrank",
"-q",
"source",
"--dry-run",
],
)
assert_ok(result)
assert (
"Found: AGDQ Crashes during Bioshock run by GamesDoneQuick, playing BioShock (30 sec)"
in result.stdout
)
assert (
"Target: 2020-01-10_3099545841_gamesdonequick_agdq_crashes_during_bioshock_run.mp4"
in result.stdout
)
assert "Dry run, clip not downloaded." in result.stdout
def test_download_video(runner: CliRunner):
result = runner.invoke(
cli.download,
[
"2090131595",
"-q",
"source",
"--dry-run",
],
)
assert_ok(result)
assert "Found: Frost Fatales 2024 Day 1 by frozenflygone" in result.stdout
assert (
"Output: 2024-03-14_2090131595_frozenflygone_frost_fatales_2024_day_1.mkv" in result.stdout
)
assert "Dry run, video not downloaded." in result.stdout
def test_videos(runner: CliRunner):
result = runner.invoke(cli.videos, ["gamesdonequick", "--json"])
assert_ok(result)
videos = json.loads(result.stdout)
assert videos["count"] == 10
assert videos["totalCount"] > 0
video = videos["videos"][0]
result = runner.invoke(cli.videos, "gamesdonequick")
assert_ok(result)
assert f"Video {video['id']}" in result.stdout
assert video["title"] in result.stdout
result = runner.invoke(cli.videos, ["gamesdonequick", "--compact"])
assert_ok(result)
assert video["id"] in result.stdout
assert video["title"][:60] in result.stdout
def test_videos_channel_not_found(runner: CliRunner):
result = runner.invoke(cli.videos, ["doesnotexisthopefully"])
assert result.exit_code == 1
assert result.stderr.strip() == "Error: Channel doesnotexisthopefully not found"
def test_clips(runner: CliRunner):
result = runner.invoke(cli.clips, ["gamesdonequick", "--json"])
assert_ok(result)
clips = json.loads(result.stdout)
clip = clips[0]
result = runner.invoke(cli.clips, "gamesdonequick")
assert_ok(result)
assert f"Clip {clip['slug']}" in result.stdout
assert clip["title"] in result.stdout
result = runner.invoke(cli.clips, ["gamesdonequick", "--compact"])
assert_ok(result)
assert clip["slug"] in result.stdout
assert clip["title"][:60] in result.stdout

View File

@ -1,39 +1,35 @@
import pytest
from twitchdl.utils import parse_clip_identifier, parse_video_identifier
from twitchdl.utils import parse_video_identifier, parse_clip_identifier
TEST_VIDEO_PATTERNS = [
("702689313", "702689313"),
("702689313", "https://twitch.tv/videos/702689313"),
("702689313", "https://www.twitch.tv/videos/702689313"),
("702689313", "https://m.twitch.tv/videos/702689313"),
("2223719525", "https://www.twitch.tv/r0dn3y/video/2223719525"),
]
TEST_CLIP_PATTERNS = {
("AbrasivePlayfulMangoMau5", "AbrasivePlayfulMangoMau5"),
("AbrasivePlayfulMangoMau5", "https://clips.twitch.tv/AbrasivePlayfulMangoMau5"),
("AbrasivePlayfulMangoMau5", "https://www.twitch.tv/dracul1nx/clip/AbrasivePlayfulMangoMau5"),
("AbrasivePlayfulMangoMau5", "https://m.twitch.tv/dracul1nx/clip/AbrasivePlayfulMangoMau5"),
("AbrasivePlayfulMangoMau5", "https://twitch.tv/dracul1nx/clip/AbrasivePlayfulMangoMau5"),
("HungryProudRadicchioDoggo", "HungryProudRadicchioDoggo"),
("HungryProudRadicchioDoggo", "https://clips.twitch.tv/HungryProudRadicchioDoggo"),
("HungryProudRadicchioDoggo", "https://www.twitch.tv/bananasaurus_rex/clip/HungryProudRadicchioDoggo?filter=clips&range=7d&sort=time"),
("HungryProudRadicchioDoggo", "https://m.twitch.tv/bananasaurus_rex/clip/HungryProudRadicchioDoggo?filter=clips&range=7d&sort=time"),
("HungryProudRadicchioDoggo", "https://twitch.tv/bananasaurus_rex/clip/HungryProudRadicchioDoggo?filter=clips&range=7d&sort=time"),
("GloriousColdbloodedTortoiseRuleFive-E017utJ4DZmHVpfQ", "GloriousColdbloodedTortoiseRuleFive-E017utJ4DZmHVpfQ"),
("GloriousColdbloodedTortoiseRuleFive-E017utJ4DZmHVpfQ", "https://twitch.tv/dracul1nx/clip/GloriousColdbloodedTortoiseRuleFive-E017utJ4DZmHVpfQ"),
("GloriousColdbloodedTortoiseRuleFive-E017utJ4DZmHVpfQ", "https://twitch.tv/dracul1nx/clip/GloriousColdbloodedTortoiseRuleFive-E017utJ4DZmHVpfQ?filter=clips&range=7d&sort=time"),
("GloriousColdbloodedTortoiseRuleFive-E017utJ4DZmHVpfQ", "https://www.twitch.tv/dracul1nx/clip/GloriousColdbloodedTortoiseRuleFive-E017utJ4DZmHVpfQ?filter=clips&range=7d&sort=time"),
("GloriousColdbloodedTortoiseRuleFive-E017utJ4DZmHVpfQ", "https://m.twitch.tv/dracul1nx/clip/GloriousColdbloodedTortoiseRuleFive-E017utJ4DZmHVpfQ?filter=clips&range=7d&sort=time"),
}
@pytest.mark.parametrize("expected,input", TEST_VIDEO_PATTERNS)
def test_video_patterns(expected: str, input: str):
def test_video_patterns(expected, input):
assert parse_video_identifier(input) == expected
@pytest.mark.parametrize("expected,input", TEST_CLIP_PATTERNS)
def test_clip_patterns(expected: str, input: str):
def test_clip_patterns(expected, input):
assert parse_clip_identifier(input) == expected

View File

@ -8,8 +8,8 @@ def test_initial_values():
assert progress.progress_perc == 0
assert progress.remaining_time is None
assert progress.speed is None
assert progress.file_count == 10
assert progress.downloaded_count == 0
assert progress.vod_count == 10
assert progress.vod_downloaded_count == 0
def test_downloaded():
@ -23,31 +23,26 @@ def test_downloaded():
assert progress.progress_perc == 0
progress.advance(1, 100)
progress._recalculate()
assert progress.downloaded == 100
assert progress.progress_bytes == 100
assert progress.progress_perc == 11
progress.advance(2, 200)
progress._recalculate()
assert progress.downloaded == 300
assert progress.progress_bytes == 300
assert progress.progress_perc == 33
progress.advance(3, 150)
progress._recalculate()
assert progress.downloaded == 450
assert progress.progress_bytes == 450
assert progress.progress_perc == 50
progress.advance(1, 50)
progress._recalculate()
assert progress.downloaded == 500
assert progress.progress_bytes == 500
assert progress.progress_perc == 55
progress.abort(2)
progress._recalculate()
assert progress.downloaded == 500
assert progress.progress_bytes == 300
assert progress.progress_perc == 33
@ -57,7 +52,6 @@ def test_downloaded():
progress.advance(1, 150)
progress.advance(2, 300)
progress.advance(3, 150)
progress._recalculate()
assert progress.downloaded == 1100
assert progress.progress_bytes == 900
@ -77,15 +71,12 @@ def test_estimated_total():
assert progress.estimated_total is None
progress.start(1, 12000)
progress._recalculate()
assert progress.estimated_total == 12000 * 3
progress.start(2, 11000)
progress._recalculate()
assert progress.estimated_total == 11500 * 3
progress.start(3, 10000)
progress._recalculate()
assert progress.estimated_total == 11000 * 3
@ -96,16 +87,16 @@ def test_vod_downloaded_count():
progress.start(2, 100)
progress.start(3, 100)
assert progress.downloaded_count == 0
assert progress.vod_downloaded_count == 0
progress.advance(1, 100)
progress.end(1)
assert progress.downloaded_count == 1
assert progress.vod_downloaded_count == 1
progress.advance(2, 100)
progress.end(2)
assert progress.downloaded_count == 2
assert progress.vod_downloaded_count == 2
progress.advance(3, 100)
progress.end(3)
assert progress.downloaded_count == 3
assert progress.vod_downloaded_count == 3

View File

@ -2,14 +2,12 @@ import logging
import platform
import re
import sys
from pathlib import Path
from typing import Optional, Tuple
import click
from twitchdl import __version__
from twitchdl.entities import DownloadOptions
from twitchdl.naming import DEFAULT_OUTPUT_TEMPLATE
from twitchdl.twitch import ClipsPeriod, VideosSort, VideosType
# Tweak the Click context
@ -81,12 +79,11 @@ def validate_rate(_ctx: click.Context, _param: click.Parameter, value: str) -> O
@click.group(context_settings=CONTEXT)
@click.option("--debug/--no-debug", default=False, help="Enable debug logging to stderr")
@click.option("--verbose/--no-verbose", default=False, help="More verbose debug logging")
@click.option("--debug/--no-debug", default=False, help="Log debug info to stderr")
@click.option("--color/--no-color", default=sys.stdout.isatty(), help="Use ANSI color in output")
@click.version_option(package_name="twitch-dl")
@click.pass_context
def cli(ctx: click.Context, color: bool, debug: bool, verbose: bool):
def cli(ctx: click.Context, color: bool, debug: bool):
"""twitch-dl - twitch.tv downloader
https://twitch-dl.bezdomni.net/
@ -94,9 +91,7 @@ def cli(ctx: click.Context, color: bool, debug: bool, verbose: bool):
ctx.color = color
if debug:
logging.basicConfig(level=logging.DEBUG if verbose else logging.INFO)
logging.getLogger("httpx").setLevel(logging.WARN)
logging.getLogger("httpcore").setLevel(logging.WARN)
logging.basicConfig(level=logging.INFO)
@cli.command()
@ -142,18 +137,6 @@ def cli(ctx: click.Context, color: bool, debug: bool, verbose: bool):
default="all_time",
type=click.Choice(["last_day", "last_week", "last_month", "all_time"]),
)
@click.option(
"-t",
"--target-dir",
help="Target directory when downloading clips",
type=click.Path(
file_okay=False,
readable=False,
writable=True,
path_type=Path,
),
default=Path(),
)
@json_option
def clips(
channel_name: str,
@ -164,14 +147,10 @@ def clips(
limit: Optional[int],
pager: Optional[int],
period: ClipsPeriod,
target_dir: Path,
):
"""List or download clips for given CHANNEL_NAME."""
from twitchdl.commands.clips import clips
if not target_dir.exists():
target_dir.mkdir(parents=True, exist_ok=True)
clips(
channel_name,
all=all,
@ -181,7 +160,6 @@ def clips(
limit=limit,
pager=pager,
period=period,
target_dir=target_dir,
)
@ -249,7 +227,7 @@ def clips(
"-o",
"--output",
help="Output file name template. See docs for details.",
default=DEFAULT_OUTPUT_TEMPLATE,
default="{date}_{id}_{channel_login}_{title_slug}.{format}",
)
@click.option(
"-q",

View File

@ -1,15 +1,13 @@
import re
import sys
from os import path
from pathlib import Path
from typing import Callable, Generator, List, Optional
from typing import Callable, Generator, Optional
import click
from twitchdl import twitch, utils
from twitchdl.commands.download import get_clip_authenticated_url
from twitchdl.entities import VideoQuality
from twitchdl.http import download_file
from twitchdl.download import download_file
from twitchdl.output import green, print_clip, print_clip_compact, print_json, print_paged, yellow
from twitchdl.twitch import Clip, ClipsPeriod
@ -24,7 +22,6 @@ def clips(
limit: Optional[int] = None,
pager: Optional[int] = None,
period: ClipsPeriod = "all_time",
target_dir: Path = Path(),
):
# Set different defaults for limit for compact display
default_limit = 40 if compact else 10
@ -38,7 +35,7 @@ def clips(
return print_json(list(generator))
if download:
return _download_clips(target_dir, generator)
return _download_clips(generator)
print_fn = print_clip_compact if compact else print_clip
@ -48,8 +45,8 @@ def clips(
return _print_all(generator, print_fn, all)
def _target_filename(clip: Clip, video_qualities: List[VideoQuality]):
url = video_qualities[0]["sourceURL"]
def _target_filename(clip: Clip):
url = clip["videoQualities"][0]["sourceURL"]
_, ext = path.splitext(url)
ext = ext.lstrip(".")
@ -70,27 +67,16 @@ def _target_filename(clip: Clip, video_qualities: List[VideoQuality]):
return f"{name}.{ext}"
def _download_clips(target_dir: Path, generator: Generator[Clip, None, None]):
if not target_dir.exists():
target_dir.mkdir(parents=True, exist_ok=True)
def _download_clips(generator: Generator[Clip, None, None]):
for clip in generator:
# videoQualities can be null in some circumstances, see:
# https://github.com/ihabunek/twitch-dl/issues/160
if not clip["videoQualities"]:
continue
target = _target_filename(clip)
target = target_dir / _target_filename(clip, clip["videoQualities"])
if target.exists():
if path.exists(target):
click.echo(f"Already downloaded: {green(target)}")
else:
try:
url = get_clip_authenticated_url(clip["slug"], "source")
click.echo(f"Downloading: {yellow(target)}")
download_file(url, target)
except Exception as ex:
click.secho(ex, err=True, fg="red")
url = get_clip_authenticated_url(clip["slug"], "source")
click.echo(f"Downloading: {yellow(target)}")
download_file(url, target)
def _print_all(

View File

@ -1,38 +1,35 @@
import asyncio
import os
import platform
import re
import shutil
import subprocess
import tempfile
from os import path
from pathlib import Path
from typing import List, Optional
from typing import Dict, List
from urllib.parse import urlencode, urlparse
import click
import httpx
from twitchdl import twitch, utils
from twitchdl.download import download_file
from twitchdl.entities import DownloadOptions
from twitchdl.exceptions import ConsoleError
from twitchdl.http import download_all, download_file
from twitchdl.naming import clip_filename, video_filename
from twitchdl.http import download_all
from twitchdl.output import blue, bold, green, print_log, yellow
from twitchdl.playlists import (
enumerate_vods,
get_init_sections,
load_m3u8,
make_join_playlist,
parse_playlists,
select_playlist,
)
from twitchdl.twitch import Chapter, ClipAccessToken, Video
from twitchdl.twitch import Chapter, Clip, ClipAccessToken, Video
def download(ids: List[str], args: DownloadOptions):
if not ids:
print_log("No IDs to downlad given")
return
for video_id in ids:
download_one(video_id, args)
@ -49,14 +46,14 @@ def download_one(video: str, args: DownloadOptions):
raise ConsoleError(f"Invalid input: {video}")
def _join_vods(playlist_path: Path, target: Path, overwrite: bool, video: Video):
def _join_vods(playlist_path: str, target: str, overwrite: bool, video: Video):
description = video["description"] or ""
description = description.strip()
command: List[str] = [
command = [
"ffmpeg",
"-i",
str(playlist_path),
playlist_path,
"-c",
"copy",
"-metadata",
@ -82,9 +79,9 @@ def _join_vods(playlist_path: Path, target: Path, overwrite: bool, video: Video)
raise ConsoleError("Joining files failed")
def _concat_vods(vod_paths: List[Path], target: Path):
def _concat_vods(vod_paths: List[str], target: str):
tool = "type" if platform.system() == "Windows" else "cat"
command = [tool] + [str(p) for p in vod_paths]
command = [tool] + vod_paths
with open(target, "wb") as target_file:
result = subprocess.run(command, stdout=target_file)
@ -92,15 +89,74 @@ def _concat_vods(vod_paths: List[Path], target: Path):
raise ConsoleError(f"Joining files failed: {result.stderr}")
def _crete_temp_dir(base_uri: str) -> Path:
def get_video_placeholders(video: Video, format: str) -> Dict[str, str]:
date, time = video["publishedAt"].split("T")
game = video["game"]["name"] if video["game"] else "Unknown"
return {
"channel": video["creator"]["displayName"],
"channel_login": video["creator"]["login"],
"date": date,
"datetime": video["publishedAt"],
"format": format,
"game": game,
"game_slug": utils.slugify(game),
"id": video["id"],
"time": time,
"title": utils.titlify(video["title"]),
"title_slug": utils.slugify(video["title"]),
}
def _video_target_filename(video: Video, args: DownloadOptions):
subs = get_video_placeholders(video, args.format)
try:
return args.output.format(**subs)
except KeyError as e:
supported = ", ".join(subs.keys())
raise ConsoleError(f"Invalid key {e} used in --output. Supported keys are: {supported}")
def _clip_target_filename(clip: Clip, args: DownloadOptions):
date, time = clip["createdAt"].split("T")
game = clip["game"]["name"] if clip["game"] else "Unknown"
url = clip["videoQualities"][0]["sourceURL"]
_, ext = path.splitext(url)
ext = ext.lstrip(".")
subs = {
"channel": clip["broadcaster"]["displayName"],
"channel_login": clip["broadcaster"]["login"],
"date": date,
"datetime": clip["createdAt"],
"format": ext,
"game": game,
"game_slug": utils.slugify(game),
"id": clip["id"],
"slug": clip["slug"],
"time": time,
"title": utils.titlify(clip["title"]),
"title_slug": utils.slugify(clip["title"]),
}
try:
return args.output.format(**subs)
except KeyError as e:
supported = ", ".join(subs.keys())
raise ConsoleError(f"Invalid key {e} used in --output. Supported keys are: {supported}")
def _crete_temp_dir(base_uri: str) -> str:
"""Create a temp dir to store downloads if it doesn't exist."""
path = urlparse(base_uri).path.lstrip("/")
temp_dir = Path(tempfile.gettempdir(), "twitch-dl", path)
temp_dir.mkdir(parents=True, exist_ok=True)
return temp_dir
return str(temp_dir)
def _get_clip_url(access_token: ClipAccessToken, quality: Optional[str]) -> str:
def _get_clip_url(access_token: ClipAccessToken, quality: str) -> str:
qualities = access_token["videoQualities"]
# Quality given as an argument
@ -128,7 +184,7 @@ def _get_clip_url(access_token: ClipAccessToken, quality: Optional[str]) -> str:
return selected_quality["sourceURL"]
def get_clip_authenticated_url(slug: str, quality: Optional[str]):
def get_clip_authenticated_url(slug: str, quality: str):
print_log("Fetching access token...")
access_token = twitch.get_clip_access_token(slug)
@ -160,10 +216,10 @@ def _download_clip(slug: str, args: DownloadOptions) -> None:
duration = utils.format_duration(clip["durationSeconds"])
click.echo(f"Found: {green(title)} by {yellow(user)}, playing {blue(game)} ({duration})")
target = Path(clip_filename(clip, args.output))
target = _clip_target_filename(clip, args)
click.echo(f"Target: {blue(target)}")
if not args.overwrite and target.exists():
if not args.overwrite and path.exists(target):
response = click.prompt("File exists. Overwrite? [Y/n]", default="Y", show_default=False)
if response.lower().strip() != "y":
raise click.Abort()
@ -192,10 +248,10 @@ def _download_video(video_id: str, args: DownloadOptions) -> None:
click.echo(f"Found: {blue(video['title'])} by {yellow(video['creator']['displayName'])}")
target = Path(video_filename(video, args.format, args.output))
target = _video_target_filename(video, args)
click.echo(f"Output: {blue(target)}")
if not args.overwrite and target.exists():
if not args.overwrite and path.exists(target):
response = click.prompt("File exists. Overwrite? [Y/n]", default="Y", show_default=False)
if response.lower().strip() != "y":
raise click.Abort()
@ -217,40 +273,23 @@ def _download_video(video_id: str, args: DownloadOptions) -> None:
vods_m3u8 = load_m3u8(vods_text)
vods = enumerate_vods(vods_m3u8, start, end)
if args.dry_run:
click.echo("Dry run, video not downloaded.")
return
base_uri = re.sub("/[^/]+$", "/", playlist.url)
target_dir = _crete_temp_dir(base_uri)
# Save playlists for debugging purposes
with open(target_dir / "playlists.m3u8", "w") as f:
with open(path.join(target_dir, "playlists.m3u8"), "w") as f:
f.write(playlists_text)
with open(target_dir / "playlist.m3u8", "w") as f:
with open(path.join(target_dir, "playlist.m3u8"), "w") as f:
f.write(vods_text)
click.echo(f"\nDownloading {len(vods)} VODs using {args.max_workers} workers to {target_dir}")
init_sections = get_init_sections(vods_m3u8)
for uri in init_sections:
print_log(f"Downloading init section {uri}...")
download_file(f"{base_uri}{uri}", target_dir / uri)
sources = [base_uri + vod.path for vod in vods]
targets = [target_dir / f"{vod.index:05d}.ts" for vod in vods]
asyncio.run(
download_all(
zip(sources, targets),
args.max_workers,
rate_limit=args.rate_limit,
count=len(vods),
)
)
targets = [os.path.join(target_dir, f"{vod.index:05d}.ts") for vod in vods]
asyncio.run(download_all(sources, targets, args.max_workers, rate_limit=args.rate_limit))
join_playlist = make_join_playlist(vods_m3u8, vods, targets)
join_playlist_path = target_dir / "playlist_downloaded.m3u8"
join_playlist_path = path.join(target_dir, "playlist_downloaded.m3u8")
join_playlist.dump(join_playlist_path) # type: ignore
click.echo()

View File

@ -4,8 +4,8 @@ import click
import m3u8
from twitchdl import twitch, utils
from twitchdl.commands.download import get_video_placeholders
from twitchdl.exceptions import ConsoleError
from twitchdl.naming import video_placeholders
from twitchdl.output import bold, print_clip, print_json, print_log, print_table, print_video
from twitchdl.playlists import parse_playlists
from twitchdl.twitch import Chapter, Clip, Video
@ -67,7 +67,7 @@ def video_info(video: Video, playlists: str, chapters: List[Chapter]):
duration = utils.format_time(chapter["durationMilliseconds"] // 1000)
click.echo(f'{start} {bold(chapter["description"])} ({duration})')
placeholders = video_placeholders(video, format="mkv")
placeholders = get_video_placeholders(video, format="mkv")
placeholders = [[f"{{{k}}}", v] for k, v in placeholders.items()]
click.echo("")
print_table(["Placeholder", "Value"], placeholders)
@ -98,8 +98,5 @@ def clip_info(clip: Clip):
click.echo()
click.echo("Download links:")
if clip["videoQualities"]:
for q in clip["videoQualities"]:
click.echo(f"{bold(q['quality'])} [{q['frameRate']} fps] {q['sourceURL']}")
else:
click.echo("No download URLs found")
for q in clip["videoQualities"]:
click.echo(f"{bold(q['quality'])} [{q['frameRate']} fps] {q['sourceURL']}")

View File

@ -20,7 +20,7 @@ def videos(
sort: twitch.VideosSort,
type: twitch.VideosType,
):
game_ids = get_game_ids(games)
game_ids = _get_game_ids(games)
# Set different defaults for limit for compact display
limit = limit or (40 if compact else 10)
@ -67,13 +67,16 @@ def videos(
)
def get_game_ids(names: List[str]) -> List[str]:
return [get_game_id(name) for name in names]
def _get_game_ids(names: List[str]) -> List[str]:
if not names:
return []
game_ids = []
for name in names:
print_log(f"Looking up game '{name}'...")
game_id = twitch.get_game_id(name)
if not game_id:
raise ConsoleError(f"Game '{name}' not found")
game_ids.append(int(game_id))
def get_game_id(name: str) -> str:
print_log(f"Looking up game '{name}'...")
game_id = twitch.get_game_id(name)
if not game_id:
raise ConsoleError(f"Game '{name}' not found")
return game_id
return game_ids

37
twitchdl/download.py Normal file
View File

@ -0,0 +1,37 @@
import os
import httpx
from twitchdl.exceptions import ConsoleError
CHUNK_SIZE = 1024
CONNECT_TIMEOUT = 5
RETRY_COUNT = 5
def _download(url: str, path: str):
tmp_path = path + ".tmp"
size = 0
with httpx.stream("GET", url, timeout=CONNECT_TIMEOUT) as response:
with open(tmp_path, "wb") as target:
for chunk in response.iter_bytes(chunk_size=CHUNK_SIZE):
target.write(chunk)
size += len(chunk)
os.rename(tmp_path, path)
return size
def download_file(url: str, path: str, retries: int = RETRY_COUNT):
if os.path.exists(path):
from_disk = True
return (os.path.getsize(path), from_disk)
from_disk = False
for _ in range(retries):
try:
return (_download(url, path), from_disk)
except httpx.RequestError:
pass
raise ConsoleError(f"Failed downloading after {retries} attempts: {url}")

View File

@ -1,5 +1,5 @@
from dataclasses import dataclass
from typing import Any, List, Literal, Mapping, Optional, TypedDict
from typing import Any, Mapping, Optional
@dataclass
@ -20,73 +20,6 @@ class DownloadOptions:
max_workers: int
ClipsPeriod = Literal["last_day", "last_week", "last_month", "all_time"]
VideosSort = Literal["views", "time"]
VideosType = Literal["archive", "highlight", "upload"]
class AccessToken(TypedDict):
signature: str
value: str
class User(TypedDict):
login: str
displayName: str
class Game(TypedDict):
id: str
name: str
class VideoQuality(TypedDict):
frameRate: str
quality: str
sourceURL: str
class ClipAccessToken(TypedDict):
id: str
playbackAccessToken: AccessToken
videoQualities: List[VideoQuality]
class Clip(TypedDict):
id: str
slug: str
title: str
createdAt: str
viewCount: int
durationSeconds: int
url: str
videoQualities: Optional[List[VideoQuality]]
game: Game
broadcaster: User
class Video(TypedDict):
id: str
title: str
description: str
publishedAt: str
broadcastType: str
lengthSeconds: int
game: Game
creator: User
class Chapter(TypedDict):
id: str
durationMilliseconds: int
positionMilliseconds: int
type: str
description: str
subDescription: str
thumbnailURL: str
game: Game
# Type for annotating decoded JSON
# TODO: make data classes for common structs
Data = Mapping[str, Any]

View File

@ -3,12 +3,10 @@ import logging
import os
import time
from abc import ABC, abstractmethod
from pathlib import Path
from typing import Iterable, Optional, Tuple
from typing import List, Optional
import httpx
from twitchdl.exceptions import ConsoleError
from twitchdl.progress import Progress
logger = logging.getLogger(__name__)
@ -73,7 +71,7 @@ async def download(
client: httpx.AsyncClient,
task_id: int,
source: str,
target: Path,
target: str,
progress: Progress,
token_bucket: TokenBucket,
):
@ -98,12 +96,12 @@ async def download_with_retries(
semaphore: asyncio.Semaphore,
task_id: int,
source: str,
target: Path,
target: str,
progress: Progress,
token_bucket: TokenBucket,
):
async with semaphore:
if target.exists():
if os.path.exists(target):
size = os.path.getsize(target)
progress.already_downloaded(task_id, size)
return
@ -121,13 +119,13 @@ async def download_with_retries(
async def download_all(
source_targets: Iterable[Tuple[str, Path]],
sources: List[str],
targets: List[str],
workers: int,
*,
count: Optional[int] = None,
rate_limit: Optional[int] = None,
):
progress = Progress(count)
progress = Progress(len(sources))
token_bucket = LimitingTokenBucket(rate_limit) if rate_limit else EndlessTokenBucket()
async with httpx.AsyncClient(timeout=TIMEOUT) as client:
semaphore = asyncio.Semaphore(workers)
@ -141,36 +139,6 @@ async def download_all(
progress,
token_bucket,
)
for task_id, (source, target) in enumerate(source_targets)
for task_id, (source, target) in enumerate(zip(sources, targets))
]
await asyncio.gather(*tasks)
def download_file(url: str, target: Path, retries: int = RETRY_COUNT) -> None:
"""Download URL to given target path with retries"""
error_message = ""
for r in range(retries):
try:
retry_info = f" (retry {r})" if r > 0 else ""
logger.info(f"Downloading {url} to {target}{retry_info}")
return _do_download_file(url, target)
except httpx.HTTPStatusError as ex:
logger.error(ex)
error_message = f"Server responded with HTTP {ex.response.status_code}"
except httpx.RequestError as ex:
logger.error(ex)
error_message = str(ex)
raise ConsoleError(f"Failed downloading after {retries} attempts: {error_message}")
def _do_download_file(url: str, target: Path) -> None:
tmp_path = Path(str(target) + ".tmp")
with httpx.stream("GET", url, timeout=TIMEOUT, follow_redirects=True) as response:
response.raise_for_status()
with open(tmp_path, "wb") as f:
for chunk in response.iter_bytes(chunk_size=CHUNK_SIZE):
f.write(chunk)
os.rename(tmp_path, target)

View File

@ -1,72 +0,0 @@
import os
from typing import Dict
from twitchdl import utils
from twitchdl.entities import Clip, Video
from twitchdl.exceptions import ConsoleError
DEFAULT_OUTPUT_TEMPLATE = "{date}_{id}_{channel_login}_{title_slug}.{format}"
def video_filename(video: Video, format: str, output: str) -> str:
subs = video_placeholders(video, format)
return _format(output, subs)
def video_placeholders(video: Video, format: str) -> Dict[str, str]:
date, time = video["publishedAt"].split("T")
game = video["game"]["name"] if video["game"] else "Unknown"
return {
"channel": video["creator"]["displayName"],
"channel_login": video["creator"]["login"],
"date": date,
"datetime": video["publishedAt"],
"format": format,
"game": game,
"game_slug": utils.slugify(game),
"id": video["id"],
"time": time,
"title": utils.titlify(video["title"]),
"title_slug": utils.slugify(video["title"]),
}
def clip_filename(clip: Clip, output: str):
subs = clip_placeholders(clip)
return _format(output, subs)
def clip_placeholders(clip: Clip) -> Dict[str, str]:
date, time = clip["createdAt"].split("T")
game = clip["game"]["name"] if clip["game"] else "Unknown"
if clip["videoQualities"]:
url = clip["videoQualities"][0]["sourceURL"]
_, ext = os.path.splitext(url)
ext = ext.lstrip(".")
else:
ext = "mp4"
return {
"channel": clip["broadcaster"]["displayName"],
"channel_login": clip["broadcaster"]["login"],
"date": date,
"datetime": clip["createdAt"],
"format": ext,
"game": game,
"game_slug": utils.slugify(game),
"id": clip["id"],
"slug": clip["slug"],
"time": time,
"title": utils.titlify(clip["title"]),
"title_slug": utils.slugify(clip["title"]),
}
def _format(output: str, subs: Dict[str, str]) -> str:
try:
return output.format(**subs)
except KeyError as e:
supported = ", ".join(subs.keys())
raise ConsoleError(f"Invalid key {e} used in --output. Supported keys are: {supported}")

View File

@ -1,21 +1,15 @@
import json
import sys
from itertools import islice
from typing import Any, Callable, Generator, List, Optional, TypeVar
import click
from twitchdl import utils
from twitchdl.entities import Clip, Video
from twitchdl.twitch import Clip, Video
T = TypeVar("T")
def clear_line():
sys.stdout.write("\033[1K")
sys.stdout.write("\r")
def truncate(string: str, length: int) -> str:
if len(string) > length:
return string[: length - 1] + ""
@ -31,24 +25,15 @@ def print_log(message: Any):
click.secho(message, err=True, dim=True)
def visual_len(text: str):
return len(click.unstyle(text))
def ljust(text: str, width: int):
diff = width - visual_len(text)
return text + (" " * diff) if diff > 0 else text
def print_table(headers: List[str], data: List[List[str]]):
widths = [[visual_len(cell) for cell in row] for row in data + [headers]]
widths = [[len(cell) for cell in row] for row in data + [headers]]
widths = [max(width) for width in zip(*widths)]
underlines = ["-" * width for width in widths]
def print_row(row: List[str]):
for idx, cell in enumerate(row):
width = widths[idx]
click.echo(ljust(cell, width), nl=False)
click.echo(cell.ljust(width), nl=False)
click.echo(" ", nl=False)
click.echo()

View File

@ -3,23 +3,20 @@ Parse and manipulate m3u8 playlists.
"""
from dataclasses import dataclass
from pathlib import Path
from typing import Generator, List, Optional, OrderedDict, Set
from typing import Generator, List, Optional, OrderedDict
import click
import m3u8
from twitchdl import utils
from twitchdl.output import bold, dim, print_table
from twitchdl.output import bold, dim
@dataclass
class Playlist:
name: str
group_id: str
resolution: Optional[str]
url: str
is_source: bool
@dataclass
@ -37,17 +34,21 @@ def parse_playlists(playlists_m3u8: str) -> List[Playlist]:
document = load_m3u8(source)
for p in document.playlists:
resolution = (
"x".join(str(r) for r in p.stream_info.resolution)
if p.stream_info.resolution
else None
)
from pprint import pp
media = p.media[0]
is_source = media.group_id == "chunked"
yield Playlist(media.name, media.group_id, resolution, p.uri, is_source)
pp(p.__dict__)
pp(p.stream_info.__dict__)
if p.stream_info.resolution:
name = p.media[0].name
resolution = "x".join(str(r) for r in p.stream_info.resolution)
else:
name = p.media[0].group_id
resolution = None
return list(_parse(playlists_m3u8))
yield Playlist(name, resolution, p.uri)
# Move audio to bottom, it has no resolution
return sorted(_parse(playlists_m3u8), key=lambda p: p.resolution is None)
def load_m3u8(playlist_m3u8: str) -> m3u8.M3U8:
@ -82,7 +83,7 @@ def enumerate_vods(
def make_join_playlist(
playlist: m3u8.M3U8,
vods: List[Vod],
targets: List[Path],
targets: List[str],
) -> m3u8.Playlist:
"""
Make a modified playlist which references downloaded VODs
@ -94,7 +95,7 @@ def make_join_playlist(
playlist.segments.clear()
for segment in org_segments:
if segment.uri in path_map:
segment.uri = str(path_map[segment.uri].name)
segment.uri = path_map[segment.uri]
playlist.segments.append(segment)
return playlist
@ -110,13 +111,10 @@ def select_playlist(playlists: List[Playlist], quality: Optional[str]) -> Playli
def select_playlist_by_name(playlists: List[Playlist], quality: str) -> Playlist:
if quality == "source":
for playlist in playlists:
if playlist.is_source:
return playlist
raise click.ClickException("Source quality not found, please report an issue on github.")
return playlists[0]
for playlist in playlists:
if playlist.name == quality or playlist.group_id == quality:
if playlist.name == quality:
return playlist
available = ", ".join([p.name for p in playlists])
@ -125,56 +123,13 @@ def select_playlist_by_name(playlists: List[Playlist], quality: str) -> Playlist
def select_playlist_interactive(playlists: List[Playlist]) -> Playlist:
playlists = sorted(playlists, key=_playlist_key)
headers = ["#", "Name", "Group ID", "Resolution"]
click.echo("\nAvailable qualities:")
for n, playlist in enumerate(playlists):
if playlist.resolution:
click.echo(f"{n + 1}) {bold(playlist.name)} {dim(f'({playlist.resolution})')}")
else:
click.echo(f"{n + 1}) {bold(playlist.name)}")
rows = [
[
f"{n + 1})",
bold(playlist.name),
dim(playlist.group_id),
dim(playlist.resolution or ""),
]
for n, playlist in enumerate(playlists)
]
click.echo()
print_table(headers, rows)
default = 1
for index, playlist in enumerate(playlists):
if playlist.is_source:
default = index + 1
no = utils.read_int("\nChoose quality", min=1, max=len(playlists) + 1, default=default)
no = utils.read_int("Choose quality", min=1, max=len(playlists) + 1, default=1)
playlist = playlists[no - 1]
return playlist
MAX = 1_000_000
def _playlist_key(playlist: Playlist) -> int:
"""Attempt to sort playlists so that source quality is on top, audio only
is on bottom and others are sorted descending by resolution."""
if playlist.is_source:
return 0
if playlist.group_id == "audio_only":
return MAX
try:
return MAX - int(playlist.name.split("p")[0])
except Exception:
pass
return MAX
def get_init_sections(playlist: m3u8.M3U8) -> Set[str]:
# TODO: we're ignoring initi_section.base_uri and bytes
return set(
segment.init_section.uri
for segment in playlist.segments
if segment.init_section is not None
)

View File

@ -1,13 +1,13 @@
import logging
import time
from collections import deque
from dataclasses import dataclass
from dataclasses import dataclass, field
from statistics import mean
from typing import Deque, Dict, NamedTuple, Optional
import click
from twitchdl.output import blue, clear_line
from twitchdl.output import blue
from twitchdl.utils import format_size, format_time
logger = logging.getLogger(__name__)
@ -31,25 +31,28 @@ class Sample(NamedTuple):
timestamp: float
@dataclass
class Progress:
def __init__(self, file_count: Optional[int] = None):
self.downloaded: int = 0
self.estimated_total: Optional[int] = None
self.last_printed: Optional[float] = None
self.progress_bytes: int = 0
self.progress_perc: int = 0
self.remaining_time: Optional[int] = None
self.samples: Deque[Sample] = deque(maxlen=1000)
self.speed: Optional[float] = None
self.tasks: Dict[TaskId, Task] = {}
self.file_count = file_count
self.downloaded_count: int = 0
vod_count: int
downloaded: int = 0
estimated_total: Optional[int] = None
last_printed: float = field(default_factory=time.time)
progress_bytes: int = 0
progress_perc: int = 0
remaining_time: Optional[int] = None
speed: Optional[float] = None
start_time: float = field(default_factory=time.time)
tasks: Dict[TaskId, Task] = field(default_factory=dict)
vod_downloaded_count: int = 0
samples: Deque[Sample] = field(default_factory=lambda: deque(maxlen=100))
def start(self, task_id: int, size: int):
if task_id in self.tasks:
raise ValueError(f"Task {task_id}: cannot start, already started")
self.tasks[task_id] = Task(task_id, size)
self._calculate_total()
self._calculate_progress()
self.print()
def advance(self, task_id: int, size: int):
@ -60,6 +63,7 @@ class Progress:
self.progress_bytes += size
self.tasks[task_id].advance(size)
self.samples.append(Sample(self.downloaded, time.time()))
self._calculate_progress()
self.print()
def already_downloaded(self, task_id: int, size: int):
@ -68,7 +72,9 @@ class Progress:
self.tasks[task_id] = Task(task_id, size)
self.progress_bytes += size
self.downloaded_count += 1
self.vod_downloaded_count += 1
self._calculate_total()
self._calculate_progress()
self.print()
def abort(self, task_id: int):
@ -77,6 +83,9 @@ class Progress:
del self.tasks[task_id]
self.progress_bytes = sum(t.downloaded for t in self.tasks.values())
self._calculate_total()
self._calculate_progress()
self.print()
def end(self, task_id: int):
@ -89,15 +98,15 @@ class Progress:
f"Taks {task_id} ended with {task.downloaded}b downloaded, expected {task.size}b."
)
self.downloaded_count += 1
self.vod_downloaded_count += 1
self.print()
def _recalculate(self):
if self.tasks and self.file_count:
self.estimated_total = int(mean(t.size for t in self.tasks.values()) * self.file_count)
else:
self.estimated_total = None
def _calculate_total(self):
self.estimated_total = (
int(mean(t.size for t in self.tasks.values()) * self.vod_count) if self.tasks else None
)
def _calculate_progress(self):
self.speed = self._calculate_speed()
self.progress_perc = (
int(100 * self.progress_bytes / self.estimated_total) if self.estimated_total else 0
@ -124,14 +133,10 @@ class Progress:
now = time.time()
# Don't print more often than 10 times per second
if self.last_printed and now - self.last_printed < 0.1:
if now - self.last_printed < 0.1:
return
self._recalculate()
clear_line()
total_label = f"/{self.file_count}" if self.file_count else ""
click.echo(f"Downloaded {self.downloaded_count}{total_label} VODs", nl=False)
click.echo(f"\rDownloaded {self.vod_downloaded_count}/{self.vod_count} VODs", nl=False)
click.secho(f" {self.progress_perc}%", fg="blue", nl=False)
if self.estimated_total is not None:
@ -145,4 +150,6 @@ class Progress:
if self.remaining_time is not None:
click.echo(f" ETA {blue(format_time(self.remaining_time))}", nl=False)
click.echo(" ", nl=False)
self.last_printed = now

View File

@ -2,28 +2,81 @@
Twitch API access.
"""
import logging
import random
import time
from typing import Any, Dict, Generator, List, Mapping, Optional, Tuple, Union
import json
from typing import Dict, Generator, List, Literal, Mapping, Optional, Tuple, TypedDict
import click
import httpx
from twitchdl import CLIENT_ID
from twitchdl.entities import (
AccessToken,
Chapter,
Clip,
ClipAccessToken,
ClipsPeriod,
Data,
Video,
VideosSort,
VideosType,
)
from twitchdl.entities import Data
from twitchdl.exceptions import ConsoleError
from twitchdl.utils import format_size
ClipsPeriod = Literal["last_day", "last_week", "last_month", "all_time"]
VideosSort = Literal["views", "time"]
VideosType = Literal["archive", "highlight", "upload"]
class AccessToken(TypedDict):
signature: str
value: str
class User(TypedDict):
login: str
displayName: str
class Game(TypedDict):
id: str
name: str
class VideoQuality(TypedDict):
frameRate: str
quality: str
sourceURL: str
class ClipAccessToken(TypedDict):
id: str
playbackAccessToken: AccessToken
videoQualities: List[VideoQuality]
class Clip(TypedDict):
id: str
slug: str
title: str
createdAt: str
viewCount: int
durationSeconds: int
url: str
videoQualities: List[VideoQuality]
game: Game
broadcaster: User
class Video(TypedDict):
id: str
title: str
description: str
publishedAt: str
broadcastType: str
lengthSeconds: int
game: Game
creator: User
class Chapter(TypedDict):
id: str
durationMilliseconds: int
positionMilliseconds: int
type: str
description: str
subDescription: str
thumbnailURL: str
game: Game
class GQLError(click.ClickException):
@ -34,22 +87,10 @@ class GQLError(click.ClickException):
super().__init__(message)
Content = Union[str, bytes]
Headers = Dict[str, str]
def authenticated_post(url, data=None, json=None, headers={}):
headers["Client-ID"] = CLIENT_ID
def authenticated_post(
url: str,
*,
json: Any = None,
content: Optional[Content] = None,
auth_token: Optional[str] = None,
):
headers = {"Client-ID": CLIENT_ID}
if auth_token is not None:
headers["authorization"] = f"OAuth {auth_token}"
response = request("POST", url, content=content, json=json, headers=headers)
response = httpx.post(url, data=data, json=json, headers=headers)
if response.status_code == 400:
data = response.json()
raise ConsoleError(data["message"])
@ -59,51 +100,16 @@ def authenticated_post(
return response
def request(
method: str,
url: str,
json: Any = None,
content: Optional[Content] = None,
headers: Optional[Mapping[str, str]] = None,
):
with httpx.Client() as client:
request = client.build_request(method, url, json=json, content=content, headers=headers)
log_request(request)
start = time.time()
response = client.send(request)
duration = time.time() - start
log_response(response, duration)
return response
logger = logging.getLogger(__name__)
def log_request(request: httpx.Request):
logger.info(f"--> {request.method} {request.url}")
if request.content:
logger.debug(f"--> {request.content}")
def log_response(response: httpx.Response, duration_seconds: float):
request = response.request
duration = f"{int(1000 * duration_seconds)}ms"
size = format_size(len(response.content))
logger.info(f"<-- {request.method} {request.url} HTTP {response.status_code} {duration} {size}")
if response.content:
logger.debug(f"<-- {response.content}")
def gql_persisted_query(query: Data):
def gql_post(query: str):
url = "https://gql.twitch.tv/gql"
response = authenticated_post(url, json=query)
response = authenticated_post(url, data=query)
gql_raise_on_error(response)
return response.json()
def gql_query(query: str, auth_token: Optional[str] = None):
def gql_query(query: str, headers: Dict[str, str] = {}):
url = "https://gql.twitch.tv/gql"
response = authenticated_post(url, json={"query": query}, auth_token=auth_token)
response = authenticated_post(url, json={"query": query}, headers=headers)
gql_raise_on_error(response)
return response.json()
@ -184,18 +190,22 @@ def get_clip(slug: str) -> Optional[Clip]:
def get_clip_access_token(slug: str) -> ClipAccessToken:
query = {
query = f"""
{{
"operationName": "VideoAccessToken_Clip",
"variables": {"slug": slug},
"extensions": {
"persistedQuery": {
"variables": {{
"slug": "{slug}"
}},
"extensions": {{
"persistedQuery": {{
"version": 1,
"sha256Hash": "36b89d2507fce29e5ca551df756d27c1cfe079e2609642b4390aa4c35796eb11",
}
},
}
"sha256Hash": "36b89d2507fce29e5ca551df756d27c1cfe079e2609642b4390aa4c35796eb11"
}}
}}
}}
"""
response = gql_persisted_query(query)
response = gql_post(query.strip())
return response["data"]["clip"]
@ -267,6 +277,23 @@ def channel_clips_generator(
return _generator(clips, limit)
def channel_clips_generator_old(channel_id: str, period: ClipsPeriod, limit: int):
cursor = ""
while True:
clips = get_channel_clips(channel_id, period, limit, after=cursor)
if not clips["edges"]:
break
has_next = clips["pageInfo"]["hasNextPage"]
cursor = clips["edges"][-1]["cursor"] if has_next else None
yield clips, has_next
if not cursor:
break
def get_channel_videos(
channel_id: str,
limit: int,
@ -276,7 +303,6 @@ def get_channel_videos(
after: Optional[str] = None,
):
game_ids = game_ids or []
game_ids_str = f"[{','.join(game_ids)}]"
query = f"""
{{
@ -287,7 +313,7 @@ def get_channel_videos(
sort: {sort.upper()},
after: "{after or ''}",
options: {{
gameIDs: {game_ids_str}
gameIDs: {game_ids}
}}
) {{
totalCount
@ -347,7 +373,7 @@ def get_access_token(video_id: str, auth_token: Optional[str] = None) -> AccessT
query = f"""
{{
videoPlaybackAccessToken(
id: "{video_id}",
id: {video_id},
params: {{
platform: "web",
playerBackend: "mediaplayer",
@ -360,8 +386,12 @@ def get_access_token(video_id: str, auth_token: Optional[str] = None) -> AccessT
}}
"""
headers: Mapping[str, str] = {}
if auth_token is not None:
headers["authorization"] = f"OAuth {auth_token}"
try:
response = gql_query(query, auth_token=auth_token)
response = gql_query(query, headers=headers)
return response["data"]["videoPlaybackAccessToken"]
except httpx.HTTPStatusError as error:
# Provide a more useful error message when server returns HTTP 401
@ -392,12 +422,8 @@ def get_playlists(video_id: str, access_token: AccessToken) -> str:
"allow_audio_only": "true",
"allow_source": "true",
"player": "twitchweb",
"platform": "web",
"supported_codecs": "av1,h265,h264",
"p": random.randint(1000000, 10000000),
},
)
response.raise_for_status()
return response.content.decode("utf-8")
@ -432,7 +458,7 @@ def get_video_chapters(video_id: str) -> List[Chapter]:
},
}
response = gql_persisted_query(query)
response = gql_post(json.dumps(query))
return list(_chapter_nodes(response["data"]["video"]["moments"]))

View File

@ -84,14 +84,13 @@ def titlify(value: str) -> str:
VIDEO_PATTERNS = [
r"^(?P<id>\d+)?$",
r"^https://(www\.|m\.)?twitch\.tv/videos/(?P<id>\d+)(\?.+)?$",
r"^https://(www\.|m\.)?twitch\.tv/\w+/video/(?P<id>\d+)(\?.+)?$",
r"^https://(www.)?twitch.tv/videos/(?P<id>\d+)(\?.+)?$",
]
CLIP_PATTERNS = [
r"^(?P<slug>[A-Za-z0-9]+(?:-[A-Za-z0-9_-]{16})?)$",
r"^https://(www\.|m\.)?twitch\.tv/\w+/clip/(?P<slug>[A-Za-z0-9]+(?:-[A-Za-z0-9_-]{16})?)(\?.+)?$",
r"^https://clips\.twitch\.tv/(?P<slug>[A-Za-z0-9]+(?:-[A-Za-z0-9_-]{16})?)(\?.+)?$",
r"^https://(www.)?twitch.tv/\w+/clip/(?P<slug>[A-Za-z0-9]+(?:-[A-Za-z0-9_-]{16})?)(\?.+)?$",
r"^https://clips.twitch.tv/(?P<slug>[A-Za-z0-9]+(?:-[A-Za-z0-9_-]{16})?)(\?.+)?$",
]

203
video_before.json Normal file
View File

@ -0,0 +1,203 @@
{
"id": "2115833882",
"title": "Games I Feel Like Speedrun Marathon !newyt !Slender !Merch",
"description": null,
"publishedAt": "2024-04-10T05:01:12Z",
"broadcastType": "ARCHIVE",
"lengthSeconds": 30163,
"game": {
"id": "21063",
"name": "Saw"
},
"creator": {
"login": "ecdycis",
"displayName": "Ecdycis"
},
"playlists": [
{
"bandwidth": 6395968,
"resolution": [
1920,
1080
],
"codecs": "avc1.64002A,mp4a.40.2",
"video": "chunked",
"uri": "https://d2nvs31859zcd8.cloudfront.net/3104d817048588f268fa_ecdycis_43996750059_1712725267/chunked/index-muted-GKGIUOE29B.m3u8"
},
{
"bandwidth": 3430341,
"resolution": [
1280,
720
],
"codecs": "avc1.4D0020,mp4a.40.2",
"video": "720p60",
"uri": "https://d2nvs31859zcd8.cloudfront.net/3104d817048588f268fa_ecdycis_43996750059_1712725267/720p60/index-muted-GKGIUOE29B.m3u8"
},
{
"bandwidth": 1448243,
"resolution": [
852,
480
],
"codecs": "avc1.4D001F,mp4a.40.2",
"video": "480p30",
"uri": "https://d2nvs31859zcd8.cloudfront.net/3104d817048588f268fa_ecdycis_43996750059_1712725267/480p30/index-muted-GKGIUOE29B.m3u8"
},
{
"bandwidth": 215544,
"resolution": null,
"codecs": "mp4a.40.2",
"video": "audio_only",
"uri": "https://d2nvs31859zcd8.cloudfront.net/3104d817048588f268fa_ecdycis_43996750059_1712725267/audio_only/index-muted-GKGIUOE29B.m3u8"
},
{
"bandwidth": 709051,
"resolution": [
640,
360
],
"codecs": "avc1.4D001E,mp4a.40.2",
"video": "360p30",
"uri": "https://d2nvs31859zcd8.cloudfront.net/3104d817048588f268fa_ecdycis_43996750059_1712725267/360p30/index-muted-GKGIUOE29B.m3u8"
},
{
"bandwidth": 288844,
"resolution": [
284,
160
],
"codecs": "avc1.4D000C,mp4a.40.2",
"video": "160p30",
"uri": "https://d2nvs31859zcd8.cloudfront.net/3104d817048588f268fa_ecdycis_43996750059_1712725267/160p30/index-muted-GKGIUOE29B.m3u8"
}
],
"chapters": [
{
"id": "6049e803f47a0b9cf64ce95adcf3d96d",
"durationMilliseconds": 4891000,
"positionMilliseconds": 0,
"type": "GAME_CHANGE",
"description": "Saw",
"subDescription": "",
"thumbnailURL": "",
"video": {
"id": "2115833882",
"lengthSeconds": 30163,
"__typename": "Video"
},
"__typename": "VideoMoment",
"game": {
"id": "21063",
"displayName": "Saw",
"boxArtURL": "https://static-cdn.jtvnw.net/ttv-boxart/21063_IGDB-40x53.jpg",
"__typename": "Game"
}
},
{
"id": "c3be4ce5a7bae802d7a6df02baf62a85",
"durationMilliseconds": 6235000,
"positionMilliseconds": 4891000,
"type": "GAME_CHANGE",
"description": "Resident Evil 7: Biohazard",
"subDescription": "",
"thumbnailURL": "",
"video": {
"id": "2115833882",
"lengthSeconds": 30163,
"__typename": "Video"
},
"__typename": "VideoMoment",
"game": {
"id": "492934",
"displayName": "Resident Evil 7: Biohazard",
"boxArtURL": "https://static-cdn.jtvnw.net/ttv-boxart/492934_IGDB-40x53.jpg",
"__typename": "Game"
}
},
{
"id": "bd584f17bc1f91fc11ed14dcec5e4742",
"durationMilliseconds": 3401000,
"positionMilliseconds": 11126000,
"type": "GAME_CHANGE",
"description": "Silent Hill: Homecoming",
"subDescription": "",
"thumbnailURL": "",
"video": {
"id": "2115833882",
"lengthSeconds": 30163,
"__typename": "Video"
},
"__typename": "VideoMoment",
"game": {
"id": "18864",
"displayName": "Silent Hill: Homecoming",
"boxArtURL": "https://static-cdn.jtvnw.net/ttv-boxart/18864_IGDB-40x53.jpg",
"__typename": "Game"
}
},
{
"id": "9f4be8bc7b5bf398213bc602a4b39c4d",
"durationMilliseconds": 3490000,
"positionMilliseconds": 14527000,
"type": "GAME_CHANGE",
"description": "Silent Hill 2",
"subDescription": "",
"thumbnailURL": "",
"video": {
"id": "2115833882",
"lengthSeconds": 30163,
"__typename": "Video"
},
"__typename": "VideoMoment",
"game": {
"id": "9891",
"displayName": "Silent Hill 2",
"boxArtURL": "https://static-cdn.jtvnw.net/ttv-boxart/9891_IGDB-40x53.jpg",
"__typename": "Game"
}
},
{
"id": "7a42d537681decc10660c33f9071a37f",
"durationMilliseconds": 7241000,
"positionMilliseconds": 18017000,
"type": "GAME_CHANGE",
"description": "The Darkness",
"subDescription": "",
"thumbnailURL": "",
"video": {
"id": "2115833882",
"lengthSeconds": 30163,
"__typename": "Video"
},
"__typename": "VideoMoment",
"game": {
"id": "8448",
"displayName": "The Darkness",
"boxArtURL": "https://static-cdn.jtvnw.net/ttv-boxart/8448_IGDB-40x53.jpg",
"__typename": "Game"
}
},
{
"id": "64418186f436d80b1359d2e6686222ef",
"durationMilliseconds": 4905000,
"positionMilliseconds": 25258000,
"type": "GAME_CHANGE",
"description": "Silent Hill 4: The Room",
"subDescription": "",
"thumbnailURL": "",
"video": {
"id": "2115833882",
"lengthSeconds": 30163,
"__typename": "Video"
},
"__typename": "VideoMoment",
"game": {
"id": "5804",
"displayName": "Silent Hill 4: The Room",
"boxArtURL": "https://static-cdn.jtvnw.net/ttv-boxart/5804_IGDB-40x53.jpg",
"__typename": "Game"
}
}
]
}