From 9051d2882202f6ba77161f3fdf0149ac55aad3b2 Mon Sep 17 00:00:00 2001 From: Justin Swanson Date: Mon, 4 Nov 2019 22:14:31 -0600 Subject: [PATCH] WorkQueue and Utils Rx instead of callbacks Swapped the callback registration systems for Rx subjects exposing observables --- VirtualFileSystem.Test/Program.cs | 7 ++-- .../VirtualFileSystem.Test.csproj | 5 +++ Wabbajack.Common/Utils.cs | 23 ++++------ Wabbajack.Common/Wabbajack.Common.csproj | 3 ++ Wabbajack.Common/WorkQueue.cs | 42 +++++++++++++------ Wabbajack.Test/ACompilerTest.cs | 4 +- .../ContentRightsManagementTests.cs | 1 - Wabbajack.Test/EndToEndTests.cs | 4 +- Wabbajack/App.xaml.cs | 1 - Wabbajack/Util/CPUStatus.cs | 15 ------- Wabbajack/View Models/MainWindowVM.cs | 42 ++++++++----------- Wabbajack/Wabbajack.csproj | 1 - 12 files changed, 67 insertions(+), 81 deletions(-) delete mode 100644 Wabbajack/Util/CPUStatus.cs diff --git a/VirtualFileSystem.Test/Program.cs b/VirtualFileSystem.Test/Program.cs index 29335225..f8feb7a0 100644 --- a/VirtualFileSystem.Test/Program.cs +++ b/VirtualFileSystem.Test/Program.cs @@ -1,6 +1,7 @@ using System; using Wabbajack.Common; using Microsoft.Win32; +using System.Reactive; namespace VirtualFileSystem.Test { @@ -10,10 +11,8 @@ namespace VirtualFileSystem.Test { var result = Registry.LocalMachine.OpenSubKey(@"SOFTWARE\7-zip\"); - Utils.SetLoggerFn(s => Console.WriteLine(s)); - Utils.SetStatusFn((s, i) => Console.Write(s + "\r")); - WorkQueue.Init((a, b, c) => { }, - (a, b) => { }); + Utils.LogMessages.Subscribe(s => Console.WriteLine(s)); + Utils.StatusUpdates.Subscribe((i) => Console.Write(i.Message + "\r")); VFS.VirtualFileSystem.VFS.AddRoot(@"D:\tmp\archivetests"); } } diff --git a/VirtualFileSystem.Test/VirtualFileSystem.Test.csproj b/VirtualFileSystem.Test/VirtualFileSystem.Test.csproj index 221d41e6..486ec68d 100644 --- a/VirtualFileSystem.Test/VirtualFileSystem.Test.csproj +++ b/VirtualFileSystem.Test/VirtualFileSystem.Test.csproj @@ -101,5 +101,10 @@ Wabbajack.Common + + + 4.2.0 + + \ No newline at end of file diff --git a/Wabbajack.Common/Utils.cs b/Wabbajack.Common/Utils.cs index 2c9db0e9..77ff661b 100644 --- a/Wabbajack.Common/Utils.cs +++ b/Wabbajack.Common/Utils.cs @@ -6,6 +6,7 @@ using System.IO; using System.Linq; using System.Net.Configuration; using System.Net.Http; +using System.Reactive.Subjects; using System.Reflection; using System.Security.Cryptography; using System.Text; @@ -43,22 +44,15 @@ namespace Wabbajack.Common File.Delete(LogFile); } - private static Action _loggerFn; - private static Action _statusFn; + private static readonly Subject _loggerSubj = new Subject(); + public static IObservable LogMessages => _loggerSubj; + private static readonly Subject<(string Message, int Progress)> _statusSubj = new Subject<(string Message, int Progress)>(); + public static IObservable<(string Message, int Progress)> StatusUpdates => _statusSubj; private static readonly string[] Suffix = {"B", "KB", "MB", "GB", "TB", "PB", "EB"}; // Longs run out around EB - public static void SetLoggerFn(Action f) - { - _loggerFn = f; - } - - public static void SetStatusFn(Action f) - { - _statusFn = f; - } - private static object _lock = new object(); + private static DateTime _startTime; public static void Log(string msg) @@ -69,7 +63,7 @@ namespace Wabbajack.Common File.AppendAllText(LogFile, msg + "\r\n"); } - _loggerFn?.Invoke(msg); + _loggerSubj.OnNext(msg); } public static void LogToFile(string msg) @@ -87,10 +81,9 @@ namespace Wabbajack.Common if (WorkQueue.CustomReportFn != null) WorkQueue.CustomReportFn(progress, msg); else - _statusFn?.Invoke(msg, progress); + _statusSubj.OnNext((msg, progress)); } - /// /// MurMur3 hashes the file pointed to by this string /// diff --git a/Wabbajack.Common/Wabbajack.Common.csproj b/Wabbajack.Common/Wabbajack.Common.csproj index 939dc843..c22e7ddc 100644 --- a/Wabbajack.Common/Wabbajack.Common.csproj +++ b/Wabbajack.Common/Wabbajack.Common.csproj @@ -143,6 +143,9 @@ 2.0.0 + + 4.2.0 + 8.0.0 diff --git a/Wabbajack.Common/WorkQueue.cs b/Wabbajack.Common/WorkQueue.cs index 21ddd74c..c0dbb915 100644 --- a/Wabbajack.Common/WorkQueue.cs +++ b/Wabbajack.Common/WorkQueue.cs @@ -2,6 +2,10 @@ using System.Collections.Concurrent; using System.Collections.Generic; using System.Linq; +using System.Reactive; +using System.Reactive.Concurrency; +using System.Reactive.Linq; +using System.Reactive.Subjects; using System.Threading; namespace Wabbajack.Common @@ -19,22 +23,17 @@ namespace Wabbajack.Common public static int MaxQueueSize; public static int CurrentQueueSize; - private static bool _inited; - public static Action ReportFunction { get; private set; } - public static Action ReportQueueSize { get; private set; } - public static int ThreadCount { get; private set; } + private readonly static Subject _Status = new Subject(); + public static IObservable Status => _Status; + private readonly static Subject<(int Current, int Max)> _QueueSize = new Subject<(int Current, int Max)>(); + public static IObservable<(int Current, int Max)> QueueSize => _QueueSize; + public static int ThreadCount { get; } = Environment.ProcessorCount; public static List Threads { get; private set; } - public static void Init(Action report_function, Action report_queue_size) + static WorkQueue() { - ReportFunction = report_function; - ReportQueueSize = report_queue_size; - ThreadCount = Environment.ProcessorCount; - if (_inited) return; StartThreads(); - _inited = true; - } private static void StartThreads() @@ -67,9 +66,19 @@ namespace Wabbajack.Common public static void Report(string msg, int progress) { if (CustomReportFn != null) + { CustomReportFn(progress, msg); + } else - ReportFunction(CpuId, msg, progress); + { + _Status.OnNext( + new CPUStatus + { + Progress = progress, + Msg = msg, + ID = CpuId + }); + } } public static void QueueTask(Action a) @@ -79,7 +88,14 @@ namespace Wabbajack.Common internal static void ReportNow() { - ReportQueueSize(MaxQueueSize, CurrentQueueSize); + _QueueSize.OnNext((MaxQueueSize, CurrentQueueSize)); } } + + public class CPUStatus + { + public int Progress { get; internal set; } + public string Msg { get; internal set; } + public int ID { get; internal set; } + } } \ No newline at end of file diff --git a/Wabbajack.Test/ACompilerTest.cs b/Wabbajack.Test/ACompilerTest.cs index 78dd2fe9..33f42e0f 100644 --- a/Wabbajack.Test/ACompilerTest.cs +++ b/Wabbajack.Test/ACompilerTest.cs @@ -23,9 +23,7 @@ namespace Wabbajack.Test utils = new TestUtils(); utils.GameName = "Skyrim Special Edition"; - Utils.SetStatusFn((f, idx) => { }); - Utils.SetLoggerFn(f => TestContext.WriteLine(f)); - WorkQueue.Init((x, y, z) => { }, (min, max) => { }); + Utils.LogMessages.Subscribe(f => TestContext.WriteLine(f)); } diff --git a/Wabbajack.Test/ContentRightsManagementTests.cs b/Wabbajack.Test/ContentRightsManagementTests.cs index 1eba31f6..ae89aa24 100644 --- a/Wabbajack.Test/ContentRightsManagementTests.cs +++ b/Wabbajack.Test/ContentRightsManagementTests.cs @@ -48,7 +48,6 @@ namespace Wabbajack.Test [TestInitialize] public void TestSetup() { - WorkQueue.Init((x, y, z) => { }, (min, max) => { }); validate = new ValidateModlist(); validate.LoadAuthorPermissionsFromString(permissions); validate.LoadServerWhitelist(server_whitelist); diff --git a/Wabbajack.Test/EndToEndTests.cs b/Wabbajack.Test/EndToEndTests.cs index be63d2e9..26044d40 100644 --- a/Wabbajack.Test/EndToEndTests.cs +++ b/Wabbajack.Test/EndToEndTests.cs @@ -30,9 +30,7 @@ namespace Wabbajack.Test utils = new TestUtils(); utils.GameName = "Skyrim Special Edition"; - Utils.SetStatusFn((f, idx) => { }); - Utils.SetLoggerFn(f => TestContext.WriteLine(f)); - WorkQueue.Init((x, y, z) => { }, (min, max) => { }); + Utils.LogMessages.Subscribe(f => TestContext.WriteLine(f)); if (!Directory.Exists(DOWNLOAD_FOLDER)) Directory.CreateDirectory(DOWNLOAD_FOLDER); diff --git a/Wabbajack/App.xaml.cs b/Wabbajack/App.xaml.cs index 7717a08e..53c1f5dd 100644 --- a/Wabbajack/App.xaml.cs +++ b/Wabbajack/App.xaml.cs @@ -17,7 +17,6 @@ namespace Wabbajack AppDomain.CurrentDomain.UnhandledException += (sender, e) => { // Don't do any special logging side effects - Utils.SetLoggerFn((s) => { }); Utils.Log("Uncaught error:"); Utils.Log(((Exception)e.ExceptionObject).ExceptionToString()); }; diff --git a/Wabbajack/Util/CPUStatus.cs b/Wabbajack/Util/CPUStatus.cs deleted file mode 100644 index 0a218d7a..00000000 --- a/Wabbajack/Util/CPUStatus.cs +++ /dev/null @@ -1,15 +0,0 @@ -using System; -using System.Collections.Generic; -using System.Linq; -using System.Text; -using System.Threading.Tasks; - -namespace Wabbajack -{ - public class CPUStatus - { - public int Progress { get; internal set; } - public string Msg { get; internal set; } - public int ID { get; internal set; } - } -} diff --git a/Wabbajack/View Models/MainWindowVM.cs b/Wabbajack/View Models/MainWindowVM.cs index 64993fd1..613334bd 100644 --- a/Wabbajack/View Models/MainWindowVM.cs +++ b/Wabbajack/View Models/MainWindowVM.cs @@ -29,11 +29,9 @@ namespace Wabbajack private readonly ObservableAsPropertyHelper _ActivePane; public ViewModel ActivePane => _ActivePane.Value; - [Reactive] - public int QueueProgress { get; set; } + private readonly ObservableAsPropertyHelper _QueueProgress; + public int QueueProgress => _QueueProgress.Value; - private readonly Subject _statusSubject = new Subject(); - public IObservable StatusObservable => _statusSubject; public ObservableCollectionExtended StatusList { get; } = new ObservableCollectionExtended(); private Subject _logSubj = new Subject(); @@ -53,7 +51,7 @@ namespace Wabbajack this._Compiler = new Lazy(() => new CompilerVM(this, source)); // Set up logging - _logSubj + Utils.LogMessages .ToObservableChangeSet() .Buffer(TimeSpan.FromMilliseconds(250)) .Where(l => l.Count > 0) @@ -63,8 +61,9 @@ namespace Wabbajack .Bind(this.Log) .Subscribe() .DisposeWith(this.CompositeDisposable); - Utils.SetLoggerFn(s => _logSubj.OnNext(s)); - Utils.SetStatusFn((msg, progress) => WorkQueue.Report(msg, progress)); + Utils.StatusUpdates + .Subscribe((i) => WorkQueue.Report(i.Message, i.Progress)) + .DisposeWith(this.CompositeDisposable); // Wire mode to drive the active pane. // Note: This is currently made into a derivative property driven by mode, @@ -89,13 +88,8 @@ namespace Wabbajack .Subscribe(vm => vm.ModListPath = source) .DisposeWith(this.CompositeDisposable); - // Initialize work queue - WorkQueue.Init( - report_function: (id, msg, progress) => this._statusSubject.OnNext(new CPUStatus() { ID = id, Msg = msg, Progress = progress }), - report_queue_size: (max, current) => this.SetQueueSize(max, current)); - // Compile progress updates and populate ObservableCollection - this._statusSubject + WorkQueue.Status .ObserveOn(RxApp.TaskpoolScheduler) .ToObservableChangeSet(x => x.ID) .Batch(TimeSpan.FromMilliseconds(250)) @@ -105,19 +99,17 @@ namespace Wabbajack .Bind(this.StatusList) .Subscribe() .DisposeWith(this.CompositeDisposable); - } - private void SetQueueSize(int max, int current) - { - if (max == 0) - max = 1; - QueueProgress = current * 100 / max; - } - - public override void Dispose() - { - base.Dispose(); - Utils.SetLoggerFn(s => { }); + this._QueueProgress = WorkQueue.QueueSize + .Select(progress => + { + if (progress.Max == 0) + { + progress.Max = 1; + } + return progress.Current * 100 / progress.Max; + }) + .ToProperty(this, nameof(this.QueueProgress)); } } } diff --git a/Wabbajack/Wabbajack.csproj b/Wabbajack/Wabbajack.csproj index 39c9700d..e5f95f0d 100644 --- a/Wabbajack/Wabbajack.csproj +++ b/Wabbajack/Wabbajack.csproj @@ -182,7 +182,6 @@ InstallationView.xaml -