diff --git a/invokeai/app/api/dependencies.py b/invokeai/app/api/dependencies.py index 825e852545..d2ca877a45 100644 --- a/invokeai/app/api/dependencies.py +++ b/invokeai/app/api/dependencies.py @@ -124,7 +124,6 @@ class ApiDependencies: create_system_graphs(services.graph_library) - db.run_migrations() db.clean() ApiDependencies.invoker = Invoker(services) diff --git a/invokeai/app/services/image_records/image_records_sqlite.py b/invokeai/app/services/image_records/image_records_sqlite.py index 1b1ff4220d..e71edbad87 100644 --- a/invokeai/app/services/image_records/image_records_sqlite.py +++ b/invokeai/app/services/image_records/image_records_sqlite.py @@ -3,11 +3,8 @@ from datetime import datetime from typing import Optional, Union, cast from invokeai.app.invocations.baseinvocation import MetadataField, MetadataFieldValidator -from invokeai.app.services.image_records.migrations import v0, v1, v2 -from invokeai.app.services.invoker import Invoker from invokeai.app.services.shared.pagination import OffsetPaginatedResults from invokeai.app.services.shared.sqlite.sqlite_database import SqliteDatabase -from invokeai.app.services.shared.sqlite.sqlite_migrator import Migration, MigrationSet from .image_records_base import ImageRecordStorageBase from .image_records_common import ( @@ -22,27 +19,107 @@ from .image_records_common import ( deserialize_image_record, ) -images_migrations = MigrationSet( - table_name="images", - migrations=[ - Migration(version=0, migrate=v0), - Migration(version=1, migrate=v1), - Migration(version=2, migrate=v2), - ], -) - class SqliteImageRecordStorage(ImageRecordStorageBase): def __init__(self, db: SqliteDatabase) -> None: super().__init__() - self._db = db self._lock = db.lock self._conn = db.conn self._cursor = self._conn.cursor() - self._db.register_migration_set(images_migrations) + self._create_tables() - def start(self, invoker: Invoker) -> None: - self._invoker = invoker + def _create_tables(self) -> None: + """Creates the `images` table.""" + with self._lock: + try: + self._cursor.execute( + """--sql + CREATE TABLE IF NOT EXISTS images ( + image_name TEXT NOT NULL PRIMARY KEY, + -- This is an enum in python, unrestricted string here for flexibility + image_origin TEXT NOT NULL, + -- This is an enum in python, unrestricted string here for flexibility + image_category TEXT NOT NULL, + width INTEGER NOT NULL, + height INTEGER NOT NULL, + session_id TEXT, + node_id TEXT, + metadata TEXT, + is_intermediate BOOLEAN DEFAULT FALSE, + created_at DATETIME NOT NULL DEFAULT(STRFTIME('%Y-%m-%d %H:%M:%f', 'NOW')), + -- Updated via trigger + updated_at DATETIME NOT NULL DEFAULT(STRFTIME('%Y-%m-%d %H:%M:%f', 'NOW')), + -- Soft delete, currently unused + deleted_at DATETIME + ); + """ + ) + + self._cursor.execute("PRAGMA table_info(images)") + columns = [column[1] for column in self._cursor.fetchall()] + + if "starred" not in columns: + self._cursor.execute( + """--sql + ALTER TABLE images ADD COLUMN starred BOOLEAN DEFAULT FALSE; + """ + ) + + # Create the `images` table indices. + self._cursor.execute( + """--sql + CREATE UNIQUE INDEX IF NOT EXISTS idx_images_image_name ON images(image_name); + """ + ) + self._cursor.execute( + """--sql + CREATE INDEX IF NOT EXISTS idx_images_image_origin ON images(image_origin); + """ + ) + self._cursor.execute( + """--sql + CREATE INDEX IF NOT EXISTS idx_images_image_category ON images(image_category); + """ + ) + self._cursor.execute( + """--sql + CREATE INDEX IF NOT EXISTS idx_images_created_at ON images(created_at); + """ + ) + + self._cursor.execute( + """--sql + CREATE INDEX IF NOT EXISTS idx_images_starred ON images(starred); + """ + ) + + # Add trigger for `updated_at`. + self._cursor.execute( + """--sql + CREATE TRIGGER IF NOT EXISTS tg_images_updated_at + AFTER UPDATE + ON images FOR EACH ROW + BEGIN + UPDATE images SET updated_at = STRFTIME('%Y-%m-%d %H:%M:%f', 'NOW') + WHERE image_name = old.image_name; + END; + """ + ) + + self._cursor.execute("PRAGMA table_info(images)") + columns = [column[1] for column in self._cursor.fetchall()] + if "has_workflow" not in columns: + self._cursor.execute( + """--sql + ALTER TABLE images + ADD COLUMN has_workflow BOOLEAN DEFAULT FALSE; + """ + ) + + self._conn.commit() + except Exception: + self._conn.rollback() + raise def get(self, image_name: str) -> ImageRecord: try: diff --git a/invokeai/app/services/image_records/migrations/__init__.py b/invokeai/app/services/image_records/migrations/__init__.py deleted file mode 100644 index f896bfec46..0000000000 --- a/invokeai/app/services/image_records/migrations/__init__.py +++ /dev/null @@ -1,5 +0,0 @@ -from .v0 import v0 -from .v1 import v1 -from .v2 import v2 - -__all__ = [v0, v1, v2] # type: ignore diff --git a/invokeai/app/services/image_records/migrations/v0.py b/invokeai/app/services/image_records/migrations/v0.py deleted file mode 100644 index 480ace2736..0000000000 --- a/invokeai/app/services/image_records/migrations/v0.py +++ /dev/null @@ -1,64 +0,0 @@ -import sqlite3 - - -def v0(cursor: sqlite3.Cursor) -> None: - """ - Migration for `images` table v0 - https://github.com/invoke-ai/InvokeAI/pull/3443 - - Adds the `images` table, indicies and triggers for the image_records service. - """ - - cursor.execute( - """--sql - CREATE TABLE IF NOT EXISTS images ( - image_name TEXT NOT NULL PRIMARY KEY, - -- This is an enum in python, unrestricted string here for flexibility - image_origin TEXT NOT NULL, - -- This is an enum in python, unrestricted string here for flexibility - image_category TEXT NOT NULL, - width INTEGER NOT NULL, - height INTEGER NOT NULL, - session_id TEXT, - node_id TEXT, - metadata TEXT, - is_intermediate BOOLEAN DEFAULT FALSE, - created_at DATETIME NOT NULL DEFAULT(STRFTIME('%Y-%m-%d %H:%M:%f', 'NOW')), - -- Updated via trigger - updated_at DATETIME NOT NULL DEFAULT(STRFTIME('%Y-%m-%d %H:%M:%f', 'NOW')), - -- Soft delete, currently unused - deleted_at DATETIME - ); - """ - ) - cursor.execute( - """--sql - CREATE UNIQUE INDEX IF NOT EXISTS idx_images_image_name ON images(image_name); - """ - ) - cursor.execute( - """--sql - CREATE INDEX IF NOT EXISTS idx_images_image_origin ON images(image_origin); - """ - ) - cursor.execute( - """--sql - CREATE INDEX IF NOT EXISTS idx_images_image_category ON images(image_category); - """ - ) - cursor.execute( - """--sql - CREATE INDEX IF NOT EXISTS idx_images_created_at ON images(created_at); - """ - ) - cursor.execute( - """--sql - CREATE TRIGGER IF NOT EXISTS tg_images_updated_at - AFTER - UPDATE ON images FOR EACH ROW BEGIN - UPDATE images - SET updated_at = STRFTIME('%Y-%m-%d %H:%M:%f', 'NOW') - WHERE image_name = old.image_name; - END; - """ - ) diff --git a/invokeai/app/services/image_records/migrations/v1.py b/invokeai/app/services/image_records/migrations/v1.py deleted file mode 100644 index 9ef24d4481..0000000000 --- a/invokeai/app/services/image_records/migrations/v1.py +++ /dev/null @@ -1,25 +0,0 @@ -import sqlite3 - - -def v1(cursor: sqlite3.Cursor) -> None: - """ - Migration for `images` table v1 - https://github.com/invoke-ai/InvokeAI/pull/4246 - - Adds the `starred` column to the `images` table. - """ - - cursor.execute("PRAGMA table_info(images)") - columns = [column[1] for column in cursor.fetchall()] - if "starred" not in columns: - cursor.execute( - """--sql - ALTER TABLE images - ADD COLUMN starred BOOLEAN DEFAULT FALSE; - """ - ) - cursor.execute( - """--sql - CREATE INDEX IF NOT EXISTS idx_images_starred ON images(starred); - """ - ) diff --git a/invokeai/app/services/image_records/migrations/v2.py b/invokeai/app/services/image_records/migrations/v2.py deleted file mode 100644 index 2cca604fd6..0000000000 --- a/invokeai/app/services/image_records/migrations/v2.py +++ /dev/null @@ -1,24 +0,0 @@ -import sqlite3 - - -def v2(cursor: sqlite3.Cursor) -> None: - """ - Migration for `images` table v2 - https://github.com/invoke-ai/InvokeAI/pull/5148 - - Adds the `has_workflow` column to the `images` table. - - Workflows associated with images are now only stored in the image file itself. This column - indicates whether the image has a workflow embedded in it, so we don't need to read the image - file to find out. - """ - - cursor.execute("PRAGMA table_info(images)") - columns = [column[1] for column in cursor.fetchall()] - if "has_workflow" not in columns: - cursor.execute( - """--sql - ALTER TABLE images - ADD COLUMN has_workflow BOOLEAN DEFAULT FALSE; - """ - ) diff --git a/invokeai/app/services/session_queue/migrations/__init__.py b/invokeai/app/services/session_queue/migrations/__init__.py deleted file mode 100644 index 0518bb512c..0000000000 --- a/invokeai/app/services/session_queue/migrations/__init__.py +++ /dev/null @@ -1,4 +0,0 @@ -from .v0 import v0 -from .v1 import v1 - -__all__ = [v0, v1] # type: ignore diff --git a/invokeai/app/services/session_queue/migrations/v0.py b/invokeai/app/services/session_queue/migrations/v0.py deleted file mode 100644 index ca388b2a7d..0000000000 --- a/invokeai/app/services/session_queue/migrations/v0.py +++ /dev/null @@ -1,106 +0,0 @@ -import sqlite3 - - -def v0(cursor: sqlite3.Cursor) -> None: - """ - Migration for `session_queue` table v0 - https://github.com/invoke-ai/InvokeAI/pull/4502 - - Creates the `session_queue` table, indicies and triggers for the session_queue service. - """ - cursor.execute( - """--sql - CREATE TABLE IF NOT EXISTS session_queue ( - item_id INTEGER PRIMARY KEY AUTOINCREMENT, -- used for ordering, cursor pagination - batch_id TEXT NOT NULL, -- identifier of the batch this queue item belongs to - queue_id TEXT NOT NULL, -- identifier of the queue this queue item belongs to - session_id TEXT NOT NULL UNIQUE, -- duplicated data from the session column, for ease of access - field_values TEXT, -- NULL if no values are associated with this queue item - session TEXT NOT NULL, -- the session to be executed - status TEXT NOT NULL DEFAULT 'pending', -- the status of the queue item, one of 'pending', 'in_progress', 'completed', 'failed', 'canceled' - priority INTEGER NOT NULL DEFAULT 0, -- the priority, higher is more important - error TEXT, -- any errors associated with this queue item - created_at DATETIME NOT NULL DEFAULT(STRFTIME('%Y-%m-%d %H:%M:%f', 'NOW')), - updated_at DATETIME NOT NULL DEFAULT(STRFTIME('%Y-%m-%d %H:%M:%f', 'NOW')), -- updated via trigger - started_at DATETIME, -- updated via trigger - completed_at DATETIME -- updated via trigger, completed items are cleaned up on application startup - -- Ideally this is a FK, but graph_executions uses INSERT OR REPLACE, and REPLACE triggers the ON DELETE CASCADE... - -- FOREIGN KEY (session_id) REFERENCES graph_executions (id) ON DELETE CASCADE - ); - """ - ) - - cursor.execute( - """--sql - CREATE UNIQUE INDEX IF NOT EXISTS idx_session_queue_item_id ON session_queue(item_id); - """ - ) - - cursor.execute( - """--sql - CREATE UNIQUE INDEX IF NOT EXISTS idx_session_queue_session_id ON session_queue(session_id); - """ - ) - - cursor.execute( - """--sql - CREATE INDEX IF NOT EXISTS idx_session_queue_batch_id ON session_queue(batch_id); - """ - ) - - cursor.execute( - """--sql - CREATE INDEX IF NOT EXISTS idx_session_queue_created_priority ON session_queue(priority); - """ - ) - - cursor.execute( - """--sql - CREATE INDEX IF NOT EXISTS idx_session_queue_created_status ON session_queue(status); - """ - ) - - cursor.execute( - """--sql - CREATE TRIGGER IF NOT EXISTS tg_session_queue_completed_at - AFTER UPDATE OF status ON session_queue - FOR EACH ROW - WHEN - NEW.status = 'completed' - OR NEW.status = 'failed' - OR NEW.status = 'canceled' - BEGIN - UPDATE session_queue - SET completed_at = STRFTIME('%Y-%m-%d %H:%M:%f', 'NOW') - WHERE item_id = NEW.item_id; - END; - """ - ) - - cursor.execute( - """--sql - CREATE TRIGGER IF NOT EXISTS tg_session_queue_started_at - AFTER UPDATE OF status ON session_queue - FOR EACH ROW - WHEN - NEW.status = 'in_progress' - BEGIN - UPDATE session_queue - SET started_at = STRFTIME('%Y-%m-%d %H:%M:%f', 'NOW') - WHERE item_id = NEW.item_id; - END; - """ - ) - - cursor.execute( - """--sql - CREATE TRIGGER IF NOT EXISTS tg_session_queue_updated_at - AFTER UPDATE - ON session_queue FOR EACH ROW - BEGIN - UPDATE session_queue - SET updated_at = STRFTIME('%Y-%m-%d %H:%M:%f', 'NOW') - WHERE item_id = old.item_id; - END; - """ - ) diff --git a/invokeai/app/services/session_queue/migrations/v1.py b/invokeai/app/services/session_queue/migrations/v1.py deleted file mode 100644 index e805f22852..0000000000 --- a/invokeai/app/services/session_queue/migrations/v1.py +++ /dev/null @@ -1,22 +0,0 @@ -import sqlite3 - - -def v1(cursor: sqlite3.Cursor) -> None: - """ - Migration for `session_queue` table v1 - https://github.com/invoke-ai/InvokeAI/pull/5148 - - Adds the `workflow` column to the `session_queue` table. - - Workflows have been (correctly) made a property of a queue item, rather than individual nodes. - This requires they be included in the session queue. - """ - - cursor.execute("PRAGMA table_info(session_queue)") - columns = [column[1] for column in cursor.fetchall()] - if "workflow" not in columns: - cursor.execute( - """--sql - ALTER TABLE session_queue ADD COLUMN workflow TEXT; - """ - ) diff --git a/invokeai/app/services/session_queue/session_queue_sqlite.py b/invokeai/app/services/session_queue/session_queue_sqlite.py index 495a053431..2220bf00f8 100644 --- a/invokeai/app/services/session_queue/session_queue_sqlite.py +++ b/invokeai/app/services/session_queue/session_queue_sqlite.py @@ -6,7 +6,6 @@ from fastapi_events.typing import Event as FastAPIEvent from invokeai.app.services.events.events_base import EventServiceBase from invokeai.app.services.invoker import Invoker -from invokeai.app.services.session_queue.migrations import v0, v1 from invokeai.app.services.session_queue.session_queue_base import SessionQueueBase from invokeai.app.services.session_queue.session_queue_common import ( DEFAULT_QUEUE_ID, @@ -29,25 +28,15 @@ from invokeai.app.services.session_queue.session_queue_common import ( ) from invokeai.app.services.shared.pagination import CursorPaginatedResults from invokeai.app.services.shared.sqlite.sqlite_database import SqliteDatabase -from invokeai.app.services.shared.sqlite.sqlite_migrator import Migration, MigrationSet - -session_queue_migrations = MigrationSet( - table_name="session_queue", - migrations=[ - Migration(version=0, migrate=v0), - Migration(version=1, migrate=v1), - ], -) class SqliteSessionQueue(SessionQueueBase): def __init__(self, db: SqliteDatabase) -> None: super().__init__() - self.__db = db self.__lock = db.lock self.__conn = db.conn self.__cursor = self.__conn.cursor() - self.__db.register_migration_set(session_queue_migrations) + self._create_tables() def start(self, invoker: Invoker) -> None: self.__invoker = invoker @@ -102,6 +91,123 @@ class SqliteSessionQueue(SessionQueueBase): except SessionQueueItemNotFoundError: return + def _create_tables(self) -> None: + """Creates the session queue tables, indicies, and triggers""" + try: + self.__lock.acquire() + self.__cursor.execute( + """--sql + CREATE TABLE IF NOT EXISTS session_queue ( + item_id INTEGER PRIMARY KEY AUTOINCREMENT, -- used for ordering, cursor pagination + batch_id TEXT NOT NULL, -- identifier of the batch this queue item belongs to + queue_id TEXT NOT NULL, -- identifier of the queue this queue item belongs to + session_id TEXT NOT NULL UNIQUE, -- duplicated data from the session column, for ease of access + field_values TEXT, -- NULL if no values are associated with this queue item + session TEXT NOT NULL, -- the session to be executed + status TEXT NOT NULL DEFAULT 'pending', -- the status of the queue item, one of 'pending', 'in_progress', 'completed', 'failed', 'canceled' + priority INTEGER NOT NULL DEFAULT 0, -- the priority, higher is more important + error TEXT, -- any errors associated with this queue item + created_at DATETIME NOT NULL DEFAULT(STRFTIME('%Y-%m-%d %H:%M:%f', 'NOW')), + updated_at DATETIME NOT NULL DEFAULT(STRFTIME('%Y-%m-%d %H:%M:%f', 'NOW')), -- updated via trigger + started_at DATETIME, -- updated via trigger + completed_at DATETIME -- updated via trigger, completed items are cleaned up on application startup + -- Ideally this is a FK, but graph_executions uses INSERT OR REPLACE, and REPLACE triggers the ON DELETE CASCADE... + -- FOREIGN KEY (session_id) REFERENCES graph_executions (id) ON DELETE CASCADE + ); + """ + ) + + self.__cursor.execute( + """--sql + CREATE UNIQUE INDEX IF NOT EXISTS idx_session_queue_item_id ON session_queue(item_id); + """ + ) + + self.__cursor.execute( + """--sql + CREATE UNIQUE INDEX IF NOT EXISTS idx_session_queue_session_id ON session_queue(session_id); + """ + ) + + self.__cursor.execute( + """--sql + CREATE INDEX IF NOT EXISTS idx_session_queue_batch_id ON session_queue(batch_id); + """ + ) + + self.__cursor.execute( + """--sql + CREATE INDEX IF NOT EXISTS idx_session_queue_created_priority ON session_queue(priority); + """ + ) + + self.__cursor.execute( + """--sql + CREATE INDEX IF NOT EXISTS idx_session_queue_created_status ON session_queue(status); + """ + ) + + self.__cursor.execute( + """--sql + CREATE TRIGGER IF NOT EXISTS tg_session_queue_completed_at + AFTER UPDATE OF status ON session_queue + FOR EACH ROW + WHEN + NEW.status = 'completed' + OR NEW.status = 'failed' + OR NEW.status = 'canceled' + BEGIN + UPDATE session_queue + SET completed_at = STRFTIME('%Y-%m-%d %H:%M:%f', 'NOW') + WHERE item_id = NEW.item_id; + END; + """ + ) + + self.__cursor.execute( + """--sql + CREATE TRIGGER IF NOT EXISTS tg_session_queue_started_at + AFTER UPDATE OF status ON session_queue + FOR EACH ROW + WHEN + NEW.status = 'in_progress' + BEGIN + UPDATE session_queue + SET started_at = STRFTIME('%Y-%m-%d %H:%M:%f', 'NOW') + WHERE item_id = NEW.item_id; + END; + """ + ) + + self.__cursor.execute( + """--sql + CREATE TRIGGER IF NOT EXISTS tg_session_queue_updated_at + AFTER UPDATE + ON session_queue FOR EACH ROW + BEGIN + UPDATE session_queue + SET updated_at = STRFTIME('%Y-%m-%d %H:%M:%f', 'NOW') + WHERE item_id = old.item_id; + END; + """ + ) + + self.__cursor.execute("PRAGMA table_info(session_queue)") + columns = [column[1] for column in self.__cursor.fetchall()] + if "workflow" not in columns: + self.__cursor.execute( + """--sql + ALTER TABLE session_queue ADD COLUMN workflow TEXT; + """ + ) + + self.__conn.commit() + except Exception: + self.__conn.rollback() + raise + finally: + self.__lock.release() + def _set_in_progress_to_canceled(self) -> None: """ Sets all in_progress queue items to canceled. Run on app startup, not associated with any queue. diff --git a/invokeai/app/services/shared/sqlite/sqlite_database.py b/invokeai/app/services/shared/sqlite/sqlite_database.py index ae3f4fba5b..b90134f703 100644 --- a/invokeai/app/services/shared/sqlite/sqlite_database.py +++ b/invokeai/app/services/shared/sqlite/sqlite_database.py @@ -4,7 +4,6 @@ from logging import Logger from invokeai.app.services.config import InvokeAIAppConfig from invokeai.app.services.shared.sqlite.sqlite_common import sqlite_memory -from invokeai.app.services.shared.sqlite.sqlite_migrator import MigrationSet, SQLiteMigrator class SqliteDatabase: @@ -29,7 +28,6 @@ class SqliteDatabase: self.conn.set_trace_callback(self._logger.debug) self.conn.execute("PRAGMA foreign_keys = ON;") - self._migrator = SQLiteMigrator(db_path=location, lock=self.lock, logger=self._logger) def clean(self) -> None: try: @@ -42,9 +40,3 @@ class SqliteDatabase: raise finally: self.lock.release() - - def register_migration_set(self, migration_set: MigrationSet) -> None: - self._migrator.register_migration_set(migration_set) - - def run_migrations(self) -> None: - self._migrator.run_migrations() diff --git a/invokeai/app/services/shared/sqlite/sqlite_migrator.py b/invokeai/app/services/shared/sqlite/sqlite_migrator.py deleted file mode 100644 index 1f71ad3d85..0000000000 --- a/invokeai/app/services/shared/sqlite/sqlite_migrator.py +++ /dev/null @@ -1,192 +0,0 @@ -import datetime -import shutil -import sqlite3 -import threading -from logging import Logger -from pathlib import Path -from typing import Callable, Optional, TypeAlias - -from .sqlite_common import sqlite_memory - -MigrateCallback: TypeAlias = Callable[[sqlite3.Cursor], None] - - -class MigrationError(Exception): - pass - - -class Migration: - def __init__( - self, - version: int, - migrate: MigrateCallback, - ) -> None: - self.version = version - self.migrate = migrate - - -class MigrationSet: - def __init__(self, table_name: str, migrations: list[Migration]) -> None: - self.table_name = table_name - self.migrations = migrations - - -class SQLiteMigrator: - """ - Handles SQLite database migrations. - - Migrations are registered with the `register_migration_set` method. They are applied on - application startup with the `run_migrations` method. - - A `MigrationSet` is a set of `Migration`s for a single table. Each `Migration` has a `version` - and `migrate` callback. The callback is provided with a `sqlite3.Cursor` and should perform the - any migration logic. Committing, rolling back transactions and errors are handled by the migrator. - - Migrations are applied in order of version number. If the database does not have a version table - for a given table, it is assumed to be at version 0. The migrator creates and manages the version - tables. - - If the database is a file, it will be backed up before migrations are applied and restored if - there are any errors. - """ - - def __init__(self, db_path: Path | str, lock: threading.RLock, logger: Logger): - self._logger = logger - self._conn = sqlite3.connect(db_path, check_same_thread=False) - self._cursor = self._conn.cursor() - self._lock = lock - self._db_path = db_path - self._migration_sets: set[MigrationSet] = set() - - def _get_version_table_name(self, table_name: str) -> str: - """Returns the name of the version table for a given table.""" - return f"{table_name}_version" - - def _create_version_table(self, table_name: str) -> None: - """ - Creates a version table for a given table, if it does not exist. - Throws MigrationError if there is a problem. - """ - version_table_name = self._get_version_table_name(table_name) - with self._lock: - try: - self._cursor.execute( - f"""--sql - CREATE TABLE IF NOT EXISTS {version_table_name} ( - version INTEGER PRIMARY KEY, - created_at DATETIME NOT NULL DEFAULT(STRFTIME('%Y-%m-%d %H:%M:%f', 'NOW')) - ); - """ - ) - self._conn.commit() - except sqlite3.Error as e: - msg = f'Problem creation "{version_table_name}" table: {e}' - self._logger.error(msg) - self._conn.rollback() - raise MigrationError(msg) from e - - def _get_current_version(self, table_name: str) -> Optional[int]: - """Gets the current version of a table, or None if it doesn't exist.""" - version_table_name = self._get_version_table_name(table_name) - try: - self._cursor.execute(f"SELECT MAX(version) FROM {version_table_name};") - return self._cursor.fetchone()[0] - except sqlite3.OperationalError as e: - if "no such table" in str(e): - return None - raise - - def _set_version(self, table_name: str, version: int) -> None: - """Adds a version entry to the table's version table.""" - version_table_name = self._get_version_table_name(table_name) - self._cursor.execute(f"INSERT INTO {version_table_name} (version) VALUES (?);", (version,)) - - def _run_migration(self, table_name: str, migration: Migration) -> None: - """Runs a single migration.""" - with self._lock: - try: - migration.migrate(self._cursor) - self._set_version(table_name=table_name, version=migration.version) - self._conn.commit() - except sqlite3.Error: - self._conn.rollback() - raise - - def _run_migration_set(self, migration_set: MigrationSet) -> None: - """Runs a set of migrations for a single table.""" - with self._lock: - table_name = migration_set.table_name - migrations = migration_set.migrations - self._create_version_table(table_name=table_name) - for migration in migrations: - current_version = self._get_current_version(table_name) - if current_version is None or current_version < migration.version: - try: - self._logger.info(f'runing "{table_name}" migration {migration.version}') - self._run_migration(table_name=table_name, migration=migration) - except sqlite3.Error as e: - raise MigrationError(f'Problem runing "{table_name}" migration {migration.version}: {e}') from e - - def _backup_db(self, db_path: Path) -> Path: - """Backs up the databse, returning the path to the backup file.""" - with self._lock: - timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S") - backup_path = db_path.parent / f"{db_path.stem}_{timestamp}.db" - self._logger.info(f"Backing up database to {backup_path}") - backup_conn = sqlite3.connect(backup_path) - with backup_conn: - self._conn.backup(backup_conn) - backup_conn.close() - return backup_path - - def _restore_db(self, backup_path: Path) -> None: - """Restores the database from a backup file, unless the database is a memory database.""" - if self._db_path == sqlite_memory: - return - with self._lock: - self._logger.info(f"Restoring database from {backup_path}") - self._conn.close() - if not Path(backup_path).is_file(): - raise FileNotFoundError(f"Backup file {backup_path} does not exist") - shutil.copy2(backup_path, self._db_path) - - def _get_is_migration_needed(self, migration_set: MigrationSet) -> bool: - table_name = migration_set.table_name - migrations = migration_set.migrations - current_version = self._get_current_version(table_name) - if current_version is None or current_version < migrations[-1].version: - return True - return False - - def run_migrations(self) -> None: - """ - Applies all registered migration sets. - - If the database is a file, it will be backed up before migrations are applied and restored - if there are any errors. - """ - if not any(self._get_is_migration_needed(migration_set) for migration_set in self._migration_sets): - return - backup_path: Optional[Path] = None - with self._lock: - # Only make a backup if using a file database (not memory) - if isinstance(self._db_path, Path): - backup_path = self._backup_db(self._db_path) - for migration_set in self._migration_sets: - if self._get_is_migration_needed(migration_set): - try: - self._run_migration_set(migration_set) - except Exception as e: - msg = f'Problem runing "{migration_set.table_name}" migrations: {e}' - self._logger.error(msg) - if backup_path is not None: - self._logger.error(f" Restoring from {backup_path}") - self._restore_db(backup_path) - raise MigrationError(msg) from e - # TODO: delete backup file? - # if backup_path is not None: - # Path(backup_path).unlink() - - def register_migration_set(self, migration_set: MigrationSet) -> None: - """Registers a migration set to be migrated on application startup.""" - self._migration_sets.add(migration_set) diff --git a/invokeai/app/services/workflow_records/migrations/__init__.py b/invokeai/app/services/workflow_records/migrations/__init__.py deleted file mode 100644 index 60dbadf70a..0000000000 --- a/invokeai/app/services/workflow_records/migrations/__init__.py +++ /dev/null @@ -1,4 +0,0 @@ -from invokeai.app.services.workflow_records.migrations.v0 import v0 -from invokeai.app.services.workflow_records.migrations.v1 import v1 - -__all__ = [v0, v1] # type: ignore diff --git a/invokeai/app/services/workflow_records/migrations/v0.py b/invokeai/app/services/workflow_records/migrations/v0.py deleted file mode 100644 index f20287a923..0000000000 --- a/invokeai/app/services/workflow_records/migrations/v0.py +++ /dev/null @@ -1,35 +0,0 @@ -import sqlite3 - - -def v0(cursor: sqlite3.Cursor) -> None: - """ - Migration for `workflows` table v0 - https://github.com/invoke-ai/InvokeAI/pull/4686 - - Creates the `workflows` table for the workflow_records service & a trigger for updated_at. - - Note: `workflow_id` gets an implicit index. We don't need to make one for this column. - """ - cursor.execute( - """--sql - CREATE TABLE IF NOT EXISTS workflows ( - workflow TEXT NOT NULL, - workflow_id TEXT GENERATED ALWAYS AS (json_extract(workflow, '$.id')) VIRTUAL NOT NULL UNIQUE, -- gets implicit index - created_at DATETIME NOT NULL DEFAULT(STRFTIME('%Y-%m-%d %H:%M:%f', 'NOW')), - updated_at DATETIME NOT NULL DEFAULT(STRFTIME('%Y-%m-%d %H:%M:%f', 'NOW')) -- updated via trigger - ); - """ - ) - - cursor.execute( - """--sql - CREATE TRIGGER IF NOT EXISTS tg_workflows_updated_at - AFTER UPDATE - ON workflows FOR EACH ROW - BEGIN - UPDATE workflows - SET updated_at = STRFTIME('%Y-%m-%d %H:%M:%f', 'NOW') - WHERE workflow_id = old.workflow_id; - END; - """ - ) diff --git a/invokeai/app/services/workflow_records/migrations/v1.py b/invokeai/app/services/workflow_records/migrations/v1.py deleted file mode 100644 index 2790294808..0000000000 --- a/invokeai/app/services/workflow_records/migrations/v1.py +++ /dev/null @@ -1,33 +0,0 @@ -import sqlite3 - - -def v1(cursor: sqlite3.Cursor) -> None: - """ - Migration for `workflows` table v1 - https://github.com/invoke-ai/InvokeAI/pull/5148 - - Drops the `workflow_images` table and empties the `workflows` table. - - Prior to v3.5.0, all workflows were associated with images. They were stored in the image files - themselves, and in v3.4.0 we started storing them in the DB. This turned out to be a bad idea - - you end up with *many* image workflows, most of which are duplicates. - - The purpose of workflows DB storage was to provide a workflow library. Library workflows are - different from image workflows. They are only saved when the user requests they be saved. - - Moving forward, the storage for image workflows and library workflows will be separate. Image - workflows are store only in the image files. Library workflows are stored only in the DB. - - To give ourselves a clean slate, we need to delete all existing workflows in the DB (all of which) - are image workflows. We also need to delete the workflow_images table, which is no longer needed. - """ - cursor.execute( - """--sql - DROP TABLE IF EXISTS workflow_images; - """ - ) - cursor.execute( - """--sql - DELETE FROM workflows; - """ - ) diff --git a/invokeai/app/services/workflow_records/workflow_records_sqlite.py b/invokeai/app/services/workflow_records/workflow_records_sqlite.py index 6945698eb1..a97b0a5cd8 100644 --- a/invokeai/app/services/workflow_records/workflow_records_sqlite.py +++ b/invokeai/app/services/workflow_records/workflow_records_sqlite.py @@ -1,8 +1,6 @@ from invokeai.app.services.invoker import Invoker from invokeai.app.services.shared.pagination import PaginatedResults from invokeai.app.services.shared.sqlite.sqlite_database import SqliteDatabase -from invokeai.app.services.shared.sqlite.sqlite_migrator import Migration, MigrationSet -from invokeai.app.services.workflow_records.migrations import v0, v1 from invokeai.app.services.workflow_records.workflow_records_base import WorkflowRecordsStorageBase from invokeai.app.services.workflow_records.workflow_records_common import ( Workflow, @@ -10,23 +8,14 @@ from invokeai.app.services.workflow_records.workflow_records_common import ( WorkflowRecordDTO, ) -workflows_migrations = MigrationSet( - table_name="workflows", - migrations=[ - Migration(version=0, migrate=v0), - Migration(version=1, migrate=v1), - ], -) - class SqliteWorkflowRecordsStorage(WorkflowRecordsStorageBase): def __init__(self, db: SqliteDatabase) -> None: super().__init__() - self._db = db self._lock = db.lock self._conn = db.conn self._cursor = self._conn.cursor() - self._db.register_migration_set(workflows_migrations) + self._create_tables() def start(self, invoker: Invoker) -> None: self._invoker = invoker @@ -37,7 +26,7 @@ class SqliteWorkflowRecordsStorage(WorkflowRecordsStorageBase): self._cursor.execute( """--sql SELECT workflow_id, workflow, created_at, updated_at - FROM workflows + FROM workflow_library WHERE workflow_id = ?; """, (workflow_id,), @@ -57,7 +46,7 @@ class SqliteWorkflowRecordsStorage(WorkflowRecordsStorageBase): self._lock.acquire() self._cursor.execute( """--sql - INSERT OR IGNORE INTO workflows(workflow) + INSERT OR IGNORE INTO workflow_library(workflow) VALUES (?); """, (workflow.model_dump_json(),), @@ -75,7 +64,7 @@ class SqliteWorkflowRecordsStorage(WorkflowRecordsStorageBase): self._lock.acquire() self._cursor.execute( """--sql - UPDATE workflows + UPDATE workflow_library SET workflow = ? WHERE workflow_id = ?; """, @@ -94,7 +83,7 @@ class SqliteWorkflowRecordsStorage(WorkflowRecordsStorageBase): self._lock.acquire() self._cursor.execute( """--sql - DELETE from workflows + DELETE from workflow_library WHERE workflow_id = ?; """, (workflow_id,), @@ -114,7 +103,7 @@ class SqliteWorkflowRecordsStorage(WorkflowRecordsStorageBase): self._cursor.execute( """--sql SELECT workflow_id, workflow, created_at, updated_at - FROM workflows + FROM workflow_library ORDER BY created_at DESC LIMIT ? OFFSET ?; """, @@ -125,7 +114,7 @@ class SqliteWorkflowRecordsStorage(WorkflowRecordsStorageBase): self._cursor.execute( """--sql SELECT COUNT(*) - FROM workflows; + FROM workflow_library; """ ) total = self._cursor.fetchone()[0] @@ -142,3 +131,49 @@ class SqliteWorkflowRecordsStorage(WorkflowRecordsStorageBase): raise finally: self._lock.release() + + def _create_tables(self) -> None: + try: + self._lock.acquire() + self._cursor.execute( + """--sql + CREATE TABLE IF NOT EXISTS workflow_library ( + workflow TEXT NOT NULL, + workflow_id TEXT GENERATED ALWAYS AS (json_extract(workflow, '$.id')) VIRTUAL NOT NULL UNIQUE, -- gets implicit index + created_at DATETIME NOT NULL DEFAULT(STRFTIME('%Y-%m-%d %H:%M:%f', 'NOW')), + updated_at DATETIME NOT NULL DEFAULT(STRFTIME('%Y-%m-%d %H:%M:%f', 'NOW')) -- updated via trigger + ); + """ + ) + + self._cursor.execute( + """--sql + CREATE TRIGGER IF NOT EXISTS tg_workflow_library_updated_at + AFTER UPDATE + ON workflow_library FOR EACH ROW + BEGIN + UPDATE workflow_library + SET updated_at = STRFTIME('%Y-%m-%d %H:%M:%f', 'NOW') + WHERE workflow_id = old.workflow_id; + END; + """ + ) + + # We do not need the original `workflows` table or `workflow_images` junction table. + self._cursor.execute( + """--sql + DROP TABLE IF EXISTS workflow_images; + """ + ) + self._cursor.execute( + """--sql + DROP TABLE IF EXISTS workflows; + """ + ) + + self._conn.commit() + except Exception: + self._conn.rollback() + raise + finally: + self._lock.release()