Rework validation

This commit is contained in:
Timothy Baldridge 2021-12-18 09:14:39 -07:00
parent 5b3225e5f8
commit 19ad2ec331
4 changed files with 61 additions and 137 deletions

View File

@ -3,9 +3,7 @@ using System.Collections.Generic;
using System.CommandLine;
using System.CommandLine.Invocation;
using System.Diagnostics;
using System.Drawing;
using System.IO;
using System.IO.Compression;
using System.Linq;
using System.Text.Json;
using System.Threading;
@ -96,10 +94,10 @@ public class ValidateLists : IVerb
var token = CancellationToken.None;
_logger.LogInformation("Scanning for existing patches/mirrors");
var mirroredFiles = await AllMirroredFiles(token);
_logger.LogInformation("Found {count} mirrored files", mirroredFiles.Count);
var patchFiles = await AllPatchFiles(token);
_logger.LogInformation("Found {count} patches", patchFiles.Count);
var mirroredFiles = (await _wjClient.GetAllMirroredFileDefinitions(token)).Select(m => m.Hash).ToHashSet();
_logger.LogInformation("Found {Count} mirrored files", mirroredFiles.Count);
var patchFiles = await _wjClient.GetAllPatches(token);
_logger.LogInformation("Found {Count} patches", patchFiles.Length);
var otherArchiveManager = otherArchives == default ? null : new ArchiveManager(_logger, otherArchives);
@ -111,14 +109,6 @@ public class ValidateLists : IVerb
(x => x.State.PrimaryKeyString + x.Hash,
archive => DownloadAndValidate(archive, archiveManager, otherArchiveManager, mirrorAllowList, token));
var mirrorCache = new LazyCache<string, Archive, (ArchiveStatus Status, Archive archive)>
(x => x.State.PrimaryKeyString + x.Hash,
archive => AttemptToMirrorArchive(archive, archiveManager, mirrorAllowList, mirroredFiles, token));
var patchCache = new LazyCache<string, Archive, (ArchiveStatus Status, ValidatedArchive? ValidatedArchive)>
(x => x.State.PrimaryKeyString + x.Hash,
archive => AttemptToPatchArchive(archive, archiveManager, upgradedArchives, patchFiles, token));
var stopWatch = Stopwatch.StartNew();
var listData = await lists.SelectAsync(async l => await _gitHubClient.GetData(l))
.SelectMany(l => l.Lists)
@ -141,8 +131,8 @@ public class ValidateLists : IVerb
return validatedList;
}
using var scope = _logger.BeginScope("MachineURL: {machineURL}", modList.Links.MachineURL);
_logger.LogInformation("Verifying {machineURL} - {title}", modList.Links.MachineURL, modList.Title);
using var scope = _logger.BeginScope("MachineURL: {MachineUrl}", modList.Links.MachineURL);
_logger.LogInformation("Verifying {MachineUrl} - {Title}", modList.Links.MachineURL, modList.Title);
await DownloadModList(modList, archiveManager, CancellationToken.None);
ModList modListData;
@ -159,7 +149,7 @@ public class ValidateLists : IVerb
return validatedList;
}
_logger.LogInformation("Verifying {count} archives", modListData.Archives.Length);
_logger.LogInformation("Verifying {Count} archives", modListData.Archives.Length);
var archives = await modListData.Archives.PMapAll(async archive =>
{
@ -168,29 +158,48 @@ public class ValidateLists : IVerb
if (result.Status == ArchiveStatus.InValid)
{
result = await mirrorCache.Get(archive);
if (mirroredFiles.Contains(archive.Hash))
{
return new ValidatedArchive
{
Status = ArchiveStatus.Mirrored,
Original = archive,
PatchedFrom = new Archive
{
State = new WabbajackCDN
{
Url = _wjClient.GetMirrorUrl(archive.Hash)!
},
Size = archive.Size,
Name = archive.Name,
Hash = archive.Hash
}
};
}
}
if (result.Status == ArchiveStatus.InValid)
{
_logger.LogInformation("Looking for patch");
var patchResult = await patchCache.Get(archive);
if (patchResult.Status == ArchiveStatus.Updated)
return patchResult.ValidatedArchive;
return new ValidatedArchive
_logger.LogInformation("Looking for patch for {Hash}", archive.Hash);
foreach (var file in patchFiles.Where(p => p.Original.Hash == archive.Hash && p.Status == ArchiveStatus.Updated))
{
Original = archive,
Status = ArchiveStatus.InValid
};
if (await _dispatcher.Verify(file.PatchedFrom!, token))
{
return new ValidatedArchive()
{
Original = archive,
Status = ArchiveStatus.Updated,
PatchUrl = file.PatchUrl,
PatchedFrom = file.PatchedFrom
};
}
}
}
return new ValidatedArchive
return new ValidatedArchive()
{
Original = archive,
Status = result.Status,
PatchedFrom = result.Status is ArchiveStatus.Mirrored or ArchiveStatus.Updated
? result.archive
: null
Status = ArchiveStatus.InValid,
Original = archive
};
}).ToArray();
@ -432,94 +441,6 @@ public class ValidateLists : IVerb
await _dtos.Serialize(upgradedMetas, upgradedMetasFile, true);
}
/// <summary>
/// Attempts to make <paramref name="archive"/> available from the mirror, uploading it part by
/// part over FTP when its hash is not already listed in <paramref name="previouslyMirrored"/>.
/// </summary>
/// <param name="archive">Archive to mirror.</param>
/// <param name="archiveManager">Local archive store queried for a copy of the file.</param>
/// <param name="mirrorAllowList">Allow list the archive must match before it may be mirrored.</param>
/// <param name="previouslyMirrored">Hashes that are already mirrored.</param>
/// <param name="token">Cancellation token passed to the rate limiter, file reads and FTP calls.</param>
/// <returns>
/// <see cref="ArchiveStatus.InValid"/> with the original archive when there is no local copy or the
/// allow list does not match; otherwise <see cref="ArchiveStatus.Mirrored"/> with an archive whose
/// state points at the mirror URL.
/// </returns>
/// <exception cref="EndOfStreamException">
/// The local file ended before a part described by the file definition could be read in full.
/// </exception>
private async Task<(ArchiveStatus Status, Archive archive)> AttemptToMirrorArchive(Archive archive,
    ArchiveManager archiveManager, ServerAllowList mirrorAllowList, HashSet<Hash> previouslyMirrored,
    CancellationToken token)
{
    // Do we have a file to mirror?
    if (!archiveManager.HaveArchive(archive.Hash)) return (ArchiveStatus.InValid, archive);

    // Are we allowed to mirror the file?
    if (!_dispatcher.Matches(archive, mirrorAllowList)) return (ArchiveStatus.InValid, archive);

    var mirroredArchive = new Archive
    {
        Name = archive.Name,
        Size = archive.Size,
        Hash = archive.Hash,
        State = new WabbajackCDN
        {
            Url = new Uri($"{MirrorPrefix}{archive.Hash.ToHex()}")
        }
    };
    mirroredArchive.Meta = _dispatcher.MetaIniSection(mirroredArchive);

    // If it's already mirrored, we can exit
    if (previouslyMirrored.Contains(archive.Hash)) return (ArchiveStatus.Mirrored, mirroredArchive);

    // The presence of a local copy was already established by the first guard above,
    // so it is not checked a second time here.
    var srcPath = archiveManager.GetPath(archive.Hash);
    var definition = await _wjClient.GenerateFileDefinition(srcPath);

    // Create the remote directory layout: <hash>/ and <hash>/parts/
    using (var client = await GetMirrorFtpClient(token))
    {
        using var job = await _ftpRateLimiter.Begin("Starting uploading mirrored file", 0, token);
        await client.CreateDirectoryAsync($"{definition.Hash.ToHex()}", token);
        await client.CreateDirectoryAsync($"{definition.Hash.ToHex()}/parts", token);
    }

    string MakePath(long idx)
    {
        return $"{definition!.Hash.ToHex()}/parts/{idx}";
    }

    foreach (var part in definition.Parts)
    {
        _logger.LogInformation("Uploading mirror part of {Name} {Hash} ({Index}/{Length})", archive.Name,
            archive.Hash, part.Index, definition.Parts.Length);

        using var job = await _ftpRateLimiter.Begin("Uploading mirror part", part.Size, token);

        var buffer = new byte[part.Size];
        await using (var fs = srcPath.Open(FileMode.Open, FileAccess.Read, FileShare.Read))
        {
            fs.Position = part.Offset;

            // A single ReadAsync call may return fewer bytes than requested, which would leave
            // the tail of the buffer zeroed. Keep reading until the whole part is in memory.
            var filled = 0;
            while (filled < buffer.Length)
            {
                var read = await fs.ReadAsync(buffer.AsMemory(filled), token);
                if (read == 0)
                    throw new EndOfStreamException(
                        $"Unexpected end of file while reading mirror part {part.Index} of {archive.Name}");
                filled += read;
            }
        }

        var tsk = job.Report((int) part.Size, token);
        await CircuitBreaker.WithAutoRetryAllAsync(_logger, async () =>
        {
            using var client = await GetMirrorFtpClient(token);
            var name = MakePath(part.Index);
            // A fresh stream per attempt so every retry uploads from position 0.
            using var partStream = new MemoryStream(buffer);
            await client.UploadAsync(partStream, name, token: token);
        });
        await tsk;
    }

    // Upload the gzip-compressed definition once all parts have been uploaded.
    await CircuitBreaker.WithAutoRetryAllAsync(_logger, async () =>
    {
        using var client = await GetMirrorFtpClient(token);
        _logger.LogInformation("Finishing mirror upload");
        using var job = await _ftpRateLimiter.Begin("Finishing mirror upload", 0, token);

        await using var ms = new MemoryStream();
        // leaveOpen: true keeps `ms` usable after the gzip stream is flushed and disposed.
        await using (var gz = new GZipStream(ms, CompressionLevel.Optimal, true))
        {
            await _dtos.Serialize(definition, gz);
        }

        ms.Position = 0;
        var remoteName = $"{definition.Hash.ToHex()}/definition.json.gz";
        await client.UploadAsync(ms, remoteName, token: token);
    });

    return (ArchiveStatus.Mirrored, mirroredArchive);
}
private async Task<(ArchiveStatus, Archive)> DownloadAndValidate(Archive archive, ArchiveManager archiveManager,
ArchiveManager? otherArchiveManager, ServerAllowList mirrorAllowList, CancellationToken token)
{

View File

@ -143,7 +143,7 @@ public class DownloadDispatcher
{
try
{
var url = await _wjClient.GetMirrorUrl(archive.Hash);
var url = _wjClient.GetMirrorUrl(archive.Hash);
if (url == null) return default;
var newArchive =

View File

@ -113,19 +113,9 @@ public class Client
return await _client.GetFromJsonAsync<Archive[]>(_limiter, msg, _dtos.Options) ?? Array.Empty<Archive>();
}
public async Task<Uri?> GetMirrorUrl(Hash archiveHash)
public Uri GetMirrorUrl(Hash archiveHash)
{
try
{
var result =
await _client.GetStringAsync($"{_configuration.BuildServerUrl}mirror/{archiveHash.ToHex()}");
return new Uri(result);
}
catch (Exception ex)
{
_logger.LogCritical(ex, "While downloading mirror for {hash}", archiveHash);
return null;
}
return new Uri($"{_configuration.MirrorServerUrl}{archiveHash.ToHex()}");
}
public async Task SendModListDefinition(ModList modList)
@ -267,16 +257,18 @@ public class Client
throw new HttpException(result);
}
private async Task<(string Sha, T Content)> GetGithubFile<T>(string owner, string repo, string path)
private async Task<(string Sha, T Content)> GetGithubFile<T>(string owner, string repo, string path, CancellationToken? token = null)
{
token ??= CancellationToken.None;
var msg = await MakeMessage(HttpMethod.Get,
new Uri($"{_configuration.BuildServerUrl}github/?owner={owner}&repo={repo}&path={path}"));
using var oldData = await _client.SendAsync(msg);
using var oldData = await _client.SendAsync(msg, token.Value);
if (!oldData.IsSuccessStatusCode)
throw new HttpException(oldData);
var sha = oldData.Headers.GetValues(_configuration.ResponseShaHeader).First();
return (sha, (await oldData.Content.ReadFromJsonAsync<T>(_dtos.Options))!);
return (sha, (await oldData.Content.ReadFromJsonAsync<T>(_dtos.Options, token.Value))!);
}
@ -316,6 +308,16 @@ public class Client
if (!finalResult.IsSuccessStatusCode)
throw new HttpException(result);
}
/// <summary>
/// Requests <c>mirrored_files</c> from the configured build server URL and deserializes the
/// response into file definitions.
/// </summary>
/// <param name="token">Cancellation token for the HTTP request.</param>
/// <returns>
/// The deserialized definitions, or an empty array when the response body deserializes to
/// <c>null</c>.
/// </returns>
public async Task<FileDefinition[]> GetAllMirroredFileDefinitions(CancellationToken token)
{
    var definitions = await _client.GetFromJsonAsync<FileDefinition[]>(
        $"{_configuration.BuildServerUrl}mirrored_files", _dtos.Options, token);

    // A JSON `null` payload deserializes to null; return an empty array so callers can
    // enumerate the result without a null check.
    return definitions ?? Array.Empty<FileDefinition>();
}
/// <summary>
/// Loads <c>configs/forced_healing.json</c> from the <c>wabbajack-tools/mod-lists</c> repository
/// via <c>GetGithubFile</c> and returns its content.
/// </summary>
/// <param name="token">Cancellation token forwarded to the request.</param>
/// <returns>The validated archives deserialized from the file; the file's SHA is discarded.</returns>
public async Task<ValidatedArchive[]> GetAllPatches(CancellationToken token)
{
    var (_, patches) = await GetGithubFile<ValidatedArchive[]>(
        "wabbajack-tools", "mod-lists", "configs/forced_healing.json", token);
    return patches;
}
}

View File

@ -4,6 +4,7 @@ namespace Wabbajack.Networking.WabbajackClientApi;
public class Configuration
{
public Uri MirrorServerUrl { get; set; } = new ("https://mirror.wabbajack.org");
public Uri ServerUri { get; set; } = new("https://build.wabbajack.org");
public string MetricsKey { get; set; }
public string MetricsKeyHeader { get; set; } = "x-metrics-key";