mirror of
https://github.com/invoke-ai/InvokeAI
synced 2024-08-30 20:32:17 +00:00
tidy(item_storage): remove unused list and search methods
This commit is contained in:
parent
2965357d99
commit
59279851a3
@ -3,8 +3,6 @@ from typing import Callable, Generic, TypeVar
|
|||||||
|
|
||||||
from pydantic import BaseModel
|
from pydantic import BaseModel
|
||||||
|
|
||||||
from invokeai.app.services.shared.pagination import PaginatedResults
|
|
||||||
|
|
||||||
T = TypeVar("T", bound=BaseModel)
|
T = TypeVar("T", bound=BaseModel)
|
||||||
|
|
||||||
|
|
||||||
@ -35,15 +33,6 @@ class ItemStorageABC(ABC, Generic[T]):
|
|||||||
"""Deletes the item"""
|
"""Deletes the item"""
|
||||||
pass
|
pass
|
||||||
|
|
||||||
@abstractmethod
|
|
||||||
def list(self, page: int = 0, per_page: int = 10) -> PaginatedResults[T]:
|
|
||||||
"""Gets a paginated list of items"""
|
|
||||||
pass
|
|
||||||
|
|
||||||
@abstractmethod
|
|
||||||
def search(self, query: str, page: int = 0, per_page: int = 10) -> PaginatedResults[T]:
|
|
||||||
pass
|
|
||||||
|
|
||||||
def on_changed(self, on_changed: Callable[[T], None]) -> None:
|
def on_changed(self, on_changed: Callable[[T], None]) -> None:
|
||||||
"""Register a callback for when an item is changed"""
|
"""Register a callback for when an item is changed"""
|
||||||
self._on_changed_callbacks.append(on_changed)
|
self._on_changed_callbacks.append(on_changed)
|
||||||
|
@ -3,7 +3,6 @@ from typing import Generic, Optional, TypeVar
|
|||||||
from pydantic import BaseModel
|
from pydantic import BaseModel
|
||||||
|
|
||||||
from invokeai.app.services.item_storage.item_storage_base import ItemStorageABC
|
from invokeai.app.services.item_storage.item_storage_base import ItemStorageABC
|
||||||
from invokeai.app.services.shared.pagination import PaginatedResults
|
|
||||||
|
|
||||||
T = TypeVar("T", bound=BaseModel)
|
T = TypeVar("T", bound=BaseModel)
|
||||||
|
|
||||||
@ -27,16 +26,3 @@ class ItemStorageMemory(ItemStorageABC, Generic[T]):
|
|||||||
self._on_deleted(item_id)
|
self._on_deleted(item_id)
|
||||||
except KeyError:
|
except KeyError:
|
||||||
pass
|
pass
|
||||||
|
|
||||||
def list(self, page: int = 0, per_page: int = 10) -> PaginatedResults[T]:
|
|
||||||
# TODO: actually paginate?
|
|
||||||
return PaginatedResults(
|
|
||||||
items=list(self._items.values()), page=page, per_page=per_page, pages=1, total=len(self._items)
|
|
||||||
)
|
|
||||||
|
|
||||||
def search(self, query: str, page: int = 0, per_page: int = 10) -> PaginatedResults[T]:
|
|
||||||
# TODO: actually paginate?
|
|
||||||
# TODO: actually search?
|
|
||||||
return PaginatedResults(
|
|
||||||
items=list(self._items.values()), page=page, per_page=per_page, pages=1, total=len(self._items)
|
|
||||||
)
|
|
||||||
|
@ -4,7 +4,6 @@ from typing import Generic, Optional, TypeVar, get_args
|
|||||||
|
|
||||||
from pydantic import BaseModel, TypeAdapter
|
from pydantic import BaseModel, TypeAdapter
|
||||||
|
|
||||||
from invokeai.app.services.shared.pagination import PaginatedResults
|
|
||||||
from invokeai.app.services.shared.sqlite.sqlite_database import SqliteDatabase
|
from invokeai.app.services.shared.sqlite.sqlite_database import SqliteDatabase
|
||||||
|
|
||||||
from .item_storage_base import ItemStorageABC
|
from .item_storage_base import ItemStorageABC
|
||||||
@ -89,46 +88,3 @@ class SqliteItemStorage(ItemStorageABC, Generic[T]):
|
|||||||
finally:
|
finally:
|
||||||
self._lock.release()
|
self._lock.release()
|
||||||
self._on_deleted(id)
|
self._on_deleted(id)
|
||||||
|
|
||||||
def list(self, page: int = 0, per_page: int = 10) -> PaginatedResults[T]:
|
|
||||||
try:
|
|
||||||
self._lock.acquire()
|
|
||||||
self._cursor.execute(
|
|
||||||
f"""SELECT item FROM {self._table_name} LIMIT ? OFFSET ?;""",
|
|
||||||
(per_page, page * per_page),
|
|
||||||
)
|
|
||||||
result = self._cursor.fetchall()
|
|
||||||
|
|
||||||
items = [self._parse_item(r[0]) for r in result]
|
|
||||||
|
|
||||||
self._cursor.execute(f"""SELECT count(*) FROM {self._table_name};""")
|
|
||||||
count = self._cursor.fetchone()[0]
|
|
||||||
finally:
|
|
||||||
self._lock.release()
|
|
||||||
|
|
||||||
pageCount = int(count / per_page) + 1
|
|
||||||
|
|
||||||
return PaginatedResults[T](items=items, page=page, pages=pageCount, per_page=per_page, total=count)
|
|
||||||
|
|
||||||
def search(self, query: str, page: int = 0, per_page: int = 10) -> PaginatedResults[T]:
|
|
||||||
try:
|
|
||||||
self._lock.acquire()
|
|
||||||
self._cursor.execute(
|
|
||||||
f"""SELECT item FROM {self._table_name} WHERE item LIKE ? LIMIT ? OFFSET ?;""",
|
|
||||||
(f"%{query}%", per_page, page * per_page),
|
|
||||||
)
|
|
||||||
result = self._cursor.fetchall()
|
|
||||||
|
|
||||||
items = [self._parse_item(r[0]) for r in result]
|
|
||||||
|
|
||||||
self._cursor.execute(
|
|
||||||
f"""SELECT count(*) FROM {self._table_name} WHERE item LIKE ?;""",
|
|
||||||
(f"%{query}%",),
|
|
||||||
)
|
|
||||||
count = self._cursor.fetchone()[0]
|
|
||||||
finally:
|
|
||||||
self._lock.release()
|
|
||||||
|
|
||||||
pageCount = int(count / per_page) + 1
|
|
||||||
|
|
||||||
return PaginatedResults[T](items=items, page=page, pages=pageCount, per_page=per_page, total=count)
|
|
||||||
|
@ -28,22 +28,6 @@ def test_sqlite_service_can_create_and_get(db: SqliteItemStorage[TestModel]):
|
|||||||
assert db.get("1") == TestModel(id="1", name="Test")
|
assert db.get("1") == TestModel(id="1", name="Test")
|
||||||
|
|
||||||
|
|
||||||
def test_sqlite_service_can_list(db: SqliteItemStorage[TestModel]):
|
|
||||||
db.set(TestModel(id="1", name="Test"))
|
|
||||||
db.set(TestModel(id="2", name="Test"))
|
|
||||||
db.set(TestModel(id="3", name="Test"))
|
|
||||||
results = db.list()
|
|
||||||
assert results.page == 0
|
|
||||||
assert results.pages == 1
|
|
||||||
assert results.per_page == 10
|
|
||||||
assert results.total == 3
|
|
||||||
assert results.items == [
|
|
||||||
TestModel(id="1", name="Test"),
|
|
||||||
TestModel(id="2", name="Test"),
|
|
||||||
TestModel(id="3", name="Test"),
|
|
||||||
]
|
|
||||||
|
|
||||||
|
|
||||||
def test_sqlite_service_can_delete(db: SqliteItemStorage[TestModel]):
|
def test_sqlite_service_can_delete(db: SqliteItemStorage[TestModel]):
|
||||||
db.set(TestModel(id="1", name="Test"))
|
db.set(TestModel(id="1", name="Test"))
|
||||||
db.delete("1")
|
db.delete("1")
|
||||||
@ -73,67 +57,3 @@ def test_sqlite_service_calls_delete_callback(db: SqliteItemStorage[TestModel]):
|
|||||||
db.set(TestModel(id="1", name="Test"))
|
db.set(TestModel(id="1", name="Test"))
|
||||||
db.delete("1")
|
db.delete("1")
|
||||||
assert called
|
assert called
|
||||||
|
|
||||||
|
|
||||||
def test_sqlite_service_can_list_with_pagination(db: SqliteItemStorage[TestModel]):
|
|
||||||
db.set(TestModel(id="1", name="Test"))
|
|
||||||
db.set(TestModel(id="2", name="Test"))
|
|
||||||
db.set(TestModel(id="3", name="Test"))
|
|
||||||
results = db.list(page=0, per_page=2)
|
|
||||||
assert results.page == 0
|
|
||||||
assert results.pages == 2
|
|
||||||
assert results.per_page == 2
|
|
||||||
assert results.total == 3
|
|
||||||
assert results.items == [TestModel(id="1", name="Test"), TestModel(id="2", name="Test")]
|
|
||||||
|
|
||||||
|
|
||||||
def test_sqlite_service_can_list_with_pagination_and_offset(db: SqliteItemStorage[TestModel]):
|
|
||||||
db.set(TestModel(id="1", name="Test"))
|
|
||||||
db.set(TestModel(id="2", name="Test"))
|
|
||||||
db.set(TestModel(id="3", name="Test"))
|
|
||||||
results = db.list(page=1, per_page=2)
|
|
||||||
assert results.page == 1
|
|
||||||
assert results.pages == 2
|
|
||||||
assert results.per_page == 2
|
|
||||||
assert results.total == 3
|
|
||||||
assert results.items == [TestModel(id="3", name="Test")]
|
|
||||||
|
|
||||||
|
|
||||||
def test_sqlite_service_can_search(db: SqliteItemStorage[TestModel]):
|
|
||||||
db.set(TestModel(id="1", name="Test"))
|
|
||||||
db.set(TestModel(id="2", name="Test"))
|
|
||||||
db.set(TestModel(id="3", name="Test"))
|
|
||||||
results = db.search(query="Test")
|
|
||||||
assert results.page == 0
|
|
||||||
assert results.pages == 1
|
|
||||||
assert results.per_page == 10
|
|
||||||
assert results.total == 3
|
|
||||||
assert results.items == [
|
|
||||||
TestModel(id="1", name="Test"),
|
|
||||||
TestModel(id="2", name="Test"),
|
|
||||||
TestModel(id="3", name="Test"),
|
|
||||||
]
|
|
||||||
|
|
||||||
|
|
||||||
def test_sqlite_service_can_search_with_pagination(db: SqliteItemStorage[TestModel]):
|
|
||||||
db.set(TestModel(id="1", name="Test"))
|
|
||||||
db.set(TestModel(id="2", name="Test"))
|
|
||||||
db.set(TestModel(id="3", name="Test"))
|
|
||||||
results = db.search(query="Test", page=0, per_page=2)
|
|
||||||
assert results.page == 0
|
|
||||||
assert results.pages == 2
|
|
||||||
assert results.per_page == 2
|
|
||||||
assert results.total == 3
|
|
||||||
assert results.items == [TestModel(id="1", name="Test"), TestModel(id="2", name="Test")]
|
|
||||||
|
|
||||||
|
|
||||||
def test_sqlite_service_can_search_with_pagination_and_offset(db: SqliteItemStorage[TestModel]):
|
|
||||||
db.set(TestModel(id="1", name="Test"))
|
|
||||||
db.set(TestModel(id="2", name="Test"))
|
|
||||||
db.set(TestModel(id="3", name="Test"))
|
|
||||||
results = db.search(query="Test", page=1, per_page=2)
|
|
||||||
assert results.page == 1
|
|
||||||
assert results.pages == 2
|
|
||||||
assert results.per_page == 2
|
|
||||||
assert results.total == 3
|
|
||||||
assert results.items == [TestModel(id="3", name="Test")]
|
|
||||||
|
Loading…
Reference in New Issue
Block a user