remove download queue change_priority() calls completely

This commit is contained in:
Lincoln Stein
2023-10-12 14:03:28 -04:00
parent a51b165a40
commit 0f9c676fcb
7 changed files with 23 additions and 68 deletions

View File

@ -859,16 +859,6 @@ restarted and pick up where it left off using `queue.start_job()`.
This will cancel the job if possible and clean up temporary files and This will cancel the job if possible and clean up temporary files and
other resources that it might have been using. other resources that it might have been using.
#### queue.change_priority(job, delta)
This will increase (positive delta) or decrease (negative delta) the
priority of the job on the queue. Lower priority jobs will be taken off
the queue and run before higher priority jobs.
Note that this cannot be used to change the job's priority once it has
begun running. However, you can still pause the job and restart it
later.
#### queue.start_all_jobs(), queue.pause_all_jobs(), queue.cancel_all_jobs() #### queue.start_all_jobs(), queue.pause_all_jobs(), queue.cancel_all_jobs()
This will start/pause/cancel all jobs that have been submitted to the This will start/pause/cancel all jobs that have been submitted to the

View File

@ -56,7 +56,6 @@ class JobControlOperation(str, Enum):
START = "Start" START = "Start"
PAUSE = "Pause" PAUSE = "Pause"
CANCEL = "Cancel" CANCEL = "Cancel"
CHANGE_PRIORITY = "Change Priority"
@models_router.get( @models_router.get(
@ -478,11 +477,8 @@ async def control_download_jobs(
elif operation == JobControlOperation.CANCEL: elif operation == JobControlOperation.CANCEL:
job_mgr.cancel_job(job_id) job_mgr.cancel_job(job_id)
elif operation == JobControlOperation.CHANGE_PRIORITY and priority_delta is not None:
job_mgr.change_job_priority(job_id, priority_delta)
else: else:
raise ValueError("priority_delta must be set for the CHANGE_PRIORITY operation") raise ValueError("unknown operation {operation}")
bytes = 0 bytes = 0
total_bytes = 0 total_bytes = 0
if isinstance(job, DownloadJobRemoteSource): if isinstance(job, DownloadJobRemoteSource):

View File

@ -123,20 +123,6 @@ class DownloadQueueServiceBase(ABC):
"""Cancel the job, clearing partial downloads and putting it into ERROR state.""" """Cancel the job, clearing partial downloads and putting it into ERROR state."""
pass pass
@abstractmethod
def change_priority(self, job: DownloadJobBase, delta: int):
"""
Change the job's priority.
:param job: Job to apply change to
:param delta: Value to increment or decrement priority.
Lower values are higher priority. The default starting value is 10.
Thus to make this a really high priority job:
job.change_priority(-10).
"""
pass
@abstractmethod @abstractmethod
def join(self): def join(self):
"""Wait until all jobs are off the queue.""" """Wait until all jobs are off the queue."""
@ -203,7 +189,7 @@ class DownloadQueueService(DownloadQueueServiceBase):
def cancel_all_jobs(self): # noqa D102 def cancel_all_jobs(self): # noqa D102
return self._queue.cancel_all_jobs() return self._queue.cancel_all_jobs()
def prune_jobs(self, job: DownloadJobBase): # noqa D102 def prune_jobs(self): # noqa D102
return self._queue.prune_jobs() return self._queue.prune_jobs()
def start_job(self, job: DownloadJobBase): # noqa D102 def start_job(self, job: DownloadJobBase): # noqa D102
@ -215,8 +201,5 @@ class DownloadQueueService(DownloadQueueServiceBase):
def cancel_job(self, job: DownloadJobBase): # noqa D102 def cancel_job(self, job: DownloadJobBase): # noqa D102
return self._queue.cancel_job(job) return self._queue.cancel_job(job)
def change_priority(self, job: DownloadJobBase, delta: int): # noqa D102
return self._queue.change_priority(job, delta)
def join(self): # noqa D102 def join(self): # noqa D102
return self._queue.join() return self._queue.join()

View File

@ -51,7 +51,7 @@ class TqdmProgress(object):
if not isinstance(job, DownloadJobRemoteSource): if not isinstance(job, DownloadJobRemoteSource):
return return
job_id = job.id job_id = job.id
if job.status == "running": if job.status == "running" and job.total_bytes > 0: # job starts running before total bytes known
if job_id not in self._bars: if job_id not in self._bars:
dest = Path(job.destination).name dest = Path(job.destination).name
self._bars[job_id] = tqdm( self._bars[job_id] = tqdm(

View File

@ -235,20 +235,6 @@ class DownloadQueueBase(ABC):
""" """
pass pass
@abstractmethod
def change_priority(self, job: DownloadJobBase, delta: int):
"""
Change the job's priority.
:param job: Job to change
:param delta: Value to increment or decrement priority.
Lower values are higher priority. The default starting value is 10.
Thus to make this a really high priority job:
job.change_priority(-10).
"""
pass
@abstractmethod @abstractmethod
def join(self): def join(self):
""" """

View File

@ -154,15 +154,6 @@ class DownloadQueue(DownloadQueueBase):
"""List all the jobs.""" """List all the jobs."""
return list(self._jobs.values()) return list(self._jobs.values())
def change_priority(self, job: DownloadJobBase, delta: int):
"""Change the priority of a job. Smaller priorities run first."""
with self._lock:
try:
assert isinstance(self._jobs[job.id], DownloadJobBase)
job.priority += delta
except (AssertionError, KeyError) as excp:
raise UnknownJobIDException("Unrecognized job") from excp
def prune_jobs(self): def prune_jobs(self):
"""Prune completed and errored queue items from the job list.""" """Prune completed and errored queue items from the job list."""
with self._lock: with self._lock:
@ -235,7 +226,7 @@ class DownloadQueue(DownloadQueueBase):
"""Pause all running jobs.""" """Pause all running jobs."""
with self._lock: with self._lock:
for job in self._jobs.values(): for job in self._jobs.values():
if job.status == DownloadJobStatus.RUNNING: if not self._in_terminal_state(job):
self.pause_job(job) self.pause_job(job)
def cancel_all_jobs(self, preserve_partial: bool = False): def cancel_all_jobs(self, preserve_partial: bool = False):
@ -431,7 +422,10 @@ class DownloadQueue(DownloadQueueBase):
if not job.preserve_partial_downloads: if not job.preserve_partial_downloads:
self._logger.warning(f"Cleaning up leftover files from cancelled download job {job.destination}") self._logger.warning(f"Cleaning up leftover files from cancelled download job {job.destination}")
dest = Path(job.destination) dest = Path(job.destination)
if dest.is_file(): try:
dest.unlink() if dest.is_file():
elif dest.is_dir(): dest.unlink()
shutil.rmtree(dest.as_posix(), ignore_errors=True) elif dest.is_dir():
shutil.rmtree(dest.as_posix(), ignore_errors=True)
except OSError as excp:
self._logger(excp)

View File

@ -172,20 +172,26 @@ def test_queue_priority():
) )
with tempfile.TemporaryDirectory() as tmpdir: with tempfile.TemporaryDirectory() as tmpdir:
job1 = queue.create_download_job(source="http://www.civitai.com/models/12345", destdir=tmpdir, start=False) job1 = queue.create_download_job(
job2 = queue.create_download_job(source="http://www.civitai.com/models/9999", destdir=tmpdir, start=False) priority=0, source="http://www.civitai.com/models/12345", destdir=tmpdir, start=False
)
job2 = queue.create_download_job(
priority=10, source="http://www.civitai.com/models/9999", destdir=tmpdir, start=False
)
queue.change_priority(job1, -10) # make id1 run first
assert job1 < job2 assert job1 < job2
queue.start_all_jobs() queue.start_all_jobs()
queue.join() queue.join()
assert job1.job_sequence < job2.job_sequence assert job1.job_sequence < job2.job_sequence
job1 = queue.create_download_job(source="http://www.civitai.com/models/12345", destdir=tmpdir, start=False) job1 = queue.create_download_job(
job2 = queue.create_download_job(source="http://www.civitai.com/models/9999", destdir=tmpdir, start=False) priority=10, source="http://www.civitai.com/models/12345", destdir=tmpdir, start=False
)
job2 = queue.create_download_job(
priority=0, source="http://www.civitai.com/models/9999", destdir=tmpdir, start=False
)
queue.change_priority(job2, -10) # make id2 run first
assert job2 < job1 assert job2 < job1
queue.start_all_jobs() queue.start_all_jobs()