using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Reactive.Subjects;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using ReactiveUI;
using Wabbajack.Common;
using Wabbajack.VirtualFileSystem;

namespace Wabbajack.Lib
{
    // NOTE(review): the reviewed text of this file shows no generic type arguments
    // (Subject, IObservable, Task, TaskCompletionSource). The declarations below are
    // reproduced exactly as seen - confirm against the checked-in file before merging.

    /// <summary>
    /// Base class for batch processors. It owns a <see cref="WorkQueue"/>, a
    /// <see cref="StatusUpdateTracker"/> and a VFS <see cref="Context"/>, runs the
    /// derived class's <see cref="_Begin"/> on a dedicated thread, and publishes
    /// progress and status through observables.
    /// </summary>
    public abstract class ABatchProcessor : IBatchProcessor
    {
        /// <summary>
        /// Work queue created by <see cref="ConfigureProcessor"/>; null until then.
        /// </summary>
        public WorkQueue Queue { get; private set; }

        /// <summary>
        /// VFS context built on top of <see cref="Queue"/> by
        /// <see cref="ConfigureProcessor"/>; null until then.
        /// </summary>
        public Context VFS { get; private set; }

        /// <summary>
        /// Tracker whose progress and step name feed <see cref="PercentCompleted"/>
        /// and <see cref="TextStatus"/>; null until <see cref="ConfigureProcessor"/> runs.
        /// </summary>
        protected StatusUpdateTracker UpdateTracker { get; private set; }

        private bool _configured = false;

        private Subject _percentCompleted { get; } = new Subject();

        /// <summary>
        /// The current progress of the entire processing system on a scale of 0.0 to 1.0
        /// </summary>
        public IObservable PercentCompleted { get; }

        private Subject _textStatus { get; } = new Subject();

        /// <summary>
        /// The current status of the processor as a text string
        /// </summary>
        public IObservable TextStatus { get; }

        private Subject _QueueStatus { get; } = new Subject();

        /// <summary>
        /// Relays the status stream of <see cref="Queue"/> once the processor is configured.
        /// </summary>
        public IObservable QueueStatus { get; }

        private Subject _IsRunning { get; } = new Subject();

        /// <summary>
        /// Emits true when <see cref="Begin"/> starts the processor thread, and false
        /// when that thread finishes or <see cref="Terminate"/> is called.
        /// </summary>
        public IObservable IsRunning { get; }

        private Thread _processorThread { get; set; }

        /// <summary>
        /// Exposes every backing subject through its public observable property.
        /// </summary>
        protected ABatchProcessor()
        {
            // All four public observables must be wired here: they are get-only
            // auto-properties, so anything left unassigned stays null for the
            // lifetime of the instance.
            PercentCompleted = _percentCompleted;
            TextStatus = _textStatus;
            QueueStatus = _QueueStatus;
            IsRunning = _IsRunning;
        }

        /// <summary>
        /// Shuts down the work queue, if one has been created.
        /// </summary>
        public void Dispose()
        {
            Queue?.Shutdown();
        }

        /// <summary>
        /// Creates the work queue, status tracker and VFS context, and connects their
        /// output streams to this processor's observables. May only be called once.
        /// </summary>
        /// <param name="steps">Step count passed to the <see cref="StatusUpdateTracker"/>.</param>
        /// <param name="threads">
        /// Thread count passed to the <see cref="WorkQueue"/> constructor; the meaning
        /// of the default (0) is defined by <see cref="WorkQueue"/>.
        /// </param>
        /// <exception cref="InvalidDataException">The processor was already configured.</exception>
        protected void ConfigureProcessor(int steps, int threads = 0)
        {
            if (_configured)
            {
                throw new InvalidDataException("Can't configure a processor twice");
            }

            Queue = new WorkQueue(threads);
            UpdateTracker = new StatusUpdateTracker(steps);

            // Forward the component streams into the subjects behind the public observables.
            Queue.Status.Subscribe(_QueueStatus);
            UpdateTracker.Progress.Subscribe(_percentCompleted);
            UpdateTracker.StepName.Subscribe(_textStatus);

            VFS = new Context(Queue) { UpdateTracker = UpdateTracker };
            _configured = true;
        }

        /// <summary>
        /// The actual work of the processor, implemented by derived classes. Runs on
        /// the dedicated processor thread started by <see cref="Begin"/>.
        /// </summary>
        /// <returns>The value used to complete the task returned by <see cref="Begin"/>.</returns>
        protected abstract bool _Begin();

        /// <summary>
        /// Starts <see cref="_Begin"/> on a new below-normal-priority thread.
        /// </summary>
        /// <returns>
        /// A task that completes with the result of <see cref="_Begin"/>, or faults
        /// with the exception it threw.
        /// </returns>
        /// <exception cref="InvalidDataException">The processor was already started.</exception>
        public Task Begin()
        {
            // Reject a second start before publishing any state, so observers never
            // see a spurious "running" signal for a call that is about to throw.
            if (_processorThread != null)
            {
                throw new InvalidDataException("Can't start the processor twice");
            }

            // Run continuations asynchronously so code awaiting the returned task is
            // not executed inline on the low-priority processor thread.
            var completion = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);

            _processorThread = new Thread(() =>
            {
                try
                {
                    completion.SetResult(_Begin());
                }
                catch (Exception ex)
                {
                    completion.SetException(ex);
                }
                finally
                {
                    _IsRunning.OnNext(false);
                }
            });
            _processorThread.Priority = ThreadPriority.BelowNormal;

            _IsRunning.OnNext(true);
            _processorThread.Start();
            return completion.Task;
        }

        /// <summary>
        /// Shuts down the work queue, aborts the processor thread if one was started,
        /// and publishes false on <see cref="IsRunning"/>.
        /// </summary>
        public void Terminate()
        {
            Queue?.Shutdown();
            // NOTE(review): Thread.Abort throws PlatformNotSupportedException on
            // .NET Core / .NET 5+ - confirm the target framework before relying on it.
            _processorThread?.Abort();
            _IsRunning.OnNext(false);
        }
    }
}