From d3a9cb866618e06e60ad80abdecd936683d16c98 Mon Sep 17 00:00:00 2001 From: Timothy Baldridge Date: Mon, 30 Mar 2020 15:38:01 -0600 Subject: [PATCH] Worker queue fixes (backport) --- Wabbajack.Common/AsyncBlockingConnection.cs | 46 +++++++++++++++++++++ Wabbajack.Common/Utils.cs | 9 ++-- Wabbajack.Common/WorkQueue.cs | 6 +-- Wabbajack.Test/WorkQueueTests.cs | 2 +- 4 files changed, 56 insertions(+), 7 deletions(-) create mode 100644 Wabbajack.Common/AsyncBlockingConnection.cs diff --git a/Wabbajack.Common/AsyncBlockingConnection.cs b/Wabbajack.Common/AsyncBlockingConnection.cs new file mode 100644 index 00000000..3d7dd04a --- /dev/null +++ b/Wabbajack.Common/AsyncBlockingConnection.cs @@ -0,0 +1,46 @@ +using System; +using System.Collections; +using System.Collections.Concurrent; +using System.Collections.Generic; +using System.Threading; +using System.Threading.Tasks; + +namespace Wabbajack.Common +{ + public class AsyncBlockingCollection : IDisposable + { + private readonly ConcurrentQueue _collection; + private bool isDisposed = false; + + public AsyncBlockingCollection() + { + _collection = new ConcurrentQueue(); + } + + public void Add(T val) + { + _collection.Enqueue(val); + } + + public async ValueTask<(bool found, T val)> TryTake(TimeSpan timeout, CancellationToken token) + { + var startTime = DateTime.Now; + while (true) + { + if (_collection.TryDequeue(out T result)) + { + return (true, result); + } + + if (DateTime.Now - startTime > timeout || token.IsCancellationRequested || isDisposed) + return (false, default); + await Task.Delay(100); + } + } + + public void Dispose() + { + isDisposed = true; + } + } +} diff --git a/Wabbajack.Common/Utils.cs b/Wabbajack.Common/Utils.cs index 95bf7a85..2494701a 100644 --- a/Wabbajack.Common/Utils.cs +++ b/Wabbajack.Common/Utils.cs @@ -759,7 +759,8 @@ namespace Wabbajack.Common { while (remainingTasks > 0) { - if (queue.Queue.TryTake(out var a, 500)) + var (got, a) = await queue.Queue.TryTake(TimeSpan.FromMilliseconds(200), CancellationToken.None); + if (got) { await a(); } @@ -799,7 +800,8 @@ namespace Wabbajack.Common { while (remainingTasks > 0) { - if (queue.Queue.TryTake(out var a, 500)) + var (got, a) = await queue.Queue.TryTake(TimeSpan.FromMilliseconds(200), CancellationToken.None); + if (got) { await a(); } @@ -840,7 +842,8 @@ namespace Wabbajack.Common { while (remainingTasks > 0) { - if (queue.Queue.TryTake(out var a, 500)) + var (got, a) = await queue.Queue.TryTake(TimeSpan.FromMilliseconds(200), CancellationToken.None); + if (got) { await a(); } diff --git a/Wabbajack.Common/WorkQueue.cs b/Wabbajack.Common/WorkQueue.cs index 1ae1335e..d5f30055 100644 --- a/Wabbajack.Common/WorkQueue.cs +++ b/Wabbajack.Common/WorkQueue.cs @@ -16,7 +16,7 @@ namespace Wabbajack.Common { public class WorkQueue : IDisposable { - internal BlockingCollection> Queue = new BlockingCollection>(new ConcurrentStack>()); + internal AsyncBlockingCollection> Queue = new AsyncBlockingCollection>(); public const int UnassignedCpuId = 0; @@ -50,7 +50,7 @@ namespace Wabbajack.Common private readonly Subject> _activeNumThreadsObservable = new Subject>(); - internal const int PollMS = 200; + public TimeSpan PollMS = TimeSpan.FromMilliseconds(200); /// /// Creates a WorkQueue with the given number of threads @@ -124,7 +124,7 @@ namespace Wabbajack.Common bool got; try { - got = Queue.TryTake(out f, PollMS, _shutdown.Token); + (got, f) = await Queue.TryTake(PollMS, _shutdown.Token); } catch (Exception) { diff --git a/Wabbajack.Test/WorkQueueTests.cs b/Wabbajack.Test/WorkQueueTests.cs index ff0c262c..1f50b2fe 100644 --- a/Wabbajack.Test/WorkQueueTests.cs +++ b/Wabbajack.Test/WorkQueueTests.cs @@ -18,7 +18,7 @@ namespace Wabbajack.Test const int Large = 8; const int Medium = 6; const int Small = 4; - public int PollMS => WorkQueue.PollMS * 5; + public TimeSpan PollMS => TimeSpan.FromSeconds(1); [TestMethod] public void DynamicNumThreads_Typical()