Merge pull request #9 from halgari/issue-8

issue-8 - Fix PMap thread starvation
This commit is contained in:
Timothy Baldridge 2019-08-10 09:22:27 -06:00 committed by GitHub
commit 4a6b27eaea
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 22 additions and 23 deletions

View File

@ -169,6 +169,8 @@ namespace Wabbajack.Common
WorkQueue.MaxQueueSize = colllst.Count;
WorkQueue.CurrentQueueSize = 0;
int remaining_tasks = colllst.Count;
var tasks = coll.Select(i =>
{
TaskCompletionSource<TR> tc = new TaskCompletionSource<TR>();
@ -183,11 +185,22 @@ namespace Wabbajack.Common
tc.SetException(ex);
}
Interlocked.Increment(ref WorkQueue.CurrentQueueSize);
Interlocked.Decrement(ref remaining_tasks);
WorkQueue.ReportNow();
});
return tc.Task;
}).ToList();
// To avoid thread starvation, we'll start to help out in the work queue
if (WorkQueue.WorkerThread)
while(remaining_tasks > 0)
{
if(WorkQueue.Queue.TryTake(out var a, 500))
{
a();
}
}
return tasks.Select(t =>
{
t.Wait();
@ -199,29 +212,11 @@ namespace Wabbajack.Common
public static void PMap<TI>(this IEnumerable<TI> coll, Action<TI> f)
{
var tasks = coll.Select(i =>
coll.PMap<TI, bool>(i =>
{
TaskCompletionSource<bool> tc = new TaskCompletionSource<bool>();
WorkQueue.QueueTask(() =>
{
try
{
f(i);
tc.SetResult(true);
}
catch (Exception ex)
{
tc.SetException(ex);
}
});
return tc.Task;
}).ToList();
tasks.Select(t =>
{
t.Wait();
return t.Result;
}).ToList();
f(i);
return false;
});
return;
}

View File

@ -10,11 +10,14 @@ namespace Wabbajack.Common
{
public class WorkQueue
{
private static BlockingCollection<Action> Queue = new BlockingCollection<Action>();
internal static BlockingCollection<Action> Queue = new BlockingCollection<Action>();
[ThreadStatic]
private static int CpuId;
[ThreadStatic]
internal static bool WorkerThread;
public static void Init(Action<int, string, int> report_function, Action<int, int> report_queue_size)
{
ReportFunction = report_function;
@ -40,6 +43,7 @@ namespace Wabbajack.Common
private static void ThreadBody(int idx)
{
CpuId = idx;
WorkerThread = true;
while(true)
{