Compare commits

..

41 Commits
types ... foo

Author SHA1 Message Date
a21e965e04 wip 2024-03-29 09:58:36 +01:00
1c1e5955b8 Add video description to metadata 2024-03-29 09:45:04 +01:00
8a097f5f93 Update docs and changelog 2024-03-29 08:49:35 +01:00
7cc67133f5 Add download --concat option 2024-03-29 08:43:32 +01:00
e228971f66 Remove requirements-dev.txt, included in pyproject 2024-03-29 08:43:32 +01:00
96526c2ca5 Display exception errors in console 2024-03-29 08:43:32 +01:00
9bf0f1b425 Use fstrings where possible 2024-03-29 08:43:31 +01:00
77e75b5dad Improve types 2024-03-29 08:43:31 +01:00
cf1693b500 Fix env command 2024-03-29 08:43:31 +01:00
52a7191d1f Add docs and changelog 2024-03-29 08:43:31 +01:00
24eb163d98 Regenerate docs 2024-03-29 08:43:30 +01:00
13b063cbc6 Use backquotes to get code blocks in markdown 2024-03-29 08:43:30 +01:00
43a4a6c4f5 Rework the docs script to work with click 2024-03-29 08:43:30 +01:00
b0c21ac436 Convert to click 2024-03-29 08:43:30 +01:00
229a849f87 Add dependency on click 2024-03-25 09:10:36 +01:00
5589f33142 Add vermin config file 2024-03-25 09:10:36 +01:00
40d52891f8 Don't hardcode version number 2024-03-23 08:55:56 +01:00
c727d65694 Require python 3.8 2024-03-23 08:55:38 +01:00
8592c41a50 Use an abstract base class for TokenBucket 2024-03-23 07:56:50 +01:00
a9aefa871d Migrate to pyproject.toml 2024-03-23 07:31:40 +01:00
65bf6a2b99 Add --dry-run option 2024-03-14 15:36:28 +01:00
9685ea6a36 Add setuptools as dev dependency 2024-01-06 19:48:41 +01:00
73757b557e Bump version, add changelog 2024-01-06 19:43:48 +01:00
ddead05712 update http to https 2024-01-06 19:40:06 +01:00
bcb55be7ad Bump version 2023-05-07 11:50:17 +02:00
a4bdb90faa Update changelog 2023-05-07 11:49:53 +02:00
8db7dd7b8a Updated client id
updated client id to test downloading based on solution here: https://github.com/xenova/chat-downloader/issues/207
2023-05-07 11:45:43 +02:00
5a43a4388c Bump version, add changelog 2023-04-18 07:09:18 +02:00
6f461d889c Update get playlists endpoint URL 2023-04-18 07:05:01 +02:00
7f6e792eae Update changelog, bump version 2022-11-20 15:46:57 +01:00
2402c3bfca Change default value of named parameter...
..."game_ids" in "channel_videos_generator" from "None" to empty list []
2022-11-20 15:35:49 +01:00
533c91d133 Removing positional-only arguments for compatibility with Python 3.7 2022-11-20 15:26:32 +01:00
ddf1b10e56 Bump version 2022-11-20 11:19:14 +01:00
523f6449c3 Update changelog and docs 2022-11-20 11:14:41 +01:00
32cb9b6602 Make styling a bit more consistent 2022-11-20 11:14:15 +01:00
c4cbf588a2 Add --chapter option to download command 2022-11-20 10:44:32 +01:00
35e53ba4fd Add chapters to video info 2022-11-20 09:42:09 +01:00
d505056fee Add endpoint for fetching chapters 2022-11-20 09:41:55 +01:00
8658d0fa24 Change the website from github to bezdomni 2022-09-09 08:33:09 +02:00
33b5cc0a01 Bump version, add changelog 2022-09-09 08:11:46 +02:00
c9ab6237e8 Don't rename the file while it's still open
issue #111
2022-09-09 08:05:03 +02:00
40 changed files with 1290 additions and 1162 deletions

4
.vermin Normal file
View File

@ -0,0 +1,4 @@
[vermin]
only_show_violations = yes
show_tips = no
targets = 3.8

View File

@ -3,6 +3,40 @@ twitch-dl changelog
<!-- Do not edit. This file is automatically generated from changelog.yaml.-->
### [2.2.0 (TBA)](https://github.com/ihabunek/twitch-dl/releases/tag/2.2.0)
* **Requires python 3.8 or later**
* Migrated to click lib for cli parsing
* Add shell auto completion
### [2.1.4 (2024-01-06)](https://github.com/ihabunek/twitch-dl/releases/tag/2.1.4)
* Fix error caused by twitch requiring https for the usher api (thanks
@deanpcmad)
### [2.1.3 (2023-05-07)](https://github.com/ihabunek/twitch-dl/releases/tag/2.1.3)
* Replace client ID with one that works for now (thanks @mwhite34)
### [2.1.2 (2023-04-18)](https://github.com/ihabunek/twitch-dl/releases/tag/2.1.2)
* Fix error caused by twitch changing the Usher domain (thanks @adsa95)
### [2.1.1 (2022-11-20)](https://github.com/ihabunek/twitch-dl/releases/tag/2.1.1)
* Fix Python 3.7 compatibility (#117, thanks @eliduvid)
* Fix default value for game_ids (#102, thanks @FunnyPocketBook)
### [2.1.0 (2022-11-20)](https://github.com/ihabunek/twitch-dl/releases/tag/2.1.0)
* Add chapter list to `info` command
* Add `download --chapter` option for downloading a single chapter
### [2.0.1 (2022-09-09)](https://github.com/ihabunek/twitch-dl/releases/tag/2.0.1)
* Fix an issue where a temp vod file would be renamed while still being open,
which caused an exception on Windows (#111)
### [2.0.0 (2022-08-18)](https://github.com/ihabunek/twitch-dl/releases/tag/2.0.0)
This release switches from using `requests` to `httpx` for making http requests,

View File

@ -2,9 +2,8 @@
default : clean dist
dist :
python setup.py sdist --formats=gztar,zip
python setup.py bdist_wheel --python-tag=py3
dist:
python -m build
clean :
find . -name "*pyc" | xargs rm -rf $1

View File

@ -17,7 +17,7 @@ Resources
Requirements
------------
* Python 3.7 or later
* Python 3.8 or later
* [ffmpeg](https://ffmpeg.org/download.html), installed and on the system path
Quick start

View File

@ -1,3 +1,45 @@
2.2.0:
date: TBA
changes:
- "**Requires python 3.8 or later**"
- "Migrated to Click library for generating the commandline interface"
- "Add shell auto completion, see: https://twitch-dl.bezdomni.net/shell_completion.html"
- "Add setting defaults via environment variables, see: https://twitch-dl.bezdomni.net/environment_variables.html"
- "Add `download --concat` option to avoid using ffmeg for joinig vods and concat them instead. This will produce a `.ts` file by default."
- "Add video description to metadata (#129)"
2.1.4:
date: 2024-01-06
changes:
- "Fix error caused by twitch requiring https for the usher api (thanks @deanpcmad)"
2.1.3:
date: 2023-05-07
changes:
- "Replace client ID with one that works for now (thanks @mwhite34)"
2.1.2:
date: 2023-04-18
changes:
- "Fix error caused by twitch changing the Usher domain (thanks @adsa95)"
2.1.1:
date: 2022-11-20
changes:
- "Fix Python 3.7 compatibility (#117, thanks @eliduvid)"
- "Fix default value for game_ids (#102, thanks @FunnyPocketBook)"
2.1.0:
date: 2022-11-20
changes:
- "Add chapter list to `info` command"
- "Add `download --chapter` option for downloading a single chapter"
2.0.1:
date: 2022-09-09
changes:
- "Fix an issue where a temp vod file would be renamed while still being open, which caused an exception on Windows (#111)"
2.0.0:
date: 2022-08-18
description: |

View File

@ -9,6 +9,8 @@
- [twitch-dl clips](commands/clips.md)
- [twitch-dl info](commands/info.md)
- [twitch-dl env](commands/env.md)
- [Environemnt variables](environment_variables.md)
- [Shell completion](shell_completion.md)
- [Advanced](advanced.md)
[License](license.md)

View File

@ -3,6 +3,40 @@ twitch-dl changelog
<!-- Do not edit. This file is automatically generated from changelog.yaml.-->
### [2.2.0 (TBA)](https://github.com/ihabunek/twitch-dl/releases/tag/2.2.0)
* **Requires python 3.8 or later**
* Migrated to click lib for cli parsing
* Add shell auto completion
### [2.1.4 (2024-01-06)](https://github.com/ihabunek/twitch-dl/releases/tag/2.1.4)
* Fix error caused by twitch requiring https for the usher api (thanks
@deanpcmad)
### [2.1.3 (2023-05-07)](https://github.com/ihabunek/twitch-dl/releases/tag/2.1.3)
* Replace client ID with one that works for now (thanks @mwhite34)
### [2.1.2 (2023-04-18)](https://github.com/ihabunek/twitch-dl/releases/tag/2.1.2)
* Fix error caused by twitch changing the Usher domain (thanks @adsa95)
### [2.1.1 (2022-11-20)](https://github.com/ihabunek/twitch-dl/releases/tag/2.1.1)
* Fix Python 3.7 compatibility (#117, thanks @eliduvid)
* Fix default value for game_ids (#102, thanks @FunnyPocketBook)
### [2.1.0 (2022-11-20)](https://github.com/ihabunek/twitch-dl/releases/tag/2.1.0)
* Add chapter list to `info` command
* Add `download --chapter` option for downloading a single chapter
### [2.0.1 (2022-09-09)](https://github.com/ihabunek/twitch-dl/releases/tag/2.0.1)
* Fix an issue where a temp vod file would be renamed while still being open,
which caused an exception on Windows (#111)
### [2.0.0 (2022-08-18)](https://github.com/ihabunek/twitch-dl/releases/tag/2.0.0)
This release switches from using `requests` to `httpx` for making http requests,

View File

@ -1,64 +1,47 @@
<!-- ------------------- generated docs start ------------------- -->
# twitch-dl clips
List or download clips for a channel.
List or download clips for given CHANNEL_NAME.
### USAGE
```
twitch-dl clips <channel_name> [FLAGS] [OPTIONS]
twitch-dl clips [OPTIONS] CHANNEL_NAME
```
### ARGUMENTS
<table>
<tbody>
<tr>
<td class="code">&lt;channel_name&gt;</td>
<td>Name of the channel to list clips for.</td>
</tr>
</tbody>
</table>
### FLAGS
<table>
<tbody>
<tr>
<td class="code">-a, --all</td>
<td>Fetch all videos, overrides --limit</td>
</tr>
<tr>
<td class="code">-j, --json</td>
<td>Show results as JSON. Ignores <code>--pager</code>.</td>
</tr>
<tr>
<td class="code">-d, --download</td>
<td>Download all videos in given period (in source quality)</td>
</tr>
</tbody>
</table>
### OPTIONS
<table>
<tbody>
<tr>
<td class="code">-l, --limit</td>
<td>Number of videos to fetch (default 10, max 100)</td>
<td class="code">-a, --all</td>
<td>Fetch all clips, overrides --limit</td>
</tr>
<tr>
<td class="code">-P, --period</td>
<td>Period from which to return clips. Defaults to <code>all_time</code>. Possible values: <code>last_day</code>, <code>last_week</code>, <code>last_month</code>, <code>all_time</code>.</td>
<td class="code">-d, --download</td>
<td>Download clips in given period (in source quality)</td>
</tr>
<tr>
<td class="code">-p, --pager</td>
<td class="code">-l, --limit INTEGER</td>
<td>Number of clips to fetch [max: 100] [default: <code>10</code>]</td>
</tr>
<tr>
<td class="code">-p, --pager INTEGER</td>
<td>Number of clips to show per page. Disabled by default.</td>
</tr>
<tr>
<td class="code">-P, --period TEXT</td>
<td>Period from which to return clips Possible values: <code>last_day</code>, <code>last_week</code>, <code>last_month</code>, <code>all_time</code>. [default: <code>all_time</code>]</td>
</tr>
<tr>
<td class="code">--json</td>
<td>Print data as JSON rather than human readable text</td>
</tr>
</tbody>
</table>

View File

@ -3,27 +3,49 @@
Download videos or clips.
Pass one or more video ID, clip slug or Twitch URL to download.
### USAGE
```
twitch-dl download <videos> [FLAGS] [OPTIONS]
twitch-dl download [OPTIONS] [IDS]...
```
### ARGUMENTS
### OPTIONS
<table>
<tbody>
<tr>
<td class="code">&lt;videos&gt;</td>
<td>One or more video ID, clip slug or twitch URL to download.</td>
<td class="code">-a, --auth-token TEXT</td>
<td>Authentication token, passed to Twitch to access subscriber only VODs. Can be copied from the <code>auth_token</code> cookie in any browser logged in on Twitch.</td>
</tr>
</tbody>
</table>
### FLAGS
<tr>
<td class="code">-c, --chapter INTEGER</td>
<td>Download a single chapter of the video. Specify the chapter number or use the flag without a number to display a chapter select prompt.</td>
</tr>
<tr>
<td class="code">--concat</td>
<td>Do not use ffmpeg to join files, concat them instead</td>
</tr>
<tr>
<td class="code">-d, --dry-run</td>
<td>Simulate the download provcess without actually downloading any files.</td>
</tr>
<tr>
<td class="code">-e, --end TEXT</td>
<td>Download video up to this time (hh:mm or hh:mm:ss)</td>
</tr>
<tr>
<td class="code">-f, --format TEXT</td>
<td>Video format to convert into, passed to ffmpeg as the target file extension. Defaults to <code>mkv</code>. If <code>--concat</code> is passed, defaults to <code>ts</code>.</td>
</tr>
<table>
<tbody>
<tr>
<td class="code">-k, --keep</td>
<td>Don&#x27;t delete downloaded VODs and playlists after merging.</td>
@ -38,51 +60,30 @@ twitch-dl download <videos> [FLAGS] [OPTIONS]
<td class="code">--overwrite</td>
<td>Overwrite the target file if it already exists without prompting.</td>
</tr>
</tbody>
</table>
### OPTIONS
<table>
<tbody>
<tr>
<td class="code">-w, --max-workers</td>
<td>Number of workers for downloading vods concurrently (default 5)</td>
<td class="code">-o, --output TEXT</td>
<td>Output file name template. See docs for details. [default: <code>{date}_{id}_{channel_login}_{title_slug}.{format}</code>]</td>
</tr>
<tr>
<td class="code">-s, --start</td>
<td class="code">-q, --quality TEXT</td>
<td>Video quality, e.g. <code>720p</code>. Set to <code>source</code> to get best quality.</td>
</tr>
<tr>
<td class="code">-r, --rate-limit TEXT</td>
<td>Limit the maximum download speed in bytes per second. Use &#x27;k&#x27; and &#x27;m&#x27; suffixes for kbps and mbps.</td>
</tr>
<tr>
<td class="code">-s, --start TEXT</td>
<td>Download video from this time (hh:mm or hh:mm:ss)</td>
</tr>
<tr>
<td class="code">-e, --end</td>
<td>Download video up to this time (hh:mm or hh:mm:ss)</td>
</tr>
<tr>
<td class="code">-f, --format</td>
<td>Video format to convert into, passed to ffmpeg as the target file extension. Defaults to <code>mkv</code>.</td>
</tr>
<tr>
<td class="code">-q, --quality</td>
<td>Video quality, e.g. 720p. Set to &#x27;source&#x27; to get best quality.</td>
</tr>
<tr>
<td class="code">-a, --auth-token</td>
<td>Authentication token, passed to Twitch to access subscriber only VODs. Can be copied from the &#x27;auth_token&#x27; cookie in any browser logged in on Twitch.</td>
</tr>
<tr>
<td class="code">-o, --output</td>
<td>Output file name template. See docs for details.</td>
</tr>
<tr>
<td class="code">-r, --rate-limit</td>
<td>Limit the maximum download speed in bytes per second. Use &#x27;k&#x27; and &#x27;m&#x27; suffixes for kbps and mbps.</td>
<td class="code">-w, --max-workers INTEGER</td>
<td>Number of workers for downloading vods concurrently [default: <code>5</code>]</td>
</tr>
</tbody>
</table>

View File

@ -6,7 +6,7 @@ Print environment information for inclusion in bug reports.
### USAGE
```
twitch-dl env
twitch-dl env [OPTIONS]
```
<!-- ------------------- generated docs end ------------------- -->

View File

@ -6,27 +6,16 @@ Print information for a given Twitch URL, video ID or clip slug.
### USAGE
```
twitch-dl info <video> [FLAGS]
twitch-dl info [OPTIONS] ID
```
### ARGUMENTS
### OPTIONS
<table>
<tbody>
<tr>
<td class="code">&lt;video&gt;</td>
<td>Video ID, clip slug, or URL</td>
</tr>
</tbody>
</table>
### FLAGS
<table>
<tbody>
<tr>
<td class="code">-j, --json</td>
<td>Show results as JSON</td>
<td class="code">--json</td>
<td>Print data as JSON rather than human readable text</td>
</tr>
</tbody>
</table>

View File

@ -1,73 +1,56 @@
<!-- ------------------- generated docs start ------------------- -->
# twitch-dl videos
List videos for a channel.
List or download clips for given CHANNEL_NAME.
### USAGE
```
twitch-dl videos <channel_name> [FLAGS] [OPTIONS]
twitch-dl videos [OPTIONS] CHANNEL_NAME
```
### ARGUMENTS
<table>
<tbody>
<tr>
<td class="code">&lt;channel_name&gt;</td>
<td>Name of the channel to list videos for.</td>
</tr>
</tbody>
</table>
### FLAGS
<table>
<tbody>
<tr>
<td class="code">-a, --all</td>
<td>Fetch all videos, overrides --limit</td>
</tr>
<tr>
<td class="code">-j, --json</td>
<td>Show results as JSON. Ignores <code>--pager</code>.</td>
</tr>
<tr>
<td class="code">-c, --compact</td>
<td>Show videos in compact mode, one line per video</td>
</tr>
</tbody>
</table>
### OPTIONS
<table>
<tbody>
<tr>
<td class="code">-g, --game</td>
<td class="code">-a, --all</td>
<td>Fetch all clips, overrides --limit</td>
</tr>
<tr>
<td class="code">-c, --compact</td>
<td>Show videos in compact mode, one line per video</td>
</tr>
<tr>
<td class="code">-l, --limit INTEGER</td>
<td>Number of videos to fetch. Defaults to 40 in compact mode, 10 otherwise.</td>
</tr>
<tr>
<td class="code">-p, --pager INTEGER</td>
<td>Number of videos to show per page. Disabled by default.</td>
</tr>
<tr>
<td class="code">-g, --game TEXT</td>
<td>Show videos of given game (can be given multiple times)</td>
</tr>
<tr>
<td class="code">-l, --limit</td>
<td>Number of videos to fetch. Defaults to 40 in copmpact mode, 10 otherwise.</td>
<td class="code">-s, --sort TEXT</td>
<td>Sorting order of videos Possible values: <code>views</code>, <code>time</code>. [default: <code>time</code>]</td>
</tr>
<tr>
<td class="code">-s, --sort</td>
<td>Sorting order of videos. Defaults to <code>time</code>. Possible values: <code>views</code>, <code>time</code>.</td>
<td class="code">-t, --type TEXT</td>
<td>Broadcast type Possible values: <code>archive</code>, <code>highlight</code>, <code>upload</code>. [default: <code>archive</code>]</td>
</tr>
<tr>
<td class="code">-t, --type</td>
<td>Broadcast type. Defaults to <code>archive</code>. Possible values: <code>archive</code>, <code>highlight</code>, <code>upload</code>.</td>
</tr>
<tr>
<td class="code">-p, --pager</td>
<td>Print videos in pages. Ignores <code>--limit</code>. Defaults to 10.</td>
<td class="code">--json</td>
<td>Print data as JSON rather than human readable text</td>
</tr>
</tbody>
</table>

View File

@ -0,0 +1,15 @@
# Environment variables
> Introduced in twitch-dl 2.2.0
twitch-dl allows setting defaults for parameters via environment variables.
Environment variables should be named `TWITCH_DL_<COMMAND_NAME>_<OPTION_NAME>`.
For example, when invoking `twitch-dl download`, if you always set `--quality
source` you can set the following environment variable to make this the
default:
```
TWITCH_DL_DOWNLOAD_QUALITY="source"
```

View File

@ -1,6 +1,6 @@
# Installation
twitch-dl requires **Python 3.7** or later.
twitch-dl requires **Python 3.8** or later.
## Prerequisite: FFmpeg

31
docs/shell_completion.md Normal file
View File

@ -0,0 +1,31 @@
# Shell completion
> Introduced in twitch-dl 2.2.0
twitch-dl uses [Click shell completion](https://click.palletsprojects.com/en/8.1.x/shell-completion/) which works on Bash, Fish and Zsh.
To enable completion, twitch-dl must be [installed](./installation.html) as a command and available by ivoking `twitch-dl`. Then follow the instructions for your shell.
**Bash**
Add to `~/.bashrc`:
```
eval "$(_TWITCH_DL_COMPLETE=bash_source twitch-dl)"
```
**Fish**
Add to `~/.config/fish/completions/twitch-dl.fish`:
```
_TWITCH_DL_COMPLETE=fish_source twitch-dl | source
```
**Zsh**
Add to `~/.zshrc`:
```
eval "$(_TWITCH_DL_COMPLETE=zsh_source twitch-dl)"
```

58
pyproject.toml Normal file
View File

@ -0,0 +1,58 @@
[build-system]
requires = ["setuptools>=64", "setuptools_scm>=8"]
build-backend = "setuptools.build_meta"
[project]
name = "twitch-dl"
authors = [{ name="Ivan Habunek", email="ivan@habunek.com" }]
description = "Quickly download videos from twitch.tv from the comort of your terminal emulator"
keywords=["twitch", "vod", "video", "download"]
readme = "README.md"
license = { file="LICENSE" }
requires-python = ">=3.8"
dynamic = ["version"]
classifiers = [
"Development Status :: 5 - Production/Stable",
"Environment :: Console",
"License :: OSI Approved :: GNU General Public License v3 (GPLv3)",
"Operating System :: OS Independent",
]
dependencies = [
"click>=8.0.0,<9.0.0",
"httpx>=0.17.0,<1.0.0",
"m3u8>=1.0.0,<4.0.0",
]
[tool.setuptools]
packages = [
"twitchdl",
"twitchdl.commands",
]
[tool.setuptools_scm]
[project.optional-dependencies]
dev = [
"build",
"pytest",
"pyyaml",
"setuptools",
"twine",
"vermin",
]
[project.urls]
"Homepage" = "https://twitch-dl.bezdomni.net/"
"Source" = "https://github.com/ihabunek/twitch-dl"
[project.scripts]
twitch-dl = "twitchdl.cli:cli"
[tool.pyright]
include = ["twitchdl"]
typeCheckingMode = "strict"
[tool.ruff]
line-length = 100

View File

@ -1,5 +0,0 @@
pytest
pytest-cov
twine
wheel
pyyaml

View File

@ -4,13 +4,16 @@
Auto-generates documentation from command defs in console.py.
"""
import click
import html
import os
import re
import shutil
import textwrap
from twitchdl.console import COMMANDS
from click import Command
from twitchdl.cli import cli
START_MARKER = "<!-- ------------------- generated docs start ------------------- -->"
@ -19,8 +22,11 @@ END_MARKER = "<!-- ------------------- generated docs end ------------------- --
def main():
update_changelog()
for command in COMMANDS:
update_docs(command)
parent_ctx = click.Context(cli, info_name="twitch-dl")
for name, command in cli.commands.items():
ctx = click.Context(cli, info_name=name, parent=parent_ctx)
update_docs(command, ctx)
def update_changelog():
@ -31,9 +37,9 @@ def update_changelog():
shutil.copy(source, target)
def update_docs(command):
def update_docs(command: Command, ctx: click.Context):
path = os.path.join("docs", "commands", f"{command.name}.md")
content = render_command(command)
content = render_command(command, ctx)
if not os.path.exists(path):
print(f"Creating: {path}")
@ -45,87 +51,29 @@ def update_docs(command):
write(path, content)
def render_command(command):
def render_command(command: Command, ctx: click.Context):
content = START_MARKER
content += f"\n# twitch-dl {command.name}\n\n"
content += command.description + "\n\n"
content += render_usage(command)
content += render_arguments(command)
content += render_flags(command)
content += render_options(command)
if command.help:
content += command.help + "\n\n"
content += render_usage(ctx, command)
content += render_options(ctx, command)
return content
def render_usage(command):
arguments = get_arguments(command)
arguments = " ".join(f"<{name}>" for [name, _] in arguments)
flags = get_flags(command)
options = get_options(command)
def render_usage(ctx: click.Context, command: Command):
content = "### USAGE\n\n"
content += "```\n"
content += f"twitch-dl {command.name} {arguments}"
if flags:
content += " [FLAGS]"
if options:
content += " [OPTIONS]"
content += command.get_usage(ctx).replace("Usage: ", "")
content += "\n```\n\n"
return content
def render_arguments(command):
arguments = get_arguments(command)
if not arguments:
return ""
content = "### ARGUMENTS\n\n"
content += "<table>\n"
content += "<tbody>"
for [name, params] in arguments:
content += textwrap.dedent(f"""
<tr>
<td class="code">&lt;{escape(name)}&gt;</td>
<td>{escape(params['help'])}</td>
</tr>
""")
content += "</tbody>\n"
content += "</table>\n\n"
return content
def render_flags(command):
flags = get_flags(command)
if not flags:
return ""
content = "### FLAGS\n\n"
content += "<table>\n"
content += "<tbody>"
for [names, params] in flags:
names = ", ".join(f"{name}" for name in names)
content += textwrap.dedent(f"""
<tr>
<td class="code">{escape(names)}</td>
<td>{escape(params['help'])}</td>
</tr>
""")
content += "</tbody>\n"
content += "</table>\n\n"
return content
def render_options(command):
options = get_options(command)
def render_options(ctx, command: Command):
options = list(get_options(command))
if not options:
return ""
@ -134,12 +82,11 @@ def render_options(command):
content += "<table>\n"
content += "<tbody>"
for [names, params] in options:
names = ", ".join(f"{name}" for name in names)
for opts, help in options:
content += textwrap.dedent(f"""
<tr>
<td class="code">{escape(names)}</td>
<td>{escape(params['help'])}{choices(params)}</td>
<td class="code">{escape(opts)}</td>
<td>{escape(help)}</td>
</tr>
""")
content += "</tbody>\n"
@ -148,37 +95,39 @@ def render_options(command):
return content
def choices(params):
if "choices" in params:
choices = ", ".join(code(c) for c in params["choices"])
def get_options(command: Command):
for option in command.params:
if isinstance(option, click.Option):
opts = ", ".join(option.opts)
opts += option_type(option)
help = option.help or ""
help = re.sub(r"\s+", " ", help)
help += choices(option)
if option.default:
help += f" [default: `{option.default}`]"
yield opts, help
def option_type(option: click.Option):
match option.type:
case click.types.StringParamType():
return " TEXT"
case click.types.Choice():
return " TEXT"
case click.types.IntParamType():
return " INTEGER"
case _:
return ""
def choices(option: click.Option):
if isinstance(option.type, click.Choice):
choices = ", ".join(f"`{c}`" for c in option.type.choices)
return f" Possible values: {choices}."
return ""
def get_arguments(command):
return [
[names[0], options]
for names, options in command.arguments
if len(names) == 1 and not names[0].startswith("-")
]
def get_flags(command):
return [
[names, options]
for names, options in command.arguments
if names[0].startswith("-") and "type" not in options
]
def get_options(command):
return [
[names, options]
for names, options in command.arguments
if names[0].startswith("-") and "type" in options
]
def read(path):
with open(path, "r") as f:
return f.read()
@ -189,10 +138,6 @@ def write(path, content):
return f.write(content)
def code(string):
return f"<code>{string}</code>"
def escape(text: str):
text = html.escape(text)
text = re.sub(r"`([\S]+)`", "<code>\\1</code>", text)

View File

@ -1,42 +0,0 @@
#!/usr/bin/env python
from setuptools import setup, find_packages
long_description = """
Quickly download videos from twitch.tv.
Works simliarly to youtube-dl but downloads multiple VODs in parallel which
makes it faster.
"""
setup(
name="twitch-dl",
version="2.0.0",
description="Twitch downloader",
long_description=long_description.strip(),
author="Ivan Habunek",
author_email="ivan@habunek.com",
url="https://github.com/ihabunek/twitch-dl/",
project_urls={
"Documentation": "https://twitch-dl.bezdomni.net/"
},
keywords="twitch vod video download",
license="GPLv3",
classifiers=[
"Development Status :: 5 - Production/Stable",
"Environment :: Console",
"License :: OSI Approved :: GNU General Public License v3 (GPLv3)",
"Programming Language :: Python :: 3",
],
packages=find_packages(),
python_requires=">=3.7",
install_requires=[
"m3u8>=1.0.0,<4.0.0",
"httpx>=0.17.0,<1.0.0",
],
entry_points={
"console_scripts": [
"twitch-dl=twitchdl.console:main",
],
}
)

View File

@ -6,19 +6,18 @@ import httpx
import m3u8
from twitchdl import twitch
from twitchdl.commands.download import _parse_playlists, get_clip_authenticated_url
from twitchdl.models import Game, VideosPage
TEST_CHANNEL = "bananasaurus_rex"
def test_get_videos():
page = twitch.get_channel_videos(TEST_CHANNEL, 3, "time")
assert isinstance(page, VideosPage)
assert len(page.videos) > 0
videos = twitch.get_channel_videos(TEST_CHANNEL, 3, "time")
assert videos["pageInfo"]
assert len(videos["edges"]) > 0
video_id = page.videos[0].id
video_id = videos["edges"][0]["node"]["id"]
video = twitch.get_video(video_id)
assert video and video.id == video_id
assert video["id"] == video_id
access_token = twitch.get_access_token(video_id)
assert "signature" in access_token
@ -27,7 +26,7 @@ def test_get_videos():
playlists = twitch.get_playlists(video_id, access_token)
assert playlists.startswith("#EXTM3U")
_, _, url = next(_parse_playlists(playlists))
name, res, url = next(_parse_playlists(playlists))
playlist = httpx.get(url).text
assert playlist.startswith("#EXTM3U")
@ -37,22 +36,15 @@ def test_get_videos():
def test_get_clips():
page = twitch.get_channel_clips(TEST_CHANNEL, "all_time", 3)
assert len(page.clips) > 0
"""
This test depends on the channel having some videos published.
"""
clips = twitch.get_channel_clips(TEST_CHANNEL, "all_time", 3)
assert clips["pageInfo"]
assert len(clips["edges"]) > 0
slug = page.clips[0].slug
slug = clips["edges"][0]["node"]["slug"]
clip = twitch.get_clip(slug)
assert clip.slug == slug
assert clip["slug"] == slug
assert get_clip_authenticated_url(slug, "source").startswith("https")
def test_get_game():
game = twitch.find_game("The Witness")
assert isinstance(game, Game)
assert game.id == "17324"
assert game.name == "The Witness"
assert game.description
game = twitch.find_game("Does Not Exist Hopefully")
assert game is None
assert get_clip_authenticated_url(slug, "source")

View File

@ -1,5 +0,0 @@
from twitchdl.twitch import channel_clips_generator
# def test_clips_generator():
# channel_clips_generator("foo", "bar", 100)

View File

@ -1,3 +1,8 @@
__version__ = "2.0.0"
from importlib import metadata
CLIENT_ID = "kimne78kx3ncx6brgo4mv6wki5h1ko"
try:
__version__ = metadata.version("twitch-dl")
except metadata.PackageNotFoundError:
__version__ = "0.0.0"
CLIENT_ID = "kd1unb4b3q4t58fwlpcbzcbnm76a8fp"

View File

@ -1,3 +1,3 @@
from twitchdl.console import main
from twitchdl.cli import cli
main()
cli()

391
twitchdl/cli.py Normal file
View File

@ -0,0 +1,391 @@
import click
import logging
import platform
import re
import sys
from twitchdl import __version__
from twitchdl.commands.clips import ClipsPeriod
from twitchdl.entities import DownloadOptions
# Tweak the Click context
# https://click.palletsprojects.com/en/8.1.x/api/#context
CONTEXT = dict(
# Enable using environment variables to set options
auto_envvar_prefix="TWITCH_DL",
# Add shorthand -h for invoking help
help_option_names=["-h", "--help"],
# Always show default values for options
show_default=True,
# Make help a bit wider
max_content_width=100,
)
json_option = click.option(
"--json",
is_flag=True,
default=False,
help="Print data as JSON rather than human readable text",
)
def validate_positive(_ctx: click.Context, _param: click.Parameter, value: int | None):
if value is not None and value <= 0:
raise click.BadParameter("must be greater than 0")
return value
def validate_time(_ctx: click.Context, _param: click.Parameter, value: str) -> int | None:
"""Parse a time string (hh:mm or hh:mm:ss) to number of seconds."""
if not value:
return None
parts = [int(p) for p in value.split(":")]
if not 2 <= len(parts) <= 3:
raise click.BadParameter("invalid time")
hours = parts[0]
minutes = parts[1]
seconds = parts[2] if len(parts) > 2 else 0
if hours < 0 or not (0 <= minutes <= 59) or not (0 <= seconds <= 59):
raise click.BadParameter("invalid time")
return hours * 3600 + minutes * 60 + seconds
def validate_rate(_ctx: click.Context, _param: click.Parameter, value: str) -> int | None:
if not value:
return None
match = re.search(r"^([0-9]+)(k|m|)$", value, flags=re.IGNORECASE)
if not match:
raise click.BadParameter("must be an integer, followed by an optional 'k' or 'm'")
amount = int(match.group(1))
unit = match.group(2)
if unit == "k":
return amount * 1024
if unit == "m":
return amount * 1024 * 1024
return amount
@click.group(context_settings=CONTEXT)
@click.option("--debug/--no-debug", default=False, help="Log debug info to stderr")
@click.option("--color/--no-color", default=sys.stdout.isatty(), help="Use ANSI color in output")
@click.version_option(package_name="twitch-dl")
@click.pass_context
def cli(ctx: click.Context, color: bool, debug: bool):
"""twitch-dl - twitch.tv downloader
https://toot.bezdomni.net/
"""
ctx.color = color
if debug:
logging.basicConfig(level=logging.INFO)
@cli.command()
@click.argument("channel_name")
@click.option(
"-a",
"--all",
help="Fetch all clips, overrides --limit",
is_flag=True,
)
@click.option(
"-d",
"--download",
help="Download clips in given period (in source quality)",
is_flag=True,
)
@click.option(
"-l",
"--limit",
help="Number of clips to fetch [max: 100]",
type=int,
default=10,
callback=validate_positive,
)
@click.option(
"-p",
"--pager",
help="Number of clips to show per page. Disabled by default.",
type=int,
callback=validate_positive,
is_flag=False,
flag_value=10,
)
@click.option(
"-P",
"--period",
help="Period from which to return clips",
default="all_time",
type=click.Choice(["last_day", "last_week", "last_month", "all_time"]),
)
@json_option
def clips(
channel_name: str,
all: bool,
download: bool,
json: bool,
limit: int,
pager: int | None,
period: ClipsPeriod,
):
"""List or download clips for given CHANNEL_NAME."""
from twitchdl.commands.clips import clips
clips(
channel_name,
all=all,
download=download,
json=json,
limit=limit,
pager=pager,
period=period,
)
@cli.command()
@click.argument("ids", nargs=-1)
@click.option(
"-a",
"--auth-token",
help="""Authentication token, passed to Twitch to access subscriber only
VODs. Can be copied from the `auth_token` cookie in any browser logged
in on Twitch.""",
)
@click.option(
"-c",
"--chapter",
help="""Download a single chapter of the video. Specify the chapter number
or use the flag without a number to display a chapter select prompt.
""",
type=int,
is_flag=False,
flag_value=0,
)
@click.option(
"--concat",
is_flag=True,
help="""Do not use ffmpeg to join files, concat them instead. This will
produce a .ts file by default.""",
)
@click.option(
"-d",
"--dry-run",
help="Simulate the download provcess without actually downloading any files.",
is_flag=True,
)
@click.option(
"-e",
"--end",
help="Download video up to this time (hh:mm or hh:mm:ss)",
callback=validate_time,
)
@click.option(
"-f",
"--format",
help="""Video format to convert into, passed to ffmpeg as the target file
extension. Defaults to `mkv`. If `--concat` is passed, defaults to
`ts`.""",
)
@click.option(
"-k",
"--keep",
help="Don't delete downloaded VODs and playlists after merging.",
is_flag=True,
)
@click.option(
"--no-join",
help="Don't run ffmpeg to join the downloaded vods, implies --keep.",
is_flag=True,
)
@click.option(
"--overwrite",
help="Overwrite the target file if it already exists without prompting.",
is_flag=True,
)
@click.option(
"-o",
"--output",
help="Output file name template. See docs for details.",
default="{date}_{id}_{channel_login}_{title_slug}.{format}",
)
@click.option(
"-q",
"--quality",
help="Video quality, e.g. `720p`. Set to `source` to get best quality.",
)
@click.option(
"-r",
"--rate-limit",
help="""Limit the maximum download speed in bytes per second. Use 'k' and
'm' suffixes for kbps and mbps.""",
callback=validate_rate,
)
@click.option(
"-s",
"--start",
help="Download video from this time (hh:mm or hh:mm:ss)",
callback=validate_time,
)
@click.option(
"-w",
"--max-workers",
help="Number of workers for downloading vods concurrently",
type=int,
default=5,
)
def download(
ids: tuple[str, ...],
auth_token: str | None,
chapter: int | None,
concat: bool,
dry_run: bool,
end: int | None,
format: str,
keep: bool,
no_join: bool,
overwrite: bool,
output: str,
quality: str | None,
rate_limit: str | None,
start: int | None,
max_workers: int,
):
"""Download videos or clips.
Pass one or more video ID, clip slug or Twitch URL to download.
"""
from twitchdl.commands.download import download
if not format:
format = "ts" if concat else "mkv"
options = DownloadOptions(
auth_token=auth_token,
chapter=chapter,
concat=concat,
dry_run=dry_run,
end=end,
format=format,
keep=keep,
no_join=no_join,
overwrite=overwrite,
output=output,
quality=quality,
rate_limit=rate_limit,
start=start,
max_workers=max_workers,
)
download(list(ids), options)
@cli.command()
def env():
"""Print environment information for inclusion in bug reports."""
click.echo(f"twitch-dl {__version__}")
click.echo(f"Python {sys.version}")
click.echo(f"Platform: {platform.platform()}")
@cli.command()
@click.argument("id")
@json_option
def info(id: str, json: bool):
"""Print information for a given Twitch URL, video ID or clip slug."""
from twitchdl.commands.info import info
info(id, json=json)
@cli.command()
@click.argument("channel_name")
@click.option(
"-a",
"--all",
help="Fetch all clips, overrides --limit",
is_flag=True,
)
@click.option(
"-c",
"--compact",
help="Show videos in compact mode, one line per video",
is_flag=True,
)
@click.option(
"-l",
"--limit",
help="Number of videos to fetch. Defaults to 40 in compact mode, 10 otherwise.",
type=int,
callback=validate_positive,
)
@click.option(
"-p",
"--pager",
help="Number of videos to show per page. Disabled by default.",
type=int,
callback=validate_positive,
is_flag=False,
flag_value=10,
)
@click.option(
"-g",
"--game",
"games_tuple",
help="Show videos of given game (can be given multiple times)",
multiple=True,
)
@click.option(
"-s",
"--sort",
help="Sorting order of videos",
default="time",
type=click.Choice(["views", "time"]),
)
@click.option(
"-t",
"--type",
help="Broadcast type",
default="archive",
type=click.Choice(["archive", "highlight", "upload"]),
)
@json_option
def videos(
channel_name: str,
all: bool,
compact: bool,
games_tuple: tuple[str, ...],
json: bool,
limit: int | None,
pager: int | None,
sort: str,
type: str,
):
"""List or download clips for given CHANNEL_NAME."""
from twitchdl.commands.videos import videos
# Click provides a tuple, make it a list instead
games = list(games_tuple)
videos(
channel_name,
all=all,
compact=compact,
games=games,
json=json,
limit=limit,
pager=pager,
sort=sort,
type=type,
)

View File

@ -1,13 +0,0 @@
from .clips import clips
from .download import download
from .env import env
from .info import info
from .videos import videos
__all__ = [
clips,
download,
env,
info,
videos,
]

View File

@ -1,32 +1,43 @@
import re
import sys
from typing import Literal
from itertools import islice
from os import path
from twitchdl import twitch, utils
from twitchdl.commands.download import get_clip_authenticated_url
from twitchdl.download import download_file
from twitchdl.models import Clip, ClipGenerator
from twitchdl.output import print_out, print_clip, print_json
ClipsPeriod = Literal["last_day", "last_week", "last_month", "all_time"]
def clips(args):
def clips(
channel_name: str,
*,
all: bool = False,
download: bool = False,
json: bool = False,
limit: int = 10,
pager: int | None = None,
period: ClipsPeriod = "all_time",
):
# Ignore --limit if --pager or --all are given
limit = sys.maxsize if args.all or args.pager else args.limit
limit = sys.maxsize if all or pager else limit
generator = twitch.channel_clips_generator(args.channel_name, args.period, limit)
generator = twitch.channel_clips_generator(channel_name, period, limit)
if args.json:
return print_json([c.raw for c in generator])
if json:
return print_json(list(generator))
if args.download:
if download:
return _download_clips(generator)
if args.pager:
return _print_paged(generator, args.pager)
if pager:
return _print_paged(generator, pager)
return _print_all(generator, args)
return _print_all(generator, all)
def _continue():
@ -40,53 +51,52 @@ def _continue():
return True
def _target_filename(clip: Clip):
url = clip.video_qualities[0].source_url
def _target_filename(clip):
url = clip["videoQualities"][0]["sourceURL"]
_, ext = path.splitext(url)
ext = ext.lstrip(".")
match = re.search(r"^(\d{4})-(\d{2})-(\d{2})T", clip.created_at)
match = re.search(r"^(\d{4})-(\d{2})-(\d{2})T", clip["createdAt"])
if not match:
raise ValueError(f"Invalid date: {clip.created_at}")
raise ValueError(f"Failed parsing date from: {clip['createdAt']}")
date = "".join(match.groups())
name = "_".join([
date,
clip.id,
clip.broadcaster.login,
utils.slugify(clip.title),
clip["id"],
clip["broadcaster"]["login"],
utils.slugify(clip["title"]),
])
return "{}.{}".format(name, ext)
return f"{name}.{ext}"
def _download_clips(clips: ClipGenerator):
for clip in clips:
def _download_clips(generator):
for clip in generator:
target = _target_filename(clip)
if path.exists(target):
print_out("Already downloaded: <green>{}</green>".format(target))
print_out(f"Already downloaded: <green>{target}</green>")
else:
url = get_clip_authenticated_url(clip.slug, "source")
print_out("Downloading: <yellow>{}</yellow>".format(target))
url = get_clip_authenticated_url(clip["slug"], "source")
print_out(f"Downloading: <yellow>{target}</yellow>")
download_file(url, target)
def _print_all(clips: ClipGenerator, args):
for clip in clips:
def _print_all(generator, all: bool):
for clip in generator:
print_out()
print_clip(clip)
if not args.all:
if not all:
print_out(
"\n<dim>There may be more clips. " +
"Increase the --limit, use --all or --pager to see the rest.</dim>"
)
def _print_paged(clips: ClipGenerator, page_size: int):
iterator = iter(clips)
def _print_paged(generator, page_size):
iterator = iter(generator)
page = list(islice(iterator, page_size))
first = 1
@ -103,7 +113,7 @@ def _print_paged(clips: ClipGenerator, page_size: int):
last = first + len(page) - 1
print_out("-" * 80)
print_out("<yellow>Clips {}-{}</yellow>".format(first, last))
print_out(f"<yellow>Clips {first}-{last}</yellow>")
first = first + len(page)
last = first + 1

View File

@ -1,4 +1,5 @@
import asyncio
import platform
import httpx
import m3u8
import os
@ -14,12 +15,29 @@ from urllib.parse import urlparse, urlencode
from twitchdl import twitch, utils
from twitchdl.download import download_file
from twitchdl.entities import Data, DownloadOptions
from twitchdl.exceptions import ConsoleError
from twitchdl.http import download_all
from twitchdl.models import Clip, Video
from twitchdl.output import print_out
def download(ids: list[str], args: DownloadOptions):
for video_id in ids:
download_one(video_id, args)
def download_one(video: str, args: DownloadOptions):
video_id = utils.parse_video_identifier(video)
if video_id:
return _download_video(video_id, args)
clip_slug = utils.parse_clip_identifier(video)
if clip_slug:
return _download_clip(clip_slug, args)
raise ConsoleError(f"Invalid input: {video}")
def _parse_playlists(playlists_m3u8):
playlists = m3u8.loads(playlists_m3u8)
@ -44,7 +62,7 @@ def _get_playlist_by_name(playlists, quality):
return uri
available = ", ".join([name for (name, _, _) in playlists])
msg = "Quality '{}' not found. Available qualities are: {}".format(quality, available)
msg = f"Quality '{quality}' not found. Available qualities are: {available}"
raise ConsoleError(msg)
@ -52,90 +70,107 @@ def _select_playlist_interactive(playlists):
print_out("\nAvailable qualities:")
for n, (name, resolution, uri) in enumerate(playlists):
if resolution:
print_out("{}) {} [{}]".format(n + 1, name, resolution))
print_out(f"{n + 1}) <b>{name}</b> <dim>({resolution})</dim>")
else:
print_out("{}) {}".format(n + 1, name))
print_out(f"{n + 1}) <b>{name}</b>")
no = utils.read_int("Choose quality", min=1, max=len(playlists) + 1, default=1)
_, _, uri = playlists[no - 1]
return uri
def _join_vods(playlist_path: str, target: str, overwrite: bool, video: Video):
def _join_vods(playlist_path: str, target: str, overwrite: bool, video):
description = video["description"] or ""
description = description.strip()
command = [
"ffmpeg",
"-i", playlist_path,
"-c", "copy",
"-metadata", f"artist={video.creator.display_name}",
"-metadata", f"title={video.title}",
"-metadata", f"artist={video['creator']['displayName']}",
"-metadata", f"title={video['title']}",
"-metadata", f"description={description}",
"-metadata", "encoded_by=twitch-dl",
"-stats",
"-loglevel", "warning",
"file:{}".format(target),
f"file:{target}",
]
if overwrite:
command.append("-y")
print_out("<dim>{}</dim>".format(" ".join(command)))
print_out(f"<dim>{' '.join(command)}</dim>")
result = subprocess.run(command)
if result.returncode != 0:
raise ConsoleError("Joining files failed")
def _concat_vods(vod_paths: list[str], target: str):
tool = "type" if platform.system() == "Windows" else "cat"
command = [tool] + vod_paths
def _video_target_filename(video: Video, args) -> str:
date, time = video.published_at.split("T")
game = video.game.name if video.game else "Unknown"
with open(target, "wb") as target_file:
result = subprocess.run(command, stdout=target_file)
if result.returncode != 0:
raise ConsoleError(f"Joining files failed: {result.stderr}")
subs = {
"channel": video.creator.display_name,
"channel_login": video.creator.login,
def get_video_substitutions(video: Data, format: str) -> Data:
date, time = video['publishedAt'].split("T")
game = video["game"]["name"] if video["game"] else "Unknown"
return {
"channel": video["creator"]["displayName"],
"channel_login": video["creator"]["login"],
"date": date,
"datetime": video.published_at,
"format": args.format,
"datetime": video["publishedAt"],
"format": format,
"game": game,
"game_slug": utils.slugify(game),
"id": video.id,
"id": video["id"],
"time": time,
"title": utils.titlify(video.title),
"title_slug": utils.slugify(video.title),
"title": utils.titlify(video["title"]),
"title_slug": utils.slugify(video["title"]),
}
def _video_target_filename(video: Data, args: DownloadOptions):
subs = get_video_substitutions(video, args.format)
try:
return args.output.format(**subs)
except KeyError as e:
supported = ", ".join(subs.keys())
raise ConsoleError("Invalid key {} used in --output. Supported keys are: {}".format(e, supported))
raise ConsoleError(f"Invalid key {e} used in --output. Supported keys are: {supported}")
def _clip_target_filename(clip: Clip, args) -> str:
date, time = clip.created_at.split("T")
game = clip.game.name if clip.game else "Unknown"
def _clip_target_filename(clip, args: DownloadOptions):
date, time = clip["createdAt"].split("T")
game = clip["game"]["name"] if clip["game"] else "Unknown"
url = clip.video_qualities[0].source_url
url = clip["videoQualities"][0]["sourceURL"]
_, ext = path.splitext(url)
ext = ext.lstrip(".")
subs = {
"channel": clip.broadcaster.display_name,
"channel_login": clip.broadcaster.login,
"channel": clip["broadcaster"]["displayName"],
"channel_login": clip["broadcaster"]["login"],
"date": date,
"datetime": clip.created_at,
"datetime": clip["createdAt"],
"format": ext,
"game": game,
"game_slug": utils.slugify(game),
"id": clip.id,
"slug": clip.slug,
"id": clip["id"],
"slug": clip["slug"],
"time": time,
"title": utils.titlify(clip.title),
"title_slug": utils.slugify(clip.title),
"title": utils.titlify(clip["title"]),
"title_slug": utils.slugify(clip["title"]),
}
try:
return args.output.format(**subs)
except KeyError as e:
supported = ", ".join(subs.keys())
raise ConsoleError("Invalid key {} used in --output. Supported keys are: {}".format(e, supported))
raise ConsoleError(f"Invalid key {e} used in --output. Supported keys are: {supported}")
def _get_vod_paths(playlist, start: Optional[int], end: Optional[int]) -> List[str]:
@ -166,24 +201,7 @@ def _crete_temp_dir(base_uri: str) -> str:
return str(temp_dir)
def download(args):
for video_id in args.videos:
download_one(video_id, args)
def download_one(video: str, args):
video_id = utils.parse_video_identifier(video)
if video_id:
return _download_video(video_id, args)
clip_slug = utils.parse_clip_identifier(video)
if clip_slug:
return _download_clip(clip_slug, args)
raise ConsoleError("Invalid input: {}".format(video))
def _get_clip_url(clip, quality) -> str:
def _get_clip_url(clip, quality):
qualities = clip["videoQualities"]
# Quality given as an argument
@ -197,13 +215,13 @@ def _get_clip_url(clip, quality) -> str:
return q["sourceURL"]
available = ", ".join([str(q["quality"]) for q in qualities])
msg = "Quality '{}' not found. Available qualities are: {}".format(quality, available)
msg = f"Quality '{quality}' not found. Available qualities are: {available}"
raise ConsoleError(msg)
# Ask user to select quality
print_out("\nAvailable qualities:")
for n, q in enumerate(qualities):
print_out("{}) {} [{} fps]".format(n + 1, q["quality"], q["frameRate"]))
print_out(f"{n + 1}) {q['quality']} [{q['frameRate']} fps]")
print_out()
no = utils.read_int("Choose quality", min=1, max=len(qualities), default=1)
@ -211,12 +229,12 @@ def _get_clip_url(clip, quality) -> str:
return selected_quality["sourceURL"]
def get_clip_authenticated_url(slug: str, quality: str) -> str:
def get_clip_authenticated_url(slug, quality):
print_out("<dim>Fetching access token...</dim>")
access_token = twitch.get_clip_access_token(slug)
if not access_token:
raise ConsoleError("Access token not found for slug '{}'".format(slug))
raise ConsoleError(f"Access token not found for slug '{slug}'")
url = _get_clip_url(access_token, quality)
@ -225,26 +243,29 @@ def get_clip_authenticated_url(slug: str, quality: str) -> str:
"token": access_token["playbackAccessToken"]["value"],
})
return "{}?{}".format(url, query)
return f"{url}?{query}"
def _download_clip(slug: str, args):
def _download_clip(slug: str, args: DownloadOptions) -> None:
print_out("<dim>Looking up clip...</dim>")
clip = twitch.get_clip(slug)
game = clip.game.name if clip.game else "Unknown"
if not clip:
raise ConsoleError("Clip '{}' not found".format(slug))
raise ConsoleError(f"Clip '{slug}' not found")
print_out("Found: <green>{}</green> by <yellow>{}</yellow>, playing <blue>{}</blue> ({})".format(
clip.title,
clip.broadcaster.display_name,
game,
utils.format_duration(clip.duration_seconds)
))
title = clip["title"]
user = clip["broadcaster"]["displayName"]
game = clip["game"]["name"] if clip["game"] else "Unknown"
duration = utils.format_duration(clip["durationSeconds"])
print_out(
f"Found: <green>{title}</green> by <yellow>{user}</yellow>, "+
f"playing <blue>{game}</blue> ({duration})"
)
target = _clip_target_filename(clip, args)
print_out("Target: <blue>{}</blue>".format(target))
print_out(f"Target: <blue>{target}</blue>")
if not args.overwrite and path.exists(target):
response = input("File exists. Overwrite? [Y/n]: ")
@ -253,15 +274,17 @@ def _download_clip(slug: str, args):
args.overwrite = True
url = get_clip_authenticated_url(slug, args.quality)
print_out("<dim>Selected URL: {}</dim>".format(url))
print_out(f"<dim>Selected URL: {url}</dim>")
print_out("<dim>Downloading clip...</dim>")
download_file(url, target)
print_out("Downloaded: <blue>{}</blue>".format(target))
if (args.dry_run is False):
download_file(url, target)
print_out(f"Downloaded: <blue>{target}</blue>")
def _download_video(video_id, args) -> None:
def _download_video(video_id, args: DownloadOptions) -> None:
if args.start and args.end and args.end <= args.start:
raise ConsoleError("End time must be greater than start time")
@ -269,13 +292,14 @@ def _download_video(video_id, args) -> None:
video = twitch.get_video(video_id)
if not video:
raise ConsoleError("Video {} not found".format(video_id))
raise ConsoleError(f"Video {video_id} not found")
creator = f" by <yellow>{video.creator.display_name}" if video.creator else ""
print_out(f"Found: <blue>{video.title}</blue>{creator}")
title = video['title']
user = video['creator']['displayName']
print_out(f"Found: <blue>{title}</blue> by <yellow>{user}</yellow>")
target = _video_target_filename(video, args)
print_out("Output: <blue>{}</blue>".format(target))
print_out(f"Output: <blue>{target}</blue>")
if not args.overwrite and path.exists(target):
response = input("File exists. Overwrite? [Y/n]: ")
@ -283,6 +307,9 @@ def _download_video(video_id, args) -> None:
raise ConsoleError("Aborted")
args.overwrite = True
# Chapter select or manual offset
start, end = _determine_time_range(video_id, args)
print_out("<dim>Fetching access token...</dim>")
access_token = twitch.get_access_token(video_id, auth_token=args.auth_token)
@ -299,7 +326,7 @@ def _download_video(video_id, args) -> None:
base_uri = re.sub("/[^/]+$", "/", playlist_uri)
target_dir = _crete_temp_dir(base_uri)
vod_paths = _get_vod_paths(playlist, args.start, args.end)
vod_paths = _get_vod_paths(playlist, start, end)
# Save playlists for debugging purposes
with open(path.join(target_dir, "playlists.m3u8"), "w") as f:
@ -307,10 +334,9 @@ def _download_video(video_id, args) -> None:
with open(path.join(target_dir, "playlist.m3u8"), "w") as f:
f.write(response.text)
print_out("\nDownloading {} VODs using {} workers to {}".format(
len(vod_paths), args.max_workers, target_dir))
print_out(f"\nDownloading {len(vod_paths)} VODs using {args.max_workers} workers to {target_dir}")
sources = [base_uri + path for path in vod_paths]
targets = [os.path.join(target_dir, "{:05d}.ts".format(k)) for k, _ in enumerate(vod_paths)]
targets = [os.path.join(target_dir, f"{k:05d}.ts") for k, _ in enumerate(vod_paths)]
asyncio.run(download_all(sources, targets, args.max_workers, rate_limit=args.rate_limit))
# Make a modified playlist which references downloaded VODs
@ -327,18 +353,61 @@ def _download_video(video_id, args) -> None:
playlist_path = path.join(target_dir, "playlist_downloaded.m3u8")
playlist.dump(playlist_path)
print_out("")
if args.no_join:
print_out("\n\n<dim>Skipping joining files...</dim>")
print_out("VODs downloaded to:\n<blue>{}</blue>".format(target_dir))
print_out("<dim>Skipping joining files...</dim>")
print_out(f"VODs downloaded to:\n<blue>{target_dir}</blue>")
return
print_out("\n\nJoining files...")
_join_vods(playlist_path, target, args.overwrite, video)
if args.concat:
print_out("<dim>Concating files...</dim>")
_concat_vods(targets, target)
else:
print_out("<dim>Joining files...</dim>")
_join_vods(playlist_path, target, args.overwrite, video)
if args.keep:
print_out("\n<dim>Temporary files not deleted: {}</dim>".format(target_dir))
print_out(f"\n<dim>Temporary files not deleted: {target_dir}</dim>")
else:
print_out("\n<dim>Deleting temporary files...</dim>")
shutil.rmtree(target_dir)
print_out("\nDownloaded: <green>{}</green>".format(target))
print_out(f"\nDownloaded: <green>{target}</green>")
def _determine_time_range(video_id, args: DownloadOptions):
if args.start or args.end:
return args.start, args.end
if args.chapter is not None:
print_out("<dim>Fetching chapters...</dim>")
chapters = twitch.get_video_chapters(video_id)
if not chapters:
raise ConsoleError("This video has no chapters")
if args.chapter == 0:
chapter = _choose_chapter_interactive(chapters)
else:
try:
chapter = chapters[args.chapter - 1]
except IndexError:
raise ConsoleError(f"Chapter {args.chapter} does not exist. This video has {len(chapters)} chapters.")
print_out(f'Chapter selected: <blue>{chapter["description"]}</blue>\n')
start = chapter["positionMilliseconds"] // 1000
duration = chapter["durationMilliseconds"] // 1000
return start, start + duration
return None, None
def _choose_chapter_interactive(chapters):
print_out("\nChapters:")
for index, chapter in enumerate(chapters):
duration = utils.format_time(chapter["durationMilliseconds"] // 1000)
print_out(f'{index + 1}) <b>{chapter["description"]}</b> <dim>({duration})</dim>')
index = utils.read_int("Select a chapter", 1, len(chapters))
chapter = chapters[index - 1]
return chapter

View File

@ -1,9 +0,0 @@
import platform
import sys
import twitchdl
def env(args):
print("twitch-dl", twitchdl.__version__)
print("Platform:", platform.platform())
print("Python", sys.version)

View File

@ -1,19 +1,18 @@
import m3u8
from twitchdl import utils, twitch
from twitchdl.commands.download import get_video_substitutions
from twitchdl.exceptions import ConsoleError
from twitchdl.models import Clip, Video
from twitchdl.output import print_video, print_clip, print_json, print_out, print_log
def info(args):
video_id = utils.parse_video_identifier(args.video)
def info(id: str, *, json: bool = False, format="mkv"):
video_id = utils.parse_video_identifier(id)
if video_id:
print_log("Fetching video...")
video = twitch.get_video(video_id)
if not video:
raise ConsoleError("Video {} not found".format(video_id))
raise ConsoleError(f"Video {video_id} not found")
print_log("Fetching access token...")
access_token = twitch.get_access_token(video_id)
@ -21,44 +20,59 @@ def info(args):
print_log("Fetching playlists...")
playlists = twitch.get_playlists(video_id, access_token)
if video:
if args.json:
video_json(video, playlists)
else:
video_info(video, playlists)
return
print_log("Fetching chapters...")
chapters = twitch.get_video_chapters(video_id)
clip_slug = utils.parse_clip_identifier(args.video)
substitutions = get_video_substitutions(video, format)
if json:
video_json(video, playlists, chapters)
else:
video_info(video, playlists, chapters)
print_out("\nOutput format placeholders:")
for k, v in substitutions.items():
print(f" * {k} = {v}")
return
clip_slug = utils.parse_clip_identifier(id)
if clip_slug:
print_log("Fetching clip...")
clip = twitch.get_clip(clip_slug)
if not clip:
raise ConsoleError("Clip {} not found".format(clip_slug))
raise ConsoleError(f"Clip {clip_slug} not found")
if args.json:
print_json(clip.raw)
if json:
print_json(clip)
else:
clip_info(clip)
return
raise ConsoleError("Invalid input: {}".format(args.video))
raise ConsoleError(f"Invalid input: {id}")
def video_info(video: Video, playlists):
def video_info(video, playlists, chapters):
print_out()
print_video(video)
print_out()
print_out("Playlists:")
for p in m3u8.loads(playlists).playlists:
print_out("<b>{}</b> {}".format(p.stream_info.video, p.uri))
print_out(f"<b>{p.stream_info.video}</b> {p.uri}")
if chapters:
print_out()
print_out("Chapters:")
for chapter in chapters:
start = utils.format_time(chapter["positionMilliseconds"] // 1000, force_hours=True)
duration = utils.format_time(chapter["durationMilliseconds"] // 1000)
print_out(f'{start} <b>{chapter["description"]}</b> ({duration})')
def video_json(video: Video, playlists):
def video_json(video, playlists, chapters):
playlists = m3u8.loads(playlists).playlists
json = video.raw
json["playlists"] = [
video["playlists"] = [
{
"bandwidth": p.stream_info.bandwidth,
"resolution": p.stream_info.resolution,
@ -68,14 +82,16 @@ def video_json(video: Video, playlists):
} for p in playlists
]
print_json(json)
video["chapters"] = chapters
print_json(video)
def clip_info(clip: Clip):
def clip_info(clip):
print_out()
print_clip(clip)
print_out()
print_out("Download links:")
for q in clip.video_qualities:
print_out(f"<b>{q.quality}p{q.frame_rate}</b> {q.source_url}")
for q in clip["videoQualities"]:
print_out("<b>{quality}p{frameRate}</b> {sourceURL}".format(**q))

View File

@ -5,24 +5,35 @@ from twitchdl.exceptions import ConsoleError
from twitchdl.output import print_out, print_paged_videos, print_video, print_json, print_video_compact
def videos(args):
game_ids = _get_game_ids(args.game)
def videos(
channel_name: str,
*,
all: bool,
compact: bool,
games: list[str],
json: bool,
limit: int | None,
pager: int | None,
sort: str,
type: str,
):
game_ids = _get_game_ids(games)
# Set different defaults for limit for compact display
limit = args.limit or (40 if args.compact else 10)
limit = limit or (40 if compact else 10)
# Ignore --limit if --pager or --all are given
max_videos = sys.maxsize if args.all or args.pager else limit
max_videos = sys.maxsize if all or pager else limit
total_count, generator = twitch.channel_videos_generator(
args.channel_name, max_videos, args.sort, args.type, game_ids=game_ids)
channel_name, max_videos, sort, type, game_ids=game_ids)
if args.json:
if json:
videos = list(generator)
print_json({
"count": len(videos),
"totalCount": total_count,
"videos": [v.raw for v in videos]
"videos": videos
})
return
@ -30,13 +41,13 @@ def videos(args):
print_out("<yellow>No videos found</yellow>")
return
if args.pager:
print_paged_videos(generator, args.pager, total_count)
if pager:
print_paged_videos(generator, pager, total_count)
return
count = 0
for video in generator:
if args.compact:
if compact:
print_video_compact(video)
else:
print_out()
@ -45,7 +56,7 @@ def videos(args):
print_out()
print_out("-" * 80)
print_out("<yellow>Videos {}-{} of {}</yellow>".format(1, count, total_count))
print_out(f"<yellow>Videos 1-{count} of {total_count}</yellow>")
if total_count > count:
print_out()
@ -61,9 +72,9 @@ def _get_game_ids(names):
game_ids = []
for name in names:
print_out(f"<dim>Looking up game '{name}'...</dim>")
game = twitch.find_game(name)
if not game:
game_id = twitch.get_game_id(name)
if not game_id:
raise ConsoleError(f"Game '{name}' not found")
game_ids.append(int(game.id))
game_ids.append(int(game_id))
return game_ids

View File

@ -1,325 +0,0 @@
# -*- coding: utf-8 -*-
import logging
import sys
import re
from argparse import ArgumentParser, ArgumentTypeError
from typing import NamedTuple, List, Tuple, Any, Dict
from twitchdl.exceptions import ConsoleError
from twitchdl.output import print_err
from twitchdl.twitch import GQLError
from . import commands, __version__
Argument = Tuple[List[str], Dict[str, Any]]
class Command(NamedTuple):
name: str
description: str
arguments: List[Argument]
CLIENT_WEBSITE = 'https://github.com/ihabunek/twitch-dl'
def time(value: str) -> int:
"""Parse a time string (hh:mm or hh:mm:ss) to number of seconds."""
parts = [int(p) for p in value.split(":")]
if not 2 <= len(parts) <= 3:
raise ArgumentTypeError()
hours = parts[0]
minutes = parts[1]
seconds = parts[2] if len(parts) > 2 else 0
if hours < 0 or not (0 <= minutes <= 59) or not (0 <= seconds <= 59):
raise ArgumentTypeError()
return hours * 3600 + minutes * 60 + seconds
def pos_integer(value: str) -> int:
try:
parsed = int(value)
except ValueError:
raise ArgumentTypeError("must be an integer")
if parsed < 1:
raise ArgumentTypeError("must be positive")
return parsed
def rate(value: str) -> int:
match = re.search(r"^([0-9]+)(k|m|)$", value, flags=re.IGNORECASE)
if not match:
raise ArgumentTypeError("must be an integer, followed by an optional 'k' or 'm'")
amount = int(match.group(1))
unit = match.group(2)
if unit == "k":
return amount * 1024
if unit == "m":
return amount * 1024 * 1024
return amount
COMMANDS = [
Command(
name="videos",
description="List videos for a channel.",
arguments=[
(["channel_name"], {
"help": "Name of the channel to list videos for.",
"type": str,
}),
(["-g", "--game"], {
"help": "Show videos of given game (can be given multiple times)",
"action": "append",
"type": str,
}),
(["-l", "--limit"], {
"help": "Number of videos to fetch. Defaults to 40 in copmpact mode, 10 otherwise.",
"type": pos_integer,
}),
(["-a", "--all"], {
"help": "Fetch all videos, overrides --limit",
"action": "store_true",
"default": False,
}),
(["-s", "--sort"], {
"help": "Sorting order of videos. Defaults to `time`.",
"type": str,
"choices": ["views", "time"],
"default": "time",
}),
(["-t", "--type"], {
"help": "Broadcast type. Defaults to `archive`.",
"type": str,
"choices": ["archive", "highlight", "upload"],
"default": "archive",
}),
(["-j", "--json"], {
"help": "Show results as JSON. Ignores `--pager`.",
"action": "store_true",
"default": False,
}),
(["-p", "--pager"], {
"help": "Print videos in pages. Ignores `--limit`. Defaults to 10.",
"type": pos_integer,
"nargs": "?",
"const": 10,
}),
(["-c", "--compact"], {
"help": "Show videos in compact mode, one line per video",
"action": "store_true",
"default": False,
}),
],
),
Command(
name="clips",
description="List or download clips for a channel.",
arguments=[
(["channel_name"], {
"help": "Name of the channel to list clips for.",
"type": str,
}),
(["-l", "--limit"], {
"help": "Number of videos to fetch (default 10, max 100)",
"type": pos_integer,
"default": 10,
}),
(["-a", "--all"], {
"help": "Fetch all videos, overrides --limit",
"action": "store_true",
"default": False,
}),
(["-P", "--period"], {
"help": "Period from which to return clips. Defaults to `all_time`.",
"type": str,
"choices": ["last_day", "last_week", "last_month", "all_time"],
"default": "all_time",
}),
(["-j", "--json"], {
"help": "Show results as JSON. Ignores `--pager`.",
"action": "store_true",
"default": False,
}),
(["-p", "--pager"], {
"help": "Number of clips to show per page. Disabled by default.",
"type": pos_integer,
"nargs": "?",
"const": 10,
}),
(["-d", "--download"], {
"help": "Download all videos in given period (in source quality)",
"action": "store_true",
"default": False,
}),
],
),
Command(
name="download",
description="Download videos or clips.",
arguments=[
(["videos"], {
"help": "One or more video ID, clip slug or twitch URL to download.",
"type": str,
"nargs": "+",
}),
(["-w", "--max-workers"], {
"help": "Number of workers for downloading vods concurrently (default 5)",
"type": int,
"default": 5,
}),
(["-s", "--start"], {
"help": "Download video from this time (hh:mm or hh:mm:ss)",
"type": time,
"default": None,
}),
(["-e", "--end"], {
"help": "Download video up to this time (hh:mm or hh:mm:ss)",
"type": time,
"default": None,
}),
(["-f", "--format"], {
"help": "Video format to convert into, passed to ffmpeg as the "
"target file extension. Defaults to `mkv`.",
"type": str,
"default": "mkv",
}),
(["-k", "--keep"], {
"help": "Don't delete downloaded VODs and playlists after merging.",
"action": "store_true",
"default": False,
}),
(["-q", "--quality"], {
"help": "Video quality, e.g. 720p. Set to 'source' to get best quality.",
"type": str,
}),
(["-a", "--auth-token"], {
"help": "Authentication token, passed to Twitch to access subscriber only "
"VODs. Can be copied from the 'auth_token' cookie in any browser "
"logged in on Twitch.",
"type": str,
"default": None,
}),
(["--no-join"], {
"help": "Don't run ffmpeg to join the downloaded vods, implies --keep.",
"action": "store_true",
"default": False,
}),
(["--overwrite"], {
"help": "Overwrite the target file if it already exists without prompting.",
"action": "store_true",
"default": False,
}),
(["-o", "--output"], {
"help": "Output file name template. See docs for details.",
"type": str,
"default": "{date}_{id}_{channel_login}_{title_slug}.{format}"
}),
(["-r", "--rate-limit"], {
"help": "Limit the maximum download speed in bytes per second. "
"Use 'k' and 'm' suffixes for kbps and mbps.",
"type": rate,
}),
],
),
Command(
name="info",
description="Print information for a given Twitch URL, video ID or clip slug.",
arguments=[
(["video"], {
"help": "Video ID, clip slug, or URL",
"type": str,
}),
(["-j", "--json"], {
"help": "Show results as JSON",
"action": "store_true",
"default": False,
}),
],
),
Command(
name="env",
description="Print environment information for inclusion in bug reports.",
arguments=[],
)
]
COMMON_ARGUMENTS = [
(["--debug"], {
"help": "show debug log in console",
"action": 'store_true',
"default": False,
}),
(["--no-color"], {
"help": "disable ANSI colors in output",
"action": 'store_true',
"default": False,
})
]
def get_parser():
description = "A script for downloading videos from Twitch"
parser = ArgumentParser(prog='twitch-dl', description=description, epilog=CLIENT_WEBSITE)
parser.add_argument("--version", help="show version number", action='store_true')
subparsers = parser.add_subparsers(title="commands")
for command in COMMANDS:
sub = subparsers.add_parser(
command.name,
description=command.description,
epilog=CLIENT_WEBSITE
)
# Set the function to call to the function of same name in the "commands" package
sub.set_defaults(func=commands.__dict__.get(command.name))
for args, kwargs in command.arguments + COMMON_ARGUMENTS:
sub.add_argument(*args, **kwargs)
return parser
def main():
parser = get_parser()
args = parser.parse_args()
if "--debug" in sys.argv:
logging.basicConfig(level=logging.DEBUG)
if args.version:
print("twitch-dl v{}".format(__version__))
return
if "func" not in args:
parser.print_help()
return
try:
args.func(args)
except ConsoleError as e:
print_err(e)
sys.exit(1)
except KeyboardInterrupt:
print_err("\nOperation canceled")
sys.exit(1)
except GQLError as e:
print_err(e)
for err in e.errors:
print_err("*", err["message"])
sys.exit(1)

View File

@ -1,15 +1,13 @@
import os
import httpx
from twitchdl.exceptions import ConsoleError
CHUNK_SIZE = 1024
CONNECT_TIMEOUT = 5
RETRY_COUNT = 5
class DownloadFailed(Exception):
pass
def _download(url: str, path: str):
tmp_path = path + ".tmp"
size = 0
@ -35,4 +33,4 @@ def download_file(url: str, path: str, retries: int = RETRY_COUNT):
except httpx.RequestError:
pass
raise DownloadFailed(":(")
raise ConsoleError(f"Failed downloading after {retries} attempts: {url}")

25
twitchdl/entities.py Normal file
View File

@ -0,0 +1,25 @@
from dataclasses import dataclass
from typing import Any
@dataclass
class DownloadOptions:
auth_token: str | None
chapter: int | None
concat: bool
dry_run: bool
end: int | None
format: str
keep: bool
no_join: bool
overwrite: bool
output: str
quality: str | None
rate_limit: str | None
start: int | None
max_workers: int
# Type for annotating decoded JSON
# TODO: make data classes for common structs
Data = dict[str, Any]

View File

@ -1,4 +1,5 @@
import click
class ConsoleError(Exception):
class ConsoleError(click.ClickException):
"""Raised when an error occurs and script exectuion should halt."""
pass

View File

@ -4,7 +4,8 @@ import logging
import os
import time
from typing import List, Optional, Union
from abc import ABC, abstractmethod
from typing import List, Optional
from twitchdl.progress import Progress
@ -25,7 +26,13 @@ https://www.python-httpx.org/advanced/#timeout-configuration
"""
class TokenBucket:
class TokenBucket(ABC):
@abstractmethod
def advance(self, size: int):
pass
class LimitingTokenBucket(TokenBucket):
"""Limit the download speed by strategically inserting sleeps."""
def __init__(self, rate: int, capacity: Optional[int] = None):
@ -53,22 +60,19 @@ class TokenBucket:
self.last_refilled = now
class EndlessTokenBucket:
class EndlessTokenBucket(TokenBucket):
"""Used when download speed is not limited."""
def advance(self, size: int):
pass
AnyTokenBucket = Union[TokenBucket, EndlessTokenBucket]
async def download(
client: httpx.AsyncClient,
task_id: int,
source: str,
target: str,
progress: Progress,
token_bucket: AnyTokenBucket,
token_bucket: TokenBucket,
):
# Download to a temp file first, then copy to target when over to avoid
# getting saving chunks which may persist if canceled or --keep is used
@ -83,7 +87,7 @@ async def download(
token_bucket.advance(size)
progress.advance(task_id, size)
progress.end(task_id)
os.rename(tmp_target, target)
os.rename(tmp_target, target)
async def download_with_retries(
@ -93,7 +97,7 @@ async def download_with_retries(
source: str,
target: str,
progress: Progress,
token_bucket: AnyTokenBucket,
token_bucket: TokenBucket,
):
async with semaphore:
if os.path.exists(target):
@ -117,11 +121,11 @@ async def download_all(
sources: List[str],
targets: List[str],
workers: int,
/, *,
*,
rate_limit: Optional[int] = None
):
progress = Progress(len(sources))
token_bucket = TokenBucket(rate_limit) if rate_limit else EndlessTokenBucket()
token_bucket = LimitingTokenBucket(rate_limit) if rate_limit else EndlessTokenBucket()
async with httpx.AsyncClient(timeout=TIMEOUT) as client:
semaphore = asyncio.Semaphore(workers)
tasks = [download_with_retries(client, semaphore, task_id, source, target, progress, token_bucket)

View File

@ -1,141 +0,0 @@
from typing import Any, Dict, List, Optional, Generator
from dataclasses import dataclass
Json = Dict[str, Any]
GameID = str
@dataclass(frozen=True)
class Broadcaster():
login: str
display_name: str
@staticmethod
def from_json(data: Json) -> "Broadcaster":
return Broadcaster(data["login"], data["displayName"])
@dataclass(frozen=True)
class VideoQuality():
frame_rate: int
quality: str
source_url: str
@staticmethod
def from_json(data: Json) -> "VideoQuality":
return VideoQuality(data["frameRate"], data["quality"], data["sourceURL"])
@dataclass(frozen=True)
class Game():
id: str
name: str
description: str
@staticmethod
def from_json(data: Json) -> "Game":
return Game(data["id"], data["name"], data["description"])
@dataclass(frozen=True)
class Clip():
id: str
slug: str
title: str
created_at: str
view_count: int
duration_seconds: int
url: str
game: Optional[Game]
broadcaster: Broadcaster
video_qualities: List[VideoQuality]
raw: Json
@staticmethod
def from_json(data: Json) -> "Clip":
game = Game.from_json(data["game"]) if data["game"] else None
broadcaster = Broadcaster.from_json(data["broadcaster"])
video_qualities = [VideoQuality.from_json(q) for q in data["videoQualities"]]
return Clip(
data["id"],
data["slug"],
data["title"],
data["createdAt"],
data["viewCount"],
data["durationSeconds"],
data["url"],
game,
broadcaster,
video_qualities,
data
)
@dataclass(frozen=True)
class ClipsPage():
cursor: str
has_next_page: bool
has_previous_page: bool
clips: List[Clip]
@staticmethod
def from_json(data: Json) -> "ClipsPage":
return ClipsPage(
data["edges"][-1]["cursor"],
data["pageInfo"]["hasNextPage"],
data["pageInfo"]["hasPreviousPage"],
[Clip.from_json(c["node"]) for c in data["edges"]]
)
@dataclass(frozen=True)
class Video():
id: str
title: str
published_at: str
broadcast_type: str
length_seconds: int
game: Optional[Game]
creator: Broadcaster
raw: Json
@staticmethod
def from_json(data: Json) -> "Video":
game = Game.from_json(data["game"]) if data["game"] else None
creator = Broadcaster.from_json(data["creator"])
return Video(
data["id"],
data["title"],
data["publishedAt"],
data["broadcastType"],
data["lengthSeconds"],
game,
creator,
data
)
@dataclass(frozen=True)
class VideosPage():
cursor: str
has_next_page: bool
has_previous_page: bool
total_count: int
videos: List[Video]
@staticmethod
def from_json(data: Json) -> "VideosPage":
return VideosPage(
data["edges"][-1]["cursor"],
data["pageInfo"]["hasNextPage"],
data["pageInfo"].get("hasPreviousPage"),
data["totalCount"],
[Video.from_json(c["node"]) for c in data["edges"]]
)
ClipGenerator = Generator[Clip, None, None]
VideoGenerator = Generator[Video, None, None]

View File

@ -1,12 +1,9 @@
# -*- coding: utf-8 -*-
import json
import sys
import re
from itertools import islice
from twitchdl import utils
from twitchdl.models import Clip, Video
from typing import Any, Match
@ -67,42 +64,46 @@ def print_json(data: Any):
def print_err(*args, **kwargs):
args = ["<red>{}</red>".format(a) for a in args]
args = [f"<red>{arg}</red>" for arg in args]
args = [colorize(a) if USE_ANSI_COLOR else strip_tags(a) for a in args]
print(*args, file=sys.stderr, **kwargs)
def print_log(*args, **kwargs):
args = ["<dim>{}</dim>".format(a) for a in args]
args = [f"<dim>{a}</dim>" for a in args]
args = [colorize(a) if USE_ANSI_COLOR else strip_tags(a) for a in args]
print(*args, file=sys.stderr, **kwargs)
def print_video(video: Video):
published_at = video.published_at.replace("T", " @ ").replace("Z", "")
length = utils.format_duration(video.length_seconds)
def print_video(video):
published_at = video["publishedAt"].replace("T", " @ ").replace("Z", "")
length = utils.format_duration(video["lengthSeconds"])
channel = f"<blue>{video.creator.display_name}</blue>" if video.creator else ""
playing = f"playing <blue>{video.game.name}</blue>" if video.game else ""
channel = f"<blue>{video['creator']['displayName']}</blue>" if video["creator"] else ""
playing = f"playing <blue>{video['game']['name']}</blue>" if video["game"] else ""
# Can't find URL in video object, strange
url = f"https://www.twitch.tv/videos/{video.id}"
url = f"https://www.twitch.tv/videos/{video['id']}"
print_out(f"<b>Video {video.id}</b>")
print_out(f"<green>{video.title}</green>")
print_out(f"<b>Video {video['id']}</b>")
print_out(f"<green>{video['title']}</green>")
if channel or playing:
print_out(" ".join([channel, playing]))
print_out(f"Published <blue>{published_at}</blue> Length: <blue>{length}</blue>")
if video["description"]:
print_out(f"Description: {video['description']}")
print_out(f"Published <blue>{published_at}</blue> Length: <blue>{length}</blue> ")
print_out(f"<i>{url}</i>")
def print_video_compact(video):
date = video.published_at[:10]
game = video.game.name if video.game else ""
title = truncate(video.title, 80).ljust(80)
print_out(f"<b>{video.id}</b> {date} <green>{title}</green> <blue>{game}</blue>")
id = video["id"]
date = video["publishedAt"][:10]
game = video["game"]["name"] if video["game"] else ""
title = truncate(video["title"], 80).ljust(80)
print_out(f'<b>{id}</b> {date} <green>{title}</green> <blue>{game}</blue>')
def print_paged_videos(generator, page_size, total_count):
@ -123,7 +124,7 @@ def print_paged_videos(generator, page_size, total_count):
last = first + len(page) - 1
print_out("-" * 80)
print_out("<yellow>Videos {}-{} of {}</yellow>".format(first, last, total_count))
print_out(f"<yellow>Videos {first}-{last} of {total_count}</yellow>")
first = first + len(page)
last = first + 1
@ -133,23 +134,24 @@ def print_paged_videos(generator, page_size, total_count):
break
def print_clip(clip: Clip):
published_at = clip.created_at.replace("T", " @ ").replace("Z", "")
length = utils.format_time(clip.duration_seconds)
channel = clip.broadcaster.display_name
def print_clip(clip):
published_at = clip["createdAt"].replace("T", " @ ").replace("Z", "")
length = utils.format_duration(clip["durationSeconds"])
channel = clip["broadcaster"]["displayName"]
playing = (
"playing <blue>{}</blue>".format(clip.game.name)
if clip.game else ""
f"playing <blue>{clip['game']['name']}</blue>"
if clip["game"] else ""
)
print_out("Clip <b>{}</b>".format(clip.slug))
print_out("<green>{}</green>".format(clip.title))
print_out("<blue>{}</blue> {}".format(channel, playing))
print_out(f"Clip <b>{clip['slug']}</b>")
print_out(f"<green>{clip['title']}</green>")
print_out(f"<blue>{channel}</blue> {playing}")
print_out(
f"Published: <blue>{published_at}</blue>"
f" Length: <blue>{length}</blue>"
f" Views: <blue>{clip.view_count}</blue>")
print_out(f"<i>{clip.url}</i>")
f"Published <blue>{published_at}</blue>" +
f" Length: <blue>{length}</blue>" +
f" Views: <blue>{clip["viewCount"]}</blue>"
)
print_out(f"<i>{clip['url']}</i>")
def _continue():

View File

@ -21,7 +21,7 @@ class Task:
size: int
downloaded: int = 0
def advance(self, size):
def advance(self, size: int):
self.downloaded += size

View File

@ -3,17 +3,20 @@ Twitch API access.
"""
import httpx
import json
import click
from typing import Dict
from twitchdl import CLIENT_ID
from twitchdl.exceptions import ConsoleError
from twitchdl.models import Clip, ClipsPage, ClipGenerator, Game, Video, VideoGenerator, VideosPage
from typing import Dict, Optional, Tuple
class GQLError(Exception):
def __init__(self, errors):
super().__init__("GraphQL query failed")
self.errors = errors
class GQLError(click.ClickException):
def __init__(self, errors: list[str]):
message = "GraphQL query failed."
for error in errors:
message += f"\n* {error}"
super().__init__(message)
def authenticated_post(url, data=None, json=None, headers={}):
@ -29,49 +32,45 @@ def authenticated_post(url, data=None, json=None, headers={}):
return response
def gql_post(query):
def gql_post(query: str):
url = "https://gql.twitch.tv/gql"
response = authenticated_post(url, data=query).json()
if "errors" in response:
raise GQLError(response["errors"])
return response
response = authenticated_post(url, data=query)
gql_raise_on_error(response)
return response.json()
def gql_query(query: str, headers: Dict[str, str] = {}):
url = "https://gql.twitch.tv/gql"
response = authenticated_post(url, json={"query": query}, headers=headers).json()
if "errors" in response:
raise GQLError(response["errors"])
return response
response = authenticated_post(url, json={"query": query}, headers=headers)
gql_raise_on_error(response)
return response.json()
GAME_FIELDS = """
id
name
description
"""
def gql_raise_on_error(response: httpx.Response):
data = response.json()
if "errors" in data:
errors = [e["message"] for e in data["errors"]]
raise GQLError(errors)
VIDEO_FIELDS = f"""
VIDEO_FIELDS = """
id
title
description
publishedAt
broadcastType
lengthSeconds
game {{
{GAME_FIELDS}
}}
creator {{
game {
name
}
creator {
login
displayName
}}
}
"""
CLIP_FIELDS = f"""
CLIP_FIELDS = """
id
slug
title
@ -79,52 +78,50 @@ CLIP_FIELDS = f"""
viewCount
durationSeconds
url
videoQualities {{
videoQualities {
frameRate
quality
sourceURL
}}
game {{
{GAME_FIELDS}
}}
broadcaster {{
login
}
game {
id
name
}
broadcaster {
displayName
}}
login
}
"""
def get_video(video_id: str) -> Optional[Video]:
query = """
def get_video(video_id: str):
query = f"""
{{
video(id: "{video_id}") {{
{fields}
{VIDEO_FIELDS}
}}
}}
"""
query = query.format(video_id=video_id, fields=VIDEO_FIELDS)
response = gql_query(query)
if response["data"]["video"]:
return Video.from_json(response["data"]["video"])
return response["data"]["video"]
def get_clip(slug: str) -> Clip:
query = """
def get_clip(slug: str):
query = f"""
{{
clip(slug: "{}") {{
{fields}
clip(slug: "{slug}") {{
{CLIP_FIELDS}
}}
}}
"""
response = gql_query(query.format(slug, fields=CLIP_FIELDS))
return Clip.from_json(response["data"]["clip"])
response = gql_query(query)
return response["data"]["clip"]
def get_clip_access_token(slug):
query = """
def get_clip_access_token(slug: str):
query = f"""
{{
"operationName": "VideoAccessToken_Clip",
"variables": {{
@ -139,11 +136,11 @@ def get_clip_access_token(slug):
}}
"""
response = gql_post(query.format(slug=slug).strip())
response = gql_post(query.strip())
return response["data"]["clip"]
def get_channel_clips(channel_id, period, limit, after=None) -> ClipsPage:
def get_channel_clips(channel_id: str, period: str, limit: int, after: str | None= None):
"""
List channel clips.
@ -153,10 +150,10 @@ def get_channel_clips(channel_id, period, limit, after=None) -> ClipsPage:
* sorting by VIEWS_DESC and TRENDING returns the same results
* there is no totalCount
"""
query = """
query = f"""
{{
user(login: "{channel_id}") {{
clips(first: {limit}, after: "{after}", criteria: {{ period: {period}, sort: VIEWS_DESC }}) {{
clips(first: {limit}, after: "{after or ''}", criteria: {{ period: {period.upper()}, sort: VIEWS_DESC }}) {{
pageInfo {{
hasNextPage
hasPreviousPage
@ -164,7 +161,7 @@ def get_channel_clips(channel_id, period, limit, after=None) -> ClipsPage:
edges {{
cursor
node {{
{fields}
{CLIP_FIELDS}
}}
}}
}}
@ -172,68 +169,72 @@ def get_channel_clips(channel_id, period, limit, after=None) -> ClipsPage:
}}
"""
query = query.format(
channel_id=channel_id,
after=after if after else "",
limit=limit,
period=period.upper(),
fields=CLIP_FIELDS
)
response = gql_query(query)
user = response["data"]["user"]
if not user:
raise ConsoleError("Channel {} not found".format(channel_id))
raise ConsoleError(f"Channel {channel_id} not found")
return ClipsPage.from_json(response["data"]["user"]["clips"])
return response["data"]["user"]["clips"]
def channel_clips_generator(channel_id: str, period, limit: int) -> ClipGenerator:
def _generator(page: ClipsPage, limit: int):
for clip in page.clips:
def channel_clips_generator(channel_id, period, limit):
def _generator(clips, limit):
for clip in clips["edges"]:
if limit < 1:
return
yield clip
yield clip["node"]
limit -= 1
if limit < 1 or not page.has_next_page:
has_next = clips["pageInfo"]["hasNextPage"]
if limit < 1 or not has_next:
return
req_limit = min(limit, 100)
next_page = get_channel_clips(channel_id, period, req_limit, page.cursor)
yield from _generator(next_page, limit)
cursor = clips["edges"][-1]["cursor"]
clips = get_channel_clips(channel_id, period, req_limit, cursor)
yield from _generator(clips, limit)
req_limit = min(limit, 100)
page = get_channel_clips(channel_id, period, req_limit)
return _generator(page, limit)
clips = get_channel_clips(channel_id, period, req_limit)
return _generator(clips, limit)
def channel_clips_generator_old(channel_id, period, limit):
cursor = ""
while True:
page = get_channel_clips(channel_id, period, limit, after=cursor)
clips = get_channel_clips(
channel_id, period, limit, after=cursor)
if not page.clips:
if not clips["edges"]:
break
has_next = page.has_next_page
cursor = page.cursor if has_next else None
has_next = clips["pageInfo"]["hasNextPage"]
cursor = clips["edges"][-1]["cursor"] if has_next else None
yield page.clips, has_next
yield clips, has_next
if not cursor:
break
def get_channel_videos(channel_id, limit, sort, type="archive", game_ids=[], after=None) -> VideosPage:
query = """
def get_channel_videos(
channel_id: str,
limit: int,
sort: str,
type: str = "archive",
game_ids: list[str] | None = None,
after: str | None = None
):
game_ids = game_ids or []
query = f"""
{{
user(login: "{channel_id}") {{
videos(
first: {limit},
type: {type},
sort: {sort},
after: "{after}",
type: {type.upper()},
sort: {sort.upper()},
after: "{after or ''}",
options: {{
gameIDs: {game_ids}
}}
@ -245,7 +246,7 @@ def get_channel_videos(channel_id, limit, sort, type="archive", game_ids=[], aft
edges {{
cursor
node {{
{fields}
{VIDEO_FIELDS}
}}
}}
}}
@ -253,46 +254,38 @@ def get_channel_videos(channel_id, limit, sort, type="archive", game_ids=[], aft
}}
"""
query = query.format(
channel_id=channel_id,
game_ids=game_ids,
after=after if after else "",
limit=limit,
sort=sort.upper(),
type=type.upper(),
fields=VIDEO_FIELDS
)
response = gql_query(query)
if not response["data"]["user"]:
raise ConsoleError("Channel {} not found".format(channel_id))
raise ConsoleError(f"Channel {channel_id} not found")
return VideosPage.from_json(response["data"]["user"]["videos"])
return response["data"]["user"]["videos"]
def channel_videos_generator(channel_id, max_videos, sort, type, game_ids=None) -> Tuple[int, VideoGenerator]:
def _generator(page, max_videos):
for video in page.videos:
def channel_videos_generator(channel_id, max_videos, sort, type, game_ids=[]):
def _generator(videos, max_videos):
for video in videos["edges"]:
if max_videos < 1:
return
yield video
yield video["node"]
max_videos -= 1
if max_videos < 1 or not page.has_next_page:
has_next = videos["pageInfo"]["hasNextPage"]
if max_videos < 1 or not has_next:
return
limit = min(max_videos, 100)
videos = get_channel_videos(channel_id, limit, sort, type, game_ids, page.cursor)
cursor = videos["edges"][-1]["cursor"]
videos = get_channel_videos(channel_id, limit, sort, type, game_ids, cursor)
yield from _generator(videos, max_videos)
limit = min(max_videos, 100)
page = get_channel_videos(channel_id, limit, sort, type, game_ids)
return page.total_count, _generator(page, max_videos)
videos = get_channel_videos(channel_id, limit, sort, type, game_ids)
return videos["totalCount"], _generator(videos, max_videos)
def get_access_token(video_id, auth_token=None):
query = """
query = f"""
{{
videoPlaybackAccessToken(
id: {video_id},
@ -308,8 +301,6 @@ def get_access_token(video_id, auth_token=None):
}}
"""
query = query.format(video_id=video_id)
headers = {}
if auth_token is not None:
headers['authorization'] = f'OAuth {auth_token}'
@ -336,7 +327,7 @@ def get_playlists(video_id, access_token):
"""
For a given video return a playlist which contains possible video qualities.
"""
url = "http://usher.twitch.tv/vod/{}".format(video_id)
url = f"https://usher.ttvnw.net/vod/{video_id}"
response = httpx.get(url, params={
"nauth": access_token['value'],
@ -349,15 +340,45 @@ def get_playlists(video_id, access_token):
return response.content.decode('utf-8')
def find_game(name: str) -> Optional[Game]:
def get_game_id(name):
query = f"""
{{
game(name: "{name.strip()}") {{
{GAME_FIELDS}
id
}}
}}
"""
response = gql_query(query)
if response["data"]["game"]:
return Game.from_json(response["data"]["game"])
game = response["data"]["game"]
if game:
return game["id"]
def get_video_chapters(video_id: str):
query = {
"operationName": "VideoPlayer_ChapterSelectButtonVideo",
"variables":
{
"includePrivate": False,
"videoID": video_id
},
"extensions":
{
"persistedQuery":
{
"version": 1,
"sha256Hash": "8d2793384aac3773beab5e59bd5d6f585aedb923d292800119e03d40cd0f9b41"
}
}
}
response = gql_post(json.dumps(query))
return list(_chapter_nodes(response["data"]["video"]["moments"]))
def _chapter_nodes(collection):
for edge in collection["edges"]:
node = edge["node"]
del node["moments"]
yield node

View File

@ -2,14 +2,14 @@ import re
import unicodedata
def _format_size(value, digits, unit):
def _format_size(value: float, digits: int, unit: str):
if digits > 0:
return "{{:.{}f}}{}".format(digits, unit).format(value)
return f"{{:.{digits}f}}{unit}".format(value)
else:
return "{{:d}}{}".format(unit).format(value)
return f"{int(value)}{unit}"
def format_size(bytes_, digits=1):
def format_size(bytes_: int, digits: int = 1):
if bytes_ < 1024:
return _format_size(bytes_, digits, "B")
@ -24,7 +24,7 @@ def format_size(bytes_, digits=1):
return _format_size(mega / 1024, digits, "GB")
def format_duration(total_seconds: int) -> str:
def format_duration(total_seconds: int | float) -> str:
total_seconds = int(total_seconds)
hours = total_seconds // 3600
remainder = total_seconds % 3600
@ -32,34 +32,37 @@ def format_duration(total_seconds: int) -> str:
seconds = total_seconds % 60
if hours:
return "{} h {} min".format(hours, minutes)
return f"{hours} h {minutes} min"
if minutes:
return "{} min {} sec".format(minutes, seconds)
return f"{minutes} min {seconds} sec"
return "{} sec".format(seconds)
return f"{seconds} sec"
def format_time(total_seconds: int) -> str:
def format_time(total_seconds: int | float, force_hours: bool = False) -> str:
total_seconds = int(total_seconds)
hours = total_seconds // 3600
remainder = total_seconds % 3600
minutes = remainder // 60
seconds = total_seconds % 60
if hours:
if hours or force_hours:
return f"{hours:02}:{minutes:02}:{seconds:02}"
return f"{minutes:02}:{seconds:02}"
def read_int(msg, min, max, default):
msg = msg + " [default {}]: ".format(default)
def read_int(msg: str, min: int, max: int, default: int | None = None) -> int:
if default:
msg = msg + f" [default {default}]"
msg += ": "
while True:
try:
val = input(msg)
if not val:
if default and not val:
return default
if min <= int(val) <= max:
return int(val)
@ -93,7 +96,7 @@ CLIP_PATTERNS = [
]
def parse_video_identifier(identifier: str) -> str:
def parse_video_identifier(identifier: str) -> str | None:
"""Given a video ID or URL returns the video ID, or null if not matched"""
for pattern in VIDEO_PATTERNS:
match = re.match(pattern, identifier)
@ -101,7 +104,7 @@ def parse_video_identifier(identifier: str) -> str:
return match.group("id")
def parse_clip_identifier(identifier: str) -> str:
def parse_clip_identifier(identifier: str) -> str | None:
"""Given a clip slug or URL returns the clip slug, or null if not matched"""
for pattern in CLIP_PATTERNS:
match = re.match(pattern, identifier)