Compare commits

..

1 Commits

Author SHA1 Message Date
0e8e2e3f40 wip 2024-04-01 10:52:41 +02:00
28 changed files with 558 additions and 1052 deletions

View File

@ -1,27 +0,0 @@
name: Run tests
on: [push, pull_request]
jobs:
test:
runs-on: ubuntu-22.04
strategy:
matrix:
python-version: ["3.8", "3.9", "3.10", "3.11", "3.12"]
steps:
- uses: actions/checkout@v4
- name: Set up Python ${{ matrix.python-version }}
uses: actions/setup-python@v5
with:
python-version: ${{ matrix.python-version }}
- name: Install dependencies
run: |
python -m pip install --upgrade pip
pip install -e ".[test]"
- name: Run tests
run: |
pytest
- name: Validate minimum required version
run: |
vermin --no-tips twitchdl

View File

@ -3,47 +3,11 @@ twitch-dl changelog
<!-- Do not edit. This file is automatically generated from changelog.yaml.-->
### [2.3.1 (2024-05-19)](https://github.com/ihabunek/twitch-dl/releases/tag/2.3.1)
### [2.2.0 (TBA)](https://github.com/ihabunek/twitch-dl/releases/tag/2.2.0)
* Fix fetching access token (#155, thanks @KryptonicDragon)
### [2.3.0 (2024-04-27)](https://github.com/ihabunek/twitch-dl/releases/tag/2.3.0)
* Show more playlist data when choosing quality
* Improve detection of 'source' quality for Twitch Enhanced Broadcast Streams
(#154)
### [2.2.4 (2024-04-25)](https://github.com/ihabunek/twitch-dl/releases/tag/2.2.4)
* Add m dot url support to video and clip regexes (thanks @localnerve)
### [2.2.3 (2024-04-24)](https://github.com/ihabunek/twitch-dl/releases/tag/2.2.3)
* Respect --dry-run option when downloading videos
* Add automated tests on github actions
### [2.2.2 (2024-04-23)](https://github.com/ihabunek/twitch-dl/releases/tag/2.2.2)
* Fix more compat issues Python < 3.10 (#152)
### [2.2.1 (2024-04-23)](https://github.com/ihabunek/twitch-dl/releases/tag/2.2.1)
* Fix compat with Python < 3.10 (#152)
* Fix division by zero in progress calculation when video duration is reported
as 0
### [2.2.0 (2024-04-10)](https://github.com/ihabunek/twitch-dl/releases/tag/2.2.0)
* **Requires Python 3.8+**
* Migrated to Click library for generating the commandline interface
* Add shell auto completion, see 'Shell completion' in docs.
* Add setting defaults via environment variables, see 'Environment variables' in
docs
* Add `download --concat` option to avoid using ffmeg for joinig vods and concat
them instead. This will produce a `.ts` file by default.
* Add `download --dry-run` option to skip actual download (thanks @metacoma)
* Add video description to metadata (#129)
* Add `clips --compact` option for listing clips in one-per-line mode
* **Requires python 3.8 or later**
* Migrated to click lib for cli parsing
* Add shell auto completion
### [2.1.4 (2024-01-06)](https://github.com/ihabunek/twitch-dl/releases/tag/2.1.4)

View File

@ -7,7 +7,7 @@ dist:
clean :
find . -name "*pyc" | xargs rm -rf $1
rm -rf build dist book bundle MANIFEST htmlcov deb_dist twitch-dl.*.pyz twitch-dl.1.man twitch_dl.egg-info
rm -rf build dist bundle MANIFEST htmlcov deb_dist twitch-dl.*.pyz twitch-dl.1.man twitch_dl.egg-info
bundle:
mkdir bundle
@ -24,7 +24,7 @@ publish :
twine upload dist/*.tar.gz dist/*.whl
coverage:
pytest --cov=twitchdl --cov-report html tests/
py.test --cov=toot --cov-report html tests/
man:
scdoc < twitch-dl.1.scd > twitch-dl.1.man

View File

@ -1,47 +1,12 @@
2.3.1:
date: 2024-05-19
changes:
- "Fix fetching access token (#155, thanks @KryptonicDragon)"
2.3.0:
date: 2024-04-27
changes:
- "Show more playlist data when choosing quality"
- "Improve detection of 'source' quality for Twitch Enhanced Broadcast Streams (#154)"
2.2.4:
date: 2024-04-25
changes:
- "Add m dot url support to video and clip regexes (thanks @localnerve)"
2.2.3:
date: 2024-04-24
changes:
- "Respect --dry-run option when downloading videos"
- "Add automated tests on github actions"
2.2.2:
date: 2024-04-23
changes:
- "Fix more compat issues Python < 3.10 (#152)"
2.2.1:
date: 2024-04-23
changes:
- "Fix compat with Python < 3.10 (#152)"
- "Fix division by zero in progress calculation when video duration is reported as 0"
2.2.0:
date: 2024-04-10
date: TBA
changes:
- "**Requires Python 3.8+**"
- "**Requires python 3.8 or later**"
- "Migrated to Click library for generating the commandline interface"
- "Add shell auto completion, see 'Shell completion' in docs."
- "Add setting defaults via environment variables, see 'Environment variables' in docs"
- "Add shell auto completion, see: https://twitch-dl.bezdomni.net/shell_completion.html"
- "Add setting defaults via environment variables, see: https://twitch-dl.bezdomni.net/environment_variables.html"
- "Add `download --concat` option to avoid using ffmeg for joinig vods and concat them instead. This will produce a `.ts` file by default."
- "Add `download --dry-run` option to skip actual download (thanks @metacoma)"
- "Add video description to metadata (#129)"
- "Add `clips --compact` option for listing clips in one-per-line mode"
2.1.4:
date: 2024-01-06

View File

@ -3,47 +3,11 @@ twitch-dl changelog
<!-- Do not edit. This file is automatically generated from changelog.yaml.-->
### [2.3.1 (2024-05-19)](https://github.com/ihabunek/twitch-dl/releases/tag/2.3.1)
### [2.2.0 (TBA)](https://github.com/ihabunek/twitch-dl/releases/tag/2.2.0)
* Fix fetching access token (#155, thanks @KryptonicDragon)
### [2.3.0 (2024-04-27)](https://github.com/ihabunek/twitch-dl/releases/tag/2.3.0)
* Show more playlist data when choosing quality
* Improve detection of 'source' quality for Twitch Enhanced Broadcast Streams
(#154)
### [2.2.4 (2024-04-25)](https://github.com/ihabunek/twitch-dl/releases/tag/2.2.4)
* Add m dot url support to video and clip regexes (thanks @localnerve)
### [2.2.3 (2024-04-24)](https://github.com/ihabunek/twitch-dl/releases/tag/2.2.3)
* Respect --dry-run option when downloading videos
* Add automated tests on github actions
### [2.2.2 (2024-04-23)](https://github.com/ihabunek/twitch-dl/releases/tag/2.2.2)
* Fix more compat issues Python < 3.10 (#152)
### [2.2.1 (2024-04-23)](https://github.com/ihabunek/twitch-dl/releases/tag/2.2.1)
* Fix compat with Python < 3.10 (#152)
* Fix division by zero in progress calculation when video duration is reported
as 0
### [2.2.0 (2024-04-10)](https://github.com/ihabunek/twitch-dl/releases/tag/2.2.0)
* **Requires Python 3.8+**
* Migrated to Click library for generating the commandline interface
* Add shell auto completion, see 'Shell completion' in docs.
* Add setting defaults via environment variables, see 'Environment variables' in
docs
* Add `download --concat` option to avoid using ffmeg for joinig vods and concat
them instead. This will produce a `.ts` file by default.
* Add `download --dry-run` option to skip actual download (thanks @metacoma)
* Add video description to metadata (#129)
* Add `clips --compact` option for listing clips in one-per-line mode
* **Requires python 3.8 or later**
* Migrated to click lib for cli parsing
* Add shell auto completion
### [2.1.4 (2024-01-06)](https://github.com/ihabunek/twitch-dl/releases/tag/2.1.4)

View File

@ -18,11 +18,6 @@ twitch-dl clips [OPTIONS] CHANNEL_NAME
<td>Fetch all clips, overrides --limit</td>
</tr>
<tr>
<td class="code">-c, --compact</td>
<td>Show clips in compact mode, one line per video</td>
</tr>
<tr>
<td class="code">-d, --download</td>
<td>Download clips in given period (in source quality)</td>
@ -30,7 +25,7 @@ twitch-dl clips [OPTIONS] CHANNEL_NAME
<tr>
<td class="code">-l, --limit INTEGER</td>
<td>Number of clips to fetch. Defaults to 40 in compact mode, 10 otherwise.</td>
<td>Number of clips to fetch [max: 100] [default: <code>10</code>]</td>
</tr>
<tr>

View File

@ -28,7 +28,7 @@ twitch-dl download [OPTIONS] [IDS]...
<tr>
<td class="code">--concat</td>
<td>Do not use ffmpeg to join files, concat them instead. This will produce a .ts file by default.</td>
<td>Do not use ffmpeg to join files, concat them instead</td>
</tr>
<tr>

View File

@ -22,7 +22,8 @@ classifiers = [
dependencies = [
"click>=8.0.0,<9.0.0",
"httpx>=0.17.0,<1.0.0",
"m3u8>=3.0.0,<5.0.0",
"m3u8>=1.0.0,<4.0.0",
"python-dateutil>=2.8.0,<3.0.0",
]
[tool.setuptools]
@ -43,11 +44,6 @@ dev = [
"vermin",
]
test = [
"pytest",
"vermin",
]
[project.urls]
"Homepage" = "https://twitch-dl.bezdomni.net/"
"Source" = "https://github.com/ihabunek/twitch-dl"
@ -56,9 +52,8 @@ test = [
twitch-dl = "twitchdl.cli:cli"
[tool.pyright]
include = ["twitchdl"]
typeCheckingMode = "strict"
pythonVersion = "3.8"
[tool.ruff]
line-length = 100
target-version = "py38"

View File

@ -11,10 +11,12 @@ Usage: tag_version [version]
import subprocess
import sys
import textwrap
import yaml
import twitchdl
from datetime import date
from os import path
import yaml
from pkg_resources import get_distribution
path = path.join(path.dirname(path.dirname(path.abspath(__file__))), "changelog.yaml")
with open(path, "r") as f:
@ -31,6 +33,15 @@ if not changelog_item:
print(f"Version `{version}` not found in changelog.", file=sys.stderr)
sys.exit(1)
if twitchdl.__version__ != version:
print(f"twitchdl.__version__ is `{twitchdl.__version__}`, expected {version}.", file=sys.stderr)
sys.exit(1)
dist_version = get_distribution('twitch-dl').version
if dist_version != version:
print(f"Version in setup.py is `{dist_version}`, expected {version}.", file=sys.stderr)
sys.exit(1)
release_date = changelog_item["date"]
changes = changelog_item["changes"]
description = changelog_item["description"] if "description" in changelog_item else None

View File

@ -3,13 +3,9 @@ These tests depend on the channel having some videos and clips published.
"""
import httpx
import pytest
import m3u8
from twitchdl import twitch
from twitchdl.commands.download import get_clip_authenticated_url
from twitchdl.commands.videos import get_game_ids
from twitchdl.exceptions import ConsoleError
from twitchdl.playlists import enumerate_vods, load_m3u8, parse_playlists
from twitchdl.commands.download import _parse_playlists, get_clip_authenticated_url
TEST_CHANNEL = "bananasaurus_rex"
@ -21,25 +17,22 @@ def test_get_videos():
video_id = videos["edges"][0]["node"]["id"]
video = twitch.get_video(video_id)
assert video is not None
assert video["id"] == video_id
access_token = twitch.get_access_token(video_id)
assert "signature" in access_token
assert "value" in access_token
playlists_txt = twitch.get_playlists(video_id, access_token)
assert playlists_txt.startswith("#EXTM3U")
playlists = twitch.get_playlists(video_id, access_token)
assert playlists.startswith("#EXTM3U")
playlists = parse_playlists(playlists_txt)
playlist_url = playlists[0].url
name, res, url = next(_parse_playlists(playlists))
playlist = httpx.get(url).text
assert playlist.startswith("#EXTM3U")
playlist_txt = httpx.get(playlist_url).text
assert playlist_txt.startswith("#EXTM3U")
playlist_m3u8 = load_m3u8(playlist_txt)
vods = enumerate_vods(playlist_m3u8)
assert vods[0].path == "0.ts"
playlist = m3u8.loads(playlist)
vod_path = playlist.segments[0].uri
assert vod_path == "0.ts"
def test_get_clips():
@ -52,19 +45,6 @@ def test_get_clips():
slug = clips["edges"][0]["node"]["slug"]
clip = twitch.get_clip(slug)
assert clip is not None
assert clip["slug"] == slug
assert get_clip_authenticated_url(slug, "source")
def test_get_games():
assert get_game_ids([]) == []
assert get_game_ids(["Bioshock"]) == ["15866"]
assert get_game_ids(["Bioshock", "Portal"]) == ["15866", "6187"]
def test_get_games_not_found():
with pytest.raises(ConsoleError) as ex:
get_game_ids(["the game which does not exist"])
assert str(ex.value) == "Game 'the game which does not exist' not found"

View File

@ -1,156 +0,0 @@
import json
import pytest
from click.testing import CliRunner, Result
from twitchdl import cli
@pytest.fixture(scope="session")
def runner():
return CliRunner(mix_stderr=False)
def assert_ok(result: Result):
if result.exit_code != 0:
raise AssertionError(
f"Command failed with exit code {result.exit_code}\nStderr: {result.stderr}"
)
def test_info_video(runner: CliRunner):
result = runner.invoke(cli.info, ["2090131595"])
assert_ok(result)
assert "Frost Fatales 2024 Day 1" in result.stdout
assert "frozenflygone playing Tomb Raider" in result.stdout
def test_info_video_json(runner: CliRunner):
result = runner.invoke(cli.info, ["2090131595", "--json"])
assert_ok(result)
video = json.loads(result.stdout)
assert video["title"] == "Frost Fatales 2024 Day 1"
assert video["game"] == {"id": "2770", "name": "Tomb Raider"}
assert video["creator"] == {"login": "frozenflygone", "displayName": "frozenflygone"}
def test_info_clip(runner: CliRunner):
result = runner.invoke(cli.info, ["PoisedTalentedPuddingChefFrank"])
assert_ok(result)
assert "AGDQ Crashes during Bioshock run" in result.stdout
assert "GamesDoneQuick playing BioShock" in result.stdout
def test_info_clip_json(runner: CliRunner):
result = runner.invoke(cli.info, ["PoisedTalentedPuddingChefFrank", "--json"])
assert_ok(result)
clip = json.loads(result.stdout)
assert clip["slug"] == "PoisedTalentedPuddingChefFrank"
assert clip["title"] == "AGDQ Crashes during Bioshock run"
assert clip["game"] == {"id": "15866", "name": "BioShock"}
assert clip["broadcaster"] == {"displayName": "GamesDoneQuick", "login": "gamesdonequick"}
def test_info_not_found(runner: CliRunner):
result = runner.invoke(cli.info, ["banana"])
assert result.exit_code == 1
assert "Clip banana not found" in result.stderr
result = runner.invoke(cli.info, ["12345"])
assert result.exit_code == 1
assert "Video 12345 not found" in result.stderr
result = runner.invoke(cli.info, [""])
assert result.exit_code == 1
assert "Invalid input" in result.stderr
def test_download_clip(runner: CliRunner):
result = runner.invoke(
cli.download,
[
"PoisedTalentedPuddingChefFrank",
"-q",
"source",
"--dry-run",
],
)
assert_ok(result)
assert (
"Found: AGDQ Crashes during Bioshock run by GamesDoneQuick, playing BioShock (30 sec)"
in result.stdout
)
assert (
"Target: 2020-01-10_3099545841_gamesdonequick_agdq_crashes_during_bioshock_run.mp4"
in result.stdout
)
assert "Dry run, clip not downloaded." in result.stdout
def test_download_video(runner: CliRunner):
result = runner.invoke(
cli.download,
[
"2090131595",
"-q",
"source",
"--dry-run",
],
)
assert_ok(result)
assert "Found: Frost Fatales 2024 Day 1 by frozenflygone" in result.stdout
assert (
"Output: 2024-03-14_2090131595_frozenflygone_frost_fatales_2024_day_1.mkv" in result.stdout
)
assert "Dry run, video not downloaded." in result.stdout
def test_videos(runner: CliRunner):
result = runner.invoke(cli.videos, ["gamesdonequick", "--json"])
assert_ok(result)
videos = json.loads(result.stdout)
assert videos["count"] == 10
assert videos["totalCount"] > 0
video = videos["videos"][0]
result = runner.invoke(cli.videos, "gamesdonequick")
assert_ok(result)
assert f"Video {video['id']}" in result.stdout
assert video["title"] in result.stdout
result = runner.invoke(cli.videos, ["gamesdonequick", "--compact"])
assert_ok(result)
assert video["id"] in result.stdout
assert video["title"][:60] in result.stdout
def test_videos_channel_not_found(runner: CliRunner):
result = runner.invoke(cli.videos, ["doesnotexisthopefully"])
assert result.exit_code == 1
assert result.stderr.strip() == "Error: Channel doesnotexisthopefully not found"
def test_clips(runner: CliRunner):
result = runner.invoke(cli.clips, ["gamesdonequick", "--json"])
assert_ok(result)
clips = json.loads(result.stdout)
clip = clips[0]
result = runner.invoke(cli.clips, "gamesdonequick")
assert_ok(result)
assert f"Clip {clip['slug']}" in result.stdout
assert clip["title"] in result.stdout
result = runner.invoke(cli.clips, ["gamesdonequick", "--compact"])
assert_ok(result)
assert clip["slug"] in result.stdout
assert clip["title"][:60] in result.stdout

View File

@ -1,38 +1,35 @@
import pytest
from twitchdl.utils import parse_clip_identifier, parse_video_identifier
from twitchdl.utils import parse_video_identifier, parse_clip_identifier
TEST_VIDEO_PATTERNS = [
("702689313", "702689313"),
("702689313", "https://twitch.tv/videos/702689313"),
("702689313", "https://www.twitch.tv/videos/702689313"),
("702689313", "https://m.twitch.tv/videos/702689313"),
]
TEST_CLIP_PATTERNS = {
("AbrasivePlayfulMangoMau5", "AbrasivePlayfulMangoMau5"),
("AbrasivePlayfulMangoMau5", "https://clips.twitch.tv/AbrasivePlayfulMangoMau5"),
("AbrasivePlayfulMangoMau5", "https://www.twitch.tv/dracul1nx/clip/AbrasivePlayfulMangoMau5"),
("AbrasivePlayfulMangoMau5", "https://m.twitch.tv/dracul1nx/clip/AbrasivePlayfulMangoMau5"),
("AbrasivePlayfulMangoMau5", "https://twitch.tv/dracul1nx/clip/AbrasivePlayfulMangoMau5"),
("HungryProudRadicchioDoggo", "HungryProudRadicchioDoggo"),
("HungryProudRadicchioDoggo", "https://clips.twitch.tv/HungryProudRadicchioDoggo"),
("HungryProudRadicchioDoggo", "https://www.twitch.tv/bananasaurus_rex/clip/HungryProudRadicchioDoggo?filter=clips&range=7d&sort=time"),
("HungryProudRadicchioDoggo", "https://m.twitch.tv/bananasaurus_rex/clip/HungryProudRadicchioDoggo?filter=clips&range=7d&sort=time"),
("HungryProudRadicchioDoggo", "https://twitch.tv/bananasaurus_rex/clip/HungryProudRadicchioDoggo?filter=clips&range=7d&sort=time"),
("GloriousColdbloodedTortoiseRuleFive-E017utJ4DZmHVpfQ", "GloriousColdbloodedTortoiseRuleFive-E017utJ4DZmHVpfQ"),
("GloriousColdbloodedTortoiseRuleFive-E017utJ4DZmHVpfQ", "https://twitch.tv/dracul1nx/clip/GloriousColdbloodedTortoiseRuleFive-E017utJ4DZmHVpfQ"),
("GloriousColdbloodedTortoiseRuleFive-E017utJ4DZmHVpfQ", "https://twitch.tv/dracul1nx/clip/GloriousColdbloodedTortoiseRuleFive-E017utJ4DZmHVpfQ?filter=clips&range=7d&sort=time"),
("GloriousColdbloodedTortoiseRuleFive-E017utJ4DZmHVpfQ", "https://www.twitch.tv/dracul1nx/clip/GloriousColdbloodedTortoiseRuleFive-E017utJ4DZmHVpfQ?filter=clips&range=7d&sort=time"),
("GloriousColdbloodedTortoiseRuleFive-E017utJ4DZmHVpfQ", "https://m.twitch.tv/dracul1nx/clip/GloriousColdbloodedTortoiseRuleFive-E017utJ4DZmHVpfQ?filter=clips&range=7d&sort=time"),
}
@pytest.mark.parametrize("expected,input", TEST_VIDEO_PATTERNS)
def test_video_patterns(expected: str, input: str):
def test_video_patterns(expected, input):
assert parse_video_identifier(input) == expected
@pytest.mark.parametrize("expected,input", TEST_CLIP_PATTERNS)
def test_clip_patterns(expected: str, input: str):
def test_clip_patterns(expected, input):
assert parse_clip_identifier(input) == expected

View File

@ -23,31 +23,26 @@ def test_downloaded():
assert progress.progress_perc == 0
progress.advance(1, 100)
progress._recalculate()
assert progress.downloaded == 100
assert progress.progress_bytes == 100
assert progress.progress_perc == 11
progress.advance(2, 200)
progress._recalculate()
assert progress.downloaded == 300
assert progress.progress_bytes == 300
assert progress.progress_perc == 33
progress.advance(3, 150)
progress._recalculate()
assert progress.downloaded == 450
assert progress.progress_bytes == 450
assert progress.progress_perc == 50
progress.advance(1, 50)
progress._recalculate()
assert progress.downloaded == 500
assert progress.progress_bytes == 500
assert progress.progress_perc == 55
progress.abort(2)
progress._recalculate()
assert progress.downloaded == 500
assert progress.progress_bytes == 300
assert progress.progress_perc == 33
@ -57,7 +52,6 @@ def test_downloaded():
progress.advance(1, 150)
progress.advance(2, 300)
progress.advance(3, 150)
progress._recalculate()
assert progress.downloaded == 1100
assert progress.progress_bytes == 900
@ -77,15 +71,12 @@ def test_estimated_total():
assert progress.estimated_total is None
progress.start(1, 12000)
progress._recalculate()
assert progress.estimated_total == 12000 * 3
progress.start(2, 11000)
progress._recalculate()
assert progress.estimated_total == 11500 * 3
progress.start(3, 10000)
progress._recalculate()
assert progress.estimated_total == 11000 * 3

View File

@ -1,10 +1,8 @@
import click
import logging
import platform
import re
import sys
from typing import Optional, Tuple
import click
from twitchdl import __version__
from twitchdl.entities import DownloadOptions
@ -31,13 +29,13 @@ json_option = click.option(
)
def validate_positive(_ctx: click.Context, _param: click.Parameter, value: Optional[int]):
def validate_positive(_ctx: click.Context, _param: click.Parameter, value: int | None):
if value is not None and value <= 0:
raise click.BadParameter("must be greater than 0")
return value
def validate_time(_ctx: click.Context, _param: click.Parameter, value: str) -> Optional[int]:
def validate_time(_ctx: click.Context, _param: click.Parameter, value: str) -> int | None:
"""Parse a time string (hh:mm or hh:mm:ss) to number of seconds."""
if not value:
return None
@ -57,7 +55,7 @@ def validate_time(_ctx: click.Context, _param: click.Parameter, value: str) -> O
return hours * 3600 + minutes * 60 + seconds
def validate_rate(_ctx: click.Context, _param: click.Parameter, value: str) -> Optional[int]:
def validate_rate(_ctx: click.Context, _param: click.Parameter, value: str) -> int | None:
if not value:
return None
@ -86,14 +84,12 @@ def validate_rate(_ctx: click.Context, _param: click.Parameter, value: str) -> O
def cli(ctx: click.Context, color: bool, debug: bool):
"""twitch-dl - twitch.tv downloader
https://twitch-dl.bezdomni.net/
https://toot.bezdomni.net/
"""
ctx.color = color
if debug:
logging.basicConfig(level=logging.DEBUG)
logging.getLogger("httpx").setLevel(logging.WARN)
logging.getLogger("httpcore").setLevel(logging.WARN)
logging.basicConfig(level=logging.INFO)
@cli.command()
@ -104,12 +100,6 @@ def cli(ctx: click.Context, color: bool, debug: bool):
help="Fetch all clips, overrides --limit",
is_flag=True,
)
@click.option(
"-c",
"--compact",
help="Show clips in compact mode, one line per video",
is_flag=True,
)
@click.option(
"-d",
"--download",
@ -119,8 +109,9 @@ def cli(ctx: click.Context, color: bool, debug: bool):
@click.option(
"-l",
"--limit",
help="Number of clips to fetch. Defaults to 40 in compact mode, 10 otherwise.",
help="Number of clips to fetch [max: 100]",
type=int,
default=10,
callback=validate_positive,
)
@click.option(
@ -143,11 +134,10 @@ def cli(ctx: click.Context, color: bool, debug: bool):
def clips(
channel_name: str,
all: bool,
compact: bool,
download: bool,
json: bool,
limit: Optional[int],
pager: Optional[int],
limit: int,
pager: int | None,
period: ClipsPeriod,
):
"""List or download clips for given CHANNEL_NAME."""
@ -156,7 +146,6 @@ def clips(
clips(
channel_name,
all=all,
compact=compact,
download=download,
json=json,
limit=limit,
@ -257,20 +246,20 @@ def clips(
default=5,
)
def download(
ids: Tuple[str, ...],
auth_token: Optional[str],
chapter: Optional[int],
ids: tuple[str, ...],
auth_token: str | None,
chapter: int | None,
concat: bool,
dry_run: bool,
end: Optional[int],
end: int | None,
format: str,
keep: bool,
no_join: bool,
overwrite: bool,
output: str,
quality: Optional[str],
rate_limit: Optional[int],
start: Optional[int],
quality: str | None,
rate_limit: str | None,
start: int | None,
max_workers: int,
):
"""Download videos or clips.
@ -376,10 +365,10 @@ def videos(
channel_name: str,
all: bool,
compact: bool,
games_tuple: Tuple[str, ...],
games_tuple: tuple[str, ...],
json: bool,
limit: Optional[int],
pager: Optional[int],
limit: int | None,
pager: int | None,
sort: VideosSort,
type: VideosType,
):

View File

@ -1,33 +1,31 @@
from typing import Generator
import click
import re
import sys
from os import path
from typing import Callable, Generator, Optional
import click
from itertools import islice
from os import path
from twitchdl import twitch, utils
from twitchdl.commands.download import get_clip_authenticated_url
from twitchdl.download import download_file
from twitchdl.output import green, print_clip, print_clip_compact, print_json, print_paged, yellow
from twitchdl.twitch import Clip, ClipsPeriod
from twitchdl.entities import Data
from twitchdl.output import green, print_clip, print_json, print_paged, yellow
def clips(
channel_name: str,
*,
all: bool = False,
compact: bool = False,
download: bool = False,
json: bool = False,
limit: Optional[int] = None,
pager: Optional[int] = None,
period: ClipsPeriod = "all_time",
limit: int = 10,
pager: int | None = None,
period: twitch.ClipsPeriod = "all_time",
):
# Set different defaults for limit for compact display
default_limit = 40 if compact else 10
# Ignore --limit if --pager or --all are given
limit = sys.maxsize if all or pager else (limit or default_limit)
limit = sys.maxsize if all or pager else limit
generator = twitch.channel_clips_generator(channel_name, period, limit)
@ -37,15 +35,26 @@ def clips(
if download:
return _download_clips(generator)
print_fn = print_clip_compact if compact else print_clip
if pager:
return print_paged("Clips", generator, print_fn, pager)
return print_paged("Clips", generator, print_clip, pager)
return _print_all(generator, print_fn, all)
return _print_all(generator, all)
def _target_filename(clip: Clip):
def _continue():
enter = click.style("Enter", bold=True, fg="green")
ctrl_c = click.style("Ctrl+C", bold=True, fg="yellow")
click.echo(f"Press {enter} to continue, {ctrl_c} to break.")
try:
input()
except KeyboardInterrupt:
return False
return True
def _target_filename(clip: Data):
url = clip["videoQualities"][0]["sourceURL"]
_, ext = path.splitext(url)
ext = ext.lstrip(".")
@ -55,19 +64,17 @@ def _target_filename(clip: Clip):
raise ValueError(f"Failed parsing date from: {clip['createdAt']}")
date = "".join(match.groups())
name = "_".join(
[
date,
clip["id"],
clip["broadcaster"]["login"],
utils.slugify(clip["title"]),
]
)
name = "_".join([
date,
clip["id"],
clip["broadcaster"]["login"],
utils.slugify(clip["title"]),
])
return f"{name}.{ext}"
def _download_clips(generator: Generator[Clip, None, None]):
def _download_clips(generator: Generator[Data, None, None]):
for clip in generator:
target = _target_filename(clip)
@ -79,17 +86,14 @@ def _download_clips(generator: Generator[Clip, None, None]):
download_file(url, target)
def _print_all(
generator: Generator[Clip, None, None],
print_fn: Callable[[Clip], None],
all: bool,
):
def _print_all(generator: Generator[Data, None, None], all: bool):
for clip in generator:
print_fn(clip)
click.echo()
print_clip(clip)
if not all:
click.secho(
"\nThere may be more clips. "
+ "Increase the --limit, use --all or --pager to see the rest.",
dim=True,
"\nThere may be more clips. " +
"Increase the --limit, use --all or --pager to see the rest.",
dim=True
)

View File

@ -1,39 +1,29 @@
import asyncio
import os
import platform
import click
import httpx
import m3u8
import os
import re
import shutil
import subprocess
import tempfile
from os import path
from pathlib import Path
from typing import Dict, List, Optional
from urllib.parse import urlencode, urlparse
import click
import httpx
from typing import List, Optional, OrderedDict
from urllib.parse import urlparse, urlencode
from twitchdl import twitch, utils
from twitchdl.conversion import from_dict
from twitchdl.download import download_file
from twitchdl.entities import DownloadOptions
from twitchdl.entities import Data, DownloadOptions, Video
from twitchdl.exceptions import ConsoleError
from twitchdl.http import download_all
from twitchdl.output import blue, bold, green, print_log, yellow
from twitchdl.playlists import (
enumerate_vods,
load_m3u8,
make_join_playlist,
parse_playlists,
select_playlist,
)
from twitchdl.twitch import Chapter, Clip, ClipAccessToken, Video
from twitchdl.output import blue, bold, dim, green, print_log, yellow
def download(ids: List[str], args: DownloadOptions):
if not ids:
print_log("No IDs to downlad given")
return
def download(ids: list[str], args: DownloadOptions):
for video_id in ids:
download_one(video_id, args)
@ -50,40 +40,73 @@ def download_one(video: str, args: DownloadOptions):
raise ConsoleError(f"Invalid input: {video}")
def _parse_playlists(playlists_m3u8):
playlists = m3u8.loads(playlists_m3u8)
for p in sorted(playlists.playlists, key=lambda p: p.stream_info.resolution is None):
if p.stream_info.resolution:
name = p.media[0].name
description = "x".join(str(r) for r in p.stream_info.resolution)
else:
name = p.media[0].group_id
description = None
yield name, description, p.uri
def _get_playlist_by_name(playlists, quality):
if quality == "source":
_, _, uri = playlists[0]
return uri
for name, _, uri in playlists:
if name == quality:
return uri
available = ", ".join([name for (name, _, _) in playlists])
msg = f"Quality '{quality}' not found. Available qualities are: {available}"
raise ConsoleError(msg)
def _select_playlist_interactive(playlists):
click.echo("\nAvailable qualities:")
for n, (name, resolution, uri) in enumerate(playlists):
if resolution:
click.echo(f"{n + 1}) {bold(name)} {dim(f'({resolution})')}")
else:
click.echo(f"{n + 1}) {bold(name)}")
no = utils.read_int("Choose quality", min=1, max=len(playlists) + 1, default=1)
_, _, uri = playlists[no - 1]
return uri
def _join_vods(playlist_path: str, target: str, overwrite: bool, video: Video):
description = video["description"] or ""
description = video.description or ""
description = description.strip()
command = [
"ffmpeg",
"-i",
playlist_path,
"-c",
"copy",
"-metadata",
f"artist={video['creator']['displayName']}",
"-metadata",
f"title={video['title']}",
"-metadata",
f"description={description}",
"-metadata",
"encoded_by=twitch-dl",
"-i", playlist_path,
"-c", "copy",
"-metadata", f"artist={video.creator.display_name}",
"-metadata", f"title={video.title}",
"-metadata", f"description={description}",
"-metadata", "encoded_by=twitch-dl",
"-stats",
"-loglevel",
"warning",
"-loglevel", "warning",
f"file:{target}",
]
if overwrite:
command.append("-y")
click.secho(f"{' '.join(command)}", dim=True)
click.secho(f"{' '.join(command)}", dim = True)
result = subprocess.run(command)
if result.returncode != 0:
raise ConsoleError("Joining files failed")
def _concat_vods(vod_paths: List[str], target: str):
def _concat_vods(vod_paths: list[str], target: str):
tool = "type" if platform.system() == "Windows" else "cat"
command = [tool] + vod_paths
@ -93,22 +116,24 @@ def _concat_vods(vod_paths: List[str], target: str):
raise ConsoleError(f"Joining files failed: {result.stderr}")
def get_video_placeholders(video: Video, format: str) -> Dict[str, str]:
date, time = video["publishedAt"].split("T")
game = video["game"]["name"] if video["game"] else "Unknown"
def get_video_placeholders(video: Video, format: str) -> Data:
date = video.published_at.date().isoformat()
time = video.published_at.time().isoformat()
datetime = video.published_at.isoformat().replace("+00:00", "Z")
game = video.game.name if video.game else "Unknown"
return {
"channel": video["creator"]["displayName"],
"channel_login": video["creator"]["login"],
"channel": video.creator.display_name,
"channel_login": video.creator.login,
"date": date,
"datetime": video["publishedAt"],
"datetime": datetime,
"format": format,
"game": game,
"game_slug": utils.slugify(game),
"id": video["id"],
"id": video.id,
"time": time,
"title": utils.titlify(video["title"]),
"title_slug": utils.slugify(video["title"]),
"title": utils.titlify(video.title),
"title_slug": utils.slugify(video.title),
}
@ -122,7 +147,7 @@ def _video_target_filename(video: Video, args: DownloadOptions):
raise ConsoleError(f"Invalid key {e} used in --output. Supported keys are: {supported}")
def _clip_target_filename(clip: Clip, args: DownloadOptions):
def _clip_target_filename(clip, args: DownloadOptions):
date, time = clip["createdAt"].split("T")
game = clip["game"]["name"] if clip["game"] else "Unknown"
@ -152,6 +177,26 @@ def _clip_target_filename(clip: Clip, args: DownloadOptions):
raise ConsoleError(f"Invalid key {e} used in --output. Supported keys are: {supported}")
def _get_vod_paths(playlist, start: Optional[int], end: Optional[int]) -> List[str]:
"""Extract unique VOD paths for download from playlist."""
files = []
vod_start = 0
for segment in playlist.segments:
vod_end = vod_start + segment.duration
# `vod_end > start` is used here becuase it's better to download a bit
# more than a bit less, similar for the end condition
start_condition = not start or vod_end > start
end_condition = not end or vod_start < end
if start_condition and end_condition and segment.uri not in files:
files.append(segment.uri)
vod_start = vod_end
return files
def _crete_temp_dir(base_uri: str) -> str:
"""Create a temp dir to store downloads if it doesn't exist."""
path = urlparse(base_uri).path.lstrip("/")
@ -160,8 +205,8 @@ def _crete_temp_dir(base_uri: str) -> str:
return str(temp_dir)
def _get_clip_url(access_token: ClipAccessToken, quality: Optional[str]) -> str:
qualities = access_token["videoQualities"]
def _get_clip_url(clip, quality):
qualities = clip["videoQualities"]
# Quality given as an argument
if quality:
@ -188,7 +233,7 @@ def _get_clip_url(access_token: ClipAccessToken, quality: Optional[str]) -> str:
return selected_quality["sourceURL"]
def get_clip_authenticated_url(slug: str, quality: Optional[str]):
def get_clip_authenticated_url(slug, quality):
print_log("Fetching access token...")
access_token = twitch.get_clip_access_token(slug)
@ -197,12 +242,10 @@ def get_clip_authenticated_url(slug: str, quality: Optional[str]):
url = _get_clip_url(access_token, quality)
query = urlencode(
{
"sig": access_token["playbackAccessToken"]["signature"],
"token": access_token["playbackAccessToken"]["value"],
}
)
query = urlencode({
"sig": access_token["playbackAccessToken"]["signature"],
"token": access_token["playbackAccessToken"]["value"],
})
return f"{url}?{query}"
@ -245,18 +288,19 @@ def _download_video(video_id: str, args: DownloadOptions) -> None:
raise ConsoleError("End time must be greater than start time")
print_log("Looking up video...")
video = twitch.get_video(video_id)
response = twitch.get_video(video_id)
if not video:
if not response:
raise ConsoleError(f"Video {video_id} not found")
click.echo(f"Found: {blue(video['title'])} by {yellow(video['creator']['displayName'])}")
video = from_dict(Video, response)
click.echo(f"Found: {blue(video.title)} by {yellow(video.creator.display_name)}")
target = _video_target_filename(video, args)
click.echo(f"Output: {blue(target)}")
if not args.overwrite and path.exists(target):
response = click.prompt("File exists. Overwrite? [Y/n]", default="Y", show_default=False)
response = click.prompt("File exists. Overwrite? [Y/n]: ", default="Y", show_default=False)
if response.lower().strip() != "y":
raise click.Abort()
args.overwrite = True
@ -268,37 +312,45 @@ def _download_video(video_id: str, args: DownloadOptions) -> None:
access_token = twitch.get_access_token(video_id, auth_token=args.auth_token)
print_log("Fetching playlists...")
playlists_text = twitch.get_playlists(video_id, access_token)
playlists = parse_playlists(playlists_text)
playlist = select_playlist(playlists, args.quality)
playlists_m3u8 = twitch.get_playlists(video_id, access_token)
playlists = list(_parse_playlists(playlists_m3u8))
playlist_uri = (_get_playlist_by_name(playlists, args.quality) if args.quality
else _select_playlist_interactive(playlists))
print_log("Fetching playlist...")
vods_text = http_get(playlist.url)
vods_m3u8 = load_m3u8(vods_text)
vods = enumerate_vods(vods_m3u8, start, end)
response = httpx.get(playlist_uri)
response.raise_for_status()
playlist = m3u8.loads(response.text)
if args.dry_run:
click.echo("Dry run, video not downloaded.")
return
base_uri = re.sub("/[^/]+$", "/", playlist.url)
base_uri = re.sub("/[^/]+$", "/", playlist_uri)
target_dir = _crete_temp_dir(base_uri)
vod_paths = _get_vod_paths(playlist, start, end)
# Save playlists for debugging purposes
with open(path.join(target_dir, "playlists.m3u8"), "w") as f:
f.write(playlists_text)
f.write(playlists_m3u8)
with open(path.join(target_dir, "playlist.m3u8"), "w") as f:
f.write(vods_text)
f.write(response.text)
click.echo(f"\nDownloading {len(vods)} VODs using {args.max_workers} workers to {target_dir}")
sources = [base_uri + vod.path for vod in vods]
targets = [os.path.join(target_dir, f"{vod.index:05d}.ts") for vod in vods]
click.echo(f"\nDownloading {len(vod_paths)} VODs using {args.max_workers} workers to {target_dir}")
sources = [base_uri + path for path in vod_paths]
targets = [os.path.join(target_dir, f"{k:05d}.ts") for k, _ in enumerate(vod_paths)]
asyncio.run(download_all(sources, targets, args.max_workers, rate_limit=args.rate_limit))
join_playlist = make_join_playlist(vods_m3u8, vods, targets)
join_playlist_path = path.join(target_dir, "playlist_downloaded.m3u8")
join_playlist.dump(join_playlist_path) # type: ignore
# Make a modified playlist which references downloaded VODs
# Keep only the downloaded segments and skip the rest
org_segments = playlist.segments.copy()
path_map = OrderedDict(zip(vod_paths, targets))
playlist.segments.clear()
for segment in org_segments:
if segment.uri in path_map:
segment.uri = path_map[segment.uri]
playlist.segments.append(segment)
playlist_path = path.join(target_dir, "playlist_downloaded.m3u8")
playlist.dump(playlist_path)
click.echo()
if args.no_join:
@ -311,7 +363,7 @@ def _download_video(video_id: str, args: DownloadOptions) -> None:
_concat_vods(targets, target)
else:
print_log("Joining files...")
_join_vods(join_playlist_path, target, args.overwrite, video)
_join_vods(playlist_path, target, args.overwrite, video)
click.echo()
@ -324,13 +376,7 @@ def _download_video(video_id: str, args: DownloadOptions) -> None:
click.echo(f"\nDownloaded: {green(target)}")
def http_get(url: str) -> str:
response = httpx.get(url)
response.raise_for_status()
return response.text
def _determine_time_range(video_id: str, args: DownloadOptions):
def _determine_time_range(video_id, args: DownloadOptions):
if args.start or args.end:
return args.start, args.end
@ -347,9 +393,7 @@ def _determine_time_range(video_id: str, args: DownloadOptions):
try:
chapter = chapters[args.chapter - 1]
except IndexError:
raise ConsoleError(
f"Chapter {args.chapter} does not exist. This video has {len(chapters)} chapters."
)
raise ConsoleError(f"Chapter {args.chapter} does not exist. This video has {len(chapters)} chapters.")
click.echo(f'Chapter selected: {blue(chapter["description"])}\n')
start = chapter["positionMilliseconds"] // 1000
@ -359,7 +403,7 @@ def _determine_time_range(video_id: str, args: DownloadOptions):
return None, None
def _choose_chapter_interactive(chapters: List[Chapter]):
def _choose_chapter_interactive(chapters):
click.echo("\nChapters:")
for index, chapter in enumerate(chapters):
duration = utils.format_time(chapter["durationMilliseconds"] // 1000)

View File

@ -1,23 +1,21 @@
from typing import List
import click
import m3u8
from twitchdl import twitch, utils
from twitchdl import utils, twitch
from twitchdl.commands.download import get_video_placeholders
from twitchdl.conversion import from_dict
from twitchdl.entities import Data, Video
from twitchdl.exceptions import ConsoleError
from twitchdl.output import bold, print_clip, print_json, print_log, print_table, print_video
from twitchdl.playlists import parse_playlists
from twitchdl.twitch import Chapter, Clip, Video
from twitchdl.output import bold, print_table, print_video, print_clip, print_json, print_log
def info(id: str, *, json: bool = False):
video_id = utils.parse_video_identifier(id)
if video_id:
print_log("Fetching video...")
video = twitch.get_video(video_id)
response = twitch.get_video(video_id)
if not video:
if not response:
raise ConsoleError(f"Video {video_id} not found")
print_log("Fetching access token...")
@ -30,8 +28,9 @@ def info(id: str, *, json: bool = False):
chapters = twitch.get_video_chapters(video_id)
if json:
video_json(video, playlists, chapters)
video_json(response, playlists, chapters)
else:
video = from_dict(Video, response)
video_info(video, playlists, chapters)
return
@ -51,13 +50,13 @@ def info(id: str, *, json: bool = False):
raise ConsoleError(f"Invalid input: {id}")
def video_info(video: Video, playlists: str, chapters: List[Chapter]):
def video_info(video: Video, playlists, chapters):
click.echo()
print_video(video)
click.echo("Playlists:")
for p in parse_playlists(playlists):
click.echo(f"{bold(p.name)} {p.url}")
for p in m3u8.loads(playlists).playlists:
click.echo(f"{bold(p.stream_info.video)} {p.uri}")
if chapters:
click.echo()
@ -67,13 +66,13 @@ def video_info(video: Video, playlists: str, chapters: List[Chapter]):
duration = utils.format_time(chapter["durationMilliseconds"] // 1000)
click.echo(f'{start} {bold(chapter["description"])} ({duration})')
placeholders = get_video_placeholders(video, format="mkv")
placeholders = get_video_placeholders(video, format = "mkv")
placeholders = [[f"{{{k}}}", v] for k, v in placeholders.items()]
click.echo("")
print_table(["Placeholder", "Value"], placeholders)
def video_json(video: Video, playlists: str, chapters: List[Chapter]):
def video_json(video, playlists, chapters):
playlists = m3u8.loads(playlists).playlists
video["playlists"] = [
@ -82,9 +81,8 @@ def video_json(video: Video, playlists: str, chapters: List[Chapter]):
"resolution": p.stream_info.resolution,
"codecs": p.stream_info.codecs,
"video": p.stream_info.video,
"uri": p.uri,
}
for p in playlists
"uri": p.uri
} for p in playlists
]
video["chapters"] = chapters
@ -92,7 +90,7 @@ def video_json(video: Video, playlists: str, chapters: List[Chapter]):
print_json(video)
def clip_info(clip: Clip):
def clip_info(clip: Data):
click.echo()
print_clip(clip)
click.echo()

View File

@ -1,11 +1,10 @@
import sys
from typing import List, Optional
import click
from twitchdl import twitch
from twitchdl.exceptions import ConsoleError
from twitchdl.output import print_json, print_log, print_paged, print_video, print_video_compact
from twitchdl.output import print_log, print_paged, print_video, print_json, print_video_compact
def videos(
@ -13,14 +12,14 @@ def videos(
*,
all: bool,
compact: bool,
games: List[str],
games: list[str],
json: bool,
limit: Optional[int],
pager: Optional[int],
limit: int | None,
pager: int | None,
sort: twitch.VideosSort,
type: twitch.VideosType,
):
game_ids = get_game_ids(games)
game_ids = _get_game_ids(games)
# Set different defaults for limit for compact display
limit = limit or (40 if compact else 10)
@ -29,12 +28,15 @@ def videos(
max_videos = sys.maxsize if all or pager else limit
total_count, generator = twitch.channel_videos_generator(
channel_name, max_videos, sort, type, game_ids=game_ids
)
channel_name, max_videos, sort, type, game_ids=game_ids)
if json:
videos = list(generator)
print_json({"count": len(videos), "totalCount": total_count, "videos": videos})
print_json({
"count": len(videos),
"totalCount": total_count,
"videos": videos
})
return
if total_count == 0:
@ -61,19 +63,21 @@ def videos(
if total_count > count:
click.secho(
"\nThere are more videos. "
+ "Increase the --limit, use --all or --pager to see the rest.",
dim=True,
"\nThere are more videos. Increase the --limit, use --all or --pager to see the rest.",
dim=True
)
def get_game_ids(names: List[str]) -> List[str]:
return [get_game_id(name) for name in names]
def _get_game_ids(names: list[str]) -> list[str]:
if not names:
return []
game_ids = []
for name in names:
print_log(f"Looking up game '{name}'...")
game_id = twitch.get_game_id(name)
if not game_id:
raise ConsoleError(f"Game '{name}' not found")
game_ids.append(int(game_id))
def get_game_id(name: str) -> str:
print_log(f"Looking up game '{name}'...")
game_id = twitch.get_game_id(name)
if not game_id:
raise ConsoleError(f"Game '{name}' not found")
return game_id
return game_ids

87
twitchdl/conversion.py Normal file
View File

@ -0,0 +1,87 @@
import re
import dataclasses
from dataclasses import Field, is_dataclass
from datetime import date, datetime
from dateutil import parser
from typing import Any, Generator, Type, TypeVar, Union, get_args, get_origin, Callable
from typing import get_type_hints
# Generic data class instance
T = TypeVar("T")
# Dict of data decoded from JSON
Data = dict[str, Any]
def snake_to_camel(name: str):
def repl(match: re.Match[str]):
return match.group(1).upper()
return re.sub(r"_([a-z])", repl, name)
def from_dict(cls: Type[T], data: Data, key_fn: Callable[[str], str] = snake_to_camel) -> T:
"""Convert a nested dict into an instance of `cls`."""
def _fields() -> Generator[tuple[str, Any], None, None]:
hints = get_type_hints(cls)
for field in dataclasses.fields(cls):
field_type = _prune_optional(hints[field.name])
dict_field_name = key_fn(field.name)
if (value := data.get(dict_field_name)) is not None:
field_value = _convert(field_type, value)
else:
field_value = _get_default_value(field)
yield field.name, field_value
return cls(**dict(_fields()))
def from_dict_list(cls: Type[T], data: list[Data]) -> list[T]:
return [from_dict(cls, x) for x in data]
def _get_default_value(field: Field[Any]):
if field.default is not dataclasses.MISSING:
return field.default
if field.default_factory is not dataclasses.MISSING:
return field.default_factory()
return None
def _convert(field_type: Type[Any], value: Any) -> Any:
if value is None:
return None
if field_type in [str, int, bool, dict]:
return value
if field_type == datetime:
return parser.parse(value)
if field_type == date:
return date.fromisoformat(value)
if get_origin(field_type) == list:
(inner_type,) = get_args(field_type)
return [_convert(inner_type, x) for x in value]
if is_dataclass(field_type):
return from_dict(field_type, value)
raise ValueError(f"Not implemented for type '{field_type}'")
def _prune_optional(field_type: Type[Any]):
"""For `Optional[<type>]` returns the encapsulated `<type>`."""
if get_origin(field_type) == Union:
args = get_args(field_type)
if len(args) == 2 and args[1] == type(None): # noqa
return args[0]
return field_type

View File

@ -1,5 +1,4 @@
import os
import httpx
from twitchdl.exceptions import ConsoleError

View File

@ -1,25 +1,53 @@
from dataclasses import dataclass
from typing import Any, Mapping, Optional
from datetime import datetime
from typing import Any
@dataclass
class DownloadOptions:
auth_token: Optional[str]
chapter: Optional[int]
auth_token: str | None
chapter: int | None
concat: bool
dry_run: bool
end: Optional[int]
end: int | None
format: str
keep: bool
no_join: bool
overwrite: bool
output: str
quality: Optional[str]
rate_limit: Optional[int]
start: Optional[int]
quality: str | None
rate_limit: str | None
start: int | None
max_workers: int
# Type for annotating decoded JSON
# TODO: make data classes for common structs
Data = Mapping[str, Any]
Data = dict[str, Any]
@dataclass
class User:
login: str
display_name: str
@dataclass
class Game:
name: str
@dataclass
class Video:
id: str
title: str
description: str
published_at: datetime
broadcast_type: str
length_seconds: int
game: Game
creator: User
@dataclass
class AccessToken:
signature: str
value: str

View File

@ -1,7 +1,5 @@
import click
class ConsoleError(click.ClickException):
"""Raised when an error occurs and script exectuion should halt."""
pass

View File

@ -1,12 +1,12 @@
import asyncio
import httpx
import logging
import os
import time
from abc import ABC, abstractmethod
from typing import List, Optional
import httpx
from twitchdl.progress import Progress
logger = logging.getLogger(__name__)
@ -62,7 +62,6 @@ class LimitingTokenBucket(TokenBucket):
class EndlessTokenBucket(TokenBucket):
"""Used when download speed is not limited."""
def advance(self, size: int):
pass
@ -123,22 +122,12 @@ async def download_all(
targets: List[str],
workers: int,
*,
rate_limit: Optional[int] = None,
rate_limit: Optional[int] = None
):
progress = Progress(len(sources))
token_bucket = LimitingTokenBucket(rate_limit) if rate_limit else EndlessTokenBucket()
async with httpx.AsyncClient(timeout=TIMEOUT) as client:
semaphore = asyncio.Semaphore(workers)
tasks = [
download_with_retries(
client,
semaphore,
task_id,
source,
target,
progress,
token_bucket,
)
for task_id, (source, target) in enumerate(zip(sources, targets))
]
tasks = [download_with_retries(client, semaphore, task_id, source, target, progress, token_bucket)
for task_id, (source, target) in enumerate(zip(sources, targets))]
await asyncio.gather(*tasks)

View File

@ -1,18 +1,18 @@
import json
from itertools import islice
from typing import Any, Callable, Generator, List, Optional, TypeVar
import click
import json
from itertools import islice
from twitchdl import utils
from twitchdl.twitch import Clip, Video
from typing import Any, Callable, Generator, TypeVar
from twitchdl.entities import Data, Video
T = TypeVar("T")
def truncate(string: str, length: int) -> str:
if len(string) > length:
return string[: length - 1] + ""
return string[:length - 1] + ""
return string
@ -25,24 +25,15 @@ def print_log(message: Any):
click.secho(message, err=True, dim=True)
def visual_len(text: str):
return len(click.unstyle(text))
def ljust(text: str, width: int):
diff = width - visual_len(text)
return text + (" " * diff) if diff > 0 else text
def print_table(headers: List[str], data: List[List[str]]):
widths = [[visual_len(cell) for cell in row] for row in data + [headers]]
def print_table(headers: list[str], data: list[list[str]]):
widths = [[len(cell) for cell in row] for row in data + [headers]]
widths = [max(width) for width in zip(*widths)]
underlines = ["-" * width for width in widths]
def print_row(row: List[str]):
def print_row(row: list[str]):
for idx, cell in enumerate(row):
width = widths[idx]
click.echo(ljust(cell, width), nl=False)
click.echo(cell.ljust(width), nl=False)
click.echo(" ", nl=False)
click.echo()
@ -58,7 +49,7 @@ def print_paged(
generator: Generator[T, Any, Any],
print_fn: Callable[[T], None],
page_size: int,
total_count: Optional[int] = None,
total_count: int | None = None,
):
iterator = iter(generator)
page = list(islice(iterator, page_size))
@ -86,31 +77,32 @@ def print_paged(
break
def print_video(video: Video):
published_at = video["publishedAt"].replace("T", " @ ").replace("Z", "")
length = utils.format_duration(video["lengthSeconds"])
channel = blue(video["creator"]["displayName"]) if video["creator"] else ""
playing = f"playing {blue(video['game']['name'])}" if video["game"] else ""
def print_video(video: Video):
published_at = str(video.published_at.astimezone())
length = utils.format_duration(video.length_seconds)
channel = blue(video.creator.display_name) if video.creator else ""
playing = f"playing {blue(video.game.name)}" if video.game else ""
# Can't find URL in video object, strange
url = f"https://www.twitch.tv/videos/{video['id']}"
url = f"https://www.twitch.tv/videos/{video.id}"
click.secho(f"Video {video['id']}", bold=True)
click.secho(video["title"], fg="green")
click.secho(f"Video {video.id}", bold=True)
click.secho(f"{video.title}", fg="green")
if channel or playing:
click.echo(" ".join([channel, playing]))
if video["description"]:
click.echo(f"Description: {video['description']}")
if video.description:
click.echo(f"Description: {video.description}")
click.echo(f"Published {blue(published_at)} Length: {blue(length)} ")
click.secho(url, italic=True)
click.echo()
def print_video_compact(video: Video):
def print_video_compact(video: Data):
id = video["id"]
date = video["publishedAt"][:10]
game = video["game"]["name"] if video["game"] else ""
@ -118,7 +110,7 @@ def print_video_compact(video: Video):
click.echo(f"{bold(id)} {date} {green(title)} {blue(game)}")
def print_clip(clip: Clip):
def print_clip(clip: Data):
published_at = clip["createdAt"].replace("T", " @ ").replace("Z", "")
length = utils.format_duration(clip["durationSeconds"])
channel = clip["broadcaster"]["displayName"]
@ -128,22 +120,11 @@ def print_clip(clip: Clip):
click.secho(clip["title"], fg="green")
click.echo(f"{blue(channel)} {playing}")
click.echo(
f"Published {blue(published_at)}"
+ f" Length: {blue(length)}"
+ f" Views: {blue(clip['viewCount'])}"
f"Published {blue(published_at)}" +
f" Length: {blue(length)}" +
f" Views: {blue(clip['viewCount'])}"
)
click.secho(clip["url"], italic=True)
click.echo()
def print_clip_compact(clip: Clip):
slug = clip["slug"]
date = clip["createdAt"][:10]
title = truncate(clip["title"], 50).ljust(50)
game = clip["game"]["name"] if clip["game"] else ""
game = truncate(game, 30).ljust(30)
click.echo(f"{date} {green(title)} {blue(game)} {bold(slug)}")
def prompt_continue():
@ -161,7 +142,6 @@ def prompt_continue():
# Shorthand functions for coloring output
def blue(text: Any) -> str:
return click.style(text, fg="blue")

View File

@ -1,170 +0,0 @@
"""
Parse and manipulate m3u8 playlists.
"""
from dataclasses import dataclass
from typing import Generator, List, Optional, OrderedDict
import click
import m3u8
from twitchdl import utils
from twitchdl.output import bold, dim, print_table
@dataclass
class Playlist:
name: str
group_id: str
resolution: Optional[str]
url: str
is_source: bool
@dataclass
class Vod:
index: int
"""Ordinal number of the VOD in the playlist"""
path: str
"""Path part of the VOD URL"""
duration: int
"""Segment duration in seconds"""
def parse_playlists(playlists_m3u8: str) -> List[Playlist]:
def _parse(source: str) -> Generator[Playlist, None, None]:
document = load_m3u8(source)
for p in document.playlists:
resolution = (
"x".join(str(r) for r in p.stream_info.resolution)
if p.stream_info.resolution
else None
)
media = p.media[0]
is_source = media.group_id == "chunked"
yield Playlist(media.name, media.group_id, resolution, p.uri, is_source)
return list(_parse(playlists_m3u8))
def load_m3u8(playlist_m3u8: str) -> m3u8.M3U8:
return m3u8.loads(playlist_m3u8)
def enumerate_vods(
document: m3u8.M3U8,
start: Optional[int] = None,
end: Optional[int] = None,
) -> List[Vod]:
"""Extract VODs for download from document."""
vods = []
vod_start = 0
for index, segment in enumerate(document.segments):
vod_end = vod_start + segment.duration
# `vod_end > start` is used here becuase it's better to download a bit
# more than a bit less, similar for the end condition
start_condition = not start or vod_end > start
end_condition = not end or vod_start < end
if start_condition and end_condition:
vods.append(Vod(index, segment.uri, segment.duration))
vod_start = vod_end
return vods
def make_join_playlist(
playlist: m3u8.M3U8,
vods: List[Vod],
targets: List[str],
) -> m3u8.Playlist:
"""
Make a modified playlist which references downloaded VODs
Keep only the downloaded segments and skip the rest
"""
org_segments = playlist.segments.copy()
path_map = OrderedDict(zip([v.path for v in vods], targets))
playlist.segments.clear()
for segment in org_segments:
if segment.uri in path_map:
segment.uri = path_map[segment.uri]
playlist.segments.append(segment)
return playlist
def select_playlist(playlists: List[Playlist], quality: Optional[str]) -> Playlist:
return (
select_playlist_by_name(playlists, quality)
if quality is not None
else select_playlist_interactive(playlists)
)
def select_playlist_by_name(playlists: List[Playlist], quality: str) -> Playlist:
if quality == "source":
for playlist in playlists:
if playlist.is_source:
return playlist
raise click.ClickException("Source quality not found, please report an issue on github.")
for playlist in playlists:
if playlist.name == quality or playlist.group_id == quality:
return playlist
available = ", ".join([p.name for p in playlists])
msg = f"Quality '{quality}' not found. Available qualities are: {available}"
raise click.ClickException(msg)
def select_playlist_interactive(playlists: List[Playlist]) -> Playlist:
playlists = sorted(playlists, key=_playlist_key)
headers = ["#", "Name", "Group ID", "Resolution"]
rows = [
[
f"{n + 1})",
bold(playlist.name),
dim(playlist.group_id),
dim(playlist.resolution or ""),
]
for n, playlist in enumerate(playlists)
]
click.echo()
print_table(headers, rows)
default = 1
for index, playlist in enumerate(playlists):
if playlist.is_source:
default = index + 1
no = utils.read_int("\nChoose quality", min=1, max=len(playlists) + 1, default=default)
playlist = playlists[no - 1]
return playlist
MAX = 1_000_000
def _playlist_key(playlist: Playlist) -> int:
"""Attempt to sort playlists so that source quality is on top, audio only
is on bottom and others are sorted descending by resolution."""
if playlist.is_source:
return 0
if playlist.group_id == "audio_only":
return MAX
try:
return MAX - int(playlist.name.split("p")[0])
except Exception:
pass
return MAX

View File

@ -1,11 +1,11 @@
import click
import logging
import time
from collections import deque
from dataclasses import dataclass
from statistics import mean
from typing import Deque, Dict, NamedTuple, Optional
import click
from collections import deque
from dataclasses import dataclass, field
from statistics import mean
from typing import Dict, NamedTuple, Optional, Deque
from twitchdl.output import blue
from twitchdl.utils import format_size, format_time
@ -31,25 +31,28 @@ class Sample(NamedTuple):
timestamp: float
@dataclass
class Progress:
def __init__(self, vod_count: int):
self.downloaded: int = 0
self.estimated_total: Optional[int] = None
self.last_printed: Optional[float] = None
self.progress_bytes: int = 0
self.progress_perc: int = 0
self.remaining_time: Optional[int] = None
self.samples: Deque[Sample] = deque(maxlen=1000)
self.speed: Optional[float] = None
self.tasks: Dict[TaskId, Task] = {}
self.vod_count = vod_count
self.vod_downloaded_count: int = 0
vod_count: int
downloaded: int = 0
estimated_total: Optional[int] = None
last_printed: float = field(default_factory=time.time)
progress_bytes: int = 0
progress_perc: int = 0
remaining_time: Optional[int] = None
speed: Optional[float] = None
start_time: float = field(default_factory=time.time)
tasks: Dict[TaskId, Task] = field(default_factory=dict)
vod_downloaded_count: int = 0
samples: Deque[Sample] = field(default_factory=lambda: deque(maxlen=100))
def start(self, task_id: int, size: int):
if task_id in self.tasks:
raise ValueError(f"Task {task_id}: cannot start, already started")
self.tasks[task_id] = Task(task_id, size)
self._calculate_total()
self._calculate_progress()
self.print()
def advance(self, task_id: int, size: int):
@ -60,6 +63,7 @@ class Progress:
self.progress_bytes += size
self.tasks[task_id].advance(size)
self.samples.append(Sample(self.downloaded, time.time()))
self._calculate_progress()
self.print()
def already_downloaded(self, task_id: int, size: int):
@ -69,6 +73,8 @@ class Progress:
self.tasks[task_id] = Task(task_id, size)
self.progress_bytes += size
self.vod_downloaded_count += 1
self._calculate_total()
self._calculate_progress()
self.print()
def abort(self, task_id: int):
@ -77,6 +83,9 @@ class Progress:
del self.tasks[task_id]
self.progress_bytes = sum(t.downloaded for t in self.tasks.values())
self._calculate_total()
self._calculate_progress()
self.print()
def end(self, task_id: int):
@ -85,26 +94,18 @@ class Progress:
task = self.tasks[task_id]
if task.size != task.downloaded:
logger.warn(
f"Taks {task_id} ended with {task.downloaded}b downloaded, expected {task.size}b."
)
logger.warn(f"Taks {task_id} ended with {task.downloaded}b downloaded, expected {task.size}b.")
self.vod_downloaded_count += 1
self.print()
def _recalculate(self):
self.estimated_total = (
int(mean(t.size for t in self.tasks.values()) * self.vod_count) if self.tasks else None
)
def _calculate_total(self):
self.estimated_total = int(mean(t.size for t in self.tasks.values()) * self.vod_count) if self.tasks else None
def _calculate_progress(self):
self.speed = self._calculate_speed()
self.progress_perc = (
int(100 * self.progress_bytes / self.estimated_total) if self.estimated_total else 0
)
self.remaining_time = (
int((self.estimated_total - self.progress_bytes) / self.speed)
if self.estimated_total and self.speed
else None
)
self.progress_perc = int(100 * self.progress_bytes / self.estimated_total) if self.estimated_total else 0
self.remaining_time = int((self.estimated_total - self.progress_bytes) / self.speed) if self.estimated_total and self.speed else None
def _calculate_speed(self):
if len(self.samples) < 2:
@ -116,31 +117,22 @@ class Progress:
size = last_sample.downloaded - first_sample.downloaded
duration = last_sample.timestamp - first_sample.timestamp
return size / duration if duration > 0 else None
return size / duration
def print(self):
now = time.time()
# Don't print more often than 10 times per second
if self.last_printed and now - self.last_printed < 0.1:
if now - self.last_printed < 0.1:
return
self._recalculate()
click.echo(f"\rDownloaded {self.vod_downloaded_count}/{self.vod_count} VODs", nl=False)
click.secho(f" {self.progress_perc}%", fg="blue", nl=False)
if self.estimated_total is not None:
total = f"~{format_size(self.estimated_total)}"
click.echo(f" of {blue(total)}", nl=False)
if self.speed is not None:
speed = f"{format_size(self.speed)}/s"
click.echo(f" at {blue(speed)}", nl=False)
if self.remaining_time is not None:
click.echo(f" ETA {blue(format_time(self.remaining_time))}", nl=False)
click.echo(" ", nl=False)
progress = " ".join([
f"Downloaded {self.vod_downloaded_count}/{self.vod_count} VODs",
blue(self.progress_perc),
f"of ~{blue(format_size(self.estimated_total))}" if self.estimated_total else "",
f"at {blue(format_size(self.speed))}/s" if self.speed else "",
f"ETA {blue(format_time(self.remaining_time))}" if self.remaining_time is not None else "",
])
click.echo(f"\r{progress} ", nl=False)
self.last_printed = now

View File

@ -2,109 +2,33 @@
Twitch API access.
"""
import json
import logging
import time
from typing import Any, Dict, Generator, List, Literal, Mapping, Optional, Tuple, TypedDict, Union
import click
import httpx
import json
import click
from typing import Dict, Generator, Literal
from twitchdl import CLIENT_ID
from twitchdl.entities import Data
from twitchdl.entities import AccessToken, Data
from twitchdl.exceptions import ConsoleError
ClipsPeriod = Literal["last_day", "last_week", "last_month", "all_time"]
VideosSort = Literal["views", "time"]
VideosType = Literal["archive", "highlight", "upload"]
class AccessToken(TypedDict):
signature: str
value: str
class User(TypedDict):
login: str
displayName: str
class Game(TypedDict):
id: str
name: str
class VideoQuality(TypedDict):
frameRate: str
quality: str
sourceURL: str
class ClipAccessToken(TypedDict):
id: str
playbackAccessToken: AccessToken
videoQualities: List[VideoQuality]
class Clip(TypedDict):
id: str
slug: str
title: str
createdAt: str
viewCount: int
durationSeconds: int
url: str
videoQualities: List[VideoQuality]
game: Game
broadcaster: User
class Video(TypedDict):
id: str
title: str
description: str
publishedAt: str
broadcastType: str
lengthSeconds: int
game: Game
creator: User
class Chapter(TypedDict):
id: str
durationMilliseconds: int
positionMilliseconds: int
type: str
description: str
subDescription: str
thumbnailURL: str
game: Game
class GQLError(click.ClickException):
def __init__(self, errors: List[str]):
def __init__(self, errors: list[str]):
message = "GraphQL query failed."
for error in errors:
message += f"\n* {error}"
super().__init__(message)
Content = Union[str, bytes]
Headers = Dict[str, str]
def authenticated_post(url, data=None, json=None, headers={}):
headers['Client-ID'] = CLIENT_ID
def authenticated_post(
url: str,
*,
json: Any = None,
content: Optional[Content] = None,
auth_token: Optional[str] = None,
):
headers = {"Client-ID": CLIENT_ID}
if auth_token is not None:
headers["authorization"] = f"OAuth {auth_token}"
response = request("POST", url, content=content, json=json, headers=headers)
response = httpx.post(url, data=data, json=json, headers=headers)
if response.status_code == 400:
data = response.json()
raise ConsoleError(data["message"])
@ -114,50 +38,16 @@ def authenticated_post(
return response
def request(
method: str,
url: str,
json: Any = None,
content: Optional[Content] = None,
headers: Optional[Mapping[str, str]] = None,
):
with httpx.Client() as client:
request = client.build_request(method, url, json=json, content=content, headers=headers)
log_request(request)
start = time.time()
response = client.send(request)
duration = time.time() - start
log_response(response, duration)
return response
logger = logging.getLogger(__name__)
def log_request(request: httpx.Request):
logger.debug(f"--> {request.method} {request.url}")
if request.content:
logger.debug(f"--> {request.content}")
def log_response(response: httpx.Response, duration: float):
request = response.request
duration_ms = int(1000 * duration)
logger.debug(f"<-- {request.method} {request.url} HTTP {response.status_code} {duration_ms}ms")
if response.content:
logger.debug(f"<-- {response.content}")
def gql_post(query: str):
url = "https://gql.twitch.tv/gql"
response = authenticated_post(url, content=query)
response = authenticated_post(url, data=query)
gql_raise_on_error(response)
return response.json()
def gql_query(query: str, auth_token: Optional[str] = None):
def gql_query(query: str, headers: Dict[str, str] = {}):
url = "https://gql.twitch.tv/gql"
response = authenticated_post(url, json={"query": query}, auth_token=auth_token)
response = authenticated_post(url, json={"query": query}, headers=headers)
gql_raise_on_error(response)
return response.json()
@ -177,7 +67,6 @@ VIDEO_FIELDS = """
broadcastType
lengthSeconds
game {
id
name
}
creator {
@ -211,7 +100,7 @@ CLIP_FIELDS = """
"""
def get_video(video_id: str) -> Optional[Video]:
def get_video(video_id: str):
query = f"""
{{
video(id: "{video_id}") {{
@ -224,7 +113,7 @@ def get_video(video_id: str) -> Optional[Video]:
return response["data"]["video"]
def get_clip(slug: str) -> Optional[Clip]:
def get_clip(slug: str):
query = f"""
{{
clip(slug: "{slug}") {{
@ -237,7 +126,7 @@ def get_clip(slug: str) -> Optional[Clip]:
return response["data"]["clip"]
def get_clip_access_token(slug: str) -> ClipAccessToken:
def get_clip_access_token(slug: str):
query = f"""
{{
"operationName": "VideoAccessToken_Clip",
@ -257,12 +146,7 @@ def get_clip_access_token(slug: str) -> ClipAccessToken:
return response["data"]["clip"]
def get_channel_clips(
channel_id: str,
period: ClipsPeriod,
limit: int,
after: Optional[str] = None,
):
def get_channel_clips(channel_id: str, period: ClipsPeriod, limit: int, after: str | None= None):
"""
List channel clips.
@ -299,12 +183,8 @@ def get_channel_clips(
return response["data"]["user"]["clips"]
def channel_clips_generator(
channel_id: str,
period: ClipsPeriod,
limit: int,
) -> Generator[Clip, None, None]:
def _generator(clips: Data, limit: int) -> Generator[Clip, None, None]:
def channel_clips_generator(channel_id: str, period: str, limit: int) -> Generator[Data, None, None]:
def _generator(clips: Data, limit: int) -> Generator[Data, None, None]:
for clip in clips["edges"]:
if limit < 1:
return
@ -325,10 +205,11 @@ def channel_clips_generator(
return _generator(clips, limit)
def channel_clips_generator_old(channel_id: str, period: ClipsPeriod, limit: int):
def channel_clips_generator_old(channel_id, period, limit):
cursor = ""
while True:
clips = get_channel_clips(channel_id, period, limit, after=cursor)
clips = get_channel_clips(
channel_id, period, limit, after=cursor)
if not clips["edges"]:
break
@ -347,11 +228,10 @@ def get_channel_videos(
limit: int,
sort: str,
type: str = "archive",
game_ids: Optional[List[str]] = None,
after: Optional[str] = None,
game_ids: list[str] | None = None,
after: str | None = None
):
game_ids = game_ids or []
game_ids_str = f"[{','.join(game_ids)}]"
query = f"""
{{
@ -362,7 +242,7 @@ def get_channel_videos(
sort: {sort.upper()},
after: "{after or ''}",
options: {{
gameIDs: {game_ids_str}
gameIDs: {game_ids}
}}
) {{
totalCount
@ -393,11 +273,11 @@ def channel_videos_generator(
max_videos: int,
sort: VideosSort,
type: VideosType,
game_ids: Optional[List[str]] = None,
) -> Tuple[int, Generator[Video, None, None]]:
game_ids: list[str] | None = None
) -> tuple[int, Generator[Data, None, None]]:
game_ids = game_ids or []
def _generator(videos: Data, max_videos: int) -> Generator[Video, None, None]:
def _generator(videos: Data, max_videos: int) -> Generator[Data, None, None]:
for video in videos["edges"]:
if max_videos < 1:
return
@ -418,11 +298,11 @@ def channel_videos_generator(
return videos["totalCount"], _generator(videos, max_videos)
def get_access_token(video_id: str, auth_token: Optional[str] = None) -> AccessToken:
def get_access_token(video_id: str, auth_token: str | None = None) -> AccessToken:
query = f"""
{{
videoPlaybackAccessToken(
id: "{video_id}",
id: {video_id},
params: {{
platform: "web",
playerBackend: "mediaplayer",
@ -435,9 +315,17 @@ def get_access_token(video_id: str, auth_token: Optional[str] = None) -> AccessT
}}
"""
headers = {}
if auth_token is not None:
headers['authorization'] = f'OAuth {auth_token}'
try:
response = gql_query(query, auth_token=auth_token)
return response["data"]["videoPlaybackAccessToken"]
response = gql_query(query, headers=headers)
return AccessToken(
response["data"]["videoPlaybackAccessToken"]["signature"],
response["data"]["videoPlaybackAccessToken"]["value"],
)
except httpx.HTTPStatusError as error:
# Provide a more useful error message when server returns HTTP 401
# Unauthorized while using a user-provided auth token.
@ -453,27 +341,24 @@ def get_access_token(video_id: str, auth_token: Optional[str] = None) -> AccessT
raise
def get_playlists(video_id: str, access_token: AccessToken) -> str:
def get_playlists(video_id: str, access_token: AccessToken):
"""
For a given video return a playlist which contains possible video qualities.
"""
url = f"https://usher.ttvnw.net/vod/{video_id}"
response = httpx.get(
url,
params={
"nauth": access_token["value"],
"nauthsig": access_token["signature"],
"allow_audio_only": "true",
"allow_source": "true",
"player": "twitchweb",
},
)
response = httpx.get(url, params={
"nauth": access_token.value,
"nauthsig": access_token.signature,
"allow_audio_only": "true",
"allow_source": "true",
"player": "twitchweb",
})
response.raise_for_status()
return response.content.decode("utf-8")
return response.content.decode('utf-8')
def get_game_id(name: str):
def get_game_id(name):
query = f"""
{{
game(name: "{name.strip()}") {{
@ -488,29 +373,30 @@ def get_game_id(name: str):
return game["id"]
def get_video_chapters(video_id: str) -> List[Chapter]:
def get_video_chapters(video_id: str):
query = {
"operationName": "VideoPlayer_ChapterSelectButtonVideo",
"variables": {
"variables":
{
"includePrivate": False,
"videoID": video_id,
"videoID": video_id
},
"extensions": {
"persistedQuery": {
"extensions":
{
"persistedQuery":
{
"version": 1,
"sha256Hash": "8d2793384aac3773beab5e59bd5d6f585aedb923d292800119e03d40cd0f9b41",
"sha256Hash": "8d2793384aac3773beab5e59bd5d6f585aedb923d292800119e03d40cd0f9b41"
}
},
}
}
response = gql_post(json.dumps(query))
return list(_chapter_nodes(response["data"]["video"]["moments"]))
def _chapter_nodes(moments: Data) -> Generator[Chapter, None, None]:
for edge in moments["edges"]:
def _chapter_nodes(collection):
for edge in collection["edges"]:
node = edge["node"]
node["game"] = node["details"]["game"]
del node["details"]
del node["moments"]
yield node

View File

@ -1,6 +1,5 @@
import re
import unicodedata
from typing import Optional, Union
import click
@ -12,7 +11,7 @@ def _format_size(value: float, digits: int, unit: str):
return f"{int(value)}{unit}"
def format_size(bytes_: Union[int, float], digits: int = 1):
def format_size(bytes_: int | float, digits: int = 1):
if bytes_ < 1024:
return _format_size(bytes_, digits, "B")
@ -27,7 +26,7 @@ def format_size(bytes_: Union[int, float], digits: int = 1):
return _format_size(mega / 1024, digits, "GB")
def format_duration(total_seconds: Union[int, float]) -> str:
def format_duration(total_seconds: int | float) -> str:
total_seconds = int(total_seconds)
hours = total_seconds // 3600
remainder = total_seconds % 3600
@ -43,7 +42,7 @@ def format_duration(total_seconds: Union[int, float]) -> str:
return f"{seconds} sec"
def format_time(total_seconds: Union[int, float], force_hours: bool = False) -> str:
def format_time(total_seconds: int | float, force_hours: bool = False) -> str:
total_seconds = int(total_seconds)
hours = total_seconds // 3600
remainder = total_seconds % 3600
@ -56,7 +55,7 @@ def format_time(total_seconds: Union[int, float], force_hours: bool = False) ->
return f"{minutes:02}:{seconds:02}"
def read_int(msg: str, min: int, max: int, default: Optional[int] = None) -> int:
def read_int(msg: str, min: int, max: int, default: int | None = None) -> int:
while True:
try:
val = click.prompt(msg, default=default, type=int)
@ -69,32 +68,32 @@ def read_int(msg: str, min: int, max: int, default: Optional[int] = None) -> int
def slugify(value: str) -> str:
value = unicodedata.normalize("NFKC", str(value))
value = re.sub(r"[^\w\s_-]", "", value)
value = re.sub(r"[\s_-]+", "_", value)
value = unicodedata.normalize('NFKC', str(value))
value = re.sub(r'[^\w\s_-]', '', value)
value = re.sub(r'[\s_-]+', '_', value)
return value.strip("_").lower()
def titlify(value: str) -> str:
value = unicodedata.normalize("NFKC", str(value))
value = re.sub(r"[^\w\s\[\]().-]", "", value)
value = re.sub(r"\s+", " ", value)
value = unicodedata.normalize('NFKC', str(value))
value = re.sub(r'[^\w\s\[\]().-]', '', value)
value = re.sub(r'\s+', ' ', value)
return value.strip()
VIDEO_PATTERNS = [
r"^(?P<id>\d+)?$",
r"^https://(www\.|m\.)?twitch\.tv/videos/(?P<id>\d+)(\?.+)?$",
r"^https://(www.)?twitch.tv/videos/(?P<id>\d+)(\?.+)?$",
]
CLIP_PATTERNS = [
r"^(?P<slug>[A-Za-z0-9]+(?:-[A-Za-z0-9_-]{16})?)$",
r"^https://(www\.|m\.)?twitch\.tv/\w+/clip/(?P<slug>[A-Za-z0-9]+(?:-[A-Za-z0-9_-]{16})?)(\?.+)?$",
r"^https://clips\.twitch\.tv/(?P<slug>[A-Za-z0-9]+(?:-[A-Za-z0-9_-]{16})?)(\?.+)?$",
r"^https://(www.)?twitch.tv/\w+/clip/(?P<slug>[A-Za-z0-9]+(?:-[A-Za-z0-9_-]{16})?)(\?.+)?$",
r"^https://clips.twitch.tv/(?P<slug>[A-Za-z0-9]+(?:-[A-Za-z0-9_-]{16})?)(\?.+)?$",
]
def parse_video_identifier(identifier: str) -> Optional[str]:
def parse_video_identifier(identifier: str) -> str | None:
"""Given a video ID or URL returns the video ID, or null if not matched"""
for pattern in VIDEO_PATTERNS:
match = re.match(pattern, identifier)
@ -102,7 +101,7 @@ def parse_video_identifier(identifier: str) -> Optional[str]:
return match.group("id")
def parse_clip_identifier(identifier: str) -> Optional[str]:
def parse_clip_identifier(identifier: str) -> str | None:
"""Given a clip slug or URL returns the clip slug, or null if not matched"""
for pattern in CLIP_PATTERNS:
match = re.match(pattern, identifier)