twitch-dl/twitchdl/playlists.py

171 lines
4.4 KiB
Python
Raw Normal View History

2024-04-06 08:15:26 +00:00
"""
Parse and manipulate m3u8 playlists.
"""
from dataclasses import dataclass
2024-04-23 16:09:30 +00:00
from typing import Generator, List, Optional, OrderedDict
2024-04-06 08:15:26 +00:00
import click
import m3u8
from twitchdl import utils
2024-04-27 18:05:39 +00:00
from twitchdl.output import bold, dim, print_table
2024-04-06 08:15:26 +00:00
@dataclass
class Playlist:
    """One selectable quality variant listed in a master playlist."""

    # Name of the media entry attached to the variant.
    name: str
    # Group ID of the media entry; "chunked" identifies the source quality.
    group_id: str
    # Resolution components joined with "x", or None when none is declared.
    resolution: Optional[str]
    # URI of the variant's own playlist.
    url: str
    # Whether this variant is the source quality.
    is_source: bool
2024-04-06 08:15:26 +00:00
@dataclass
class Vod:
    """A single playlist segment selected for download."""

    index: int
    """Ordinal number of the VOD in the playlist"""

    path: str
    """Path part of the VOD URL"""

    # Annotated as float because the value is copied from the playlist
    # segment's duration, which is generally fractional (e.g. 9.967).
    duration: float
    """Segment duration in seconds"""
2024-04-24 06:11:33 +00:00
def parse_playlists(playlists_m3u8: str) -> List[Playlist]:
    """Parse master playlist text into a list of ``Playlist`` entries.

    One entry is produced per variant found in the parsed document, in
    document order. The name and group ID are taken from the first media
    entry attached to the variant; a variant is flagged as source quality
    when that group ID is "chunked".
    """
    parsed: List[Playlist] = []

    for variant in load_m3u8(playlists_m3u8).playlists:
        dimensions = variant.stream_info.resolution
        if dimensions:
            resolution = "x".join(str(part) for part in dimensions)
        else:
            resolution = None

        media = variant.media[0]
        parsed.append(
            Playlist(
                name=media.name,
                group_id=media.group_id,
                resolution=resolution,
                url=variant.uri,
                is_source=media.group_id == "chunked",
            )
        )

    return parsed
2024-04-06 08:15:26 +00:00
def load_m3u8(playlist_m3u8: str) -> m3u8.M3U8:
    """Parse m3u8 playlist text and return the resulting document."""
    document = m3u8.loads(playlist_m3u8)
    return document
2024-04-10 06:04:21 +00:00
def enumerate_vods(
    document: m3u8.M3U8,
    start: Optional[int] = None,
    end: Optional[int] = None,
) -> List[Vod]:
    """Collect the segments of ``document`` that overlap the requested range.

    Segment offsets are computed by accumulating segment durations from the
    beginning of the playlist. A falsy ``start`` or ``end`` (``None`` or 0)
    means that side of the range is unbounded.

    Returns the matching segments as ``Vod`` objects, each carrying the
    segment's position in the playlist, its URI and its duration.
    """
    selected: List[Vod] = []
    elapsed = 0

    for position, segment in enumerate(document.segments):
        begins_at = elapsed
        elapsed = begins_at + segment.duration

        # A segment is kept when any part of it falls inside the range: it is
        # better to download a little too much than a little too less.
        if start and not elapsed > start:
            continue  # segment is over before the range starts
        if end and not begins_at < end:
            continue  # segment begins after the range is over

        selected.append(Vod(position, segment.uri, segment.duration))

    return selected
def make_join_playlist(
    playlist: m3u8.M3U8,
    vods: List[Vod],
    targets: List[str],
) -> m3u8.M3U8:
    """
    Make a modified playlist which references downloaded VODs.

    Keep only the downloaded segments and skip the rest. Each kept segment
    has its URI replaced by the matching entry of ``targets``.

    ``vods`` and ``targets`` are paired by position; any unpaired trailing
    entries of the longer list are ignored.

    Note that ``playlist`` is modified in place (both its segment list and
    the URIs of the kept segments); the same object is returned.
    """
    # Original segment URI -> location of the downloaded file.
    path_map = dict(zip((vod.path for vod in vods), targets))

    # Snapshot the segments first because the playlist's own list is emptied
    # and then refilled with only the downloaded ones.
    org_segments = list(playlist.segments)
    playlist.segments.clear()

    for segment in org_segments:
        if segment.uri in path_map:
            segment.uri = path_map[segment.uri]
            playlist.segments.append(segment)

    return playlist
2024-04-23 16:09:30 +00:00
def select_playlist(playlists: List[Playlist], quality: Optional[str]) -> Playlist:
    """Pick a playlist by the given quality, or ask the user when it is None."""
    if quality is None:
        return select_playlist_interactive(playlists)
    return select_playlist_by_name(playlists, quality)
2024-04-23 16:09:30 +00:00
def select_playlist_by_name(playlists: List[Playlist], quality: str) -> Playlist:
    """Return the first playlist matching ``quality``.

    The special value "source" selects the first playlist flagged as source
    quality. Any other value is compared against both the name and the
    group ID of each playlist.

    Raises ``click.ClickException`` when nothing matches.
    """
    if quality == "source":
        source = next((p for p in playlists if p.is_source), None)
        if source is None:
            raise click.ClickException("Source quality not found, please report an issue on github.")
        return source

    found = next(
        (p for p in playlists if p.name == quality or p.group_id == quality),
        None,
    )
    if found is not None:
        return found

    available = ", ".join(p.name for p in playlists)
    raise click.ClickException(
        f"Quality '{quality}' not found. Available qualities are: {available}"
    )
2024-04-23 16:09:30 +00:00
def select_playlist_interactive(playlists: List[Playlist]) -> Playlist:
    """Print a numbered table of playlists and ask the user to choose one.

    The playlists are listed in the order given by ``_playlist_key``. The
    pre-selected default is the source quality playlist when there is one,
    otherwise the first row.

    Raises ``click.ClickException`` when ``playlists`` is empty, since there
    is nothing to choose from.
    """
    if not playlists:
        raise click.ClickException("No playlists found.")

    playlists = sorted(playlists, key=_playlist_key)

    headers = ["#", "Name", "Group ID", "Resolution"]
    rows = [
        [
            f"{number})",
            bold(playlist.name),
            dim(playlist.group_id),
            dim(playlist.resolution or ""),
        ]
        for number, playlist in enumerate(playlists, start=1)
    ]

    click.echo()
    print_table(headers, rows)

    default = 1
    for number, playlist in enumerate(playlists, start=1):
        if playlist.is_source:
            default = number

    # Rows are numbered from 1, so the largest valid answer is len(playlists).
    # A larger upper bound would let an out-of-range number through and make
    # the lookup below fail with IndexError.
    # NOTE(review): assumes utils.read_int treats `max` as inclusive - confirm.
    no = utils.read_int("\nChoose quality", min=1, max=len(playlists), default=default)
    return playlists[no - 1]
2024-04-27 18:18:48 +00:00
MAX = 1_000_000
def _playlist_key(playlist: Playlist) -> int:
"""Attempt to sort playlists so that source quality is on top, audio only
is on bottom and others are sorted descending by resolution."""
if playlist.is_source:
return 0
if playlist.group_id == "audio_only":
return MAX
try:
return MAX - int(playlist.name.split("p")[0])
except Exception:
pass
return MAX