Compare commits

..

47 Commits
foo ... 2.2.4

Author SHA1 Message Date
125bc693f8 Update changelog 2024-04-25 07:33:05 +02:00
8a7fdad22f Test m patterns 2024-04-25 07:31:52 +02:00
c00a9c3597 Add support for m dot urls 2024-04-25 07:29:59 +02:00
0f17d92a8c Update changelog 2024-04-24 14:54:31 +02:00
ee1e0ce853 Upgrade actions/setup-python to v5 2024-04-24 14:52:42 +02:00
1c878bbca8 Update changelog 2024-04-24 14:51:08 +02:00
941440de41 Upgrade actions/checkout to v4 2024-04-24 14:50:50 +02:00
c0eae623a4 Test clips 2024-04-24 14:45:49 +02:00
f4d0643b07 Fix flaky test 2024-04-24 14:41:48 +02:00
ea4b714343 Test videos command 2024-04-24 14:39:17 +02:00
f815934e15 Simplify and test get_game_ids 2024-04-24 14:39:12 +02:00
5aa323e3e5 Log response content 2024-04-24 14:11:22 +02:00
3d03658850 Add tests 2024-04-24 08:59:01 +02:00
69b848d341 Respect --dry-run when downloading videos 2024-04-24 08:58:58 +02:00
2422871d70 Add request logging 2024-04-24 08:43:45 +02:00
44890b4101 Pass auth_token instead of headers 2024-04-24 08:43:00 +02:00
9aa108acbf Add type hints for post 2024-04-24 08:42:01 +02:00
3fa8bcef73 Pass request as content instead of data
data works, but should be used for form data passed as a dict, while
content takes bytes or str
2024-04-24 08:41:19 +02:00
3c0f8a8ece Add some cli tests 2024-04-24 08:38:55 +02:00
7845cf6f72 Improve typing 2024-04-24 08:11:33 +02:00
6ed98fa4ef Fix tuple typing to work on py 3.8 2024-04-24 08:10:56 +02:00
d777f0e98a Set up testing on github 2024-04-24 08:10:20 +02:00
ad0f0d2a41 Update changelog 2024-04-23 18:14:22 +02:00
1057fff61a Hopefully fix python 3.8 compat 2024-04-23 18:13:40 +02:00
f1924715ed Apply formatting 2024-04-23 17:35:46 +02:00
ca38b9239f Update changelog 2024-04-23 17:25:48 +02:00
a0ad66ee69 Remove union types to fix python 3.8 compat 2024-04-23 17:15:18 +02:00
e50499351b Make printing progress a bit more readable 2024-04-12 09:43:04 +02:00
0d3c3df2f8 Fix double colon in prompt 2024-04-10 08:47:38 +02:00
446b4f9f91 Fix progress calc when video duration is 0
fixes #146
2024-04-10 08:47:11 +02:00
bce573ef3c Update changelog 2024-04-10 08:32:21 +02:00
3ffa7acfef Fix url 2024-04-10 08:25:03 +02:00
30301c07b9 Fix tests 2024-04-10 08:04:21 +02:00
c1b58e178f Add clips --copact option 2024-04-06 10:43:12 +02:00
a7ad4d8dcc Extract playlist parsing code 2024-04-06 10:15:26 +02:00
d6390bc7a2 Fix bug in accessing video qualities 2024-04-04 08:36:10 +02:00
28f1977d1c Apply ruff formatting 2024-04-04 08:20:10 +02:00
be69e79b28 Add Chapter typed dict 2024-04-03 08:52:51 +02:00
c0e66fc416 Add AccessToken typed dict 2024-04-03 08:14:50 +02:00
a9facd46ac Add Clip typed dict 2024-04-02 09:50:26 +02:00
6a1900b628 Add Video typed dict 2024-04-02 09:44:50 +02:00
3270d857b1 Extract print_paged, improve types 2024-04-01 09:40:54 +02:00
3ae99fe159 Replace custom coloring fns with click.echo and style 2024-03-31 21:41:05 +02:00
9cf3ec2f07 Replace print_out with click.echo 2024-03-30 15:36:53 +01:00
64b88249f2 Remove print_err, unused 2024-03-30 10:11:44 +01:00
5dcc868275 Imporve types 2024-03-30 07:52:43 +01:00
11fbfd35fc Print placeholders in twitch-dl info 2024-03-30 07:35:08 +01:00
26 changed files with 1104 additions and 578 deletions

27
.github/workflows/test.yml vendored Normal file
View File

@ -0,0 +1,27 @@
name: Run tests
on: [push, pull_request]
jobs:
test:
runs-on: ubuntu-22.04
strategy:
matrix:
python-version: ["3.8", "3.9", "3.10", "3.11", "3.12"]
steps:
- uses: actions/checkout@v4
- name: Set up Python ${{ matrix.python-version }}
uses: actions/setup-python@v5
with:
python-version: ${{ matrix.python-version }}
- name: Install dependencies
run: |
python -m pip install --upgrade pip
pip install -e ".[test]"
- name: Run tests
run: |
pytest
- name: Validate minimum required version
run: |
vermin --no-tips twitchdl

View File

@ -3,11 +3,37 @@ twitch-dl changelog
<!-- Do not edit. This file is automatically generated from changelog.yaml.--> <!-- Do not edit. This file is automatically generated from changelog.yaml.-->
### [2.2.0 (TBA)](https://github.com/ihabunek/twitch-dl/releases/tag/2.2.0) ### [2.2.4 (2024-04-25)](https://github.com/ihabunek/twitch-dl/releases/tag/2.2.4)
* **Requires python 3.8 or later** * Add m dot url support to video and clip regexes (thanks @localnerve)
* Migrated to click lib for cli parsing
* Add shell auto completion ### [2.2.3 (2024-04-24)](https://github.com/ihabunek/twitch-dl/releases/tag/2.2.3)
* Respect --dry-run option when downloading videos
* Add automated tests on github actions
### [2.2.2 (2024-04-23)](https://github.com/ihabunek/twitch-dl/releases/tag/2.2.2)
* Fix more compat issues Python < 3.10 (#152)
### [2.2.1 (2024-04-23)](https://github.com/ihabunek/twitch-dl/releases/tag/2.2.1)
* Fix compat with Python < 3.10 (#152)
* Fix division by zero in progress calculation when video duration is reported
as 0
### [2.2.0 (2024-04-10)](https://github.com/ihabunek/twitch-dl/releases/tag/2.2.0)
* **Requires Python 3.8+**
* Migrated to Click library for generating the commandline interface
* Add shell auto completion, see 'Shell completion' in docs.
* Add setting defaults via environment variables, see 'Environment variables' in
docs
* Add `download --concat` option to avoid using ffmeg for joinig vods and concat
them instead. This will produce a `.ts` file by default.
* Add `download --dry-run` option to skip actual download (thanks @metacoma)
* Add video description to metadata (#129)
* Add `clips --compact` option for listing clips in one-per-line mode
### [2.1.4 (2024-01-06)](https://github.com/ihabunek/twitch-dl/releases/tag/2.1.4) ### [2.1.4 (2024-01-06)](https://github.com/ihabunek/twitch-dl/releases/tag/2.1.4)

View File

@ -24,7 +24,7 @@ publish :
twine upload dist/*.tar.gz dist/*.whl twine upload dist/*.tar.gz dist/*.whl
coverage: coverage:
py.test --cov=toot --cov-report html tests/ pytest --cov=twitchdl --cov-report html tests/
man: man:
scdoc < twitch-dl.1.scd > twitch-dl.1.man scdoc < twitch-dl.1.scd > twitch-dl.1.man

View File

@ -1,12 +1,36 @@
2.2.0: 2.2.4:
date: TBA date: 2024-04-25
changes: changes:
- "**Requires python 3.8 or later**" - "Add m dot url support to video and clip regexes (thanks @localnerve)"
2.2.3:
date: 2024-04-24
changes:
- "Respect --dry-run option when downloading videos"
- "Add automated tests on github actions"
2.2.2:
date: 2024-04-23
changes:
- "Fix more compat issues Python < 3.10 (#152)"
2.2.1:
date: 2024-04-23
changes:
- "Fix compat with Python < 3.10 (#152)"
- "Fix division by zero in progress calculation when video duration is reported as 0"
2.2.0:
date: 2024-04-10
changes:
- "**Requires Python 3.8+**"
- "Migrated to Click library for generating the commandline interface" - "Migrated to Click library for generating the commandline interface"
- "Add shell auto completion, see: https://twitch-dl.bezdomni.net/shell_completion.html" - "Add shell auto completion, see 'Shell completion' in docs."
- "Add setting defaults via environment variables, see: https://twitch-dl.bezdomni.net/environment_variables.html" - "Add setting defaults via environment variables, see 'Environment variables' in docs"
- "Add `download --concat` option to avoid using ffmeg for joinig vods and concat them instead. This will produce a `.ts` file by default." - "Add `download --concat` option to avoid using ffmeg for joinig vods and concat them instead. This will produce a `.ts` file by default."
- "Add `download --dry-run` option to skip actual download (thanks @metacoma)"
- "Add video description to metadata (#129)" - "Add video description to metadata (#129)"
- "Add `clips --compact` option for listing clips in one-per-line mode"
2.1.4: 2.1.4:
date: 2024-01-06 date: 2024-01-06

View File

@ -3,11 +3,37 @@ twitch-dl changelog
<!-- Do not edit. This file is automatically generated from changelog.yaml.--> <!-- Do not edit. This file is automatically generated from changelog.yaml.-->
### [2.2.0 (TBA)](https://github.com/ihabunek/twitch-dl/releases/tag/2.2.0) ### [2.2.4 (2024-04-25)](https://github.com/ihabunek/twitch-dl/releases/tag/2.2.4)
* **Requires python 3.8 or later** * Add m dot url support to video and clip regexes (thanks @localnerve)
* Migrated to click lib for cli parsing
* Add shell auto completion ### [2.2.3 (2024-04-24)](https://github.com/ihabunek/twitch-dl/releases/tag/2.2.3)
* Respect --dry-run option when downloading videos
* Add automated tests on github actions
### [2.2.2 (2024-04-23)](https://github.com/ihabunek/twitch-dl/releases/tag/2.2.2)
* Fix more compat issues Python < 3.10 (#152)
### [2.2.1 (2024-04-23)](https://github.com/ihabunek/twitch-dl/releases/tag/2.2.1)
* Fix compat with Python < 3.10 (#152)
* Fix division by zero in progress calculation when video duration is reported
as 0
### [2.2.0 (2024-04-10)](https://github.com/ihabunek/twitch-dl/releases/tag/2.2.0)
* **Requires Python 3.8+**
* Migrated to Click library for generating the commandline interface
* Add shell auto completion, see 'Shell completion' in docs.
* Add setting defaults via environment variables, see 'Environment variables' in
docs
* Add `download --concat` option to avoid using ffmeg for joinig vods and concat
them instead. This will produce a `.ts` file by default.
* Add `download --dry-run` option to skip actual download (thanks @metacoma)
* Add video description to metadata (#129)
* Add `clips --compact` option for listing clips in one-per-line mode
### [2.1.4 (2024-01-06)](https://github.com/ihabunek/twitch-dl/releases/tag/2.1.4) ### [2.1.4 (2024-01-06)](https://github.com/ihabunek/twitch-dl/releases/tag/2.1.4)

View File

@ -18,6 +18,11 @@ twitch-dl clips [OPTIONS] CHANNEL_NAME
<td>Fetch all clips, overrides --limit</td> <td>Fetch all clips, overrides --limit</td>
</tr> </tr>
<tr>
<td class="code">-c, --compact</td>
<td>Show clips in compact mode, one line per video</td>
</tr>
<tr> <tr>
<td class="code">-d, --download</td> <td class="code">-d, --download</td>
<td>Download clips in given period (in source quality)</td> <td>Download clips in given period (in source quality)</td>
@ -25,7 +30,7 @@ twitch-dl clips [OPTIONS] CHANNEL_NAME
<tr> <tr>
<td class="code">-l, --limit INTEGER</td> <td class="code">-l, --limit INTEGER</td>
<td>Number of clips to fetch [max: 100] [default: <code>10</code>]</td> <td>Number of clips to fetch. Defaults to 40 in compact mode, 10 otherwise.</td>
</tr> </tr>
<tr> <tr>

View File

@ -28,7 +28,7 @@ twitch-dl download [OPTIONS] [IDS]...
<tr> <tr>
<td class="code">--concat</td> <td class="code">--concat</td>
<td>Do not use ffmpeg to join files, concat them instead</td> <td>Do not use ffmpeg to join files, concat them instead. This will produce a .ts file by default.</td>
</tr> </tr>
<tr> <tr>

View File

@ -43,6 +43,11 @@ dev = [
"vermin", "vermin",
] ]
test = [
"pytest",
"vermin",
]
[project.urls] [project.urls]
"Homepage" = "https://twitch-dl.bezdomni.net/" "Homepage" = "https://twitch-dl.bezdomni.net/"
"Source" = "https://github.com/ihabunek/twitch-dl" "Source" = "https://github.com/ihabunek/twitch-dl"
@ -51,8 +56,9 @@ dev = [
twitch-dl = "twitchdl.cli:cli" twitch-dl = "twitchdl.cli:cli"
[tool.pyright] [tool.pyright]
include = ["twitchdl"]
typeCheckingMode = "strict" typeCheckingMode = "strict"
pythonVersion = "3.8"
[tool.ruff] [tool.ruff]
line-length = 100 line-length = 100
target-version = "py38"

View File

@ -11,12 +11,10 @@ Usage: tag_version [version]
import subprocess import subprocess
import sys import sys
import textwrap import textwrap
import yaml
import twitchdl
from datetime import date from datetime import date
from os import path from os import path
from pkg_resources import get_distribution
import yaml
path = path.join(path.dirname(path.dirname(path.abspath(__file__))), "changelog.yaml") path = path.join(path.dirname(path.dirname(path.abspath(__file__))), "changelog.yaml")
with open(path, "r") as f: with open(path, "r") as f:
@ -33,15 +31,6 @@ if not changelog_item:
print(f"Version `{version}` not found in changelog.", file=sys.stderr) print(f"Version `{version}` not found in changelog.", file=sys.stderr)
sys.exit(1) sys.exit(1)
if twitchdl.__version__ != version:
print(f"twitchdl.__version__ is `{twitchdl.__version__}`, expected {version}.", file=sys.stderr)
sys.exit(1)
dist_version = get_distribution('twitch-dl').version
if dist_version != version:
print(f"Version in setup.py is `{dist_version}`, expected {version}.", file=sys.stderr)
sys.exit(1)
release_date = changelog_item["date"] release_date = changelog_item["date"]
changes = changelog_item["changes"] changes = changelog_item["changes"]
description = changelog_item["description"] if "description" in changelog_item else None description = changelog_item["description"] if "description" in changelog_item else None

View File

@ -3,9 +3,13 @@ These tests depend on the channel having some videos and clips published.
""" """
import httpx import httpx
import m3u8 import pytest
from twitchdl import twitch from twitchdl import twitch
from twitchdl.commands.download import _parse_playlists, get_clip_authenticated_url from twitchdl.commands.download import get_clip_authenticated_url
from twitchdl.commands.videos import get_game_ids
from twitchdl.exceptions import ConsoleError
from twitchdl.playlists import enumerate_vods, load_m3u8, parse_playlists
TEST_CHANNEL = "bananasaurus_rex" TEST_CHANNEL = "bananasaurus_rex"
@ -17,22 +21,25 @@ def test_get_videos():
video_id = videos["edges"][0]["node"]["id"] video_id = videos["edges"][0]["node"]["id"]
video = twitch.get_video(video_id) video = twitch.get_video(video_id)
assert video is not None
assert video["id"] == video_id assert video["id"] == video_id
access_token = twitch.get_access_token(video_id) access_token = twitch.get_access_token(video_id)
assert "signature" in access_token assert "signature" in access_token
assert "value" in access_token assert "value" in access_token
playlists = twitch.get_playlists(video_id, access_token) playlists_txt = twitch.get_playlists(video_id, access_token)
assert playlists.startswith("#EXTM3U") assert playlists_txt.startswith("#EXTM3U")
name, res, url = next(_parse_playlists(playlists)) playlists = parse_playlists(playlists_txt)
playlist = httpx.get(url).text playlist_url = playlists[0].url
assert playlist.startswith("#EXTM3U")
playlist = m3u8.loads(playlist) playlist_txt = httpx.get(playlist_url).text
vod_path = playlist.segments[0].uri assert playlist_txt.startswith("#EXTM3U")
assert vod_path == "0.ts"
playlist_m3u8 = load_m3u8(playlist_txt)
vods = enumerate_vods(playlist_m3u8)
assert vods[0].path == "0.ts"
def test_get_clips(): def test_get_clips():
@ -45,6 +52,19 @@ def test_get_clips():
slug = clips["edges"][0]["node"]["slug"] slug = clips["edges"][0]["node"]["slug"]
clip = twitch.get_clip(slug) clip = twitch.get_clip(slug)
assert clip is not None
assert clip["slug"] == slug assert clip["slug"] == slug
assert get_clip_authenticated_url(slug, "source") assert get_clip_authenticated_url(slug, "source")
def test_get_games():
assert get_game_ids([]) == []
assert get_game_ids(["Bioshock"]) == ["15866"]
assert get_game_ids(["Bioshock", "Portal"]) == ["15866", "6187"]
def test_get_games_not_found():
with pytest.raises(ConsoleError) as ex:
get_game_ids(["the game which does not exist"])
assert str(ex.value) == "Game 'the game which does not exist' not found"

156
tests/test_cli.py Normal file
View File

@ -0,0 +1,156 @@
import json
import pytest
from click.testing import CliRunner, Result
from twitchdl import cli
@pytest.fixture(scope="session")
def runner():
return CliRunner(mix_stderr=False)
def assert_ok(result: Result):
if result.exit_code != 0:
raise AssertionError(
f"Command failed with exit code {result.exit_code}\nStderr: {result.stderr}"
)
def test_info_video(runner: CliRunner):
result = runner.invoke(cli.info, ["2090131595"])
assert_ok(result)
assert "Frost Fatales 2024 Day 1" in result.stdout
assert "frozenflygone playing Tomb Raider" in result.stdout
def test_info_video_json(runner: CliRunner):
result = runner.invoke(cli.info, ["2090131595", "--json"])
assert_ok(result)
video = json.loads(result.stdout)
assert video["title"] == "Frost Fatales 2024 Day 1"
assert video["game"] == {"id": "2770", "name": "Tomb Raider"}
assert video["creator"] == {"login": "frozenflygone", "displayName": "frozenflygone"}
def test_info_clip(runner: CliRunner):
result = runner.invoke(cli.info, ["PoisedTalentedPuddingChefFrank"])
assert_ok(result)
assert "AGDQ Crashes during Bioshock run" in result.stdout
assert "GamesDoneQuick playing BioShock" in result.stdout
def test_info_clip_json(runner: CliRunner):
result = runner.invoke(cli.info, ["PoisedTalentedPuddingChefFrank", "--json"])
assert_ok(result)
clip = json.loads(result.stdout)
assert clip["slug"] == "PoisedTalentedPuddingChefFrank"
assert clip["title"] == "AGDQ Crashes during Bioshock run"
assert clip["game"] == {"id": "15866", "name": "BioShock"}
assert clip["broadcaster"] == {"displayName": "GamesDoneQuick", "login": "gamesdonequick"}
def test_info_not_found(runner: CliRunner):
result = runner.invoke(cli.info, ["banana"])
assert result.exit_code == 1
assert "Clip banana not found" in result.stderr
result = runner.invoke(cli.info, ["12345"])
assert result.exit_code == 1
assert "Video 12345 not found" in result.stderr
result = runner.invoke(cli.info, [""])
assert result.exit_code == 1
assert "Invalid input" in result.stderr
def test_download_clip(runner: CliRunner):
result = runner.invoke(
cli.download,
[
"PoisedTalentedPuddingChefFrank",
"-q",
"source",
"--dry-run",
],
)
assert_ok(result)
assert (
"Found: AGDQ Crashes during Bioshock run by GamesDoneQuick, playing BioShock (30 sec)"
in result.stdout
)
assert (
"Target: 2020-01-10_3099545841_gamesdonequick_agdq_crashes_during_bioshock_run.mp4"
in result.stdout
)
assert "Dry run, clip not downloaded." in result.stdout
def test_download_video(runner: CliRunner):
result = runner.invoke(
cli.download,
[
"2090131595",
"-q",
"source",
"--dry-run",
],
)
assert_ok(result)
assert "Found: Frost Fatales 2024 Day 1 by frozenflygone" in result.stdout
assert (
"Output: 2024-03-14_2090131595_frozenflygone_frost_fatales_2024_day_1.mkv" in result.stdout
)
assert "Dry run, video not downloaded." in result.stdout
def test_videos(runner: CliRunner):
result = runner.invoke(cli.videos, ["gamesdonequick", "--json"])
assert_ok(result)
videos = json.loads(result.stdout)
assert videos["count"] == 10
assert videos["totalCount"] > 0
video = videos["videos"][0]
result = runner.invoke(cli.videos, "gamesdonequick")
assert_ok(result)
assert f"Video {video['id']}" in result.stdout
assert video["title"] in result.stdout
result = runner.invoke(cli.videos, ["gamesdonequick", "--compact"])
assert_ok(result)
assert video["id"] in result.stdout
assert video["title"][:60] in result.stdout
def test_videos_channel_not_found(runner: CliRunner):
result = runner.invoke(cli.videos, ["doesnotexisthopefully"])
assert result.exit_code == 1
assert result.stderr.strip() == "Error: Channel doesnotexisthopefully not found"
def test_clips(runner: CliRunner):
result = runner.invoke(cli.clips, ["gamesdonequick", "--json"])
assert_ok(result)
clips = json.loads(result.stdout)
clip = clips[0]
result = runner.invoke(cli.clips, "gamesdonequick")
assert_ok(result)
assert f"Clip {clip['slug']}" in result.stdout
assert clip["title"] in result.stdout
result = runner.invoke(cli.clips, ["gamesdonequick", "--compact"])
assert_ok(result)
assert clip["slug"] in result.stdout
assert clip["title"][:60] in result.stdout

View File

@ -1,35 +1,38 @@
import pytest import pytest
from twitchdl.utils import parse_video_identifier, parse_clip_identifier from twitchdl.utils import parse_clip_identifier, parse_video_identifier
TEST_VIDEO_PATTERNS = [ TEST_VIDEO_PATTERNS = [
("702689313", "702689313"), ("702689313", "702689313"),
("702689313", "https://twitch.tv/videos/702689313"), ("702689313", "https://twitch.tv/videos/702689313"),
("702689313", "https://www.twitch.tv/videos/702689313"), ("702689313", "https://www.twitch.tv/videos/702689313"),
("702689313", "https://m.twitch.tv/videos/702689313"),
] ]
TEST_CLIP_PATTERNS = { TEST_CLIP_PATTERNS = {
("AbrasivePlayfulMangoMau5", "AbrasivePlayfulMangoMau5"), ("AbrasivePlayfulMangoMau5", "AbrasivePlayfulMangoMau5"),
("AbrasivePlayfulMangoMau5", "https://clips.twitch.tv/AbrasivePlayfulMangoMau5"), ("AbrasivePlayfulMangoMau5", "https://clips.twitch.tv/AbrasivePlayfulMangoMau5"),
("AbrasivePlayfulMangoMau5", "https://www.twitch.tv/dracul1nx/clip/AbrasivePlayfulMangoMau5"), ("AbrasivePlayfulMangoMau5", "https://www.twitch.tv/dracul1nx/clip/AbrasivePlayfulMangoMau5"),
("AbrasivePlayfulMangoMau5", "https://m.twitch.tv/dracul1nx/clip/AbrasivePlayfulMangoMau5"),
("AbrasivePlayfulMangoMau5", "https://twitch.tv/dracul1nx/clip/AbrasivePlayfulMangoMau5"), ("AbrasivePlayfulMangoMau5", "https://twitch.tv/dracul1nx/clip/AbrasivePlayfulMangoMau5"),
("HungryProudRadicchioDoggo", "HungryProudRadicchioDoggo"), ("HungryProudRadicchioDoggo", "HungryProudRadicchioDoggo"),
("HungryProudRadicchioDoggo", "https://clips.twitch.tv/HungryProudRadicchioDoggo"), ("HungryProudRadicchioDoggo", "https://clips.twitch.tv/HungryProudRadicchioDoggo"),
("HungryProudRadicchioDoggo", "https://www.twitch.tv/bananasaurus_rex/clip/HungryProudRadicchioDoggo?filter=clips&range=7d&sort=time"), ("HungryProudRadicchioDoggo", "https://www.twitch.tv/bananasaurus_rex/clip/HungryProudRadicchioDoggo?filter=clips&range=7d&sort=time"),
("HungryProudRadicchioDoggo", "https://m.twitch.tv/bananasaurus_rex/clip/HungryProudRadicchioDoggo?filter=clips&range=7d&sort=time"),
("HungryProudRadicchioDoggo", "https://twitch.tv/bananasaurus_rex/clip/HungryProudRadicchioDoggo?filter=clips&range=7d&sort=time"), ("HungryProudRadicchioDoggo", "https://twitch.tv/bananasaurus_rex/clip/HungryProudRadicchioDoggo?filter=clips&range=7d&sort=time"),
("GloriousColdbloodedTortoiseRuleFive-E017utJ4DZmHVpfQ", "GloriousColdbloodedTortoiseRuleFive-E017utJ4DZmHVpfQ"), ("GloriousColdbloodedTortoiseRuleFive-E017utJ4DZmHVpfQ", "GloriousColdbloodedTortoiseRuleFive-E017utJ4DZmHVpfQ"),
("GloriousColdbloodedTortoiseRuleFive-E017utJ4DZmHVpfQ", "https://twitch.tv/dracul1nx/clip/GloriousColdbloodedTortoiseRuleFive-E017utJ4DZmHVpfQ"), ("GloriousColdbloodedTortoiseRuleFive-E017utJ4DZmHVpfQ", "https://twitch.tv/dracul1nx/clip/GloriousColdbloodedTortoiseRuleFive-E017utJ4DZmHVpfQ"),
("GloriousColdbloodedTortoiseRuleFive-E017utJ4DZmHVpfQ", "https://twitch.tv/dracul1nx/clip/GloriousColdbloodedTortoiseRuleFive-E017utJ4DZmHVpfQ?filter=clips&range=7d&sort=time"), ("GloriousColdbloodedTortoiseRuleFive-E017utJ4DZmHVpfQ", "https://twitch.tv/dracul1nx/clip/GloriousColdbloodedTortoiseRuleFive-E017utJ4DZmHVpfQ?filter=clips&range=7d&sort=time"),
("GloriousColdbloodedTortoiseRuleFive-E017utJ4DZmHVpfQ", "https://www.twitch.tv/dracul1nx/clip/GloriousColdbloodedTortoiseRuleFive-E017utJ4DZmHVpfQ?filter=clips&range=7d&sort=time"), ("GloriousColdbloodedTortoiseRuleFive-E017utJ4DZmHVpfQ", "https://www.twitch.tv/dracul1nx/clip/GloriousColdbloodedTortoiseRuleFive-E017utJ4DZmHVpfQ?filter=clips&range=7d&sort=time"),
("GloriousColdbloodedTortoiseRuleFive-E017utJ4DZmHVpfQ", "https://m.twitch.tv/dracul1nx/clip/GloriousColdbloodedTortoiseRuleFive-E017utJ4DZmHVpfQ?filter=clips&range=7d&sort=time"),
} }
@pytest.mark.parametrize("expected,input", TEST_VIDEO_PATTERNS) @pytest.mark.parametrize("expected,input", TEST_VIDEO_PATTERNS)
def test_video_patterns(expected, input): def test_video_patterns(expected: str, input: str):
assert parse_video_identifier(input) == expected assert parse_video_identifier(input) == expected
@pytest.mark.parametrize("expected,input", TEST_CLIP_PATTERNS) @pytest.mark.parametrize("expected,input", TEST_CLIP_PATTERNS)
def test_clip_patterns(expected, input): def test_clip_patterns(expected: str, input: str):
assert parse_clip_identifier(input) == expected assert parse_clip_identifier(input) == expected

View File

@ -1,12 +1,14 @@
import click
import logging import logging
import platform import platform
import re import re
import sys import sys
from typing import Optional, Tuple
import click
from twitchdl import __version__ from twitchdl import __version__
from twitchdl.commands.clips import ClipsPeriod
from twitchdl.entities import DownloadOptions from twitchdl.entities import DownloadOptions
from twitchdl.twitch import ClipsPeriod, VideosSort, VideosType
# Tweak the Click context # Tweak the Click context
# https://click.palletsprojects.com/en/8.1.x/api/#context # https://click.palletsprojects.com/en/8.1.x/api/#context
@ -29,13 +31,13 @@ json_option = click.option(
) )
def validate_positive(_ctx: click.Context, _param: click.Parameter, value: int | None): def validate_positive(_ctx: click.Context, _param: click.Parameter, value: Optional[int]):
if value is not None and value <= 0: if value is not None and value <= 0:
raise click.BadParameter("must be greater than 0") raise click.BadParameter("must be greater than 0")
return value return value
def validate_time(_ctx: click.Context, _param: click.Parameter, value: str) -> int | None: def validate_time(_ctx: click.Context, _param: click.Parameter, value: str) -> Optional[int]:
"""Parse a time string (hh:mm or hh:mm:ss) to number of seconds.""" """Parse a time string (hh:mm or hh:mm:ss) to number of seconds."""
if not value: if not value:
return None return None
@ -55,7 +57,7 @@ def validate_time(_ctx: click.Context, _param: click.Parameter, value: str) -> i
return hours * 3600 + minutes * 60 + seconds return hours * 3600 + minutes * 60 + seconds
def validate_rate(_ctx: click.Context, _param: click.Parameter, value: str) -> int | None: def validate_rate(_ctx: click.Context, _param: click.Parameter, value: str) -> Optional[int]:
if not value: if not value:
return None return None
@ -84,12 +86,14 @@ def validate_rate(_ctx: click.Context, _param: click.Parameter, value: str) -> i
def cli(ctx: click.Context, color: bool, debug: bool): def cli(ctx: click.Context, color: bool, debug: bool):
"""twitch-dl - twitch.tv downloader """twitch-dl - twitch.tv downloader
https://toot.bezdomni.net/ https://twitch-dl.bezdomni.net/
""" """
ctx.color = color ctx.color = color
if debug: if debug:
logging.basicConfig(level=logging.INFO) logging.basicConfig(level=logging.DEBUG)
logging.getLogger("httpx").setLevel(logging.WARN)
logging.getLogger("httpcore").setLevel(logging.WARN)
@cli.command() @cli.command()
@ -100,6 +104,12 @@ def cli(ctx: click.Context, color: bool, debug: bool):
help="Fetch all clips, overrides --limit", help="Fetch all clips, overrides --limit",
is_flag=True, is_flag=True,
) )
@click.option(
"-c",
"--compact",
help="Show clips in compact mode, one line per video",
is_flag=True,
)
@click.option( @click.option(
"-d", "-d",
"--download", "--download",
@ -109,9 +119,8 @@ def cli(ctx: click.Context, color: bool, debug: bool):
@click.option( @click.option(
"-l", "-l",
"--limit", "--limit",
help="Number of clips to fetch [max: 100]", help="Number of clips to fetch. Defaults to 40 in compact mode, 10 otherwise.",
type=int, type=int,
default=10,
callback=validate_positive, callback=validate_positive,
) )
@click.option( @click.option(
@ -134,10 +143,11 @@ def cli(ctx: click.Context, color: bool, debug: bool):
def clips( def clips(
channel_name: str, channel_name: str,
all: bool, all: bool,
compact: bool,
download: bool, download: bool,
json: bool, json: bool,
limit: int, limit: Optional[int],
pager: int | None, pager: Optional[int],
period: ClipsPeriod, period: ClipsPeriod,
): ):
"""List or download clips for given CHANNEL_NAME.""" """List or download clips for given CHANNEL_NAME."""
@ -146,6 +156,7 @@ def clips(
clips( clips(
channel_name, channel_name,
all=all, all=all,
compact=compact,
download=download, download=download,
json=json, json=json,
limit=limit, limit=limit,
@ -246,20 +257,20 @@ def clips(
default=5, default=5,
) )
def download( def download(
ids: tuple[str, ...], ids: Tuple[str, ...],
auth_token: str | None, auth_token: Optional[str],
chapter: int | None, chapter: Optional[int],
concat: bool, concat: bool,
dry_run: bool, dry_run: bool,
end: int | None, end: Optional[int],
format: str, format: str,
keep: bool, keep: bool,
no_join: bool, no_join: bool,
overwrite: bool, overwrite: bool,
output: str, output: str,
quality: str | None, quality: Optional[str],
rate_limit: str | None, rate_limit: Optional[int],
start: int | None, start: Optional[int],
max_workers: int, max_workers: int,
): ):
"""Download videos or clips. """Download videos or clips.
@ -365,12 +376,12 @@ def videos(
channel_name: str, channel_name: str,
all: bool, all: bool,
compact: bool, compact: bool,
games_tuple: tuple[str, ...], games_tuple: Tuple[str, ...],
json: bool, json: bool,
limit: int | None, limit: Optional[int],
pager: int | None, pager: Optional[int],
sort: str, sort: VideosSort,
type: str, type: VideosType,
): ):
"""List or download clips for given CHANNEL_NAME.""" """List or download clips for given CHANNEL_NAME."""
from twitchdl.commands.videos import videos from twitchdl.commands.videos import videos

View File

@ -1,30 +1,33 @@
import re import re
import sys import sys
from typing import Literal
from itertools import islice
from os import path from os import path
from typing import Callable, Generator, Optional
import click
from twitchdl import twitch, utils from twitchdl import twitch, utils
from twitchdl.commands.download import get_clip_authenticated_url from twitchdl.commands.download import get_clip_authenticated_url
from twitchdl.download import download_file from twitchdl.download import download_file
from twitchdl.output import print_out, print_clip, print_json from twitchdl.output import green, print_clip, print_clip_compact, print_json, print_paged, yellow
from twitchdl.twitch import Clip, ClipsPeriod
ClipsPeriod = Literal["last_day", "last_week", "last_month", "all_time"]
def clips( def clips(
channel_name: str, channel_name: str,
*, *,
all: bool = False, all: bool = False,
compact: bool = False,
download: bool = False, download: bool = False,
json: bool = False, json: bool = False,
limit: int = 10, limit: Optional[int] = None,
pager: int | None = None, pager: Optional[int] = None,
period: ClipsPeriod = "all_time", period: ClipsPeriod = "all_time",
): ):
# Set different defaults for limit for compact display
default_limit = 40 if compact else 10
# Ignore --limit if --pager or --all are given # Ignore --limit if --pager or --all are given
limit = sys.maxsize if all or pager else limit limit = sys.maxsize if all or pager else (limit or default_limit)
generator = twitch.channel_clips_generator(channel_name, period, limit) generator = twitch.channel_clips_generator(channel_name, period, limit)
@ -34,24 +37,15 @@ def clips(
if download: if download:
return _download_clips(generator) return _download_clips(generator)
print_fn = print_clip_compact if compact else print_clip
if pager: if pager:
return _print_paged(generator, pager) return print_paged("Clips", generator, print_fn, pager)
return _print_all(generator, all) return _print_all(generator, print_fn, all)
def _continue(): def _target_filename(clip: Clip):
print_out("Press <green><b>Enter</green> to continue, <yellow><b>Ctrl+C</yellow> to break.")
try:
input()
except KeyboardInterrupt:
return False
return True
def _target_filename(clip):
url = clip["videoQualities"][0]["sourceURL"] url = clip["videoQualities"][0]["sourceURL"]
_, ext = path.splitext(url) _, ext = path.splitext(url)
ext = ext.lstrip(".") ext = ext.lstrip(".")
@ -61,63 +55,41 @@ def _target_filename(clip):
raise ValueError(f"Failed parsing date from: {clip['createdAt']}") raise ValueError(f"Failed parsing date from: {clip['createdAt']}")
date = "".join(match.groups()) date = "".join(match.groups())
name = "_".join([ name = "_".join(
date, [
clip["id"], date,
clip["broadcaster"]["login"], clip["id"],
utils.slugify(clip["title"]), clip["broadcaster"]["login"],
]) utils.slugify(clip["title"]),
]
)
return f"{name}.{ext}" return f"{name}.{ext}"
def _download_clips(generator): def _download_clips(generator: Generator[Clip, None, None]):
for clip in generator: for clip in generator:
target = _target_filename(clip) target = _target_filename(clip)
if path.exists(target): if path.exists(target):
print_out(f"Already downloaded: <green>{target}</green>") click.echo(f"Already downloaded: {green(target)}")
else: else:
url = get_clip_authenticated_url(clip["slug"], "source") url = get_clip_authenticated_url(clip["slug"], "source")
print_out(f"Downloading: <yellow>{target}</yellow>") click.echo(f"Downloading: {yellow(target)}")
download_file(url, target) download_file(url, target)
def _print_all(generator, all: bool): def _print_all(
generator: Generator[Clip, None, None],
print_fn: Callable[[Clip], None],
all: bool,
):
for clip in generator: for clip in generator:
print_out() print_fn(clip)
print_clip(clip)
if not all: if not all:
print_out( click.secho(
"\n<dim>There may be more clips. " + "\nThere may be more clips. "
"Increase the --limit, use --all or --pager to see the rest.</dim>" + "Increase the --limit, use --all or --pager to see the rest.",
dim=True,
) )
def _print_paged(generator, page_size):
iterator = iter(generator)
page = list(islice(iterator, page_size))
first = 1
last = first + len(page) - 1
while True:
print_out("-" * 80)
print_out()
for clip in page:
print_clip(clip)
print_out()
last = first + len(page) - 1
print_out("-" * 80)
print_out(f"<yellow>Clips {first}-{last}</yellow>")
first = first + len(page)
last = first + 1
page = list(islice(iterator, page_size))
if not page or not _continue():
break

View File

@ -1,27 +1,35 @@
import asyncio import asyncio
import platform
import httpx
import m3u8
import os import os
import platform
import re import re
import shutil import shutil
import subprocess import subprocess
import tempfile import tempfile
from os import path from os import path
from pathlib import Path from pathlib import Path
from typing import List, Optional, OrderedDict from typing import Dict, List
from urllib.parse import urlparse, urlencode from urllib.parse import urlencode, urlparse
import click
import httpx
from twitchdl import twitch, utils from twitchdl import twitch, utils
from twitchdl.download import download_file from twitchdl.download import download_file
from twitchdl.entities import Data, DownloadOptions from twitchdl.entities import DownloadOptions
from twitchdl.exceptions import ConsoleError from twitchdl.exceptions import ConsoleError
from twitchdl.http import download_all from twitchdl.http import download_all
from twitchdl.output import print_out from twitchdl.output import blue, bold, green, print_log, yellow
from twitchdl.playlists import (
enumerate_vods,
load_m3u8,
make_join_playlist,
parse_playlists,
select_playlist,
)
from twitchdl.twitch import Chapter, Clip, ClipAccessToken, Video
def download(ids: list[str], args: DownloadOptions): def download(ids: List[str], args: DownloadOptions):
for video_id in ids: for video_id in ids:
download_one(video_id, args) download_one(video_id, args)
@ -38,73 +46,40 @@ def download_one(video: str, args: DownloadOptions):
raise ConsoleError(f"Invalid input: {video}") raise ConsoleError(f"Invalid input: {video}")
def _parse_playlists(playlists_m3u8): def _join_vods(playlist_path: str, target: str, overwrite: bool, video: Video):
playlists = m3u8.loads(playlists_m3u8)
for p in sorted(playlists.playlists, key=lambda p: p.stream_info.resolution is None):
if p.stream_info.resolution:
name = p.media[0].name
description = "x".join(str(r) for r in p.stream_info.resolution)
else:
name = p.media[0].group_id
description = None
yield name, description, p.uri
def _get_playlist_by_name(playlists, quality):
if quality == "source":
_, _, uri = playlists[0]
return uri
for name, _, uri in playlists:
if name == quality:
return uri
available = ", ".join([name for (name, _, _) in playlists])
msg = f"Quality '{quality}' not found. Available qualities are: {available}"
raise ConsoleError(msg)
def _select_playlist_interactive(playlists):
print_out("\nAvailable qualities:")
for n, (name, resolution, uri) in enumerate(playlists):
if resolution:
print_out(f"{n + 1}) <b>{name}</b> <dim>({resolution})</dim>")
else:
print_out(f"{n + 1}) <b>{name}</b>")
no = utils.read_int("Choose quality", min=1, max=len(playlists) + 1, default=1)
_, _, uri = playlists[no - 1]
return uri
def _join_vods(playlist_path: str, target: str, overwrite: bool, video):
description = video["description"] or "" description = video["description"] or ""
description = description.strip() description = description.strip()
command = [ command = [
"ffmpeg", "ffmpeg",
"-i", playlist_path, "-i",
"-c", "copy", playlist_path,
"-metadata", f"artist={video['creator']['displayName']}", "-c",
"-metadata", f"title={video['title']}", "copy",
"-metadata", f"description={description}", "-metadata",
"-metadata", "encoded_by=twitch-dl", f"artist={video['creator']['displayName']}",
"-metadata",
f"title={video['title']}",
"-metadata",
f"description={description}",
"-metadata",
"encoded_by=twitch-dl",
"-stats", "-stats",
"-loglevel", "warning", "-loglevel",
"warning",
f"file:{target}", f"file:{target}",
] ]
if overwrite: if overwrite:
command.append("-y") command.append("-y")
print_out(f"<dim>{' '.join(command)}</dim>") click.secho(f"{' '.join(command)}", dim=True)
result = subprocess.run(command) result = subprocess.run(command)
if result.returncode != 0: if result.returncode != 0:
raise ConsoleError("Joining files failed") raise ConsoleError("Joining files failed")
def _concat_vods(vod_paths: list[str], target: str):
def _concat_vods(vod_paths: List[str], target: str):
tool = "type" if platform.system() == "Windows" else "cat" tool = "type" if platform.system() == "Windows" else "cat"
command = [tool] + vod_paths command = [tool] + vod_paths
@ -114,8 +89,8 @@ def _concat_vods(vod_paths: list[str], target: str):
raise ConsoleError(f"Joining files failed: {result.stderr}") raise ConsoleError(f"Joining files failed: {result.stderr}")
def get_video_substitutions(video: Data, format: str) -> Data: def get_video_placeholders(video: Video, format: str) -> Dict[str, str]:
date, time = video['publishedAt'].split("T") date, time = video["publishedAt"].split("T")
game = video["game"]["name"] if video["game"] else "Unknown" game = video["game"]["name"] if video["game"] else "Unknown"
return { return {
@ -133,8 +108,8 @@ def get_video_substitutions(video: Data, format: str) -> Data:
} }
def _video_target_filename(video: Data, args: DownloadOptions): def _video_target_filename(video: Video, args: DownloadOptions):
subs = get_video_substitutions(video, args.format) subs = get_video_placeholders(video, args.format)
try: try:
return args.output.format(**subs) return args.output.format(**subs)
@ -143,7 +118,7 @@ def _video_target_filename(video: Data, args: DownloadOptions):
raise ConsoleError(f"Invalid key {e} used in --output. Supported keys are: {supported}") raise ConsoleError(f"Invalid key {e} used in --output. Supported keys are: {supported}")
def _clip_target_filename(clip, args: DownloadOptions): def _clip_target_filename(clip: Clip, args: DownloadOptions):
date, time = clip["createdAt"].split("T") date, time = clip["createdAt"].split("T")
game = clip["game"]["name"] if clip["game"] else "Unknown" game = clip["game"]["name"] if clip["game"] else "Unknown"
@ -173,26 +148,6 @@ def _clip_target_filename(clip, args: DownloadOptions):
raise ConsoleError(f"Invalid key {e} used in --output. Supported keys are: {supported}") raise ConsoleError(f"Invalid key {e} used in --output. Supported keys are: {supported}")
def _get_vod_paths(playlist, start: Optional[int], end: Optional[int]) -> List[str]:
"""Extract unique VOD paths for download from playlist."""
files = []
vod_start = 0
for segment in playlist.segments:
vod_end = vod_start + segment.duration
# `vod_end > start` is used here becuase it's better to download a bit
# more than a bit less, similar for the end condition
start_condition = not start or vod_end > start
end_condition = not end or vod_start < end
if start_condition and end_condition and segment.uri not in files:
files.append(segment.uri)
vod_start = vod_end
return files
def _crete_temp_dir(base_uri: str) -> str: def _crete_temp_dir(base_uri: str) -> str:
"""Create a temp dir to store downloads if it doesn't exist.""" """Create a temp dir to store downloads if it doesn't exist."""
path = urlparse(base_uri).path.lstrip("/") path = urlparse(base_uri).path.lstrip("/")
@ -201,8 +156,8 @@ def _crete_temp_dir(base_uri: str) -> str:
return str(temp_dir) return str(temp_dir)
def _get_clip_url(clip, quality): def _get_clip_url(access_token: ClipAccessToken, quality: str) -> str:
qualities = clip["videoQualities"] qualities = access_token["videoQualities"]
# Quality given as an argument # Quality given as an argument
if quality: if quality:
@ -219,18 +174,18 @@ def _get_clip_url(clip, quality):
raise ConsoleError(msg) raise ConsoleError(msg)
# Ask user to select quality # Ask user to select quality
print_out("\nAvailable qualities:") click.echo("\nAvailable qualities:")
for n, q in enumerate(qualities): for n, q in enumerate(qualities):
print_out(f"{n + 1}) {q['quality']} [{q['frameRate']} fps]") click.echo(f"{n + 1}) {bold(q['quality'])} [{q['frameRate']} fps]")
print_out() click.echo()
no = utils.read_int("Choose quality", min=1, max=len(qualities), default=1) no = utils.read_int("Choose quality", min=1, max=len(qualities), default=1)
selected_quality = qualities[no - 1] selected_quality = qualities[no - 1]
return selected_quality["sourceURL"] return selected_quality["sourceURL"]
def get_clip_authenticated_url(slug, quality): def get_clip_authenticated_url(slug: str, quality: str):
print_out("<dim>Fetching access token...</dim>") print_log("Fetching access token...")
access_token = twitch.get_clip_access_token(slug) access_token = twitch.get_clip_access_token(slug)
if not access_token: if not access_token:
@ -238,150 +193,145 @@ def get_clip_authenticated_url(slug, quality):
url = _get_clip_url(access_token, quality) url = _get_clip_url(access_token, quality)
query = urlencode({ query = urlencode(
"sig": access_token["playbackAccessToken"]["signature"], {
"token": access_token["playbackAccessToken"]["value"], "sig": access_token["playbackAccessToken"]["signature"],
}) "token": access_token["playbackAccessToken"]["value"],
}
)
return f"{url}?{query}" return f"{url}?{query}"
def _download_clip(slug: str, args: DownloadOptions) -> None: def _download_clip(slug: str, args: DownloadOptions) -> None:
print_out("<dim>Looking up clip...</dim>") print_log("Looking up clip...")
clip = twitch.get_clip(slug) clip = twitch.get_clip(slug)
if not clip: if not clip:
raise ConsoleError(f"Clip '{slug}' not found") raise ConsoleError(f"Clip '{slug}' not found")
title = clip["title"] title = clip["title"]
user = clip["broadcaster"]["displayName"] user = clip["broadcaster"]["displayName"]
game = clip["game"]["name"] if clip["game"] else "Unknown" game = clip["game"]["name"] if clip["game"] else "Unknown"
duration = utils.format_duration(clip["durationSeconds"]) duration = utils.format_duration(clip["durationSeconds"])
click.echo(f"Found: {green(title)} by {yellow(user)}, playing {blue(game)} ({duration})")
print_out(
f"Found: <green>{title}</green> by <yellow>{user}</yellow>, "+
f"playing <blue>{game}</blue> ({duration})"
)
target = _clip_target_filename(clip, args) target = _clip_target_filename(clip, args)
print_out(f"Target: <blue>{target}</blue>") click.echo(f"Target: {blue(target)}")
if not args.overwrite and path.exists(target): if not args.overwrite and path.exists(target):
response = input("File exists. Overwrite? [Y/n]: ") response = click.prompt("File exists. Overwrite? [Y/n]", default="Y", show_default=False)
if response.lower().strip() not in ["", "y"]: if response.lower().strip() != "y":
raise ConsoleError("Aborted") raise click.Abort()
args.overwrite = True args.overwrite = True
url = get_clip_authenticated_url(slug, args.quality) url = get_clip_authenticated_url(slug, args.quality)
print_out(f"<dim>Selected URL: {url}</dim>") print_log(f"Selected URL: {url}")
print_out("<dim>Downloading clip...</dim>") if args.dry_run:
click.echo("Dry run, clip not downloaded.")
if (args.dry_run is False): else:
print_log("Downloading clip...")
download_file(url, target) download_file(url, target)
click.echo(f"Downloaded: {blue(target)}")
print_out(f"Downloaded: <blue>{target}</blue>")
def _download_video(video_id, args: DownloadOptions) -> None: def _download_video(video_id: str, args: DownloadOptions) -> None:
if args.start and args.end and args.end <= args.start: if args.start and args.end and args.end <= args.start:
raise ConsoleError("End time must be greater than start time") raise ConsoleError("End time must be greater than start time")
print_out("<dim>Looking up video...</dim>") print_log("Looking up video...")
video = twitch.get_video(video_id) video = twitch.get_video(video_id)
if not video: if not video:
raise ConsoleError(f"Video {video_id} not found") raise ConsoleError(f"Video {video_id} not found")
title = video['title'] click.echo(f"Found: {blue(video['title'])} by {yellow(video['creator']['displayName'])}")
user = video['creator']['displayName']
print_out(f"Found: <blue>{title}</blue> by <yellow>{user}</yellow>")
target = _video_target_filename(video, args) target = _video_target_filename(video, args)
print_out(f"Output: <blue>{target}</blue>") click.echo(f"Output: {blue(target)}")
if not args.overwrite and path.exists(target): if not args.overwrite and path.exists(target):
response = input("File exists. Overwrite? [Y/n]: ") response = click.prompt("File exists. Overwrite? [Y/n]", default="Y", show_default=False)
if response.lower().strip() not in ["", "y"]: if response.lower().strip() != "y":
raise ConsoleError("Aborted") raise click.Abort()
args.overwrite = True args.overwrite = True
# Chapter select or manual offset # Chapter select or manual offset
start, end = _determine_time_range(video_id, args) start, end = _determine_time_range(video_id, args)
print_out("<dim>Fetching access token...</dim>") print_log("Fetching access token...")
access_token = twitch.get_access_token(video_id, auth_token=args.auth_token) access_token = twitch.get_access_token(video_id, auth_token=args.auth_token)
print_out("<dim>Fetching playlists...</dim>") print_log("Fetching playlists...")
playlists_m3u8 = twitch.get_playlists(video_id, access_token) playlists_text = twitch.get_playlists(video_id, access_token)
playlists = list(_parse_playlists(playlists_m3u8)) playlists = parse_playlists(playlists_text)
playlist_uri = (_get_playlist_by_name(playlists, args.quality) if args.quality playlist = select_playlist(playlists, args.quality)
else _select_playlist_interactive(playlists))
print_out("<dim>Fetching playlist...</dim>") print_log("Fetching playlist...")
response = httpx.get(playlist_uri) vods_text = http_get(playlist.url)
response.raise_for_status() vods_m3u8 = load_m3u8(vods_text)
playlist = m3u8.loads(response.text) vods = enumerate_vods(vods_m3u8, start, end)
base_uri = re.sub("/[^/]+$", "/", playlist_uri) if args.dry_run:
click.echo("Dry run, video not downloaded.")
return
base_uri = re.sub("/[^/]+$", "/", playlist.url)
target_dir = _crete_temp_dir(base_uri) target_dir = _crete_temp_dir(base_uri)
vod_paths = _get_vod_paths(playlist, start, end)
# Save playlists for debugging purposes # Save playlists for debugging purposes
with open(path.join(target_dir, "playlists.m3u8"), "w") as f: with open(path.join(target_dir, "playlists.m3u8"), "w") as f:
f.write(playlists_m3u8) f.write(playlists_text)
with open(path.join(target_dir, "playlist.m3u8"), "w") as f: with open(path.join(target_dir, "playlist.m3u8"), "w") as f:
f.write(response.text) f.write(vods_text)
print_out(f"\nDownloading {len(vod_paths)} VODs using {args.max_workers} workers to {target_dir}") click.echo(f"\nDownloading {len(vods)} VODs using {args.max_workers} workers to {target_dir}")
sources = [base_uri + path for path in vod_paths]
targets = [os.path.join(target_dir, f"{k:05d}.ts") for k, _ in enumerate(vod_paths)] sources = [base_uri + vod.path for vod in vods]
targets = [os.path.join(target_dir, f"{vod.index:05d}.ts") for vod in vods]
asyncio.run(download_all(sources, targets, args.max_workers, rate_limit=args.rate_limit)) asyncio.run(download_all(sources, targets, args.max_workers, rate_limit=args.rate_limit))
# Make a modified playlist which references downloaded VODs join_playlist = make_join_playlist(vods_m3u8, vods, targets)
# Keep only the downloaded segments and skip the rest join_playlist_path = path.join(target_dir, "playlist_downloaded.m3u8")
org_segments = playlist.segments.copy() join_playlist.dump(join_playlist_path) # type: ignore
click.echo()
path_map = OrderedDict(zip(vod_paths, targets))
playlist.segments.clear()
for segment in org_segments:
if segment.uri in path_map:
segment.uri = path_map[segment.uri]
playlist.segments.append(segment)
playlist_path = path.join(target_dir, "playlist_downloaded.m3u8")
playlist.dump(playlist_path)
print_out("")
if args.no_join: if args.no_join:
print_out("<dim>Skipping joining files...</dim>") print_log("Skipping joining files...")
print_out(f"VODs downloaded to:\n<blue>{target_dir}</blue>") click.echo(f"VODs downloaded to:\n{blue(target_dir)}")
return return
if args.concat: if args.concat:
print_out("<dim>Concating files...</dim>") print_log("Concating files...")
_concat_vods(targets, target) _concat_vods(targets, target)
else: else:
print_out("<dim>Joining files...</dim>") print_log("Joining files...")
_join_vods(playlist_path, target, args.overwrite, video) _join_vods(join_playlist_path, target, args.overwrite, video)
click.echo()
if args.keep: if args.keep:
print_out(f"\n<dim>Temporary files not deleted: {target_dir}</dim>") click.echo(f"Temporary files not deleted: {target_dir}")
else: else:
print_out("\n<dim>Deleting temporary files...</dim>") print_log("Deleting temporary files...")
shutil.rmtree(target_dir) shutil.rmtree(target_dir)
print_out(f"\nDownloaded: <green>{target}</green>") click.echo(f"\nDownloaded: {green(target)}")
def _determine_time_range(video_id, args: DownloadOptions): def http_get(url: str) -> str:
response = httpx.get(url)
response.raise_for_status()
return response.text
def _determine_time_range(video_id: str, args: DownloadOptions):
if args.start or args.end: if args.start or args.end:
return args.start, args.end return args.start, args.end
if args.chapter is not None: if args.chapter is not None:
print_out("<dim>Fetching chapters...</dim>") print_log("Fetching chapters...")
chapters = twitch.get_video_chapters(video_id) chapters = twitch.get_video_chapters(video_id)
if not chapters: if not chapters:
@ -393,9 +343,11 @@ def _determine_time_range(video_id, args: DownloadOptions):
try: try:
chapter = chapters[args.chapter - 1] chapter = chapters[args.chapter - 1]
except IndexError: except IndexError:
raise ConsoleError(f"Chapter {args.chapter} does not exist. This video has {len(chapters)} chapters.") raise ConsoleError(
f"Chapter {args.chapter} does not exist. This video has {len(chapters)} chapters."
)
print_out(f'Chapter selected: <blue>{chapter["description"]}</blue>\n') click.echo(f'Chapter selected: {blue(chapter["description"])}\n')
start = chapter["positionMilliseconds"] // 1000 start = chapter["positionMilliseconds"] // 1000
duration = chapter["durationMilliseconds"] // 1000 duration = chapter["durationMilliseconds"] // 1000
return start, start + duration return start, start + duration
@ -403,11 +355,11 @@ def _determine_time_range(video_id, args: DownloadOptions):
return None, None return None, None
def _choose_chapter_interactive(chapters): def _choose_chapter_interactive(chapters: List[Chapter]):
print_out("\nChapters:") click.echo("\nChapters:")
for index, chapter in enumerate(chapters): for index, chapter in enumerate(chapters):
duration = utils.format_time(chapter["durationMilliseconds"] // 1000) duration = utils.format_time(chapter["durationMilliseconds"] // 1000)
print_out(f'{index + 1}) <b>{chapter["description"]}</b> <dim>({duration})</dim>') click.echo(f'{index + 1}) {bold(chapter["description"])} ({duration})')
index = utils.read_int("Select a chapter", 1, len(chapters)) index = utils.read_int("Select a chapter", 1, len(chapters))
chapter = chapters[index - 1] chapter = chapters[index - 1]
return chapter return chapter

View File

@ -1,11 +1,17 @@
from typing import List
import click
import m3u8 import m3u8
from twitchdl import utils, twitch from twitchdl import twitch, utils
from twitchdl.commands.download import get_video_substitutions from twitchdl.commands.download import get_video_placeholders
from twitchdl.exceptions import ConsoleError from twitchdl.exceptions import ConsoleError
from twitchdl.output import print_video, print_clip, print_json, print_out, print_log from twitchdl.output import bold, print_clip, print_json, print_log, print_table, print_video
from twitchdl.playlists import parse_playlists
from twitchdl.twitch import Chapter, Clip, Video
def info(id: str, *, json: bool = False, format="mkv"):
def info(id: str, *, json: bool = False):
video_id = utils.parse_video_identifier(id) video_id = utils.parse_video_identifier(id)
if video_id: if video_id:
print_log("Fetching video...") print_log("Fetching video...")
@ -23,16 +29,10 @@ def info(id: str, *, json: bool = False, format="mkv"):
print_log("Fetching chapters...") print_log("Fetching chapters...")
chapters = twitch.get_video_chapters(video_id) chapters = twitch.get_video_chapters(video_id)
substitutions = get_video_substitutions(video, format)
if json: if json:
video_json(video, playlists, chapters) video_json(video, playlists, chapters)
else: else:
video_info(video, playlists, chapters) video_info(video, playlists, chapters)
print_out("\nOutput format placeholders:")
for k, v in substitutions.items():
print(f" * {k} = {v}")
return return
clip_slug = utils.parse_clip_identifier(id) clip_slug = utils.parse_clip_identifier(id)
@ -51,25 +51,29 @@ def info(id: str, *, json: bool = False, format="mkv"):
raise ConsoleError(f"Invalid input: {id}") raise ConsoleError(f"Invalid input: {id}")
def video_info(video, playlists, chapters): def video_info(video: Video, playlists: str, chapters: List[Chapter]):
print_out() click.echo()
print_video(video) print_video(video)
print_out() click.echo("Playlists:")
print_out("Playlists:") for p in parse_playlists(playlists):
for p in m3u8.loads(playlists).playlists: click.echo(f"{bold(p.name)} {p.url}")
print_out(f"<b>{p.stream_info.video}</b> {p.uri}")
if chapters: if chapters:
print_out() click.echo()
print_out("Chapters:") click.echo("Chapters:")
for chapter in chapters: for chapter in chapters:
start = utils.format_time(chapter["positionMilliseconds"] // 1000, force_hours=True) start = utils.format_time(chapter["positionMilliseconds"] // 1000, force_hours=True)
duration = utils.format_time(chapter["durationMilliseconds"] // 1000) duration = utils.format_time(chapter["durationMilliseconds"] // 1000)
print_out(f'{start} <b>{chapter["description"]}</b> ({duration})') click.echo(f'{start} {bold(chapter["description"])} ({duration})')
placeholders = get_video_placeholders(video, format="mkv")
placeholders = [[f"{{{k}}}", v] for k, v in placeholders.items()]
click.echo("")
print_table(["Placeholder", "Value"], placeholders)
def video_json(video, playlists, chapters): def video_json(video: Video, playlists: str, chapters: List[Chapter]):
playlists = m3u8.loads(playlists).playlists playlists = m3u8.loads(playlists).playlists
video["playlists"] = [ video["playlists"] = [
@ -78,8 +82,9 @@ def video_json(video, playlists, chapters):
"resolution": p.stream_info.resolution, "resolution": p.stream_info.resolution,
"codecs": p.stream_info.codecs, "codecs": p.stream_info.codecs,
"video": p.stream_info.video, "video": p.stream_info.video,
"uri": p.uri "uri": p.uri,
} for p in playlists }
for p in playlists
] ]
video["chapters"] = chapters video["chapters"] = chapters
@ -87,11 +92,11 @@ def video_json(video, playlists, chapters):
print_json(video) print_json(video)
def clip_info(clip): def clip_info(clip: Clip):
print_out() click.echo()
print_clip(clip) print_clip(clip)
print_out() click.echo()
print_out("Download links:") click.echo("Download links:")
for q in clip["videoQualities"]: for q in clip["videoQualities"]:
print_out("<b>{quality}p{frameRate}</b> {sourceURL}".format(**q)) click.echo(f"{bold(q['quality'])} [{q['frameRate']} fps] {q['sourceURL']}")

View File

@ -1,8 +1,11 @@
import sys import sys
from typing import List, Optional
import click
from twitchdl import twitch from twitchdl import twitch
from twitchdl.exceptions import ConsoleError from twitchdl.exceptions import ConsoleError
from twitchdl.output import print_out, print_paged_videos, print_video, print_json, print_video_compact from twitchdl.output import print_json, print_log, print_paged, print_video, print_video_compact
def videos( def videos(
@ -10,14 +13,14 @@ def videos(
*, *,
all: bool, all: bool,
compact: bool, compact: bool,
games: list[str], games: List[str],
json: bool, json: bool,
limit: int | None, limit: Optional[int],
pager: int | None, pager: Optional[int],
sort: str, sort: twitch.VideosSort,
type: str, type: twitch.VideosType,
): ):
game_ids = _get_game_ids(games) game_ids = get_game_ids(games)
# Set different defaults for limit for compact display # Set different defaults for limit for compact display
limit = limit or (40 if compact else 10) limit = limit or (40 if compact else 10)
@ -26,23 +29,21 @@ def videos(
max_videos = sys.maxsize if all or pager else limit max_videos = sys.maxsize if all or pager else limit
total_count, generator = twitch.channel_videos_generator( total_count, generator = twitch.channel_videos_generator(
channel_name, max_videos, sort, type, game_ids=game_ids) channel_name, max_videos, sort, type, game_ids=game_ids
)
if json: if json:
videos = list(generator) videos = list(generator)
print_json({ print_json({"count": len(videos), "totalCount": total_count, "videos": videos})
"count": len(videos),
"totalCount": total_count,
"videos": videos
})
return return
if total_count == 0: if total_count == 0:
print_out("<yellow>No videos found</yellow>") click.echo("No videos found")
return return
if pager: if pager:
print_paged_videos(generator, pager, total_count) print_fn = print_video_compact if compact else print_video
print_paged("Videos", generator, print_fn, pager, total_count)
return return
count = 0 count = 0
@ -50,31 +51,29 @@ def videos(
if compact: if compact:
print_video_compact(video) print_video_compact(video)
else: else:
print_out() click.echo()
print_video(video) print_video(video)
count += 1 count += 1
print_out() click.echo()
print_out("-" * 80) click.echo("-" * 80)
print_out(f"<yellow>Videos 1-{count} of {total_count}</yellow>") click.echo(f"Videos 1-{count} of {total_count}")
if total_count > count: if total_count > count:
print_out() click.secho(
print_out( "\nThere are more videos. "
"<dim>There are more videos. Increase the --limit, use --all or --pager to see the rest.</dim>" + "Increase the --limit, use --all or --pager to see the rest.",
dim=True,
) )
def _get_game_ids(names): def get_game_ids(names: List[str]) -> List[str]:
if not names: return [get_game_id(name) for name in names]
return []
game_ids = []
for name in names:
print_out(f"<dim>Looking up game '{name}'...</dim>")
game_id = twitch.get_game_id(name)
if not game_id:
raise ConsoleError(f"Game '{name}' not found")
game_ids.append(int(game_id))
return game_ids def get_game_id(name: str) -> str:
print_log(f"Looking up game '{name}'...")
game_id = twitch.get_game_id(name)
if not game_id:
raise ConsoleError(f"Game '{name}' not found")
return game_id

View File

@ -1,4 +1,5 @@
import os import os
import httpx import httpx
from twitchdl.exceptions import ConsoleError from twitchdl.exceptions import ConsoleError

View File

@ -1,25 +1,25 @@
from dataclasses import dataclass from dataclasses import dataclass
from typing import Any from typing import Any, Mapping, Optional
@dataclass @dataclass
class DownloadOptions: class DownloadOptions:
auth_token: str | None auth_token: Optional[str]
chapter: int | None chapter: Optional[int]
concat: bool concat: bool
dry_run: bool dry_run: bool
end: int | None end: Optional[int]
format: str format: str
keep: bool keep: bool
no_join: bool no_join: bool
overwrite: bool overwrite: bool
output: str output: str
quality: str | None quality: Optional[str]
rate_limit: str | None rate_limit: Optional[int]
start: int | None start: Optional[int]
max_workers: int max_workers: int
# Type for annotating decoded JSON # Type for annotating decoded JSON
# TODO: make data classes for common structs # TODO: make data classes for common structs
Data = dict[str, Any] Data = Mapping[str, Any]

View File

@ -1,5 +1,7 @@
import click import click
class ConsoleError(click.ClickException): class ConsoleError(click.ClickException):
"""Raised when an error occurs and script exectuion should halt.""" """Raised when an error occurs and script exectuion should halt."""
pass pass

View File

@ -1,12 +1,12 @@
import asyncio import asyncio
import httpx
import logging import logging
import os import os
import time import time
from abc import ABC, abstractmethod from abc import ABC, abstractmethod
from typing import List, Optional from typing import List, Optional
import httpx
from twitchdl.progress import Progress from twitchdl.progress import Progress
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -62,6 +62,7 @@ class LimitingTokenBucket(TokenBucket):
class EndlessTokenBucket(TokenBucket): class EndlessTokenBucket(TokenBucket):
"""Used when download speed is not limited.""" """Used when download speed is not limited."""
def advance(self, size: int): def advance(self, size: int):
pass pass
@ -122,12 +123,22 @@ async def download_all(
targets: List[str], targets: List[str],
workers: int, workers: int,
*, *,
rate_limit: Optional[int] = None rate_limit: Optional[int] = None,
): ):
progress = Progress(len(sources)) progress = Progress(len(sources))
token_bucket = LimitingTokenBucket(rate_limit) if rate_limit else EndlessTokenBucket() token_bucket = LimitingTokenBucket(rate_limit) if rate_limit else EndlessTokenBucket()
async with httpx.AsyncClient(timeout=TIMEOUT) as client: async with httpx.AsyncClient(timeout=TIMEOUT) as client:
semaphore = asyncio.Semaphore(workers) semaphore = asyncio.Semaphore(workers)
tasks = [download_with_retries(client, semaphore, task_id, source, target, progress, token_bucket) tasks = [
for task_id, (source, target) in enumerate(zip(sources, targets))] download_with_retries(
client,
semaphore,
task_id,
source,
target,
progress,
token_bucket,
)
for task_id, (source, target) in enumerate(zip(sources, targets))
]
await asyncio.gather(*tasks) await asyncio.gather(*tasks)

View File

@ -1,112 +1,56 @@
import json import json
import sys
import re
from itertools import islice from itertools import islice
from typing import Any, Callable, Generator, List, Optional, TypeVar
import click
from twitchdl import utils from twitchdl import utils
from typing import Any, Match from twitchdl.twitch import Clip, Video
T = TypeVar("T")
START_CODES = {
'b': '\033[1m',
'dim': '\033[2m',
'i': '\033[3m',
'u': '\033[4m',
'red': '\033[91m',
'green': '\033[92m',
'yellow': '\033[93m',
'blue': '\033[94m',
'magenta': '\033[95m',
'cyan': '\033[96m',
}
END_CODE = '\033[0m'
START_PATTERN = "<(" + "|".join(START_CODES.keys()) + ")>"
END_PATTERN = "</(" + "|".join(START_CODES.keys()) + ")>"
USE_ANSI_COLOR = "--no-color" not in sys.argv
def start_code(match: Match[str]) -> str:
name = match.group(1)
return START_CODES[name]
def colorize(text: str) -> str:
text = re.sub(START_PATTERN, start_code, text)
text = re.sub(END_PATTERN, END_CODE, text)
return text
def strip_tags(text: str) -> str:
text = re.sub(START_PATTERN, '', text)
text = re.sub(END_PATTERN, '', text)
return text
def truncate(string: str, length: int) -> str: def truncate(string: str, length: int) -> str:
if len(string) > length: if len(string) > length:
return string[:length - 1] + "" return string[: length - 1] + ""
return string return string
def print_out(*args, **kwargs):
args = [colorize(a) if USE_ANSI_COLOR else strip_tags(a) for a in args]
print(*args, **kwargs)
def print_json(data: Any): def print_json(data: Any):
print(json.dumps(data)) click.echo(json.dumps(data))
def print_err(*args, **kwargs): def print_log(message: Any):
args = [f"<red>{arg}</red>" for arg in args] click.secho(message, err=True, dim=True)
args = [colorize(a) if USE_ANSI_COLOR else strip_tags(a) for a in args]
print(*args, file=sys.stderr, **kwargs)
def print_log(*args, **kwargs): def print_table(headers: List[str], data: List[List[str]]):
args = [f"<dim>{a}</dim>" for a in args] widths = [[len(cell) for cell in row] for row in data + [headers]]
args = [colorize(a) if USE_ANSI_COLOR else strip_tags(a) for a in args] widths = [max(width) for width in zip(*widths)]
print(*args, file=sys.stderr, **kwargs) underlines = ["-" * width for width in widths]
def print_row(row: List[str]):
for idx, cell in enumerate(row):
width = widths[idx]
click.echo(cell.ljust(width), nl=False)
click.echo(" ", nl=False)
click.echo()
print_row(headers)
print_row(underlines)
for row in data:
print_row(row)
def print_video(video): def print_paged(
published_at = video["publishedAt"].replace("T", " @ ").replace("Z", "") label: str,
length = utils.format_duration(video["lengthSeconds"]) generator: Generator[T, Any, Any],
print_fn: Callable[[T], None],
channel = f"<blue>{video['creator']['displayName']}</blue>" if video["creator"] else "" page_size: int,
playing = f"playing <blue>{video['game']['name']}</blue>" if video["game"] else "" total_count: Optional[int] = None,
):
# Can't find URL in video object, strange
url = f"https://www.twitch.tv/videos/{video['id']}"
print_out(f"<b>Video {video['id']}</b>")
print_out(f"<green>{video['title']}</green>")
if channel or playing:
print_out(" ".join([channel, playing]))
if video["description"]:
print_out(f"Description: {video['description']}")
print_out(f"Published <blue>{published_at}</blue> Length: <blue>{length}</blue> ")
print_out(f"<i>{url}</i>")
def print_video_compact(video):
id = video["id"]
date = video["publishedAt"][:10]
game = video["game"]["name"] if video["game"] else ""
title = truncate(video["title"], 80).ljust(80)
print_out(f'<b>{id}</b> {date} <green>{title}</green> <blue>{game}</blue>')
def print_paged_videos(generator, page_size, total_count):
iterator = iter(generator) iterator = iter(generator)
page = list(islice(iterator, page_size)) page = list(islice(iterator, page_size))
@ -114,48 +58,89 @@ def print_paged_videos(generator, page_size, total_count):
last = first + len(page) - 1 last = first + len(page) - 1
while True: while True:
print_out("-" * 80) click.echo("-" * 80)
print_out() click.echo()
for video in page: for item in page:
print_video(video) print_fn(item)
print_out()
last = first + len(page) - 1 last = first + len(page) - 1
print_out("-" * 80) click.echo("-" * 80)
print_out(f"<yellow>Videos {first}-{last} of {total_count}</yellow>") click.echo(f"{label} {first}-{last} of {total_count or '???'}")
first = first + len(page) first = first + len(page)
last = first + 1 last = first + 1
page = list(islice(iterator, page_size)) page = list(islice(iterator, page_size))
if not page or not _continue(): if not page or not prompt_continue():
break break
def print_clip(clip): def print_video(video: Video):
published_at = video["publishedAt"].replace("T", " @ ").replace("Z", "")
length = utils.format_duration(video["lengthSeconds"])
channel = blue(video["creator"]["displayName"]) if video["creator"] else ""
playing = f"playing {blue(video['game']['name'])}" if video["game"] else ""
# Can't find URL in video object, strange
url = f"https://www.twitch.tv/videos/{video['id']}"
click.secho(f"Video {video['id']}", bold=True)
click.secho(video["title"], fg="green")
if channel or playing:
click.echo(" ".join([channel, playing]))
if video["description"]:
click.echo(f"Description: {video['description']}")
click.echo(f"Published {blue(published_at)} Length: {blue(length)} ")
click.secho(url, italic=True)
click.echo()
def print_video_compact(video: Video):
id = video["id"]
date = video["publishedAt"][:10]
game = video["game"]["name"] if video["game"] else ""
title = truncate(video["title"], 80).ljust(80)
click.echo(f"{bold(id)} {date} {green(title)} {blue(game)}")
def print_clip(clip: Clip):
published_at = clip["createdAt"].replace("T", " @ ").replace("Z", "") published_at = clip["createdAt"].replace("T", " @ ").replace("Z", "")
length = utils.format_duration(clip["durationSeconds"]) length = utils.format_duration(clip["durationSeconds"])
channel = clip["broadcaster"]["displayName"] channel = clip["broadcaster"]["displayName"]
playing = ( playing = f"playing {blue(clip['game']['name'])}" if clip["game"] else ""
f"playing <blue>{clip['game']['name']}</blue>"
if clip["game"] else "" click.echo(f"Clip {bold(clip['slug'])}")
click.secho(clip["title"], fg="green")
click.echo(f"{blue(channel)} {playing}")
click.echo(
f"Published {blue(published_at)}"
+ f" Length: {blue(length)}"
+ f" Views: {blue(clip['viewCount'])}"
) )
click.secho(clip["url"], italic=True)
print_out(f"Clip <b>{clip['slug']}</b>") click.echo()
print_out(f"<green>{clip['title']}</green>")
print_out(f"<blue>{channel}</blue> {playing}")
print_out(
f"Published <blue>{published_at}</blue>" +
f" Length: <blue>{length}</blue>" +
f" Views: <blue>{clip["viewCount"]}</blue>"
)
print_out(f"<i>{clip['url']}</i>")
def _continue(): def print_clip_compact(clip: Clip):
print_out("Press <green><b>Enter</green> to continue, <yellow><b>Ctrl+C</yellow> to break.") slug = clip["slug"]
date = clip["createdAt"][:10]
title = truncate(clip["title"], 50).ljust(50)
game = clip["game"]["name"] if clip["game"] else ""
game = truncate(game, 30).ljust(30)
click.echo(f"{date} {green(title)} {blue(game)} {bold(slug)}")
def prompt_continue():
enter = click.style("Enter", bold=True, fg="green")
ctrl_c = click.style("Ctrl+C", bold=True, fg="yellow")
click.echo(f"Press {enter} to continue, {ctrl_c} to break.")
try: try:
input() input()
@ -163,3 +148,30 @@ def _continue():
return False return False
return True return True
# Shorthand functions for coloring output
def blue(text: Any) -> str:
return click.style(text, fg="blue")
def cyan(text: Any) -> str:
return click.style(text, fg="cyan")
def green(text: Any) -> str:
return click.style(text, fg="green")
def yellow(text: Any) -> str:
return click.style(text, fg="yellow")
def bold(text: Any) -> str:
return click.style(text, bold=True)
def dim(text: Any) -> str:
return click.style(text, dim=True)

131
twitchdl/playlists.py Normal file
View File

@ -0,0 +1,131 @@
"""
Parse and manipulate m3u8 playlists.
"""
from dataclasses import dataclass
from typing import Generator, List, Optional, OrderedDict
import click
import m3u8
from twitchdl import utils
from twitchdl.output import bold, dim
@dataclass
class Playlist:
name: str
resolution: Optional[str]
url: str
@dataclass
class Vod:
index: int
"""Ordinal number of the VOD in the playlist"""
path: str
"""Path part of the VOD URL"""
duration: int
"""Segment duration in seconds"""
def parse_playlists(playlists_m3u8: str) -> List[Playlist]:
def _parse(source: str) -> Generator[Playlist, None, None]:
document = load_m3u8(source)
for p in document.playlists:
if p.stream_info.resolution:
name = p.media[0].name
resolution = "x".join(str(r) for r in p.stream_info.resolution)
else:
name = p.media[0].group_id
resolution = None
yield Playlist(name, resolution, p.uri)
# Move audio to bottom, it has no resolution
return sorted(_parse(playlists_m3u8), key=lambda p: p.resolution is None)
def load_m3u8(playlist_m3u8: str) -> m3u8.M3U8:
return m3u8.loads(playlist_m3u8)
def enumerate_vods(
document: m3u8.M3U8,
start: Optional[int] = None,
end: Optional[int] = None,
) -> List[Vod]:
"""Extract VODs for download from document."""
vods = []
vod_start = 0
for index, segment in enumerate(document.segments):
vod_end = vod_start + segment.duration
# `vod_end > start` is used here becuase it's better to download a bit
# more than a bit less, similar for the end condition
start_condition = not start or vod_end > start
end_condition = not end or vod_start < end
if start_condition and end_condition:
vods.append(Vod(index, segment.uri, segment.duration))
vod_start = vod_end
return vods
def make_join_playlist(
playlist: m3u8.M3U8,
vods: List[Vod],
targets: List[str],
) -> m3u8.Playlist:
"""
Make a modified playlist which references downloaded VODs
Keep only the downloaded segments and skip the rest
"""
org_segments = playlist.segments.copy()
path_map = OrderedDict(zip([v.path for v in vods], targets))
playlist.segments.clear()
for segment in org_segments:
if segment.uri in path_map:
segment.uri = path_map[segment.uri]
playlist.segments.append(segment)
return playlist
def select_playlist(playlists: List[Playlist], quality: Optional[str]) -> Playlist:
return (
select_playlist_by_name(playlists, quality)
if quality is not None
else select_playlist_interactive(playlists)
)
def select_playlist_by_name(playlists: List[Playlist], quality: str) -> Playlist:
if quality == "source":
return playlists[0]
for playlist in playlists:
if playlist.name == quality:
return playlist
available = ", ".join([p.name for p in playlists])
msg = f"Quality '{quality}' not found. Available qualities are: {available}"
raise click.ClickException(msg)
def select_playlist_interactive(playlists: List[Playlist]) -> Playlist:
click.echo("\nAvailable qualities:")
for n, playlist in enumerate(playlists):
if playlist.resolution:
click.echo(f"{n + 1}) {bold(playlist.name)} {dim(f'({playlist.resolution})')}")
else:
click.echo(f"{n + 1}) {bold(playlist.name)}")
no = utils.read_int("Choose quality", min=1, max=len(playlists) + 1, default=1)
playlist = playlists[no - 1]
return playlist

View File

@ -1,12 +1,13 @@
import logging import logging
import time import time
from collections import deque from collections import deque
from dataclasses import dataclass, field from dataclasses import dataclass, field
from statistics import mean from statistics import mean
from typing import Dict, NamedTuple, Optional, Deque from typing import Deque, Dict, NamedTuple, Optional
from twitchdl.output import print_out import click
from twitchdl.output import blue
from twitchdl.utils import format_size, format_time from twitchdl.utils import format_size, format_time
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -93,18 +94,28 @@ class Progress:
task = self.tasks[task_id] task = self.tasks[task_id]
if task.size != task.downloaded: if task.size != task.downloaded:
logger.warn(f"Taks {task_id} ended with {task.downloaded}b downloaded, expected {task.size}b.") logger.warn(
f"Taks {task_id} ended with {task.downloaded}b downloaded, expected {task.size}b."
)
self.vod_downloaded_count += 1 self.vod_downloaded_count += 1
self.print() self.print()
def _calculate_total(self): def _calculate_total(self):
self.estimated_total = int(mean(t.size for t in self.tasks.values()) * self.vod_count) if self.tasks else None self.estimated_total = (
int(mean(t.size for t in self.tasks.values()) * self.vod_count) if self.tasks else None
)
def _calculate_progress(self): def _calculate_progress(self):
self.speed = self._calculate_speed() self.speed = self._calculate_speed()
self.progress_perc = int(100 * self.progress_bytes / self.estimated_total) if self.estimated_total else 0 self.progress_perc = (
self.remaining_time = int((self.estimated_total - self.progress_bytes) / self.speed) if self.estimated_total and self.speed else None int(100 * self.progress_bytes / self.estimated_total) if self.estimated_total else 0
)
self.remaining_time = (
int((self.estimated_total - self.progress_bytes) / self.speed)
if self.estimated_total and self.speed
else None
)
def _calculate_speed(self): def _calculate_speed(self):
if len(self.samples) < 2: if len(self.samples) < 2:
@ -116,7 +127,7 @@ class Progress:
size = last_sample.downloaded - first_sample.downloaded size = last_sample.downloaded - first_sample.downloaded
duration = last_sample.timestamp - first_sample.timestamp duration = last_sample.timestamp - first_sample.timestamp
return size / duration return size / duration if duration > 0 else None
def print(self): def print(self):
now = time.time() now = time.time()
@ -125,13 +136,20 @@ class Progress:
if now - self.last_printed < 0.1: if now - self.last_printed < 0.1:
return return
progress = " ".join([ click.echo(f"\rDownloaded {self.vod_downloaded_count}/{self.vod_count} VODs", nl=False)
f"Downloaded {self.vod_downloaded_count}/{self.vod_count} VODs", click.secho(f" {self.progress_perc}%", fg="blue", nl=False)
f"<blue>{self.progress_perc}%</blue>",
f"of <blue>~{format_size(self.estimated_total)}</blue>" if self.estimated_total else "", if self.estimated_total is not None:
f"at <blue>{format_size(self.speed)}/s</blue>" if self.speed else "", total = f"~{format_size(self.estimated_total)}"
f"ETA <blue>{format_time(self.remaining_time)}</blue>" if self.remaining_time is not None else "", click.echo(f" of {blue(total)}", nl=False)
])
if self.speed is not None:
speed = f"{format_size(self.speed)}/s"
click.echo(f" at {blue(speed)}", nl=False)
if self.remaining_time is not None:
click.echo(f" ETA {blue(format_time(self.remaining_time))}", nl=False)
click.echo(" ", nl=False)
print_out(f"\r{progress} ", end="")
self.last_printed = now self.last_printed = now

View File

@ -2,27 +2,109 @@
Twitch API access. Twitch API access.
""" """
import httpx
import json import json
import click import logging
import time
from typing import Any, Dict, Generator, List, Literal, Mapping, Optional, Tuple, TypedDict, Union
import click
import httpx
from typing import Dict
from twitchdl import CLIENT_ID from twitchdl import CLIENT_ID
from twitchdl.entities import Data
from twitchdl.exceptions import ConsoleError from twitchdl.exceptions import ConsoleError
ClipsPeriod = Literal["last_day", "last_week", "last_month", "all_time"]
VideosSort = Literal["views", "time"]
VideosType = Literal["archive", "highlight", "upload"]
class AccessToken(TypedDict):
signature: str
value: str
class User(TypedDict):
login: str
displayName: str
class Game(TypedDict):
id: str
name: str
class VideoQuality(TypedDict):
frameRate: str
quality: str
sourceURL: str
class ClipAccessToken(TypedDict):
id: str
playbackAccessToken: AccessToken
videoQualities: List[VideoQuality]
class Clip(TypedDict):
id: str
slug: str
title: str
createdAt: str
viewCount: int
durationSeconds: int
url: str
videoQualities: List[VideoQuality]
game: Game
broadcaster: User
class Video(TypedDict):
id: str
title: str
description: str
publishedAt: str
broadcastType: str
lengthSeconds: int
game: Game
creator: User
class Chapter(TypedDict):
id: str
durationMilliseconds: int
positionMilliseconds: int
type: str
description: str
subDescription: str
thumbnailURL: str
game: Game
class GQLError(click.ClickException): class GQLError(click.ClickException):
def __init__(self, errors: list[str]): def __init__(self, errors: List[str]):
message = "GraphQL query failed." message = "GraphQL query failed."
for error in errors: for error in errors:
message += f"\n* {error}" message += f"\n* {error}"
super().__init__(message) super().__init__(message)
def authenticated_post(url, data=None, json=None, headers={}): Content = Union[str, bytes]
headers['Client-ID'] = CLIENT_ID Headers = Dict[str, str]
response = httpx.post(url, data=data, json=json, headers=headers)
def authenticated_post(
url: str,
*,
json: Any = None,
content: Optional[Content] = None,
auth_token: Optional[str] = None,
):
headers = {"Client-ID": CLIENT_ID}
if auth_token is not None:
headers["authorization"] = f"OAuth {auth_token}"
response = request("POST", url, content=content, json=json, headers=headers)
if response.status_code == 400: if response.status_code == 400:
data = response.json() data = response.json()
raise ConsoleError(data["message"]) raise ConsoleError(data["message"])
@ -32,16 +114,50 @@ def authenticated_post(url, data=None, json=None, headers={}):
return response return response
def request(
method: str,
url: str,
json: Any = None,
content: Optional[Content] = None,
headers: Optional[Mapping[str, str]] = None,
):
with httpx.Client() as client:
request = client.build_request(method, url, json=json, content=content, headers=headers)
log_request(request)
start = time.time()
response = client.send(request)
duration = time.time() - start
log_response(response, duration)
return response
logger = logging.getLogger(__name__)
def log_request(request: httpx.Request):
logger.debug(f"--> {request.method} {request.url}")
if request.content:
logger.debug(f"--> {request.content}")
def log_response(response: httpx.Response, duration: float):
request = response.request
duration_ms = int(1000 * duration)
logger.debug(f"<-- {request.method} {request.url} HTTP {response.status_code} {duration_ms}ms")
if response.content:
logger.debug(f"<-- {response.content}")
def gql_post(query: str): def gql_post(query: str):
url = "https://gql.twitch.tv/gql" url = "https://gql.twitch.tv/gql"
response = authenticated_post(url, data=query) response = authenticated_post(url, content=query)
gql_raise_on_error(response) gql_raise_on_error(response)
return response.json() return response.json()
def gql_query(query: str, headers: Dict[str, str] = {}): def gql_query(query: str, auth_token: Optional[str] = None):
url = "https://gql.twitch.tv/gql" url = "https://gql.twitch.tv/gql"
response = authenticated_post(url, json={"query": query}, headers=headers) response = authenticated_post(url, json={"query": query}, auth_token=auth_token)
gql_raise_on_error(response) gql_raise_on_error(response)
return response.json() return response.json()
@ -61,6 +177,7 @@ VIDEO_FIELDS = """
broadcastType broadcastType
lengthSeconds lengthSeconds
game { game {
id
name name
} }
creator { creator {
@ -94,7 +211,7 @@ CLIP_FIELDS = """
""" """
def get_video(video_id: str): def get_video(video_id: str) -> Optional[Video]:
query = f""" query = f"""
{{ {{
video(id: "{video_id}") {{ video(id: "{video_id}") {{
@ -107,7 +224,7 @@ def get_video(video_id: str):
return response["data"]["video"] return response["data"]["video"]
def get_clip(slug: str): def get_clip(slug: str) -> Optional[Clip]:
query = f""" query = f"""
{{ {{
clip(slug: "{slug}") {{ clip(slug: "{slug}") {{
@ -120,7 +237,7 @@ def get_clip(slug: str):
return response["data"]["clip"] return response["data"]["clip"]
def get_clip_access_token(slug: str): def get_clip_access_token(slug: str) -> ClipAccessToken:
query = f""" query = f"""
{{ {{
"operationName": "VideoAccessToken_Clip", "operationName": "VideoAccessToken_Clip",
@ -140,7 +257,12 @@ def get_clip_access_token(slug: str):
return response["data"]["clip"] return response["data"]["clip"]
def get_channel_clips(channel_id: str, period: str, limit: int, after: str | None= None): def get_channel_clips(
channel_id: str,
period: ClipsPeriod,
limit: int,
after: Optional[str] = None,
):
""" """
List channel clips. List channel clips.
@ -177,8 +299,12 @@ def get_channel_clips(channel_id: str, period: str, limit: int, after: str | Non
return response["data"]["user"]["clips"] return response["data"]["user"]["clips"]
def channel_clips_generator(channel_id, period, limit): def channel_clips_generator(
def _generator(clips, limit): channel_id: str,
period: ClipsPeriod,
limit: int,
) -> Generator[Clip, None, None]:
def _generator(clips: Data, limit: int) -> Generator[Clip, None, None]:
for clip in clips["edges"]: for clip in clips["edges"]:
if limit < 1: if limit < 1:
return return
@ -199,11 +325,10 @@ def channel_clips_generator(channel_id, period, limit):
return _generator(clips, limit) return _generator(clips, limit)
def channel_clips_generator_old(channel_id, period, limit): def channel_clips_generator_old(channel_id: str, period: ClipsPeriod, limit: int):
cursor = "" cursor = ""
while True: while True:
clips = get_channel_clips( clips = get_channel_clips(channel_id, period, limit, after=cursor)
channel_id, period, limit, after=cursor)
if not clips["edges"]: if not clips["edges"]:
break break
@ -222,10 +347,11 @@ def get_channel_videos(
limit: int, limit: int,
sort: str, sort: str,
type: str = "archive", type: str = "archive",
game_ids: list[str] | None = None, game_ids: Optional[List[str]] = None,
after: str | None = None after: Optional[str] = None,
): ):
game_ids = game_ids or [] game_ids = game_ids or []
game_ids_str = f"[{','.join(game_ids)}]"
query = f""" query = f"""
{{ {{
@ -236,7 +362,7 @@ def get_channel_videos(
sort: {sort.upper()}, sort: {sort.upper()},
after: "{after or ''}", after: "{after or ''}",
options: {{ options: {{
gameIDs: {game_ids} gameIDs: {game_ids_str}
}} }}
) {{ ) {{
totalCount totalCount
@ -262,8 +388,16 @@ def get_channel_videos(
return response["data"]["user"]["videos"] return response["data"]["user"]["videos"]
def channel_videos_generator(channel_id, max_videos, sort, type, game_ids=[]): def channel_videos_generator(
def _generator(videos, max_videos): channel_id: str,
max_videos: int,
sort: VideosSort,
type: VideosType,
game_ids: Optional[List[str]] = None,
) -> Tuple[int, Generator[Video, None, None]]:
game_ids = game_ids or []
def _generator(videos: Data, max_videos: int) -> Generator[Video, None, None]:
for video in videos["edges"]: for video in videos["edges"]:
if max_videos < 1: if max_videos < 1:
return return
@ -284,7 +418,7 @@ def channel_videos_generator(channel_id, max_videos, sort, type, game_ids=[]):
return videos["totalCount"], _generator(videos, max_videos) return videos["totalCount"], _generator(videos, max_videos)
def get_access_token(video_id, auth_token=None): def get_access_token(video_id: str, auth_token: Optional[str] = None) -> AccessToken:
query = f""" query = f"""
{{ {{
videoPlaybackAccessToken( videoPlaybackAccessToken(
@ -301,12 +435,8 @@ def get_access_token(video_id, auth_token=None):
}} }}
""" """
headers = {}
if auth_token is not None:
headers['authorization'] = f'OAuth {auth_token}'
try: try:
response = gql_query(query, headers=headers) response = gql_query(query, auth_token=auth_token)
return response["data"]["videoPlaybackAccessToken"] return response["data"]["videoPlaybackAccessToken"]
except httpx.HTTPStatusError as error: except httpx.HTTPStatusError as error:
# Provide a more useful error message when server returns HTTP 401 # Provide a more useful error message when server returns HTTP 401
@ -323,24 +453,27 @@ def get_access_token(video_id, auth_token=None):
raise raise
def get_playlists(video_id, access_token): def get_playlists(video_id: str, access_token: AccessToken) -> str:
""" """
For a given video return a playlist which contains possible video qualities. For a given video return a playlist which contains possible video qualities.
""" """
url = f"https://usher.ttvnw.net/vod/{video_id}" url = f"https://usher.ttvnw.net/vod/{video_id}"
response = httpx.get(url, params={ response = httpx.get(
"nauth": access_token['value'], url,
"nauthsig": access_token['signature'], params={
"allow_audio_only": "true", "nauth": access_token["value"],
"allow_source": "true", "nauthsig": access_token["signature"],
"player": "twitchweb", "allow_audio_only": "true",
}) "allow_source": "true",
"player": "twitchweb",
},
)
response.raise_for_status() response.raise_for_status()
return response.content.decode('utf-8') return response.content.decode("utf-8")
def get_game_id(name): def get_game_id(name: str):
query = f""" query = f"""
{{ {{
game(name: "{name.strip()}") {{ game(name: "{name.strip()}") {{
@ -355,30 +488,29 @@ def get_game_id(name):
return game["id"] return game["id"]
def get_video_chapters(video_id: str): def get_video_chapters(video_id: str) -> List[Chapter]:
query = { query = {
"operationName": "VideoPlayer_ChapterSelectButtonVideo", "operationName": "VideoPlayer_ChapterSelectButtonVideo",
"variables": "variables": {
{
"includePrivate": False, "includePrivate": False,
"videoID": video_id "videoID": video_id,
}, },
"extensions": "extensions": {
{ "persistedQuery": {
"persistedQuery":
{
"version": 1, "version": 1,
"sha256Hash": "8d2793384aac3773beab5e59bd5d6f585aedb923d292800119e03d40cd0f9b41" "sha256Hash": "8d2793384aac3773beab5e59bd5d6f585aedb923d292800119e03d40cd0f9b41",
} }
} },
} }
response = gql_post(json.dumps(query)) response = gql_post(json.dumps(query))
return list(_chapter_nodes(response["data"]["video"]["moments"])) return list(_chapter_nodes(response["data"]["video"]["moments"]))
def _chapter_nodes(collection): def _chapter_nodes(moments: Data) -> Generator[Chapter, None, None]:
for edge in collection["edges"]: for edge in moments["edges"]:
node = edge["node"] node = edge["node"]
node["game"] = node["details"]["game"]
del node["details"]
del node["moments"] del node["moments"]
yield node yield node

View File

@ -1,5 +1,8 @@
import re import re
import unicodedata import unicodedata
from typing import Optional, Union
import click
def _format_size(value: float, digits: int, unit: str): def _format_size(value: float, digits: int, unit: str):
@ -9,7 +12,7 @@ def _format_size(value: float, digits: int, unit: str):
return f"{int(value)}{unit}" return f"{int(value)}{unit}"
def format_size(bytes_: int, digits: int = 1): def format_size(bytes_: Union[int, float], digits: int = 1):
if bytes_ < 1024: if bytes_ < 1024:
return _format_size(bytes_, digits, "B") return _format_size(bytes_, digits, "B")
@ -24,7 +27,7 @@ def format_size(bytes_: int, digits: int = 1):
return _format_size(mega / 1024, digits, "GB") return _format_size(mega / 1024, digits, "GB")
def format_duration(total_seconds: int | float) -> str: def format_duration(total_seconds: Union[int, float]) -> str:
total_seconds = int(total_seconds) total_seconds = int(total_seconds)
hours = total_seconds // 3600 hours = total_seconds // 3600
remainder = total_seconds % 3600 remainder = total_seconds % 3600
@ -40,7 +43,7 @@ def format_duration(total_seconds: int | float) -> str:
return f"{seconds} sec" return f"{seconds} sec"
def format_time(total_seconds: int | float, force_hours: bool = False) -> str: def format_time(total_seconds: Union[int, float], force_hours: bool = False) -> str:
total_seconds = int(total_seconds) total_seconds = int(total_seconds)
hours = total_seconds // 3600 hours = total_seconds // 3600
remainder = total_seconds % 3600 remainder = total_seconds % 3600
@ -53,15 +56,10 @@ def format_time(total_seconds: int | float, force_hours: bool = False) -> str:
return f"{minutes:02}:{seconds:02}" return f"{minutes:02}:{seconds:02}"
def read_int(msg: str, min: int, max: int, default: int | None = None) -> int: def read_int(msg: str, min: int, max: int, default: Optional[int] = None) -> int:
if default:
msg = msg + f" [default {default}]"
msg += ": "
while True: while True:
try: try:
val = input(msg) val = click.prompt(msg, default=default, type=int)
if default and not val: if default and not val:
return default return default
if min <= int(val) <= max: if min <= int(val) <= max:
@ -71,32 +69,32 @@ def read_int(msg: str, min: int, max: int, default: int | None = None) -> int:
def slugify(value: str) -> str: def slugify(value: str) -> str:
value = unicodedata.normalize('NFKC', str(value)) value = unicodedata.normalize("NFKC", str(value))
value = re.sub(r'[^\w\s_-]', '', value) value = re.sub(r"[^\w\s_-]", "", value)
value = re.sub(r'[\s_-]+', '_', value) value = re.sub(r"[\s_-]+", "_", value)
return value.strip("_").lower() return value.strip("_").lower()
def titlify(value: str) -> str: def titlify(value: str) -> str:
value = unicodedata.normalize('NFKC', str(value)) value = unicodedata.normalize("NFKC", str(value))
value = re.sub(r'[^\w\s\[\]().-]', '', value) value = re.sub(r"[^\w\s\[\]().-]", "", value)
value = re.sub(r'\s+', ' ', value) value = re.sub(r"\s+", " ", value)
return value.strip() return value.strip()
VIDEO_PATTERNS = [ VIDEO_PATTERNS = [
r"^(?P<id>\d+)?$", r"^(?P<id>\d+)?$",
r"^https://(www.)?twitch.tv/videos/(?P<id>\d+)(\?.+)?$", r"^https://(www\.|m\.)?twitch\.tv/videos/(?P<id>\d+)(\?.+)?$",
] ]
CLIP_PATTERNS = [ CLIP_PATTERNS = [
r"^(?P<slug>[A-Za-z0-9]+(?:-[A-Za-z0-9_-]{16})?)$", r"^(?P<slug>[A-Za-z0-9]+(?:-[A-Za-z0-9_-]{16})?)$",
r"^https://(www.)?twitch.tv/\w+/clip/(?P<slug>[A-Za-z0-9]+(?:-[A-Za-z0-9_-]{16})?)(\?.+)?$", r"^https://(www\.|m\.)?twitch\.tv/\w+/clip/(?P<slug>[A-Za-z0-9]+(?:-[A-Za-z0-9_-]{16})?)(\?.+)?$",
r"^https://clips.twitch.tv/(?P<slug>[A-Za-z0-9]+(?:-[A-Za-z0-9_-]{16})?)(\?.+)?$", r"^https://clips\.twitch\.tv/(?P<slug>[A-Za-z0-9]+(?:-[A-Za-z0-9_-]{16})?)(\?.+)?$",
] ]
def parse_video_identifier(identifier: str) -> str | None: def parse_video_identifier(identifier: str) -> Optional[str]:
"""Given a video ID or URL returns the video ID, or null if not matched""" """Given a video ID or URL returns the video ID, or null if not matched"""
for pattern in VIDEO_PATTERNS: for pattern in VIDEO_PATTERNS:
match = re.match(pattern, identifier) match = re.match(pattern, identifier)
@ -104,7 +102,7 @@ def parse_video_identifier(identifier: str) -> str | None:
return match.group("id") return match.group("id")
def parse_clip_identifier(identifier: str) -> str | None: def parse_clip_identifier(identifier: str) -> Optional[str]:
"""Given a clip slug or URL returns the clip slug, or null if not matched""" """Given a clip slug or URL returns the clip slug, or null if not matched"""
for pattern in CLIP_PATTERNS: for pattern in CLIP_PATTERNS:
match = re.match(pattern, identifier) match = re.match(pattern, identifier)