Refactored most of controlnet code into its own method to declutter TextToLatents.invoke(), and make upcoming integration with LatentsToLatents easier.

This commit is contained in:
user1 2023-05-17 17:23:21 -07:00 committed by Kent Keirsey
parent af60304f97
commit a9007c7e0f

View File

@ -179,6 +179,7 @@ class TextToLatentsInvocation(BaseInvocation):
# seamless_axes: str = Field(default="", description="The axes to tile the image on, 'x' and/or 'y'")
progress_images: bool = Field(default=False, description="Whether or not to produce progress images during generation", )
control: list[ControlField] = Field(default=None, description="The controlnet(s) to use")
# control: ControlField | List[ControlField] = Field(default=None, description="The controlnet(s) to use")
# fmt: on
# Schema customisation
@ -246,43 +247,32 @@ class TextToLatentsInvocation(BaseInvocation):
).add_scheduler_args_if_applicable(model.scheduler, eta=0.0)#ddim_eta)
return conditioning_data
def invoke(self, context: InvocationContext) -> LatentsOutput:
noise = context.services.latents.get(self.noise.latents_name)
latents_shape = noise.shape
def prep_control_data(self,
context: InvocationContext,
model: StableDiffusionGeneratorPipeline, # really only need model for dtype and device
control_input: List[ControlField],
latents_shape: List[int],
do_classifier_free_guidance: bool = True,
) -> List[ControlNetData]:
# assuming fixed dimensional scaling of 8:1 for image:latents
control_height_resize = latents_shape[2] * 8
control_width_resize = latents_shape[3] * 8
# Get the source node id (we are invoking the prepared node)
graph_execution_state = context.services.graph_execution_manager.get(context.graph_execution_state_id)
source_node_id = graph_execution_state.prepared_source_mapping[self.id]
def step_callback(state: PipelineIntermediateState):
self.dispatch_progress(context, source_node_id, state)
model = self.get_model(context.services.model_manager)
conditioning_data = self.get_conditioning_data(context, model)
# print("type of control input: ", type(self.control))
if self.control is None:
if control_input is None:
# print("control input is None")
control_list = None
elif isinstance(self.control, list) and len(self.control) == 0:
elif isinstance(control_input, list) and len(control_input) == 0:
# print("control input is empty list")
control_list = None
elif isinstance(self.control, ControlField):
elif isinstance(control_input, ControlField):
# print("control input is ControlField")
control_list = [self.control]
elif isinstance(self.control, list) and len(self.control) > 0 and isinstance(self.control[0], ControlField):
control_list = [control_input]
elif isinstance(control_input, list) and len(control_input) > 0 and isinstance(control_input[0], ControlField):
# print("control input is list[ControlField]")
control_list = self.control
control_list = control_input
else:
#print("input control is unrecognized:", type(self.control))
# print("input control is unrecognized:", type(self.control))
control_list = None
#if (self.control is None or (isinstance(self.control, list) and len(self.control)==0)):
if (control_list is None):
control_models = None
control_data = None
# from above handling, any control that is not None should now be of type list[ControlField]
else:
@ -307,8 +297,8 @@ class TextToLatentsInvocation(BaseInvocation):
# prepare_control_image should return torch.Tensor of shape(batch_size, 3, height, width)
control_image = model.prepare_control_image(
image=input_image,
# do_classifier_free_guidance=do_classifier_free_guidance,
do_classifier_free_guidance=True,
do_classifier_free_guidance=do_classifier_free_guidance,
# do_classifier_free_guidance=True,
width=control_width_resize,
height=control_height_resize,
# batch_size=batch_size * num_images_per_prompt,
@ -322,10 +312,33 @@ class TextToLatentsInvocation(BaseInvocation):
begin_step_percent=control_info.begin_step_percent,
end_step_percent=control_info.end_step_percent)
control_data.append(control_item)
# multi_control = MultiControlNetModel(control_models)
# multi_control = MultiControlNetModel(control_models) # no longer need MultiControlNetModel
# model.control_model = multi_control
# model.control_model = control_models
return control_data
def invoke(self, context: InvocationContext) -> LatentsOutput:
noise = context.services.latents.get(self.noise.latents_name)
# latents_shape = noise.shape
# assuming fixed dimensional scaling of 8:1 for image:latents
# control_height_resize = latents_shape[2] * 8
# control_width_resize = latents_shape[3] * 8
# Get the source node id (we are invoking the prepared node)
graph_execution_state = context.services.graph_execution_manager.get(context.graph_execution_state_id)
source_node_id = graph_execution_state.prepared_source_mapping[self.id]
def step_callback(state: PipelineIntermediateState):
self.dispatch_progress(context, source_node_id, state)
model = self.get_model(context.services.model_manager)
conditioning_data = self.get_conditioning_data(context, model)
print("type of control input: ", type(self.control))
control_data = self.prep_control_data(model=model, context=context, control_input=self.control,
latents_shape=noise.shape,
do_classifier_free_guidance=(self.cfg_scale >= 1.0),
)
# TODO: Verify the noise is the right size
result_latents, result_attention_map_saver = model.latents_from_embeddings(