InvokeAI/ldm/models/diffusion/ksampler.py

301 lines
10 KiB
Python
Raw Normal View History

"""wrapper around part of Katherine Crowson's k-diffusion library, making it call compatible with other Samplers"""
2022-10-12 21:29:48 +00:00
from enum import Enum
import k_diffusion as K
2022-08-21 23:57:48 +00:00
import torch
import torch.nn as nn
from ldm.invoke.devices import choose_torch_device
from ldm.models.diffusion.sampler import Sampler
from ldm.util import rand_perlin_2d
from ldm.modules.diffusionmodules.util import (
make_ddim_sampling_parameters,
make_ddim_timesteps,
noise_like,
extract_into_tensor,
)
from ldm.models.diffusion.cross_attention import CrossAttentionControl, CrossAttentionControllableDiffusionMixin
def cfg_apply_threshold(result, threshold = 0.0, scale = 0.7):
if threshold <= 0.0:
return result
maxval = 0.0 + torch.max(result).cpu().numpy()
minval = 0.0 + torch.min(result).cpu().numpy()
if maxval < threshold and minval > -threshold:
return result
if maxval > threshold:
maxval = min(max(1, scale*maxval), threshold)
if minval < -threshold:
minval = max(min(-1, scale*minval), -threshold)
return torch.clamp(result, min=minval, max=maxval)
class CFGDenoiser(nn.Module, CrossAttentionControllableDiffusionMixin):
def __init__(self, model, threshold = 0, warmup = 0):
super().__init__()
self.inner_model = model
self.threshold = threshold
self.warmup_max = warmup
self.warmup = max(warmup / 10, 1)
def prepare_to_sample(self, t_enc, **kwargs):
edited_conditioning = kwargs.get('edited_conditioning', None)
conditioning_edit_opcodes = kwargs.get('conditioning_edit_opcodes', None)
2022-10-12 21:29:48 +00:00
2022-10-18 20:22:47 +00:00
self.setup_cross_attention_control_if_appropriate(self.inner_model, edited_conditioning, conditioning_edit_opcodes)
2022-10-12 21:29:48 +00:00
2022-10-18 09:48:33 +00:00
def forward(self, x, sigma, uncond, cond, cond_scale):
2022-10-18 17:49:25 +00:00
unconditioned_next_x, conditioned_next_x = self.do_cross_attention_controllable_diffusion_step(x, sigma, uncond, cond, self.inner_model,
model_forward_callback=lambda x, sigma, cond: self.inner_model(x, sigma, cond=cond))
2022-10-12 21:29:48 +00:00
if self.warmup < self.warmup_max:
thresh = max(1, 1 + (self.threshold - 1) * (self.warmup / self.warmup_max))
self.warmup += 1
else:
thresh = self.threshold
if thresh > self.threshold:
thresh = self.threshold
2022-10-18 17:49:25 +00:00
# to scale how much effect conditioning has, calculate the changes it does and then scale that
scaled_delta = (conditioned_next_x - unconditioned_next_x) * cond_scale
return cfg_apply_threshold(unconditioned_next_x + scaled_delta, thresh)
class KSampler(Sampler):
def __init__(self, model, schedule='lms', device=None, **kwargs):
denoiser = K.external.CompVisDenoiser(model)
super().__init__(
denoiser,
schedule,
steps=model.num_timesteps,
)
self.sigmas = None
self.ds = None
self.s_in = None
2022-08-21 23:57:48 +00:00
def forward(self, x, sigma, uncond, cond, cond_scale):
x_in = torch.cat([x] * 2)
sigma_in = torch.cat([sigma] * 2)
cond_in = torch.cat([uncond, cond])
uncond, cond = self.inner_model(
x_in, sigma_in, cond=cond_in
).chunk(2)
2022-08-21 23:57:48 +00:00
return uncond + (cond - uncond) * cond_scale
def make_schedule(
self,
ddim_num_steps,
ddim_discretize='uniform',
ddim_eta=0.0,
verbose=False,
):
outer_model = self.model
self.model = outer_model.inner_model
super().make_schedule(
ddim_num_steps,
ddim_discretize='uniform',
ddim_eta=0.0,
verbose=False,
)
2022-10-04 21:48:16 +00:00
self.model = outer_model
self.ddim_num_steps = ddim_num_steps
# we don't need both of these sigmas, but storing them here to make
# comparison easier later on
self.model_sigmas = self.model.get_sigmas(ddim_num_steps)
self.karras_sigmas = K.sampling.get_sigmas_karras(
n=ddim_num_steps,
sigma_min=self.model.sigmas[0].item(),
sigma_max=self.model.sigmas[-1].item(),
rho=7.,
device=self.device,
)
self.sigmas = self.model_sigmas
#self.sigmas = self.karras_sigmas
# ALERT: We are completely overriding the sample() method in the base class, which
# means that inpainting will not work. To get this to work we need to be able to
# modify the inner loop of k_heun, k_lms, etc, as is done in an ugly way
# in the lstein/k-diffusion branch.
@torch.no_grad()
def decode(
self,
z_enc,
cond,
t_enc,
img_callback=None,
unconditional_guidance_scale=1.0,
unconditional_conditioning=None,
use_original_steps=False,
init_latent = None,
mask = None,
):
samples,_ = self.sample(
batch_size = 1,
S = t_enc,
x_T = z_enc,
shape = z_enc.shape[1:],
conditioning = cond,
unconditional_guidance_scale=unconditional_guidance_scale,
unconditional_conditioning = unconditional_conditioning,
img_callback = img_callback,
x0 = init_latent,
mask = mask
)
return samples
# this is a no-op, provided here for compatibility with ddim and plms samplers
@torch.no_grad()
def stochastic_encode(self, x0, t, use_original_steps=False, noise=None):
return x0
# Most of these arguments are ignored and are only present for compatibility with
# other samples
@torch.no_grad()
def sample(
self,
S,
batch_size,
shape,
conditioning=None,
callback=None,
normals_sequence=None,
img_callback=None,
quantize_x0=False,
eta=0.0,
mask=None,
x0=None,
temperature=1.0,
noise_dropout=0.0,
score_corrector=None,
corrector_kwargs=None,
verbose=True,
x_T=None,
log_every_t=100,
unconditional_guidance_scale=1.0,
unconditional_conditioning=None,
edited_conditioning=None,
conditioning_edit_opcodes=None,
threshold = 0,
perlin = 0,
# this has to come in the same format as the conditioning, # e.g. as encoded tokens, ...
**kwargs,
):
2022-08-27 01:10:13 +00:00
def route_callback(k_callback_values):
if img_callback is not None:
2022-10-03 09:38:43 +00:00
img_callback(k_callback_values['x'],k_callback_values['i'])
# if make_schedule() hasn't been called, we do it now
if self.sigmas is None:
self.make_schedule(
ddim_num_steps=S,
ddim_eta = eta,
verbose = False,
)
2022-10-06 18:57:06 +00:00
# sigmas are set up in make_schedule - we take the last steps items
total_steps = len(self.sigmas)
sigmas = self.sigmas[-S-1:]
# x_T is variation noise. When an init image is provided (in x0) we need to add
# more randomness to the starting image.
if x_T is not None:
if x0 is not None:
x = x_T + torch.randn_like(x0, device=self.device) * sigmas[0]
else:
x = x_T * sigmas[0]
2022-10-06 18:57:06 +00:00
else:
x = torch.randn([batch_size, *shape], device=self.device) * sigmas[0]
model_wrap_cfg = CFGDenoiser(self.model, threshold=threshold, warmup=max(0.8*S,S-10))
model_wrap_cfg.prepare_to_sample(S, edited_conditioning=edited_conditioning, conditioning_edit_opcodes=conditioning_edit_opcodes)
extra_args = {
'cond': conditioning,
'uncond': unconditional_conditioning,
'cond_scale': unconditional_guidance_scale,
}
print(f'>> Sampling with k_{self.schedule} starting at step {len(self.sigmas)-S-1} of {len(self.sigmas)-1} ({S} new sampling steps)')
sampling_result = (
K.sampling.__dict__[f'sample_{self.schedule}'](
2022-08-27 01:10:13 +00:00
model_wrap_cfg, x, sigmas, extra_args=extra_args,
callback=route_callback
),
None,
)
return sampling_result
# this code will support inpainting if and when ksampler API modified or
# a workaround is found.
@torch.no_grad()
def p_sample(
self,
img,
cond,
ts,
index,
unconditional_guidance_scale=1.0,
unconditional_conditioning=None,
**kwargs,
):
if self.model_wrap is None:
self.model_wrap = CFGDenoiser(self.model)
extra_args = {
'cond': cond,
'uncond': unconditional_conditioning,
'cond_scale': unconditional_guidance_scale,
}
if self.s_in is None:
self.s_in = img.new_ones([img.shape[0]])
if self.ds is None:
self.ds = []
# terrible, confusing names here
steps = self.ddim_num_steps
t_enc = self.t_enc
# sigmas is a full steps in length, but t_enc might
# be less. We start in the middle of the sigma array
# and work our way to the end after t_enc steps.
# index starts at t_enc and works its way to zero,
# so the actual formula for indexing into sigmas:
# sigma_index = (steps-index)
s_index = t_enc - index - 1
img = K.sampling.__dict__[f'_{self.schedule}'](
self.model_wrap,
img,
self.sigmas,
s_index,
s_in = self.s_in,
ds = self.ds,
extra_args=extra_args,
)
return img, None, None
2022-10-06 18:57:06 +00:00
# REVIEW THIS METHOD: it has never been tested. In particular,
# we should not be multiplying by self.sigmas[0] if we
# are at an intermediate step in img2img. See similar in
# sample() which does work.
def get_initial_image(self,x_T,shape,steps):
print(f'WARNING: ksampler.get_initial_image(): get_initial_image needs testing')
2022-10-06 18:57:06 +00:00
x = (torch.randn(shape, device=self.device) * self.sigmas[0])
if x_T is not None:
2022-10-06 18:57:06 +00:00
return x_T + x
else:
2022-10-06 18:57:06 +00:00
return x
def prepare_to_sample(self,t_enc,**kwargs):
self.t_enc = t_enc
self.model_wrap = None
self.ds = None
self.s_in = None
def q_sample(self,x0,ts):
'''
Overrides parent method to return the q_sample of the inner model.
'''
return self.model.inner_model.q_sample(x0,ts)