bug fixes and priority settings

This commit is contained in:
Timothy Baldridge 2021-10-20 21:18:15 -06:00
parent 2d0147ac8a
commit 24830c1df6
28 changed files with 158 additions and 93 deletions

View File

@ -1,4 +1,5 @@
using System;
using System.Diagnostics;
using System.IO;
using System.Linq;
using System.Net.Http;
@ -119,8 +120,8 @@ namespace Wabbajack.App.ViewModels
_slides[InSlideRange(_currentSlideIndex + 1)].PreCache(_httpClient).FireAndForget();
var prevSlide = _slides[InSlideRange(_currentSlideIndex - 2)];
if (prevSlide.Image != null)
prevSlide.Image = null;
//if (prevSlide.Image != null)
// prevSlide.Image = null;
Dispatcher.UIThread.InvokeAsync(() =>
{
@ -180,14 +181,15 @@ namespace Wabbajack.App.ViewModels
_installer = _provider.GetService<StandardInstaller>()!;
_installer.OnStatusUpdate += (_, update) =>
_installer.OnStatusUpdate = async update =>
{
Dispatcher.UIThread.InvokeAsync(() =>
Trace.TraceInformation("Update....");
await Dispatcher.UIThread.InvokeAsync(() =>
{
StatusText = update.StatusText;
StepsProgress = update.StepsProgress;
StepProgress = update.StepProgress;
});
}, DispatcherPriority.Background);
};
_logger.LogInformation("Installer created, starting the installation process");

View File

@ -3,6 +3,7 @@ using System.Net.Http;
using System.Threading.Tasks;
using Avalonia.Media;
using Avalonia.Media.Imaging;
using Avalonia.Threading;
using ReactiveUI;
using ReactiveUI.Fody.Helpers;
using Wabbajack.DTOs.DownloadStates;
@ -29,7 +30,13 @@ namespace Wabbajack.App.ViewModels.SubViewModels
{
Loading = true;
var url = await client.GetByteArrayAsync(MetaState.ImageURL);
Image = new Bitmap(new MemoryStream(url));
var img = new Bitmap(new MemoryStream(url));
await Dispatcher.UIThread.InvokeAsync(() =>
{
Image = img;
});
Loading = false;
}

View File

@ -46,7 +46,6 @@ namespace Wabbajack.CLI
services.AddSingleton<TemporaryFileManager>();
services.AddSingleton<FileExtractor.FileExtractor>();
services.AddSingleton(new VFSCache(KnownFolders.EntryPoint.Combine("vfscache.sqlite")));
services.AddSingleton(new FileHashCache(KnownFolders.EntryPoint.Combine("filehashpath.sqlite")));
services.AddSingleton(new ParallelOptions {MaxDegreeOfParallelism = Environment.ProcessorCount});
services.AddSingleton<Client>();
services.AddSingleton<Networking.WabbajackClientApi.Client>();

View File

@ -468,6 +468,7 @@ namespace Wabbajack.CLI.Verbs
return $"{definition!.Hash.ToHex()}/parts/{idx}";
}
/* Outdated
await definition.Parts.PDo(_parallelOptions, async part =>
{
_logger.LogInformation("Uploading mirror part of {name} {hash} ({index}/{length})", archive.Name, archive.Hash, part.Index, definition.Parts.Length);
@ -490,6 +491,7 @@ namespace Wabbajack.CLI.Verbs
});
*/
await CircuitBreaker.WithAutoRetryAllAsync(_logger, async () =>
{
using var client = await GetMirrorFtpClient(token);

View File

@ -4,39 +4,25 @@ using System.Linq;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;
using Wabbajack.RateLimiter;
namespace Wabbajack.Common
{
public static class AsyncParallelExtensions
{
public static IAsyncEnumerable<TOut> PMap<TIn, TOut>(this IEnumerable<TIn> coll, ParallelOptions options,
Func<TIn, Task<TOut>> mapFn)
{
var queue = Channel.CreateBounded<TOut>(options.MaxDegreeOfParallelism);
Parallel.ForEachAsync(coll, options, async (x, token) =>
{
var result = await mapFn(x);
await queue.Writer.WriteAsync(result, token);
}).ContinueWith(async t =>
{
queue.Writer.TryComplete();
}).FireAndForget();
return queue.Reader.ReadAllAsync();
}
public static async Task PDo<TIn>(this IEnumerable<TIn> coll, ParallelOptions options, Func<TIn, Task> mapFn)
{
await Parallel.ForEachAsync(coll, options, async (x, token) => await mapFn(x));
}
public static async Task PDoAll<TIn>(this IEnumerable<TIn> coll, Func<TIn, Task> mapFn)
{
var tasks = coll.Select(mapFn).ToList();
await Task.WhenAll(tasks);
}
public static async Task PDoAll<TIn, TJob>(this IEnumerable<TIn> coll, IResource<TJob> limiter, Func<TIn, Task> mapFn)
{
using var job = await limiter.Begin("", 0, CancellationToken.None);
var tasks = coll.Select(mapFn).ToList();
await Task.WhenAll(tasks);
}
public static async IAsyncEnumerable<TOut> PMapAll<TIn, TOut>(this IEnumerable<TIn> coll, Func<TIn, Task<TOut>> mapFn)
{
var tasks = coll.Select(mapFn).ToList();
@ -46,6 +32,16 @@ namespace Wabbajack.Common
}
}
public static async IAsyncEnumerable<TOut> PMapAll<TIn, TJob, TOut>(this IEnumerable<TIn> coll, IResource<TJob> limiter, Func<TIn, Task<TOut>> mapFn)
{
var tasks = coll.Select(mapFn).ToList();
foreach (var itm in tasks)
{
using var job = await limiter.Begin("", 0, CancellationToken.None);
yield return await itm;
}
}
public static async Task<List<T>> ToList<T>(this IAsyncEnumerable<T> coll)
{
List<T> lst = new();

View File

@ -1,6 +1,7 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
namespace Wabbajack.Common
{
@ -40,5 +41,14 @@ namespace Wabbajack.Common
yield return itm;
}
}
public static async IAsyncEnumerable<T> OnEach<T>(this IEnumerable<T> coll, Func<T, Task> fn)
{
foreach (var itm in coll)
{
await fn(itm);
yield return itm;
}
}
}
}

View File

@ -16,6 +16,7 @@ using Wabbajack.DTOs.Texture;
using Wabbajack.Hashing.PHash;
using Wabbajack.Paths;
using Wabbajack.Paths.IO;
using Wabbajack.RateLimiter;
using Xunit;
namespace Wabbajack.Compiler.Test
@ -125,7 +126,8 @@ namespace Wabbajack.Compiler.Test
bsa.Delete();
var creator = BSADispatch.CreateBuilder(bsaState, _manager);
await fileStates.Take(2).PDo(_parallelOptions, async f => await creator.AddFile(f, f.Path.RelativeTo(_mod.FullPath).Open(FileMode.Open),CancellationToken.None));
await fileStates.Take(2).PDoAll(new Resource<CompilerSanityTests>(),
async f => await creator.AddFile(f, f.Path.RelativeTo(_mod.FullPath).Open(FileMode.Open),CancellationToken.None));
{
await using var fs = bsa.Open(FileMode.Create, FileAccess.Write);
await creator.Build(fs, CancellationToken.None);

View File

@ -56,15 +56,17 @@ namespace Wabbajack.Compiler
private string _statusText;
private long _currentStepProgress;
private readonly Stopwatch _updateStopWatch = new();
public readonly IResource<ACompiler> CompilerLimiter;
protected long MaxSteps { get; set; }
public event EventHandler<StatusUpdate> OnStatusUpdate;
public ACompiler(ILogger logger, FileExtractor.FileExtractor extractor, FileHashCache hashCache, Context vfs, TemporaryFileManager manager, CompilerSettings settings,
ParallelOptions parallelOptions, DownloadDispatcher dispatcher, Client wjClient, IGameLocator locator, DTOSerializer dtos,
ParallelOptions parallelOptions, DownloadDispatcher dispatcher, Client wjClient, IGameLocator locator, DTOSerializer dtos, IResource<ACompiler> compilerLimiter,
IBinaryPatchCache patchCache)
{
CompilerLimiter = compilerLimiter;
_logger = logger;
_extractor = extractor;
_hashCache = hashCache;
@ -196,7 +198,7 @@ namespace Wabbajack.Compiler
{
_logger.LogInformation("Getting meta data for {count} archives", SelectedArchives.Count);
NextStep("Gathering Metadata", SelectedArchives.Count);
await SelectedArchives.PDo(_parallelOptions, async a =>
await SelectedArchives.PDoAll(CompilerLimiter, async a =>
{
UpdateProgress(1);
await _dispatcher.FillInMetadata(a);
@ -263,7 +265,7 @@ namespace Wabbajack.Compiler
protected async Task CleanInvalidArchivesAndFillState()
{
NextStep("Cleaning Invalid Archives", 1);
var remove = await IndexedArchives.PMap(_parallelOptions, async a =>
var remove = await IndexedArchives.PMapAll(CompilerLimiter, async a =>
{
try
{
@ -319,7 +321,7 @@ namespace Wabbajack.Compiler
var toFind = await _settings.Downloads.EnumerateFiles()
.Where(f => f.Extension != Ext.Meta)
.PMap(_parallelOptions, async f => await HasInvalidMeta(f) ? f : default)
.PMapAll(CompilerLimiter,async f => await HasInvalidMeta(f) ? f : default)
.Where(f => f != default)
.Where(f => f.FileExists())
.ToList();
@ -480,7 +482,7 @@ namespace Wabbajack.Compiler
// Load in the patches
await InstallDirectives.OfType<PatchedFromArchive>()
.Where(p => p.PatchID == default)
.PDo(_parallelOptions, async pfa =>
.PDoAll(CompilerLimiter,async pfa =>
{
var patches = await _patchOptions[pfa]
@ -556,7 +558,7 @@ namespace Wabbajack.Compiler
.ToDictionary(f => f.Key, f => f.First());
SelectedArchives.Clear();
SelectedArchives.AddRange(await hashes.PMap(_parallelOptions, hash =>
SelectedArchives.AddRange(await hashes.PMapAll(CompilerLimiter,hash =>
{
UpdateProgress(1);
return ResolveArchive(hash, archives);

View File

@ -83,7 +83,7 @@ namespace Wabbajack.Compiler.CompilationSteps
//_cleanup = await source.File.Context.Stage(source.File.Children);
}
var matches = await sourceFiles.PMap(_compiler._parallelOptions, e => _mo2Compiler.RunStack(stack, new RawSourceFile(e, Consts.BSACreationDir.Combine((RelativePath)id, (RelativePath)e.Name))))
var matches = await sourceFiles.PMapAll(_compiler.CompilerLimiter, e => _mo2Compiler.RunStack(stack, new RawSourceFile(e, Consts.BSACreationDir.Combine((RelativePath)id, (RelativePath)e.Name))))
.ToList();

View File

@ -16,6 +16,7 @@ using Wabbajack.Installer;
using Wabbajack.Networking.WabbajackClientApi;
using Wabbajack.Paths;
using Wabbajack.Paths.IO;
using Wabbajack.RateLimiter;
using Wabbajack.VFS;
namespace Wabbajack.Compiler
@ -26,8 +27,8 @@ namespace Wabbajack.Compiler
public MO2Compiler(ILogger<MO2Compiler> logger, FileExtractor.FileExtractor extractor, FileHashCache hashCache, Context vfs,
TemporaryFileManager manager, MO2CompilerSettings settings, ParallelOptions parallelOptions, DownloadDispatcher dispatcher,
Client wjClient, IGameLocator locator, DTOSerializer dtos, IBinaryPatchCache patchCache) :
base(logger, extractor, hashCache, vfs, manager, settings, parallelOptions, dispatcher, wjClient, locator, dtos, patchCache)
Client wjClient, IGameLocator locator, DTOSerializer dtos, IResource<ACompiler> compilerLimiter, IBinaryPatchCache patchCache) :
base(logger, extractor, hashCache, vfs, manager, settings, parallelOptions, dispatcher, wjClient, locator, dtos, compilerLimiter, patchCache)
{
MaxSteps = 14;
}
@ -64,7 +65,7 @@ namespace Wabbajack.Compiler
// Find all Downloads
IndexedArchives = await Settings.Downloads.EnumerateFiles()
.Where(f => f.WithExtension(Ext.Meta).FileExists())
.PMap(_parallelOptions,
.PMapAll(CompilerLimiter,
async f => new IndexedArchive(_vfs.Index.ByRootPath[f])
{
Name = (string)f.FileName,
@ -120,7 +121,7 @@ namespace Wabbajack.Compiler
var stack = MakeStack();
NextStep("Running Compilation Stack", AllFiles.Count);
var results = await AllFiles.PMap(_parallelOptions, f =>
var results = await AllFiles.PMapAll(CompilerLimiter, f =>
{
UpdateProgress(1);
return RunStack(stack, f);

View File

@ -7,6 +7,7 @@ using Microsoft.Extensions.Logging;
using Wabbajack.Common;
using Wabbajack.Paths;
using Wabbajack.Paths.IO;
using Wabbajack.RateLimiter;
using Xunit;
namespace Wabbajack.Compression.BSA.Test
@ -55,7 +56,7 @@ namespace Wabbajack.Compression.BSA.Test
var reader = await BSADispatch.Open(path);
var dataStates = await reader.Files
.PMap(_parallelOptions,
.PMapAll(new Resource<CompressionTests>("Compression Test", 4),
async file =>
{
var ms = new MemoryStream();
@ -69,7 +70,7 @@ namespace Wabbajack.Compression.BSA.Test
var build = BSADispatch.CreateBuilder(oldState, _tempManager);
await dataStates.PDo(_parallelOptions,
await dataStates.PDoAll(
async itm => { await build.AddFile(itm.State, itm.Stream, CancellationToken.None); });
@ -79,7 +80,7 @@ namespace Wabbajack.Compression.BSA.Test
var reader2 = await BSADispatch.Open(new MemoryStreamFactory(rebuiltStream, path, path.LastModifiedUtc()));
await reader.Files.Zip(reader2.Files)
.PDo(_parallelOptions, async pair =>
.PDoAll(async pair =>
{
var (oldFile, newFile) = pair;
_logger.LogInformation("Comparing {old} and {new}", oldFile.Path, newFile.Path);

View File

@ -7,6 +7,7 @@ using Wabbajack.DTOs.JsonConverters;
using Wabbajack.DTOs.Validation;
using Wabbajack.Networking.WabbajackClientApi;
using Wabbajack.Paths.IO;
using Wabbajack.RateLimiter;
using Xunit;
using YamlDotNet.Serialization;
using YamlDotNet.Serialization.NamingConventions;
@ -79,7 +80,8 @@ namespace Wabbajack.DTOs.Test
var statuses = await _wjClient.GetListStatuses();
Assert.True(statuses.Length > 10);
await statuses.PDo(_parallelOptions, async status =>
await statuses.PDoAll(new Resource<ModListTests>("Resource Test", 4),
async status =>
{
_logger.LogInformation("Loading {machineURL}", status.MachineURL);
var detailed = await _wjClient.GetDetailedStatus(status.MachineURL);

View File

@ -277,7 +277,7 @@ namespace Wabbajack.FileExtractor
}*/
var results = await dest.Path.EnumerateFiles()
.PMap(_parallelOptions, async f =>
.PMapAll(async f =>
{
var path = f.RelativeTo(dest.Path);
if (!shouldExtract(path)) return ((RelativePath, T))default;

View File

@ -1,14 +1,15 @@
using System.IO;
using System.Threading;
using System.Threading.Tasks;
using Wabbajack.RateLimiter;
namespace Wabbajack.Hashing.xxHash64
{
public static class ByteArrayExtensions
{
public static async ValueTask<Hash> Hash(this byte[] data)
public static async ValueTask<Hash> Hash(this byte[] data, IJob job = null)
{
return await new MemoryStream(data).HashingCopy(Stream.Null, CancellationToken.None);
return await new MemoryStream(data).HashingCopy(Stream.Null, CancellationToken.None, job);
}
}

View File

@ -55,7 +55,7 @@ namespace Wabbajack.Installer
protected long MaxSteps { get; set; }
public event EventHandler<StatusUpdate> OnStatusUpdate;
public Func<StatusUpdate, Task>? OnStatusUpdate;
@ -79,7 +79,7 @@ namespace Wabbajack.Installer
_wjClient = wjClient;
}
public void NextStep(string statusText, long maxStepProgress)
public async Task NextStep(string statusText, long maxStepProgress)
{
_updateStopWatch.Restart();
_maxStepProgress = maxStepProgress;
@ -89,24 +89,27 @@ namespace Wabbajack.Installer
if (OnStatusUpdate != null)
{
OnStatusUpdate(this, new StatusUpdate($"[{_currentStep}/{MaxSteps}] " + statusText, Percent.FactoryPutInRange(_currentStep, MaxSteps),
Percent.Zero));
await OnStatusUpdate!(new StatusUpdate($"[{_currentStep}/{MaxSteps}] " + statusText, Percent.FactoryPutInRange(_currentStep, MaxSteps), Percent.Zero));
}
}
public void UpdateProgress(long stepProgress)
private const int _limitMS = 100;
public async ValueTask UpdateProgress(long stepProgress)
{
Interlocked.Add(ref _currentStepProgress, stepProgress);
if (_updateStopWatch.ElapsedMilliseconds < _limitMS) return;
lock (_updateStopWatch)
{
if (_updateStopWatch.ElapsedMilliseconds < 100) return;
if (_updateStopWatch.ElapsedMilliseconds < _limitMS) return;
_updateStopWatch.Restart();
}
if (OnStatusUpdate != null)
{
OnStatusUpdate(this, new StatusUpdate(_statusText, Percent.FactoryPutInRange(_currentStep, MaxSteps),
await OnStatusUpdate!(new StatusUpdate(_statusText, Percent.FactoryPutInRange(_currentStep, MaxSteps),
Percent.FactoryPutInRange(_currentStepProgress, _maxStepProgress)));
}
}
@ -184,7 +187,7 @@ namespace Wabbajack.Installer
public async Task InstallArchives(CancellationToken token)
{
NextStep("Installing files", ModList.Directives.Sum(d => d.Size));
await NextStep("Installing files", ModList.Directives.Sum(d => d.Size));
var grouped = ModList.Directives
.OfType<FromArchive>()
.Select(a => new { VF = _vfs.Index.FileForArchiveHashPath(a.ArchiveHashPath), Directive = a })
@ -198,7 +201,7 @@ namespace Wabbajack.Installer
foreach (var directive in grouped[vf])
{
var file = directive.Directive;
UpdateProgress(file.Size);
await UpdateProgress(file.Size);
switch (file)
{
@ -285,7 +288,7 @@ namespace Wabbajack.Installer
}
_logger.LogInformation("Downloading {count} archives", missing.Count);
NextStep("Downloading files", missing.Count);
await NextStep("Downloading files", missing.Count);
await missing
.OrderBy(a => a.Size)
@ -306,7 +309,7 @@ namespace Wabbajack.Installer
}
await DownloadArchive(archive, download, token, outputPath);
UpdateProgress(1);
await UpdateProgress(1);
});
}
@ -362,7 +365,7 @@ namespace Wabbajack.Installer
var hashResults = await
toHash
.PMap(_parallelOptions, async e => (await _fileHashCache.FileHashCachedAsync(e, token), e))
.PMapAll(async e => (await _fileHashCache.FileHashCachedAsync(e, token), e))
.ToList();
HashedArchives = hashResults
@ -391,11 +394,11 @@ namespace Wabbajack.Installer
var savePath = (RelativePath)"saves";
var existingFiles = _configuration.Install.EnumerateFiles().ToList();
NextStep("Optimizing Modlist: Looking for files to delete", existingFiles.Count);
await NextStep("Optimizing Modlist: Looking for files to delete", existingFiles.Count);
await existingFiles
.PDo(_parallelOptions, async f =>
.PDoAll(async f =>
{
UpdateProgress(1);
await UpdateProgress(1);
var relativeTo = f.RelativeTo(_configuration.Install);
if (indexed.ContainsKey(relativeTo) || f.InFolder(_configuration.Downloads))
return;
@ -410,12 +413,12 @@ namespace Wabbajack.Installer
});
_logger.LogInformation("Cleaning empty folders");
NextStep("Optimizing Modlist: Cleaning empty folders", indexed.Keys.Count);
var expectedFolders = indexed.Keys
await NextStep("Optimizing Modlist: Cleaning empty folders", indexed.Keys.Count);
var expectedFolders = (await indexed.Keys
.Select(f => f.RelativeTo(_configuration.Install))
// We ignore the last part of the path, so we need a dummy file name
.Append(_configuration.Downloads.Combine("_"))
.OnEach(_ => UpdateProgress(1))
.OnEach(async _ => await UpdateProgress(1))
.Where(f => f.InFolder(_configuration.Install))
.SelectMany(path =>
{
@ -424,6 +427,7 @@ namespace Wabbajack.Installer
var split = ((string)path.RelativeTo(_configuration.Install)).Split('\\');
return Enumerable.Range(1, split.Length - 1).Select(t => string.Join("\\", split.Take(t)));
})
.ToList())
.Distinct()
.Select(p => _configuration.Install.Combine(p))
.ToHashSet();
@ -444,10 +448,10 @@ namespace Wabbajack.Installer
var existingfiles = _configuration.Install.EnumerateFiles().ToHashSet();
NextStep("Optimizing Modlist: Removing redundant directives", indexed.Count);
await NextStep("Optimizing Modlist: Removing redundant directives", indexed.Count);
await indexed.Values.PMapAll<Directive, Directive?>(async d =>
{
UpdateProgress(1);
await UpdateProgress(1);
// Bit backwards, but we want to return null for
// all files we *want* installed. We return the files
// to remove from the install list.

View File

@ -33,6 +33,12 @@ namespace Wabbajack.Installer
return new FileIniDataParser(IniParser()).ReadFile(file.ToString());
}
public static void SaveIniFile(this IniData data, AbsolutePath file)
{
var parser = new FileIniDataParser(IniParser());
parser.WriteFile(file.ToString(), data);
}
/// <summary>
/// Loads a INI from the given string
/// </summary>

View File

@ -121,6 +121,7 @@ namespace Wabbajack.Installer
await GenerateZEditMerges(token);
await ForcePortable();
await RemapMO2File();
CreateOutputMods();
@ -129,11 +130,24 @@ namespace Wabbajack.Installer
await ExtractedModlistFolder!.DisposeAsync();
await _wjClient.SendMetric(MetricNames.FinishInstall, ModList.Name);
NextStep("Finished", 1);
await NextStep("Finished", 1);
_logger.LogInformation("Finished Installation");
return true;
}
private async Task RemapMO2File()
{
var iniFile = _configuration.Install.Combine("ModOrganizer.ini");
if (!iniFile.FileExists()) return;
_logger.LogInformation("Remapping ModOrganizer.ini");
var iniData = iniFile.LoadIniFile();
var settings = iniData["Settings"];
settings["download_directory"] = _configuration.Downloads.ToString();
iniData.SaveIniFile(iniFile);
}
private void CreateOutputMods()
{
_configuration.Install.Combine("profiles")
@ -185,7 +199,7 @@ namespace Wabbajack.Installer
private async Task InstallIncludedDownloadMetas(CancellationToken token)
{
await ModList.Archives
.PDo(_parallelOptions, async archive =>
.PDoAll(async archive =>
{
if (HashedArchives.TryGetValue(archive.Hash, out var paths))
{
@ -219,8 +233,9 @@ namespace Wabbajack.Installer
var sourceDir = _configuration.Install.Combine(BSACreationDir, bsa.TempID);
var a = BSADispatch.CreateBuilder(bsa.State, _manager);
var streams = await bsa.FileStates.PMap(_parallelOptions, async state =>
var streams = await bsa.FileStates.PMapAll(async state =>
{
var fs = sourceDir.Combine(state.Path).Open(FileMode.Open, FileAccess.Read, FileShare.Read);
await a.AddFile(state, fs, token);
return fs;
@ -246,12 +261,12 @@ namespace Wabbajack.Installer
private async Task InstallIncludedFiles(CancellationToken token)
{
_logger.LogInformation("Writing inline files");
NextStep("Installing Included Files", ModList.Directives.OfType<InlineFile>().Count());
await NextStep("Installing Included Files", ModList.Directives.OfType<InlineFile>().Count());
await ModList.Directives
.OfType<InlineFile>()
.PDo(_parallelOptions, async directive =>
.PDoAll(async directive =>
{
UpdateProgress(1);
await UpdateProgress(1);
var outPath = _configuration.Install.Combine(directive.To);
outPath.Delete();
@ -261,7 +276,7 @@ namespace Wabbajack.Installer
await WriteRemappedFile(file);
break;
default:
await outPath.WriteAllBytesAsync(await LoadBytesFromPath(directive.SourceDataID));
await outPath.WriteAllBytesAsync(await LoadBytesFromPath(directive.SourceDataID), token: token);
break;
}
});
@ -363,7 +378,7 @@ namespace Wabbajack.Installer
await _configuration.ModList
.Directives
.OfType<MergedPatch>()
.PDo(_parallelOptions, async m =>
.PDoAll(async m =>
{
_logger.LogInformation("Generating zEdit merge: {to}", m.To);

View File

@ -5,6 +5,7 @@ using System.IO.Compression;
using System.Linq;
using System.Net.Http;
using System.Net.Http.Json;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
using Wabbajack.Common;
@ -20,6 +21,7 @@ using Wabbajack.Networking.Http.Interfaces;
using Wabbajack.Paths;
using Wabbajack.Paths.IO;
using Wabbajack.RateLimiter;
using Wabbajack.VFS;
using YamlDotNet.Serialization;
using YamlDotNet.Serialization.NamingConventions;
@ -37,9 +39,11 @@ namespace Wabbajack.Networking.WabbajackClientApi
private readonly ParallelOptions _parallelOptions;
private readonly IResource<HttpClient> _limiter;
private readonly Configuration _configuration;
private readonly IResource<FileHashCache> _hashLimiter;
public Client(ILogger<Client> logger, HttpClient client, ITokenProvider<WabbajackApiState> token, DTOSerializer dtos, IResource<HttpClient> limiter, Configuration configuration)
public Client(ILogger<Client> logger, HttpClient client, ITokenProvider<WabbajackApiState> token, DTOSerializer dtos,
IResource<HttpClient> limiter, IResource<FileHashCache> hashLimiter, Configuration configuration)
{
_configuration = configuration;
_token = token;
@ -47,6 +51,7 @@ namespace Wabbajack.Networking.WabbajackClientApi
_logger = logger;
_dtos = dtos;
_limiter = limiter;
_hashLimiter = hashLimiter;
}
private async ValueTask<HttpRequestMessage> MakeMessage(HttpMethod method, Uri uri)
@ -163,15 +168,16 @@ namespace Wabbajack.Networking.WabbajackClientApi
OriginalFileName = path.FileName,
Size = path.Size(),
Hash = await path.Hash(),
Parts = await parts.PMap(_parallelOptions, async part =>
Parts = await parts.PMapAll(async part =>
{
var buffer = new byte[part.Size];
using var job = await _hashLimiter.Begin("Hashing part", part.Size, CancellationToken.None);
await using (var fs = path.Open(FileMode.Open, FileAccess.Read, FileShare.Read))
{
fs.Position = part.Offset;
await fs.ReadAsync(buffer);
}
part.Hash = await buffer.Hash();
part.Hash = await buffer.Hash(job);
return part;
}).ToArray()
};

View File

@ -16,6 +16,7 @@
<ProjectReference Include="..\Wabbajack.Common\Wabbajack.Common.csproj" />
<ProjectReference Include="..\Wabbajack.DTOs\Wabbajack.DTOs.csproj" />
<ProjectReference Include="..\Wabbajack.Paths.IO\Wabbajack.Paths.IO.csproj" />
<ProjectReference Include="..\Wabbajack.VFS\Wabbajack.VFS.csproj" />
</ItemGroup>
</Project>

View File

@ -7,6 +7,7 @@ namespace Wabbajack.RateLimiter
{
StatusReport StatusReport { get; }
string Name { get; }
}
public interface IResource<T> : IResource
{

View File

@ -23,10 +23,10 @@ namespace Wabbajack.RateLimiter
public Resource(string humanName, int maxTasks = Int32.MaxValue, long maxThroughput = long.MaxValue)
public Resource(string? humanName = null, int? maxTasks = 0, long maxThroughput = long.MaxValue)
{
_humanName = humanName;
_maxTasks = maxTasks;
_humanName = humanName ?? "<unknown>";
_maxTasks = maxTasks ?? Environment.ProcessorCount;
_maxThroughput = maxThroughput;
_semaphore = new SemaphoreSlim(_maxTasks);

View File

@ -15,6 +15,7 @@ using Wabbajack.Hashing.xxHash64;
using Wabbajack.Networking.WabbajackClientApi;
using Wabbajack.Paths;
using Wabbajack.Paths.IO;
using Wabbajack.RateLimiter;
using Wabbajack.Server.DataLayer;
using Wabbajack.Server.DTOs;
using Wabbajack.Server.TokenProviders;
@ -105,7 +106,7 @@ namespace Wabbajack.Server.Services
return $"{definition.Hash.ToHex()}/parts/{idx}";
}
await definition.Parts.PDo(_parallelOptions, async part =>
await definition.Parts.PDoAll(new Resource<MirrorUploader>(), async part =>
{
_logger.LogInformation("Uploading mirror part ({index}/{length})", part.Index, definition.Parts.Length);

View File

@ -34,6 +34,7 @@ namespace Wabbajack.Server.Services
await _dispatcher.PrepareAll(archives.Select(a => a.State));
var random = new Random();
/*
var results = await archives.PMap(_parallelOptions, async archive =>
{
try
@ -82,7 +83,8 @@ namespace Wabbajack.Server.Services
_logger.Log(LogLevel.Warning, $"Validation failed for {archive.Name} from {archive.State.PrimaryKeyString}");
_logger.Log(LogLevel.Information, $"Non-nexus validation completed {failed} out of {passed} failed");
return failed;
*/
return default;
}
}
}

View File

@ -44,8 +44,8 @@ namespace Wabbajack.Services.OSIntegrated
service.AddTransient(s => new TemporaryFileManager(KnownFolders.EntryPoint.Combine("temp", Guid.NewGuid().ToString())));
service.AddSingleton(s => options.UseLocalCache ?
new FileHashCache(s.GetService<TemporaryFileManager>()!.CreateFile().Path)
: new FileHashCache(KnownFolders.AppDataLocal.Combine("Wabbajack", "GlobalHashCache.sqlite")));
new FileHashCache(s.GetService<TemporaryFileManager>()!.CreateFile().Path, s.GetService<IResource<FileHashCache>>()!)
: new FileHashCache(KnownFolders.AppDataLocal.Combine("Wabbajack", "GlobalHashCache.sqlite"), s.GetService<IResource<FileHashCache>>()!));
service.AddSingleton(s => options.UseLocalCache ?
new VFSCache(s.GetService<TemporaryFileManager>()!.CreateFile().Path)
@ -59,6 +59,7 @@ namespace Wabbajack.Services.OSIntegrated
service.AddAllSingleton<IResource, IResource<DownloadDispatcher>>(s => new Resource<DownloadDispatcher>("Downloads", 12));
service.AddAllSingleton<IResource, IResource<HttpClient>>(s => new Resource<HttpClient>("Web Requests", 12));
service.AddAllSingleton<IResource, IResource<Context>>(s => new Resource<Context>("VFS", 12));
service.AddAllSingleton<IResource, IResource<FileHashCache>>(s => new Resource<FileHashCache>("File Hashing", 12));
service.AddAllSingleton<IResource, IResource<FileExtractor.FileExtractor>>(s =>
new Resource<FileExtractor.FileExtractor>("File Extractor", 12));

View File

@ -21,7 +21,7 @@ namespace Wabbajack.VFS.Test
// Keep this fixed at 2 so that we can detect deadlocks in the VFS parallelOptions
service.AddSingleton(new ParallelOptions {MaxDegreeOfParallelism = 2});
service.AddSingleton(new FileHashCache(KnownFolders.EntryPoint.Combine("hashcache.sqlite")));
service.AddSingleton(new FileHashCache(KnownFolders.EntryPoint.Combine("hashcache.sqlite"), new Resource<FileHashCache>("File Hashing", 10)));
service.AddSingleton(new VFSCache(KnownFolders.EntryPoint.Combine("vfscache.sqlite")));
service.AddTransient<Context>();
service.AddSingleton<FileExtractor.FileExtractor>();

View File

@ -80,7 +80,7 @@ namespace Wabbajack.VFS
var filesToIndex = roots.SelectMany(root => root.EnumerateFiles()).ToList();
var allFiles = await filesToIndex
.PMap(_parallelOptions, async f =>
.PMapAll(async f =>
{
if (native.TryGetValue(f, out var found))
if (found.LastModified == f.LastModifiedUtc().AsUnixTime() && found.Size == f.Size())
@ -151,7 +151,7 @@ namespace Wabbajack.VFS
}
}
await filesByParent[top].PDo(_parallelOptions,
await filesByParent[top].PDoAll(
async file => await HandleFile(file, new ExtractedNativeFile(file.AbsoluteName) { CanMove = false }));
}

View File

@ -14,9 +14,11 @@ namespace Wabbajack.VFS
private readonly SQLiteConnection _conn;
private readonly string _connectionString;
private readonly AbsolutePath _location;
private readonly IResource<FileHashCache> _limiter;
public FileHashCache(AbsolutePath location)
public FileHashCache(AbsolutePath location, IResource<FileHashCache> limiter)
{
_limiter = limiter;
_location = location;
if (!_location.Parent.DirectoryExists())
@ -114,10 +116,11 @@ namespace Wabbajack.VFS
WriteHashCache(file, hash);
}
public async Task<Hash> FileHashCachedAsync(AbsolutePath file, CancellationToken token, IJob? job = null)
public async Task<Hash> FileHashCachedAsync(AbsolutePath file, CancellationToken token)
{
if (TryGetHashCache(file, out var foundHash)) return foundHash;
using var job = await _limiter.Begin($"Hasing {file.FileName}", file.Size(), token);
await using var fs = file.Open(FileMode.Open, FileAccess.Read, FileShare.Read);
var hash = await fs.HashingCopy(Stream.Null, token, job);

View File

@ -168,7 +168,7 @@ namespace Wabbajack.VFS
{
var absPath = (AbsolutePath)extractedFile.Name;
job.Size = absPath.Size();
hash = await context.HashCache.FileHashCachedAsync(absPath, token, job);
hash = await context.HashCache.FileHashCachedAsync(absPath, token);
}
else
{