Merge pull request #182 from wabbajack-tools/compiler-update-streams

Instanced Work Queues
This commit is contained in:
Timothy Baldridge 2019-11-17 06:55:30 -07:00 committed by GitHub
commit 00a1215cf4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
30 changed files with 262 additions and 201 deletions

View File

@ -25,10 +25,12 @@ namespace Compression.BSA.Test
public TestContext TestContext { get; set; }
private static WorkQueue Queue { get; set; }
[ClassInitialize]
public static void Setup(TestContext TestContext)
{
Queue = new WorkQueue();
Utils.LogMessages.Subscribe(f => TestContext.WriteLine(f));
if (!Directory.Exists(StagingFolder))
Directory.CreateDirectory(StagingFolder);
@ -50,10 +52,13 @@ namespace Compression.BSA.Test
var folder = Path.Combine(BSAFolder, info.Item1.ToString(), info.Item2.ToString());
if (!Directory.Exists(folder))
Directory.CreateDirectory(folder);
FileExtractor.ExtractAll(filename, folder);
FileExtractor.ExtractAll(Queue, filename, folder);
}
}
private static string DownloadMod((Game, int) info)
{
using (var client = new NexusApiClient())
@ -98,7 +103,7 @@ namespace Compression.BSA.Test
string TempFile = Path.Combine("tmp.bsa");
using (var a = BSADispatch.OpenRead(bsa))
{
a.Files.PMap(file =>
a.Files.PMap(Queue, file =>
{
var abs_name = Path.Combine(TempDir, file.Path);
ViaJson(file.State);
@ -119,7 +124,7 @@ namespace Compression.BSA.Test
using (var w = ViaJson(a.State).MakeBuilder())
{
a.Files.PMap(file =>
a.Files.PMap(Queue, file =>
{
var abs_path = Path.Combine(TempDir, file.Path);
using (var str = File.OpenRead(abs_path))
@ -142,7 +147,7 @@ namespace Compression.BSA.Test
var idx = 0;
a.Files.Zip(b.Files, (ai, bi) => (ai, bi))
.PMap(pair =>
.PMap(Queue, pair =>
{
idx++;
Assert.AreEqual(JsonConvert.SerializeObject(pair.ai.State),

View File

@ -31,12 +31,12 @@ namespace Wabbajack.Common
}
public static void ExtractAll(string source, string dest)
public static void ExtractAll(WorkQueue queue, string source, string dest)
{
try
{
if (Consts.SupportedBSAs.Any(b => source.ToLower().EndsWith(b)))
ExtractAllWithBSA(source, dest);
ExtractAllWithBSA(queue, source, dest);
else if (source.EndsWith(".omod"))
ExtractAllWithOMOD(source, dest);
else
@ -60,14 +60,14 @@ namespace Wabbajack.Common
return dest;
}
private static void ExtractAllWithBSA(string source, string dest)
private static void ExtractAllWithBSA(WorkQueue queue, string source, string dest)
{
try
{
using (var arch = BSADispatch.OpenRead(source))
{
arch.Files
.PMap(f =>
.PMap(queue, f =>
{
var path = f.Path;
if (f.Path.StartsWith("\\"))

View File

@ -0,0 +1,57 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Subjects;
using System.Text;
using System.Threading.Tasks;
using ReactiveUI;
namespace Wabbajack.Common
{
public class StatusUpdateTracker
{
private Subject<string> _stepName = new Subject<string>();
public IObservable<string> StepName => _stepName;
private Subject<int> _step = new Subject<int>();
public IObservable<int> Step => _step;
private Subject<int> _maxStep = new Subject<int>();
public IObservable<int> MaxStep => _maxStep;
private Subject<float> _progress = new Subject<float>();
public IObservable<float> Progress => _progress;
private int _internalCurrentStep;
private int _internalMaxStep;
public StatusUpdateTracker(int maxStep)
{
_internalMaxStep = maxStep;
}
public void Reset()
{
_maxStep.OnNext(_internalMaxStep);
}
public void NextStep(string name)
{
_internalCurrentStep += 1;
_step.OnNext(_internalCurrentStep);
_stepName.OnNext(name);
_progress.OnNext(0.0f);
}
public void MakeUpdate(double progress)
{
_progress.OnNext((float)0.0);
}
public void MakeUpdate(int max, int curr)
{
_progress.OnNext((float)curr / ((float) (max == 0 ? 1 : max)));
}
}
}

View File

@ -78,10 +78,7 @@ namespace Wabbajack.Common
public static void Status(string msg, int progress = 0)
{
if (WorkQueue.CustomReportFn != null)
WorkQueue.CustomReportFn(progress, msg);
else
_statusSubj.OnNext((msg, progress));
_statusSubj.OnNext((msg, progress));
}
/// <summary>
@ -413,18 +410,29 @@ namespace Wabbajack.Common
}
}
public static List<TR> PMap<TI, TR>(this IEnumerable<TI> coll, Func<TI, TR> f)
public static List<TR> PMap<TI, TR>(this IEnumerable<TI> coll, WorkQueue queue, StatusUpdateTracker updateTracker,
Func<TI, TR> f)
{
var cnt = 0;
var collist = coll.ToList();
return collist.PMap(queue, itm =>
{
updateTracker.MakeUpdate(collist.Count, Interlocked.Increment(ref cnt));
return f(itm);
});
}
public static List<TR> PMap<TI, TR>(this IEnumerable<TI> coll, WorkQueue queue,
Func<TI, TR> f)
{
var colllst = coll.ToList();
Interlocked.Add(ref WorkQueue.MaxQueueSize, colllst.Count);
//WorkQueue.CurrentQueueSize = 0;
var remaining_tasks = colllst.Count;
var tasks = coll.Select(i =>
{
var tc = new TaskCompletionSource<TR>();
WorkQueue.QueueTask(() =>
queue.QueueTask(() =>
{
try
{
@ -434,10 +442,7 @@ namespace Wabbajack.Common
{
tc.SetException(ex);
}
Interlocked.Increment(ref WorkQueue.CurrentQueueSize);
Interlocked.Decrement(ref remaining_tasks);
WorkQueue.ReportNow();
});
return tc.Task;
}).ToList();
@ -445,15 +450,9 @@ namespace Wabbajack.Common
// To avoid thread starvation, we'll start to help out in the work queue
if (WorkQueue.WorkerThread)
while (remaining_tasks > 0)
if (WorkQueue.Queue.TryTake(out var a, 500))
if (queue.Queue.TryTake(out var a, 500))
a();
if (WorkQueue.CurrentQueueSize == WorkQueue.MaxQueueSize)
{
WorkQueue.MaxQueueSize = 0;
WorkQueue.MaxQueueSize = 0;
}
return tasks.Select(t =>
{
t.Wait();
@ -463,9 +462,9 @@ namespace Wabbajack.Common
}).ToList();
}
public static void PMap<TI>(this IEnumerable<TI> coll, Action<TI> f)
public static void PMap<TI>(this IEnumerable<TI> coll, WorkQueue queue, Action<TI> f)
{
coll.PMap(i =>
coll.PMap(queue, i =>
{
f(i);
return false;

View File

@ -104,6 +104,7 @@
<Compile Include="GOGHandler.cs" />
<Compile Include="Properties\AssemblyInfo.cs" />
<Compile Include="SplittingStream.cs" />
<Compile Include="StatusUpdate.cs" />
<Compile Include="SteamHandler.cs" />
<Compile Include="Utils.cs" />
<Compile Include="WorkQueue.cs" />
@ -146,6 +147,9 @@
<PackageReference Include="Newtonsoft.Json.Bson">
<Version>1.0.2</Version>
</PackageReference>
<PackageReference Include="ReactiveUI">
<Version>10.5.7</Version>
</PackageReference>
<PackageReference Include="System.Data.HashFunction.xxHash">
<Version>2.0.0</Version>
</PackageReference>

View File

@ -12,31 +12,26 @@ namespace Wabbajack.Common
{
public class WorkQueue
{
internal static BlockingCollection<Action>
internal BlockingCollection<Action>
Queue = new BlockingCollection<Action>(new ConcurrentStack<Action>());
[ThreadStatic] private static int CpuId;
[ThreadStatic] internal static bool WorkerThread;
internal static bool WorkerThread => CurrentQueue != null;
[ThreadStatic] internal static WorkQueue CurrentQueue;
[ThreadStatic] public static Action<int, string> CustomReportFn;
private static readonly Subject<CPUStatus> _Status = new Subject<CPUStatus>();
public IObservable<CPUStatus> Status => _Status;
public static int MaxQueueSize;
public static int CurrentQueueSize;
private readonly static Subject<CPUStatus> _Status = new Subject<CPUStatus>();
public static IObservable<CPUStatus> Status => _Status;
private readonly static Subject<(int Current, int Max)> _QueueSize = new Subject<(int Current, int Max)>();
public static IObservable<(int Current, int Max)> QueueSize => _QueueSize;
public static int ThreadCount { get; } = Environment.ProcessorCount;
public static List<Thread> Threads { get; private set; }
static WorkQueue()
public WorkQueue()
{
StartThreads();
}
private static void StartThreads()
private void StartThreads()
{
Threads = Enumerable.Range(0, ThreadCount)
.Select(idx =>
@ -50,10 +45,10 @@ namespace Wabbajack.Common
}).ToList();
}
private static void ThreadBody(int idx)
private void ThreadBody(int idx)
{
CpuId = idx;
WorkerThread = true;
CurrentQueue = this;
while (true)
{
@ -63,32 +58,25 @@ namespace Wabbajack.Common
}
}
public static void Report(string msg, int progress)
public void Report(string msg, int progress)
{
if (CustomReportFn != null)
{
CustomReportFn(progress, msg);
}
else
{
_Status.OnNext(
new CPUStatus
{
Progress = progress,
Msg = msg,
ID = CpuId
});
}
_Status.OnNext(
new CPUStatus
{
Progress = progress,
Msg = msg,
ID = CpuId
});
}
public static void QueueTask(Action a)
public void QueueTask(Action a)
{
Queue.Add(a);
}
internal static void ReportNow()
public void Shutdown()
{
_QueueSize.OnNext((MaxQueueSize, CurrentQueueSize));
Threads.Do(th => th.Abort());
}
}

View File

@ -12,6 +12,10 @@ namespace Wabbajack.Lib
{
public abstract class ACompiler
{
public StatusUpdateTracker UpdateTracker { get; protected set; }
public WorkQueue Queue { get; protected set; }
protected static string _vfsCacheName = "vfs_compile_cache.bin";
/// <summary>
/// A stream of tuples of ("Update Title", 0.25) which represent the name of the current task
@ -20,7 +24,7 @@ namespace Wabbajack.Lib
public IObservable<(string, float)> ProgressUpdates => _progressUpdates;
protected readonly Subject<(string, float)> _progressUpdates = new Subject<(string, float)>();
public Context VFS { get; internal set; } = new Context();
public Context VFS { get; internal set; }
public ModManager ModManager;
@ -53,7 +57,7 @@ namespace Wabbajack.Lib
protected ACompiler()
{
ProgressUpdates.Subscribe(itm => Utils.Log($"{itm.Item2} - {itm.Item1}"));
VFS.LogSpam.Subscribe(itm => Utils.Status(itm));
Queue = new WorkQueue();
}
}
}

View File

@ -63,7 +63,7 @@ namespace Wabbajack.Lib.CompilationSteps
var id = Guid.NewGuid().ToString();
var matches = source_files.PMap(e => _mo2Compiler.RunStack(stack, new RawSourceFile(e)
var matches = source_files.PMap(_mo2Compiler.Queue, e => _mo2Compiler.RunStack(stack, new RawSourceFile(e)
{
Path = Path.Combine(Consts.BSACreationDir, id, e.Name)
}));

View File

@ -8,7 +8,9 @@ using System.Diagnostics;
using System.IO;
using System.IO.Compression;
using System.Linq;
using System.Reactive.Subjects;
using System.Reflection;
using System.Runtime.InteropServices.ComTypes;
using System.Text;
using System.Text.RegularExpressions;
using System.Threading.Tasks;
@ -42,6 +44,9 @@ namespace Wabbajack.Lib
public Compiler(string mo2_folder)
{
UpdateTracker = new StatusUpdateTracker(10);
Queue = new WorkQueue();
VFS = new Context(Queue) {UpdateTracker = UpdateTracker};
ModManager = ModManager.MO2;
MO2Folder = mo2_folder;
@ -89,7 +94,7 @@ namespace Wabbajack.Lib
public override void Status(string msg)
{
WorkQueue.Report(msg, 0);
Queue.Report(msg, 0);
}
public override void Error(string msg)
@ -114,6 +119,8 @@ namespace Wabbajack.Lib
public override bool Compile()
{
UpdateTracker.Reset();
UpdateTracker.NextStep("Gathering information");
Info("Looking for other profiles");
var other_profiles_path = Path.Combine(MO2ProfileDir, "otherprofiles.txt");
SelectedProfiles = new HashSet<string>();
@ -124,27 +131,31 @@ namespace Wabbajack.Lib
VFS.IntegrateFromFile(_vfsCacheName);
Info($"Indexing {MO2Folder}");
UpdateTracker.NextStep($"Indexing {MO2Folder}");
VFS.AddRoot(MO2Folder);
UpdateTracker.NextStep("Writing VFS Cache");
VFS.WriteToFile(_vfsCacheName);
Info($"Indexing {GamePath}");
UpdateTracker.NextStep($"Indexing {GamePath}");
VFS.AddRoot(GamePath);
UpdateTracker.NextStep("Writing VFS Cache");
VFS.WriteToFile(_vfsCacheName);
Info($"Indexing {MO2DownloadsFolder}");
UpdateTracker.NextStep($"Indexing {MO2DownloadsFolder}");
VFS.AddRoot(MO2DownloadsFolder);
UpdateTracker.NextStep("Writing VFS Cache");
VFS.WriteToFile(_vfsCacheName);
Info("Cleaning output folder");
UpdateTracker.NextStep("Cleaning output folder");
if (Directory.Exists(ModListOutputFolder))
Directory.Delete(ModListOutputFolder, true);
Directory.Delete(ModListOutputFolder, true, true);
UpdateTracker.NextStep("Finding Install Files");
Directory.CreateDirectory(ModListOutputFolder);
var mo2_files = Directory.EnumerateFiles(MO2Folder, "*", SearchOption.AllDirectories)
@ -174,10 +185,6 @@ namespace Wabbajack.Lib
{ Path = Path.Combine(Consts.LOOTFolderFilesDir, p.RelativeTo(loot_path)) });
}
Info("Indexing Archives");
IndexedArchives = Directory.EnumerateFiles(MO2DownloadsFolder)
.Where(f => File.Exists(f + ".meta"))
.Select(f => new IndexedArchive
@ -189,14 +196,11 @@ namespace Wabbajack.Lib
})
.ToList();
Info("Indexing Files");
IndexedFiles = IndexedArchives.SelectMany(f => f.File.ThisAndAllChildren)
.OrderBy(f => f.NestingFactor)
.GroupBy(f => f.Hash)
.ToDictionary(f => f.Key, f => f.AsEnumerable());
Info("Searching for mod files");
AllFiles = mo2_files.Concat(game_files)
.Concat(loot_files)
.DistinctBy(f => f.Path)
@ -235,8 +239,8 @@ namespace Wabbajack.Lib
var stack = MakeStack();
Info("Running Compilation Stack");
var results = AllFiles.PMap(f => RunStack(stack, f)).ToList();
UpdateTracker.NextStep("Running Compilation Stack");
var results = AllFiles.PMap(Queue, UpdateTracker, f => RunStack(stack, f)).ToList();
// Add the extra files that were generated by the stack
Info($"Adding {ExtraFiles.Count} that were generated by the stack");
@ -426,7 +430,7 @@ namespace Wabbajack.Lib
Info($"Patching building patches from {groups.Count} archives");
var absolute_paths = AllFiles.ToDictionary(e => e.Path, e => e.AbsolutePath);
groups.PMap(group => BuildArchivePatches(group.Key, group, absolute_paths));
groups.PMap(Queue, group => BuildArchivePatches(group.Key, group, absolute_paths));
if (InstallDirectives.OfType<PatchedFromArchive>().FirstOrDefault(f => f.PatchID == null) != null)
Error("Missing patches after generation, this should not happen");
@ -440,7 +444,7 @@ namespace Wabbajack.Lib
var by_path = files.GroupBy(f => string.Join("|", f.FilesInFullPath.Skip(1).Select(i => i.Name)))
.ToDictionary(f => f.Key, f => f.First());
// Now Create the patches
group.PMap(entry =>
group.PMap(Queue, entry =>
{
Info($"Patching {entry.To}");
Status($"Patching {entry.To}");
@ -496,7 +500,7 @@ namespace Wabbajack.Lib
.GroupBy(f => f.File.Hash)
.ToDictionary(f => f.Key, f => f.First());
SelectedArchives = shas.PMap(sha => ResolveArchive(sha, archives));
SelectedArchives = shas.PMap(Queue, sha => ResolveArchive(sha, archives));
}
private Archive ResolveArchive(string sha, IDictionary<string, IndexedArchive> archives)

View File

@ -22,14 +22,18 @@ namespace Wabbajack.Lib
{
private string _downloadsFolder;
private WorkQueue Queue { get; set; }
public Installer(string archive, ModList mod_list, string output_folder)
{
Queue = new WorkQueue();
VFS = new Context(Queue);
ModListArchive = archive;
Outputfolder = output_folder;
ModList = mod_list;
}
public Context VFS { get; } = new Context();
public Context VFS { get; }
public string Outputfolder { get; }
@ -53,12 +57,12 @@ namespace Wabbajack.Lib
public void Status(string msg)
{
WorkQueue.Report(msg, 0);
Queue.Report(msg, 0);
}
public void Status(string msg, int progress)
{
WorkQueue.Report(msg, progress);
Queue.Report(msg, progress);
}
private void Error(string msg)
@ -170,7 +174,7 @@ namespace Wabbajack.Lib
{
ModList.Directives
.OfType<ArchiveMeta>()
.PMap(directive =>
.PMap(Queue, directive =>
{
Status($"Writing included .meta file {directive.To}");
var out_path = Path.Combine(DownloadFolder, directive.To);
@ -221,7 +225,7 @@ namespace Wabbajack.Lib
mods[b] = tmp;
}
mods.PMap(mod =>
mods.PMap(Queue, mod =>
{
var er = new NexusApiClient().EndorseMod(mod);
Utils.Log($"Endorsed {mod.GameName} - {mod.ModID} - Result: {er.message}");
@ -262,7 +266,7 @@ namespace Wabbajack.Lib
using (var a = bsa.State.MakeBuilder())
{
bsa.FileStates.PMap(state =>
bsa.FileStates.PMap(Queue, state =>
{
Status($"Adding {state.Path} to BSA");
using (var fs = File.OpenRead(Path.Combine(source_dir, state.Path)))
@ -290,7 +294,7 @@ namespace Wabbajack.Lib
Info("Writing inline files");
ModList.Directives
.OfType<InlineFile>()
.PMap(directive =>
.PMap(Queue, directive =>
{
Status($"Writing included file {directive.To}");
var out_path = Path.Combine(Outputfolder, directive.To);
@ -372,7 +376,7 @@ namespace Wabbajack.Lib
.ToList();
Info("Installing Archives");
archives.PMap(a => InstallArchive(a.Archive, a.AbsolutePath, grouped[a.Archive.Hash]));
archives.PMap(Queue, a => InstallArchive(a.Archive, a.AbsolutePath, grouped[a.Archive.Hash]));
}
private void InstallArchive(Archive archive, string absolutePath, IGrouping<string, FromArchive> grouping)
@ -488,17 +492,17 @@ namespace Wabbajack.Lib
}
missing.Where(a => a.State.GetType() != typeof(ManualDownloader.State))
.PMap(archive =>
{
Info($"Downloading {archive.Name}");
var output_path = Path.Combine(DownloadFolder, archive.Name);
.PMap(Queue, archive =>
{
Info($"Downloading {archive.Name}");
var output_path = Path.Combine(DownloadFolder, archive.Name);
if (download)
if (output_path.FileExists())
File.Delete(output_path);
if (download)
if (output_path.FileExists())
File.Delete(output_path);
return DownloadArchive(archive, download);
});
return DownloadArchive(archive, download);
});
}
public bool DownloadArchive(Archive archive, bool download)
@ -521,7 +525,7 @@ namespace Wabbajack.Lib
{
HashedArchives = Directory.EnumerateFiles(DownloadFolder)
.Where(e => !e.EndsWith(".sha"))
.PMap(e => (HashArchive(e), e))
.PMap(Queue, e => (HashArchive(e), e))
.OrderByDescending(e => File.GetLastWriteTime(e.Item2))
.GroupBy(e => e.Item1)
.Select(e => e.First())

View File

@ -390,7 +390,7 @@ namespace Wabbajack.Lib.NexusApi
.ToList();
Utils.Log($"Purging {to_purge.Count} cache entries");
to_purge.PMap(f => File.Delete(f.f));
to_purge.Do(f => File.Delete(f.f));
}
}

View File

@ -1,35 +0,0 @@
using System.Linq;
using Wabbajack.Common;
namespace Wabbajack.Lib.Updater
{
public class CheckForUpdates
{
private ModList _modlist;
private string _modlistPath;
public CheckForUpdates(string path)
{
_modlistPath = path;
_modlist = Installer.LoadFromFile(path);
}
public bool FindOutdatedMods()
{
var installer = new Installer(_modlistPath, _modlist, "");
Utils.Log($"Checking links for {_modlist.Archives.Count} archives");
var results = _modlist.Archives.PMap(f =>
{
var result = installer.DownloadArchive(f, false);
if (result) return false;
Utils.Log($"Unable to resolve link for {f.Name}. If this is hosted on the Nexus the file may have been removed.");
return true;
}).ToList();
return results.Any();
}
}
}

View File

@ -16,6 +16,8 @@ namespace Wabbajack.Lib.Validation
public class ValidateModlist
{
public Dictionary<string, Author> AuthorPermissions { get; set; } = new Dictionary<string, Author>();
private WorkQueue Queue = new WorkQueue();
public ServerWhitelist ServerWhitelist { get; set; } = new ServerWhitelist();
public void LoadAuthorPermissionsFromString(string s)
@ -101,12 +103,12 @@ namespace Wabbajack.Lib.Validation
var nexus_mod_permissions = modlist.Archives
.Where(a => a.State is NexusDownloader.State)
.PMap(a => (a.Hash, FilePermissions((NexusDownloader.State)a.State), a))
.PMap(Queue, a => (a.Hash, FilePermissions((NexusDownloader.State)a.State), a))
.ToDictionary(a => a.Hash, a => new { permissions = a.Item2, archive = a.a });
modlist.Directives
.OfType<PatchedFromArchive>()
.PMap(p =>
.PMap(Queue, p =>
{
if (nexus_mod_permissions.TryGetValue(p.ArchiveHashPath[0], out var archive))
{
@ -125,7 +127,7 @@ namespace Wabbajack.Lib.Validation
modlist.Directives
.OfType<FromArchive>()
.PMap(p =>
.PMap(Queue,p =>
{
if (nexus_mod_permissions.TryGetValue(p.ArchiveHashPath[0], out var archive))
{

View File

@ -42,7 +42,8 @@ namespace Wabbajack.Lib
this.VortexFolder = vortexFolder;
this.DownloadsFolder = downloadsFolder;
this.StagingFolder = stagingFolder;
Queue = new WorkQueue();
VFS = new Context(Queue);
ModListOutputFolder = "output_folder";
// TODO: add custom modlist name
@ -56,7 +57,7 @@ namespace Wabbajack.Lib
public override void Status(string msg)
{
WorkQueue.Report(msg, 0);
Queue.Report(msg, 0);
}
public override void Error(string msg)
@ -159,7 +160,7 @@ namespace Wabbajack.Lib
IEnumerable<ICompilationStep> stack = MakeStack();
Info("Running Compilation Stack");
List<Directive> results = AllFiles.PMap(f => RunStack(stack.Where(s => s != null), f)).ToList();
List<Directive> results = AllFiles.PMap(Queue, f => RunStack(stack.Where(s => s != null), f)).ToList();
IEnumerable<NoMatch> noMatch = results.OfType<NoMatch>().ToList();
Info($"No match for {noMatch.Count()} files");
@ -341,7 +342,7 @@ namespace Wabbajack.Lib
.GroupBy(f => f.File.Hash)
.ToDictionary(f => f.Key, f => f.First());
SelectedArchives = shas.PMap(sha => ResolveArchive(sha, archives));
SelectedArchives = shas.PMap(Queue, sha => ResolveArchive(sha, archives));
}
private Archive ResolveArchive(string sha, IDictionary<string, IndexedArchive> archives)

View File

@ -24,12 +24,16 @@ namespace Wabbajack.Lib
public string StagingFolder { get; set; }
public string DownloadFolder { get; set; }
public Context VFS { get; } = new Context();
public WorkQueue Queue { get; }
public Context VFS { get; }
public bool IgnoreMissingFiles { get; internal set; }
public VortexInstaller(string archive, ModList modList)
{
Queue = new WorkQueue();
VFS = new Context(Queue);
ModListArchive = archive;
ModList = modList;
@ -46,7 +50,7 @@ namespace Wabbajack.Lib
public void Status(string msg)
{
WorkQueue.Report(msg, 0);
Queue.Report(msg, 0);
}
private void Error(string msg)
@ -142,7 +146,7 @@ namespace Wabbajack.Lib
.ToList();
Info("Installing Archives");
archives.PMap(a => InstallArchive(a.Archive, a.AbsolutePath, grouped[a.Archive.Hash]));
archives.PMap(Queue,a => InstallArchive(a.Archive, a.AbsolutePath, grouped[a.Archive.Hash]));
}
private void InstallArchive(Archive archive, string absolutePath, IGrouping<string, FromArchive> grouping)
@ -192,7 +196,7 @@ namespace Wabbajack.Lib
{
Info("Writing inline files");
ModList.Directives.OfType<InlineFile>()
.PMap(directive =>
.PMap(Queue,directive =>
{
Status($"Writing included file {directive.To}");
var outPath = Path.Combine(StagingFolder, directive.To);
@ -254,7 +258,7 @@ namespace Wabbajack.Lib
}
missing.Where(a => a.State.GetType() != typeof(ManualDownloader.State))
.PMap(archive =>
.PMap(Queue, archive =>
{
Info($"Downloading {archive.Name}");
var output_path = Path.Combine(DownloadFolder, archive.Name);
@ -287,7 +291,7 @@ namespace Wabbajack.Lib
{
HashedArchives = Directory.EnumerateFiles(DownloadFolder)
.Where(e => !e.EndsWith(".sha"))
.PMap(e => (HashArchive(e), e))
.PMap(Queue,e => (HashArchive(e), e))
.OrderByDescending(e => File.GetLastWriteTime(e.Item2))
.GroupBy(e => e.Item1)
.Select(e => e.First())

View File

@ -131,7 +131,6 @@
<Compile Include="Properties\AssemblyInfo.cs" />
<Compile Include="ReportBuilder.cs" />
<Compile Include="UI\UIUtils.cs" />
<Compile Include="Updater\CheckForUpdates.cs" />
<Compile Include="Validation\DTOs.cs" />
<Compile Include="Validation\ValidateModlist.cs" />
<Compile Include="ViewModel.cs" />

View File

@ -165,7 +165,7 @@ namespace Wabbajack.Lib
installer.ModList
.Directives
.OfType<MergedPatch>()
.PMap(m =>
.PMap(_mo2Compiler.Queue, m =>
{
Utils.LogStatus($"Generating zEdit merge: {m.To}");

View File

@ -23,13 +23,23 @@ namespace Wabbajack.Test.ListValidation
api.ClearUpdatedModsInCache();
}
private WorkQueue Queue { get; set; }
[TestInitialize]
public void SetupTest()
public void Setup()
{
Directory.CreateDirectory(Consts.ModListDownloadFolder);
Utils.LogMessages.Subscribe(s => TestContext.WriteLine(s));
Queue = new WorkQueue();
}
[TestCleanup]
public void Cleanup()
{
Queue.Shutdown();
Queue = null;
}
public TestContext TestContext { get; set; }
[TestCategory("ListValidation")]
@ -59,7 +69,7 @@ namespace Wabbajack.Test.ListValidation
Log($"{installer.Archives.Count} archives to validate");
var invalids = installer.Archives
.PMap(archive =>
.PMap(Queue,archive =>
{
Log($"Validating: {archive.Name}");
return new {archive, is_valid = archive.State.Verify()};

View File

@ -21,9 +21,12 @@ namespace Wabbajack.Test
public TestContext TestContext { get; set; }
public WorkQueue Queue { get; set; }
[TestInitialize]
public void TestInitialize()
{
Queue = new WorkQueue();
Consts.TestMode = true;
utils = new TestUtils();
@ -33,7 +36,12 @@ namespace Wabbajack.Test
if (!Directory.Exists(DOWNLOAD_FOLDER))
Directory.CreateDirectory(DOWNLOAD_FOLDER);
}
[TestCleanup]
public void Cleanup()
{
Queue.Shutdown();
}
[TestMethod]
@ -89,11 +97,8 @@ namespace Wabbajack.Test
File.Copy(src, Path.Combine(utils.DownloadsFolder, filename));
if (mod_name == null)
FileExtractor.ExtractAll(src, utils.MO2Folder);
else
FileExtractor.ExtractAll(src, Path.Combine(utils.ModsFolder, mod_name));
FileExtractor.ExtractAll(Queue, src,
mod_name == null ? utils.MO2Folder : Path.Combine(utils.ModsFolder, mod_name));
}
private void DownloadAndInstall(Game game, int modid, string mod_name)
@ -127,7 +132,7 @@ namespace Wabbajack.Test
var dest = Path.Combine(utils.DownloadsFolder, file.file_name);
File.Copy(src, dest);
FileExtractor.ExtractAll(src, Path.Combine(utils.ModsFolder, mod_name));
FileExtractor.ExtractAll(Queue, src, Path.Combine(utils.ModsFolder, mod_name));
File.WriteAllText(dest + ".meta", ini);
}

View File

@ -6,6 +6,8 @@ namespace Wabbajack.Test
[TestClass]
public class VortexTests : AVortexCompilerTest
{
// TODO: figure out what games we want installed on the test server for this
/*
[TestMethod]
public void TestVortexStackSerialization()
{
@ -13,6 +15,7 @@ namespace Wabbajack.Test
utils.Configure();
var vortexCompiler = ConfigureAndRunCompiler();
vortexCompiler.StagingFolder = "vortex_staging";
var stack = vortexCompiler.MakeStack();
var serialized = Serialization.Serialize(stack);
@ -20,6 +23,8 @@ namespace Wabbajack.Test
Assert.AreEqual(serialized, rounded);
Assert.IsNotNull(vortexCompiler.GetStack());
}
*/
}
}

View File

@ -17,6 +17,7 @@ namespace Wabbajack.VirtualFileSystem.Test
private Context context;
public TestContext TestContext { get; set; }
public WorkQueue Queue { get; set; }
[TestInitialize]
public void Setup()
@ -25,7 +26,8 @@ namespace Wabbajack.VirtualFileSystem.Test
if (Directory.Exists(VFS_TEST_DIR))
Directory.Delete(VFS_TEST_DIR, true);
Directory.CreateDirectory(VFS_TEST_DIR);
context = new Context();
Queue = new WorkQueue();
context = new Context(Queue);
}
[TestMethod]
@ -176,7 +178,7 @@ namespace Wabbajack.VirtualFileSystem.Test
var state = context.GetPortableState(files);
var new_context = new Context();
var new_context = new Context(Queue);
new_context.IntegrateFromPortable(state,
new Dictionary<string, string> {{archive.Hash, archive.FullPath}});

View File

@ -32,14 +32,14 @@ namespace Wabbajack.VirtualFileSystem
public IObservable<(string, float)> ProgressUpdates => _progressUpdates;
private readonly Subject<(string, float)> _progressUpdates = new Subject<(string, float)>();
/// <summary>
/// A high throughput firehose of updates from the VFS. These go into more detail on the status
/// of what's happening in the context, but is probably too high bandwidth to tie driectly to the
/// UI.
/// </summary>
public IObservable<string> LogSpam => _logSpam;
internal readonly Subject<string> _logSpam = new Subject<string>();
public StatusUpdateTracker UpdateTracker { get; set; } = new StatusUpdateTracker(1);
public WorkQueue Queue { get; }
public Context(WorkQueue queue)
{
Queue = queue;
}
public TemporaryDirectory GetTemporaryFolder()
{
@ -60,7 +60,7 @@ namespace Wabbajack.VirtualFileSystem
var results = Channel.Create(1024, ProgressUpdater<VirtualFile>($"Indexing {root}", filesToIndex.Count));
var allFiles= filesToIndex
.PMap(f =>
.PMap(Queue, f =>
{
if (byPath.TryGetValue(f, out var found))
{
@ -112,20 +112,19 @@ namespace Wabbajack.VirtualFileSystem
bw.Write(FileVersion);
bw.Write((ulong) Index.AllFiles.Count);
var sizes = Index.AllFiles
.PMap(f =>
Index.AllFiles
.PMap(Queue, f =>
{
var ms = new MemoryStream();
f.Write(ms);
return ms;
})
.Select(ms =>
.Do(ms =>
{
var size = ms.Position;
ms.Position = 0;
bw.Write((ulong) size);
ms.CopyTo(fs);
return ms.Position;
});
Utils.Log($"Wrote {fs.Position.ToFileSizeString()} file as vfs cache file {filename}");
}
@ -181,7 +180,7 @@ namespace Wabbajack.VirtualFileSystem
foreach (var group in grouped)
{
var tmpPath = Path.Combine(_stagingFolder, Guid.NewGuid().ToString());
FileExtractor.ExtractAll(group.Key.StagedPath, tmpPath);
FileExtractor.ExtractAll(Queue, group.Key.StagedPath, tmpPath);
paths.Add(tmpPath);
foreach (var file in group)
file.StagedPath = Path.Combine(tmpPath, file.Name);
@ -215,7 +214,7 @@ namespace Wabbajack.VirtualFileSystem
var indexedState = state.GroupBy(f => f.ParentHash)
.ToDictionary(f => f.Key ?? "", f => (IEnumerable<PortableFile>) f);
var parents = indexedState[""]
.PMap(f => VirtualFile.CreateFromPortable(this, indexedState, links, f));
.PMap(Queue,f => VirtualFile.CreateFromPortable(this, indexedState, links, f));
var newIndex = Index.Integrate(parents);
lock (this)

View File

@ -121,7 +121,6 @@ namespace Wabbajack.VirtualFileSystem
string rel_path)
{
var fi = new FileInfo(abs_path);
context._logSpam.OnNext($"Analyzing {rel_path}");
var self = new VirtualFile
{
Context = context,
@ -135,15 +134,13 @@ namespace Wabbajack.VirtualFileSystem
if (FileExtractor.CanExtract(Path.GetExtension(abs_path)))
{
context._logSpam.OnNext($"Extracting {rel_path}");
using (var tempFolder = context.GetTemporaryFolder())
{
FileExtractor.ExtractAll(abs_path, tempFolder.FullName);
FileExtractor.ExtractAll(context.Queue, abs_path, tempFolder.FullName);
context._logSpam.OnNext($"Analyzing Contents {rel_path}");
self.Children = Directory.EnumerateFiles(tempFolder.FullName, "*", SearchOption.AllDirectories)
.PMap(abs_src => Analyze(context, self, abs_src, abs_src.RelativeTo(tempFolder.FullName)))
.PMap(context.Queue, abs_src => Analyze(context, self, abs_src, abs_src.RelativeTo(tempFolder.FullName)))
.ToImmutableList();
}

View File

@ -2,7 +2,6 @@
using System.Reflection;
using System.Windows;
using Wabbajack.Common;
using Wabbajack.Lib.Updater;
namespace Wabbajack
{

View File

@ -29,6 +29,9 @@ namespace Wabbajack
private readonly ObservableAsPropertyHelper<ModlistSettingsEditorVM> _CurrentModlistSettings;
public ModlistSettingsEditorVM CurrentModlistSettings => _CurrentModlistSettings.Value;
private readonly ObservableAsPropertyHelper<StatusUpdateTracker> _CurrentStatusTracker;
public StatusUpdateTracker CurrentStatusTracker => _CurrentStatusTracker.Value;
public CompilerVM(MainWindowVM mainWindowVM)
{
this.MWVM = mainWindowVM;
@ -72,6 +75,10 @@ namespace Wabbajack
this._CurrentModlistSettings = this.WhenAny(x => x.Compiler.ModlistSettings)
.ToProperty(this, nameof(this.CurrentModlistSettings));
// Let sub VM determine what progress we're seeing
this._CurrentStatusTracker = this.WhenAny(x => x.Compiler.StatusTracker)
.ToProperty(this, nameof(this.CurrentStatusTracker));
this._Image = this.WhenAny(x => x.CurrentModlistSettings.ImagePath.TargetPath)
// Throttle so that it only loads image after any sets of swaps have completed
.Throttle(TimeSpan.FromMilliseconds(50), RxApp.MainThreadScheduler)

View File

@ -5,6 +5,7 @@ using System.Linq;
using System.Text;
using System.Threading.Tasks;
using System.Windows.Input;
using Wabbajack.Common;
namespace Wabbajack
{
@ -13,6 +14,7 @@ namespace Wabbajack
IReactiveCommand BeginCommand { get; }
bool Compiling { get; }
ModlistSettingsEditorVM ModlistSettings { get; }
StatusUpdateTracker StatusTracker { get;}
void Unload();
}
}

View File

@ -36,6 +36,9 @@ namespace Wabbajack
private readonly ObservableAsPropertyHelper<ModlistSettingsEditorVM> _ModlistSettings;
public ModlistSettingsEditorVM ModlistSettings => _ModlistSettings.Value;
[Reactive]
public StatusUpdateTracker StatusTracker { get; private set; }
public MO2CompilerVM(CompilerVM parent)
{
this.ModlistLocation = new FilePickerVM()
@ -121,6 +124,7 @@ namespace Wabbajack
{
try
{
this.StatusTracker = compiler.UpdateTracker;
compiler.Compile();
}
catch (Exception ex)
@ -128,6 +132,10 @@ namespace Wabbajack
while (ex.InnerException != null) ex = ex.InnerException;
Utils.Log($"Compiler error: {ex.ExceptionToString()}");
}
finally
{
this.StatusTracker = null;
}
});
});
this._Compiling = this.BeginCommand.IsExecuting

View File

@ -49,6 +49,9 @@ namespace Wabbajack
public ICommand FindGameInGogCommand { get; }
[Reactive]
public StatusUpdateTracker StatusTracker { get; private set; }
public VortexCompilerVM(CompilerVM parent)
{
this.GameLocation = new FilePickerVM()
@ -100,6 +103,7 @@ namespace Wabbajack
{
try
{
this.StatusTracker = compiler.UpdateTracker;
compiler.Compile();
}
catch (Exception ex)
@ -107,6 +111,10 @@ namespace Wabbajack
while (ex.InnerException != null) ex = ex.InnerException;
Utils.Log($"Compiler error: {ex.ExceptionToString()}");
}
finally
{
this.StatusTracker = null;
}
});
});
this._Compiling = this.BeginCommand.IsExecuting

View File

@ -31,9 +31,6 @@ namespace Wabbajack
private readonly ObservableAsPropertyHelper<ViewModel> _ActivePane;
public ViewModel ActivePane => _ActivePane.Value;
private readonly ObservableAsPropertyHelper<int> _QueueProgress;
public int QueueProgress => _QueueProgress.Value;
public ObservableCollectionExtended<CPUStatus> StatusList { get; } = new ObservableCollectionExtended<CPUStatus>();
public ObservableCollectionExtended<string> Log { get; } = new ObservableCollectionExtended<string>();
@ -64,9 +61,6 @@ namespace Wabbajack
.Bind(this.Log)
.Subscribe()
.DisposeWith(this.CompositeDisposable);
Utils.StatusUpdates
.Subscribe((i) => WorkQueue.Report(i.Message, i.Progress))
.DisposeWith(this.CompositeDisposable);
// Wire mode to drive the active pane.
// Note: This is currently made into a derivative property driven by mode,
@ -86,7 +80,9 @@ namespace Wabbajack
})
.ToProperty(this, nameof(this.ActivePane));
// Compile progress updates and populate ObservableCollection
/*
WorkQueue.Status
.ObserveOn(RxApp.TaskpoolScheduler)
.ToObservableChangeSet(x => x.ID)
@ -96,18 +92,7 @@ namespace Wabbajack
.Sort(SortExpressionComparer<CPUStatus>.Ascending(s => s.ID), SortOptimisations.ComparesImmutableValuesOnly)
.Bind(this.StatusList)
.Subscribe()
.DisposeWith(this.CompositeDisposable);
this._QueueProgress = WorkQueue.QueueSize
.Select(progress =>
{
if (progress.Max == 0)
{
progress.Max = 1;
}
return progress.Current * 100 / progress.Max;
})
.ToProperty(this, nameof(this.QueueProgress));
.DisposeWith(this.CompositeDisposable);*/
}
}
}

View File

@ -63,8 +63,6 @@ namespace Wabbajack.UI
{
_downloadThread = new Thread(() =>
{
WorkQueue.CustomReportFn = (progress, msg) => { DownloadProgress = progress; };
var state = DownloadDispatcher.ResolveArchive(_url);
state.Download(new Archive {Name = _downloadName, Size = _size}, _destination);
_destination.FileHash();