Store patches in RocksDb

This commit is contained in:
Timothy Baldridge 2020-06-01 21:41:34 -06:00
parent 4bcf2eb1b1
commit 496c0083bf
7 changed files with 113 additions and 119 deletions

View File

@ -31,7 +31,7 @@ namespace Wabbajack.Common.Test
switch (method)
{
case DiffMethod.Default:
await Utils.CreatePatch(src, dest, ms);
await Utils.CreatePatchCached(src, dest, ms);
break;
case DiffMethod.BSDiff:
BSDiff.Create(src, dest, ms);

106
Wabbajack.Common/Patches.cs Normal file
View File

@ -0,0 +1,106 @@
using System;
using System.Diagnostics.CodeAnalysis;
using System.IO;
using System.Text;
using System.Threading.Tasks;
using RocksDbSharp;
namespace Wabbajack.Common
{
public static partial class Utils
{
private static RocksDb? _patchCache;
private static void InitPatches()
{
var options = new DbOptions().SetCreateIfMissing(true);
_patchCache = RocksDb.Open(options, (string)Consts.LocalAppDataPath.Combine("PatchCache.rocksDb"));
}
private static byte[] PatchKey(Hash src, Hash dest)
{
var arr = new byte[16];
Array.Copy(BitConverter.GetBytes((ulong)src), 0, arr, 0, 8);
Array.Copy(BitConverter.GetBytes((ulong)dest), 0, arr, 8, 8);
return arr;
}
public static async Task CreatePatchCached(byte[] a, byte[] b, Stream output)
{
var dataA = a.xxHash();
var dataB = b.xxHash();
var key = PatchKey(dataA, dataB);
var found = _patchCache!.Get(key);
if (found != null)
{
await output.WriteAsync(found);
return;
}
await using var patch = new MemoryStream();
Status("Creating Patch");
OctoDiff.Create(a, b, patch);
_patchCache.Put(key, patch.ToArray());
patch.Position = 0;
await patch.CopyToAsync(output);
}
public static async Task CreatePatchCached(Stream srcStream, Hash srcHash, FileStream destStream, Hash destHash,
Stream patchOutStream)
{
var key = PatchKey(srcHash, destHash);
var patch = _patchCache!.Get(key);
if (patch != null)
{
await patchOutStream.WriteAsync(patch);
}
await using var sigStream = new MemoryStream();
await using var patchStream = new MemoryStream();
OctoDiff.Create(srcStream, destStream, sigStream, patchStream);
_patchCache.Put(key, patchStream.ToArray());
patchStream.Position = 0;
await patchStream.CopyToAsync(patchOutStream);
}
public static bool TryGetPatch(Hash foundHash, Hash fileHash, [MaybeNullWhen(false)] out byte[] ePatch)
{
var key = PatchKey(foundHash, fileHash);
var patch = _patchCache!.Get(key);
if (patch != null)
{
ePatch = patch;
return true;
}
ePatch = null;
return false;
}
public static void ApplyPatch(Stream input, Func<Stream> openPatchStream, Stream output)
{
using var ps = openPatchStream();
using var br = new BinaryReader(ps);
var bytes = br.ReadBytes(8);
var str = Encoding.ASCII.GetString(bytes);
switch (str)
{
case "BSDIFF40":
BSDiff.Apply(input, openPatchStream, output);
return;
case "OCTODELT":
OctoDiff.Apply(input, openPatchStream, output);
return;
default:
throw new Exception($"No diff dispatch for: {str}");
}
}
}
}

View File

@ -108,6 +108,7 @@ namespace Wabbajack.Common
Observable.FromEventPattern<FileSystemEventHandler, FileSystemEventArgs>(h => watcher.Deleted += h, h => watcher.Deleted -= h).Select(e => (FileEventType.Deleted, e.EventArgs)))
.ObserveOn(RxApp.TaskpoolScheduler);
watcher.EnableRaisingEvents = true;
InitPatches();
}
private static readonly Subject<IStatusMessage> LoggerSubj = new Subject<IStatusMessage>();
@ -723,121 +724,6 @@ namespace Wabbajack.Common
return ToFileSizeString((long)byteCount);
}
public static async Task CreatePatch(byte[] a, byte[] b, Stream output)
{
var dataA = a.xxHash().ToHex();
var dataB = b.xxHash().ToHex();
var cacheFile = Consts.PatchCacheFolder.Combine($"{dataA}_{dataB}.patch");
Consts.PatchCacheFolder.CreateDirectory();
while (true)
{
if (cacheFile.IsFile)
{
RETRY_OPEN:
try
{
await using var f = await cacheFile.OpenRead();
await f.CopyToAsync(output);
}
catch (IOException)
{
// Race condition with patch caching
await Task.Delay(100);
goto RETRY_OPEN;
}
}
else
{
var tmpName = Consts.PatchCacheFolder.Combine(Guid.NewGuid() + ".tmp");
await using (var f = await tmpName.Create())
{
Status("Creating Patch");
OctoDiff.Create(a, b, f);
}
RETRY:
try
{
await tmpName.MoveToAsync(cacheFile, true);
}
catch (UnauthorizedAccessException)
{
if (cacheFile.IsFile)
continue;
await Task.Delay(1000);
goto RETRY;
}
continue;
}
break;
}
}
public static async Task CreatePatch(Stream srcStream, Hash srcHash, FileStream destStream, Hash destHash,
FileStream patchStream)
{
await using var sigFile = new TempStream();
OctoDiff.Create(srcStream, destStream, sigFile, patchStream);
patchStream.Position = 0;
var tmpName = Consts.PatchCacheFolder.Combine(Guid.NewGuid() + ".tmp");
await using (var f = await tmpName.Create())
{
await patchStream.CopyToAsync(f);
patchStream.Position = 0;
}
try
{
var cacheFile = Consts.PatchCacheFolder.Combine($"{srcHash.ToHex()}_{destHash.ToHex()}.patch");
Consts.PatchCacheFolder.CreateDirectory();
await tmpName.MoveToAsync(cacheFile, true);
}
catch (UnauthorizedAccessException)
{
await tmpName.DeleteAsync();
}
}
public static bool TryGetPatch(Hash foundHash, Hash fileHash, [MaybeNullWhen(false)] out AbsolutePath ePatch)
{
var patchName = Consts.PatchCacheFolder.Combine($"{foundHash.ToHex()}_{fileHash.ToHex()}.patch");
if (patchName.Exists)
{
ePatch = patchName;
return true;
}
ePatch = default;
return false;
}
public static void ApplyPatch(Stream input, Func<Stream> openPatchStream, Stream output)
{
using var ps = openPatchStream();
using var br = new BinaryReader(ps);
var bytes = br.ReadBytes(8);
var str = Encoding.ASCII.GetString(bytes);
switch (str)
{
case "BSDIFF40":
BSDiff.Apply(input, openPatchStream, output);
return;
case "OCTODELT":
OctoDiff.Apply(input, openPatchStream, output);
return;
default:
throw new Exception($"No diff dispatch for: {str}");
}
}
public static IEnumerable<T> ButLast<T>(this IEnumerable<T> coll)
{
var lst = coll.ToList();

View File

@ -94,7 +94,9 @@ namespace Wabbajack.Lib.CompilationSteps
e.Hash = source.File.Hash;
if (Utils.TryGetPatch(found.Hash, source.File.Hash, out var data))
{
e.PatchID = await _compiler.IncludeFile(data);
}
return e;
}

View File

@ -31,7 +31,7 @@ namespace Wabbajack.Lib.CompilationSteps
Utils.Status($"Generating patch of {filename}");
await using (var ms = new MemoryStream())
{
await Utils.CreatePatch(await gameFile.ReadAllBytesAsync(), await source.AbsolutePath.ReadAllBytesAsync(), ms);
await Utils.CreatePatchCached(await gameFile.ReadAllBytesAsync(), await source.AbsolutePath.ReadAllBytesAsync(), ms);
var data = ms.ToArray();
result.SourceDataID = await _compiler.IncludeFile(data);
Utils.Log($"Generated a {data.Length} byte patch for {filename}");

View File

@ -504,7 +504,7 @@ namespace Wabbajack.Lib
await using var outputStream = await IncludeFile(out var id).Create();
entry.PatchID = id;
await using var destStream = await LoadDataForTo(entry.To, absolutePaths);
await Utils.CreatePatch(srcStream, srcFile.Hash, destStream, entry.Hash, outputStream);
await Utils.CreatePatchCached(srcStream, srcFile.Hash, destStream, entry.Hash, outputStream);
Info($"Patch size {outputStream.Length} for {entry.To}");
});
}

View File

@ -192,7 +192,7 @@ namespace Wabbajack.Lib
await using (var ms = new MemoryStream())
{
await Utils.CreatePatch(srcData, dstData, ms);
await Utils.CreatePatchCached(srcData, dstData, ms);
result.PatchID = await _compiler.IncludeFile(ms.ToArray());
}