twitch-dl/twitchdl/progress.py
2024-04-28 10:09:53 +02:00

156 lines
4.7 KiB
Python

import logging
import time
from collections import deque
from dataclasses import dataclass
from statistics import mean
from typing import Deque, Dict, NamedTuple, Optional
import click
from twitchdl.output import blue
from twitchdl.utils import format_size, format_time
logger = logging.getLogger(__name__)
TaskId = int
@dataclass
class Task:
id: TaskId
size: int
downloaded: int = 0
def advance(self, size: int):
self.downloaded += size
class Sample(NamedTuple):
downloaded: int
timestamp: float
class Progress:
def __init__(self, vod_count: int):
self.downloaded: int = 0
self.estimated_total: Optional[int] = None
self.last_printed: Optional[float] = None
self.progress_bytes: int = 0
self.progress_perc: int = 0
self.remaining_time: Optional[int] = None
self.samples: Deque[Sample] = deque(maxlen=1000)
self.speed: Optional[float] = None
self.start_time: float = time.time()
self.tasks: Dict[TaskId, Task] = {}
self.vod_count = vod_count
self.vod_downloaded_count: int = 0
def start(self, task_id: int, size: int):
if task_id in self.tasks:
raise ValueError(f"Task {task_id}: cannot start, already started")
self.tasks[task_id] = Task(task_id, size)
self._calculate_total()
self._calculate_progress()
self.print()
def advance(self, task_id: int, size: int):
if task_id not in self.tasks:
raise ValueError(f"Task {task_id}: cannot advance, not started")
self.downloaded += size
self.progress_bytes += size
self.tasks[task_id].advance(size)
self.samples.append(Sample(self.downloaded, time.time()))
self._calculate_progress()
self.print()
def already_downloaded(self, task_id: int, size: int):
if task_id in self.tasks:
raise ValueError(f"Task {task_id}: cannot mark as downloaded, already started")
self.tasks[task_id] = Task(task_id, size)
self.progress_bytes += size
self.vod_downloaded_count += 1
self._calculate_total()
self._calculate_progress()
self.print()
def abort(self, task_id: int):
if task_id not in self.tasks:
raise ValueError(f"Task {task_id}: cannot abort, not started")
del self.tasks[task_id]
self.progress_bytes = sum(t.downloaded for t in self.tasks.values())
self._calculate_total()
self._calculate_progress()
self.print()
def end(self, task_id: int):
if task_id not in self.tasks:
raise ValueError(f"Task {task_id}: cannot end, not started")
task = self.tasks[task_id]
if task.size != task.downloaded:
logger.warn(
f"Taks {task_id} ended with {task.downloaded}b downloaded, expected {task.size}b."
)
self.vod_downloaded_count += 1
self.print()
def _calculate_total(self):
self.estimated_total = (
int(mean(t.size for t in self.tasks.values()) * self.vod_count) if self.tasks else None
)
def _calculate_progress(self):
self.speed = self._calculate_speed()
self.progress_perc = (
int(100 * self.progress_bytes / self.estimated_total) if self.estimated_total else 0
)
self.remaining_time = (
int((self.estimated_total - self.progress_bytes) / self.speed)
if self.estimated_total and self.speed
else None
)
def _calculate_speed(self):
if len(self.samples) < 2:
return None
first_sample = self.samples[0]
last_sample = self.samples[-1]
size = last_sample.downloaded - first_sample.downloaded
duration = last_sample.timestamp - first_sample.timestamp
return size / duration if duration > 0 else None
def print(self):
now = time.time()
# Don't print more often than 10 times per second
if self.last_printed and now - self.last_printed < 0.1:
return
click.echo(f"\rDownloaded {self.vod_downloaded_count}/{self.vod_count} VODs", nl=False)
click.secho(f" {self.progress_perc}%", fg="blue", nl=False)
if self.estimated_total is not None:
total = f"~{format_size(self.estimated_total)}"
click.echo(f" of {blue(total)}", nl=False)
if self.speed is not None:
speed = f"{format_size(self.speed)}/s"
click.echo(f" at {blue(speed)}", nl=False)
if self.remaining_time is not None:
click.echo(f" ETA {blue(format_time(self.remaining_time))}", nl=False)
click.echo(" ", nl=False)
self.last_printed = now