import json
import re
from pathlib import Path
from typing import Any, Dict, Literal, Optional, Union

import safetensors.torch
import spandrel
import torch
from picklescan.scanner import scan_file_path

import invokeai.backend.util.logging as logger
from invokeai.app.util.misc import uuid_string
from invokeai.backend.model_hash.model_hash import HASHING_ALGORITHMS, ModelHash
from invokeai.backend.model_manager.config import (
    AnyModelConfig,
    BaseModelType,
    ControlAdapterDefaultSettings,
    InvalidModelConfigException,
    MainModelDefaultSettings,
    ModelConfigFactory,
    ModelFormat,
    ModelRepoVariant,
    ModelSourceType,
    ModelType,
    ModelVariantType,
    SchedulerPredictionType,
)
from invokeai.backend.model_manager.util.model_util import lora_token_vector_length, read_checkpoint_meta
from invokeai.backend.spandrel_image_to_image_model import SpandrelImageToImageModel
from invokeai.backend.util.silence_warnings import SilenceWarnings

CkptType = Dict[str | int, Any]

LEGACY_CONFIGS: Dict[BaseModelType, Dict[ModelVariantType, Union[str, Dict[SchedulerPredictionType, str]]]] = {
    BaseModelType.StableDiffusion1: {
        ModelVariantType.Normal: {
            SchedulerPredictionType.Epsilon: "v1-inference.yaml",
            SchedulerPredictionType.VPrediction: "v1-inference-v.yaml",
        },
        ModelVariantType.Inpaint: "v1-inpainting-inference.yaml",
    },
    BaseModelType.StableDiffusion2: {
        ModelVariantType.Normal: {
            SchedulerPredictionType.Epsilon: "v2-inference.yaml",
            SchedulerPredictionType.VPrediction: "v2-inference-v.yaml",
        },
        ModelVariantType.Inpaint: {
            SchedulerPredictionType.Epsilon: "v2-inpainting-inference.yaml",
            SchedulerPredictionType.VPrediction: "v2-inpainting-inference-v.yaml",
        },
        ModelVariantType.Depth: "v2-midas-inference.yaml",
    },
    BaseModelType.StableDiffusionXL: {
        ModelVariantType.Normal: "sd_xl_base.yaml",
        ModelVariantType.Inpaint: "sd_xl_inpaint.yaml",
    },
    BaseModelType.StableDiffusionXLRefiner: {
        ModelVariantType.Normal: "sd_xl_refiner.yaml",
    },
}
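# Example lookup: SD 1.x/2.x entries carry a second tier keyed on prediction type, e.g.
# LEGACY_CONFIGS[BaseModelType.StableDiffusion2][ModelVariantType.Normal][SchedulerPredictionType.VPrediction]
# resolves to "v2-inference-v.yaml"; SDXL entries map the variant directly to a filename.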


class ProbeBase:
    """Base class for probes."""

    def __init__(self, model_path: Path):
        self.model_path = model_path

    def get_base_type(self) -> BaseModelType:
        """Get model base type."""
        raise NotImplementedError

    def get_format(self) -> ModelFormat:
        """Get model file format."""
        raise NotImplementedError

    def get_variant_type(self) -> Optional[ModelVariantType]:
        """Get model variant type."""
        return None

    def get_scheduler_prediction_type(self) -> Optional[SchedulerPredictionType]:
        """Get model scheduler prediction type."""
        return None

    def get_image_encoder_model_id(self) -> Optional[str]:
        """Get image encoder (IP adapters only)."""
        return None


class ModelProbe:
    PROBES: Dict[str, Dict[ModelType, type[ProbeBase]]] = {
        "diffusers": {},
        "checkpoint": {},
        "onnx": {},
    }

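    # Maps the `_class_name` of a diffusers model_index.json (or `architectures[0]` of a
    # transformers config.json) to the corresponding ModelType; see get_model_type_from_folder().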
    CLASS2TYPE = {
        "FluxPipeline": ModelType.Main,
        "StableDiffusionPipeline": ModelType.Main,
        "StableDiffusionInpaintPipeline": ModelType.Main,
        "StableDiffusionXLPipeline": ModelType.Main,
        "StableDiffusionXLImg2ImgPipeline": ModelType.Main,
        "StableDiffusionXLInpaintPipeline": ModelType.Main,
        "LatentConsistencyModelPipeline": ModelType.Main,
        "AutoencoderKL": ModelType.VAE,
        "AutoencoderTiny": ModelType.VAE,
        "ControlNetModel": ModelType.ControlNet,
        "CLIPVisionModelWithProjection": ModelType.CLIPVision,
        "T2IAdapter": ModelType.T2IAdapter,
        "CLIPModel": ModelType.CLIPEmbed,
    }

    @classmethod
    def register_probe(
        cls, format: Literal["diffusers", "checkpoint", "onnx"], model_type: ModelType, probe_class: type[ProbeBase]
    ) -> None:
        cls.PROBES[format][model_type] = probe_class

    @classmethod
    def probe(
        cls, model_path: Path, fields: Optional[Dict[str, Any]] = None, hash_algo: HASHING_ALGORITHMS = "blake3_single"
    ) -> AnyModelConfig:
        """
        Probe the model at model_path and return its configuration record.

        :param model_path: Path to the model file (checkpoint) or directory (diffusers).
        :param fields: An optional dictionary that can be used to override probed
            fields. Typically used for fields that don't probe well, such as prediction_type.
        :param hash_algo: The hashing algorithm to use when computing the model hash.

        Returns: The appropriate model configuration derived from ModelConfigBase.
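
        Example (hypothetical path; probed values depend on the actual model):
            config = ModelProbe.probe(Path("models/v1-5-pruned-emaonly.safetensors"))
            assert config.base is BaseModelType.StableDiffusion1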
        """
        if fields is None:
            fields = {}

        model_path = model_path.resolve()

        format_type = ModelFormat.Diffusers if model_path.is_dir() else ModelFormat.Checkpoint
        model_info = None
        model_type = ModelType(fields["type"]) if fields.get("type") else None
        if not model_type:
            if format_type is ModelFormat.Diffusers:
                model_type = cls.get_model_type_from_folder(model_path)
            else:
                model_type = cls.get_model_type_from_checkpoint(model_path)
        format_type = ModelFormat.ONNX if model_type == ModelType.ONNX else format_type

        probe_class = cls.PROBES[format_type].get(model_type)
        if not probe_class:
            raise InvalidModelConfigException(f"Unhandled combination of {format_type} and {model_type}")

        probe = probe_class(model_path)

        fields["source_type"] = fields.get("source_type") or ModelSourceType.Path
        fields["source"] = fields.get("source") or model_path.as_posix()
        fields["key"] = fields.get("key", uuid_string())
        fields["path"] = model_path.as_posix()
        fields["type"] = fields.get("type") or model_type
        fields["base"] = fields.get("base") or probe.get_base_type()
        fields["variant"] = fields.get("variant") or probe.get_variant_type()
        fields["prediction_type"] = fields.get("prediction_type") or probe.get_scheduler_prediction_type()
        fields["image_encoder_model_id"] = fields.get("image_encoder_model_id") or probe.get_image_encoder_model_id()
        fields["name"] = fields.get("name") or cls.get_model_name(model_path)
        fields["description"] = (
            fields.get("description") or f"{fields['base'].value} {model_type.value} model {fields['name']}"
        )
        fields["format"] = ModelFormat(fields.get("format")) if "format" in fields else probe.get_format()
        fields["hash"] = fields.get("hash") or ModelHash(algorithm=hash_algo).hash(model_path)

        fields["default_settings"] = fields.get("default_settings")

        if not fields["default_settings"]:
            if fields["type"] in {ModelType.ControlNet, ModelType.T2IAdapter}:
                fields["default_settings"] = get_default_settings_controlnet_t2i_adapter(fields["name"])
            elif fields["type"] is ModelType.Main:
                fields["default_settings"] = get_default_settings_main(fields["base"])

        if format_type == ModelFormat.Diffusers and isinstance(probe, FolderProbeBase):
            fields["repo_variant"] = fields.get("repo_variant") or probe.get_repo_variant()

        # additional fields needed for main, controlnet and VAE models
        if fields["type"] in [ModelType.Main, ModelType.ControlNet, ModelType.VAE] and fields["format"] in [
            ModelFormat.Checkpoint,
            ModelFormat.BnbQuantizednf4b,
        ]:
            ckpt_config_path = cls._get_checkpoint_config_path(
                model_path,
                model_type=fields["type"],
                base_type=fields["base"],
                variant_type=fields["variant"],
                prediction_type=fields["prediction_type"],
            )
            fields["config_path"] = str(ckpt_config_path)

        # additional fields needed for main non-checkpoint models
        elif fields["type"] == ModelType.Main and fields["format"] in [
            ModelFormat.ONNX,
            ModelFormat.Olive,
            ModelFormat.Diffusers,
        ]:
            fields["upcast_attention"] = fields.get("upcast_attention") or (
                fields["base"] == BaseModelType.StableDiffusion2
                and fields["prediction_type"] == SchedulerPredictionType.VPrediction
            )

        model_info = ModelConfigFactory.make_config(fields)
        return model_info

    @classmethod
    def get_model_name(cls, model_path: Path) -> str:
        if model_path.suffix in {".safetensors", ".bin", ".pt", ".ckpt"}:
            return model_path.stem
        else:
            return model_path.name

    @classmethod
    def get_model_type_from_checkpoint(cls, model_path: Path, checkpoint: Optional[CkptType] = None) -> ModelType:
        if model_path.suffix not in (".bin", ".pt", ".ckpt", ".safetensors", ".pth"):
            raise InvalidModelConfigException(f"{model_path}: unrecognized suffix")

        if model_path.name == "learned_embeds.bin":
            return ModelType.TextualInversion

        ckpt = checkpoint if checkpoint else read_checkpoint_meta(model_path, scan=True)
        ckpt = ckpt.get("state_dict", ckpt)

        for key in [str(k) for k in ckpt.keys()]:
            if key.startswith(("cond_stage_model.", "first_stage_model.", "model.diffusion_model.", "double_blocks.")):
                # Keys starting with double_blocks are associated with Flux models
                return ModelType.Main
            elif key.startswith(("encoder.conv_in", "decoder.conv_in")):
                return ModelType.VAE
            elif key.startswith(("lora_te_", "lora_unet_")):
                return ModelType.LoRA
            elif key.endswith(("to_k_lora.up.weight", "to_q_lora.down.weight")):
                return ModelType.LoRA
            elif key.startswith(("controlnet", "control_model", "input_blocks")):
                return ModelType.ControlNet
            elif key.startswith(("image_proj.", "ip_adapter.")):
                return ModelType.IPAdapter
            elif key in {"emb_params", "string_to_param"}:
                return ModelType.TextualInversion

        # A diffusers-style textual inversion is a small dict whose values are all tensors.
        if len(ckpt) < 10 and all(isinstance(v, torch.Tensor) for v in ckpt.values()):
            return ModelType.TextualInversion

        # Check if the model can be loaded as a SpandrelImageToImageModel.
        # This check is intentionally performed last, as it can be expensive (it requires loading the model from disk).
        try:
            # It would be nice to avoid having to load the Spandrel model from disk here. A couple of options were
            # explored to avoid this:
            # 1. Call `SpandrelImageToImageModel.load_from_state_dict(ckpt)`, where `ckpt` is a state_dict on the meta
            #    device. Unfortunately, some Spandrel models perform operations during initialization that are not
            #    supported on meta tensors.
            # 2. Spandrel has internal logic to determine a model's type from its state_dict before loading the model.
            #    This logic is not exposed in spandrel's public API. We could copy the logic here, but then we have to
            #    maintain it, and the risk of false positive detections is higher.
            SpandrelImageToImageModel.load_from_file(model_path)
            return ModelType.SpandrelImageToImage
        except spandrel.UnsupportedModelError:
            pass
        except RuntimeError as e:
            if "No such file or directory" in str(e):
                # This error is expected if the model_path does not exist (which is the case in some unit tests).
                pass
            else:
                raise e

        raise InvalidModelConfigException(f"Unable to determine model type for {model_path}")

    @classmethod
    def get_model_type_from_folder(cls, folder_path: Path) -> ModelType:
        """Get the model type of a hugging-face style folder."""
        class_name = None
        error_hint = None
        for suffix in ["bin", "safetensors"]:
            if (folder_path / f"learned_embeds.{suffix}").exists():
                return ModelType.TextualInversion
            if (folder_path / f"pytorch_lora_weights.{suffix}").exists():
                return ModelType.LoRA
        if (folder_path / "unet/model.onnx").exists():
            return ModelType.ONNX
        if (folder_path / "image_encoder.txt").exists():
            return ModelType.IPAdapter

        i = folder_path / "model_index.json"
        c = folder_path / "config.json"
        config_path = i if i.exists() else c if c.exists() else None

        if config_path:
            with open(config_path, "r") as file:
                conf = json.load(file)
            if "_class_name" in conf:
                class_name = conf["_class_name"]
            elif "architectures" in conf:
                class_name = conf["architectures"][0]
            else:
                class_name = None
        else:
            error_hint = f"No model_index.json or config.json found in {folder_path}."

        if class_name and (model_type := cls.CLASS2TYPE.get(class_name)):
            return model_type
        elif class_name:
            error_hint = f"class {class_name} is not one of the supported classes [{', '.join(cls.CLASS2TYPE.keys())}]"

        # give up
        raise InvalidModelConfigException(
            f"Unable to determine model type for {folder_path}" + (f"; {error_hint}" if error_hint else "")
        )

    @classmethod
    def _get_checkpoint_config_path(
        cls,
        model_path: Path,
        model_type: ModelType,
        base_type: BaseModelType,
        variant_type: ModelVariantType,
        prediction_type: SchedulerPredictionType,
    ) -> Path:
        # look for a YAML file adjacent to the model file first
        possible_conf = model_path.with_suffix(".yaml")
        if possible_conf.exists():
            return possible_conf.absolute()

        if model_type is ModelType.Main:
            if base_type == BaseModelType.Flux:
                # TODO: Decide between dev/schnell
                checkpoint = ModelProbe._scan_and_load_checkpoint(model_path)
                state_dict = checkpoint.get("state_dict") or checkpoint
                if "guidance_in.out_layer.weight" in state_dict:
                    # For flux, this is a key in invokeai.backend.flux.util.params
                    #   Due to model type and format being the descriminator for model configs this
                    #   is used rather than attempting to support flux with separate model types and format
                    #   If changed in the future, please fix me
                    config_file = "flux-dev"
                else:
                    # For flux, this is a key in invokeai.backend.flux.util.params
                    #   Due to model type and format being the descriminator for model configs this
                    #   is used rather than attempting to support flux with separate model types and format
                    #   If changed in the future, please fix me
                    config_file = "flux-schnell"
            else:
                config_file = LEGACY_CONFIGS[base_type][variant_type]
                if isinstance(config_file, dict):  # need another tier for sd-2.x models
                    config_file = config_file[prediction_type]
                config_file = f"stable-diffusion/{config_file}"
        elif model_type is ModelType.ControlNet:
            config_file = (
                "controlnet/cldm_v15.yaml"
                if base_type is BaseModelType.StableDiffusion1
                else "controlnet/cldm_v21.yaml"
            )
        elif model_type is ModelType.VAE:
            config_file = (
                # For flux, "flux" is a key in invokeai.backend.flux.util.ae_params.
                #   Because model type and format are the discriminators for model configs, this
                #   key is used rather than supporting flux with separate model types and formats.
                #   If this changes in the future, please fix me.
                "flux"
                if base_type is BaseModelType.Flux
                else "stable-diffusion/v1-inference.yaml"
                if base_type is BaseModelType.StableDiffusion1
                else "stable-diffusion/sd_xl_base.yaml"
                if base_type is BaseModelType.StableDiffusionXL
                else "stable-diffusion/v2-inference.yaml"
            )
        else:
            raise InvalidModelConfigException(
                f"{model_path}: Unrecognized combination of model_type={model_type}, base_type={base_type}"
            )
        return Path(config_file)

    @classmethod
    def _scan_and_load_checkpoint(cls, model_path: Path) -> CkptType:
        with SilenceWarnings():
            if model_path.suffix in (".ckpt", ".pt", ".pth", ".bin"):
                cls._scan_model(model_path.name, model_path)
                model = torch.load(model_path, map_location="cpu")
                assert isinstance(model, dict)
                return model
            else:
                return safetensors.torch.load_file(model_path)

    @classmethod
    def _scan_model(cls, model_name: str, checkpoint: Path) -> None:
        """
        Apply picklescanner to the indicated checkpoint and raise an exception
        if an infected file is identified.
        """
        # scan model
        scan_result = scan_file_path(checkpoint)
        if scan_result.infected_files != 0:
            raise Exception("The model {model_name} is potentially infected by malware. Aborting import.")


# Probing utilities
MODEL_NAME_TO_PREPROCESSOR = {
    "canny": "canny_image_processor",
    "mlsd": "mlsd_image_processor",
    "depth": "depth_anything_image_processor",
    "bae": "normalbae_image_processor",
    "normal": "normalbae_image_processor",
    "sketch": "pidi_image_processor",
    "scribble": "lineart_image_processor",
    "lineart": "lineart_image_processor",
    "lineart_anime": "lineart_anime_image_processor",
    "softedge": "hed_image_processor",
    "shuffle": "content_shuffle_image_processor",
    "pose": "dw_openpose_image_processor",
    "mediapipe": "mediapipe_face_processor",
    "pidi": "pidi_image_processor",
    "zoe": "zoe_depth_image_processor",
    "color": "color_map_image_processor",
}


def get_default_settings_controlnet_t2i_adapter(model_name: str) -> Optional[ControlAdapterDefaultSettings]:
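    # Substring match against the model name; e.g. a hypothetical "controlnet-canny-sdxl-1.0"
    # contains "canny" and yields ControlAdapterDefaultSettings(preprocessor="canny_image_processor").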
    for k, v in MODEL_NAME_TO_PREPROCESSOR.items():
        if k in model_name:
            return ControlAdapterDefaultSettings(preprocessor=v)
    return None


def get_default_settings_main(model_base: BaseModelType) -> Optional[MainModelDefaultSettings]:
    if model_base is BaseModelType.StableDiffusion1 or model_base is BaseModelType.StableDiffusion2:
        return MainModelDefaultSettings(width=512, height=512)
    elif model_base is BaseModelType.StableDiffusionXL:
        return MainModelDefaultSettings(width=1024, height=1024)
    # We don't provide defaults for BaseModelType.StableDiffusionXLRefiner, as they are not standalone models.
    return None


# ##################################################
# Checkpoint probing
# ##################################################


class CheckpointProbeBase(ProbeBase):
    def __init__(self, model_path: Path):
        super().__init__(model_path)
        self.checkpoint = ModelProbe._scan_and_load_checkpoint(model_path)

    def get_format(self) -> ModelFormat:
        state_dict = self.checkpoint.get("state_dict") or self.checkpoint
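        # bitsandbytes NF4 quantization leaves per-weight metadata keys of the form
        # "<weight name>.quant_state.bitsandbytes__nf4"; the probed key belongs to the FLUX
        # transformer, currently the only family we expect to see in this format.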
        if "double_blocks.0.img_attn.proj.weight.quant_state.bitsandbytes__nf4" in state_dict:
            return ModelFormat.BnbQuantizednf4b
        return ModelFormat("checkpoint")

    def get_variant_type(self) -> ModelVariantType:
        model_type = ModelProbe.get_model_type_from_checkpoint(self.model_path, self.checkpoint)
        base_type = self.get_base_type()
        if model_type != ModelType.Main or base_type == BaseModelType.Flux:
            return ModelVariantType.Normal
        state_dict = self.checkpoint.get("state_dict") or self.checkpoint
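        # input_blocks.0.0 is the UNet's first conv layer; shape[1] of its weight is the input
        # channel count, which identifies the variant: 9 = inpainting (4 latent + 4 masked-image
        # latent + 1 mask), 5 = depth (4 latent + 1 depth), 4 = standard txt2img.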
        in_channels = state_dict["model.diffusion_model.input_blocks.0.0.weight"].shape[1]
        if in_channels == 9:
            return ModelVariantType.Inpaint
        elif in_channels == 5:
            return ModelVariantType.Depth
        elif in_channels == 4:
            return ModelVariantType.Normal
        else:
            raise InvalidModelConfigException(
                f"Cannot determine variant type (in_channels={in_channels}) at {self.model_path}"
            )


class PipelineCheckpointProbe(CheckpointProbeBase):
    def get_base_type(self) -> BaseModelType:
        checkpoint = self.checkpoint
        state_dict = self.checkpoint.get("state_dict") or checkpoint
        if "double_blocks.0.img_attn.norm.key_norm.scale" in state_dict:
            return BaseModelType.Flux
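        # The last dimension of a cross-attention to_k weight equals the text-encoder embedding
        # size: 768 = CLIP ViT-L (SD 1.x), 1024 = OpenCLIP ViT-H (SD 2.x); in input_blocks.4,
        # 2048 = SDXL's two concatenated encoders and 1280 = the OpenCLIP-only SDXL refiner.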
        key_name = "model.diffusion_model.input_blocks.2.1.transformer_blocks.0.attn2.to_k.weight"
        if key_name in state_dict and state_dict[key_name].shape[-1] == 768:
            return BaseModelType.StableDiffusion1
        if key_name in state_dict and state_dict[key_name].shape[-1] == 1024:
            return BaseModelType.StableDiffusion2
        key_name = "model.diffusion_model.input_blocks.4.1.transformer_blocks.0.attn2.to_k.weight"
        if key_name in state_dict and state_dict[key_name].shape[-1] == 2048:
            return BaseModelType.StableDiffusionXL
        elif key_name in state_dict and state_dict[key_name].shape[-1] == 1280:
            return BaseModelType.StableDiffusionXLRefiner
        else:
            raise InvalidModelConfigException("Cannot determine base type")

    def get_scheduler_prediction_type(self) -> SchedulerPredictionType:
        """Return model prediction type."""
        base_type = self.get_base_type()
        if base_type == BaseModelType.StableDiffusion2:
            checkpoint = self.checkpoint
            state_dict = self.checkpoint.get("state_dict") or checkpoint
            key_name = "model.diffusion_model.input_blocks.2.1.transformer_blocks.0.attn2.to_k.weight"
            if key_name in state_dict and state_dict[key_name].shape[-1] == 1024:
                if "global_step" in checkpoint:
                    if checkpoint["global_step"] == 220000:
                        return SchedulerPredictionType.Epsilon
                    elif checkpoint["global_step"] == 110000:
                        return SchedulerPredictionType.VPrediction
            return SchedulerPredictionType.VPrediction  # a guess for sd2 ckpts

        elif base_type == BaseModelType.StableDiffusion1:
            return SchedulerPredictionType.Epsilon  # a reasonable guess for sd1 ckpts
        else:
            return SchedulerPredictionType.Epsilon


class VaeCheckpointProbe(CheckpointProbeBase):
    def get_base_type(self) -> BaseModelType:
        # VAEs of all base types have the same structure, so we wimp out and
        # guess using the name.
        for regexp, basetype in [
            (r"xl", BaseModelType.StableDiffusionXL),
            (r"sd2", BaseModelType.StableDiffusion2),
            (r"vae", BaseModelType.StableDiffusion1),
            (r"FLUX.1-schnell_ae", BaseModelType.Flux),
        ]:
            if re.search(regexp, self.model_path.name, re.IGNORECASE):
                return basetype
        raise InvalidModelConfigException("Cannot determine base type")


class LoRACheckpointProbe(CheckpointProbeBase):
    """Class for LoRA checkpoints."""

    def get_format(self) -> ModelFormat:
        return ModelFormat("lycoris")

    def get_base_type(self) -> BaseModelType:
        checkpoint = self.checkpoint
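        # lora_token_vector_length() recovers, from the LoRA weight shapes, the text-encoder
        # embedding size of the base model the LoRA was trained against.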
        token_vector_length = lora_token_vector_length(checkpoint)

        if token_vector_length == 768:
            return BaseModelType.StableDiffusion1
        elif token_vector_length == 1024:
            return BaseModelType.StableDiffusion2
        elif token_vector_length == 1280:
            return BaseModelType.StableDiffusionXL  # recognizes format at https://civitai.com/models/224641
        elif token_vector_length == 2048:
            return BaseModelType.StableDiffusionXL
        else:
            raise InvalidModelConfigException(f"Unknown LoRA type: {self.model_path}")


class TextualInversionCheckpointProbe(CheckpointProbeBase):
    """Class for probing embeddings."""

    def get_format(self) -> ModelFormat:
        return ModelFormat.EmbeddingFile

    def get_base_type(self) -> BaseModelType:
        checkpoint = self.checkpoint
        if "string_to_token" in checkpoint:
            token_dim = list(checkpoint["string_to_param"].values())[0].shape[-1]
        elif "emb_params" in checkpoint:
            token_dim = checkpoint["emb_params"].shape[-1]
        elif "clip_g" in checkpoint:
            token_dim = checkpoint["clip_g"].shape[-1]
        else:
            token_dim = list(checkpoint.values())[0].shape[0]
        if token_dim == 768:
            return BaseModelType.StableDiffusion1
        elif token_dim == 1024:
            return BaseModelType.StableDiffusion2
        elif token_dim == 1280:
            return BaseModelType.StableDiffusionXL
        else:
            raise InvalidModelConfigException(f"{self.model_path}: Could not determine base type")


class ControlNetCheckpointProbe(CheckpointProbeBase):
    """Class for probing controlnets."""

    def get_base_type(self) -> BaseModelType:
        checkpoint = self.checkpoint
        for key_name in (
            "control_model.input_blocks.2.1.transformer_blocks.0.attn2.to_k.weight",
            "controlnet_mid_block.bias",
            "input_blocks.2.1.transformer_blocks.0.attn2.to_k.weight",
            "down_blocks.1.attentions.0.transformer_blocks.0.attn2.to_k.weight",
        ):
            if key_name not in checkpoint:
                continue
            width = checkpoint[key_name].shape[-1]
            if width == 768:
                return BaseModelType.StableDiffusion1
            elif width == 1024:
                return BaseModelType.StableDiffusion2
            elif width == 2048:
                return BaseModelType.StableDiffusionXL
            elif width == 1280:
                return BaseModelType.StableDiffusionXL
        raise InvalidModelConfigException(f"{self.model_path}: Unable to determine base type")


class IPAdapterCheckpointProbe(CheckpointProbeBase):
    """Class for probing IP Adapters"""

    def get_base_type(self) -> BaseModelType:
        checkpoint = self.checkpoint
        for key in checkpoint.keys():
            if not key.startswith(("image_proj.", "ip_adapter.")):
                continue
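            # to_k_ip projects image-prompt features into the UNet's cross-attention space, so
            # the last dimension of its weight matches the base model's context size.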
            cross_attention_dim = checkpoint["ip_adapter.1.to_k_ip.weight"].shape[-1]
            if cross_attention_dim == 768:
                return BaseModelType.StableDiffusion1
            elif cross_attention_dim == 1024:
                return BaseModelType.StableDiffusion2
            elif cross_attention_dim == 2048:
                return BaseModelType.StableDiffusionXL
            else:
                raise InvalidModelConfigException(
                    f"IP-Adapter had unexpected cross-attention dimension: {cross_attention_dim}."
                )
        raise InvalidModelConfigException(f"{self.model_path}: Unable to determine base type")


class CLIPVisionCheckpointProbe(CheckpointProbeBase):
    def get_base_type(self) -> BaseModelType:
        raise NotImplementedError()


class T2IAdapterCheckpointProbe(CheckpointProbeBase):
    def get_base_type(self) -> BaseModelType:
        raise NotImplementedError()


class SpandrelImageToImageCheckpointProbe(CheckpointProbeBase):
    def get_base_type(self) -> BaseModelType:
        return BaseModelType.Any


########################################################
# classes for probing folders
########################################################
class FolderProbeBase(ProbeBase):
    def get_variant_type(self) -> ModelVariantType:
        return ModelVariantType.Normal

    def get_format(self) -> ModelFormat:
        return ModelFormat("diffusers")

    def get_repo_variant(self) -> ModelRepoVariant:
        # get all files ending in .bin or .safetensors
        weight_files = list(self.model_path.glob("**/*.safetensors"))
        weight_files.extend(list(self.model_path.glob("**/*.bin")))
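        # Path.suffixes keeps every dotted component, e.g.
        # Path("diffusion_pytorch_model.fp16.safetensors").suffixes == [".fp16", ".safetensors"].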
        for x in weight_files:
            if ".fp16" in x.suffixes:
                return ModelRepoVariant.FP16
            if "openvino_model" in x.name:
                return ModelRepoVariant.OpenVINO
            if "flax_model" in x.name:
                return ModelRepoVariant.Flax
            if x.suffix == ".onnx":
                return ModelRepoVariant.ONNX
        return ModelRepoVariant.Default


class PipelineFolderProbe(FolderProbeBase):
    def get_base_type(self) -> BaseModelType:
        with open(self.model_path / "unet" / "config.json", "r") as file:
            unet_conf = json.load(file)
        if unet_conf["cross_attention_dim"] == 768:
            return BaseModelType.StableDiffusion1
        elif unet_conf["cross_attention_dim"] == 1024:
            return BaseModelType.StableDiffusion2
        elif unet_conf["cross_attention_dim"] == 1280:
            return BaseModelType.StableDiffusionXLRefiner
        elif unet_conf["cross_attention_dim"] == 2048:
            return BaseModelType.StableDiffusionXL
        else:
            raise InvalidModelConfigException(f"Unknown base model for {self.model_path}")

    def get_scheduler_prediction_type(self) -> SchedulerPredictionType:
        with open(self.model_path / "scheduler" / "scheduler_config.json", "r") as file:
            scheduler_conf = json.load(file)
        if scheduler_conf.get("prediction_type", "epsilon") == "v_prediction":
            return SchedulerPredictionType.VPrediction
        elif scheduler_conf.get("prediction_type", "epsilon") == "epsilon":
            return SchedulerPredictionType.Epsilon
        else:
            raise InvalidModelConfigException("Unknown scheduler prediction type: {scheduler_conf['prediction_type']}")

    def get_variant_type(self) -> ModelVariantType:
        # This only works for pipelines! Any kind of
        # exception results in our returning the
        # "normal" variant type
        try:
            config_file = self.model_path / "unet" / "config.json"
            with open(config_file, "r") as file:
                conf = json.load(file)

            in_channels = conf["in_channels"]
            if in_channels == 9:
                return ModelVariantType.Inpaint
            elif in_channels == 5:
                return ModelVariantType.Depth
            elif in_channels == 4:
                return ModelVariantType.Normal
        except Exception:
            pass
        return ModelVariantType.Normal


class VaeFolderProbe(FolderProbeBase):
    def get_base_type(self) -> BaseModelType:
        if self._config_looks_like_sdxl():
            return BaseModelType.StableDiffusionXL
        elif self._name_looks_like_sdxl():
            # SD and SDXL VAEs have the same shape (3-channel RGB in, 4-channel latents scaled
            # down by a factor of 8), so we can't necessarily tell them apart from config
            # hyperparameters; fall back to guessing from the name.
            return BaseModelType.StableDiffusionXL
        else:
            return BaseModelType.StableDiffusion1

    def _config_looks_like_sdxl(self) -> bool:
        # config values that distinguish Stability's SD 1.x VAE from their SDXL VAE.
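        # (SDXL's VAE is published with scaling_factor 0.13025; the SD 1.x VAE uses 0.18215.)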
        config_file = self.model_path / "config.json"
        if not config_file.exists():
            raise InvalidModelConfigException(f"Cannot determine base type for {self.model_path}")
        with open(config_file, "r") as file:
            config = json.load(file)
        return config.get("scaling_factor", 0) == 0.13025 and config.get("sample_size") in [512, 1024]

    def _name_looks_like_sdxl(self) -> bool:
        return bool(re.search(r"xl\b", self._guess_name(), re.IGNORECASE))

    def _guess_name(self) -> str:
        name = self.model_path.name
        if name == "vae":
            name = self.model_path.parent.name
        return name


class TextualInversionFolderProbe(FolderProbeBase):
    def get_format(self) -> ModelFormat:
        return ModelFormat.EmbeddingFolder

    def get_base_type(self) -> BaseModelType:
        path = self.model_path / "learned_embeds.bin"
        if not path.exists():
            raise InvalidModelConfigException(
                f"{self.model_path.as_posix()} does not contain expected 'learned_embeds.bin' file"
            )
        return TextualInversionCheckpointProbe(path).get_base_type()


class T5EncoderFolderProbe(FolderProbeBase):
    def get_format(self) -> ModelFormat:
        return ModelFormat.T5Encoder


class ONNXFolderProbe(PipelineFolderProbe):
    def get_base_type(self) -> BaseModelType:
        # Due to the way the installer is set up, the configuration file for safetensors
        # will come along for the ride if both the onnx and safetensors forms
        # share the same directory. We take advantage of this here.
        if (self.model_path / "unet" / "config.json").exists():
            return super().get_base_type()
        else:
            logger.warning('Base type probing is not implemented for ONNX models. Assuming "sd-1"')
            return BaseModelType.StableDiffusion1

    def get_format(self) -> ModelFormat:
        return ModelFormat("onnx")

    def get_variant_type(self) -> ModelVariantType:
        return ModelVariantType.Normal


class ControlNetFolderProbe(FolderProbeBase):
    def get_base_type(self) -> BaseModelType:
        config_file = self.model_path / "config.json"
        if not config_file.exists():
            raise InvalidModelConfigException(f"Cannot determine base type for {self.model_path}")
        with open(config_file, "r") as file:
            config = json.load(file)
        # no obvious way to distinguish between sd2-base and sd2-768
        dimension = config["cross_attention_dim"]
        base_model = (
            BaseModelType.StableDiffusion1
            if dimension == 768
            else (
                BaseModelType.StableDiffusion2
                if dimension == 1024
                else BaseModelType.StableDiffusionXL
                if dimension == 2048
                else None
            )
        )
        if not base_model:
            raise InvalidModelConfigException(f"Unable to determine model base for {self.model_path}")
        return base_model


class LoRAFolderProbe(FolderProbeBase):
    def get_base_type(self) -> BaseModelType:
        model_file = None
        for suffix in ["safetensors", "bin"]:
            base_file = self.model_path / f"pytorch_lora_weights.{suffix}"
            if base_file.exists():
                model_file = base_file
                break
        if not model_file:
            raise InvalidModelConfigException("Unknown LoRA format encountered")
        return LoRACheckpointProbe(model_file).get_base_type()


class IPAdapterFolderProbe(FolderProbeBase):
    def get_format(self) -> ModelFormat:
        return ModelFormat.InvokeAI

    def get_base_type(self) -> BaseModelType:
        model_file = self.model_path / "ip_adapter.bin"
        if not model_file.exists():
            raise InvalidModelConfigException("Unknown IP-Adapter model format.")

        state_dict = torch.load(model_file, map_location="cpu")
        cross_attention_dim = state_dict["ip_adapter"]["1.to_k_ip.weight"].shape[-1]
        if cross_attention_dim == 768:
            return BaseModelType.StableDiffusion1
        elif cross_attention_dim == 1024:
            return BaseModelType.StableDiffusion2
        elif cross_attention_dim == 2048:
            return BaseModelType.StableDiffusionXL
        else:
            raise InvalidModelConfigException(
                f"IP-Adapter had unexpected cross-attention dimension: {cross_attention_dim}."
            )

    def get_image_encoder_model_id(self) -> Optional[str]:
        encoder_id_path = self.model_path / "image_encoder.txt"
        if not encoder_id_path.exists():
            return None
        with open(encoder_id_path, "r") as f:
            image_encoder_model = f.readline().strip()
        return image_encoder_model


class CLIPVisionFolderProbe(FolderProbeBase):
    def get_base_type(self) -> BaseModelType:
        return BaseModelType.Any


class CLIPEmbedFolderProbe(FolderProbeBase):
    def get_base_type(self) -> BaseModelType:
        return BaseModelType.Any


class SpandrelImageToImageFolderProbe(FolderProbeBase):
    def get_base_type(self) -> BaseModelType:
        raise NotImplementedError()


class T2IAdapterFolderProbe(FolderProbeBase):
    def get_base_type(self) -> BaseModelType:
        config_file = self.model_path / "config.json"
        if not config_file.exists():
            raise InvalidModelConfigException(f"Cannot determine base type for {self.model_path}")
        with open(config_file, "r") as file:
            config = json.load(file)

        adapter_type = config.get("adapter_type", None)
        if adapter_type == "full_adapter_xl":
            return BaseModelType.StableDiffusionXL
        elif adapter_type == "full_adapter" or "light_adapter":
            # I haven't seen any T2I adapter models for SD2, so assume that this is an SD1 adapter.
            return BaseModelType.StableDiffusion1
        else:
            raise InvalidModelConfigException(
                f"Unable to determine base model for '{self.model_path}' (adapter_type = {adapter_type})."
            )


# Register probe classes
ModelProbe.register_probe("diffusers", ModelType.Main, PipelineFolderProbe)
ModelProbe.register_probe("diffusers", ModelType.VAE, VaeFolderProbe)
ModelProbe.register_probe("diffusers", ModelType.LoRA, LoRAFolderProbe)
ModelProbe.register_probe("diffusers", ModelType.TextualInversion, TextualInversionFolderProbe)
ModelProbe.register_probe("diffusers", ModelType.T5Encoder, T5EncoderFolderProbe)
ModelProbe.register_probe("diffusers", ModelType.ControlNet, ControlNetFolderProbe)
ModelProbe.register_probe("diffusers", ModelType.IPAdapter, IPAdapterFolderProbe)
ModelProbe.register_probe("diffusers", ModelType.CLIPEmbed, CLIPEmbedFolderProbe)
ModelProbe.register_probe("diffusers", ModelType.CLIPVision, CLIPVisionFolderProbe)
ModelProbe.register_probe("diffusers", ModelType.T2IAdapter, T2IAdapterFolderProbe)
ModelProbe.register_probe("diffusers", ModelType.SpandrelImageToImage, SpandrelImageToImageFolderProbe)

ModelProbe.register_probe("checkpoint", ModelType.Main, PipelineCheckpointProbe)
ModelProbe.register_probe("checkpoint", ModelType.VAE, VaeCheckpointProbe)
ModelProbe.register_probe("checkpoint", ModelType.LoRA, LoRACheckpointProbe)
ModelProbe.register_probe("checkpoint", ModelType.TextualInversion, TextualInversionCheckpointProbe)
ModelProbe.register_probe("checkpoint", ModelType.ControlNet, ControlNetCheckpointProbe)
ModelProbe.register_probe("checkpoint", ModelType.IPAdapter, IPAdapterCheckpointProbe)
ModelProbe.register_probe("checkpoint", ModelType.CLIPVision, CLIPVisionCheckpointProbe)
ModelProbe.register_probe("checkpoint", ModelType.T2IAdapter, T2IAdapterCheckpointProbe)
ModelProbe.register_probe("checkpoint", ModelType.SpandrelImageToImage, SpandrelImageToImageCheckpointProbe)

ModelProbe.register_probe("onnx", ModelType.ONNX, ONNXFolderProbe)