using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading;
using System.Threading.Tasks;
using Wabbajack.Common.StatusFeed;

namespace Wabbajack.Common
{
    /// <summary>
    /// A fixed-size pool of dedicated background threads that execute queued
    /// asynchronous work items and publish per-worker status updates.
    /// </summary>
    public class WorkQueue : IDisposable
    {
        // Backed by a ConcurrentStack, so the most recently queued item is taken first (LIFO).
        internal BlockingCollection<Func<Task>> Queue =
            new BlockingCollection<Func<Task>>(new ConcurrentStack<Func<Task>>());

        /// <summary>
        /// Value of <see cref="CpuId"/> when the caller is not running on a worker.
        /// Workers are numbered starting at 1, so 0 (the AsyncLocal default) never names a worker.
        /// </summary>
        public const int UnassignedCpuId = 0;

        private static readonly AsyncLocal<int> _cpuId = new AsyncLocal<int>();

        /// <summary>
        /// Worker id of the current async flow. The backing store is static, so the
        /// value depends on the caller's context, not on this instance.
        /// </summary>
        public int CpuId => _cpuId.Value;

        /// <summary>True when the calling thread is a worker thread of some <see cref="WorkQueue"/>.</summary>
        internal static bool WorkerThread => ThreadLocalCurrentQueue.Value != null;

        // Both a thread-local and an async-local reference are kept: the thread-local one
        // identifies the physical worker thread, the async-local one flows across awaits.
        internal static readonly ThreadLocal<WorkQueue> ThreadLocalCurrentQueue = new ThreadLocal<WorkQueue>();
        internal static readonly AsyncLocal<WorkQueue> AsyncLocalCurrentQueue = new AsyncLocal<WorkQueue>();

        private readonly Subject<CPUStatus> _Status = new Subject<CPUStatus>();

        /// <summary>Stream of status updates pushed through <see cref="Report"/>.</summary>
        public IObservable<CPUStatus> Status => _Status;

        /// <summary>The worker threads started by this queue.</summary>
        public List<Thread> Threads { get; private set; }

        private readonly CancellationTokenSource _cancel = new CancellationTokenSource();

        // This is currently a lie, as it wires to the Utils singleton stream.
        // It is still good to have: logic related to a single WorkQueue can subscribe to this
        // member now, so that if/when log messages are implemented in a non-singleton fashion,
        // subscribers will already be wired up properly.
        // NOTE(review): element type assumed to be IStatusMessage from Wabbajack.Common.StatusFeed -
        // confirm against the declaration of Utils.LogMessages.
        public IObservable<IStatusMessage> LogMessages => Utils.LogMessages;

        /// <summary>
        /// Creates the queue and immediately starts its worker threads.
        /// </summary>
        /// <param name="threadCount">
        /// Number of workers to start; 0 (the default) means one per logical processor.
        /// </param>
        /// <exception cref="ArgumentOutOfRangeException"><paramref name="threadCount"/> is negative.</exception>
        public WorkQueue(int threadCount = 0)
        {
            // Fail before any state is touched instead of deep inside Enumerable.Range.
            if (threadCount < 0)
            {
                throw new ArgumentOutOfRangeException(nameof(threadCount), threadCount,
                    "Thread count must not be negative.");
            }

            StartThreads(threadCount == 0 ? Environment.ProcessorCount : threadCount);
        }

        /// <summary>Spawns <paramref name="threadCount"/> background workers, numbered from 1.</summary>
        private void StartThreads(int threadCount)
        {
            ThreadCount = threadCount;
            Threads = Enumerable.Range(1, threadCount)
                .Select(idx =>
                {
                    // Each dedicated thread blocks on the async worker loop for its whole lifetime.
                    var thread = new Thread(() => ThreadBody(idx).Wait())
                    {
                        Priority = ThreadPriority.BelowNormal,
                        IsBackground = true,
                        Name = $"Wabbajack_Worker_{idx}"
                    };
                    thread.Start();
                    return thread;
                }).ToList();
        }

        /// <summary>Number of worker threads this queue was started with.</summary>
        public int ThreadCount { get; private set; }

        /// <summary>
        /// Worker loop: reports "Waiting", takes the next work item and awaits it,
        /// until cancellation is requested or the queue can no longer be read.
        /// </summary>
        /// <param name="idx">1-based worker id, published as <see cref="CPUStatus.ID"/>.</param>
        private async Task ThreadBody(int idx)
        {
            _cpuId.Value = idx;
            ThreadLocalCurrentQueue.Value = this;
            AsyncLocalCurrentQueue.Value = this;

            try
            {
                while (true)
                {
                    Report("Waiting", 0, false);
                    if (_cancel.IsCancellationRequested)
                    {
                        return;
                    }

                    Func<Task> f;
                    try
                    {
                        f = Queue.Take(_cancel.Token);
                    }
                    catch (Exception)
                    {
                        // Any failure to take an item (cancellation, or the queue having been
                        // disposed underneath us) means the queue is shutting down: end this
                        // worker quietly.
                        return;
                    }

                    await f();
                }
            }
            catch (OperationCanceledException)
            {
                // Cancellation surfacing from Report or from a work item also ends the worker.
            }
        }

        /// <summary>
        /// Publishes a status update on <see cref="Status"/>, tagged with the current worker id.
        /// </summary>
        /// <param name="msg">Human-readable status text.</param>
        /// <param name="progress">Progress value; <see cref="CPUStatus.ProgressPercent"/> is this divided by 100.</param>
        /// <param name="isWorking">False when the worker is idle (e.g. waiting for work).</param>
        public void Report(string msg, int progress, bool isWorking = true)
        {
            _Status.OnNext(
                new CPUStatus
                {
                    Progress = progress,
                    ProgressPercent = progress / 100f,
                    Msg = msg,
                    ID = _cpuId.Value,
                    IsWorking = isWorking
                });
        }

        /// <summary>Adds a work item to the queue for execution by one of the workers.</summary>
        /// <param name="a">Factory producing the task to await; must not be null.</param>
        /// <exception cref="ArgumentNullException"><paramref name="a"/> is null.</exception>
        public void QueueTask(Func<Task> a)
        {
            // A null item would otherwise be accepted here and only blow up later,
            // on a worker thread, when it is invoked.
            if (a == null)
            {
                throw new ArgumentNullException(nameof(a));
            }

            Queue.Add(a);
        }

        /// <summary>Signals all workers to stop and releases the underlying queue.</summary>
        public void Dispose()
        {
            // _cancel itself is intentionally not disposed: workers may still read its Token
            // while they wind down.
            _cancel.Cancel();
            Queue?.Dispose();
        }
    }

    /// <summary>Snapshot of a single worker's state as published by <see cref="WorkQueue.Report"/>.</summary>
    public class CPUStatus
    {
        /// <summary>Raw progress value passed to <see cref="WorkQueue.Report"/>.</summary>
        public int Progress { get; internal set; }

        /// <summary><see cref="Progress"/> divided by 100.</summary>
        public float ProgressPercent { get; internal set; }

        /// <summary>Status text.</summary>
        public string Msg { get; internal set; }

        /// <summary>Id of the worker that reported this status.</summary>
        public int ID { get; internal set; }

        /// <summary>Whether the worker was doing work (as opposed to waiting) when it reported.</summary>
        public bool IsWorking { get; internal set; }
    }
}