feat: revert SQLiteMigrator class

Will pursue this in a separate PR.
This commit is contained in:
psychedelicious
2023-11-29 13:29:19 +11:00
parent 0b079df4ae
commit 1987bc9cc5
16 changed files with 264 additions and 569 deletions

View File

@ -124,7 +124,6 @@ class ApiDependencies:
create_system_graphs(services.graph_library)
db.run_migrations()
db.clean()
ApiDependencies.invoker = Invoker(services)

View File

@ -3,11 +3,8 @@ from datetime import datetime
from typing import Optional, Union, cast
from invokeai.app.invocations.baseinvocation import MetadataField, MetadataFieldValidator
from invokeai.app.services.image_records.migrations import v0, v1, v2
from invokeai.app.services.invoker import Invoker
from invokeai.app.services.shared.pagination import OffsetPaginatedResults
from invokeai.app.services.shared.sqlite.sqlite_database import SqliteDatabase
from invokeai.app.services.shared.sqlite.sqlite_migrator import Migration, MigrationSet
from .image_records_base import ImageRecordStorageBase
from .image_records_common import (
@ -22,27 +19,107 @@ from .image_records_common import (
deserialize_image_record,
)
images_migrations = MigrationSet(
table_name="images",
migrations=[
Migration(version=0, migrate=v0),
Migration(version=1, migrate=v1),
Migration(version=2, migrate=v2),
],
)
class SqliteImageRecordStorage(ImageRecordStorageBase):
def __init__(self, db: SqliteDatabase) -> None:
super().__init__()
self._db = db
self._lock = db.lock
self._conn = db.conn
self._cursor = self._conn.cursor()
self._db.register_migration_set(images_migrations)
self._create_tables()
def start(self, invoker: Invoker) -> None:
self._invoker = invoker
def _create_tables(self) -> None:
"""Creates the `images` table."""
with self._lock:
try:
self._cursor.execute(
"""--sql
CREATE TABLE IF NOT EXISTS images (
image_name TEXT NOT NULL PRIMARY KEY,
-- This is an enum in python, unrestricted string here for flexibility
image_origin TEXT NOT NULL,
-- This is an enum in python, unrestricted string here for flexibility
image_category TEXT NOT NULL,
width INTEGER NOT NULL,
height INTEGER NOT NULL,
session_id TEXT,
node_id TEXT,
metadata TEXT,
is_intermediate BOOLEAN DEFAULT FALSE,
created_at DATETIME NOT NULL DEFAULT(STRFTIME('%Y-%m-%d %H:%M:%f', 'NOW')),
-- Updated via trigger
updated_at DATETIME NOT NULL DEFAULT(STRFTIME('%Y-%m-%d %H:%M:%f', 'NOW')),
-- Soft delete, currently unused
deleted_at DATETIME
);
"""
)
self._cursor.execute("PRAGMA table_info(images)")
columns = [column[1] for column in self._cursor.fetchall()]
if "starred" not in columns:
self._cursor.execute(
"""--sql
ALTER TABLE images ADD COLUMN starred BOOLEAN DEFAULT FALSE;
"""
)
# Create the `images` table indices.
self._cursor.execute(
"""--sql
CREATE UNIQUE INDEX IF NOT EXISTS idx_images_image_name ON images(image_name);
"""
)
self._cursor.execute(
"""--sql
CREATE INDEX IF NOT EXISTS idx_images_image_origin ON images(image_origin);
"""
)
self._cursor.execute(
"""--sql
CREATE INDEX IF NOT EXISTS idx_images_image_category ON images(image_category);
"""
)
self._cursor.execute(
"""--sql
CREATE INDEX IF NOT EXISTS idx_images_created_at ON images(created_at);
"""
)
self._cursor.execute(
"""--sql
CREATE INDEX IF NOT EXISTS idx_images_starred ON images(starred);
"""
)
# Add trigger for `updated_at`.
self._cursor.execute(
"""--sql
CREATE TRIGGER IF NOT EXISTS tg_images_updated_at
AFTER UPDATE
ON images FOR EACH ROW
BEGIN
UPDATE images SET updated_at = STRFTIME('%Y-%m-%d %H:%M:%f', 'NOW')
WHERE image_name = old.image_name;
END;
"""
)
self._cursor.execute("PRAGMA table_info(images)")
columns = [column[1] for column in self._cursor.fetchall()]
if "has_workflow" not in columns:
self._cursor.execute(
"""--sql
ALTER TABLE images
ADD COLUMN has_workflow BOOLEAN DEFAULT FALSE;
"""
)
self._conn.commit()
except Exception:
self._conn.rollback()
raise
def get(self, image_name: str) -> ImageRecord:
try:

View File

@ -1,5 +0,0 @@
from .v0 import v0
from .v1 import v1
from .v2 import v2
__all__ = [v0, v1, v2] # type: ignore

View File

@ -1,64 +0,0 @@
import sqlite3
def v0(cursor: sqlite3.Cursor) -> None:
"""
Migration for `images` table v0
https://github.com/invoke-ai/InvokeAI/pull/3443
Adds the `images` table, indicies and triggers for the image_records service.
"""
cursor.execute(
"""--sql
CREATE TABLE IF NOT EXISTS images (
image_name TEXT NOT NULL PRIMARY KEY,
-- This is an enum in python, unrestricted string here for flexibility
image_origin TEXT NOT NULL,
-- This is an enum in python, unrestricted string here for flexibility
image_category TEXT NOT NULL,
width INTEGER NOT NULL,
height INTEGER NOT NULL,
session_id TEXT,
node_id TEXT,
metadata TEXT,
is_intermediate BOOLEAN DEFAULT FALSE,
created_at DATETIME NOT NULL DEFAULT(STRFTIME('%Y-%m-%d %H:%M:%f', 'NOW')),
-- Updated via trigger
updated_at DATETIME NOT NULL DEFAULT(STRFTIME('%Y-%m-%d %H:%M:%f', 'NOW')),
-- Soft delete, currently unused
deleted_at DATETIME
);
"""
)
cursor.execute(
"""--sql
CREATE UNIQUE INDEX IF NOT EXISTS idx_images_image_name ON images(image_name);
"""
)
cursor.execute(
"""--sql
CREATE INDEX IF NOT EXISTS idx_images_image_origin ON images(image_origin);
"""
)
cursor.execute(
"""--sql
CREATE INDEX IF NOT EXISTS idx_images_image_category ON images(image_category);
"""
)
cursor.execute(
"""--sql
CREATE INDEX IF NOT EXISTS idx_images_created_at ON images(created_at);
"""
)
cursor.execute(
"""--sql
CREATE TRIGGER IF NOT EXISTS tg_images_updated_at
AFTER
UPDATE ON images FOR EACH ROW BEGIN
UPDATE images
SET updated_at = STRFTIME('%Y-%m-%d %H:%M:%f', 'NOW')
WHERE image_name = old.image_name;
END;
"""
)

View File

@ -1,25 +0,0 @@
import sqlite3
def v1(cursor: sqlite3.Cursor) -> None:
"""
Migration for `images` table v1
https://github.com/invoke-ai/InvokeAI/pull/4246
Adds the `starred` column to the `images` table.
"""
cursor.execute("PRAGMA table_info(images)")
columns = [column[1] for column in cursor.fetchall()]
if "starred" not in columns:
cursor.execute(
"""--sql
ALTER TABLE images
ADD COLUMN starred BOOLEAN DEFAULT FALSE;
"""
)
cursor.execute(
"""--sql
CREATE INDEX IF NOT EXISTS idx_images_starred ON images(starred);
"""
)

View File

@ -1,24 +0,0 @@
import sqlite3
def v2(cursor: sqlite3.Cursor) -> None:
"""
Migration for `images` table v2
https://github.com/invoke-ai/InvokeAI/pull/5148
Adds the `has_workflow` column to the `images` table.
Workflows associated with images are now only stored in the image file itself. This column
indicates whether the image has a workflow embedded in it, so we don't need to read the image
file to find out.
"""
cursor.execute("PRAGMA table_info(images)")
columns = [column[1] for column in cursor.fetchall()]
if "has_workflow" not in columns:
cursor.execute(
"""--sql
ALTER TABLE images
ADD COLUMN has_workflow BOOLEAN DEFAULT FALSE;
"""
)

View File

@ -1,4 +0,0 @@
from .v0 import v0
from .v1 import v1
__all__ = [v0, v1] # type: ignore

View File

@ -1,106 +0,0 @@
import sqlite3
def v0(cursor: sqlite3.Cursor) -> None:
"""
Migration for `session_queue` table v0
https://github.com/invoke-ai/InvokeAI/pull/4502
Creates the `session_queue` table, indicies and triggers for the session_queue service.
"""
cursor.execute(
"""--sql
CREATE TABLE IF NOT EXISTS session_queue (
item_id INTEGER PRIMARY KEY AUTOINCREMENT, -- used for ordering, cursor pagination
batch_id TEXT NOT NULL, -- identifier of the batch this queue item belongs to
queue_id TEXT NOT NULL, -- identifier of the queue this queue item belongs to
session_id TEXT NOT NULL UNIQUE, -- duplicated data from the session column, for ease of access
field_values TEXT, -- NULL if no values are associated with this queue item
session TEXT NOT NULL, -- the session to be executed
status TEXT NOT NULL DEFAULT 'pending', -- the status of the queue item, one of 'pending', 'in_progress', 'completed', 'failed', 'canceled'
priority INTEGER NOT NULL DEFAULT 0, -- the priority, higher is more important
error TEXT, -- any errors associated with this queue item
created_at DATETIME NOT NULL DEFAULT(STRFTIME('%Y-%m-%d %H:%M:%f', 'NOW')),
updated_at DATETIME NOT NULL DEFAULT(STRFTIME('%Y-%m-%d %H:%M:%f', 'NOW')), -- updated via trigger
started_at DATETIME, -- updated via trigger
completed_at DATETIME -- updated via trigger, completed items are cleaned up on application startup
-- Ideally this is a FK, but graph_executions uses INSERT OR REPLACE, and REPLACE triggers the ON DELETE CASCADE...
-- FOREIGN KEY (session_id) REFERENCES graph_executions (id) ON DELETE CASCADE
);
"""
)
cursor.execute(
"""--sql
CREATE UNIQUE INDEX IF NOT EXISTS idx_session_queue_item_id ON session_queue(item_id);
"""
)
cursor.execute(
"""--sql
CREATE UNIQUE INDEX IF NOT EXISTS idx_session_queue_session_id ON session_queue(session_id);
"""
)
cursor.execute(
"""--sql
CREATE INDEX IF NOT EXISTS idx_session_queue_batch_id ON session_queue(batch_id);
"""
)
cursor.execute(
"""--sql
CREATE INDEX IF NOT EXISTS idx_session_queue_created_priority ON session_queue(priority);
"""
)
cursor.execute(
"""--sql
CREATE INDEX IF NOT EXISTS idx_session_queue_created_status ON session_queue(status);
"""
)
cursor.execute(
"""--sql
CREATE TRIGGER IF NOT EXISTS tg_session_queue_completed_at
AFTER UPDATE OF status ON session_queue
FOR EACH ROW
WHEN
NEW.status = 'completed'
OR NEW.status = 'failed'
OR NEW.status = 'canceled'
BEGIN
UPDATE session_queue
SET completed_at = STRFTIME('%Y-%m-%d %H:%M:%f', 'NOW')
WHERE item_id = NEW.item_id;
END;
"""
)
cursor.execute(
"""--sql
CREATE TRIGGER IF NOT EXISTS tg_session_queue_started_at
AFTER UPDATE OF status ON session_queue
FOR EACH ROW
WHEN
NEW.status = 'in_progress'
BEGIN
UPDATE session_queue
SET started_at = STRFTIME('%Y-%m-%d %H:%M:%f', 'NOW')
WHERE item_id = NEW.item_id;
END;
"""
)
cursor.execute(
"""--sql
CREATE TRIGGER IF NOT EXISTS tg_session_queue_updated_at
AFTER UPDATE
ON session_queue FOR EACH ROW
BEGIN
UPDATE session_queue
SET updated_at = STRFTIME('%Y-%m-%d %H:%M:%f', 'NOW')
WHERE item_id = old.item_id;
END;
"""
)

View File

@ -1,22 +0,0 @@
import sqlite3
def v1(cursor: sqlite3.Cursor) -> None:
"""
Migration for `session_queue` table v1
https://github.com/invoke-ai/InvokeAI/pull/5148
Adds the `workflow` column to the `session_queue` table.
Workflows have been (correctly) made a property of a queue item, rather than individual nodes.
This requires they be included in the session queue.
"""
cursor.execute("PRAGMA table_info(session_queue)")
columns = [column[1] for column in cursor.fetchall()]
if "workflow" not in columns:
cursor.execute(
"""--sql
ALTER TABLE session_queue ADD COLUMN workflow TEXT;
"""
)

View File

@ -6,7 +6,6 @@ from fastapi_events.typing import Event as FastAPIEvent
from invokeai.app.services.events.events_base import EventServiceBase
from invokeai.app.services.invoker import Invoker
from invokeai.app.services.session_queue.migrations import v0, v1
from invokeai.app.services.session_queue.session_queue_base import SessionQueueBase
from invokeai.app.services.session_queue.session_queue_common import (
DEFAULT_QUEUE_ID,
@ -29,25 +28,15 @@ from invokeai.app.services.session_queue.session_queue_common import (
)
from invokeai.app.services.shared.pagination import CursorPaginatedResults
from invokeai.app.services.shared.sqlite.sqlite_database import SqliteDatabase
from invokeai.app.services.shared.sqlite.sqlite_migrator import Migration, MigrationSet
session_queue_migrations = MigrationSet(
table_name="session_queue",
migrations=[
Migration(version=0, migrate=v0),
Migration(version=1, migrate=v1),
],
)
class SqliteSessionQueue(SessionQueueBase):
def __init__(self, db: SqliteDatabase) -> None:
super().__init__()
self.__db = db
self.__lock = db.lock
self.__conn = db.conn
self.__cursor = self.__conn.cursor()
self.__db.register_migration_set(session_queue_migrations)
self._create_tables()
def start(self, invoker: Invoker) -> None:
self.__invoker = invoker
@ -102,6 +91,123 @@ class SqliteSessionQueue(SessionQueueBase):
except SessionQueueItemNotFoundError:
return
def _create_tables(self) -> None:
"""Creates the session queue tables, indicies, and triggers"""
try:
self.__lock.acquire()
self.__cursor.execute(
"""--sql
CREATE TABLE IF NOT EXISTS session_queue (
item_id INTEGER PRIMARY KEY AUTOINCREMENT, -- used for ordering, cursor pagination
batch_id TEXT NOT NULL, -- identifier of the batch this queue item belongs to
queue_id TEXT NOT NULL, -- identifier of the queue this queue item belongs to
session_id TEXT NOT NULL UNIQUE, -- duplicated data from the session column, for ease of access
field_values TEXT, -- NULL if no values are associated with this queue item
session TEXT NOT NULL, -- the session to be executed
status TEXT NOT NULL DEFAULT 'pending', -- the status of the queue item, one of 'pending', 'in_progress', 'completed', 'failed', 'canceled'
priority INTEGER NOT NULL DEFAULT 0, -- the priority, higher is more important
error TEXT, -- any errors associated with this queue item
created_at DATETIME NOT NULL DEFAULT(STRFTIME('%Y-%m-%d %H:%M:%f', 'NOW')),
updated_at DATETIME NOT NULL DEFAULT(STRFTIME('%Y-%m-%d %H:%M:%f', 'NOW')), -- updated via trigger
started_at DATETIME, -- updated via trigger
completed_at DATETIME -- updated via trigger, completed items are cleaned up on application startup
-- Ideally this is a FK, but graph_executions uses INSERT OR REPLACE, and REPLACE triggers the ON DELETE CASCADE...
-- FOREIGN KEY (session_id) REFERENCES graph_executions (id) ON DELETE CASCADE
);
"""
)
self.__cursor.execute(
"""--sql
CREATE UNIQUE INDEX IF NOT EXISTS idx_session_queue_item_id ON session_queue(item_id);
"""
)
self.__cursor.execute(
"""--sql
CREATE UNIQUE INDEX IF NOT EXISTS idx_session_queue_session_id ON session_queue(session_id);
"""
)
self.__cursor.execute(
"""--sql
CREATE INDEX IF NOT EXISTS idx_session_queue_batch_id ON session_queue(batch_id);
"""
)
self.__cursor.execute(
"""--sql
CREATE INDEX IF NOT EXISTS idx_session_queue_created_priority ON session_queue(priority);
"""
)
self.__cursor.execute(
"""--sql
CREATE INDEX IF NOT EXISTS idx_session_queue_created_status ON session_queue(status);
"""
)
self.__cursor.execute(
"""--sql
CREATE TRIGGER IF NOT EXISTS tg_session_queue_completed_at
AFTER UPDATE OF status ON session_queue
FOR EACH ROW
WHEN
NEW.status = 'completed'
OR NEW.status = 'failed'
OR NEW.status = 'canceled'
BEGIN
UPDATE session_queue
SET completed_at = STRFTIME('%Y-%m-%d %H:%M:%f', 'NOW')
WHERE item_id = NEW.item_id;
END;
"""
)
self.__cursor.execute(
"""--sql
CREATE TRIGGER IF NOT EXISTS tg_session_queue_started_at
AFTER UPDATE OF status ON session_queue
FOR EACH ROW
WHEN
NEW.status = 'in_progress'
BEGIN
UPDATE session_queue
SET started_at = STRFTIME('%Y-%m-%d %H:%M:%f', 'NOW')
WHERE item_id = NEW.item_id;
END;
"""
)
self.__cursor.execute(
"""--sql
CREATE TRIGGER IF NOT EXISTS tg_session_queue_updated_at
AFTER UPDATE
ON session_queue FOR EACH ROW
BEGIN
UPDATE session_queue
SET updated_at = STRFTIME('%Y-%m-%d %H:%M:%f', 'NOW')
WHERE item_id = old.item_id;
END;
"""
)
self.__cursor.execute("PRAGMA table_info(session_queue)")
columns = [column[1] for column in self.__cursor.fetchall()]
if "workflow" not in columns:
self.__cursor.execute(
"""--sql
ALTER TABLE session_queue ADD COLUMN workflow TEXT;
"""
)
self.__conn.commit()
except Exception:
self.__conn.rollback()
raise
finally:
self.__lock.release()
def _set_in_progress_to_canceled(self) -> None:
"""
Sets all in_progress queue items to canceled. Run on app startup, not associated with any queue.

View File

@ -4,7 +4,6 @@ from logging import Logger
from invokeai.app.services.config import InvokeAIAppConfig
from invokeai.app.services.shared.sqlite.sqlite_common import sqlite_memory
from invokeai.app.services.shared.sqlite.sqlite_migrator import MigrationSet, SQLiteMigrator
class SqliteDatabase:
@ -29,7 +28,6 @@ class SqliteDatabase:
self.conn.set_trace_callback(self._logger.debug)
self.conn.execute("PRAGMA foreign_keys = ON;")
self._migrator = SQLiteMigrator(db_path=location, lock=self.lock, logger=self._logger)
def clean(self) -> None:
try:
@ -42,9 +40,3 @@ class SqliteDatabase:
raise
finally:
self.lock.release()
def register_migration_set(self, migration_set: MigrationSet) -> None:
self._migrator.register_migration_set(migration_set)
def run_migrations(self) -> None:
self._migrator.run_migrations()

View File

@ -1,192 +0,0 @@
import datetime
import shutil
import sqlite3
import threading
from logging import Logger
from pathlib import Path
from typing import Callable, Optional, TypeAlias
from .sqlite_common import sqlite_memory
MigrateCallback: TypeAlias = Callable[[sqlite3.Cursor], None]
class MigrationError(Exception):
pass
class Migration:
def __init__(
self,
version: int,
migrate: MigrateCallback,
) -> None:
self.version = version
self.migrate = migrate
class MigrationSet:
def __init__(self, table_name: str, migrations: list[Migration]) -> None:
self.table_name = table_name
self.migrations = migrations
class SQLiteMigrator:
"""
Handles SQLite database migrations.
Migrations are registered with the `register_migration_set` method. They are applied on
application startup with the `run_migrations` method.
A `MigrationSet` is a set of `Migration`s for a single table. Each `Migration` has a `version`
and `migrate` callback. The callback is provided with a `sqlite3.Cursor` and should perform the
any migration logic. Committing, rolling back transactions and errors are handled by the migrator.
Migrations are applied in order of version number. If the database does not have a version table
for a given table, it is assumed to be at version 0. The migrator creates and manages the version
tables.
If the database is a file, it will be backed up before migrations are applied and restored if
there are any errors.
"""
def __init__(self, db_path: Path | str, lock: threading.RLock, logger: Logger):
self._logger = logger
self._conn = sqlite3.connect(db_path, check_same_thread=False)
self._cursor = self._conn.cursor()
self._lock = lock
self._db_path = db_path
self._migration_sets: set[MigrationSet] = set()
def _get_version_table_name(self, table_name: str) -> str:
"""Returns the name of the version table for a given table."""
return f"{table_name}_version"
def _create_version_table(self, table_name: str) -> None:
"""
Creates a version table for a given table, if it does not exist.
Throws MigrationError if there is a problem.
"""
version_table_name = self._get_version_table_name(table_name)
with self._lock:
try:
self._cursor.execute(
f"""--sql
CREATE TABLE IF NOT EXISTS {version_table_name} (
version INTEGER PRIMARY KEY,
created_at DATETIME NOT NULL DEFAULT(STRFTIME('%Y-%m-%d %H:%M:%f', 'NOW'))
);
"""
)
self._conn.commit()
except sqlite3.Error as e:
msg = f'Problem creation "{version_table_name}" table: {e}'
self._logger.error(msg)
self._conn.rollback()
raise MigrationError(msg) from e
def _get_current_version(self, table_name: str) -> Optional[int]:
"""Gets the current version of a table, or None if it doesn't exist."""
version_table_name = self._get_version_table_name(table_name)
try:
self._cursor.execute(f"SELECT MAX(version) FROM {version_table_name};")
return self._cursor.fetchone()[0]
except sqlite3.OperationalError as e:
if "no such table" in str(e):
return None
raise
def _set_version(self, table_name: str, version: int) -> None:
"""Adds a version entry to the table's version table."""
version_table_name = self._get_version_table_name(table_name)
self._cursor.execute(f"INSERT INTO {version_table_name} (version) VALUES (?);", (version,))
def _run_migration(self, table_name: str, migration: Migration) -> None:
"""Runs a single migration."""
with self._lock:
try:
migration.migrate(self._cursor)
self._set_version(table_name=table_name, version=migration.version)
self._conn.commit()
except sqlite3.Error:
self._conn.rollback()
raise
def _run_migration_set(self, migration_set: MigrationSet) -> None:
"""Runs a set of migrations for a single table."""
with self._lock:
table_name = migration_set.table_name
migrations = migration_set.migrations
self._create_version_table(table_name=table_name)
for migration in migrations:
current_version = self._get_current_version(table_name)
if current_version is None or current_version < migration.version:
try:
self._logger.info(f'runing "{table_name}" migration {migration.version}')
self._run_migration(table_name=table_name, migration=migration)
except sqlite3.Error as e:
raise MigrationError(f'Problem runing "{table_name}" migration {migration.version}: {e}') from e
def _backup_db(self, db_path: Path) -> Path:
"""Backs up the databse, returning the path to the backup file."""
with self._lock:
timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
backup_path = db_path.parent / f"{db_path.stem}_{timestamp}.db"
self._logger.info(f"Backing up database to {backup_path}")
backup_conn = sqlite3.connect(backup_path)
with backup_conn:
self._conn.backup(backup_conn)
backup_conn.close()
return backup_path
def _restore_db(self, backup_path: Path) -> None:
"""Restores the database from a backup file, unless the database is a memory database."""
if self._db_path == sqlite_memory:
return
with self._lock:
self._logger.info(f"Restoring database from {backup_path}")
self._conn.close()
if not Path(backup_path).is_file():
raise FileNotFoundError(f"Backup file {backup_path} does not exist")
shutil.copy2(backup_path, self._db_path)
def _get_is_migration_needed(self, migration_set: MigrationSet) -> bool:
table_name = migration_set.table_name
migrations = migration_set.migrations
current_version = self._get_current_version(table_name)
if current_version is None or current_version < migrations[-1].version:
return True
return False
def run_migrations(self) -> None:
"""
Applies all registered migration sets.
If the database is a file, it will be backed up before migrations are applied and restored
if there are any errors.
"""
if not any(self._get_is_migration_needed(migration_set) for migration_set in self._migration_sets):
return
backup_path: Optional[Path] = None
with self._lock:
# Only make a backup if using a file database (not memory)
if isinstance(self._db_path, Path):
backup_path = self._backup_db(self._db_path)
for migration_set in self._migration_sets:
if self._get_is_migration_needed(migration_set):
try:
self._run_migration_set(migration_set)
except Exception as e:
msg = f'Problem runing "{migration_set.table_name}" migrations: {e}'
self._logger.error(msg)
if backup_path is not None:
self._logger.error(f" Restoring from {backup_path}")
self._restore_db(backup_path)
raise MigrationError(msg) from e
# TODO: delete backup file?
# if backup_path is not None:
# Path(backup_path).unlink()
def register_migration_set(self, migration_set: MigrationSet) -> None:
"""Registers a migration set to be migrated on application startup."""
self._migration_sets.add(migration_set)

View File

@ -1,4 +0,0 @@
from invokeai.app.services.workflow_records.migrations.v0 import v0
from invokeai.app.services.workflow_records.migrations.v1 import v1
__all__ = [v0, v1] # type: ignore

View File

@ -1,35 +0,0 @@
import sqlite3
def v0(cursor: sqlite3.Cursor) -> None:
"""
Migration for `workflows` table v0
https://github.com/invoke-ai/InvokeAI/pull/4686
Creates the `workflows` table for the workflow_records service & a trigger for updated_at.
Note: `workflow_id` gets an implicit index. We don't need to make one for this column.
"""
cursor.execute(
"""--sql
CREATE TABLE IF NOT EXISTS workflows (
workflow TEXT NOT NULL,
workflow_id TEXT GENERATED ALWAYS AS (json_extract(workflow, '$.id')) VIRTUAL NOT NULL UNIQUE, -- gets implicit index
created_at DATETIME NOT NULL DEFAULT(STRFTIME('%Y-%m-%d %H:%M:%f', 'NOW')),
updated_at DATETIME NOT NULL DEFAULT(STRFTIME('%Y-%m-%d %H:%M:%f', 'NOW')) -- updated via trigger
);
"""
)
cursor.execute(
"""--sql
CREATE TRIGGER IF NOT EXISTS tg_workflows_updated_at
AFTER UPDATE
ON workflows FOR EACH ROW
BEGIN
UPDATE workflows
SET updated_at = STRFTIME('%Y-%m-%d %H:%M:%f', 'NOW')
WHERE workflow_id = old.workflow_id;
END;
"""
)

View File

@ -1,33 +0,0 @@
import sqlite3
def v1(cursor: sqlite3.Cursor) -> None:
"""
Migration for `workflows` table v1
https://github.com/invoke-ai/InvokeAI/pull/5148
Drops the `workflow_images` table and empties the `workflows` table.
Prior to v3.5.0, all workflows were associated with images. They were stored in the image files
themselves, and in v3.4.0 we started storing them in the DB. This turned out to be a bad idea -
you end up with *many* image workflows, most of which are duplicates.
The purpose of workflows DB storage was to provide a workflow library. Library workflows are
different from image workflows. They are only saved when the user requests they be saved.
Moving forward, the storage for image workflows and library workflows will be separate. Image
workflows are store only in the image files. Library workflows are stored only in the DB.
To give ourselves a clean slate, we need to delete all existing workflows in the DB (all of which)
are image workflows. We also need to delete the workflow_images table, which is no longer needed.
"""
cursor.execute(
"""--sql
DROP TABLE IF EXISTS workflow_images;
"""
)
cursor.execute(
"""--sql
DELETE FROM workflows;
"""
)

View File

@ -1,8 +1,6 @@
from invokeai.app.services.invoker import Invoker
from invokeai.app.services.shared.pagination import PaginatedResults
from invokeai.app.services.shared.sqlite.sqlite_database import SqliteDatabase
from invokeai.app.services.shared.sqlite.sqlite_migrator import Migration, MigrationSet
from invokeai.app.services.workflow_records.migrations import v0, v1
from invokeai.app.services.workflow_records.workflow_records_base import WorkflowRecordsStorageBase
from invokeai.app.services.workflow_records.workflow_records_common import (
Workflow,
@ -10,23 +8,14 @@ from invokeai.app.services.workflow_records.workflow_records_common import (
WorkflowRecordDTO,
)
workflows_migrations = MigrationSet(
table_name="workflows",
migrations=[
Migration(version=0, migrate=v0),
Migration(version=1, migrate=v1),
],
)
class SqliteWorkflowRecordsStorage(WorkflowRecordsStorageBase):
def __init__(self, db: SqliteDatabase) -> None:
super().__init__()
self._db = db
self._lock = db.lock
self._conn = db.conn
self._cursor = self._conn.cursor()
self._db.register_migration_set(workflows_migrations)
self._create_tables()
def start(self, invoker: Invoker) -> None:
self._invoker = invoker
@ -37,7 +26,7 @@ class SqliteWorkflowRecordsStorage(WorkflowRecordsStorageBase):
self._cursor.execute(
"""--sql
SELECT workflow_id, workflow, created_at, updated_at
FROM workflows
FROM workflow_library
WHERE workflow_id = ?;
""",
(workflow_id,),
@ -57,7 +46,7 @@ class SqliteWorkflowRecordsStorage(WorkflowRecordsStorageBase):
self._lock.acquire()
self._cursor.execute(
"""--sql
INSERT OR IGNORE INTO workflows(workflow)
INSERT OR IGNORE INTO workflow_library(workflow)
VALUES (?);
""",
(workflow.model_dump_json(),),
@ -75,7 +64,7 @@ class SqliteWorkflowRecordsStorage(WorkflowRecordsStorageBase):
self._lock.acquire()
self._cursor.execute(
"""--sql
UPDATE workflows
UPDATE workflow_library
SET workflow = ?
WHERE workflow_id = ?;
""",
@ -94,7 +83,7 @@ class SqliteWorkflowRecordsStorage(WorkflowRecordsStorageBase):
self._lock.acquire()
self._cursor.execute(
"""--sql
DELETE from workflows
DELETE from workflow_library
WHERE workflow_id = ?;
""",
(workflow_id,),
@ -114,7 +103,7 @@ class SqliteWorkflowRecordsStorage(WorkflowRecordsStorageBase):
self._cursor.execute(
"""--sql
SELECT workflow_id, workflow, created_at, updated_at
FROM workflows
FROM workflow_library
ORDER BY created_at DESC
LIMIT ? OFFSET ?;
""",
@ -125,7 +114,7 @@ class SqliteWorkflowRecordsStorage(WorkflowRecordsStorageBase):
self._cursor.execute(
"""--sql
SELECT COUNT(*)
FROM workflows;
FROM workflow_library;
"""
)
total = self._cursor.fetchone()[0]
@ -142,3 +131,49 @@ class SqliteWorkflowRecordsStorage(WorkflowRecordsStorageBase):
raise
finally:
self._lock.release()
def _create_tables(self) -> None:
try:
self._lock.acquire()
self._cursor.execute(
"""--sql
CREATE TABLE IF NOT EXISTS workflow_library (
workflow TEXT NOT NULL,
workflow_id TEXT GENERATED ALWAYS AS (json_extract(workflow, '$.id')) VIRTUAL NOT NULL UNIQUE, -- gets implicit index
created_at DATETIME NOT NULL DEFAULT(STRFTIME('%Y-%m-%d %H:%M:%f', 'NOW')),
updated_at DATETIME NOT NULL DEFAULT(STRFTIME('%Y-%m-%d %H:%M:%f', 'NOW')) -- updated via trigger
);
"""
)
self._cursor.execute(
"""--sql
CREATE TRIGGER IF NOT EXISTS tg_workflow_library_updated_at
AFTER UPDATE
ON workflow_library FOR EACH ROW
BEGIN
UPDATE workflow_library
SET updated_at = STRFTIME('%Y-%m-%d %H:%M:%f', 'NOW')
WHERE workflow_id = old.workflow_id;
END;
"""
)
# We do not need the original `workflows` table or `workflow_images` junction table.
self._cursor.execute(
"""--sql
DROP TABLE IF EXISTS workflow_images;
"""
)
self._cursor.execute(
"""--sql
DROP TABLE IF EXISTS workflows;
"""
)
self._conn.commit()
except Exception:
self._conn.rollback()
raise
finally:
self._lock.release()