Add chat command

This commit is contained in:
Ivan Habunek 2024-08-24 08:43:07 +02:00
parent dc99ee51bc
commit ec985efcd1
No known key found for this signature in database
GPG Key ID: 01DB3DD0D824504C
8 changed files with 659 additions and 4 deletions

View File

@ -34,6 +34,13 @@ packages = [
[tool.setuptools_scm]
[project.optional-dependencies]
# This is made optional because it is not pure python, and when used prevents
# distribution of twitch-dl as a pyz archive while keeping it cross-platform.
chat = [
"pillow>=9",
"fonttools>=4,<5",
]
dev = [
"build",
"pytest",

71
twitchdl/cache.py Normal file
View File

@ -0,0 +1,71 @@
import hashlib
import logging
import os
import sys
from pathlib import Path
from typing import Optional
from twitchdl.http import download_file
CACHE_SUBFOLDER = "twitch-dl"
logger = logging.getLogger(__name__)
def download_cached(
url: str,
*,
filename: Optional[str] = None,
subfolder: Optional[str] = None,
) -> Optional[Path]:
base = get_cache_dir()
if not filename:
filename = hashlib.sha256(url.encode()).hexdigest()
target = base / subfolder / filename if subfolder else base / filename
if not target.exists():
download_file(url, target)
return target
def get_text_font() -> Path:
url = "https://cdn.jsdelivr.net/gh/notofonts/notofonts.github.io/fonts/NotoSans/full/ttf/NotoSans-Light.ttf"
path = download_cached(url, subfolder="fonts", filename="NotoSans-Light.ttf")
if not path:
raise ValueError(f"Failed downloading font from {url}")
return path
def get_noto_color_emoji_font() -> Path:
url = "https://github.com/googlefonts/noto-emoji/raw/main/fonts/NotoColorEmoji.ttf"
path = download_cached(url, subfolder="fonts", filename="NotoColorEmoji.ttf")
if not path:
raise ValueError(f"Failed downloading font from {url}")
return path
def get_cache_dir() -> Path:
path = _cache_dir_path()
path.mkdir(parents=True, exist_ok=True)
return path
def _cache_dir_path() -> Path:
"""Returns the path to the cache directory"""
# Windows
if sys.platform == "win32" and "APPDATA" in os.environ:
return Path(os.environ["APPDATA"], CACHE_SUBFOLDER, "cache")
# Mac OS
if sys.platform == "darwin":
return Path.home() / "Library" / "Caches" / CACHE_SUBFOLDER
# Respect XDG_CONFIG_HOME env variable if set
# https://specifications.freedesktop.org/basedir-spec/basedir-spec-latest.html
if "XDG_CACHE_HOME" in os.environ:
return Path(os.environ["XDG_CACHE_HOME"], CACHE_SUBFOLDER)
return Path.home() / ".cache" / CACHE_SUBFOLDER

383
twitchdl/chat.py Normal file
View File

@ -0,0 +1,383 @@
"""
Generate a video containing the twitch chat.
TODO:
- support clips
- use fonttool to find which characters are supported by a font
"""
from __future__ import annotations
import math
import re
import shutil
import subprocess
from itertools import groupby
from pathlib import Path
from typing import Dict, Generator, List, Optional, Tuple
import click
from PIL import Image, ImageDraw, ImageFont
from twitchdl import cache
from twitchdl.entities import Badge, Comment, Emote
from twitchdl.exceptions import ConsoleError
from twitchdl.output import clear_line, print_log
from twitchdl.twitch import get_comments, get_video, get_video_comments
from twitchdl.utils import format_time, iterate_with_next, parse_video_identifier
emoji_pattern = re.compile(
r"["
r"\U0001F600-\U0001F64F" # Emoticons
r"\U0001F300-\U0001F5FF" # Symbols & Pictographs
r"\U0001F680-\U0001F6FF" # Transport & Map Symbols
r"\U0001F700-\U0001F77F" # Alchemical Symbols
r"\U0001F780-\U0001F7FF" # Geometric Shapes Extended
r"\U0001F800-\U0001F8FF" # Supplemental Arrows-C
r"\U0001F900-\U0001F9FF" # Supplemental Symbols and Pictographs
r"\U0001FA00-\U0001FA6F" # Chess Symbols
r"\U0001FA70-\U0001FAFF" # Symbols and Pictographs Extended-A
r"\U00002702-\U000027B0" # Dingbats
r"\U0001F1E6-\U0001F1FF" # Flags (iOS)
r"\U00002500-\U00002BEF" # Various Symbols
r"\U0001F900-\U0001F9FF" # Additional Emoji in Unicode 10.0
r"\U0001F1F2-\U0001F1F4" # Enclosed characters
r"\U0001F1E6-\U0001F1FF" # Regional indicator symbols
r"\U0001F004" # Mahjong Tile Red Dragon
r"\U0001F0CF" # Playing Card Black Joker
r"\U0001F18E" # Negative Squared AB
r"\U0001F191-\U0001F251" # Squared CJK Unified Ideographs
r"\U00002600-\U000026FF" # Miscellaneous Symbols
r"]+",
flags=re.UNICODE,
)
def render_chat(id: str, width: int, height: int, font_size: int, dark: bool):
foreground = "#ffffff" if dark else "#000000"
background = "#000000" if dark else "#ffffff"
screen = Screen(width, height, font_size, foreground, background)
frames: List[Tuple[Path, int]] = []
video_id = parse_video_identifier(id)
if not video_id:
raise ConsoleError("Invalid video ID")
print_log("Looking up video...")
video = get_video(video_id)
if not video:
raise ConsoleError(f"Video {video_id} not found")
total_duration = video["lengthSeconds"]
video_comments = get_video_comments(video_id)
badges_by_id = {badge["id"]: badge for badge in video_comments["badges"]}
cache_dir = cache.get_cache_dir() / "chat" / video_id
cache_dir.mkdir(parents=True, exist_ok=True)
first = True
for offset, duration, comments in group_comments(video_id, total_duration):
if first:
# Save the initial empty frame
frame_path = cache_dir / f"chat_{0:05d}.bmp"
screen.image.save(frame_path)
frames.append((frame_path, offset))
first = False
for comment in comments:
draw_comment(screen, comment, dark, badges_by_id)
screen.next_line()
frame_path = cache_dir / f"chat_{offset:05d}.bmp"
screen.image.save(frame_path)
frames.append((frame_path, duration))
perc = int(100 * offset / total_duration)
print_status(
f"Rendering chat frames {format_time(offset)}/{format_time(total_duration)} ({perc}%)",
transient=True,
)
spec_path = cache_dir / "concat.txt"
with open(spec_path, "w") as f:
for path, duration in frames:
f.write(f"file '{path.resolve()}'\n")
f.write(f"duration {duration}\n")
# TODO
output_path = Path(f"chat_{video_id}.mp4")
print_status("Generating chat video...", dim=True)
generate_video(spec_path, output_path)
print_status("Deleting cache...", dim=True)
shutil.rmtree(cache_dir)
def add_frame_to_spec(concat_spec: str, frame_path: Path, duration: int) -> str:
concat_spec += f"file '{frame_path.resolve()}'\n"
concat_spec += f"duration {duration}\n"
return concat_spec
def draw_comment(screen: Screen, comment: Comment, dark: bool, badges_by_id: Dict[str, Badge]):
time = format_time(comment["contentOffsetSeconds"])
screen.draw_text(time + " ", "gray")
for message_badge in comment["message"]["userBadges"]:
# Skip 'empty' badges
if message_badge["id"] == "Ozs=":
continue
badge = badges_by_id.get(message_badge["id"])
if not badge:
print_status(f"Badge not found: {message_badge}")
continue
badge_path = download_badge(badge)
if not badge_path:
print_status(f"Failed downloading badge {message_badge}")
continue
badge_image = Image.open(badge_path)
screen.draw_image(badge_image)
if comment["message"]["userBadges"]:
screen.draw_text(" ")
user = comment["commenter"]["displayName"] if comment["commenter"] else "UNKWNOW"
user_color = comment["message"]["userColor"]
screen.draw_text(user, user_color)
screen.draw_text(": ")
for fragment in comment["message"]["fragments"]:
if fragment["emote"]:
emote_path = download_emote(fragment["emote"], dark)
if emote_path:
emote_image = Image.open(emote_path)
screen.draw_image(emote_image)
else:
print_status(f"Failed downloading emote {fragment['emote']}")
screen.draw_text(" " + fragment["text"])
else:
text_blocks = emoji_pattern.split(fragment["text"])
emoji_blocks = emoji_pattern.findall(fragment["text"])
for block_index, text in enumerate(text_blocks):
for word in re.split(r"\s", text):
if word:
screen.draw_text(" " + word)
if len(emoji_blocks) > block_index:
emoji_block = emoji_blocks[block_index]
for emoji in emoji_block:
screen.draw_emoji(emoji)
class Screen:
def __init__(self, width: int, height: int, font_size: int, foreground: str, background: str):
self.foreground = foreground
self.background = background
self.x: int = 0
self.y: int = 0
self.text_font = ImageFont.truetype(cache.get_text_font(), font_size)
self.emoji_font = ImageFont.truetype(cache.get_noto_color_emoji_font(), 109)
ascent, descent = self.text_font.getmetrics()
self.ascent = ascent
self.descent = descent
self.line_height = ascent + descent
left, _, right, _ = self.text_font.getbbox(" ")
self.space_size = int(right - left)
self._image = Image.new("RGBA", (width, height), self.background)
self._draw = ImageDraw.Draw(self._image)
self._draw.font = self.text_font
@property
def image(self):
return self._image
@image.setter
def image(self, image: Image.Image):
self._image = image
self._draw = ImageDraw.Draw(self._image)
self._draw.font = self.text_font
@property
def draw(self) -> ImageDraw.ImageDraw:
return self._draw
def draw_text(self, text: str, color: Optional[str] = None):
length = math.ceil(self.draw.textlength(text)) # type: ignore
if self.image.width < self.x + length:
self.next_line()
self.draw.text((self.x, self.y), text, fill=color or self.foreground) # type: ignore
self.x += length
def draw_image(self, image: Image.Image):
if self.image.width < self.x + image.width:
self.next_line()
x = self.x + self.space_size
y = self.y
if image.height < self.line_height:
y += self.line_height - image.height - 2 # baseline align (ish)
if image.mode != self.image.mode:
image = image.convert(self.image.mode)
self.image.alpha_composite(image, (x, y))
self.x += image.width + self.space_size
def draw_emoji(self, emoji: str):
left, top, right, bottom = self.emoji_font.getbbox(emoji)
source_width = int(right - left)
source_height = int(bottom - top)
source_size = (source_width, source_height)
if source_width == 0 or source_height == 0:
print_status(f"Emoji '{emoji}' not renderable in emoji font, falling back to text font")
self.draw_text(emoji)
return
aspect_ratio = source_width / source_height
target_height = self.line_height
target_width = int(target_height * aspect_ratio)
target_size = (target_width, target_height)
if self.image.width < self.x + target_width:
self.next_line()
emoji_image = Image.new("RGBA", source_size)
emoji_draw = ImageDraw.Draw(emoji_image)
emoji_draw.text((0, 0), emoji, font=self.emoji_font, embedded_color=True) # type: ignore
resized = emoji_image.resize(target_size)
self.image.alpha_composite(resized, (self.x + self.space_size, self.y))
self.x += target_width + self.space_size
def next_line(self):
required_height = self.y + self.line_height * 2
if self.image.height < required_height:
self.shift(required_height - self.image.height)
self.x = 0
self.y += self.line_height
def shift(self, dy: int):
cropped_image = self.image.crop((0, dy, self.image.width, self.image.height))
shifted_image = Image.new(self.image.mode, self.image.size, color=self.background)
shifted_image.paste(cropped_image, (0, 0))
self.image = shifted_image
self.y -= dy
def pad(self, px: int, py: int):
width = self.image.width + 2 * px
height = self.image.height + 2 * py
padded_image = Image.new(self.image.mode, (width, height), color=self.background)
padded_image.paste(self.image, (px, py))
return padded_image
def generate_video(spec_path: Path, output: Path):
print_status("Generating chat video...")
command = [
"ffmpeg",
"-f",
"concat",
"-safe",
"0",
"-i",
spec_path,
"-fps_mode",
"vfr",
"-pix_fmt",
"yuv420p",
"-stats",
"-loglevel",
"warning",
output,
"-y",
]
result = subprocess.run(command)
if result.returncode != 0:
raise ConsoleError("Joining files failed")
print_status(f"Saved {output}")
def shift(image: Image.Image, dy: int, background: str):
cropped_image = image.crop((0, dy, image.width, image.height))
shifted_image = Image.new(image.mode, image.size, color=background)
shifted_image.paste(cropped_image, (0, 0))
return shifted_image
def pad(image: Image.Image, px: int, py: int, background: str):
width = image.width + 2 * px
height = image.height + 2 * py
padded_image = Image.new(image.mode, (width, height), color=background)
padded_image.paste(image, (px, py))
return padded_image
def download_badge(badge: Badge) -> Optional[Path]:
# TODO: make badge size configurable?
url = badge["image1x"]
return cache.download_cached(url, subfolder="badges")
def download_emote(emote: Emote, dark: bool) -> Optional[Path]:
# TODO: make emote size customizable
emote_id = emote["emoteID"]
variant = "dark" if dark else "light"
url = f"https://static-cdn.jtvnw.net/emoticons/v2/{emote_id}/default/{variant}/1.0"
return cache.download_cached(url, subfolder="emotes")
def group_comments(video_id: str, total_duration: int):
g1 = generate_comments(video_id)
g2 = groupby(g1, lambda x: x["contentOffsetSeconds"])
# Delazify the comments list, without this they are consumed before we get to them
g3 = ((offset, list(comments)) for offset, comments in g2)
g4 = iterate_with_next(g3)
for (offset, comments), next_pair in g4:
next_offset = next_pair[0] if next_pair else total_duration
duration = next_offset - offset
yield offset, duration, comments
def generate_comments(video_id: str) -> Generator[Comment, None, None]:
page = 1
has_next = True
cursor = None
while has_next:
video = get_comments(video_id, cursor=cursor)
for comment in video["comments"]["edges"]:
yield comment["node"]
has_next = video["comments"]["pageInfo"]["hasNextPage"]
cursor = video["comments"]["edges"][-1]["cursor"]
page += 1
_prev_transient = False
def print_status(message: str, transient: bool = False, dim: bool = False):
global _prev_transient
if _prev_transient:
clear_line()
else:
click.echo("", err=True)
click.secho(message, nl=False, err=True, dim=dim)
_prev_transient = transient

View File

@ -3,13 +3,16 @@ import platform
import re
import sys
from pathlib import Path
from textwrap import dedent
from typing import Optional, Tuple
import click
from twitchdl import __version__
from twitchdl.entities import DownloadOptions
from twitchdl.exceptions import ConsoleError
from twitchdl.naming import DEFAULT_OUTPUT_TEMPLATE
from twitchdl.output import print_log
from twitchdl.twitch import ClipsPeriod, VideosSort, VideosType
# Tweak the Click context
@ -97,6 +100,7 @@ def cli(ctx: click.Context, color: bool, debug: bool, verbose: bool):
logging.basicConfig(level=logging.DEBUG if verbose else logging.INFO)
logging.getLogger("httpx").setLevel(logging.WARN)
logging.getLogger("httpcore").setLevel(logging.WARN)
logging.getLogger("PIL").setLevel(logging.WARN)
@cli.command()
@ -420,3 +424,55 @@ def videos(
sort=sort,
type=type,
)
@cli.command()
@click.argument("id")
@click.option(
"-w",
"--width",
help="Chat width in pixels",
type=int,
default=400,
callback=validate_positive,
)
@click.option(
"-h",
"--height",
help="Chat height in pixels",
type=int,
default=600,
callback=validate_positive,
)
@click.option(
"--font-size",
help="Font size",
type=int,
default=18,
callback=validate_positive,
)
@click.option(
"--dark",
help="Dark mode",
is_flag=True,
)
def chat(id: str, width: int, height: int, font_size: int, dark: bool):
"""Render chat for a given video"""
print_log("Chat command is still experimental, try it out and report any bugs.")
try:
from twitchdl.chat import render_chat
render_chat(id, width, height, font_size, dark)
except ModuleNotFoundError as ex:
raise ConsoleError(
dedent(f"""
{ex}
This command requires twitch-dl to be installed with optional "chat" dependencies:
pipx install "twitch-dl[chat]"
See documentation for more info:
https://twitch-dl.bezdomni.net/commands/chat.html
""")
)

View File

@ -90,3 +90,74 @@ class Chapter(TypedDict):
# Type for annotating decoded JSON
# TODO: make data classes for common structs
Data = Mapping[str, Any]
class Commenter(TypedDict):
id: str
login: str
displayName: str
Emote = TypedDict(
"Emote",
{
"id": str,
"emoteID": str,
"from": int,
},
)
class Message_Fragment(TypedDict):
emote: Optional[Emote]
text: str
class Message_Badge(TypedDict):
id: str
setID: str
version: str
class Message(TypedDict):
fragments: List[Message_Fragment]
userBadges: List[Message_Badge]
userColor: str
class Comment(TypedDict):
id: str
commenter: Commenter
contentOffsetSeconds: int
createdAt: str
message: Message
class Badge(TypedDict):
id: str
setID: str
version: str
title: str
image1x: str
image2x: str
image4x: str
clickAction: str
clickURL: str
class VideoComments_Owner(TypedDict):
id: str
login: str
broadcastBadges: List[Badge]
class VideoComments_Video(TypedDict):
id: str
broadcastType: str
lengthSeconds: int
owner: VideoComments_Owner
class VideoComments(TypedDict):
video: VideoComments_Video
badges: List[Badge]

View File

@ -27,8 +27,8 @@ def print_json(data: Any):
click.echo(json.dumps(data))
def print_log(message: Any):
click.secho(message, err=True, dim=True)
def print_log(message: Any, *, nl: bool = True):
click.secho(message, err=True, dim=True, nl=nl)
def visual_len(text: str):

View File

@ -19,11 +19,12 @@ from twitchdl.entities import (
ClipsPeriod,
Data,
Video,
VideoComments,
VideosSort,
VideosType,
)
from twitchdl.exceptions import ConsoleError
from twitchdl.utils import format_size
from twitchdl.utils import format_size, remove_null_values
class GQLError(click.ClickException):
@ -443,3 +444,51 @@ def _chapter_nodes(moments: Data) -> Generator[Chapter, None, None]:
del node["details"]
del node["moments"]
yield node
def get_comments(
video_id: str,
*,
cursor: Optional[str] = None,
offset_seconds: Optional[int] = None,
):
variables = remove_null_values(
{
"videoID": video_id,
"cursor": cursor,
"contentOffsetSeconds": offset_seconds,
}
)
query = {
"operationName": "VideoCommentsByOffsetOrCursor",
"variables": variables,
"extensions": {
"persistedQuery": {
"version": 1,
"sha256Hash": "b70a3591ff0f4e0313d126c6a1502d79a1c02baebb288227c582044aa76adf6a",
}
},
}
response = gql_persisted_query(query)
return response["data"]["video"]
def get_video_comments(video_id: str) -> VideoComments:
query = {
"operationName": "VideoComments",
"variables": {
"videoID": video_id,
"hasVideoID": True,
},
"extensions": {
"persistedQuery": {
"version": 1,
"sha256Hash": "be06407e8d7cda72f2ee086ebb11abb6b062a7deb8985738e648090904d2f0eb",
}
},
}
response = gql_persisted_query(query)
return response["data"]

View File

@ -1,9 +1,14 @@
import re
import unicodedata
from typing import Optional, Union
from itertools import chain, islice, tee
from typing import Dict, Iterable, Optional, Tuple, TypeVar, Union
import click
T = TypeVar("T")
K = TypeVar("K")
V = TypeVar("V")
def _format_size(value: float, digits: int, unit: str):
if digits > 0:
@ -109,3 +114,16 @@ def parse_clip_identifier(identifier: str) -> Optional[str]:
match = re.match(pattern, identifier)
if match:
return match.group("slug")
def remove_null_values(adict: Dict[K, V]) -> Dict[K, V]:
return {k: v for k, v in adict.items() if v is not None}
def iterate_with_next(iterable: Iterable[T]) -> Iterable[Tuple[T, Optional[T]]]:
"""
Creates an iterator which provides the current and next item.
"""
items, nexts = tee(iterable, 2)
nexts = chain(islice(nexts, 1, None), [None])
return zip(items, nexts)