Worker queue fixes (backport)

This commit is contained in:
Timothy Baldridge 2020-03-30 15:38:01 -06:00
parent 2ae65688f0
commit d3a9cb8666
4 changed files with 56 additions and 7 deletions

View File

@ -0,0 +1,46 @@
using System;
using System.Collections;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
namespace Wabbajack.Common
{
public class AsyncBlockingCollection<T> : IDisposable
{
private readonly ConcurrentQueue<T> _collection;
private bool isDisposed = false;
public AsyncBlockingCollection()
{
_collection = new ConcurrentQueue<T>();
}
public void Add(T val)
{
_collection.Enqueue(val);
}
public async ValueTask<(bool found, T val)> TryTake(TimeSpan timeout, CancellationToken token)
{
var startTime = DateTime.Now;
while (true)
{
if (_collection.TryDequeue(out T result))
{
return (true, result);
}
if (DateTime.Now - startTime > timeout || token.IsCancellationRequested || isDisposed)
return (false, default);
await Task.Delay(100);
}
}
public void Dispose()
{
isDisposed = true;
}
}
}

View File

@ -759,7 +759,8 @@ namespace Wabbajack.Common
{
while (remainingTasks > 0)
{
if (queue.Queue.TryTake(out var a, 500))
var (got, a) = await queue.Queue.TryTake(TimeSpan.FromMilliseconds(200), CancellationToken.None);
if (got)
{
await a();
}
@ -799,7 +800,8 @@ namespace Wabbajack.Common
{
while (remainingTasks > 0)
{
if (queue.Queue.TryTake(out var a, 500))
var (got, a) = await queue.Queue.TryTake(TimeSpan.FromMilliseconds(200), CancellationToken.None);
if (got)
{
await a();
}
@ -840,7 +842,8 @@ namespace Wabbajack.Common
{
while (remainingTasks > 0)
{
if (queue.Queue.TryTake(out var a, 500))
var (got, a) = await queue.Queue.TryTake(TimeSpan.FromMilliseconds(200), CancellationToken.None);
if (got)
{
await a();
}

View File

@ -16,7 +16,7 @@ namespace Wabbajack.Common
{
public class WorkQueue : IDisposable
{
internal BlockingCollection<Func<Task>> Queue = new BlockingCollection<Func<Task>>(new ConcurrentStack<Func<Task>>());
internal AsyncBlockingCollection<Func<Task>> Queue = new AsyncBlockingCollection<Func<Task>>();
public const int UnassignedCpuId = 0;
@ -50,7 +50,7 @@ namespace Wabbajack.Common
private readonly Subject<IObservable<int>> _activeNumThreadsObservable = new Subject<IObservable<int>>();
internal const int PollMS = 200;
public TimeSpan PollMS = TimeSpan.FromMilliseconds(200);
/// <summary>
/// Creates a WorkQueue with the given number of threads
@ -124,7 +124,7 @@ namespace Wabbajack.Common
bool got;
try
{
got = Queue.TryTake(out f, PollMS, _shutdown.Token);
(got, f) = await Queue.TryTake(PollMS, _shutdown.Token);
}
catch (Exception)
{

View File

@ -18,7 +18,7 @@ namespace Wabbajack.Test
const int Large = 8;
const int Medium = 6;
const int Small = 4;
public int PollMS => WorkQueue.PollMS * 5;
public TimeSpan PollMS => TimeSpan.FromSeconds(1);
[TestMethod]
public void DynamicNumThreads_Typical()