using System;
using System.IO;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading;
using System.Threading.Tasks;
using Wabbajack.Common;
using Wabbajack.Common.StatusFeed;
using Wabbajack.VirtualFileSystem;

namespace Wabbajack.Lib
{
    // NOTE(review): in this copy of the file the generic type arguments on Subject,
    // BehaviorSubject, IObservable and Task appear to be missing (as do the XML doc tags).
    // The declared types below are reproduced exactly as found; confirm them against
    // version control before building.

    /// <summary>
    /// Base class for long-running batch jobs. Owns the <see cref="WorkQueue"/>, the VFS
    /// <see cref="Context"/> and the <see cref="StatusUpdateTracker"/>, and re-publishes their
    /// status streams through the observables exposed here.
    /// </summary>
    public abstract class ABatchProcessor : IBatchProcessor
    {
        /// <summary>Work queue used by this processor; disposed together with it.</summary>
        public WorkQueue Queue { get; } = new WorkQueue();

        /// <summary>Virtual file system context; created by <see cref="ConfigureProcessor"/>.</summary>
        public Context VFS { get; private set; }

        /// <summary>Step/progress tracker; created by <see cref="ConfigureProcessor"/>.</summary>
        protected StatusUpdateTracker UpdateTracker { get; private set; }

        private Subject _percentCompleted { get; } = new Subject();

        /// <summary>
        /// The current progress of the entire processing system on a scale of 0.0 to 1.0
        /// </summary>
        public IObservable PercentCompleted => _percentCompleted;

        private Subject _textStatus { get; } = new Subject();

        /// <summary>
        /// The current status of the processor as a text string
        /// </summary>
        public IObservable TextStatus => _textStatus;

        private Subject _queueStatus { get; } = new Subject();

        /// <summary>Status updates forwarded from <see cref="Queue"/>.</summary>
        public IObservable QueueStatus => _queueStatus;

        private Subject _logMessages { get; } = new Subject();

        /// <summary>Log messages forwarded from <see cref="Queue"/>.</summary>
        public IObservable LogMessages => _logMessages;

        private Subject _isRunning { get; } = new Subject();

        /// <summary>
        /// Emits <c>true</c> when <see cref="Begin"/> starts the job and <c>false</c> when the job
        /// finishes (successfully or not) or the processor is disposed.
        /// </summary>
        public IObservable IsRunning => _isRunning;

        // 0 = not yet, 1 = done. Only ever flipped through Interlocked so the guards are race-free.
        private int _configured;
        private int _started;

        private readonly CancellationTokenSource _cancel = new CancellationTokenSource();
        private readonly CompositeDisposable _subs = new CompositeDisposable();

        // WorkQueue settings
        public BehaviorSubject ManualCoreLimit = new BehaviorSubject(true);
        public BehaviorSubject MaxCores = new BehaviorSubject(byte.MaxValue);
        public BehaviorSubject TargetUsagePercent = new BehaviorSubject(1.0d);

        /// <summary>
        /// One-time setup: wires the queue and the update tracker to this processor's public
        /// observables and creates the VFS context.
        /// </summary>
        /// <param name="steps">Number of steps handed to the <see cref="StatusUpdateTracker"/>.</param>
        /// <param name="numThreads">
        /// Observable passed to <c>Queue.SetActiveThreadsObservable</c>; may be <c>null</c>.
        /// </param>
        /// <exception cref="InvalidDataException">The processor has already been configured.</exception>
        protected void ConfigureProcessor(int steps, IObservable numThreads = null)
        {
            // Atomically flip 0 -> 1. CompareExchange returns the previous value, so a result of 1
            // means another call got here first.
            if (Interlocked.CompareExchange(ref _configured, 1, 0) == 1)
            {
                throw new InvalidDataException("Can't configure a processor twice");
            }

            Queue.SetActiveThreadsObservable(numThreads);
            UpdateTracker = new StatusUpdateTracker(steps);

            // Every forwarding subscription is tracked in _subs so Dispose() can tear it down.
            Queue.Status.Subscribe(_queueStatus)
                .DisposeWith(_subs);
            Queue.LogMessages.Subscribe(_logMessages)
                .DisposeWith(_subs);
            UpdateTracker.Progress.Subscribe(_percentCompleted)
                .DisposeWith(_subs);
            UpdateTracker.StepName.Subscribe(_textStatus)
                .DisposeWith(_subs);

            VFS = new Context(Queue) { UpdateTracker = UpdateTracker };
        }

        /// <summary>
        /// Gets the recommended maximum number of threads that should be used for the current machine,
        /// taking the smaller of a RAM-based limit and the disk-based limit measured in the current
        /// working directory.
        /// </summary>
        /// <returns>Recommended maximum number of threads to use</returns>
        public async Task RecommendQueueSize()
        {
            const ulong GB = (1024 * 1024 * 1024);
            // Assume roughly 2GB of RAM is needed to extract each 7zip archive, after leaving 2GB for the OS.
            const ulong reservedForOs = 2 * GB;
            const ulong perArchive = 2 * GB;

            var memory = Utils.GetMemoryStatus();
            var totalPhys = memory.ullTotalPhys;

            // Unsigned subtraction would wrap around on machines with less RAM than the OS reserve,
            // so clamp to zero first. The result is never lower than one thread.
            var usable = totalPhys > reservedForOs ? totalPhys - reservedForOs : 0UL;
            var basedOnMemory = Math.Max(usable / perArchive, 1UL);

            // Most of the heavy lifting is done on the scratch disk, so we'll use the value from that disk
            var scratchSize = await RecommendQueueSize(Directory.GetCurrentDirectory());

            var result = Math.Min((int)basedOnMemory, (int)scratchSize);
            Utils.Log($"Recommending a queue size of {result} based on disk performance, number of cores, and {((long)memory.ullTotalPhys).ToFileSizeString()} of system RAM");
            return result;
        }

        /// <summary>
        /// Gets the recommended maximum number of threads for work done in the given folder, based on
        /// the disk speed reported by <c>Utils.TestDiskSpeed</c> and the number of processors.
        ///
        /// If the folder does not exist, it will be created, and not cleaned up afterwards.
        /// </summary>
        /// <param name="folder">Folder in which the disk benchmark is run.</param>
        /// <returns>
        /// 2 when the measured speed is below 100 (MB/sec, presumably); otherwise two threads per
        /// full 100 units of speed, capped at <see cref="Environment.ProcessorCount"/>.
        /// </returns>
        public static async Task RecommendQueueSize(string folder)
        {
            if (!Directory.Exists(folder))
            {
                Directory.CreateDirectory(folder);
            }

            using (var queue = new WorkQueue())
            {
                Utils.Log($"Benchmarking {folder}");
                var rawSpeed = await Utils.TestDiskSpeed(queue, folder);
                Utils.Log($"{rawSpeed.ToFileSizeString()}/sec for {folder}");

                // NOTE(review): assumes TestDiskSpeed reports bytes/sec, making this MB/sec - confirm.
                int speed = (int)(rawSpeed / 1024 / 1024);

                // Less than 100MB/sec, stick with two threads.
                if (speed < 100)
                {
                    return 2;
                }

                return Math.Min(Environment.ProcessorCount, speed / 100 * 2);
            }
        }

        /// <summary>
        /// Constructs an observable of the number of threads to be used.
        ///
        /// Takes in a recommended amount (based off measuring the machine capabilities), and combines
        /// that with the user preferences held in <see cref="ManualCoreLimit"/>, <see cref="MaxCores"/>
        /// and <see cref="TargetUsagePercent"/>. Whenever one of those changes, the thread count is
        /// recalculated through <see cref="CalculateThreadsToUse"/>.
        /// </summary>
        /// <param name="recommendedCount">Maximum recommended number of threads</param>
        /// <returns>Observable of number of threads to use based off recommendations and user preferences</returns>
        public IObservable ConstructDynamicNumThreads(int recommendedCount)
        {
            return Observable.CombineLatest(
                ManualCoreLimit,
                MaxCores,
                TargetUsagePercent,
                (manual, max, target) => CalculateThreadsToUse(recommendedCount, manual, max, target));
        }

        /// <summary>
        /// Calculates the number of threads to use, based off recommended values and user preferences.
        /// </summary>
        /// <param name="recommendedCount">Thread count recommended for this machine.</param>
        /// <param name="manual">When true, <paramref name="manualMax"/> is used as a hard cap and
        /// <paramref name="targetUsage"/> is ignored.</param>
        /// <param name="manualMax">User-chosen upper bound, used only when <paramref name="manual"/> is true.</param>
        /// <param name="targetUsage">Fraction of the recommendation to use; only values in [0, 1) have an effect.</param>
        /// <returns>
        /// Manual mode: the recommendation capped at <paramref name="manualMax"/>, but at least 1.
        /// Otherwise, for a target in [0, 1): the recommendation scaled by the target, rounded up, at
        /// least 1. In every other case the recommendation is returned unchanged.
        /// </returns>
        public static int CalculateThreadsToUse(
            int recommendedCount,
            bool manual,
            byte manualMax,
            double targetUsage)
        {
            if (manual)
            {
                if (recommendedCount > manualMax)
                {
                    Utils.Log($"Only using {manualMax} due to user preferences.");
                }

                return Math.Max(1, Math.Min(manualMax, recommendedCount));
            }

            if (targetUsage < 1.0d && targetUsage >= 0d)
            {
                var scaled = (int)Math.Ceiling(recommendedCount * targetUsage);
                return Math.Max(1, scaled);
            }

            return recommendedCount;
        }

        /// <summary>
        /// The actual job, implemented by subclasses. Runs on a thread-pool thread started by
        /// <see cref="Begin"/>.
        /// </summary>
        /// <param name="cancel">Token that is cancelled when the processor is disposed.</param>
        protected abstract Task _Begin(CancellationToken cancel);

        /// <summary>
        /// Starts the job on the thread pool. <see cref="IsRunning"/> emits <c>true</c> before the
        /// job body runs and <c>false</c> once it has completed or thrown.
        /// </summary>
        /// <returns>A task that completes with the result of <see cref="_Begin"/>.</returns>
        /// <exception cref="InvalidDataException">The processor has already been started.</exception>
        public Task Begin()
        {
            // Same 0 -> 1 handshake as in ConfigureProcessor: only the first caller may proceed.
            if (Interlocked.CompareExchange(ref _started, 1, 0) == 1)
            {
                throw new InvalidDataException("Can't start the processor twice");
            }

            return Task.Run(async () =>
            {
                try
                {
                    _isRunning.OnNext(true);
                    return await _Begin(_cancel.Token);
                }
                finally
                {
                    _isRunning.OnNext(false);
                }
            });
        }

        /// <summary>
        /// Cancels the running job, disposes the work queue, reports "not running" and releases every
        /// subscription/disposable tracked by this processor (including those registered via
        /// <see cref="Add"/>).
        /// </summary>
        public void Dispose()
        {
            _cancel.Cancel();
            Queue?.Dispose();
            _isRunning.OnNext(false);
            _subs.Dispose();
        }

        /// <summary>
        /// Registers a disposable whose lifetime should be tied to this processor; it is disposed
        /// when the processor is disposed.
        /// </summary>
        public void Add(IDisposable disposable) => _subs.Add(disposable);
    }
}