Some tech debt related to dynamic pydantic schemas for invocations became problematic. Including the invocations and results in the event schemas was breaking pydantic's handling of ref schemas. I don't really understand why - I think it's a pydantic bug in a remote edge case that we are hitting.
After many failed attempts I landed on this implementation, which is actually much tidier than what was in there before.
- Create pydantic-enabled types for `AnyInvocation` and `AnyInvocationOutput` and use these in place of the janky dynamic unions. Actually, they are kinda the same, but better encapsulated. Use these in `Graph`, `GraphExecutionState`, `InvocationEventBase` and `InvocationCompleteEvent`.
- Revise the custom openapi function to work with the new models.
- Split out the custom openapi function to a separate file. Add a `post_transform` callback so consumers can customize the output schema.
- Update makefile scripts.
This is required to get these event fields to deserialize correctly. If omitted, pydantic uses `BaseInvocation`/`BaseInvocationOutput`, which is not correct.
This is similar to the workaround in the `Graph` and `GraphExecutionState` classes where we need to fanagle pydantic with manual validation handling.
There's no longer any need for session-scoped events now that we have the session queue. Session started/completed/canceled map 1-to-1 to queue item status events, but queue item status events also have an event for failed state.
We can simplify queue and processor handling substantially by removing session events and instead using queue item events.
- Remove the session-scoped events entirely.
- Remove all event handling from session queue. The processor still needs to respond to some events from the queue: `QueueClearedEvent`, `BatchEnqueuedEvent` and `QueueItemStatusChangedEvent`.
- Pass an `is_canceled` callback to the invocation context instead of the cancel event
- Update processor logic to ensure the local instance of the current queue item is synced with the instance in the database. This prevents race conditions and ensures lifecycle callback do not get stale callbacks.
- Update docstrings and comments
- Add `complete_queue_item` method to session queue service as an explicit way to mark a queue item as successfully completed. Previously, the queue listened for session complete events to do this.
Closes#6442
- Restore calculation of step percentage but in the backend instead of client
- Simplify signatures for denoise progress event callbacks
- Clean up `step_callback.py` (types, do not recreate constant matrix on every step, formatting)
We don't need to use the payload schema registry. All our events are dispatched as pydantic models, which are already validated on instantiation.
We do want to add all events to the OpenAPI schema, and we referred to the payload schema registry for this. To get all events, add a simple helper to EventBase. This is functionally identical to using the schema registry.
The model loader emits events. During testing, it doesn't have access to a fully-mocked events service, so the test fails when attempting to call a nonexistent method. There was a check for this previously, but I accidentally removed it. Restored.
- Remove ABCs, they do not work well with pydantic
- Remove the event type classvar - unused
- Remove clever logic to require an event name - we already get validation for this during schema registration.
- Rename event bases to all end in "Base"
Our events handling and implementation has a couple pain points:
- Adding or removing data from event payloads requires changes wherever the events are dispatched from.
- We have no type safety for events and need to rely on string matching and dict access when interacting with events.
- Frontend types for socket events must be manually typed. This has caused several bugs.
`fastapi-events` has a neat feature where you can create a pydantic model as an event payload, give it an `__event_name__` attr, and then dispatch the model directly.
This allows us to eliminate a layer of indirection and some unpleasant complexity:
- Event handler callbacks get type hints for their event payloads, and can use `isinstance` on them if needed.
- Event payload construction is now the responsibility of the event itself (a pydantic model), not the service. Every event model has a `build` class method, encapsulating this logic. The build methods are provided as few args as possible. For example, `InvocationStartedEvent.build()` gets the invocation instance and queue item, and can choose the data it wants to include in the event payload.
- Frontend event types may be autogenerated from the OpenAPI schema. We use the payload registry feature of `fastapi-events` to collect all payload models into one place, making it trivial to keep our schema and frontend types in sync.
This commit moves the backend over to this improved event handling setup.
I had set the cancel event at some point during troubleshooting an unrelated issue. It seemed logical that it should be set there, and didn't seem to break anything. However, this is not correct.
The cancel event should not be set in response to a queue status change event. Doing so can cause a race condition when nodes are executed very quickly.
It's possible that a previously-executed session's queue item status change event is handled after the next session starts executing. The cancel event is set and the session runner sees it aborting the session run early.
In hindsight, it doesn't make sense to set the cancel event here either. It should be set in response to user action, e.g. the user cancelled the session or cleared the queue (which implicitly cancels the current session). These events actually trigger the queue item status changed event, so if we set the cancel event here, we'd be setting it twice per cancellation.
- Add handling for new error columns `error_type`, `error_message`, `error_traceback`.
- Update queue item model to include the new data. The `error_traceback` field has an alias of `error` for backwards compatibility.
- Add `fail_queue_item` method. This was previously handled by `cancel_queue_item`. Splitting this functionality makes failing a queue item a bit more explicit. We also don't need to handle multiple optional error args.
-
We were not handling node preparation errors as node errors before. Here's the explanation, copied from a comment that is no longer required:
---
TODO(psyche): Sessions only support errors on nodes, not on the session itself. When an error occurs outside
node execution, it bubbles up to the processor where it is treated as a queue item error.
Nodes are pydantic models. When we prepare a node in `session.next()`, we set its inputs. This can cause a
pydantic validation error. For example, consider a resize image node which has a constraint on its `width`
input field - it must be greater than zero. During preparation, if the width is set to zero, pydantic will
raise a validation error.
When this happens, it breaks the flow before `invocation` is set. We can't set an error on the invocation
because we didn't get far enough to get it - we don't know its id. Hence, we just set it as a queue item error.
---
This change wraps the node preparation step with exception handling. A new `NodeInputError` exception is raised when there is a validation error. This error has the node (in the state it was in just prior to the error) and an identifier of the input that failed.
This allows us to mark the node that failed preparation as errored, correctly making such errors _node_ errors and not _processor_ errors. It's much easier to diagnose these situations. The error messages look like this:
> Node b5ac87c6-0678-4b8c-96b9-d215aee12175 has invalid incoming input for height
Some of the exception handling logic is cleaned up.
- Use protocol to define callbacks, this allows them to have kwargs
- Shuffle the profiler around a bit
- Move `thread_limit` and `polling_interval` to `__init__`; `start` is called programmatically and will never get these args in practice
- Add `OnNodeError` and `OnNonFatalProcessorError` callbacks
- Move all session/node callbacks to `SessionRunner` - this ensures we dump perf stats before resetting them and generally makes sense to me
- Remove `complete` event from `SessionRunner`, it's essentially the same as `OnAfterRunSession`
- Remove extraneous `next_invocation` block, which would treat a processor error as a node error
- Simplify loops
- Add some callbacks for testing, to be removed before merge
The session is never updated in the queue after it is first enqueued. As a result, the queue detail view in the frontend never never updates and the session itself doesn't show outputs, execution graph, etc.
We need a new method on the queue service to update a queue item's session, then call it before updating the queue item's status.
Queue item status may be updated via a session-type event _or_ queue-type event. Adding the updated session to all these events is a hairy - simpler to just update the session before we do anything that could trigger a queue item status change event:
- Before calling `emit_session_complete` in the processor (handles session error, completed and cancel events and the corresponding queue events)
- Before calling `cancel_queue_item` in the processor (handles another way queue items can be canceled, outside the session execution loop)
When serializing the session, both in the new service method and the `get_queue_item` endpoint, we need to use `exclude_none=True` to prevent unexpected validation errors.