from typing import TYPE_CHECKING, Callable, Optional

import torch
from PIL import Image

from invokeai.app.services.session_processor.session_processor_common import CanceledException, ProgressImage
from invokeai.backend.model_manager.config import BaseModelType
from invokeai.backend.stable_diffusion.diffusers_pipeline import PipelineIntermediateState
from invokeai.backend.util.util import image_to_dataURL

if TYPE_CHECKING:
    from invokeai.app.services.events.events_base import EventServiceBase
    from invokeai.app.services.shared.invocation_context import InvocationContextData

# fast latents preview matrix for SDXL
# generated by @StAlKeR7779
SDXL_LATENT_RGB_FACTORS = [
    #   R        G        B
    [0.3816, 0.4930, 0.5320],
    [-0.3753, 0.1631, 0.1739],
    [0.1770, 0.3588, -0.2048],
    [-0.4350, -0.2644, -0.4289],
]
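# 3x3 smoothing kernel (weights sum to ~1.0), convolved over each RGB channel
# of the SDXL preview to soften blockiness at latent resolution.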
SDXL_SMOOTH_MATRIX = [
    [0.0358, 0.0964, 0.0358],
    [0.0964, 0.4711, 0.0964],
    [0.0358, 0.0964, 0.0358],
]

# originally adapted from code by @erucipe and @keturn here:
# https://discuss.huggingface.co/t/decoding-latents-to-rgb-without-upscaling/23204/7
# these updated numbers for v1.5 are from @torridgristle
SD1_5_LATENT_RGB_FACTORS = [
    #    R        G        B
    [0.3444, 0.1385, 0.0670],  # L1
    [0.1247, 0.4027, 0.1494],  # L2
    [-0.3192, 0.2513, 0.2103],  # L3
    [-0.1307, -0.1874, -0.7445],  # L4
]
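# Each latent pixel (l1, l2, l3, l4) maps to RGB via a 4x3 matrix product; for
# the v1.5 factors above, the red channel works out to:
#   r = 0.3444*l1 + 0.1247*l2 - 0.3192*l3 - 0.1307*l4
# with g and b taken from the second and third columns in the same way.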


def sample_to_lowres_estimated_image(
    samples: torch.Tensor, latent_rgb_factors: torch.Tensor, smooth_matrix: Optional[torch.Tensor] = None
) -> Image.Image:
    """Project latents to an approximate RGB preview image at latent resolution.

    The first sample in the batch is mapped from latent channels to RGB with a
    per-pixel matrix multiply, optionally smoothed with a 3x3 kernel, and
    rescaled from [-1, 1] to 8-bit RGB values.
    """
    # (channels, height, width) -> (height, width, channels), then project each
    # latent pixel onto RGB with the factor matrix.
    latent_image = samples[0].permute(1, 2, 0) @ latent_rgb_factors

    if smooth_matrix is not None:
        # (H, W, 3) -> (3, 1, H, W): treat each RGB channel as a batch entry so
        # a single-channel 3x3 convolution smooths the channels independently.
        latent_image = latent_image.unsqueeze(0).permute(3, 0, 1, 2)
        latent_image = torch.nn.functional.conv2d(latent_image, smooth_matrix.reshape((1, 1, 3, 3)), padding=1)
        latent_image = latent_image.permute(1, 2, 3, 0).squeeze(0)

    # Rescale from [-1, 1] to [0, 1], then to 0..255 bytes.
    latents_ubyte = ((latent_image + 1) / 2).clamp(0, 1).mul(0xFF).byte().cpu()

    return Image.fromarray(latents_ubyte.numpy())
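
# A minimal usage sketch for the estimator above (the shapes are illustrative:
# SD latents are 1/8 the size of the decoded image, so 64x64 latents correspond
# to a 512x512 render):
#
#   latents = torch.randn(1, 4, 64, 64)
#   factors = torch.tensor(SD1_5_LATENT_RGB_FACTORS, dtype=latents.dtype)
#   preview = sample_to_lowres_estimated_image(latents, factors)  # 64x64 PIL image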


def stable_diffusion_step_callback(
    context_data: "InvocationContextData",
    intermediate_state: PipelineIntermediateState,
    base_model: BaseModelType,
    events: "EventServiceBase",
    is_canceled: Callable[[], bool],
) -> None:
    """Emit a denoise-progress event with a fast, low-resolution preview of the in-progress latents."""
    if is_canceled():
        raise CanceledException

    # Some schedulers report not only the noisy latents at the current timestep,
    # but also their estimate so far of what the de-noised latents will be. Use
    # that estimate if it is available.
    if intermediate_state.predicted_original is not None:
        sample = intermediate_state.predicted_original
    else:
        sample = intermediate_state.latents

    if base_model in [BaseModelType.StableDiffusionXL, BaseModelType.StableDiffusionXLRefiner]:
        sdxl_latent_rgb_factors = torch.tensor(SDXL_LATENT_RGB_FACTORS, dtype=sample.dtype, device=sample.device)
        sdxl_smooth_matrix = torch.tensor(SDXL_SMOOTH_MATRIX, dtype=sample.dtype, device=sample.device)
        image = sample_to_lowres_estimated_image(sample, sdxl_latent_rgb_factors, sdxl_smooth_matrix)
    else:
        v1_5_latent_rgb_factors = torch.tensor(SD1_5_LATENT_RGB_FACTORS, dtype=sample.dtype, device=sample.device)
        image = sample_to_lowres_estimated_image(sample, v1_5_latent_rgb_factors)

    # The preview is rendered at latent resolution; report the dimensions of the
    # full-size image instead (the VAE upscales by a factor of 8).
    (width, height) = image.size
    width *= 8
    height *= 8

    dataURL = image_to_dataURL(image, image_format="JPEG")

    events.emit_invocation_denoise_progress(
        context_data.queue_item,
        context_data.invocation,
        intermediate_state,
        ProgressImage(dataURL=dataURL, width=width, height=height),
    )
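
# A wiring sketch (hypothetical; the invocation-context plumbing is assumed,
# not defined in this module): a denoise invocation typically hands a closure
# like this to the scheduler as its per-step callback.
#
#   def step_callback(state: PipelineIntermediateState) -> None:
#       stable_diffusion_step_callback(
#           context_data=context_data,  # this invocation's InvocationContextData
#           intermediate_state=state,
#           base_model=BaseModelType.StableDiffusion1,  # or StableDiffusionXL, etc.
#           events=events,  # the app's EventServiceBase
#           is_canceled=is_canceled,  # queue cancellation check
#       )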