Compare commits

...

114 Commits

Author SHA1 Message Date
Ivan Habunek
8c68132ddb
Trim line when printing table, simplify code 2024-08-30 13:39:41 +02:00
Ivan Habunek
75423c7671
Show playlists in a table 2024-08-30 13:34:19 +02:00
Ivan Habunek
7dae0e23cf
Show description with some spacing
Looks nicer
2024-08-30 13:34:08 +02:00
Ivan Habunek
dc99ee51bc
Improve logging a bit when downloading 2024-08-30 11:58:15 +02:00
Ivan Habunek
2c9420c43d
Update changelog 2024-08-30 11:47:14 +02:00
Ivan Habunek
4a86cb16c8
Use relative paths in generated m3u8 playlist
Since we're using relative paths for initi segments, and paths are
relative to the path where the playlist is located, this seems sensible.
Allows the folder to be moved, and the playlist will still work.
2024-08-30 11:44:07 +02:00
Ivan Habunek
cfefae1e69
Fix query to fetch HD qualities 2024-08-30 11:43:48 +02:00
Ivan Habunek
ac7cdba28e
Download init segments
These seem to occur in hd quality playlists like 1440p.
2024-08-30 11:42:54 +02:00
Ivan Habunek
2feef136ca
Ignore video files 2024-08-30 11:42:21 +02:00
Ivan Habunek
d85aeade86
Update changelog 2024-08-30 08:25:52 +02:00
Ivan Habunek
4db4285d9a
Handle videoQualities being null 2024-08-30 08:06:31 +02:00
Ivan Habunek
707b2da934
Improve types 2024-08-30 08:05:16 +02:00
Ivan Habunek
2846925c91
Allow newer versions of m3u8 2024-08-30 08:05:03 +02:00
Ivan Habunek
8f5d7f022f
Reinstate "VODs" in progress output 2024-08-30 07:59:51 +02:00
Ivan Habunek
ff4a514b8a
Handle video URLs containing account name
fixes #162
2024-08-30 07:55:58 +02:00
Ivan Habunek
f57612ffcc
Refactor download_all to work when count is unknown 2024-08-30 07:37:50 +02:00
Ivan Habunek
5be97b9e13
Fix tests 2024-08-28 15:21:22 +02:00
Ivan Habunek
b6e7f8b36c
Make Progress work when file count is not known 2024-08-28 15:18:43 +02:00
Ivan Habunek
42f7a9a1a5
Fix a crash when downloading clips 2024-08-28 14:21:22 +02:00
Ivan Habunek
789d3d1939
Add --target-dir option to clips command 2024-08-28 13:15:01 +02:00
Ivan Habunek
07efac1ae7
Add --verbose flag for verbose logging
Don't log HTTP payloads unless --verbose flag is given. Always logging
HTTP payloads tends to make the log unreadable.
2024-08-28 12:58:36 +02:00
Ivan Habunek
a808b7d8ec
Don't check if file exists in download_file
This is done outside the function.
2024-08-28 12:39:56 +02:00
Ivan Habunek
936c6a9da1
Don't stop downloading if one download fails 2024-08-28 12:33:05 +02:00
Ivan Habunek
7184feacee
Move download_file to http 2024-08-28 11:07:25 +02:00
Ivan Habunek
5679e66270
Improve naming 2024-08-28 11:02:12 +02:00
Ivan Habunek
1658aba124
Embrace pathlib 2024-08-28 10:59:23 +02:00
Ivan Habunek
29d3f7fec2
Extract file naming 2024-08-28 10:32:40 +02:00
Ivan Habunek
91e0447681
Remove unnecessary braces 2024-08-28 09:53:06 +02:00
Ivan Habunek
f9e60082ba
Improve downloading logic
Follow redirects to avoid saving an intermediate response, and raise an
error on invalid status to avoid saving an error response.
2024-08-28 09:52:15 +02:00
Ivan Habunek
141ecb7f29
Move entities to entites.py 2024-08-24 20:23:34 +02:00
Ivan Habunek
52b96aab15
Simplify persisted queries 2024-08-24 20:08:01 +02:00
Ivan Habunek
1bdf6a2c02
Remove unused function 2024-08-24 20:05:32 +02:00
Ivan Habunek
38636e2b21
Use ansi escape code to clear line 2024-05-31 12:57:51 +02:00
Ivan Habunek
3fe1faa18e
Update changelog 2024-05-19 09:11:45 +02:00
KryptonicDragon
a99a472ad3 add quotes to gql query 2024-05-19 09:09:02 +02:00
Ivan Habunek
47d62bc471
Improve types 2024-04-29 09:08:19 +02:00
Ivan Habunek
de95384e6b
Fix tests 2024-04-28 10:16:33 +02:00
Ivan Habunek
aac450a5bc
Calculate progress only when printing progress 2024-04-28 10:13:47 +02:00
Ivan Habunek
9549679679
Make Progress not a dataclass 2024-04-28 10:09:53 +02:00
Ivan Habunek
35e974bb45
Upgrade m3u8 dependency 2024-04-28 09:30:24 +02:00
Ivan Habunek
b8e3809810
Print a note if no ids given 2024-04-28 08:02:01 +02:00
Ivan Habunek
cf580fde09
Update changelog 2024-04-27 20:23:35 +02:00
Ivan Habunek
68c9e644a8
Sort playlists 2024-04-27 20:22:53 +02:00
Ivan Habunek
ace4427caa
Print playlists in a table 2024-04-27 20:22:53 +02:00
Ivan Habunek
97f48f7108
Improve playlist parsing
Better support for "enhanced broadcast" streams

issue #154
2024-04-27 20:22:53 +02:00
Ivan Habunek
f9e553c61f
Support styling in print_table 2024-04-27 20:22:53 +02:00
Ivan Habunek
4fac6c11c5
Delete generated website when cleaning 2024-04-27 20:22:52 +02:00
Ivan Habunek
125bc693f8
Update changelog 2024-04-25 07:33:05 +02:00
Ivan Habunek
8a7fdad22f
Test m patterns 2024-04-25 07:31:52 +02:00
Alex Grant
c00a9c3597 Add support for m dot urls 2024-04-25 07:29:59 +02:00
Ivan Habunek
0f17d92a8c
Update changelog 2024-04-24 14:54:31 +02:00
Ivan Habunek
ee1e0ce853
Upgrade actions/setup-python to v5 2024-04-24 14:52:42 +02:00
Ivan Habunek
1c878bbca8
Update changelog 2024-04-24 14:51:08 +02:00
Ivan Habunek
941440de41
Upgrade actions/checkout to v4 2024-04-24 14:50:50 +02:00
Ivan Habunek
c0eae623a4
Test clips 2024-04-24 14:45:49 +02:00
Ivan Habunek
f4d0643b07
Fix flaky test 2024-04-24 14:41:48 +02:00
Ivan Habunek
ea4b714343
Test videos command 2024-04-24 14:39:17 +02:00
Ivan Habunek
f815934e15
Simplify and test get_game_ids 2024-04-24 14:39:12 +02:00
Ivan Habunek
5aa323e3e5
Log response content 2024-04-24 14:11:22 +02:00
Ivan Habunek
3d03658850
Add tests 2024-04-24 08:59:01 +02:00
Ivan Habunek
69b848d341
Respect --dry-run when downloading videos 2024-04-24 08:58:58 +02:00
Ivan Habunek
2422871d70
Add request logging 2024-04-24 08:43:45 +02:00
Ivan Habunek
44890b4101
Pass auth_token instead of headers 2024-04-24 08:43:00 +02:00
Ivan Habunek
9aa108acbf
Add type hints for post 2024-04-24 08:42:01 +02:00
Ivan Habunek
3fa8bcef73
Pass request as content instead of data
data works, but should be used for form data passed as a dict, while
content takes bytes or str
2024-04-24 08:41:19 +02:00
Ivan Habunek
3c0f8a8ece
Add some cli tests 2024-04-24 08:38:55 +02:00
Ivan Habunek
7845cf6f72
Improve typing 2024-04-24 08:11:33 +02:00
Ivan Habunek
6ed98fa4ef
Fix tuple typing to work on py 3.8 2024-04-24 08:10:56 +02:00
Ivan Habunek
d777f0e98a
Set up testing on github 2024-04-24 08:10:20 +02:00
Ivan Habunek
ad0f0d2a41
Update changelog 2024-04-23 18:14:22 +02:00
Ivan Habunek
1057fff61a
Hopefully fix python 3.8 compat 2024-04-23 18:13:40 +02:00
Ivan Habunek
f1924715ed
Apply formatting 2024-04-23 17:35:46 +02:00
Ivan Habunek
ca38b9239f
Update changelog 2024-04-23 17:25:48 +02:00
Ivan Habunek
a0ad66ee69
Remove union types to fix python 3.8 compat 2024-04-23 17:15:18 +02:00
Ivan Habunek
e50499351b
Make printing progress a bit more readable 2024-04-12 09:43:04 +02:00
Ivan Habunek
0d3c3df2f8
Fix double colon in prompt 2024-04-10 08:47:38 +02:00
Ivan Habunek
446b4f9f91
Fix progress calc when video duration is 0
fixes #146
2024-04-10 08:47:11 +02:00
Ivan Habunek
bce573ef3c
Update changelog 2024-04-10 08:32:21 +02:00
Ivan Habunek
3ffa7acfef
Fix url 2024-04-10 08:25:03 +02:00
Ivan Habunek
30301c07b9
Fix tests 2024-04-10 08:04:21 +02:00
Ivan Habunek
c1b58e178f
Add clips --copact option 2024-04-06 10:43:12 +02:00
Ivan Habunek
a7ad4d8dcc
Extract playlist parsing code 2024-04-06 10:15:26 +02:00
Ivan Habunek
d6390bc7a2
Fix bug in accessing video qualities 2024-04-04 08:36:10 +02:00
Ivan Habunek
28f1977d1c
Apply ruff formatting 2024-04-04 08:20:10 +02:00
Ivan Habunek
be69e79b28
Add Chapter typed dict 2024-04-03 08:52:51 +02:00
Ivan Habunek
c0e66fc416
Add AccessToken typed dict 2024-04-03 08:14:50 +02:00
Ivan Habunek
a9facd46ac
Add Clip typed dict 2024-04-02 09:50:26 +02:00
Ivan Habunek
6a1900b628
Add Video typed dict 2024-04-02 09:44:50 +02:00
Ivan Habunek
3270d857b1
Extract print_paged, improve types 2024-04-01 09:40:54 +02:00
Ivan Habunek
3ae99fe159
Replace custom coloring fns with click.echo and style 2024-03-31 21:41:05 +02:00
Ivan Habunek
9cf3ec2f07
Replace print_out with click.echo 2024-03-30 15:36:53 +01:00
Ivan Habunek
64b88249f2
Remove print_err, unused 2024-03-30 10:11:44 +01:00
Ivan Habunek
5dcc868275
Imporve types 2024-03-30 07:52:43 +01:00
Ivan Habunek
11fbfd35fc
Print placeholders in twitch-dl info 2024-03-30 07:35:08 +01:00
Ivan Habunek
1c1e5955b8
Add video description to metadata 2024-03-29 09:45:04 +01:00
Ivan Habunek
8a097f5f93
Update docs and changelog 2024-03-29 08:49:35 +01:00
Ivan Habunek
7cc67133f5
Add download --concat option 2024-03-29 08:43:32 +01:00
Ivan Habunek
e228971f66
Remove requirements-dev.txt, included in pyproject 2024-03-29 08:43:32 +01:00
Ivan Habunek
96526c2ca5
Display exception errors in console 2024-03-29 08:43:32 +01:00
Ivan Habunek
9bf0f1b425
Use fstrings where possible 2024-03-29 08:43:31 +01:00
Ivan Habunek
77e75b5dad
Improve types 2024-03-29 08:43:31 +01:00
Ivan Habunek
cf1693b500
Fix env command 2024-03-29 08:43:31 +01:00
Ivan Habunek
52a7191d1f
Add docs and changelog 2024-03-29 08:43:31 +01:00
Ivan Habunek
24eb163d98
Regenerate docs 2024-03-29 08:43:30 +01:00
Ivan Habunek
13b063cbc6
Use backquotes to get code blocks in markdown 2024-03-29 08:43:30 +01:00
Ivan Habunek
43a4a6c4f5
Rework the docs script to work with click 2024-03-29 08:43:30 +01:00
Ivan Habunek
b0c21ac436
Convert to click 2024-03-29 08:43:30 +01:00
Ivan Habunek
229a849f87
Add dependency on click 2024-03-25 09:10:36 +01:00
Ivan Habunek
5589f33142
Add vermin config file 2024-03-25 09:10:36 +01:00
Ivan Habunek
40d52891f8
Don't hardcode version number 2024-03-23 08:55:56 +01:00
Ivan Habunek
c727d65694
Require python 3.8 2024-03-23 08:55:38 +01:00
Ivan Habunek
8592c41a50
Use an abstract base class for TokenBucket 2024-03-23 07:56:50 +01:00
Ivan Habunek
a9aefa871d
Migrate to pyproject.toml 2024-03-23 07:31:40 +01:00
Ryabin Sergey
65bf6a2b99 Add --dry-run option 2024-03-14 15:36:28 +01:00
46 changed files with 2309 additions and 1470 deletions

27
.github/workflows/test.yml vendored Normal file
View File

@ -0,0 +1,27 @@
name: Run tests
on: [push, pull_request]
jobs:
test:
runs-on: ubuntu-22.04
strategy:
matrix:
python-version: ["3.8", "3.9", "3.10", "3.11", "3.12"]
steps:
- uses: actions/checkout@v4
- name: Set up Python ${{ matrix.python-version }}
uses: actions/setup-python@v5
with:
python-version: ${{ matrix.python-version }}
- name: Install dependencies
run: |
python -m pip install --upgrade pip
pip install -e ".[test]"
- name: Run tests
run: |
pytest
- name: Validate minimum required version
run: |
vermin --no-tips twitchdl

2
.gitignore vendored
View File

@ -15,3 +15,5 @@ tmp/
/*.pyz
/pyrightconfig.json
/book
*.mp4
*.mkv

4
.vermin Normal file
View File

@ -0,0 +1,4 @@
[vermin]
only_show_violations = yes
show_tips = no
targets = 3.8

View File

@ -3,6 +3,60 @@ twitch-dl changelog
<!-- Do not edit. This file is automatically generated from changelog.yaml.-->
### [2.5.0 (2024-08-30)](https://github.com/ihabunek/twitch-dl/releases/tag/2.5.0)
* Add support for HD video qualities (#163)
### [2.4.0 (2024-08-30)](https://github.com/ihabunek/twitch-dl/releases/tag/2.4.0)
* Add `clips --target-dir` option. Use in conjunction with `--download` to
specify target directory.
* Fix a crash when downloading clips (#160)
* Handle video URLs which contain the channel name (#162)
* Don't stop downloading clips if one download fails
### [2.3.1 (2024-05-19)](https://github.com/ihabunek/twitch-dl/releases/tag/2.3.1)
* Fix fetching access token (#155, thanks @KryptonicDragon)
### [2.3.0 (2024-04-27)](https://github.com/ihabunek/twitch-dl/releases/tag/2.3.0)
* Show more playlist data when choosing quality
* Improve detection of 'source' quality for Twitch Enhanced Broadcast Streams
(#154)
### [2.2.4 (2024-04-25)](https://github.com/ihabunek/twitch-dl/releases/tag/2.2.4)
* Add m dot url support to video and clip regexes (thanks @localnerve)
### [2.2.3 (2024-04-24)](https://github.com/ihabunek/twitch-dl/releases/tag/2.2.3)
* Respect --dry-run option when downloading videos
* Add automated tests on github actions
### [2.2.2 (2024-04-23)](https://github.com/ihabunek/twitch-dl/releases/tag/2.2.2)
* Fix more compat issues Python < 3.10 (#152)
### [2.2.1 (2024-04-23)](https://github.com/ihabunek/twitch-dl/releases/tag/2.2.1)
* Fix compat with Python < 3.10 (#152)
* Fix division by zero in progress calculation when video duration is reported
as 0
### [2.2.0 (2024-04-10)](https://github.com/ihabunek/twitch-dl/releases/tag/2.2.0)
* **Requires Python 3.8+**
* Migrated to Click library for generating the commandline interface
* Add shell auto completion, see 'Shell completion' in docs.
* Add setting defaults via environment variables, see 'Environment variables' in
docs
* Add `download --concat` option to avoid using ffmeg for joinig vods and concat
them instead. This will produce a `.ts` file by default.
* Add `download --dry-run` option to skip actual download (thanks @metacoma)
* Add video description to metadata (#129)
* Add `clips --compact` option for listing clips in one-per-line mode
### [2.1.4 (2024-01-06)](https://github.com/ihabunek/twitch-dl/releases/tag/2.1.4)
* Fix error caused by twitch requiring https for the usher api (thanks

View File

@ -2,13 +2,12 @@
default : clean dist
dist :
python setup.py sdist --formats=gztar,zip
python setup.py bdist_wheel --python-tag=py3
dist:
python -m build
clean :
find . -name "*pyc" | xargs rm -rf $1
rm -rf build dist bundle MANIFEST htmlcov deb_dist twitch-dl.*.pyz twitch-dl.1.man twitch_dl.egg-info
rm -rf build dist book bundle MANIFEST htmlcov deb_dist twitch-dl.*.pyz twitch-dl.1.man twitch_dl.egg-info
bundle:
mkdir bundle
@ -25,7 +24,7 @@ publish :
twine upload dist/*.tar.gz dist/*.whl
coverage:
py.test --cov=toot --cov-report html tests/
pytest --cov=twitchdl --cov-report html tests/
man:
scdoc < twitch-dl.1.scd > twitch-dl.1.man

View File

@ -17,7 +17,7 @@ Resources
Requirements
------------
* Python 3.7 or later
* Python 3.8 or later
* [ffmpeg](https://ffmpeg.org/download.html), installed and on the system path
Quick start

View File

@ -1,3 +1,61 @@
2.5.0:
date: 2024-08-30
changes:
- "Add support for HD video qualities (#163)"
2.4.0:
date: 2024-08-30
changes:
- "Add `clips --target-dir` option. Use in conjunction with `--download` to specify target directory."
- "Fix a crash when downloading clips (#160)"
- "Handle video URLs which contain the channel name (#162)"
- "Don't stop downloading clips if one download fails"
2.3.1:
date: 2024-05-19
changes:
- "Fix fetching access token (#155, thanks @KryptonicDragon)"
2.3.0:
date: 2024-04-27
changes:
- "Show more playlist data when choosing quality"
- "Improve detection of 'source' quality for Twitch Enhanced Broadcast Streams (#154)"
2.2.4:
date: 2024-04-25
changes:
- "Add m dot url support to video and clip regexes (thanks @localnerve)"
2.2.3:
date: 2024-04-24
changes:
- "Respect --dry-run option when downloading videos"
- "Add automated tests on github actions"
2.2.2:
date: 2024-04-23
changes:
- "Fix more compat issues Python < 3.10 (#152)"
2.2.1:
date: 2024-04-23
changes:
- "Fix compat with Python < 3.10 (#152)"
- "Fix division by zero in progress calculation when video duration is reported as 0"
2.2.0:
date: 2024-04-10
changes:
- "**Requires Python 3.8+**"
- "Migrated to Click library for generating the commandline interface"
- "Add shell auto completion, see 'Shell completion' in docs."
- "Add setting defaults via environment variables, see 'Environment variables' in docs"
- "Add `download --concat` option to avoid using ffmeg for joinig vods and concat them instead. This will produce a `.ts` file by default."
- "Add `download --dry-run` option to skip actual download (thanks @metacoma)"
- "Add video description to metadata (#129)"
- "Add `clips --compact` option for listing clips in one-per-line mode"
2.1.4:
date: 2024-01-06
changes:

View File

@ -9,6 +9,8 @@
- [twitch-dl clips](commands/clips.md)
- [twitch-dl info](commands/info.md)
- [twitch-dl env](commands/env.md)
- [Environemnt variables](environment_variables.md)
- [Shell completion](shell_completion.md)
- [Advanced](advanced.md)
[License](license.md)

View File

@ -3,6 +3,60 @@ twitch-dl changelog
<!-- Do not edit. This file is automatically generated from changelog.yaml.-->
### [2.5.0 (2024-08-30)](https://github.com/ihabunek/twitch-dl/releases/tag/2.5.0)
* Add support for HD video qualities (#163)
### [2.4.0 (2024-08-30)](https://github.com/ihabunek/twitch-dl/releases/tag/2.4.0)
* Add `clips --target-dir` option. Use in conjunction with `--download` to
specify target directory.
* Fix a crash when downloading clips (#160)
* Handle video URLs which contain the channel name (#162)
* Don't stop downloading clips if one download fails
### [2.3.1 (2024-05-19)](https://github.com/ihabunek/twitch-dl/releases/tag/2.3.1)
* Fix fetching access token (#155, thanks @KryptonicDragon)
### [2.3.0 (2024-04-27)](https://github.com/ihabunek/twitch-dl/releases/tag/2.3.0)
* Show more playlist data when choosing quality
* Improve detection of 'source' quality for Twitch Enhanced Broadcast Streams
(#154)
### [2.2.4 (2024-04-25)](https://github.com/ihabunek/twitch-dl/releases/tag/2.2.4)
* Add m dot url support to video and clip regexes (thanks @localnerve)
### [2.2.3 (2024-04-24)](https://github.com/ihabunek/twitch-dl/releases/tag/2.2.3)
* Respect --dry-run option when downloading videos
* Add automated tests on github actions
### [2.2.2 (2024-04-23)](https://github.com/ihabunek/twitch-dl/releases/tag/2.2.2)
* Fix more compat issues Python < 3.10 (#152)
### [2.2.1 (2024-04-23)](https://github.com/ihabunek/twitch-dl/releases/tag/2.2.1)
* Fix compat with Python < 3.10 (#152)
* Fix division by zero in progress calculation when video duration is reported
as 0
### [2.2.0 (2024-04-10)](https://github.com/ihabunek/twitch-dl/releases/tag/2.2.0)
* **Requires Python 3.8+**
* Migrated to Click library for generating the commandline interface
* Add shell auto completion, see 'Shell completion' in docs.
* Add setting defaults via environment variables, see 'Environment variables' in
docs
* Add `download --concat` option to avoid using ffmeg for joinig vods and concat
them instead. This will produce a `.ts` file by default.
* Add `download --dry-run` option to skip actual download (thanks @metacoma)
* Add video description to metadata (#129)
* Add `clips --compact` option for listing clips in one-per-line mode
### [2.1.4 (2024-01-06)](https://github.com/ihabunek/twitch-dl/releases/tag/2.1.4)
* Fix error caused by twitch requiring https for the usher api (thanks

View File

@ -1,64 +1,57 @@
<!-- ------------------- generated docs start ------------------- -->
# twitch-dl clips
List or download clips for a channel.
List or download clips for given CHANNEL_NAME.
### USAGE
```
twitch-dl clips <channel_name> [FLAGS] [OPTIONS]
twitch-dl clips [OPTIONS] CHANNEL_NAME
```
### ARGUMENTS
<table>
<tbody>
<tr>
<td class="code">&lt;channel_name&gt;</td>
<td>Name of the channel to list clips for.</td>
</tr>
</tbody>
</table>
### FLAGS
<table>
<tbody>
<tr>
<td class="code">-a, --all</td>
<td>Fetch all videos, overrides --limit</td>
</tr>
<tr>
<td class="code">-j, --json</td>
<td>Show results as JSON. Ignores <code>--pager</code>.</td>
</tr>
<tr>
<td class="code">-d, --download</td>
<td>Download all videos in given period (in source quality)</td>
</tr>
</tbody>
</table>
### OPTIONS
<table>
<tbody>
<tr>
<td class="code">-l, --limit</td>
<td>Number of videos to fetch (default 10, max 100)</td>
<td class="code">-a, --all</td>
<td>Fetch all clips, overrides --limit</td>
</tr>
<tr>
<td class="code">-P, --period</td>
<td>Period from which to return clips. Defaults to <code>all_time</code>. Possible values: <code>last_day</code>, <code>last_week</code>, <code>last_month</code>, <code>all_time</code>.</td>
<td class="code">-c, --compact</td>
<td>Show clips in compact mode, one line per video</td>
</tr>
<tr>
<td class="code">-p, --pager</td>
<td class="code">-d, --download</td>
<td>Download clips in given period (in source quality)</td>
</tr>
<tr>
<td class="code">-l, --limit INTEGER</td>
<td>Number of clips to fetch. Defaults to 40 in compact mode, 10 otherwise.</td>
</tr>
<tr>
<td class="code">-p, --pager INTEGER</td>
<td>Number of clips to show per page. Disabled by default.</td>
</tr>
<tr>
<td class="code">-P, --period TEXT</td>
<td>Period from which to return clips Possible values: <code>last_day</code>, <code>last_week</code>, <code>last_month</code>, <code>all_time</code>. [default: <code>all_time</code>]</td>
</tr>
<tr>
<td class="code">-t, --target-dir</td>
<td>Target directory when downloading clips [default: <code>.</code>]</td>
</tr>
<tr>
<td class="code">--json</td>
<td>Print data as JSON rather than human readable text</td>
</tr>
</tbody>
</table>

View File

@ -3,27 +3,49 @@
Download videos or clips.
Pass one or more video ID, clip slug or Twitch URL to download.
### USAGE
```
twitch-dl download <videos> [FLAGS] [OPTIONS]
twitch-dl download [OPTIONS] [IDS]...
```
### ARGUMENTS
### OPTIONS
<table>
<tbody>
<tr>
<td class="code">&lt;videos&gt;</td>
<td>One or more video ID, clip slug or twitch URL to download.</td>
<td class="code">-a, --auth-token TEXT</td>
<td>Authentication token, passed to Twitch to access subscriber only VODs. Can be copied from the <code>auth_token</code> cookie in any browser logged in on Twitch.</td>
</tr>
</tbody>
</table>
### FLAGS
<tr>
<td class="code">-c, --chapter INTEGER</td>
<td>Download a single chapter of the video. Specify the chapter number or use the flag without a number to display a chapter select prompt.</td>
</tr>
<tr>
<td class="code">--concat</td>
<td>Do not use ffmpeg to join files, concat them instead. This will produce a .ts file by default.</td>
</tr>
<tr>
<td class="code">-d, --dry-run</td>
<td>Simulate the download provcess without actually downloading any files.</td>
</tr>
<tr>
<td class="code">-e, --end TEXT</td>
<td>Download video up to this time (hh:mm or hh:mm:ss)</td>
</tr>
<tr>
<td class="code">-f, --format TEXT</td>
<td>Video format to convert into, passed to ffmpeg as the target file extension. Defaults to <code>mkv</code>. If <code>--concat</code> is passed, defaults to <code>ts</code>.</td>
</tr>
<table>
<tbody>
<tr>
<td class="code">-k, --keep</td>
<td>Don&#x27;t delete downloaded VODs and playlists after merging.</td>
@ -38,56 +60,30 @@ twitch-dl download <videos> [FLAGS] [OPTIONS]
<td class="code">--overwrite</td>
<td>Overwrite the target file if it already exists without prompting.</td>
</tr>
</tbody>
</table>
### OPTIONS
<table>
<tbody>
<tr>
<td class="code">-w, --max-workers</td>
<td>Number of workers for downloading vods concurrently (default 5)</td>
<td class="code">-o, --output TEXT</td>
<td>Output file name template. See docs for details. [default: <code>{date}_{id}_{channel_login}_{title_slug}.{format}</code>]</td>
</tr>
<tr>
<td class="code">-s, --start</td>
<td>Download video from this time (hh:mm or hh:mm:ss)</td>
<td class="code">-q, --quality TEXT</td>
<td>Video quality, e.g. <code>720p</code>. Set to <code>source</code> to get best quality.</td>
</tr>
<tr>
<td class="code">-e, --end</td>
<td>Download video up to this time (hh:mm or hh:mm:ss)</td>
</tr>
<tr>
<td class="code">-f, --format</td>
<td>Video format to convert into, passed to ffmpeg as the target file extension. Defaults to <code>mkv</code>.</td>
</tr>
<tr>
<td class="code">-q, --quality</td>
<td>Video quality, e.g. 720p. Set to &#x27;source&#x27; to get best quality.</td>
</tr>
<tr>
<td class="code">-a, --auth-token</td>
<td>Authentication token, passed to Twitch to access subscriber only VODs. Can be copied from the &#x27;auth_token&#x27; cookie in any browser logged in on Twitch.</td>
</tr>
<tr>
<td class="code">-o, --output</td>
<td>Output file name template. See docs for details.</td>
</tr>
<tr>
<td class="code">-r, --rate-limit</td>
<td class="code">-r, --rate-limit TEXT</td>
<td>Limit the maximum download speed in bytes per second. Use &#x27;k&#x27; and &#x27;m&#x27; suffixes for kbps and mbps.</td>
</tr>
<tr>
<td class="code">-c, --chapter</td>
<td>Download a single chapter of the video. Specify the chapter number or use the flag without a number to display a chapter select prompt.</td>
<td class="code">-s, --start TEXT</td>
<td>Download video from this time (hh:mm or hh:mm:ss)</td>
</tr>
<tr>
<td class="code">-w, --max-workers INTEGER</td>
<td>Number of workers for downloading vods concurrently [default: <code>5</code>]</td>
</tr>
</tbody>
</table>

View File

@ -6,7 +6,7 @@ Print environment information for inclusion in bug reports.
### USAGE
```
twitch-dl env
twitch-dl env [OPTIONS]
```
<!-- ------------------- generated docs end ------------------- -->

View File

@ -6,27 +6,16 @@ Print information for a given Twitch URL, video ID or clip slug.
### USAGE
```
twitch-dl info <video> [FLAGS]
twitch-dl info [OPTIONS] ID
```
### ARGUMENTS
### OPTIONS
<table>
<tbody>
<tr>
<td class="code">&lt;video&gt;</td>
<td>Video ID, clip slug, or URL</td>
</tr>
</tbody>
</table>
### FLAGS
<table>
<tbody>
<tr>
<td class="code">-j, --json</td>
<td>Show results as JSON</td>
<td class="code">--json</td>
<td>Print data as JSON rather than human readable text</td>
</tr>
</tbody>
</table>

View File

@ -1,73 +1,56 @@
<!-- ------------------- generated docs start ------------------- -->
# twitch-dl videos
List videos for a channel.
List or download clips for given CHANNEL_NAME.
### USAGE
```
twitch-dl videos <channel_name> [FLAGS] [OPTIONS]
twitch-dl videos [OPTIONS] CHANNEL_NAME
```
### ARGUMENTS
<table>
<tbody>
<tr>
<td class="code">&lt;channel_name&gt;</td>
<td>Name of the channel to list videos for.</td>
</tr>
</tbody>
</table>
### FLAGS
<table>
<tbody>
<tr>
<td class="code">-a, --all</td>
<td>Fetch all videos, overrides --limit</td>
</tr>
<tr>
<td class="code">-j, --json</td>
<td>Show results as JSON. Ignores <code>--pager</code>.</td>
</tr>
<tr>
<td class="code">-c, --compact</td>
<td>Show videos in compact mode, one line per video</td>
</tr>
</tbody>
</table>
### OPTIONS
<table>
<tbody>
<tr>
<td class="code">-g, --game</td>
<td class="code">-a, --all</td>
<td>Fetch all clips, overrides --limit</td>
</tr>
<tr>
<td class="code">-c, --compact</td>
<td>Show videos in compact mode, one line per video</td>
</tr>
<tr>
<td class="code">-l, --limit INTEGER</td>
<td>Number of videos to fetch. Defaults to 40 in compact mode, 10 otherwise.</td>
</tr>
<tr>
<td class="code">-p, --pager INTEGER</td>
<td>Number of videos to show per page. Disabled by default.</td>
</tr>
<tr>
<td class="code">-g, --game TEXT</td>
<td>Show videos of given game (can be given multiple times)</td>
</tr>
<tr>
<td class="code">-l, --limit</td>
<td>Number of videos to fetch. Defaults to 40 in copmpact mode, 10 otherwise.</td>
<td class="code">-s, --sort TEXT</td>
<td>Sorting order of videos Possible values: <code>views</code>, <code>time</code>. [default: <code>time</code>]</td>
</tr>
<tr>
<td class="code">-s, --sort</td>
<td>Sorting order of videos. Defaults to <code>time</code>. Possible values: <code>views</code>, <code>time</code>.</td>
<td class="code">-t, --type TEXT</td>
<td>Broadcast type Possible values: <code>archive</code>, <code>highlight</code>, <code>upload</code>. [default: <code>archive</code>]</td>
</tr>
<tr>
<td class="code">-t, --type</td>
<td>Broadcast type. Defaults to <code>archive</code>. Possible values: <code>archive</code>, <code>highlight</code>, <code>upload</code>.</td>
</tr>
<tr>
<td class="code">-p, --pager</td>
<td>Print videos in pages. Ignores <code>--limit</code>. Defaults to 10.</td>
<td class="code">--json</td>
<td>Print data as JSON rather than human readable text</td>
</tr>
</tbody>
</table>

View File

@ -0,0 +1,15 @@
# Environment variables
> Introduced in twitch-dl 2.2.0
twitch-dl allows setting defaults for parameters via environment variables.
Environment variables should be named `TWITCH_DL_<COMMAND_NAME>_<OPTION_NAME>`.
For example, when invoking `twitch-dl download`, if you always set `--quality
source` you can set the following environment variable to make this the
default:
```
TWITCH_DL_DOWNLOAD_QUALITY="source"
```

View File

@ -1,6 +1,6 @@
# Installation
twitch-dl requires **Python 3.7** or later.
twitch-dl requires **Python 3.8** or later.
## Prerequisite: FFmpeg

31
docs/shell_completion.md Normal file
View File

@ -0,0 +1,31 @@
# Shell completion
> Introduced in twitch-dl 2.2.0
twitch-dl uses [Click shell completion](https://click.palletsprojects.com/en/8.1.x/shell-completion/) which works on Bash, Fish and Zsh.
To enable completion, twitch-dl must be [installed](./installation.html) as a command and available by ivoking `twitch-dl`. Then follow the instructions for your shell.
**Bash**
Add to `~/.bashrc`:
```
eval "$(_TWITCH_DL_COMPLETE=bash_source twitch-dl)"
```
**Fish**
Add to `~/.config/fish/completions/twitch-dl.fish`:
```
_TWITCH_DL_COMPLETE=fish_source twitch-dl | source
```
**Zsh**
Add to `~/.zshrc`:
```
eval "$(_TWITCH_DL_COMPLETE=zsh_source twitch-dl)"
```

64
pyproject.toml Normal file
View File

@ -0,0 +1,64 @@
[build-system]
requires = ["setuptools>=64", "setuptools_scm>=8"]
build-backend = "setuptools.build_meta"
[project]
name = "twitch-dl"
authors = [{ name="Ivan Habunek", email="ivan@habunek.com" }]
description = "Quickly download videos from twitch.tv from the comort of your terminal emulator"
keywords=["twitch", "vod", "video", "download"]
readme = "README.md"
license = { file="LICENSE" }
requires-python = ">=3.8"
dynamic = ["version"]
classifiers = [
"Development Status :: 5 - Production/Stable",
"Environment :: Console",
"License :: OSI Approved :: GNU General Public License v3 (GPLv3)",
"Operating System :: OS Independent",
]
dependencies = [
"click>=8.0.0,<9.0.0",
"httpx>=0.17.0,<1.0.0",
"m3u8>=3.0.0,<7.0.0",
]
[tool.setuptools]
packages = [
"twitchdl",
"twitchdl.commands",
]
[tool.setuptools_scm]
[project.optional-dependencies]
dev = [
"build",
"pytest",
"pyyaml",
"setuptools",
"twine",
"vermin",
]
test = [
"pytest",
"vermin",
]
[project.urls]
"Homepage" = "https://twitch-dl.bezdomni.net/"
"Source" = "https://github.com/ihabunek/twitch-dl"
[project.scripts]
twitch-dl = "twitchdl.cli:cli"
[tool.pyright]
typeCheckingMode = "strict"
pythonVersion = "3.8"
[tool.ruff]
line-length = 100
target-version = "py38"

View File

@ -1,5 +0,0 @@
pytest
pyyaml
setuptools
twine
wheel

View File

@ -4,13 +4,16 @@
Auto-generates documentation from command defs in console.py.
"""
import click
import html
import os
import re
import shutil
import textwrap
from twitchdl.console import COMMANDS
from click import Command
from twitchdl.cli import cli
START_MARKER = "<!-- ------------------- generated docs start ------------------- -->"
@ -19,8 +22,11 @@ END_MARKER = "<!-- ------------------- generated docs end ------------------- --
def main():
update_changelog()
for command in COMMANDS:
update_docs(command)
parent_ctx = click.Context(cli, info_name="twitch-dl")
for name, command in cli.commands.items():
ctx = click.Context(cli, info_name=name, parent=parent_ctx)
update_docs(command, ctx)
def update_changelog():
@ -31,9 +37,9 @@ def update_changelog():
shutil.copy(source, target)
def update_docs(command):
def update_docs(command: Command, ctx: click.Context):
path = os.path.join("docs", "commands", f"{command.name}.md")
content = render_command(command)
content = render_command(command, ctx)
if not os.path.exists(path):
print(f"Creating: {path}")
@ -45,87 +51,29 @@ def update_docs(command):
write(path, content)
def render_command(command):
def render_command(command: Command, ctx: click.Context):
content = START_MARKER
content += f"\n# twitch-dl {command.name}\n\n"
content += command.description + "\n\n"
content += render_usage(command)
content += render_arguments(command)
content += render_flags(command)
content += render_options(command)
if command.help:
content += command.help + "\n\n"
content += render_usage(ctx, command)
content += render_options(ctx, command)
return content
def render_usage(command):
arguments = get_arguments(command)
arguments = " ".join(f"<{name}>" for [name, _] in arguments)
flags = get_flags(command)
options = get_options(command)
def render_usage(ctx: click.Context, command: Command):
content = "### USAGE\n\n"
content += "```\n"
content += f"twitch-dl {command.name} {arguments}"
if flags:
content += " [FLAGS]"
if options:
content += " [OPTIONS]"
content += command.get_usage(ctx).replace("Usage: ", "")
content += "\n```\n\n"
return content
def render_arguments(command):
arguments = get_arguments(command)
if not arguments:
return ""
content = "### ARGUMENTS\n\n"
content += "<table>\n"
content += "<tbody>"
for [name, params] in arguments:
content += textwrap.dedent(f"""
<tr>
<td class="code">&lt;{escape(name)}&gt;</td>
<td>{escape(params['help'])}</td>
</tr>
""")
content += "</tbody>\n"
content += "</table>\n\n"
return content
def render_flags(command):
flags = get_flags(command)
if not flags:
return ""
content = "### FLAGS\n\n"
content += "<table>\n"
content += "<tbody>"
for [names, params] in flags:
names = ", ".join(f"{name}" for name in names)
content += textwrap.dedent(f"""
<tr>
<td class="code">{escape(names)}</td>
<td>{escape(params['help'])}</td>
</tr>
""")
content += "</tbody>\n"
content += "</table>\n\n"
return content
def render_options(command):
options = get_options(command)
def render_options(ctx, command: Command):
options = list(get_options(command))
if not options:
return ""
@ -134,12 +82,11 @@ def render_options(command):
content += "<table>\n"
content += "<tbody>"
for [names, params] in options:
names = ", ".join(f"{name}" for name in names)
for opts, help in options:
content += textwrap.dedent(f"""
<tr>
<td class="code">{escape(names)}</td>
<td>{escape(params['help'])}{choices(params)}</td>
<td class="code">{escape(opts)}</td>
<td>{escape(help)}</td>
</tr>
""")
content += "</tbody>\n"
@ -148,37 +95,39 @@ def render_options(command):
return content
def choices(params):
if "choices" in params:
choices = ", ".join(code(c) for c in params["choices"])
def get_options(command: Command):
for option in command.params:
if isinstance(option, click.Option):
opts = ", ".join(option.opts)
opts += option_type(option)
help = option.help or ""
help = re.sub(r"\s+", " ", help)
help += choices(option)
if option.default:
help += f" [default: `{option.default}`]"
yield opts, help
def option_type(option: click.Option):
match option.type:
case click.types.StringParamType():
return " TEXT"
case click.types.Choice():
return " TEXT"
case click.types.IntParamType():
return " INTEGER"
case _:
return ""
def choices(option: click.Option):
if isinstance(option.type, click.Choice):
choices = ", ".join(f"`{c}`" for c in option.type.choices)
return f" Possible values: {choices}."
return ""
def get_arguments(command):
return [
[names[0], options]
for names, options in command.arguments
if len(names) == 1 and not names[0].startswith("-")
]
def get_flags(command):
return [
[names, options]
for names, options in command.arguments
if names[0].startswith("-") and "type" not in options
]
def get_options(command):
return [
[names, options]
for names, options in command.arguments
if names[0].startswith("-") and "type" in options
]
def read(path):
with open(path, "r") as f:
return f.read()
@ -189,10 +138,6 @@ def write(path, content):
return f.write(content)
def code(string):
return f"<code>{string}</code>"
def escape(text: str):
text = html.escape(text)
text = re.sub(r"`([\S]+)`", "<code>\\1</code>", text)

View File

@ -11,12 +11,10 @@ Usage: tag_version [version]
import subprocess
import sys
import textwrap
import yaml
import twitchdl
from datetime import date
from os import path
from pkg_resources import get_distribution
import yaml
path = path.join(path.dirname(path.dirname(path.abspath(__file__))), "changelog.yaml")
with open(path, "r") as f:
@ -33,15 +31,6 @@ if not changelog_item:
print(f"Version `{version}` not found in changelog.", file=sys.stderr)
sys.exit(1)
if twitchdl.__version__ != version:
print(f"twitchdl.__version__ is `{twitchdl.__version__}`, expected {version}.", file=sys.stderr)
sys.exit(1)
dist_version = get_distribution('twitch-dl').version
if dist_version != version:
print(f"Version in setup.py is `{dist_version}`, expected {version}.", file=sys.stderr)
sys.exit(1)
release_date = changelog_item["date"]
changes = changelog_item["changes"]
description = changelog_item["description"] if "description" in changelog_item else None

View File

@ -1,42 +0,0 @@
#!/usr/bin/env python
from setuptools import setup, find_packages
long_description = """
Quickly download videos from twitch.tv.
Works simliarly to youtube-dl but downloads multiple VODs in parallel which
makes it faster.
"""
setup(
name="twitch-dl",
version="2.1.4",
description="Twitch downloader",
long_description=long_description.strip(),
author="Ivan Habunek",
author_email="ivan@habunek.com",
url="https://github.com/ihabunek/twitch-dl/",
project_urls={
"Documentation": "https://twitch-dl.bezdomni.net/"
},
keywords="twitch vod video download",
license="GPLv3",
classifiers=[
"Development Status :: 5 - Production/Stable",
"Environment :: Console",
"License :: OSI Approved :: GNU General Public License v3 (GPLv3)",
"Programming Language :: Python :: 3",
],
packages=find_packages(),
python_requires=">=3.7",
install_requires=[
"m3u8>=1.0.0,<4.0.0",
"httpx>=0.17.0,<1.0.0",
],
entry_points={
"console_scripts": [
"twitch-dl=twitchdl.console:main",
],
}
)

View File

@ -3,9 +3,13 @@ These tests depend on the channel having some videos and clips published.
"""
import httpx
import m3u8
import pytest
from twitchdl import twitch
from twitchdl.commands.download import _parse_playlists, get_clip_authenticated_url
from twitchdl.commands.download import get_clip_authenticated_url
from twitchdl.commands.videos import get_game_ids
from twitchdl.exceptions import ConsoleError
from twitchdl.playlists import enumerate_vods, load_m3u8, parse_playlists
TEST_CHANNEL = "bananasaurus_rex"
@ -17,22 +21,25 @@ def test_get_videos():
video_id = videos["edges"][0]["node"]["id"]
video = twitch.get_video(video_id)
assert video is not None
assert video["id"] == video_id
access_token = twitch.get_access_token(video_id)
assert "signature" in access_token
assert "value" in access_token
playlists = twitch.get_playlists(video_id, access_token)
assert playlists.startswith("#EXTM3U")
playlists_txt = twitch.get_playlists(video_id, access_token)
assert playlists_txt.startswith("#EXTM3U")
name, res, url = next(_parse_playlists(playlists))
playlist = httpx.get(url).text
assert playlist.startswith("#EXTM3U")
playlists = parse_playlists(playlists_txt)
playlist_url = playlists[0].url
playlist = m3u8.loads(playlist)
vod_path = playlist.segments[0].uri
assert vod_path == "0.ts"
playlist_txt = httpx.get(playlist_url).text
assert playlist_txt.startswith("#EXTM3U")
playlist_m3u8 = load_m3u8(playlist_txt)
vods = enumerate_vods(playlist_m3u8)
assert vods[0].path == "0.ts"
def test_get_clips():
@ -45,6 +52,19 @@ def test_get_clips():
slug = clips["edges"][0]["node"]["slug"]
clip = twitch.get_clip(slug)
assert clip is not None
assert clip["slug"] == slug
assert get_clip_authenticated_url(slug, "source")
def test_get_games():
assert get_game_ids([]) == []
assert get_game_ids(["Bioshock"]) == ["15866"]
assert get_game_ids(["Bioshock", "Portal"]) == ["15866", "6187"]
def test_get_games_not_found():
with pytest.raises(ConsoleError) as ex:
get_game_ids(["the game which does not exist"])
assert str(ex.value) == "Game 'the game which does not exist' not found"

156
tests/test_cli.py Normal file
View File

@ -0,0 +1,156 @@
import json
import pytest
from click.testing import CliRunner, Result
from twitchdl import cli
@pytest.fixture(scope="session")
def runner():
return CliRunner(mix_stderr=False)
def assert_ok(result: Result):
if result.exit_code != 0:
raise AssertionError(
f"Command failed with exit code {result.exit_code}\nStderr: {result.stderr}"
)
def test_info_video(runner: CliRunner):
result = runner.invoke(cli.info, ["2090131595"])
assert_ok(result)
assert "Frost Fatales 2024 Day 1" in result.stdout
assert "frozenflygone playing Tomb Raider" in result.stdout
def test_info_video_json(runner: CliRunner):
result = runner.invoke(cli.info, ["2090131595", "--json"])
assert_ok(result)
video = json.loads(result.stdout)
assert video["title"] == "Frost Fatales 2024 Day 1"
assert video["game"] == {"id": "2770", "name": "Tomb Raider"}
assert video["creator"] == {"login": "frozenflygone", "displayName": "frozenflygone"}
def test_info_clip(runner: CliRunner):
result = runner.invoke(cli.info, ["PoisedTalentedPuddingChefFrank"])
assert_ok(result)
assert "AGDQ Crashes during Bioshock run" in result.stdout
assert "GamesDoneQuick playing BioShock" in result.stdout
def test_info_clip_json(runner: CliRunner):
result = runner.invoke(cli.info, ["PoisedTalentedPuddingChefFrank", "--json"])
assert_ok(result)
clip = json.loads(result.stdout)
assert clip["slug"] == "PoisedTalentedPuddingChefFrank"
assert clip["title"] == "AGDQ Crashes during Bioshock run"
assert clip["game"] == {"id": "15866", "name": "BioShock"}
assert clip["broadcaster"] == {"displayName": "GamesDoneQuick", "login": "gamesdonequick"}
def test_info_not_found(runner: CliRunner):
result = runner.invoke(cli.info, ["banana"])
assert result.exit_code == 1
assert "Clip banana not found" in result.stderr
result = runner.invoke(cli.info, ["12345"])
assert result.exit_code == 1
assert "Video 12345 not found" in result.stderr
result = runner.invoke(cli.info, [""])
assert result.exit_code == 1
assert "Invalid input" in result.stderr
def test_download_clip(runner: CliRunner):
result = runner.invoke(
cli.download,
[
"PoisedTalentedPuddingChefFrank",
"-q",
"source",
"--dry-run",
],
)
assert_ok(result)
assert (
"Found: AGDQ Crashes during Bioshock run by GamesDoneQuick, playing BioShock (30 sec)"
in result.stdout
)
assert (
"Target: 2020-01-10_3099545841_gamesdonequick_agdq_crashes_during_bioshock_run.mp4"
in result.stdout
)
assert "Dry run, clip not downloaded." in result.stdout
def test_download_video(runner: CliRunner):
result = runner.invoke(
cli.download,
[
"2090131595",
"-q",
"source",
"--dry-run",
],
)
assert_ok(result)
assert "Found: Frost Fatales 2024 Day 1 by frozenflygone" in result.stdout
assert (
"Output: 2024-03-14_2090131595_frozenflygone_frost_fatales_2024_day_1.mkv" in result.stdout
)
assert "Dry run, video not downloaded." in result.stdout
def test_videos(runner: CliRunner):
result = runner.invoke(cli.videos, ["gamesdonequick", "--json"])
assert_ok(result)
videos = json.loads(result.stdout)
assert videos["count"] == 10
assert videos["totalCount"] > 0
video = videos["videos"][0]
result = runner.invoke(cli.videos, "gamesdonequick")
assert_ok(result)
assert f"Video {video['id']}" in result.stdout
assert video["title"] in result.stdout
result = runner.invoke(cli.videos, ["gamesdonequick", "--compact"])
assert_ok(result)
assert video["id"] in result.stdout
assert video["title"][:60] in result.stdout
def test_videos_channel_not_found(runner: CliRunner):
result = runner.invoke(cli.videos, ["doesnotexisthopefully"])
assert result.exit_code == 1
assert result.stderr.strip() == "Error: Channel doesnotexisthopefully not found"
def test_clips(runner: CliRunner):
result = runner.invoke(cli.clips, ["gamesdonequick", "--json"])
assert_ok(result)
clips = json.loads(result.stdout)
clip = clips[0]
result = runner.invoke(cli.clips, "gamesdonequick")
assert_ok(result)
assert f"Clip {clip['slug']}" in result.stdout
assert clip["title"] in result.stdout
result = runner.invoke(cli.clips, ["gamesdonequick", "--compact"])
assert_ok(result)
assert clip["slug"] in result.stdout
assert clip["title"][:60] in result.stdout

View File

@ -1,35 +1,39 @@
import pytest
from twitchdl.utils import parse_video_identifier, parse_clip_identifier
from twitchdl.utils import parse_clip_identifier, parse_video_identifier
TEST_VIDEO_PATTERNS = [
("702689313", "702689313"),
("702689313", "https://twitch.tv/videos/702689313"),
("702689313", "https://www.twitch.tv/videos/702689313"),
("702689313", "https://m.twitch.tv/videos/702689313"),
("2223719525", "https://www.twitch.tv/r0dn3y/video/2223719525"),
]
TEST_CLIP_PATTERNS = {
("AbrasivePlayfulMangoMau5", "AbrasivePlayfulMangoMau5"),
("AbrasivePlayfulMangoMau5", "https://clips.twitch.tv/AbrasivePlayfulMangoMau5"),
("AbrasivePlayfulMangoMau5", "https://www.twitch.tv/dracul1nx/clip/AbrasivePlayfulMangoMau5"),
("AbrasivePlayfulMangoMau5", "https://m.twitch.tv/dracul1nx/clip/AbrasivePlayfulMangoMau5"),
("AbrasivePlayfulMangoMau5", "https://twitch.tv/dracul1nx/clip/AbrasivePlayfulMangoMau5"),
("HungryProudRadicchioDoggo", "HungryProudRadicchioDoggo"),
("HungryProudRadicchioDoggo", "https://clips.twitch.tv/HungryProudRadicchioDoggo"),
("HungryProudRadicchioDoggo", "https://www.twitch.tv/bananasaurus_rex/clip/HungryProudRadicchioDoggo?filter=clips&range=7d&sort=time"),
("HungryProudRadicchioDoggo", "https://m.twitch.tv/bananasaurus_rex/clip/HungryProudRadicchioDoggo?filter=clips&range=7d&sort=time"),
("HungryProudRadicchioDoggo", "https://twitch.tv/bananasaurus_rex/clip/HungryProudRadicchioDoggo?filter=clips&range=7d&sort=time"),
("GloriousColdbloodedTortoiseRuleFive-E017utJ4DZmHVpfQ", "GloriousColdbloodedTortoiseRuleFive-E017utJ4DZmHVpfQ"),
("GloriousColdbloodedTortoiseRuleFive-E017utJ4DZmHVpfQ", "https://twitch.tv/dracul1nx/clip/GloriousColdbloodedTortoiseRuleFive-E017utJ4DZmHVpfQ"),
("GloriousColdbloodedTortoiseRuleFive-E017utJ4DZmHVpfQ", "https://twitch.tv/dracul1nx/clip/GloriousColdbloodedTortoiseRuleFive-E017utJ4DZmHVpfQ?filter=clips&range=7d&sort=time"),
("GloriousColdbloodedTortoiseRuleFive-E017utJ4DZmHVpfQ", "https://www.twitch.tv/dracul1nx/clip/GloriousColdbloodedTortoiseRuleFive-E017utJ4DZmHVpfQ?filter=clips&range=7d&sort=time"),
("GloriousColdbloodedTortoiseRuleFive-E017utJ4DZmHVpfQ", "https://m.twitch.tv/dracul1nx/clip/GloriousColdbloodedTortoiseRuleFive-E017utJ4DZmHVpfQ?filter=clips&range=7d&sort=time"),
}
@pytest.mark.parametrize("expected,input", TEST_VIDEO_PATTERNS)
def test_video_patterns(expected, input):
def test_video_patterns(expected: str, input: str):
assert parse_video_identifier(input) == expected
@pytest.mark.parametrize("expected,input", TEST_CLIP_PATTERNS)
def test_clip_patterns(expected, input):
def test_clip_patterns(expected: str, input: str):
assert parse_clip_identifier(input) == expected

View File

@ -8,8 +8,8 @@ def test_initial_values():
assert progress.progress_perc == 0
assert progress.remaining_time is None
assert progress.speed is None
assert progress.vod_count == 10
assert progress.vod_downloaded_count == 0
assert progress.file_count == 10
assert progress.downloaded_count == 0
def test_downloaded():
@ -23,26 +23,31 @@ def test_downloaded():
assert progress.progress_perc == 0
progress.advance(1, 100)
progress._recalculate()
assert progress.downloaded == 100
assert progress.progress_bytes == 100
assert progress.progress_perc == 11
progress.advance(2, 200)
progress._recalculate()
assert progress.downloaded == 300
assert progress.progress_bytes == 300
assert progress.progress_perc == 33
progress.advance(3, 150)
progress._recalculate()
assert progress.downloaded == 450
assert progress.progress_bytes == 450
assert progress.progress_perc == 50
progress.advance(1, 50)
progress._recalculate()
assert progress.downloaded == 500
assert progress.progress_bytes == 500
assert progress.progress_perc == 55
progress.abort(2)
progress._recalculate()
assert progress.downloaded == 500
assert progress.progress_bytes == 300
assert progress.progress_perc == 33
@ -52,6 +57,7 @@ def test_downloaded():
progress.advance(1, 150)
progress.advance(2, 300)
progress.advance(3, 150)
progress._recalculate()
assert progress.downloaded == 1100
assert progress.progress_bytes == 900
@ -71,12 +77,15 @@ def test_estimated_total():
assert progress.estimated_total is None
progress.start(1, 12000)
progress._recalculate()
assert progress.estimated_total == 12000 * 3
progress.start(2, 11000)
progress._recalculate()
assert progress.estimated_total == 11500 * 3
progress.start(3, 10000)
progress._recalculate()
assert progress.estimated_total == 11000 * 3
@ -87,16 +96,16 @@ def test_vod_downloaded_count():
progress.start(2, 100)
progress.start(3, 100)
assert progress.vod_downloaded_count == 0
assert progress.downloaded_count == 0
progress.advance(1, 100)
progress.end(1)
assert progress.vod_downloaded_count == 1
assert progress.downloaded_count == 1
progress.advance(2, 100)
progress.end(2)
assert progress.vod_downloaded_count == 2
assert progress.downloaded_count == 2
progress.advance(3, 100)
progress.end(3)
assert progress.vod_downloaded_count == 3
assert progress.downloaded_count == 3

View File

@ -1,3 +1,8 @@
__version__ = "2.1.4"
from importlib import metadata
try:
__version__ = metadata.version("twitch-dl")
except metadata.PackageNotFoundError:
__version__ = "0.0.0"
CLIENT_ID = "kd1unb4b3q4t58fwlpcbzcbnm76a8fp"

View File

@ -1,3 +1,3 @@
from twitchdl.console import main
from twitchdl.cli import cli
main()
cli()

422
twitchdl/cli.py Normal file
View File

@ -0,0 +1,422 @@
import logging
import platform
import re
import sys
from pathlib import Path
from typing import Optional, Tuple
import click
from twitchdl import __version__
from twitchdl.entities import DownloadOptions
from twitchdl.naming import DEFAULT_OUTPUT_TEMPLATE
from twitchdl.twitch import ClipsPeriod, VideosSort, VideosType
# Tweak the Click context
# https://click.palletsprojects.com/en/8.1.x/api/#context
CONTEXT = dict(
# Enable using environment variables to set options
auto_envvar_prefix="TWITCH_DL",
# Add shorthand -h for invoking help
help_option_names=["-h", "--help"],
# Always show default values for options
show_default=True,
# Make help a bit wider
max_content_width=100,
)
json_option = click.option(
"--json",
is_flag=True,
default=False,
help="Print data as JSON rather than human readable text",
)
def validate_positive(_ctx: click.Context, _param: click.Parameter, value: Optional[int]):
if value is not None and value <= 0:
raise click.BadParameter("must be greater than 0")
return value
def validate_time(_ctx: click.Context, _param: click.Parameter, value: str) -> Optional[int]:
"""Parse a time string (hh:mm or hh:mm:ss) to number of seconds."""
if not value:
return None
parts = [int(p) for p in value.split(":")]
if not 2 <= len(parts) <= 3:
raise click.BadParameter("invalid time")
hours = parts[0]
minutes = parts[1]
seconds = parts[2] if len(parts) > 2 else 0
if hours < 0 or not (0 <= minutes <= 59) or not (0 <= seconds <= 59):
raise click.BadParameter("invalid time")
return hours * 3600 + minutes * 60 + seconds
def validate_rate(_ctx: click.Context, _param: click.Parameter, value: str) -> Optional[int]:
if not value:
return None
match = re.search(r"^([0-9]+)(k|m|)$", value, flags=re.IGNORECASE)
if not match:
raise click.BadParameter("must be an integer, followed by an optional 'k' or 'm'")
amount = int(match.group(1))
unit = match.group(2)
if unit == "k":
return amount * 1024
if unit == "m":
return amount * 1024 * 1024
return amount
@click.group(context_settings=CONTEXT)
@click.option("--debug/--no-debug", default=False, help="Enable debug logging to stderr")
@click.option("--verbose/--no-verbose", default=False, help="More verbose debug logging")
@click.option("--color/--no-color", default=sys.stdout.isatty(), help="Use ANSI color in output")
@click.version_option(package_name="twitch-dl")
@click.pass_context
def cli(ctx: click.Context, color: bool, debug: bool, verbose: bool):
"""twitch-dl - twitch.tv downloader
https://twitch-dl.bezdomni.net/
"""
ctx.color = color
if debug:
logging.basicConfig(level=logging.DEBUG if verbose else logging.INFO)
logging.getLogger("httpx").setLevel(logging.WARN)
logging.getLogger("httpcore").setLevel(logging.WARN)
@cli.command()
@click.argument("channel_name")
@click.option(
"-a",
"--all",
help="Fetch all clips, overrides --limit",
is_flag=True,
)
@click.option(
"-c",
"--compact",
help="Show clips in compact mode, one line per video",
is_flag=True,
)
@click.option(
"-d",
"--download",
help="Download clips in given period (in source quality)",
is_flag=True,
)
@click.option(
"-l",
"--limit",
help="Number of clips to fetch. Defaults to 40 in compact mode, 10 otherwise.",
type=int,
callback=validate_positive,
)
@click.option(
"-p",
"--pager",
help="Number of clips to show per page. Disabled by default.",
type=int,
callback=validate_positive,
is_flag=False,
flag_value=10,
)
@click.option(
"-P",
"--period",
help="Period from which to return clips",
default="all_time",
type=click.Choice(["last_day", "last_week", "last_month", "all_time"]),
)
@click.option(
"-t",
"--target-dir",
help="Target directory when downloading clips",
type=click.Path(
file_okay=False,
readable=False,
writable=True,
path_type=Path,
),
default=Path(),
)
@json_option
def clips(
channel_name: str,
all: bool,
compact: bool,
download: bool,
json: bool,
limit: Optional[int],
pager: Optional[int],
period: ClipsPeriod,
target_dir: Path,
):
"""List or download clips for given CHANNEL_NAME."""
from twitchdl.commands.clips import clips
if not target_dir.exists():
target_dir.mkdir(parents=True, exist_ok=True)
clips(
channel_name,
all=all,
compact=compact,
download=download,
json=json,
limit=limit,
pager=pager,
period=period,
target_dir=target_dir,
)
@cli.command()
@click.argument("ids", nargs=-1)
@click.option(
"-a",
"--auth-token",
help="""Authentication token, passed to Twitch to access subscriber only
VODs. Can be copied from the `auth_token` cookie in any browser logged
in on Twitch.""",
)
@click.option(
"-c",
"--chapter",
help="""Download a single chapter of the video. Specify the chapter number
or use the flag without a number to display a chapter select prompt.
""",
type=int,
is_flag=False,
flag_value=0,
)
@click.option(
"--concat",
is_flag=True,
help="""Do not use ffmpeg to join files, concat them instead. This will
produce a .ts file by default.""",
)
@click.option(
"-d",
"--dry-run",
help="Simulate the download provcess without actually downloading any files.",
is_flag=True,
)
@click.option(
"-e",
"--end",
help="Download video up to this time (hh:mm or hh:mm:ss)",
callback=validate_time,
)
@click.option(
"-f",
"--format",
help="""Video format to convert into, passed to ffmpeg as the target file
extension. Defaults to `mkv`. If `--concat` is passed, defaults to
`ts`.""",
)
@click.option(
"-k",
"--keep",
help="Don't delete downloaded VODs and playlists after merging.",
is_flag=True,
)
@click.option(
"--no-join",
help="Don't run ffmpeg to join the downloaded vods, implies --keep.",
is_flag=True,
)
@click.option(
"--overwrite",
help="Overwrite the target file if it already exists without prompting.",
is_flag=True,
)
@click.option(
"-o",
"--output",
help="Output file name template. See docs for details.",
default=DEFAULT_OUTPUT_TEMPLATE,
)
@click.option(
"-q",
"--quality",
help="Video quality, e.g. `720p`. Set to `source` to get best quality.",
)
@click.option(
"-r",
"--rate-limit",
help="""Limit the maximum download speed in bytes per second. Use 'k' and
'm' suffixes for kbps and mbps.""",
callback=validate_rate,
)
@click.option(
"-s",
"--start",
help="Download video from this time (hh:mm or hh:mm:ss)",
callback=validate_time,
)
@click.option(
"-w",
"--max-workers",
help="Number of workers for downloading vods concurrently",
type=int,
default=5,
)
def download(
ids: Tuple[str, ...],
auth_token: Optional[str],
chapter: Optional[int],
concat: bool,
dry_run: bool,
end: Optional[int],
format: str,
keep: bool,
no_join: bool,
overwrite: bool,
output: str,
quality: Optional[str],
rate_limit: Optional[int],
start: Optional[int],
max_workers: int,
):
"""Download videos or clips.
Pass one or more video ID, clip slug or Twitch URL to download.
"""
from twitchdl.commands.download import download
if not format:
format = "ts" if concat else "mkv"
options = DownloadOptions(
auth_token=auth_token,
chapter=chapter,
concat=concat,
dry_run=dry_run,
end=end,
format=format,
keep=keep,
no_join=no_join,
overwrite=overwrite,
output=output,
quality=quality,
rate_limit=rate_limit,
start=start,
max_workers=max_workers,
)
download(list(ids), options)
@cli.command()
def env():
"""Print environment information for inclusion in bug reports."""
click.echo(f"twitch-dl {__version__}")
click.echo(f"Python {sys.version}")
click.echo(f"Platform: {platform.platform()}")
@cli.command()
@click.argument("id")
@json_option
def info(id: str, json: bool):
"""Print information for a given Twitch URL, video ID or clip slug."""
from twitchdl.commands.info import info
info(id, json=json)
@cli.command()
@click.argument("channel_name")
@click.option(
"-a",
"--all",
help="Fetch all clips, overrides --limit",
is_flag=True,
)
@click.option(
"-c",
"--compact",
help="Show videos in compact mode, one line per video",
is_flag=True,
)
@click.option(
"-l",
"--limit",
help="Number of videos to fetch. Defaults to 40 in compact mode, 10 otherwise.",
type=int,
callback=validate_positive,
)
@click.option(
"-p",
"--pager",
help="Number of videos to show per page. Disabled by default.",
type=int,
callback=validate_positive,
is_flag=False,
flag_value=10,
)
@click.option(
"-g",
"--game",
"games_tuple",
help="Show videos of given game (can be given multiple times)",
multiple=True,
)
@click.option(
"-s",
"--sort",
help="Sorting order of videos",
default="time",
type=click.Choice(["views", "time"]),
)
@click.option(
"-t",
"--type",
help="Broadcast type",
default="archive",
type=click.Choice(["archive", "highlight", "upload"]),
)
@json_option
def videos(
channel_name: str,
all: bool,
compact: bool,
games_tuple: Tuple[str, ...],
json: bool,
limit: Optional[int],
pager: Optional[int],
sort: VideosSort,
type: VideosType,
):
"""List or download clips for given CHANNEL_NAME."""
from twitchdl.commands.videos import videos
# Click provides a tuple, make it a list instead
games = list(games_tuple)
videos(
channel_name,
all=all,
compact=compact,
games=games,
json=json,
limit=limit,
pager=pager,
sort=sort,
type=type,
)

View File

@ -1,13 +0,0 @@
from .clips import clips
from .download import download
from .env import env
from .info import info
from .videos import videos
__all__ = [
clips,
download,
env,
info,
videos,
]

View File

@ -1,110 +1,109 @@
import re
import sys
from itertools import islice
from os import path
from pathlib import Path
from typing import Callable, Generator, List, Optional
import click
from twitchdl import twitch, utils
from twitchdl.commands.download import get_clip_authenticated_url
from twitchdl.download import download_file
from twitchdl.output import print_out, print_clip, print_json
from twitchdl.entities import VideoQuality
from twitchdl.http import download_file
from twitchdl.output import green, print_clip, print_clip_compact, print_json, print_paged, yellow
from twitchdl.twitch import Clip, ClipsPeriod
def clips(args):
def clips(
channel_name: str,
*,
all: bool = False,
compact: bool = False,
download: bool = False,
json: bool = False,
limit: Optional[int] = None,
pager: Optional[int] = None,
period: ClipsPeriod = "all_time",
target_dir: Path = Path(),
):
# Set different defaults for limit for compact display
default_limit = 40 if compact else 10
# Ignore --limit if --pager or --all are given
limit = sys.maxsize if args.all or args.pager else args.limit
limit = sys.maxsize if all or pager else (limit or default_limit)
generator = twitch.channel_clips_generator(args.channel_name, args.period, limit)
generator = twitch.channel_clips_generator(channel_name, period, limit)
if args.json:
if json:
return print_json(list(generator))
if args.download:
return _download_clips(generator)
if download:
return _download_clips(target_dir, generator)
if args.pager:
print(args)
return _print_paged(generator, args.pager)
print_fn = print_clip_compact if compact else print_clip
return _print_all(generator, args)
if pager:
return print_paged("Clips", generator, print_fn, pager)
return _print_all(generator, print_fn, all)
def _continue():
print_out("Press <green><b>Enter</green> to continue, <yellow><b>Ctrl+C</yellow> to break.")
try:
input()
except KeyboardInterrupt:
return False
return True
def _target_filename(clip):
url = clip["videoQualities"][0]["sourceURL"]
def _target_filename(clip: Clip, video_qualities: List[VideoQuality]):
url = video_qualities[0]["sourceURL"]
_, ext = path.splitext(url)
ext = ext.lstrip(".")
match = re.search(r"^(\d{4})-(\d{2})-(\d{2})T", clip["createdAt"])
if not match:
raise ValueError(f"Failed parsing date from: {clip['createdAt']}")
date = "".join(match.groups())
name = "_".join([
date,
clip["id"],
clip["broadcaster"]["login"],
utils.slugify(clip["title"]),
])
name = "_".join(
[
date,
clip["id"],
clip["broadcaster"]["login"],
utils.slugify(clip["title"]),
]
)
return "{}.{}".format(name, ext)
return f"{name}.{ext}"
def _download_clips(generator):
def _download_clips(target_dir: Path, generator: Generator[Clip, None, None]):
if not target_dir.exists():
target_dir.mkdir(parents=True, exist_ok=True)
for clip in generator:
target = _target_filename(clip)
# videoQualities can be null in some circumstances, see:
# https://github.com/ihabunek/twitch-dl/issues/160
if not clip["videoQualities"]:
continue
if path.exists(target):
print_out("Already downloaded: <green>{}</green>".format(target))
target = target_dir / _target_filename(clip, clip["videoQualities"])
if target.exists():
click.echo(f"Already downloaded: {green(target)}")
else:
url = get_clip_authenticated_url(clip["slug"], "source")
print_out("Downloading: <yellow>{}</yellow>".format(target))
download_file(url, target)
try:
url = get_clip_authenticated_url(clip["slug"], "source")
click.echo(f"Downloading: {yellow(target)}")
download_file(url, target)
except Exception as ex:
click.secho(ex, err=True, fg="red")
def _print_all(generator, args):
def _print_all(
generator: Generator[Clip, None, None],
print_fn: Callable[[Clip], None],
all: bool,
):
for clip in generator:
print_out()
print_clip(clip)
print_fn(clip)
if not args.all:
print_out(
"\n<dim>There may be more clips. " +
"Increase the --limit, use --all or --pager to see the rest.</dim>"
if not all:
click.secho(
"\nThere may be more clips. "
+ "Increase the --limit, use --all or --pager to see the rest.",
dim=True,
)
def _print_paged(generator, page_size):
iterator = iter(generator)
page = list(islice(iterator, page_size))
first = 1
last = first + len(page) - 1
while True:
print_out("-" * 80)
print_out()
for clip in page:
print_clip(clip)
print_out()
last = first + len(page) - 1
print_out("-" * 80)
print_out("<yellow>Clips {}-{}</yellow>".format(first, last))
first = first + len(page)
last = first + 1
page = list(islice(iterator, page_size))
if not page or not _continue():
break

View File

@ -1,176 +1,44 @@
import asyncio
import httpx
import m3u8
import os
import platform
import re
import shlex
import shutil
import subprocess
import tempfile
from os import path
from pathlib import Path
from typing import List, Optional, OrderedDict
from urllib.parse import urlparse, urlencode
from typing import List, Optional
from urllib.parse import urlencode, urlparse
import click
import httpx
from twitchdl import twitch, utils
from twitchdl.download import download_file
from twitchdl.entities import DownloadOptions
from twitchdl.exceptions import ConsoleError
from twitchdl.http import download_all
from twitchdl.output import print_out
from twitchdl.http import download_all, download_file
from twitchdl.naming import clip_filename, video_filename
from twitchdl.output import blue, bold, green, print_log, yellow
from twitchdl.playlists import (
enumerate_vods,
get_init_sections,
load_m3u8,
make_join_playlist,
parse_playlists,
select_playlist,
)
from twitchdl.twitch import Chapter, ClipAccessToken, Video
def _parse_playlists(playlists_m3u8):
playlists = m3u8.loads(playlists_m3u8)
def download(ids: List[str], args: DownloadOptions):
if not ids:
print_log("No IDs to downlad given")
return
for p in sorted(playlists.playlists, key=lambda p: p.stream_info.resolution is None):
if p.stream_info.resolution:
name = p.media[0].name
description = "x".join(str(r) for r in p.stream_info.resolution)
else:
name = p.media[0].group_id
description = None
yield name, description, p.uri
def _get_playlist_by_name(playlists, quality):
if quality == "source":
_, _, uri = playlists[0]
return uri
for name, _, uri in playlists:
if name == quality:
return uri
available = ", ".join([name for (name, _, _) in playlists])
msg = "Quality '{}' not found. Available qualities are: {}".format(quality, available)
raise ConsoleError(msg)
def _select_playlist_interactive(playlists):
print_out("\nAvailable qualities:")
for n, (name, resolution, uri) in enumerate(playlists):
if resolution:
print_out("{}) <b>{}</b> <dim>({})</dim>".format(n + 1, name, resolution))
else:
print_out("{}) <b>{}</b>".format(n + 1, name))
no = utils.read_int("Choose quality", min=1, max=len(playlists) + 1, default=1)
_, _, uri = playlists[no - 1]
return uri
def _join_vods(playlist_path, target, overwrite, video):
command = [
"ffmpeg",
"-i", playlist_path,
"-c", "copy",
"-metadata", "artist={}".format(video["creator"]["displayName"]),
"-metadata", "title={}".format(video["title"]),
"-metadata", "encoded_by=twitch-dl",
"-stats",
"-loglevel", "warning",
"file:{}".format(target),
]
if overwrite:
command.append("-y")
print_out("<dim>{}</dim>".format(" ".join(command)))
result = subprocess.run(command)
if result.returncode != 0:
raise ConsoleError("Joining files failed")
def _video_target_filename(video, args):
date, time = video['publishedAt'].split("T")
game = video["game"]["name"] if video["game"] else "Unknown"
subs = {
"channel": video["creator"]["displayName"],
"channel_login": video["creator"]["login"],
"date": date,
"datetime": video["publishedAt"],
"format": args.format,
"game": game,
"game_slug": utils.slugify(game),
"id": video["id"],
"time": time,
"title": utils.titlify(video["title"]),
"title_slug": utils.slugify(video["title"]),
}
try:
return args.output.format(**subs)
except KeyError as e:
supported = ", ".join(subs.keys())
raise ConsoleError("Invalid key {} used in --output. Supported keys are: {}".format(e, supported))
def _clip_target_filename(clip, args):
date, time = clip["createdAt"].split("T")
game = clip["game"]["name"] if clip["game"] else "Unknown"
url = clip["videoQualities"][0]["sourceURL"]
_, ext = path.splitext(url)
ext = ext.lstrip(".")
subs = {
"channel": clip["broadcaster"]["displayName"],
"channel_login": clip["broadcaster"]["login"],
"date": date,
"datetime": clip["createdAt"],
"format": ext,
"game": game,
"game_slug": utils.slugify(game),
"id": clip["id"],
"slug": clip["slug"],
"time": time,
"title": utils.titlify(clip["title"]),
"title_slug": utils.slugify(clip["title"]),
}
try:
return args.output.format(**subs)
except KeyError as e:
supported = ", ".join(subs.keys())
raise ConsoleError("Invalid key {} used in --output. Supported keys are: {}".format(e, supported))
def _get_vod_paths(playlist, start: Optional[int], end: Optional[int]) -> List[str]:
"""Extract unique VOD paths for download from playlist."""
files = []
vod_start = 0
for segment in playlist.segments:
vod_end = vod_start + segment.duration
# `vod_end > start` is used here becuase it's better to download a bit
# more than a bit less, similar for the end condition
start_condition = not start or vod_end > start
end_condition = not end or vod_start < end
if start_condition and end_condition and segment.uri not in files:
files.append(segment.uri)
vod_start = vod_end
return files
def _crete_temp_dir(base_uri: str) -> str:
"""Create a temp dir to store downloads if it doesn't exist."""
path = urlparse(base_uri).path.lstrip("/")
temp_dir = Path(tempfile.gettempdir(), "twitch-dl", path)
temp_dir.mkdir(parents=True, exist_ok=True)
return str(temp_dir)
def download(args):
for video_id in args.videos:
for video_id in ids:
download_one(video_id, args)
def download_one(video: str, args):
def download_one(video: str, args: DownloadOptions):
video_id = utils.parse_video_identifier(video)
if video_id:
return _download_video(video_id, args)
@ -179,11 +47,62 @@ def download_one(video: str, args):
if clip_slug:
return _download_clip(clip_slug, args)
raise ConsoleError("Invalid input: {}".format(video))
raise ConsoleError(f"Invalid input: {video}")
def _get_clip_url(clip, quality):
qualities = clip["videoQualities"]
def _join_vods(playlist_path: Path, target: Path, overwrite: bool, video: Video):
description = video["description"] or ""
description = description.strip()
command: List[str] = [
"ffmpeg",
"-i",
str(playlist_path),
"-c",
"copy",
"-metadata",
f"artist={video['creator']['displayName']}",
"-metadata",
f"title={video['title']}",
"-metadata",
f"description={description}",
"-metadata",
"encoded_by=twitch-dl",
"-stats",
"-loglevel",
"warning",
f"file:{target}",
]
if overwrite:
command.append("-y")
click.secho(f"{shlex.join(command)}", dim=True)
result = subprocess.run(command)
if result.returncode != 0:
raise ConsoleError("Joining files failed")
def _concat_vods(vod_paths: List[Path], target: Path):
tool = "type" if platform.system() == "Windows" else "cat"
command = [tool] + [str(p) for p in vod_paths]
with open(target, "wb") as target_file:
result = subprocess.run(command, stdout=target_file)
if result.returncode != 0:
raise ConsoleError(f"Joining files failed: {result.stderr}")
def _crete_temp_dir(base_uri: str) -> Path:
"""Create a temp dir to store downloads if it doesn't exist."""
path = urlparse(base_uri).path.lstrip("/")
temp_dir = Path(tempfile.gettempdir(), "twitch-dl", path)
temp_dir.mkdir(parents=True, exist_ok=True)
return temp_dir
def _get_clip_url(access_token: ClipAccessToken, quality: Optional[str]) -> str:
qualities = access_token["videoQualities"]
# Quality given as an argument
if quality:
@ -196,162 +115,181 @@ def _get_clip_url(clip, quality):
return q["sourceURL"]
available = ", ".join([str(q["quality"]) for q in qualities])
msg = "Quality '{}' not found. Available qualities are: {}".format(quality, available)
msg = f"Quality '{quality}' not found. Available qualities are: {available}"
raise ConsoleError(msg)
# Ask user to select quality
print_out("\nAvailable qualities:")
click.echo("\nAvailable qualities:")
for n, q in enumerate(qualities):
print_out("{}) {} [{} fps]".format(n + 1, q["quality"], q["frameRate"]))
print_out()
click.echo(f"{n + 1}) {bold(q['quality'])} [{q['frameRate']} fps]")
click.echo()
no = utils.read_int("Choose quality", min=1, max=len(qualities), default=1)
selected_quality = qualities[no - 1]
return selected_quality["sourceURL"]
def get_clip_authenticated_url(slug, quality):
print_out("<dim>Fetching access token...</dim>")
def get_clip_authenticated_url(slug: str, quality: Optional[str]):
print_log("Fetching access token...")
access_token = twitch.get_clip_access_token(slug)
if not access_token:
raise ConsoleError("Access token not found for slug '{}'".format(slug))
raise ConsoleError(f"Access token not found for slug '{slug}'")
url = _get_clip_url(access_token, quality)
query = urlencode({
"sig": access_token["playbackAccessToken"]["signature"],
"token": access_token["playbackAccessToken"]["value"],
})
query = urlencode(
{
"sig": access_token["playbackAccessToken"]["signature"],
"token": access_token["playbackAccessToken"]["value"],
}
)
return "{}?{}".format(url, query)
return f"{url}?{query}"
def _download_clip(slug: str, args) -> None:
print_out("<dim>Looking up clip...</dim>")
def _download_clip(slug: str, args: DownloadOptions) -> None:
print_log("Looking up clip...")
clip = twitch.get_clip(slug)
game = clip["game"]["name"] if clip["game"] else "Unknown"
if not clip:
raise ConsoleError("Clip '{}' not found".format(slug))
raise ConsoleError(f"Clip '{slug}' not found")
print_out("Found: <green>{}</green> by <yellow>{}</yellow>, playing <blue>{}</blue> ({})".format(
clip["title"],
clip["broadcaster"]["displayName"],
game,
utils.format_duration(clip["durationSeconds"])
))
title = clip["title"]
user = clip["broadcaster"]["displayName"]
game = clip["game"]["name"] if clip["game"] else "Unknown"
duration = utils.format_duration(clip["durationSeconds"])
click.echo(f"Found: {green(title)} by {yellow(user)}, playing {blue(game)} ({duration})")
target = _clip_target_filename(clip, args)
print_out("Target: <blue>{}</blue>".format(target))
target = Path(clip_filename(clip, args.output))
click.echo(f"Target: {blue(target)}")
if not args.overwrite and path.exists(target):
response = input("File exists. Overwrite? [Y/n]: ")
if response.lower().strip() not in ["", "y"]:
raise ConsoleError("Aborted")
if not args.overwrite and target.exists():
response = click.prompt("File exists. Overwrite? [Y/n]", default="Y", show_default=False)
if response.lower().strip() != "y":
raise click.Abort()
args.overwrite = True
url = get_clip_authenticated_url(slug, args.quality)
print_out("<dim>Selected URL: {}</dim>".format(url))
print_log(f"Selected URL: {url}")
print_out("<dim>Downloading clip...</dim>")
download_file(url, target)
print_out("Downloaded: <blue>{}</blue>".format(target))
if args.dry_run:
click.echo("Dry run, clip not downloaded.")
else:
print_log("Downloading clip...")
download_file(url, target)
click.echo(f"Downloaded: {blue(target)}")
def _download_video(video_id, args) -> None:
def _download_video(video_id: str, args: DownloadOptions) -> None:
if args.start and args.end and args.end <= args.start:
raise ConsoleError("End time must be greater than start time")
print_out("<dim>Looking up video...</dim>")
print_log("Looking up video...")
video = twitch.get_video(video_id)
if not video:
raise ConsoleError("Video {} not found".format(video_id))
raise ConsoleError(f"Video {video_id} not found")
print_out("Found: <blue>{}</blue> by <yellow>{}</yellow>".format(
video['title'], video['creator']['displayName']))
click.echo(f"Found: {blue(video['title'])} by {yellow(video['creator']['displayName'])}")
target = _video_target_filename(video, args)
print_out("Output: <blue>{}</blue>".format(target))
target = Path(video_filename(video, args.format, args.output))
click.echo(f"Output: {blue(target)}")
if not args.overwrite and path.exists(target):
response = input("File exists. Overwrite? [Y/n]: ")
if response.lower().strip() not in ["", "y"]:
raise ConsoleError("Aborted")
if not args.overwrite and target.exists():
response = click.prompt("File exists. Overwrite? [Y/n]", default="Y", show_default=False)
if response.lower().strip() != "y":
raise click.Abort()
args.overwrite = True
# Chapter select or manual offset
start, end = _determine_time_range(video_id, args)
print_out("<dim>Fetching access token...</dim>")
print_log("Fetching access token...")
access_token = twitch.get_access_token(video_id, auth_token=args.auth_token)
print_out("<dim>Fetching playlists...</dim>")
playlists_m3u8 = twitch.get_playlists(video_id, access_token)
playlists = list(_parse_playlists(playlists_m3u8))
playlist_uri = (_get_playlist_by_name(playlists, args.quality) if args.quality
else _select_playlist_interactive(playlists))
print_log("Fetching playlists...")
playlists_text = twitch.get_playlists(video_id, access_token)
playlists = parse_playlists(playlists_text)
playlist = select_playlist(playlists, args.quality)
print_out("<dim>Fetching playlist...</dim>")
response = httpx.get(playlist_uri)
response.raise_for_status()
playlist = m3u8.loads(response.text)
print_log("Fetching playlist...")
vods_text = http_get(playlist.url)
vods_m3u8 = load_m3u8(vods_text)
vods = enumerate_vods(vods_m3u8, start, end)
base_uri = re.sub("/[^/]+$", "/", playlist_uri)
target_dir = _crete_temp_dir(base_uri)
vod_paths = _get_vod_paths(playlist, start, end)
# Save playlists for debugging purposes
with open(path.join(target_dir, "playlists.m3u8"), "w") as f:
f.write(playlists_m3u8)
with open(path.join(target_dir, "playlist.m3u8"), "w") as f:
f.write(response.text)
print_out("\nDownloading {} VODs using {} workers to {}".format(
len(vod_paths), args.max_workers, target_dir))
sources = [base_uri + path for path in vod_paths]
targets = [os.path.join(target_dir, "{:05d}.ts".format(k)) for k, _ in enumerate(vod_paths)]
asyncio.run(download_all(sources, targets, args.max_workers, rate_limit=args.rate_limit))
# Make a modified playlist which references downloaded VODs
# Keep only the downloaded segments and skip the rest
org_segments = playlist.segments.copy()
path_map = OrderedDict(zip(vod_paths, targets))
playlist.segments.clear()
for segment in org_segments:
if segment.uri in path_map:
segment.uri = path_map[segment.uri]
playlist.segments.append(segment)
playlist_path = path.join(target_dir, "playlist_downloaded.m3u8")
playlist.dump(playlist_path)
if args.no_join:
print_out("\n\n<dim>Skipping joining files...</dim>")
print_out("VODs downloaded to:\n<blue>{}</blue>".format(target_dir))
if args.dry_run:
click.echo("Dry run, video not downloaded.")
return
print_out("\n\nJoining files...")
_join_vods(playlist_path, target, args.overwrite, video)
base_uri = re.sub("/[^/]+$", "/", playlist.url)
target_dir = _crete_temp_dir(base_uri)
# Save playlists for debugging purposes
with open(target_dir / "playlists.m3u8", "w") as f:
f.write(playlists_text)
with open(target_dir / "playlist.m3u8", "w") as f:
f.write(vods_text)
init_sections = get_init_sections(vods_m3u8)
for uri in init_sections:
print_log(f"Downloading init section {uri}...")
download_file(f"{base_uri}{uri}", target_dir / uri)
print_log(f"Downloading {len(vods)} VODs using {args.max_workers} workers to {target_dir}")
sources = [base_uri + vod.path for vod in vods]
targets = [target_dir / f"{vod.index:05d}.ts" for vod in vods]
asyncio.run(
download_all(
zip(sources, targets),
args.max_workers,
rate_limit=args.rate_limit,
count=len(vods),
)
)
join_playlist = make_join_playlist(vods_m3u8, vods, targets)
join_playlist_path = target_dir / "playlist_downloaded.m3u8"
join_playlist.dump(join_playlist_path) # type: ignore
click.echo()
if args.no_join:
print_log("Skipping joining files...")
click.echo(f"VODs downloaded to:\n{blue(target_dir)}")
return
if args.concat:
print_log("Concating files...")
_concat_vods(targets, target)
else:
print_log("Joining files...")
_join_vods(join_playlist_path, target, args.overwrite, video)
click.echo()
if args.keep:
print_out("\n<dim>Temporary files not deleted: {}</dim>".format(target_dir))
click.echo(f"Temporary files not deleted: {yellow(target_dir)}")
else:
print_out("\n<dim>Deleting temporary files...</dim>")
print_log("Deleting temporary files...")
shutil.rmtree(target_dir)
print_out("\nDownloaded: <green>{}</green>".format(target))
click.echo(f"Downloaded: {green(target)}")
def _determine_time_range(video_id, args):
def http_get(url: str) -> str:
response = httpx.get(url)
response.raise_for_status()
return response.text
def _determine_time_range(video_id: str, args: DownloadOptions):
if args.start or args.end:
return args.start, args.end
if args.chapter is not None:
print_out("<dim>Fetching chapters...</dim>")
print_log("Fetching chapters...")
chapters = twitch.get_video_chapters(video_id)
if not chapters:
@ -363,9 +301,11 @@ def _determine_time_range(video_id, args):
try:
chapter = chapters[args.chapter - 1]
except IndexError:
raise ConsoleError(f"Chapter {args.chapter} does not exist. This video has {len(chapters)} chapters.")
raise ConsoleError(
f"Chapter {args.chapter} does not exist. This video has {len(chapters)} chapters."
)
print_out(f'Chapter selected: <blue>{chapter["description"]}</blue>\n')
click.echo(f'Chapter selected: {blue(chapter["description"])}\n')
start = chapter["positionMilliseconds"] // 1000
duration = chapter["durationMilliseconds"] // 1000
return start, start + duration
@ -373,11 +313,11 @@ def _determine_time_range(video_id, args):
return None, None
def _choose_chapter_interactive(chapters):
print_out("\nChapters:")
def _choose_chapter_interactive(chapters: List[Chapter]):
click.echo("\nChapters:")
for index, chapter in enumerate(chapters):
duration = utils.format_time(chapter["durationMilliseconds"] // 1000)
print_out(f'{index + 1}) <b>{chapter["description"]}</b> <dim>({duration})</dim>')
click.echo(f'{index + 1}) {bold(chapter["description"])} ({duration})')
index = utils.read_int("Select a chapter", 1, len(chapters))
chapter = chapters[index - 1]
return chapter

View File

@ -1,9 +0,0 @@
import platform
import sys
import twitchdl
def env(args):
print("twitch-dl", twitchdl.__version__)
print("Platform:", platform.platform())
print("Python", sys.version)

View File

@ -1,18 +1,24 @@
from typing import List
import click
import m3u8
from twitchdl import utils, twitch
from twitchdl import twitch, utils
from twitchdl.exceptions import ConsoleError
from twitchdl.output import print_video, print_clip, print_json, print_out, print_log
from twitchdl.naming import video_placeholders
from twitchdl.output import bold, dim, print_clip, print_json, print_log, print_table, print_video
from twitchdl.playlists import parse_playlists
from twitchdl.twitch import Chapter, Clip, Video
def info(args):
video_id = utils.parse_video_identifier(args.video)
def info(id: str, *, json: bool = False):
video_id = utils.parse_video_identifier(id)
if video_id:
print_log("Fetching video...")
video = twitch.get_video(video_id)
if not video:
raise ConsoleError("Video {} not found".format(video_id))
raise ConsoleError(f"Video {video_id} not found")
print_log("Fetching access token...")
access_token = twitch.get_access_token(video_id)
@ -23,47 +29,61 @@ def info(args):
print_log("Fetching chapters...")
chapters = twitch.get_video_chapters(video_id)
if args.json:
if json:
video_json(video, playlists, chapters)
else:
video_info(video, playlists, chapters)
return
clip_slug = utils.parse_clip_identifier(args.video)
clip_slug = utils.parse_clip_identifier(id)
if clip_slug:
print_log("Fetching clip...")
clip = twitch.get_clip(clip_slug)
if not clip:
raise ConsoleError("Clip {} not found".format(clip_slug))
raise ConsoleError(f"Clip {clip_slug} not found")
if args.json:
if json:
print_json(clip)
else:
clip_info(clip)
return
raise ConsoleError("Invalid input: {}".format(args.video))
raise ConsoleError(f"Invalid input: {id}")
def video_info(video, playlists, chapters):
print_out()
def video_info(video: Video, playlists: str, chapters: List[Chapter]):
click.echo()
print_video(video)
print_out()
print_out("Playlists:")
for p in m3u8.loads(playlists).playlists:
print_out("<b>{}</b> {}".format(p.stream_info.video, p.uri))
click.echo("Playlists:\n")
playlist_headers = ["Name", "Group", "Resolution", "URL"]
playlist_data = [
[
f"{p.name} {dim('source')}" if p.is_source else p.name,
p.group_id,
f"{p.resolution}",
p.url,
]
for p in parse_playlists(playlists)
]
print_table(playlist_headers, playlist_data)
if chapters:
print_out()
print_out("Chapters:")
click.echo()
click.echo("Chapters:")
for chapter in chapters:
start = utils.format_time(chapter["positionMilliseconds"] // 1000, force_hours=True)
duration = utils.format_time(chapter["durationMilliseconds"] // 1000)
print_out(f'{start} <b>{chapter["description"]}</b> ({duration})')
click.echo(f'{start} {bold(chapter["description"])} ({duration})')
placeholders = video_placeholders(video, format="mkv")
placeholders = [[f"{{{k}}}", v] for k, v in placeholders.items()]
click.echo("")
print_table(["Placeholder", "Value"], placeholders)
def video_json(video, playlists, chapters):
def video_json(video: Video, playlists: str, chapters: List[Chapter]):
playlists = m3u8.loads(playlists).playlists
video["playlists"] = [
@ -72,8 +92,9 @@ def video_json(video, playlists, chapters):
"resolution": p.stream_info.resolution,
"codecs": p.stream_info.codecs,
"video": p.stream_info.video,
"uri": p.uri
} for p in playlists
"uri": p.uri,
}
for p in playlists
]
video["chapters"] = chapters
@ -81,11 +102,14 @@ def video_json(video, playlists, chapters):
print_json(video)
def clip_info(clip):
print_out()
def clip_info(clip: Clip):
click.echo()
print_clip(clip)
print_out()
print_out("Download links:")
click.echo()
click.echo("Download links:")
for q in clip["videoQualities"]:
print_out("<b>{quality}p{frameRate}</b> {sourceURL}".format(**q))
if clip["videoQualities"]:
for q in clip["videoQualities"]:
click.echo(f"{bold(q['quality'])} [{q['frameRate']} fps] {q['sourceURL']}")
else:
click.echo("No download URLs found")

View File

@ -1,69 +1,79 @@
import sys
from typing import List, Optional
import click
from twitchdl import twitch
from twitchdl.exceptions import ConsoleError
from twitchdl.output import print_out, print_paged_videos, print_video, print_json, print_video_compact
from twitchdl.output import print_json, print_log, print_paged, print_video, print_video_compact
def videos(args):
game_ids = _get_game_ids(args.game)
def videos(
channel_name: str,
*,
all: bool,
compact: bool,
games: List[str],
json: bool,
limit: Optional[int],
pager: Optional[int],
sort: twitch.VideosSort,
type: twitch.VideosType,
):
game_ids = get_game_ids(games)
# Set different defaults for limit for compact display
limit = args.limit or (40 if args.compact else 10)
limit = limit or (40 if compact else 10)
# Ignore --limit if --pager or --all are given
max_videos = sys.maxsize if args.all or args.pager else limit
max_videos = sys.maxsize if all or pager else limit
total_count, generator = twitch.channel_videos_generator(
args.channel_name, max_videos, args.sort, args.type, game_ids=game_ids)
channel_name, max_videos, sort, type, game_ids=game_ids
)
if args.json:
if json:
videos = list(generator)
print_json({
"count": len(videos),
"totalCount": total_count,
"videos": videos
})
print_json({"count": len(videos), "totalCount": total_count, "videos": videos})
return
if total_count == 0:
print_out("<yellow>No videos found</yellow>")
click.echo("No videos found")
return
if args.pager:
print_paged_videos(generator, args.pager, total_count)
if pager:
print_fn = print_video_compact if compact else print_video
print_paged("Videos", generator, print_fn, pager, total_count)
return
count = 0
for video in generator:
if args.compact:
if compact:
print_video_compact(video)
else:
print_out()
click.echo()
print_video(video)
count += 1
print_out()
print_out("-" * 80)
print_out("<yellow>Videos {}-{} of {}</yellow>".format(1, count, total_count))
click.echo()
click.echo("-" * 80)
click.echo(f"Videos 1-{count} of {total_count}")
if total_count > count:
print_out()
print_out(
"<dim>There are more videos. Increase the --limit, use --all or --pager to see the rest.</dim>"
click.secho(
"\nThere are more videos. "
+ "Increase the --limit, use --all or --pager to see the rest.",
dim=True,
)
def _get_game_ids(names):
if not names:
return []
def get_game_ids(names: List[str]) -> List[str]:
return [get_game_id(name) for name in names]
game_ids = []
for name in names:
print_out("<dim>Looking up game '{}'...</dim>".format(name))
game_id = twitch.get_game_id(name)
if not game_id:
raise ConsoleError("Game '{}' not found".format(name))
game_ids.append(int(game_id))
return game_ids
def get_game_id(name: str) -> str:
print_log(f"Looking up game '{name}'...")
game_id = twitch.get_game_id(name)
if not game_id:
raise ConsoleError(f"Game '{name}' not found")
return game_id

View File

@ -1,332 +0,0 @@
# -*- coding: utf-8 -*-
import logging
import sys
import re
from argparse import ArgumentParser, ArgumentTypeError
from typing import NamedTuple, List, Tuple, Any, Dict
from twitchdl.exceptions import ConsoleError
from twitchdl.output import print_err
from twitchdl.twitch import GQLError
from . import commands, __version__
Argument = Tuple[List[str], Dict[str, Any]]
class Command(NamedTuple):
name: str
description: str
arguments: List[Argument]
CLIENT_WEBSITE = "https://twitch-dl.bezdomni.net/"
def time(value: str) -> int:
"""Parse a time string (hh:mm or hh:mm:ss) to number of seconds."""
parts = [int(p) for p in value.split(":")]
if not 2 <= len(parts) <= 3:
raise ArgumentTypeError()
hours = parts[0]
minutes = parts[1]
seconds = parts[2] if len(parts) > 2 else 0
if hours < 0 or not (0 <= minutes <= 59) or not (0 <= seconds <= 59):
raise ArgumentTypeError()
return hours * 3600 + minutes * 60 + seconds
def pos_integer(value: str) -> int:
try:
parsed = int(value)
except ValueError:
raise ArgumentTypeError("must be an integer")
if parsed < 1:
raise ArgumentTypeError("must be positive")
return parsed
def rate(value: str) -> int:
match = re.search(r"^([0-9]+)(k|m|)$", value, flags=re.IGNORECASE)
if not match:
raise ArgumentTypeError("must be an integer, followed by an optional 'k' or 'm'")
amount = int(match.group(1))
unit = match.group(2)
if unit == "k":
return amount * 1024
if unit == "m":
return amount * 1024 * 1024
return amount
COMMANDS = [
Command(
name="videos",
description="List videos for a channel.",
arguments=[
(["channel_name"], {
"help": "Name of the channel to list videos for.",
"type": str,
}),
(["-g", "--game"], {
"help": "Show videos of given game (can be given multiple times)",
"action": "append",
"type": str,
}),
(["-l", "--limit"], {
"help": "Number of videos to fetch. Defaults to 40 in copmpact mode, 10 otherwise.",
"type": pos_integer,
}),
(["-a", "--all"], {
"help": "Fetch all videos, overrides --limit",
"action": "store_true",
"default": False,
}),
(["-s", "--sort"], {
"help": "Sorting order of videos. Defaults to `time`.",
"type": str,
"choices": ["views", "time"],
"default": "time",
}),
(["-t", "--type"], {
"help": "Broadcast type. Defaults to `archive`.",
"type": str,
"choices": ["archive", "highlight", "upload"],
"default": "archive",
}),
(["-j", "--json"], {
"help": "Show results as JSON. Ignores `--pager`.",
"action": "store_true",
"default": False,
}),
(["-p", "--pager"], {
"help": "Print videos in pages. Ignores `--limit`. Defaults to 10.",
"type": pos_integer,
"nargs": "?",
"const": 10,
}),
(["-c", "--compact"], {
"help": "Show videos in compact mode, one line per video",
"action": "store_true",
"default": False,
}),
],
),
Command(
name="clips",
description="List or download clips for a channel.",
arguments=[
(["channel_name"], {
"help": "Name of the channel to list clips for.",
"type": str,
}),
(["-l", "--limit"], {
"help": "Number of videos to fetch (default 10, max 100)",
"type": pos_integer,
"default": 10,
}),
(["-a", "--all"], {
"help": "Fetch all videos, overrides --limit",
"action": "store_true",
"default": False,
}),
(["-P", "--period"], {
"help": "Period from which to return clips. Defaults to `all_time`.",
"type": str,
"choices": ["last_day", "last_week", "last_month", "all_time"],
"default": "all_time",
}),
(["-j", "--json"], {
"help": "Show results as JSON. Ignores `--pager`.",
"action": "store_true",
"default": False,
}),
(["-p", "--pager"], {
"help": "Number of clips to show per page. Disabled by default.",
"type": pos_integer,
"nargs": "?",
"const": 10,
}),
(["-d", "--download"], {
"help": "Download all videos in given period (in source quality)",
"action": "store_true",
"default": False,
}),
],
),
Command(
name="download",
description="Download videos or clips.",
arguments=[
(["videos"], {
"help": "One or more video ID, clip slug or twitch URL to download.",
"type": str,
"nargs": "+",
}),
(["-w", "--max-workers"], {
"help": "Number of workers for downloading vods concurrently (default 5)",
"type": int,
"default": 5,
}),
(["-s", "--start"], {
"help": "Download video from this time (hh:mm or hh:mm:ss)",
"type": time,
"default": None,
}),
(["-e", "--end"], {
"help": "Download video up to this time (hh:mm or hh:mm:ss)",
"type": time,
"default": None,
}),
(["-f", "--format"], {
"help": "Video format to convert into, passed to ffmpeg as the "
"target file extension. Defaults to `mkv`.",
"type": str,
"default": "mkv",
}),
(["-k", "--keep"], {
"help": "Don't delete downloaded VODs and playlists after merging.",
"action": "store_true",
"default": False,
}),
(["-q", "--quality"], {
"help": "Video quality, e.g. 720p. Set to 'source' to get best quality.",
"type": str,
}),
(["-a", "--auth-token"], {
"help": "Authentication token, passed to Twitch to access subscriber only "
"VODs. Can be copied from the 'auth_token' cookie in any browser "
"logged in on Twitch.",
"type": str,
"default": None,
}),
(["--no-join"], {
"help": "Don't run ffmpeg to join the downloaded vods, implies --keep.",
"action": "store_true",
"default": False,
}),
(["--overwrite"], {
"help": "Overwrite the target file if it already exists without prompting.",
"action": "store_true",
"default": False,
}),
(["-o", "--output"], {
"help": "Output file name template. See docs for details.",
"type": str,
"default": "{date}_{id}_{channel_login}_{title_slug}.{format}"
}),
(["-r", "--rate-limit"], {
"help": "Limit the maximum download speed in bytes per second. "
"Use 'k' and 'm' suffixes for kbps and mbps.",
"type": rate,
}),
(["-c", "--chapter"], {
"help": "Download a single chapter of the video. Specify the chapter number or "
"use the flag without a number to display a chapter select prompt.",
"type": int,
"nargs": "?",
"const": 0
}),
],
),
Command(
name="info",
description="Print information for a given Twitch URL, video ID or clip slug.",
arguments=[
(["video"], {
"help": "Video ID, clip slug, or URL",
"type": str,
}),
(["-j", "--json"], {
"help": "Show results as JSON",
"action": "store_true",
"default": False,
}),
],
),
Command(
name="env",
description="Print environment information for inclusion in bug reports.",
arguments=[],
)
]
COMMON_ARGUMENTS = [
(["--debug"], {
"help": "show debug log in console",
"action": 'store_true',
"default": False,
}),
(["--no-color"], {
"help": "disable ANSI colors in output",
"action": 'store_true',
"default": False,
})
]
def get_parser():
description = "A script for downloading videos from Twitch"
parser = ArgumentParser(prog='twitch-dl', description=description, epilog=CLIENT_WEBSITE)
parser.add_argument("--version", help="show version number", action='store_true')
subparsers = parser.add_subparsers(title="commands")
for command in COMMANDS:
sub = subparsers.add_parser(
command.name,
description=command.description,
epilog=CLIENT_WEBSITE
)
# Set the function to call to the function of same name in the "commands" package
sub.set_defaults(func=commands.__dict__.get(command.name))
for args, kwargs in command.arguments + COMMON_ARGUMENTS:
sub.add_argument(*args, **kwargs)
return parser
def main():
parser = get_parser()
args = parser.parse_args()
if "--debug" in sys.argv:
logging.basicConfig(level=logging.DEBUG)
if args.version:
print("twitch-dl v{}".format(__version__))
return
if "func" not in args:
parser.print_help()
return
try:
args.func(args)
except ConsoleError as e:
print_err(e)
sys.exit(1)
except KeyboardInterrupt:
print_err("\nOperation canceled")
sys.exit(1)
except GQLError as e:
print_err(e)
for err in e.errors:
print_err("*", err["message"])
sys.exit(1)

View File

@ -1,38 +0,0 @@
import os
import httpx
CHUNK_SIZE = 1024
CONNECT_TIMEOUT = 5
RETRY_COUNT = 5
class DownloadFailed(Exception):
pass
def _download(url: str, path: str):
tmp_path = path + ".tmp"
size = 0
with httpx.stream("GET", url, timeout=CONNECT_TIMEOUT) as response:
with open(tmp_path, "wb") as target:
for chunk in response.iter_bytes(chunk_size=CHUNK_SIZE):
target.write(chunk)
size += len(chunk)
os.rename(tmp_path, path)
return size
def download_file(url: str, path: str, retries: int = RETRY_COUNT):
if os.path.exists(path):
from_disk = True
return (os.path.getsize(path), from_disk)
from_disk = False
for _ in range(retries):
try:
return (_download(url, path), from_disk)
except httpx.RequestError:
pass
raise DownloadFailed(":(")

92
twitchdl/entities.py Normal file
View File

@ -0,0 +1,92 @@
from dataclasses import dataclass
from typing import Any, List, Literal, Mapping, Optional, TypedDict
@dataclass
class DownloadOptions:
auth_token: Optional[str]
chapter: Optional[int]
concat: bool
dry_run: bool
end: Optional[int]
format: str
keep: bool
no_join: bool
overwrite: bool
output: str
quality: Optional[str]
rate_limit: Optional[int]
start: Optional[int]
max_workers: int
ClipsPeriod = Literal["last_day", "last_week", "last_month", "all_time"]
VideosSort = Literal["views", "time"]
VideosType = Literal["archive", "highlight", "upload"]
class AccessToken(TypedDict):
signature: str
value: str
class User(TypedDict):
login: str
displayName: str
class Game(TypedDict):
id: str
name: str
class VideoQuality(TypedDict):
frameRate: str
quality: str
sourceURL: str
class ClipAccessToken(TypedDict):
id: str
playbackAccessToken: AccessToken
videoQualities: List[VideoQuality]
class Clip(TypedDict):
id: str
slug: str
title: str
createdAt: str
viewCount: int
durationSeconds: int
url: str
videoQualities: Optional[List[VideoQuality]]
game: Game
broadcaster: User
class Video(TypedDict):
id: str
title: str
description: str
publishedAt: str
broadcastType: str
lengthSeconds: int
game: Game
creator: User
class Chapter(TypedDict):
id: str
durationMilliseconds: int
positionMilliseconds: int
type: str
description: str
subDescription: str
thumbnailURL: str
game: Game
# Type for annotating decoded JSON
# TODO: make data classes for common structs
Data = Mapping[str, Any]

View File

@ -1,4 +1,7 @@
import click
class ConsoleError(Exception):
class ConsoleError(click.ClickException):
"""Raised when an error occurs and script exectuion should halt."""
pass

View File

@ -1,11 +1,14 @@
import asyncio
import httpx
import logging
import os
import time
from abc import ABC, abstractmethod
from pathlib import Path
from typing import Iterable, Optional, Tuple
from typing import List, Optional, Union
import httpx
from twitchdl.exceptions import ConsoleError
from twitchdl.progress import Progress
logger = logging.getLogger(__name__)
@ -25,7 +28,13 @@ https://www.python-httpx.org/advanced/#timeout-configuration
"""
class TokenBucket:
class TokenBucket(ABC):
@abstractmethod
def advance(self, size: int):
pass
class LimitingTokenBucket(TokenBucket):
"""Limit the download speed by strategically inserting sleeps."""
def __init__(self, rate: int, capacity: Optional[int] = None):
@ -53,22 +62,20 @@ class TokenBucket:
self.last_refilled = now
class EndlessTokenBucket:
class EndlessTokenBucket(TokenBucket):
"""Used when download speed is not limited."""
def advance(self, size: int):
pass
AnyTokenBucket = Union[TokenBucket, EndlessTokenBucket]
async def download(
client: httpx.AsyncClient,
task_id: int,
source: str,
target: str,
target: Path,
progress: Progress,
token_bucket: AnyTokenBucket,
token_bucket: TokenBucket,
):
# Download to a temp file first, then copy to target when over to avoid
# getting saving chunks which may persist if canceled or --keep is used
@ -91,12 +98,12 @@ async def download_with_retries(
semaphore: asyncio.Semaphore,
task_id: int,
source: str,
target: str,
target: Path,
progress: Progress,
token_bucket: AnyTokenBucket,
token_bucket: TokenBucket,
):
async with semaphore:
if os.path.exists(target):
if target.exists():
size = os.path.getsize(target)
progress.already_downloaded(task_id, size)
return
@ -114,16 +121,56 @@ async def download_with_retries(
async def download_all(
sources: List[str],
targets: List[str],
source_targets: Iterable[Tuple[str, Path]],
workers: int,
*,
rate_limit: Optional[int] = None
count: Optional[int] = None,
rate_limit: Optional[int] = None,
):
progress = Progress(len(sources))
token_bucket = TokenBucket(rate_limit) if rate_limit else EndlessTokenBucket()
progress = Progress(count)
token_bucket = LimitingTokenBucket(rate_limit) if rate_limit else EndlessTokenBucket()
async with httpx.AsyncClient(timeout=TIMEOUT) as client:
semaphore = asyncio.Semaphore(workers)
tasks = [download_with_retries(client, semaphore, task_id, source, target, progress, token_bucket)
for task_id, (source, target) in enumerate(zip(sources, targets))]
tasks = [
download_with_retries(
client,
semaphore,
task_id,
source,
target,
progress,
token_bucket,
)
for task_id, (source, target) in enumerate(source_targets)
]
await asyncio.gather(*tasks)
def download_file(url: str, target: Path, retries: int = RETRY_COUNT) -> None:
"""Download URL to given target path with retries"""
error_message = ""
for r in range(retries):
try:
retry_info = f" (retry {r})" if r > 0 else ""
logger.info(f"Downloading {url} to {target}{retry_info}")
return _do_download_file(url, target)
except httpx.HTTPStatusError as ex:
logger.error(ex)
error_message = f"Server responded with HTTP {ex.response.status_code}"
except httpx.RequestError as ex:
logger.error(ex)
error_message = str(ex)
raise ConsoleError(f"Failed downloading after {retries} attempts: {error_message}")
def _do_download_file(url: str, target: Path) -> None:
tmp_path = Path(str(target) + ".tmp")
with httpx.stream("GET", url, timeout=TIMEOUT, follow_redirects=True) as response:
response.raise_for_status()
with open(tmp_path, "wb") as f:
for chunk in response.iter_bytes(chunk_size=CHUNK_SIZE):
f.write(chunk)
os.rename(tmp_path, target)

72
twitchdl/naming.py Normal file
View File

@ -0,0 +1,72 @@
import os
from typing import Dict
from twitchdl import utils
from twitchdl.entities import Clip, Video
from twitchdl.exceptions import ConsoleError
DEFAULT_OUTPUT_TEMPLATE = "{date}_{id}_{channel_login}_{title_slug}.{format}"
def video_filename(video: Video, format: str, output: str) -> str:
subs = video_placeholders(video, format)
return _format(output, subs)
def video_placeholders(video: Video, format: str) -> Dict[str, str]:
date, time = video["publishedAt"].split("T")
game = video["game"]["name"] if video["game"] else "Unknown"
return {
"channel": video["creator"]["displayName"],
"channel_login": video["creator"]["login"],
"date": date,
"datetime": video["publishedAt"],
"format": format,
"game": game,
"game_slug": utils.slugify(game),
"id": video["id"],
"time": time,
"title": utils.titlify(video["title"]),
"title_slug": utils.slugify(video["title"]),
}
def clip_filename(clip: Clip, output: str):
subs = clip_placeholders(clip)
return _format(output, subs)
def clip_placeholders(clip: Clip) -> Dict[str, str]:
date, time = clip["createdAt"].split("T")
game = clip["game"]["name"] if clip["game"] else "Unknown"
if clip["videoQualities"]:
url = clip["videoQualities"][0]["sourceURL"]
_, ext = os.path.splitext(url)
ext = ext.lstrip(".")
else:
ext = "mp4"
return {
"channel": clip["broadcaster"]["displayName"],
"channel_login": clip["broadcaster"]["login"],
"date": date,
"datetime": clip["createdAt"],
"format": ext,
"game": game,
"game_slug": utils.slugify(game),
"id": clip["id"],
"slug": clip["slug"],
"time": time,
"title": utils.titlify(clip["title"]),
"title_slug": utils.slugify(clip["title"]),
}
def _format(output: str, subs: Dict[str, str]) -> str:
try:
return output.format(**subs)
except KeyError as e:
supported = ", ".join(subs.keys())
raise ConsoleError(f"Invalid key {e} used in --output. Supported keys are: {supported}")

View File

@ -1,111 +1,68 @@
# -*- coding: utf-8 -*-
import json
import sys
import re
from itertools import islice
from typing import Any, Callable, Generator, List, Optional, TypeVar
import click
from twitchdl import utils
from typing import Any, Match
from twitchdl.entities import Clip, Video
T = TypeVar("T")
START_CODES = {
'b': '\033[1m',
'dim': '\033[2m',
'i': '\033[3m',
'u': '\033[4m',
'red': '\033[91m',
'green': '\033[92m',
'yellow': '\033[93m',
'blue': '\033[94m',
'magenta': '\033[95m',
'cyan': '\033[96m',
}
END_CODE = '\033[0m'
START_PATTERN = "<(" + "|".join(START_CODES.keys()) + ")>"
END_PATTERN = "</(" + "|".join(START_CODES.keys()) + ")>"
USE_ANSI_COLOR = "--no-color" not in sys.argv
def start_code(match: Match[str]) -> str:
name = match.group(1)
return START_CODES[name]
def colorize(text: str) -> str:
text = re.sub(START_PATTERN, start_code, text)
text = re.sub(END_PATTERN, END_CODE, text)
return text
def strip_tags(text: str) -> str:
text = re.sub(START_PATTERN, '', text)
text = re.sub(END_PATTERN, '', text)
return text
def clear_line():
sys.stdout.write("\033[1K")
sys.stdout.write("\r")
def truncate(string: str, length: int) -> str:
if len(string) > length:
return string[:length - 1] + ""
return string[: length - 1] + ""
return string
def print_out(*args, **kwargs):
args = [colorize(a) if USE_ANSI_COLOR else strip_tags(a) for a in args]
print(*args, **kwargs)
def print_json(data: Any):
print(json.dumps(data))
click.echo(json.dumps(data))
def print_err(*args, **kwargs):
args = ["<red>{}</red>".format(a) for a in args]
args = [colorize(a) if USE_ANSI_COLOR else strip_tags(a) for a in args]
print(*args, file=sys.stderr, **kwargs)
def print_log(message: Any):
click.secho(message, err=True, dim=True)
def print_log(*args, **kwargs):
args = ["<dim>{}</dim>".format(a) for a in args]
args = [colorize(a) if USE_ANSI_COLOR else strip_tags(a) for a in args]
print(*args, file=sys.stderr, **kwargs)
def visual_len(text: str):
return len(click.unstyle(text))
def print_video(video):
published_at = video["publishedAt"].replace("T", " @ ").replace("Z", "")
length = utils.format_duration(video["lengthSeconds"])
channel = "<blue>{}</blue>".format(video["creator"]["displayName"]) if video["creator"] else ""
playing = "playing <blue>{}</blue>".format(video["game"]["name"]) if video["game"] else ""
# Can't find URL in video object, strange
url = "https://www.twitch.tv/videos/{}".format(video["id"])
print_out("<b>Video {}</b>".format(video["id"]))
print_out("<green>{}</green>".format(video["title"]))
if channel or playing:
print_out(" ".join([channel, playing]))
print_out("Published <blue>{}</blue> Length: <blue>{}</blue> ".format(published_at, length))
print_out("<i>{}</i>".format(url))
def ljust(text: str, width: int):
diff = width - visual_len(text)
return text + (" " * diff) if diff > 0 else text
def print_video_compact(video):
id = video["id"]
date = video["publishedAt"][:10]
game = video["game"]["name"] if video["game"] else ""
title = truncate(video["title"], 80).ljust(80)
print_out(f'<b>{id}</b> {date} <green>{title}</green> <blue>{game}</blue>')
def print_table(headers: List[str], data: List[List[str]]):
widths = [[visual_len(cell) for cell in row] for row in data + [headers]]
widths = [max(width) for width in zip(*widths)]
underlines = ["-" * width for width in widths]
def print_row(row: List[str]):
parts = (ljust(cell, widths[idx]) for idx, cell in enumerate(row))
click.echo(" ".join(parts).strip())
print_row(headers)
print_row(underlines)
for row in data:
print_row(row)
def print_paged_videos(generator, page_size, total_count):
def print_paged(
label: str,
generator: Generator[T, Any, Any],
print_fn: Callable[[T], None],
page_size: int,
total_count: Optional[int] = None,
):
iterator = iter(generator)
page = list(islice(iterator, page_size))
@ -113,47 +70,90 @@ def print_paged_videos(generator, page_size, total_count):
last = first + len(page) - 1
while True:
print_out("-" * 80)
click.echo("-" * 80)
print_out()
for video in page:
print_video(video)
print_out()
click.echo()
for item in page:
print_fn(item)
last = first + len(page) - 1
print_out("-" * 80)
print_out("<yellow>Videos {}-{} of {}</yellow>".format(first, last, total_count))
click.echo("-" * 80)
click.echo(f"{label} {first}-{last} of {total_count or '???'}")
first = first + len(page)
last = first + 1
page = list(islice(iterator, page_size))
if not page or not _continue():
if not page or not prompt_continue():
break
def print_clip(clip):
def print_video(video: Video):
published_at = video["publishedAt"].replace("T", " @ ").replace("Z", "")
length = utils.format_duration(video["lengthSeconds"])
channel = blue(video["creator"]["displayName"]) if video["creator"] else ""
playing = f"playing {blue(video['game']['name'])}" if video["game"] else ""
# Can't find URL in video object, strange
url = f"https://www.twitch.tv/videos/{video['id']}"
click.secho(f"Video {video['id']}", bold=True)
click.secho(video["title"], fg="green")
if channel or playing:
click.echo(" ".join([channel, playing]))
click.echo(f"Published {blue(published_at)} Length: {blue(length)} ")
click.secho(url, italic=True)
if video["description"]:
click.echo(f"\nDescription:\n{video['description']}")
click.echo()
def print_video_compact(video: Video):
id = video["id"]
date = video["publishedAt"][:10]
game = video["game"]["name"] if video["game"] else ""
title = truncate(video["title"], 80).ljust(80)
click.echo(f"{bold(id)} {date} {green(title)} {blue(game)}")
def print_clip(clip: Clip):
published_at = clip["createdAt"].replace("T", " @ ").replace("Z", "")
length = utils.format_duration(clip["durationSeconds"])
channel = clip["broadcaster"]["displayName"]
playing = (
"playing <blue>{}</blue>".format(clip["game"]["name"])
if clip["game"] else ""
playing = f"playing {blue(clip['game']['name'])}" if clip["game"] else ""
click.echo(f"Clip {bold(clip['slug'])}")
click.secho(clip["title"], fg="green")
click.echo(f"{blue(channel)} {playing}")
click.echo(
f"Published {blue(published_at)}"
+ f" Length: {blue(length)}"
+ f" Views: {blue(clip['viewCount'])}"
)
print_out("Clip <b>{}</b>".format(clip["slug"]))
print_out("<green>{}</green>".format(clip["title"]))
print_out("<blue>{}</blue> {}".format(channel, playing))
print_out(
"Published <blue>{}</blue>"
" Length: <blue>{}</blue>"
" Views: <blue>{}</blue>".format(published_at, length, clip["viewCount"]))
print_out("<i>{}</i>".format(clip["url"]))
click.secho(clip["url"], italic=True)
click.echo()
def _continue():
print_out("Press <green><b>Enter</green> to continue, <yellow><b>Ctrl+C</yellow> to break.")
def print_clip_compact(clip: Clip):
slug = clip["slug"]
date = clip["createdAt"][:10]
title = truncate(clip["title"], 50).ljust(50)
game = clip["game"]["name"] if clip["game"] else ""
game = truncate(game, 30).ljust(30)
click.echo(f"{date} {green(title)} {blue(game)} {bold(slug)}")
def prompt_continue():
enter = click.style("Enter", bold=True, fg="green")
ctrl_c = click.style("Ctrl+C", bold=True, fg="yellow")
click.echo(f"Press {enter} to continue, {ctrl_c} to break.")
try:
input()
@ -161,3 +161,30 @@ def _continue():
return False
return True
# Shorthand functions for coloring output
def blue(text: Any) -> str:
return click.style(text, fg="blue")
def cyan(text: Any) -> str:
return click.style(text, fg="cyan")
def green(text: Any) -> str:
return click.style(text, fg="green")
def yellow(text: Any) -> str:
return click.style(text, fg="yellow")
def bold(text: Any) -> str:
return click.style(text, bold=True)
def dim(text: Any) -> str:
return click.style(text, dim=True)

180
twitchdl/playlists.py Normal file
View File

@ -0,0 +1,180 @@
"""
Parse and manipulate m3u8 playlists.
"""
from dataclasses import dataclass
from pathlib import Path
from typing import Generator, List, Optional, OrderedDict, Set
import click
import m3u8
from twitchdl import utils
from twitchdl.output import bold, dim, print_table
@dataclass
class Playlist:
name: str
group_id: str
resolution: Optional[str]
url: str
is_source: bool
@dataclass
class Vod:
index: int
"""Ordinal number of the VOD in the playlist"""
path: str
"""Path part of the VOD URL"""
duration: int
"""Segment duration in seconds"""
def parse_playlists(playlists_m3u8: str) -> List[Playlist]:
def _parse(source: str) -> Generator[Playlist, None, None]:
document = load_m3u8(source)
for p in document.playlists:
resolution = (
"x".join(str(r) for r in p.stream_info.resolution)
if p.stream_info.resolution
else None
)
media = p.media[0]
is_source = media.group_id == "chunked"
yield Playlist(media.name, media.group_id, resolution, p.uri, is_source)
return list(_parse(playlists_m3u8))
def load_m3u8(playlist_m3u8: str) -> m3u8.M3U8:
return m3u8.loads(playlist_m3u8)
def enumerate_vods(
document: m3u8.M3U8,
start: Optional[int] = None,
end: Optional[int] = None,
) -> List[Vod]:
"""Extract VODs for download from document."""
vods = []
vod_start = 0
for index, segment in enumerate(document.segments):
vod_end = vod_start + segment.duration
# `vod_end > start` is used here becuase it's better to download a bit
# more than a bit less, similar for the end condition
start_condition = not start or vod_end > start
end_condition = not end or vod_start < end
if start_condition and end_condition:
vods.append(Vod(index, segment.uri, segment.duration))
vod_start = vod_end
return vods
def make_join_playlist(
playlist: m3u8.M3U8,
vods: List[Vod],
targets: List[Path],
) -> m3u8.Playlist:
"""
Make a modified playlist which references downloaded VODs
Keep only the downloaded segments and skip the rest
"""
org_segments = playlist.segments.copy()
path_map = OrderedDict(zip([v.path for v in vods], targets))
playlist.segments.clear()
for segment in org_segments:
if segment.uri in path_map:
segment.uri = str(path_map[segment.uri].name)
playlist.segments.append(segment)
return playlist
def select_playlist(playlists: List[Playlist], quality: Optional[str]) -> Playlist:
return (
select_playlist_by_name(playlists, quality)
if quality is not None
else select_playlist_interactive(playlists)
)
def select_playlist_by_name(playlists: List[Playlist], quality: str) -> Playlist:
if quality == "source":
for playlist in playlists:
if playlist.is_source:
return playlist
raise click.ClickException("Source quality not found, please report an issue on github.")
for playlist in playlists:
if playlist.name == quality or playlist.group_id == quality:
return playlist
available = ", ".join([p.name for p in playlists])
msg = f"Quality '{quality}' not found. Available qualities are: {available}"
raise click.ClickException(msg)
def select_playlist_interactive(playlists: List[Playlist]) -> Playlist:
playlists = sorted(playlists, key=_playlist_key)
headers = ["#", "Name", "Group ID", "Resolution"]
rows = [
[
f"{n + 1})",
bold(playlist.name),
dim(playlist.group_id),
dim(playlist.resolution or ""),
]
for n, playlist in enumerate(playlists)
]
click.echo()
print_table(headers, rows)
default = 1
for index, playlist in enumerate(playlists):
if playlist.is_source:
default = index + 1
no = utils.read_int("\nChoose quality", min=1, max=len(playlists) + 1, default=default)
playlist = playlists[no - 1]
return playlist
MAX = 1_000_000
def _playlist_key(playlist: Playlist) -> int:
"""Attempt to sort playlists so that source quality is on top, audio only
is on bottom and others are sorted descending by resolution."""
if playlist.is_source:
return 0
if playlist.group_id == "audio_only":
return MAX
try:
return MAX - int(playlist.name.split("p")[0])
except Exception:
pass
return MAX
def get_init_sections(playlist: m3u8.M3U8) -> Set[str]:
# TODO: we're ignoring initi_section.base_uri and bytes
return set(
segment.init_section.uri
for segment in playlist.segments
if segment.init_section is not None
)

View File

@ -1,12 +1,13 @@
import logging
import time
from collections import deque
from dataclasses import dataclass, field
from dataclasses import dataclass
from statistics import mean
from typing import Dict, NamedTuple, Optional, Deque
from typing import Deque, Dict, NamedTuple, Optional
from twitchdl.output import print_out
import click
from twitchdl.output import blue, clear_line
from twitchdl.utils import format_size, format_time
logger = logging.getLogger(__name__)
@ -21,7 +22,7 @@ class Task:
size: int
downloaded: int = 0
def advance(self, size):
def advance(self, size: int):
self.downloaded += size
@ -30,28 +31,25 @@ class Sample(NamedTuple):
timestamp: float
@dataclass
class Progress:
vod_count: int
downloaded: int = 0
estimated_total: Optional[int] = None
last_printed: float = field(default_factory=time.time)
progress_bytes: int = 0
progress_perc: int = 0
remaining_time: Optional[int] = None
speed: Optional[float] = None
start_time: float = field(default_factory=time.time)
tasks: Dict[TaskId, Task] = field(default_factory=dict)
vod_downloaded_count: int = 0
samples: Deque[Sample] = field(default_factory=lambda: deque(maxlen=100))
def __init__(self, file_count: Optional[int] = None):
self.downloaded: int = 0
self.estimated_total: Optional[int] = None
self.last_printed: Optional[float] = None
self.progress_bytes: int = 0
self.progress_perc: int = 0
self.remaining_time: Optional[int] = None
self.samples: Deque[Sample] = deque(maxlen=1000)
self.speed: Optional[float] = None
self.tasks: Dict[TaskId, Task] = {}
self.file_count = file_count
self.downloaded_count: int = 0
def start(self, task_id: int, size: int):
if task_id in self.tasks:
raise ValueError(f"Task {task_id}: cannot start, already started")
self.tasks[task_id] = Task(task_id, size)
self._calculate_total()
self._calculate_progress()
self.print()
def advance(self, task_id: int, size: int):
@ -62,7 +60,6 @@ class Progress:
self.progress_bytes += size
self.tasks[task_id].advance(size)
self.samples.append(Sample(self.downloaded, time.time()))
self._calculate_progress()
self.print()
def already_downloaded(self, task_id: int, size: int):
@ -71,9 +68,7 @@ class Progress:
self.tasks[task_id] = Task(task_id, size)
self.progress_bytes += size
self.vod_downloaded_count += 1
self._calculate_total()
self._calculate_progress()
self.downloaded_count += 1
self.print()
def abort(self, task_id: int):
@ -82,9 +77,6 @@ class Progress:
del self.tasks[task_id]
self.progress_bytes = sum(t.downloaded for t in self.tasks.values())
self._calculate_total()
self._calculate_progress()
self.print()
def end(self, task_id: int):
@ -93,18 +85,28 @@ class Progress:
task = self.tasks[task_id]
if task.size != task.downloaded:
logger.warn(f"Taks {task_id} ended with {task.downloaded}b downloaded, expected {task.size}b.")
logger.warn(
f"Taks {task_id} ended with {task.downloaded}b downloaded, expected {task.size}b."
)
self.vod_downloaded_count += 1
self.downloaded_count += 1
self.print()
def _calculate_total(self):
self.estimated_total = int(mean(t.size for t in self.tasks.values()) * self.vod_count) if self.tasks else None
def _recalculate(self):
if self.tasks and self.file_count:
self.estimated_total = int(mean(t.size for t in self.tasks.values()) * self.file_count)
else:
self.estimated_total = None
def _calculate_progress(self):
self.speed = self._calculate_speed()
self.progress_perc = int(100 * self.progress_bytes / self.estimated_total) if self.estimated_total else 0
self.remaining_time = int((self.estimated_total - self.progress_bytes) / self.speed) if self.estimated_total and self.speed else None
self.progress_perc = (
int(100 * self.progress_bytes / self.estimated_total) if self.estimated_total else 0
)
self.remaining_time = (
int((self.estimated_total - self.progress_bytes) / self.speed)
if self.estimated_total and self.speed
else None
)
def _calculate_speed(self):
if len(self.samples) < 2:
@ -116,22 +118,31 @@ class Progress:
size = last_sample.downloaded - first_sample.downloaded
duration = last_sample.timestamp - first_sample.timestamp
return size / duration
return size / duration if duration > 0 else None
def print(self):
now = time.time()
# Don't print more often than 10 times per second
if now - self.last_printed < 0.1:
if self.last_printed and now - self.last_printed < 0.1:
return
progress = " ".join([
f"Downloaded {self.vod_downloaded_count}/{self.vod_count} VODs",
f"<blue>{self.progress_perc}%</blue>",
f"of <blue>~{format_size(self.estimated_total)}</blue>" if self.estimated_total else "",
f"at <blue>{format_size(self.speed)}/s</blue>" if self.speed else "",
f"ETA <blue>{format_time(self.remaining_time)}</blue>" if self.remaining_time is not None else "",
])
self._recalculate()
clear_line()
total_label = f"/{self.file_count}" if self.file_count else ""
click.echo(f"Downloaded {self.downloaded_count}{total_label} VODs", nl=False)
click.secho(f" {self.progress_perc}%", fg="blue", nl=False)
if self.estimated_total is not None:
total = f"~{format_size(self.estimated_total)}"
click.echo(f" of {blue(total)}", nl=False)
if self.speed is not None:
speed = f"{format_size(self.speed)}/s"
click.echo(f" at {blue(speed)}", nl=False)
if self.remaining_time is not None:
click.echo(f" ETA {blue(format_time(self.remaining_time))}", nl=False)
print_out(f"\r{progress} ", end="")
self.last_printed = now

View File

@ -2,24 +2,54 @@
Twitch API access.
"""
import logging
import random
import time
from typing import Any, Dict, Generator, List, Mapping, Optional, Tuple, Union
import click
import httpx
import json
from typing import Dict
from twitchdl import CLIENT_ID
from twitchdl.entities import (
AccessToken,
Chapter,
Clip,
ClipAccessToken,
ClipsPeriod,
Data,
Video,
VideosSort,
VideosType,
)
from twitchdl.exceptions import ConsoleError
from twitchdl.utils import format_size
class GQLError(Exception):
def __init__(self, errors):
super().__init__("GraphQL query failed")
self.errors = errors
class GQLError(click.ClickException):
def __init__(self, errors: List[str]):
message = "GraphQL query failed."
for error in errors:
message += f"\n* {error}"
super().__init__(message)
def authenticated_post(url, data=None, json=None, headers={}):
headers['Client-ID'] = CLIENT_ID
Content = Union[str, bytes]
Headers = Dict[str, str]
response = httpx.post(url, data=data, json=json, headers=headers)
def authenticated_post(
url: str,
*,
json: Any = None,
content: Optional[Content] = None,
auth_token: Optional[str] = None,
):
headers = {"Client-ID": CLIENT_ID}
if auth_token is not None:
headers["authorization"] = f"OAuth {auth_token}"
response = request("POST", url, content=content, json=json, headers=headers)
if response.status_code == 400:
data = response.json()
raise ConsoleError(data["message"])
@ -29,33 +59,71 @@ def authenticated_post(url, data=None, json=None, headers={}):
return response
def gql_post(query):
def request(
method: str,
url: str,
json: Any = None,
content: Optional[Content] = None,
headers: Optional[Mapping[str, str]] = None,
):
with httpx.Client() as client:
request = client.build_request(method, url, json=json, content=content, headers=headers)
log_request(request)
start = time.time()
response = client.send(request)
duration = time.time() - start
log_response(response, duration)
return response
logger = logging.getLogger(__name__)
def log_request(request: httpx.Request):
logger.info(f"--> {request.method} {request.url}")
if request.content:
logger.debug(f"--> {request.content}")
def log_response(response: httpx.Response, duration_seconds: float):
request = response.request
duration = f"{int(1000 * duration_seconds)}ms"
size = format_size(len(response.content))
logger.info(f"<-- {request.method} {request.url} HTTP {response.status_code} {duration} {size}")
if response.content:
logger.debug(f"<-- {response.content}")
def gql_persisted_query(query: Data):
url = "https://gql.twitch.tv/gql"
response = authenticated_post(url, data=query).json()
if "errors" in response:
raise GQLError(response["errors"])
return response
response = authenticated_post(url, json=query)
gql_raise_on_error(response)
return response.json()
def gql_query(query: str, headers: Dict[str, str] = {}):
def gql_query(query: str, auth_token: Optional[str] = None):
url = "https://gql.twitch.tv/gql"
response = authenticated_post(url, json={"query": query}, headers=headers).json()
response = authenticated_post(url, json={"query": query}, auth_token=auth_token)
gql_raise_on_error(response)
return response.json()
if "errors" in response:
raise GQLError(response["errors"])
return response
def gql_raise_on_error(response: httpx.Response):
data = response.json()
if "errors" in data:
errors = [e["message"] for e in data["errors"]]
raise GQLError(errors)
VIDEO_FIELDS = """
id
title
description
publishedAt
broadcastType
lengthSeconds
game {
id
name
}
creator {
@ -89,55 +157,54 @@ CLIP_FIELDS = """
"""
def get_video(video_id):
query = """
def get_video(video_id: str) -> Optional[Video]:
query = f"""
{{
video(id: "{video_id}") {{
{fields}
{VIDEO_FIELDS}
}}
}}
"""
query = query.format(video_id=video_id, fields=VIDEO_FIELDS)
response = gql_query(query)
return response["data"]["video"]
def get_clip(slug):
query = """
def get_clip(slug: str) -> Optional[Clip]:
query = f"""
{{
clip(slug: "{}") {{
{fields}
clip(slug: "{slug}") {{
{CLIP_FIELDS}
}}
}}
"""
response = gql_query(query.format(slug, fields=CLIP_FIELDS))
response = gql_query(query)
return response["data"]["clip"]
def get_clip_access_token(slug):
query = """
{{
def get_clip_access_token(slug: str) -> ClipAccessToken:
query = {
"operationName": "VideoAccessToken_Clip",
"variables": {{
"slug": "{slug}"
}},
"extensions": {{
"persistedQuery": {{
"variables": {"slug": slug},
"extensions": {
"persistedQuery": {
"version": 1,
"sha256Hash": "36b89d2507fce29e5ca551df756d27c1cfe079e2609642b4390aa4c35796eb11"
}}
}}
}}
"""
"sha256Hash": "36b89d2507fce29e5ca551df756d27c1cfe079e2609642b4390aa4c35796eb11",
}
},
}
response = gql_post(query.format(slug=slug).strip())
response = gql_persisted_query(query)
return response["data"]["clip"]
def get_channel_clips(channel_id, period, limit, after=None):
def get_channel_clips(
channel_id: str,
period: ClipsPeriod,
limit: int,
after: Optional[str] = None,
):
"""
List channel clips.
@ -147,10 +214,10 @@ def get_channel_clips(channel_id, period, limit, after=None):
* sorting by VIEWS_DESC and TRENDING returns the same results
* there is no totalCount
"""
query = """
query = f"""
{{
user(login: "{channel_id}") {{
clips(first: {limit}, after: "{after}", criteria: {{ period: {period}, sort: VIEWS_DESC }}) {{
clips(first: {limit}, after: "{after or ''}", criteria: {{ period: {period.upper()}, sort: VIEWS_DESC }}) {{
pageInfo {{
hasNextPage
hasPreviousPage
@ -158,7 +225,7 @@ def get_channel_clips(channel_id, period, limit, after=None):
edges {{
cursor
node {{
{fields}
{CLIP_FIELDS}
}}
}}
}}
@ -166,24 +233,20 @@ def get_channel_clips(channel_id, period, limit, after=None):
}}
"""
query = query.format(
channel_id=channel_id,
after=after if after else "",
limit=limit,
period=period.upper(),
fields=CLIP_FIELDS
)
response = gql_query(query)
user = response["data"]["user"]
if not user:
raise ConsoleError("Channel {} not found".format(channel_id))
raise ConsoleError(f"Channel {channel_id} not found")
return response["data"]["user"]["clips"]
def channel_clips_generator(channel_id, period, limit):
def _generator(clips, limit):
def channel_clips_generator(
channel_id: str,
period: ClipsPeriod,
limit: int,
) -> Generator[Clip, None, None]:
def _generator(clips: Data, limit: int) -> Generator[Clip, None, None]:
for clip in clips["edges"]:
if limit < 1:
return
@ -204,35 +267,27 @@ def channel_clips_generator(channel_id, period, limit):
return _generator(clips, limit)
def channel_clips_generator_old(channel_id, period, limit):
cursor = ""
while True:
clips = get_channel_clips(
channel_id, period, limit, after=cursor)
def get_channel_videos(
channel_id: str,
limit: int,
sort: str,
type: str = "archive",
game_ids: Optional[List[str]] = None,
after: Optional[str] = None,
):
game_ids = game_ids or []
game_ids_str = f"[{','.join(game_ids)}]"
if not clips["edges"]:
break
has_next = clips["pageInfo"]["hasNextPage"]
cursor = clips["edges"][-1]["cursor"] if has_next else None
yield clips, has_next
if not cursor:
break
def get_channel_videos(channel_id, limit, sort, type="archive", game_ids=[], after=None):
query = """
query = f"""
{{
user(login: "{channel_id}") {{
videos(
first: {limit},
type: {type},
sort: {sort},
after: "{after}",
type: {type.upper()},
sort: {sort.upper()},
after: "{after or ''}",
options: {{
gameIDs: {game_ids}
gameIDs: {game_ids_str}
}}
) {{
totalCount
@ -242,7 +297,7 @@ def get_channel_videos(channel_id, limit, sort, type="archive", game_ids=[], aft
edges {{
cursor
node {{
{fields}
{VIDEO_FIELDS}
}}
}}
}}
@ -250,26 +305,24 @@ def get_channel_videos(channel_id, limit, sort, type="archive", game_ids=[], aft
}}
"""
query = query.format(
channel_id=channel_id,
game_ids=game_ids,
after=after if after else "",
limit=limit,
sort=sort.upper(),
type=type.upper(),
fields=VIDEO_FIELDS
)
response = gql_query(query)
if not response["data"]["user"]:
raise ConsoleError("Channel {} not found".format(channel_id))
raise ConsoleError(f"Channel {channel_id} not found")
return response["data"]["user"]["videos"]
def channel_videos_generator(channel_id, max_videos, sort, type, game_ids=[]):
def _generator(videos, max_videos):
def channel_videos_generator(
channel_id: str,
max_videos: int,
sort: VideosSort,
type: VideosType,
game_ids: Optional[List[str]] = None,
) -> Tuple[int, Generator[Video, None, None]]:
game_ids = game_ids or []
def _generator(videos: Data, max_videos: int) -> Generator[Video, None, None]:
for video in videos["edges"]:
if max_videos < 1:
return
@ -290,11 +343,11 @@ def channel_videos_generator(channel_id, max_videos, sort, type, game_ids=[]):
return videos["totalCount"], _generator(videos, max_videos)
def get_access_token(video_id, auth_token=None):
query = """
def get_access_token(video_id: str, auth_token: Optional[str] = None) -> AccessToken:
query = f"""
{{
videoPlaybackAccessToken(
id: {video_id},
id: "{video_id}",
params: {{
platform: "web",
playerBackend: "mediaplayer",
@ -307,14 +360,8 @@ def get_access_token(video_id, auth_token=None):
}}
"""
query = query.format(video_id=video_id)
headers = {}
if auth_token is not None:
headers['authorization'] = f'OAuth {auth_token}'
try:
response = gql_query(query, headers=headers)
response = gql_query(query, auth_token=auth_token)
return response["data"]["videoPlaybackAccessToken"]
except httpx.HTTPStatusError as error:
# Provide a more useful error message when server returns HTTP 401
@ -331,62 +378,68 @@ def get_access_token(video_id, auth_token=None):
raise
def get_playlists(video_id, access_token):
def get_playlists(video_id: str, access_token: AccessToken) -> str:
"""
For a given video return a playlist which contains possible video qualities.
"""
url = "https://usher.ttvnw.net/vod/{}".format(video_id)
url = f"https://usher.ttvnw.net/vod/{video_id}"
response = httpx.get(
url,
params={
"nauth": access_token["value"],
"nauthsig": access_token["signature"],
"allow_audio_only": "true",
"allow_source": "true",
"player": "twitchweb",
"platform": "web",
"supported_codecs": "av1,h265,h264",
"p": random.randint(1000000, 10000000),
},
)
response = httpx.get(url, params={
"nauth": access_token['value'],
"nauthsig": access_token['signature'],
"allow_audio_only": "true",
"allow_source": "true",
"player": "twitchweb",
})
response.raise_for_status()
return response.content.decode('utf-8')
return response.content.decode("utf-8")
def get_game_id(name):
query = """
def get_game_id(name: str):
query = f"""
{{
game(name: "{}") {{
game(name: "{name.strip()}") {{
id
}}
}}
"""
response = gql_query(query.format(name.strip()))
response = gql_query(query)
game = response["data"]["game"]
if game:
return game["id"]
def get_video_chapters(video_id):
def get_video_chapters(video_id: str) -> List[Chapter]:
query = {
"operationName": "VideoPlayer_ChapterSelectButtonVideo",
"variables":
{
"variables": {
"includePrivate": False,
"videoID": video_id
"videoID": video_id,
},
"extensions":
{
"persistedQuery":
{
"extensions": {
"persistedQuery": {
"version": 1,
"sha256Hash": "8d2793384aac3773beab5e59bd5d6f585aedb923d292800119e03d40cd0f9b41"
"sha256Hash": "8d2793384aac3773beab5e59bd5d6f585aedb923d292800119e03d40cd0f9b41",
}
}
},
}
response = gql_post(json.dumps(query))
response = gql_persisted_query(query)
return list(_chapter_nodes(response["data"]["video"]["moments"]))
def _chapter_nodes(collection):
for edge in collection["edges"]:
def _chapter_nodes(moments: Data) -> Generator[Chapter, None, None]:
for edge in moments["edges"]:
node = edge["node"]
node["game"] = node["details"]["game"]
del node["details"]
del node["moments"]
yield node

View File

@ -1,15 +1,18 @@
import re
import unicodedata
from typing import Optional, Union
import click
def _format_size(value, digits, unit):
def _format_size(value: float, digits: int, unit: str):
if digits > 0:
return "{{:.{}f}}{}".format(digits, unit).format(value)
return f"{{:.{digits}f}}{unit}".format(value)
else:
return "{{:d}}{}".format(unit).format(value)
return f"{int(value)}{unit}"
def format_size(bytes_, digits=1):
def format_size(bytes_: Union[int, float], digits: int = 1):
if bytes_ < 1024:
return _format_size(bytes_, digits, "B")
@ -24,7 +27,7 @@ def format_size(bytes_, digits=1):
return _format_size(mega / 1024, digits, "GB")
def format_duration(total_seconds):
def format_duration(total_seconds: Union[int, float]) -> str:
total_seconds = int(total_seconds)
hours = total_seconds // 3600
remainder = total_seconds % 3600
@ -32,15 +35,15 @@ def format_duration(total_seconds):
seconds = total_seconds % 60
if hours:
return "{} h {} min".format(hours, minutes)
return f"{hours} h {minutes} min"
if minutes:
return "{} min {} sec".format(minutes, seconds)
return f"{minutes} min {seconds} sec"
return "{} sec".format(seconds)
return f"{seconds} sec"
def format_time(total_seconds, force_hours=False):
def format_time(total_seconds: Union[int, float], force_hours: bool = False) -> str:
total_seconds = int(total_seconds)
hours = total_seconds // 3600
remainder = total_seconds % 3600
@ -53,15 +56,10 @@ def format_time(total_seconds, force_hours=False):
return f"{minutes:02}:{seconds:02}"
def read_int(msg, min, max, default=None):
if default:
msg = msg + f" [default {default}]"
msg += ": "
def read_int(msg: str, min: int, max: int, default: Optional[int] = None) -> int:
while True:
try:
val = input(msg)
val = click.prompt(msg, default=default, type=int)
if default and not val:
return default
if min <= int(val) <= max:
@ -70,33 +68,34 @@ def read_int(msg, min, max, default=None):
pass
def slugify(value):
value = unicodedata.normalize('NFKC', str(value))
value = re.sub(r'[^\w\s_-]', '', value)
value = re.sub(r'[\s_-]+', '_', value)
def slugify(value: str) -> str:
value = unicodedata.normalize("NFKC", str(value))
value = re.sub(r"[^\w\s_-]", "", value)
value = re.sub(r"[\s_-]+", "_", value)
return value.strip("_").lower()
def titlify(value):
value = unicodedata.normalize('NFKC', str(value))
value = re.sub(r'[^\w\s\[\]().-]', '', value)
value = re.sub(r'\s+', ' ', value)
def titlify(value: str) -> str:
value = unicodedata.normalize("NFKC", str(value))
value = re.sub(r"[^\w\s\[\]().-]", "", value)
value = re.sub(r"\s+", " ", value)
return value.strip()
VIDEO_PATTERNS = [
r"^(?P<id>\d+)?$",
r"^https://(www.)?twitch.tv/videos/(?P<id>\d+)(\?.+)?$",
r"^https://(www\.|m\.)?twitch\.tv/videos/(?P<id>\d+)(\?.+)?$",
r"^https://(www\.|m\.)?twitch\.tv/\w+/video/(?P<id>\d+)(\?.+)?$",
]
CLIP_PATTERNS = [
r"^(?P<slug>[A-Za-z0-9]+(?:-[A-Za-z0-9_-]{16})?)$",
r"^https://(www.)?twitch.tv/\w+/clip/(?P<slug>[A-Za-z0-9]+(?:-[A-Za-z0-9_-]{16})?)(\?.+)?$",
r"^https://clips.twitch.tv/(?P<slug>[A-Za-z0-9]+(?:-[A-Za-z0-9_-]{16})?)(\?.+)?$",
r"^https://(www\.|m\.)?twitch\.tv/\w+/clip/(?P<slug>[A-Za-z0-9]+(?:-[A-Za-z0-9_-]{16})?)(\?.+)?$",
r"^https://clips\.twitch\.tv/(?P<slug>[A-Za-z0-9]+(?:-[A-Za-z0-9_-]{16})?)(\?.+)?$",
]
def parse_video_identifier(identifier):
def parse_video_identifier(identifier: str) -> Optional[str]:
"""Given a video ID or URL returns the video ID, or null if not matched"""
for pattern in VIDEO_PATTERNS:
match = re.match(pattern, identifier)
@ -104,7 +103,7 @@ def parse_video_identifier(identifier):
return match.group("id")
def parse_clip_identifier(identifier):
def parse_clip_identifier(identifier: str) -> Optional[str]:
"""Given a clip slug or URL returns the clip slug, or null if not matched"""
for pattern in CLIP_PATTERNS:
match = re.match(pattern, identifier)