Make BSA Routines async (#168)

* Make BSA routines async
This commit is contained in:
Timothy Baldridge 2019-11-11 21:35:07 -07:00 committed by GitHub
parent 0c05c5fce4
commit d9ca38cdff
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
36 changed files with 414 additions and 236 deletions

View File

@ -2,12 +2,12 @@
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Runtime.Remoting.Channels;
using System.Threading.Tasks;
using Alphaleonis.Win32.Filesystem;
using Microsoft.VisualStudio.TestTools.UnitTesting;
using Newtonsoft.Json;
using Wabbajack.Common;
using Wabbajack.Common.CSP;
using Wabbajack.Lib.Downloaders;
using Wabbajack.Lib.NexusApi;
using Directory = Alphaleonis.Win32.Filesystem.Directory;
@ -59,9 +59,11 @@ namespace Compression.BSA.Test
private static string DownloadMod((Game, int) info)
{
var client = new NexusApiClient();
using (var client = new NexusApiClient())
{
var results = client.GetModFiles(info.Item1, info.Item2);
var file = results.FirstOrDefault(f => f.is_primary) ?? results.OrderByDescending(f => f.uploaded_timestamp).First();
var file = results.FirstOrDefault(f => f.is_primary) ??
results.OrderByDescending(f => f.uploaded_timestamp).First();
var src = Path.Combine(StagingFolder, file.file_name);
if (File.Exists(src)) return src;
@ -75,6 +77,7 @@ namespace Compression.BSA.Test
state.Download(src);
return src;
}
}
public static IEnumerable<object[]> BSAs()
{
@ -86,7 +89,7 @@ namespace Compression.BSA.Test
[TestMethod]
[DataTestMethod]
[DynamicData(nameof(BSAs), DynamicDataSourceType.Method)]
public void BSACompressionRecompression(string bsa)
public async Task BSACompressionRecompression(string bsa)
{
TestContext.WriteLine($"From {bsa}");
TestContext.WriteLine("Cleaning Output Dir");
@ -95,9 +98,9 @@ namespace Compression.BSA.Test
Directory.CreateDirectory(TempDir);
TestContext.WriteLine($"Reading {bsa}");
using (var a = BSADispatch.OpenRead(bsa))
using (var a = await BSADispatch.OpenRead(bsa))
{
Parallel.ForEach(a.Files, file =>
await a.Files.UnorderedParallelDo(async file =>
{
var abs_name = Path.Combine(TempDir, file.Path);
ViaJson(file.State);
@ -108,63 +111,41 @@ namespace Compression.BSA.Test
using (var fs = File.OpenWrite(abs_name))
{
file.CopyDataTo(fs);
await file.CopyDataToAsync(fs);
}
Assert.AreEqual(file.Size, new FileInfo(abs_name).Length);
});
/*
Console.WriteLine("Extracting via Archive.exe");
if (bsa.ToLower().EndsWith(".ba2"))
{
var p = Process.Start(Archive2Location, $"\"{bsa}\" -e=\"{ArchiveTempDir}\"");
p.WaitForExit();
foreach (var file in a.Files)
{
var a_path = Path.Combine(TempDir, file.Path);
var b_path = Path.Combine(ArchiveTempDir, file.Path);
Equal(new FileInfo(a_path).Length, new FileInfo(b_path).Length);
Equal(File.ReadAllBytes(a_path), File.ReadAllBytes(b_path));
}
}*/
Console.WriteLine($"Building {bsa}");
string TempFile = Path.Combine("tmp.bsa");
using (var w = ViaJson(a.State).MakeBuilder())
{
Parallel.ForEach(a.Files, file =>
await a.Files.UnorderedParallelDo(async file =>
{
var abs_path = Path.Combine(TempDir, file.Path);
using (var str = File.OpenRead(abs_path))
{
w.AddFile(ViaJson(file.State), str);
await w.AddFile(ViaJson(file.State), str);
}
});
w.Build(TempFile);
await w.Build(TempFile);
}
Console.WriteLine($"Verifying {bsa}");
using (var b = BSADispatch.OpenRead(TempFile))
using (var b = await BSADispatch.OpenRead(TempFile))
{
Console.WriteLine($"Performing A/B tests on {bsa}");
Assert.AreEqual(JsonConvert.SerializeObject(a.State), JsonConvert.SerializeObject(b.State));
//Equal((uint) a.ArchiveFlags, (uint) b.ArchiveFlags);
//Equal((uint) a.FileFlags, (uint) b.FileFlags);
// Check same number of files
Assert.AreEqual(a.Files.Count(), b.Files.Count());
var idx = 0;
foreach (var pair in a.Files.Zip(b.Files, (ai, bi) => (ai, bi)))
await a.Files.Zip(b.Files, (ai, bi) => (ai, bi))
.UnorderedParallelDo(async pair =>
{
idx++;
Assert.AreEqual(JsonConvert.SerializeObject(pair.ai.State),
@ -173,17 +154,17 @@ namespace Compression.BSA.Test
Assert.AreEqual(pair.ai.Path, pair.bi.Path);
//Equal(pair.ai.Compressed, pair.bi.Compressed);
Assert.AreEqual(pair.ai.Size, pair.bi.Size);
CollectionAssert.AreEqual(GetData(pair.ai), GetData(pair.bi));
}
CollectionAssert.AreEqual(await GetData(pair.ai), await GetData(pair.bi));
});
}
}
}
private static byte[] GetData(IFile pairAi)
private static async Task<byte[]> GetData(IFile pairAi)
{
using (var ms = new MemoryStream())
{
pairAi.CopyDataTo(ms);
await pairAi.CopyDataToAsync(ms);
return ms.ToArray();
}
}

View File

@ -58,9 +58,6 @@
<CodeAnalysisRuleSet>MinimumRecommendedRules.ruleset</CodeAnalysisRuleSet>
</PropertyGroup>
<ItemGroup>
<Reference Include="ReactiveUI">
<HintPath>..\..\..\Users\tbald\.nuget\packages\reactiveui\10.5.7\lib\net461\ReactiveUI.dll</HintPath>
</Reference>
<Reference Include="System" />
<Reference Include="System.Core" />
<Reference Include="System.Transactions" />
@ -74,6 +71,10 @@
<Project>{ff5d892f-8ff4-44fc-8f7f-cd58f307ad1b}</Project>
<Name>Compression.BSA</Name>
</ProjectReference>
<ProjectReference Include="..\Wabbajack.Common.CSP\Wabbajack.Common.CSP.csproj">
<Project>{9e69bc98-1512-4977-b683-6e7e5292c0b8}</Project>
<Name>Wabbajack.Common.CSP</Name>
</ProjectReference>
<ProjectReference Include="..\Wabbajack.Common\Wabbajack.Common.csproj">
<Project>{b3f3fb6e-b9eb-4f49-9875-d78578bc7ae5}</Project>
<Name>Wabbajack.Common</Name>

View File

@ -1,11 +1,13 @@
using System;
using System.Collections.Generic;
using System.Dynamic;
using System.IO;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using ICSharpCode.SharpZipLib.Zip.Compression;
using ICSharpCode.SharpZipLib.Zip.Compression.Streams;
using Wabbajack.Common.CSP;
namespace Compression.BSA
{
@ -17,7 +19,7 @@ namespace Compression.BSA
int Index { get; }
void WriteData(BinaryWriter wtr);
Task WriteData(BinaryWriter wtr);
void WriteHeader(BinaryWriter wtr);
}
@ -35,22 +37,22 @@ namespace Compression.BSA
{
}
public void AddFile(FileStateObject state, Stream src)
public async Task AddFile(FileStateObject state, Stream src)
{
switch (_state.Type)
{
case EntryType.GNRL:
var result = new BA2FileEntryBuilder((BA2FileEntryState)state, src);
var result = await BA2FileEntryBuilder.Create((BA2FileEntryState)state, src);
lock(_entries) _entries.Add(result);
break;
case EntryType.DX10:
var resultdx10 = new BA2DX10FileEntryBuilder((BA2DX10EntryState)state, src);
var resultdx10 = await BA2DX10FileEntryBuilder.Create((BA2DX10EntryState)state, src);
lock(_entries) _entries.Add(resultdx10);
break;
}
}
public void Build(string filename)
public async Task Build(string filename)
{
SortEntries();
using (var fs = File.OpenWrite(filename))
@ -70,7 +72,7 @@ namespace Compression.BSA
foreach (var entry in _entries)
{
entry.WriteData(bw);
await entry.WriteData(bw);
}
if (_state.HasNameTable)
@ -84,7 +86,7 @@ namespace Compression.BSA
{
var bytes = Encoding.UTF7.GetBytes(entry.FullName);
bw.Write((ushort)bytes.Length);
bw.Write(bytes);
await bw.BaseStream.WriteAsync(bytes, 0, bytes.Length);
}
}
}
@ -101,14 +103,21 @@ namespace Compression.BSA
private BA2DX10EntryState _state;
private List<ChunkBuilder> _chunks;
public BA2DX10FileEntryBuilder(BA2DX10EntryState state, Stream src)
public static async Task<BA2DX10FileEntryBuilder> Create(BA2DX10EntryState state, Stream src)
{
_state = state;
var builder = new BA2DX10FileEntryBuilder();
builder._state = state;
var header_size = DDS.HeaderSizeForFormat((DXGI_FORMAT) state.PixelFormat) + 4;
new BinaryReader(src).ReadBytes((int)header_size);
_chunks = _state.Chunks.Select(ch => new ChunkBuilder(state, ch, src)).ToList();
// This can't be parallel because it all runs off the same base IO stream.
builder._chunks = new List<ChunkBuilder>();
foreach (var chunk in state.Chunks)
builder._chunks.Add(await ChunkBuilder.Create(state, chunk, src));
return builder;
}
public uint FileHash => _state.NameHash;
@ -134,10 +143,10 @@ namespace Compression.BSA
chunk.WriteHeader(bw);
}
public void WriteData(BinaryWriter wtr)
public async Task WriteData(BinaryWriter wtr)
{
foreach (var chunk in _chunks)
chunk.WriteData(wtr);
await chunk.WriteData(wtr);
}
}
@ -149,28 +158,31 @@ namespace Compression.BSA
private uint _packSize;
private long _offsetOffset;
public ChunkBuilder(BA2DX10EntryState state, ChunkState ch, Stream src)
public static async Task<ChunkBuilder> Create(BA2DX10EntryState state, ChunkState chunk, Stream src)
{
_chunk = ch;
var builder = new ChunkBuilder {_chunk = chunk};
using (var ms = new MemoryStream())
{
src.CopyToLimit(ms, (int)_chunk.FullSz);
_data = ms.ToArray();
await src.CopyToLimitAsync(ms, (int)chunk.FullSz);
builder._data = ms.ToArray();
}
if (_chunk.Compressed)
{
if (!chunk.Compressed) return builder;
using (var ms = new MemoryStream())
{
using (var ds = new DeflaterOutputStream(ms))
{
ds.Write(_data, 0, _data.Length);
ds.Write(builder._data, 0, builder._data.Length);
}
_data = ms.ToArray();
}
_packSize = (uint)_data.Length;
builder._data = ms.ToArray();
}
builder._packSize = (uint) builder._data.Length;
return builder;
}
public void WriteHeader(BinaryWriter bw)
@ -185,13 +197,13 @@ namespace Compression.BSA
}
public void WriteData(BinaryWriter bw)
public async Task WriteData(BinaryWriter bw)
{
var pos = bw.BaseStream.Position;
bw.BaseStream.Position = _offsetOffset;
bw.Write((ulong)pos);
bw.BaseStream.Position = pos;
bw.Write(_data);
await bw.BaseStream.WriteAsync(_data, 0, _data.Length);
}
}
@ -203,15 +215,17 @@ namespace Compression.BSA
private BA2FileEntryState _state;
private long _offsetOffset;
public BA2FileEntryBuilder(BA2FileEntryState state, Stream src)
public static async Task<BA2FileEntryBuilder> Create(BA2FileEntryState state, Stream src)
{
_state = state;
var builder = new BA2FileEntryBuilder();
builder._state = state;
using (var ms = new MemoryStream())
{
src.CopyTo(ms);
_data = ms.ToArray();
await src.CopyToAsync(ms);
builder._data = ms.ToArray();
}
_rawSize = _data.Length;
builder._rawSize = builder._data.Length;
if (state.Compressed)
{
@ -219,14 +233,13 @@ namespace Compression.BSA
{
using (var ds = new DeflaterOutputStream(ms))
{
ds.Write(_data, 0, _data.Length);
await ds.WriteAsync(builder._data, 0, builder._data.Length);
}
_data = ms.ToArray();
builder._data = ms.ToArray();
}
_size = _data.Length;
builder._size = builder._data.Length;
}
return builder;
}
public uint FileHash => _state.NameHash;
@ -247,13 +260,13 @@ namespace Compression.BSA
wtr.Write(_state.Align);
}
public void WriteData(BinaryWriter wtr)
public async Task WriteData(BinaryWriter wtr)
{
var pos = wtr.BaseStream.Position;
wtr.BaseStream.Seek(_offsetOffset, SeekOrigin.Begin);
wtr.Write((ulong)pos);
wtr.BaseStream.Position = pos;
wtr.Write(_data);
await wtr.BaseStream.WriteAsync(_data, 0, _data.Length);
}
}
}

View File

@ -188,7 +188,7 @@ namespace Compression.BSA
public uint HeaderSize => DDS.HeaderSizeForFormat((DXGI_FORMAT)_format);
public void CopyDataTo(Stream output)
public async Task CopyDataToAsync(Stream output)
{
var bw = new BinaryWriter(output);
@ -199,25 +199,25 @@ namespace Compression.BSA
{
foreach (var chunk in _chunks)
{
byte[] full = new byte[chunk._fullSz];
var full = new byte[chunk._fullSz];
var isCompressed = chunk._packSz != 0;
br.BaseStream.Seek((long)chunk._offset, SeekOrigin.Begin);
if (!isCompressed)
{
br.Read(full, 0, full.Length);
await br.BaseStream.ReadAsync(full, 0, full.Length);
}
else
{
byte[] compressed = new byte[chunk._packSz];
br.Read(compressed, 0, compressed.Length);
await br.BaseStream.ReadAsync(compressed, 0, compressed.Length);
var inflater = new Inflater();
inflater.SetInput(compressed);
inflater.Inflate(full);
}
bw.Write(full);
await bw.BaseStream.WriteAsync(full, 0, full.Length);
}
}
@ -450,21 +450,19 @@ namespace Compression.BSA
public uint Size => _realSize;
public FileStateObject State => new BA2FileEntryState(this);
public void CopyDataTo(Stream output)
public async Task CopyDataToAsync(Stream output)
{
using (var bw = new BinaryWriter(output))
using (var fs = File.OpenRead(_bsa._filename))
using (var br = new BinaryReader(fs))
{
br.BaseStream.Seek((long) _offset, SeekOrigin.Begin);
fs.Seek((long) _offset, SeekOrigin.Begin);
uint len = Compressed ? _size : _realSize;
var bytes = new byte[len];
br.Read(bytes, 0, (int) len);
await fs.ReadAsync(bytes, 0, (int) len);
if (!Compressed)
{
bw.Write(bytes);
await output.WriteAsync(bytes, 0, bytes.Length);
}
else
{
@ -472,7 +470,7 @@ namespace Compression.BSA
var inflater = new Inflater();
inflater.SetInput(bytes);
inflater.Inflate(uncompressed);
bw.Write(uncompressed);
await output.WriteAsync(uncompressed, 0, uncompressed.Length);
}
}
}

View File

@ -3,6 +3,7 @@ using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using ICSharpCode.SharpZipLib.Zip.Compression.Streams;
using K4os.Compression.LZ4;
using K4os.Compression.LZ4.Streams;
@ -76,24 +77,11 @@ namespace Compression.BSA
public void Dispose()
{
}
public FileEntry AddFile(string path, Stream src, bool flipCompression = false)
{
var r = new FileEntry(this, path, src, flipCompression);
lock (this)
{
_files.Add(r);
}
return r;
}
public void AddFile(FileStateObject state, Stream src)
public async Task AddFile(FileStateObject state, Stream src)
{
var ostate = (BSAFileStateObject) state;
var r = new FileEntry(this, ostate.Path, src, ostate.FlipCompression);
var r = await FileEntry.Create(this, ostate.Path, src, ostate.FlipCompression);
lock (this)
{
@ -101,7 +89,7 @@ namespace Compression.BSA
}
}
public void Build(string outputName)
public async Task Build(string outputName)
{
RegenFolderRecords();
if (File.Exists(outputName)) File.Delete(outputName);
@ -133,7 +121,8 @@ namespace Compression.BSA
foreach (var file in _files) wtr.Write(file._nameBytes);
foreach (var file in _files) file.WriteData(wtr);
foreach (var file in _files)
await file.WriteData(wtr);
}
}
@ -244,28 +233,30 @@ namespace Compression.BSA
private long _offsetOffset;
internal int _originalSize;
internal string _path;
private readonly byte[] _pathBSBytes;
private byte[] _pathBSBytes;
internal byte[] _pathBytes;
internal byte[] _rawData;
public FileEntry(BSABuilder bsa, string path, Stream src, bool flipCompression)
public static async Task<FileEntry> Create(BSABuilder bsa, string path, Stream src, bool flipCompression)
{
_bsa = bsa;
_path = path.ToLowerInvariant();
_name = System.IO.Path.GetFileName(_path);
_hash = _name.GetBSAHash();
_nameBytes = _name.ToTermString(bsa.HeaderType);
_pathBytes = _path.ToTermString(bsa.HeaderType);
_pathBSBytes = _path.ToBSString();
_flipCompression = flipCompression;
var entry = new FileEntry();
entry._bsa = bsa;
entry._path = path.ToLowerInvariant();
entry._name = System.IO.Path.GetFileName(entry._path);
entry._hash = entry._name.GetBSAHash();
entry._nameBytes = entry._name.ToTermString(bsa.HeaderType);
entry._pathBytes = entry._path.ToTermString(bsa.HeaderType);
entry._pathBSBytes = entry._path.ToBSString();
entry._flipCompression = flipCompression;
var ms = new MemoryStream();
src.CopyTo(ms);
_rawData = ms.ToArray();
_originalSize = _rawData.Length;
await src.CopyToAsync(ms);
entry._rawData = ms.ToArray();
entry._originalSize = entry._rawData.Length;
if (Compressed)
CompressData();
if (entry.Compressed)
entry.CompressData();
return entry;
}
public bool Compressed
@ -330,7 +321,7 @@ namespace Compression.BSA
wtr.Write(0xDEADBEEF);
}
internal void WriteData(BinaryWriter wtr)
internal async Task WriteData(BinaryWriter wtr)
{
var offset = (uint) wtr.BaseStream.Position;
wtr.BaseStream.Position = _offsetOffset;
@ -342,11 +333,11 @@ namespace Compression.BSA
if (Compressed)
{
wtr.Write((uint) _originalSize);
wtr.Write(_rawData);
await wtr.BaseStream.WriteAsync(_rawData, 0, _rawData.Length);
}
else
{
wtr.Write(_rawData);
await wtr.BaseStream.WriteAsync(_rawData, 0, _rawData.Length);
}
}
}

View File

@ -4,12 +4,15 @@ using System.IO;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Wabbajack.Common.CSP;
namespace Compression.BSA
{
public static class BSADispatch
{
public static IBSAReader OpenRead(string filename)
public static Task<IBSAReader> OpenRead(string filename)
{
return CSPExtensions.ThreadedTask<IBSAReader>(() =>
{
string fourcc = "";
using (var file = File.OpenRead(filename))
@ -22,6 +25,8 @@ namespace Compression.BSA
if (fourcc == "BTDX")
return new BA2Reader(filename);
throw new InvalidDataException("Filename is not a .bsa or .ba2, magic " + fourcc);
});
}
}
}

View File

@ -3,6 +3,7 @@ using System.Collections.Generic;
using System.IO;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using ICSharpCode.SharpZipLib.Zip.Compression.Streams;
using K4os.Compression.LZ4.Streams;
using File = Alphaleonis.Win32.Filesystem.File;
@ -302,7 +303,7 @@ namespace Compression.BSA
_name = rdr.ReadStringTerm(_bsa.HeaderType);
}
public void CopyDataTo(Stream output)
public async Task CopyDataToAsync(Stream output)
{
using (var in_file = File.OpenRead(_bsa._fileName))
using (var rdr = new BinaryReader(in_file))
@ -314,11 +315,11 @@ namespace Compression.BSA
if (Compressed)
{
var r = LZ4Stream.Decode(rdr.BaseStream);
r.CopyToLimit(output, (int) _originalSize);
await r.CopyToLimitAsync(output, (int) _originalSize);
}
else
{
rdr.BaseStream.CopyToLimit(output, (int) _onDiskSize);
await rdr.BaseStream.CopyToLimitAsync(output, (int) _onDiskSize);
}
}
else
@ -326,18 +327,18 @@ namespace Compression.BSA
if (Compressed)
using (var z = new InflaterInputStream(rdr.BaseStream))
{
z.CopyToLimit(output, (int) _originalSize);
await z.CopyToLimitAsync(output, (int) _originalSize);
}
else
rdr.BaseStream.CopyToLimit(output, (int) _onDiskSize);
await rdr.BaseStream.CopyToLimitAsync(output, (int) _onDiskSize);
}
}
}
public byte[] GetData()
public async Task<byte[]> GetData()
{
var ms = new MemoryStream();
CopyDataTo(ms);
await CopyDataToAsync(ms);
return ms.ToArray();
}
}

View File

@ -110,5 +110,11 @@
<Version>1.2.0</Version>
</PackageReference>
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\Wabbajack.Common.CSP\Wabbajack.Common.CSP.csproj">
<Project>{9e69bc98-1512-4977-b683-6e7e5292c0b8}</Project>
<Name>Wabbajack.Common.CSP</Name>
</ProjectReference>
</ItemGroup>
<Import Project="$(MSBuildToolsPath)\Microsoft.CSharp.targets" />
</Project>

View File

@ -19,8 +19,8 @@ namespace Compression.BSA
public interface IBSABuilder : IDisposable
{
void AddFile(FileStateObject state, Stream src);
void Build(string filename);
Task AddFile(FileStateObject state, Stream src);
Task Build(string filename);
}
public class ArchiveStateObject
@ -59,6 +59,6 @@ namespace Compression.BSA
/// in order to maintain thread-safe access.
/// </summary>
/// <param name="output"></param>
void CopyDataTo(Stream output);
Task CopyDataToAsync(Stream output);
}
}

View File

@ -2,6 +2,7 @@
using System.Collections.Generic;
using System.IO;
using System.Text;
using System.Threading.Tasks;
using Path = Alphaleonis.Win32.Filesystem.Path;
namespace Compression.BSA
@ -141,6 +142,20 @@ namespace Compression.BSA
return ((ulong) (hash2 + hash3) << 32) + hash1;
}
public static async Task CopyToLimitAsync(this Stream frm, Stream tw, int limit)
{
var buff = new byte[1024];
while (limit > 0)
{
var to_read = Math.Min(buff.Length, limit);
var read = await frm.ReadAsync(buff, 0, to_read);
await tw.WriteAsync(buff, 0, read);
limit -= read;
}
tw.Flush();
}
public static void CopyToLimit(this Stream frm, Stream tw, int limit)
{
var buff = new byte[1024];

View File

@ -1,11 +1,9 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Subjects;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using System.Windows.Forms.VisualStyles;
namespace Wabbajack.Common.CSP
{
@ -33,6 +31,48 @@ namespace Wabbajack.Common.CSP
return chan;
}
public static IChannel<TOut, TOut> Select<TInSrc, TOutSrc, TOut>(this IChannel<TInSrc, TOutSrc> from, Func<TOutSrc, Task<TOut>> f, bool propagateClose = true)
{
var to = Channel.Create<TOut>(4);
Task.Run(async () =>
{
try
{
while (true)
{
var (is_open_src, val) = await from.Take();
if (!is_open_src) break;
var is_open_dest = await to.Put(await f(val));
if (!is_open_dest) break;
}
}
finally
{
if (propagateClose)
{
from.Close();
to.Close();
}
}
});
return to;
}
public static async Task UnorderedParallelDo<T>(this IEnumerable<T> coll, Func<T, Task> f)
{
var sink = Channel.CreateSink<bool>();
await coll.ToChannel()
.UnorderedPipeline(Environment.ProcessorCount,
sink,
async itm =>
{
await f(itm);
return true;
});
}
/// <summary>
/// Takes all the values from chan, once the channel closes returns a List of the values taken.

View File

@ -1,9 +1,4 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using System.Windows.Forms.VisualStyles;
namespace Wabbajack.Common.CSP
{

View File

@ -1,17 +1,8 @@
using System;
using System.Buffers;
using System.Collections.Generic;
using System.Linq;
using System.Runtime.CompilerServices;
using System.ServiceModel;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Sources;
using System.Windows.Forms;
using System.Windows.Forms.VisualStyles;
using ICSharpCode.SharpZipLib.Zip;
using YamlDotNet.Serialization.NodeTypeResolvers;
namespace Wabbajack.Common.CSP
{

View File

@ -82,6 +82,15 @@ namespace Wabbajack.Common.CSP
}
public static async Task UnorderedPipeline<TInSrc, TOutSrc, TInDest, TOutDest>(
this IChannel<TInSrc, TOutSrc> from,
IChannel<TInDest, TOutDest> to,
Func<TOutSrc, Task<TInDest>> f,
bool propagateClose = true)
{
await UnorderedPipeline(from, Environment.ProcessorCount, to, f, propagateClose);
}
public static async Task UnorderedPipeline<TInSrc, TOutSrc, TInDest, TOutDest>(
this IChannel<TInSrc, TOutSrc> from,
int parallelism,

View File

@ -0,0 +1,36 @@
using System.Reflection;
using System.Runtime.CompilerServices;
using System.Runtime.InteropServices;
// General Information about an assembly is controlled through the following
// set of attributes. Change these attribute values to modify the information
// associated with an assembly.
[assembly: AssemblyTitle("Wabbajack.CSP")]
[assembly: AssemblyDescription("")]
[assembly: AssemblyConfiguration("")]
[assembly: AssemblyCompany("")]
[assembly: AssemblyProduct("Wabbajack.CSP")]
[assembly: AssemblyCopyright("Copyright © 2019")]
[assembly: AssemblyTrademark("")]
[assembly: AssemblyCulture("")]
// Setting ComVisible to false makes the types in this assembly not visible
// to COM components. If you need to access a type in this assembly from
// COM, set the ComVisible attribute to true on that type.
[assembly: ComVisible(false)]
// The following GUID is for the ID of the typelib if this project is exposed to COM
[assembly: Guid("9e69bc98-1512-4977-b683-6e7e5292c0b8")]
// Version information for an assembly consists of the following four values:
//
// Major Version
// Minor Version
// Build Number
// Revision
//
// You can specify all the values or you can default the Build and Revision Numbers
// by using the '*' as shown below:
// [assembly: AssemblyVersion("1.0.*")]
[assembly: AssemblyVersion("1.0.0.0")]
[assembly: AssemblyFileVersion("1.0.0.0")]

View File

@ -1,12 +1,6 @@
using System;
using System.CodeDom.Compiler;
using System.Collections;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using System.Windows;
using System.Windows.Forms;
namespace Wabbajack.Common.CSP
{

View File

@ -0,0 +1,81 @@
<?xml version="1.0" encoding="utf-8"?>
<Project ToolsVersion="15.0" xmlns="http://schemas.microsoft.com/developer/msbuild/2003">
<Import Project="$(MSBuildExtensionsPath)\$(MSBuildToolsVersion)\Microsoft.Common.props" Condition="Exists('$(MSBuildExtensionsPath)\$(MSBuildToolsVersion)\Microsoft.Common.props')" />
<PropertyGroup>
<Configuration Condition=" '$(Configuration)' == '' ">Debug</Configuration>
<Platform Condition=" '$(Platform)' == '' ">AnyCPU</Platform>
<ProjectGuid>{9E69BC98-1512-4977-B683-6E7E5292C0B8}</ProjectGuid>
<OutputType>Library</OutputType>
<AppDesignerFolder>Properties</AppDesignerFolder>
<RootNamespace>Wabbajack.CSP</RootNamespace>
<AssemblyName>Wabbajack.CSP</AssemblyName>
<TargetFrameworkVersion>v4.7.2</TargetFrameworkVersion>
<FileAlignment>512</FileAlignment>
<Deterministic>true</Deterministic>
</PropertyGroup>
<PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Debug|AnyCPU' ">
<DebugSymbols>true</DebugSymbols>
<DebugType>full</DebugType>
<Optimize>false</Optimize>
<OutputPath>bin\Debug\</OutputPath>
<DefineConstants>DEBUG;TRACE</DefineConstants>
<ErrorReport>prompt</ErrorReport>
<WarningLevel>4</WarningLevel>
</PropertyGroup>
<PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Release|AnyCPU' ">
<DebugType>pdbonly</DebugType>
<Optimize>true</Optimize>
<OutputPath>bin\Release\</OutputPath>
<DefineConstants>TRACE</DefineConstants>
<ErrorReport>prompt</ErrorReport>
<WarningLevel>4</WarningLevel>
</PropertyGroup>
<ItemGroup>
<Reference Include="System" />
<Reference Include="System.Core" />
<Reference Include="System.Reactive, Version=4.2.0.0, Culture=neutral, PublicKeyToken=94bc3704cddfc263, processorArchitecture=MSIL">
<HintPath>..\packages\System.Reactive.4.2.0\lib\net46\System.Reactive.dll</HintPath>
</Reference>
<Reference Include="System.Runtime.CompilerServices.Unsafe, Version=4.0.4.1, Culture=neutral, PublicKeyToken=b03f5f7f11d50a3a, processorArchitecture=MSIL">
<HintPath>..\packages\System.Runtime.CompilerServices.Unsafe.4.5.2\lib\netstandard2.0\System.Runtime.CompilerServices.Unsafe.dll</HintPath>
</Reference>
<Reference Include="System.Threading.Tasks.Extensions, Version=4.2.0.1, Culture=neutral, PublicKeyToken=cc7b13ffcd2ddd51, processorArchitecture=MSIL">
<HintPath>..\packages\System.Threading.Tasks.Extensions.4.5.3\lib\netstandard2.0\System.Threading.Tasks.Extensions.dll</HintPath>
</Reference>
<Reference Include="System.ValueTuple, Version=4.0.3.0, Culture=neutral, PublicKeyToken=cc7b13ffcd2ddd51, processorArchitecture=MSIL">
<HintPath>..\packages\System.ValueTuple.4.5.0\lib\net47\System.ValueTuple.dll</HintPath>
</Reference>
<Reference Include="System.Windows" />
<Reference Include="System.Windows.Forms" />
<Reference Include="System.Xml.Linq" />
<Reference Include="System.Data.DataSetExtensions" />
<Reference Include="Microsoft.CSharp" />
<Reference Include="System.Data" />
<Reference Include="System.Net.Http" />
<Reference Include="System.Xml" />
<Reference Include="WindowsBase" />
</ItemGroup>
<ItemGroup>
<Compile Include="AChannel.cs" />
<Compile Include="Channel.cs" />
<Compile Include="Extensions.cs" />
<Compile Include="EnumeratorBuffer.cs" />
<Compile Include="FixedSizeBuffer.cs" />
<Compile Include="Handler.cs" />
<Compile Include="IBuffer.cs" />
<Compile Include="IChannel.cs" />
<Compile Include="ManyToManyChannel.cs" />
<Compile Include="PIpelines.cs" />
<Compile Include="PutTaskHandler.cs" />
<Compile Include="RingBuffer.cs" />
<Compile Include="RxBuffer.cs" />
<Compile Include="TakeTaskHandler.cs" />
<Compile Include="Properties\AssemblyInfo.cs" />
</ItemGroup>
<ItemGroup>
<None Include="CSP Readme.md" />
<None Include="packages.config" />
</ItemGroup>
<ItemGroup />
<Import Project="$(MSBuildToolsPath)\Microsoft.CSharp.targets" />
</Project>

View File

@ -0,0 +1,7 @@
<?xml version="1.0" encoding="utf-8"?>
<packages>
<package id="System.Reactive" version="4.2.0" targetFramework="net472" />
<package id="System.Runtime.CompilerServices.Unsafe" version="4.5.2" targetFramework="net472" />
<package id="System.Threading.Tasks.Extensions" version="4.5.3" targetFramework="net472" />
<package id="System.ValueTuple" version="4.5.0" targetFramework="net472" />
</packages>

View File

@ -2,10 +2,12 @@
using System.Diagnostics;
using System.Linq;
using System.Reflection;
using System.Threading.Tasks;
using Alphaleonis.Win32.Filesystem;
using Compression.BSA;
using ICSharpCode.SharpZipLib.GZip;
using OMODFramework;
using Wabbajack.Common.CSP;
namespace Wabbajack.Common
{
@ -36,7 +38,7 @@ namespace Wabbajack.Common
try
{
if (Consts.SupportedBSAs.Any(b => source.ToLower().EndsWith(b)))
ExtractAllWithBSA(source, dest);
ExtractAllWithBSA(source, dest).Wait();
else if (source.EndsWith(".exe"))
ExtractAllWithInno(source, dest);
else if (source.EndsWith(".omod"))
@ -61,13 +63,16 @@ namespace Wabbajack.Common
omod.ExtractPlugins();
}
private static void ExtractAllWithBSA(string source, string dest)
private static async Task ExtractAllWithBSA(string source, string dest)
{
try
{
using (var arch = BSADispatch.OpenRead(source))
using (var arch = await BSADispatch.OpenRead(source))
{
arch.Files.PMap(f =>
await arch.Files.ToChannel()
.UnorderedPipeline(
Channel.CreateSink<IFile>(),
async f =>
{
var path = f.Path;
if (f.Path.StartsWith("\\"))
@ -81,8 +86,10 @@ namespace Wabbajack.Common
using (var fs = File.OpenWrite(out_path))
{
f.CopyDataTo(fs);
await f.CopyDataToAsync(fs);
}
return f;
});
}
}

View File

@ -88,20 +88,6 @@
<Compile Include="BSDiff.cs" />
<Compile Include="ChildProcessTracker.cs" />
<Compile Include="Consts.cs" />
<Compile Include="CSP\AChannel.cs" />
<Compile Include="CSP\Channel.cs" />
<Compile Include="CSP\CSPExtensions.cs" />
<Compile Include="CSP\FixedSizeBuffer.cs" />
<Compile Include="CSP\IBuffer.cs" />
<Compile Include="CSP\Handler.cs" />
<Compile Include="CSP\IChannel.cs" />
<Compile Include="CSP\EnumeratorBuffer.cs" />
<Compile Include="CSP\ManyToManyChannel.cs" />
<Compile Include="CSP\Pipelines.cs" />
<Compile Include="CSP\PutTaskHandler.cs" />
<Compile Include="CSP\RingBuffer.cs" />
<Compile Include="CSP\RxBuffer.cs" />
<Compile Include="CSP\TakeTaskHandler.cs" />
<Compile Include="DynamicIniData.cs" />
<Compile Include="Error States\ErrorResponse.cs" />
<Compile Include="Error States\GetResponse.cs" />
@ -123,13 +109,16 @@
<EmbeddedResource Include="7z.dll.gz" />
<EmbeddedResource Include="7z.exe.gz" />
<None Include="app.config" />
<None Include="CSP\CSP Readme.md" />
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\Compression.BSA\Compression.BSA.csproj">
<Project>{ff5d892f-8ff4-44fc-8f7f-cd58f307ad1b}</Project>
<Name>Compression.BSA</Name>
</ProjectReference>
<ProjectReference Include="..\Wabbajack.Common.CSP\Wabbajack.Common.CSP.csproj">
<Project>{9e69bc98-1512-4977-b683-6e7e5292c0b8}</Project>
<Name>Wabbajack.Common.CSP</Name>
</ProjectReference>
</ItemGroup>
<ItemGroup>
<Content Include="7z.dll" />

View File

@ -75,7 +75,7 @@ namespace Wabbajack.Lib.CompilationSteps
}
CreateBSA directive;
using (var bsa = BSADispatch.OpenRead(source.AbsolutePath))
using (var bsa = BSADispatch.OpenRead(source.AbsolutePath).Result)
{
directive = new CreateBSA
{

View File

@ -11,6 +11,7 @@ using System.Linq;
using System.Reflection;
using System.Text;
using System.Text.RegularExpressions;
using System.Threading.Tasks;
using VFS;
using Wabbajack.Common;
using Wabbajack.Lib.CompilationSteps;
@ -449,7 +450,7 @@ namespace Wabbajack.Lib
using (var output = new MemoryStream())
{
var a = origin.ReadAll();
var b = LoadDataForTo(entry.To, absolute_paths);
var b = LoadDataForTo(entry.To, absolute_paths).Result;
Utils.CreatePatch(a, b, output);
entry.PatchID = IncludeFile(output.ToArray());
var file_size = File.GetSize(Path.Combine(ModListOutputFolder, entry.PatchID));
@ -459,7 +460,7 @@ namespace Wabbajack.Lib
}
}
private byte[] LoadDataForTo(string to, Dictionary<string, string> absolute_paths)
private async Task<byte[]> LoadDataForTo(string to, Dictionary<string, string> absolute_paths)
{
if (absolute_paths.TryGetValue(to, out var absolute))
return File.ReadAllBytes(absolute);
@ -469,13 +470,13 @@ namespace Wabbajack.Lib
var bsa_id = to.Split('\\')[1];
var bsa = InstallDirectives.OfType<CreateBSA>().First(b => b.TempID == bsa_id);
using (var a = BSADispatch.OpenRead(Path.Combine(MO2Folder, bsa.To)))
using (var a = await BSADispatch.OpenRead(Path.Combine(MO2Folder, bsa.To)))
{
var find = Path.Combine(to.Split('\\').Skip(2).ToArray());
var file = a.Files.First(e => e.Path.Replace('/', '\\') == find);
using (var ms = new MemoryStream())
{
file.CopyDataTo(ms);
await file.CopyDataToAsync(ms);
return ms.ToArray();
}
}

View File

@ -24,9 +24,6 @@ namespace Wabbajack.Lib.NexusApi
{
private static readonly string API_KEY_CACHE_FILE = "nexus.key_cache";
private static readonly uint CACHED_VERSION_NUMBER = 1;
private readonly HttpClient _httpClient;

View File

@ -1,9 +1,5 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Linq;
using System.Reactive.Linq;
using System.Security.Policy;
using System.Text;
using System.Threading.Tasks;
using Microsoft.VisualStudio.TestTools.UnitTesting;
using Wabbajack.Common.CSP;

View File

@ -115,6 +115,10 @@
<Project>{5128b489-bc28-4f66-9f0b-b4565af36cbc}</Project>
<Name>VirtualFileSystem</Name>
</ProjectReference>
<ProjectReference Include="..\Wabbajack.Common.CSP\Wabbajack.Common.CSP.csproj">
<Project>{9e69bc98-1512-4977-b683-6e7e5292c0b8}</Project>
<Name>Wabbajack.Common.CSP</Name>
</ProjectReference>
<ProjectReference Include="..\Wabbajack.Common\Wabbajack.Common.csproj">
<Project>{b3f3fb6e-b9eb-4f49-9875-d78578bc7ae5}</Project>
<Name>Wabbajack.Common</Name>

View File

@ -30,6 +30,8 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Wabbajack.Test.ListValidati
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Compression.BSA.Test", "Compression.BSA.Test\Compression.BSA.Test.csproj", "{9C004392-571A-4D28-A9F6-0E25115E6727}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Wabbajack.Common.CSP", "Wabbajack.Common.CSP\Wabbajack.Common.CSP.csproj", "{9E69BC98-1512-4977-B683-6E7E5292C0B8}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug (no commandargs)|Any CPU = Debug (no commandargs)|Any CPU
@ -205,6 +207,24 @@ Global
{9C004392-571A-4D28-A9F6-0E25115E6727}.Release|x64.Build.0 = Release|Any CPU
{9C004392-571A-4D28-A9F6-0E25115E6727}.Release|x86.ActiveCfg = Release|Any CPU
{9C004392-571A-4D28-A9F6-0E25115E6727}.Release|x86.Build.0 = Release|Any CPU
{9E69BC98-1512-4977-B683-6E7E5292C0B8}.Debug (no commandargs)|Any CPU.ActiveCfg = Debug|Any CPU
{9E69BC98-1512-4977-B683-6E7E5292C0B8}.Debug (no commandargs)|Any CPU.Build.0 = Debug|Any CPU
{9E69BC98-1512-4977-B683-6E7E5292C0B8}.Debug (no commandargs)|x64.ActiveCfg = Debug|Any CPU
{9E69BC98-1512-4977-B683-6E7E5292C0B8}.Debug (no commandargs)|x64.Build.0 = Debug|Any CPU
{9E69BC98-1512-4977-B683-6E7E5292C0B8}.Debug (no commandargs)|x86.ActiveCfg = Debug|Any CPU
{9E69BC98-1512-4977-B683-6E7E5292C0B8}.Debug (no commandargs)|x86.Build.0 = Debug|Any CPU
{9E69BC98-1512-4977-B683-6E7E5292C0B8}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{9E69BC98-1512-4977-B683-6E7E5292C0B8}.Debug|Any CPU.Build.0 = Debug|Any CPU
{9E69BC98-1512-4977-B683-6E7E5292C0B8}.Debug|x64.ActiveCfg = Debug|Any CPU
{9E69BC98-1512-4977-B683-6E7E5292C0B8}.Debug|x64.Build.0 = Debug|Any CPU
{9E69BC98-1512-4977-B683-6E7E5292C0B8}.Debug|x86.ActiveCfg = Debug|Any CPU
{9E69BC98-1512-4977-B683-6E7E5292C0B8}.Debug|x86.Build.0 = Debug|Any CPU
{9E69BC98-1512-4977-B683-6E7E5292C0B8}.Release|Any CPU.ActiveCfg = Release|Any CPU
{9E69BC98-1512-4977-B683-6E7E5292C0B8}.Release|Any CPU.Build.0 = Release|Any CPU
{9E69BC98-1512-4977-B683-6E7E5292C0B8}.Release|x64.ActiveCfg = Release|Any CPU
{9E69BC98-1512-4977-B683-6E7E5292C0B8}.Release|x64.Build.0 = Release|Any CPU
{9E69BC98-1512-4977-B683-6E7E5292C0B8}.Release|x86.ActiveCfg = Release|Any CPU
{9E69BC98-1512-4977-B683-6E7E5292C0B8}.Release|x86.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE