using System; using System.Collections.Concurrent; using System.Collections.Generic; using System.Linq; using System.Reactive.Linq; using System.Reactive.Subjects; using System.Threading; namespace Wabbajack.Common { public class WorkQueue : IDisposable { internal BlockingCollection Queue = new BlockingCollection(new ConcurrentStack()); [ThreadStatic] private static int CpuId; internal static bool WorkerThread => CurrentQueue != null; [ThreadStatic] internal static WorkQueue CurrentQueue; private static readonly Subject _Status = new Subject(); public IObservable Status => _Status; public static List Threads { get; private set; } public WorkQueue(int threadCount = 0) { StartThreads(threadCount == 0 ? Environment.ProcessorCount : threadCount); } private void StartThreads(int threadCount) { ThreadCount = threadCount; Threads = Enumerable.Range(0, threadCount) .Select(idx => { var thread = new Thread(() => ThreadBody(idx)); thread.Priority = ThreadPriority.BelowNormal; thread.IsBackground = true; thread.Name = string.Format("Wabbajack_Worker_{0}", idx); thread.Start(); return thread; }).ToList(); } public int ThreadCount { get; private set; } private void ThreadBody(int idx) { CpuId = idx; CurrentQueue = this; while (true) { Report("Waiting", 0); var f = Queue.Take(); f(); } } public void Report(string msg, int progress) { _Status.OnNext( new CPUStatus { Progress = progress, Msg = msg, ID = CpuId }); } public void QueueTask(Action a) { Queue.Add(a); } public void Shutdown() { Threads.Do(th => th.Abort()); } public void Dispose() { Shutdown(); Queue?.Dispose(); } } public class CPUStatus { public int Progress { get; internal set; } public string Msg { get; internal set; } public int ID { get; internal set; } } }