Merge pull request #149 from Noggog/WorkQueue-Utils-Rx

WorkQueue and Utils Rx instead of callbacks
This commit is contained in:
Timothy Baldridge 2019-11-04 22:30:05 -07:00 committed by GitHub
commit 6a93e4eca7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 67 additions and 81 deletions

View File

@ -1,6 +1,7 @@
using System; using System;
using Wabbajack.Common; using Wabbajack.Common;
using Microsoft.Win32; using Microsoft.Win32;
using System.Reactive;
namespace VirtualFileSystem.Test namespace VirtualFileSystem.Test
{ {
@ -10,10 +11,8 @@ namespace VirtualFileSystem.Test
{ {
var result = Registry.LocalMachine.OpenSubKey(@"SOFTWARE\7-zip\"); var result = Registry.LocalMachine.OpenSubKey(@"SOFTWARE\7-zip\");
Utils.SetLoggerFn(s => Console.WriteLine(s)); Utils.LogMessages.Subscribe(s => Console.WriteLine(s));
Utils.SetStatusFn((s, i) => Console.Write(s + "\r")); Utils.StatusUpdates.Subscribe((i) => Console.Write(i.Message + "\r"));
WorkQueue.Init((a, b, c) => { },
(a, b) => { });
VFS.VirtualFileSystem.VFS.AddRoot(@"D:\tmp\archivetests"); VFS.VirtualFileSystem.VFS.AddRoot(@"D:\tmp\archivetests");
} }
} }

View File

@ -101,5 +101,10 @@
<Name>Wabbajack.Common</Name> <Name>Wabbajack.Common</Name>
</ProjectReference> </ProjectReference>
</ItemGroup> </ItemGroup>
<ItemGroup>
<PackageReference Include="System.Reactive">
<Version>4.2.0</Version>
</PackageReference>
</ItemGroup>
<Import Project="$(MSBuildToolsPath)\Microsoft.CSharp.targets" /> <Import Project="$(MSBuildToolsPath)\Microsoft.CSharp.targets" />
</Project> </Project>

View File

@ -6,6 +6,7 @@ using System.IO;
using System.Linq; using System.Linq;
using System.Net.Configuration; using System.Net.Configuration;
using System.Net.Http; using System.Net.Http;
using System.Reactive.Subjects;
using System.Reflection; using System.Reflection;
using System.Security.Cryptography; using System.Security.Cryptography;
using System.Text; using System.Text;
@ -43,22 +44,15 @@ namespace Wabbajack.Common
File.Delete(LogFile); File.Delete(LogFile);
} }
private static Action<string> _loggerFn; private static readonly Subject<string> _loggerSubj = new Subject<string>();
private static Action<string, int> _statusFn; public static IObservable<string> LogMessages => _loggerSubj;
private static readonly Subject<(string Message, int Progress)> _statusSubj = new Subject<(string Message, int Progress)>();
public static IObservable<(string Message, int Progress)> StatusUpdates => _statusSubj;
private static readonly string[] Suffix = {"B", "KB", "MB", "GB", "TB", "PB", "EB"}; // Longs run out around EB private static readonly string[] Suffix = {"B", "KB", "MB", "GB", "TB", "PB", "EB"}; // Longs run out around EB
public static void SetLoggerFn(Action<string> f)
{
_loggerFn = f;
}
public static void SetStatusFn(Action<string, int> f)
{
_statusFn = f;
}
private static object _lock = new object(); private static object _lock = new object();
private static DateTime _startTime; private static DateTime _startTime;
public static void Log(string msg) public static void Log(string msg)
@ -69,7 +63,7 @@ namespace Wabbajack.Common
File.AppendAllText(LogFile, msg + "\r\n"); File.AppendAllText(LogFile, msg + "\r\n");
} }
_loggerFn?.Invoke(msg); _loggerSubj.OnNext(msg);
} }
public static void LogToFile(string msg) public static void LogToFile(string msg)
@ -87,10 +81,9 @@ namespace Wabbajack.Common
if (WorkQueue.CustomReportFn != null) if (WorkQueue.CustomReportFn != null)
WorkQueue.CustomReportFn(progress, msg); WorkQueue.CustomReportFn(progress, msg);
else else
_statusFn?.Invoke(msg, progress); _statusSubj.OnNext((msg, progress));
} }
/// <summary> /// <summary>
/// MurMur3 hashes the file pointed to by this string /// MurMur3 hashes the file pointed to by this string
/// </summary> /// </summary>

View File

@ -143,6 +143,9 @@
<PackageReference Include="System.Data.HashFunction.xxHash"> <PackageReference Include="System.Data.HashFunction.xxHash">
<Version>2.0.0</Version> <Version>2.0.0</Version>
</PackageReference> </PackageReference>
<PackageReference Include="System.Reactive">
<Version>4.2.0</Version>
</PackageReference>
<PackageReference Include="YamlDotNet"> <PackageReference Include="YamlDotNet">
<Version>8.0.0</Version> <Version>8.0.0</Version>
</PackageReference> </PackageReference>

View File

@ -2,6 +2,10 @@
using System.Collections.Concurrent; using System.Collections.Concurrent;
using System.Collections.Generic; using System.Collections.Generic;
using System.Linq; using System.Linq;
using System.Reactive;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading; using System.Threading;
namespace Wabbajack.Common namespace Wabbajack.Common
@ -19,22 +23,17 @@ namespace Wabbajack.Common
public static int MaxQueueSize; public static int MaxQueueSize;
public static int CurrentQueueSize; public static int CurrentQueueSize;
private static bool _inited;
public static Action<int, string, int> ReportFunction { get; private set; } private readonly static Subject<CPUStatus> _Status = new Subject<CPUStatus>();
public static Action<int, int> ReportQueueSize { get; private set; } public static IObservable<CPUStatus> Status => _Status;
public static int ThreadCount { get; private set; } private readonly static Subject<(int Current, int Max)> _QueueSize = new Subject<(int Current, int Max)>();
public static IObservable<(int Current, int Max)> QueueSize => _QueueSize;
public static int ThreadCount { get; } = Environment.ProcessorCount;
public static List<Thread> Threads { get; private set; } public static List<Thread> Threads { get; private set; }
public static void Init(Action<int, string, int> report_function, Action<int, int> report_queue_size) static WorkQueue()
{ {
ReportFunction = report_function;
ReportQueueSize = report_queue_size;
ThreadCount = Environment.ProcessorCount;
if (_inited) return;
StartThreads(); StartThreads();
_inited = true;
} }
private static void StartThreads() private static void StartThreads()
@ -67,9 +66,19 @@ namespace Wabbajack.Common
public static void Report(string msg, int progress) public static void Report(string msg, int progress)
{ {
if (CustomReportFn != null) if (CustomReportFn != null)
{
CustomReportFn(progress, msg); CustomReportFn(progress, msg);
}
else else
ReportFunction(CpuId, msg, progress); {
_Status.OnNext(
new CPUStatus
{
Progress = progress,
Msg = msg,
ID = CpuId
});
}
} }
public static void QueueTask(Action a) public static void QueueTask(Action a)
@ -79,7 +88,14 @@ namespace Wabbajack.Common
internal static void ReportNow() internal static void ReportNow()
{ {
ReportQueueSize(MaxQueueSize, CurrentQueueSize); _QueueSize.OnNext((MaxQueueSize, CurrentQueueSize));
} }
} }
public class CPUStatus
{
public int Progress { get; internal set; }
public string Msg { get; internal set; }
public int ID { get; internal set; }
}
} }

View File

@ -23,9 +23,7 @@ namespace Wabbajack.Test
utils = new TestUtils(); utils = new TestUtils();
utils.GameName = "Skyrim Special Edition"; utils.GameName = "Skyrim Special Edition";
Utils.SetStatusFn((f, idx) => { }); Utils.LogMessages.Subscribe(f => TestContext.WriteLine(f));
Utils.SetLoggerFn(f => TestContext.WriteLine(f));
WorkQueue.Init((x, y, z) => { }, (min, max) => { });
} }

View File

@ -48,7 +48,6 @@ namespace Wabbajack.Test
[TestInitialize] [TestInitialize]
public void TestSetup() public void TestSetup()
{ {
WorkQueue.Init((x, y, z) => { }, (min, max) => { });
validate = new ValidateModlist(); validate = new ValidateModlist();
validate.LoadAuthorPermissionsFromString(permissions); validate.LoadAuthorPermissionsFromString(permissions);
validate.LoadServerWhitelist(server_whitelist); validate.LoadServerWhitelist(server_whitelist);

View File

@ -30,9 +30,7 @@ namespace Wabbajack.Test
utils = new TestUtils(); utils = new TestUtils();
utils.GameName = "Skyrim Special Edition"; utils.GameName = "Skyrim Special Edition";
Utils.SetStatusFn((f, idx) => { }); Utils.LogMessages.Subscribe(f => TestContext.WriteLine(f));
Utils.SetLoggerFn(f => TestContext.WriteLine(f));
WorkQueue.Init((x, y, z) => { }, (min, max) => { });
if (!Directory.Exists(DOWNLOAD_FOLDER)) if (!Directory.Exists(DOWNLOAD_FOLDER))
Directory.CreateDirectory(DOWNLOAD_FOLDER); Directory.CreateDirectory(DOWNLOAD_FOLDER);

View File

@ -17,7 +17,6 @@ namespace Wabbajack
AppDomain.CurrentDomain.UnhandledException += (sender, e) => AppDomain.CurrentDomain.UnhandledException += (sender, e) =>
{ {
// Don't do any special logging side effects // Don't do any special logging side effects
Utils.SetLoggerFn((s) => { });
Utils.Log("Uncaught error:"); Utils.Log("Uncaught error:");
Utils.Log(((Exception)e.ExceptionObject).ExceptionToString()); Utils.Log(((Exception)e.ExceptionObject).ExceptionToString());
}; };

View File

@ -1,15 +0,0 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace Wabbajack
{
public class CPUStatus
{
public int Progress { get; internal set; }
public string Msg { get; internal set; }
public int ID { get; internal set; }
}
}

View File

@ -29,11 +29,9 @@ namespace Wabbajack
private readonly ObservableAsPropertyHelper<ViewModel> _ActivePane; private readonly ObservableAsPropertyHelper<ViewModel> _ActivePane;
public ViewModel ActivePane => _ActivePane.Value; public ViewModel ActivePane => _ActivePane.Value;
[Reactive] private readonly ObservableAsPropertyHelper<int> _QueueProgress;
public int QueueProgress { get; set; } public int QueueProgress => _QueueProgress.Value;
private readonly Subject<CPUStatus> _statusSubject = new Subject<CPUStatus>();
public IObservable<CPUStatus> StatusObservable => _statusSubject;
public ObservableCollectionExtended<CPUStatus> StatusList { get; } = new ObservableCollectionExtended<CPUStatus>(); public ObservableCollectionExtended<CPUStatus> StatusList { get; } = new ObservableCollectionExtended<CPUStatus>();
private Subject<string> _logSubj = new Subject<string>(); private Subject<string> _logSubj = new Subject<string>();
@ -53,7 +51,7 @@ namespace Wabbajack
this._Compiler = new Lazy<CompilerVM>(() => new CompilerVM(this, source)); this._Compiler = new Lazy<CompilerVM>(() => new CompilerVM(this, source));
// Set up logging // Set up logging
_logSubj Utils.LogMessages
.ToObservableChangeSet() .ToObservableChangeSet()
.Buffer(TimeSpan.FromMilliseconds(250)) .Buffer(TimeSpan.FromMilliseconds(250))
.Where(l => l.Count > 0) .Where(l => l.Count > 0)
@ -63,8 +61,9 @@ namespace Wabbajack
.Bind(this.Log) .Bind(this.Log)
.Subscribe() .Subscribe()
.DisposeWith(this.CompositeDisposable); .DisposeWith(this.CompositeDisposable);
Utils.SetLoggerFn(s => _logSubj.OnNext(s)); Utils.StatusUpdates
Utils.SetStatusFn((msg, progress) => WorkQueue.Report(msg, progress)); .Subscribe((i) => WorkQueue.Report(i.Message, i.Progress))
.DisposeWith(this.CompositeDisposable);
// Wire mode to drive the active pane. // Wire mode to drive the active pane.
// Note: This is currently made into a derivative property driven by mode, // Note: This is currently made into a derivative property driven by mode,
@ -89,13 +88,8 @@ namespace Wabbajack
.Subscribe(vm => vm.ModListPath = source) .Subscribe(vm => vm.ModListPath = source)
.DisposeWith(this.CompositeDisposable); .DisposeWith(this.CompositeDisposable);
// Initialize work queue
WorkQueue.Init(
report_function: (id, msg, progress) => this._statusSubject.OnNext(new CPUStatus() { ID = id, Msg = msg, Progress = progress }),
report_queue_size: (max, current) => this.SetQueueSize(max, current));
// Compile progress updates and populate ObservableCollection // Compile progress updates and populate ObservableCollection
this._statusSubject WorkQueue.Status
.ObserveOn(RxApp.TaskpoolScheduler) .ObserveOn(RxApp.TaskpoolScheduler)
.ToObservableChangeSet(x => x.ID) .ToObservableChangeSet(x => x.ID)
.Batch(TimeSpan.FromMilliseconds(250)) .Batch(TimeSpan.FromMilliseconds(250))
@ -105,19 +99,17 @@ namespace Wabbajack
.Bind(this.StatusList) .Bind(this.StatusList)
.Subscribe() .Subscribe()
.DisposeWith(this.CompositeDisposable); .DisposeWith(this.CompositeDisposable);
}
private void SetQueueSize(int max, int current) this._QueueProgress = WorkQueue.QueueSize
{ .Select(progress =>
if (max == 0) {
max = 1; if (progress.Max == 0)
QueueProgress = current * 100 / max; {
} progress.Max = 1;
}
public override void Dispose() return progress.Current * 100 / progress.Max;
{ })
base.Dispose(); .ToProperty(this, nameof(this.QueueProgress));
Utils.SetLoggerFn(s => { });
} }
} }
} }

View File

@ -182,7 +182,6 @@
<Compile Include="Views\InstallationView.xaml.cs"> <Compile Include="Views\InstallationView.xaml.cs">
<DependentUpon>InstallationView.xaml</DependentUpon> <DependentUpon>InstallationView.xaml</DependentUpon>
</Compile> </Compile>
<Compile Include="Util\CPUStatus.cs" />
<Compile Include="View Models\CompilerVM.cs" /> <Compile Include="View Models\CompilerVM.cs" />
<Compile Include="View Models\MainWindowVM.cs" /> <Compile Include="View Models\MainWindowVM.cs" />
<Compile Include="Views\ModeSelectionWindow.xaml.cs"> <Compile Include="Views\ModeSelectionWindow.xaml.cs">