mirror of
https://github.com/wabbajack-tools/wabbajack.git
synced 2024-08-30 18:42:17 +00:00
WorkQueue IsWorkerThread fixes. PMap unit tests
This commit is contained in:
parent
11363847c9
commit
4e7829c7c1
@ -597,12 +597,15 @@ namespace Wabbajack.Common
|
|||||||
|
|
||||||
// To avoid thread starvation, we'll start to help out in the work queue
|
// To avoid thread starvation, we'll start to help out in the work queue
|
||||||
if (WorkQueue.WorkerThread)
|
if (WorkQueue.WorkerThread)
|
||||||
|
{
|
||||||
while (remainingTasks > 0)
|
while (remainingTasks > 0)
|
||||||
|
{
|
||||||
if (queue.Queue.TryTake(out var a, 500))
|
if (queue.Queue.TryTake(out var a, 500))
|
||||||
{
|
{
|
||||||
WorkQueue.AsyncLocalCurrentQueue.Value = WorkQueue.ThreadLocalCurrentQueue.Value;
|
|
||||||
await a();
|
await a();
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
return await Task.WhenAll(tasks);
|
return await Task.WhenAll(tasks);
|
||||||
}
|
}
|
||||||
@ -634,12 +637,15 @@ namespace Wabbajack.Common
|
|||||||
|
|
||||||
// To avoid thread starvation, we'll start to help out in the work queue
|
// To avoid thread starvation, we'll start to help out in the work queue
|
||||||
if (WorkQueue.WorkerThread)
|
if (WorkQueue.WorkerThread)
|
||||||
|
{
|
||||||
while (remainingTasks > 0)
|
while (remainingTasks > 0)
|
||||||
|
{
|
||||||
if (queue.Queue.TryTake(out var a, 500))
|
if (queue.Queue.TryTake(out var a, 500))
|
||||||
{
|
{
|
||||||
WorkQueue.AsyncLocalCurrentQueue.Value = WorkQueue.ThreadLocalCurrentQueue.Value;
|
|
||||||
await a();
|
await a();
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
return await Task.WhenAll(tasks);
|
return await Task.WhenAll(tasks);
|
||||||
}
|
}
|
||||||
@ -672,12 +678,15 @@ namespace Wabbajack.Common
|
|||||||
|
|
||||||
// To avoid thread starvation, we'll start to help out in the work queue
|
// To avoid thread starvation, we'll start to help out in the work queue
|
||||||
if (WorkQueue.WorkerThread)
|
if (WorkQueue.WorkerThread)
|
||||||
|
{
|
||||||
while (remainingTasks > 0)
|
while (remainingTasks > 0)
|
||||||
|
{
|
||||||
if (queue.Queue.TryTake(out var a, 500))
|
if (queue.Queue.TryTake(out var a, 500))
|
||||||
{
|
{
|
||||||
WorkQueue.AsyncLocalCurrentQueue.Value = WorkQueue.ThreadLocalCurrentQueue.Value;
|
|
||||||
await a();
|
await a();
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
await Task.WhenAll(tasks);
|
await Task.WhenAll(tasks);
|
||||||
}
|
}
|
||||||
|
@ -20,8 +20,8 @@ namespace Wabbajack.Common
|
|||||||
private static readonly AsyncLocal<int> _cpuId = new AsyncLocal<int>();
|
private static readonly AsyncLocal<int> _cpuId = new AsyncLocal<int>();
|
||||||
public int CpuId => _cpuId.Value;
|
public int CpuId => _cpuId.Value;
|
||||||
|
|
||||||
internal static bool WorkerThread => ThreadLocalCurrentQueue.Value != null;
|
public static bool WorkerThread => AsyncLocalCurrentQueue.Value != null;
|
||||||
internal static readonly ThreadLocal<WorkQueue> ThreadLocalCurrentQueue = new ThreadLocal<WorkQueue>();
|
public bool IsWorkerThread => WorkerThread;
|
||||||
internal static readonly AsyncLocal<WorkQueue> AsyncLocalCurrentQueue = new AsyncLocal<WorkQueue>();
|
internal static readonly AsyncLocal<WorkQueue> AsyncLocalCurrentQueue = new AsyncLocal<WorkQueue>();
|
||||||
|
|
||||||
private readonly Subject<CPUStatus> _Status = new Subject<CPUStatus>();
|
private readonly Subject<CPUStatus> _Status = new Subject<CPUStatus>();
|
||||||
@ -61,7 +61,6 @@ namespace Wabbajack.Common
|
|||||||
private async Task ThreadBody(int idx)
|
private async Task ThreadBody(int idx)
|
||||||
{
|
{
|
||||||
_cpuId.Value = idx;
|
_cpuId.Value = idx;
|
||||||
ThreadLocalCurrentQueue.Value = this;
|
|
||||||
AsyncLocalCurrentQueue.Value = this;
|
AsyncLocalCurrentQueue.Value = this;
|
||||||
|
|
||||||
try
|
try
|
||||||
|
238
Wabbajack.Test/PMapTests.cs
Normal file
238
Wabbajack.Test/PMapTests.cs
Normal file
@ -0,0 +1,238 @@
|
|||||||
|
using System;
|
||||||
|
using System.Collections.Generic;
|
||||||
|
using System.Linq;
|
||||||
|
using System.Text;
|
||||||
|
using System.Threading;
|
||||||
|
using System.Threading.Tasks;
|
||||||
|
using Microsoft.VisualStudio.TestTools.UnitTesting;
|
||||||
|
using Wabbajack.Common;
|
||||||
|
|
||||||
|
namespace Wabbajack.Test
|
||||||
|
{
|
||||||
|
[TestClass]
|
||||||
|
public class PMapTests
|
||||||
|
{
|
||||||
|
const int TypicalThreadCount = 2;
|
||||||
|
const int TypicalDelayMS = 50;
|
||||||
|
|
||||||
|
[TestMethod]
|
||||||
|
public async Task Typical_Action()
|
||||||
|
{
|
||||||
|
using (var queue = new WorkQueue(TypicalThreadCount))
|
||||||
|
{
|
||||||
|
var input = Enumerable.Range(0, TypicalThreadCount * 2).ToArray();
|
||||||
|
var output = new List<int>();
|
||||||
|
await Enumerable.Range(0, TypicalThreadCount * 2)
|
||||||
|
.PMap(queue, (item) =>
|
||||||
|
{
|
||||||
|
Assert.IsTrue(WorkQueue.WorkerThread);
|
||||||
|
Thread.Sleep(TypicalDelayMS);
|
||||||
|
lock (output)
|
||||||
|
{
|
||||||
|
output.Add(item);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
Assert.IsTrue(input.SequenceEqual(output.OrderBy(i => i)));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
[TestMethod]
|
||||||
|
public async Task Typical_Func()
|
||||||
|
{
|
||||||
|
using (var queue = new WorkQueue(TypicalThreadCount))
|
||||||
|
{
|
||||||
|
var input = Enumerable.Range(0, TypicalThreadCount * 2).ToArray();
|
||||||
|
var results = await Enumerable.Range(0, TypicalThreadCount * 2)
|
||||||
|
.PMap(queue, (item) =>
|
||||||
|
{
|
||||||
|
Assert.IsTrue(WorkQueue.WorkerThread);
|
||||||
|
Thread.Sleep(TypicalDelayMS);
|
||||||
|
return item.ToString();
|
||||||
|
});
|
||||||
|
Assert.IsTrue(input.Select(i => i.ToString()).SequenceEqual(results));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
[TestMethod]
|
||||||
|
public async Task Typical_Task()
|
||||||
|
{
|
||||||
|
using (var queue = new WorkQueue(TypicalThreadCount))
|
||||||
|
{
|
||||||
|
var input = Enumerable.Range(0, TypicalThreadCount * 2).ToArray();
|
||||||
|
var output = new List<int>();
|
||||||
|
await Enumerable.Range(0, TypicalThreadCount * 2)
|
||||||
|
.PMap(queue, async (item) =>
|
||||||
|
{
|
||||||
|
bool isWorker = WorkQueue.WorkerThread;
|
||||||
|
if (!isWorker)
|
||||||
|
{
|
||||||
|
int wer = 23;
|
||||||
|
wer++;
|
||||||
|
}
|
||||||
|
Assert.IsTrue(isWorker);
|
||||||
|
await Task.Delay(TypicalDelayMS);
|
||||||
|
lock (output)
|
||||||
|
{
|
||||||
|
output.Add(item);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
Assert.IsTrue(input.SequenceEqual(output.OrderBy(i => i)));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
[TestMethod]
|
||||||
|
public async Task Typical_TaskReturn()
|
||||||
|
{
|
||||||
|
using (var queue = new WorkQueue(TypicalThreadCount))
|
||||||
|
{
|
||||||
|
var input = Enumerable.Range(0, TypicalThreadCount * 2).ToArray();
|
||||||
|
var results = await Enumerable.Range(0, TypicalThreadCount * 2)
|
||||||
|
.PMap(queue, async (item) =>
|
||||||
|
{
|
||||||
|
Assert.IsTrue(WorkQueue.WorkerThread);
|
||||||
|
await Task.Delay(TypicalDelayMS);
|
||||||
|
return item.ToString();
|
||||||
|
});
|
||||||
|
Assert.IsTrue(input.Select(i => i.ToString()).SequenceEqual(results));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
[TestMethod]
|
||||||
|
public async Task NestedAction()
|
||||||
|
{
|
||||||
|
using (var queue = new WorkQueue(TypicalThreadCount))
|
||||||
|
{
|
||||||
|
var input = Enumerable.Range(0, TypicalThreadCount * 2).ToArray();
|
||||||
|
var inputConstructedResults = input.SelectMany(i => Enumerable.Range(i * 100, TypicalThreadCount * 2));
|
||||||
|
var output = new List<int>();
|
||||||
|
await Enumerable.Range(0, TypicalThreadCount * 2)
|
||||||
|
.PMap(queue, async (item) =>
|
||||||
|
{
|
||||||
|
Assert.IsTrue(WorkQueue.WorkerThread);
|
||||||
|
await Enumerable.Range(item * 100, TypicalThreadCount * 2)
|
||||||
|
.PMap(queue, async (subItem) =>
|
||||||
|
{
|
||||||
|
Assert.IsTrue(WorkQueue.WorkerThread);
|
||||||
|
Thread.Sleep(TypicalDelayMS);
|
||||||
|
lock (output)
|
||||||
|
{
|
||||||
|
output.Add(subItem);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
});
|
||||||
|
Assert.IsTrue(inputConstructedResults.SequenceEqual(output.OrderBy(i => i)));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
[TestMethod]
|
||||||
|
public async Task Nested_Func()
|
||||||
|
{
|
||||||
|
using (var queue = new WorkQueue(TypicalThreadCount))
|
||||||
|
{
|
||||||
|
var input = Enumerable.Range(0, TypicalThreadCount * 2).ToArray();
|
||||||
|
var inputConstructedResults = input.SelectMany(i => Enumerable.Range(i * 100, TypicalThreadCount * 2));
|
||||||
|
var results = await Enumerable.Range(0, TypicalThreadCount * 2)
|
||||||
|
.PMap(queue, async (item) =>
|
||||||
|
{
|
||||||
|
Assert.IsTrue(WorkQueue.WorkerThread);
|
||||||
|
return await Enumerable.Range(item * 100, TypicalThreadCount * 2)
|
||||||
|
.PMap(queue, (subItem) =>
|
||||||
|
{
|
||||||
|
Assert.IsTrue(WorkQueue.WorkerThread);
|
||||||
|
Thread.Sleep(TypicalDelayMS);
|
||||||
|
return subItem;
|
||||||
|
});
|
||||||
|
});
|
||||||
|
Assert.IsTrue(inputConstructedResults.SequenceEqual(results.SelectMany(i => i)));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
[TestMethod]
|
||||||
|
public async Task Nested_Task()
|
||||||
|
{
|
||||||
|
using (var queue = new WorkQueue(TypicalThreadCount))
|
||||||
|
{
|
||||||
|
var input = Enumerable.Range(0, TypicalThreadCount * 2).ToArray();
|
||||||
|
var inputConstructedResults = input.SelectMany(i => Enumerable.Range(i * 100, TypicalThreadCount * 2));
|
||||||
|
var output = new List<int>();
|
||||||
|
await Enumerable.Range(0, TypicalThreadCount * 2)
|
||||||
|
.PMap(queue, async (item) =>
|
||||||
|
{
|
||||||
|
bool isWorker = WorkQueue.WorkerThread;
|
||||||
|
if (!isWorker)
|
||||||
|
{
|
||||||
|
int wer = 23;
|
||||||
|
wer++;
|
||||||
|
}
|
||||||
|
Assert.IsTrue(isWorker);
|
||||||
|
await Enumerable.Range(item * 100, TypicalThreadCount * 2)
|
||||||
|
.PMap(queue, async (subItem) =>
|
||||||
|
{
|
||||||
|
bool isWorker2 = WorkQueue.WorkerThread;
|
||||||
|
if (!isWorker2)
|
||||||
|
{
|
||||||
|
int wer = 23;
|
||||||
|
wer++;
|
||||||
|
}
|
||||||
|
Assert.IsTrue(isWorker2);
|
||||||
|
await Task.Delay(TypicalDelayMS);
|
||||||
|
lock (output)
|
||||||
|
{
|
||||||
|
output.Add(subItem);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
});
|
||||||
|
Assert.IsTrue(inputConstructedResults.SequenceEqual(output.OrderBy(i => i)));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
[TestMethod]
|
||||||
|
public async Task Nested_TaskReturn()
|
||||||
|
{
|
||||||
|
using (var queue = new WorkQueue(TypicalThreadCount))
|
||||||
|
{
|
||||||
|
var input = Enumerable.Range(0, TypicalThreadCount * 2).ToArray();
|
||||||
|
var inputConstructedResults = input.SelectMany(i => Enumerable.Range(i * 100, TypicalThreadCount * 2));
|
||||||
|
var results = await Enumerable.Range(0, TypicalThreadCount * 2)
|
||||||
|
.PMap(queue, async (item) =>
|
||||||
|
{
|
||||||
|
Assert.IsTrue(WorkQueue.WorkerThread);
|
||||||
|
return await Enumerable.Range(item * 100, TypicalThreadCount * 2)
|
||||||
|
.PMap(queue, async (subItem) =>
|
||||||
|
{
|
||||||
|
Assert.IsTrue(WorkQueue.WorkerThread);
|
||||||
|
await Task.Delay(TypicalDelayMS);
|
||||||
|
return subItem;
|
||||||
|
});
|
||||||
|
});
|
||||||
|
Assert.IsTrue(inputConstructedResults.SequenceEqual(results.SelectMany(i => i)));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
[TestMethod]
|
||||||
|
public async Task Nested_BackgroundThreadsInvolved()
|
||||||
|
{
|
||||||
|
using (var queue = new WorkQueue(TypicalThreadCount))
|
||||||
|
{
|
||||||
|
var input = Enumerable.Range(0, TypicalThreadCount * 2).ToArray();
|
||||||
|
var inputConstructedResults = input.SelectMany(i => Enumerable.Range(i * 100, TypicalThreadCount * 2));
|
||||||
|
var results = await Enumerable.Range(0, TypicalThreadCount * 2)
|
||||||
|
.PMap(queue, async (item) =>
|
||||||
|
{
|
||||||
|
Assert.IsTrue(WorkQueue.WorkerThread);
|
||||||
|
return await Enumerable.Range(item * 100, TypicalThreadCount * 2)
|
||||||
|
.PMap(queue, async (subItem) =>
|
||||||
|
{
|
||||||
|
return await Task.Run(async () =>
|
||||||
|
{
|
||||||
|
Assert.IsTrue(WorkQueue.WorkerThread);
|
||||||
|
await Task.Delay(TypicalDelayMS);
|
||||||
|
return subItem;
|
||||||
|
});
|
||||||
|
});
|
||||||
|
});
|
||||||
|
Assert.IsTrue(inputConstructedResults.SequenceEqual(results.SelectMany(i => i)));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
@ -110,6 +110,7 @@
|
|||||||
<Compile Include="MiscTests.cs" />
|
<Compile Include="MiscTests.cs" />
|
||||||
<Compile Include="MO2Tests.cs" />
|
<Compile Include="MO2Tests.cs" />
|
||||||
<Compile Include="ModlistMetadataTests.cs" />
|
<Compile Include="ModlistMetadataTests.cs" />
|
||||||
|
<Compile Include="PMapTests.cs" />
|
||||||
<Compile Include="RestartingDownloadsTests.cs" />
|
<Compile Include="RestartingDownloadsTests.cs" />
|
||||||
<Compile Include="SimpleHTTPServer.cs" />
|
<Compile Include="SimpleHTTPServer.cs" />
|
||||||
<Compile Include="TestUtils.cs" />
|
<Compile Include="TestUtils.cs" />
|
||||||
|
Loading…
Reference in New Issue
Block a user