ABatchProcessor: Configured/Started state tracking made threadsafe

This commit is contained in:
Justin Swanson
2019-12-03 14:44:52 -06:00
parent 97ec9c9d0f
commit 095f43b67b

View File

@ -11,7 +11,6 @@ namespace Wabbajack.Lib
public abstract class ABatchProcessor : IBatchProcessor public abstract class ABatchProcessor : IBatchProcessor
{ {
public WorkQueue Queue { get; private set; } public WorkQueue Queue { get; private set; }
private bool _configured = false;
public void Dispose() public void Dispose()
{ {
@ -44,17 +43,21 @@ namespace Wabbajack.Lib
private Thread _processorThread { get; set; } private Thread _processorThread { get; set; }
private int _configured;
private int _started;
protected void ConfigureProcessor(int steps, int threads = 0) protected void ConfigureProcessor(int steps, int threads = 0)
{ {
if (_configured) if (1 == Interlocked.CompareExchange(ref _configured, 1, 1))
{
throw new InvalidDataException("Can't configure a processor twice"); throw new InvalidDataException("Can't configure a processor twice");
}
Queue = new WorkQueue(threads); Queue = new WorkQueue(threads);
UpdateTracker = new StatusUpdateTracker(steps); UpdateTracker = new StatusUpdateTracker(steps);
Queue.Status.Subscribe(_queueStatus); Queue.Status.Subscribe(_queueStatus);
UpdateTracker.Progress.Subscribe(_percentCompleted); UpdateTracker.Progress.Subscribe(_percentCompleted);
UpdateTracker.StepName.Subscribe(_textStatus); UpdateTracker.StepName.Subscribe(_textStatus);
VFS = new Context(Queue) { UpdateTracker = UpdateTracker }; VFS = new Context(Queue) { UpdateTracker = UpdateTracker };
_configured = true;
} }
public static int RecommendQueueSize(string folder) public static int RecommendQueueSize(string folder)
@ -77,13 +80,14 @@ namespace Wabbajack.Lib
protected abstract bool _Begin(); protected abstract bool _Begin();
public Task<bool> Begin() public Task<bool> Begin()
{ {
_isRunning.OnNext(true); if (1 == Interlocked.CompareExchange(ref _started, 1, 1))
var _tcs = new TaskCompletionSource<bool>();
if (_processorThread != null)
{ {
throw new InvalidDataException("Can't start the processor twice"); throw new InvalidDataException("Can't start the processor twice");
} }
_isRunning.OnNext(true);
var _tcs = new TaskCompletionSource<bool>();
_processorThread = new Thread(() => _processorThread = new Thread(() =>
{ {
try try