Spool large files to disk during extraction to save memory

This commit is contained in:
Timothy Baldridge 2020-09-07 20:22:23 -06:00
parent 9de30ea8b7
commit 5d7bceb6dc
6 changed files with 127 additions and 39 deletions

View File

@ -21,4 +21,23 @@ namespace Compression.BSA
public DateTime LastModifiedUtc => DateTime.UtcNow;
public IPath Name => (RelativePath)"BSA Memory Stream";
}
public class MemoryBufferFactory : IStreamFactory
{
private readonly byte[] _data;
private int _size;
public MemoryBufferFactory(byte[] data, int size)
{
_data = data;
_size = size;
}
public async ValueTask<Stream> GetStream()
{
return new MemoryStream(_data, 0, _size);
}
public DateTime LastModifiedUtc => DateTime.UtcNow;
public IPath Name => (RelativePath)"BSA Memory Stream";
}
}

View File

@ -150,9 +150,11 @@ namespace Wabbajack.Lib
{
var patchData = await LoadBytesFromPath(pfa.PatchID);
var toFile = file.To.RelativeTo(OutputFolder);
await using var os = await toFile.Create();
Utils.ApplyPatch(s, () => new MemoryStream(patchData), os);
{
await using var os = await toFile.Create();
Utils.ApplyPatch(s, () => new MemoryStream(patchData), os);
}
if (await VirusScanner.ShouldScan(toFile) &&
await ClientAPI.GetVirusScanResult(toFile) == VirusScanner.Result.Malware)
{

View File

@ -27,6 +27,10 @@ namespace Wabbajack.VirtualFileSystem
public static async Task<Dictionary<RelativePath, T>> GatheringExtract<T>(IStreamFactory sFn,
Predicate<RelativePath> shouldExtract, Func<RelativePath, IStreamFactory, ValueTask<T>> mapfn)
{
if (sFn is NativeFileStreamFactory)
{
Utils.Log($"Extracting {sFn.Name}");
}
await using var archive = await sFn.GetStream();
var sig = await ArchiveSigs.MatchesAsync(archive);
archive.Position = 0;

View File

@ -21,6 +21,7 @@ namespace Wabbajack.VirtualFileSystem
private Dictionary<uint, (RelativePath, ulong)> _indexes;
private Stream _stream;
private Definitions.FileType _sig;
private Exception _killException;
public GatheringExtractor(Stream stream, Definitions.FileType sig, Predicate<RelativePath> shouldExtract, Func<RelativePath,IStreamFactory, ValueTask<T>> mapfn)
{
@ -51,7 +52,14 @@ namespace Wabbajack.VirtualFileSystem
_archive._archive.Extract(null, 0xFFFFFFFF, 0, this);
_archive.Dispose();
source.SetResult(true);
if (_killException != null)
{
source.SetException(_killException);
}
else
{
source.SetResult(true);
}
}
catch (Exception ex)
{
@ -105,51 +113,97 @@ namespace Wabbajack.VirtualFileSystem
private uint _index;
private bool _written;
private ulong _totalSize;
private MemoryStream _tmpStream;
private Stream _tmpStream;
private TempFile _tmpFile;
private IStreamFactory _factory;
private bool _diskCached;
public GatheringExtractorStream(GatheringExtractor<T> extractor, uint index)
{
_extractor = extractor;
_index = index;
_written = false;
_totalSize = extractor._indexes[index].Item2;
_tmpStream = new MemoryStream();
_diskCached = _totalSize >= 500_000_000;
}
public int Write(IntPtr data, uint size, IntPtr processedSize)
public int Write(byte[] data, uint size, IntPtr processedSize)
{
unsafe
try
{
var ums = new UnmanagedMemoryStream((byte*)data, size);
ums.CopyTo(_tmpStream);
if ((ulong)_tmpStream.Length >= _totalSize)
{
_tmpStream.Position = 0;
var result = _extractor._mapFn(_extractor._indexes[_index].Item1, new MemoryStreamFactory(_tmpStream)).AsTask().Result;
if (size == _totalSize)
WriteSingleCall(data, size);
else if (_diskCached)
WriteDiskCached(data, size);
else
WriteMemoryCached(data, size);
_extractor._results[_extractor._indexes[_index].Item1] = result;
}
if (processedSize != IntPtr.Zero)
{
Marshal.WriteInt32(processedSize, (int) size);
Marshal.WriteInt32(processedSize, (int)size);
}
}
return 0;
if (_written) throw new Exception("TODO");
unsafe
{
_written = true;
return 0;
return 0;
}
catch (Exception ex)
{
Utils.Log($"Error during extraction {ex}");
_extractor.Kill(ex);
return 1;
}
}
private void WriteSingleCall(byte[] data, in uint size)
{
var result = _extractor._mapFn(_extractor._indexes[_index].Item1, new MemoryBufferFactory(data, (int)size)).Result;
AddResult(result);
Cleanup();
}
private void Cleanup()
{
_tmpStream?.Dispose();
_tmpFile?.DisposeAsync().AsTask().Wait();
}
private void AddResult(T result)
{
_extractor._results.Add(_extractor._indexes[_index].Item1, result);
}
private void WriteMemoryCached(byte[] data, in uint size)
{
if (_tmpStream == null)
_tmpStream = new MemoryStream();
_tmpStream.Write(data, 0, (int)size);
if (_tmpStream.Length != (long)_totalSize) return;
_tmpStream.Flush();
_tmpStream.Position = 0;
var result = _extractor._mapFn(_extractor._indexes[_index].Item1, new MemoryStreamFactory((MemoryStream)_tmpStream)).Result;
AddResult(result);
Cleanup();
}
private void WriteDiskCached(byte[] data, in uint size)
{
if (_tmpFile == null)
{
_tmpFile = new TempFile();
_tmpStream = _tmpFile.Path.Create().Result;
}
_tmpStream.Write(data, 0, (int)size);
if (_tmpStream.Length != (long)_totalSize) return;
_tmpStream.Flush();
_tmpStream.Close();
var result = _extractor._mapFn(_extractor._indexes[_index].Item1, new NativeFileStreamFactory(_tmpFile.Path)).Result;
AddResult(result);
Cleanup();
}
public void Seek(long offset, uint seekOrigin, IntPtr newPosition)
@ -162,5 +216,10 @@ namespace Wabbajack.VirtualFileSystem
return 0;
}
}
private void Kill(Exception ex)
{
_killException = ex;
}
}
}

View File

@ -207,7 +207,7 @@ namespace Wabbajack.VirtualFileSystem.SevenZipExtractor
{
[PreserveSig]
int Write(
IntPtr data,
[In, MarshalAs(UnmanagedType.LPArray, SizeParamIndex = 1)]byte[] data,
uint size,
IntPtr processedSize); // ref uint processedSize
/*
@ -246,7 +246,7 @@ namespace Wabbajack.VirtualFileSystem.SevenZipExtractor
{
[PreserveSig]
int Write(
IntPtr data,
[Out, MarshalAs(UnmanagedType.LPArray, SizeParamIndex = 1)] byte[] data,
uint size,
IntPtr processedSize); // ref uint processedSize
@ -444,7 +444,7 @@ namespace Wabbajack.VirtualFileSystem.SevenZipExtractor
return 0;
}
public int Write(IntPtr data, uint size, IntPtr processedSize)
public int Write(byte[] data, uint size, IntPtr processedSize)
{
throw new NotImplementedException();
/*

View File

@ -209,12 +209,16 @@ namespace Wabbajack.VirtualFileSystem
try
{
var list = await FileExtractor2.GatheringExtract(extractedFile,
_ => true,
var list = await FileExtractor2.GatheringExtract(extractedFile,
_ => true,
async (path, sfactory) => await Analyze(context, self, sfactory, path, depth + 1));
self.Children = list.Values.ToImmutableList();
}
catch (EndOfStreamException ex)
{
return self;
}
catch (Exception ex)
{
Utils.Log($"Error while examining the contents of {relPath.FileName}");