From a49cad012fc96430910569656aa538bc7a474ba4 Mon Sep 17 00:00:00 2001 From: Timothy Baldridge Date: Sat, 5 Feb 2022 08:47:15 -0700 Subject: [PATCH] Add support for chunked access to modlist data --- .gitignore | 3 +- Wabbajack.CLI/Verbs/ValidateLists.cs | 13 +- .../Wabbajack.Compression.Zip.Test.csproj | 27 ++++ .../ZipReaderTest.cs | 52 +++++++ Wabbajack.Compression.Zip/Extensions.cs | 21 +++ Wabbajack.Compression.Zip/ExtractedEntry.cs | 10 ++ .../Wabbajack.Compression.Zip.csproj | 13 ++ Wabbajack.Compression.Zip/ZipReader.cs | 131 ++++++++++++++++++ .../DownloadDispatcher.cs | 19 +++ .../ChunkedSeekableDownloader.cs | 7 + Wabbajack.Downloaders.Http/HttpDownloader.cs | 38 ++++- .../LoversLabDownloader.cs | 4 +- .../IChunkedSeekableStreamDownloader.cs | 11 ++ .../Wabbajack.Downloaders.Interfaces.csproj | 5 +- .../ChunkedSeekableDownloader.cs | 28 ++++ .../WabbajackCDNDownloader.cs | 25 +++- Wabbajack.IO.Async/AsyncBinaryReader.cs | 111 +++++++++++++++ Wabbajack.IO.Async/Endian.cs | 7 + Wabbajack.IO.Async/Extensions.cs | 16 +++ Wabbajack.IO.Async/Wabbajack.IO.Async.csproj | 9 ++ Wabbajack.Installer/StandardInstaller.cs | 20 +++ .../ChunkedBufferingStreamTests.cs | 76 ++++++++++ .../MemoryChunkedBufferingStream.cs | 22 +++ .../Wabbajack.Networking.Http.Test.csproj | 28 ++++ .../AChunkedBufferingStream.cs | 117 ++++++++++++++++ Wabbajack.sln | 28 ++++ 26 files changed, 823 insertions(+), 18 deletions(-) create mode 100644 Wabbajack.Compression.Zip.Test/Wabbajack.Compression.Zip.Test.csproj create mode 100644 Wabbajack.Compression.Zip.Test/ZipReaderTest.cs create mode 100644 Wabbajack.Compression.Zip/Extensions.cs create mode 100644 Wabbajack.Compression.Zip/ExtractedEntry.cs create mode 100644 Wabbajack.Compression.Zip/Wabbajack.Compression.Zip.csproj create mode 100644 Wabbajack.Compression.Zip/ZipReader.cs create mode 100644 Wabbajack.Downloaders.Http/ChunkedSeekableDownloader.cs create mode 100644 Wabbajack.Downloaders.Interfaces/IChunkedSeekableStreamDownloader.cs create mode 100644 Wabbajack.Downloaders.WabbajackCDN/ChunkedSeekableDownloader.cs create mode 100644 Wabbajack.IO.Async/AsyncBinaryReader.cs create mode 100644 Wabbajack.IO.Async/Endian.cs create mode 100644 Wabbajack.IO.Async/Extensions.cs create mode 100644 Wabbajack.IO.Async/Wabbajack.IO.Async.csproj create mode 100644 Wabbajack.Networking.Http.Test/ChunkedBufferingStreamTests.cs create mode 100644 Wabbajack.Networking.Http.Test/MemoryChunkedBufferingStream.cs create mode 100644 Wabbajack.Networking.Http.Test/Wabbajack.Networking.Http.Test.csproj create mode 100644 Wabbajack.Networking.Http/AChunkedBufferingStream.cs diff --git a/.gitignore b/.gitignore index 15f77973..19689bbd 100644 --- a/.gitignore +++ b/.gitignore @@ -2,4 +2,5 @@ bin obj .idea .vs -*.user \ No newline at end of file +*.user +.\packages\* \ No newline at end of file diff --git a/Wabbajack.CLI/Verbs/ValidateLists.cs b/Wabbajack.CLI/Verbs/ValidateLists.cs index 8ab74f5e..68929d0e 100644 --- a/Wabbajack.CLI/Verbs/ValidateLists.cs +++ b/Wabbajack.CLI/Verbs/ValidateLists.cs @@ -12,7 +12,6 @@ using FluentFTP; using Microsoft.Extensions.Logging; using Wabbajack.CLI.Services; using Wabbajack.Common; -using Wabbajack.Compiler.PatchCache; using Wabbajack.Downloaders; using Wabbajack.Downloaders.Interfaces; using Wabbajack.DTOs; @@ -22,7 +21,6 @@ using Wabbajack.DTOs.GitHub; using Wabbajack.DTOs.JsonConverters; using Wabbajack.DTOs.ModListValidation; using Wabbajack.DTOs.ServerResponses; -using Wabbajack.DTOs.Validation; using Wabbajack.Hashing.xxHash64; using Wabbajack.Installer; using Wabbajack.Networking.Discord; @@ -75,9 +73,6 @@ public class ValidateLists : IVerb var command = new Command("validate-lists"); command.Add(new Option(new[] {"-l", "-lists"}, "Lists of lists to validate") {IsRequired = true}); command.Add(new Option(new[] {"-r", "--reports"}, "Location to store validation report outputs")); - command.Add(new Option(new[] {"-a", "--archives"}, - "Location to store archives (files are named as the hex version of their hashes)") - {IsRequired = true}); command.Add(new Option(new[] {"--other-archives"}, "Look for files here before downloading (stored by hex hash name)") @@ -88,10 +83,9 @@ public class ValidateLists : IVerb return command; } - public async Task Run(List[] lists, AbsolutePath archives, AbsolutePath reports, AbsolutePath otherArchives) + public async Task Run(List[] lists, AbsolutePath reports, AbsolutePath otherArchives) { reports.CreateDirectory(); - var archiveManager = new ArchiveManager(_logger, archives); var token = CancellationToken.None; _logger.LogInformation("Scanning for existing patches/mirrors"); @@ -131,15 +125,14 @@ public class ValidateLists : IVerb using var scope = _logger.BeginScope("MachineURL: {MachineUrl}", modList.Links.MachineURL); _logger.LogInformation("Verifying {MachineUrl} - {Title}", modList.Links.MachineURL, modList.Title); - await DownloadModList(modList, archiveManager, CancellationToken.None); + //await DownloadModList(modList, archiveManager, CancellationToken.None); ModList modListData; try { _logger.LogInformation("Loading Modlist"); modListData = - await StandardInstaller.LoadFromFile(_dtos, - archiveManager.GetPath(modList.DownloadMetadata!.Hash)); + await StandardInstaller.Load(_dtos, _dispatcher, modList, token); } catch (JsonException ex) { diff --git a/Wabbajack.Compression.Zip.Test/Wabbajack.Compression.Zip.Test.csproj b/Wabbajack.Compression.Zip.Test/Wabbajack.Compression.Zip.Test.csproj new file mode 100644 index 00000000..c946f0cf --- /dev/null +++ b/Wabbajack.Compression.Zip.Test/Wabbajack.Compression.Zip.Test.csproj @@ -0,0 +1,27 @@ + + + + net6.0 + enable + + false + + + + + + + runtime; build; native; contentfiles; analyzers; buildtransitive + all + + + runtime; build; native; contentfiles; analyzers; buildtransitive + all + + + + + + + + diff --git a/Wabbajack.Compression.Zip.Test/ZipReaderTest.cs b/Wabbajack.Compression.Zip.Test/ZipReaderTest.cs new file mode 100644 index 00000000..22d72bb4 --- /dev/null +++ b/Wabbajack.Compression.Zip.Test/ZipReaderTest.cs @@ -0,0 +1,52 @@ +using System; +using System.IO; +using System.IO.Compression; +using System.Linq; +using System.Text; +using System.Threading; +using System.Threading.Tasks; +using Xunit; + +namespace Wabbajack.Compression.Zip.Test +{ + public class Tests + { + [Fact] + public async Task CanReadSimpleZip() + { + var random = new Random(); + + var ms = new MemoryStream(); + + var files = Enumerable.Range(1, 10) + .Select(f => + { + var buffer = new byte[1024]; + random.NextBytes(buffer); + return (f, buffer); + }).ToArray(); + + using (var zipFile = new ZipArchive(ms, ZipArchiveMode.Create, true)) + { + foreach (var (f, buffer) in files) + { + var entry = zipFile.CreateEntry(f.ToString(), CompressionLevel.Optimal); + await using var es = entry.Open(); + await es.WriteAsync(buffer); + } + } + + ms.Position = 0; + + var reader = new ZipReader(ms); + foreach (var file in (await reader.GetFiles()).Zip(files)) + { + var tms = new MemoryStream(); + await reader.Extract(file.First, tms, CancellationToken.None); + Assert.Equal(file.First.FileName, file.Second.f.ToString()); + Assert.Equal(file.Second.buffer, tms.ToArray()); + } + + } + } +} \ No newline at end of file diff --git a/Wabbajack.Compression.Zip/Extensions.cs b/Wabbajack.Compression.Zip/Extensions.cs new file mode 100644 index 00000000..b0e020c7 --- /dev/null +++ b/Wabbajack.Compression.Zip/Extensions.cs @@ -0,0 +1,21 @@ +namespace Wabbajack.Compression.Zip; + +public static class Extensions +{ + public static async Task CopyToLimitAsync(this Stream frm, Stream tw, int limit, CancellationToken token) + { + var buff = new byte[1024 * 128]; + while (limit > 0 && !token.IsCancellationRequested) + { + var toRead = Math.Min(buff.Length, limit); + var read = await frm.ReadAsync(buff.AsMemory(0, toRead), token); + if (read == 0) + throw new Exception("End of stream before end of limit"); + await tw.WriteAsync(buff.AsMemory(0, read), token); + limit -= read; + } + + await tw.FlushAsync(token); + } + +} \ No newline at end of file diff --git a/Wabbajack.Compression.Zip/ExtractedEntry.cs b/Wabbajack.Compression.Zip/ExtractedEntry.cs new file mode 100644 index 00000000..3a96e767 --- /dev/null +++ b/Wabbajack.Compression.Zip/ExtractedEntry.cs @@ -0,0 +1,10 @@ +namespace Wabbajack.Compression.Zip; + +public class ExtractedEntry +{ + public long FileOffset { get; set; } + public string FileName { get; set; } + public long CompressedSize { get; set; } + public uint UncompressedSize { get; set; } + public short CompressionMethod { get; set; } +} \ No newline at end of file diff --git a/Wabbajack.Compression.Zip/Wabbajack.Compression.Zip.csproj b/Wabbajack.Compression.Zip/Wabbajack.Compression.Zip.csproj new file mode 100644 index 00000000..f3914b2e --- /dev/null +++ b/Wabbajack.Compression.Zip/Wabbajack.Compression.Zip.csproj @@ -0,0 +1,13 @@ + + + + net6.0 + enable + enable + + + + + + + diff --git a/Wabbajack.Compression.Zip/ZipReader.cs b/Wabbajack.Compression.Zip/ZipReader.cs new file mode 100644 index 00000000..87b1b798 --- /dev/null +++ b/Wabbajack.Compression.Zip/ZipReader.cs @@ -0,0 +1,131 @@ +using System.IO.Compression; +using System.Text; +using Wabbajack.IO.Async; + +namespace Wabbajack.Compression.Zip; + +public class ZipReader : IAsyncDisposable +{ + private readonly Stream _stream; + private readonly AsyncBinaryReader _rdr; + private readonly bool _leaveOpen; + + private const uint EndOfCentralDirectoryRecordSignature = 0x06054b50; + private const uint CentralDirectoryFileHeaderSignature = 0x02014b50; + + public ZipReader(Stream s, bool leaveOpen = false) + { + _leaveOpen = leaveOpen; + _stream = s; + _rdr = new AsyncBinaryReader(s); + } + + public async Task GetFiles() + { + var sigOffset = 0; + while (true) + { + _rdr.Position = _rdr.Length - 22 - sigOffset; + if (await _rdr.ReadUInt32() == EndOfCentralDirectoryRecordSignature) + break; + sigOffset++; + } + + if (await _rdr.ReadUInt16() != 0) + { + throw new NotImplementedException("Only single disk archives are supported"); + } + + if (await _rdr.ReadInt16() != 0) + { + throw new NotImplementedException("Only single disk archives are supported"); + } + + _rdr.Position += 2; + + var totalCentralDirectoryRecords = await _rdr.ReadUInt16(); + var sizeOfCentralDirectory = await _rdr.ReadUInt32(); + var centralDirectoryOffset = await _rdr.ReadUInt32(); + + + _rdr.Position = centralDirectoryOffset; + + + var entries = new ExtractedEntry[totalCentralDirectoryRecords]; + for (var i = 0; i < totalCentralDirectoryRecords; i += 1) + { + if (await _rdr.ReadUInt32() != CentralDirectoryFileHeaderSignature) + throw new Exception("Data corruption, can't find central directory"); + + _rdr.Position += 6; + var compressionMethod = await _rdr.ReadInt16(); + _rdr.Position += 4; + var crc = await _rdr.ReadUInt32(); + var compressedSize = await _rdr.ReadUInt32(); + + var uncompressedSize = await _rdr.ReadUInt32(); + var fileNameLength = await _rdr.ReadUInt16(); + var extraFieldLength = await _rdr.ReadUInt16(); + var fileCommentLength = await _rdr.ReadUInt16(); + _rdr.Position += 8; + var fileOffset = await _rdr.ReadUInt32(); + var fileName = await _rdr.ReadFixedSizeString(fileNameLength, Encoding.UTF8); + + _rdr.Position += extraFieldLength + fileCommentLength; + + entries[i] = new ExtractedEntry + { + FileOffset = fileOffset, + FileName = fileName, + CompressedSize = compressedSize, + UncompressedSize = uncompressedSize, + CompressionMethod = compressionMethod, + }; + } + + + return entries; + + } + + public async ValueTask Extract(ExtractedEntry entry, Stream stream, CancellationToken token) + { + _stream.Position = entry.FileOffset; + _stream.Position += 6; + var flags = await _rdr.ReadUInt16(); + _stream.Position += 18; + var fnLength = await _rdr.ReadUInt16(); + var efLength = await _rdr.ReadUInt16(); + _stream.Position += fnLength + efLength; + + if (flags != 0) + throw new NotImplementedException("Don't know how to handle flags yet"); + + + switch (entry.CompressionMethod) + { + case 0: + await _stream.CopyToLimitAsync(stream, (int) entry.UncompressedSize, token); + break; + case 1: + case 2: + case 3: + case 4: + case 5: + case 6: + case 7: + case 8: + var ds = new DeflateStream(_rdr.BaseStream, CompressionMode.Decompress, true); + await ds.CopyToLimitAsync(stream, (int)entry.UncompressedSize, token); + break; + default: + throw new NotImplementedException($"Have not implemented compression format {entry.CompressionMethod}"); + } + } + + public async ValueTask DisposeAsync() + { + if (!_leaveOpen) + await _stream.DisposeAsync(); + } +} \ No newline at end of file diff --git a/Wabbajack.Downloaders.Dispatcher/DownloadDispatcher.cs b/Wabbajack.Downloaders.Dispatcher/DownloadDispatcher.cs index 01139cd8..f2d9621f 100644 --- a/Wabbajack.Downloaders.Dispatcher/DownloadDispatcher.cs +++ b/Wabbajack.Downloaders.Dispatcher/DownloadDispatcher.cs @@ -1,5 +1,6 @@ using System; using System.Collections.Generic; +using System.IO; using System.Linq; using System.Threading; using System.Threading.Tasks; @@ -255,4 +256,22 @@ public class DownloadDispatcher var url = ud.UnParse(archive.State).ToString(); return mirrorAllowList.AllowedPrefixes.Any(p => url.StartsWith(p)); } + + public async ValueTask ChunkedSeekableStream(Archive archive, CancellationToken token) + { + if (!TryGetDownloader(archive, out var downloader)) + { + throw new NotImplementedException($"Now downloader ot handle {archive.State}"); + } + + + if (downloader is IChunkedSeekableStreamDownloader cs) + { + return await cs.GetChunkedSeekableStream(archive, token); + } + else + { + throw new NotImplementedException($"Downloader {archive.State} does not support chunked seekable streams"); + } + } } \ No newline at end of file diff --git a/Wabbajack.Downloaders.Http/ChunkedSeekableDownloader.cs b/Wabbajack.Downloaders.Http/ChunkedSeekableDownloader.cs new file mode 100644 index 00000000..cf707076 --- /dev/null +++ b/Wabbajack.Downloaders.Http/ChunkedSeekableDownloader.cs @@ -0,0 +1,7 @@ +using System.Net.Http.Headers; +using System.Threading.Tasks; +using Wabbajack.DTOs; +using Wabbajack.Networking.Http; + +namespace Wabbajack.Downloaders.Http; + diff --git a/Wabbajack.Downloaders.Http/HttpDownloader.cs b/Wabbajack.Downloaders.Http/HttpDownloader.cs index d628bb89..b593c067 100644 --- a/Wabbajack.Downloaders.Http/HttpDownloader.cs +++ b/Wabbajack.Downloaders.Http/HttpDownloader.cs @@ -1,7 +1,9 @@ using System; using System.Collections.Generic; +using System.IO; using System.Linq; using System.Net.Http; +using System.Net.Http.Headers; using System.Threading; using System.Threading.Tasks; using Microsoft.Extensions.Logging; @@ -11,6 +13,7 @@ using Wabbajack.DTOs; using Wabbajack.DTOs.DownloadStates; using Wabbajack.DTOs.Validation; using Wabbajack.Hashing.xxHash64; +using Wabbajack.Networking.Http; using Wabbajack.Networking.Http.Interfaces; using Wabbajack.Paths; using Wabbajack.Paths.IO; @@ -18,7 +21,7 @@ using Wabbajack.RateLimiter; namespace Wabbajack.Downloaders.Http; -public class HttpDownloader : ADownloader, IUrlDownloader, IUpgradingDownloader +public class HttpDownloader : ADownloader, IUrlDownloader, IUpgradingDownloader, IChunkedSeekableStreamDownloader { private readonly HttpClient _client; private readonly IHttpDownloader _downloader; @@ -100,7 +103,7 @@ public class HttpDownloader : ADownloader, IUrlDownloa return await _client.SendAsync(msg, token); } - private static HttpRequestMessage MakeMessage(DTOs.DownloadStates.Http state) + internal static HttpRequestMessage MakeMessage(DTOs.DownloadStates.Http state) { var msg = new HttpRequestMessage(HttpMethod.Get, state.Url); foreach (var header in state.Headers) @@ -151,4 +154,35 @@ public class HttpDownloader : ADownloader, IUrlDownloa }; return new[] {$"directURL={state.Url}"}; } + + public async ValueTask GetChunkedSeekableStream(Archive archive, CancellationToken token) + { + var state = archive.State as DTOs.DownloadStates.Http; + return new ChunkedSeekableDownloader(this, archive, state!); + } + + public class ChunkedSeekableDownloader : AChunkedBufferingStream + { + private readonly DTOs.DownloadStates.Http _state; + private readonly Archive _archive; + private readonly HttpDownloader _downloader; + + public ChunkedSeekableDownloader(HttpDownloader downloader, Archive archive, DTOs.DownloadStates.Http state) : base(21, archive.Size, 8) + { + _downloader = downloader; + _archive = archive; + _state = state; + } + + public override async Task LoadChunk(long offset, int size) + { + var msg = HttpDownloader.MakeMessage(_state); + msg.Headers.Range = new RangeHeaderValue(offset, offset + size); + using var response = await _downloader._client.SendAsync(msg); + if (!response.IsSuccessStatusCode) + throw new HttpException(response); + + return await response.Content.ReadAsByteArrayAsync(); + } + } } \ No newline at end of file diff --git a/Wabbajack.Downloaders.IPS4OAuth2Downloader/LoversLabDownloader.cs b/Wabbajack.Downloaders.IPS4OAuth2Downloader/LoversLabDownloader.cs index ac9801b3..773090c5 100644 --- a/Wabbajack.Downloaders.IPS4OAuth2Downloader/LoversLabDownloader.cs +++ b/Wabbajack.Downloaders.IPS4OAuth2Downloader/LoversLabDownloader.cs @@ -8,9 +8,9 @@ using Wabbajack.Networking.Http.Interfaces; namespace Wabbajack.Downloaders.IPS4OAuth2Downloader; -public class LoversLabDownloader : AIPS4OAuth2Downloader +public class LoversLabDownloader : AIPS4OAuth2Downloader { - public LoversLabDownloader(ILogger logger, ITokenProvider loginInfo, + public LoversLabDownloader(ILogger logger, ITokenProvider loginInfo, HttpClient client, IHttpDownloader downloader, ApplicationInfo appInfo) : base(logger, loginInfo, client, downloader, appInfo, new Uri("https://loverslab.com"), "Lovers Lab") diff --git a/Wabbajack.Downloaders.Interfaces/IChunkedSeekableStreamDownloader.cs b/Wabbajack.Downloaders.Interfaces/IChunkedSeekableStreamDownloader.cs new file mode 100644 index 00000000..e57a222e --- /dev/null +++ b/Wabbajack.Downloaders.Interfaces/IChunkedSeekableStreamDownloader.cs @@ -0,0 +1,11 @@ +using System.IO; +using System.Threading; +using System.Threading.Tasks; +using Wabbajack.DTOs; + +namespace Wabbajack.Downloaders.Interfaces; + +public interface IChunkedSeekableStreamDownloader +{ + public ValueTask GetChunkedSeekableStream(Archive archive, CancellationToken token); +} \ No newline at end of file diff --git a/Wabbajack.Downloaders.Interfaces/Wabbajack.Downloaders.Interfaces.csproj b/Wabbajack.Downloaders.Interfaces/Wabbajack.Downloaders.Interfaces.csproj index 2b659529..88869b1c 100644 --- a/Wabbajack.Downloaders.Interfaces/Wabbajack.Downloaders.Interfaces.csproj +++ b/Wabbajack.Downloaders.Interfaces/Wabbajack.Downloaders.Interfaces.csproj @@ -8,8 +8,9 @@ - - + + + diff --git a/Wabbajack.Downloaders.WabbajackCDN/ChunkedSeekableDownloader.cs b/Wabbajack.Downloaders.WabbajackCDN/ChunkedSeekableDownloader.cs new file mode 100644 index 00000000..5f803865 --- /dev/null +++ b/Wabbajack.Downloaders.WabbajackCDN/ChunkedSeekableDownloader.cs @@ -0,0 +1,28 @@ +using System.Threading; +using System.Threading.Tasks; +using Wabbajack.Common; +using Wabbajack.DTOs.CDN; +using Wabbajack.DTOs.DownloadStates; +using Wabbajack.Networking.Http; + +namespace Wabbajack.Downloaders; + +public class ChunkedSeekableDownloader : AChunkedBufferingStream +{ + private readonly FileDefinition _definition; + private readonly WabbajackCDNDownloader _downloader; + private readonly WabbajackCDN _state; + + public ChunkedSeekableDownloader(WabbajackCDN state, FileDefinition definition, WabbajackCDNDownloader downloader) : base(21, definition.Size, 8) + { + _state = state; + _downloader = downloader; + _definition = definition; + } + + public override async Task LoadChunk(long offset, int size) + { + var idx = offset >> 21; + return await _downloader.GetPart(_state, _definition.Parts[idx], CancellationToken.None); + } +} \ No newline at end of file diff --git a/Wabbajack.Downloaders.WabbajackCDN/WabbajackCDNDownloader.cs b/Wabbajack.Downloaders.WabbajackCDN/WabbajackCDNDownloader.cs index 5a9629b0..de6622b1 100644 --- a/Wabbajack.Downloaders.WabbajackCDN/WabbajackCDNDownloader.cs +++ b/Wabbajack.Downloaders.WabbajackCDN/WabbajackCDNDownloader.cs @@ -19,7 +19,7 @@ using Wabbajack.RateLimiter; namespace Wabbajack.Downloaders; -public class WabbajackCDNDownloader : ADownloader, IUrlDownloader +public class WabbajackCDNDownloader : ADownloader, IUrlDownloader, IChunkedSeekableStreamDownloader { public static Dictionary DomainRemaps = new() { @@ -135,4 +135,27 @@ public class WabbajackCDNDownloader : ADownloader, IUrlDownloader { return new[] {$"directURL={state.Url}"}; } + + public async ValueTask GetChunkedSeekableStream(Archive archive, CancellationToken token) + { + var state = archive.State as WabbajackCDN; + var definition = await GetDefinition(state!, token); + return new ChunkedSeekableDownloader(state!, definition!, this); + } + + public async Task GetPart(WabbajackCDN state, PartDefinition part, CancellationToken token) + { + var msg = MakeMessage(new Uri(state.Url + $"/parts/{part.Index}")); + using var response = await _client.SendAsync(msg, HttpCompletionOption.ResponseHeadersRead, token); + if (!response.IsSuccessStatusCode) + throw new InvalidDataException($"Bad response for part request for part {part.Index}"); + + var length = response.Content.Headers.ContentLength; + if (length != part.Size) + throw new InvalidDataException( + $"Bad part size, expected {part.Size} got {length} for part {part.Index}"); + + return await response.Content.ReadAsByteArrayAsync(token); + } + } \ No newline at end of file diff --git a/Wabbajack.IO.Async/AsyncBinaryReader.cs b/Wabbajack.IO.Async/AsyncBinaryReader.cs new file mode 100644 index 00000000..6f890261 --- /dev/null +++ b/Wabbajack.IO.Async/AsyncBinaryReader.cs @@ -0,0 +1,111 @@ +using System.Buffers.Binary; +using System.Text; + +namespace Wabbajack.IO.Async; + +public class AsyncBinaryReader +{ + private readonly Stream _s; + private readonly Memory _buffer; + private readonly Endian _endian; + + public AsyncBinaryReader(Stream s, Endian endian = Endian.Little) + { + _s = s; + _buffer = new Memory(new byte[256]); + _endian = endian; + } + + public async ValueTask ReadByte() + { + await _s.ReadAllAsync(_buffer[..1]); + return _buffer.Span[0]; + } + + public async ValueTask ReadUInt16() + { + await _s.ReadAllAsync(_buffer[..2]); + return _endian == Endian.Big ? + BinaryPrimitives.ReadUInt16BigEndian(_buffer[..2].Span) : + BinaryPrimitives.ReadUInt16LittleEndian(_buffer[..2].Span); + } + + public async ValueTask ReadUInt32() + { + await _s.ReadAllAsync(_buffer[..4]); + return _endian == Endian.Big ? + BinaryPrimitives.ReadUInt32BigEndian(_buffer[..4].Span) : + BinaryPrimitives.ReadUInt32LittleEndian(_buffer[..4].Span); + } + + public async ValueTask ReadUInt64() + { + await _s.ReadAllAsync(_buffer[..8]); + return _endian == Endian.Big ? + BinaryPrimitives.ReadUInt64BigEndian(_buffer[..8].Span) : + BinaryPrimitives.ReadUInt64LittleEndian(_buffer[..8].Span); + } + + public async ValueTask ReadInt16() + { + await _s.ReadAllAsync(_buffer[..2]); + return _endian == Endian.Big ? + BinaryPrimitives.ReadInt16BigEndian(_buffer[..2].Span) : + BinaryPrimitives.ReadInt16LittleEndian(_buffer[..2].Span); + } + + public async ValueTask ReadInt32() + { + await _s.ReadAllAsync(_buffer[..4]); + return _endian == Endian.Big ? + BinaryPrimitives.ReadInt32BigEndian(_buffer[..4].Span) : + BinaryPrimitives.ReadInt32LittleEndian(_buffer[..4].Span); + } + + public async ValueTask ReadInt64() + { + await _s.ReadAllAsync(_buffer[..8]); + return _endian == Endian.Big ? + BinaryPrimitives.ReadInt64BigEndian(_buffer[..8].Span) : + BinaryPrimitives.ReadInt64LittleEndian(_buffer[..8].Span); + } + + public async ValueTask ReadFloat() + { + await _s.ReadAllAsync(_buffer[..4]); + return _endian == Endian.Big ? + BinaryPrimitives.ReadSingleBigEndian(_buffer[..4].Span) : + BinaryPrimitives.ReadSingleLittleEndian(_buffer[..4].Span); + } + + public async ValueTask ReadDouble() + { + await _s.ReadAllAsync(_buffer[..8]); + return _endian == Endian.Big ? + BinaryPrimitives.ReadDoubleBigEndian(_buffer[..8].Span) : + BinaryPrimitives.ReadDoubleLittleEndian(_buffer[..8].Span); + } + + public async ValueTask ReadBytes(int size) + { + var bytes = new byte[size]; + await _s.ReadAllAsync(bytes); + return bytes; + } + + public long Position + { + set => _s.Position = value; + get => _s.Position; + } + + public long Length => _s.Length; + public Stream BaseStream => _s; + + public async Task ReadFixedSizeString(ushort length, Encoding encoding) + { + var buf = new Memory(new byte[length]); + await _s.ReadAllAsync(buf); + return encoding.GetString(buf.Span); + } +} \ No newline at end of file diff --git a/Wabbajack.IO.Async/Endian.cs b/Wabbajack.IO.Async/Endian.cs new file mode 100644 index 00000000..13b0e9a5 --- /dev/null +++ b/Wabbajack.IO.Async/Endian.cs @@ -0,0 +1,7 @@ +namespace Wabbajack.IO.Async; + +public enum Endian +{ + Little, + Big +} \ No newline at end of file diff --git a/Wabbajack.IO.Async/Extensions.cs b/Wabbajack.IO.Async/Extensions.cs new file mode 100644 index 00000000..e83fbbce --- /dev/null +++ b/Wabbajack.IO.Async/Extensions.cs @@ -0,0 +1,16 @@ +namespace Wabbajack.IO.Async; + +public static class Extensions +{ + public static async ValueTask ReadAllAsync(this Stream frm, Memory output) + { + var read = 0; + while (read < output.Length) + { + var thisRead = await frm.ReadAsync(output[read..]); + if (thisRead == 0) + throw new Exception("End of stream reached before limit"); + read += thisRead; + } + } +} \ No newline at end of file diff --git a/Wabbajack.IO.Async/Wabbajack.IO.Async.csproj b/Wabbajack.IO.Async/Wabbajack.IO.Async.csproj new file mode 100644 index 00000000..eb2460e9 --- /dev/null +++ b/Wabbajack.IO.Async/Wabbajack.IO.Async.csproj @@ -0,0 +1,9 @@ + + + + net6.0 + enable + enable + + + diff --git a/Wabbajack.Installer/StandardInstaller.cs b/Wabbajack.Installer/StandardInstaller.cs index 2be0601f..f5336699 100644 --- a/Wabbajack.Installer/StandardInstaller.cs +++ b/Wabbajack.Installer/StandardInstaller.cs @@ -4,6 +4,7 @@ using System.Globalization; using System.IO; using System.Linq; using System.Text; +using System.Text.Json; using System.Text.RegularExpressions; using System.Threading; using System.Threading.Tasks; @@ -14,6 +15,7 @@ using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Logging; using Wabbajack.Common; using Wabbajack.Compression.BSA; +using Wabbajack.Compression.Zip; using Wabbajack.Downloaders; using Wabbajack.Downloaders.GameFile; using Wabbajack.DTOs; @@ -408,4 +410,22 @@ public class StandardInstaller : AInstaller await BinaryPatching.ApplyPatch(new MemoryStream(srcData), new MemoryStream(patchData), fs); }); } + + public static async Task Load(DTOSerializer dtos, DownloadDispatcher dispatcher, ModlistMetadata metadata, CancellationToken token) + { + var archive = new Archive + { + State = dispatcher.Parse(new Uri(metadata.Links.Download))!, + Size = metadata.DownloadMetadata!.Size, + Hash = metadata.DownloadMetadata.Hash + }; + + var stream = await dispatcher.ChunkedSeekableStream(archive, token); + await using var reader = new ZipReader(stream); + var entry = (await reader.GetFiles()).First(e => e.FileName == "modlist"); + var ms = new MemoryStream(); + await reader.Extract(entry, ms, token); + ms.Position = 0; + return JsonSerializer.Deserialize(ms, dtos.Options)!; + } } \ No newline at end of file diff --git a/Wabbajack.Networking.Http.Test/ChunkedBufferingStreamTests.cs b/Wabbajack.Networking.Http.Test/ChunkedBufferingStreamTests.cs new file mode 100644 index 00000000..c4612cb8 --- /dev/null +++ b/Wabbajack.Networking.Http.Test/ChunkedBufferingStreamTests.cs @@ -0,0 +1,76 @@ +using System; +using System.IO; +using System.IO.Compression; +using System.Linq; +using System.Text; +using System.Threading; +using System.Threading.Tasks; +using Wabbajack.Compression.Zip; +using Wabbajack.Hashing.xxHash64; +using Xunit; + +namespace Wabbajack.Networking.Http.Test; + +public class ChunkedBufferingStreamTests +{ + private readonly Random _random; + private readonly byte[] _data; + private readonly MemoryStream _mstream; + + public ChunkedBufferingStreamTests() + { + _random = new Random(); + _data = new byte[1024 * 1024 * 24]; + _random.NextBytes(_data); + _mstream = new MemoryStream(_data); + } + + [Fact] + public async Task CanHashStream() + { + var cstream = new MemoryChunkedBufferingStream(_mstream); + Assert.Equal(await _data.Hash(), await cstream.Hash(CancellationToken.None)); + } + + [Fact] + public async Task CanExtractOneFile() + { + var ms = new MemoryStream(); + + var files = Enumerable.Range(1, 10) + .Select(f => + { + var buffer = new byte[1024]; + _random.NextBytes(buffer); + return (f, buffer); + }).ToArray(); + + using (var zipFile = new ZipArchive(ms, ZipArchiveMode.Create, true)) + { + foreach (var (f, buffer) in files) + { + var entry = zipFile.CreateEntry(f.ToString(), CompressionLevel.Optimal); + await using var es = entry.Open(); + await es.WriteAsync(buffer); + } + + { + var entry = zipFile.CreateEntry("ending", CompressionLevel.Optimal); + await using var es = entry.Open(); + await es.WriteAsync(Encoding.UTF8.GetBytes("Cheese for Everyone!")); + } + } + + ms.Position = 0; + await using (var zipFile = new ZipReader(new MemoryChunkedBufferingStream(ms))) + { + var entry = (await zipFile.GetFiles()).First(f => f.FileName == "ending"); + + var ems = new MemoryStream(); + await zipFile.Extract(entry, ems, CancellationToken.None); + Assert.Equal(Encoding.UTF8.GetBytes("Cheese for Everyone!"), ems.ToArray()); + } + + + } +} \ No newline at end of file diff --git a/Wabbajack.Networking.Http.Test/MemoryChunkedBufferingStream.cs b/Wabbajack.Networking.Http.Test/MemoryChunkedBufferingStream.cs new file mode 100644 index 00000000..e25d357f --- /dev/null +++ b/Wabbajack.Networking.Http.Test/MemoryChunkedBufferingStream.cs @@ -0,0 +1,22 @@ +using System.IO; +using System.Threading.Tasks; + +namespace Wabbajack.Networking.Http.Test; + +public class MemoryChunkedBufferingStream : AChunkedBufferingStream +{ + private readonly MemoryStream _src; + + public MemoryChunkedBufferingStream(MemoryStream src) : base(4, src.Length, 16) + { + _src = src; + } + + public override async Task LoadChunk(long offset, int size) + { + var buff = new byte[size]; + _src.Position = offset; + await _src.ReadAsync(buff); + return buff; + } +} \ No newline at end of file diff --git a/Wabbajack.Networking.Http.Test/Wabbajack.Networking.Http.Test.csproj b/Wabbajack.Networking.Http.Test/Wabbajack.Networking.Http.Test.csproj new file mode 100644 index 00000000..76b1d79e --- /dev/null +++ b/Wabbajack.Networking.Http.Test/Wabbajack.Networking.Http.Test.csproj @@ -0,0 +1,28 @@ + + + + net6.0 + enable + + false + + + + + + + runtime; build; native; contentfiles; analyzers; buildtransitive + all + + + runtime; build; native; contentfiles; analyzers; buildtransitive + all + + + + + + + + + diff --git a/Wabbajack.Networking.Http/AChunkedBufferingStream.cs b/Wabbajack.Networking.Http/AChunkedBufferingStream.cs new file mode 100644 index 00000000..d4415daf --- /dev/null +++ b/Wabbajack.Networking.Http/AChunkedBufferingStream.cs @@ -0,0 +1,117 @@ +using System; +using System.Collections.Generic; +using System.IO; +using System.Threading; +using System.Threading.Tasks; + +namespace Wabbajack.Networking.Http; + +public abstract class AChunkedBufferingStream : Stream +{ + private readonly int _chunkSize; + private readonly int _position; + private readonly int _maxChunks; + private readonly Dictionary _chunks; + private readonly uint _chunkBitsShift; + private readonly ulong _chunkMask; + + protected AChunkedBufferingStream(int chunkBitsShift, long totalSize, int maxChunks) + { + _chunkSize = 1 << chunkBitsShift; + _chunkBitsShift = (uint)chunkBitsShift; + _chunkMask = ulong.MaxValue << chunkBitsShift; + Length = totalSize; + _maxChunks = maxChunks; + _chunks = new Dictionary(); + } + + public abstract Task LoadChunk(long offset, int size); + + public override void Flush() + { + throw new System.NotImplementedException(); + } + + public override int Read(byte[] buffer, int offset, int count) + { + throw new NotImplementedException(); + } + + public override async Task ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) + { + if (Position >= Length) return 0; + + var chunkId = (ulong)Position & _chunkMask; + if (!_chunks.TryGetValue(chunkId, out var chunkData)) + { + var chunk = await LoadChunk((long)chunkId, _chunkSize); + _chunks.Add(chunkId, chunk); + } + + var chunkOffset = (ulong)Position ^ _chunkMask; + var availableRead = (ulong)chunkData!.Length - chunkOffset; + var toRead = Math.Min((ulong)count, availableRead); + + Array.Copy(chunkData, (uint)chunkOffset, buffer, offset, count); + return (int)toRead; + } + + public override async ValueTask ReadAsync(Memory buffer, CancellationToken cancellationToken = new CancellationToken()) + { + if (Position >= Length) return 0; + + var chunkId = (ulong)Position & _chunkMask; + if (!_chunks.TryGetValue(chunkId, out var chunkData)) + { + chunkData = await LoadChunk((long)chunkId, _chunkSize); + _chunks.Add(chunkId, chunkData); + } + + var chunkOffset = (ulong)Position & ~_chunkMask; + var availableRead = (ulong)chunkData!.Length - chunkOffset; + var toRead = Math.Min((ulong)buffer.Length, availableRead); + + var chunkBuff = new Memory(chunkData).Slice((int)chunkOffset, (int)toRead); + + chunkBuff.CopyTo(buffer); + Position += (long)toRead; + + return (int)toRead; + } + + + public override long Seek(long offset, SeekOrigin origin) + { + switch (origin) + { + case SeekOrigin.Begin: + Position = offset; + break; + case SeekOrigin.Current: + Position += offset; + break; + case SeekOrigin.End: + Position = Length - offset; + break; + default: + throw new ArgumentOutOfRangeException(nameof(origin), origin, null); + } + return Position; + } + + public override void SetLength(long value) + { + throw new System.NotImplementedException(); + } + + public override void Write(byte[] buffer, int offset, int count) + { + throw new System.NotImplementedException(); + } + + public override bool CanRead => true; + public override bool CanSeek => true; + public override bool CanWrite => true; + public override long Length { get; } + public override long Position { get; set; } = 0; +} \ No newline at end of file diff --git a/Wabbajack.sln b/Wabbajack.sln index f0008b0d..c91fb041 100644 --- a/Wabbajack.sln +++ b/Wabbajack.sln @@ -125,6 +125,14 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Wabbajack.App.Blazor", "Wab EndProject Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Solution Items", "Solution Items", "{18E36813-CB53-4172-8FF3-EFE3B9B30A5F}" EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Wabbajack.Networking.Http.Test", "Wabbajack.Networking.Http.Test\Wabbajack.Networking.Http.Test.csproj", "{34FC755D-24F0-456A-B5C1-5BA7F12DC233}" +EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Wabbajack.Compression.Zip", "Wabbajack.Compression.Zip\Wabbajack.Compression.Zip.csproj", "{10165025-D30B-44B7-A764-50E15603AE56}" +EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Wabbajack.IO.Async", "Wabbajack.IO.Async\Wabbajack.IO.Async.csproj", "{64AD7E26-5643-4969-A61C-E0A90FA25FCB}" +EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Wabbajack.Compression.Zip.Test", "Wabbajack.Compression.Zip.Test\Wabbajack.Compression.Zip.Test.csproj", "{6D7EA87E-6ABE-4BA3-B93A-BE5E71A4DE7C}" +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU @@ -339,6 +347,22 @@ Global {C6E9B15D-510F-4074-AB1C-069F36BA4622}.Debug|Any CPU.Build.0 = Debug|Any CPU {C6E9B15D-510F-4074-AB1C-069F36BA4622}.Release|Any CPU.ActiveCfg = Release|Any CPU {C6E9B15D-510F-4074-AB1C-069F36BA4622}.Release|Any CPU.Build.0 = Release|Any CPU + {34FC755D-24F0-456A-B5C1-5BA7F12DC233}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {34FC755D-24F0-456A-B5C1-5BA7F12DC233}.Debug|Any CPU.Build.0 = Debug|Any CPU + {34FC755D-24F0-456A-B5C1-5BA7F12DC233}.Release|Any CPU.ActiveCfg = Release|Any CPU + {34FC755D-24F0-456A-B5C1-5BA7F12DC233}.Release|Any CPU.Build.0 = Release|Any CPU + {10165025-D30B-44B7-A764-50E15603AE56}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {10165025-D30B-44B7-A764-50E15603AE56}.Debug|Any CPU.Build.0 = Debug|Any CPU + {10165025-D30B-44B7-A764-50E15603AE56}.Release|Any CPU.ActiveCfg = Release|Any CPU + {10165025-D30B-44B7-A764-50E15603AE56}.Release|Any CPU.Build.0 = Release|Any CPU + {64AD7E26-5643-4969-A61C-E0A90FA25FCB}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {64AD7E26-5643-4969-A61C-E0A90FA25FCB}.Debug|Any CPU.Build.0 = Debug|Any CPU + {64AD7E26-5643-4969-A61C-E0A90FA25FCB}.Release|Any CPU.ActiveCfg = Release|Any CPU + {64AD7E26-5643-4969-A61C-E0A90FA25FCB}.Release|Any CPU.Build.0 = Release|Any CPU + {6D7EA87E-6ABE-4BA3-B93A-BE5E71A4DE7C}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {6D7EA87E-6ABE-4BA3-B93A-BE5E71A4DE7C}.Debug|Any CPU.Build.0 = Debug|Any CPU + {6D7EA87E-6ABE-4BA3-B93A-BE5E71A4DE7C}.Release|Any CPU.ActiveCfg = Release|Any CPU + {6D7EA87E-6ABE-4BA3-B93A-BE5E71A4DE7C}.Release|Any CPU.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE @@ -380,6 +404,10 @@ Global {4F252332-CA77-41DE-95A8-9DF38A81D675} = {98B731EE-4FC0-4482-A069-BCBA25497871} {AB9A5C22-10CC-4EE0-A808-FB1DC9E24247} = {F01F8595-5FD7-4506-8469-F4A5522DACC1} {D6351587-CAF6-4CB6-A2BD-5368E69F297C} = {F01F8595-5FD7-4506-8469-F4A5522DACC1} + {34FC755D-24F0-456A-B5C1-5BA7F12DC233} = {F01F8595-5FD7-4506-8469-F4A5522DACC1} + {10165025-D30B-44B7-A764-50E15603AE56} = {F677890D-5109-43BC-97C7-C4CD47C8EE0C} + {64AD7E26-5643-4969-A61C-E0A90FA25FCB} = {F677890D-5109-43BC-97C7-C4CD47C8EE0C} + {6D7EA87E-6ABE-4BA3-B93A-BE5E71A4DE7C} = {F677890D-5109-43BC-97C7-C4CD47C8EE0C} EndGlobalSection GlobalSection(ExtensibilityGlobals) = postSolution SolutionGuid = {0AA30275-0F38-4A7D-B645-F5505178DDE8}