'''
Migrate the models directory and models.yaml file from an existing
InvokeAI 2.3 installation to 3.0.0.
'''

import os
import argparse
import shutil
import yaml

import transformers
import diffusers
import warnings

from dataclasses import dataclass
from pathlib import Path
from omegaconf import OmegaConf, DictConfig
from typing import Union

from diffusers import StableDiffusionPipeline, AutoencoderKL
from diffusers.pipelines.stable_diffusion.safety_checker import StableDiffusionSafetyChecker
from transformers import (
    CLIPTextModel,
    CLIPTokenizer,
    AutoFeatureExtractor,
    BertTokenizerFast,
)

import invokeai.backend.util.logging as logger
from invokeai.app.services.config import InvokeAIAppConfig
from invokeai.backend.model_management import ModelManager
from invokeai.backend.model_management.model_probe import (
    ModelProbe, ModelType, BaseModelType, ModelProbeInfo
    )

warnings.filterwarnings("ignore")
transformers.logging.set_verbosity_error()
diffusers.logging.set_verbosity_error()

# holder for paths that we will migrate
@dataclass
class ModelPaths:
    models: Path
    embeddings: Path
    loras: Path
    controlnets: Path

class MigrateTo3(object):
    def __init__(self,
                 from_root: Path,
                 to_models: Path,
                 model_manager: ModelManager,
                 src_paths: ModelPaths,
                 ):
        self.root_directory = from_root
        self.dest_models = to_models
        self.mgr = model_manager
        self.src_paths = src_paths
        
    @classmethod
    def initialize_yaml(cls, yaml_file: Path):
        with open(yaml_file, 'w') as file:
            file.write(
                yaml.dump(
                    {
                        '__metadata__': {'version':'3.0.0'}
                    }
                )
            )
    
    def create_directory_structure(self):
        '''
        Create the basic directory structure for the models folder.
        '''
        for model_base in [BaseModelType.StableDiffusion1,BaseModelType.StableDiffusion2]:
            for model_type in [ModelType.Main, ModelType.Vae, ModelType.Lora,
                               ModelType.ControlNet,ModelType.TextualInversion]:
                path = self.dest_models / model_base.value / model_type.value
                path.mkdir(parents=True, exist_ok=True)
        path = self.dest_models / 'core'
        path.mkdir(parents=True, exist_ok=True)

    @staticmethod
    def copy_file(src:Path,dest:Path):
        '''
        copy a single file with logging
        '''
        if dest.exists():
            logger.info(f'Skipping existing {str(dest)}')
            return
        logger.info(f'Copying {str(src)} to {str(dest)}')
        try:
            shutil.copy(src, dest)
        except Exception as e:
            logger.error(f'COPY FAILED: {str(e)}')

    @staticmethod
    def copy_dir(src:Path,dest:Path):
        '''
        Recursively copy a directory with logging
        '''
        if dest.exists():
            logger.info(f'Skipping existing {str(dest)}')
            return
        
        logger.info(f'Copying {str(src)} to {str(dest)}')
        try:
            shutil.copytree(src, dest)
        except Exception as e:
            logger.error(f'COPY FAILED: {str(e)}')

    def migrate_models(self, src_dir: Path):
        '''
        Recursively walk through src directory, probe anything
        that looks like a model, and copy the model into the
        appropriate location within the destination models directory.
        '''
        directories_scanned = set()
        for root, dirs, files in os.walk(src_dir):
            for d in dirs:
                try:
                    model = Path(root,d)
                    info = ModelProbe().heuristic_probe(model)
                    if not info:
                        continue
                    dest = self._model_probe_to_path(info) / model.name
                    self.copy_dir(model, dest)
                    directories_scanned.add(model)
                except Exception as e:
                    logger.error(str(e))
                except KeyboardInterrupt:
                    raise
                except Exception as e:
                    logger.error(str(e))
            for f in files:
                # don't copy raw learned_embeds.bin or pytorch_lora_weights.bin
                # let them be copied as part of a tree copy operation
                try:
                    if f in {'learned_embeds.bin','pytorch_lora_weights.bin'}:
                        continue
                    model = Path(root,f)
                    if model.parent in directories_scanned:
                        continue
                    info = ModelProbe().heuristic_probe(model)
                    if not info:
                        continue
                    dest = self._model_probe_to_path(info) / f
                    self.copy_file(model, dest)
                except Exception as e:
                    logger.error(str(e))
                except KeyboardInterrupt:
                    raise
                except Exception as e:
                    logger.error(str(e))

    def migrate_support_models(self):
        '''
        Copy the clipseg, upscaler, and restoration models to their new
        locations.
        '''
        dest_directory = self.dest_models
        if (self.root_directory / 'models/clipseg').exists():
            self.copy_dir(self.root_directory / 'models/clipseg', dest_directory / 'core/misc/clipseg')
        if (self.root_directory / 'models/realesrgan').exists():
            self.copy_dir(self.root_directory / 'models/realesrgan', dest_directory / 'core/upscaling/realesrgan')
        for d in ['codeformer','gfpgan']:
            path = self.root_directory / 'models' / d
            if path.exists():
                self.copy_dir(path,dest_directory / f'core/face_restoration/{d}')

    def migrate_tuning_models(self):
        '''
        Migrate the embeddings, loras and controlnets directories to their new homes.
        '''
        for src in [self.src_paths.embeddings, self.src_paths.loras, self.src_paths.controlnets]:
            if not src:
                continue
            if src.is_dir():
                logger.info(f'Scanning {src}')
                self.migrate_models(src)
            else:
                logger.info(f'{src} directory not found; skipping')
                continue

    def migrate_conversion_models(self):
        '''
        Migrate all the models that are needed by the ckpt_to_diffusers conversion
        script.
        '''

        dest_directory = self.dest_models
        kwargs = dict(
            cache_dir = self.root_directory / 'models/hub',
            #local_files_only = True
        )
        try:
            logger.info('Migrating core tokenizers and text encoders')
            target_dir = dest_directory / 'core' / 'convert'

            self._migrate_pretrained(BertTokenizerFast,
                                     repo_id='bert-base-uncased',
                                     dest = target_dir / 'bert-base-uncased',
                                     **kwargs)

            # sd-1
            repo_id = 'openai/clip-vit-large-patch14'
            self._migrate_pretrained(CLIPTokenizer,
                                     repo_id= repo_id,
                                     dest= target_dir / 'clip-vit-large-patch14',
                                     **kwargs)
            self._migrate_pretrained(CLIPTextModel,
                                     repo_id = repo_id,
                                     dest = target_dir / 'clip-vit-large-patch14',
                                     force = True,
                                     **kwargs)

            # sd-2
            repo_id = "stabilityai/stable-diffusion-2"
            self._migrate_pretrained(CLIPTokenizer,
                                     repo_id = repo_id,
                                     dest = target_dir / 'stable-diffusion-2-clip' / 'tokenizer',
                                     **{'subfolder':'tokenizer',**kwargs}
                                     )
            self._migrate_pretrained(CLIPTextModel,
                                     repo_id = repo_id,
                                     dest = target_dir / 'stable-diffusion-2-clip' / 'text_encoder',
                                     **{'subfolder':'text_encoder',**kwargs}
                                     )

            # VAE
            logger.info('Migrating stable diffusion VAE')
            self._migrate_pretrained(AutoencoderKL,
                                     repo_id = 'stabilityai/sd-vae-ft-mse',
                                     dest = target_dir / 'sd-vae-ft-mse',
                                     **kwargs)
            
            # safety checking
            logger.info('Migrating safety checker')
            repo_id = "CompVis/stable-diffusion-safety-checker"
            self._migrate_pretrained(AutoFeatureExtractor,
                                     repo_id = repo_id,
                                     dest = target_dir / 'stable-diffusion-safety-checker',
                                     **kwargs)
            self._migrate_pretrained(StableDiffusionSafetyChecker,
                                     repo_id = repo_id,
                                     dest = target_dir / 'stable-diffusion-safety-checker',
                                     **kwargs)
        except KeyboardInterrupt:
            raise
        except Exception as e:
            logger.error(str(e))

    def _model_probe_to_path(self, info: ModelProbeInfo)->Path:
        return Path(self.dest_models, info.base_type.value, info.model_type.value)

    def _migrate_pretrained(self, model_class, repo_id: str, dest: Path, force:bool=False, **kwargs):
        if dest.exists() and not force:
            logger.info(f'Skipping existing {dest}')
            return
        model = model_class.from_pretrained(repo_id, **kwargs)
        self._save_pretrained(model, dest, overwrite=force)

    def _save_pretrained(self, model, dest: Path, overwrite: bool=False):
        model_name = dest.name
        if overwrite:
            model.save_pretrained(dest, safe_serialization=True)
        else:
            download_path = dest.with_name(f'{model_name}.downloading')
            model.save_pretrained(download_path, safe_serialization=True)
            download_path.replace(dest)

    def _download_vae(self, repo_id: str, subfolder:str=None)->Path:
        vae = AutoencoderKL.from_pretrained(repo_id, cache_dir=self.root_directory / 'models/hub', subfolder=subfolder)
        info = ModelProbe().heuristic_probe(vae)
        _, model_name = repo_id.split('/')
        dest = self._model_probe_to_path(info) / self.unique_name(model_name, info)
        vae.save_pretrained(dest, safe_serialization=True)
        return dest

    def _vae_path(self, vae: Union[str,dict])->Path:
        '''
        Convert 2.3 VAE stanza to a straight path.
        '''
        vae_path = None
        
        # First get a path
        if isinstance(vae,str):
            vae_path = vae

        elif isinstance(vae,DictConfig):
            if p := vae.get('path'):
                vae_path = p
            elif repo_id := vae.get('repo_id'):
                if repo_id=='stabilityai/sd-vae-ft-mse':  # this guy is already downloaded
                    vae_path = 'models/core/convert/sd-vae-ft-mse'
                    return vae_path
                else:
                    vae_path = self._download_vae(repo_id, vae.get('subfolder'))

        assert vae_path is not None, "Couldn't find VAE for this model"

        # if the VAE is in the old models directory, then we must move it into the new
        # one. VAEs outside of this directory can stay where they are.
        vae_path = Path(vae_path)
        if vae_path.is_relative_to(self.src_paths.models):
            info = ModelProbe().heuristic_probe(vae_path)
            dest = self._model_probe_to_path(info) / vae_path.name
            if not dest.exists():
                if vae_path.is_dir():
                    self.copy_dir(vae_path,dest)
                else:
                    self.copy_file(vae_path,dest)
            vae_path = dest

        if vae_path.is_relative_to(self.dest_models):
            rel_path = vae_path.relative_to(self.dest_models)
            return Path('models',rel_path)
        else:
            return vae_path

    def migrate_repo_id(self, repo_id: str, model_name: str=None, **extra_config):
        '''
        Migrate a locally-cached diffusers pipeline identified with a repo_id
        '''
        dest_dir = self.dest_models
        
        cache = self.root_directory / 'models/hub'
        kwargs = dict(
            cache_dir = cache,
            safety_checker = None,
            # local_files_only = True,
        )

        owner,repo_name = repo_id.split('/')
        model_name = model_name or repo_name
        model = cache / '--'.join(['models',owner,repo_name])
        
        if len(list(model.glob('snapshots/**/model_index.json')))==0:
            return
        revisions = [x.name for x in model.glob('refs/*')]

        # if an fp16 is available we use that
        revision = 'fp16' if len(revisions) > 1 and 'fp16' in revisions else revisions[0]
        pipeline = StableDiffusionPipeline.from_pretrained(
            repo_id,
            revision=revision,
            **kwargs)

        info = ModelProbe().heuristic_probe(pipeline)
        if not info:
            return

        if self.mgr.model_exists(model_name, info.base_type, info.model_type):
            logger.warning(f'A model named {model_name} already exists at the destination. Skipping migration.')
            return

        dest = self._model_probe_to_path(info) / model_name
        self._save_pretrained(pipeline, dest)
            
        rel_path = Path('models',dest.relative_to(dest_dir))
        self._add_model(model_name, info, rel_path, **extra_config)

    def migrate_path(self, location: Path, model_name: str=None, **extra_config):
        '''
        Migrate a model referred to using 'weights' or 'path'
        '''

        # handle relative paths
        dest_dir = self.dest_models
        location = self.root_directory / location
        model_name = model_name or location.stem
        
        info = ModelProbe().heuristic_probe(location)
        if not info:
            return
        
        if self.mgr.model_exists(model_name, info.base_type, info.model_type):
            logger.warning(f'A model named {model_name} already exists at the destination. Skipping migration.')
            return

        # uh oh, weights is in the old models directory - move it into the new one
        if Path(location).is_relative_to(self.src_paths.models):
            dest = Path(dest_dir, info.base_type.value, info.model_type.value, location.name)
            if location.is_dir():
                self.copy_dir(location,dest)
            else:
                self.copy_file(location,dest)
            location = Path('models', info.base_type.value, info.model_type.value, location.name)

        self._add_model(model_name, info, location, **extra_config)

    def _add_model(self,
                   model_name: str,
                   info: ModelProbeInfo,
                   location: Path,
                   **extra_config):
        if info.model_type != ModelType.Main:
            return
        
        self.mgr.add_model(
            model_name = model_name,
            base_model = info.base_type,
            model_type = info.model_type,
            clobber = True,
            model_attributes = {
                'path': str(location),
                'description': f'A {info.base_type.value} {info.model_type.value} model',
                'model_format': info.format,
                'variant': info.variant_type.value,
                **extra_config,
            }
        )
        
    def migrate_defined_models(self):
        '''
        Migrate models defined in models.yaml
        '''
        # find any models referred to in old models.yaml
        conf = OmegaConf.load(self.root_directory / 'configs/models.yaml')
        
        for model_name, stanza in conf.items():

            try:
                passthru_args = {}
                
                if vae := stanza.get('vae'):
                    try:
                        passthru_args['vae'] = str(self._vae_path(vae))
                    except Exception as e:
                        logger.warning(f'Could not find a VAE matching "{vae}" for model "{model_name}"')
                        logger.warning(str(e))

                if config := stanza.get('config'):
                    passthru_args['config'] = config

                if description:= stanza.get('description'):
                    passthru_args['description'] = description
                
                if repo_id := stanza.get('repo_id'):
                    logger.info(f'Migrating diffusers model {model_name}')
                    self.migrate_repo_id(repo_id, model_name, **passthru_args)

                elif location := stanza.get('weights'):
                    logger.info(f'Migrating checkpoint model {model_name}')
                    self.migrate_path(Path(location), model_name, **passthru_args)
                    
                elif location := stanza.get('path'):
                    logger.info(f'Migrating diffusers model {model_name}')
                    self.migrate_path(Path(location), model_name, **passthru_args)
                    
            except KeyboardInterrupt:
                raise
            except Exception as e:
                logger.error(str(e))
                    
    def migrate(self):
        self.create_directory_structure()
        # the configure script is doing this
        self.migrate_support_models()
        self.migrate_conversion_models()
        self.migrate_tuning_models()
        self.migrate_defined_models()

def _parse_legacy_initfile(root: Path, initfile: Path)->ModelPaths:
    '''
    Returns tuple of (embedding_path, lora_path, controlnet_path)
    '''
    parser = argparse.ArgumentParser(fromfile_prefix_chars='@')
    parser.add_argument(
        '--embedding_directory',
        '--embedding_path',
        type=Path,
        dest='embedding_path',
        default=Path('embeddings'),
    )
    parser.add_argument(
        '--lora_directory',
        dest='lora_path',
        type=Path,
        default=Path('loras'),
    )
    opt,_ = parser.parse_known_args([f'@{str(initfile)}'])
    return ModelPaths(
        models = root / 'models',
        embeddings = root / str(opt.embedding_path).strip('"'),
        loras = root / str(opt.lora_path).strip('"'),
        controlnets = root / 'controlnets',
    )

def _parse_legacy_yamlfile(root: Path, initfile: Path)->ModelPaths:
    '''
    Returns tuple of (embedding_path, lora_path, controlnet_path)
    '''
    # Don't use the config object because it is unforgiving of version updates
    # Just use omegaconf directly
    opt = OmegaConf.load(initfile)
    paths = opt.InvokeAI.Paths
    models = paths.get('models_dir','models')
    embeddings = paths.get('embedding_dir','embeddings')
    loras = paths.get('lora_dir','loras')
    controlnets = paths.get('controlnet_dir','controlnets')
    return ModelPaths(
        models = root / models,
        embeddings = root / embeddings,
        loras = root /loras,
        controlnets = root / controlnets,
    )
    
def get_legacy_embeddings(root: Path) -> ModelPaths:
    path = root / 'invokeai.init'
    if path.exists():
        return _parse_legacy_initfile(root, path)
    path = root / 'invokeai.yaml'
    if path.exists():
        return _parse_legacy_yamlfile(root, path)

def do_migrate(src_directory: Path, dest_directory: Path):
    """
    Migrate models from src to dest InvokeAI root directories
    """
    config_file = dest_directory / 'configs' / 'models.yaml.3'
    dest_models = dest_directory / 'models.3'
    
    version_3 = (dest_directory / 'models' / 'core').exists()

    # Here we create the destination models.yaml file.
    # If we are writing into a version 3 directory and the
    # file already exists, then we write into a copy of it to
    # avoid deleting its previous customizations. Otherwise we
    # create a new empty one.
    if version_3:  # write into the dest directory
        try:
            shutil.copy(dest_directory / 'configs' / 'models.yaml', config_file)
        except:
            MigrateTo3.initialize_yaml(config_file)
        mgr = ModelManager(config_file) # important to initialize BEFORE moving the models directory
        (dest_directory / 'models').replace(dest_models)
    else:
        MigrateTo3.initialize_yaml(config_file)
        mgr = ModelManager(config_file)
    
    paths = get_legacy_embeddings(src_directory)
    migrator = MigrateTo3(
        from_root = src_directory,
        to_models = dest_models,
        model_manager = mgr,
        src_paths = paths
    )
    migrator.migrate()
    print("Migration successful.")

    if not version_3:
        (dest_directory / 'models').replace(src_directory / 'models.orig')
        print(f'Original models directory moved to {dest_directory}/models.orig')
        
    (dest_directory / 'configs' / 'models.yaml').replace(src_directory / 'configs' / 'models.yaml.orig')
    print(f'Original models.yaml file moved to {dest_directory}/configs/models.yaml.orig')
    
    config_file.replace(config_file.with_suffix(''))
    dest_models.replace(dest_models.with_suffix(''))
    
def main():
    parser = argparse.ArgumentParser(prog="invokeai-migrate3",
                                     description="""
This will copy and convert the models directory and the configs/models.yaml from the InvokeAI 2.3 format 
'--from-directory' root to the InvokeAI 3.0 '--to-directory' root. These may be abbreviated '--from' and '--to'.a

The old models directory and config file will be renamed 'models.orig' and 'models.yaml.orig' respectively.
It is safe to provide the same directory for both arguments, but it is better to use the invokeai_configure
script, which will perform a full upgrade in place."""
                                     )
    parser.add_argument('--from-directory',
                        dest='src_root',
                        type=Path,
                        required=True,
                        help='Source InvokeAI 2.3 root directory (containing "invokeai.init" or "invokeai.yaml")'
                        )
    parser.add_argument('--to-directory',
                        dest='dest_root',
                        type=Path,
                        required=True,
                        help='Destination InvokeAI 3.0 directory (containing "invokeai.yaml")'
                        )
    args = parser.parse_args()
    src_root = args.src_root
    assert src_root.is_dir(), f"{src_root} is not a valid directory"
    assert (src_root / 'models').is_dir(), f"{src_root} does not contain a 'models' subdirectory"
    assert (src_root / 'models' / 'hub').exists(), f"{src_root} does not contain a version 2.3 models directory"
    assert (src_root / 'invokeai.init').exists() or (src_root / 'invokeai.yaml').exists(), f"{src_root} does not contain an InvokeAI init file."

    dest_root = args.dest_root
    assert dest_root.is_dir(), f"{dest_root} is not a valid directory"
    config = InvokeAIAppConfig.get_config()
    config.parse_args(['--root',str(dest_root)])

    # TODO: revisit - don't rely on invokeai.yaml to exist yet!
    dest_is_setup = (dest_root / 'models/core').exists() and (dest_root / 'databases').exists()
    if not dest_is_setup:
        import invokeai.frontend.install.invokeai_configure
        from invokeai.backend.install.invokeai_configure import initialize_rootdir
        initialize_rootdir(dest_root, True)

    do_migrate(src_root,dest_root)

if __name__ == '__main__':
    main()