Compare commits

1 Commit

Author SHA1 Message Date
a6efcca78c illustration of two generate alternatives 2023-02-26 12:22:32 -05:00
1261 changed files with 87923 additions and 93409 deletions

6
.coveragerc Normal file
View File

@ -0,0 +1,6 @@
[run]
omit='.env/*'
source='.'
[report]
show_missing = true

View File

@ -4,22 +4,22 @@
!ldm
!pyproject.toml
# ignore frontend/web but whitelist dist
invokeai/frontend/web/
!invokeai/frontend/web/dist/
# Guard against pulling in any models that might exist in the directory tree
**/*.pt*
**/*.ckpt
# ignore frontend but whitelist dist
invokeai/frontend/
!invokeai/frontend/dist/
# ignore invokeai/assets but whitelist invokeai/assets/web
invokeai/assets/
!invokeai/assets/web/
# Guard against pulling in any models that might exist in the directory tree
**/*.pt*
**/*.ckpt
# Byte-compiled / optimized / DLL files
**/__pycache__/
**/*.py[cod]
# Distribution / packaging
**/*.egg-info/
**/*.egg
*.egg-info/
*.egg

View File

@ -1 +0,0 @@
b3dccfaeb636599c02effc377cdd8a87d658256c

53
.github/CODEOWNERS vendored
View File

@ -3,32 +3,49 @@
# documentation
/docs/ @lstein @mauwii @tildebyte @blessedcoolant
/mkdocs.yml @lstein @mauwii @blessedcoolant
# nodes
/invokeai/app/ @Kyle0654 @blessedcoolant
mkdocs.yml @lstein @mauwii @blessedcoolant
# installation and configuration
/pyproject.toml @mauwii @lstein @blessedcoolant
/pyproject.toml @mauwii @lstein @ebr @blessedcoolant
/docker/ @mauwii @lstein @blessedcoolant
/scripts/ @ebr @lstein
/installer/ @lstein @ebr
/invokeai/assets @lstein @ebr
/invokeai/configs @lstein
/invokeai/version @lstein @blessedcoolant
/scripts/ @ebr @lstein @blessedcoolant
/installer/ @ebr @lstein @tildebyte @blessedcoolant
ldm/invoke/config @lstein @ebr @blessedcoolant
invokeai/assets @lstein @ebr @blessedcoolant
invokeai/configs @lstein @ebr @blessedcoolant
/ldm/invoke/_version.py @lstein @blessedcoolant
# web ui
/invokeai/frontend @blessedcoolant @psychedelicious @lstein
/invokeai/backend @blessedcoolant @psychedelicious @lstein
# generation, model management, postprocessing
/invokeai/backend @keturn @damian0815 @lstein @blessedcoolant @jpphoto
# generation and model management
/ldm/*.py @lstein @blessedcoolant
/ldm/generate.py @lstein @keturn @blessedcoolant
/ldm/invoke/args.py @lstein @blessedcoolant
/ldm/invoke/ckpt* @lstein @blessedcoolant
/ldm/invoke/ckpt_generator @lstein @blessedcoolant
/ldm/invoke/CLI.py @lstein @blessedcoolant
/ldm/invoke/config @lstein @ebr @mauwii @blessedcoolant
/ldm/invoke/generator @keturn @damian0815 @blessedcoolant
/ldm/invoke/globals.py @lstein @blessedcoolant
/ldm/invoke/merge_diffusers.py @lstein @blessedcoolant
/ldm/invoke/model_manager.py @lstein @blessedcoolant
/ldm/invoke/txt2mask.py @lstein @blessedcoolant
/ldm/invoke/patchmatch.py @Kyle0654 @blessedcoolant @lstein
/ldm/invoke/restoration @lstein @blessedcoolant
# front ends
/invokeai/frontend/CLI @lstein
/invokeai/frontend/install @lstein @ebr @mauwii
/invokeai/frontend/merge @lstein @blessedcoolant @hipsterusername
/invokeai/frontend/training @lstein @blessedcoolant @hipsterusername
/invokeai/frontend/web @psychedelicious @blessedcoolant
# attention, textual inversion, model configuration
/ldm/models @damian0815 @keturn @lstein @blessedcoolant
/ldm/modules @damian0815 @keturn @lstein @blessedcoolant
# Nodes
apps/ @Kyle0654 @lstein @blessedcoolant
# legacy REST API
# is CapableWeb still engaged?
/ldm/invoke/pngwriter.py @CapableWeb @lstein @blessedcoolant
/ldm/invoke/server_legacy.py @CapableWeb @lstein @blessedcoolant
/scripts/legacy_api.py @CapableWeb @lstein @blessedcoolant
/tests/legacy_tests.sh @CapableWeb @lstein @blessedcoolant

View File

@ -65,16 +65,6 @@ body:
placeholder: 8GB
validations:
required: false
- type: input
id: version-number
attributes:
label: What version did you experience this issue on?
description: |
Please share the version of Invoke AI that you experienced the issue on. If this is not the latest version, please update first to confirm the issue still exists. If you are testing main, please include the commit hash instead.
placeholder: X.X.X
validations:
required: true
- type: textarea
id: what-happened

19
.github/stale.yaml vendored
View File

@ -1,19 +0,0 @@
# Number of days of inactivity before an issue becomes stale
daysUntilStale: 28
# Number of days of inactivity before a stale issue is closed
daysUntilClose: 14
# Issues with these labels will never be considered stale
exemptLabels:
- pinned
- security
# Label to use when marking an issue as stale
staleLabel: stale
# Comment to post when marking an issue as stale. Set to `false` to disable
markComment: >
This issue has been automatically marked as stale because it has not had
recent activity. It will be closed if no further activity occurs. Please
update the ticket if this is still a problem on the latest release.
# Comment to post when closing a stale issue. Set to `false` to disable
closeComment: >
Due to inactivity, this issue has been automatically closed. If this is
still a problem on the latest release, please recreate the issue.

View File

@ -5,20 +5,17 @@ on:
- 'main'
- 'update/ci/docker/*'
- 'update/docker/*'
- 'dev/ci/docker/*'
- 'dev/docker/*'
paths:
- 'pyproject.toml'
- '.dockerignore'
- 'invokeai/**'
- 'ldm/**'
- 'invokeai/backend/**'
- 'invokeai/configs/**'
- 'invokeai/frontend/dist/**'
- 'docker/Dockerfile'
tags:
- 'v*.*.*'
workflow_dispatch:
permissions:
contents: write
packages: write
jobs:
docker:
@ -27,11 +24,11 @@ jobs:
fail-fast: false
matrix:
flavor:
- rocm
- amd
- cuda
- cpu
include:
- flavor: rocm
- flavor: amd
pip-extra-index-url: 'https://download.pytorch.org/whl/rocm5.2'
- flavor: cuda
pip-extra-index-url: ''
@ -57,9 +54,9 @@ jobs:
tags: |
type=ref,event=branch
type=ref,event=tag
type=pep440,pattern={{version}}
type=pep440,pattern={{major}}.{{minor}}
type=pep440,pattern={{major}}
type=semver,pattern={{version}}
type=semver,pattern={{major}}.{{minor}}
type=semver,pattern={{major}}
type=sha,enable=true,prefix=sha-,format=short
flavor: |
latest=${{ matrix.flavor == 'cuda' && github.ref == 'refs/heads/main' }}
@ -95,7 +92,7 @@ jobs:
context: .
file: ${{ env.DOCKERFILE }}
platforms: ${{ env.PLATFORMS }}
push: ${{ github.ref == 'refs/heads/main' || github.ref_type == 'tag' }}
push: ${{ github.ref == 'refs/heads/main' || github.ref == 'refs/tags/*' }}
tags: ${{ steps.meta.outputs.tags }}
labels: ${{ steps.meta.outputs.labels }}
build-args: PIP_EXTRA_INDEX_URL=${{ matrix.pip-extra-index-url }}

View File

@ -1,27 +0,0 @@
name: Close inactive issues
on:
schedule:
- cron: "00 6 * * *"
env:
DAYS_BEFORE_ISSUE_STALE: 14
DAYS_BEFORE_ISSUE_CLOSE: 28
jobs:
close-issues:
runs-on: ubuntu-latest
permissions:
issues: write
pull-requests: write
steps:
- uses: actions/stale@v5
with:
days-before-issue-stale: ${{ env.DAYS_BEFORE_ISSUE_STALE }}
days-before-issue-close: ${{ env.DAYS_BEFORE_ISSUE_CLOSE }}
stale-issue-label: "Inactive Issue"
stale-issue-message: "There has been no activity in this issue for ${{ env.DAYS_BEFORE_ISSUE_STALE }} days. If this issue is still being experienced, please reply with an updated confirmation that the issue is still being experienced with the latest release."
close-issue-message: "Due to inactivity, this issue was automatically closed. If you are still experiencing the issue, please recreate the issue."
days-before-pr-stale: -1
days-before-pr-close: -1
repo-token: ${{ secrets.GITHUB_TOKEN }}
operations-per-run: 500

View File

@ -3,22 +3,14 @@ name: Lint frontend
on:
pull_request:
paths:
- 'invokeai/frontend/web/**'
types:
- 'ready_for_review'
- 'opened'
- 'synchronize'
- 'invokeai/frontend/**'
push:
branches:
- 'main'
paths:
- 'invokeai/frontend/web/**'
merge_group:
workflow_dispatch:
- 'invokeai/frontend/**'
defaults:
run:
working-directory: invokeai/frontend/web
working-directory: invokeai/frontend
jobs:
lint-frontend:
@ -31,7 +23,7 @@ jobs:
node-version: '18'
- uses: actions/checkout@v3
- run: 'yarn install --frozen-lockfile'
- run: 'yarn run lint:tsc'
- run: 'yarn run lint:madge'
- run: 'yarn run lint:eslint'
- run: 'yarn run lint:prettier'
- run: 'yarn tsc'
- run: 'yarn run madge'
- run: 'yarn run lint --max-warnings=0'
- run: 'yarn run prettier --check'

View File

@ -5,9 +5,6 @@ on:
- 'main'
- 'development'
permissions:
contents: write
jobs:
mkdocs-material:
if: github.event.pull_request.draft == false

View File

@ -3,7 +3,7 @@ name: PyPI Release
on:
push:
paths:
- 'invokeai/version/invokeai_version.py'
- 'ldm/invoke/_version.py'
workflow_dispatch:
jobs:

View File

@ -1,11 +1,12 @@
name: Test invoke.py pip
on:
pull_request:
paths:
- '**'
- '!pyproject.toml'
- '!invokeai/**'
- 'invokeai/frontend/web/**'
paths-ignore:
- 'pyproject.toml'
- 'ldm/**'
- 'invokeai/backend/**'
- 'invokeai/configs/**'
- 'invokeai/frontend/dist/**'
merge_group:
workflow_dispatch:

View File

@ -5,13 +5,17 @@ on:
- 'main'
paths:
- 'pyproject.toml'
- 'invokeai/**'
- '!invokeai/frontend/web/**'
- 'ldm/**'
- 'invokeai/backend/**'
- 'invokeai/configs/**'
- 'invokeai/frontend/dist/**'
pull_request:
paths:
- 'pyproject.toml'
- 'invokeai/**'
- '!invokeai/frontend/web/**'
- 'ldm/**'
- 'invokeai/backend/**'
- 'invokeai/configs/**'
- 'invokeai/frontend/dist/**'
types:
- 'ready_for_review'
- 'opened'
@ -108,7 +112,7 @@ jobs:
- name: set INVOKEAI_OUTDIR
run: >
python -c
"import os;from invokeai.backend.globals import Globals;OUTDIR=os.path.join(Globals.root,str('outputs'));print(f'INVOKEAI_OUTDIR={OUTDIR}')"
"import os;from ldm.invoke.globals import Globals;OUTDIR=os.path.join(Globals.root,str('outputs'));print(f'INVOKEAI_OUTDIR={OUTDIR}')"
>> ${{ matrix.github-env }}
- name: run invokeai-configure

12
.gitignore vendored
View File

@ -63,7 +63,6 @@ pip-delete-this-directory.txt
htmlcov/
.tox/
.nox/
.coveragerc
.coverage
.coverage.*
.cache
@ -74,7 +73,6 @@ cov.xml
*.py,cover
.hypothesis/
.pytest_cache/
.pytest.ini
cover/
junit/
@ -200,7 +198,7 @@ checkpoints
.DS_Store
# Let the frontend manage its own gitignore
!invokeai/frontend/web/*
!invokeai/frontend/*
# Scratch folder
.scratch/
@ -215,6 +213,11 @@ gfpgan/
# config file (will be created by installer)
configs/models.yaml
# weights (will be created by installer)
models/ldm/stable-diffusion-v1/*.ckpt
models/clipseg
models/gfpgan
# ignore initfile
.invokeai
@ -229,3 +232,6 @@ installer/install.bat
installer/install.sh
installer/update.bat
installer/update.sh
# no longer stored in source directory
models

5
.pytest.ini Normal file
View File

@ -0,0 +1,5 @@
[pytest]
DJANGO_SETTINGS_MODULE = webtas.settings
; python_files = tests.py test_*.py *_tests.py
addopts = --cov=. --cov-config=.coveragerc --cov-report xml:cov.xml
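For orientation, a minimal sketch (assuming `pytest` and `pytest-cov` are installed) of driving the same options as the `addopts` line above from Python instead of the command line:

```python
# Run the test suite with coverage, mirroring the addopts line in .pytest.ini.
# The flags come from pytest-cov; .coveragerc is the coverage config added above.
import pytest

exit_code = pytest.main(
    ["--cov=.", "--cov-config=.coveragerc", "--cov-report", "xml:cov.xml"]
)
raise SystemExit(exit_code)
```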

View File

@ -84,7 +84,7 @@ installing lots of models.
6. Wait while the installer does its thing. After installing the software,
the installer will launch a script that lets you configure InvokeAI and
select a set of starting image generation models.
select a set of starting image generaiton models.
7. Find the folder that InvokeAI was installed into (it is not the
same as the unpacked zip file directory!) The default location of this
@ -139,13 +139,13 @@ not supported.
_For Windows/Linux with an NVIDIA GPU:_
```terminal
pip install "InvokeAI[xformers]" --use-pep517 --extra-index-url https://download.pytorch.org/whl/cu117
pip install InvokeAI[xformers] --use-pep517 --extra-index-url https://download.pytorch.org/whl/cu117
```
_For Linux with an AMD GPU:_
```sh
pip install InvokeAI --use-pep517 --extra-index-url https://download.pytorch.org/whl/rocm5.4.2
pip install InvokeAI --use-pep517 --extra-index-url https://download.pytorch.org/whl/rocm5.2
```
_For Macintoshes, either Intel or M1/M2:_

4
coverage/.gitignore vendored
View File

@ -1,4 +0,0 @@
# Ignore everything in this directory
*
# Except this file
!.gitignore

View File

@ -4,15 +4,15 @@ ARG PYTHON_VERSION=3.9
##################
## base image ##
##################
FROM --platform=${TARGETPLATFORM} python:${PYTHON_VERSION}-slim AS python-base
FROM python:${PYTHON_VERSION}-slim AS python-base
LABEL org.opencontainers.image.authors="mauwii@outlook.de"
# Prepare apt for buildkit cache
# prepare for buildkit cache
RUN rm -f /etc/apt/apt.conf.d/docker-clean \
&& echo 'Binary::apt::APT::Keep-Downloaded-Packages "true";' >/etc/apt/apt.conf.d/keep-cache
# Install dependencies
# Install necessary packages
RUN \
--mount=type=cache,target=/var/cache/apt,sharing=locked \
--mount=type=cache,target=/var/lib/apt,sharing=locked \
@ -23,7 +23,7 @@ RUN \
libglib2.0-0=2.66.* \
libopencv-dev=4.5.*
# Set working directory and env
# set working directory and env
ARG APPDIR=/usr/src
ARG APPNAME=InvokeAI
WORKDIR ${APPDIR}
@ -32,7 +32,7 @@ ENV PATH ${APPDIR}/${APPNAME}/bin:$PATH
ENV PYTHONDONTWRITEBYTECODE 1
# Turns off buffering for easier container logging
ENV PYTHONUNBUFFERED 1
# Don't fall back to legacy build system
# don't fall back to legacy build system
ENV PIP_USE_PEP517=1
#######################
@ -40,7 +40,7 @@ ENV PIP_USE_PEP517=1
#######################
FROM python-base AS pyproject-builder
# Install build dependencies
# Install dependencies
RUN \
--mount=type=cache,target=/var/cache/apt,sharing=locked \
--mount=type=cache,target=/var/lib/apt,sharing=locked \
@ -51,30 +51,26 @@ RUN \
gcc=4:10.2.* \
python3-dev=3.9.*
# Prepare pip for buildkit cache
# prepare pip for buildkit cache
ARG PIP_CACHE_DIR=/var/cache/buildkit/pip
ENV PIP_CACHE_DIR ${PIP_CACHE_DIR}
RUN mkdir -p ${PIP_CACHE_DIR}
# Create virtual environment
RUN --mount=type=cache,target=${PIP_CACHE_DIR} \
# create virtual environment
RUN --mount=type=cache,target=${PIP_CACHE_DIR},sharing=locked \
python3 -m venv "${APPNAME}" \
--upgrade-deps
# Install requirements
COPY --link pyproject.toml .
COPY --link invokeai/version/invokeai_version.py invokeai/version/__init__.py invokeai/version/
# copy sources
COPY --link . .
# install pyproject.toml
ARG PIP_EXTRA_INDEX_URL
ENV PIP_EXTRA_INDEX_URL ${PIP_EXTRA_INDEX_URL}
RUN --mount=type=cache,target=${PIP_CACHE_DIR} \
"${APPNAME}"/bin/pip install .
# Install pyproject.toml
COPY --link . .
RUN --mount=type=cache,target=${PIP_CACHE_DIR} \
RUN --mount=type=cache,target=${PIP_CACHE_DIR},sharing=locked \
"${APPNAME}/bin/pip" install .
# Build patchmatch
# build patchmatch
RUN python3 -c "from patchmatch import patch_match"
#####################
@ -90,14 +86,14 @@ RUN useradd \
-U \
"${UNAME}"
# Create volume directory
# create volume directory
ARG VOLUME_DIR=/data
RUN mkdir -p "${VOLUME_DIR}" \
&& chown -hR "${UNAME}:${UNAME}" "${VOLUME_DIR}"
&& chown -R "${UNAME}" "${VOLUME_DIR}"
# Setup runtime environment
USER ${UNAME}:${UNAME}
COPY --chown=${UNAME}:${UNAME} --from=pyproject-builder ${APPDIR}/${APPNAME} ${APPNAME}
# setup runtime environment
USER ${UNAME}
COPY --chown=${UNAME} --from=pyproject-builder ${APPDIR}/${APPNAME} ${APPNAME}
ENV INVOKEAI_ROOT ${VOLUME_DIR}
ENV TRANSFORMERS_CACHE ${VOLUME_DIR}/.cache
ENV INVOKE_MODEL_RECONFIGURE "--yes --default_only"

View File

@ -41,7 +41,7 @@ else
fi
# Build Container
docker build \
DOCKER_BUILDKIT=1 docker build \
--platform="${PLATFORM:-linux/amd64}" \
--tag="${CONTAINER_IMAGE:-invokeai}" \
${CONTAINER_FLAVOR:+--build-arg="CONTAINER_FLAVOR=${CONTAINER_FLAVOR}"} \

View File

@ -49,6 +49,3 @@ CONTAINER_FLAVOR="${CONTAINER_FLAVOR-cuda}"
CONTAINER_TAG="${CONTAINER_TAG-"${INVOKEAI_BRANCH##*/}-${CONTAINER_FLAVOR}"}"
CONTAINER_IMAGE="${CONTAINER_REGISTRY}/${CONTAINER_REPOSITORY}:${CONTAINER_TAG}"
CONTAINER_IMAGE="${CONTAINER_IMAGE,,}"
# enable docker buildkit
export DOCKER_BUILDKIT=1

View File

@ -21,10 +21,10 @@ docker run \
--tty \
--rm \
--platform="${PLATFORM}" \
--name="${REPOSITORY_NAME}" \
--hostname="${REPOSITORY_NAME}" \
--mount type=volume,volume-driver=local,source="${VOLUMENAME}",target=/data \
--mount type=bind,source="$(pwd)"/outputs/,target=/data/outputs/ \
--name="${REPOSITORY_NAME,,}" \
--hostname="${REPOSITORY_NAME,,}" \
--mount=source="${VOLUMENAME}",target=/data \
--mount type=bind,source="$(pwd)"/outputs,target=/data/outputs \
${MODELSPATH:+--mount="type=bind,source=${MODELSPATH},target=/data/models"} \
${HUGGING_FACE_HUB_TOKEN:+--env="HUGGING_FACE_HUB_TOKEN=${HUGGING_FACE_HUB_TOKEN}"} \
--publish=9090:9090 \
@ -32,7 +32,7 @@ docker run \
${GPU_FLAGS:+--gpus="${GPU_FLAGS}"} \
"${CONTAINER_IMAGE}" ${@:+$@}
echo -e "\nCleaning trash folder ..."
# Remove Trash folder
for f in outputs/.Trash*; do
if [ -e "$f" ]; then
rm -Rf "$f"

Binary file not shown. (Before: 470 KiB)

Binary file not shown. (Before: 457 KiB)

View File

@ -1,18 +1,10 @@
# Invocations
Invocations represent a single operation, its inputs, and its outputs. These
operations and their outputs can be chained together to generate and modify
images.
Invocations represent a single operation, its inputs, and its outputs. These operations and their outputs can be chained together to generate and modify images.
## Creating a new invocation
To create a new invocation, either find the appropriate module file in
`/ldm/invoke/app/invocations` to add your invocation to, or create a new one in
that folder. All invocations in that folder will be discovered and made
available to the CLI and API automatically. Invocations make use of
[typing](https://docs.python.org/3/library/typing.html) and
[pydantic](https://pydantic-docs.helpmanual.io/) for validation and integration
into the CLI and API.
To create a new invocation, either find the appropriate module file in `/ldm/invoke/app/invocations` to add your invocation to, or create a new one in that folder. All invocations in that folder will be discovered and made available to the CLI and API automatically. Invocations make use of [typing](https://docs.python.org/3/library/typing.html) and [pydantic](https://pydantic-docs.helpmanual.io/) for validation and integration into the CLI and API.
An invocation looks like this:
@ -49,54 +41,34 @@ class UpscaleInvocation(BaseInvocation):
Each portion is important to implement correctly.
### Class definition and type
```py
class UpscaleInvocation(BaseInvocation):
"""Upscales an image."""
type: Literal['upscale'] = 'upscale'
```
All invocations must derive from `BaseInvocation`. They should have a docstring
that declares what they do in a single, short line. They should also have a
`type` with a type hint that's `Literal["command_name"]`, where `command_name`
is what the user will type on the CLI or use in the API to create this
invocation. The `command_name` must be unique. The `type` must be assigned to
the value of the literal in the type hint.
All invocations must derive from `BaseInvocation`. They should have a docstring that declares what they do in a single, short line. They should also have a `type` with a type hint that's `Literal["command_name"]`, where `command_name` is what the user will type on the CLI or use in the API to create this invocation. The `command_name` must be unique. The `type` must be assigned to the value of the literal in the type hint.
### Inputs
```py
# Inputs
image: Union[ImageField,None] = Field(description="The input image")
strength: float = Field(default=0.75, gt=0, le=1, description="The strength")
level: Literal[2,4] = Field(default=2, description="The upscale level")
```
Inputs consist of three parts: a name, a type hint, and a `Field` with default, description, and validation information. For example:
| Part | Value | Description |
| ---- | ----- | ----------- |
| Name | `strength` | This field is referred to as `strength` |
| Type Hint | `float` | This field must be of type `float` |
| Field | `Field(default=0.75, gt=0, le=1, description="The strength")` | The default value is `0.75`, the value must be in the range (0,1], and help text will show "The strength" for this field. |
Inputs consist of three parts: a name, a type hint, and a `Field` with default,
description, and validation information. For example:
Notice that `image` has type `Union[ImageField,None]`. The `Union` allows this field to be parsed with `None` as a value, which enables linking to previous invocations. All fields should either provide a default value or allow `None` as a value, so that they can be overwritten with a linked output from another invocation.
| Part | Value | Description |
| --------- | ------------------------------------------------------------- | ------------------------------------------------------------------------------------------------------------------------- |
| Name | `strength` | This field is referred to as `strength` |
| Type Hint | `float` | This field must be of type `float` |
| Field | `Field(default=0.75, gt=0, le=1, description="The strength")` | The default value is `0.75`, the value must be in the range (0,1], and help text will show "The strength" for this field. |
The special type `ImageField` is also used here. All images are passed as `ImageField`, which protects them from pydantic validation errors (since images only ever come from links).
Notice that `image` has type `Union[ImageField,None]`. The `Union` allows this
field to be parsed with `None` as a value, which enables linking to previous
invocations. All fields should either provide a default value or allow `None` as
a value, so that they can be overwritten with a linked output from another
invocation.
The special type `ImageField` is also used here. All images are passed as
`ImageField`, which protects them from pydantic validation errors (since images
only ever come from links).
Finally, note that for all linking, the `type` of the linked fields must match.
If the `name` also matches, then the field can be **automatically linked** to a
previous invocation by name and matching.
Finally, note that for all linking, the `type` of the linked fields must match. If the `name` also matches, then the field can be **automatically linked** to a previous invocation by name and matching.
### Invoke Function
```py
def invoke(self, context: InvocationContext) -> ImageOutput:
image = context.services.images.get(self.image.image_type, self.image.image_name)
@ -116,22 +88,13 @@ previous invocation by name and matching.
image = ImageField(image_type = image_type, image_name = image_name)
)
```
The `invoke` function is the last portion of an invocation. It is provided an `InvocationContext` which contains services to perform work as well as a `session_id` for use as needed. It should return a class with output values that derives from `BaseInvocationOutput`.
The `invoke` function is the last portion of an invocation. It is provided an
`InvocationContext` which contains services to perform work as well as a
`session_id` for use as needed. It should return a class with output values that
derives from `BaseInvocationOutput`.
Before being called, the invocation will have all of its fields set from defaults, inputs, and finally links (overriding in that order).
Before being called, the invocation will have all of its fields set from
defaults, inputs, and finally links (overriding in that order).
Assume that this invocation may be running simultaneously with other
invocations, may be running on another machine, or in other interesting
scenarios. If you need functionality, please provide it as a service in the
`InvocationServices` class, and make sure it can be overridden.
Assume that this invocation may be running simultaneously with other invocations, may be running on another machine, or in other interesting scenarios. If you need functionality, please provide it as a service in the `InvocationServices` class, and make sure it can be overridden.
### Outputs
```py
class ImageOutput(BaseInvocationOutput):
"""Base class for invocations that output an image"""
@ -139,64 +102,4 @@ class ImageOutput(BaseInvocationOutput):
image: ImageField = Field(default=None, description="The output image")
```
Output classes look like an invocation class without the invoke method. Prefer
to use an existing output class if available, and prefer to name inputs the same
as outputs when possible, to promote automatic invocation linking.
## Schema Generation
Invocation, output and related classes are used to generate an OpenAPI schema.
### Required Properties
The schema generation treat all properties with default values as optional. This
makes sense internally, but when when using these classes via the generated
schema, we end up with e.g. the `ImageOutput` class having its `image` property
marked as optional.
We know that this property will always be present, so the additional logic
needed to always check if the property exists adds a lot of extraneous cruft.
To fix this, we can leverage `pydantic`'s
[schema customisation](https://docs.pydantic.dev/usage/schema/#schema-customization)
to mark properties that we know will always be present as required.
Here's that `ImageOutput` class, without the needed schema customisation:
```python
class ImageOutput(BaseInvocationOutput):
"""Base class for invocations that output an image"""
type: Literal["image"] = "image"
image: ImageField = Field(default=None, description="The output image")
```
The generated OpenAPI schema, and all clients/types generated from it, will have
the `type` and `image` properties marked as optional, even though we know they
will always have a value by the time we can interact with them via the API.
Here's the same class, but with the schema customisation added:
```python
class ImageOutput(BaseInvocationOutput):
"""Base class for invocations that output an image"""
type: Literal["image"] = "image"
image: ImageField = Field(default=None, description="The output image")
class Config:
schema_extra = {
'required': [
'type',
'image',
]
}
```
The resultant schema (and any API client or types generated from it) will now
have see `type` as string literal `"image"` and `image` as an `ImageField`
object.
See this `pydantic` issue for discussion on this solution:
<https://github.com/pydantic/pydantic/discussions/4577>
Output classes look like an invocation class without the invoke method. Prefer to use an existing output class if available, and prefer to name inputs the same as outputs when possible, to promote automatic invocation linking.
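To make the schema-customisation point above concrete, here is a self-contained sketch using plain pydantic (v1); `ImageField` is stubbed locally so the snippet runs on its own and is not the real InvokeAI class:

```python
from typing import Literal, Optional
from pydantic import BaseModel, Field


class ImageField(BaseModel):
    # Stand-in for the real ImageField, only so this example is runnable.
    image_type: str = "result"
    image_name: str = ""


class ImageOutput(BaseModel):
    """Base class for invocations that output an image"""
    type: Literal["image"] = "image"
    image: Optional[ImageField] = Field(default=None, description="The output image")

    class Config:
        # Mark fields that always carry a value as required in the generated schema,
        # so clients generated from the OpenAPI document do not treat them as optional.
        schema_extra = {
            "required": ["type", "image"],
        }


print(ImageOutput.schema()["required"])  # ['type', 'image']
```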

View File

@ -1,83 +0,0 @@
# Local Development
If you are looking to contribute you will need to have a local development
environment. See the
[Developer Install](../installation/020_INSTALL_MANUAL.md#developer-install) for
full details.
Broadly this involves cloning the repository, installing the pre-reqs, and
InvokeAI (in editable form). Assuming this is working, choose your area of
focus.
## Documentation
We use [mkdocs](https://www.mkdocs.org) for our documentation with the
[material theme](https://squidfunk.github.io/mkdocs-material/). Documentation is
written in markdown files under the `./docs` folder and then built into a static
website for hosting with GitHub Pages at
[invoke-ai.github.io/InvokeAI](https://invoke-ai.github.io/InvokeAI).
To contribute to the documentation you'll need to install the dependencies. Note
the use of `"`.
```zsh
pip install ".[docs]"
```
Now, to run the documentation locally with hot-reloading for changes made.
```zsh
mkdocs serve
```
You'll then be prompted to connect to `http://127.0.0.1:8080` in order to
access.
## Backend
The backend is contained within the `./invokeai/backend` folder structure. To
get started however please install the development dependencies.
From the root of the repository run the following command. Note the use of `"`.
```zsh
pip install ".[test]"
```
This in an optional group of packages which is defined within the
`pyproject.toml` and will be required for testing the changes you make the the
code.
### Running Tests
We use [pytest](https://docs.pytest.org/en/7.2.x/) for our test suite. Tests can
be found under the `./tests` folder and can be run with a single `pytest`
command. Optionally, to review test coverage you can append `--cov`.
```zsh
pytest --cov
```
Test outcomes and coverage will be reported in the terminal. In addition a more
detailed report is created in both XML and HTML format in the `./coverage`
folder. The HTML one in particular can help identify missing statements
requiring tests to ensure coverage. This can be run by opening
`./coverage/html/index.html`.
For example.
```zsh
pytest --cov; open ./coverage/html/index.html
```
??? info "HTML coverage report output"
![html-overview](../assets/contributing/html-overview.png)
![html-detail](../assets/contributing/html-detail.png)
## Front End
<!--#TODO: get input from blessedcoolant here, for the moment inserted the frontend README via snippets extension.-->
--8<-- "invokeai/frontend/web/README.md"

View File

@ -168,15 +168,11 @@ used by Stable Diffusion 1.4 and 1.5.
After installation, your `models.yaml` should contain an entry that looks like
this one:
```yml
inpainting-1.5:
weights: models/ldm/stable-diffusion-v1/sd-v1-5-inpainting.ckpt
description: SD inpainting v1.5
config: configs/stable-diffusion/v1-inpainting-inference.yaml
vae: models/ldm/stable-diffusion-v1/vae-ft-mse-840000-ema-pruned.ckpt
width: 512
height: 512
```
inpainting-1.5: weights: models/ldm/stable-diffusion-v1/sd-v1-5-inpainting.ckpt
description: SD inpainting v1.5 config:
configs/stable-diffusion/v1-inpainting-inference.yaml vae:
models/ldm/stable-diffusion-v1/vae-ft-mse-840000-ema-pruned.ckpt width: 512
height: 512
As shown in the example, you may include a VAE fine-tuning weights file as well.
This is strongly recommended.
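As a quick illustration, a sketch (assuming PyYAML and a default-location `configs/models.yaml`; adjust the path for your install) of reading such an entry back:

```python
# Load models.yaml and inspect the inpainting-1.5 entry shown above.
import yaml

with open("configs/models.yaml") as f:
    models = yaml.safe_load(f)

entry = models["inpainting-1.5"]
print(entry["weights"])   # models/ldm/stable-diffusion-v1/sd-v1-5-inpainting.ckpt
print(entry.get("vae"))   # optional VAE fine-tuning weights, if configured
```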

View File

@ -268,7 +268,7 @@ model is so good at inpainting, a good substitute is to use the `clipseg` text
masking option:
```bash
invoke> a fluffy cat eating a hotdog
invoke> a fluffy cat eating a hotdot
Outputs:
[1010] outputs/000025.2182095108.png: a fluffy cat eating a hotdog
invoke> a smiling dog eating a hotdog -I 000025.2182095108.png -tm cat

View File

@ -17,7 +17,7 @@ notebooks.
You will need a GPU to perform training in a reasonable length of
time, and at least 12 GB of VRAM. We recommend using the [`xformers`
library](../installation/070_INSTALL_XFORMERS.md) to accelerate the
library](../installation/070_INSTALL_XFORMERS) to accelerate the
training process further. During training, about ~8 GB is temporarily
needed in order to store intermediate models, checkpoints and logs.

View File

@ -417,7 +417,7 @@ Then type the following commands:
=== "AMD System"
```bash
pip install torch torchvision --force-reinstall --extra-index-url https://download.pytorch.org/whl/rocm5.4.2
pip install torch torchvision --force-reinstall --extra-index-url https://download.pytorch.org/whl/rocm5.2
```
### Corrupted configuration file

View File

@ -148,13 +148,13 @@ manager, please follow these steps:
=== "CUDA (NVidia)"
```bash
pip install "InvokeAI[xformers]" --use-pep517 --extra-index-url https://download.pytorch.org/whl/cu117
pip install InvokeAI[xformers] --use-pep517 --extra-index-url https://download.pytorch.org/whl/cu117
```
=== "ROCm (AMD)"
```bash
pip install InvokeAI --use-pep517 --extra-index-url https://download.pytorch.org/whl/rocm5.4.2
pip install InvokeAI --use-pep517 --extra-index-url https://download.pytorch.org/whl/rocm5.2
```
=== "CPU (Intel Macs & non-GPU systems)"
@ -315,7 +315,7 @@ installation protocol (important!)
=== "ROCm (AMD)"
```bash
pip install -e . --use-pep517 --extra-index-url https://download.pytorch.org/whl/rocm5.4.2
pip install -e . --use-pep517 --extra-index-url https://download.pytorch.org/whl/rocm5.2
```
=== "CPU (Intel Macs & non-GPU systems)"

View File

@ -110,7 +110,7 @@ recipes are available
When installing torch and torchvision manually with `pip`, remember to provide
the argument `--extra-index-url
https://download.pytorch.org/whl/rocm5.4.2` as described in the [Manual
https://download.pytorch.org/whl/rocm5.2` as described in the [Manual
Installation Guide](020_INSTALL_MANUAL.md).
This will be done automatically for you if you use the installer

View File

@ -50,7 +50,7 @@ subset that are currently installed are found in
|stable-diffusion-1.5|runwayml/stable-diffusion-v1-5|Stable Diffusion version 1.5 diffusers model (4.27 GB)|https://huggingface.co/runwayml/stable-diffusion-v1-5 |
|sd-inpainting-1.5|runwayml/stable-diffusion-inpainting|RunwayML SD 1.5 model optimized for inpainting, diffusers version (4.27 GB)|https://huggingface.co/runwayml/stable-diffusion-inpainting |
|stable-diffusion-2.1|stabilityai/stable-diffusion-2-1|Stable Diffusion version 2.1 diffusers model, trained on 768 pixel images (5.21 GB)|https://huggingface.co/stabilityai/stable-diffusion-2-1 |
|sd-inpainting-2.0|stabilityai/stable-diffusion-2-inpainting|Stable Diffusion version 2.0 inpainting model (5.21 GB)|https://huggingface.co/stabilityai/stable-diffusion-2-inpainting |
|sd-inpainting-2.0|stabilityai/stable-diffusion-2-1|Stable Diffusion version 2.0 inpainting model (5.21 GB)|https://huggingface.co/stabilityai/stable-diffusion-2-1 |
|analog-diffusion-1.0|wavymulder/Analog-Diffusion|An SD-1.5 model trained on diverse analog photographs (2.13 GB)|https://huggingface.co/wavymulder/Analog-Diffusion |
|deliberate-1.0|XpucT/Deliberate|Versatile model that produces detailed images up to 768px (4.27 GB)|https://huggingface.co/XpucT/Deliberate |
|d&d-diffusion-1.0|0xJustin/Dungeons-and-Diffusion|Dungeons & Dragons characters (2.13 GB)|https://huggingface.co/0xJustin/Dungeons-and-Diffusion |

View File

@ -24,7 +24,7 @@ You need to have opencv installed so that pypatchmatch can be built:
brew install opencv
```
The next time you start `invoke`, after successfully installing opencv, pypatchmatch will be built.
The next time you start `invoke`, after sucesfully installing opencv, pypatchmatch will be built.
## Linux
@ -56,7 +56,7 @@ Prior to installing PyPatchMatch, you need to take the following steps:
5. Confirm that pypatchmatch is installed. At the command-line prompt enter
`python`, and then at the `>>>` line type
`from patchmatch import patch_match`: It should look like the following:
`from patchmatch import patch_match`: It should look like the follwing:
```py
Python 3.9.5 (default, Nov 23 2021, 15:27:38)
@ -108,4 +108,4 @@ Prior to installing PyPatchMatch, you need to take the following steps:
[**Next, Follow Steps 4-6 from the Debian Section above**](#linux)
If you see no errors you're ready to go!
If you see no errors, then you're ready to go!

View File

@ -11,10 +11,10 @@ if [[ -v "VIRTUAL_ENV" ]]; then
exit -1
fi
VERSION=$(cd ..; python -c "from invokeai.version import __version__ as version; print(version)")
VERSION=$(cd ..; python -c "from ldm.invoke import __version__ as version; print(version)")
PATCH=""
VERSION="v${VERSION}${PATCH}"
LATEST_TAG="v3.0-latest"
LATEST_TAG="v2.3-latest"
echo Building installer for version $VERSION
echo "Be certain that you're in the 'installer' directory before continuing."

View File

@ -291,7 +291,7 @@ class InvokeAiInstance:
src = Path(__file__).parents[1].expanduser().resolve()
# if the above directory contains one of these files, we'll do a source install
next(src.glob("pyproject.toml"))
next(src.glob("invokeai"))
next(src.glob("ldm"))
except StopIteration:
print("Unable to find a wheel or perform a source install. Giving up.")
@ -342,14 +342,14 @@ class InvokeAiInstance:
introduction()
from invokeai.frontend.install import invokeai_configure
from ldm.invoke.config import invokeai_configure
# NOTE: currently the config script does its own arg parsing! this means the command-line switches
# from the installer will also automatically propagate down to the config script.
# this may change in the future with config refactoring!
succeeded = False
try:
invokeai_configure()
invokeai_configure.main()
succeeded = True
except requests.exceptions.ConnectionError as e:
print(f'\nA network error was encountered during configuration and download: {str(e)}')
@ -456,7 +456,7 @@ def get_torch_source() -> (Union[str, None],str):
optional_modules = None
if OS == "Linux":
if device == "rocm":
url = "https://download.pytorch.org/whl/rocm5.4.2"
url = "https://download.pytorch.org/whl/rocm5.2"
elif device == "cpu":
url = "https://download.pytorch.org/whl/cpu"

View File

@ -24,9 +24,9 @@ if [ "$(uname -s)" == "Darwin" ]; then
export PYTORCH_ENABLE_MPS_FALLBACK=1
fi
while true
do
if [ "$0" != "bash" ]; then
while true
do
echo "Do you want to generate images using the"
echo "1. command-line interface"
echo "2. browser-based UI"
@ -67,29 +67,29 @@ if [ "$0" != "bash" ]; then
;;
7)
invokeai-configure --root ${INVOKEAI_ROOT} --yes --default_only
;;
8)
echo "Developer Console:"
;;
8)
echo "Developer Console:"
file_name=$(basename "${BASH_SOURCE[0]}")
bash --init-file "$file_name"
;;
9)
echo "Update:"
echo "Update:"
invokeai-update
;;
10)
invokeai --help
;;
[qQ])
[qQ])
exit 0
;;
*)
echo "Invalid selection"
exit;;
esac
done
else # in developer console
python --version
echo "Press ^D to exit"
export PS1="(InvokeAI) \u@\h \w> "
fi
done

View File

@ -1,11 +1,3 @@
Organization of the source tree:
app -- Home of nodes invocations and services
assets -- Images and other data files used by InvokeAI
backend -- Non-user facing libraries, including the rendering
core.
configs -- Configuration files used at install and run times
frontend -- User-facing scripts, including the CLI and the WebUI
version -- Current InvokeAI version string, stored
in version/invokeai_version.py
After version 2.3 is released, the ldm/invoke modules will be migrated to this location
so that we have a proper invokeai distribution. Currently it is only being used for
data files.

View File

@ -1,96 +0,0 @@
# Copyright (c) 2022 Kyle Schouviller (https://github.com/kyle0654)
import os
from argparse import Namespace
from invokeai.app.services.metadata import PngMetadataService, MetadataServiceBase
from ..services.default_graphs import create_system_graphs
from ..services.latent_storage import DiskLatentsStorage, ForwardCacheLatentsStorage
from ...backend import Globals
from ..services.model_manager_initializer import get_model_manager
from ..services.restoration_services import RestorationServices
from ..services.graph import GraphExecutionState, LibraryGraph
from ..services.image_storage import DiskImageStorage
from ..services.invocation_queue import MemoryInvocationQueue
from ..services.invocation_services import InvocationServices
from ..services.invoker import Invoker
from ..services.processor import DefaultInvocationProcessor
from ..services.sqlite import SqliteItemStorage
from .events import FastAPIEventService
# TODO: is there a better way to achieve this?
def check_internet() -> bool:
"""
Return true if the internet is reachable.
It does this by pinging huggingface.co.
"""
import urllib.request
host = "http://huggingface.co"
try:
urllib.request.urlopen(host, timeout=1)
return True
except:
return False
class ApiDependencies:
"""Contains and initializes all dependencies for the API"""
invoker: Invoker = None
@staticmethod
def initialize(config, event_handler_id: int):
Globals.try_patchmatch = config.patchmatch
Globals.always_use_cpu = config.always_use_cpu
Globals.internet_available = config.internet_available and check_internet()
Globals.disable_xformers = not config.xformers
Globals.ckpt_convert = config.ckpt_convert
# TODO: Use a logger
print(f">> Internet connectivity is {Globals.internet_available}")
events = FastAPIEventService(event_handler_id)
output_folder = os.path.abspath(
os.path.join(os.path.dirname(__file__), "../../../../outputs")
)
latents = ForwardCacheLatentsStorage(DiskLatentsStorage(f'{output_folder}/latents'))
metadata = PngMetadataService()
images = DiskImageStorage(f'{output_folder}/images', metadata_service=metadata)
# TODO: build a file/path manager?
db_location = os.path.join(output_folder, "invokeai.db")
services = InvocationServices(
model_manager=get_model_manager(config),
events=events,
latents=latents,
images=images,
metadata=metadata,
queue=MemoryInvocationQueue(),
graph_library=SqliteItemStorage[LibraryGraph](
filename=db_location, table_name="graphs"
),
graph_execution_manager=SqliteItemStorage[GraphExecutionState](
filename=db_location, table_name="graph_executions"
),
processor=DefaultInvocationProcessor(),
restoration=RestorationServices(config),
)
create_system_graphs(services.graph_library)
ApiDependencies.invoker = Invoker(services)
@staticmethod
def shutdown():
if ApiDependencies.invoker:
ApiDependencies.invoker.stop()

View File

@ -1,52 +0,0 @@
# Copyright (c) 2022 Kyle Schouviller (https://github.com/kyle0654)
import asyncio
import threading
from queue import Empty, Queue
from typing import Any
from fastapi_events.dispatcher import dispatch
from ..services.events import EventServiceBase
class FastAPIEventService(EventServiceBase):
event_handler_id: int
__queue: Queue
__stop_event: threading.Event
def __init__(self, event_handler_id: int) -> None:
self.event_handler_id = event_handler_id
self.__queue = Queue()
self.__stop_event = threading.Event()
asyncio.create_task(self.__dispatch_from_queue(stop_event=self.__stop_event))
super().__init__()
def stop(self, *args, **kwargs):
self.__stop_event.set()
self.__queue.put(None)
def dispatch(self, event_name: str, payload: Any) -> None:
self.__queue.put(dict(event_name=event_name, payload=payload))
async def __dispatch_from_queue(self, stop_event: threading.Event):
"""Get events on from the queue and dispatch them, from the correct thread"""
while not stop_event.is_set():
try:
event = self.__queue.get(block=False)
if not event: # Probably stopping
continue
dispatch(
event.get("event_name"),
payload=event.get("payload"),
middleware_id=self.event_handler_id,
)
except Empty:
await asyncio.sleep(0.001)
pass
except asyncio.CancelledError as e:
raise e # Raise a proper error

View File

@ -1,34 +0,0 @@
from typing import Optional
from pydantic import BaseModel, Field
from invokeai.app.models.image import ImageType
from invokeai.app.services.metadata import InvokeAIMetadata
class ImageResponseMetadata(BaseModel):
"""An image's metadata. Used only in HTTP responses."""
created: int = Field(description="The creation timestamp of the image")
width: int = Field(description="The width of the image in pixels")
height: int = Field(description="The height of the image in pixels")
invokeai: Optional[InvokeAIMetadata] = Field(
description="The image's InvokeAI-specific metadata"
)
class ImageResponse(BaseModel):
"""The response type for images"""
image_type: ImageType = Field(description="The type of the image")
image_name: str = Field(description="The name of the image")
image_url: str = Field(description="The url of the image")
thumbnail_url: str = Field(description="The url of the image's thumbnail")
metadata: ImageResponseMetadata = Field(description="The image's metadata")
class ProgressImage(BaseModel):
"""The progress image sent intermittently during processing"""
width: int = Field(description="The effective width of the image in pixels")
height: int = Field(description="The effective height of the image in pixels")
dataURL: str = Field(description="The image data as a b64 data URL")

View File

@ -1,128 +0,0 @@
# Copyright (c) 2022 Kyle Schouviller (https://github.com/kyle0654)
import io
from datetime import datetime, timezone
import json
import os
from typing import Any
import uuid
from fastapi import HTTPException, Path, Query, Request, UploadFile
from fastapi.responses import FileResponse, Response
from fastapi.routing import APIRouter
from PIL import Image
from invokeai.app.api.models.images import ImageResponse, ImageResponseMetadata
from invokeai.app.services.metadata import InvokeAIMetadata
from invokeai.app.services.item_storage import PaginatedResults
from ...services.image_storage import ImageType
from ..dependencies import ApiDependencies
images_router = APIRouter(prefix="/v1/images", tags=["images"])
@images_router.get("/{image_type}/{image_name}", operation_id="get_image")
async def get_image(
image_type: ImageType = Path(description="The type of image to get"),
image_name: str = Path(description="The name of the image to get"),
) -> FileResponse | Response:
"""Gets a result"""
path = ApiDependencies.invoker.services.images.get_path(
image_type=image_type, image_name=image_name
)
if ApiDependencies.invoker.services.images.validate_path(path):
return FileResponse(path)
else:
raise HTTPException(status_code=404)
@images_router.get(
"/{image_type}/thumbnails/{image_name}", operation_id="get_thumbnail"
)
async def get_thumbnail(
image_type: ImageType = Path(description="The type of image to get"),
image_name: str = Path(description="The name of the image to get"),
) -> FileResponse | Response:
"""Gets a thumbnail"""
path = ApiDependencies.invoker.services.images.get_path(
image_type=image_type, image_name=image_name, is_thumbnail=True
)
if ApiDependencies.invoker.services.images.validate_path(path):
return FileResponse(path)
else:
raise HTTPException(status_code=404)
@images_router.post(
"/uploads/",
operation_id="upload_image",
responses={
201: {
"description": "The image was uploaded successfully",
"model": ImageResponse,
},
415: {"description": "Image upload failed"},
},
status_code=201,
)
async def upload_image(
file: UploadFile, request: Request, response: Response
) -> ImageResponse:
if not file.content_type.startswith("image"):
raise HTTPException(status_code=415, detail="Not an image")
contents = await file.read()
try:
img = Image.open(io.BytesIO(contents))
except:
# Error opening the image
raise HTTPException(status_code=415, detail="Failed to read image")
filename = f"{uuid.uuid4()}_{str(int(datetime.now(timezone.utc).timestamp()))}.png"
(image_path, thumbnail_path, ctime) = ApiDependencies.invoker.services.images.save(
ImageType.UPLOAD, filename, img
)
invokeai_metadata = ApiDependencies.invoker.services.metadata.get_metadata(img)
res = ImageResponse(
image_type=ImageType.UPLOAD,
image_name=filename,
image_url=f"api/v1/images/{ImageType.UPLOAD.value}/{filename}",
thumbnail_url=f"api/v1/images/{ImageType.UPLOAD.value}/thumbnails/{os.path.splitext(filename)[0]}.webp",
metadata=ImageResponseMetadata(
created=ctime,
width=img.width,
height=img.height,
invokeai=invokeai_metadata,
),
)
response.status_code = 201
response.headers["Location"] = request.url_for(
"get_image", image_type=ImageType.UPLOAD.value, image_name=filename
)
return res
@images_router.get(
"/",
operation_id="list_images",
responses={200: {"model": PaginatedResults[ImageResponse]}},
)
async def list_images(
image_type: ImageType = Query(
default=ImageType.RESULT, description="The type of images to get"
),
page: int = Query(default=0, description="The page of images to get"),
per_page: int = Query(default=10, description="The number of images per page"),
) -> PaginatedResults[ImageResponse]:
"""Gets a list of images"""
result = ApiDependencies.invoker.services.images.list(image_type, page, per_page)
return result

View File

@ -1,251 +0,0 @@
# Copyright (c) 2023 Kyle Schouviller (https://github.com/kyle0654) and 2023 Kent Keirsey (https://github.com/hipsterusername)
import shutil
import asyncio
from typing import Annotated, Any, List, Literal, Optional, Union
from fastapi.routing import APIRouter, HTTPException
from pydantic import BaseModel, Field, parse_obj_as
from pathlib import Path
from ..dependencies import ApiDependencies
from invokeai.backend.globals import Globals, global_converted_ckpts_dir
from invokeai.backend.args import Args
models_router = APIRouter(prefix="/v1/models", tags=["models"])
class VaeRepo(BaseModel):
repo_id: str = Field(description="The repo ID to use for this VAE")
path: Optional[str] = Field(description="The path to the VAE")
subfolder: Optional[str] = Field(description="The subfolder to use for this VAE")
class ModelInfo(BaseModel):
description: Optional[str] = Field(description="A description of the model")
class CkptModelInfo(ModelInfo):
format: Literal['ckpt'] = 'ckpt'
config: str = Field(description="The path to the model config")
weights: str = Field(description="The path to the model weights")
vae: str = Field(description="The path to the model VAE")
width: Optional[int] = Field(description="The width of the model")
height: Optional[int] = Field(description="The height of the model")
class DiffusersModelInfo(ModelInfo):
format: Literal['diffusers'] = 'diffusers'
vae: Optional[VaeRepo] = Field(description="The VAE repo to use for this model")
repo_id: Optional[str] = Field(description="The repo ID to use for this model")
path: Optional[str] = Field(description="The path to the model")
class CreateModelRequest(BaseModel):
name: str = Field(description="The name of the model")
info: Union[CkptModelInfo, DiffusersModelInfo] = Field(discriminator="format", description="The model info")
class CreateModelResponse(BaseModel):
name: str = Field(description="The name of the new model")
info: Union[CkptModelInfo, DiffusersModelInfo] = Field(discriminator="format", description="The model info")
status: str = Field(description="The status of the API response")
class ConversionRequest(BaseModel):
name: str = Field(description="The name of the new model")
info: CkptModelInfo = Field(description="The converted model info")
save_location: str = Field(description="The path to save the converted model weights")
class ConvertedModelResponse(BaseModel):
name: str = Field(description="The name of the new model")
info: DiffusersModelInfo = Field(description="The converted model info")
class ModelsList(BaseModel):
models: dict[str, Annotated[Union[(CkptModelInfo,DiffusersModelInfo)], Field(discriminator="format")]]
@models_router.get(
"/",
operation_id="list_models",
responses={200: {"model": ModelsList }},
)
async def list_models() -> ModelsList:
"""Gets a list of models"""
models_raw = ApiDependencies.invoker.services.model_manager.list_models()
models = parse_obj_as(ModelsList, { "models": models_raw })
return models
@models_router.post(
"/",
operation_id="update_model",
responses={200: {"status": "success"}},
)
async def update_model(
model_request: CreateModelRequest
) -> CreateModelResponse:
""" Add Model """
model_request_info = model_request.info
info_dict = model_request_info.dict()
model_response = CreateModelResponse(name=model_request.name, info=model_request.info, status="success")
ApiDependencies.invoker.services.model_manager.add_model(
model_name=model_request.name,
model_attributes=info_dict,
clobber=True,
)
return model_response
@models_router.delete(
"/{model_name}",
operation_id="del_model",
responses={
204: {
"description": "Model deleted successfully"
},
404: {
"description": "Model not found"
}
},
)
async def delete_model(model_name: str) -> None:
"""Delete Model"""
model_names = ApiDependencies.invoker.services.model_manager.model_names()
model_exists = model_name in model_names
# check if model exists
print(f">> Checking for model {model_name}...")
if model_exists:
print(f">> Deleting Model: {model_name}")
ApiDependencies.invoker.services.model_manager.del_model(model_name, delete_files=True)
print(f">> Model Deleted: {model_name}")
raise HTTPException(status_code=204, detail=f"Model '{model_name}' deleted successfully")
else:
print(f">> Model not found")
raise HTTPException(status_code=404, detail=f"Model '{model_name}' not found")
# @socketio.on("convertToDiffusers")
# def convert_to_diffusers(model_to_convert: dict):
# try:
# if model_info := self.generate.model_manager.model_info(
# model_name=model_to_convert["model_name"]
# ):
# if "weights" in model_info:
# ckpt_path = Path(model_info["weights"])
# original_config_file = Path(model_info["config"])
# model_name = model_to_convert["model_name"]
# model_description = model_info["description"]
# else:
# self.socketio.emit(
# "error", {"message": "Model is not a valid checkpoint file"}
# )
# else:
# self.socketio.emit(
# "error", {"message": "Could not retrieve model info."}
# )
# if not ckpt_path.is_absolute():
# ckpt_path = Path(Globals.root, ckpt_path)
# if original_config_file and not original_config_file.is_absolute():
# original_config_file = Path(Globals.root, original_config_file)
# diffusers_path = Path(
# ckpt_path.parent.absolute(), f"{model_name}_diffusers"
# )
# if model_to_convert["save_location"] == "root":
# diffusers_path = Path(
# global_converted_ckpts_dir(), f"{model_name}_diffusers"
# )
# if (
# model_to_convert["save_location"] == "custom"
# and model_to_convert["custom_location"] is not None
# ):
# diffusers_path = Path(
# model_to_convert["custom_location"], f"{model_name}_diffusers"
# )
# if diffusers_path.exists():
# shutil.rmtree(diffusers_path)
# self.generate.model_manager.convert_and_import(
# ckpt_path,
# diffusers_path,
# model_name=model_name,
# model_description=model_description,
# vae=None,
# original_config_file=original_config_file,
# commit_to_conf=opt.conf,
# )
# new_model_list = self.generate.model_manager.list_models()
# socketio.emit(
# "modelConverted",
# {
# "new_model_name": model_name,
# "model_list": new_model_list,
# "update": True,
# },
# )
# print(f">> Model Converted: {model_name}")
# except Exception as e:
# self.handle_exceptions(e)
# @socketio.on("mergeDiffusersModels")
# def merge_diffusers_models(model_merge_info: dict):
# try:
# models_to_merge = model_merge_info["models_to_merge"]
# model_ids_or_paths = [
# self.generate.model_manager.model_name_or_path(x)
# for x in models_to_merge
# ]
# merged_pipe = merge_diffusion_models(
# model_ids_or_paths,
# model_merge_info["alpha"],
# model_merge_info["interp"],
# model_merge_info["force"],
# )
# dump_path = global_models_dir() / "merged_models"
# if model_merge_info["model_merge_save_path"] is not None:
# dump_path = Path(model_merge_info["model_merge_save_path"])
# os.makedirs(dump_path, exist_ok=True)
# dump_path = dump_path / model_merge_info["merged_model_name"]
# merged_pipe.save_pretrained(dump_path, safe_serialization=1)
# merged_model_config = dict(
# model_name=model_merge_info["merged_model_name"],
# description=f'Merge of models {", ".join(models_to_merge)}',
# commit_to_conf=opt.conf,
# )
# if vae := self.generate.model_manager.config[models_to_merge[0]].get(
# "vae", None
# ):
# print(f">> Using configured VAE assigned to {models_to_merge[0]}")
# merged_model_config.update(vae=vae)
# self.generate.model_manager.import_diffuser_model(
# dump_path, **merged_model_config
# )
# new_model_list = self.generate.model_manager.list_models()
# socketio.emit(
# "modelsMerged",
# {
# "merged_models": models_to_merge,
# "merged_model_name": model_merge_info["merged_model_name"],
# "model_list": new_model_list,
# "update": True,
# },
# )
# print(f">> Models Merged: {models_to_merge}")
# print(f">> New Model Added: {model_merge_info['merged_model_name']}")
# except Exception as e:

View File

@ -1,287 +0,0 @@
# Copyright (c) 2022 Kyle Schouviller (https://github.com/kyle0654)
from typing import Annotated, List, Optional, Union
from fastapi import Body, Path, Query
from fastapi.responses import Response
from fastapi.routing import APIRouter
from pydantic.fields import Field
from ...invocations import *
from ...invocations.baseinvocation import BaseInvocation
from ...services.graph import (
Edge,
EdgeConnection,
Graph,
GraphExecutionState,
NodeAlreadyExecutedError,
)
from ...services.item_storage import PaginatedResults
from ..dependencies import ApiDependencies
session_router = APIRouter(prefix="/v1/sessions", tags=["sessions"])
@session_router.post(
"/",
operation_id="create_session",
responses={
200: {"model": GraphExecutionState},
400: {"description": "Invalid json"},
},
)
async def create_session(
graph: Optional[Graph] = Body(
default=None, description="The graph to initialize the session with"
)
) -> GraphExecutionState:
"""Creates a new session, optionally initializing it with an invocation graph"""
session = ApiDependencies.invoker.create_execution_state(graph)
return session
@session_router.get(
"/",
operation_id="list_sessions",
responses={200: {"model": PaginatedResults[GraphExecutionState]}},
)
async def list_sessions(
page: int = Query(default=0, description="The page of results to get"),
per_page: int = Query(default=10, description="The number of results per page"),
query: str = Query(default="", description="The query string to search for"),
) -> PaginatedResults[GraphExecutionState]:
"""Gets a list of sessions, optionally searching"""
if query == "":
result = ApiDependencies.invoker.services.graph_execution_manager.list(
page, per_page
)
else:
result = ApiDependencies.invoker.services.graph_execution_manager.search(
query, page, per_page
)
return result
@session_router.get(
"/{session_id}",
operation_id="get_session",
responses={
200: {"model": GraphExecutionState},
404: {"description": "Session not found"},
},
)
async def get_session(
session_id: str = Path(description="The id of the session to get"),
) -> GraphExecutionState:
"""Gets a session"""
session = ApiDependencies.invoker.services.graph_execution_manager.get(session_id)
if session is None:
return Response(status_code=404)
else:
return session
@session_router.post(
"/{session_id}/nodes",
operation_id="add_node",
responses={
200: {"model": str},
400: {"description": "Invalid node or link"},
404: {"description": "Session not found"},
},
)
async def add_node(
session_id: str = Path(description="The id of the session"),
node: Annotated[
Union[BaseInvocation.get_invocations()], Field(discriminator="type") # type: ignore
] = Body(description="The node to add"),
) -> str:
"""Adds a node to the graph"""
session = ApiDependencies.invoker.services.graph_execution_manager.get(session_id)
if session is None:
return Response(status_code=404)
try:
session.add_node(node)
ApiDependencies.invoker.services.graph_execution_manager.set(
session
) # TODO: can this be done automatically, or add node through an API?
return session.id
except NodeAlreadyExecutedError:
return Response(status_code=400)
except IndexError:
return Response(status_code=400)
@session_router.put(
"/{session_id}/nodes/{node_path}",
operation_id="update_node",
responses={
200: {"model": GraphExecutionState},
400: {"description": "Invalid node or link"},
404: {"description": "Session not found"},
},
)
async def update_node(
session_id: str = Path(description="The id of the session"),
node_path: str = Path(description="The path to the node in the graph"),
node: Annotated[
Union[BaseInvocation.get_invocations()], Field(discriminator="type") # type: ignore
] = Body(description="The new node"),
) -> GraphExecutionState:
"""Updates a node in the graph and removes all linked edges"""
session = ApiDependencies.invoker.services.graph_execution_manager.get(session_id)
if session is None:
return Response(status_code=404)
try:
session.update_node(node_path, node)
ApiDependencies.invoker.services.graph_execution_manager.set(
session
) # TODO: can this be done automatically, or add node through an API?
return session
except NodeAlreadyExecutedError:
return Response(status_code=400)
except IndexError:
return Response(status_code=400)
@session_router.delete(
"/{session_id}/nodes/{node_path}",
operation_id="delete_node",
responses={
200: {"model": GraphExecutionState},
400: {"description": "Invalid node or link"},
404: {"description": "Session not found"},
},
)
async def delete_node(
session_id: str = Path(description="The id of the session"),
node_path: str = Path(description="The path to the node to delete"),
) -> GraphExecutionState:
"""Deletes a node in the graph and removes all linked edges"""
session = ApiDependencies.invoker.services.graph_execution_manager.get(session_id)
if session is None:
return Response(status_code=404)
try:
session.delete_node(node_path)
ApiDependencies.invoker.services.graph_execution_manager.set(
session
) # TODO: can this be done automatically, or add node through an API?
return session
except NodeAlreadyExecutedError:
return Response(status_code=400)
except IndexError:
return Response(status_code=400)
@session_router.post(
"/{session_id}/edges",
operation_id="add_edge",
responses={
200: {"model": GraphExecutionState},
400: {"description": "Invalid node or link"},
404: {"description": "Session not found"},
},
)
async def add_edge(
session_id: str = Path(description="The id of the session"),
edge: Edge = Body(description="The edge to add"),
) -> GraphExecutionState:
"""Adds an edge to the graph"""
session = ApiDependencies.invoker.services.graph_execution_manager.get(session_id)
if session is None:
return Response(status_code=404)
try:
session.add_edge(edge)
ApiDependencies.invoker.services.graph_execution_manager.set(
session
) # TODO: can this be done automatically, or add node through an API?
return session
except NodeAlreadyExecutedError:
return Response(status_code=400)
except IndexError:
return Response(status_code=400)
# TODO: the edge being in the path here is really ugly, find a better solution
@session_router.delete(
"/{session_id}/edges/{from_node_id}/{from_field}/{to_node_id}/{to_field}",
operation_id="delete_edge",
responses={
200: {"model": GraphExecutionState},
400: {"description": "Invalid node or link"},
404: {"description": "Session not found"},
},
)
async def delete_edge(
session_id: str = Path(description="The id of the session"),
from_node_id: str = Path(description="The id of the node the edge is coming from"),
from_field: str = Path(description="The field of the node the edge is coming from"),
to_node_id: str = Path(description="The id of the node the edge is going to"),
to_field: str = Path(description="The field of the node the edge is going to"),
) -> GraphExecutionState:
"""Deletes an edge from the graph"""
session = ApiDependencies.invoker.services.graph_execution_manager.get(session_id)
if session is None:
return Response(status_code=404)
try:
edge = Edge(
source=EdgeConnection(node_id=from_node_id, field=from_field),
destination=EdgeConnection(node_id=to_node_id, field=to_field)
)
session.delete_edge(edge)
ApiDependencies.invoker.services.graph_execution_manager.set(
session
) # TODO: can this be done automatically, or add node through an API?
return session
except NodeAlreadyExecutedError:
return Response(status_code=400)
except IndexError:
return Response(status_code=400)
@session_router.put(
"/{session_id}/invoke",
operation_id="invoke_session",
responses={
200: {"model": None},
202: {"description": "The invocation is queued"},
400: {"description": "The session has no invocations ready to invoke"},
404: {"description": "Session not found"},
},
)
async def invoke_session(
session_id: str = Path(description="The id of the session to invoke"),
all: bool = Query(
default=False, description="Whether or not to invoke all remaining invocations"
),
) -> None:
"""Invokes a session"""
session = ApiDependencies.invoker.services.graph_execution_manager.get(session_id)
if session is None:
return Response(status_code=404)
if session.is_complete():
return Response(status_code=400)
ApiDependencies.invoker.invoke(session, invoke_all=all)
return Response(status_code=202)
@session_router.delete(
"/{session_id}/invoke",
operation_id="cancel_session_invoke",
responses={
202: {"description": "The invocation is canceled"}
},
)
async def cancel_session_invoke(
session_id: str = Path(description="The id of the session to cancel"),
) -> None:
"""Invokes a session"""
ApiDependencies.invoker.cancel(session_id)
return Response(status_code=202)
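
The router above exposes the session graph over REST. A minimal client sketch follows, assuming the uvicorn defaults configured later in this diff (port 9090, prefix `/api`) and the `txt2img` invocation defined further down; the prompt and field values are placeholders, not part of the changeset.

```python
# Illustrative client sketch only; assumes the API server defined below is
# running locally on port 9090 and that the "txt2img" invocation is registered.
import requests

BASE = "http://localhost:9090/api/v1/sessions"

# Create an empty session (the optional graph body is omitted)
session = requests.post(BASE + "/").json()
session_id = session["id"]

# Add a text-to-image node; field names mirror TextToImageInvocation further down
requests.post(
    f"{BASE}/{session_id}/nodes",
    json={"id": "0", "type": "txt2img", "prompt": "a lighthouse at dusk", "steps": 20},
)

# Queue every invocation that is ready to run
requests.put(f"{BASE}/{session_id}/invoke", params={"all": "true"})
```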

View File

@ -1,38 +0,0 @@
# Copyright (c) 2022 Kyle Schouviller (https://github.com/kyle0654)
from fastapi import FastAPI
from fastapi_events.handlers.local import local_handler
from fastapi_events.typing import Event
from fastapi_socketio import SocketManager
from ..services.events import EventServiceBase
class SocketIO:
__sio: SocketManager
def __init__(self, app: FastAPI):
self.__sio = SocketManager(app=app)
self.__sio.on("subscribe", handler=self._handle_sub)
self.__sio.on("unsubscribe", handler=self._handle_unsub)
local_handler.register(
event_name=EventServiceBase.session_event, _func=self._handle_session_event
)
async def _handle_session_event(self, event: Event):
await self.__sio.emit(
event=event[1]["event"],
data=event[1]["data"],
room=event[1]["data"]["graph_execution_state_id"],
)
async def _handle_sub(self, sid, data, *args, **kwargs):
if "session" in data:
self.__sio.enter_room(sid, data["session"])
# @app.sio.on('unsubscribe')
async def _handle_unsub(self, sid, data, *args, **kwargs):
if "session" in data:
self.__sio.leave_room(sid, data["session"])
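
The `SocketIO` wrapper above forwards session events to clients that join a room named after the graph execution state id. A rough client sketch, assuming the python-socketio client package and fastapi_socketio's default mount location of `/ws` (neither is spelled out in this changeset):

```python
# Hypothetical listener; the "/ws" mount path and server address are assumptions.
import socketio

sio = socketio.Client()
sio.connect("http://localhost:9090", socketio_path="/ws/socket.io")

# Join the room for one graph execution so its events are forwarded to us
sio.emit("subscribe", {"session": "some-graph-execution-state-id"})

# ... receive events for a while, then leave the room and disconnect
sio.emit("unsubscribe", {"session": "some-graph-execution-state-id"})
sio.disconnect()
```

The event names themselves come from `EventServiceBase` and are not shown in this part of the diff.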

View File

@ -1,160 +0,0 @@
# Copyright (c) 2022 Kyle Schouviller (https://github.com/kyle0654)
import asyncio
from inspect import signature
import uvicorn
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from fastapi.openapi.docs import get_redoc_html, get_swagger_ui_html
from fastapi.openapi.utils import get_openapi
from fastapi.staticfiles import StaticFiles
from fastapi_events.handlers.local import local_handler
from fastapi_events.middleware import EventHandlerASGIMiddleware
from pydantic.schema import schema
from ..backend import Args
from .api.dependencies import ApiDependencies
from .api.routers import images, sessions, models
from .api.sockets import SocketIO
from .invocations import *
from .invocations.baseinvocation import BaseInvocation
# Create the app
# TODO: create this all in a method so configuration/etc. can be passed in?
app = FastAPI(title="Invoke AI", docs_url=None, redoc_url=None)
# Add event handler
event_handler_id: int = id(app)
app.add_middleware(
EventHandlerASGIMiddleware,
handlers=[
local_handler
], # TODO: consider doing this in services to support different configurations
middleware_id=event_handler_id,
)
# Add CORS
# TODO: use configuration for this
origins = []
app.add_middleware(
CORSMiddleware,
allow_origins=origins,
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
socket_io = SocketIO(app)
config = {}
# Add startup event to load dependencies
@app.on_event("startup")
async def startup_event():
config = Args()
config.parse_args()
ApiDependencies.initialize(
config=config, event_handler_id=event_handler_id
)
# Shut down threads
@app.on_event("shutdown")
async def shutdown_event():
ApiDependencies.shutdown()
# Include all routers
# TODO: REMOVE
# app.include_router(
# invocation.invocation_router,
# prefix = '/api')
app.include_router(sessions.session_router, prefix="/api")
app.include_router(images.images_router, prefix="/api")
app.include_router(models.models_router, prefix="/api")
# Build a custom OpenAPI to include all outputs
# TODO: can outputs be included on metadata of invocation schemas somehow?
def custom_openapi():
if app.openapi_schema:
return app.openapi_schema
openapi_schema = get_openapi(
title=app.title,
description="An API for invoking AI image operations",
version="1.0.0",
routes=app.routes,
)
# Add all outputs
all_invocations = BaseInvocation.get_invocations()
output_types = set()
output_type_titles = dict()
for invoker in all_invocations:
output_type = signature(invoker.invoke).return_annotation
output_types.add(output_type)
output_schemas = schema(output_types, ref_prefix="#/components/schemas/")
for schema_key, output_schema in output_schemas["definitions"].items():
openapi_schema["components"]["schemas"][schema_key] = output_schema
# TODO: note that we assume the schema_key here is the TYPE.__name__
# This could break in some cases, figure out a better way to do it
output_type_titles[schema_key] = output_schema["title"]
# Add a reference to the output type to additionalProperties of the invoker schema
for invoker in all_invocations:
invoker_name = invoker.__name__
output_type = signature(invoker.invoke).return_annotation
output_type_title = output_type_titles[output_type.__name__]
invoker_schema = openapi_schema["components"]["schemas"][invoker_name]
outputs_ref = {"$ref": f"#/components/schemas/{output_type_title}"}
invoker_schema["output"] = outputs_ref
app.openapi_schema = openapi_schema
return app.openapi_schema
app.openapi = custom_openapi
# Override API doc favicons
app.mount("/static", StaticFiles(directory="static/dream_web"), name="static")
@app.get("/docs", include_in_schema=False)
def overridden_swagger():
return get_swagger_ui_html(
openapi_url=app.openapi_url,
title=app.title,
swagger_favicon_url="/static/favicon.ico",
)
@app.get("/redoc", include_in_schema=False)
def overridden_redoc():
return get_redoc_html(
openapi_url=app.openapi_url,
title=app.title,
redoc_favicon_url="/static/favicon.ico",
)
def invoke_api():
# Start our own event loop for eventing usage
# TODO: determine if there's a better way to do this
loop = asyncio.new_event_loop()
config = uvicorn.Config(app=app, host="0.0.0.0", port=9090, loop=loop)
    # Pass access_log=False to the uvicorn.Config above to turn off request logging
server = uvicorn.Server(config)
loop.run_until_complete(server.serve())
if __name__ == "__main__":
invoke_api()
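
Because `custom_openapi()` above attaches an `output` reference to every invocation schema, the generated spec can be inspected to discover what each node produces. A small sketch, assuming the server is running with its default `/openapi.json` route:

```python
# Illustration only; prints the output reference custom_openapi() adds for one node.
import requests

spec = requests.get("http://localhost:9090/openapi.json").json()
print(spec["components"]["schemas"]["TextToImageInvocation"]["output"])
# e.g. {'$ref': '#/components/schemas/ImageOutput'}
```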

View File

@ -1,286 +0,0 @@
# Copyright (c) 2023 Kyle Schouviller (https://github.com/kyle0654)
from abc import ABC, abstractmethod
import argparse
from typing import Any, Callable, Iterable, Literal, get_args, get_origin, get_type_hints
from pydantic import BaseModel, Field
import networkx as nx
import matplotlib.pyplot as plt
from ..invocations.baseinvocation import BaseInvocation
from ..invocations.image import ImageField
from ..services.graph import GraphExecutionState, LibraryGraph, GraphInvocation, Edge
from ..services.invoker import Invoker
def add_field_argument(command_parser, name: str, field, default_override = None):
default = default_override if default_override is not None else field.default if field.default_factory is None else field.default_factory()
if get_origin(field.type_) == Literal:
allowed_values = get_args(field.type_)
allowed_types = set()
for val in allowed_values:
allowed_types.add(type(val))
allowed_types_list = list(allowed_types)
field_type = allowed_types_list[0] if len(allowed_types) == 1 else Union[allowed_types_list] # type: ignore
command_parser.add_argument(
f"--{name}",
dest=name,
type=field_type,
default=default,
choices=allowed_values,
help=field.field_info.description,
)
else:
command_parser.add_argument(
f"--{name}",
dest=name,
type=field.type_,
default=default,
help=field.field_info.description,
)
def add_parsers(
subparsers,
commands: list[type],
command_field: str = "type",
exclude_fields: list[str] = ["id", "type"],
add_arguments: Callable[[argparse.ArgumentParser], None]|None = None
):
"""Adds parsers for each command to the subparsers"""
# Create subparsers for each command
for command in commands:
hints = get_type_hints(command)
cmd_name = get_args(hints[command_field])[0]
command_parser = subparsers.add_parser(cmd_name, help=command.__doc__)
if add_arguments is not None:
add_arguments(command_parser)
# Convert all fields to arguments
fields = command.__fields__ # type: ignore
for name, field in fields.items():
if name in exclude_fields:
continue
add_field_argument(command_parser, name, field)
def add_graph_parsers(
subparsers,
graphs: list[LibraryGraph],
add_arguments: Callable[[argparse.ArgumentParser], None]|None = None
):
for graph in graphs:
command_parser = subparsers.add_parser(graph.name, help=graph.description)
if add_arguments is not None:
add_arguments(command_parser)
# Add arguments for inputs
for exposed_input in graph.exposed_inputs:
node = graph.graph.get_node(exposed_input.node_path)
field = node.__fields__[exposed_input.field]
default_override = getattr(node, exposed_input.field)
add_field_argument(command_parser, exposed_input.alias, field, default_override)
class CliContext:
invoker: Invoker
session: GraphExecutionState
parser: argparse.ArgumentParser
defaults: dict[str, Any]
graph_nodes: dict[str, str]
nodes_added: list[str]
def __init__(self, invoker: Invoker, session: GraphExecutionState, parser: argparse.ArgumentParser):
self.invoker = invoker
self.session = session
self.parser = parser
self.defaults = dict()
self.graph_nodes = dict()
self.nodes_added = list()
def get_session(self):
self.session = self.invoker.services.graph_execution_manager.get(self.session.id)
return self.session
def reset(self):
self.session = self.invoker.create_execution_state()
self.graph_nodes = dict()
self.nodes_added = list()
# Leave defaults unchanged
def add_node(self, node: BaseInvocation):
self.get_session()
self.session.graph.add_node(node)
self.nodes_added.append(node.id)
self.invoker.services.graph_execution_manager.set(self.session)
def add_edge(self, edge: Edge):
self.get_session()
self.session.add_edge(edge)
self.invoker.services.graph_execution_manager.set(self.session)
class ExitCli(Exception):
"""Exception to exit the CLI"""
pass
class BaseCommand(ABC, BaseModel):
"""A CLI command"""
# All commands must include a type name like this:
# type: Literal['your_command_name'] = 'your_command_name'
@classmethod
def get_all_subclasses(cls):
subclasses = []
toprocess = [cls]
while len(toprocess) > 0:
next = toprocess.pop(0)
next_subclasses = next.__subclasses__()
subclasses.extend(next_subclasses)
toprocess.extend(next_subclasses)
return subclasses
@classmethod
def get_commands(cls):
return tuple(BaseCommand.get_all_subclasses())
@classmethod
def get_commands_map(cls):
# Get the type strings out of the literals and into a dictionary
        return {get_args(get_type_hints(t)['type'])[0]: t for t in BaseCommand.get_all_subclasses()}
@abstractmethod
def run(self, context: CliContext) -> None:
"""Run the command. Raise ExitCli to exit."""
pass
class ExitCommand(BaseCommand):
"""Exits the CLI"""
type: Literal['exit'] = 'exit'
def run(self, context: CliContext) -> None:
raise ExitCli()
class HelpCommand(BaseCommand):
"""Shows help"""
type: Literal['help'] = 'help'
def run(self, context: CliContext) -> None:
context.parser.print_help()
def get_graph_execution_history(
graph_execution_state: GraphExecutionState,
) -> Iterable[str]:
"""Gets the history of fully-executed invocations for a graph execution"""
return (
n
for n in reversed(graph_execution_state.executed_history)
if n in graph_execution_state.graph.nodes
)
def get_invocation_command(invocation) -> str:
fields = invocation.__fields__.items()
type_hints = get_type_hints(type(invocation))
command = [invocation.type]
for name, field in fields:
if name in ["id", "type"]:
continue
# TODO: add links
# Skip image fields when serializing command
type_hint = type_hints.get(name) or None
if type_hint is ImageField or ImageField in get_args(type_hint):
continue
field_value = getattr(invocation, name)
field_default = field.default
if field_value != field_default:
if type_hint is str or str in get_args(type_hint):
command.append(f'--{name} "{field_value}"')
else:
command.append(f"--{name} {field_value}")
return " ".join(command)
class HistoryCommand(BaseCommand):
"""Shows the invocation history"""
type: Literal['history'] = 'history'
# Inputs
# fmt: off
count: int = Field(default=5, gt=0, description="The number of history entries to show")
# fmt: on
def run(self, context: CliContext) -> None:
history = list(get_graph_execution_history(context.get_session()))
for i in range(min(self.count, len(history))):
entry_id = history[-1 - i]
entry = context.get_session().graph.get_node(entry_id)
print(f"{entry_id}: {get_invocation_command(entry)}")
class SetDefaultCommand(BaseCommand):
"""Sets a default value for a field"""
type: Literal['default'] = 'default'
# Inputs
# fmt: off
field: str = Field(description="The field to set the default for")
value: str = Field(description="The value to set the default to, or None to clear the default")
# fmt: on
def run(self, context: CliContext) -> None:
if self.value is None:
if self.field in context.defaults:
del context.defaults[self.field]
else:
context.defaults[self.field] = self.value
class DrawGraphCommand(BaseCommand):
"""Debugs a graph"""
type: Literal['draw_graph'] = 'draw_graph'
def run(self, context: CliContext) -> None:
session: GraphExecutionState = context.invoker.services.graph_execution_manager.get(context.session.id)
nxgraph = session.graph.nx_graph_flat()
# Draw the networkx graph
plt.figure(figsize=(20, 20))
pos = nx.spectral_layout(nxgraph)
nx.draw_networkx_nodes(nxgraph, pos, node_size=1000)
nx.draw_networkx_edges(nxgraph, pos, width=2)
nx.draw_networkx_labels(nxgraph, pos, font_size=20, font_family="sans-serif")
plt.axis("off")
plt.show()
class DrawExecutionGraphCommand(BaseCommand):
"""Debugs an execution graph"""
type: Literal['draw_xgraph'] = 'draw_xgraph'
def run(self, context: CliContext) -> None:
session: GraphExecutionState = context.invoker.services.graph_execution_manager.get(context.session.id)
nxgraph = session.execution_graph.nx_graph_flat()
# Draw the networkx graph
plt.figure(figsize=(20, 20))
pos = nx.spectral_layout(nxgraph)
nx.draw_networkx_nodes(nxgraph, pos, node_size=1000)
nx.draw_networkx_edges(nxgraph, pos, width=2)
nx.draw_networkx_labels(nxgraph, pos, font_size=20, font_family="sans-serif")
plt.axis("off")
plt.show()
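
The command classes above are discovered automatically through `BaseCommand.get_all_subclasses()`, so a new CLI command only needs a `type` literal and a `run()` method. A hypothetical minimal example, shown only to illustrate the pattern (not part of this changeset):

```python
# Hypothetical command illustrating the BaseCommand pattern defined above.
class CountNodesCommand(BaseCommand):
    """Prints how many nodes have been added in the current CLI session"""
    type: Literal['count_nodes'] = 'count_nodes'

    def run(self, context: CliContext) -> None:
        print(f"{len(context.nodes_added)} node(s) added so far")
```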

View File

@ -1,167 +0,0 @@
"""
Readline helper functions for cli_app.py
You may import the global singleton `completer` to get access to the
completer object.
"""
import atexit
import readline
import shlex
from pathlib import Path
from typing import List, Dict, Literal, get_args, get_type_hints, get_origin
from ...backend import ModelManager, Globals
from ..invocations.baseinvocation import BaseInvocation
from .commands import BaseCommand
# singleton object, class variable
completer = None
class Completer(object):
def __init__(self, model_manager: ModelManager):
self.commands = self.get_commands()
self.matches = None
self.linebuffer = None
self.manager = model_manager
return
def complete(self, text, state):
"""
        Complete commands and switches from the node CLI command line.
Switches are determined in a context-specific manner.
"""
buffer = readline.get_line_buffer()
if state == 0:
options = None
try:
current_command, current_switch = self.get_current_command(buffer)
options = self.get_command_options(current_command, current_switch)
except IndexError:
pass
options = options or list(self.parse_commands().keys())
if not text: # first time
self.matches = options
else:
self.matches = [s for s in options if s and s.startswith(text)]
try:
match = self.matches[state]
except IndexError:
match = None
return match
@classmethod
    def get_commands(cls)->List[object]:
"""
Return a list of all the client commands and invocations.
"""
return BaseCommand.get_commands() + BaseInvocation.get_invocations()
def get_current_command(self, buffer: str)->tuple[str, str]:
"""
Parse the readline buffer to find the most recent command and its switch.
"""
if len(buffer)==0:
return None, None
tokens = shlex.split(buffer)
command = None
switch = None
for t in tokens:
if t[0].isalpha():
if switch is None:
command = t
else:
switch = t
# don't try to autocomplete switches that are already complete
if switch and buffer.endswith(' '):
switch=None
return command or '', switch or ''
def parse_commands(self)->Dict[str, List[str]]:
"""
Return a dict in which the keys are the command name
and the values are the parameters the command takes.
"""
result = dict()
for command in self.commands:
hints = get_type_hints(command)
name = get_args(hints['type'])[0]
result.update({name:hints})
return result
def get_command_options(self, command: str, switch: str)->List[str]:
"""
Return all the parameters that can be passed to the command as
command-line switches. Returns None if the command is unrecognized.
"""
parsed_commands = self.parse_commands()
if command not in parsed_commands:
return None
# handle switches in the format "-foo=bar"
argument = None
if switch and '=' in switch:
switch, argument = switch.split('=')
parameter = switch.strip('-')
if parameter in parsed_commands[command]:
if argument is None:
return self.get_parameter_options(parameter, parsed_commands[command][parameter])
else:
return [f"--{parameter}={x}" for x in self.get_parameter_options(parameter, parsed_commands[command][parameter])]
else:
return [f"--{x}" for x in parsed_commands[command].keys()]
def get_parameter_options(self, parameter: str, typehint)->List[str]:
"""
Given a parameter type (such as Literal), offers autocompletions.
"""
if get_origin(typehint) == Literal:
return get_args(typehint)
if parameter == 'model':
return self.manager.model_names()
def _pre_input_hook(self):
if self.linebuffer:
readline.insert_text(self.linebuffer)
readline.redisplay()
self.linebuffer = None
def set_autocompleter(model_manager: ModelManager) -> Completer:
global completer
if completer:
return completer
completer = Completer(model_manager)
readline.set_completer(completer.complete)
# pyreadline3 does not have a set_auto_history() method
try:
readline.set_auto_history(True)
    except Exception:
pass
readline.set_pre_input_hook(completer._pre_input_hook)
readline.set_completer_delims(" ")
readline.parse_and_bind("tab: complete")
readline.parse_and_bind("set print-completions-horizontally off")
readline.parse_and_bind("set page-completions on")
readline.parse_and_bind("set skip-completed-text on")
readline.parse_and_bind("set show-all-if-ambiguous on")
histfile = Path(Globals.root, ".invoke_history")
try:
readline.read_history_file(histfile)
readline.set_history_length(1000)
except FileNotFoundError:
pass
except OSError: # file likely corrupted
newname = f"{histfile}.old"
print(
f"## Your history file {histfile} couldn't be loaded and may be corrupted. Renaming it to {newname}"
)
histfile.replace(Path(newname))
atexit.register(readline.write_history_file, histfile)

View File

@ -1,386 +0,0 @@
# Copyright (c) 2022 Kyle Schouviller (https://github.com/kyle0654)
import argparse
import os
import re
import shlex
import time
from typing import (
Union,
get_type_hints,
)
from pydantic import BaseModel
from pydantic.fields import Field
from invokeai.app.services.metadata import PngMetadataService
from .services.default_graphs import create_system_graphs
from .services.latent_storage import DiskLatentsStorage, ForwardCacheLatentsStorage
from ..backend import Args
from .cli.commands import BaseCommand, CliContext, ExitCli, add_graph_parsers, add_parsers, get_graph_execution_history
from .cli.completer import set_autocompleter
from .invocations import *
from .invocations.baseinvocation import BaseInvocation
from .services.events import EventServiceBase
from .services.model_manager_initializer import get_model_manager
from .services.restoration_services import RestorationServices
from .services.graph import Edge, EdgeConnection, ExposedNodeInput, GraphExecutionState, GraphInvocation, LibraryGraph, are_connection_types_compatible
from .services.default_graphs import default_text_to_image_graph_id
from .services.image_storage import DiskImageStorage
from .services.invocation_queue import MemoryInvocationQueue
from .services.invocation_services import InvocationServices
from .services.invoker import Invoker
from .services.processor import DefaultInvocationProcessor
from .services.sqlite import SqliteItemStorage
class CliCommand(BaseModel):
command: Union[BaseCommand.get_commands() + BaseInvocation.get_invocations()] = Field(discriminator="type") # type: ignore
class InvalidArgs(Exception):
pass
def add_invocation_args(command_parser):
# Add linking capability
command_parser.add_argument(
"--link",
"-l",
action="append",
nargs=3,
help="A link in the format 'source_node source_field dest_field'. source_node can be relative to history (e.g. -1)",
)
command_parser.add_argument(
"--link_node",
"-ln",
action="append",
help="A link from all fields in the specified node. Node can be relative to history (e.g. -1)",
)
def get_command_parser(services: InvocationServices) -> argparse.ArgumentParser:
# Create invocation parser
parser = argparse.ArgumentParser()
def exit(*args, **kwargs):
raise InvalidArgs
parser.exit = exit
subparsers = parser.add_subparsers(dest="type")
# Create subparsers for each invocation
invocations = BaseInvocation.get_all_subclasses()
add_parsers(subparsers, invocations, add_arguments=add_invocation_args)
# Create subparsers for each command
commands = BaseCommand.get_all_subclasses()
add_parsers(subparsers, commands, exclude_fields=["type"])
# Create subparsers for exposed CLI graphs
# TODO: add a way to identify these graphs
text_to_image = services.graph_library.get(default_text_to_image_graph_id)
add_graph_parsers(subparsers, [text_to_image], add_arguments=add_invocation_args)
return parser
class NodeField():
alias: str
node_path: str
field: str
field_type: type
def __init__(self, alias: str, node_path: str, field: str, field_type: type):
self.alias = alias
self.node_path = node_path
self.field = field
self.field_type = field_type
def fields_from_type_hints(hints: dict[str, type], node_path: str) -> dict[str,NodeField]:
return {k:NodeField(alias=k, node_path=node_path, field=k, field_type=v) for k, v in hints.items()}
def get_node_input_field(graph: LibraryGraph, field_alias: str, node_id: str) -> NodeField:
"""Gets the node field for the specified field alias"""
exposed_input = next(e for e in graph.exposed_inputs if e.alias == field_alias)
node_type = type(graph.graph.get_node(exposed_input.node_path))
return NodeField(alias=exposed_input.alias, node_path=f'{node_id}.{exposed_input.node_path}', field=exposed_input.field, field_type=get_type_hints(node_type)[exposed_input.field])
def get_node_output_field(graph: LibraryGraph, field_alias: str, node_id: str) -> NodeField:
"""Gets the node field for the specified field alias"""
exposed_output = next(e for e in graph.exposed_outputs if e.alias == field_alias)
node_type = type(graph.graph.get_node(exposed_output.node_path))
node_output_type = node_type.get_output_type()
return NodeField(alias=exposed_output.alias, node_path=f'{node_id}.{exposed_output.node_path}', field=exposed_output.field, field_type=get_type_hints(node_output_type)[exposed_output.field])
def get_node_inputs(invocation: BaseInvocation, context: CliContext) -> dict[str, NodeField]:
"""Gets the inputs for the specified invocation from the context"""
node_type = type(invocation)
if node_type is not GraphInvocation:
return fields_from_type_hints(get_type_hints(node_type), invocation.id)
else:
graph: LibraryGraph = context.invoker.services.graph_library.get(context.graph_nodes[invocation.id])
return {e.alias: get_node_input_field(graph, e.alias, invocation.id) for e in graph.exposed_inputs}
def get_node_outputs(invocation: BaseInvocation, context: CliContext) -> dict[str, NodeField]:
"""Gets the outputs for the specified invocation from the context"""
node_type = type(invocation)
if node_type is not GraphInvocation:
return fields_from_type_hints(get_type_hints(node_type.get_output_type()), invocation.id)
else:
graph: LibraryGraph = context.invoker.services.graph_library.get(context.graph_nodes[invocation.id])
return {e.alias: get_node_output_field(graph, e.alias, invocation.id) for e in graph.exposed_outputs}
def generate_matching_edges(
a: BaseInvocation, b: BaseInvocation, context: CliContext
) -> list[Edge]:
"""Generates all possible edges between two invocations"""
afields = get_node_outputs(a, context)
bfields = get_node_inputs(b, context)
matching_fields = set(afields.keys()).intersection(bfields.keys())
# Remove invalid fields
invalid_fields = set(["type", "id"])
matching_fields = matching_fields.difference(invalid_fields)
# Validate types
matching_fields = [f for f in matching_fields if are_connection_types_compatible(afields[f].field_type, bfields[f].field_type)]
edges = [
Edge(
source=EdgeConnection(node_id=afields[alias].node_path, field=afields[alias].field),
destination=EdgeConnection(node_id=bfields[alias].node_path, field=bfields[alias].field)
)
for alias in matching_fields
]
return edges
class SessionError(Exception):
"""Raised when a session error has occurred"""
pass
def invoke_all(context: CliContext):
"""Runs all invocations in the specified session"""
context.invoker.invoke(context.session, invoke_all=True)
while not context.get_session().is_complete():
# Wait some time
time.sleep(0.1)
# Print any errors
if context.session.has_error():
for n in context.session.errors:
print(
f"Error in node {n} (source node {context.session.prepared_source_mapping[n]}): {context.session.errors[n]}"
)
raise SessionError()
def invoke_cli():
config = Args()
config.parse_args()
model_manager = get_model_manager(config)
# This initializes the autocompleter and returns it.
# Currently nothing is done with the returned Completer
# object, but the object can be used to change autocompletion
# behavior on the fly, if desired.
completer = set_autocompleter(model_manager)
events = EventServiceBase()
metadata = PngMetadataService()
output_folder = os.path.abspath(
os.path.join(os.path.dirname(__file__), "../../../outputs")
)
# TODO: build a file/path manager?
db_location = os.path.join(output_folder, "invokeai.db")
services = InvocationServices(
model_manager=model_manager,
events=events,
latents = ForwardCacheLatentsStorage(DiskLatentsStorage(f'{output_folder}/latents')),
images=DiskImageStorage(f'{output_folder}/images', metadata_service=metadata),
metadata=metadata,
queue=MemoryInvocationQueue(),
graph_library=SqliteItemStorage[LibraryGraph](
filename=db_location, table_name="graphs"
),
graph_execution_manager=SqliteItemStorage[GraphExecutionState](
filename=db_location, table_name="graph_executions"
),
processor=DefaultInvocationProcessor(),
restoration=RestorationServices(config),
)
system_graphs = create_system_graphs(services.graph_library)
    system_graph_names = {g.name for g in system_graphs}
invoker = Invoker(services)
session: GraphExecutionState = invoker.create_execution_state()
parser = get_command_parser(services)
re_negid = re.compile('^-[0-9]+$')
# Uncomment to print out previous sessions at startup
# print(services.session_manager.list())
context = CliContext(invoker, session, parser)
while True:
try:
cmd_input = input("invoke> ")
except (KeyboardInterrupt, EOFError):
# Ctrl-c exits
break
try:
# Refresh the state of the session
#history = list(get_graph_execution_history(context.session))
history = list(reversed(context.nodes_added))
# Split the command for piping
cmds = cmd_input.split("|")
start_id = len(context.nodes_added)
current_id = start_id
new_invocations = list()
for cmd in cmds:
if cmd is None or cmd.strip() == "":
raise InvalidArgs("Empty command")
# Parse args to create invocation
args = vars(context.parser.parse_args(shlex.split(cmd.strip())))
# Override defaults
for field_name, field_default in context.defaults.items():
if field_name in args:
args[field_name] = field_default
# Parse invocation
command: CliCommand = None # type:ignore
system_graph: LibraryGraph|None = None
if args['type'] in system_graph_names:
system_graph = next(filter(lambda g: g.name == args['type'], system_graphs))
invocation = GraphInvocation(graph=system_graph.graph, id=str(current_id))
for exposed_input in system_graph.exposed_inputs:
if exposed_input.alias in args:
node = invocation.graph.get_node(exposed_input.node_path)
field = exposed_input.field
setattr(node, field, args[exposed_input.alias])
command = CliCommand(command = invocation)
context.graph_nodes[invocation.id] = system_graph.id
else:
args["id"] = current_id
command = CliCommand(command=args)
if command is None:
continue
# Run any CLI commands immediately
if isinstance(command.command, BaseCommand):
# Invoke all current nodes to preserve operation order
invoke_all(context)
# Run the command
command.command.run(context)
continue
# TODO: handle linking with library graphs
# Pipe previous command output (if there was a previous command)
edges: list[Edge] = list()
if len(history) > 0 or current_id != start_id:
from_id = (
history[0] if current_id == start_id else str(current_id - 1)
)
from_node = (
next(filter(lambda n: n[0].id == from_id, new_invocations))[0]
if current_id != start_id
else context.session.graph.get_node(from_id)
)
matching_edges = generate_matching_edges(
from_node, command.command, context
)
edges.extend(matching_edges)
# Parse provided links
if "link_node" in args and args["link_node"]:
for link in args["link_node"]:
node_id = link
if re_negid.match(node_id):
node_id = str(current_id + int(node_id))
link_node = context.session.graph.get_node(node_id)
matching_edges = generate_matching_edges(
link_node, command.command, context
)
matching_destinations = [e.destination for e in matching_edges]
edges = [e for e in edges if e.destination not in matching_destinations]
edges.extend(matching_edges)
if "link" in args and args["link"]:
for link in args["link"]:
edges = [e for e in edges if e.destination.node_id != command.command.id or e.destination.field != link[2]]
node_id = link[0]
if re_negid.match(node_id):
node_id = str(current_id + int(node_id))
# TODO: handle missing input/output
node_output = get_node_outputs(context.session.graph.get_node(node_id), context)[link[1]]
node_input = get_node_inputs(command.command, context)[link[2]]
edges.append(
Edge(
source=EdgeConnection(node_id=node_output.node_path, field=node_output.field),
destination=EdgeConnection(node_id=node_input.node_path, field=node_input.field)
)
)
new_invocations.append((command.command, edges))
current_id = current_id + 1
# Add the node to the session
context.add_node(command.command)
for edge in edges:
print(edge)
context.add_edge(edge)
# Execute all remaining nodes
invoke_all(context)
except InvalidArgs:
print('Invalid command, use "help" to list commands')
continue
except SessionError:
# Start a new session
print("Session error: creating a new session")
context.reset()
except ExitCli:
break
except SystemExit:
continue
invoker.stop()
if __name__ == "__main__":
invoke_cli()
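
At the `invoke>` prompt, commands are split on `|` and matching output/input fields between adjacent nodes are wired together by `generate_matching_edges()`. A short illustrative session (the prompt text is a placeholder; `txt2img` and `show_image` are invocations defined elsewhere in this diff):

```
invoke> txt2img --prompt "a lighthouse at dusk" --steps 20 | show_image
invoke> history --count 3
invoke> exit
```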

View File

@ -1,12 +0,0 @@
import os
__all__ = []
dirname = os.path.dirname(os.path.abspath(__file__))
for f in os.listdir(dirname):
if (
f != "__init__.py"
and os.path.isfile("%s/%s" % (dirname, f))
and f[-3:] == ".py"
):
__all__.append(f[:-3])

View File

@ -1,131 +0,0 @@
# Copyright (c) 2022 Kyle Schouviller (https://github.com/kyle0654)
from abc import ABC, abstractmethod
from inspect import signature
from typing import get_args, get_type_hints, Dict, List, Literal, TypedDict
from pydantic import BaseModel, Field
from ..services.invocation_services import InvocationServices
class InvocationContext:
services: InvocationServices
graph_execution_state_id: str
def __init__(self, services: InvocationServices, graph_execution_state_id: str):
self.services = services
self.graph_execution_state_id = graph_execution_state_id
class BaseInvocationOutput(BaseModel):
"""Base class for all invocation outputs"""
# All outputs must include a type name like this:
# type: Literal['your_output_name']
@classmethod
def get_all_subclasses_tuple(cls):
subclasses = []
toprocess = [cls]
while len(toprocess) > 0:
next = toprocess.pop(0)
next_subclasses = next.__subclasses__()
subclasses.extend(next_subclasses)
toprocess.extend(next_subclasses)
return tuple(subclasses)
class BaseInvocation(ABC, BaseModel):
"""A node to process inputs and produce outputs.
May use dependency injection in __init__ to receive providers.
"""
# All invocations must include a type name like this:
# type: Literal['your_output_name']
@classmethod
def get_all_subclasses(cls):
subclasses = []
toprocess = [cls]
while len(toprocess) > 0:
next = toprocess.pop(0)
next_subclasses = next.__subclasses__()
subclasses.extend(next_subclasses)
toprocess.extend(next_subclasses)
return subclasses
@classmethod
def get_invocations(cls):
return tuple(BaseInvocation.get_all_subclasses())
@classmethod
def get_invocations_map(cls):
# Get the type strings out of the literals and into a dictionary
        return {get_args(get_type_hints(t)['type'])[0]: t for t in BaseInvocation.get_all_subclasses()}
@classmethod
def get_output_type(cls):
return signature(cls.invoke).return_annotation
@abstractmethod
def invoke(self, context: InvocationContext) -> BaseInvocationOutput:
"""Invoke with provided context and return outputs."""
pass
#fmt: off
id: str = Field(description="The id of this node. Must be unique among all nodes.")
#fmt: on
# TODO: figure out a better way to provide these hints
# TODO: when we can upgrade to python 3.11, we can use the`NotRequired` type instead of `total=False`
class UIConfig(TypedDict, total=False):
type_hints: Dict[
str,
Literal[
"integer",
"float",
"boolean",
"string",
"enum",
"image",
"latents",
"model",
],
]
tags: List[str]
title: str
class CustomisedSchemaExtra(TypedDict):
ui: UIConfig
class InvocationConfig(BaseModel.Config):
"""Customizes pydantic's BaseModel.Config class for use by Invocations.
    Provide a `ui` dict in `schema_extra` to add hints for generated UIs.
`tags`
- A list of strings, used to categorise invocations.
`type_hints`
- A dict of field types which override the types in the invocation definition.
- Each key should be the name of one of the invocation's fields.
- Each value should be one of the valid types:
- `integer`, `float`, `boolean`, `string`, `enum`, `image`, `latents`, `model`
```python
class Config(InvocationConfig):
schema_extra = {
"ui": {
"tags": ["stable-diffusion", "image"],
"type_hints": {
"initial_image": "image",
},
},
}
```
"""
schema_extra: CustomisedSchemaExtra
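
`BaseInvocation` subclasses are likewise discovered automatically, so a new node only needs a `type` literal, its input fields, and an `invoke()` that returns a `BaseInvocationOutput`. A hypothetical minimal node, for illustration only and not part of this changeset:

```python
# Hypothetical node illustrating the base classes defined above.
class UppercaseOutput(BaseInvocationOutput):
    """The output of the hypothetical uppercase node"""
    type: Literal["uppercase_output"] = "uppercase_output"
    text: str = Field(default="", description="The uppercased text")


class UppercaseInvocation(BaseInvocation):
    """Uppercases a string"""
    type: Literal["uppercase"] = "uppercase"

    # Inputs
    text: str = Field(default="", description="The text to uppercase")

    def invoke(self, context: InvocationContext) -> UppercaseOutput:
        return UppercaseOutput(text=self.text.upper())
```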

View File

@ -1,64 +0,0 @@
# Copyright (c) 2023 Kyle Schouviller (https://github.com/kyle0654)
from typing import Literal, Optional
import numpy as np
import numpy.random
from pydantic import Field
from .baseinvocation import (
BaseInvocation,
InvocationConfig,
InvocationContext,
BaseInvocationOutput,
)
class IntCollectionOutput(BaseInvocationOutput):
"""A collection of integers"""
type: Literal["int_collection"] = "int_collection"
# Outputs
collection: list[int] = Field(default=[], description="The int collection")
class RangeInvocation(BaseInvocation):
"""Creates a range"""
type: Literal["range"] = "range"
# Inputs
start: int = Field(default=0, description="The start of the range")
stop: int = Field(default=10, description="The stop of the range")
step: int = Field(default=1, description="The step of the range")
def invoke(self, context: InvocationContext) -> IntCollectionOutput:
return IntCollectionOutput(
collection=list(range(self.start, self.stop, self.step))
)
class RandomRangeInvocation(BaseInvocation):
"""Creates a collection of random numbers"""
type: Literal["random_range"] = "random_range"
# Inputs
low: int = Field(default=0, description="The inclusive low value")
high: int = Field(
default=np.iinfo(np.int32).max, description="The exclusive high value"
)
size: int = Field(default=1, description="The number of values to generate")
seed: Optional[int] = Field(
ge=0,
le=np.iinfo(np.int32).max,
description="The seed for the RNG",
default_factory=lambda: numpy.random.randint(0, np.iinfo(np.int32).max),
)
def invoke(self, context: InvocationContext) -> IntCollectionOutput:
rng = np.random.default_rng(self.seed)
return IntCollectionOutput(
collection=list(rng.integers(low=self.low, high=self.high, size=self.size))
)
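
As a quick sanity check of the range node above, `invoke()` can be called directly; this particular node ignores its `context` argument, so `None` stands in for it here. Illustration only, outside the normal graph machinery:

```python
# Illustration only; RangeInvocation.invoke() does not use its context argument.
out = RangeInvocation(id="demo", start=0, stop=10, step=2).invoke(None)
print(out.collection)  # [0, 2, 4, 6, 8]
```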

View File

@ -1,69 +0,0 @@
# Copyright (c) 2022 Kyle Schouviller (https://github.com/kyle0654)
from typing import Literal
import cv2 as cv
import numpy
from PIL import Image, ImageOps
from pydantic import BaseModel, Field
from invokeai.app.models.image import ImageField, ImageType
from .baseinvocation import BaseInvocation, InvocationContext, InvocationConfig
from .image import ImageOutput, build_image_output
class CvInvocationConfig(BaseModel):
"""Helper class to provide all OpenCV invocations with additional config"""
# Schema customisation
class Config(InvocationConfig):
schema_extra = {
"ui": {
"tags": ["cv", "image"],
},
}
class CvInpaintInvocation(BaseInvocation, CvInvocationConfig):
"""Simple inpaint using opencv."""
#fmt: off
type: Literal["cv_inpaint"] = "cv_inpaint"
# Inputs
image: ImageField = Field(default=None, description="The image to inpaint")
mask: ImageField = Field(default=None, description="The mask to use when inpainting")
#fmt: on
def invoke(self, context: InvocationContext) -> ImageOutput:
image = context.services.images.get(
self.image.image_type, self.image.image_name
)
mask = context.services.images.get(self.mask.image_type, self.mask.image_name)
# Convert to cv image/mask
# TODO: consider making these utility functions
cv_image = cv.cvtColor(numpy.array(image.convert("RGB")), cv.COLOR_RGB2BGR)
cv_mask = numpy.array(ImageOps.invert(mask))
# Inpaint
cv_inpainted = cv.inpaint(cv_image, cv_mask, 3, cv.INPAINT_TELEA)
# Convert back to Pillow
# TODO: consider making a utility function
image_inpainted = Image.fromarray(cv.cvtColor(cv_inpainted, cv.COLOR_BGR2RGB))
image_type = ImageType.INTERMEDIATE
image_name = context.services.images.create_name(
context.graph_execution_state_id, self.id
)
metadata = context.services.metadata.build_metadata(
session_id=context.graph_execution_state_id, node=self
)
context.services.images.save(image_type, image_name, image_inpainted, metadata)
return build_image_output(
image_type=image_type,
image_name=image_name,
image=image_inpainted,
)

View File

@ -1,281 +0,0 @@
# Copyright (c) 2022 Kyle Schouviller (https://github.com/kyle0654)
from functools import partial
from typing import Literal, Optional, Union
import numpy as np
from torch import Tensor
from pydantic import BaseModel, Field
from invokeai.app.models.image import ImageField, ImageType
from invokeai.app.invocations.util.choose_model import choose_model
from .baseinvocation import BaseInvocation, InvocationContext, InvocationConfig
from .image import ImageOutput, build_image_output
from ...backend.generator import Txt2Img, Img2Img, Inpaint, InvokeAIGenerator
from ...backend.stable_diffusion import PipelineIntermediateState
from ..util.step_callback import stable_diffusion_step_callback
SAMPLER_NAME_VALUES = Literal[tuple(InvokeAIGenerator.schedulers())]
class SDImageInvocation(BaseModel):
"""Helper class to provide all Stable Diffusion raster image invocations with additional config"""
# Schema customisation
class Config(InvocationConfig):
schema_extra = {
"ui": {
"tags": ["stable-diffusion", "image"],
"type_hints": {
"model": "model",
},
},
}
# Text to image
class TextToImageInvocation(BaseInvocation, SDImageInvocation):
"""Generates an image using text2img."""
type: Literal["txt2img"] = "txt2img"
# Inputs
# TODO: consider making prompt optional to enable providing prompt through a link
# fmt: off
prompt: Optional[str] = Field(description="The prompt to generate an image from")
seed: int = Field(default=-1,ge=-1, le=np.iinfo(np.uint32).max, description="The seed to use (-1 for a random seed)", )
steps: int = Field(default=10, gt=0, description="The number of steps to use to generate the image")
width: int = Field(default=512, multiple_of=64, gt=0, description="The width of the resulting image", )
height: int = Field(default=512, multiple_of=64, gt=0, description="The height of the resulting image", )
    cfg_scale: float = Field(default=7.5, gt=0, description="The Classifier-Free Guidance scale; higher values produce results closer to the prompt", )
scheduler: SAMPLER_NAME_VALUES = Field(default="k_lms", description="The scheduler to use" )
seamless: bool = Field(default=False, description="Whether or not to generate an image that can tile without seams", )
model: str = Field(default="", description="The model to use (currently ignored)")
progress_images: bool = Field(default=False, description="Whether or not to produce progress images during generation", )
# fmt: on
# TODO: pass this an emitter method or something? or a session for dispatching?
def dispatch_progress(
self,
context: InvocationContext,
source_node_id: str,
intermediate_state: PipelineIntermediateState,
) -> None:
stable_diffusion_step_callback(
context=context,
intermediate_state=intermediate_state,
node=self.dict(),
source_node_id=source_node_id,
)
def invoke(self, context: InvocationContext) -> ImageOutput:
# Handle invalid model parameter
model = choose_model(context.services.model_manager, self.model)
# Get the source node id (we are invoking the prepared node)
graph_execution_state = context.services.graph_execution_manager.get(
context.graph_execution_state_id
)
source_node_id = graph_execution_state.prepared_source_mapping[self.id]
outputs = Txt2Img(model).generate(
prompt=self.prompt,
step_callback=partial(self.dispatch_progress, context, source_node_id),
**self.dict(
exclude={"prompt"}
), # Shorthand for passing all of the parameters above manually
)
# Outputs is an infinite iterator that will return a new InvokeAIGeneratorOutput object
# each time it is called. We only need the first one.
generate_output = next(outputs)
# Results are image and seed, unwrap for now and ignore the seed
# TODO: pre-seed?
# TODO: can this return multiple results? Should it?
image_type = ImageType.RESULT
image_name = context.services.images.create_name(
context.graph_execution_state_id, self.id
)
metadata = context.services.metadata.build_metadata(
session_id=context.graph_execution_state_id, node=self
)
context.services.images.save(
image_type, image_name, generate_output.image, metadata
)
return build_image_output(
image_type=image_type,
image_name=image_name,
image=generate_output.image,
)
class ImageToImageInvocation(TextToImageInvocation):
"""Generates an image using img2img."""
type: Literal["img2img"] = "img2img"
# Inputs
image: Union[ImageField, None] = Field(description="The input image")
strength: float = Field(
default=0.75, gt=0, le=1, description="The strength of the original image"
)
fit: bool = Field(
default=True,
description="Whether or not the result should be fit to the aspect ratio of the input image",
)
def dispatch_progress(
self,
context: InvocationContext,
source_node_id: str,
intermediate_state: PipelineIntermediateState,
) -> None:
stable_diffusion_step_callback(
context=context,
intermediate_state=intermediate_state,
node=self.dict(),
source_node_id=source_node_id,
)
def invoke(self, context: InvocationContext) -> ImageOutput:
image = (
None
if self.image is None
else context.services.images.get(
self.image.image_type, self.image.image_name
)
)
mask = None
# Handle invalid model parameter
model = choose_model(context.services.model_manager, self.model)
# Get the source node id (we are invoking the prepared node)
graph_execution_state = context.services.graph_execution_manager.get(
context.graph_execution_state_id
)
source_node_id = graph_execution_state.prepared_source_mapping[self.id]
outputs = Img2Img(model).generate(
prompt=self.prompt,
init_image=image,
init_mask=mask,
step_callback=partial(self.dispatch_progress, context, source_node_id),
**self.dict(
exclude={"prompt", "image", "mask"}
), # Shorthand for passing all of the parameters above manually
)
# Outputs is an infinite iterator that will return a new InvokeAIGeneratorOutput object
# each time it is called. We only need the first one.
generator_output = next(outputs)
result_image = generator_output.image
# Results are image and seed, unwrap for now and ignore the seed
# TODO: pre-seed?
# TODO: can this return multiple results? Should it?
image_type = ImageType.RESULT
image_name = context.services.images.create_name(
context.graph_execution_state_id, self.id
)
metadata = context.services.metadata.build_metadata(
session_id=context.graph_execution_state_id, node=self
)
context.services.images.save(image_type, image_name, result_image, metadata)
return build_image_output(
image_type=image_type,
image_name=image_name,
image=result_image,
)
class InpaintInvocation(ImageToImageInvocation):
"""Generates an image using inpaint."""
type: Literal["inpaint"] = "inpaint"
# Inputs
mask: Union[ImageField, None] = Field(description="The mask")
inpaint_replace: float = Field(
default=0.0,
ge=0.0,
le=1.0,
description="The amount by which to replace masked areas with latent noise",
)
def dispatch_progress(
self,
context: InvocationContext,
source_node_id: str,
intermediate_state: PipelineIntermediateState,
) -> None:
stable_diffusion_step_callback(
context=context,
intermediate_state=intermediate_state,
node=self.dict(),
source_node_id=source_node_id,
)
def invoke(self, context: InvocationContext) -> ImageOutput:
image = (
None
if self.image is None
else context.services.images.get(
self.image.image_type, self.image.image_name
)
)
mask = (
None
if self.mask is None
else context.services.images.get(self.mask.image_type, self.mask.image_name)
)
# Handle invalid model parameter
model = choose_model(context.services.model_manager, self.model)
# Get the source node id (we are invoking the prepared node)
graph_execution_state = context.services.graph_execution_manager.get(
context.graph_execution_state_id
)
source_node_id = graph_execution_state.prepared_source_mapping[self.id]
outputs = Inpaint(model).generate(
prompt=self.prompt,
init_img=image,
init_mask=mask,
step_callback=partial(self.dispatch_progress, context, source_node_id),
**self.dict(
exclude={"prompt", "image", "mask"}
), # Shorthand for passing all of the parameters above manually
)
# Outputs is an infinite iterator that will return a new InvokeAIGeneratorOutput object
# each time it is called. We only need the first one.
generator_output = next(outputs)
result_image = generator_output.image
# Results are image and seed, unwrap for now and ignore the seed
# TODO: pre-seed?
# TODO: can this return multiple results? Should it?
image_type = ImageType.RESULT
image_name = context.services.images.create_name(
context.graph_execution_state_id, self.id
)
metadata = context.services.metadata.build_metadata(
session_id=context.graph_execution_state_id, node=self
)
context.services.images.save(image_type, image_name, result_image, metadata)
return build_image_output(
image_type=image_type,
image_name=image_name,
image=result_image,
)

View File

@ -1,369 +0,0 @@
# Copyright (c) 2022 Kyle Schouviller (https://github.com/kyle0654)
from typing import Literal, Optional
import numpy
from PIL import Image, ImageFilter, ImageOps
from pydantic import BaseModel, Field
from ..models.image import ImageField, ImageType
from .baseinvocation import (
BaseInvocation,
BaseInvocationOutput,
InvocationContext,
InvocationConfig,
)
class PILInvocationConfig(BaseModel):
"""Helper class to provide all PIL invocations with additional config"""
class Config(InvocationConfig):
schema_extra = {
"ui": {
"tags": ["PIL", "image"],
},
}
class ImageOutput(BaseInvocationOutput):
"""Base class for invocations that output an image"""
# fmt: off
type: Literal["image"] = "image"
image: ImageField = Field(default=None, description="The output image")
width: Optional[int] = Field(default=None, description="The width of the image in pixels")
height: Optional[int] = Field(default=None, description="The height of the image in pixels")
# fmt: on
class Config:
schema_extra = {
"required": ["type", "image", "width", "height", "mode"]
}
def build_image_output(
image_type: ImageType, image_name: str, image: Image.Image
) -> ImageOutput:
"""Builds an ImageOutput and its ImageField"""
image_field = ImageField(
image_name=image_name,
image_type=image_type,
)
return ImageOutput(
image=image_field,
width=image.width,
height=image.height,
mode=image.mode,
)
class MaskOutput(BaseInvocationOutput):
"""Base class for invocations that output a mask"""
# fmt: off
type: Literal["mask"] = "mask"
mask: ImageField = Field(default=None, description="The output mask")
# fmt: on
class Config:
schema_extra = {
"required": [
"type",
"mask",
]
}
class LoadImageInvocation(BaseInvocation):
"""Load an image and provide it as output."""
# fmt: off
type: Literal["load_image"] = "load_image"
# Inputs
image_type: ImageType = Field(description="The type of the image")
image_name: str = Field(description="The name of the image")
# fmt: on
def invoke(self, context: InvocationContext) -> ImageOutput:
image = context.services.images.get(self.image_type, self.image_name)
return build_image_output(
image_type=self.image_type,
image_name=self.image_name,
image=image,
)
class ShowImageInvocation(BaseInvocation):
"""Displays a provided image, and passes it forward in the pipeline."""
type: Literal["show_image"] = "show_image"
# Inputs
image: ImageField = Field(default=None, description="The image to show")
def invoke(self, context: InvocationContext) -> ImageOutput:
image = context.services.images.get(
self.image.image_type, self.image.image_name
)
if image:
image.show()
# TODO: how to handle failure?
return build_image_output(
image_type=self.image.image_type,
image_name=self.image.image_name,
image=image,
)
class CropImageInvocation(BaseInvocation, PILInvocationConfig):
"""Crops an image to a specified box. The box can be outside of the image."""
# fmt: off
type: Literal["crop"] = "crop"
# Inputs
image: ImageField = Field(default=None, description="The image to crop")
x: int = Field(default=0, description="The left x coordinate of the crop rectangle")
y: int = Field(default=0, description="The top y coordinate of the crop rectangle")
width: int = Field(default=512, gt=0, description="The width of the crop rectangle")
height: int = Field(default=512, gt=0, description="The height of the crop rectangle")
# fmt: on
def invoke(self, context: InvocationContext) -> ImageOutput:
image = context.services.images.get(
self.image.image_type, self.image.image_name
)
image_crop = Image.new(
mode="RGBA", size=(self.width, self.height), color=(0, 0, 0, 0)
)
image_crop.paste(image, (-self.x, -self.y))
image_type = ImageType.INTERMEDIATE
image_name = context.services.images.create_name(
context.graph_execution_state_id, self.id
)
metadata = context.services.metadata.build_metadata(
session_id=context.graph_execution_state_id, node=self
)
context.services.images.save(image_type, image_name, image_crop, metadata)
return build_image_output(
image_type=image_type,
image_name=image_name,
image=image_crop,
)
class PasteImageInvocation(BaseInvocation, PILInvocationConfig):
"""Pastes an image into another image."""
# fmt: off
type: Literal["paste"] = "paste"
# Inputs
base_image: ImageField = Field(default=None, description="The base image")
image: ImageField = Field(default=None, description="The image to paste")
mask: Optional[ImageField] = Field(default=None, description="The mask to use when pasting")
x: int = Field(default=0, description="The left x coordinate at which to paste the image")
y: int = Field(default=0, description="The top y coordinate at which to paste the image")
# fmt: on
def invoke(self, context: InvocationContext) -> ImageOutput:
base_image = context.services.images.get(
self.base_image.image_type, self.base_image.image_name
)
image = context.services.images.get(
self.image.image_type, self.image.image_name
)
mask = (
None
if self.mask is None
else ImageOps.invert(
context.services.images.get(self.mask.image_type, self.mask.image_name)
)
)
# TODO: probably shouldn't invert mask here... should user be required to do it?
min_x = min(0, self.x)
min_y = min(0, self.y)
max_x = max(base_image.width, image.width + self.x)
max_y = max(base_image.height, image.height + self.y)
new_image = Image.new(
mode="RGBA", size=(max_x - min_x, max_y - min_y), color=(0, 0, 0, 0)
)
new_image.paste(base_image, (abs(min_x), abs(min_y)))
new_image.paste(image, (max(0, self.x), max(0, self.y)), mask=mask)
image_type = ImageType.RESULT
image_name = context.services.images.create_name(
context.graph_execution_state_id, self.id
)
metadata = context.services.metadata.build_metadata(
session_id=context.graph_execution_state_id, node=self
)
context.services.images.save(image_type, image_name, new_image, metadata)
return build_image_output(
image_type=image_type,
image_name=image_name,
image=new_image,
)
class MaskFromAlphaInvocation(BaseInvocation, PILInvocationConfig):
"""Extracts the alpha channel of an image as a mask."""
# fmt: off
type: Literal["tomask"] = "tomask"
# Inputs
image: ImageField = Field(default=None, description="The image to create the mask from")
invert: bool = Field(default=False, description="Whether or not to invert the mask")
# fmt: on
def invoke(self, context: InvocationContext) -> MaskOutput:
image = context.services.images.get(
self.image.image_type, self.image.image_name
)
image_mask = image.split()[-1]
if self.invert:
image_mask = ImageOps.invert(image_mask)
image_type = ImageType.INTERMEDIATE
image_name = context.services.images.create_name(
context.graph_execution_state_id, self.id
)
metadata = context.services.metadata.build_metadata(
session_id=context.graph_execution_state_id, node=self
)
context.services.images.save(image_type, image_name, image_mask, metadata)
return MaskOutput(mask=ImageField(image_type=image_type, image_name=image_name))
class BlurInvocation(BaseInvocation, PILInvocationConfig):
"""Blurs an image"""
# fmt: off
type: Literal["blur"] = "blur"
# Inputs
image: ImageField = Field(default=None, description="The image to blur")
radius: float = Field(default=8.0, ge=0, description="The blur radius")
blur_type: Literal["gaussian", "box"] = Field(default="gaussian", description="The type of blur")
# fmt: on
def invoke(self, context: InvocationContext) -> ImageOutput:
image = context.services.images.get(
self.image.image_type, self.image.image_name
)
blur = (
ImageFilter.GaussianBlur(self.radius)
if self.blur_type == "gaussian"
else ImageFilter.BoxBlur(self.radius)
)
blur_image = image.filter(blur)
image_type = ImageType.INTERMEDIATE
image_name = context.services.images.create_name(
context.graph_execution_state_id, self.id
)
metadata = context.services.metadata.build_metadata(
session_id=context.graph_execution_state_id, node=self
)
context.services.images.save(image_type, image_name, blur_image, metadata)
return build_image_output(
image_type=image_type, image_name=image_name, image=blur_image
)
class LerpInvocation(BaseInvocation, PILInvocationConfig):
"""Linear interpolation of all pixels of an image"""
# fmt: off
type: Literal["lerp"] = "lerp"
# Inputs
image: ImageField = Field(default=None, description="The image to lerp")
min: int = Field(default=0, ge=0, le=255, description="The minimum output value")
max: int = Field(default=255, ge=0, le=255, description="The maximum output value")
# fmt: on
def invoke(self, context: InvocationContext) -> ImageOutput:
image = context.services.images.get(
self.image.image_type, self.image.image_name
)
image_arr = numpy.asarray(image, dtype=numpy.float32) / 255
        image_arr = image_arr * (self.max - self.min) + self.min  # offset by min so outputs land in [min, max]
lerp_image = Image.fromarray(numpy.uint8(image_arr))
image_type = ImageType.INTERMEDIATE
image_name = context.services.images.create_name(
context.graph_execution_state_id, self.id
)
metadata = context.services.metadata.build_metadata(
session_id=context.graph_execution_state_id, node=self
)
context.services.images.save(image_type, image_name, lerp_image, metadata)
return build_image_output(
image_type=image_type, image_name=image_name, image=lerp_image
)
class InverseLerpInvocation(BaseInvocation, PILInvocationConfig):
"""Inverse linear interpolation of all pixels of an image"""
# fmt: off
type: Literal["ilerp"] = "ilerp"
# Inputs
image: ImageField = Field(default=None, description="The image to lerp")
min: int = Field(default=0, ge=0, le=255, description="The minimum input value")
max: int = Field(default=255, ge=0, le=255, description="The maximum input value")
# fmt: on
def invoke(self, context: InvocationContext) -> ImageOutput:
image = context.services.images.get(
self.image.image_type, self.image.image_name
)
image_arr = numpy.asarray(image, dtype=numpy.float32)
image_arr = (
numpy.minimum(
numpy.maximum(image_arr - self.min, 0) / float(self.max - self.min), 1
)
* 255
)
ilerp_image = Image.fromarray(numpy.uint8(image_arr))
image_type = ImageType.INTERMEDIATE
image_name = context.services.images.create_name(
context.graph_execution_state_id, self.id
)
metadata = context.services.metadata.build_metadata(
session_id=context.graph_execution_state_id, node=self
)
context.services.images.save(image_type, image_name, ilerp_image, metadata)
return build_image_output(
image_type=image_type, image_name=image_name, image=ilerp_image
)
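
As a quick sanity check of the two mappings above, here is a small worked example using only numpy; the min/max values 64 and 192 are arbitrary illustration values, and the lerp line follows the min-offset form used above.
arr = numpy.asarray([0, 128, 255], dtype=numpy.float32)
# lerp with min=64, max=192: squeezes [0, 255] into [64, 192]
print(arr / 255 * (192 - 64) + 64)   # -> [ 64.   128.25 192.  ]
# ilerp with min=64, max=192: stretches [64, 192] back out to [0, 255]
print(numpy.minimum(numpy.maximum(arr - 64, 0) / float(192 - 64), 1) * 255)   # -> [  0.   127.5 255. ]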


@ -1,371 +0,0 @@
# Copyright (c) 2023 Kyle Schouviller (https://github.com/kyle0654)
import random
from typing import Literal, Optional
from pydantic import BaseModel, Field
import torch
from invokeai.app.invocations.util.choose_model import choose_model
from invokeai.app.util.step_callback import stable_diffusion_step_callback
from ...backend.model_management.model_manager import ModelManager
from ...backend.util.devices import choose_torch_device, torch_dtype
from ...backend.stable_diffusion.diffusion.shared_invokeai_diffusion import PostprocessingSettings
from ...backend.image_util.seamless import configure_model_padding
from ...backend.prompting.conditioning import get_uc_and_c_and_ec
from ...backend.stable_diffusion.diffusers_pipeline import ConditioningData, StableDiffusionGeneratorPipeline
from .baseinvocation import BaseInvocation, BaseInvocationOutput, InvocationContext, InvocationConfig
import numpy as np
from ..services.image_storage import ImageType
from .baseinvocation import BaseInvocation, InvocationContext
from .image import ImageField, ImageOutput, build_image_output
from ...backend.stable_diffusion import PipelineIntermediateState
from diffusers.schedulers import SchedulerMixin as Scheduler
import diffusers
from diffusers import DiffusionPipeline
class LatentsField(BaseModel):
"""A latents field used for passing latents between invocations"""
latents_name: Optional[str] = Field(default=None, description="The name of the latents")
class Config:
schema_extra = {"required": ["latents_name"]}
class LatentsOutput(BaseInvocationOutput):
"""Base class for invocations that output latents"""
#fmt: off
type: Literal["latent_output"] = "latent_output"
latents: LatentsField = Field(default=None, description="The output latents")
#fmt: on
class NoiseOutput(BaseInvocationOutput):
"""Invocation noise output"""
#fmt: off
type: Literal["noise_output"] = "noise_output"
noise: LatentsField = Field(default=None, description="The output noise")
#fmt: on
# TODO: this seems like a hack
scheduler_map = dict(
ddim=diffusers.DDIMScheduler,
dpmpp_2=diffusers.DPMSolverMultistepScheduler,
k_dpm_2=diffusers.KDPM2DiscreteScheduler,
k_dpm_2_a=diffusers.KDPM2AncestralDiscreteScheduler,
k_dpmpp_2=diffusers.DPMSolverMultistepScheduler,
k_euler=diffusers.EulerDiscreteScheduler,
k_euler_a=diffusers.EulerAncestralDiscreteScheduler,
k_heun=diffusers.HeunDiscreteScheduler,
k_lms=diffusers.LMSDiscreteScheduler,
plms=diffusers.PNDMScheduler,
)
SAMPLER_NAME_VALUES = Literal[
tuple(list(scheduler_map.keys()))
]
def get_scheduler(scheduler_name:str, model: StableDiffusionGeneratorPipeline)->Scheduler:
    scheduler_class = scheduler_map.get(scheduler_name, diffusers.DDIMScheduler)  # fall back to the DDIM scheduler class, not the string 'ddim'
scheduler = scheduler_class.from_config(model.scheduler.config)
# hack copied over from generate.py
if not hasattr(scheduler, 'uses_inpainting_model'):
scheduler.uses_inpainting_model = lambda: False
return scheduler
def get_noise(width:int, height:int, device:torch.device, seed:int = 0, latent_channels:int=4, use_mps_noise:bool=False, downsampling_factor:int = 8):
# limit noise to only the diffusion image channels, not the mask channels
input_channels = min(latent_channels, 4)
use_device = "cpu" if (use_mps_noise or device.type == "mps") else device
generator = torch.Generator(device=use_device).manual_seed(seed)
x = torch.randn(
[
1,
input_channels,
height // downsampling_factor,
width // downsampling_factor,
],
dtype=torch_dtype(device),
device=use_device,
generator=generator,
).to(device)
# if self.perlin > 0.0:
# perlin_noise = self.get_perlin_noise(
# width // self.downsampling_factor, height // self.downsampling_factor
# )
# x = (1 - self.perlin) * x + self.perlin * perlin_noise
return x
def random_seed():
return random.randint(0, np.iinfo(np.uint32).max)
class NoiseInvocation(BaseInvocation):
"""Generates latent noise."""
type: Literal["noise"] = "noise"
# Inputs
seed: int = Field(ge=0, le=np.iinfo(np.uint32).max, description="The seed to use", default_factory=random_seed)
width: int = Field(default=512, multiple_of=64, gt=0, description="The width of the resulting noise", )
height: int = Field(default=512, multiple_of=64, gt=0, description="The height of the resulting noise", )
# Schema customisation
class Config(InvocationConfig):
schema_extra = {
"ui": {
"tags": ["latents", "noise"],
},
}
def invoke(self, context: InvocationContext) -> NoiseOutput:
device = torch.device(choose_torch_device())
noise = get_noise(self.width, self.height, device, self.seed)
name = f'{context.graph_execution_state_id}__{self.id}'
context.services.latents.set(name, noise)
return NoiseOutput(
noise=LatentsField(latents_name=name)
)
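
A brief, hedged sketch of what the node above guarantees: for a fixed seed the latent noise is reproducible, and its spatial size is the requested size divided by the downsampling factor of 8.
device = torch.device(choose_torch_device())
a = get_noise(512, 512, device, seed=123)
b = get_noise(512, 512, device, seed=123)
assert torch.equal(a, b)                  # same seed -> identical noise tensor
assert tuple(a.shape) == (1, 4, 64, 64)   # 1 batch, 4 latent channels, 512 // 8 per side
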
# Text to image
class TextToLatentsInvocation(BaseInvocation):
"""Generates latents from a prompt."""
type: Literal["t2l"] = "t2l"
# Inputs
# TODO: consider making prompt optional to enable providing prompt through a link
# fmt: off
prompt: Optional[str] = Field(description="The prompt to generate an image from")
seed: int = Field(default=-1,ge=-1, le=np.iinfo(np.uint32).max, description="The seed to use (-1 for a random seed)", )
noise: Optional[LatentsField] = Field(description="The noise to use")
steps: int = Field(default=10, gt=0, description="The number of steps to use to generate the image")
width: int = Field(default=512, multiple_of=64, gt=0, description="The width of the resulting image", )
height: int = Field(default=512, multiple_of=64, gt=0, description="The height of the resulting image", )
    cfg_scale: float = Field(default=7.5, gt=0, description="The Classifier-Free Guidance scale; higher values may produce an image closer to the prompt", )
scheduler: SAMPLER_NAME_VALUES = Field(default="k_lms", description="The scheduler to use" )
seamless: bool = Field(default=False, description="Whether or not to generate an image that can tile without seams", )
seamless_axes: str = Field(default="", description="The axes to tile the image on, 'x' and/or 'y'")
model: str = Field(default="", description="The model to use (currently ignored)")
progress_images: bool = Field(default=False, description="Whether or not to produce progress images during generation", )
# fmt: on
# Schema customisation
class Config(InvocationConfig):
schema_extra = {
"ui": {
"tags": ["latents", "image"],
"type_hints": {
"model": "model"
}
},
}
# TODO: pass this an emitter method or something? or a session for dispatching?
def dispatch_progress(
self, context: InvocationContext, source_node_id: str, intermediate_state: PipelineIntermediateState
) -> None:
stable_diffusion_step_callback(
context=context,
intermediate_state=intermediate_state,
node=self.dict(),
source_node_id=source_node_id,
)
def get_model(self, model_manager: ModelManager) -> StableDiffusionGeneratorPipeline:
model_info = choose_model(model_manager, self.model)
model_name = model_info['model_name']
model_hash = model_info['hash']
model: StableDiffusionGeneratorPipeline = model_info['model']
model.scheduler = get_scheduler(
model=model,
scheduler_name=self.scheduler
)
if isinstance(model, DiffusionPipeline):
for component in [model.unet, model.vae]:
configure_model_padding(component,
self.seamless,
self.seamless_axes
)
else:
configure_model_padding(model,
self.seamless,
self.seamless_axes
)
return model
def get_conditioning_data(self, model: StableDiffusionGeneratorPipeline) -> ConditioningData:
uc, c, extra_conditioning_info = get_uc_and_c_and_ec(self.prompt, model=model)
conditioning_data = ConditioningData(
uc,
c,
self.cfg_scale,
extra_conditioning_info,
postprocessing_settings=PostprocessingSettings(
threshold=0.0,#threshold,
warmup=0.2,#warmup,
h_symmetry_time_pct=None,#h_symmetry_time_pct,
v_symmetry_time_pct=None#v_symmetry_time_pct,
),
).add_scheduler_args_if_applicable(model.scheduler, eta=None)#ddim_eta)
return conditioning_data
def invoke(self, context: InvocationContext) -> LatentsOutput:
noise = context.services.latents.get(self.noise.latents_name)
# Get the source node id (we are invoking the prepared node)
graph_execution_state = context.services.graph_execution_manager.get(context.graph_execution_state_id)
source_node_id = graph_execution_state.prepared_source_mapping[self.id]
def step_callback(state: PipelineIntermediateState):
self.dispatch_progress(context, source_node_id, state)
model = self.get_model(context.services.model_manager)
conditioning_data = self.get_conditioning_data(model)
# TODO: Verify the noise is the right size
result_latents, result_attention_map_saver = model.latents_from_embeddings(
latents=torch.zeros_like(noise, dtype=torch_dtype(model.device)),
noise=noise,
num_inference_steps=self.steps,
conditioning_data=conditioning_data,
callback=step_callback
)
# https://discuss.huggingface.co/t/memory-usage-by-later-pipeline-stages/23699
torch.cuda.empty_cache()
name = f'{context.graph_execution_state_id}__{self.id}'
context.services.latents.set(name, result_latents)
return LatentsOutput(
latents=LatentsField(latents_name=name)
)
class LatentsToLatentsInvocation(TextToLatentsInvocation):
"""Generates latents using latents as base image."""
type: Literal["l2l"] = "l2l"
# Schema customisation
class Config(InvocationConfig):
schema_extra = {
"ui": {
"tags": ["latents"],
"type_hints": {
"model": "model"
}
},
}
# Inputs
latents: Optional[LatentsField] = Field(description="The latents to use as a base image")
strength: float = Field(default=0.5, description="The strength of the latents to use")
def invoke(self, context: InvocationContext) -> LatentsOutput:
noise = context.services.latents.get(self.noise.latents_name)
latent = context.services.latents.get(self.latents.latents_name)
# Get the source node id (we are invoking the prepared node)
graph_execution_state = context.services.graph_execution_manager.get(context.graph_execution_state_id)
source_node_id = graph_execution_state.prepared_source_mapping[self.id]
def step_callback(state: PipelineIntermediateState):
self.dispatch_progress(context, source_node_id, state)
model = self.get_model(context.services.model_manager)
conditioning_data = self.get_conditioning_data(model)
# TODO: Verify the noise is the right size
initial_latents = latent if self.strength < 1.0 else torch.zeros_like(
latent, device=model.device, dtype=latent.dtype
)
timesteps, _ = model.get_img2img_timesteps(
self.steps,
self.strength,
device=model.device,
)
result_latents, result_attention_map_saver = model.latents_from_embeddings(
latents=initial_latents,
timesteps=timesteps,
noise=noise,
num_inference_steps=self.steps,
conditioning_data=conditioning_data,
callback=step_callback
)
# https://discuss.huggingface.co/t/memory-usage-by-later-pipeline-stages/23699
torch.cuda.empty_cache()
name = f'{context.graph_execution_state_id}__{self.id}'
context.services.latents.set(name, result_latents)
return LatentsOutput(
latents=LatentsField(latents_name=name)
)
# Latent to image
class LatentsToImageInvocation(BaseInvocation):
"""Generates an image from latents."""
type: Literal["l2i"] = "l2i"
# Inputs
latents: Optional[LatentsField] = Field(description="The latents to generate an image from")
model: str = Field(default="", description="The model to use")
# Schema customisation
class Config(InvocationConfig):
schema_extra = {
"ui": {
"tags": ["latents", "image"],
"type_hints": {
"model": "model"
}
},
}
@torch.no_grad()
def invoke(self, context: InvocationContext) -> ImageOutput:
latents = context.services.latents.get(self.latents.latents_name)
# TODO: this only really needs the vae
model_info = choose_model(context.services.model_manager, self.model)
model: StableDiffusionGeneratorPipeline = model_info['model']
with torch.inference_mode():
np_image = model.decode_latents(latents)
image = model.numpy_to_pil(np_image)[0]
image_type = ImageType.RESULT
image_name = context.services.images.create_name(
context.graph_execution_state_id, self.id
)
metadata = context.services.metadata.build_metadata(
session_id=context.graph_execution_state_id, node=self
)
context.services.images.save(image_type, image_name, image, metadata)
return build_image_output(
image_type=image_type,
image_name=image_name,
image=image
)


@ -1,75 +0,0 @@
# Copyright (c) 2023 Kyle Schouviller (https://github.com/kyle0654)
from typing import Literal
from pydantic import BaseModel, Field
from .baseinvocation import BaseInvocation, BaseInvocationOutput, InvocationContext, InvocationConfig
class MathInvocationConfig(BaseModel):
"""Helper class to provide all math invocations with additional config"""
# Schema customisation
class Config(InvocationConfig):
schema_extra = {
"ui": {
"tags": ["math"],
}
}
class IntOutput(BaseInvocationOutput):
"""An integer output"""
#fmt: off
type: Literal["int_output"] = "int_output"
a: int = Field(default=None, description="The output integer")
#fmt: on
class AddInvocation(BaseInvocation, MathInvocationConfig):
"""Adds two numbers"""
#fmt: off
type: Literal["add"] = "add"
a: int = Field(default=0, description="The first number")
b: int = Field(default=0, description="The second number")
#fmt: on
def invoke(self, context: InvocationContext) -> IntOutput:
return IntOutput(a=self.a + self.b)
class SubtractInvocation(BaseInvocation, MathInvocationConfig):
"""Subtracts two numbers"""
#fmt: off
type: Literal["sub"] = "sub"
a: int = Field(default=0, description="The first number")
b: int = Field(default=0, description="The second number")
#fmt: on
def invoke(self, context: InvocationContext) -> IntOutput:
return IntOutput(a=self.a - self.b)
class MultiplyInvocation(BaseInvocation, MathInvocationConfig):
"""Multiplies two numbers"""
#fmt: off
type: Literal["mul"] = "mul"
a: int = Field(default=0, description="The first number")
b: int = Field(default=0, description="The second number")
#fmt: on
def invoke(self, context: InvocationContext) -> IntOutput:
return IntOutput(a=self.a * self.b)
class DivideInvocation(BaseInvocation, MathInvocationConfig):
"""Divides two numbers"""
#fmt: off
type: Literal["div"] = "div"
a: int = Field(default=0, description="The first number")
b: int = Field(default=0, description="The second number")
#fmt: on
def invoke(self, context: InvocationContext) -> IntOutput:
return IntOutput(a=int(self.a / self.b))
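
Because these invocations never read the context, they can be exercised on their own; a minimal sketch (the id values are arbitrary, and passing None as the context is only safe here because these invoke() bodies ignore it):
print(AddInvocation(id="a", a=2, b=3).invoke(None).a)        # 5
print(SubtractInvocation(id="s", a=2, b=3).invoke(None).a)   # -1
print(MultiplyInvocation(id="m", a=4, b=5).invoke(None).a)   # 20
print(DivideInvocation(id="d", a=7, b=2).invoke(None).a)     # 3 (truncated by int())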


@ -1,18 +0,0 @@
# Copyright (c) 2023 Kyle Schouviller (https://github.com/kyle0654)
from typing import Literal
from pydantic import Field
from .baseinvocation import BaseInvocation, BaseInvocationOutput, InvocationContext
from .math import IntOutput
# Pass-through parameter nodes - used by subgraphs
class ParamIntInvocation(BaseInvocation):
"""An integer parameter"""
#fmt: off
type: Literal["param_int"] = "param_int"
a: int = Field(default=0, description="The integer value")
#fmt: on
def invoke(self, context: InvocationContext) -> IntOutput:
return IntOutput(a=self.a)


@ -1,22 +0,0 @@
from typing import Literal
from pydantic.fields import Field
from .baseinvocation import BaseInvocationOutput
class PromptOutput(BaseInvocationOutput):
"""Base class for invocations that output a prompt"""
#fmt: off
type: Literal["prompt"] = "prompt"
prompt: str = Field(default=None, description="The output prompt")
#fmt: on
class Config:
schema_extra = {
'required': [
'type',
'prompt',
]
}


@ -1,56 +0,0 @@
from typing import Literal, Union
from pydantic import Field
from invokeai.app.models.image import ImageField, ImageType
from .baseinvocation import BaseInvocation, InvocationContext, InvocationConfig
from .image import ImageOutput, build_image_output
class RestoreFaceInvocation(BaseInvocation):
"""Restores faces in an image."""
#fmt: off
type: Literal["restore_face"] = "restore_face"
# Inputs
image: Union[ImageField, None] = Field(description="The input image")
strength: float = Field(default=0.75, gt=0, le=1, description="The strength of the restoration" )
#fmt: on
# Schema customisation
class Config(InvocationConfig):
schema_extra = {
"ui": {
"tags": ["restoration", "image"],
},
}
def invoke(self, context: InvocationContext) -> ImageOutput:
image = context.services.images.get(
self.image.image_type, self.image.image_name
)
results = context.services.restoration.upscale_and_reconstruct(
image_list=[[image, 0]],
upscale=None,
strength=self.strength, # GFPGAN strength
save_original=False,
image_callback=None,
)
# Results are image and seed, unwrap for now
# TODO: can this return multiple results?
image_type = ImageType.RESULT
image_name = context.services.images.create_name(
context.graph_execution_state_id, self.id
)
metadata = context.services.metadata.build_metadata(
session_id=context.graph_execution_state_id, node=self
)
context.services.images.save(image_type, image_name, results[0][0], metadata)
return build_image_output(
image_type=image_type,
image_name=image_name,
image=results[0][0]
)


@ -1,60 +0,0 @@
# Copyright (c) 2022 Kyle Schouviller (https://github.com/kyle0654)
from typing import Literal, Union
from pydantic import Field
from invokeai.app.models.image import ImageField, ImageType
from .baseinvocation import BaseInvocation, InvocationContext, InvocationConfig
from .image import ImageOutput, build_image_output
class UpscaleInvocation(BaseInvocation):
"""Upscales an image."""
#fmt: off
type: Literal["upscale"] = "upscale"
# Inputs
image: Union[ImageField, None] = Field(description="The input image", default=None)
strength: float = Field(default=0.75, gt=0, le=1, description="The strength")
level: Literal[2, 4] = Field(default=2, description="The upscale level")
#fmt: on
# Schema customisation
class Config(InvocationConfig):
schema_extra = {
"ui": {
"tags": ["upscaling", "image"],
},
}
def invoke(self, context: InvocationContext) -> ImageOutput:
image = context.services.images.get(
self.image.image_type, self.image.image_name
)
results = context.services.restoration.upscale_and_reconstruct(
image_list=[[image, 0]],
upscale=(self.level, self.strength),
strength=0.0, # GFPGAN strength
save_original=False,
image_callback=None,
)
# Results are image and seed, unwrap for now
# TODO: can this return multiple results?
image_type = ImageType.RESULT
image_name = context.services.images.create_name(
context.graph_execution_state_id, self.id
)
metadata = context.services.metadata.build_metadata(
session_id=context.graph_execution_state_id, node=self
)
context.services.images.save(image_type, image_name, results[0][0], metadata)
return build_image_output(
image_type=image_type,
image_name=image_name,
image=results[0][0]
)


@ -1,14 +0,0 @@
from invokeai.backend.model_management.model_manager import ModelManager
def choose_model(model_manager: ModelManager, model_name: str):
"""Returns the default model if the `model_name` not a valid model, else returns the selected model."""
if model_manager.valid_model(model_name):
model = model_manager.get_model(model_name)
else:
model = model_manager.get_model()
print(
f"* Warning: '{model_name}' is not a valid model name. Using default model \'{model['model_name']}\' instead."
)
return model


@ -1,3 +0,0 @@
class CanceledException(Exception):
"""Execution canceled by user."""
pass


@ -1,29 +0,0 @@
from enum import Enum
from typing import Optional
from pydantic import BaseModel, Field
class ImageType(str, Enum):
RESULT = "results"
INTERMEDIATE = "intermediates"
UPLOAD = "uploads"
def is_image_type(obj):
try:
ImageType(obj)
except ValueError:
return False
return True
class ImageField(BaseModel):
"""An image field used for passing image objects between invocations"""
image_type: ImageType = Field(
default=ImageType.RESULT, description="The type of the image"
)
image_name: Optional[str] = Field(default=None, description="The name of the image")
class Config:
schema_extra = {"required": ["image_type", "image_name"]}


@ -1,56 +0,0 @@
from ..invocations.latent import LatentsToImageInvocation, NoiseInvocation, TextToLatentsInvocation
from ..invocations.params import ParamIntInvocation
from .graph import Edge, EdgeConnection, ExposedNodeInput, ExposedNodeOutput, Graph, LibraryGraph
from .item_storage import ItemStorageABC
default_text_to_image_graph_id = '539b2af5-2b4d-4d8c-8071-e54a3255fc74'
def create_text_to_image() -> LibraryGraph:
return LibraryGraph(
id=default_text_to_image_graph_id,
name='t2i',
description='Converts text to an image',
graph=Graph(
nodes={
'width': ParamIntInvocation(id='width', a=512),
'height': ParamIntInvocation(id='height', a=512),
'3': NoiseInvocation(id='3'),
'4': TextToLatentsInvocation(id='4'),
'5': LatentsToImageInvocation(id='5')
},
edges=[
Edge(source=EdgeConnection(node_id='width', field='a'), destination=EdgeConnection(node_id='3', field='width')),
Edge(source=EdgeConnection(node_id='height', field='a'), destination=EdgeConnection(node_id='3', field='height')),
Edge(source=EdgeConnection(node_id='width', field='a'), destination=EdgeConnection(node_id='4', field='width')),
Edge(source=EdgeConnection(node_id='height', field='a'), destination=EdgeConnection(node_id='4', field='height')),
Edge(source=EdgeConnection(node_id='3', field='noise'), destination=EdgeConnection(node_id='4', field='noise')),
Edge(source=EdgeConnection(node_id='4', field='latents'), destination=EdgeConnection(node_id='5', field='latents')),
]
),
exposed_inputs=[
ExposedNodeInput(node_path='4', field='prompt', alias='prompt'),
ExposedNodeInput(node_path='width', field='a', alias='width'),
ExposedNodeInput(node_path='height', field='a', alias='height')
],
exposed_outputs=[
ExposedNodeOutput(node_path='5', field='image', alias='image')
])
def create_system_graphs(graph_library: ItemStorageABC[LibraryGraph]) -> list[LibraryGraph]:
"""Creates the default system graphs, or adds new versions if the old ones don't match"""
graphs: list[LibraryGraph] = list()
text_to_image = graph_library.get(default_text_to_image_graph_id)
# TODO: Check if the graph is the same as the default one, and if not, update it
#if text_to_image is None:
text_to_image = create_text_to_image()
graph_library.set(text_to_image)
graphs.append(text_to_image)
return graphs
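
A hedged sketch of what the factory above produces; it only inspects the returned LibraryGraph, so nothing beyond the code shown is assumed:
g = create_text_to_image()
assert g.id == default_text_to_image_graph_id
assert [i.alias for i in g.exposed_inputs] == ["prompt", "width", "height"]
assert [o.alias for o in g.exposed_outputs] == ["image"]
assert len(g.graph.nodes) == 5 and len(g.graph.edges) == 6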


@ -1,103 +0,0 @@
# Copyright (c) 2022 Kyle Schouviller (https://github.com/kyle0654)
from typing import Any
from invokeai.app.api.models.images import ProgressImage
from invokeai.app.util.misc import get_timestamp
class EventServiceBase:
session_event: str = "session_event"
"""Basic event bus, to have an empty stand-in when not needed"""
def dispatch(self, event_name: str, payload: Any) -> None:
pass
def __emit_session_event(self, event_name: str, payload: dict) -> None:
payload["timestamp"] = get_timestamp()
self.dispatch(
event_name=EventServiceBase.session_event,
payload=dict(event=event_name, data=payload),
)
# Define events here for every event in the system.
# This will make them easier to integrate until we find a schema generator.
def emit_generator_progress(
self,
graph_execution_state_id: str,
node: dict,
source_node_id: str,
progress_image: ProgressImage | None,
step: int,
total_steps: int,
) -> None:
"""Emitted when there is generation progress"""
self.__emit_session_event(
event_name="generator_progress",
payload=dict(
graph_execution_state_id=graph_execution_state_id,
node=node,
source_node_id=source_node_id,
progress_image=progress_image.dict() if progress_image is not None else None,
step=step,
total_steps=total_steps,
),
)
def emit_invocation_complete(
self,
graph_execution_state_id: str,
result: dict,
node: dict,
source_node_id: str,
) -> None:
"""Emitted when an invocation has completed"""
self.__emit_session_event(
event_name="invocation_complete",
payload=dict(
graph_execution_state_id=graph_execution_state_id,
node=node,
source_node_id=source_node_id,
result=result,
),
)
def emit_invocation_error(
self,
graph_execution_state_id: str,
node: dict,
source_node_id: str,
error: str,
) -> None:
"""Emitted when an invocation has completed"""
self.__emit_session_event(
event_name="invocation_error",
payload=dict(
graph_execution_state_id=graph_execution_state_id,
node=node,
source_node_id=source_node_id,
error=error,
),
)
def emit_invocation_started(
self, graph_execution_state_id: str, node: dict, source_node_id: str
) -> None:
"""Emitted when an invocation has started"""
self.__emit_session_event(
event_name="invocation_started",
payload=dict(
graph_execution_state_id=graph_execution_state_id,
node=node,
source_node_id=source_node_id,
),
)
def emit_graph_execution_complete(self, graph_execution_state_id: str) -> None:
"""Emitted when a session has completed all invocations"""
self.__emit_session_event(
event_name="graph_execution_state_complete",
payload=dict(
graph_execution_state_id=graph_execution_state_id,
),
)
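
Every emit_* helper above funnels through dispatch(), so a concrete event bus only needs to override that single method; a minimal sketch:
class PrintEventService(EventServiceBase):
    """Toy bus that prints each session event instead of forwarding it."""
    def dispatch(self, event_name: str, payload: Any) -> None:
        print(event_name, payload)

bus = PrintEventService()
bus.emit_graph_execution_complete(graph_execution_state_id="some-session-id")
# -> session_event {'event': 'graph_execution_state_complete', 'data': {'graph_execution_state_id': ..., 'timestamp': ...}}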

File diff suppressed because it is too large


@ -1,238 +0,0 @@
# Copyright (c) 2022 Kyle Schouviller (https://github.com/kyle0654)
import os
from glob import glob
from abc import ABC, abstractmethod
from pathlib import Path
from queue import Queue
from typing import Dict, List, Tuple
from PIL.Image import Image
import PIL.Image as PILImage
from invokeai.app.api.models.images import ImageResponse, ImageResponseMetadata
from invokeai.app.models.image import ImageType
from invokeai.app.services.metadata import (
InvokeAIMetadata,
MetadataServiceBase,
build_invokeai_metadata_pnginfo,
)
from invokeai.app.services.item_storage import PaginatedResults
from invokeai.app.util.misc import get_timestamp
from invokeai.app.util.thumbnails import get_thumbnail_name, make_thumbnail
class ImageStorageBase(ABC):
"""Responsible for storing and retrieving images."""
@abstractmethod
def get(self, image_type: ImageType, image_name: str) -> Image:
"""Retrieves an image as PIL Image."""
pass
@abstractmethod
def list(
self, image_type: ImageType, page: int = 0, per_page: int = 10
) -> PaginatedResults[ImageResponse]:
"""Gets a paginated list of images."""
pass
# TODO: make this a bit more flexible for e.g. cloud storage
@abstractmethod
def get_path(
self, image_type: ImageType, image_name: str, is_thumbnail: bool = False
) -> str:
"""Gets the path to an image or its thumbnail."""
pass
# TODO: make this a bit more flexible for e.g. cloud storage
@abstractmethod
def validate_path(self, path: str) -> bool:
"""Validates an image path."""
pass
@abstractmethod
def save(
self,
image_type: ImageType,
image_name: str,
image: Image,
metadata: InvokeAIMetadata | None = None,
) -> Tuple[str, str, int]:
"""Saves an image and a 256x256 WEBP thumbnail. Returns a tuple of the image path, thumbnail path, and created timestamp."""
pass
@abstractmethod
def delete(self, image_type: ImageType, image_name: str) -> None:
"""Deletes an image and its thumbnail (if one exists)."""
pass
def create_name(self, context_id: str, node_id: str) -> str:
"""Creates a unique contextual image filename."""
return f"{context_id}_{node_id}_{str(get_timestamp())}.png"
class DiskImageStorage(ImageStorageBase):
"""Stores images on disk"""
__output_folder: str
__cache_ids: Queue # TODO: this is an incredibly naive cache
__cache: Dict[str, Image]
__max_cache_size: int
__metadata_service: MetadataServiceBase
def __init__(self, output_folder: str, metadata_service: MetadataServiceBase):
self.__output_folder = output_folder
self.__cache = dict()
self.__cache_ids = Queue()
self.__max_cache_size = 10 # TODO: get this from config
self.__metadata_service = metadata_service
Path(output_folder).mkdir(parents=True, exist_ok=True)
# TODO: don't hard-code. get/save/delete should maybe take subpath?
for image_type in ImageType:
Path(os.path.join(output_folder, image_type)).mkdir(
parents=True, exist_ok=True
)
Path(os.path.join(output_folder, image_type, "thumbnails")).mkdir(
parents=True, exist_ok=True
)
def list(
self, image_type: ImageType, page: int = 0, per_page: int = 10
) -> PaginatedResults[ImageResponse]:
dir_path = os.path.join(self.__output_folder, image_type)
image_paths = glob(f"{dir_path}/*.png")
count = len(image_paths)
sorted_image_paths = sorted(
glob(f"{dir_path}/*.png"), key=os.path.getctime, reverse=True
)
page_of_image_paths = sorted_image_paths[
page * per_page : (page + 1) * per_page
]
page_of_images: List[ImageResponse] = []
for path in page_of_image_paths:
filename = os.path.basename(path)
img = PILImage.open(path)
invokeai_metadata = self.__metadata_service.get_metadata(img)
page_of_images.append(
ImageResponse(
image_type=image_type.value,
image_name=filename,
# TODO: DiskImageStorage should not be building URLs...?
image_url=f"api/v1/images/{image_type.value}/{filename}",
thumbnail_url=f"api/v1/images/{image_type.value}/thumbnails/{os.path.splitext(filename)[0]}.webp",
# TODO: Creation of this object should happen elsewhere (?), just making it fit here so it works
metadata=ImageResponseMetadata(
created=int(os.path.getctime(path)),
width=img.width,
height=img.height,
invokeai=invokeai_metadata,
),
)
)
page_count_trunc = int(count / per_page)
page_count_mod = count % per_page
page_count = page_count_trunc if page_count_mod == 0 else page_count_trunc + 1
return PaginatedResults[ImageResponse](
items=page_of_images,
page=page,
pages=page_count,
per_page=per_page,
total=count,
)
def get(self, image_type: ImageType, image_name: str) -> Image:
image_path = self.get_path(image_type, image_name)
cache_item = self.__get_cache(image_path)
if cache_item:
return cache_item
image = PILImage.open(image_path)
self.__set_cache(image_path, image)
return image
# TODO: make this a bit more flexible for e.g. cloud storage
def get_path(
self, image_type: ImageType, image_name: str, is_thumbnail: bool = False
) -> str:
# strip out any relative path shenanigans
basename = os.path.basename(image_name)
if is_thumbnail:
path = os.path.join(
self.__output_folder, image_type, "thumbnails", basename
)
else:
path = os.path.join(self.__output_folder, image_type, basename)
return path
def validate_path(self, path: str) -> bool:
try:
os.stat(path)
return True
except Exception:
return False
def save(
self,
image_type: ImageType,
image_name: str,
image: Image,
metadata: InvokeAIMetadata | None = None,
) -> Tuple[str, str, int]:
image_path = self.get_path(image_type, image_name)
# TODO: Reading the image and then saving it strips the metadata...
if metadata:
pnginfo = build_invokeai_metadata_pnginfo(metadata=metadata)
image.save(image_path, "PNG", pnginfo=pnginfo)
else:
image.save(image_path) # this saved image has an empty info
thumbnail_name = get_thumbnail_name(image_name)
thumbnail_path = self.get_path(image_type, thumbnail_name, is_thumbnail=True)
thumbnail_image = make_thumbnail(image)
thumbnail_image.save(thumbnail_path)
self.__set_cache(image_path, image)
self.__set_cache(thumbnail_path, thumbnail_image)
return (image_path, thumbnail_path, int(os.path.getctime(image_path)))
def delete(self, image_type: ImageType, image_name: str) -> None:
image_path = self.get_path(image_type, image_name)
thumbnail_path = self.get_path(image_type, image_name, True)
if os.path.exists(image_path):
os.remove(image_path)
if image_path in self.__cache:
del self.__cache[image_path]
if os.path.exists(thumbnail_path):
os.remove(thumbnail_path)
if thumbnail_path in self.__cache:
del self.__cache[thumbnail_path]
def __get_cache(self, image_name: str) -> Image:
return None if image_name not in self.__cache else self.__cache[image_name]
def __set_cache(self, image_name: str, image: Image):
if not image_name in self.__cache:
self.__cache[image_name] = image
self.__cache_ids.put(
image_name
) # TODO: this should refresh position for LRU cache
if len(self.__cache) > self.__max_cache_size:
cache_id = self.__cache_ids.get()
del self.__cache[cache_id]
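
A hedged usage sketch; it assumes the PngMetadataService defined further below as the metadata dependency and writes into a local outputs/ folder:
from invokeai.app.services.metadata import PngMetadataService

storage = DiskImageStorage("outputs", metadata_service=PngMetadataService())
name = storage.create_name("session-id", "node-id")                  # "session-id_node-id_<timestamp>.png"
storage.save(ImageType.RESULT, name, PILImage.new("RGB", (64, 64)))
print(storage.get_path(ImageType.RESULT, name))                      # outputs/results/<name>
reloaded = storage.get(ImageType.RESULT, name)                       # served from the in-memory cache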


@ -1,68 +0,0 @@
# Copyright (c) 2022 Kyle Schouviller (https://github.com/kyle0654)
import time
from abc import ABC, abstractmethod
from queue import Queue
from pydantic import BaseModel, Field
class InvocationQueueItem(BaseModel):
graph_execution_state_id: str = Field(description="The ID of the graph execution state")
invocation_id: str = Field(description="The ID of the node being invoked")
invoke_all: bool = Field(default=False)
timestamp: float = Field(default_factory=time.time)
class InvocationQueueABC(ABC):
"""Abstract base class for all invocation queues"""
@abstractmethod
def get(self) -> InvocationQueueItem:
pass
@abstractmethod
def put(self, item: InvocationQueueItem | None) -> None:
pass
@abstractmethod
def cancel(self, graph_execution_state_id: str) -> None:
pass
@abstractmethod
def is_canceled(self, graph_execution_state_id: str) -> bool:
pass
class MemoryInvocationQueue(InvocationQueueABC):
__queue: Queue
__cancellations: dict[str, float]
def __init__(self):
self.__queue = Queue()
self.__cancellations = dict()
def get(self) -> InvocationQueueItem:
item = self.__queue.get()
while isinstance(item, InvocationQueueItem) \
and item.graph_execution_state_id in self.__cancellations \
and self.__cancellations[item.graph_execution_state_id] > item.timestamp:
item = self.__queue.get()
# Clear old items
for graph_execution_state_id in list(self.__cancellations.keys()):
if self.__cancellations[graph_execution_state_id] < item.timestamp:
del self.__cancellations[graph_execution_state_id]
return item
def put(self, item: InvocationQueueItem | None) -> None:
self.__queue.put(item)
def cancel(self, graph_execution_state_id: str) -> None:
if graph_execution_state_id not in self.__cancellations:
self.__cancellations[graph_execution_state_id] = time.time()
def is_canceled(self, graph_execution_state_id: str) -> bool:
return graph_execution_state_id in self.__cancellations
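
A brief sketch of the cancellation contract above: anything queued for a session before cancel() is silently skipped by the next get():
q = MemoryInvocationQueue()
q.put(InvocationQueueItem(graph_execution_state_id="s1", invocation_id="n1"))
q.cancel("s1")                                                 # the pending s1 item is now stale
q.put(InvocationQueueItem(graph_execution_state_id="s2", invocation_id="n2"))
item = q.get()                                                 # discards the s1 item, returns the s2 item
assert item.graph_execution_state_id == "s2"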


@ -1,50 +0,0 @@
# Copyright (c) 2022 Kyle Schouviller (https://github.com/kyle0654)
from invokeai.app.services.metadata import MetadataServiceBase
from invokeai.backend import ModelManager
from .events import EventServiceBase
from .latent_storage import LatentsStorageBase
from .image_storage import ImageStorageBase
from .restoration_services import RestorationServices
from .invocation_queue import InvocationQueueABC
from .item_storage import ItemStorageABC
class InvocationServices:
"""Services that can be used by invocations"""
events: EventServiceBase
latents: LatentsStorageBase
images: ImageStorageBase
metadata: MetadataServiceBase
queue: InvocationQueueABC
model_manager: ModelManager
restoration: RestorationServices
# NOTE: we must forward-declare any types that include invocations, since invocations can use services
graph_library: ItemStorageABC["LibraryGraph"]
graph_execution_manager: ItemStorageABC["GraphExecutionState"]
processor: "InvocationProcessorABC"
def __init__(
self,
model_manager: ModelManager,
events: EventServiceBase,
latents: LatentsStorageBase,
images: ImageStorageBase,
metadata: MetadataServiceBase,
queue: InvocationQueueABC,
graph_library: ItemStorageABC["LibraryGraph"],
graph_execution_manager: ItemStorageABC["GraphExecutionState"],
processor: "InvocationProcessorABC",
restoration: RestorationServices,
):
self.model_manager = model_manager
self.events = events
self.latents = latents
self.images = images
self.metadata = metadata
self.queue = queue
self.graph_library = graph_library
self.graph_execution_manager = graph_execution_manager
self.processor = processor
self.restoration = restoration


@ -1,90 +0,0 @@
# Copyright (c) 2022 Kyle Schouviller (https://github.com/kyle0654)
from abc import ABC
from threading import Event, Thread
from ..invocations.baseinvocation import InvocationContext
from .graph import Graph, GraphExecutionState
from .invocation_queue import InvocationQueueABC, InvocationQueueItem
from .invocation_services import InvocationServices
from .item_storage import ItemStorageABC
class Invoker:
"""The invoker, used to execute invocations"""
services: InvocationServices
def __init__(self, services: InvocationServices):
self.services = services
self._start()
def invoke(
self, graph_execution_state: GraphExecutionState, invoke_all: bool = False
) -> str | None:
"""Determines the next node to invoke and returns the id of the invoked node, or None if there are no nodes to execute"""
# Get the next invocation
invocation = graph_execution_state.next()
if not invocation:
return None
# Save the execution state
self.services.graph_execution_manager.set(graph_execution_state)
# Queue the invocation
self.services.queue.put(
InvocationQueueItem(
# session_id = session.id,
graph_execution_state_id=graph_execution_state.id,
invocation_id=invocation.id,
invoke_all=invoke_all,
)
)
return invocation.id
def create_execution_state(self, graph: Graph | None = None) -> GraphExecutionState:
"""Creates a new execution state for the given graph"""
new_state = GraphExecutionState(graph=Graph() if graph is None else graph)
self.services.graph_execution_manager.set(new_state)
return new_state
def cancel(self, graph_execution_state_id: str) -> None:
"""Cancels the given execution state"""
self.services.queue.cancel(graph_execution_state_id)
def __start_service(self, service) -> None:
# Call start() method on any services that have it
start_op = getattr(service, "start", None)
if callable(start_op):
start_op(self)
def __stop_service(self, service) -> None:
# Call stop() method on any services that have it
stop_op = getattr(service, "stop", None)
if callable(stop_op):
stop_op(self)
def _start(self) -> None:
"""Starts the invoker. This is called automatically when the invoker is created."""
        # start each service exactly once
        for service in vars(self.services):
            self.__start_service(getattr(self.services, service))
def stop(self) -> None:
"""Stops the invoker. A new invoker will have to be created to execute further."""
# First stop all services
        for service in vars(self.services):
            self.__stop_service(getattr(self.services, service))
self.services.queue.put(None)
class InvocationProcessorABC(ABC):
pass


@ -1,62 +0,0 @@
from abc import ABC, abstractmethod
from typing import Callable, Generic, TypeVar
from pydantic import BaseModel, Field
from pydantic.generics import GenericModel
T = TypeVar("T", bound=BaseModel)
class PaginatedResults(GenericModel, Generic[T]):
"""Paginated results"""
#fmt: off
items: list[T] = Field(description="Items")
page: int = Field(description="Current Page")
pages: int = Field(description="Total number of pages")
per_page: int = Field(description="Number of items per page")
total: int = Field(description="Total number of items in result")
#fmt: on
class ItemStorageABC(ABC, Generic[T]):
    """Base item storage class"""
    _on_changed_callbacks: list[Callable[[T], None]]
    _on_deleted_callbacks: list[Callable[[str], None]]
    def __init__(self) -> None:
        self._on_changed_callbacks = list()
        self._on_deleted_callbacks = list()
@abstractmethod
def get(self, item_id: str) -> T:
pass
@abstractmethod
def set(self, item: T) -> None:
pass
@abstractmethod
def list(self, page: int = 0, per_page: int = 10) -> PaginatedResults[T]:
pass
@abstractmethod
def search(
self, query: str, page: int = 0, per_page: int = 10
) -> PaginatedResults[T]:
pass
def on_changed(self, on_changed: Callable[[T], None]) -> None:
"""Register a callback for when an item is changed"""
self._on_changed_callbacks.append(on_changed)
def on_deleted(self, on_deleted: Callable[[str], None]) -> None:
"""Register a callback for when an item is deleted"""
self._on_deleted_callbacks.append(on_deleted)
def _on_changed(self, item: T) -> None:
for callback in self._on_changed_callbacks:
callback(item)
def _on_deleted(self, item_id: str) -> None:
for callback in self._on_deleted_callbacks:
callback(item_id)


@ -1,93 +0,0 @@
# Copyright (c) 2023 Kyle Schouviller (https://github.com/kyle0654)
import os
from abc import ABC, abstractmethod
from pathlib import Path
from queue import Queue
from typing import Dict
import torch
class LatentsStorageBase(ABC):
"""Responsible for storing and retrieving latents."""
@abstractmethod
def get(self, name: str) -> torch.Tensor:
pass
@abstractmethod
def set(self, name: str, data: torch.Tensor) -> None:
pass
@abstractmethod
def delete(self, name: str) -> None:
pass
class ForwardCacheLatentsStorage(LatentsStorageBase):
"""Caches the latest N latents in memory, writing-thorugh to and reading from underlying storage"""
__cache: Dict[str, torch.Tensor]
__cache_ids: Queue
__max_cache_size: int
__underlying_storage: LatentsStorageBase
def __init__(self, underlying_storage: LatentsStorageBase, max_cache_size: int = 20):
self.__underlying_storage = underlying_storage
self.__cache = dict()
self.__cache_ids = Queue()
self.__max_cache_size = max_cache_size
def get(self, name: str) -> torch.Tensor:
cache_item = self.__get_cache(name)
if cache_item is not None:
return cache_item
latent = self.__underlying_storage.get(name)
self.__set_cache(name, latent)
return latent
def set(self, name: str, data: torch.Tensor) -> None:
self.__underlying_storage.set(name, data)
self.__set_cache(name, data)
def delete(self, name: str) -> None:
self.__underlying_storage.delete(name)
if name in self.__cache:
del self.__cache[name]
def __get_cache(self, name: str) -> torch.Tensor|None:
return None if name not in self.__cache else self.__cache[name]
def __set_cache(self, name: str, data: torch.Tensor):
if not name in self.__cache:
self.__cache[name] = data
self.__cache_ids.put(name)
if self.__cache_ids.qsize() > self.__max_cache_size:
self.__cache.pop(self.__cache_ids.get())
class DiskLatentsStorage(LatentsStorageBase):
"""Stores latents in a folder on disk without caching"""
__output_folder: str
def __init__(self, output_folder: str):
self.__output_folder = output_folder
Path(output_folder).mkdir(parents=True, exist_ok=True)
def get(self, name: str) -> torch.Tensor:
latent_path = self.get_path(name)
return torch.load(latent_path)
def set(self, name: str, data: torch.Tensor) -> None:
latent_path = self.get_path(name)
torch.save(data, latent_path)
def delete(self, name: str) -> None:
latent_path = self.get_path(name)
os.remove(latent_path)
def get_path(self, name: str) -> str:
return os.path.join(self.__output_folder, name)
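
The two implementations above are designed to compose, with the forward cache wrapping whichever storage actually persists the tensors; a minimal sketch:
latents = ForwardCacheLatentsStorage(DiskLatentsStorage("outputs/latents"), max_cache_size=8)
latents.set("session__node", torch.zeros(1, 4, 64, 64))   # written to disk and cached
x = latents.get("session__node")                          # cache hit; a miss falls back to torch.load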


@ -1,96 +0,0 @@
import json
from abc import ABC, abstractmethod
from typing import Any, Dict, Optional, TypedDict
from PIL import Image, PngImagePlugin
from pydantic import BaseModel
from invokeai.app.models.image import ImageType, is_image_type
class MetadataImageField(TypedDict):
"""Pydantic-less ImageField, used for metadata parsing."""
image_type: ImageType
image_name: str
class MetadataLatentsField(TypedDict):
"""Pydantic-less LatentsField, used for metadata parsing."""
latents_name: str
# TODO: This is a placeholder for `InvocationsUnion` pending resolution of circular imports
NodeMetadata = Dict[
str, str | int | float | bool | MetadataImageField | MetadataLatentsField
]
class InvokeAIMetadata(TypedDict, total=False):
"""InvokeAI-specific metadata format."""
session_id: Optional[str]
node: Optional[NodeMetadata]
def build_invokeai_metadata_pnginfo(
metadata: InvokeAIMetadata | None,
) -> PngImagePlugin.PngInfo:
"""Builds a PngInfo object with key `"invokeai"` and value `metadata`"""
pnginfo = PngImagePlugin.PngInfo()
if metadata is not None:
pnginfo.add_text("invokeai", json.dumps(metadata))
return pnginfo
class MetadataServiceBase(ABC):
@abstractmethod
def get_metadata(self, image: Image.Image) -> InvokeAIMetadata | None:
"""Gets the InvokeAI metadata from a PIL Image, skipping invalid values"""
pass
@abstractmethod
def build_metadata(
self, session_id: str, node: BaseModel
) -> InvokeAIMetadata | None:
"""Builds an InvokeAIMetadata object"""
pass
class PngMetadataService(MetadataServiceBase):
"""Handles loading and building metadata for images."""
# TODO: Use `InvocationsUnion` to **validate** metadata as representing a fully-functioning node
def _load_metadata(self, image: Image.Image) -> dict | None:
"""Loads a specific info entry from a PIL Image."""
try:
info = image.info.get("invokeai")
if type(info) is not str:
return None
loaded_metadata = json.loads(info)
if type(loaded_metadata) is not dict:
return None
if len(loaded_metadata.items()) == 0:
return None
return loaded_metadata
except:
return None
def get_metadata(self, image: Image.Image) -> dict | None:
"""Retrieves an image's metadata as a dict"""
loaded_metadata = self._load_metadata(image)
return loaded_metadata
def build_metadata(self, session_id: str, node: BaseModel) -> InvokeAIMetadata:
metadata = InvokeAIMetadata(session_id=session_id, node=node.dict())
return metadata
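
A hedged round-trip sketch; the _ExampleNode model and the out.png path exist only for this illustration:
class _ExampleNode(BaseModel):
    id: str = "n1"
    type: str = "example"

svc = PngMetadataService()
meta = svc.build_metadata(session_id="abc123", node=_ExampleNode())
img = Image.new("RGB", (64, 64))
img.save("out.png", "PNG", pnginfo=build_invokeai_metadata_pnginfo(meta))
print(svc.get_metadata(Image.open("out.png")))   # {'session_id': 'abc123', 'node': {'id': 'n1', 'type': 'example'}}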


@ -1,120 +0,0 @@
import os
import sys
import torch
from argparse import Namespace
from invokeai.backend import Args
from omegaconf import OmegaConf
from pathlib import Path
import invokeai.version
from ...backend import ModelManager
from ...backend.util import choose_precision, choose_torch_device
from ...backend import Globals
# TODO: Replace with an abstract class base ModelManagerBase
def get_model_manager(config: Args) -> ModelManager:
if not config.conf:
config_file = os.path.join(Globals.root, "configs", "models.yaml")
if not os.path.exists(config_file):
report_model_error(
config, FileNotFoundError(f"The file {config_file} could not be found.")
)
print(f">> {invokeai.version.__app_name__}, version {invokeai.version.__version__}")
print(f'>> InvokeAI runtime directory is "{Globals.root}"')
# these two lines prevent a horrible warning message from appearing
# when the frozen CLIP tokenizer is imported
import transformers # type: ignore
transformers.logging.set_verbosity_error()
import diffusers
diffusers.logging.set_verbosity_error()
# normalize the config directory relative to root
if not os.path.isabs(config.conf):
config.conf = os.path.normpath(os.path.join(Globals.root, config.conf))
if config.embeddings:
if not os.path.isabs(config.embedding_path):
embedding_path = os.path.normpath(
os.path.join(Globals.root, config.embedding_path)
)
else:
embedding_path = config.embedding_path
else:
embedding_path = None
# migrate legacy models
ModelManager.migrate_models()
# creating the model manager
try:
device = torch.device(choose_torch_device())
precision = 'float16' if config.precision=='float16' \
else 'float32' if config.precision=='float32' \
else choose_precision(device)
model_manager = ModelManager(
OmegaConf.load(config.conf),
precision=precision,
device_type=device,
max_loaded_models=config.max_loaded_models,
            embedding_path = Path(embedding_path) if embedding_path else None,
)
except (FileNotFoundError, TypeError, AssertionError) as e:
report_model_error(config, e)
except (IOError, KeyError) as e:
print(f"{e}. Aborting.")
sys.exit(-1)
# try to autoconvert new models
# autoimport new .ckpt files
if path := config.autoconvert:
model_manager.autoconvert_weights(
conf_path=config.conf,
weights_directory=path,
)
return model_manager
def report_model_error(opt: Namespace, e: Exception):
print(f'** An error occurred while attempting to initialize the model: "{str(e)}"')
print(
"** This can be caused by a missing or corrupted models file, and can sometimes be fixed by (re)installing the models."
)
yes_to_all = os.environ.get("INVOKE_MODEL_RECONFIGURE")
if yes_to_all:
print(
"** Reconfiguration is being forced by environment variable INVOKE_MODEL_RECONFIGURE"
)
else:
response = input(
"Do you want to run invokeai-configure script to select and/or reinstall models? [y] "
)
if response.startswith(("n", "N")):
return
print("invokeai-configure is launching....\n")
# Match arguments that were set on the CLI
# only the arguments accepted by the configuration script are parsed
root_dir = ["--root", opt.root_dir] if opt.root_dir is not None else []
config = ["--config", opt.conf] if opt.conf is not None else []
previous_config = sys.argv
sys.argv = ["invokeai-configure"]
sys.argv.extend(root_dir)
    sys.argv.extend(config)  # config is already a list of CLI arguments
if yes_to_all is not None:
for arg in yes_to_all.split():
sys.argv.append(arg)
from invokeai.frontend.install import invokeai_configure
invokeai_configure()
# TODO: Figure out how to restart
# print('** InvokeAI will now restart')
# sys.argv = previous_args
# main() # would rather do a os.exec(), but doesn't exist?
# sys.exit(0)


@ -1,130 +0,0 @@
import traceback
from threading import Event, Thread
from ..invocations.baseinvocation import InvocationContext
from .invocation_queue import InvocationQueueItem
from .invoker import InvocationProcessorABC, Invoker
from ..models.exceptions import CanceledException
class DefaultInvocationProcessor(InvocationProcessorABC):
__invoker_thread: Thread
__stop_event: Event
__invoker: Invoker
def start(self, invoker) -> None:
self.__invoker = invoker
self.__stop_event = Event()
self.__invoker_thread = Thread(
name="invoker_processor",
target=self.__process,
kwargs=dict(stop_event=self.__stop_event),
)
self.__invoker_thread.daemon = (
True # TODO: probably better to just not use threads?
)
self.__invoker_thread.start()
def stop(self, *args, **kwargs) -> None:
self.__stop_event.set()
def __process(self, stop_event: Event):
try:
while not stop_event.is_set():
queue_item: InvocationQueueItem = self.__invoker.services.queue.get()
if not queue_item: # Probably stopping
continue
graph_execution_state = (
self.__invoker.services.graph_execution_manager.get(
queue_item.graph_execution_state_id
)
)
invocation = graph_execution_state.execution_graph.get_node(
queue_item.invocation_id
)
# get the source node id to provide to clients (the prepared node id is not as useful)
source_node_id = graph_execution_state.prepared_source_mapping[invocation.id]
# Send starting event
self.__invoker.services.events.emit_invocation_started(
graph_execution_state_id=graph_execution_state.id,
node=invocation.dict(),
source_node_id=source_node_id
)
# Invoke
try:
outputs = invocation.invoke(
InvocationContext(
services=self.__invoker.services,
graph_execution_state_id=graph_execution_state.id,
)
)
# Check queue to see if this is canceled, and skip if so
if self.__invoker.services.queue.is_canceled(
graph_execution_state.id
):
continue
# Save outputs and history
graph_execution_state.complete(invocation.id, outputs)
# Save the state changes
self.__invoker.services.graph_execution_manager.set(
graph_execution_state
)
# Send complete event
self.__invoker.services.events.emit_invocation_complete(
graph_execution_state_id=graph_execution_state.id,
node=invocation.dict(),
source_node_id=source_node_id,
result=outputs.dict(),
)
except KeyboardInterrupt:
pass
except CanceledException:
pass
except Exception as e:
error = traceback.format_exc()
# Save error
graph_execution_state.set_node_error(invocation.id, error)
# Save the state changes
self.__invoker.services.graph_execution_manager.set(
graph_execution_state
)
# Send error event
self.__invoker.services.events.emit_invocation_error(
graph_execution_state_id=graph_execution_state.id,
node=invocation.dict(),
source_node_id=source_node_id,
error=error,
)
pass
# Check queue to see if this is canceled, and skip if so
if self.__invoker.services.queue.is_canceled(
graph_execution_state.id
):
continue
# Queue any further commands if invoking all
is_complete = graph_execution_state.is_complete()
if queue_item.invoke_all and not is_complete:
self.__invoker.invoke(graph_execution_state, invoke_all=True)
elif is_complete:
self.__invoker.services.events.emit_graph_execution_complete(
graph_execution_state.id
)
except KeyboardInterrupt:
... # Log something?


@ -1,109 +0,0 @@
import sys
import traceback
import torch
from ...backend.restoration import Restoration
from ...backend.util import choose_torch_device, CPU_DEVICE, MPS_DEVICE
# This should be a real base class for postprocessing functions,
# but right now we just instantiate the existing gfpgan, esrgan
# and codeformer functions.
class RestorationServices:
'''Face restoration and upscaling'''
def __init__(self,args):
try:
gfpgan, codeformer, esrgan = None, None, None
if args.restore or args.esrgan:
restoration = Restoration()
if args.restore:
gfpgan, codeformer = restoration.load_face_restore_models(
args.gfpgan_model_path
)
else:
print(">> Face restoration disabled")
if args.esrgan:
esrgan = restoration.load_esrgan(args.esrgan_bg_tile)
else:
print(">> Upscaling disabled")
else:
print(">> Face restoration and upscaling disabled")
except (ModuleNotFoundError, ImportError):
print(traceback.format_exc(), file=sys.stderr)
print(">> You may need to install the ESRGAN and/or GFPGAN modules")
self.device = torch.device(choose_torch_device())
self.gfpgan = gfpgan
self.codeformer = codeformer
self.esrgan = esrgan
# note that this one method does gfpgan and codeformer reconstruction, as well as
# esrgan upscaling
# TO DO: refactor into separate methods
def upscale_and_reconstruct(
self,
image_list,
facetool="gfpgan",
upscale=None,
upscale_denoise_str=0.75,
strength=0.0,
codeformer_fidelity=0.75,
save_original=False,
image_callback=None,
prefix=None,
):
results = []
for r in image_list:
image, seed = r
try:
if strength > 0:
if self.gfpgan is not None or self.codeformer is not None:
if facetool == "gfpgan":
if self.gfpgan is None:
print(
">> GFPGAN not found. Face restoration is disabled."
)
else:
image = self.gfpgan.process(image, strength, seed)
if facetool == "codeformer":
if self.codeformer is None:
print(
">> CodeFormer not found. Face restoration is disabled."
)
else:
cf_device = (
CPU_DEVICE if self.device == MPS_DEVICE else self.device
)
image = self.codeformer.process(
image=image,
strength=strength,
device=cf_device,
seed=seed,
fidelity=codeformer_fidelity,
)
else:
print(">> Face Restoration is disabled.")
if upscale is not None:
if self.esrgan is not None:
if len(upscale) < 2:
upscale.append(0.75)
image = self.esrgan.process(
image,
upscale[1],
seed,
int(upscale[0]),
denoise_str=upscale_denoise_str,
)
else:
print(">> ESRGAN is disabled. Image not upscaled.")
except Exception as e:
print(
f">> Error running RealESRGAN or GFPGAN. Your image was not upscaled.\n{e}"
)
if image_callback is not None:
image_callback(image, seed, upscaled=True, use_prefix=prefix)
else:
r[0] = image
results.append([image, seed])
return results
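One detail worth calling out above is the `upscale` argument convention: it is a list of `[scale]` or `[scale, denoise_strength]`, with the strength defaulting to 0.75 when omitted. A tiny illustrative helper (not part of InvokeAI) that captures the same convention:

```python
def normalize_upscale(upscale):
    """Return (scale, denoise_strength) from an ESRGAN-style upscale list; strength defaults to 0.75."""
    if upscale is None:
        return None
    upscale = list(upscale)
    if len(upscale) < 2:
        upscale.append(0.75)
    return int(upscale[0]), float(upscale[1])


assert normalize_upscale([2]) == (2, 0.75)
assert normalize_upscale([4, 0.9]) == (4, 0.9)
```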

View File

@ -1,140 +0,0 @@
import sqlite3
from threading import Lock
from typing import Generic, TypeVar, Union, get_args
from pydantic import BaseModel, parse_raw_as
from .item_storage import ItemStorageABC, PaginatedResults
T = TypeVar("T", bound=BaseModel)
sqlite_memory = ":memory:"
class SqliteItemStorage(ItemStorageABC, Generic[T]):
_filename: str
_table_name: str
_conn: sqlite3.Connection
_cursor: sqlite3.Cursor
_id_field: str
_lock: Lock
def __init__(self, filename: str, table_name: str, id_field: str = "id"):
super().__init__()
self._filename = filename
self._table_name = table_name
self._id_field = id_field # TODO: validate that T has this field
self._lock = Lock()
self._conn = sqlite3.connect(
self._filename, check_same_thread=False
) # TODO: figure out a better threading solution
self._cursor = self._conn.cursor()
self._create_table()
def _create_table(self):
try:
self._lock.acquire()
self._cursor.execute(
f"""CREATE TABLE IF NOT EXISTS {self._table_name} (
item TEXT,
id TEXT GENERATED ALWAYS AS (json_extract(item, '$.{self._id_field}')) VIRTUAL NOT NULL);"""
)
self._cursor.execute(
f"""CREATE UNIQUE INDEX IF NOT EXISTS {self._table_name}_id ON {self._table_name}(id);"""
)
finally:
self._lock.release()
def _parse_item(self, item: str) -> T:
item_type = get_args(self.__orig_class__)[0]
return parse_raw_as(item_type, item)
def set(self, item: T):
try:
self._lock.acquire()
self._cursor.execute(
f"""INSERT OR REPLACE INTO {self._table_name} (item) VALUES (?);""",
(item.json(),),
)
self._conn.commit()
finally:
self._lock.release()
self._on_changed(item)
def get(self, id: str) -> Union[T, None]:
try:
self._lock.acquire()
self._cursor.execute(
f"""SELECT item FROM {self._table_name} WHERE id = ?;""", (str(id),)
)
result = self._cursor.fetchone()
finally:
self._lock.release()
if not result:
return None
return self._parse_item(result[0])
def delete(self, id: str):
try:
self._lock.acquire()
self._cursor.execute(
f"""DELETE FROM {self._table_name} WHERE id = ?;""", (str(id),)
)
self._conn.commit()
finally:
self._lock.release()
self._on_deleted(id)
def list(self, page: int = 0, per_page: int = 10) -> PaginatedResults[T]:
try:
self._lock.acquire()
self._cursor.execute(
f"""SELECT item FROM {self._table_name} LIMIT ? OFFSET ?;""",
(per_page, page * per_page),
)
result = self._cursor.fetchall()
items = list(map(lambda r: self._parse_item(r[0]), result))
self._cursor.execute(f"""SELECT count(*) FROM {self._table_name};""")
count = self._cursor.fetchone()[0]
finally:
self._lock.release()
pageCount = int(count / per_page) + 1
return PaginatedResults[T](
items=items, page=page, pages=pageCount, per_page=per_page, total=count
)
def search(
self, query: str, page: int = 0, per_page: int = 10
) -> PaginatedResults[T]:
try:
self._lock.acquire()
self._cursor.execute(
f"""SELECT item FROM {self._table_name} WHERE item LIKE ? LIMIT ? OFFSET ?;""",
(f"%{query}%", per_page, page * per_page),
)
result = self._cursor.fetchall()
items = list(map(lambda r: self._parse_item(r[0]), result))
self._cursor.execute(
f"""SELECT count(*) FROM {self._table_name} WHERE item LIKE ?;""",
(f"%{query}%",),
)
count = self._cursor.fetchone()[0]
finally:
self._lock.release()
pageCount = int(count / per_page) + 1
return PaginatedResults[T](
items=items, page=page, pages=pageCount, per_page=per_page, total=count
)
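The storage class above boils down to one pattern: serialize a pydantic model to JSON, keep it in a single `item` column, and expose the id through a generated column built with `json_extract`. Below is a self-contained sketch of that pattern, assuming SQLite 3.31+ (for generated columns) and the pydantic v1-style `.json()` / `parse_raw()` API used above; `Widget` is a hypothetical item type:

```python
import sqlite3

from pydantic import BaseModel


class Widget(BaseModel):  # hypothetical item type
    id: str
    name: str


conn = sqlite3.connect(":memory:")
conn.execute(
    """CREATE TABLE IF NOT EXISTS widgets (
         item TEXT,
         id TEXT GENERATED ALWAYS AS (json_extract(item, '$.id')) VIRTUAL NOT NULL);"""
)
conn.execute("CREATE UNIQUE INDEX IF NOT EXISTS widgets_id ON widgets(id);")

w = Widget(id="w1", name="left-handed hammer")
conn.execute("INSERT OR REPLACE INTO widgets (item) VALUES (?);", (w.json(),))
row = conn.execute("SELECT item FROM widgets WHERE id = ?;", ("w1",)).fetchone()
print(Widget.parse_raw(row[0]))  # Widget(id='w1', name='left-handed hammer')
```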

View File

@ -1,5 +0,0 @@
import datetime
def get_timestamp():
return int(datetime.datetime.now(datetime.timezone.utc).timestamp())

View File

@ -1,55 +0,0 @@
from invokeai.app.api.models.images import ProgressImage
from invokeai.app.models.exceptions import CanceledException
from ..invocations.baseinvocation import InvocationContext
from ...backend.util.util import image_to_dataURL
from ...backend.generator.base import Generator
from ...backend.stable_diffusion import PipelineIntermediateState
def stable_diffusion_step_callback(
context: InvocationContext,
intermediate_state: PipelineIntermediateState,
node: dict,
source_node_id: str,
):
if context.services.queue.is_canceled(context.graph_execution_state_id):
raise CanceledException
# Some schedulers report not only the noisy latents at the current timestep,
# but also their estimate so far of what the de-noised latents will be. Use
# that estimate if it is available.
if intermediate_state.predicted_original is not None:
sample = intermediate_state.predicted_original
else:
sample = intermediate_state.latents
# TODO: This does not seem to be needed any more?
# # txt2img provides a Tensor in the step_callback
# # img2img provides a PipelineIntermediateState
# if isinstance(sample, PipelineIntermediateState):
# # this was an img2img
# print('img2img')
# latents = sample.latents
# step = sample.step
# else:
# print('txt2img')
# latents = sample
# step = intermediate_state.step
# TODO: only output a preview image when requested
image = Generator.sample_to_lowres_estimated_image(sample)
(width, height) = image.size
width *= 8
height *= 8
dataURL = image_to_dataURL(image, image_format="JPEG")
context.services.events.emit_generator_progress(
graph_execution_state_id=context.graph_execution_state_id,
node=node,
source_node_id=source_node_id,
progress_image=ProgressImage(width=width, height=height, dataURL=dataURL),
step=intermediate_state.step,
total_steps=node["steps"],
)
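Two things happen above: the low-resolution latent preview is reported at full image size (the ×8 factor reflects the VAE's 8× spatial downsampling), and the preview is shipped to clients as a data URL via InvokeAI's `image_to_dataURL` helper. A hedged sketch of the generic image-to-data-URL step (standard base64 encoding, not InvokeAI's own helper):

```python
import base64
from io import BytesIO

from PIL import Image


def to_data_url(image: Image.Image, image_format: str = "JPEG") -> str:
    """Encode a PIL image as a data URL suitable for embedding in an event payload."""
    buf = BytesIO()
    image.save(buf, format=image_format)
    encoded = base64.b64encode(buf.getvalue()).decode("ascii")
    return f"data:image/{image_format.lower()};base64,{encoded}"


preview = Image.new("RGB", (64, 64), "gray")  # stand-in for the lowres estimated image
print(to_data_url(preview)[:48], "...")
```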

View File

@ -1,15 +0,0 @@
import os
from PIL import Image
def get_thumbnail_name(image_name: str) -> str:
"""Given an image name, returns the corresponding thumbnail image name"""
thumbnail_name = os.path.splitext(image_name)[0] + ".webp"
return thumbnail_name
def make_thumbnail(image: Image.Image, size: int = 256) -> Image.Image:
"""Makes a thumbnail from a PIL Image"""
thumbnail = image.copy()
thumbnail.thumbnail(size=(size, size))
return thumbnail
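Example usage of the two helpers above, assuming they are importable and Pillow is built with WebP support; the file names are illustrative:

```python
from PIL import Image

img = Image.new("RGB", (1024, 768), "white")         # stand-in for a generated image
thumb = make_thumbnail(img, size=256)                # keeps aspect ratio, max 256px per side
thumb_name = get_thumbnail_name("banana_sushi.png")  # -> "banana_sushi.webp"
thumb.save(thumb_name, format="WEBP")
```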

View File

@ -1,16 +1,5 @@
"""
'''
Initialization file for invokeai.backend
"""
from .generate import Generate
from .generator import (
InvokeAIGeneratorBasicParams,
InvokeAIGenerator,
InvokeAIGeneratorOutput,
Txt2Img,
Img2Img,
Inpaint
)
from .model_management import ModelManager
from .safety_checker import SafetyChecker
from .args import Args
from .globals import Globals
'''
from .invoke_ai_web_server import InvokeAIWebServer

File diff suppressed because it is too large

View File

@ -1,867 +0,0 @@
#!/usr/bin/env python
# Copyright (c) 2022 Lincoln D. Stein (https://github.com/lstein)
# Before running stable-diffusion on an internet-isolated machine,
# run this script from one with internet connectivity. The
# two machines must share a common .cache directory.
#
# Coauthor: Kevin Turner http://github.com/keturn
#
import sys
print("Loading Python libraries...\n",file=sys.stderr)
import argparse
import io
import os
import re
import shutil
import traceback
import warnings
from argparse import Namespace
from pathlib import Path
from shutil import get_terminal_size
from urllib import request
import npyscreen
import torch
import transformers
from diffusers import AutoencoderKL
from huggingface_hub import HfFolder
from huggingface_hub import login as hf_hub_login
from omegaconf import OmegaConf
from tqdm import tqdm
from transformers import (
AutoProcessor,
CLIPSegForImageSegmentation,
CLIPTextModel,
CLIPTokenizer,
)
import invokeai.configs as configs
from ...frontend.install.model_install import addModelsForm, process_and_execute
from ...frontend.install.widgets import (
CenteredButtonPress,
IntTitleSlider,
set_min_terminal_size,
)
from ..args import PRECISION_CHOICES, Args
from ..globals import Globals, global_cache_dir, global_config_dir, global_config_file
from .model_install_backend import (
default_dataset,
download_from_hf,
hf_download_with_resume,
recommended_datasets,
)
warnings.filterwarnings("ignore")
transformers.logging.set_verbosity_error()
# --------------------------globals-----------------------
Model_dir = "models"
Weights_dir = "ldm/stable-diffusion-v1/"
# the initial "configs" dir is now bundled in the `invokeai.configs` package
Dataset_path = Path(configs.__path__[0]) / "INITIAL_MODELS.yaml"
Default_config_file = Path(global_config_dir()) / "models.yaml"
SD_Configs = Path(global_config_dir()) / "stable-diffusion"
Datasets = OmegaConf.load(Dataset_path)
# minimum size for the UI
MIN_COLS = 135
MIN_LINES = 45
INIT_FILE_PREAMBLE = """# InvokeAI initialization file
# This is the InvokeAI initialization file, which contains command-line default values.
# Feel free to edit. If anything goes wrong, you can re-initialize this file by deleting
# or renaming it and then running invokeai-configure again.
# Place frequently-used startup commands here, one or more per line.
# Examples:
# --outdir=D:\data\images
# --no-nsfw_checker
# --web --host=0.0.0.0
# --steps=20
# -Ak_euler_a -C10.0
"""
# --------------------------------------------
def postscript(errors: set):
if not any(errors):
message = f"""
** INVOKEAI INSTALLATION SUCCESSFUL **
If you installed manually from source or with 'pip install': activate the virtual environment
then run one of the following commands to start InvokeAI.
Web UI:
invokeai --web # (connect to http://localhost:9090)
invokeai --web --host 0.0.0.0 # (connect to http://your-lan-ip:9090 from another computer on the local network)
Command-line interface:
invokeai
If you installed using an installation script, run:
{Globals.root}/invoke.{"bat" if sys.platform == "win32" else "sh"}
Add the '--help' argument to see all of the command-line switches available for use.
"""
else:
message = "\n** There were errors during installation. It is possible some of the models were not fully downloaded.\n"
for err in errors:
message += f"\t - {err}\n"
message += "Please check the logs above and correct any issues."
print(message)
# ---------------------------------------------
def yes_or_no(prompt: str, default_yes=True):
default = "y" if default_yes else "n"
response = input(f"{prompt} [{default}] ") or default
if default_yes:
return response[0] not in ("n", "N")
else:
return response[0] in ("y", "Y")
# ---------------------------------------------
def HfLogin(access_token) -> str:
"""
Helper for logging in to Huggingface
The stdout capture is needed to hide the irrelevant "git credential helper" warning
"""
capture = io.StringIO()
sys.stdout = capture
try:
hf_hub_login(token=access_token, add_to_git_credential=False)
sys.stdout = sys.__stdout__
except Exception as exc:
sys.stdout = sys.__stdout__
print(exc)
raise exc
# -------------------------------------
class ProgressBar:
def __init__(self, model_name="file"):
self.pbar = None
self.name = model_name
def __call__(self, block_num, block_size, total_size):
if not self.pbar:
self.pbar = tqdm(
desc=self.name,
initial=0,
unit="iB",
unit_scale=True,
unit_divisor=1000,
total=total_size,
)
self.pbar.update(block_size)
# ---------------------------------------------
def download_with_progress_bar(model_url: str, model_dest: str, label: str = "the"):
try:
print(f"Installing {label} model file {model_url}...", end="", file=sys.stderr)
if not os.path.exists(model_dest):
os.makedirs(os.path.dirname(model_dest), exist_ok=True)
request.urlretrieve(
model_url, model_dest, ProgressBar(os.path.basename(model_dest))
)
print("...downloaded successfully", file=sys.stderr)
else:
print("...exists", file=sys.stderr)
except Exception:
print("...download failed", file=sys.stderr)
print(f"Error downloading {label} model", file=sys.stderr)
print(traceback.format_exc(), file=sys.stderr)
# ---------------------------------------------
# this will preload the Bert tokenizer files
def download_bert():
print("Installing bert tokenizer...", file=sys.stderr)
with warnings.catch_warnings():
warnings.filterwarnings("ignore", category=DeprecationWarning)
from transformers import BertTokenizerFast
download_from_hf(BertTokenizerFast, "bert-base-uncased")
# ---------------------------------------------
def download_sd1_clip():
print("Installing SD1 clip model...", file=sys.stderr)
version = "openai/clip-vit-large-patch14"
download_from_hf(CLIPTokenizer, version)
download_from_hf(CLIPTextModel, version)
# ---------------------------------------------
def download_sd2_clip():
version = "stabilityai/stable-diffusion-2"
print("Installing SD2 clip model...", file=sys.stderr)
download_from_hf(CLIPTokenizer, version, subfolder="tokenizer")
download_from_hf(CLIPTextModel, version, subfolder="text_encoder")
# ---------------------------------------------
def download_realesrgan():
print("Installing models from RealESRGAN...", file=sys.stderr)
model_url = "https://github.com/xinntao/Real-ESRGAN/releases/download/v0.2.5.0/realesr-general-x4v3.pth"
wdn_model_url = "https://github.com/xinntao/Real-ESRGAN/releases/download/v0.2.5.0/realesr-general-wdn-x4v3.pth"
model_dest = os.path.join(
Globals.root, "models/realesrgan/realesr-general-x4v3.pth"
)
wdn_model_dest = os.path.join(
Globals.root, "models/realesrgan/realesr-general-wdn-x4v3.pth"
)
download_with_progress_bar(model_url, model_dest, "RealESRGAN")
download_with_progress_bar(wdn_model_url, wdn_model_dest, "RealESRGANwdn")
def download_gfpgan():
print("Installing GFPGAN models...", file=sys.stderr)
for model in (
[
"https://github.com/TencentARC/GFPGAN/releases/download/v1.3.0/GFPGANv1.4.pth",
"./models/gfpgan/GFPGANv1.4.pth",
],
[
"https://github.com/xinntao/facexlib/releases/download/v0.1.0/detection_Resnet50_Final.pth",
"./models/gfpgan/weights/detection_Resnet50_Final.pth",
],
[
"https://github.com/xinntao/facexlib/releases/download/v0.2.2/parsing_parsenet.pth",
"./models/gfpgan/weights/parsing_parsenet.pth",
],
):
model_url, model_dest = model[0], os.path.join(Globals.root, model[1])
download_with_progress_bar(model_url, model_dest, "GFPGAN weights")
# ---------------------------------------------
def download_codeformer():
print("Installing CodeFormer model file...", file=sys.stderr)
model_url = (
"https://github.com/sczhou/CodeFormer/releases/download/v0.1.0/codeformer.pth"
)
model_dest = os.path.join(Globals.root, "models/codeformer/codeformer.pth")
download_with_progress_bar(model_url, model_dest, "CodeFormer")
# ---------------------------------------------
def download_clipseg():
print("Installing clipseg model for text-based masking...", file=sys.stderr)
CLIPSEG_MODEL = "CIDAS/clipseg-rd64-refined"
try:
download_from_hf(AutoProcessor, CLIPSEG_MODEL)
download_from_hf(CLIPSegForImageSegmentation, CLIPSEG_MODEL)
except Exception:
print("Error installing clipseg model:")
print(traceback.format_exc())
# -------------------------------------
def download_safety_checker():
print("Installing model for NSFW content detection...", file=sys.stderr)
try:
from diffusers.pipelines.stable_diffusion.safety_checker import (
StableDiffusionSafetyChecker,
)
from transformers import AutoFeatureExtractor
except ModuleNotFoundError:
print("Error installing NSFW checker model:")
print(traceback.format_exc())
return
safety_model_id = "CompVis/stable-diffusion-safety-checker"
print("AutoFeatureExtractor...", file=sys.stderr)
download_from_hf(AutoFeatureExtractor, safety_model_id)
print("StableDiffusionSafetyChecker...", file=sys.stderr)
download_from_hf(StableDiffusionSafetyChecker, safety_model_id)
# -------------------------------------
def download_vaes():
print("Installing stabilityai VAE...", file=sys.stderr)
try:
# first the diffusers version
repo_id = "stabilityai/sd-vae-ft-mse"
args = dict(
cache_dir=global_cache_dir("hub"),
)
if not AutoencoderKL.from_pretrained(repo_id, **args):
raise Exception(f"download of {repo_id} failed")
repo_id = "stabilityai/sd-vae-ft-mse-original"
model_name = "vae-ft-mse-840000-ema-pruned.ckpt"
# next the legacy checkpoint version
if not hf_download_with_resume(
repo_id=repo_id,
model_name=model_name,
model_dir=str(Globals.root / Model_dir / Weights_dir),
):
raise Exception(f"download of {model_name} failed")
except Exception as e:
print(f"Error downloading StabilityAI standard VAE: {str(e)}", file=sys.stderr)
print(traceback.format_exc(), file=sys.stderr)
# -------------------------------------
def get_root(root: str = None) -> str:
if root:
return root
elif os.environ.get("INVOKEAI_ROOT"):
return os.environ.get("INVOKEAI_ROOT")
else:
return Globals.root
# -------------------------------------
class editOptsForm(npyscreen.FormMultiPage):
# for responsive resizing - disabled
# FIX_MINIMUM_SIZE_WHEN_CREATED = False
def create(self):
program_opts = self.parentApp.program_opts
old_opts = self.parentApp.invokeai_opts
first_time = not (Globals.root / Globals.initfile).exists()
access_token = HfFolder.get_token()
window_width, window_height = get_terminal_size()
for i in [
"Configure startup settings. You can come back and change these later.",
"Use ctrl-N and ctrl-P to move to the <N>ext and <P>revious fields.",
"Use cursor arrows to make a checkbox selection, and space to toggle.",
]:
self.add_widget_intelligent(
npyscreen.FixedText,
value=i,
editable=False,
color="CONTROL",
)
self.nextrely += 1
self.add_widget_intelligent(
npyscreen.TitleFixedText,
name="== BASIC OPTIONS ==",
begin_entry_at=0,
editable=False,
color="CONTROL",
scroll_exit=True,
)
self.nextrely -= 1
self.add_widget_intelligent(
npyscreen.FixedText,
value="Select an output directory for images:",
editable=False,
color="CONTROL",
)
self.outdir = self.add_widget_intelligent(
npyscreen.TitleFilename,
name="(<tab> autocompletes, ctrl-N advances):",
value=old_opts.outdir or str(default_output_dir()),
select_dir=True,
must_exist=False,
use_two_lines=False,
labelColor="GOOD",
begin_entry_at=40,
scroll_exit=True,
)
self.nextrely += 1
self.add_widget_intelligent(
npyscreen.FixedText,
value="Activate the NSFW checker to blur images showing potential sexual imagery:",
editable=False,
color="CONTROL",
)
self.safety_checker = self.add_widget_intelligent(
npyscreen.Checkbox,
name="NSFW checker",
value=old_opts.safety_checker,
relx=5,
scroll_exit=True,
)
self.nextrely += 1
for i in [
"If you have an account at HuggingFace you may paste your access token here",
'to allow InvokeAI to download styles & subjects from the "Concept Library".',
"See https://huggingface.co/settings/tokens",
]:
self.add_widget_intelligent(
npyscreen.FixedText,
value=i,
editable=False,
color="CONTROL",
)
self.hf_token = self.add_widget_intelligent(
npyscreen.TitlePassword,
name="Access Token (ctrl-shift-V pastes):",
value=access_token,
begin_entry_at=42,
use_two_lines=False,
scroll_exit=True,
)
self.nextrely += 1
self.add_widget_intelligent(
npyscreen.TitleFixedText,
name="== ADVANCED OPTIONS ==",
begin_entry_at=0,
editable=False,
color="CONTROL",
scroll_exit=True,
)
self.nextrely -= 1
self.add_widget_intelligent(
npyscreen.TitleFixedText,
name="GPU Management",
begin_entry_at=0,
editable=False,
color="CONTROL",
scroll_exit=True,
)
self.nextrely -= 1
self.free_gpu_mem = self.add_widget_intelligent(
npyscreen.Checkbox,
name="Free GPU memory after each generation",
value=old_opts.free_gpu_mem,
relx=5,
scroll_exit=True,
)
self.xformers = self.add_widget_intelligent(
npyscreen.Checkbox,
name="Enable xformers support if available",
value=old_opts.xformers,
relx=5,
scroll_exit=True,
)
self.ckpt_convert = self.add_widget_intelligent(
npyscreen.Checkbox,
name="Load legacy checkpoint models into memory as diffusers models",
value=old_opts.ckpt_convert,
relx=5,
scroll_exit=True,
)
self.always_use_cpu = self.add_widget_intelligent(
npyscreen.Checkbox,
name="Force CPU to be used on GPU systems",
value=old_opts.always_use_cpu,
relx=5,
scroll_exit=True,
)
precision = old_opts.precision or (
"float32" if program_opts.full_precision else "auto"
)
self.precision = self.add_widget_intelligent(
npyscreen.TitleSelectOne,
name="Precision",
values=PRECISION_CHOICES,
value=PRECISION_CHOICES.index(precision),
begin_entry_at=3,
max_height=len(PRECISION_CHOICES) + 1,
scroll_exit=True,
)
self.max_loaded_models = self.add_widget_intelligent(
IntTitleSlider,
name="Number of models to cache in CPU memory (each will use 2-4 GB!)",
value=old_opts.max_loaded_models,
out_of=10,
lowest=1,
begin_entry_at=4,
scroll_exit=True,
)
self.nextrely += 1
self.add_widget_intelligent(
npyscreen.FixedText,
value="Directory containing embedding/textual inversion files:",
editable=False,
color="CONTROL",
)
self.embedding_path = self.add_widget_intelligent(
npyscreen.TitleFilename,
name="(<tab> autocompletes, ctrl-N advances):",
value=str(default_embedding_dir()),
select_dir=True,
must_exist=False,
use_two_lines=False,
labelColor="GOOD",
begin_entry_at=40,
scroll_exit=True,
)
self.nextrely += 1
self.add_widget_intelligent(
npyscreen.TitleFixedText,
name="== LICENSE ==",
begin_entry_at=0,
editable=False,
color="CONTROL",
scroll_exit=True,
)
self.nextrely -= 1
for i in [
"BY DOWNLOADING THE STABLE DIFFUSION WEIGHT FILES, YOU AGREE TO HAVE READ",
"AND ACCEPTED THE CREATIVEML RESPONSIBLE AI LICENSE LOCATED AT",
"https://huggingface.co/spaces/CompVis/stable-diffusion-license",
]:
self.add_widget_intelligent(
npyscreen.FixedText,
value=i,
editable=False,
color="CONTROL",
)
self.license_acceptance = self.add_widget_intelligent(
npyscreen.Checkbox,
name="I accept the CreativeML Responsible AI License",
value=not first_time,
relx=2,
scroll_exit=True,
)
self.nextrely += 1
label = (
"DONE"
if program_opts.skip_sd_weights or program_opts.default_only
else "NEXT"
)
self.ok_button = self.add_widget_intelligent(
CenteredButtonPress,
name=label,
relx=(window_width - len(label)) // 2,
rely=-3,
when_pressed_function=self.on_ok,
)
def on_ok(self):
options = self.marshall_arguments()
if self.validate_field_values(options):
self.parentApp.new_opts = options
if hasattr(self.parentApp, "model_select"):
self.parentApp.setNextForm("MODELS")
else:
self.parentApp.setNextForm(None)
self.editing = False
else:
self.editing = True
def validate_field_values(self, opt: Namespace) -> bool:
bad_fields = []
if not opt.license_acceptance:
bad_fields.append(
"Please accept the license terms before proceeding to model downloads"
)
if not Path(opt.outdir).parent.exists():
bad_fields.append(
f"The output directory does not seem to be valid. Please check that {str(Path(opt.outdir).parent)} is an existing directory."
)
if not Path(opt.embedding_path).parent.exists():
bad_fields.append(
f"The embedding directory does not seem to be valid. Please check that {str(Path(opt.embedding_path).parent)} is an existing directory."
)
if len(bad_fields) > 0:
message = "The following problems were detected and must be corrected:\n"
for problem in bad_fields:
message += f"* {problem}\n"
npyscreen.notify_confirm(message)
return False
else:
return True
def marshall_arguments(self):
new_opts = Namespace()
for attr in [
"outdir",
"safety_checker",
"free_gpu_mem",
"max_loaded_models",
"xformers",
"always_use_cpu",
"embedding_path",
"ckpt_convert",
]:
setattr(new_opts, attr, getattr(self, attr).value)
new_opts.hf_token = self.hf_token.value
new_opts.license_acceptance = self.license_acceptance.value
new_opts.precision = PRECISION_CHOICES[self.precision.value[0]]
return new_opts
class EditOptApplication(npyscreen.NPSAppManaged):
def __init__(self, program_opts: Namespace, invokeai_opts: Namespace):
super().__init__()
self.program_opts = program_opts
self.invokeai_opts = invokeai_opts
self.user_cancelled = False
self.user_selections = default_user_selections(program_opts)
def onStart(self):
npyscreen.setTheme(npyscreen.Themes.DefaultTheme)
self.options = self.addForm(
"MAIN",
editOptsForm,
name="InvokeAI Startup Options",
)
if not (self.program_opts.skip_sd_weights or self.program_opts.default_only):
self.model_select = self.addForm(
"MODELS",
addModelsForm,
name="Install Stable Diffusion Models",
multipage=True,
)
def new_opts(self):
return self.options.marshall_arguments()
def edit_opts(program_opts: Namespace, invokeai_opts: Namespace) -> argparse.Namespace:
editApp = EditOptApplication(program_opts, invokeai_opts)
editApp.run()
return editApp.new_opts()
def default_startup_options(init_file: Path) -> Namespace:
opts = Args().parse_args([])
outdir = Path(opts.outdir)
if not outdir.is_absolute():
opts.outdir = str(Globals.root / opts.outdir)
if not init_file.exists():
opts.safety_checker = True
return opts
def default_user_selections(program_opts: Namespace) -> Namespace:
return Namespace(
starter_models=default_dataset()
if program_opts.default_only
else recommended_datasets()
if program_opts.yes_to_all
else dict(),
purge_deleted_models=False,
scan_directory=None,
autoscan_on_startup=None,
import_model_paths=None,
convert_to_diffusers=None,
)
# -------------------------------------
def initialize_rootdir(root: str, yes_to_all: bool = False):
print("** INITIALIZING INVOKEAI RUNTIME DIRECTORY **")
for name in (
"models",
"configs",
"embeddings",
"text-inversion-output",
"text-inversion-training-data",
):
os.makedirs(os.path.join(root, name), exist_ok=True)
configs_src = Path(configs.__path__[0])
configs_dest = Path(root) / "configs"
if not os.path.samefile(configs_src, configs_dest):
shutil.copytree(configs_src, configs_dest, dirs_exist_ok=True)
# -------------------------------------
def run_console_ui(
program_opts: Namespace, initfile: Path = None
) -> (Namespace, Namespace):
# parse_args() will read from init file if present
invokeai_opts = default_startup_options(initfile)
set_min_terminal_size(MIN_COLS, MIN_LINES)
editApp = EditOptApplication(program_opts, invokeai_opts)
editApp.run()
if editApp.user_cancelled:
return (None, None)
else:
return (editApp.new_opts, editApp.user_selections)
# -------------------------------------
def write_opts(opts: Namespace, init_file: Path):
"""
Update the invokeai.init file with values from opts Namespace
"""
# touch file if it doesn't exist
if not init_file.exists():
with open(init_file, "w") as f:
f.write(INIT_FILE_PREAMBLE)
# We want to write in the changed arguments without clobbering
# any other initialization values the user has entered. There is
# no good way to do this because of the one-way nature of
# argparse: i.e. --outdir could be --outdir, --out, or -o
# initfile needs to be replaced with a fully structured format
# such as yaml; this is a hack that will work much of the time
args_to_skip = re.compile(
"^--?(o|out|no-xformer|xformer|no-ckpt|ckpt|free|no-nsfw|nsfw|prec|max_load|embed|always|ckpt|free_gpu)"
)
# fix windows paths
opts.outdir = opts.outdir.replace("\\", "/")
opts.embedding_path = opts.embedding_path.replace("\\", "/")
new_file = f"{init_file}.new"
try:
lines = [x.strip() for x in open(init_file, "r").readlines()]
with open(new_file, "w") as out_file:
for line in lines:
if len(line) > 0 and not args_to_skip.match(line):
out_file.write(line + "\n")
out_file.write(
f"""
--outdir={opts.outdir}
--embedding_path={opts.embedding_path}
--precision={opts.precision}
--max_loaded_models={int(opts.max_loaded_models)}
--{'no-' if not opts.safety_checker else ''}nsfw_checker
--{'no-' if not opts.xformers else ''}xformers
--{'no-' if not opts.ckpt_convert else ''}ckpt_convert
{'--free_gpu_mem' if opts.free_gpu_mem else ''}
{'--always_use_cpu' if opts.always_use_cpu else ''}
"""
)
except OSError as e:
print(f"** An error occurred while writing the init file: {str(e)}")
os.replace(new_file, init_file)
if opts.hf_token:
HfLogin(opts.hf_token)
# -------------------------------------
def default_output_dir() -> Path:
return Globals.root / "outputs"
# -------------------------------------
def default_embedding_dir() -> Path:
return Globals.root / "embeddings"
# -------------------------------------
def write_default_options(program_opts: Namespace, initfile: Path):
opt = default_startup_options(initfile)
opt.hf_token = HfFolder.get_token()
write_opts(opt, initfile)
# -------------------------------------
def main():
parser = argparse.ArgumentParser(description="InvokeAI model downloader")
parser.add_argument(
"--skip-sd-weights",
dest="skip_sd_weights",
action=argparse.BooleanOptionalAction,
default=False,
help="skip downloading the large Stable Diffusion weight files",
)
parser.add_argument(
"--skip-support-models",
dest="skip_support_models",
action=argparse.BooleanOptionalAction,
default=False,
help="skip downloading the support models",
)
parser.add_argument(
"--full-precision",
dest="full_precision",
action=argparse.BooleanOptionalAction,
type=bool,
default=False,
help="use 32-bit weights instead of faster 16-bit weights",
)
parser.add_argument(
"--yes",
"-y",
dest="yes_to_all",
action="store_true",
help='answer "yes" to all prompts',
)
parser.add_argument(
"--default_only",
action="store_true",
help="when --yes specified, only install the default model",
)
parser.add_argument(
"--config_file",
"-c",
dest="config_file",
type=str,
default=None,
help="path to configuration file to create",
)
parser.add_argument(
"--root_dir",
dest="root",
type=str,
default=None,
help="path to root of install directory",
)
opt = parser.parse_args()
# setting a global here
Globals.root = Path(os.path.expanduser(get_root(opt.root) or ""))
errors = set()
try:
models_to_download = default_user_selections(opt)
# We check to see whether the runtime directory is correctly initialized.
init_file = Path(Globals.root, Globals.initfile)
if not init_file.exists() or not global_config_file().exists():
initialize_rootdir(Globals.root, opt.yes_to_all)
if opt.yes_to_all:
write_default_options(opt, init_file)
init_options = Namespace(
precision="float32" if opt.full_precision else "float16"
)
else:
init_options, models_to_download = run_console_ui(opt, init_file)
if init_options:
write_opts(init_options, init_file)
else:
print(
'\n** CANCELLED AT USER\'S REQUEST. USE THE "invoke.sh" LAUNCHER TO RUN LATER **\n'
)
sys.exit(0)
if opt.skip_support_models:
print("\n** SKIPPING SUPPORT MODEL DOWNLOADS PER USER REQUEST **")
else:
print("\n** DOWNLOADING SUPPORT MODELS **")
download_bert()
download_sd1_clip()
download_sd2_clip()
download_realesrgan()
download_gfpgan()
download_codeformer()
download_clipseg()
download_safety_checker()
download_vaes()
if opt.skip_sd_weights:
print("\n** SKIPPING DIFFUSION WEIGHTS DOWNLOAD PER USER REQUEST **")
elif models_to_download:
print("\n** DOWNLOADING DIFFUSION WEIGHTS **")
process_and_execute(opt, models_to_download)
postscript(errors=errors)
except KeyboardInterrupt:
print("\nGoodbye! Come back soon.")
# -------------------------------------
if __name__ == "__main__":
main()
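The `write_opts()` function above rewrites `invokeai.init` by copying it to a `.new` file while dropping the arguments it manages, appending fresh values, and then atomically swapping the new file into place with `os.replace()`. Here is a minimal sketch of that rewrite-then-replace pattern; the path and the managed-argument list are illustrative, not InvokeAI's actual ones:

```python
import os
import re
from pathlib import Path


def rewrite_init_file(init_file: Path, managed: "re.Pattern", new_lines) -> None:
    """Copy init_file minus managed args, append new values, then atomically replace it."""
    new_file = Path(str(init_file) + ".new")
    old_lines = init_file.read_text().splitlines() if init_file.exists() else []
    kept = [ln for ln in old_lines if ln and not managed.match(ln)]
    new_file.write_text("\n".join(kept + new_lines) + "\n")
    os.replace(new_file, init_file)  # atomic within the same filesystem


rewrite_init_file(
    Path("invokeai.init.example"),          # hypothetical path
    re.compile(r"^--?(outdir|precision)"),  # args this sketch manages
    ["--outdir=outputs", "--precision=auto"],
)
```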

View File

@ -1,463 +0,0 @@
"""
Utility (backend) functions used by model_install.py
"""
import os
import re
import shutil
import sys
import warnings
from pathlib import Path
from tempfile import TemporaryFile
from typing import List
import requests
from diffusers import AutoencoderKL
from huggingface_hub import hf_hub_url
from omegaconf import OmegaConf
from omegaconf.dictconfig import DictConfig
from tqdm import tqdm
import invokeai.configs as configs
from ..globals import Globals, global_cache_dir, global_config_dir
from ..model_management import ModelManager
from ..stable_diffusion import StableDiffusionGeneratorPipeline
warnings.filterwarnings("ignore")
# --------------------------globals-----------------------
Model_dir = "models"
Weights_dir = "ldm/stable-diffusion-v1/"
# the initial "configs" dir is now bundled in the `invokeai.configs` package
Dataset_path = Path(configs.__path__[0]) / "INITIAL_MODELS.yaml"
# initial models omegaconf
Datasets = None
Config_preamble = """
# This file describes the alternative machine learning models
# available to InvokeAI script.
#
# To add a new model, follow the examples below. Each
# model requires a model config file, a weights file,
# and the width and height of the images it
# was trained on.
"""
def default_config_file():
return Path(global_config_dir()) / "models.yaml"
def sd_configs():
return Path(global_config_dir()) / "stable-diffusion"
def initial_models():
global Datasets
if Datasets:
return Datasets
return (Datasets := OmegaConf.load(Dataset_path))
def install_requested_models(
install_initial_models: List[str] = None,
remove_models: List[str] = None,
scan_directory: Path = None,
external_models: List[str] = None,
scan_at_startup: bool = False,
precision: str = "float16",
purge_deleted: bool = False,
config_file_path: Path = None,
):
"""
Entry point for installing/deleting starter models, or installing external models.
"""
config_file_path = config_file_path or default_config_file()
if not config_file_path.exists():
open(config_file_path, "w")
model_manager = ModelManager(OmegaConf.load(config_file_path), precision=precision)
if remove_models and len(remove_models) > 0:
print("== DELETING UNCHECKED STARTER MODELS ==")
for model in remove_models:
print(f"{model}...")
model_manager.del_model(model, delete_files=purge_deleted)
model_manager.commit(config_file_path)
if install_initial_models and len(install_initial_models) > 0:
print("== INSTALLING SELECTED STARTER MODELS ==")
successfully_downloaded = download_weight_datasets(
models=install_initial_models,
access_token=None,
precision=precision,
) # FIX: for historical reasons, we don't use model manager here
update_config_file(successfully_downloaded, config_file_path)
if len(successfully_downloaded) < len(install_initial_models):
print("** Some of the model downloads were not successful")
# due to above, we have to reload the model manager because conf file
# was changed behind its back
model_manager = ModelManager(OmegaConf.load(config_file_path), precision=precision)
external_models = external_models or list()
if scan_directory:
external_models.append(str(scan_directory))
if len(external_models) > 0:
print("== INSTALLING EXTERNAL MODELS ==")
for path_url_or_repo in external_models:
try:
model_manager.heuristic_import(
path_url_or_repo,
commit_to_conf=config_file_path,
)
except KeyboardInterrupt:
sys.exit(-1)
except Exception:
pass
if scan_at_startup and scan_directory.is_dir():
argument = "--autoconvert"
initfile = Path(Globals.root, Globals.initfile)
replacement = Path(Globals.root, f"{Globals.initfile}.new")
directory = str(scan_directory).replace("\\", "/")
with open(initfile, "r") as input:
with open(replacement, "w") as output:
while line := input.readline():
if not line.startswith(argument):
output.writelines([line])
output.writelines([f"{argument} {directory}"])
os.replace(replacement, initfile)
# -------------------------------------
def yes_or_no(prompt: str, default_yes=True):
default = "y" if default_yes else "n"
response = input(f"{prompt} [{default}] ") or default
if default_yes:
return response[0] not in ("n", "N")
else:
return response[0] in ("y", "Y")
# -------------------------------------
def get_root(root: str = None) -> str:
if root:
return root
elif os.environ.get("INVOKEAI_ROOT"):
return os.environ.get("INVOKEAI_ROOT")
else:
return Globals.root
# ---------------------------------------------
def recommended_datasets() -> dict:
datasets = dict()
for ds in initial_models().keys():
if initial_models()[ds].get("recommended", False):
datasets[ds] = True
return datasets
# ---------------------------------------------
def default_dataset() -> dict:
datasets = dict()
for ds in initial_models().keys():
if initial_models()[ds].get("default", False):
datasets[ds] = True
return datasets
# ---------------------------------------------
def all_datasets() -> dict:
datasets = dict()
for ds in initial_models().keys():
datasets[ds] = True
return datasets
# ---------------------------------------------
# look for legacy model.ckpt in models directory and offer to
# normalize its name
def migrate_models_ckpt():
model_path = os.path.join(Globals.root, Model_dir, Weights_dir)
if not os.path.exists(os.path.join(model_path, "model.ckpt")):
return
new_name = initial_models()["stable-diffusion-1.4"]["file"]
print(
f'The Stable Diffusion v1.4 "model.ckpt" is already installed. The name will be changed to {new_name} to avoid confusion.'
)
print(f"model.ckpt => {new_name}")
os.replace(
os.path.join(model_path, "model.ckpt"), os.path.join(model_path, new_name)
)
# ---------------------------------------------
def download_weight_datasets(
models: List[str], access_token: str, precision: str = "float32"
):
migrate_models_ckpt()
successful = dict()
for mod in models:
print(f"Downloading {mod}:")
successful[mod] = _download_repo_or_file(
initial_models()[mod], access_token, precision=precision
)
return successful
def _download_repo_or_file(
mconfig: DictConfig, access_token: str, precision: str = "float32"
) -> Path:
path = None
if mconfig["format"] == "ckpt":
path = _download_ckpt_weights(mconfig, access_token)
else:
path = _download_diffusion_weights(mconfig, access_token, precision=precision)
if "vae" in mconfig and "repo_id" in mconfig["vae"]:
_download_diffusion_weights(
mconfig["vae"], access_token, precision=precision
)
return path
def _download_ckpt_weights(mconfig: DictConfig, access_token: str) -> Path:
repo_id = mconfig["repo_id"]
filename = mconfig["file"]
cache_dir = os.path.join(Globals.root, Model_dir, Weights_dir)
return hf_download_with_resume(
repo_id=repo_id,
model_dir=cache_dir,
model_name=filename,
access_token=access_token,
)
# ---------------------------------------------
def download_from_hf(
model_class: object, model_name: str, cache_subdir: Path = Path("hub"), **kwargs
):
path = global_cache_dir(cache_subdir)
model = model_class.from_pretrained(
model_name,
cache_dir=path,
resume_download=True,
**kwargs,
)
model_name = "--".join(("models", *model_name.split("/")))
return path / model_name if model else None
def _download_diffusion_weights(
mconfig: DictConfig, access_token: str, precision: str = "float32"
):
repo_id = mconfig["repo_id"]
model_class = (
StableDiffusionGeneratorPipeline
if mconfig.get("format", None) == "diffusers"
else AutoencoderKL
)
extra_arg_list = [{"revision": "fp16"}, {}] if precision == "float16" else [{}]
path = None
for extra_args in extra_arg_list:
try:
path = download_from_hf(
model_class,
repo_id,
safety_checker=None,
**extra_args,
)
except OSError as e:
if str(e).startswith("fp16 is not a valid"):
pass
else:
print(f"An unexpected error occurred while downloading the model: {e}")
if path:
break
return path
# ---------------------------------------------
def hf_download_with_resume(
repo_id: str, model_dir: str, model_name: str, access_token: str = None
) -> Path:
model_dest = Path(os.path.join(model_dir, model_name))
os.makedirs(model_dir, exist_ok=True)
url = hf_hub_url(repo_id, model_name)
header = {"Authorization": f"Bearer {access_token}"} if access_token else {}
open_mode = "wb"
exist_size = 0
if os.path.exists(model_dest):
exist_size = os.path.getsize(model_dest)
header["Range"] = f"bytes={exist_size}-"
open_mode = "ab"
resp = requests.get(url, headers=header, stream=True)
total = int(resp.headers.get("content-length", 0))
if (
resp.status_code == 416
): # "range not satisfiable", which means nothing to return
print(f"* {model_name}: complete file found. Skipping.")
return model_dest
elif resp.status_code != 200:
print(f"** An error occurred during downloading {model_name}: {resp.reason}")
elif exist_size > 0:
print(f"* {model_name}: partial file found. Resuming...")
else:
print(f"* {model_name}: Downloading...")
try:
if total < 2000:
print(f"*** ERROR DOWNLOADING {model_name}: {resp.text}")
return None
with open(model_dest, open_mode) as file, tqdm(
desc=model_name,
initial=exist_size,
total=total + exist_size,
unit="iB",
unit_scale=True,
unit_divisor=1000,
) as bar:
for data in resp.iter_content(chunk_size=1024):
size = file.write(data)
bar.update(size)
except Exception as e:
print(f"An error occurred while downloading {model_name}: {str(e)}")
return None
return model_dest
# ---------------------------------------------
def update_config_file(successfully_downloaded: dict, config_file: Path):
config_file = (
Path(config_file) if config_file is not None else default_config_file()
)
# In some cases (incomplete setup, etc), the default configs directory might be missing.
# Create it if it doesn't exist.
# this check is ignored if opt.config_file is specified - user is assumed to know what they
# are doing if they are passing a custom config file from elsewhere.
if config_file is default_config_file() and not config_file.parent.exists():
configs_src = Dataset_path.parent
configs_dest = default_config_file().parent
shutil.copytree(configs_src, configs_dest, dirs_exist_ok=True)
yaml = new_config_file_contents(successfully_downloaded, config_file)
try:
backup = None
if os.path.exists(config_file):
print(
f"** {config_file.name} exists. Renaming to {config_file.stem}.yaml.orig"
)
backup = config_file.with_suffix(".yaml.orig")
## Ugh. Windows is unable to overwrite an existing backup file, raises a WinError 183
if sys.platform == "win32" and backup.is_file():
backup.unlink()
config_file.rename(backup)
with TemporaryFile() as tmp:
tmp.write(Config_preamble.encode())
tmp.write(yaml.encode())
with open(str(config_file.expanduser().resolve()), "wb") as new_config:
tmp.seek(0)
new_config.write(tmp.read())
except Exception as e:
print(f"**Error creating config file {config_file}: {str(e)} **")
if backup is not None:
print("restoring previous config file")
## workaround, for WinError 183, see above
if sys.platform == "win32" and config_file.is_file():
config_file.unlink()
backup.rename(config_file)
return
print(f"Successfully created new configuration file {config_file}")
# ---------------------------------------------
def new_config_file_contents(
successfully_downloaded: dict,
config_file: Path,
) -> str:
if config_file.exists():
conf = OmegaConf.load(str(config_file.expanduser().resolve()))
else:
conf = OmegaConf.create()
default_selected = None
for model in successfully_downloaded:
# a bit hacky - what we are doing here is seeing whether a checkpoint
# version of the model was previously defined, and whether the current
# model is a diffusers (indicated with a path)
if conf.get(model) and Path(successfully_downloaded[model]).is_dir():
delete_weights(model, conf[model])
stanza = {}
mod = initial_models()[model]
stanza["description"] = mod["description"]
stanza["repo_id"] = mod["repo_id"]
stanza["format"] = mod["format"]
# diffusers don't need width and height (probably .ckpt doesn't either)
# so we no longer require these in INITIAL_MODELS.yaml
if "width" in mod:
stanza["width"] = mod["width"]
if "height" in mod:
stanza["height"] = mod["height"]
if "file" in mod:
stanza["weights"] = os.path.relpath(
successfully_downloaded[model], start=Globals.root
)
stanza["config"] = os.path.normpath(
os.path.join(sd_configs(), mod["config"])
)
if "vae" in mod:
if "file" in mod["vae"]:
stanza["vae"] = os.path.normpath(
os.path.join(Model_dir, Weights_dir, mod["vae"]["file"])
)
else:
stanza["vae"] = mod["vae"]
if mod.get("default", False):
stanza["default"] = True
default_selected = True
conf[model] = stanza
# if no default model was chosen, then we select the first
# one in the list
if not default_selected:
conf[list(successfully_downloaded.keys())[0]]["default"] = True
return OmegaConf.to_yaml(conf)
# ---------------------------------------------
def delete_weights(model_name: str, conf_stanza: dict):
if not (weights := conf_stanza.get("weights")):
return
if re.match("/VAE/", conf_stanza.get("config")):
return
print(
f"\n** The checkpoint version of {model_name} is superseded by the diffusers version. Deleting the original file {weights}?"
)
weights = Path(weights)
if not weights.is_absolute():
weights = Path(Globals.root) / weights
try:
weights.unlink()
except OSError as e:
print(str(e))
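For orientation, this is roughly what a stanza built by `new_config_file_contents()` above looks like once serialized with `OmegaConf.to_yaml()`; the model name and field values are made up for the example:

```python
from omegaconf import OmegaConf

conf = OmegaConf.create()
conf["stable-diffusion-1.5"] = {
    "description": "Example diffusers model entry",
    "repo_id": "runwayml/stable-diffusion-v1-5",
    "format": "diffusers",
    "default": True,
}
print(OmegaConf.to_yaml(conf))
```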

File diff suppressed because it is too large

View File

@ -1,13 +0,0 @@
"""
Initialization file for the invokeai.generator package
"""
from .base import (
InvokeAIGenerator,
InvokeAIGeneratorBasicParams,
InvokeAIGeneratorOutput,
Txt2Img,
Img2Img,
Inpaint,
Generator,
)
from .inpaint import infill_methods

View File

@ -1,649 +0,0 @@
"""
Base class for invokeai.backend.generator.*
including img2img, txt2img, and inpaint
"""
from __future__ import annotations
import itertools
import dataclasses
import diffusers
import os
import random
import traceback
from abc import ABCMeta
from argparse import Namespace
from contextlib import nullcontext
import cv2
import numpy as np
import torch
from PIL import Image, ImageChops, ImageFilter
from accelerate.utils import set_seed
from diffusers import DiffusionPipeline
from tqdm import trange
from typing import Callable, List, Iterator, Optional, Type
from dataclasses import dataclass, field
from diffusers.schedulers import SchedulerMixin as Scheduler
from ..image_util import configure_model_padding
from ..util.util import rand_perlin_2d
from ..safety_checker import SafetyChecker
from ..prompting.conditioning import get_uc_and_c_and_ec
from ..stable_diffusion.diffusers_pipeline import StableDiffusionGeneratorPipeline
downsampling = 8
@dataclass
class InvokeAIGeneratorBasicParams:
seed: Optional[int]=None
width: int=512
height: int=512
cfg_scale: float=7.5
steps: int=20
ddim_eta: float=0.0
scheduler: str='ddim'
precision: str='float16'
perlin: float=0.0
threshold: float=0.0
seamless: bool=False
seamless_axes: List[str]=field(default_factory=lambda: ['x', 'y'])
h_symmetry_time_pct: Optional[float]=None
v_symmetry_time_pct: Optional[float]=None
variation_amount: float = 0.0
with_variations: list=field(default_factory=list)
safety_checker: Optional[SafetyChecker]=None
@dataclass
class InvokeAIGeneratorOutput:
'''
InvokeAIGeneratorOutput is a dataclass that contains the outputs of a generation
operation, including the image, its seed, the model name used to generate the image
and the model hash, as well as all the generate() parameters that went into
generating the image (in .params, also available as attributes)
'''
image: Image.Image
seed: int
model_hash: str
attention_maps_images: List[Image.Image]
params: Namespace
# we are interposing a wrapper around the original Generator classes so that
# old code that calls Generate will continue to work.
class InvokeAIGenerator(metaclass=ABCMeta):
scheduler_map = dict(
ddim=diffusers.DDIMScheduler,
dpmpp_2=diffusers.DPMSolverMultistepScheduler,
k_dpm_2=diffusers.KDPM2DiscreteScheduler,
k_dpm_2_a=diffusers.KDPM2AncestralDiscreteScheduler,
k_dpmpp_2=diffusers.DPMSolverMultistepScheduler,
k_euler=diffusers.EulerDiscreteScheduler,
k_euler_a=diffusers.EulerAncestralDiscreteScheduler,
k_heun=diffusers.HeunDiscreteScheduler,
k_lms=diffusers.LMSDiscreteScheduler,
plms=diffusers.PNDMScheduler,
)
def __init__(self,
model_info: dict,
params: InvokeAIGeneratorBasicParams=InvokeAIGeneratorBasicParams(),
):
self.model_info=model_info
self.params=params
def generate(self,
prompt: str='',
callback: Optional[Callable]=None,
step_callback: Optional[Callable]=None,
iterations: int=1,
**keyword_args,
)->Iterator[InvokeAIGeneratorOutput]:
'''
Return an iterator across the indicated number of generations.
Each time the iterator is called it will return an InvokeAIGeneratorOutput
object. Use like this:
outputs = txt2img.generate(prompt='banana sushi', iterations=5)
for result in outputs:
print(result.image, result.seed)
In the typical case of wanting just a single image, iterations
defaults to 1, so you can simply do:
output = next(txt2img.generate(prompt='banana sushi'))
Pass None to get an infinite iterator.
outputs = txt2img.generate(prompt='banana sushi', iterations=None)
for o in outputs:
print(o.image, o.seed)
'''
generator_args = dataclasses.asdict(self.params)
generator_args.update(keyword_args)
model_info = self.model_info
model_name = model_info['model_name']
model:StableDiffusionGeneratorPipeline = model_info['model']
model_hash = model_info['hash']
scheduler: Scheduler = self.get_scheduler(
model=model,
scheduler_name=generator_args.get('scheduler')
)
uc, c, extra_conditioning_info = get_uc_and_c_and_ec(prompt,model=model)
gen_class = self._generator_class()
generator = gen_class(model, self.params.precision)
if self.params.variation_amount > 0:
generator.set_variation(generator_args.get('seed'),
generator_args.get('variation_amount'),
generator_args.get('with_variations')
)
if isinstance(model, DiffusionPipeline):
for component in [model.unet, model.vae]:
configure_model_padding(component,
generator_args.get('seamless',False),
generator_args.get('seamless_axes')
)
else:
configure_model_padding(model,
generator_args.get('seamless',False),
generator_args.get('seamless_axes')
)
iteration_count = range(iterations) if iterations else itertools.count(start=0, step=1)
for i in iteration_count:
results = generator.generate(prompt,
conditioning=(uc, c, extra_conditioning_info),
step_callback=step_callback,
sampler=scheduler,
**generator_args,
)
output = InvokeAIGeneratorOutput(
image=results[0][0],
seed=results[0][1],
attention_maps_images=results[0][2],
model_hash = model_hash,
params=Namespace(model_name=model_name,**generator_args),
)
if callback:
callback(output)
yield output
@classmethod
def schedulers(self)->List[str]:
'''
Return list of all the schedulers that we currently handle.
'''
return list(self.scheduler_map.keys())
def load_generator(self, model: StableDiffusionGeneratorPipeline, generator_class: Type[Generator]):
return generator_class(model, self.params.precision)
def get_scheduler(self, scheduler_name:str, model: StableDiffusionGeneratorPipeline)->Scheduler:
scheduler_class = self.scheduler_map.get(scheduler_name, self.scheduler_map['ddim'])
scheduler = scheduler_class.from_config(model.scheduler.config)
# hack copied over from generate.py
if not hasattr(scheduler, 'uses_inpainting_model'):
scheduler.uses_inpainting_model = lambda: False
return scheduler
@classmethod
def _generator_class(cls)->Type[Generator]:
'''
In derived classes, return the generator class to apply.
If you don't override this, the base Generator class is returned;
the derived class names nicely parallel the generator class names.
'''
return Generator
# ------------------------------------
class Txt2Img(InvokeAIGenerator):
@classmethod
def _generator_class(cls):
from .txt2img import Txt2Img
return Txt2Img
# ------------------------------------
class Img2Img(InvokeAIGenerator):
def generate(self,
init_image: Image.Image | torch.FloatTensor,
strength: float=0.75,
**keyword_args
)->Iterator[InvokeAIGeneratorOutput]:
return super().generate(init_image=init_image,
strength=strength,
**keyword_args
)
@classmethod
def _generator_class(cls):
from .img2img import Img2Img
return Img2Img
# ------------------------------------
# Takes all the arguments of Img2Img and adds the mask image and the seam/infill stuff
class Inpaint(Img2Img):
def generate(self,
mask_image: Image.Image | torch.FloatTensor,
# Seam settings - when 0, doesn't fill seam
seam_size: int = 0,
seam_blur: int = 0,
seam_strength: float = 0.7,
seam_steps: int = 10,
tile_size: int = 32,
inpaint_replace=False,
infill_method=None,
inpaint_width=None,
inpaint_height=None,
inpaint_fill: tuple(int) = (0x7F, 0x7F, 0x7F, 0xFF),
**keyword_args
)->Iterator[InvokeAIGeneratorOutput]:
return super().generate(
mask_image=mask_image,
seam_size=seam_size,
seam_blur=seam_blur,
seam_strength=seam_strength,
seam_steps=seam_steps,
tile_size=tile_size,
inpaint_replace=inpaint_replace,
infill_method=infill_method,
inpaint_width=inpaint_width,
inpaint_height=inpaint_height,
inpaint_fill=inpaint_fill,
**keyword_args
)
@classmethod
def _generator_class(cls):
from .inpaint import Inpaint
return Inpaint
# ------------------------------------
class Embiggen(Txt2Img):
def generate(
self,
embiggen: list=None,
embiggen_tiles: list = None,
strength: float=0.75,
**kwargs)->Iterator[InvokeAIGeneratorOutput]:
return super().generate(embiggen=embiggen,
embiggen_tiles=embiggen_tiles,
strength=strength,
**kwargs)
@classmethod
def _generator_class(cls):
from .embiggen import Embiggen
return Embiggen
class Generator:
downsampling_factor: int
latent_channels: int
precision: str
model: DiffusionPipeline
def __init__(self, model: DiffusionPipeline, precision: str):
self.model = model
self.precision = precision
self.seed = None
self.latent_channels = model.channels
self.downsampling_factor = downsampling # BUG: should come from model or config
self.safety_checker = None
self.perlin = 0.0
self.threshold = 0
self.variation_amount = 0
self.with_variations = []
self.use_mps_noise = False
self.free_gpu_mem = None
# this is going to be overridden in img2img.py, txt2img.py and inpaint.py
def get_make_image(self, prompt, **kwargs):
"""
Returns a function returning an image derived from the prompt and the initial image
Return value depends on the seed at the time you call it
"""
raise NotImplementedError(
"get_make_image() must be implemented in a descendant class"
)
def set_variation(self, seed, variation_amount, with_variations):
self.seed = seed
self.variation_amount = variation_amount
self.with_variations = with_variations
def generate(
self,
prompt,
width,
height,
sampler,
init_image=None,
iterations=1,
seed=None,
image_callback=None,
step_callback=None,
threshold=0.0,
perlin=0.0,
h_symmetry_time_pct=None,
v_symmetry_time_pct=None,
safety_checker: SafetyChecker=None,
free_gpu_mem: bool = False,
**kwargs,
):
scope = nullcontext
self.safety_checker = safety_checker
self.free_gpu_mem = free_gpu_mem
attention_maps_images = []
attention_maps_callback = lambda saver: attention_maps_images.append(
saver.get_stacked_maps_image()
)
make_image = self.get_make_image(
prompt,
sampler=sampler,
init_image=init_image,
width=width,
height=height,
step_callback=step_callback,
threshold=threshold,
perlin=perlin,
h_symmetry_time_pct=h_symmetry_time_pct,
v_symmetry_time_pct=v_symmetry_time_pct,
attention_maps_callback=attention_maps_callback,
**kwargs,
)
results = []
seed = seed if seed is not None and seed >= 0 else self.new_seed()
first_seed = seed
seed, initial_noise = self.generate_initial_noise(seed, width, height)
# There used to be an additional self.model.ema_scope() here, but it breaks
# the inpaint-1.5 model. Not sure what it did.... ?
with scope(self.model.device.type):
for n in trange(iterations, desc="Generating"):
x_T = None
if self.variation_amount > 0:
set_seed(seed)
target_noise = self.get_noise(width, height)
x_T = self.slerp(self.variation_amount, initial_noise, target_noise)
elif initial_noise is not None:
# i.e. we specified particular variations
x_T = initial_noise
else:
set_seed(seed)
try:
x_T = self.get_noise(width, height)
except:
print("** An error occurred while getting initial noise **")
print(traceback.format_exc())
# Pass on the seed in case a layer beneath us needs to generate noise on its own.
image = make_image(x_T, seed)
if self.safety_checker is not None:
image = self.safety_checker.check(image)
results.append([image, seed, attention_maps_images])
if image_callback is not None:
attention_maps_image = (
None
if len(attention_maps_images) == 0
else attention_maps_images[-1]
)
image_callback(
image,
seed,
first_seed=first_seed,
attention_maps_image=attention_maps_image,
)
seed = self.new_seed()
# Free up memory from the last generation.
clear_cuda_cache = (
kwargs["clear_cuda_cache"] if "clear_cuda_cache" in kwargs else None
)
if clear_cuda_cache is not None:
clear_cuda_cache()
return results
def sample_to_image(self, samples) -> Image.Image:
"""
Given samples returned from a sampler, converts
them into a PIL Image
"""
with torch.inference_mode():
image = self.model.decode_latents(samples)
return self.model.numpy_to_pil(image)[0]
def repaste_and_color_correct(
self,
result: Image.Image,
init_image: Image.Image,
init_mask: Image.Image,
mask_blur_radius: int = 8,
) -> Image.Image:
if init_image is None or init_mask is None:
return result
# Get the original alpha channel of the mask if there is one.
# Otherwise it is some other black/white image format ('1', 'L' or 'RGB')
pil_init_mask = (
init_mask.getchannel("A")
if init_mask.mode == "RGBA"
else init_mask.convert("L")
)
pil_init_image = init_image.convert(
"RGBA"
) # Add an alpha channel if one doesn't exist
# Build an image with only visible pixels from source to use as reference for color-matching.
init_rgb_pixels = np.asarray(init_image.convert("RGB"), dtype=np.uint8)
init_a_pixels = np.asarray(pil_init_image.getchannel("A"), dtype=np.uint8)
init_mask_pixels = np.asarray(pil_init_mask, dtype=np.uint8)
# Get numpy version of result
np_image = np.asarray(result, dtype=np.uint8)
# Mask and calculate mean and standard deviation
mask_pixels = init_a_pixels * init_mask_pixels > 0
np_init_rgb_pixels_masked = init_rgb_pixels[mask_pixels, :]
np_image_masked = np_image[mask_pixels, :]
if np_init_rgb_pixels_masked.size > 0:
init_means = np_init_rgb_pixels_masked.mean(axis=0)
init_std = np_init_rgb_pixels_masked.std(axis=0)
gen_means = np_image_masked.mean(axis=0)
gen_std = np_image_masked.std(axis=0)
# Color correct
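# Match the generated pixels' statistics to the original image inside the mask:
# normalize with the generation's per-channel mean/std, then rescale and shift
# by the init image's per-channel std and mean.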
np_matched_result = np_image.copy()
np_matched_result[:, :, :] = (
(
(
(
np_matched_result[:, :, :].astype(np.float32)
- gen_means[None, None, :]
)
/ gen_std[None, None, :]
)
* init_std[None, None, :]
+ init_means[None, None, :]
)
.clip(0, 255)
.astype(np.uint8)
)
matched_result = Image.fromarray(np_matched_result, mode="RGB")
else:
matched_result = Image.fromarray(np_image, mode="RGB")
# Blur the mask out (into init image) by specified amount
if mask_blur_radius > 0:
nm = np.asarray(pil_init_mask, dtype=np.uint8)
nmd = cv2.erode(
nm,
kernel=np.ones((3, 3), dtype=np.uint8),
iterations=int(mask_blur_radius / 2),
)
pmd = Image.fromarray(nmd, mode="L")
blurred_init_mask = pmd.filter(ImageFilter.BoxBlur(mask_blur_radius))
else:
blurred_init_mask = pil_init_mask
multiplied_blurred_init_mask = ImageChops.multiply(
blurred_init_mask, self.pil_image.split()[-1]
)
# Paste original on color-corrected generation (using blurred mask)
matched_result.paste(init_image, (0, 0), mask=multiplied_blurred_init_mask)
return matched_result
@staticmethod
def sample_to_lowres_estimated_image(samples):
# originally adapted from code by @erucipe and @keturn here:
# https://discuss.huggingface.co/t/decoding-latents-to-rgb-without-upscaling/23204/7
# these updated numbers for v1.5 are from @torridgristle
v1_5_latent_rgb_factors = torch.tensor(
[
# R G B
[0.3444, 0.1385, 0.0670], # L1
[0.1247, 0.4027, 0.1494], # L2
[-0.3192, 0.2513, 0.2103], # L3
[-0.1307, -0.1874, -0.7445], # L4
],
dtype=samples.dtype,
device=samples.device,
)
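# Project the four latent channels onto approximate RGB using the linear map above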
latent_image = samples[0].permute(1, 2, 0) @ v1_5_latent_rgb_factors
latents_ubyte = (
((latent_image + 1) / 2)
.clamp(0, 1) # change scale from -1..1 to 0..1
.mul(0xFF) # to 0..255
.byte()
).cpu()
return Image.fromarray(latents_ubyte.numpy())
def generate_initial_noise(self, seed, width, height):
initial_noise = None
if self.variation_amount > 0 or len(self.with_variations) > 0:
# use fixed initial noise plus random noise per iteration
set_seed(seed)
initial_noise = self.get_noise(width, height)
for v_seed, v_weight in self.with_variations:
seed = v_seed
set_seed(seed)
next_noise = self.get_noise(width, height)
initial_noise = self.slerp(v_weight, initial_noise, next_noise)
if self.variation_amount > 0:
random.seed() # reset RNG to an actually random state, so we can get a random seed for variations
seed = random.randrange(0, np.iinfo(np.uint32).max)
return (seed, initial_noise)
def get_perlin_noise(self, width, height):
fixdevice = "cpu" if (self.model.device.type == "mps") else self.model.device
# limit noise to only the diffusion image channels, not the mask channels
input_channels = min(self.latent_channels, 4)
# round up to the nearest block of 8
temp_width = int((width + 7) / 8) * 8
temp_height = int((height + 7) / 8) * 8
noise = torch.stack(
[
rand_perlin_2d(
(temp_height, temp_width), (8, 8), device=self.model.device
).to(fixdevice)
for _ in range(input_channels)
],
dim=0,
).to(self.model.device)
return noise[0:4, 0:height, 0:width]
def new_seed(self):
self.seed = random.randrange(0, np.iinfo(np.uint32).max)
return self.seed
def slerp(self, t, v0, v1, DOT_THRESHOLD=0.9995):
"""
Spherical linear interpolation
Args:
t (float/np.ndarray): Float value between 0.0 and 1.0
v0 (np.ndarray): Starting vector
v1 (np.ndarray): Final vector
DOT_THRESHOLD (float): Threshold for considering the two vectors as
colinear. Not recommended to alter this.
Returns:
v2 (np.ndarray): Interpolation vector between v0 and v1
"""
inputs_are_torch = False
if not isinstance(v0, np.ndarray):
inputs_are_torch = True
v0 = v0.detach().cpu().numpy()
if not isinstance(v1, np.ndarray):
inputs_are_torch = True
v1 = v1.detach().cpu().numpy()
dot = np.sum(v0 * v1 / (np.linalg.norm(v0) * np.linalg.norm(v1)))
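# If the vectors are nearly colinear, spherical interpolation is numerically
# unstable, so fall back to plain linear interpolation.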
if np.abs(dot) > DOT_THRESHOLD:
v2 = (1 - t) * v0 + t * v1
else:
theta_0 = np.arccos(dot)
sin_theta_0 = np.sin(theta_0)
theta_t = theta_0 * t
sin_theta_t = np.sin(theta_t)
s0 = np.sin(theta_0 - theta_t) / sin_theta_0
s1 = sin_theta_t / sin_theta_0
v2 = s0 * v0 + s1 * v1
if inputs_are_torch:
v2 = torch.from_numpy(v2).to(self.model.device)
return v2
# this is a handy routine for debugging use. Given a generated sample,
# convert it into a PNG image and store it at the indicated path
def save_sample(self, sample, filepath):
image = self.sample_to_image(sample)
dirname = os.path.dirname(filepath) or "."
if not os.path.exists(dirname):
print(f"** creating directory {dirname}")
os.makedirs(dirname, exist_ok=True)
image.save(filepath, "PNG")
def torch_dtype(self) -> torch.dtype:
return torch.float16 if self.precision == "float16" else torch.float32
# returns a tensor filled with random numbers from a normal distribution
def get_noise(self, width, height):
device = self.model.device
# limit noise to only the diffusion image channels, not the mask channels
input_channels = min(self.latent_channels, 4)
if self.use_mps_noise or device.type == "mps":
x = torch.randn(
[
1,
input_channels,
height // self.downsampling_factor,
width // self.downsampling_factor,
],
dtype=self.torch_dtype(),
device="cpu",
).to(device)
else:
x = torch.randn(
[
1,
input_channels,
height // self.downsampling_factor,
width // self.downsampling_factor,
],
dtype=self.torch_dtype(),
device=device,
)
if self.perlin > 0.0:
perlin_noise = self.get_perlin_noise(
width // self.downsampling_factor, height // self.downsampling_factor
)
x = (1 - self.perlin) * x + self.perlin * perlin_noise
return x

@ -1,558 +0,0 @@
"""
invokeai.backend.generator.embiggen descends from .generator
and generates with .generator.img2img
"""
import numpy as np
import torch
from PIL import Image
from tqdm import trange
from .base import Generator
from .img2img import Img2Img
class Embiggen(Generator):
def __init__(self, model, precision):
super().__init__(model, precision)
self.init_latent = None
# Replace generate because Embiggen doesn't need/use most of what it does normally
def generate(
self,
prompt,
iterations=1,
seed=None,
image_callback=None,
step_callback=None,
**kwargs,
):
make_image = self.get_make_image(prompt, step_callback=step_callback, **kwargs)
results = []
seed = seed if seed else self.new_seed()
# Noise will be generated by the Img2Img generator when called
for _ in trange(iterations, desc="Generating"):
# make_image will call Img2Img which will do the equivalent of get_noise itself
image = make_image()
results.append([image, seed])
if image_callback is not None:
image_callback(image, seed, prompt_in=prompt)
seed = self.new_seed()
return results
@torch.no_grad()
def get_make_image(
self,
prompt,
sampler,
steps,
cfg_scale,
ddim_eta,
conditioning,
init_img,
strength,
width,
height,
embiggen,
embiggen_tiles,
step_callback=None,
**kwargs,
):
"""
Returns a function returning an image derived from the prompt and multi-stage twice-baked potato layering over the img2img on the initial image
Return value depends on the seed at the time you call it
"""
assert (
not sampler.uses_inpainting_model()
), "--embiggen is not supported by inpainting models"
# Construct embiggen arg array, and sanity check arguments
if embiggen is None:  # embiggen can also be called with just embiggen_tiles
embiggen = [1.0] # If not specified, assume no scaling
elif embiggen[0] < 0:
embiggen[0] = 1.0
print(
">> Embiggen scaling factor cannot be negative; falling back to the default of 1.0!"
)
if len(embiggen) < 2:
embiggen.append(0.75)
elif embiggen[1] > 1.0 or embiggen[1] < 0:
embiggen[1] = 0.75
print(
">> Embiggen upscaling strength for ESRGAN must be between 0 and 1; falling back to the default of 0.75!"
)
if len(embiggen) < 3:
embiggen.append(0.25)
elif embiggen[2] < 0:
embiggen[2] = 0.25
print(
">> Overlap size for Embiggen must be a positive ratio between 0 and 1 OR a number of pixels; falling back to the default of 0.25!"
)
# Convert tiles from their user-friendly count-from-one to count-from-zero, because we need to do modulo math
# and then sort them, because... people.
if embiggen_tiles:
embiggen_tiles = list(map(lambda n: n - 1, embiggen_tiles))
embiggen_tiles.sort()
if strength >= 0.5:
print(
f"* WARNING: Embiggen may produce mirror motifs if the strength (-f) is too high (currently {strength}). Try values between 0.35-0.45."
)
# Prep img2img generator, since we wrap over it
gen_img2img = Img2Img(self.model, self.precision)
# Open original init image (not a tensor) to manipulate
with Image.open(init_img) as img:
initsuperimage = img.convert("RGB")
# Size of the target super init image in pixels
initsuperwidth, initsuperheight = initsuperimage.size
# Increase by scaling factor if not already resized, using ESRGAN as able
if embiggen[0] != 1.0:
initsuperwidth = round(initsuperwidth * embiggen[0])
initsuperheight = round(initsuperheight * embiggen[0])
if embiggen[1] > 0: # No point in ESRGAN upscaling if strength is set zero
from ..restoration.realesrgan import ESRGAN
esrgan = ESRGAN()
print(
f">> ESRGAN upscaling init image prior to cutting with Embiggen with strength {embiggen[1]}"
)
if embiggen[0] > 2:
initsuperimage = esrgan.process(
initsuperimage,
embiggen[1], # upscale strength
self.seed,
4, # upscale scale
)
else:
initsuperimage = esrgan.process(
initsuperimage,
embiggen[1], # upscale strength
self.seed,
2, # upscale scale
)
# We could keep recursively re-running ESRGAN for a requested embiggen[0] larger than 4x
# but from personal experience it doesn't greatly improve anything after 4x
# Resize to target scaling factor resolution
initsuperimage = initsuperimage.resize(
(initsuperwidth, initsuperheight), Image.Resampling.LANCZOS
)
# Use width and height as tile widths and height
# Determine buffer size in pixels
if embiggen[2] < 1:
if embiggen[2] < 0:
embiggen[2] = 0
overlap_size_x = round(embiggen[2] * width)
overlap_size_y = round(embiggen[2] * height)
else:
overlap_size_x = round(embiggen[2])
overlap_size_y = round(embiggen[2])
# With overall image width and height known, determine how many tiles we need
def ceildiv(a, b):
return -1 * (-a // b)
# X and Y need to be determined independently (we may have savings on one based on the buffer pixel count)
# (initsuperwidth - width) is the area remaining to the right that we need to lay tiles over to fill
# (width - overlap_size_x) is how much new we can fill with a single tile
emb_tiles_x = 1
emb_tiles_y = 1
if (initsuperwidth - width) > 0:
emb_tiles_x = ceildiv(initsuperwidth - width, width - overlap_size_x) + 1
if (initsuperheight - height) > 0:
emb_tiles_y = ceildiv(initsuperheight - height, height - overlap_size_y) + 1
# Sanity
assert (
emb_tiles_x > 1 or emb_tiles_y > 1
), f"ERROR: Based on the requested dimensions of {initsuperwidth}x{initsuperheight} and tiles of {width}x{height} you don't need to Embiggen! Check your arguments."
# Prep alpha layers --------------
# https://stackoverflow.com/questions/69321734/how-to-create-different-transparency-like-gradient-with-python-pil
# agradientL is Left-side transparent
agradientL = (
Image.linear_gradient("L").rotate(90).resize((overlap_size_x, height))
)
# agradientT is Top-side transparent
agradientT = Image.linear_gradient("L").resize((width, overlap_size_y))
# radial corner is the left-top corner, made full circle then cut to just the left-top quadrant
agradientC = Image.new("L", (256, 256))
for y in range(256):
for x in range(256):
# Find distance to lower right corner (numpy takes arrays)
distanceToLR = np.sqrt([(255 - x) ** 2 + (255 - y) ** 2])[0]
# Clamp values to max 255
if distanceToLR > 255:
distanceToLR = 255
# Place the pixel as invert of distance
agradientC.putpixel((x, y), round(255 - distanceToLR))
# Create alternative asymmetric diagonal corner to use on "trailing" intersections to prevent hard edges
# Fits for a left-fading gradient on the bottom side and full opacity on the right side.
agradientAsymC = Image.new("L", (256, 256))
for y in range(256):
for x in range(256):
value = round(max(0, x - (255 - y)) * (255 / max(1, y)))
# Clamp values
value = max(0, value)
value = min(255, value)
agradientAsymC.putpixel((x, y), value)
# Create alpha layers default fully white
alphaLayerL = Image.new("L", (width, height), 255)
alphaLayerT = Image.new("L", (width, height), 255)
alphaLayerLTC = Image.new("L", (width, height), 255)
# Paste gradients into alpha layers
alphaLayerL.paste(agradientL, (0, 0))
alphaLayerT.paste(agradientT, (0, 0))
alphaLayerLTC.paste(agradientL, (0, 0))
alphaLayerLTC.paste(agradientT, (0, 0))
alphaLayerLTC.paste(agradientC.resize((overlap_size_x, overlap_size_y)), (0, 0))
# make masks with an asymmetric upper-right corner so when the curved transparent corner of the next tile
# to its right is placed it doesn't reveal a hard trailing semi-transparent edge in the overlapping space
alphaLayerTaC = alphaLayerT.copy()
alphaLayerTaC.paste(
agradientAsymC.rotate(270).resize((overlap_size_x, overlap_size_y)),
(width - overlap_size_x, 0),
)
alphaLayerLTaC = alphaLayerLTC.copy()
alphaLayerLTaC.paste(
agradientAsymC.rotate(270).resize((overlap_size_x, overlap_size_y)),
(width - overlap_size_x, 0),
)
if embiggen_tiles:
# Individual unconnected sides
alphaLayerR = Image.new("L", (width, height), 255)
alphaLayerR.paste(agradientL.rotate(180), (width - overlap_size_x, 0))
alphaLayerB = Image.new("L", (width, height), 255)
alphaLayerB.paste(agradientT.rotate(180), (0, height - overlap_size_y))
alphaLayerTB = Image.new("L", (width, height), 255)
alphaLayerTB.paste(agradientT, (0, 0))
alphaLayerTB.paste(agradientT.rotate(180), (0, height - overlap_size_y))
alphaLayerLR = Image.new("L", (width, height), 255)
alphaLayerLR.paste(agradientL, (0, 0))
alphaLayerLR.paste(agradientL.rotate(180), (width - overlap_size_x, 0))
# Sides and corner Layers
alphaLayerRBC = Image.new("L", (width, height), 255)
alphaLayerRBC.paste(agradientL.rotate(180), (width - overlap_size_x, 0))
alphaLayerRBC.paste(agradientT.rotate(180), (0, height - overlap_size_y))
alphaLayerRBC.paste(
agradientC.rotate(180).resize((overlap_size_x, overlap_size_y)),
(width - overlap_size_x, height - overlap_size_y),
)
alphaLayerLBC = Image.new("L", (width, height), 255)
alphaLayerLBC.paste(agradientL, (0, 0))
alphaLayerLBC.paste(agradientT.rotate(180), (0, height - overlap_size_y))
alphaLayerLBC.paste(
agradientC.rotate(90).resize((overlap_size_x, overlap_size_y)),
(0, height - overlap_size_y),
)
alphaLayerRTC = Image.new("L", (width, height), 255)
alphaLayerRTC.paste(agradientL.rotate(180), (width - overlap_size_x, 0))
alphaLayerRTC.paste(agradientT, (0, 0))
alphaLayerRTC.paste(
agradientC.rotate(270).resize((overlap_size_x, overlap_size_y)),
(width - overlap_size_x, 0),
)
# All but X layers
alphaLayerABT = Image.new("L", (width, height), 255)
alphaLayerABT.paste(alphaLayerLBC, (0, 0))
alphaLayerABT.paste(agradientL.rotate(180), (width - overlap_size_x, 0))
alphaLayerABT.paste(
agradientC.rotate(180).resize((overlap_size_x, overlap_size_y)),
(width - overlap_size_x, height - overlap_size_y),
)
alphaLayerABL = Image.new("L", (width, height), 255)
alphaLayerABL.paste(alphaLayerRTC, (0, 0))
alphaLayerABL.paste(agradientT.rotate(180), (0, height - overlap_size_y))
alphaLayerABL.paste(
agradientC.rotate(180).resize((overlap_size_x, overlap_size_y)),
(width - overlap_size_x, height - overlap_size_y),
)
alphaLayerABR = Image.new("L", (width, height), 255)
alphaLayerABR.paste(alphaLayerLBC, (0, 0))
alphaLayerABR.paste(agradientT, (0, 0))
alphaLayerABR.paste(
agradientC.resize((overlap_size_x, overlap_size_y)), (0, 0)
)
alphaLayerABB = Image.new("L", (width, height), 255)
alphaLayerABB.paste(alphaLayerRTC, (0, 0))
alphaLayerABB.paste(agradientL, (0, 0))
alphaLayerABB.paste(
agradientC.resize((overlap_size_x, overlap_size_y)), (0, 0)
)
# All-around layer
alphaLayerAA = Image.new("L", (width, height), 255)
alphaLayerAA.paste(alphaLayerABT, (0, 0))
alphaLayerAA.paste(agradientT, (0, 0))
alphaLayerAA.paste(
agradientC.resize((overlap_size_x, overlap_size_y)), (0, 0)
)
alphaLayerAA.paste(
agradientC.rotate(270).resize((overlap_size_x, overlap_size_y)),
(width - overlap_size_x, 0),
)
# Clean up temporary gradients
del agradientL
del agradientT
del agradientC
def make_image():
# Make main tiles -------------------------------------------------
if embiggen_tiles:
print(f">> Making {len(embiggen_tiles)} Embiggen tiles...")
else:
print(
f">> Making {(emb_tiles_x * emb_tiles_y)} Embiggen tiles ({emb_tiles_x}x{emb_tiles_y})..."
)
emb_tile_store = []
# Although we could use the same seed for every tile for determinism, at higher strengths this may
# produce duplicated structures for each tile and make the tiling effect more obvious
# instead track and iterate a local seed we pass to Img2Img
seed = self.seed
seedintlimit = (
np.iinfo(np.uint32).max - 1
) # only retrieve this one from numpy
for tile in range(emb_tiles_x * emb_tiles_y):
# Don't iterate on first tile
if tile != 0:
if seed < seedintlimit:
seed += 1
else:
seed = 0
# Determine if this is a re-run and replace
if embiggen_tiles and tile not in embiggen_tiles:
continue
# Get row and column entries
emb_row_i = tile // emb_tiles_x
emb_column_i = tile % emb_tiles_x
# Determine bounds to cut up the init image
# Determine upper-left point
if emb_column_i + 1 == emb_tiles_x:
left = initsuperwidth - width
else:
left = round(emb_column_i * (width - overlap_size_x))
if emb_row_i + 1 == emb_tiles_y:
top = initsuperheight - height
else:
top = round(emb_row_i * (height - overlap_size_y))
right = left + width
bottom = top + height
# Cropped image of above dimension (does not modify the original)
newinitimage = initsuperimage.crop((left, top, right, bottom))
# DEBUG:
# newinitimagepath = init_img[0:-4] + f'_emb_Ti{tile}.png'
# newinitimage.save(newinitimagepath)
if embiggen_tiles:
print(
f"Making tile #{tile + 1} ({embiggen_tiles.index(tile) + 1} of {len(embiggen_tiles)} requested)"
)
else:
print(f"Starting {tile + 1} of {(emb_tiles_x * emb_tiles_y)} tiles")
# create a torch tensor from an Image
newinitimage = np.array(newinitimage).astype(np.float32) / 255.0
newinitimage = newinitimage[None].transpose(0, 3, 1, 2)
newinitimage = torch.from_numpy(newinitimage)
newinitimage = 2.0 * newinitimage - 1.0
newinitimage = newinitimage.to(self.model.device)
clear_cuda_cache = (
kwargs["clear_cuda_cache"] if "clear_cuda_cache" in kwargs else None
)
tile_results = gen_img2img.generate(
prompt,
iterations=1,
seed=seed,
sampler=sampler,
steps=steps,
cfg_scale=cfg_scale,
conditioning=conditioning,
ddim_eta=ddim_eta,
image_callback=None, # called only after the final image is generated
step_callback=step_callback, # called after each intermediate image is generated
width=width,
height=height,
init_image=newinitimage, # notice that init_image is different from init_img
mask_image=None,
strength=strength,
clear_cuda_cache=clear_cuda_cache,
)
emb_tile_store.append(tile_results[0][0])
# DEBUG (but also has other uses): worth saving if you want tiles without a transparency overlap to composite manually
# emb_tile_store[-1].save(init_img[0:-4] + f'_emb_To{tile}.png')
del newinitimage
# Sanity check we have them all
if len(emb_tile_store) == (emb_tiles_x * emb_tiles_y) or (
embiggen_tiles != [] and len(emb_tile_store) == len(embiggen_tiles)
):
outputsuperimage = Image.new("RGBA", (initsuperwidth, initsuperheight))
if embiggen_tiles:
outputsuperimage.alpha_composite(
initsuperimage.convert("RGBA"), (0, 0)
)
for tile in range(emb_tiles_x * emb_tiles_y):
if embiggen_tiles:
if tile in embiggen_tiles:
intileimage = emb_tile_store.pop(0)
else:
continue
else:
intileimage = emb_tile_store[tile]
intileimage = intileimage.convert("RGBA")
# Get row and column entries
emb_row_i = tile // emb_tiles_x
emb_column_i = tile % emb_tiles_x
if emb_row_i == 0 and emb_column_i == 0 and not embiggen_tiles:
left = 0
top = 0
else:
# Determine upper-left point
if emb_column_i + 1 == emb_tiles_x:
left = initsuperwidth - width
else:
left = round(emb_column_i * (width - overlap_size_x))
if emb_row_i + 1 == emb_tiles_y:
top = initsuperheight - height
else:
top = round(emb_row_i * (height - overlap_size_y))
# Handle gradients for various conditions
# Handle emb_rerun case
if embiggen_tiles:
# top of image
if emb_row_i == 0:
if emb_column_i == 0:
if (tile + 1) in embiggen_tiles: # Look-ahead right
if (
tile + emb_tiles_x
) not in embiggen_tiles: # Look-ahead down
intileimage.putalpha(alphaLayerB)
# Otherwise do nothing on this tile
elif (
tile + emb_tiles_x
) in embiggen_tiles: # Look-ahead down only
intileimage.putalpha(alphaLayerR)
else:
intileimage.putalpha(alphaLayerRBC)
elif emb_column_i == emb_tiles_x - 1:
if (
tile + emb_tiles_x
) in embiggen_tiles: # Look-ahead down
intileimage.putalpha(alphaLayerL)
else:
intileimage.putalpha(alphaLayerLBC)
else:
if (tile + 1) in embiggen_tiles: # Look-ahead right
if (
tile + emb_tiles_x
) in embiggen_tiles: # Look-ahead down
intileimage.putalpha(alphaLayerL)
else:
intileimage.putalpha(alphaLayerLBC)
elif (
tile + emb_tiles_x
) in embiggen_tiles: # Look-ahead down only
intileimage.putalpha(alphaLayerLR)
else:
intileimage.putalpha(alphaLayerABT)
# bottom of image
elif emb_row_i == emb_tiles_y - 1:
if emb_column_i == 0:
if (tile + 1) in embiggen_tiles: # Look-ahead right
intileimage.putalpha(alphaLayerTaC)
else:
intileimage.putalpha(alphaLayerRTC)
elif emb_column_i == emb_tiles_x - 1:
# No tiles to look ahead to
intileimage.putalpha(alphaLayerLTC)
else:
if (tile + 1) in embiggen_tiles: # Look-ahead right
intileimage.putalpha(alphaLayerLTaC)
else:
intileimage.putalpha(alphaLayerABB)
# vertical middle of image
else:
if emb_column_i == 0:
if (tile + 1) in embiggen_tiles: # Look-ahead right
if (
tile + emb_tiles_x
) in embiggen_tiles: # Look-ahead down
intileimage.putalpha(alphaLayerTaC)
else:
intileimage.putalpha(alphaLayerTB)
elif (
tile + emb_tiles_x
) in embiggen_tiles: # Look-ahead down only
intileimage.putalpha(alphaLayerRTC)
else:
intileimage.putalpha(alphaLayerABL)
elif emb_column_i == emb_tiles_x - 1:
if (
tile + emb_tiles_x
) in embiggen_tiles: # Look-ahead down
intileimage.putalpha(alphaLayerLTC)
else:
intileimage.putalpha(alphaLayerABR)
else:
if (tile + 1) in embiggen_tiles: # Look-ahead right
if (
tile + emb_tiles_x
) in embiggen_tiles: # Look-ahead down
intileimage.putalpha(alphaLayerLTaC)
else:
intileimage.putalpha(alphaLayerABR)
elif (
tile + emb_tiles_x
) in embiggen_tiles: # Look-ahead down only
intileimage.putalpha(alphaLayerABB)
else:
intileimage.putalpha(alphaLayerAA)
# Handle normal tiling case (much simpler - since we tile left to right, top to bottom)
else:
if emb_row_i == 0 and emb_column_i >= 1:
intileimage.putalpha(alphaLayerL)
elif emb_row_i >= 1 and emb_column_i == 0:
if (
emb_column_i + 1 == emb_tiles_x
): # If we don't have anything that can be placed to the right
intileimage.putalpha(alphaLayerT)
else:
intileimage.putalpha(alphaLayerTaC)
else:
if (
emb_column_i + 1 == emb_tiles_x
): # If we don't have anything that can be placed to the right
intileimage.putalpha(alphaLayerLTC)
else:
intileimage.putalpha(alphaLayerLTaC)
# Layer tile onto final image
outputsuperimage.alpha_composite(intileimage, (left, top))
else:
print(
"Error: could not find all Embiggen output tiles in memory? Something must have gone wrong with img2img generation."
)
# after internal loops and patching up return Embiggen image
return outputsuperimage
# end of function declaration
return make_image

@ -1,101 +0,0 @@
"""
invokeai.backend.generator.img2img descends from .generator
"""
from typing import Optional
import torch
from accelerate.utils import set_seed
from diffusers import logging
from ..stable_diffusion import (
ConditioningData,
PostprocessingSettings,
StableDiffusionGeneratorPipeline,
)
from .base import Generator
class Img2Img(Generator):
def __init__(self, model, precision):
super().__init__(model, precision)
self.init_latent = None # by get_noise()
def get_make_image(
self,
prompt,
sampler,
steps,
cfg_scale,
ddim_eta,
conditioning,
init_image,
strength,
step_callback=None,
threshold=0.0,
warmup=0.2,
perlin=0.0,
h_symmetry_time_pct=None,
v_symmetry_time_pct=None,
attention_maps_callback=None,
**kwargs,
):
"""
Returns a function returning an image derived from the prompt and the initial image
Return value depends on the seed at the time you call it.
"""
self.perlin = perlin
# noinspection PyTypeChecker
pipeline: StableDiffusionGeneratorPipeline = self.model
pipeline.scheduler = sampler
uc, c, extra_conditioning_info = conditioning
conditioning_data = ConditioningData(
uc,
c,
cfg_scale,
extra_conditioning_info,
postprocessing_settings=PostprocessingSettings(
threshold=threshold,
warmup=warmup,
h_symmetry_time_pct=h_symmetry_time_pct,
v_symmetry_time_pct=v_symmetry_time_pct,
),
).add_scheduler_args_if_applicable(pipeline.scheduler, eta=ddim_eta)
def make_image(x_T: torch.Tensor, seed: int):
# FIXME: use x_T for initial seeded noise
# We're not at the moment because the pipeline automatically resizes init_image if
# necessary, which the x_T input might not match.
# In the meantime, reset the seed prior to generating pipeline output so we at least get the same result.
logging.set_verbosity_error() # quench safety check warnings
pipeline_output = pipeline.img2img_from_embeddings(
init_image,
strength,
steps,
conditioning_data,
noise_func=self.get_noise_like,
callback=step_callback,
seed=seed,
)
if (
pipeline_output.attention_map_saver is not None
and attention_maps_callback is not None
):
attention_maps_callback(pipeline_output.attention_map_saver)
return pipeline.numpy_to_pil(pipeline_output.images)[0]
return make_image
def get_noise_like(self, like: torch.Tensor):
device = like.device
if device.type == "mps":
x = torch.randn_like(like, device="cpu").to(device)
else:
x = torch.randn_like(like, device=device)
if self.perlin > 0.0:
shape = like.shape
x = (1 - self.perlin) * x + self.perlin * self.get_perlin_noise(
shape[3], shape[2]
)
return x

@ -1,411 +0,0 @@
"""
invokeai.backend.generator.inpaint descends from .generator
"""
from __future__ import annotations
import math
import cv2
import numpy as np
import PIL
import torch
from PIL import Image, ImageChops, ImageFilter, ImageOps
from ..image_util import PatchMatch, debug_image
from ..stable_diffusion.diffusers_pipeline import (
ConditioningData,
StableDiffusionGeneratorPipeline,
image_resized_to_grid_as_tensor,
)
from .img2img import Img2Img
def infill_methods() -> list[str]:
methods = [
"tile",
"solid",
]
if PatchMatch.patchmatch_available():
methods.insert(0, "patchmatch")
return methods
class Inpaint(Img2Img):
def __init__(self, model, precision):
self.inpaint_height = 0
self.inpaint_width = 0
self.enable_image_debugging = False
self.init_latent = None
self.pil_image = None
self.pil_mask = None
self.mask_blur_radius = 0
self.infill_method = None
super().__init__(model, precision)
# Outpaint support code
def get_tile_images(self, image: np.ndarray, width=8, height=8):
_nrows, _ncols, depth = image.shape
_strides = image.strides
nrows, _m = divmod(_nrows, height)
ncols, _n = divmod(_ncols, width)
if _m != 0 or _n != 0:
return None
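# Use stride tricks to view the image as a (rows, cols, height, width, depth)
# grid of tiles without copying the underlying data.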
return np.lib.stride_tricks.as_strided(
np.ravel(image),
shape=(nrows, ncols, height, width, depth),
strides=(height * _strides[0], width * _strides[1], *_strides),
writeable=False,
)
def infill_patchmatch(self, im: Image.Image) -> Image:
if im.mode != "RGBA":
return im
# Skip patchmatch if patchmatch isn't available
if not PatchMatch.patchmatch_available():
return im
# Patchmatch (note, we may want to expose patch_size? Increasing it significantly impacts performance though)
im_patched_np = PatchMatch.inpaint(
im.convert("RGB"), ImageOps.invert(im.split()[-1]), patch_size=3
)
im_patched = Image.fromarray(im_patched_np, mode="RGB")
return im_patched
def tile_fill_missing(
self, im: Image.Image, tile_size: int = 16, seed: int = None
) -> Image:
# Only fill if there's an alpha layer
if im.mode != "RGBA":
return im
a = np.asarray(im, dtype=np.uint8)
tile_size = (tile_size, tile_size)
# Get the image as tiles of a specified size
tiles = self.get_tile_images(a, *tile_size).copy()
# Get the mask as tiles
tiles_mask = tiles[:, :, :, :, 3]
# Find any mask tiles with any fully transparent pixels (we will be replacing these later)
tmask_shape = tiles_mask.shape
tiles_mask = tiles_mask.reshape(math.prod(tiles_mask.shape))
n, ny = (math.prod(tmask_shape[0:2])), math.prod(tmask_shape[2:])
tiles_mask = tiles_mask > 0
tiles_mask = tiles_mask.reshape((n, ny)).all(axis=1)
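# tiles_mask[i] is True only when every pixel of tile i is at least partly opaque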
# Get RGB tiles in single array and filter by the mask
tshape = tiles.shape
tiles_all = tiles.reshape((math.prod(tiles.shape[0:2]), *tiles.shape[2:]))
filtered_tiles = tiles_all[tiles_mask]
if len(filtered_tiles) == 0:
return im
# Find all invalid tiles and replace with a random valid tile
replace_count = np.logical_not(tiles_mask).sum()
rng = np.random.default_rng(seed=seed)
tiles_all[np.logical_not(tiles_mask)] = filtered_tiles[
rng.choice(filtered_tiles.shape[0], replace_count), :, :, :
]
# Convert back to an image
tiles_all = tiles_all.reshape(tshape)
tiles_all = tiles_all.swapaxes(1, 2)
st = tiles_all.reshape(
(
math.prod(tiles_all.shape[0:2]),
math.prod(tiles_all.shape[2:4]),
tiles_all.shape[4],
)
)
si = Image.fromarray(st, mode="RGBA")
return si
def mask_edge(self, mask: Image, edge_size: int, edge_blur: int) -> Image:
npimg = np.asarray(mask, dtype=np.uint8)
# Detect any partially transparent regions
npgradient = np.uint8(
255 * (1.0 - np.floor(np.abs(0.5 - np.float32(npimg) / 255.0) * 2.0))
)
# Detect hard edges
npedge = cv2.Canny(npimg, threshold1=100, threshold2=200)
# Combine
npmask = npgradient + npedge
# Expand
npmask = cv2.dilate(
npmask, np.ones((3, 3), np.uint8), iterations=int(edge_size / 2)
)
new_mask = Image.fromarray(npmask)
if edge_blur > 0:
new_mask = new_mask.filter(ImageFilter.BoxBlur(edge_blur))
return ImageOps.invert(new_mask)
def seam_paint(
self,
im: Image.Image,
seam_size: int,
seam_blur: int,
prompt,
seed,
sampler,
steps,
cfg_scale,
ddim_eta,
conditioning,
strength,
noise,
infill_method,
step_callback,
) -> Image.Image:
hard_mask = self.pil_image.split()[-1].copy()
mask = self.mask_edge(hard_mask, seam_size, seam_blur)
make_image = self.get_make_image(
prompt,
sampler,
steps,
cfg_scale,
ddim_eta,
conditioning,
init_image=im.copy().convert("RGBA"),
mask_image=mask,
strength=strength,
mask_blur_radius=0,
seam_size=0,
step_callback=step_callback,
inpaint_width=im.width,
inpaint_height=im.height,
infill_method=infill_method,
)
seam_noise = self.get_noise(im.width, im.height)
result = make_image(seam_noise, seed)
return result
@torch.no_grad()
def get_make_image(
self,
prompt,
sampler,
steps,
cfg_scale,
ddim_eta,
conditioning,
init_image: PIL.Image.Image | torch.FloatTensor,
mask_image: PIL.Image.Image | torch.FloatTensor,
strength: float,
mask_blur_radius: int = 8,
# Seam settings - when 0, doesn't fill seam
seam_size: int = 0,
seam_blur: int = 0,
seam_strength: float = 0.7,
seam_steps: int = 10,
tile_size: int = 32,
step_callback=None,
inpaint_replace=False,
enable_image_debugging=False,
infill_method=None,
inpaint_width=None,
inpaint_height=None,
inpaint_fill: tuple[int, int, int, int] = (0x7F, 0x7F, 0x7F, 0xFF),
attention_maps_callback=None,
**kwargs,
):
"""
Returns a function returning an image derived from the prompt and
the initial image + mask. Return value depends on the seed at
the time you call it. kwargs are 'init_latent' and 'strength'
"""
self.enable_image_debugging = enable_image_debugging
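# Default to the first available infill method (patchmatch when installed, otherwise tile)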
infill_method = infill_method or infill_methods()[0]
self.infill_method = infill_method
self.inpaint_width = inpaint_width
self.inpaint_height = inpaint_height
if isinstance(init_image, PIL.Image.Image):
self.pil_image = init_image.copy()
# Do infill
if infill_method == "patchmatch" and PatchMatch.patchmatch_available():
init_filled = self.infill_patchmatch(self.pil_image.copy())
elif infill_method == "tile":
init_filled = self.tile_fill_missing(
self.pil_image.copy(), seed=self.seed, tile_size=tile_size
)
elif infill_method == "solid":
solid_bg = PIL.Image.new("RGBA", init_image.size, inpaint_fill)
init_filled = PIL.Image.alpha_composite(solid_bg, init_image)
else:
raise ValueError(
f"Non-supported infill type {infill_method}", infill_method
)
init_filled.paste(init_image, (0, 0), init_image.split()[-1])
# Resize if requested for inpainting
if inpaint_width and inpaint_height:
init_filled = init_filled.resize((inpaint_width, inpaint_height))
debug_image(
init_filled, "init_filled", debug_status=self.enable_image_debugging
)
# Create init tensor
init_image = image_resized_to_grid_as_tensor(init_filled.convert("RGB"))
if isinstance(mask_image, PIL.Image.Image):
self.pil_mask = mask_image.copy()
debug_image(
mask_image,
"mask_image BEFORE multiply with pil_image",
debug_status=self.enable_image_debugging,
)
init_alpha = self.pil_image.getchannel("A")
if mask_image.mode != "L":
# FIXME: why do we get passed an RGB image here? We can only use single-channel.
mask_image = mask_image.convert("L")
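# Restrict the mask to the region where the original image is opaque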
mask_image = ImageChops.multiply(mask_image, init_alpha)
self.pil_mask = mask_image
# Resize if requested for inpainting
if inpaint_width and inpaint_height:
mask_image = mask_image.resize((inpaint_width, inpaint_height))
debug_image(
mask_image,
"mask_image AFTER multiply with pil_image",
debug_status=self.enable_image_debugging,
)
mask: torch.FloatTensor = image_resized_to_grid_as_tensor(
mask_image, normalize=False
)
else:
mask: torch.FloatTensor = mask_image
self.mask_blur_radius = mask_blur_radius
# noinspection PyTypeChecker
pipeline: StableDiffusionGeneratorPipeline = self.model
pipeline.scheduler = sampler
# todo: support cross-attention control
uc, c, _ = conditioning
conditioning_data = ConditioningData(
uc, c, cfg_scale
).add_scheduler_args_if_applicable(pipeline.scheduler, eta=ddim_eta)
def make_image(x_T: torch.Tensor, seed: int):
pipeline_output = pipeline.inpaint_from_embeddings(
init_image=init_image,
mask=1 - mask, # expects white means "paint here."
strength=strength,
num_inference_steps=steps,
conditioning_data=conditioning_data,
noise_func=self.get_noise_like,
callback=step_callback,
seed=seed,
)
if (
pipeline_output.attention_map_saver is not None
and attention_maps_callback is not None
):
attention_maps_callback(pipeline_output.attention_map_saver)
result = self.postprocess_size_and_mask(
pipeline.numpy_to_pil(pipeline_output.images)[0]
)
# Seam paint if this is our first pass (seam_size set to 0 during seam painting)
if seam_size > 0:
old_image = self.pil_image or init_image
old_mask = self.pil_mask or mask_image
result = self.seam_paint(
result,
seam_size,
seam_blur,
prompt,
seed,
sampler,
seam_steps,
cfg_scale,
ddim_eta,
conditioning,
seam_strength,
x_T,
infill_method,
step_callback,
)
# Restore original settings
self.get_make_image(
prompt,
sampler,
steps,
cfg_scale,
ddim_eta,
conditioning,
old_image,
old_mask,
strength,
mask_blur_radius,
seam_size,
seam_blur,
seam_strength,
seam_steps,
tile_size,
step_callback,
inpaint_replace,
enable_image_debugging,
inpaint_width=inpaint_width,
inpaint_height=inpaint_height,
infill_method=infill_method,
**kwargs,
)
return result
return make_image
def sample_to_image(self, samples) -> Image.Image:
gen_result = super().sample_to_image(samples).convert("RGB")
return self.postprocess_size_and_mask(gen_result)
def postprocess_size_and_mask(self, gen_result: Image.Image) -> Image.Image:
debug_image(gen_result, "gen_result", debug_status=self.enable_image_debugging)
# Resize if necessary
if self.inpaint_width and self.inpaint_height:
gen_result = gen_result.resize(self.pil_image.size)
if self.pil_image is None or self.pil_mask is None:
return gen_result
corrected_result = self.repaste_and_color_correct(
gen_result, self.pil_image, self.pil_mask, self.mask_blur_radius
)
debug_image(
corrected_result,
"corrected_result",
debug_status=self.enable_image_debugging,
)
return corrected_result

@ -1,81 +0,0 @@
"""
invokeai.backend.generator.txt2img inherits from invokeai.backend.generator
"""
import PIL.Image
import torch
from ..stable_diffusion import (
ConditioningData,
PostprocessingSettings,
StableDiffusionGeneratorPipeline,
)
from .base import Generator
class Txt2Img(Generator):
def __init__(self, model, precision):
super().__init__(model, precision)
@torch.no_grad()
def get_make_image(
self,
prompt,
sampler,
steps,
cfg_scale,
ddim_eta,
conditioning,
width,
height,
step_callback=None,
threshold=0.0,
warmup=0.2,
perlin=0.0,
h_symmetry_time_pct=None,
v_symmetry_time_pct=None,
attention_maps_callback=None,
**kwargs,
):
"""
Returns a function returning an image derived from the prompt and the initial image
Return value depends on the seed at the time you call it
kwargs are 'width' and 'height'
"""
self.perlin = perlin
# noinspection PyTypeChecker
pipeline: StableDiffusionGeneratorPipeline = self.model
pipeline.scheduler = sampler
uc, c, extra_conditioning_info = conditioning
conditioning_data = ConditioningData(
uc,
c,
cfg_scale,
extra_conditioning_info,
postprocessing_settings=PostprocessingSettings(
threshold=threshold,
warmup=warmup,
h_symmetry_time_pct=h_symmetry_time_pct,
v_symmetry_time_pct=v_symmetry_time_pct,
),
).add_scheduler_args_if_applicable(pipeline.scheduler, eta=ddim_eta)
def make_image(x_T: torch.Tensor, _: int) -> PIL.Image.Image:
pipeline_output = pipeline.image_from_embeddings(
latents=torch.zeros_like(x_T, dtype=self.torch_dtype()),
noise=x_T,
num_inference_steps=steps,
conditioning_data=conditioning_data,
callback=step_callback,
)
if (
pipeline_output.attention_map_saver is not None
and attention_maps_callback is not None
):
attention_maps_callback(pipeline_output.attention_map_saver)
return pipeline.numpy_to_pil(pipeline_output.images)[0]
return make_image

@ -1,207 +0,0 @@
"""
invokeai.backend.generator.txt2img2img inherits from invokeai.backend.generator
"""
import math
from typing import Callable, Optional
import torch
from diffusers.utils.logging import get_verbosity, set_verbosity, set_verbosity_error
from ..stable_diffusion import PostprocessingSettings
from .base import Generator
from ..stable_diffusion.diffusers_pipeline import StableDiffusionGeneratorPipeline
from ..stable_diffusion.diffusers_pipeline import ConditioningData
from ..stable_diffusion.diffusers_pipeline import trim_to_multiple_of
class Txt2Img2Img(Generator):
def __init__(self, model, precision):
super().__init__(model, precision)
self.init_latent = None # for get_noise()
def get_make_image(
self,
prompt: str,
sampler,
steps: int,
cfg_scale: float,
ddim_eta,
conditioning,
width: int,
height: int,
strength: float,
step_callback: Optional[Callable] = None,
threshold=0.0,
warmup=0.2,
perlin=0.0,
h_symmetry_time_pct=None,
v_symmetry_time_pct=None,
attention_maps_callback=None,
**kwargs,
):
"""
Returns a function returning an image derived from the prompt and the initial image
Return value depends on the seed at the time you call it
kwargs are 'width' and 'height'
"""
self.perlin = perlin
# noinspection PyTypeChecker
pipeline: StableDiffusionGeneratorPipeline = self.model
pipeline.scheduler = sampler
uc, c, extra_conditioning_info = conditioning
conditioning_data = ConditioningData(
uc,
c,
cfg_scale,
extra_conditioning_info,
postprocessing_settings=PostprocessingSettings(
threshold=threshold,
warmup=warmup,
h_symmetry_time_pct=h_symmetry_time_pct,
v_symmetry_time_pct=v_symmetry_time_pct,
),
).add_scheduler_args_if_applicable(pipeline.scheduler, eta=ddim_eta)
def make_image(x_T: torch.Tensor, _: int):
first_pass_latent_output, _ = pipeline.latents_from_embeddings(
latents=torch.zeros_like(x_T),
num_inference_steps=steps,
conditioning_data=conditioning_data,
noise=x_T,
callback=step_callback,
)
# Get our initial generation width and height directly from the latent output so
# the message below is accurate.
init_width = first_pass_latent_output.size()[3] * self.downsampling_factor
init_height = first_pass_latent_output.size()[2] * self.downsampling_factor
print(
f"\n>> Interpolating from {init_width}x{init_height} to {width}x{height} using DDIM sampling"
)
# resizing
resized_latents = torch.nn.functional.interpolate(
first_pass_latent_output,
size=(
height // self.downsampling_factor,
width // self.downsampling_factor,
),
mode="bilinear",
)
# Free up memory from the last generation.
clear_cuda_cache = kwargs.get("clear_cuda_cache", None)
if clear_cuda_cache is not None:
clear_cuda_cache()
second_pass_noise = self.get_noise_like(
resized_latents, override_perlin=True
)
# Clear symmetry for the second pass
from dataclasses import replace
new_postprocessing_settings = replace(
conditioning_data.postprocessing_settings, h_symmetry_time_pct=None
)
new_postprocessing_settings = replace(
new_postprocessing_settings, v_symmetry_time_pct=None
)
new_conditioning_data = replace(
conditioning_data, postprocessing_settings=new_postprocessing_settings
)
verbosity = get_verbosity()
set_verbosity_error()
pipeline_output = pipeline.img2img_from_latents_and_embeddings(
resized_latents,
num_inference_steps=steps,
conditioning_data=new_conditioning_data,
strength=strength,
noise=second_pass_noise,
callback=step_callback,
)
set_verbosity(verbosity)
if (
pipeline_output.attention_map_saver is not None
and attention_maps_callback is not None
):
attention_maps_callback(pipeline_output.attention_map_saver)
return pipeline.numpy_to_pil(pipeline_output.images)[0]
# FIXME: do we really need something entirely different for the inpainting model?
# in the case of the inpainting model being loaded, the trick of
# providing an interpolated latent doesn't work, so we transiently
# create a 512x512 PIL image, upscale it, and run the inpainting
# over it in img2img mode. Because the inpainting model is so conservative
# it doesn't change the image (much)
return make_image
def get_noise_like(self, like: torch.Tensor, override_perlin: bool = False):
device = like.device
if device.type == "mps":
x = torch.randn_like(like, device="cpu", dtype=self.torch_dtype()).to(
device
)
else:
x = torch.randn_like(like, device=device, dtype=self.torch_dtype())
if self.perlin > 0.0 and not override_perlin:
shape = like.shape
x = (1 - self.perlin) * x + self.perlin * self.get_perlin_noise(
shape[3], shape[2]
)
return x
# returns a tensor filled with random numbers from a normal distribution
def get_noise(self, width, height, scale=True):
# print(f"Get noise: {width}x{height}")
if scale:
# Scale the input width and height for the initial generation
# Make their area equivalent to the model's resolution area (e.g. 512*512 = 262144),
# while keeping the minimum dimension at least 0.5 * resolution (e.g. 512*0.5 = 256)
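# For example (illustrative numbers): a 768x512 request against a 512-pixel model
# has aspect 1.5 and model area 262144, giving a first pass of roughly 624x416.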
aspect = width / height
dimension = self.model.unet.config.sample_size * self.model.vae_scale_factor
min_dimension = math.floor(dimension * 0.5)
model_area = (
dimension * dimension
) # hardcoded for now since all models are trained on square images
if aspect > 1.0:
init_height = max(min_dimension, math.sqrt(model_area / aspect))
init_width = init_height * aspect
else:
init_width = max(min_dimension, math.sqrt(model_area * aspect))
init_height = init_width / aspect
scaled_width, scaled_height = trim_to_multiple_of(
math.floor(init_width), math.floor(init_height)
)
else:
scaled_width = width
scaled_height = height
device = self.model.device
channels = self.latent_channels
if channels == 9:
channels = 4 # we don't really want noise for all the mask channels
shape = (
1,
channels,
scaled_height // self.downsampling_factor,
scaled_width // self.downsampling_factor,
)
if self.use_mps_noise or device.type == "mps":
tensor = torch.empty(size=shape, device="cpu")
tensor = self.get_noise_like(like=tensor).to(device)
else:
tensor = torch.empty(size=shape, device=device)
tensor = self.get_noise_like(like=tensor)
return tensor

@ -1,122 +0,0 @@
"""
invokeai.backend.globals defines a small number of global variables that would
otherwise have to be passed through long and complex call chains.
It defines a Namespace object named "Globals" that contains
the attributes:
- root - the root directory under which "models" and "outputs" can be found
- initfile - path to the initialization file
- try_patchmatch - option to globally disable loading of 'patchmatch' module
- always_use_cpu - force use of CPU even if GPU is available
"""
import os
import os.path as osp
from argparse import Namespace
from pathlib import Path
from typing import Union
Globals = Namespace()
# Where to look for the initialization file and other key components
Globals.initfile = "invokeai.init"
Globals.models_file = "models.yaml"
Globals.models_dir = "models"
Globals.config_dir = "configs"
Globals.autoscan_dir = "weights"
Globals.converted_ckpts_dir = "converted_ckpts"
# Set the default root directory. This can be overwritten by explicitly
# passing the `--root <directory>` argument on the command line.
# logic is:
# 1) use INVOKEAI_ROOT environment variable (no check for this being a valid directory)
# 2) use VIRTUAL_ENV environment variable, with a check for initfile being there
# 3) use ~/invokeai
if os.environ.get("INVOKEAI_ROOT"):
Globals.root = osp.abspath(os.environ.get("INVOKEAI_ROOT"))
elif (
os.environ.get("VIRTUAL_ENV")
and Path(os.environ.get("VIRTUAL_ENV"), "..", Globals.initfile).exists()
):
Globals.root = osp.abspath(osp.join(os.environ.get("VIRTUAL_ENV"), ".."))
else:
Globals.root = osp.abspath(osp.expanduser("~/invokeai"))
# Try loading patchmatch
Globals.try_patchmatch = True
# Use CPU even if GPU is available (main use case is for debugging MPS issues)
Globals.always_use_cpu = False
# Whether the internet is reachable for dynamic downloads
# The CLI will test connectivity at startup time.
Globals.internet_available = True
# Whether to disable xformers
Globals.disable_xformers = False
# Low-memory tradeoff for guidance calculations.
Globals.sequential_guidance = False
# whether we are forcing full precision
Globals.full_precision = False
# whether we should convert ckpt files into diffusers models on the fly
Globals.ckpt_convert = True
# logging tokenization everywhere
Globals.log_tokenization = False
def global_config_file() -> Path:
return Path(Globals.root, Globals.config_dir, Globals.models_file)
def global_config_dir() -> Path:
return Path(Globals.root, Globals.config_dir)
def global_models_dir() -> Path:
return Path(Globals.root, Globals.models_dir)
def global_autoscan_dir() -> Path:
return Path(Globals.root, Globals.autoscan_dir)
def global_converted_ckpts_dir() -> Path:
return Path(global_models_dir(), Globals.converted_ckpts_dir)
def global_set_root(root_dir: Union[str, Path]):
Globals.root = root_dir
def global_cache_dir(subdir: Union[str, Path] = "") -> Path:
"""
Returns Path to the model cache directory. If a subdirectory
is provided, it will be appended to the end of the path, allowing
for Hugging Face-style conventions. Currently, Hugging Face has
moved all models into the "hub" subfolder, so for any pretrained
HF model, use:
global_cache_dir('hub')
The legacy location for transformers used to be global_cache_dir('transformers')
and global_cache_dir('diffusers') for diffusers.
"""
home: str = os.getenv("HF_HOME")
if home is None:
home = os.getenv("XDG_CACHE_HOME")
if home is not None:
# Set `home` to $XDG_CACHE_HOME/huggingface, which is the default location mentioned in Hugging Face Hub Client Library.
# See: https://huggingface.co/docs/huggingface_hub/main/en/package_reference/environment_variables#xdgcachehome
home += os.sep + "huggingface"
if home is not None:
return Path(home, subdir)
else:
return Path(Globals.root, "models", subdir)

@ -1,24 +0,0 @@
"""
Initialization file for invokeai.backend.image_util methods.
"""
from PIL import ImageDraw
from .patchmatch import PatchMatch
from .pngwriter import PngWriter, PromptFormatter, retrieve_metadata, write_metadata
from .seamless import configure_model_padding
from .txt2mask import Txt2Mask
from .util import InitImageResizer, make_grid
def debug_image(
debug_image, debug_text, debug_show=True, debug_result=False, debug_status=False
):
if not debug_status:
return
image_copy = debug_image.copy().convert("RGBA")
ImageDraw.Draw(image_copy).text((5, 5), debug_text, (255, 0, 0))
if debug_show:
image_copy.show()
if debug_result:
return image_copy

@ -1,47 +0,0 @@
"""
This module defines a singleton object, "patchmatch" that
wraps the actual patchmatch object. It respects the global
"try_patchmatch" attribute, so that patchmatch loading can
be suppressed or deferred
"""
import numpy as np
from invokeai.backend.globals import Globals
class PatchMatch:
"""
Thin class wrapper around the patchmatch function.
"""
patch_match = None
tried_load: bool = False
def __init__(self):
super().__init__()
@classmethod
def _load_patch_match(cls):
if cls.tried_load:
return
if Globals.try_patchmatch:
from patchmatch import patch_match as pm
if pm.patchmatch_available:
print(">> Patchmatch initialized")
else:
print(">> Patchmatch not loaded (nonfatal)")
cls.patch_match = pm
else:
print(">> Patchmatch loading disabled")
cls.tried_load = True
@classmethod
def patchmatch_available(cls) -> bool:
cls._load_patch_match()
return cls.patch_match and cls.patch_match.patchmatch_available
@classmethod
def inpaint(cls, *args, **kwargs) -> np.ndarray:
if cls.patchmatch_available():
return cls.patch_match.inpaint(*args, **kwargs)

@ -1,121 +0,0 @@
"""
Two helper classes for dealing with PNG images and their path names.
PngWriter -- Converts Images generated by T2I into PNGs, finds
appropriate names for them, and writes prompt metadata
into the PNG.
Exports function retrieve_metadata(path)
"""
import json
import os
import re
from PIL import Image, PngImagePlugin
# -------------------image generation utils-----
class PngWriter:
def __init__(self, outdir):
self.outdir = outdir
os.makedirs(outdir, exist_ok=True)
# gives the next unique prefix in outdir
def unique_prefix(self):
# sort reverse alphabetically until we find max+1
dirlist = sorted(os.listdir(self.outdir), reverse=True)
# find the first filename that matches our pattern or return 000000.0.png
existing_name = next(
(f for f in dirlist if re.match(r"^(\d+)\..*\.png", f)),
"0000000.0.png",
)
basecount = int(existing_name.split(".", 1)[0]) + 1
return f"{basecount:06}"
# saves image named _image_ to outdir/name, writing metadata from prompt
# returns full path of output
def save_image_and_prompt_to_png(
self, image, dream_prompt, name, metadata=None, compress_level=6
):
path = os.path.join(self.outdir, name)
info = PngImagePlugin.PngInfo()
info.add_text("Dream", dream_prompt)
if metadata:
info.add_text("sd-metadata", json.dumps(metadata))
image.save(path, "PNG", pnginfo=info, compress_level=compress_level)
return path
def retrieve_metadata(self, img_basename):
"""
Given a PNG filename stored in outdir, returns the "sd-metadata"
metadata stored there, as a dict
"""
path = os.path.join(self.outdir, img_basename)
all_metadata = retrieve_metadata(path)
return all_metadata["sd-metadata"]
def retrieve_metadata(img_path):
"""
Given a path to a PNG image, returns the "sd-metadata"
metadata stored there, as a dict
"""
im = Image.open(img_path)
if hasattr(im, "text"):
md = im.text.get("sd-metadata", "{}")
dream_prompt = im.text.get("Dream", "")
else:
# When trying to retrieve metadata from images without a 'text' payload, such as JPG images.
md = "{}"
dream_prompt = ""
return {"sd-metadata": json.loads(md), "Dream": dream_prompt}
def write_metadata(img_path: str, meta: dict):
im = Image.open(img_path)
info = PngImagePlugin.PngInfo()
info.add_text("sd-metadata", json.dumps(meta))
im.save(img_path, "PNG", pnginfo=info)
class PromptFormatter:
def __init__(self, t2i, opt):
self.t2i = t2i
self.opt = opt
# note: the t2i object should provide all these values.
# there should be no need to or against opt values
def normalize_prompt(self):
"""Normalize the prompt and switches"""
t2i = self.t2i
opt = self.opt
switches = list()
switches.append(f'"{opt.prompt}"')
switches.append(f"-s{opt.steps or t2i.steps}")
switches.append(f"-W{opt.width or t2i.width}")
switches.append(f"-H{opt.height or t2i.height}")
switches.append(f"-C{opt.cfg_scale or t2i.cfg_scale}")
switches.append(f"-A{opt.sampler_name or t2i.sampler_name}")
# to do: put model name into the t2i object
# switches.append(f'--model{t2i.model_name}')
if opt.seamless or t2i.seamless:
switches.append(f"--seamless")
if opt.init_img:
switches.append(f"-I{opt.init_img}")
if opt.fit:
switches.append(f"--fit")
if opt.strength and opt.init_img is not None:
switches.append(f"-f{opt.strength or t2i.strength}")
if opt.gfpgan_strength:
switches.append(f"-G{opt.gfpgan_strength}")
if opt.upscale:
switches.append(f'-U {" ".join([str(u) for u in opt.upscale])}')
if opt.variation_amount > 0:
switches.append(f"-v{opt.variation_amount}")
if opt.with_variations:
formatted_variations = ",".join(
f"{seed}:{weight}" for seed, weight in opt.with_variations
)
switches.append(f"-V{formatted_variations}")
return " ".join(switches)

@ -1,59 +0,0 @@
import torch.nn as nn
def _conv_forward_asymmetric(self, input, weight, bias):
"""
Patch for Conv2d._conv_forward that supports asymmetric padding
"""
working = nn.functional.pad(
input, self.asymmetric_padding["x"], mode=self.asymmetric_padding_mode["x"]
)
working = nn.functional.pad(
working, self.asymmetric_padding["y"], mode=self.asymmetric_padding_mode["y"]
)
return nn.functional.conv2d(
working,
weight,
bias,
self.stride,
nn.modules.utils._pair(0),
self.dilation,
self.groups,
)
def configure_model_padding(model, seamless, seamless_axes):
"""
Modifies the 2D convolution layers to use a circular padding mode based on the `seamless` and `seamless_axes` options.
"""
# TODO: get an explicit interface for this in diffusers: https://github.com/huggingface/diffusers/issues/556
for m in model.modules():
if isinstance(m, (nn.Conv2d, nn.ConvTranspose2d)):
if seamless:
m.asymmetric_padding_mode = {}
m.asymmetric_padding = {}
m.asymmetric_padding_mode["x"] = (
"circular" if ("x" in seamless_axes) else "constant"
)
m.asymmetric_padding["x"] = (
m._reversed_padding_repeated_twice[0],
m._reversed_padding_repeated_twice[1],
0,
0,
)
m.asymmetric_padding_mode["y"] = (
"circular" if ("y" in seamless_axes) else "constant"
)
m.asymmetric_padding["y"] = (
0,
0,
m._reversed_padding_repeated_twice[2],
m._reversed_padding_repeated_twice[3],
)
m._conv_forward = _conv_forward_asymmetric.__get__(m, nn.Conv2d)
else:
m._conv_forward = nn.Conv2d._conv_forward.__get__(m, nn.Conv2d)
if hasattr(m, "asymmetric_padding_mode"):
del m.asymmetric_padding_mode
if hasattr(m, "asymmetric_padding"):
del m.asymmetric_padding

@ -1,142 +0,0 @@
"""Makes available the Txt2Mask class, which assists in the automatic
assignment of masks via text prompt using clipseg.
Here is typical usage:
from invokeai.backend.image_util.txt2mask import Txt2Mask, SegmentedGrayscale
from PIL import Image
txt2mask = Txt2Mask(self.device)
segmented = txt2mask.segment(Image.open('/path/to/img.png'),'a bagel')
# this will return a grayscale Image of the segmented data
grayscale = segmented.to_grayscale()
# this will return a semi-transparent image in which the
# selected object(s) are opaque and the rest is at various
# levels of transparency
transparent = segmented.to_transparent()
# this will return a masked image suitable for use in inpainting:
mask = segmented.to_mask(threshold=0.5)
The threshold used in the call to to_mask() selects the pixels whose
confidence exceeds the indicated level for inclusion in the mask. Values
range from 0.0 to 1.0; the higher the threshold, the more confident the
model must be about a pixel before it is included in the mask. In limited
testing, values around 0.5 work well.
"""
import numpy as np
import torch
from PIL import Image, ImageOps
from torchvision import transforms
from transformers import AutoProcessor, CLIPSegForImageSegmentation
from invokeai.backend.globals import global_cache_dir
CLIPSEG_MODEL = "CIDAS/clipseg-rd64-refined"
CLIPSEG_SIZE = 352
class SegmentedGrayscale(object):
def __init__(self, image: Image, heatmap: torch.Tensor):
self.heatmap = heatmap
self.image = image
def to_grayscale(self, invert: bool = False) -> Image:
return self._rescale(
Image.fromarray(
np.uint8(255 - self.heatmap * 255 if invert else self.heatmap * 255)
)
)
def to_mask(self, threshold: float = 0.5) -> Image:
discrete_heatmap = self.heatmap.lt(threshold).int()
return self._rescale(
Image.fromarray(np.uint8(discrete_heatmap * 255), mode="L")
)
def to_transparent(self, invert: bool = False) -> Image:
transparent_image = self.image.copy()
# For img2img, we want the selected regions to be transparent,
# but to_grayscale() returns the opposite. Thus invert.
gs = self.to_grayscale(not invert)
transparent_image.putalpha(gs)
return transparent_image
# rescales the 352x352 heatmap and crops it so that it matches the original image dimensions
def _rescale(self, heatmap: Image) -> Image:
size = (
self.image.width
if (self.image.width > self.image.height)
else self.image.height
)
resized_image = heatmap.resize((size, size), resample=Image.Resampling.LANCZOS)
return resized_image.crop((0, 0, self.image.width, self.image.height))
class Txt2Mask(object):
"""
Create a new Txt2Mask object. The optional device argument can be one of
'cuda', 'mps', or 'cpu'.
"""
def __init__(self, device="cpu", refined=False):
print(">> Initializing clipseg model for text to mask inference")
# BUG: we are not doing anything with the device option at this time
self.device = device
self.processor = AutoProcessor.from_pretrained(
CLIPSEG_MODEL, cache_dir=global_cache_dir("hub")
)
self.model = CLIPSegForImageSegmentation.from_pretrained(
CLIPSEG_MODEL, cache_dir=global_cache_dir("hub")
)
@torch.no_grad()
def segment(self, image, prompt: str) -> SegmentedGrayscale:
"""
Given a prompt string such as "a bagel", tries to identify the object in the
provided image and returns a SegmentedGrayscale object in which the brighter
pixels indicate where the object is inferred to be.
"""
transform = transforms.Compose(
[
transforms.ToTensor(),
transforms.Normalize(
mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]
),
transforms.Resize(
(CLIPSEG_SIZE, CLIPSEG_SIZE)
), # must be multiple of 64...
]
)
if type(image) is str:
image = Image.open(image).convert("RGB")
image = ImageOps.exif_transpose(image)
img = self._scale_and_crop(image)
inputs = self.processor(
text=[prompt], images=[img], padding=True, return_tensors="pt"
)
outputs = self.model(**inputs)
heatmap = torch.sigmoid(outputs.logits)
return SegmentedGrayscale(image, heatmap)
def _scale_and_crop(self, image: Image) -> Image:
scaled_image = Image.new("RGB", (CLIPSEG_SIZE, CLIPSEG_SIZE))
if image.width > image.height: # width is the constraining dimension
scale = CLIPSEG_SIZE / image.width
else:
scale = CLIPSEG_SIZE / image.height
scaled_image.paste(
image.resize(
(int(scale * image.width), int(scale * image.height)),
resample=Image.Resampling.LANCZOS,
),
box=(0, 0),
)
return scaled_image
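Beyond the docstring example above, a short, hedged usage sketch (the image path and prompt are hypothetical; the clipseg weights are fetched from the Hugging Face hub on first use):

from PIL import Image

txt2mask = Txt2Mask(device="cpu")
source = Image.open("photo.png")               # hypothetical input image
segmented = txt2mask.segment(source, "a dog")  # hypothetical prompt
mask = segmented.to_mask(threshold=0.5)        # L-mode mask, same size as source
mask.save("dog_mask.png")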

View File

@ -1,71 +0,0 @@
from math import ceil, floor, sqrt
from PIL import Image
class InitImageResizer:
"""Simple class to create resized copies of an Image while preserving the aspect ratio."""
def __init__(self, Image):
self.image = Image
def resize(self, width=None, height=None) -> Image:
"""
Return a copy of the image resized to fit within
a box of width x height pixels. The aspect ratio is
maintained. If neither width nor height is provided,
a copy of the original image is returned. If only one of
them is provided, the other is calculated from the
aspect ratio.
All dimensions are floored to the nearest multiple of 64 so
that the result can be passed to img2img().
"""
im = self.image
ar = im.width / float(im.height)
# Infer missing values from aspect ratio
if not (width or height): # both missing
width = im.width
height = im.height
elif not height: # height missing
height = int(width / ar)
elif not width: # width missing
width = int(height * ar)
w_scale = width / im.width
h_scale = height / im.height
scale = min(w_scale, h_scale)
(rw, rh) = (int(scale * im.width), int(scale * im.height))
# round everything to multiples of 64
width, height, rw, rh = map(lambda x: x - x % 64, (width, height, rw, rh))
# no resize necessary, but return a copy
if im.width == width and im.height == height:
return im.copy()
# otherwise resize the original image so that it fits inside the bounding box
resized_image = self.image.resize((rw, rh), resample=Image.Resampling.LANCZOS)
return resized_image
def make_grid(image_list, rows=None, cols=None):
image_cnt = len(image_list)
if None in (rows, cols):
rows = floor(sqrt(image_cnt)) # try to make it square
cols = ceil(image_cnt / rows)
width = image_list[0].width
height = image_list[0].height
grid_img = Image.new("RGB", (width * cols, height * rows))
i = 0
for r in range(0, rows):
for c in range(0, cols):
if i >= len(image_list):
break
grid_img.paste(image_list[i], (c * width, r * height))
i = i + 1
return grid_img
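A brief sketch showing both helpers together (the file names are placeholders):

from PIL import Image

# Resize a hypothetical input to fit a 512-pixel-wide box (dimensions are
# floored to multiples of 64), then tile four copies into a 2x2 contact sheet.
im = Image.open("photo.png")
resized = InitImageResizer(im).resize(width=512)
grid = make_grid([resized] * 4, rows=2, cols=2)
grid.save("grid.png")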

File diff suppressed because it is too large

Some files were not shown because too many files have changed in this diff