Compare commits

..

5 Commits
2.5.0 ... queue

4 changed files with 81 additions and 46 deletions

View File

@@ -1,6 +1,7 @@
import asyncio
import platform
import re
import shlex
import shutil
import subprocess
import tempfile
@@ -76,7 +77,7 @@ def _join_vods(playlist_path: Path, target: Path, overwrite: bool, video: Video)
if overwrite:
command.append("-y")
click.secho(f"{' '.join(command)}", dim=True)
click.secho(f"{shlex.join(command)}", dim=True)
result = subprocess.run(command)
if result.returncode != 0:
raise ConsoleError("Joining files failed")
@@ -230,13 +231,13 @@ def _download_video(video_id: str, args: DownloadOptions) -> None:
with open(target_dir / "playlist.m3u8", "w") as f:
f.write(vods_text)
click.echo(f"\nDownloading {len(vods)} VODs using {args.max_workers} workers to {target_dir}")
init_sections = get_init_sections(vods_m3u8)
for uri in init_sections:
print_log(f"Downloading init section {uri}...")
download_file(f"{base_uri}{uri}", target_dir / uri)
print_log(f"Downloading {len(vods)} VODs using {args.max_workers} workers to {target_dir}")
sources = [base_uri + vod.path for vod in vods]
targets = [target_dir / f"{vod.index:05d}.ts" for vod in vods]
@@ -269,12 +270,12 @@ def _download_video(video_id: str, args: DownloadOptions) -> None:
click.echo()
if args.keep:
click.echo(f"Temporary files not deleted: {target_dir}")
click.echo(f"Temporary files not deleted: {yellow(target_dir)}")
else:
print_log("Deleting temporary files...")
shutil.rmtree(target_dir)
click.echo(f"\nDownloaded: {green(target)}")
click.echo(f"Downloaded: {green(target)}")
def http_get(url: str) -> str:

View File

@@ -6,7 +6,7 @@ import m3u8
from twitchdl import twitch, utils
from twitchdl.exceptions import ConsoleError
from twitchdl.naming import video_placeholders
from twitchdl.output import bold, print_clip, print_json, print_log, print_table, print_video
from twitchdl.output import bold, dim, print_clip, print_json, print_log, print_table, print_video
from twitchdl.playlists import parse_playlists
from twitchdl.twitch import Chapter, Clip, Video
@@ -55,9 +55,19 @@ def video_info(video: Video, playlists: str, chapters: List[Chapter]):
click.echo()
print_video(video)
click.echo("Playlists:")
for p in parse_playlists(playlists):
click.echo(f"{bold(p.name)} {p.url}")
click.echo("Playlists:\n")
playlist_headers = ["Name", "Group", "Resolution", "URL"]
playlist_data = [
[
f"{p.name} {dim('source')}" if p.is_source else p.name,
p.group_id,
f"{p.resolution}",
p.url,
]
for p in parse_playlists(playlists)
]
print_table(playlist_headers, playlist_data)
if chapters:
click.echo()

View File

@@ -4,7 +4,7 @@ import os
import time
from abc import ABC, abstractmethod
from pathlib import Path
from typing import Iterable, Optional, Tuple
from typing import Iterable, NamedTuple, Optional, Tuple
import httpx
@@ -95,55 +95,81 @@ async def download(
async def download_with_retries(
client: httpx.AsyncClient,
semaphore: asyncio.Semaphore,
task_id: int,
source: str,
target: Path,
progress: Progress,
token_bucket: TokenBucket,
):
async with semaphore:
if target.exists():
size = os.path.getsize(target)
progress.already_downloaded(task_id, size)
return
if target.exists():
size = os.path.getsize(target)
progress.already_downloaded(task_id, size)
return
for n in range(RETRY_COUNT):
try:
return await download(client, task_id, source, target, progress, token_bucket)
except httpx.RequestError:
logger.exception("Task {task_id} failed. Retrying. Maybe.")
progress.abort(task_id)
if n + 1 >= RETRY_COUNT:
raise
for n in range(RETRY_COUNT):
try:
return await download(client, task_id, source, target, progress, token_bucket)
except httpx.RequestError:
logger.exception("Task {task_id} failed. Retrying. Maybe.")
progress.abort(task_id)
if n + 1 >= RETRY_COUNT:
raise
raise Exception("Should not happen")
raise Exception("Should not happen")
class QueueItem(NamedTuple):
task_id: int
url: str
target: Path
async def download_worker(
queue: asyncio.Queue[QueueItem],
client: httpx.AsyncClient,
progress: Progress,
token_bucket: TokenBucket,
):
while True:
item = await queue.get()
await download_with_retries(
client,
item.task_id,
item.url,
item.target,
progress,
token_bucket,
)
queue.task_done()
async def download_all(
source_targets: Iterable[Tuple[str, Path]],
workers: int,
worker_count: int,
*,
count: Optional[int] = None,
rate_limit: Optional[int] = None,
):
progress = Progress(count)
token_bucket = LimitingTokenBucket(rate_limit) if rate_limit else EndlessTokenBucket()
queue: asyncio.Queue[QueueItem] = asyncio.Queue()
async with httpx.AsyncClient(timeout=TIMEOUT) as client:
semaphore = asyncio.Semaphore(workers)
tasks = [
download_with_retries(
client,
semaphore,
task_id,
source,
target,
progress,
token_bucket,
)
for task_id, (source, target) in enumerate(source_targets)
asyncio.create_task(download_worker(queue, client, progress, token_bucket))
for _ in range(worker_count)
]
await asyncio.gather(*tasks)
for index, (source, target) in enumerate(source_targets):
await queue.put(QueueItem(index, source, target))
# Wait for queue to deplete
await queue.join()
# Cancel tasks and wait until they are cancelled
for task in tasks:
task.cancel()
await asyncio.gather(*tasks, return_exceptions=True)
def download_file(url: str, target: Path, retries: int = RETRY_COUNT) -> None:

View File

@@ -46,11 +46,8 @@ def print_table(headers: List[str], data: List[List[str]]):
underlines = ["-" * width for width in widths]
def print_row(row: List[str]):
for idx, cell in enumerate(row):
width = widths[idx]
click.echo(ljust(cell, width), nl=False)
click.echo(" ", nl=False)
click.echo()
parts = (ljust(cell, widths[idx]) for idx, cell in enumerate(row))
click.echo(" ".join(parts).strip())
print_row(headers)
print_row(underlines)
@@ -108,11 +105,12 @@ def print_video(video: Video):
if channel or playing:
click.echo(" ".join([channel, playing]))
if video["description"]:
click.echo(f"Description: {video['description']}")
click.echo(f"Published {blue(published_at)} Length: {blue(length)} ")
click.secho(url, italic=True)
if video["description"]:
click.echo(f"\nDescription:\n{video['description']}")
click.echo()