using System; using System.IO; using System.Reactive.Subjects; using System.Threading; using System.Threading.Tasks; using Wabbajack.Common; using Wabbajack.VirtualFileSystem; namespace Wabbajack.Lib { public abstract class ABatchProcessor : IBatchProcessor { public WorkQueue Queue { get; private set; } private bool _configured = false; public void Dispose() { Queue?.Shutdown(); } public Context VFS { get; private set; } protected StatusUpdateTracker UpdateTracker { get; private set; } private Subject _percentCompleted { get; } = new Subject(); /// /// The current progress of the entire processing system on a scale of 0.0 to 1.0 /// public IObservable PercentCompleted => _percentCompleted; private Subject _textStatus { get; } = new Subject(); /// /// The current status of the processor as a text string /// public IObservable TextStatus => _textStatus; private Subject _queueStatus { get; } = new Subject(); public IObservable QueueStatus => _queueStatus; private Subject _isRunning { get; } = new Subject(); public IObservable IsRunning => _isRunning; private Thread _processorThread { get; set; } protected void ConfigureProcessor(int steps, int threads = 0) { if (_configured) throw new InvalidDataException("Can't configure a processor twice"); Queue = new WorkQueue(threads); UpdateTracker = new StatusUpdateTracker(steps); Queue.Status.Subscribe(_queueStatus); UpdateTracker.Progress.Subscribe(_percentCompleted); UpdateTracker.StepName.Subscribe(_textStatus); VFS = new Context(Queue) { UpdateTracker = UpdateTracker }; _configured = true; } public static int RecommendQueueSize(string folder) { if (!Directory.Exists(folder)) Directory.CreateDirectory(folder); using (var queue = new WorkQueue()) { Utils.Log($"Benchmarking {folder}"); var raw_speed = Utils.TestDiskSpeed(queue, folder); Utils.Log($"{raw_speed.ToFileSizeString()}/sec for {folder}"); int speed = (int)(raw_speed / 1024 / 1024); // Less than 100MB/sec, stick with two threads. return speed < 100 ? 2 : Math.Min(Environment.ProcessorCount, speed / 100 * 2); } } protected abstract bool _Begin(); public Task Begin() { _isRunning.OnNext(true); var _tcs = new TaskCompletionSource(); if (_processorThread != null) { throw new InvalidDataException("Can't start the processor twice"); } _processorThread = new Thread(() => { try { _tcs.SetResult(_Begin()); } catch (Exception ex) { _tcs.SetException(ex); } finally { _isRunning.OnNext(false); } }); _processorThread.Priority = ThreadPriority.BelowNormal; _processorThread.Start(); return _tcs.Task; } public void Terminate() { Queue?.Shutdown(); _processorThread?.Abort(); _isRunning.OnNext(false); } } }