# The InvokeAI Download Queue

The DownloadQueueService provides a multithreaded parallel download queue for arbitrary URLs, with queue prioritization, event handling, and restart capabilities.

## Simple Example

```
from invokeai.app.services.download import DownloadQueueService, TqdmProgress

download_queue = DownloadQueueService()
for url in ['https://github.com/invoke-ai/InvokeAI/blob/main/invokeai/assets/a-painting-of-a-fire.png?raw=true',
            'https://github.com/invoke-ai/InvokeAI/blob/main/invokeai/assets/birdhouse.png?raw=true',
            'https://github.com/invoke-ai/InvokeAI/blob/main/invokeai/assets/missing.png',
            'https://civitai.com/api/download/models/152309?type=Model&format=SafeTensor',
            ]:

    # urls start downloading as soon as download() is called
    download_queue.download(source=url,
                            dest='/tmp/downloads',
                            on_progress=TqdmProgress().update
                            )

download_queue.join()  # wait for all downloads to finish
for job in download_queue.list_jobs():
    print(job.model_dump_json(exclude_none=True, indent=4),"\n")
```

Output:

```
{
    "source": "https://github.com/invoke-ai/InvokeAI/blob/main/invokeai/assets/a-painting-of-a-fire.png?raw=true",
    "dest": "/tmp/downloads",
    "id": 0,
    "priority": 10,
    "status": "completed",
    "download_path": "/tmp/downloads/a-painting-of-a-fire.png",
    "job_started": "2023-12-04T05:34:41.742174",
    "job_ended": "2023-12-04T05:34:42.592035",
    "bytes": 666734,
    "total_bytes": 666734
}

{
    "source": "https://github.com/invoke-ai/InvokeAI/blob/main/invokeai/assets/birdhouse.png?raw=true",
    "dest": "/tmp/downloads",
    "id": 1,
    "priority": 10,
    "status": "completed",
    "download_path": "/tmp/downloads/birdhouse.png",
    "job_started": "2023-12-04T05:34:41.741975",
    "job_ended": "2023-12-04T05:34:42.652841",
    "bytes": 774949,
    "total_bytes": 774949
}

{
    "source": "https://github.com/invoke-ai/InvokeAI/blob/main/invokeai/assets/missing.png",
    "dest": "/tmp/downloads",
    "id": 2,
    "priority": 10,
    "status": "error",
    "job_started": "2023-12-04T05:34:41.742079",
    "job_ended": "2023-12-04T05:34:42.147625",
    "bytes": 0,
    "total_bytes": 0,
    "error_type": "HTTPError(Not Found)",
    "error": "Traceback (most recent call last):\n File \"/home/lstein/Projects/InvokeAI/invokeai/app/services/download/download_default.py\", line 182, in _download_next_item\n self._do_download(job)\n File \"/home/lstein/Projects/InvokeAI/invokeai/app/services/download/download_default.py\", line 206, in _do_download\n raise HTTPError(resp.reason)\nrequests.exceptions.HTTPError: Not Found\n"
}

{
    "source": "https://civitai.com/api/download/models/152309?type=Model&format=SafeTensor",
    "dest": "/tmp/downloads",
    "id": 3,
    "priority": 10,
    "status": "completed",
    "download_path": "/tmp/downloads/xl_more_art-full_v1.safetensors",
    "job_started": "2023-12-04T05:34:42.147645",
    "job_ended": "2023-12-04T05:34:43.735990",
    "bytes": 719020768,
    "total_bytes": 719020768
}
```

## The API

The default download queue is `DownloadQueueService`, an implementation of ABC `DownloadQueueServiceBase`. It juggles multiple background download requests and provides facilities for interrogating and cancelling the requests. Access to a current or past download task is mediated via `DownloadJob` objects which report the current status of a job request.

### The Queue Object

A default download queue is located in `ApiDependencies.invoker.services.download_queue`. However, you can create additional instances if you need to isolate your queue from the main one.

```
queue = DownloadQueueService(event_bus=events)
```

`DownloadQueueService()` takes three optional arguments:

| **Argument** | **Type** | **Default** | **Description** |
|----------------|-----------------|---------------|-----------------|
| `max_parallel_dl` | int | 5 | Maximum number of simultaneous downloads allowed |
| `event_bus` | EventServiceBase | None | System-wide FastAPI event bus for reporting download events |
| `requests_session` | requests.sessions.Session | None | An alternative requests Session object to use for the download |

`max_parallel_dl` specifies how many download jobs are allowed to run simultaneously. Each will run in a different thread of execution.

`event_bus` is an EventServiceBase, typically the one created at InvokeAI startup. If present, download events are periodically emitted on this bus to allow clients to follow download progress.

`requests_session` is a Session object from the `requests` library. It is used for testing.

### The Job object

The queue operates on a series of download job objects. These objects specify the source and destination of the download, and keep track of the progress of the download.

Two job types are defined: `DownloadJob` and `MultiFileDownloadJob`.
The former is a pydantic object with the following fields:

| **Field** | **Type** | **Default** | **Description** |
|----------------|-----------------|---------------|-----------------|
| _Fields passed in at job creation time_ | | | |
| `source` | AnyHttpUrl | | Where to download from |
| `dest` | Path | | Where to download to |
| `access_token` | str | | [optional] string containing authentication token for access |
| `on_start` | Callable | | [optional] callback when the download starts |
| `on_progress` | Callable | | [optional] callback called at intervals during download progress |
| `on_complete` | Callable | | [optional] callback called after successful download completion |
| `on_error` | Callable | | [optional] callback called after an error occurs |
| `id` | int | auto assigned | Job ID, an integer >= 0 |
| `priority` | int | 10 | Job priority. Lower priorities run before higher priorities |
| _Fields updated over the course of the download task_ | | | |
| `status` | DownloadJobStatus | | Status code |
| `download_path` | Path | | Path to the location of the downloaded file |
| `job_started` | float | | Timestamp for when the job started running |
| `job_ended` | float | | Timestamp for when the job completed or errored out |
| `job_sequence` | int | | A counter that is incremented each time a model is dequeued |
| `bytes` | int | 0 | Bytes downloaded so far |
| `total_bytes` | int | 0 | Total size of the file at the remote site |
| `error_type` | str | | String version of the exception that caused an error during download |
| `error` | str | | String version of the traceback associated with an error |
| `cancelled` | bool | False | Set to true if the job was cancelled by the caller |

When you create a job, you can assign it a `priority`. If multiple jobs are queued, the job with the lowest `priority` value runs first.

Every job has a `source` and a `dest`. `source` is a pydantic.networks AnyHttpUrl object.
The `dest` is a path on the local filesystem that specifies the destination for the downloaded object. Its semantics are described below.

When the job is submitted, it is assigned a numeric `id`. The id can then be used to fetch the job object from the queue.

The `status` field is updated by the queue to indicate where the job is in its lifecycle. Values are defined in the string enum `DownloadJobStatus`, a symbol available from `invokeai.app.services.download`. Possible values are:

| **Value** | **String Value** | **Description** |
|--------------|---------------------|-------------------|
| `WAITING` | waiting | Job is on the queue but not yet running |
| `RUNNING` | running | The download is started |
| `COMPLETED` | completed | Job has finished its work without an error |
| `ERROR` | error | Job encountered an error and will not run again |

`job_started` and `job_ended` indicate when the job was started (using a python timestamp) and when it completed.

In case of an error, the job's status will be set to `DownloadJobStatus.ERROR`, the text of the Exception that caused the error will be placed in the `error_type` field and the traceback that led to the error will be in `error`.

A cancelled job will have status `DownloadJobStatus.ERROR` and an `error_type` field of "DownloadJobCancelledException". In addition, the job's `cancelled` property will be set to True.

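The status rules above can be pictured with a small standard-library sketch. `JobStatus`, `JobSnapshot` and `describe_outcome` are hypothetical stand-ins written for this illustration, not InvokeAI classes; they only mirror the `status`, `error_type` and `cancelled` fields described in the text.

```python
from dataclasses import dataclass
from enum import Enum
from typing import Optional


class JobStatus(str, Enum):
    """Hypothetical stand-in mirroring the string values in the status table."""

    WAITING = "waiting"
    RUNNING = "running"
    COMPLETED = "completed"
    ERROR = "error"


@dataclass
class JobSnapshot:
    """Hypothetical stand-in holding only the fields inspected below."""

    status: JobStatus
    error_type: Optional[str] = None
    cancelled: bool = False


def describe_outcome(job: JobSnapshot) -> str:
    """Classify a job using the rules described in the text above."""
    if job.status is JobStatus.COMPLETED:
        return "finished"
    if job.status is JobStatus.ERROR:
        # A cancelled job is reported as an error with the `cancelled` flag set.
        if job.cancelled:
            return "cancelled"
        return f"failed: {job.error_type}"
    return "not finished"  # WAITING or RUNNING


print(describe_outcome(JobSnapshot(JobStatus.ERROR, "HTTPError(Not Found)")))
print(describe_outcome(JobSnapshot(JobStatus.ERROR, "DownloadJobCancelledException", True)))
```

Checking `cancelled` before reading `error_type` keeps a deliberate cancellation from being reported to the user as a failure.
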
The `MultiFileDownloadJob` is used for diffusers model downloads, which contain multiple files and directories under a common root:

| **Field** | **Type** | **Default** | **Description** |
|----------------|-----------------|---------------|-----------------|
| _Fields passed in at job creation time_ | | | |
| `download_parts` | Set[DownloadJob] | | Component download jobs |
| `dest` | Path | | Where to download to |
| `on_start` | Callable | | [optional] callback when the download starts |
| `on_progress` | Callable | | [optional] callback called at intervals during download progress |
| `on_complete` | Callable | | [optional] callback called after successful download completion |
| `on_error` | Callable | | [optional] callback called after an error occurs |
| `id` | int | auto assigned | Job ID, an integer >= 0 |
| _Fields updated over the course of the download task_ | | | |
| `status` | DownloadJobStatus | | Status code |
| `download_path` | Path | | Path to the root of the downloaded files |
| `bytes` | int | 0 | Bytes downloaded so far |
| `total_bytes` | int | 0 | Total size of the file at the remote site |
| `error_type` | str | | String version of the exception that caused an error during download |
| `error` | str | | String version of the traceback associated with an error |
| `cancelled` | bool | False | Set to true if the job was cancelled by the caller |

Note that the MultiFileDownloadJob does not support the `priority`, `job_started`, `job_ended` or `content_type` attributes. You can get these from the individual download jobs in `download_parts`.

### Callbacks

Download jobs can be associated with a series of callbacks, each with the signature `Callable[["DownloadJob"], None]`. The callbacks are assigned using optional arguments `on_start`, `on_progress`, `on_complete` and `on_error`. When the corresponding event occurs, the callback will be invoked and passed the job. The callback will be run in a `try:` context in the same thread as the download job.
Any exceptions that occur during execution of the callback will be caught and converted into a log error message, thereby allowing the download to continue.

#### `TqdmProgress`

The `invokeai.app.services.download.download_default` module defines a class named `TqdmProgress` which can be used as an `on_progress` handler to display a completion bar in the console. Use as follows:

```
from invokeai.app.services.download import TqdmProgress

download_queue.download(source='http://some.server.somewhere/some_file',
                        dest='/tmp/downloads',
                        on_progress=TqdmProgress().update
                        )
```

### Events

If the queue was initialized with the InvokeAI event bus (the case when using `ApiDependencies.invoker.services.download_queue`), then download events will also be issued on the bus. The events are:

* `download_started` -- This is issued when a job is taken off the queue and a request is made to the remote server for the URL headers, but before any data has been downloaded. The event payload will contain the keys `source` and `download_path`. The latter contains the path that the URL will be downloaded to.

* `download_progress` -- This is issued periodically as the download runs. The payload contains the keys `source`, `download_path`, `current_bytes` and `total_bytes`. The latter two fields can be used to display the percent complete.

* `download_complete` -- This is issued when the download completes successfully. The payload contains the keys `source`, `download_path` and `total_bytes`.

* `download_error` -- This is issued when the download stops because of an error condition. The payload contains the fields `error_type` and `error`. The former is the text representation of the exception, and the latter is a traceback showing where the error occurred.

### Job control

To create a job, call the queue's `download()` method.
You can list all jobs using `list_jobs()`, fetch a single job by its ID with `id_to_job()`, cancel a running job with `cancel_job()`, cancel all running jobs with `cancel_all_jobs()`, and wait for all jobs to finish with `join()`.

#### job = queue.download(source, dest, priority, access_token, on_start, on_progress, on_complete, on_cancelled, on_error)

Create a new download job and put it on the queue, returning the DownloadJob object.

#### multifile_job = queue.multifile_download(parts, dest, access_token, on_start, on_progress, on_complete, on_cancelled, on_error)

This is similar to download(), but instead of taking a single source, it accepts a `parts` argument consisting of a list of `RemoteModelFile` objects. Each part is a URL/Path pair, where the URL is the location of the remote file and the Path is the destination. `RemoteModelFile` can be imported from `invokeai.backend.model_manager.metadata`. Note that the path *must* be relative.

The method returns a `MultiFileDownloadJob`.

```
from invokeai.app.services.download import TqdmProgress
from invokeai.backend.model_manager.metadata import RemoteModelFile

# `queue` is an existing DownloadQueueService instance
remote_file_1 = RemoteModelFile(url='http://www.foo.bar/my/pytorch_model.safetensors',
                                path='my_model/textencoder/pytorch_model.safetensors'
                                )
remote_file_2 = RemoteModelFile(url='http://www.bar.baz/vae.ckpt',
                                path='my_model/vae/diffusers_model.safetensors'
                                )
job = queue.multifile_download(parts=[remote_file_1, remote_file_2],
                               dest='/tmp/downloads',
                               on_progress=TqdmProgress().update)
queue.wait_for_job(job)
print(f"The files were downloaded to {job.download_path}")
```

#### jobs = queue.list_jobs()

Return a list of all active and inactive `DownloadJob`s.

#### job = queue.id_to_job(id)

Return the job corresponding to the given ID.

#### queue.prune_jobs()

Remove inactive (complete or errored) jobs from the listing returned by `list_jobs()`.
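The effect of pruning on the job listing can be pictured with a plain dictionary. This is only a sketch: the `jobs` table, the `INACTIVE` set and the `prune` helper are hypothetical names, and the real queue keeps `DownloadJob` objects rather than status strings.

```python
from typing import Dict

# Hypothetical stand-in for the queue's job table: job id -> status string.
# The status strings are the ones listed in the DownloadJobStatus table.
jobs: Dict[int, str] = {0: "completed", 1: "running", 2: "error", 3: "waiting"}

# Statuses that the text calls inactive (complete or errored).
INACTIVE = {"completed", "error"}


def prune(job_table: Dict[int, str]) -> Dict[int, str]:
    """Return a copy of the table without completed or errored jobs."""
    return {
        job_id: status
        for job_id, status in job_table.items()
        if status not in INACTIVE
    }


remaining = prune(jobs)
print(remaining)  # only the running and waiting jobs are left
```
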
#### queue.join()

Block until all pending jobs have run to completion or errored out.
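
The blocking behaviour of `join()`, together with the priority ordering and guarded callbacks described earlier, can be illustrated with a miniature queue built only from the Python standard library. This is a sketch under stated assumptions, not InvokeAI's implementation: `run_all` and `bad_callback` are hypothetical names, jobs are reduced to `(priority, name)` tuples, the callback receives a name rather than a job object, and breaking ties by submission order is an assumption made here.

```python
import logging
import queue
import threading
from typing import Callable, List, Optional, Tuple

logger = logging.getLogger("download_sketch")


def run_all(
    jobs: List[Tuple[int, str]],
    on_complete: Optional[Callable[[str], None]] = None,
    workers: int = 1,
) -> List[str]:
    """Run (priority, name) jobs on worker threads; return names in finish order."""
    pending = queue.PriorityQueue()
    finished: List[str] = []
    lock = threading.Lock()

    # Assumption: a sequence number breaks ties, so equal priorities
    # run in submission order. Lower priority values are dequeued first.
    for sequence, (priority, name) in enumerate(jobs):
        pending.put((priority, sequence, name))

    def worker() -> None:
        while True:
            try:
                _, _, name = pending.get_nowait()
            except queue.Empty:
                return
            try:
                with lock:
                    finished.append(name)  # stands in for the actual download
                if on_complete is not None:
                    try:
                        on_complete(name)
                    except Exception as exc:
                        # A failing callback is logged, not propagated.
                        logger.error("callback failed for %s: %s", name, exc)
            finally:
                pending.task_done()

    threads = [threading.Thread(target=worker) for _ in range(workers)]
    for thread in threads:
        thread.start()
    pending.join()  # block until every queued job has been processed
    for thread in threads:
        thread.join()
    return finished


def bad_callback(name: str) -> None:
    """Hypothetical callback that always fails, to show the guard at work."""
    raise RuntimeError("boom")


order = run_all(
    [(10, "a.png"), (1, "urgent.bin"), (10, "b.png")],
    on_complete=bad_callback,
)
print(order)
```

With a single worker the job with priority 1 runs before the two jobs with priority 10, and the failing callback produces log messages without stopping the remaining jobs. With more than one worker the finish order is no longer deterministic.
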