Add support for chunked access to modlist data

This commit is contained in:
Timothy Baldridge 2022-02-05 08:47:15 -07:00
parent a551fce80b
commit a49cad012f
26 changed files with 823 additions and 18 deletions

1
.gitignore vendored
View File

@ -3,3 +3,4 @@ obj
.idea
.vs
*.user
.\packages\*

View File

@ -12,7 +12,6 @@ using FluentFTP;
using Microsoft.Extensions.Logging;
using Wabbajack.CLI.Services;
using Wabbajack.Common;
using Wabbajack.Compiler.PatchCache;
using Wabbajack.Downloaders;
using Wabbajack.Downloaders.Interfaces;
using Wabbajack.DTOs;
@ -22,7 +21,6 @@ using Wabbajack.DTOs.GitHub;
using Wabbajack.DTOs.JsonConverters;
using Wabbajack.DTOs.ModListValidation;
using Wabbajack.DTOs.ServerResponses;
using Wabbajack.DTOs.Validation;
using Wabbajack.Hashing.xxHash64;
using Wabbajack.Installer;
using Wabbajack.Networking.Discord;
@ -75,9 +73,6 @@ public class ValidateLists : IVerb
var command = new Command("validate-lists");
command.Add(new Option<List[]>(new[] {"-l", "-lists"}, "Lists of lists to validate") {IsRequired = true});
command.Add(new Option<AbsolutePath>(new[] {"-r", "--reports"}, "Location to store validation report outputs"));
command.Add(new Option<AbsolutePath>(new[] {"-a", "--archives"},
"Location to store archives (files are named as the hex version of their hashes)")
{IsRequired = true});
command.Add(new Option<AbsolutePath>(new[] {"--other-archives"},
"Look for files here before downloading (stored by hex hash name)")
@ -88,10 +83,9 @@ public class ValidateLists : IVerb
return command;
}
public async Task<int> Run(List[] lists, AbsolutePath archives, AbsolutePath reports, AbsolutePath otherArchives)
public async Task<int> Run(List[] lists, AbsolutePath reports, AbsolutePath otherArchives)
{
reports.CreateDirectory();
var archiveManager = new ArchiveManager(_logger, archives);
var token = CancellationToken.None;
_logger.LogInformation("Scanning for existing patches/mirrors");
@ -131,15 +125,14 @@ public class ValidateLists : IVerb
using var scope = _logger.BeginScope("MachineURL: {MachineUrl}", modList.Links.MachineURL);
_logger.LogInformation("Verifying {MachineUrl} - {Title}", modList.Links.MachineURL, modList.Title);
await DownloadModList(modList, archiveManager, CancellationToken.None);
//await DownloadModList(modList, archiveManager, CancellationToken.None);
ModList modListData;
try
{
_logger.LogInformation("Loading Modlist");
modListData =
await StandardInstaller.LoadFromFile(_dtos,
archiveManager.GetPath(modList.DownloadMetadata!.Hash));
await StandardInstaller.Load(_dtos, _dispatcher, modList, token);
}
catch (JsonException ex)
{

View File

@ -0,0 +1,27 @@
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<TargetFramework>net6.0</TargetFramework>
<Nullable>enable</Nullable>
<IsPackable>false</IsPackable>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="16.11.0" />
<PackageReference Include="xunit" Version="2.4.1" />
<PackageReference Include="xunit.runner.visualstudio" Version="2.4.3">
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
<PrivateAssets>all</PrivateAssets>
</PackageReference>
<PackageReference Include="coverlet.collector" Version="3.1.0">
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
<PrivateAssets>all</PrivateAssets>
</PackageReference>
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\Wabbajack.Compression.Zip\Wabbajack.Compression.Zip.csproj" />
</ItemGroup>
</Project>

View File

@ -0,0 +1,52 @@
using System;
using System.IO;
using System.IO.Compression;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Xunit;
namespace Wabbajack.Compression.Zip.Test
{
public class Tests
{
[Fact]
public async Task CanReadSimpleZip()
{
var random = new Random();
var ms = new MemoryStream();
var files = Enumerable.Range(1, 10)
.Select(f =>
{
var buffer = new byte[1024];
random.NextBytes(buffer);
return (f, buffer);
}).ToArray();
using (var zipFile = new ZipArchive(ms, ZipArchiveMode.Create, true))
{
foreach (var (f, buffer) in files)
{
var entry = zipFile.CreateEntry(f.ToString(), CompressionLevel.Optimal);
await using var es = entry.Open();
await es.WriteAsync(buffer);
}
}
ms.Position = 0;
var reader = new ZipReader(ms);
foreach (var file in (await reader.GetFiles()).Zip(files))
{
var tms = new MemoryStream();
await reader.Extract(file.First, tms, CancellationToken.None);
Assert.Equal(file.First.FileName, file.Second.f.ToString());
Assert.Equal(file.Second.buffer, tms.ToArray());
}
}
}
}

View File

@ -0,0 +1,21 @@
namespace Wabbajack.Compression.Zip;
public static class Extensions
{
public static async Task CopyToLimitAsync(this Stream frm, Stream tw, int limit, CancellationToken token)
{
var buff = new byte[1024 * 128];
while (limit > 0 && !token.IsCancellationRequested)
{
var toRead = Math.Min(buff.Length, limit);
var read = await frm.ReadAsync(buff.AsMemory(0, toRead), token);
if (read == 0)
throw new Exception("End of stream before end of limit");
await tw.WriteAsync(buff.AsMemory(0, read), token);
limit -= read;
}
await tw.FlushAsync(token);
}
}

View File

@ -0,0 +1,10 @@
namespace Wabbajack.Compression.Zip;
public class ExtractedEntry
{
public long FileOffset { get; set; }
public string FileName { get; set; }
public long CompressedSize { get; set; }
public uint UncompressedSize { get; set; }
public short CompressionMethod { get; set; }
}

View File

@ -0,0 +1,13 @@
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<TargetFramework>net6.0</TargetFramework>
<ImplicitUsings>enable</ImplicitUsings>
<Nullable>enable</Nullable>
</PropertyGroup>
<ItemGroup>
<ProjectReference Include="..\Wabbajack.IO.Async\Wabbajack.IO.Async.csproj" />
</ItemGroup>
</Project>

View File

@ -0,0 +1,131 @@
using System.IO.Compression;
using System.Text;
using Wabbajack.IO.Async;
namespace Wabbajack.Compression.Zip;
public class ZipReader : IAsyncDisposable
{
private readonly Stream _stream;
private readonly AsyncBinaryReader _rdr;
private readonly bool _leaveOpen;
private const uint EndOfCentralDirectoryRecordSignature = 0x06054b50;
private const uint CentralDirectoryFileHeaderSignature = 0x02014b50;
public ZipReader(Stream s, bool leaveOpen = false)
{
_leaveOpen = leaveOpen;
_stream = s;
_rdr = new AsyncBinaryReader(s);
}
public async Task<ExtractedEntry[]> GetFiles()
{
var sigOffset = 0;
while (true)
{
_rdr.Position = _rdr.Length - 22 - sigOffset;
if (await _rdr.ReadUInt32() == EndOfCentralDirectoryRecordSignature)
break;
sigOffset++;
}
if (await _rdr.ReadUInt16() != 0)
{
throw new NotImplementedException("Only single disk archives are supported");
}
if (await _rdr.ReadInt16() != 0)
{
throw new NotImplementedException("Only single disk archives are supported");
}
_rdr.Position += 2;
var totalCentralDirectoryRecords = await _rdr.ReadUInt16();
var sizeOfCentralDirectory = await _rdr.ReadUInt32();
var centralDirectoryOffset = await _rdr.ReadUInt32();
_rdr.Position = centralDirectoryOffset;
var entries = new ExtractedEntry[totalCentralDirectoryRecords];
for (var i = 0; i < totalCentralDirectoryRecords; i += 1)
{
if (await _rdr.ReadUInt32() != CentralDirectoryFileHeaderSignature)
throw new Exception("Data corruption, can't find central directory");
_rdr.Position += 6;
var compressionMethod = await _rdr.ReadInt16();
_rdr.Position += 4;
var crc = await _rdr.ReadUInt32();
var compressedSize = await _rdr.ReadUInt32();
var uncompressedSize = await _rdr.ReadUInt32();
var fileNameLength = await _rdr.ReadUInt16();
var extraFieldLength = await _rdr.ReadUInt16();
var fileCommentLength = await _rdr.ReadUInt16();
_rdr.Position += 8;
var fileOffset = await _rdr.ReadUInt32();
var fileName = await _rdr.ReadFixedSizeString(fileNameLength, Encoding.UTF8);
_rdr.Position += extraFieldLength + fileCommentLength;
entries[i] = new ExtractedEntry
{
FileOffset = fileOffset,
FileName = fileName,
CompressedSize = compressedSize,
UncompressedSize = uncompressedSize,
CompressionMethod = compressionMethod,
};
}
return entries;
}
public async ValueTask Extract(ExtractedEntry entry, Stream stream, CancellationToken token)
{
_stream.Position = entry.FileOffset;
_stream.Position += 6;
var flags = await _rdr.ReadUInt16();
_stream.Position += 18;
var fnLength = await _rdr.ReadUInt16();
var efLength = await _rdr.ReadUInt16();
_stream.Position += fnLength + efLength;
if (flags != 0)
throw new NotImplementedException("Don't know how to handle flags yet");
switch (entry.CompressionMethod)
{
case 0:
await _stream.CopyToLimitAsync(stream, (int) entry.UncompressedSize, token);
break;
case 1:
case 2:
case 3:
case 4:
case 5:
case 6:
case 7:
case 8:
var ds = new DeflateStream(_rdr.BaseStream, CompressionMode.Decompress, true);
await ds.CopyToLimitAsync(stream, (int)entry.UncompressedSize, token);
break;
default:
throw new NotImplementedException($"Have not implemented compression format {entry.CompressionMethod}");
}
}
public async ValueTask DisposeAsync()
{
if (!_leaveOpen)
await _stream.DisposeAsync();
}
}

View File

@ -1,5 +1,6 @@
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
@ -255,4 +256,22 @@ public class DownloadDispatcher
var url = ud.UnParse(archive.State).ToString();
return mirrorAllowList.AllowedPrefixes.Any(p => url.StartsWith(p));
}
public async ValueTask<Stream> ChunkedSeekableStream(Archive archive, CancellationToken token)
{
if (!TryGetDownloader(archive, out var downloader))
{
throw new NotImplementedException($"Now downloader ot handle {archive.State}");
}
if (downloader is IChunkedSeekableStreamDownloader cs)
{
return await cs.GetChunkedSeekableStream(archive, token);
}
else
{
throw new NotImplementedException($"Downloader {archive.State} does not support chunked seekable streams");
}
}
}

View File

@ -0,0 +1,7 @@
using System.Net.Http.Headers;
using System.Threading.Tasks;
using Wabbajack.DTOs;
using Wabbajack.Networking.Http;
namespace Wabbajack.Downloaders.Http;

View File

@ -1,7 +1,9 @@
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Net.Http;
using System.Net.Http.Headers;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
@ -11,6 +13,7 @@ using Wabbajack.DTOs;
using Wabbajack.DTOs.DownloadStates;
using Wabbajack.DTOs.Validation;
using Wabbajack.Hashing.xxHash64;
using Wabbajack.Networking.Http;
using Wabbajack.Networking.Http.Interfaces;
using Wabbajack.Paths;
using Wabbajack.Paths.IO;
@ -18,7 +21,7 @@ using Wabbajack.RateLimiter;
namespace Wabbajack.Downloaders.Http;
public class HttpDownloader : ADownloader<DTOs.DownloadStates.Http>, IUrlDownloader, IUpgradingDownloader
public class HttpDownloader : ADownloader<DTOs.DownloadStates.Http>, IUrlDownloader, IUpgradingDownloader, IChunkedSeekableStreamDownloader
{
private readonly HttpClient _client;
private readonly IHttpDownloader _downloader;
@ -100,7 +103,7 @@ public class HttpDownloader : ADownloader<DTOs.DownloadStates.Http>, IUrlDownloa
return await _client.SendAsync(msg, token);
}
private static HttpRequestMessage MakeMessage(DTOs.DownloadStates.Http state)
internal static HttpRequestMessage MakeMessage(DTOs.DownloadStates.Http state)
{
var msg = new HttpRequestMessage(HttpMethod.Get, state.Url);
foreach (var header in state.Headers)
@ -151,4 +154,35 @@ public class HttpDownloader : ADownloader<DTOs.DownloadStates.Http>, IUrlDownloa
};
return new[] {$"directURL={state.Url}"};
}
public async ValueTask<Stream> GetChunkedSeekableStream(Archive archive, CancellationToken token)
{
var state = archive.State as DTOs.DownloadStates.Http;
return new ChunkedSeekableDownloader(this, archive, state!);
}
public class ChunkedSeekableDownloader : AChunkedBufferingStream
{
private readonly DTOs.DownloadStates.Http _state;
private readonly Archive _archive;
private readonly HttpDownloader _downloader;
public ChunkedSeekableDownloader(HttpDownloader downloader, Archive archive, DTOs.DownloadStates.Http state) : base(21, archive.Size, 8)
{
_downloader = downloader;
_archive = archive;
_state = state;
}
public override async Task<byte[]> LoadChunk(long offset, int size)
{
var msg = HttpDownloader.MakeMessage(_state);
msg.Headers.Range = new RangeHeaderValue(offset, offset + size);
using var response = await _downloader._client.SendAsync(msg);
if (!response.IsSuccessStatusCode)
throw new HttpException(response);
return await response.Content.ReadAsByteArrayAsync();
}
}
}

View File

@ -8,9 +8,9 @@ using Wabbajack.Networking.Http.Interfaces;
namespace Wabbajack.Downloaders.IPS4OAuth2Downloader;
public class LoversLabDownloader : AIPS4OAuth2Downloader<VectorPlexusDownloader, LoversLabLoginState, LoversLab>
public class LoversLabDownloader : AIPS4OAuth2Downloader<LoversLabDownloader, LoversLabLoginState, LoversLab>
{
public LoversLabDownloader(ILogger<VectorPlexusDownloader> logger, ITokenProvider<LoversLabLoginState> loginInfo,
public LoversLabDownloader(ILogger<LoversLabDownloader> logger, ITokenProvider<LoversLabLoginState> loginInfo,
HttpClient client,
IHttpDownloader downloader, ApplicationInfo appInfo) : base(logger, loginInfo, client, downloader, appInfo,
new Uri("https://loverslab.com"), "Lovers Lab")

View File

@ -0,0 +1,11 @@
using System.IO;
using System.Threading;
using System.Threading.Tasks;
using Wabbajack.DTOs;
namespace Wabbajack.Downloaders.Interfaces;
public interface IChunkedSeekableStreamDownloader
{
public ValueTask<Stream> GetChunkedSeekableStream(Archive archive, CancellationToken token);
}

View File

@ -8,6 +8,7 @@
</PropertyGroup>
<ItemGroup>
<ProjectReference Include="..\Wabbajack.Compression.Zip\Wabbajack.Compression.Zip.csproj" />
<ProjectReference Include="..\Wabbajack.DTOs\Wabbajack.DTOs.csproj" />
<ProjectReference Include="..\Wabbajack.Paths.IO\Wabbajack.Paths.IO.csproj" />
</ItemGroup>

View File

@ -0,0 +1,28 @@
using System.Threading;
using System.Threading.Tasks;
using Wabbajack.Common;
using Wabbajack.DTOs.CDN;
using Wabbajack.DTOs.DownloadStates;
using Wabbajack.Networking.Http;
namespace Wabbajack.Downloaders;
public class ChunkedSeekableDownloader : AChunkedBufferingStream
{
private readonly FileDefinition _definition;
private readonly WabbajackCDNDownloader _downloader;
private readonly WabbajackCDN _state;
public ChunkedSeekableDownloader(WabbajackCDN state, FileDefinition definition, WabbajackCDNDownloader downloader) : base(21, definition.Size, 8)
{
_state = state;
_downloader = downloader;
_definition = definition;
}
public override async Task<byte[]> LoadChunk(long offset, int size)
{
var idx = offset >> 21;
return await _downloader.GetPart(_state, _definition.Parts[idx], CancellationToken.None);
}
}

View File

@ -19,7 +19,7 @@ using Wabbajack.RateLimiter;
namespace Wabbajack.Downloaders;
public class WabbajackCDNDownloader : ADownloader<WabbajackCDN>, IUrlDownloader
public class WabbajackCDNDownloader : ADownloader<WabbajackCDN>, IUrlDownloader, IChunkedSeekableStreamDownloader
{
public static Dictionary<string, string> DomainRemaps = new()
{
@ -135,4 +135,27 @@ public class WabbajackCDNDownloader : ADownloader<WabbajackCDN>, IUrlDownloader
{
return new[] {$"directURL={state.Url}"};
}
public async ValueTask<Stream> GetChunkedSeekableStream(Archive archive, CancellationToken token)
{
var state = archive.State as WabbajackCDN;
var definition = await GetDefinition(state!, token);
return new ChunkedSeekableDownloader(state!, definition!, this);
}
public async Task<byte[]> GetPart(WabbajackCDN state, PartDefinition part, CancellationToken token)
{
var msg = MakeMessage(new Uri(state.Url + $"/parts/{part.Index}"));
using var response = await _client.SendAsync(msg, HttpCompletionOption.ResponseHeadersRead, token);
if (!response.IsSuccessStatusCode)
throw new InvalidDataException($"Bad response for part request for part {part.Index}");
var length = response.Content.Headers.ContentLength;
if (length != part.Size)
throw new InvalidDataException(
$"Bad part size, expected {part.Size} got {length} for part {part.Index}");
return await response.Content.ReadAsByteArrayAsync(token);
}
}

View File

@ -0,0 +1,111 @@
using System.Buffers.Binary;
using System.Text;
namespace Wabbajack.IO.Async;
public class AsyncBinaryReader
{
private readonly Stream _s;
private readonly Memory<byte> _buffer;
private readonly Endian _endian;
public AsyncBinaryReader(Stream s, Endian endian = Endian.Little)
{
_s = s;
_buffer = new Memory<byte>(new byte[256]);
_endian = endian;
}
public async ValueTask<byte> ReadByte()
{
await _s.ReadAllAsync(_buffer[..1]);
return _buffer.Span[0];
}
public async ValueTask<ushort> ReadUInt16()
{
await _s.ReadAllAsync(_buffer[..2]);
return _endian == Endian.Big ?
BinaryPrimitives.ReadUInt16BigEndian(_buffer[..2].Span) :
BinaryPrimitives.ReadUInt16LittleEndian(_buffer[..2].Span);
}
public async ValueTask<uint> ReadUInt32()
{
await _s.ReadAllAsync(_buffer[..4]);
return _endian == Endian.Big ?
BinaryPrimitives.ReadUInt32BigEndian(_buffer[..4].Span) :
BinaryPrimitives.ReadUInt32LittleEndian(_buffer[..4].Span);
}
public async ValueTask<ulong> ReadUInt64()
{
await _s.ReadAllAsync(_buffer[..8]);
return _endian == Endian.Big ?
BinaryPrimitives.ReadUInt64BigEndian(_buffer[..8].Span) :
BinaryPrimitives.ReadUInt64LittleEndian(_buffer[..8].Span);
}
public async ValueTask<short> ReadInt16()
{
await _s.ReadAllAsync(_buffer[..2]);
return _endian == Endian.Big ?
BinaryPrimitives.ReadInt16BigEndian(_buffer[..2].Span) :
BinaryPrimitives.ReadInt16LittleEndian(_buffer[..2].Span);
}
public async ValueTask<int> ReadInt32()
{
await _s.ReadAllAsync(_buffer[..4]);
return _endian == Endian.Big ?
BinaryPrimitives.ReadInt32BigEndian(_buffer[..4].Span) :
BinaryPrimitives.ReadInt32LittleEndian(_buffer[..4].Span);
}
public async ValueTask<long> ReadInt64()
{
await _s.ReadAllAsync(_buffer[..8]);
return _endian == Endian.Big ?
BinaryPrimitives.ReadInt64BigEndian(_buffer[..8].Span) :
BinaryPrimitives.ReadInt64LittleEndian(_buffer[..8].Span);
}
public async ValueTask<float> ReadFloat()
{
await _s.ReadAllAsync(_buffer[..4]);
return _endian == Endian.Big ?
BinaryPrimitives.ReadSingleBigEndian(_buffer[..4].Span) :
BinaryPrimitives.ReadSingleLittleEndian(_buffer[..4].Span);
}
public async ValueTask<double> ReadDouble()
{
await _s.ReadAllAsync(_buffer[..8]);
return _endian == Endian.Big ?
BinaryPrimitives.ReadDoubleBigEndian(_buffer[..8].Span) :
BinaryPrimitives.ReadDoubleLittleEndian(_buffer[..8].Span);
}
public async ValueTask<byte[]> ReadBytes(int size)
{
var bytes = new byte[size];
await _s.ReadAllAsync(bytes);
return bytes;
}
public long Position
{
set => _s.Position = value;
get => _s.Position;
}
public long Length => _s.Length;
public Stream BaseStream => _s;
public async Task<string> ReadFixedSizeString(ushort length, Encoding encoding)
{
var buf = new Memory<byte>(new byte[length]);
await _s.ReadAllAsync(buf);
return encoding.GetString(buf.Span);
}
}

View File

@ -0,0 +1,7 @@
namespace Wabbajack.IO.Async;
public enum Endian
{
Little,
Big
}

View File

@ -0,0 +1,16 @@
namespace Wabbajack.IO.Async;
public static class Extensions
{
public static async ValueTask ReadAllAsync(this Stream frm, Memory<byte> output)
{
var read = 0;
while (read < output.Length)
{
var thisRead = await frm.ReadAsync(output[read..]);
if (thisRead == 0)
throw new Exception("End of stream reached before limit");
read += thisRead;
}
}
}

View File

@ -0,0 +1,9 @@
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<TargetFramework>net6.0</TargetFramework>
<ImplicitUsings>enable</ImplicitUsings>
<Nullable>enable</Nullable>
</PropertyGroup>
</Project>

View File

@ -4,6 +4,7 @@ using System.Globalization;
using System.IO;
using System.Linq;
using System.Text;
using System.Text.Json;
using System.Text.RegularExpressions;
using System.Threading;
using System.Threading.Tasks;
@ -14,6 +15,7 @@ using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using Wabbajack.Common;
using Wabbajack.Compression.BSA;
using Wabbajack.Compression.Zip;
using Wabbajack.Downloaders;
using Wabbajack.Downloaders.GameFile;
using Wabbajack.DTOs;
@ -408,4 +410,22 @@ public class StandardInstaller : AInstaller<StandardInstaller>
await BinaryPatching.ApplyPatch(new MemoryStream(srcData), new MemoryStream(patchData), fs);
});
}
public static async Task<ModList> Load(DTOSerializer dtos, DownloadDispatcher dispatcher, ModlistMetadata metadata, CancellationToken token)
{
var archive = new Archive
{
State = dispatcher.Parse(new Uri(metadata.Links.Download))!,
Size = metadata.DownloadMetadata!.Size,
Hash = metadata.DownloadMetadata.Hash
};
var stream = await dispatcher.ChunkedSeekableStream(archive, token);
await using var reader = new ZipReader(stream);
var entry = (await reader.GetFiles()).First(e => e.FileName == "modlist");
var ms = new MemoryStream();
await reader.Extract(entry, ms, token);
ms.Position = 0;
return JsonSerializer.Deserialize<ModList>(ms, dtos.Options)!;
}
}

View File

@ -0,0 +1,76 @@
using System;
using System.IO;
using System.IO.Compression;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Wabbajack.Compression.Zip;
using Wabbajack.Hashing.xxHash64;
using Xunit;
namespace Wabbajack.Networking.Http.Test;
public class ChunkedBufferingStreamTests
{
private readonly Random _random;
private readonly byte[] _data;
private readonly MemoryStream _mstream;
public ChunkedBufferingStreamTests()
{
_random = new Random();
_data = new byte[1024 * 1024 * 24];
_random.NextBytes(_data);
_mstream = new MemoryStream(_data);
}
[Fact]
public async Task CanHashStream()
{
var cstream = new MemoryChunkedBufferingStream(_mstream);
Assert.Equal(await _data.Hash(), await cstream.Hash(CancellationToken.None));
}
[Fact]
public async Task CanExtractOneFile()
{
var ms = new MemoryStream();
var files = Enumerable.Range(1, 10)
.Select(f =>
{
var buffer = new byte[1024];
_random.NextBytes(buffer);
return (f, buffer);
}).ToArray();
using (var zipFile = new ZipArchive(ms, ZipArchiveMode.Create, true))
{
foreach (var (f, buffer) in files)
{
var entry = zipFile.CreateEntry(f.ToString(), CompressionLevel.Optimal);
await using var es = entry.Open();
await es.WriteAsync(buffer);
}
{
var entry = zipFile.CreateEntry("ending", CompressionLevel.Optimal);
await using var es = entry.Open();
await es.WriteAsync(Encoding.UTF8.GetBytes("Cheese for Everyone!"));
}
}
ms.Position = 0;
await using (var zipFile = new ZipReader(new MemoryChunkedBufferingStream(ms)))
{
var entry = (await zipFile.GetFiles()).First(f => f.FileName == "ending");
var ems = new MemoryStream();
await zipFile.Extract(entry, ems, CancellationToken.None);
Assert.Equal(Encoding.UTF8.GetBytes("Cheese for Everyone!"), ems.ToArray());
}
}
}

View File

@ -0,0 +1,22 @@
using System.IO;
using System.Threading.Tasks;
namespace Wabbajack.Networking.Http.Test;
public class MemoryChunkedBufferingStream : AChunkedBufferingStream
{
private readonly MemoryStream _src;
public MemoryChunkedBufferingStream(MemoryStream src) : base(4, src.Length, 16)
{
_src = src;
}
public override async Task<byte[]> LoadChunk(long offset, int size)
{
var buff = new byte[size];
_src.Position = offset;
await _src.ReadAsync(buff);
return buff;
}
}

View File

@ -0,0 +1,28 @@
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<TargetFramework>net6.0</TargetFramework>
<Nullable>enable</Nullable>
<IsPackable>false</IsPackable>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="16.11.0" />
<PackageReference Include="xunit" Version="2.4.1" />
<PackageReference Include="xunit.runner.visualstudio" Version="2.4.3">
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
<PrivateAssets>all</PrivateAssets>
</PackageReference>
<PackageReference Include="coverlet.collector" Version="3.1.0">
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
<PrivateAssets>all</PrivateAssets>
</PackageReference>
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\Wabbajack.Compression.Zip\Wabbajack.Compression.Zip.csproj" />
<ProjectReference Include="..\Wabbajack.Networking.Http\Wabbajack.Networking.Http.csproj" />
</ItemGroup>
</Project>

View File

@ -0,0 +1,117 @@
using System;
using System.Collections.Generic;
using System.IO;
using System.Threading;
using System.Threading.Tasks;
namespace Wabbajack.Networking.Http;
public abstract class AChunkedBufferingStream : Stream
{
private readonly int _chunkSize;
private readonly int _position;
private readonly int _maxChunks;
private readonly Dictionary<ulong, byte[]> _chunks;
private readonly uint _chunkBitsShift;
private readonly ulong _chunkMask;
protected AChunkedBufferingStream(int chunkBitsShift, long totalSize, int maxChunks)
{
_chunkSize = 1 << chunkBitsShift;
_chunkBitsShift = (uint)chunkBitsShift;
_chunkMask = ulong.MaxValue << chunkBitsShift;
Length = totalSize;
_maxChunks = maxChunks;
_chunks = new Dictionary<ulong, byte[]>();
}
public abstract Task<byte[]> LoadChunk(long offset, int size);
public override void Flush()
{
throw new System.NotImplementedException();
}
public override int Read(byte[] buffer, int offset, int count)
{
throw new NotImplementedException();
}
public override async Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
{
if (Position >= Length) return 0;
var chunkId = (ulong)Position & _chunkMask;
if (!_chunks.TryGetValue(chunkId, out var chunkData))
{
var chunk = await LoadChunk((long)chunkId, _chunkSize);
_chunks.Add(chunkId, chunk);
}
var chunkOffset = (ulong)Position ^ _chunkMask;
var availableRead = (ulong)chunkData!.Length - chunkOffset;
var toRead = Math.Min((ulong)count, availableRead);
Array.Copy(chunkData, (uint)chunkOffset, buffer, offset, count);
return (int)toRead;
}
public override async ValueTask<int> ReadAsync(Memory<byte> buffer, CancellationToken cancellationToken = new CancellationToken())
{
if (Position >= Length) return 0;
var chunkId = (ulong)Position & _chunkMask;
if (!_chunks.TryGetValue(chunkId, out var chunkData))
{
chunkData = await LoadChunk((long)chunkId, _chunkSize);
_chunks.Add(chunkId, chunkData);
}
var chunkOffset = (ulong)Position & ~_chunkMask;
var availableRead = (ulong)chunkData!.Length - chunkOffset;
var toRead = Math.Min((ulong)buffer.Length, availableRead);
var chunkBuff = new Memory<byte>(chunkData).Slice((int)chunkOffset, (int)toRead);
chunkBuff.CopyTo(buffer);
Position += (long)toRead;
return (int)toRead;
}
public override long Seek(long offset, SeekOrigin origin)
{
switch (origin)
{
case SeekOrigin.Begin:
Position = offset;
break;
case SeekOrigin.Current:
Position += offset;
break;
case SeekOrigin.End:
Position = Length - offset;
break;
default:
throw new ArgumentOutOfRangeException(nameof(origin), origin, null);
}
return Position;
}
public override void SetLength(long value)
{
throw new System.NotImplementedException();
}
public override void Write(byte[] buffer, int offset, int count)
{
throw new System.NotImplementedException();
}
public override bool CanRead => true;
public override bool CanSeek => true;
public override bool CanWrite => true;
public override long Length { get; }
public override long Position { get; set; } = 0;
}

View File

@ -125,6 +125,14 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Wabbajack.App.Blazor", "Wab
EndProject
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Solution Items", "Solution Items", "{18E36813-CB53-4172-8FF3-EFE3B9B30A5F}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Wabbajack.Networking.Http.Test", "Wabbajack.Networking.Http.Test\Wabbajack.Networking.Http.Test.csproj", "{34FC755D-24F0-456A-B5C1-5BA7F12DC233}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Wabbajack.Compression.Zip", "Wabbajack.Compression.Zip\Wabbajack.Compression.Zip.csproj", "{10165025-D30B-44B7-A764-50E15603AE56}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Wabbajack.IO.Async", "Wabbajack.IO.Async\Wabbajack.IO.Async.csproj", "{64AD7E26-5643-4969-A61C-E0A90FA25FCB}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Wabbajack.Compression.Zip.Test", "Wabbajack.Compression.Zip.Test\Wabbajack.Compression.Zip.Test.csproj", "{6D7EA87E-6ABE-4BA3-B93A-BE5E71A4DE7C}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
@ -339,6 +347,22 @@ Global
{C6E9B15D-510F-4074-AB1C-069F36BA4622}.Debug|Any CPU.Build.0 = Debug|Any CPU
{C6E9B15D-510F-4074-AB1C-069F36BA4622}.Release|Any CPU.ActiveCfg = Release|Any CPU
{C6E9B15D-510F-4074-AB1C-069F36BA4622}.Release|Any CPU.Build.0 = Release|Any CPU
{34FC755D-24F0-456A-B5C1-5BA7F12DC233}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{34FC755D-24F0-456A-B5C1-5BA7F12DC233}.Debug|Any CPU.Build.0 = Debug|Any CPU
{34FC755D-24F0-456A-B5C1-5BA7F12DC233}.Release|Any CPU.ActiveCfg = Release|Any CPU
{34FC755D-24F0-456A-B5C1-5BA7F12DC233}.Release|Any CPU.Build.0 = Release|Any CPU
{10165025-D30B-44B7-A764-50E15603AE56}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{10165025-D30B-44B7-A764-50E15603AE56}.Debug|Any CPU.Build.0 = Debug|Any CPU
{10165025-D30B-44B7-A764-50E15603AE56}.Release|Any CPU.ActiveCfg = Release|Any CPU
{10165025-D30B-44B7-A764-50E15603AE56}.Release|Any CPU.Build.0 = Release|Any CPU
{64AD7E26-5643-4969-A61C-E0A90FA25FCB}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{64AD7E26-5643-4969-A61C-E0A90FA25FCB}.Debug|Any CPU.Build.0 = Debug|Any CPU
{64AD7E26-5643-4969-A61C-E0A90FA25FCB}.Release|Any CPU.ActiveCfg = Release|Any CPU
{64AD7E26-5643-4969-A61C-E0A90FA25FCB}.Release|Any CPU.Build.0 = Release|Any CPU
{6D7EA87E-6ABE-4BA3-B93A-BE5E71A4DE7C}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{6D7EA87E-6ABE-4BA3-B93A-BE5E71A4DE7C}.Debug|Any CPU.Build.0 = Debug|Any CPU
{6D7EA87E-6ABE-4BA3-B93A-BE5E71A4DE7C}.Release|Any CPU.ActiveCfg = Release|Any CPU
{6D7EA87E-6ABE-4BA3-B93A-BE5E71A4DE7C}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
@ -380,6 +404,10 @@ Global
{4F252332-CA77-41DE-95A8-9DF38A81D675} = {98B731EE-4FC0-4482-A069-BCBA25497871}
{AB9A5C22-10CC-4EE0-A808-FB1DC9E24247} = {F01F8595-5FD7-4506-8469-F4A5522DACC1}
{D6351587-CAF6-4CB6-A2BD-5368E69F297C} = {F01F8595-5FD7-4506-8469-F4A5522DACC1}
{34FC755D-24F0-456A-B5C1-5BA7F12DC233} = {F01F8595-5FD7-4506-8469-F4A5522DACC1}
{10165025-D30B-44B7-A764-50E15603AE56} = {F677890D-5109-43BC-97C7-C4CD47C8EE0C}
{64AD7E26-5643-4969-A61C-E0A90FA25FCB} = {F677890D-5109-43BC-97C7-C4CD47C8EE0C}
{6D7EA87E-6ABE-4BA3-B93A-BE5E71A4DE7C} = {F677890D-5109-43BC-97C7-C4CD47C8EE0C}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {0AA30275-0F38-4A7D-B645-F5505178DDE8}