Compare commits

1 Commits

Author SHA1 Message Date
3c96c02394 wip 2022-08-08 13:55:33 +02:00
44 changed files with 1170 additions and 1983 deletions

View File

@ -1,4 +0,0 @@
[vermin]
only_show_violations = yes
show_tips = no
targets = 3.8

View File

@ -3,60 +3,6 @@ twitch-dl changelog
<!-- Do not edit. This file is automatically generated from changelog.yaml.-->
### [2.2.0 (TBA)](https://github.com/ihabunek/twitch-dl/releases/tag/2.2.0)
* **Requires python 3.8 or later**
* Migrated to click lib for cli parsing
* Add shell auto completion
### [2.1.4 (2024-01-06)](https://github.com/ihabunek/twitch-dl/releases/tag/2.1.4)
* Fix error caused by twitch requiring https for the usher api (thanks
@deanpcmad)
### [2.1.3 (2023-05-07)](https://github.com/ihabunek/twitch-dl/releases/tag/2.1.3)
* Replace client ID with one that works for now (thanks @mwhite34)
### [2.1.2 (2023-04-18)](https://github.com/ihabunek/twitch-dl/releases/tag/2.1.2)
* Fix error caused by twitch changing the Usher domain (thanks @adsa95)
### [2.1.1 (2022-11-20)](https://github.com/ihabunek/twitch-dl/releases/tag/2.1.1)
* Fix Python 3.7 compatibility (#117, thanks @eliduvid)
* Fix default value for game_ids (#102, thanks @FunnyPocketBook)
### [2.1.0 (2022-11-20)](https://github.com/ihabunek/twitch-dl/releases/tag/2.1.0)
* Add chapter list to `info` command
* Add `download --chapter` option for downloading a single chapter
### [2.0.1 (2022-09-09)](https://github.com/ihabunek/twitch-dl/releases/tag/2.0.1)
* Fix an issue where a temp vod file would be renamed while still being open,
which caused an exception on Windows (#111)
### [2.0.0 (2022-08-18)](https://github.com/ihabunek/twitch-dl/releases/tag/2.0.0)
This release switches from using `requests` to `httpx` for making http requests,
and from threads to `asyncio` for concurrency. This enables easier
implementation of new features, but has no breaking changes for the CLI.
* **BREAKING**: Require Python 3.7 or later.
* Add `--rate-limit` option to `download` for limiting maximum bandwidth when
downloading.
* Add `--compact` option to `download` for displaying one video per line.
* Allow passing multiple video ids to `download` to download multiple videos
successively.
* Improved progress meter, updates on each chunk downloaded, instead of each VOD
downloaded.
* Improved speed estimate, displays recent speed instead of average speed since
the start of download.
* Decreased default concurrent downloads to 5. This seems to be enough to
saturate the download link in most cases. You can override this by setting the
`-w` option. Please test and report back if this works for you.
### [1.22.0 (2022-06-25)](https://github.com/ihabunek/twitch-dl/releases/tag/1.22.0)
* Add support for downloading subscriber-only VODs (#48, thanks @cemiu)
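
The 2.0.0 entry above describes replacing `requests` with `httpx` and threads with `asyncio`. As a minimal sketch of that pattern (assumed for illustration, not the project's actual `twitchdl.http.download_all`), concurrency can be bounded with a semaphore, mirroring the `-w`/`--max-workers` option:

```python
import asyncio
import httpx

async def fetch_all(sources, max_workers=5):
    # Limit the number of requests in flight, similar to --max-workers.
    semaphore = asyncio.Semaphore(max_workers)

    async with httpx.AsyncClient() as client:
        async def fetch(url: str) -> bytes:
            async with semaphore:
                response = await client.get(url)
                response.raise_for_status()
                return response.content

        # Download all VOD chunks concurrently and return their bodies in order.
        return await asyncio.gather(*(fetch(url) for url in sources))

# Usage with hypothetical URLs:
# chunks = asyncio.run(fetch_all(["https://example.com/0.ts", "https://example.com/1.ts"]))
```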

View File

@ -2,12 +2,13 @@
default : clean dist
dist:
python -m build
dist :
python setup.py sdist --formats=gztar,zip
python setup.py bdist_wheel --python-tag=py3
clean :
find . -name "*pyc" | xargs rm -rf $1
rm -rf build dist bundle MANIFEST htmlcov deb_dist twitch-dl.*.pyz twitch-dl.1.man twitch_dl.egg-info
rm -rf build dist bundle MANIFEST htmlcov deb_dist twitch-dl.*.pyz twitch-dl.1.man
bundle:
mkdir bundle

View File

@ -17,7 +17,7 @@ Resources
Requirements
------------
* Python 3.8 or later
* Python 3.5 or later
* [ffmpeg](https://ffmpeg.org/download.html), installed and on the system path
Quick start

View File

@ -1,8 +0,0 @@
TODO
====
Some ideas what to do next.
* gracefully handle aborting the download with Ctrl+C, now it prints out an error stack
* add keyboard control for e.g. pausing a download
* test how worker count affects download speeds on low and high-bandwidth links (see https://github.com/ihabunek/twitch-dl/issues/104), adjust default worker count

View File

@ -1,64 +1,3 @@
2.2.0:
date: TBA
changes:
- "**Requires python 3.8 or later**"
- "Migrated to Click library for generating the commandline interface"
- "Add shell auto completion, see: https://twitch-dl.bezdomni.net/shell_completion.html"
- "Add setting defaults via environment variables, see: https://twitch-dl.bezdomni.net/environment_variables.html"
- "Add `download --concat` option to avoid using ffmeg for joinig vods and concat them instead. This will produce a `.ts` file by default."
- "Add video description to metadata (#129)"
2.1.4:
date: 2024-01-06
changes:
- "Fix error caused by twitch requiring https for the usher api (thanks @deanpcmad)"
2.1.3:
date: 2023-05-07
changes:
- "Replace client ID with one that works for now (thanks @mwhite34)"
2.1.2:
date: 2023-04-18
changes:
- "Fix error caused by twitch changing the Usher domain (thanks @adsa95)"
2.1.1:
date: 2022-11-20
changes:
- "Fix Python 3.7 compatibility (#117, thanks @eliduvid)"
- "Fix default value for game_ids (#102, thanks @FunnyPocketBook)"
2.1.0:
date: 2022-11-20
changes:
- "Add chapter list to `info` command"
- "Add `download --chapter` option for downloading a single chapter"
2.0.1:
date: 2022-09-09
changes:
- "Fix an issue where a temp vod file would be renamed while still being open, which caused an exception on Windows (#111)"
2.0.0:
date: 2022-08-18
description: |
This release switches from using `requests` to `httpx` for making http
requests, and from threads to `asyncio` for concurrency. This enables
easier implementation of new features, but has no breaking changes for the
CLI.
changes:
- "**BREAKING**: Require Python 3.7 or later."
- "Add `--rate-limit` option to `download` for limiting maximum bandwidth when downloading."
- "Add `--compact` option to `download` for displaying one video per line."
- "Allow passing multiple video ids to `download` to download multiple videos successively."
- "Improved progress meter, updates on each chunk downloaded, instead of each VOD downloaded."
- "Improved speed estimate, displays recent speed instead of average speed since the start of download."
- |
Decreased default concurrent downloads to 5. This seems to be enough to
saturate the download link in most cases. You can override this by setting
the `-w` option. Please test and report back if this works for you.
1.22.0:
date: 2022-06-25
changes:

View File

@ -9,8 +9,6 @@
- [twitch-dl clips](commands/clips.md)
- [twitch-dl info](commands/info.md)
- [twitch-dl env](commands/env.md)
- [Environment variables](environment_variables.md)
- [Shell completion](shell_completion.md)
- [Advanced](advanced.md)
[License](license.md)

View File

@ -3,60 +3,6 @@ twitch-dl changelog
<!-- Do not edit. This file is automatically generated from changelog.yaml.-->
### [2.2.0 (TBA)](https://github.com/ihabunek/twitch-dl/releases/tag/2.2.0)
* **Requires python 3.8 or later**
* Migrated to click lib for cli parsing
* Add shell auto completion
### [2.1.4 (2024-01-06)](https://github.com/ihabunek/twitch-dl/releases/tag/2.1.4)
* Fix error caused by twitch requiring https for the usher api (thanks
@deanpcmad)
### [2.1.3 (2023-05-07)](https://github.com/ihabunek/twitch-dl/releases/tag/2.1.3)
* Replace client ID with one that works for now (thanks @mwhite34)
### [2.1.2 (2023-04-18)](https://github.com/ihabunek/twitch-dl/releases/tag/2.1.2)
* Fix error caused by twitch changing the Usher domain (thanks @adsa95)
### [2.1.1 (2022-11-20)](https://github.com/ihabunek/twitch-dl/releases/tag/2.1.1)
* Fix Python 3.7 compatibility (#117, thanks @eliduvid)
* Fix default value for game_ids (#102, thanks @FunnyPocketBook)
### [2.1.0 (2022-11-20)](https://github.com/ihabunek/twitch-dl/releases/tag/2.1.0)
* Add chapter list to `info` command
* Add `download --chapter` option for downloading a single chapter
### [2.0.1 (2022-09-09)](https://github.com/ihabunek/twitch-dl/releases/tag/2.0.1)
* Fix an issue where a temp vod file would be renamed while still being open,
which caused an exception on Windows (#111)
### [2.0.0 (2022-08-18)](https://github.com/ihabunek/twitch-dl/releases/tag/2.0.0)
This release switches from using `requests` to `httpx` for making http requests,
and from threads to `asyncio` for concurrency. This enables easier
implementation of new features, but has no breaking changes for the CLI.
* **BREAKING**: Require Python 3.7 or later.
* Add `--rate-limit` option to `download` for limiting maximum bandwidth when
downloading.
* Add `--compact` option to `download` for displaying one video per line.
* Allow passing multiple video ids to `download` to download multiple videos
successively.
* Improved progress meter, updates on each chunk downloaded, instead of each VOD
downloaded.
* Improved speed estimate, displays recent speed instead of average speed since
the start of download.
* Decreased default concurrent downloads to 5. This seems to be enough to
saturate the download link in most cases. You can override this by setting the
`-w` option. Please test and report back if this works for you.
### [1.22.0 (2022-06-25)](https://github.com/ihabunek/twitch-dl/releases/tag/1.22.0)
* Add support for downloading subscriber-only VODs (#48, thanks @cemiu)

View File

@ -1,47 +1,64 @@
<!-- ------------------- generated docs start ------------------- -->
# twitch-dl clips
List or download clips for given CHANNEL_NAME.
List or download clips for a channel.
### USAGE
```
twitch-dl clips [OPTIONS] CHANNEL_NAME
twitch-dl clips <channel_name> [FLAGS] [OPTIONS]
```
### ARGUMENTS
<table>
<tbody>
<tr>
<td class="code">&lt;channel_name&gt;</td>
<td>Name of the channel to list clips for.</td>
</tr>
</tbody>
</table>
### FLAGS
<table>
<tbody>
<tr>
<td class="code">-a, --all</td>
<td>Fetch all clips, overrides --limit</td>
</tr>
<tr>
<td class="code">-j, --json</td>
<td>Show results as JSON. Ignores <code>--pager</code>.</td>
</tr>
<tr>
<td class="code">-d, --download</td>
<td>Download all clips in given period (in source quality)</td>
</tr>
</tbody>
</table>
### OPTIONS
<table>
<tbody>
<tr>
<td class="code">-a, --all</td>
<td>Fetch all clips, overrides --limit</td>
<td class="code">-l, --limit</td>
<td>Number of videos to fetch (default 10, max 100)</td>
</tr>
<tr>
<td class="code">-d, --download</td>
<td>Download clips in given period (in source quality)</td>
<td class="code">-P, --period</td>
<td>Period from which to return clips. Defaults to <code>all_time</code>. Possible values: <code>last_day</code>, <code>last_week</code>, <code>last_month</code>, <code>all_time</code>.</td>
</tr>
<tr>
<td class="code">-l, --limit INTEGER</td>
<td>Number of clips to fetch [max: 100] [default: <code>10</code>]</td>
</tr>
<tr>
<td class="code">-p, --pager INTEGER</td>
<td class="code">-p, --pager</td>
<td>Number of clips to show per page. Disabled by default.</td>
</tr>
<tr>
<td class="code">-P, --period TEXT</td>
<td>Period from which to return clips Possible values: <code>last_day</code>, <code>last_week</code>, <code>last_month</code>, <code>all_time</code>. [default: <code>all_time</code>]</td>
</tr>
<tr>
<td class="code">--json</td>
<td>Print data as JSON rather than human readable text</td>
</tr>
</tbody>
</table>

View File

@ -1,51 +1,29 @@
<!-- ------------------- generated docs start ------------------- -->
# twitch-dl download
Download videos or clips.
Pass one or more video ID, clip slug or Twitch URL to download.
Download a video or clip.
### USAGE
```
twitch-dl download [OPTIONS] [IDS]...
twitch-dl download <video> [FLAGS] [OPTIONS]
```
### OPTIONS
### ARGUMENTS
<table>
<tbody>
<tr>
<td class="code">-a, --auth-token TEXT</td>
<td>Authentication token, passed to Twitch to access subscriber only VODs. Can be copied from the <code>auth_token</code> cookie in any browser logged in on Twitch.</td>
<td class="code">&lt;video&gt;</td>
<td>Video ID, clip slug, or URL</td>
</tr>
</tbody>
</table>
<tr>
<td class="code">-c, --chapter INTEGER</td>
<td>Download a single chapter of the video. Specify the chapter number or use the flag without a number to display a chapter select prompt.</td>
</tr>
<tr>
<td class="code">--concat</td>
<td>Do not use ffmpeg to join files, concat them instead</td>
</tr>
<tr>
<td class="code">-d, --dry-run</td>
<td>Simulate the download process without actually downloading any files.</td>
</tr>
<tr>
<td class="code">-e, --end TEXT</td>
<td>Download video up to this time (hh:mm or hh:mm:ss)</td>
</tr>
<tr>
<td class="code">-f, --format TEXT</td>
<td>Video format to convert into, passed to ffmpeg as the target file extension. Defaults to <code>mkv</code>. If <code>--concat</code> is passed, defaults to <code>ts</code>.</td>
</tr>
### FLAGS
<table>
<tbody>
<tr>
<td class="code">-k, --keep</td>
<td>Don&#x27;t delete downloaded VODs and playlists after merging.</td>
@ -60,30 +38,46 @@ twitch-dl download [OPTIONS] [IDS]...
<td class="code">--overwrite</td>
<td>Overwrite the target file if it already exists without prompting.</td>
</tr>
</tbody>
</table>
### OPTIONS
<table>
<tbody>
<tr>
<td class="code">-o, --output TEXT</td>
<td>Output file name template. See docs for details. [default: <code>{date}_{id}_{channel_login}_{title_slug}.{format}</code>]</td>
<td class="code">-w, --max-workers</td>
<td>Maximum number of threads for downloading vods concurrently (default 20)</td>
</tr>
<tr>
<td class="code">-q, --quality TEXT</td>
<td>Video quality, e.g. <code>720p</code>. Set to <code>source</code> to get best quality.</td>
</tr>
<tr>
<td class="code">-r, --rate-limit TEXT</td>
<td>Limit the maximum download speed in bytes per second. Use &#x27;k&#x27; and &#x27;m&#x27; suffixes for kbps and mbps.</td>
</tr>
<tr>
<td class="code">-s, --start TEXT</td>
<td class="code">-s, --start</td>
<td>Download video from this time (hh:mm or hh:mm:ss)</td>
</tr>
<tr>
<td class="code">-w, --max-workers INTEGER</td>
<td>Number of workers for downloading vods concurrently [default: <code>5</code>]</td>
<td class="code">-e, --end</td>
<td>Download video up to this time (hh:mm or hh:mm:ss)</td>
</tr>
<tr>
<td class="code">-f, --format</td>
<td>Video format to convert into, passed to ffmpeg as the target file extension. Defaults to <code>mkv</code>.</td>
</tr>
<tr>
<td class="code">-q, --quality</td>
<td>Video quality, e.g. 720p. Set to &#x27;source&#x27; to get best quality.</td>
</tr>
<tr>
<td class="code">-a, --auth-token</td>
<td>Authentication token, passed to Twitch to access subscriber only VODs. Can be copied from the &#x27;auth_token&#x27; cookie in any browser logged in on Twitch.</td>
</tr>
<tr>
<td class="code">-o, --output</td>
<td>Output file name template. See docs for details.</td>
</tr>
</tbody>
</table>
@ -117,12 +111,6 @@ Setting quality to `audio_only` will download only audio:
twitch-dl download -q audio_only 221837124
```
Download multiple videos one after the other:
```
twitch-dl download 1559928295 1557034274 1555157293 -q source
```
### Overriding the target file name
The target filename can be defined by passing the `--output` option followed by
@ -184,4 +172,4 @@ download command:
```
twitch-dl download 221837124 --auth-token iduetx4i107rn4b9wrgctf590aiktv
```
```
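
The `--output` option documented above is expanded with `str.format`; the available placeholder names are the keys produced by `get_video_placeholders` in `commands/download.py` later in this diff. A hypothetical illustration with made-up values:

```python
# Example values for illustration only; real values come from the video metadata.
placeholders = {
    "date": "2022-08-08",
    "id": "221837124",
    "channel_login": "bananasaurus_rex",
    "title_slug": "example_stream",
    "format": "mkv",
}

# The default template from the docs above.
template = "{date}_{id}_{channel_login}_{title_slug}.{format}"
print(template.format(**placeholders))
# -> 2022-08-08_221837124_bananasaurus_rex_example_stream.mkv
```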

View File

@ -6,7 +6,7 @@ Print environment information for inclusion in bug reports.
### USAGE
```
twitch-dl env [OPTIONS]
twitch-dl env
```
<!-- ------------------- generated docs end ------------------- -->

View File

@ -6,16 +6,27 @@ Print information for a given Twitch URL, video ID or clip slug.
### USAGE
```
twitch-dl info [OPTIONS] ID
twitch-dl info <video> [FLAGS]
```
### OPTIONS
### ARGUMENTS
<table>
<tbody>
<tr>
<td class="code">--json</td>
<td>Print data as JSON rather than human readable text</td>
<td class="code">&lt;video&gt;</td>
<td>Video ID, clip slug, or URL</td>
</tr>
</tbody>
</table>
### FLAGS
<table>
<tbody>
<tr>
<td class="code">-j, --json</td>
<td>Show results as JSON</td>
</tr>
</tbody>
</table>

View File

@ -1,56 +1,68 @@
<!-- ------------------- generated docs start ------------------- -->
# twitch-dl videos
List videos for given CHANNEL_NAME.
List videos for a channel.
### USAGE
```
twitch-dl videos [OPTIONS] CHANNEL_NAME
twitch-dl videos <channel_name> [FLAGS] [OPTIONS]
```
### ARGUMENTS
<table>
<tbody>
<tr>
<td class="code">&lt;channel_name&gt;</td>
<td>Name of the channel to list videos for.</td>
</tr>
</tbody>
</table>
### FLAGS
<table>
<tbody>
<tr>
<td class="code">-a, --all</td>
<td>Fetch all videos, overrides --limit</td>
</tr>
<tr>
<td class="code">-j, --json</td>
<td>Show results as JSON. Ignores <code>--pager</code>.</td>
</tr>
</tbody>
</table>
### OPTIONS
<table>
<tbody>
<tr>
<td class="code">-a, --all</td>
<td>Fetch all videos, overrides --limit</td>
</tr>
<tr>
<td class="code">-c, --compact</td>
<td>Show videos in compact mode, one line per video</td>
</tr>
<tr>
<td class="code">-l, --limit INTEGER</td>
<td>Number of videos to fetch. Defaults to 40 in compact mode, 10 otherwise.</td>
</tr>
<tr>
<td class="code">-p, --pager INTEGER</td>
<td>Number of videos to show per page. Disabled by default.</td>
</tr>
<tr>
<td class="code">-g, --game TEXT</td>
<td class="code">-g, --game</td>
<td>Show videos of given game (can be given multiple times)</td>
</tr>
<tr>
<td class="code">-s, --sort TEXT</td>
<td>Sorting order of videos Possible values: <code>views</code>, <code>time</code>. [default: <code>time</code>]</td>
<td class="code">-l, --limit</td>
<td>Number of videos to fetch. Defaults to 10.</td>
</tr>
<tr>
<td class="code">-t, --type TEXT</td>
<td>Broadcast type Possible values: <code>archive</code>, <code>highlight</code>, <code>upload</code>. [default: <code>archive</code>]</td>
<td class="code">-s, --sort</td>
<td>Sorting order of videos. Defaults to <code>time</code>. Possible values: <code>views</code>, <code>time</code>.</td>
</tr>
<tr>
<td class="code">--json</td>
<td>Print data as JSON rather than human readable text</td>
<td class="code">-t, --type</td>
<td>Broadcast type. Defaults to <code>archive</code>. Possible values: <code>archive</code>, <code>highlight</code>, <code>upload</code>.</td>
</tr>
<tr>
<td class="code">-p, --pager</td>
<td>Print videos in pages. Ignores <code>--limit</code>. Defaults to 10.</td>
</tr>
</tbody>
</table>

View File

@ -1,15 +0,0 @@
# Environment variables
> Introduced in twitch-dl 2.2.0
twitch-dl allows setting defaults for parameters via environment variables.
Environment variables should be named `TWITCH_DL_<COMMAND_NAME>_<OPTION_NAME>`.
For example, when invoking `twitch-dl download`, if you always set `--quality
source` you can set the following environment variable to make this the
default:
```
TWITCH_DL_DOWNLOAD_QUALITY="source"
```
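
Under the hood this relies on Click's `auto_envvar_prefix` context setting (see the `CONTEXT` dict in `cli.py` further down in this diff). A minimal standalone sketch of the mechanism, not the project's actual command definitions:

```python
import click

# With auto_envvar_prefix set, Click derives an environment variable name for
# each option: PREFIX + "_" + COMMAND_NAME + "_" + OPTION_NAME, uppercased.
@click.group(context_settings={"auto_envvar_prefix": "TWITCH_DL"})
def cli():
    pass

@cli.command()
@click.option("-q", "--quality")
def download(quality):
    # With TWITCH_DL_DOWNLOAD_QUALITY=source in the environment, `quality`
    # is "source" unless -q/--quality is passed explicitly.
    click.echo(f"quality={quality}")

if __name__ == "__main__":
    cli()
```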

View File

@ -1,6 +1,6 @@
# Installation
twitch-dl requires **Python 3.8** or later.
twitch-dl requires **Python 3.5** or later.
## Prerequisite: FFmpeg

View File

@ -1,31 +0,0 @@
# Shell completion
> Introduced in twitch-dl 2.2.0
twitch-dl uses [Click shell completion](https://click.palletsprojects.com/en/8.1.x/shell-completion/) which works on Bash, Fish and Zsh.
To enable completion, twitch-dl must be [installed](./installation.html) as a command and available by invoking `twitch-dl`. Then follow the instructions for your shell.
**Bash**
Add to `~/.bashrc`:
```
eval "$(_TWITCH_DL_COMPLETE=bash_source twitch-dl)"
```
**Fish**
Add to `~/.config/fish/completions/twitch-dl.fish`:
```
_TWITCH_DL_COMPLETE=fish_source twitch-dl | source
```
**Zsh**
Add to `~/.zshrc`:
```
eval "$(_TWITCH_DL_COMPLETE=zsh_source twitch-dl)"
```

View File

@ -1,59 +0,0 @@
[build-system]
requires = ["setuptools>=64", "setuptools_scm>=8"]
build-backend = "setuptools.build_meta"
[project]
name = "twitch-dl"
authors = [{ name="Ivan Habunek", email="ivan@habunek.com" }]
description = "Quickly download videos from twitch.tv from the comort of your terminal emulator"
keywords=["twitch", "vod", "video", "download"]
readme = "README.md"
license = { file="LICENSE" }
requires-python = ">=3.8"
dynamic = ["version"]
classifiers = [
"Development Status :: 5 - Production/Stable",
"Environment :: Console",
"License :: OSI Approved :: GNU General Public License v3 (GPLv3)",
"Operating System :: OS Independent",
]
dependencies = [
"click>=8.0.0,<9.0.0",
"httpx>=0.17.0,<1.0.0",
"m3u8>=1.0.0,<4.0.0",
"python-dateutil>=2.8.0,<3.0.0",
]
[tool.setuptools]
packages = [
"twitchdl",
"twitchdl.commands",
]
[tool.setuptools_scm]
[project.optional-dependencies]
dev = [
"build",
"pytest",
"pyyaml",
"setuptools",
"twine",
"vermin",
]
[project.urls]
"Homepage" = "https://twitch-dl.bezdomni.net/"
"Source" = "https://github.com/ihabunek/twitch-dl"
[project.scripts]
twitch-dl = "twitchdl.cli:cli"
[tool.pyright]
include = ["twitchdl"]
typeCheckingMode = "strict"
[tool.ruff]
line-length = 100

4
requirements-dev.txt Normal file
View File

@ -0,0 +1,4 @@
pytest
twine
wheel
pyyaml

View File

@ -21,13 +21,6 @@ for version in data.keys():
changes = data[version]["changes"]
print(f"### [{version} ({date})](https://github.com/ihabunek/twitch-dl/releases/tag/{version})")
print()
if "description" in data[version]:
description = data[version]["description"].strip()
for line in textwrap.wrap(description, 80):
print(line)
print()
for c in changes:
lines = textwrap.wrap(c, 78)
initial = True

View File

@ -4,16 +4,13 @@
Auto-generates documentation from command defs in console.py.
"""
import click
import html
import os
import re
import shutil
import textwrap
from click import Command
from twitchdl.cli import cli
from twitchdl.console import COMMANDS
START_MARKER = "<!-- ------------------- generated docs start ------------------- -->"
@ -22,11 +19,8 @@ END_MARKER = "<!-- ------------------- generated docs end ------------------- --
def main():
update_changelog()
parent_ctx = click.Context(cli, info_name="twitch-dl")
for name, command in cli.commands.items():
ctx = click.Context(cli, info_name=name, parent=parent_ctx)
update_docs(command, ctx)
for command in COMMANDS:
update_docs(command)
def update_changelog():
@ -37,9 +31,9 @@ def update_changelog():
shutil.copy(source, target)
def update_docs(command: Command, ctx: click.Context):
def update_docs(command):
path = os.path.join("docs", "commands", f"{command.name}.md")
content = render_command(command, ctx)
content = render_command(command)
if not os.path.exists(path):
print(f"Creating: {path}")
@ -51,42 +45,53 @@ def update_docs(command: Command, ctx: click.Context):
write(path, content)
def render_command(command: Command, ctx: click.Context):
def render_command(command):
content = START_MARKER
content += f"\n# twitch-dl {command.name}\n\n"
if command.help:
content += command.help + "\n\n"
content += render_usage(ctx, command)
content += render_options(ctx, command)
content += command.description + "\n\n"
content += render_usage(command)
content += render_arguments(command)
content += render_flags(command)
content += render_options(command)
return content
def render_usage(ctx: click.Context, command: Command):
def render_usage(command):
arguments = get_arguments(command)
arguments = " ".join(f"<{name}>" for [name, _] in arguments)
flags = get_flags(command)
options = get_options(command)
content = "### USAGE\n\n"
content += "```\n"
content += command.get_usage(ctx).replace("Usage: ", "")
content += f"twitch-dl {command.name} {arguments}"
if flags:
content += " [FLAGS]"
if options:
content += " [OPTIONS]"
content += "\n```\n\n"
return content
def render_options(ctx, command: Command):
options = list(get_options(command))
def render_arguments(command):
arguments = get_arguments(command)
if not options:
if not arguments:
return ""
content = "### OPTIONS\n\n"
content = "### ARGUMENTS\n\n"
content += "<table>\n"
content += "<tbody>"
for opts, help in options:
for [name, params] in arguments:
content += textwrap.dedent(f"""
<tr>
<td class="code">{escape(opts)}</td>
<td>{escape(help)}</td>
<td class="code">&lt;{escape(name)}&gt;</td>
<td>{escape(params['help'])}</td>
</tr>
""")
content += "</tbody>\n"
@ -95,39 +100,85 @@ def render_options(ctx, command: Command):
return content
def get_options(command: Command):
for option in command.params:
if isinstance(option, click.Option):
opts = ", ".join(option.opts)
opts += option_type(option)
def render_flags(command):
flags = get_flags(command)
help = option.help or ""
help = re.sub(r"\s+", " ", help)
help += choices(option)
if option.default:
help += f" [default: `{option.default}`]"
if not flags:
return ""
yield opts, help
content = "### FLAGS\n\n"
content += "<table>\n"
content += "<tbody>"
for [names, params] in flags:
names = ", ".join(f"{name}" for name in names)
content += textwrap.dedent(f"""
<tr>
<td class="code">{escape(names)}</td>
<td>{escape(params['help'])}</td>
</tr>
""")
content += "</tbody>\n"
content += "</table>\n\n"
return content
def option_type(option: click.Option):
match option.type:
case click.types.StringParamType():
return " TEXT"
case click.types.Choice():
return " TEXT"
case click.types.IntParamType():
return " INTEGER"
case _:
return ""
def render_options(command):
options = get_options(command)
def choices(option: click.Option):
if isinstance(option.type, click.Choice):
choices = ", ".join(f"`{c}`" for c in option.type.choices)
if not options:
return ""
content = "### OPTIONS\n\n"
content += "<table>\n"
content += "<tbody>"
for [names, params] in options:
names = ", ".join(f"{name}" for name in names)
content += textwrap.dedent(f"""
<tr>
<td class="code">{escape(names)}</td>
<td>{escape(params['help'])}{choices(params)}</td>
</tr>
""")
content += "</tbody>\n"
content += "</table>\n\n"
return content
def choices(params):
if "choices" in params:
choices = ", ".join(code(c) for c in params["choices"])
return f" Possible values: {choices}."
return ""
def get_arguments(command):
return [
[names[0], options]
for names, options in command.arguments
if len(names) == 1 and not names[0].startswith("-")
]
def get_flags(command):
return [
[names, options]
for names, options in command.arguments
if names[0].startswith("-") and "type" not in options
]
def get_options(command):
return [
[names, options]
for names, options in command.arguments
if names[0].startswith("-") and "type" in options
]
def read(path):
with open(path, "r") as f:
return f.read()
@ -138,6 +189,10 @@ def write(path, content):
return f.write(content)
def code(string):
return f"<code>{string}</code>"
def escape(text: str):
text = html.escape(text)
text = re.sub(r"`([\S]+)`", "<code>\\1</code>", text)

View File

@ -44,18 +44,14 @@ if dist_version != version:
release_date = changelog_item["date"]
changes = changelog_item["changes"]
description = changelog_item["description"] if "description" in changelog_item else None
if not isinstance(release_date, date):
print(f"Release date not set for version `{version}` in the changelog.", file=sys.stderr)
sys.exit(1)
commit_message = f"twitch-dl {version}\n\n"
if description:
lines = textwrap.wrap(description.strip(), 72)
commit_message += "\n".join(lines) + "\n\n"
for c in changes:
lines = textwrap.wrap(c, 69)
lines = textwrap.wrap(c, 70)
initial = True
for line in lines:
lead = " *" if initial else " "

44
setup.py Normal file
View File

@ -0,0 +1,44 @@
#!/usr/bin/env python
from setuptools import setup, find_packages
long_description = """
Quickly download videos from twitch.tv.
Works similarly to youtube-dl but downloads multiple VODs in parallel which
makes it faster.
"""
setup(
name='twitch-dl',
version='1.22.0',
description='Twitch downloader',
long_description=long_description.strip(),
author='Ivan Habunek',
author_email='ivan@habunek.com',
url='https://github.com/ihabunek/twitch-dl/',
project_urls={
"Documentation": "https://twitch-dl.bezdomni.net/"
},
keywords='twitch vod video download',
license='GPLv3',
classifiers=[
'Development Status :: 5 - Production/Stable',
'License :: OSI Approved :: GNU General Public License v3 (GPLv3)',
'Programming Language :: Python :: 3',
'Programming Language :: Python :: 3.5',
'Programming Language :: Python :: 3.6',
'Programming Language :: Python :: 3.7',
],
packages=find_packages(),
python_requires='>=3.5',
install_requires=[
"m3u8>=1.0.0,<2.0.0",
"requests>=2.13,<3.0",
],
entry_points={
'console_scripts': [
'twitch-dl=twitchdl.console:main',
],
}
)

View File

@ -2,10 +2,7 @@
These tests depend on the channel having some videos and clips published.
"""
import httpx
import m3u8
from twitchdl import twitch
from twitchdl.commands.download import _parse_playlists, get_clip_authenticated_url
TEST_CHANNEL = "bananasaurus_rex"
@ -19,21 +16,6 @@ def test_get_videos():
video = twitch.get_video(video_id)
assert video["id"] == video_id
access_token = twitch.get_access_token(video_id)
assert "signature" in access_token
assert "value" in access_token
playlists = twitch.get_playlists(video_id, access_token)
assert playlists.startswith("#EXTM3U")
name, res, url = next(_parse_playlists(playlists))
playlist = httpx.get(url).text
assert playlist.startswith("#EXTM3U")
playlist = m3u8.loads(playlist)
vod_path = playlist.segments[0].uri
assert vod_path == "0.ts"
def test_get_clips():
"""
@ -43,8 +25,6 @@ def test_get_clips():
assert clips["pageInfo"]
assert len(clips["edges"]) > 0
slug = clips["edges"][0]["node"]["slug"]
clip = twitch.get_clip(slug)
assert clip["slug"] == slug
assert get_clip_authenticated_url(slug, "source")
clip_slug = clips["edges"][0]["node"]["slug"]
clip = twitch.get_clip(clip_slug)
assert clip["slug"] == clip_slug

View File

@ -1,102 +0,0 @@
from twitchdl.progress import Progress
def test_initial_values():
progress = Progress(10)
assert progress.downloaded == 0
assert progress.estimated_total is None
assert progress.progress_perc == 0
assert progress.remaining_time is None
assert progress.speed is None
assert progress.vod_count == 10
assert progress.vod_downloaded_count == 0
def test_downloaded():
progress = Progress(3)
progress.start(1, 300)
progress.start(2, 300)
progress.start(3, 300)
assert progress.downloaded == 0
assert progress.progress_bytes == 0
assert progress.progress_perc == 0
progress.advance(1, 100)
assert progress.downloaded == 100
assert progress.progress_bytes == 100
assert progress.progress_perc == 11
progress.advance(2, 200)
assert progress.downloaded == 300
assert progress.progress_bytes == 300
assert progress.progress_perc == 33
progress.advance(3, 150)
assert progress.downloaded == 450
assert progress.progress_bytes == 450
assert progress.progress_perc == 50
progress.advance(1, 50)
assert progress.downloaded == 500
assert progress.progress_bytes == 500
assert progress.progress_perc == 55
progress.abort(2)
assert progress.downloaded == 500
assert progress.progress_bytes == 300
assert progress.progress_perc == 33
progress.start(2, 300)
progress.advance(1, 150)
progress.advance(2, 300)
progress.advance(3, 150)
assert progress.downloaded == 1100
assert progress.progress_bytes == 900
assert progress.progress_perc == 100
progress.end(1)
progress.end(2)
progress.end(3)
assert progress.downloaded == 1100
assert progress.progress_bytes == 900
assert progress.progress_perc == 100
def test_estimated_total():
progress = Progress(3)
assert progress.estimated_total is None
progress.start(1, 12000)
assert progress.estimated_total == 12000 * 3
progress.start(2, 11000)
assert progress.estimated_total == 11500 * 3
progress.start(3, 10000)
assert progress.estimated_total == 11000 * 3
def test_vod_downloaded_count():
progress = Progress(3)
progress.start(1, 100)
progress.start(2, 100)
progress.start(3, 100)
assert progress.vod_downloaded_count == 0
progress.advance(1, 100)
progress.end(1)
assert progress.vod_downloaded_count == 1
progress.advance(2, 100)
progress.end(2)
assert progress.vod_downloaded_count == 2
progress.advance(3, 100)
progress.end(3)
assert progress.vod_downloaded_count == 3

View File

@ -1,8 +1,3 @@
from importlib import metadata
__version__ = "1.22.0"
try:
__version__ = metadata.version("twitch-dl")
except metadata.PackageNotFoundError:
__version__ = "0.0.0"
CLIENT_ID = "kd1unb4b3q4t58fwlpcbzcbnm76a8fp"
CLIENT_ID = "kimne78kx3ncx6brgo4mv6wki5h1ko"

View File

@ -1,3 +1,3 @@
from twitchdl.cli import cli
from twitchdl.console import main
cli()
main()

View File

@ -1,391 +0,0 @@
import click
import logging
import platform
import re
import sys
from twitchdl import __version__
from twitchdl.entities import DownloadOptions
from twitchdl.twitch import ClipsPeriod, VideosSort, VideosType
# Tweak the Click context
# https://click.palletsprojects.com/en/8.1.x/api/#context
CONTEXT = dict(
# Enable using environment variables to set options
auto_envvar_prefix="TWITCH_DL",
# Add shorthand -h for invoking help
help_option_names=["-h", "--help"],
# Always show default values for options
show_default=True,
# Make help a bit wider
max_content_width=100,
)
json_option = click.option(
"--json",
is_flag=True,
default=False,
help="Print data as JSON rather than human readable text",
)
def validate_positive(_ctx: click.Context, _param: click.Parameter, value: int | None):
if value is not None and value <= 0:
raise click.BadParameter("must be greater than 0")
return value
def validate_time(_ctx: click.Context, _param: click.Parameter, value: str) -> int | None:
"""Parse a time string (hh:mm or hh:mm:ss) to number of seconds."""
if not value:
return None
parts = [int(p) for p in value.split(":")]
if not 2 <= len(parts) <= 3:
raise click.BadParameter("invalid time")
hours = parts[0]
minutes = parts[1]
seconds = parts[2] if len(parts) > 2 else 0
if hours < 0 or not (0 <= minutes <= 59) or not (0 <= seconds <= 59):
raise click.BadParameter("invalid time")
return hours * 3600 + minutes * 60 + seconds
def validate_rate(_ctx: click.Context, _param: click.Parameter, value: str) -> int | None:
if not value:
return None
match = re.search(r"^([0-9]+)(k|m|)$", value, flags=re.IGNORECASE)
if not match:
raise click.BadParameter("must be an integer, followed by an optional 'k' or 'm'")
amount = int(match.group(1))
unit = match.group(2)
if unit == "k":
return amount * 1024
if unit == "m":
return amount * 1024 * 1024
return amount
@click.group(context_settings=CONTEXT)
@click.option("--debug/--no-debug", default=False, help="Log debug info to stderr")
@click.option("--color/--no-color", default=sys.stdout.isatty(), help="Use ANSI color in output")
@click.version_option(package_name="twitch-dl")
@click.pass_context
def cli(ctx: click.Context, color: bool, debug: bool):
"""twitch-dl - twitch.tv downloader
https://twitch-dl.bezdomni.net/
"""
ctx.color = color
if debug:
logging.basicConfig(level=logging.INFO)
@cli.command()
@click.argument("channel_name")
@click.option(
"-a",
"--all",
help="Fetch all clips, overrides --limit",
is_flag=True,
)
@click.option(
"-d",
"--download",
help="Download clips in given period (in source quality)",
is_flag=True,
)
@click.option(
"-l",
"--limit",
help="Number of clips to fetch [max: 100]",
type=int,
default=10,
callback=validate_positive,
)
@click.option(
"-p",
"--pager",
help="Number of clips to show per page. Disabled by default.",
type=int,
callback=validate_positive,
is_flag=False,
flag_value=10,
)
@click.option(
"-P",
"--period",
help="Period from which to return clips",
default="all_time",
type=click.Choice(["last_day", "last_week", "last_month", "all_time"]),
)
@json_option
def clips(
channel_name: str,
all: bool,
download: bool,
json: bool,
limit: int,
pager: int | None,
period: ClipsPeriod,
):
"""List or download clips for given CHANNEL_NAME."""
from twitchdl.commands.clips import clips
clips(
channel_name,
all=all,
download=download,
json=json,
limit=limit,
pager=pager,
period=period,
)
@cli.command()
@click.argument("ids", nargs=-1)
@click.option(
"-a",
"--auth-token",
help="""Authentication token, passed to Twitch to access subscriber only
VODs. Can be copied from the `auth_token` cookie in any browser logged
in on Twitch.""",
)
@click.option(
"-c",
"--chapter",
help="""Download a single chapter of the video. Specify the chapter number
or use the flag without a number to display a chapter select prompt.
""",
type=int,
is_flag=False,
flag_value=0,
)
@click.option(
"--concat",
is_flag=True,
help="""Do not use ffmpeg to join files, concat them instead. This will
produce a .ts file by default.""",
)
@click.option(
"-d",
"--dry-run",
help="Simulate the download provcess without actually downloading any files.",
is_flag=True,
)
@click.option(
"-e",
"--end",
help="Download video up to this time (hh:mm or hh:mm:ss)",
callback=validate_time,
)
@click.option(
"-f",
"--format",
help="""Video format to convert into, passed to ffmpeg as the target file
extension. Defaults to `mkv`. If `--concat` is passed, defaults to
`ts`.""",
)
@click.option(
"-k",
"--keep",
help="Don't delete downloaded VODs and playlists after merging.",
is_flag=True,
)
@click.option(
"--no-join",
help="Don't run ffmpeg to join the downloaded vods, implies --keep.",
is_flag=True,
)
@click.option(
"--overwrite",
help="Overwrite the target file if it already exists without prompting.",
is_flag=True,
)
@click.option(
"-o",
"--output",
help="Output file name template. See docs for details.",
default="{date}_{id}_{channel_login}_{title_slug}.{format}",
)
@click.option(
"-q",
"--quality",
help="Video quality, e.g. `720p`. Set to `source` to get best quality.",
)
@click.option(
"-r",
"--rate-limit",
help="""Limit the maximum download speed in bytes per second. Use 'k' and
'm' suffixes for kbps and mbps.""",
callback=validate_rate,
)
@click.option(
"-s",
"--start",
help="Download video from this time (hh:mm or hh:mm:ss)",
callback=validate_time,
)
@click.option(
"-w",
"--max-workers",
help="Number of workers for downloading vods concurrently",
type=int,
default=5,
)
def download(
ids: tuple[str, ...],
auth_token: str | None,
chapter: int | None,
concat: bool,
dry_run: bool,
end: int | None,
format: str,
keep: bool,
no_join: bool,
overwrite: bool,
output: str,
quality: str | None,
rate_limit: str | None,
start: int | None,
max_workers: int,
):
"""Download videos or clips.
Pass one or more video ID, clip slug or Twitch URL to download.
"""
from twitchdl.commands.download import download
if not format:
format = "ts" if concat else "mkv"
options = DownloadOptions(
auth_token=auth_token,
chapter=chapter,
concat=concat,
dry_run=dry_run,
end=end,
format=format,
keep=keep,
no_join=no_join,
overwrite=overwrite,
output=output,
quality=quality,
rate_limit=rate_limit,
start=start,
max_workers=max_workers,
)
download(list(ids), options)
@cli.command()
def env():
"""Print environment information for inclusion in bug reports."""
click.echo(f"twitch-dl {__version__}")
click.echo(f"Python {sys.version}")
click.echo(f"Platform: {platform.platform()}")
@cli.command()
@click.argument("id")
@json_option
def info(id: str, json: bool):
"""Print information for a given Twitch URL, video ID or clip slug."""
from twitchdl.commands.info import info
info(id, json=json)
@cli.command()
@click.argument("channel_name")
@click.option(
"-a",
"--all",
help="Fetch all clips, overrides --limit",
is_flag=True,
)
@click.option(
"-c",
"--compact",
help="Show videos in compact mode, one line per video",
is_flag=True,
)
@click.option(
"-l",
"--limit",
help="Number of videos to fetch. Defaults to 40 in compact mode, 10 otherwise.",
type=int,
callback=validate_positive,
)
@click.option(
"-p",
"--pager",
help="Number of videos to show per page. Disabled by default.",
type=int,
callback=validate_positive,
is_flag=False,
flag_value=10,
)
@click.option(
"-g",
"--game",
"games_tuple",
help="Show videos of given game (can be given multiple times)",
multiple=True,
)
@click.option(
"-s",
"--sort",
help="Sorting order of videos",
default="time",
type=click.Choice(["views", "time"]),
)
@click.option(
"-t",
"--type",
help="Broadcast type",
default="archive",
type=click.Choice(["archive", "highlight", "upload"]),
)
@json_option
def videos(
channel_name: str,
all: bool,
compact: bool,
games_tuple: tuple[str, ...],
json: bool,
limit: int | None,
pager: int | None,
sort: VideosSort,
type: VideosType,
):
"""List or download clips for given CHANNEL_NAME."""
from twitchdl.commands.videos import videos
# Click provides a tuple, make it a list instead
games = list(games_tuple)
videos(
channel_name,
all=all,
compact=compact,
games=games,
json=json,
limit=limit,
pager=pager,
sort=sort,
type=type,
)

View File

@ -0,0 +1,13 @@
from .clips import clips
from .download import download
from .env import env
from .info import info
from .videos import videos
__all__ = [
clips,
download,
env,
info,
videos,
]

View File

@ -1,5 +1,3 @@
from typing import Generator
import click
import re
import sys
@ -9,42 +7,30 @@ from os import path
from twitchdl import twitch, utils
from twitchdl.commands.download import get_clip_authenticated_url
from twitchdl.download import download_file
from twitchdl.entities import Data
from twitchdl.output import green, print_clip, print_json, print_paged, yellow
from twitchdl.output import print_out, print_clip, print_json
def clips(
channel_name: str,
*,
all: bool = False,
download: bool = False,
json: bool = False,
limit: int = 10,
pager: int | None = None,
period: twitch.ClipsPeriod = "all_time",
):
def clips(args):
# Ignore --limit if --pager or --all are given
limit = sys.maxsize if all or pager else limit
limit = sys.maxsize if args.all or args.pager else args.limit
generator = twitch.channel_clips_generator(channel_name, period, limit)
generator = twitch.channel_clips_generator(args.channel_name, args.period, limit)
if json:
if args.json:
return print_json(list(generator))
if download:
if args.download:
return _download_clips(generator)
if pager:
return print_paged("Clips", generator, print_clip, pager)
if args.pager:
print(args)
return _print_paged(generator, args.pager)
return _print_all(generator, all)
return _print_all(generator, args)
def _continue():
enter = click.style("Enter", bold=True, fg="green")
ctrl_c = click.style("Ctrl+C", bold=True, fg="yellow")
click.echo(f"Press {enter} to continue, {ctrl_c} to break.")
print_out("Press <green><b>Enter</green> to continue, <yellow><b>Ctrl+C</yellow> to break.")
try:
input()
@ -54,14 +40,12 @@ def _continue():
return True
def _target_filename(clip: Data):
def _target_filename(clip):
url = clip["videoQualities"][0]["sourceURL"]
_, ext = path.splitext(url)
ext = ext.lstrip(".")
match = re.search(r"^(\d{4})-(\d{2})-(\d{2})T", clip["createdAt"])
if not match:
raise ValueError(f"Failed parsing date from: {clip['createdAt']}")
date = "".join(match.groups())
name = "_".join([
@ -71,29 +55,56 @@ def _target_filename(clip: Data):
utils.slugify(clip["title"]),
])
return f"{name}.{ext}"
return "{}.{}".format(name, ext)
def _download_clips(generator: Generator[Data, None, None]):
def _download_clips(generator):
for clip in generator:
target = _target_filename(clip)
if path.exists(target):
click.echo(f"Already downloaded: {green(target)}")
print_out("Already downloaded: <green>{}</green>".format(target))
else:
url = get_clip_authenticated_url(clip["slug"], "source")
click.echo(f"Downloading: {yellow(target)}")
print_out("Downloading: <yellow>{}</yellow>".format(target))
download_file(url, target)
def _print_all(generator: Generator[Data, None, None], all: bool):
def _print_all(generator, args):
for clip in generator:
click.echo()
print_out()
print_clip(clip)
if not all:
click.secho(
"\nThere may be more clips. " +
"Increase the --limit, use --all or --pager to see the rest.",
dim=True
if not args.all:
print_out(
"\n<dim>There may be more clips. " +
"Increase the --limit, use --all or --pager to see the rest.</dim>"
)
def _print_paged(generator, page_size):
iterator = iter(generator)
page = list(islice(iterator, page_size))
first = 1
last = first + len(page) - 1
while True:
print_out("-" * 80)
print_out()
for clip in page:
print_clip(clip)
print_out()
last = first + len(page) - 1
print_out("-" * 80)
print_out("<yellow>Clips {}-{}</yellow>".format(first, last))
first = first + len(page)
last = first + 1
page = list(islice(iterator, page_size))
if not page or not _continue():
break

View File

@ -1,43 +1,18 @@
import asyncio
import platform
import click
import httpx
import m3u8
import os
import re
import requests
import shutil
import subprocess
import tempfile
from os import path
from pathlib import Path
from typing import List, Optional, OrderedDict
from urllib.parse import urlparse, urlencode
from twitchdl import twitch, utils
from twitchdl.conversion import from_dict
from twitchdl.download import download_file
from twitchdl.entities import Data, DownloadOptions, Video
from twitchdl.download import download_file, download_files
from twitchdl.exceptions import ConsoleError
from twitchdl.http import download_all
from twitchdl.output import blue, bold, dim, green, print_log, yellow
def download(ids: list[str], args: DownloadOptions):
for video_id in ids:
download_one(video_id, args)
def download_one(video: str, args: DownloadOptions):
video_id = utils.parse_video_identifier(video)
if video_id:
return _download_video(video_id, args)
clip_slug = utils.parse_clip_identifier(video)
if clip_slug:
return _download_clip(clip_slug, args)
raise ConsoleError(f"Invalid input: {video}")
from twitchdl.output import print_out
def _parse_playlists(playlists_m3u8):
@ -64,90 +39,71 @@ def _get_playlist_by_name(playlists, quality):
return uri
available = ", ".join([name for (name, _, _) in playlists])
msg = f"Quality '{quality}' not found. Available qualities are: {available}"
msg = "Quality '{}' not found. Available qualities are: {}".format(quality, available)
raise ConsoleError(msg)
def _select_playlist_interactive(playlists):
click.echo("\nAvailable qualities:")
print_out("\nAvailable qualities:")
for n, (name, resolution, uri) in enumerate(playlists):
if resolution:
click.echo(f"{n + 1}) {bold(name)} {dim(f'({resolution})')}")
print_out("{}) {} [{}]".format(n + 1, name, resolution))
else:
click.echo(f"{n + 1}) {bold(name)}")
print_out("{}) {}".format(n + 1, name))
no = utils.read_int("Choose quality", min=1, max=len(playlists) + 1, default=1)
_, _, uri = playlists[no - 1]
return uri
def _join_vods(playlist_path: str, target: str, overwrite: bool, video: Video):
description = video.description or ""
description = description.strip()
def _join_vods(playlist_path, target, overwrite, video):
command = [
"ffmpeg",
"-i", playlist_path,
"-c", "copy",
"-metadata", f"artist={video.creator.display_name}",
"-metadata", f"title={video.title}",
"-metadata", f"description={description}",
"-metadata", "artist={}".format(video["creator"]["displayName"]),
"-metadata", "title={}".format(video["title"]),
"-metadata", "encoded_by=twitch-dl",
"-stats",
"-loglevel", "warning",
f"file:{target}",
"file:{}".format(target),
]
if overwrite:
command.append("-y")
click.secho(f"{' '.join(command)}", dim = True)
print_out("<dim>{}</dim>".format(" ".join(command)))
result = subprocess.run(command)
if result.returncode != 0:
raise ConsoleError("Joining files failed")
def _concat_vods(vod_paths: list[str], target: str):
tool = "type" if platform.system() == "Windows" else "cat"
command = [tool] + vod_paths
with open(target, "wb") as target_file:
result = subprocess.run(command, stdout=target_file)
if result.returncode != 0:
raise ConsoleError(f"Joining files failed: {result.stderr}")
def _video_target_filename(video, args):
date, time = video['publishedAt'].split("T")
game = video["game"]["name"] if video["game"] else "Unknown"
def get_video_placeholders(video: Video, format: str) -> Data:
date = video.published_at.date().isoformat()
time = video.published_at.time().isoformat()
datetime = video.published_at.isoformat().replace("+00:00", "Z")
game = video.game.name if video.game else "Unknown"
return {
"channel": video.creator.display_name,
"channel_login": video.creator.login,
subs = {
"channel": video["creator"]["displayName"],
"channel_login": video["creator"]["login"],
"date": date,
"datetime": datetime,
"format": format,
"datetime": video["publishedAt"],
"format": args.format,
"game": game,
"game_slug": utils.slugify(game),
"id": video.id,
"id": video["id"],
"time": time,
"title": utils.titlify(video.title),
"title_slug": utils.slugify(video.title),
"title": utils.titlify(video["title"]),
"title_slug": utils.slugify(video["title"]),
}
def _video_target_filename(video: Video, args: DownloadOptions):
subs = get_video_placeholders(video, args.format)
try:
return args.output.format(**subs)
except KeyError as e:
supported = ", ".join(subs.keys())
raise ConsoleError(f"Invalid key {e} used in --output. Supported keys are: {supported}")
raise ConsoleError("Invalid key {} used in --output. Supported keys are: {}".format(e, supported))
def _clip_target_filename(clip, args: DownloadOptions):
def _clip_target_filename(clip, args):
date, time = clip["createdAt"].split("T")
game = clip["game"]["name"] if clip["game"] else "Unknown"
@ -174,10 +130,10 @@ def _clip_target_filename(clip, args: DownloadOptions):
return args.output.format(**subs)
except KeyError as e:
supported = ", ".join(subs.keys())
raise ConsoleError(f"Invalid key {e} used in --output. Supported keys are: {supported}")
raise ConsoleError("Invalid key {} used in --output. Supported keys are: {}".format(e, supported))
def _get_vod_paths(playlist, start: Optional[int], end: Optional[int]) -> List[str]:
def _get_vod_paths(playlist, start, end):
"""Extract unique VOD paths for download from playlist."""
files = []
vod_start = 0
@ -197,7 +153,7 @@ def _get_vod_paths(playlist, start: Optional[int], end: Optional[int]) -> List[s
return files
def _crete_temp_dir(base_uri: str) -> str:
def _crete_temp_dir(base_uri):
"""Create a temp dir to store downloads if it doesn't exist."""
path = urlparse(base_uri).path.lstrip("/")
temp_dir = Path(tempfile.gettempdir(), "twitch-dl", path)
@ -205,6 +161,18 @@ def _crete_temp_dir(base_uri: str) -> str:
return str(temp_dir)
def download(args):
video_id = utils.parse_video_identifier(args.video)
if video_id:
return _download_video(video_id, args)
clip_slug = utils.parse_clip_identifier(args.video)
if clip_slug:
return _download_clip(clip_slug, args)
raise ConsoleError("Invalid input: {}".format(args.video))
def _get_clip_url(clip, quality):
qualities = clip["videoQualities"]
@ -219,14 +187,14 @@ def _get_clip_url(clip, quality):
return q["sourceURL"]
available = ", ".join([str(q["quality"]) for q in qualities])
msg = f"Quality '{quality}' not found. Available qualities are: {available}"
msg = "Quality '{}' not found. Available qualities are: {}".format(quality, available)
raise ConsoleError(msg)
# Ask user to select quality
click.echo("\nAvailable qualities:")
print_out("\nAvailable qualities:")
for n, q in enumerate(qualities):
click.echo(f"{n + 1}) {bold(q['quality'])} [{q['frameRate']} fps]")
click.echo()
print_out("{}) {} [{} fps]".format(n + 1, q["quality"], q["frameRate"]))
print_out()
no = utils.read_int("Choose quality", min=1, max=len(qualities), default=1)
selected_quality = qualities[no - 1]
@ -234,11 +202,11 @@ def _get_clip_url(clip, quality):
def get_clip_authenticated_url(slug, quality):
print_log("Fetching access token...")
print_out("<dim>Fetching access token...</dim>")
access_token = twitch.get_clip_access_token(slug)
if not access_token:
raise ConsoleError(f"Access token not found for slug '{slug}'")
raise ConsoleError("Access token not found for slug '{}'".format(slug))
url = _get_clip_url(access_token, quality)
@ -247,84 +215,81 @@ def get_clip_authenticated_url(slug, quality):
"token": access_token["playbackAccessToken"]["value"],
})
return f"{url}?{query}"
return "{}?{}".format(url, query)
def _download_clip(slug: str, args: DownloadOptions) -> None:
print_log("Looking up clip...")
def _download_clip(slug, args):
print_out("<dim>Looking up clip...</dim>")
clip = twitch.get_clip(slug)
game = clip["game"]["name"] if clip["game"] else "Unknown"
if not clip:
raise ConsoleError(f"Clip '{slug}' not found")
raise ConsoleError("Clip '{}' not found".format(slug))
title = clip["title"]
user = clip["broadcaster"]["displayName"]
game = clip["game"]["name"] if clip["game"] else "Unknown"
duration = utils.format_duration(clip["durationSeconds"])
click.echo(f"Found: {green(title)} by {yellow(user)}, playing {blue(game)} ({duration})")
print_out("Found: <green>{}</green> by <yellow>{}</yellow>, playing <blue>{}</blue> ({})".format(
clip["title"],
clip["broadcaster"]["displayName"],
game,
utils.format_duration(clip["durationSeconds"])
))
target = _clip_target_filename(clip, args)
click.echo(f"Target: {blue(target)}")
print_out("Target: <blue>{}</blue>".format(target))
if not args.overwrite and path.exists(target):
response = click.prompt("File exists. Overwrite? [Y/n]", default="Y", show_default=False)
if response.lower().strip() != "y":
raise click.Abort()
response = input("File exists. Overwrite? [Y/n]: ")
if response.lower().strip() not in ["", "y"]:
raise ConsoleError("Aborted")
args.overwrite = True
url = get_clip_authenticated_url(slug, args.quality)
print_log(f"Selected URL: {url}")
print_out("<dim>Selected URL: {}</dim>".format(url))
if args.dry_run:
click.echo("Dry run, clip not downloaded.")
else:
print_log("Downloading clip...")
download_file(url, target)
click.echo(f"Downloaded: {blue(target)}")
print_out("<dim>Downloading clip...</dim>")
download_file(url, target)
print_out("Downloaded: <blue>{}</blue>".format(target))
def _download_video(video_id: str, args: DownloadOptions) -> None:
def _download_video(video_id, args):
if args.start and args.end and args.end <= args.start:
raise ConsoleError("End time must be greater than start time")
print_log("Looking up video...")
response = twitch.get_video(video_id)
print_out("<dim>Looking up video...</dim>")
video = twitch.get_video(video_id)
if not response:
raise ConsoleError(f"Video {video_id} not found")
if not video:
raise ConsoleError("Video {} not found".format(video_id))
video = from_dict(Video, response)
click.echo(f"Found: {blue(video.title)} by {yellow(video.creator.display_name)}")
print_out("Found: <blue>{}</blue> by <yellow>{}</yellow>".format(
video['title'], video['creator']['displayName']))
target = _video_target_filename(video, args)
click.echo(f"Output: {blue(target)}")
print_out("Output: <blue>{}</blue>".format(target))
if not args.overwrite and path.exists(target):
response = click.prompt("File exists. Overwrite? [Y/n]: ", default="Y", show_default=False)
if response.lower().strip() != "y":
raise click.Abort()
response = input("File exists. Overwrite? [Y/n]: ")
if response.lower().strip() not in ["", "y"]:
raise ConsoleError("Aborted")
args.overwrite = True
# Chapter select or manual offset
start, end = _determine_time_range(video_id, args)
print_log("Fetching access token...")
print_out("<dim>Fetching access token...</dim>")
access_token = twitch.get_access_token(video_id, auth_token=args.auth_token)
print_log("Fetching playlists...")
print_out("<dim>Fetching playlists...</dim>")
playlists_m3u8 = twitch.get_playlists(video_id, access_token)
playlists = list(_parse_playlists(playlists_m3u8))
playlist_uri = (_get_playlist_by_name(playlists, args.quality) if args.quality
else _select_playlist_interactive(playlists))
print_log("Fetching playlist...")
response = httpx.get(playlist_uri)
print_out("<dim>Fetching playlist...</dim>")
response = requests.get(playlist_uri)
response.raise_for_status()
playlist = m3u8.loads(response.text)
base_uri = re.sub("/[^/]+$", "/", playlist_uri)
target_dir = _crete_temp_dir(base_uri)
vod_paths = _get_vod_paths(playlist, start, end)
vod_paths = _get_vod_paths(playlist, args.start, args.end)
# Save playlists for debugging purposes
with open(path.join(target_dir, "playlists.m3u8"), "w") as f:
@ -332,16 +297,13 @@ def _download_video(video_id: str, args: DownloadOptions) -> None:
with open(path.join(target_dir, "playlist.m3u8"), "w") as f:
f.write(response.text)
click.echo(f"\nDownloading {len(vod_paths)} VODs using {args.max_workers} workers to {target_dir}")
sources = [base_uri + path for path in vod_paths]
targets = [os.path.join(target_dir, f"{k:05d}.ts") for k, _ in enumerate(vod_paths)]
asyncio.run(download_all(sources, targets, args.max_workers, rate_limit=args.rate_limit))
print_out("\nDownloading {} VODs using {} workers to {}".format(
len(vod_paths), args.max_workers, target_dir))
path_map = download_files(base_uri, target_dir, vod_paths, args.max_workers)
# Make a modified playlist which references downloaded VODs
# Keep only the downloaded segments and skip the rest
org_segments = playlist.segments.copy()
path_map = OrderedDict(zip(vod_paths, targets))
playlist.segments.clear()
for segment in org_segments:
if segment.uri in path_map:
@ -351,63 +313,18 @@ def _download_video(video_id: str, args: DownloadOptions) -> None:
playlist_path = path.join(target_dir, "playlist_downloaded.m3u8")
playlist.dump(playlist_path)
click.echo()
if args.no_join:
print_log("Skipping joining files...")
click.echo(f"VODs downloaded to:\n{blue(target_dir)}")
print_out("\n\n<dim>Skipping joining files...</dim>")
print_out("VODs downloaded to:\n<blue>{}</blue>".format(target_dir))
return
if args.concat:
print_log("Concating files...")
_concat_vods(targets, target)
else:
print_log("Joining files...")
_join_vods(playlist_path, target, args.overwrite, video)
click.echo()
print_out("\n\nJoining files...")
_join_vods(playlist_path, target, args.overwrite, video)
if args.keep:
click.echo(f"Temporary files not deleted: {target_dir}")
print_out("\n<dim>Temporary files not deleted: {}</dim>".format(target_dir))
else:
print_log("Deleting temporary files...")
print_out("\n<dim>Deleting temporary files...</dim>")
shutil.rmtree(target_dir)
click.echo(f"\nDownloaded: {green(target)}")
def _determine_time_range(video_id, args: DownloadOptions):
if args.start or args.end:
return args.start, args.end
if args.chapter is not None:
print_log("Fetching chapters...")
chapters = twitch.get_video_chapters(video_id)
if not chapters:
raise ConsoleError("This video has no chapters")
if args.chapter == 0:
chapter = _choose_chapter_interactive(chapters)
else:
try:
chapter = chapters[args.chapter - 1]
except IndexError:
raise ConsoleError(f"Chapter {args.chapter} does not exist. This video has {len(chapters)} chapters.")
click.echo(f'Chapter selected: {blue(chapter["description"])}\n')
start = chapter["positionMilliseconds"] // 1000
duration = chapter["durationMilliseconds"] // 1000
return start, start + duration
return None, None
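To make the chapter-to-offset conversion above concrete, here is a minimal sketch (values are made up for illustration): a chapter that starts at 00:45:30 and lasts 20 minutes yields the (start, end) pair in seconds that the downloader expects.
# hypothetical chapter node, using the keys shown above
chapter = {"positionMilliseconds": 2_730_000, "durationMilliseconds": 1_200_000}
start = chapter["positionMilliseconds"] // 1000        # 2730 (00:45:30)
end = start + chapter["durationMilliseconds"] // 1000  # 3930 (01:05:30)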
def _choose_chapter_interactive(chapters):
click.echo("\nChapters:")
for index, chapter in enumerate(chapters):
duration = utils.format_time(chapter["durationMilliseconds"] // 1000)
click.echo(f'{index + 1}) {bold(chapter["description"])} ({duration})')
index = utils.read_int("Select a chapter", 1, len(chapters))
chapter = chapters[index - 1]
return chapter
print_out("\nDownloaded: <green>{}</green>".format(target))
twitchdl/commands/env.py Normal file
@ -0,0 +1,9 @@
import platform
import sys
import twitchdl
def env(args):
print("twitch-dl", twitchdl.__version__)
print("Platform:", platform.platform())
print("Python", sys.version)
@ -1,22 +1,18 @@
import click
import m3u8
from twitchdl import utils, twitch
from twitchdl.commands.download import get_video_placeholders
from twitchdl.conversion import from_dict
from twitchdl.entities import Data, Video
from twitchdl.exceptions import ConsoleError
from twitchdl.output import bold, print_table, print_video, print_clip, print_json, print_log
from twitchdl.output import print_video, print_clip, print_json, print_out, print_log
def info(id: str, *, json: bool = False):
video_id = utils.parse_video_identifier(id)
def info(args):
video_id = utils.parse_video_identifier(args.video)
if video_id:
print_log("Fetching video...")
response = twitch.get_video(video_id)
video = twitch.get_video(video_id)
if not response:
raise ConsoleError(f"Video {video_id} not found")
if not video:
raise ConsoleError("Video {} not found".format(video_id))
print_log("Fetching access token...")
access_token = twitch.get_access_token(video_id)
@ -24,55 +20,40 @@ def info(id: str, *, json: bool = False):
print_log("Fetching playlists...")
playlists = twitch.get_playlists(video_id, access_token)
print_log("Fetching chapters...")
chapters = twitch.get_video_chapters(video_id)
if video:
if args.json:
video_json(video, playlists)
else:
video_info(video, playlists)
return
if json:
video_json(response, playlists, chapters)
else:
video = from_dict(Video, response)
video_info(video, playlists, chapters)
return
clip_slug = utils.parse_clip_identifier(id)
clip_slug = utils.parse_clip_identifier(args.video)
if clip_slug:
print_log("Fetching clip...")
clip = twitch.get_clip(clip_slug)
if not clip:
raise ConsoleError(f"Clip {clip_slug} not found")
raise ConsoleError("Clip {} not found".format(clip_slug))
if json:
if args.json:
print_json(clip)
else:
clip_info(clip)
return
raise ConsoleError(f"Invalid input: {id}")
raise ConsoleError("Invalid input: {}".format(args.video))
def video_info(video: Video, playlists, chapters):
click.echo()
def video_info(video, playlists):
print_out()
print_video(video)
click.echo("Playlists:")
print_out()
print_out("Playlists:")
for p in m3u8.loads(playlists).playlists:
click.echo(f"{bold(p.stream_info.video)} {p.uri}")
if chapters:
click.echo()
click.echo("Chapters:")
for chapter in chapters:
start = utils.format_time(chapter["positionMilliseconds"] // 1000, force_hours=True)
duration = utils.format_time(chapter["durationMilliseconds"] // 1000)
click.echo(f'{start} {bold(chapter["description"])} ({duration})')
placeholders = get_video_placeholders(video, format = "mkv")
placeholders = [[f"{{{k}}}", v] for k, v in placeholders.items()]
click.echo("")
print_table(["Placeholder", "Value"], placeholders)
print_out("<b>{}</b> {}".format(p.stream_info.video, p.uri))
def video_json(video, playlists, chapters):
def video_json(video, playlists):
playlists = m3u8.loads(playlists).playlists
video["playlists"] = [
@ -85,16 +66,14 @@ def video_json(video, playlists, chapters):
} for p in playlists
]
video["chapters"] = chapters
print_json(video)
def clip_info(clip: Data):
click.echo()
def clip_info(clip):
print_out()
print_clip(clip)
click.echo()
click.echo("Download links:")
print_out()
print_out("Download links:")
for q in clip["videoQualities"]:
click.echo(f"{bold(q['quality'])} [{q['frameRate']} fps] {q['sourceURL']}")
print_out("<b>{quality}p{frameRate}</b> {sourceURL}".format(**q))
@ -1,36 +1,19 @@
import sys
import click
from twitchdl import twitch
from twitchdl.exceptions import ConsoleError
from twitchdl.output import print_log, print_paged, print_video, print_json, print_video_compact
from twitchdl.output import print_out, print_paged_videos, print_video, print_json
def videos(
channel_name: str,
*,
all: bool,
compact: bool,
games: list[str],
json: bool,
limit: int | None,
pager: int | None,
sort: twitch.VideosSort,
type: twitch.VideosType,
):
game_ids = _get_game_ids(games)
# Set different defaults for limit for compact display
limit = limit or (40 if compact else 10)
def videos(args):
game_ids = _get_game_ids(args.game)
# Ignore --limit if --pager or --all are given
max_videos = sys.maxsize if all or pager else limit
max_videos = sys.maxsize if args.all or args.pager else args.limit
total_count, generator = twitch.channel_videos_generator(
channel_name, max_videos, sort, type, game_ids=game_ids)
args.channel_name, max_videos, args.sort, args.type, game_ids=game_ids)
if json:
if args.json:
videos = list(generator)
print_json({
"count": len(videos),
@ -40,44 +23,40 @@ def videos(
return
if total_count == 0:
click.echo("No videos found")
print_out("<yellow>No videos found</yellow>")
return
if pager:
print_fn = print_video_compact if compact else print_video
print_paged("Videos", generator, print_fn, pager, total_count)
if args.pager:
print_paged_videos(generator, args.pager, total_count)
return
count = 0
for video in generator:
if compact:
print_video_compact(video)
else:
click.echo()
print_video(video)
print_out()
print_video(video)
count += 1
click.echo()
click.echo("-" * 80)
click.echo(f"Videos 1-{count} of {total_count}")
print_out()
print_out("-" * 80)
print_out("<yellow>Videos {}-{} of {}</yellow>".format(1, count, total_count))
if total_count > count:
click.secho(
"\nThere are more videos. Increase the --limit, use --all or --pager to see the rest.",
dim=True
print_out()
print_out(
"<dim>There are more videos. Increase the --limit, use --all or --pager to see the rest.</dim>"
)
def _get_game_ids(names: list[str]) -> list[str]:
def _get_game_ids(names):
if not names:
return []
game_ids = []
for name in names:
print_log(f"Looking up game '{name}'...")
print_out("<dim>Looking up game '{}'...</dim>".format(name))
game_id = twitch.get_game_id(name)
if not game_id:
raise ConsoleError(f"Game '{name}' not found")
raise ConsoleError("Game '{}' not found".format(name))
game_ids.append(int(game_id))
return game_ids
twitchdl/console.py Normal file
@ -0,0 +1,290 @@
# -*- coding: utf-8 -*-
import logging
import sys
from argparse import ArgumentParser, ArgumentTypeError
from collections import namedtuple
from twitchdl.exceptions import ConsoleError
from twitchdl.output import print_err
from twitchdl.twitch import GQLError
from . import commands, __version__
Command = namedtuple("Command", ["name", "description", "arguments"])
CLIENT_WEBSITE = 'https://github.com/ihabunek/twitch-dl'
def time(value):
"""Parse a time string (hh:mm or hh:mm:ss) to number of seconds."""
parts = [int(p) for p in value.split(":")]
if not 2 <= len(parts) <= 3:
raise ArgumentTypeError()
hours = parts[0]
minutes = parts[1]
seconds = parts[2] if len(parts) > 2 else 0
if hours < 0 or not (0 <= minutes <= 59) or not (0 <= seconds <= 59):
raise ArgumentTypeError()
return hours * 3600 + minutes * 60 + seconds
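A quick sanity check of the parser above (a sketch; assumes this commit's tree and its dependencies are installed so that twitchdl.console is importable):
from twitchdl.console import time

assert time("01:30") == 5400      # hh:mm -> seconds
assert time("00:02:05") == 125    # hh:mm:ss -> seconds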
def pos_integer(value):
try:
value = int(value)
except ValueError:
raise ArgumentTypeError("must be an integer")
if value < 1:
raise ArgumentTypeError("must be positive")
return value
COMMANDS = [
Command(
name="videos",
description="List videos for a channel.",
arguments=[
(["channel_name"], {
"help": "Name of the channel to list videos for.",
"type": str,
}),
(["-g", "--game"], {
"help": "Show videos of given game (can be given multiple times)",
"action": "append",
"type": str,
}),
(["-l", "--limit"], {
"help": "Number of videos to fetch. Defaults to 10.",
"type": pos_integer,
"default": 10,
}),
(["-a", "--all"], {
"help": "Fetch all videos, overrides --limit",
"action": "store_true",
"default": False,
}),
(["-s", "--sort"], {
"help": "Sorting order of videos. Defaults to `time`.",
"type": str,
"choices": ["views", "time"],
"default": "time",
}),
(["-t", "--type"], {
"help": "Broadcast type. Defaults to `archive`.",
"type": str,
"choices": ["archive", "highlight", "upload"],
"default": "archive",
}),
(["-j", "--json"], {
"help": "Show results as JSON. Ignores `--pager`.",
"action": "store_true",
"default": False,
}),
(["-p", "--pager"], {
"help": "Print videos in pages. Ignores `--limit`. Defaults to 10.",
"type": pos_integer,
"nargs": "?",
"const": 10,
}),
],
),
Command(
name="clips",
description="List or download clips for a channel.",
arguments=[
(["channel_name"], {
"help": "Name of the channel to list clips for.",
"type": str,
}),
(["-l", "--limit"], {
"help": "Number of videos to fetch (default 10, max 100)",
"type": pos_integer,
"default": 10,
}),
(["-a", "--all"], {
"help": "Fetch all videos, overrides --limit",
"action": "store_true",
"default": False,
}),
(["-P", "--period"], {
"help": "Period from which to return clips. Defaults to `all_time`.",
"type": str,
"choices": ["last_day", "last_week", "last_month", "all_time"],
"default": "all_time",
}),
(["-j", "--json"], {
"help": "Show results as JSON. Ignores `--pager`.",
"action": "store_true",
"default": False,
}),
(["-p", "--pager"], {
"help": "Number of clips to show per page. Disabled by default.",
"type": pos_integer,
"nargs": "?",
"const": 10,
}),
(["-d", "--download"], {
"help": "Download all videos in given period (in source quality)",
"action": "store_true",
"default": False,
}),
],
),
Command(
name="download",
description="Download a video or clip.",
arguments=[
(["video"], {
"help": "Video ID, clip slug, or URL",
"type": str,
}),
(["-w", "--max-workers"], {
"help": "Maximal number of threads for downloading vods "
"concurrently (default 20)",
"type": int,
"default": 20,
}),
(["-s", "--start"], {
"help": "Download video from this time (hh:mm or hh:mm:ss)",
"type": time,
"default": None,
}),
(["-e", "--end"], {
"help": "Download video up to this time (hh:mm or hh:mm:ss)",
"type": time,
"default": None,
}),
(["-f", "--format"], {
"help": "Video format to convert into, passed to ffmpeg as the "
"target file extension. Defaults to `mkv`.",
"type": str,
"default": "mkv",
}),
(["-k", "--keep"], {
"help": "Don't delete downloaded VODs and playlists after merging.",
"action": "store_true",
"default": False,
}),
(["-q", "--quality"], {
"help": "Video quality, e.g. 720p. Set to 'source' to get best quality.",
"type": str,
}),
(["-a", "--auth-token"], {
"help": "Authentication token, passed to Twitch to access subscriber only "
"VODs. Can be copied from the 'auth_token' cookie in any browser "
"logged in on Twitch.",
"type": str,
"default": None,
}),
(["--no-join"], {
"help": "Don't run ffmpeg to join the downloaded vods, implies --keep.",
"action": "store_true",
"default": False,
}),
(["--overwrite"], {
"help": "Overwrite the target file if it already exists without prompting.",
"action": "store_true",
"default": False,
}),
(["-o", "--output"], {
"help": "Output file name template. See docs for details.",
"type": str,
"default": "{date}_{id}_{channel_login}_{title_slug}.{format}"
})
],
),
Command(
name="info",
description="Print information for a given Twitch URL, video ID or clip slug.",
arguments=[
(["video"], {
"help": "Video ID, clip slug, or URL",
"type": str,
}),
(["-j", "--json"], {
"help": "Show results as JSON",
"action": "store_true",
"default": False,
}),
],
),
Command(
name="env",
description="Print environment information for inclusion in bug reports.",
arguments=[],
)
]
COMMON_ARGUMENTS = [
(["--debug"], {
"help": "show debug log in console",
"action": 'store_true',
"default": False,
}),
(["--no-color"], {
"help": "disable ANSI colors in output",
"action": 'store_true',
"default": False,
})
]
def get_parser():
description = "A script for downloading videos from Twitch"
parser = ArgumentParser(prog='twitch-dl', description=description, epilog=CLIENT_WEBSITE)
parser.add_argument("--version", help="show version number", action='store_true')
subparsers = parser.add_subparsers(title="commands")
for command in COMMANDS:
sub = subparsers.add_parser(
command.name,
description=command.description,
epilog=CLIENT_WEBSITE
)
# Set the function to call to the function of same name in the "commands" package
sub.set_defaults(func=commands.__dict__.get(command.name))
for args, kwargs in command.arguments + COMMON_ARGUMENTS:
sub.add_argument(*args, **kwargs)
return parser
def main():
parser = get_parser()
args = parser.parse_args()
if "--debug" in sys.argv:
logging.basicConfig(level=logging.DEBUG)
if args.version:
print("twitch-dl v{}".format(__version__))
return
if "func" not in args:
parser.print_help()
return
try:
args.func(args)
except ConsoleError as e:
print_err(e)
sys.exit(1)
except KeyboardInterrupt:
print_err("Operation canceled")
sys.exit(1)
except GQLError as e:
print_err(e)
for err in e.errors:
print_err("*", err["message"])
sys.exit(1)
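As a rough smoke test of the argument definitions above (a sketch; the video ID is a placeholder, and the package must be importable):
from twitchdl.console import get_parser

args = get_parser().parse_args(
    ["download", "1234567890", "--quality", "720p", "--start", "00:10", "--end", "00:20"]
)
assert args.quality == "720p"
assert (args.start, args.end) == (600, 1200)  # the time() type converts to seconds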
@ -1,87 +0,0 @@
import re
import dataclasses
from dataclasses import Field, is_dataclass
from datetime import date, datetime
from dateutil import parser
from typing import Any, Generator, Type, TypeVar, Union, get_args, get_origin, Callable
from typing import get_type_hints
# Generic data class instance
T = TypeVar("T")
# Dict of data decoded from JSON
Data = dict[str, Any]
def snake_to_camel(name: str):
def repl(match: re.Match[str]):
return match.group(1).upper()
return re.sub(r"_([a-z])", repl, name)
def from_dict(cls: Type[T], data: Data, key_fn: Callable[[str], str] = snake_to_camel) -> T:
"""Convert a nested dict into an instance of `cls`."""
def _fields() -> Generator[tuple[str, Any], None, None]:
hints = get_type_hints(cls)
for field in dataclasses.fields(cls):
field_type = _prune_optional(hints[field.name])
dict_field_name = key_fn(field.name)
if (value := data.get(dict_field_name)) is not None:
field_value = _convert(field_type, value)
else:
field_value = _get_default_value(field)
yield field.name, field_value
return cls(**dict(_fields()))
def from_dict_list(cls: Type[T], data: list[Data]) -> list[T]:
return [from_dict(cls, x) for x in data]
def _get_default_value(field: Field[Any]):
if field.default is not dataclasses.MISSING:
return field.default
if field.default_factory is not dataclasses.MISSING:
return field.default_factory()
return None
def _convert(field_type: Type[Any], value: Any) -> Any:
if value is None:
return None
if field_type in [str, int, bool, dict]:
return value
if field_type == datetime:
return parser.parse(value)
if field_type == date:
return date.fromisoformat(value)
if get_origin(field_type) == list:
(inner_type,) = get_args(field_type)
return [_convert(inner_type, x) for x in value]
if is_dataclass(field_type):
return from_dict(field_type, value)
raise ValueError(f"Not implemented for type '{field_type}'")
def _prune_optional(field_type: Type[Any]):
"""For `Optional[<type>]` returns the encapsulated `<type>`."""
if get_origin(field_type) == Union:
args = get_args(field_type)
if len(args) == 2 and args[1] == type(None): # noqa
return args[0]
return field_type
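For reference, a minimal example of what from_dict does with the camelCase keys returned by the GraphQL API (a sketch using the User dataclass from twitchdl.entities; both modules are the ones this diff removes):
from twitchdl.conversion import from_dict
from twitchdl.entities import User

user = from_dict(User, {"login": "bananasaurus_rex", "displayName": "Bananasaurus_Rex"})
assert user.display_name == "Bananasaurus_Rex"  # displayName -> display_name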
@ -1,27 +1,38 @@
import os
import httpx
import requests
from collections import OrderedDict
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import datetime
from functools import partial
from requests.exceptions import RequestException
from twitchdl.output import print_out
from twitchdl.utils import format_size, format_duration
from twitchdl.exceptions import ConsoleError
CHUNK_SIZE = 1024
CONNECT_TIMEOUT = 5
RETRY_COUNT = 5
def _download(url: str, path: str):
class DownloadFailed(Exception):
pass
def _download(url, path):
tmp_path = path + ".tmp"
response = requests.get(url, stream=True, timeout=CONNECT_TIMEOUT)
size = 0
with httpx.stream("GET", url, timeout=CONNECT_TIMEOUT) as response:
with open(tmp_path, "wb") as target:
for chunk in response.iter_bytes(chunk_size=CHUNK_SIZE):
target.write(chunk)
size += len(chunk)
with open(tmp_path, 'wb') as target:
for chunk in response.iter_content(chunk_size=CHUNK_SIZE):
target.write(chunk)
size += len(chunk)
os.rename(tmp_path, path)
return size
def download_file(url: str, path: str, retries: int = RETRY_COUNT):
def download_file(url, path, retries=RETRY_COUNT):
if os.path.exists(path):
from_disk = True
return (os.path.getsize(path), from_disk)
@ -30,7 +41,63 @@ def download_file(url: str, path: str, retries: int = RETRY_COUNT):
for _ in range(retries):
try:
return (_download(url, path), from_disk)
except httpx.RequestError:
except RequestException:
pass
raise ConsoleError(f"Failed downloading after {retries} attempts: {url}")
raise DownloadFailed(":(")
def _print_progress(futures):
downloaded_count = 0
downloaded_size = 0
max_msg_size = 0
start_time = datetime.now()
total_count = len(futures)
current_download_size = 0
current_downloaded_count = 0
for future in as_completed(futures):
size, from_disk = future.result()
downloaded_count += 1
downloaded_size += size
# If we find something on disk, we don't want to take it into account in
# the speed calculation
if not from_disk:
current_download_size += size
current_downloaded_count += 1
percentage = 100 * downloaded_count // total_count
est_total_size = int(total_count * downloaded_size / downloaded_count)
duration = (datetime.now() - start_time).seconds
speed = current_download_size // duration if duration else 0
remaining = (total_count - downloaded_count) * duration / current_downloaded_count \
if current_downloaded_count else 0
msg = " ".join([
"Downloaded VOD {}/{}".format(downloaded_count, total_count),
"({}%)".format(percentage),
"<cyan>{}</cyan>".format(format_size(downloaded_size)),
"of <cyan>~{}</cyan>".format(format_size(est_total_size)),
"at <cyan>{}/s</cyan>".format(format_size(speed)) if speed > 0 else "",
"remaining <cyan>~{}</cyan>".format(format_duration(remaining)) if remaining > 0 else "",
])
max_msg_size = max(len(msg), max_msg_size)
print_out("\r" + msg.ljust(max_msg_size), end="")
def download_files(base_url, target_dir, vod_paths, max_workers):
"""
Downloads a list of VODs defined by a common `base_url` and a list of
`vod_paths`, returning a dict which maps the paths to the downloaded files.
"""
urls = [base_url + path for path in vod_paths]
targets = [os.path.join(target_dir, "{:05d}.ts".format(k)) for k, _ in enumerate(vod_paths)]
partials = (partial(download_file, url, path) for url, path in zip(urls, targets))
with ThreadPoolExecutor(max_workers=max_workers) as executor:
futures = [executor.submit(fn) for fn in partials]
_print_progress(futures)
return OrderedDict(zip(vod_paths, targets))
@ -1,53 +0,0 @@
from dataclasses import dataclass
from datetime import datetime
from typing import Any
@dataclass
class DownloadOptions:
auth_token: str | None
chapter: int | None
concat: bool
dry_run: bool
end: int | None
format: str
keep: bool
no_join: bool
overwrite: bool
output: str
quality: str | None
rate_limit: str | None
start: int | None
max_workers: int
# Type for annotating decoded JSON
# TODO: make data classes for common structs
Data = dict[str, Any]
@dataclass
class User:
login: str
display_name: str
@dataclass
class Game:
name: str
@dataclass
class Video:
id: str
title: str
description: str
published_at: datetime
broadcast_type: str
length_seconds: int
game: Game
creator: User
@dataclass
class AccessToken:
signature: str
value: str
@ -1,5 +1,4 @@
import click
class ConsoleError(click.ClickException):
class ConsoleError(Exception):
"""Raised when an error occurs and script exectuion should halt."""
pass
twitchdl/ffmpeg.py Normal file
@ -0,0 +1,76 @@
import asyncio
import json
import logging
import re
from asyncio.subprocess import PIPE
from pprint import pprint
from typing import Optional
from twitchdl.output import print_out
async def join_vods(playlist_path: str, target: str, overwrite: bool, video: dict):
command = [
"ffmpeg",
"-i", playlist_path,
"-c", "copy",
"-metadata", "artist={}".format(video["creator"]["displayName"]),
"-metadata", "title={}".format(video["title"]),
"-metadata", "encoded_by=twitch-dl",
"-stats",
"-loglevel", "warning",
f"file:{target}",
]
if overwrite:
command.append("-y")
# command = ["ls", "-al"]
print_out("<dim>{}</dim>".format(" ".join(command)))
process = await asyncio.create_subprocess_exec(*command, stdout=PIPE, stderr=PIPE)
assert process.stderr is not None
await asyncio.gather(
# _read_stream("stdout", process.stdout),
_print_progress("stderr", process.stderr),
process.wait()
)
print(process.returncode)
async def _read_stream(name: str, stream: Optional[asyncio.StreamReader]):
if stream:
async for line in readlines(stream):
print(name, ">", line)
async def _print_progress(name: str, stream: asyncio.StreamReader):
async for line in readlines(stream):
print(name, ">", line)
pattern = re.compile(br"[\r\n]+")
async def readlines(stream: asyncio.StreamReader):
data = bytearray()
while not stream.at_eof():
lines = pattern.split(data)
data[:] = lines.pop(-1)
for line in lines:
yield line
data.extend(await stream.read(1024))
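A small usage sketch for readlines (assumes ffmpeg is on the path): it splits the stream on both \r and \n, which matters because ffmpeg rewrites its progress line with carriage returns.
import asyncio
from asyncio.subprocess import PIPE

from twitchdl.ffmpeg import readlines  # module added in this commit

async def demo():
    process = await asyncio.create_subprocess_exec("ffmpeg", "-version", stdout=PIPE)
    assert process.stdout is not None
    async for line in readlines(process.stdout):
        print(line.decode(errors="replace"))
    await process.wait()

asyncio.run(demo())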
if __name__ == "__main__":
# logging.basicConfig(level=logging.DEBUG)
video = json.loads('{"id": "1555108011", "title": "Cult of the Lamb", "publishedAt": "2022-08-07T17:00:30Z", "broadcastType": "ARCHIVE", "lengthSeconds": 17948, "game": {"name": "Cult of the Lamb"}, "creator": {"login": "bananasaurus_rex", "displayName": "Bananasaurus_Rex"}, "playlists": [{"bandwidth": 8446533, "resolution": [1920, 1080], "codecs": "avc1.64002A,mp4a.40.2", "video": "chunked", "uri": "https://d1m7jfoe9zdc1j.cloudfront.net/278bcbd011d28f96b856_bananasaurus_rex_40035345017_1659891626/chunked/index-dvr.m3u8"}, {"bandwidth": 3432426, "resolution": [1280, 720], "codecs": "avc1.4D0020,mp4a.40.2", "video": "720p60", "uri": "https://d1m7jfoe9zdc1j.cloudfront.net/278bcbd011d28f96b856_bananasaurus_rex_40035345017_1659891626/720p60/index-dvr.m3u8"}, {"bandwidth": 1445268, "resolution": [852, 480], "codecs": "avc1.4D001F,mp4a.40.2", "video": "480p30", "uri": "https://d1m7jfoe9zdc1j.cloudfront.net/278bcbd011d28f96b856_bananasaurus_rex_40035345017_1659891626/480p30/index-dvr.m3u8"}, {"bandwidth": 215355, "resolution": null, "codecs": "mp4a.40.2", "video": "audio_only", "uri": "https://d1m7jfoe9zdc1j.cloudfront.net/278bcbd011d28f96b856_bananasaurus_rex_40035345017_1659891626/audio_only/index-dvr.m3u8"}, {"bandwidth": 705523, "resolution": [640, 360], "codecs": "avc1.4D001E,mp4a.40.2", "video": "360p30", "uri": "https://d1m7jfoe9zdc1j.cloudfront.net/278bcbd011d28f96b856_bananasaurus_rex_40035345017_1659891626/360p30/index-dvr.m3u8"}, {"bandwidth": 285614, "resolution": [284, 160], "codecs": "avc1.4D000C,mp4a.40.2", "video": "160p30", "uri": "https://d1m7jfoe9zdc1j.cloudfront.net/278bcbd011d28f96b856_bananasaurus_rex_40035345017_1659891626/160p30/index-dvr.m3u8"}]}')
playlist_path = "/tmp/twitch-dl/278bcbd011d28f96b856_bananasaurus_rex_40035345017_1659891626/160p30/playlist_downloaded.m3u8"
asyncio.run(join_vods(playlist_path, "out.mkv", True, video), debug=True)
@ -1,133 +0,0 @@
import asyncio
import httpx
import logging
import os
import time
from abc import ABC, abstractmethod
from typing import List, Optional
from twitchdl.progress import Progress
logger = logging.getLogger(__name__)
KB = 1024
CHUNK_SIZE = 256 * KB
"""How much of a VOD to download in each iteration"""
RETRY_COUNT = 5
"""Number of times to retry failed downloads before aborting."""
TIMEOUT = 30
"""
Number of seconds to wait before aborting when there is no network activity.
https://www.python-httpx.org/advanced/#timeout-configuration
"""
class TokenBucket(ABC):
@abstractmethod
def advance(self, size: int):
pass
class LimitingTokenBucket(TokenBucket):
"""Limit the download speed by strategically inserting sleeps."""
def __init__(self, rate: int, capacity: Optional[int] = None):
self.rate: int = rate
self.capacity: int = capacity or rate * 2
self.available: int = 0
self.last_refilled: float = time.time()
def advance(self, size: int):
"""Called every time a chunk of data is downloaded."""
self._refill()
if self.available < size:
deficit = size - self.available
time.sleep(deficit / self.rate)
self.available -= size
def _refill(self):
"""Increase available capacity according to elapsed time since last refill."""
now = time.time()
elapsed = now - self.last_refilled
refill_amount = int(elapsed * self.rate)
self.available = min(self.available + refill_amount, self.capacity)
self.last_refilled = now
class EndlessTokenBucket(TokenBucket):
"""Used when download speed is not limited."""
def advance(self, size: int):
pass
async def download(
client: httpx.AsyncClient,
task_id: int,
source: str,
target: str,
progress: Progress,
token_bucket: TokenBucket,
):
# Download to a temp file first, then rename it to the target once done, to
# avoid leaving partially saved chunks behind if the download is canceled or
# --keep is used
tmp_target = f"{target}.tmp"
with open(tmp_target, "wb") as f:
async with client.stream("GET", source) as response:
size = int(response.headers.get("content-length"))
progress.start(task_id, size)
async for chunk in response.aiter_bytes(chunk_size=CHUNK_SIZE):
f.write(chunk)
size = len(chunk)
token_bucket.advance(size)
progress.advance(task_id, size)
progress.end(task_id)
os.rename(tmp_target, target)
async def download_with_retries(
client: httpx.AsyncClient,
semaphore: asyncio.Semaphore,
task_id: int,
source: str,
target: str,
progress: Progress,
token_bucket: TokenBucket,
):
async with semaphore:
if os.path.exists(target):
size = os.path.getsize(target)
progress.already_downloaded(task_id, size)
return
for n in range(RETRY_COUNT):
try:
return await download(client, task_id, source, target, progress, token_bucket)
except httpx.RequestError:
logger.exception("Task {task_id} failed. Retrying. Maybe.")
progress.abort(task_id)
if n + 1 >= RETRY_COUNT:
raise
raise Exception("Should not happen")
async def download_all(
sources: List[str],
targets: List[str],
workers: int,
*,
rate_limit: Optional[int] = None
):
progress = Progress(len(sources))
token_bucket = LimitingTokenBucket(rate_limit) if rate_limit else EndlessTokenBucket()
async with httpx.AsyncClient(timeout=TIMEOUT) as client:
semaphore = asyncio.Semaphore(workers)
tasks = [download_with_retries(client, semaphore, task_id, source, target, progress, token_bucket)
for task_id, (source, target) in enumerate(zip(sources, targets))]
await asyncio.gather(*tasks)
@ -1,56 +1,95 @@
import click
# -*- coding: utf-8 -*-
import json
import sys
import re
from itertools import islice
from twitchdl import utils
from typing import Any, Callable, Generator, TypeVar
from twitchdl.entities import Data, Video
T = TypeVar("T")
def truncate(string: str, length: int) -> str:
if len(string) > length:
return string[:length - 1] + "…"
START_CODES = {
'b': '\033[1m',
'dim': '\033[2m',
'i': '\033[3m',
'u': '\033[4m',
'red': '\033[91m',
'green': '\033[92m',
'yellow': '\033[93m',
'blue': '\033[94m',
'magenta': '\033[95m',
'cyan': '\033[96m',
}
return string
END_CODE = '\033[0m'
START_PATTERN = "<(" + "|".join(START_CODES.keys()) + ")>"
END_PATTERN = "</(" + "|".join(START_CODES.keys()) + ")>"
USE_ANSI_COLOR = "--no-color" not in sys.argv
def print_json(data: Any):
click.echo(json.dumps(data))
def start_code(match):
name = match.group(1)
return START_CODES[name]
def print_log(message: Any):
click.secho(message, err=True, dim=True)
def colorize(text):
text = re.sub(START_PATTERN, start_code, text)
text = re.sub(END_PATTERN, END_CODE, text)
return text
def print_table(headers: list[str], data: list[list[str]]):
widths = [[len(cell) for cell in row] for row in data + [headers]]
widths = [max(width) for width in zip(*widths)]
underlines = ["-" * width for width in widths]
def strip_tags(text):
text = re.sub(START_PATTERN, '', text)
text = re.sub(END_PATTERN, '', text)
def print_row(row: list[str]):
for idx, cell in enumerate(row):
width = widths[idx]
click.echo(cell.ljust(width), nl=False)
click.echo(" ", nl=False)
click.echo()
print_row(headers)
print_row(underlines)
for row in data:
print_row(row)
return text
def print_paged(
label: str,
generator: Generator[T, Any, Any],
print_fn: Callable[[T], None],
page_size: int,
total_count: int | None = None,
):
def print_out(*args, **kwargs):
args = [colorize(a) if USE_ANSI_COLOR else strip_tags(a) for a in args]
print(*args, **kwargs)
def print_json(data):
print(json.dumps(data))
def print_err(*args, **kwargs):
args = ["<red>{}</red>".format(a) for a in args]
args = [colorize(a) if USE_ANSI_COLOR else strip_tags(a) for a in args]
print(*args, file=sys.stderr, **kwargs)
def print_log(*args, **kwargs):
args = ["<dim>{}</dim>".format(a) for a in args]
args = [colorize(a) if USE_ANSI_COLOR else strip_tags(a) for a in args]
print(*args, file=sys.stderr, **kwargs)
def print_video(video):
published_at = video["publishedAt"].replace("T", " @ ").replace("Z", "")
length = utils.format_duration(video["lengthSeconds"])
channel = "<blue>{}</blue>".format(video["creator"]["displayName"]) if video["creator"] else ""
playing = "playing <blue>{}</blue>".format(video["game"]["name"]) if video["game"] else ""
# Can't find URL in video object, strange
url = "https://www.twitch.tv/videos/{}".format(video["id"])
print_out("<b>Video {}</b>".format(video["id"]))
print_out("<green>{}</green>".format(video["title"]))
if channel or playing:
print_out(" ".join([channel, playing]))
print_out("Published <blue>{}</blue> Length: <blue>{}</blue> ".format(published_at, length))
print_out("<i>{}</i>".format(url))
def print_paged_videos(generator, page_size, total_count):
iterator = iter(generator)
page = list(islice(iterator, page_size))
@ -58,79 +97,47 @@ def print_paged(
last = first + len(page) - 1
while True:
click.echo("-" * 80)
print_out("-" * 80)
click.echo()
for item in page:
print_fn(item)
print_out()
for video in page:
print_video(video)
print_out()
last = first + len(page) - 1
click.echo("-" * 80)
click.echo(f"{label} {first}-{last} of {total_count or '???'}")
print_out("-" * 80)
print_out("<yellow>Videos {}-{} of {}</yellow>".format(first, last, total_count))
first = first + len(page)
last = first + 1
page = list(islice(iterator, page_size))
if not page or not prompt_continue():
if not page or not _continue():
break
def print_video(video: Video):
published_at = str(video.published_at.astimezone())
length = utils.format_duration(video.length_seconds)
channel = blue(video.creator.display_name) if video.creator else ""
playing = f"playing {blue(video.game.name)}" if video.game else ""
# Can't find URL in video object, strange
url = f"https://www.twitch.tv/videos/{video.id}"
click.secho(f"Video {video.id}", bold=True)
click.secho(f"{video.title}", fg="green")
if channel or playing:
click.echo(" ".join([channel, playing]))
if video.description:
click.echo(f"Description: {video.description}")
click.echo(f"Published {blue(published_at)} Length: {blue(length)} ")
click.secho(url, italic=True)
click.echo()
def print_video_compact(video: Data):
id = video["id"]
date = video["publishedAt"][:10]
game = video["game"]["name"] if video["game"] else ""
title = truncate(video["title"], 80).ljust(80)
click.echo(f"{bold(id)} {date} {green(title)} {blue(game)}")
def print_clip(clip: Data):
def print_clip(clip):
published_at = clip["createdAt"].replace("T", " @ ").replace("Z", "")
length = utils.format_duration(clip["durationSeconds"])
channel = clip["broadcaster"]["displayName"]
playing = f"playing {blue(clip['game']['name'])}" if clip["game"] else ""
click.echo(f"Clip {bold(clip['slug'])}")
click.secho(clip["title"], fg="green")
click.echo(f"{blue(channel)} {playing}")
click.echo(
f"Published {blue(published_at)}" +
f" Length: {blue(length)}" +
f" Views: {blue(clip['viewCount'])}"
playing = (
"playing <blue>{}</blue>".format(clip["game"]["name"])
if clip["game"] else ""
)
click.secho(clip["url"], italic=True)
print_out("Clip <b>{}</b>".format(clip["slug"]))
print_out("<green>{}</green>".format(clip["title"]))
print_out("<blue>{}</blue> {}".format(channel, playing))
print_out(
"Published <blue>{}</blue>"
" Length: <blue>{}</blue>"
" Views: <blue>{}</blue>".format(published_at, length, clip["viewCount"]))
print_out("<i>{}</i>".format(clip["url"]))
def prompt_continue():
enter = click.style("Enter", bold=True, fg="green")
ctrl_c = click.style("Ctrl+C", bold=True, fg="yellow")
click.echo(f"Press {enter} to continue, {ctrl_c} to break.")
def _continue():
print_out("Press <green><b>Enter</green> to continue, <yellow><b>Ctrl+C</yellow> to break.")
try:
input()
@ -138,29 +145,3 @@ def prompt_continue():
return False
return True
# Shorthand functions for coloring output
def blue(text: Any) -> str:
return click.style(text, fg="blue")
def cyan(text: Any) -> str:
return click.style(text, fg="cyan")
def green(text: Any) -> str:
return click.style(text, fg="green")
def yellow(text: Any) -> str:
return click.style(text, fg="yellow")
def bold(text: Any) -> str:
return click.style(text, bold=True)
def dim(text: Any) -> str:
return click.style(text, dim=True)
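A tiny example of how these helpers compose (a sketch against the click-based helpers above, i.e. the side this diff removes; values taken from the sample video used elsewhere in this diff):
import click
from twitchdl.output import blue, bold, green

click.echo(f"{bold('Video 1555108011')} {green('Cult of the Lamb')} by {blue('Bananasaurus_Rex')}")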
@ -1,138 +0,0 @@
import click
import logging
import time
from collections import deque
from dataclasses import dataclass, field
from statistics import mean
from typing import Dict, NamedTuple, Optional, Deque
from twitchdl.output import blue
from twitchdl.utils import format_size, format_time
logger = logging.getLogger(__name__)
TaskId = int
@dataclass
class Task:
id: TaskId
size: int
downloaded: int = 0
def advance(self, size: int):
self.downloaded += size
class Sample(NamedTuple):
downloaded: int
timestamp: float
@dataclass
class Progress:
vod_count: int
downloaded: int = 0
estimated_total: Optional[int] = None
last_printed: float = field(default_factory=time.time)
progress_bytes: int = 0
progress_perc: int = 0
remaining_time: Optional[int] = None
speed: Optional[float] = None
start_time: float = field(default_factory=time.time)
tasks: Dict[TaskId, Task] = field(default_factory=dict)
vod_downloaded_count: int = 0
samples: Deque[Sample] = field(default_factory=lambda: deque(maxlen=100))
def start(self, task_id: int, size: int):
if task_id in self.tasks:
raise ValueError(f"Task {task_id}: cannot start, already started")
self.tasks[task_id] = Task(task_id, size)
self._calculate_total()
self._calculate_progress()
self.print()
def advance(self, task_id: int, size: int):
if task_id not in self.tasks:
raise ValueError(f"Task {task_id}: cannot advance, not started")
self.downloaded += size
self.progress_bytes += size
self.tasks[task_id].advance(size)
self.samples.append(Sample(self.downloaded, time.time()))
self._calculate_progress()
self.print()
def already_downloaded(self, task_id: int, size: int):
if task_id in self.tasks:
raise ValueError(f"Task {task_id}: cannot mark as downloaded, already started")
self.tasks[task_id] = Task(task_id, size)
self.progress_bytes += size
self.vod_downloaded_count += 1
self._calculate_total()
self._calculate_progress()
self.print()
def abort(self, task_id: int):
if task_id not in self.tasks:
raise ValueError(f"Task {task_id}: cannot abort, not started")
del self.tasks[task_id]
self.progress_bytes = sum(t.downloaded for t in self.tasks.values())
self._calculate_total()
self._calculate_progress()
self.print()
def end(self, task_id: int):
if task_id not in self.tasks:
raise ValueError(f"Task {task_id}: cannot end, not started")
task = self.tasks[task_id]
if task.size != task.downloaded:
logger.warn(f"Taks {task_id} ended with {task.downloaded}b downloaded, expected {task.size}b.")
self.vod_downloaded_count += 1
self.print()
def _calculate_total(self):
self.estimated_total = int(mean(t.size for t in self.tasks.values()) * self.vod_count) if self.tasks else None
def _calculate_progress(self):
self.speed = self._calculate_speed()
self.progress_perc = int(100 * self.progress_bytes / self.estimated_total) if self.estimated_total else 0
self.remaining_time = int((self.estimated_total - self.progress_bytes) / self.speed) if self.estimated_total and self.speed else None
def _calculate_speed(self):
if len(self.samples) < 2:
return None
first_sample = self.samples[0]
last_sample = self.samples[-1]
size = last_sample.downloaded - first_sample.downloaded
duration = last_sample.timestamp - first_sample.timestamp
return size / duration
def print(self):
now = time.time()
# Don't print more often than 10 times per second
if now - self.last_printed < 0.1:
return
progress = " ".join([
f"Downloaded {self.vod_downloaded_count}/{self.vod_count} VODs",
blue(self.progress_perc),
f"of ~{blue(format_size(self.estimated_total))}" if self.estimated_total else "",
f"at {blue(format_size(self.speed))}/s" if self.speed else "",
f"ETA {blue(format_time(self.remaining_time))}" if self.remaining_time is not None else "",
])
click.echo(f"\r{progress} ", nl=False)
self.last_printed = now
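A minimal driver for the Progress tracker above (a sketch; sizes are made up), showing the start/advance/end lifecycle and the already-on-disk path:
from twitchdl.progress import Progress  # import path as used by the async downloader above

progress = Progress(vod_count=2)
progress.start(0, 1000)
progress.advance(0, 600)
progress.advance(0, 400)
progress.end(0)                        # downloaded == size, so no warning is logged
progress.already_downloaded(1, 1000)   # a VOD left over from a previous run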
@ -2,33 +2,38 @@
Twitch API access.
"""
import httpx
import json
import click
import requests
from typing import Dict, Generator, Literal
from requests.exceptions import HTTPError
from twitchdl import CLIENT_ID
from twitchdl.entities import AccessToken, Data
from twitchdl.exceptions import ConsoleError
ClipsPeriod = Literal["last_day", "last_week", "last_month", "all_time"]
VideosSort = Literal["views", "time"]
VideosType = Literal["archive", "highlight", "upload"]
class GQLError(Exception):
def __init__(self, errors):
super().__init__("GraphQL query failed")
self.errors = errors
class GQLError(click.ClickException):
def __init__(self, errors: list[str]):
message = "GraphQL query failed."
for error in errors:
message += f"\n* {error}"
super().__init__(message)
def authenticated_get(url, params={}, headers={}):
headers['Client-ID'] = CLIENT_ID
response = requests.get(url, params, headers=headers)
if 400 <= response.status_code < 500:
data = response.json()
# TODO: this does not look nice in the console since data["message"]
# can contain a JSON encoded object.
raise ConsoleError(data["message"])
response.raise_for_status()
return response
def authenticated_post(url, data=None, json=None, headers={}):
headers['Client-ID'] = CLIENT_ID
response = httpx.post(url, data=data, json=json, headers=headers)
response = requests.post(url, data=data, json=json, headers=headers)
if response.status_code == 400:
data = response.json()
raise ConsoleError(data["message"])
@ -38,31 +43,29 @@ def authenticated_post(url, data=None, json=None, headers={}):
return response
def gql_post(query: str):
def gql_post(query):
url = "https://gql.twitch.tv/gql"
response = authenticated_post(url, data=query)
gql_raise_on_error(response)
return response.json()
response = authenticated_post(url, data=query).json()
if "errors" in response:
raise GQLError(response["errors"])
return response
def gql_query(query: str, headers: Dict[str, str] = {}):
def gql_query(query, headers={}):
url = "https://gql.twitch.tv/gql"
response = authenticated_post(url, json={"query": query}, headers=headers)
gql_raise_on_error(response)
return response.json()
response = authenticated_post(url, json={"query": query}, headers=headers).json()
if "errors" in response:
raise GQLError(response["errors"])
def gql_raise_on_error(response: httpx.Response):
data = response.json()
if "errors" in data:
errors = [e["message"] for e in data["errors"]]
raise GQLError(errors)
return response
VIDEO_FIELDS = """
id
title
description
publishedAt
broadcastType
lengthSeconds
@ -100,34 +103,36 @@ CLIP_FIELDS = """
"""
def get_video(video_id: str):
query = f"""
def get_video(video_id):
query = """
{{
video(id: "{video_id}") {{
{VIDEO_FIELDS}
{fields}
}}
}}
"""
query = query.format(video_id=video_id, fields=VIDEO_FIELDS)
response = gql_query(query)
return response["data"]["video"]
def get_clip(slug: str):
query = f"""
def get_clip(slug):
query = """
{{
clip(slug: "{slug}") {{
{CLIP_FIELDS}
clip(slug: "{}") {{
{fields}
}}
}}
"""
response = gql_query(query)
response = gql_query(query.format(slug, fields=CLIP_FIELDS))
return response["data"]["clip"]
def get_clip_access_token(slug: str):
query = f"""
def get_clip_access_token(slug):
query = """
{{
"operationName": "VideoAccessToken_Clip",
"variables": {{
@ -142,11 +147,11 @@ def get_clip_access_token(slug: str):
}}
"""
response = gql_post(query.strip())
response = gql_post(query.format(slug=slug).strip())
return response["data"]["clip"]
def get_channel_clips(channel_id: str, period: ClipsPeriod, limit: int, after: str | None= None):
def get_channel_clips(channel_id, period, limit, after=None):
"""
List channel clips.
@ -156,10 +161,10 @@ def get_channel_clips(channel_id: str, period: ClipsPeriod, limit: int, after: s
* sorting by VIEWS_DESC and TRENDING returns the same results
* there is no totalCount
"""
query = f"""
query = """
{{
user(login: "{channel_id}") {{
clips(first: {limit}, after: "{after or ''}", criteria: {{ period: {period.upper()}, sort: VIEWS_DESC }}) {{
clips(first: {limit}, after: "{after}", criteria: {{ period: {period}, sort: VIEWS_DESC }}) {{
pageInfo {{
hasNextPage
hasPreviousPage
@ -167,7 +172,7 @@ def get_channel_clips(channel_id: str, period: ClipsPeriod, limit: int, after: s
edges {{
cursor
node {{
{CLIP_FIELDS}
{fields}
}}
}}
}}
@ -175,16 +180,24 @@ def get_channel_clips(channel_id: str, period: ClipsPeriod, limit: int, after: s
}}
"""
query = query.format(
channel_id=channel_id,
after=after if after else "",
limit=limit,
period=period.upper(),
fields=CLIP_FIELDS
)
response = gql_query(query)
user = response["data"]["user"]
if not user:
raise ConsoleError(f"Channel {channel_id} not found")
raise ConsoleError("Channel {} not found".format(channel_id))
return response["data"]["user"]["clips"]
def channel_clips_generator(channel_id: str, period: str, limit: int) -> Generator[Data, None, None]:
def _generator(clips: Data, limit: int) -> Generator[Data, None, None]:
def channel_clips_generator(channel_id, period, limit):
def _generator(clips, limit):
for clip in clips["edges"]:
if limit < 1:
return
@ -223,24 +236,15 @@ def channel_clips_generator_old(channel_id, period, limit):
break
def get_channel_videos(
channel_id: str,
limit: int,
sort: str,
type: str = "archive",
game_ids: list[str] | None = None,
after: str | None = None
):
game_ids = game_ids or []
query = f"""
def get_channel_videos(channel_id, limit, sort, type="archive", game_ids=[], after=None):
query = """
{{
user(login: "{channel_id}") {{
videos(
first: {limit},
type: {type.upper()},
sort: {sort.upper()},
after: "{after or ''}",
type: {type},
sort: {sort},
after: "{after}",
options: {{
gameIDs: {game_ids}
}}
@ -252,7 +256,7 @@ def get_channel_videos(
edges {{
cursor
node {{
{VIDEO_FIELDS}
{fields}
}}
}}
}}
@ -260,24 +264,26 @@ def get_channel_videos(
}}
"""
query = query.format(
channel_id=channel_id,
game_ids=game_ids,
after=after if after else "",
limit=limit,
sort=sort.upper(),
type=type.upper(),
fields=VIDEO_FIELDS
)
response = gql_query(query)
if not response["data"]["user"]:
raise ConsoleError(f"Channel {channel_id} not found")
raise ConsoleError("Channel {} not found".format(channel_id))
return response["data"]["user"]["videos"]
def channel_videos_generator(
channel_id: str,
max_videos: int,
sort: VideosSort,
type: VideosType,
game_ids: list[str] | None = None
) -> tuple[int, Generator[Data, None, None]]:
game_ids = game_ids or []
def _generator(videos: Data, max_videos: int) -> Generator[Data, None, None]:
def channel_videos_generator(channel_id, max_videos, sort, type, game_ids=None):
def _generator(videos, max_videos):
for video in videos["edges"]:
if max_videos < 1:
return
@ -298,8 +304,8 @@ def channel_videos_generator(
return videos["totalCount"], _generator(videos, max_videos)
def get_access_token(video_id: str, auth_token: str | None = None) -> AccessToken:
query = f"""
def get_access_token(video_id, auth_token=None):
query = """
{{
videoPlaybackAccessToken(
id: {video_id},
@ -315,18 +321,16 @@ def get_access_token(video_id: str, auth_token: str | None = None) -> AccessToke
}}
"""
query = query.format(video_id=video_id)
headers = {}
if auth_token is not None:
headers['authorization'] = f'OAuth {auth_token}'
try:
response = gql_query(query, headers=headers)
return AccessToken(
response["data"]["videoPlaybackAccessToken"]["signature"],
response["data"]["videoPlaybackAccessToken"]["value"],
)
except httpx.HTTPStatusError as error:
return response["data"]["videoPlaybackAccessToken"]
except HTTPError as error:
# Provide a more useful error message when server returns HTTP 401
# Unauthorized while using a user-provided auth token.
if error.response.status_code == 401:
@ -341,15 +345,15 @@ def get_access_token(video_id: str, auth_token: str | None = None) -> AccessToke
raise
def get_playlists(video_id: str, access_token: AccessToken):
def get_playlists(video_id, access_token):
"""
For a given video return a playlist which contains possible video qualities.
"""
url = f"https://usher.ttvnw.net/vod/{video_id}"
url = "http://usher.twitch.tv/vod/{}".format(video_id)
response = httpx.get(url, params={
"nauth": access_token.value,
"nauthsig": access_token.signature,
response = requests.get(url, params={
"nauth": access_token['value'],
"nauthsig": access_token['signature'],
"allow_audio_only": "true",
"allow_source": "true",
"player": "twitchweb",
@ -359,44 +363,15 @@ def get_playlists(video_id: str, access_token: AccessToken):
def get_game_id(name):
query = f"""
query = """
{{
game(name: "{name.strip()}") {{
game(name: "{}") {{
id
}}
}}
"""
response = gql_query(query)
response = gql_query(query.format(name.strip()))
game = response["data"]["game"]
if game:
return game["id"]
def get_video_chapters(video_id: str):
query = {
"operationName": "VideoPlayer_ChapterSelectButtonVideo",
"variables":
{
"includePrivate": False,
"videoID": video_id
},
"extensions":
{
"persistedQuery":
{
"version": 1,
"sha256Hash": "8d2793384aac3773beab5e59bd5d6f585aedb923d292800119e03d40cd0f9b41"
}
}
}
response = gql_post(json.dumps(query))
return list(_chapter_nodes(response["data"]["video"]["moments"]))
def _chapter_nodes(collection):
for edge in collection["edges"]:
node = edge["node"]
del node["moments"]
yield node
@ -1,17 +1,15 @@
import re
import unicodedata
import click
def _format_size(value: float, digits: int, unit: str):
def _format_size(value, digits, unit):
if digits > 0:
return f"{{:.{digits}f}}{unit}".format(value)
return "{{:.{}f}}{}".format(digits, unit).format(value)
else:
return f"{int(value)}{unit}"
return "{{:d}}{}".format(unit).format(value)
def format_size(bytes_: int | float, digits: int = 1):
def format_size(bytes_, digits=1):
if bytes_ < 1024:
return _format_size(bytes_, digits, "B")
@ -26,7 +24,7 @@ def format_size(bytes_: int | float, digits: int = 1):
return _format_size(mega / 1024, digits, "GB")
def format_duration(total_seconds: int | float) -> str:
def format_duration(total_seconds):
total_seconds = int(total_seconds)
hours = total_seconds // 3600
remainder = total_seconds % 3600
@ -34,32 +32,21 @@ def format_duration(total_seconds: int | float) -> str:
seconds = total_seconds % 60
if hours:
return f"{hours} h {minutes} min"
return "{} h {} min".format(hours, minutes)
if minutes:
return f"{minutes} min {seconds} sec"
return "{} min {} sec".format(minutes, seconds)
return f"{seconds} sec"
return "{} sec".format(seconds)
def format_time(total_seconds: int | float, force_hours: bool = False) -> str:
total_seconds = int(total_seconds)
hours = total_seconds // 3600
remainder = total_seconds % 3600
minutes = remainder // 60
seconds = total_seconds % 60
def read_int(msg, min, max, default):
msg = msg + " [default {}]: ".format(default)
if hours or force_hours:
return f"{hours:02}:{minutes:02}:{seconds:02}"
return f"{minutes:02}:{seconds:02}"
def read_int(msg: str, min: int, max: int, default: int | None = None) -> int:
while True:
try:
val = click.prompt(msg, default=default, type=int)
if default and not val:
val = input(msg)
if not val:
return default
if min <= int(val) <= max:
return int(val)
@ -67,14 +54,14 @@ def read_int(msg: str, min: int, max: int, default: int | None = None) -> int:
pass
def slugify(value: str) -> str:
def slugify(value):
value = unicodedata.normalize('NFKC', str(value))
value = re.sub(r'[^\w\s_-]', '', value)
value = re.sub(r'[\s_-]+', '_', value)
return value.strip("_").lower()
def titlify(value: str) -> str:
def titlify(value):
value = unicodedata.normalize('NFKC', str(value))
value = re.sub(r'[^\w\s\[\]().-]', '', value)
value = re.sub(r'\s+', ' ', value)
@ -93,7 +80,7 @@ CLIP_PATTERNS = [
]
def parse_video_identifier(identifier: str) -> str | None:
def parse_video_identifier(identifier):
"""Given a video ID or URL returns the video ID, or null if not matched"""
for pattern in VIDEO_PATTERNS:
match = re.match(pattern, identifier)
@ -101,7 +88,7 @@ def parse_video_identifier(identifier: str) -> str | None:
return match.group("id")
def parse_clip_identifier(identifier: str) -> str | None:
def parse_clip_identifier(identifier):
"""Given a clip slug or URL returns the clip slug, or null if not matched"""
for pattern in CLIP_PATTERNS:
match = re.match(pattern, identifier)