twitch-dl/twitchdl/chat.py
Ivan Habunek 8a10b2bba5
wip
2024-08-30 17:44:45 +02:00

392 lines
13 KiB
Python

"""
Generate a video containing the Twitch chat.
TODO:
- support clips
- use fontTools to find which characters are supported by a font
"""
from __future__ import annotations
import math
import re
import shutil
import subprocess
import time
from itertools import groupby
from pathlib import Path
from typing import Dict, Generator, List, Optional, Tuple
import click
from PIL import Image, ImageDraw, ImageFont
from twitchdl import cache
from twitchdl.entities import Badge, Comment, Emote
from twitchdl.exceptions import ConsoleError
from twitchdl.output import clear_line, cursor_previous_line, green, print_log
from twitchdl.twitch import get_comments, get_video, get_video_comments
from twitchdl.utils import format_time, iterate_with_next, parse_video_identifier
# Code point ranges, written as regex character-class fragments, that are
# treated as emoji. Several ranges overlap or repeat; inside a character class
# that is harmless because the class matches the union of everything listed.
_EMOJI_RANGES = (
    r"\U0001F600-\U0001F64F",  # Emoticons
    r"\U0001F300-\U0001F5FF",  # Symbols & Pictographs
    r"\U0001F680-\U0001F6FF",  # Transport & Map Symbols
    r"\U0001F700-\U0001F77F",  # Alchemical Symbols
    r"\U0001F780-\U0001F7FF",  # Geometric Shapes Extended
    r"\U0001F800-\U0001F8FF",  # Supplemental Arrows-C
    r"\U0001F900-\U0001F9FF",  # Supplemental Symbols and Pictographs
    r"\U0001FA00-\U0001FA6F",  # Chess Symbols
    r"\U0001FA70-\U0001FAFF",  # Symbols and Pictographs Extended-A
    r"\U00002702-\U000027B0",  # Dingbats
    r"\U0001F1E6-\U0001F1FF",  # Regional indicator symbols (flags)
    r"\U00002500-\U00002BEF",  # Box Drawing through Misc. Symbols and Arrows
    r"\U0001F900-\U0001F9FF",  # Supplemental Symbols and Pictographs (repeated)
    r"\U0001F1F2-\U0001F1F4",  # Subset of the regional indicator symbols
    r"\U0001F1E6-\U0001F1FF",  # Regional indicator symbols (repeated)
    r"\U0001F004",  # Mahjong Tile Red Dragon
    r"\U0001F0CF",  # Playing Card Black Joker
    r"\U0001F18E",  # Negative Squared AB
    r"\U0001F191-\U0001F251",  # Squared/enclosed symbols through enclosed ideographs
    r"\U00002600-\U000026FF",  # Miscellaneous Symbols
)

# Matches a run of one or more consecutive emoji characters.
emoji_pattern = re.compile(
    "[" + "".join(_EMOJI_RANGES) + "]+",
    flags=re.UNICODE,
)
def render_chat(id: str, width: int, height: int, font_size: int, dark: bool):
    """Render the chat of a video into ``chat_<video_id>.mp4``.

    A bitmap frame is saved for every distinct comment offset, the frames are
    listed in an ffmpeg concat spec file together with how long each one is
    shown, and ffmpeg then joins them into the output video. The frames live
    in a per-video cache directory which is removed afterwards.

    Args:
        id: Video identifier, parsed with ``parse_video_identifier``.
        width: Width of the rendered frames in pixels.
        height: Height of the rendered frames in pixels.
        font_size: Size passed to the text font.
        dark: Draw light text on a dark background when true.

    Raises:
        ConsoleError: If the identifier is invalid, the video cannot be found,
            no comments were found, or ffmpeg fails.
    """
    foreground = "#ffffff" if dark else "#000000"
    background = "#000000" if dark else "#ffffff"
    screen = Screen(width, height, font_size, foreground, background)
    frames: List[Tuple[Path, int]] = []

    video_id = parse_video_identifier(id)
    if not video_id:
        raise ConsoleError("Invalid video ID")

    print_log("Looking up video...")
    video = get_video(video_id)
    if not video:
        raise ConsoleError(f"Video {video_id} not found")

    total_duration = video["lengthSeconds"]
    video_comments = get_video_comments(video_id)
    badges_by_id = {badge["id"]: badge for badge in video_comments["badges"]}

    cache_dir = cache.get_cache_dir() / "chat" / video_id
    cache_dir.mkdir(parents=True, exist_ok=True)

    def quoted(path: Path) -> str:
        # ffmpeg allows no escapes inside single quotes, so a literal quote is
        # written by closing the quote, escaping it and reopening: '\''
        return "'" + str(path.resolve()).replace("'", "'\\''") + "'"

    # The frames are only useful while ffmpeg runs, so remove them even when
    # rendering or encoding fails.
    try:
        start = time.monotonic()
        for index, offset, duration, comments in group_comments(video_id, total_duration):
            if index == 0 and offset > 0:
                # Show an empty frame until the first comment appears. It gets
                # its own file name so it can never collide with a comment
                # frame, and is skipped when it would last zero seconds.
                frame_path = cache_dir / "chat_empty.bmp"
                screen.image.save(frame_path)
                frames.append((frame_path, offset))

            for comment in comments:
                draw_comment(screen, comment, dark, badges_by_id)
                screen.next_line()

            frame_path = cache_dir / f"chat_{offset:05d}.bmp"
            screen.image.save(frame_path)
            frames.append((frame_path, duration))
            _print_progress(index, offset, start, total_duration)

        if not frames:
            raise ConsoleError(f"No chat comments found for video {video_id}")

        spec_path = cache_dir / "concat.txt"
        with open(spec_path, "w", encoding="utf-8") as f:
            for path, duration in frames:
                f.write(f"file {quoted(path)}\n")
                f.write(f"duration {duration}\n")
            # NOTE(review): per the FFmpeg wiki (Slideshow), the concat demuxer
            # ignores the duration of the final entry unless that file is
            # listed once more without a duration.
            f.write(f"file {quoted(frames[-1][0])}\n")

        # TODO
        output_path = Path(f"chat_{video_id}.mp4")
        # generate_video() prints its own "Generating chat video..." status.
        generate_video(spec_path, output_path)
    finally:
        print_status("Deleting cache...", dim=True)
        shutil.rmtree(cache_dir)
def _print_progress(index: int, offset: int, start: float, total_duration: int):
    """Print a transient status line with the rendering rate and progress.

    Args:
        index: Index of the frame that was just rendered.
        offset: Comment offset of that frame, formatted with ``format_time``.
        start: ``time.monotonic()`` value taken when rendering started.
        total_duration: Total duration of the video, used for the percentage.
    """
    elapsed = time.monotonic() - start
    # The monotonic clock can be coarse enough to report no elapsed time for
    # the first frame, and the video length may be zero; avoid dividing by 0.
    fps = index / elapsed if elapsed > 0 else 0.0
    perc = 100 * offset / total_duration if total_duration else 0
    print_status(
        f"Rendering chat frame {index} at {fps:.1f}fps, "
        + f"{format_time(offset)}/{format_time(total_duration)} ({perc:.0f}%)",
        transient=True,
    )
def add_frame_to_spec(concat_spec: str, frame_path: Path, duration: int) -> str:
    """Return ``concat_spec`` with a ``file``/``duration`` entry appended.

    The frame path is written in resolved (absolute) form.
    """
    entry_lines = (
        f"file '{frame_path.resolve()}'\n",
        f"duration {duration}\n",
    )
    return concat_spec + "".join(entry_lines)
def draw_comment(screen: Screen, comment: Comment, dark: bool, badges_by_id: Dict[str, Badge]):
    """Draw a single chat comment at the screen's current cursor position.

    Draws, in order: the comment's time offset, the user's badges, the user
    name and the message fragments. Emotes are drawn as images, emoji through
    the emoji font, and the remaining text word by word so that the screen can
    wrap between words. The caller advances to the next line afterwards.

    Args:
        screen: Screen to draw on.
        comment: Comment to draw.
        dark: Whether to download the dark variant of emotes.
        badges_by_id: Known badges, keyed by badge ID.
    """
    # Named `timestamp` so the imported `time` module is not shadowed.
    timestamp = format_time(comment["contentOffsetSeconds"])
    screen.draw_text(timestamp + " ", "gray")

    user_badges = comment["message"]["userBadges"]
    for message_badge in user_badges:
        # Skip 'empty' badges
        if message_badge["id"] == "Ozs=":
            continue

        badge = badges_by_id.get(message_badge["id"])
        if not badge:
            print_status(f"Badge not found: {message_badge}")
            continue

        badge_path = download_badge(badge)
        if not badge_path:
            print_status(f"Failed downloading badge {message_badge}")
            continue

        # Close the image file once drawn instead of leaking the handle.
        with Image.open(badge_path) as badge_image:
            screen.draw_image(badge_image)

    if user_badges:
        screen.draw_text(" ")

    commenter = comment["commenter"]
    user = commenter["displayName"] if commenter else "UNKNOWN"
    user_color = comment["message"]["userColor"]
    screen.draw_text(user, user_color)
    screen.draw_text(": ")

    for fragment in comment["message"]["fragments"]:
        if fragment["emote"]:
            emote_path = download_emote(fragment["emote"], dark)
            if emote_path:
                with Image.open(emote_path) as emote_image:
                    screen.draw_image(emote_image)
            else:
                # Fall back to the emote's text when the image is unavailable.
                print_status(f"Failed downloading emote {fragment['emote']}")
                screen.draw_text(" " + fragment["text"])
        else:
            # The pattern has no capture groups, so split() returns the text
            # between emoji runs and findall() the runs themselves, in order.
            text_blocks = emoji_pattern.split(fragment["text"])
            emoji_blocks = emoji_pattern.findall(fragment["text"])

            for block_index, text in enumerate(text_blocks):
                for word in re.split(r"\s", text):
                    if word:
                        screen.draw_text(" " + word)

                if len(emoji_blocks) > block_index:
                    emoji_block = emoji_blocks[block_index]
                    for emoji in emoji_block:
                        screen.draw_emoji(emoji)
class Screen:
    """A fixed-size RGBA canvas with a cursor onto which chat is drawn.

    Content is drawn left to right at the cursor (``x``, ``y``). When a piece
    does not fit into the remaining width the cursor moves to the next line,
    and when the canvas runs out of vertical space the existing content is
    shifted upwards to make room.
    """

    def __init__(self, width: int, height: int, font_size: int, foreground: str, background: str):
        """Create an empty canvas filled with the background color.

        Args:
            width: Canvas width in pixels.
            height: Canvas height in pixels.
            font_size: Size of the text font.
            foreground: Default text color.
            background: Fill color of the canvas.
        """
        self.foreground = foreground
        self.background = background
        self.x: int = 0
        self.y: int = 0

        self.text_font = ImageFont.truetype(cache.get_text_font(), font_size)
        # NOTE(review): 109 is presumably the native bitmap size of the emoji
        # font; emoji are scaled to the line height when drawn. Confirm.
        self.emoji_font = ImageFont.truetype(cache.get_noto_color_emoji_font(), 109)

        ascent, descent = self.text_font.getmetrics()
        self.ascent = ascent
        self.descent = descent
        self.line_height = ascent + descent

        left, _, right, _ = self.text_font.getbbox(" ")
        self.space_size = int(right - left)

        self._image = Image.new("RGBA", (width, height), self.background)
        self._draw = ImageDraw.Draw(self._image)
        self._draw.font = self.text_font

    @property
    def image(self):
        """The canvas image."""
        return self._image

    @image.setter
    def image(self, image: Image.Image):
        # The drawing context is bound to one image, so it has to be
        # recreated whenever the canvas image is replaced.
        self._image = image
        self._draw = ImageDraw.Draw(self._image)
        self._draw.font = self.text_font

    @property
    def draw(self) -> ImageDraw.ImageDraw:
        """Drawing context for the current canvas image."""
        return self._draw

    def draw_text(self, text: str, color: Optional[str] = None):
        """Draw text at the cursor, wrapping to the next line if it does not fit.

        Uses the foreground color when ``color`` is not given.
        """
        length = math.ceil(self.draw.textlength(text))  # type: ignore
        if self.image.width < self.x + length:
            self.next_line()
        self.draw.text((self.x, self.y), text, fill=color or self.foreground)  # type: ignore
        self.x += length

    def draw_image(self, image: Image.Image):
        """Draw an image at the cursor, preceded by a space-sized gap."""
        # The image is drawn after a gap, so the gap must fit as well;
        # otherwise the image is clipped at the right edge.
        if self.image.width < self.x + self.space_size + image.width:
            self.next_line()

        x = self.x + self.space_size
        y = self.y

        if image.height < self.line_height:
            # baseline align (ish); never move the image above the line,
            # alpha_composite rejects negative destinations
            y += max(0, self.line_height - image.height - 2)

        if image.mode != self.image.mode:
            image = image.convert(self.image.mode)

        self.image.alpha_composite(image, (x, y))
        self.x += image.width + self.space_size

    def draw_emoji(self, emoji: str):
        """Draw an emoji scaled to the line height, preceded by a space-sized gap.

        Falls back to the text font when the emoji font reports an empty
        bounding box for the character.
        """
        left, top, right, bottom = self.emoji_font.getbbox(emoji)
        source_width = int(right - left)
        source_height = int(bottom - top)
        source_size = (source_width, source_height)

        if source_width == 0 or source_height == 0:
            print_status(f"Emoji '{emoji}' not renderable in emoji font, falling back to text font")
            self.draw_text(emoji)
            return

        aspect_ratio = source_width / source_height
        target_height = self.line_height
        # Never scale down to zero width, resizing to an empty size fails.
        target_width = max(1, int(target_height * aspect_ratio))
        target_size = (target_width, target_height)

        # Account for the gap drawn before the emoji, see draw_image.
        if self.image.width < self.x + self.space_size + target_width:
            self.next_line()

        emoji_image = Image.new("RGBA", source_size)
        emoji_draw = ImageDraw.Draw(emoji_image)
        emoji_draw.text((0, 0), emoji, font=self.emoji_font, embedded_color=True)  # type: ignore

        resized = emoji_image.resize(target_size)
        self.image.alpha_composite(resized, (self.x + self.space_size, self.y))
        self.x += target_width + self.space_size

    def next_line(self):
        """Move the cursor to the start of the next line, scrolling if needed."""
        # The new line starts one line height below the cursor and needs a
        # full line height of room itself.
        required_height = self.y + self.line_height * 2
        if self.image.height < required_height:
            self.shift(required_height - self.image.height)

        self.x = 0
        self.y += self.line_height

    def shift(self, dy: int):
        """Scroll the canvas content up by ``dy`` pixels.

        The freed area at the bottom is filled with the background color and
        the cursor is moved up by the same amount.
        """
        cropped_image = self.image.crop((0, dy, self.image.width, self.image.height))
        shifted_image = Image.new(self.image.mode, self.image.size, color=self.background)
        shifted_image.paste(cropped_image, (0, 0))
        self.image = shifted_image
        self.y -= dy

    def pad(self, px: int, py: int):
        """Return a copy of the canvas with a background-colored border.

        ``px`` is added on the left and right, ``py`` on the top and bottom.
        The canvas itself is not modified.
        """
        width = self.image.width + 2 * px
        height = self.image.height + 2 * py
        padded_image = Image.new(self.image.mode, (width, height), color=self.background)
        padded_image.paste(self.image, (px, py))
        return padded_image
def generate_video(spec_path: Path, output: Path):
    """Join the frames listed in an ffmpeg concat spec file into a video.

    Args:
        spec_path: Path to the concat spec file listing frames and durations.
        output: Path of the video to write; an existing file is overwritten.

    Raises:
        ConsoleError: If ffmpeg cannot be started or exits with a non-zero
            return code.
    """
    print_status("Generating chat video...")

    command = [
        "ffmpeg",
        # Global option, so it goes first rather than trailing the output file
        "-y",
        "-f",
        "concat",
        "-safe",
        "0",
        "-i",
        str(spec_path),
        "-fps_mode",
        "vfr",
        "-pix_fmt",
        "yuv420p",
        "-stats",
        "-loglevel",
        "warning",
        str(output),
    ]

    try:
        result = subprocess.run(command)
    except FileNotFoundError as ex:
        # Raised when the ffmpeg executable is not installed or not on PATH;
        # report it like the other failures instead of as a traceback.
        raise ConsoleError("ffmpeg not found, it is required to generate the chat video") from ex

    if result.returncode != 0:
        raise ConsoleError("Joining files failed")

    print_status(f"Saved: {green(output)}")
def shift(image: Image.Image, dy: int, background: str):
    """Return a copy of ``image`` with its content moved up by ``dy`` pixels.

    The result has the same size and mode as the input; the area not covered
    by the moved content is filled with ``background``.
    """
    result = Image.new(image.mode, image.size, color=background)
    visible_part = image.crop((0, dy, image.width, image.height))
    result.paste(visible_part, (0, 0))
    return result
def pad(image: Image.Image, px: int, py: int, background: str):
    """Return a copy of ``image`` surrounded by a ``background`` colored border.

    ``px`` pixels are added on the left and right, ``py`` pixels on the top
    and bottom.
    """
    padded_size = (image.width + 2 * px, image.height + 2 * py)
    result = Image.new(image.mode, padded_size, color=background)
    result.paste(image, (px, py))
    return result
def download_badge(badge: Badge) -> Optional[Path]:
    """Fetch the badge's 1x image through the download cache.

    Returns whatever ``cache.download_cached`` returns; callers treat a falsy
    result as a failed download.
    """
    # TODO: make badge size configurable?
    return cache.download_cached(badge["image1x"], subfolder="badges")
def download_emote(emote: Emote, dark: bool) -> Optional[Path]:
    """Fetch the emote image for the chosen theme through the download cache.

    Returns whatever ``cache.download_cached`` returns; callers treat a falsy
    result as a failed download.
    """
    # TODO: make emote size customizable
    variant = "light"
    if dark:
        variant = "dark"

    emote_id = emote["emoteID"]
    url = f"https://static-cdn.jtvnw.net/emoticons/v2/{emote_id}/default/{variant}/1.0"
    return cache.download_cached(url, subfolder="emotes")
def group_comments(video_id: str, total_duration: int):
    """Yield the video's comments grouped by their offset.

    Each item is a tuple ``(index, offset, duration, comments)`` where
    ``comments`` are the consecutive comments sharing ``offset`` and
    ``duration`` is the distance to the next group's offset, or to
    ``total_duration`` for the last group.
    """

    def offset_of(comment):
        return comment["contentOffsetSeconds"]

    grouped = groupby(generate_comments(video_id), offset_of)

    # groupby invalidates a group as soon as the next one is requested, and
    # iterate_with_next has to look one group ahead, so turn each group into
    # a list before moving on.
    batches = ((offset, list(group)) for offset, group in grouped)

    for index, (current, upcoming) in enumerate(iterate_with_next(batches)):
        offset, comments = current
        next_offset = upcoming[0] if upcoming else total_duration
        yield index, offset, next_offset - offset, comments
def generate_comments(video_id: str) -> Generator[Comment, None, None]:
    """Yield all comments of a video, fetching them page by page.

    Pages are requested with ``get_comments``; the cursor of the last comment
    on a page is used to request the following page.
    """
    cursor = None
    while True:
        video = get_comments(video_id, cursor=cursor)
        edges = video["comments"]["edges"]

        for comment in edges:
            yield comment["node"]

        # A page without comments has no cursor to continue from; stop there
        # instead of failing on edges[-1].
        if not edges or not video["comments"]["pageInfo"]["hasNextPage"]:
            return

        cursor = edges[-1]["cursor"]
# Whether the most recently printed status message was transient, meaning the
# next status message should overwrite it.
_prev_transient = False


def print_status(message: str, transient: bool = False, dim: bool = False):
    """Print a status message to stderr.

    If the previously printed message was transient, the cursor is moved to
    the previous line and that line is cleared first, so this message takes
    its place.

    Args:
        message: Text to print.
        transient: Mark this message to be overwritten by the next one.
        dim: Passed on to ``click.secho`` as its ``dim`` argument.
    """
    global _prev_transient

    overwrite_previous = _prev_transient
    if overwrite_previous:
        cursor_previous_line()
        clear_line()

    click.secho(message, err=True, dim=dim)
    _prev_transient = transient