wabbajack/Wabbajack.Hashing.xxHash64/StreamExtensions.cs

209 lines
6.1 KiB
C#
Raw Normal View History

using System;
2021-09-27 12:42:46 +00:00
using System.Buffers;
using System.IO;
using System.Threading;
using System.Threading.Tasks;
using Wabbajack.RateLimiter;
2021-10-23 16:51:17 +00:00
namespace Wabbajack.Hashing.xxHash64;
public static class StreamExtensions
2021-09-27 12:42:46 +00:00
{
2021-10-23 16:51:17 +00:00
public static async Task<Hash> Hash(this Stream stream, CancellationToken token)
2021-09-27 12:42:46 +00:00
{
2021-10-23 16:51:17 +00:00
return await stream.HashingCopy(Stream.Null, token);
}
2021-09-27 12:42:46 +00:00
2021-10-23 16:51:17 +00:00
public static async Task<Hash> HashingCopy(this Stream inputStream, Stream outputStream,
CancellationToken token, IJob? job = null, int buffserSize = 1024 * 1024)
2021-10-23 16:51:17 +00:00
{
using var rented = MemoryPool<byte>.Shared.Rent(buffserSize);
2021-10-23 16:51:17 +00:00
var buffer = rented.Memory;
2021-09-27 12:42:46 +00:00
2021-10-23 16:51:17 +00:00
var hasher = new xxHashAlgorithm(0);
2021-09-27 12:42:46 +00:00
2021-10-23 16:51:17 +00:00
var running = true;
ulong finalHash = 0;
while (running && !token.IsCancellationRequested)
{
var totalRead = 0;
2021-09-27 12:42:46 +00:00
2021-10-23 16:51:17 +00:00
while (totalRead != buffer.Length)
{
var read = await inputStream.ReadAsync(buffer.Slice(totalRead, buffer.Length - totalRead),
token);
2021-09-27 12:42:46 +00:00
2021-10-23 16:51:17 +00:00
if (read == 0)
2021-09-27 12:42:46 +00:00
{
2021-10-23 16:51:17 +00:00
running = false;
break;
2021-09-27 12:42:46 +00:00
}
2021-10-23 16:51:17 +00:00
if (job != null)
await job.Report(read, token);
totalRead += read;
}
var pendingWrite = outputStream.WriteAsync(buffer[..totalRead], token);
if (running)
{
hasher.TransformByteGroupsInternal(buffer.Span);
await pendingWrite;
}
else
{
var preSize = (totalRead >> 5) << 5;
if (preSize > 0)
{
hasher.TransformByteGroupsInternal(buffer[..preSize].Span);
finalHash = hasher.FinalizeHashValueInternal(buffer[preSize..totalRead].Span);
2021-09-27 12:42:46 +00:00
await pendingWrite;
break;
}
2021-10-23 16:51:17 +00:00
finalHash = hasher.FinalizeHashValueInternal(buffer[..totalRead].Span);
await pendingWrite;
break;
2021-09-27 12:42:46 +00:00
}
2021-10-23 16:51:17 +00:00
}
2021-09-27 12:42:46 +00:00
2021-10-23 16:51:17 +00:00
await outputStream.FlushAsync(token);
2021-09-27 12:42:46 +00:00
2021-10-23 16:51:17 +00:00
return new Hash(finalHash);
2021-09-27 12:42:46 +00:00
}
public static async Task<Hash> HashingCopy(this Stream inputStream, Func<Memory<byte>, Task> fn,
CancellationToken token)
{
using var rented = MemoryPool<byte>.Shared.Rent(1024 * 1024);
var buffer = rented.Memory;
var hasher = new xxHashAlgorithm(0);
var running = true;
ulong finalHash = 0;
while (running && !token.IsCancellationRequested)
{
var totalRead = 0;
while (totalRead != buffer.Length)
{
var read = await inputStream.ReadAsync(buffer.Slice(totalRead, buffer.Length - totalRead),
token);
if (read == 0)
{
running = false;
break;
}
totalRead += read;
}
var pendingWrite = fn(buffer[..totalRead]);
if (running)
{
hasher.TransformByteGroupsInternal(buffer.Span);
await pendingWrite;
}
else
{
var preSize = (totalRead >> 5) << 5;
if (preSize > 0)
{
hasher.TransformByteGroupsInternal(buffer[..preSize].Span);
finalHash = hasher.FinalizeHashValueInternal(buffer[preSize..totalRead].Span);
await pendingWrite;
break;
}
finalHash = hasher.FinalizeHashValueInternal(buffer[..totalRead].Span);
await pendingWrite;
break;
}
}
return new Hash(finalHash);
}
public static (Stream InputStream, Func<Hash> Fn) HashingPull(this Stream src)
{
var stream = new PullingStream(src);
return (new BufferedStream(stream), () => stream.Hash);
}
class PullingStream : Stream
{
private readonly Stream _src;
private readonly xxHashAlgorithm _hasher;
private ulong? _hash;
public PullingStream(Stream src)
{
_src = src;
_hasher = new xxHashAlgorithm(0);
}
public Hash Hash => new(_hash ?? throw new InvalidOperationException("Hash not yet computed"));
public override long Seek(long offset, SeekOrigin origin)
{
throw new NotImplementedException();
}
public override void SetLength(long value)
{
throw new NotSupportedException();
}
public override void Write(byte[] buffer, int offset, int count)
{
throw new NotSupportedException();
}
public override bool CanRead => true;
public override bool CanSeek => false;
public override bool CanWrite => false;
public override long Length => _src.Length;
public override long Position
{
get { throw new NotSupportedException(); }
set { throw new NotSupportedException(); }
}
public override void Flush()
{
throw new NotImplementedException();
}
public override int Read(byte[] buffer, int offset, int count)
{
if (_hash.HasValue)
throw new InvalidDataException("HashingPull can only be read once");
var sized = count >> 5 << 5;
if (sized == 0)
throw new ArgumentException("count must be a multiple of 32, got " + count, nameof(count));
var read = _src.ReadAtLeast(buffer, sized);
if (read == 0)
return 0;
if (read == sized)
{
_hasher.TransformByteGroupsInternal(buffer.AsSpan(offset, read));
}
else
{
_hash = _hasher.FinalizeHashValueInternal(buffer.AsSpan(offset, read));
return read;
}
return read;
}
}
2021-09-27 12:42:46 +00:00
}