Merge pull request #165 from wabbajack-tools/async-work-queue

Async work queue
This commit is contained in:
Timothy Baldridge 2019-11-11 06:17:36 -07:00 committed by GitHub
commit c6953bb42d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
21 changed files with 1905 additions and 0 deletions

View File

@ -0,0 +1,86 @@
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
namespace Wabbajack.Common.CSP
{
public abstract class AChannel<TIn, TOut> : IChannel<TIn, TOut>
{
public abstract bool IsClosed { get; }
public abstract void Close();
public abstract (AsyncResult, bool) Put(TIn val, Handler<Action<bool>> handler);
public abstract (AsyncResult, TOut) Take(Handler<Action<bool, TOut>> handler);
private Task<(bool, TOut)> _take_cancelled_task;
private Task<(bool, TOut)> TakeCancelledTask
{
get
{
if (_take_cancelled_task == null)
_take_cancelled_task = Task.FromCanceled<(bool, TOut)>(CancellationToken.None);
return _take_cancelled_task;
}
}
private Task<bool> _put_cancelled_task;
private Task<bool> PutCancelledTask
{
get
{
if (_put_cancelled_task == null)
_put_cancelled_task = Task.FromCanceled<bool>(CancellationToken.None);
return _put_cancelled_task;
}
}
public ValueTask<(bool, TOut)> Take(bool onCaller)
{
var handler = new TakeTaskHandler<TOut>();
var (resultType, val) = Take(handler);
switch (resultType)
{
case AsyncResult.Closed:
return new ValueTask<(bool, TOut)>((false, default));
case AsyncResult.Completed:
return new ValueTask<(bool, TOut)>((true, val));
case AsyncResult.Enqueued:
return new ValueTask<(bool, TOut)>(handler.TaskCompletionSource.Task);
case AsyncResult.Canceled:
return new ValueTask<(bool, TOut)>(TakeCancelledTask);
default:
// Should never happen
throw new InvalidDataException();
}
}
public ValueTask<bool> Put(TIn val, bool onCaller)
{
var handler = new PutTaskHandler<bool>();
var (resultType, putResult) = Put(val, handler);
switch (resultType)
{
case AsyncResult.Completed:
return new ValueTask<bool>(putResult);
case AsyncResult.Canceled:
return new ValueTask<bool>(PutCancelledTask);
case AsyncResult.Closed:
return new ValueTask<bool>(false);
case AsyncResult.Enqueued:
return new ValueTask<bool>(handler.TaskCompletionSource.Task);
default:
// Should never happen
throw new InvalidDataException();
}
}
}
}

View File

@ -0,0 +1,42 @@
### Overview of CSP (Communicating Sequential Processes) for the C# programmer
#### What is CSP?
Communicating Sequential processes is a programming model invented in 1978 by Tony Hoare, who described a process
of computation where hundreds or thousands of small processes communicate via channels. Think of this process like
a assembly line. Each worker in the factory is a process, and the conveyor belts are the channels. The workers don't need
to know where a part came from, or where it's going, they simply take one item off the belt, perform an operation and pass
the item on to another belt. This analogy works quite well, and the following observations about a factory also apply to
CSP:
* Multiple workers can pull from the same belt (channel/queue)
* Multiple workers can put work onto the belt
* Belts can buffer items, for slow consumers, but at some point they backup and block the writer
* A worker can pull/push to multiple belts.
#### What does this look like in C#?
The basic unit of CSP in this library is the channel:
```
var chan = Channel.Create()
```
Without any other parameters this creates a channel with a size of 0, so every pending put must be matched
1:1 with a take. This creates a syncronization point. Channels are fully async and thread-safe:
```
public async Task TestTakePutBlocking()
{
var channel = Channel.Create<int>();
// Channel size is 0, so we can't await, because we'd never complete
var ptask = channel.Put(1);
// Now the put is dispatched to the scheduler because we've taken the value
var (open, val) = await channel.Take();
Assert.AreEqual(1, val);
Assert.IsTrue(open);
Assert.IsTrue(await ptask);
}
```

View File

@ -0,0 +1,128 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Subjects;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using System.Windows.Forms.VisualStyles;
namespace Wabbajack.Common.CSP
{
public static class CSPExtensions
{
public static async Task OntoChannel<TIn, TOut>(this IEnumerable<TIn> coll, IChannel<TIn, TOut> chan)
{
foreach (var val in coll)
{
if (!await chan.Put(val)) break;
}
}
/// <summary>
/// Turns a IEnumerable collection into a channel. Note, computation of the enumerable will happen inside
/// the lock of the channel, so try to keep the work of the enumerable light.
/// </summary>
/// <typeparam name="T"></typeparam>
/// <param name="coll">Collection to spool out of the channel.</param>
/// <returns></returns>
public static IChannel<T, T> ToChannel<T>(this IEnumerable<T> coll)
{
var chan = Channel.Create(coll.GetEnumerator());
chan.Close();
return chan;
}
/// <summary>
/// Takes all the values from chan, once the channel closes returns a List of the values taken.
/// </summary>
/// <typeparam name="TOut"></typeparam>
/// <typeparam name="TIn"></typeparam>
/// <param name="chan"></param>
/// <returns></returns>
public static async Task<List<TOut>> TakeAll<TOut, TIn>(this IChannel<TIn, TOut> chan)
{
List<TOut> acc = new List<TOut>();
while (true)
{
var (open, val) = await chan.Take();
if (!open) break;
acc.Add(val);
}
return acc;
}
/// <summary>
/// Pipes values from `from` into `to`
/// </summary>
/// <typeparam name="TIn"></typeparam>
/// <typeparam name="TMid"></typeparam>
/// <typeparam name="TOut"></typeparam>
/// <param name="from">source channel</param>
/// <param name="to">destination channel</param>
/// <param name="closeOnFinished">Tf true, will close the other channel when one channel closes</param>
/// <returns></returns>
public static async Task Pipe<TIn, TMid, TOut>(this IChannel<TIn, TMid> from, IChannel<TMid, TOut> to, bool closeOnFinished = true)
{
while (true)
{
var (isFromOpen, val) = await from.Take();
if (isFromOpen)
{
var isToOpen = await to.Put(val);
if (isToOpen) continue;
if (closeOnFinished)
@from.Close();
break;
}
if (closeOnFinished)
to.Close();
break;
}
}
public static Task<T> ThreadedTask<T>(Func<T> action)
{
var src = new TaskCompletionSource<T>();
var th = new Thread(() =>
{
try
{
src.SetResult(action());
}
catch (Exception ex)
{
src.SetException(ex);
}
}) {Priority = ThreadPriority.BelowNormal};
th.Start();
return src.Task;
}
public static Task ThreadedTask<T>(Action action)
{
var src = new TaskCompletionSource<bool>();
var th = new Thread(() =>
{
try
{
action();
src.SetResult(true);
}
catch (Exception ex)
{
src.SetException(ex);
}
})
{ Priority = ThreadPriority.BelowNormal };
th.Start();
return src.Task;
}
}
}

View File

@ -0,0 +1,76 @@
using System;
using System.Collections.Generic;
using System.Data;
using System.Linq;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Security.Cryptography;
using System.Text;
using System.Threading.Tasks;
namespace Wabbajack.Common.CSP
{
public static class Channel
{
/// <summary>
/// Creates a channel without a buffer, and with no conversion function. This provides a syncronization
/// point, where all puts are matched 1:1 with takes.
/// </summary>
/// <param name="bufferSize"></param>
/// <typeparam name="T">The type of values transferred by the channel</typeparam>
/// <returns>A new channel</returns>
public static IChannel<T, T> Create<T>()
{
return new ManyToManyChannel<T, T>(x => x);
}
/// <summary>
/// Creates a channel with a given enumerator as the starting buffer. Values will not be puttable into this channel
/// and it will start closed. This is a easy way to spool a collection onto a channel. Note: the enumerator will be
/// run inside the channel's lock, so it may not be wise to pass in an enumerator that performs heavy computation.
/// </summary>
/// <param name="e">A IEnumerator to use as the contents of the channel</param>
/// <typeparam name="T">The type of values transferred by the channel</typeparam>
/// <returns>A new channel</returns>
public static IChannel<T, T> Create<T>(IEnumerator<T> e)
{
var chan = new ManyToManyChannel<T, T>(x => x, (_, __) => false, _ => {}, new EnumeratorBuffer<T>(e));
chan.Close();
return chan;
}
public static IChannel<T, T> Create<T>(int buffer_size)
{
var buffer = new FixedSizeBuffer<T>(buffer_size);
return new ManyToManyChannel<T, T>(x => x, (buff, itm) =>
{
buff.Add(itm);
return false;
},
b => {}, buffer);
}
public static IChannel<TIn, TOut> Create<TIn, TOut>(int buffer_size, Func<IObservable<TIn>, IObservable<TOut>> transform)
{
var buf = new RxBuffer<TIn, TOut>(buffer_size, transform);
return ChannelForRxBuf(buf);
}
private static ManyToManyChannel<TIn, TOut> ChannelForRxBuf<TIn, TOut>(RxBuffer<TIn, TOut> buf)
{
return new ManyToManyChannel<TIn, TOut>(null, RxBuffer<TIn,TOut>.TransformAdd, RxBuffer<TIn, TOut>.Finalize, buf);
}
/// <summary>
/// Creates a channel that discards every value
/// </summary>
/// <typeparam name="TIn"></typeparam>
/// <returns></returns>
public static IChannel<TIn, TIn> CreateSink<TIn>()
{
var buf = new RxBuffer<TIn, TIn>(1, e => e.Where(itm => false));
return ChannelForRxBuf(buf);
}
}
}

View File

@ -0,0 +1,39 @@
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace Wabbajack.Common.CSP
{
class EnumeratorBuffer<T> : IBuffer<T>
{
private readonly IEnumerator<T> _enumerator;
private bool _empty;
public EnumeratorBuffer(IEnumerator<T> enumerator)
{
_enumerator = enumerator;
_empty = !_enumerator.MoveNext();
}
public void Dispose()
{
}
public bool IsFull => true;
public bool IsEmpty => _empty;
public T Remove()
{
var val = _enumerator.Current;
_empty = !_enumerator.MoveNext();
return val;
}
public void Add(T itm)
{
throw new InvalidDataException();
}
}
}

View File

@ -0,0 +1,36 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace Wabbajack.Common.CSP
{
public class FixedSizeBuffer<T> : IBuffer<T>
{
private int _size;
private RingBuffer<T> _buffer;
public FixedSizeBuffer(int size)
{
_size = size;
_buffer = new RingBuffer<T>(size);
}
public void Dispose()
{
}
public bool IsFull => _buffer.Length >= _size;
public bool IsEmpty => _buffer.IsEmpty;
public T Remove()
{
return _buffer.Pop();
}
public void Add(T itm)
{
_buffer.UnboundedUnshift(itm);
}
}
}

View File

@ -0,0 +1,34 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using System.Threading.Tasks.Sources;
namespace Wabbajack.Common.CSP
{
public interface Handler<T>
{
/// <summary>
/// Returns true if this handler has a callback, must work without a lock
/// </summary>
bool IsActive { get; }
/// <summary>
/// Returns true if this handler may be blocked, otherwise it must not block
/// </summary>
bool IsBlockable { get; }
/// <summary>
/// A unique id for lock aquisition order, 0 if no lock
/// </summary>
uint LockId { get; }
/// <summary>
/// Commit to fulfilling its end of the transfer, returns cb, must be called within a lock
/// </summary>
/// <returns>A callback</returns>
T Commit();
}
}

View File

@ -0,0 +1,17 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using System.Windows.Forms.VisualStyles;
namespace Wabbajack.Common.CSP
{
public interface IBuffer<T> : IDisposable
{
bool IsFull { get; }
bool IsEmpty { get; }
T Remove();
void Add(T itm);
}
}

View File

@ -0,0 +1,41 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace Wabbajack.Common.CSP
{
public enum AsyncResult : int
{
/// <summary>
/// The channel was closed, so the returned value is meaningless
/// </summary>
Closed,
/// <summary>
/// The handler was canceled, so the returned value is meaningless
/// </summary>
Canceled,
/// <summary>
/// The callback was enqueued into the pending operations buffer, return value is useless
/// </summary>
Enqueued,
/// <summary>
/// The operation passed on the current thread, return the current value as the response value
/// </summary>
Completed
}
public interface IChannel<TIn, TOut>
{
bool IsClosed { get; }
void Close();
(AsyncResult, bool) Put(TIn val, Handler<Action<bool>> handler);
(AsyncResult, TOut) Take(Handler<Action<bool, TOut>> handler);
ValueTask<(bool, TOut)> Take(bool onCaller = true);
ValueTask<bool> Put(TIn val, bool onCaller = true);
}
}

View File

@ -0,0 +1,399 @@
using System;
using System.Buffers;
using System.Collections.Generic;
using System.Linq;
using System.Runtime.CompilerServices;
using System.ServiceModel;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Sources;
using System.Windows.Forms;
using System.Windows.Forms.VisualStyles;
using ICSharpCode.SharpZipLib.Zip;
using YamlDotNet.Serialization.NodeTypeResolvers;
namespace Wabbajack.Common.CSP
{
/// <summary>
/// An almost 1:1 port of Clojure's core.async channels
/// </summary>
public class ManyToManyChannel<TIn, TOut> : AChannel<TIn, TOut>
{
public const int MAX_QUEUE_SIZE = 1024;
private RingBuffer<Handler<Action<bool, TOut>>> _takes = new RingBuffer<Handler<Action<bool, TOut>>>(8);
private RingBuffer<(Handler<Action<bool>>, TIn)> _puts = new RingBuffer<(Handler<Action<bool>>, TIn)>(8);
private IBuffer<TOut> _buf;
private Func<IBuffer<TOut>, TIn, bool> _add;
private Action<IBuffer<TOut>> _finalize;
private Func<TIn, TOut> _converter;
volatile bool _isClosed = false;
public ManyToManyChannel(Func<TIn, TOut> converter)
{
_buf = null;
_add = null;
_finalize = null;
_converter = converter;
}
public ManyToManyChannel(Func<TIn, TOut> converter, Func<IBuffer<TOut>, TIn, bool> add, Action<IBuffer<TOut>> finalize, IBuffer<TOut> buffer)
{
_buf = buffer;
_add = add;
_finalize = finalize;
_converter = converter;
}
private static bool IsActiveTake(Handler<Action<bool, TOut>> handler)
{
return handler.IsActive;
}
private static bool IsActivePut((Handler<Action<bool>>, TIn) input)
{
return input.Item1.IsActive;
}
/// <summary>
/// Tries to put a put into the channel
/// </summary>
/// <param name="val"></param>
/// <param name="handler"></param>
/// <returns>(result_type, w)</returns>
public override (AsyncResult, bool) Put(TIn val, Handler<Action<bool>> handler)
{
Monitor.Enter(this);
if (_isClosed)
{
Monitor.Exit(this);
return (AsyncResult.Completed, false);
}
if (_buf != null && !_buf.IsFull && !_takes.IsEmpty)
{
var put_cb = LockIfActiveCommit(handler);
if (put_cb != null)
{
var is_done = _add(_buf, val);
if (!_buf.IsEmpty)
{
var take_cbs = GetTakersForBuffer();
if (is_done)
Abort();
Monitor.Exit(this);
foreach (var action in take_cbs)
{
Task.Run(action);
}
return (AsyncResult.Completed, true);
}
if (is_done)
Abort();
Monitor.Exit(this);
return (AsyncResult.Closed, false);
}
Monitor.Exit(this);
return (AsyncResult.Canceled, false);
}
var (put_cb2, take_cb) = GetCallbacks(handler, _takes);
if (put_cb2 != null && take_cb != null)
{
Monitor.Exit(this);
Task.Run(() => take_cb(true, _converter(val)));
return (AsyncResult.Completed, true);
}
if (_buf != null && !_buf.IsFull)
{
if (LockIfActiveCommit(handler) != null)
{
if (_add(_buf, val))
{
Abort();
}
Monitor.Exit(this);
return (AsyncResult.Completed, true);
}
Monitor.Exit(this);
return (AsyncResult.Canceled, true);
}
if (handler.IsActive && handler.IsBlockable)
{
if (_puts.Length >= MAX_QUEUE_SIZE)
{
Monitor.Exit(this);
throw new TooManyHanldersException();
}
_puts.Unshift((handler, val));
}
Monitor.Exit(this);
return (AsyncResult.Enqueued, true);
}
public override (AsyncResult, TOut) Take(Handler<Action<bool, TOut>> handler)
{
Monitor.Enter(this);
Cleanup();
if (_buf != null && !_buf.IsEmpty)
{
var take_cb = LockIfActiveCommit(handler);
if (take_cb != null)
{
var val = _buf.Remove();
var (is_done, cbs) = GetPuttersForBuffer();
if (is_done)
Abort();
Monitor.Exit(this);
foreach (var cb in cbs)
Task.Run(() => cb(true));
return (AsyncResult.Completed, val);
}
Monitor.Exit(this);
return (AsyncResult.Canceled, default);
}
var (take_cb2, put_cb, val2, found) = FindMatchingPut(handler);
if (take_cb2 != null && put_cb != null)
{
Monitor.Exit(this);
Task.Run(() => put_cb(true));
return (AsyncResult.Completed, _converter(val2));
}
if (_isClosed)
{
if (_buf != null && found)
_add(_buf, val2);
var has_val = _buf != null && !_buf.IsEmpty;
var take_cb3 = LockIfActiveCommit(handler);
if (take_cb3 != null)
{
var val = has_val ? _buf.Remove() : default;
Monitor.Exit(this);
return has_val ? (AsyncResult.Completed, val) : (AsyncResult.Closed, default);
}
Monitor.Exit(this);
return (AsyncResult.Closed, default);
}
if (handler.IsBlockable)
{
if (_takes.Length >= MAX_QUEUE_SIZE)
{
Monitor.Exit(this);
throw new TooManyHanldersException();
}
_takes.Unshift(handler);
}
Monitor.Exit(this);
return (AsyncResult.Enqueued, default);
}
public override bool IsClosed => _isClosed;
public override void Close()
{
Monitor.Enter(this);
Cleanup();
if (_isClosed)
{
Monitor.Exit(this);
return;
}
_isClosed = true;
if (_buf != null && _puts.IsEmpty)
_finalize(_buf);
var cbs = _buf == null? new List<Action>() : GetTakersForBuffer();
while (!_takes.IsEmpty)
{
var take_cb = LockIfActiveCommit(_takes.Pop());
if (take_cb != null)
cbs.Add(() => take_cb(false, default));
}
Monitor.Exit(this);
foreach (var cb in cbs)
Task.Run(cb);
}
private (Action<bool, TOut>, Action<bool>, TIn, bool) FindMatchingPut(Handler<Action<bool, TOut>> handler)
{
while (!_puts.IsEmpty)
{
var (found, val) = _puts.Peek();
var (handler_cb, put_cb, handler_active, put_active) = LockIfActiveCommit(handler, found);
if (handler_active && put_active)
{
_puts.Pop();
return (handler_cb, put_cb, val, true);
}
if (!put_active)
{
_puts.Pop();
continue;
}
return (null, null, default, false);
}
return (null, null, default, false);
}
private (bool, List<Action<bool>>) GetPuttersForBuffer()
{
List<Action<bool>> acc = new List<Action<bool>>();
while (!_puts.IsEmpty)
{
var (putter, val) = _puts.Pop();
var cb = LockIfActiveCommit(putter);
if (cb != null)
{
acc.Add(cb);
}
var is_done = _add(_buf, val);
if (is_done || _buf.IsFull || _puts.IsEmpty)
return (is_done, acc);
}
return (false, acc);
}
private void Cleanup()
{
_takes.Cleanup(IsActiveTake);
_puts.Cleanup(IsActivePut);
}
private (T1, T2) GetCallbacks<T1, T2>(Handler<T1> handler, RingBuffer<Handler<T2>> queue)
{
while (!queue.IsEmpty)
{
var found = queue.Peek();
var (handler_cb, found_cb, handler_valid, found_valid) = LockIfActiveCommit(handler, found);
if (handler_valid && found_valid)
{
queue.Pop();
return (handler_cb, found_cb);
}
if (handler_valid)
{
queue.Pop();
}
else
{
return (default, default);
}
}
return (default, default);
}
private void Abort()
{
while (!_puts.IsEmpty)
{
var (handler, val) = _puts.Pop();
var put_cb = LockIfActiveCommit(handler);
if (put_cb != null)
{
Task.Run(() => put_cb(false));
}
}
_puts.Cleanup(x => false);
Close();
}
private List<Action> GetTakersForBuffer()
{
List<Action> ret = new List<Action>();
while (!_buf.IsEmpty && !_takes.IsEmpty)
{
var taker = _takes.Pop();
var take_cp = LockIfActiveCommit(taker);
if (take_cp != null)
{
var val = _buf.Remove();
ret.Add(() => take_cp(true, val));
}
}
return ret;
}
[MethodImpl(MethodImplOptions.AggressiveInlining)]
private static T LockIfActiveCommit<T>(Handler<T> handler)
{
lock (handler)
{
return handler.IsActive ? handler.Commit() : default;
}
}
[MethodImpl(MethodImplOptions.AggressiveInlining)]
private static (T1, T2, bool, bool) LockIfActiveCommit<T1, T2>(Handler<T1> handler1, Handler<T2> handler2)
{
if (handler1.LockId < handler2.LockId)
{
Monitor.Enter(handler1);
Monitor.Enter(handler2);
}
else
{
Monitor.Enter(handler2);
Monitor.Enter(handler1);
}
if (handler1.IsActive && handler2.IsActive)
{
var ret1 = (handler1.Commit(), handler2.Commit(), true, true);
Monitor.Exit(handler1);
Monitor.Exit(handler2);
return ret1;
}
var ret2 = (default(T1), default(T2), handler1.IsActive, handler2.IsActive);
Monitor.Exit(handler1);
Monitor.Exit(handler2);
return ret2;
}
public class TooManyHanldersException : Exception
{
public override string ToString()
{
return $"No more than {MAX_QUEUE_SIZE} pending operations allowed on a single channel.";
}
}
}
}

View File

@ -0,0 +1,153 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Subjects;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
namespace Wabbajack.Common.CSP
{
public static class Pipelines
{
/// <summary>
/// Creates a pipeline that takes items from `from` transforms them with the pipeline given by `transform` and puts
/// the resulting values onto `to`. The pipeline may create 0 or more items for every input item and they will be
/// spooled onto `to` in a undefined order. `n` determines how many parallel tasks will be running at once. Each of
/// these tasks maintains its own transformation pipeline, so `transform` will be called once for every `n`. Completing
/// a `transform` pipeline has no effect.
/// </summary>
/// <typeparam name="TInSrc"></typeparam>
/// <typeparam name="TOutSrc"></typeparam>
/// <typeparam name="TInDest"></typeparam>
/// <typeparam name="TOutDest"></typeparam>
/// <param name="from"></param>
/// <param name="parallelism"></param>
/// <param name="to"></param>
/// <param name="transform"></param>
/// <param name="propagateClose"></param>
/// <returns></returns>
public static async Task UnorderedPipeline<TInSrc, TOutSrc, TInDest, TOutDest>(
this IChannel<TInSrc, TOutSrc> from,
int parallelism,
IChannel<TInDest, TOutDest> to,
Func<IObservable<TOutSrc>, IObservable<TInDest>> transform,
bool propagateClose = true)
{
async Task Pump()
{
var pipeline = new Subject<TOutSrc>();
var buffer = new List<TInDest>();
var dest = transform(pipeline);
dest.Subscribe(itm => buffer.Add(itm));
while (true)
{
var (is_open, tval) = await from.Take();
if (is_open)
{
pipeline.OnNext(tval);
foreach (var pval in buffer)
{
var is_put_open = await to.Put(pval);
if (is_put_open) continue;
if (propagateClose) @from.Close();
return;
}
buffer.Clear();
}
else
{
pipeline.OnCompleted();
if (buffer.Count > 0)
{
foreach (var pval in buffer)
if (!await to.Put(pval))
break;
}
break;
}
}
}
await Task.WhenAll(Enumerable.Range(0, parallelism)
.Select(idx => Task.Run(Pump)));
if (propagateClose)
{
from.Close();
to.Close();
}
}
public static async Task UnorderedPipeline<TInSrc, TOutSrc, TInDest, TOutDest>(
this IChannel<TInSrc, TOutSrc> from,
int parallelism,
IChannel<TInDest, TOutDest> to,
Func<TOutSrc, Task<TInDest>> f,
bool propagateClose = true)
{
async Task Pump()
{
while (true)
{
var (is_open, job) = await from.Take();
if (!is_open) break;
var putIsOpen = await to.Put(await f(job));
if (!putIsOpen) return;
}
}
await Task.WhenAll(Enumerable.Range(0, parallelism)
.Select(idx => Task.Run(Pump)));
if (propagateClose)
{
from.Close();
to.Close();
}
}
public static async Task UnorderedThreadedPipeline<TInSrc, TOutSrc, TInDest, TOutDest>(
this IChannel<TInSrc, TOutSrc> from,
int parallelism,
IChannel<TInDest, TOutDest> to,
Func<TOutSrc, TInDest> f,
bool propagateClose = true)
{
Task Pump()
{
var tcs = new TaskCompletionSource<bool>();
var th = new Thread(() =>
{
while (true)
{
var (is_open, job) = from.Take().Result;
if (!is_open) break;
var putIsOpen = to.Put(f(job)).Result;
if (!putIsOpen) return;
}
tcs.SetResult(true);
}) {Priority = ThreadPriority.BelowNormal};
th.Start();
return tcs.Task;
}
await Task.WhenAll(Enumerable.Range(0, parallelism)
.Select(idx => Task.Run(Pump)));
if (propagateClose)
{
from.Close();
to.Close();
}
}
}
}

View File

@ -0,0 +1,42 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace Wabbajack.Common.CSP
{
class PutTaskHandler<T> : Handler<Action<bool>>
{
private readonly bool _blockable;
private TaskCompletionSource<bool> _tcs = new TaskCompletionSource<bool>();
public PutTaskHandler(bool blockable = true)
{
_blockable = blockable;
}
public TaskCompletionSource<bool> TaskCompletionSource
{
get
{
if (_tcs == null)
_tcs = new TaskCompletionSource<bool>();
return _tcs;
}
}
public bool IsActive => true;
public bool IsBlockable => _blockable;
public uint LockId => 0;
public Action<bool> Commit()
{
return Handle;
}
private void Handle(bool val)
{
TaskCompletionSource.SetResult(val);
}
}
}

View File

@ -0,0 +1,119 @@
using System;
using System.CodeDom.Compiler;
using System.Collections;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using System.Windows;
using System.Windows.Forms;
namespace Wabbajack.Common.CSP
{
public class RingBuffer<T> : IEnumerable<T>
{
private int _size;
private int _length;
private int _tail;
private int _head;
private T[] _arr;
public RingBuffer(int size = 8)
{
_size = size;
_arr = new T[size];
_tail = 0;
_length = 0;
_head = 0;
}
public T Pop()
{
if (_length == 0) return default;
var val = _arr[_tail];
_arr[_tail] = default;
_tail = (_tail + 1) % _size;
_length -= 1;
return val;
}
public T Peek()
{
return _length == 0 ? default : _arr[_tail];
}
public void Unshift(T x)
{
_arr[_head] = x;
_head = (_head + 1) % _size;
_length += 1;
}
public void UnboundedUnshift(T x)
{
if (_length == _size)
Resize();
Unshift(x);
}
public bool IsEmpty => _length == 0;
public int Length => _length;
private void Resize()
{
var new_arr_size = _size * 2;
var new_arr = new T[new_arr_size];
if (_tail < _head)
{
Array.Copy(_arr, _tail, new_arr, 0, _length);
_tail = 0;
_head = _length;
_arr = new_arr;
_size = new_arr_size;
}
else if (_tail > _head)
{
Array.Copy(_arr, _tail, new_arr, 0, _length - _tail);
Array.Copy(_arr, 0, new_arr, (_length - _tail), _head);
_tail = 0;
_head = _length;
_arr = new_arr;
_size = new_arr_size;
}
else
{
_tail = 0;
_head = 0;
_arr = new_arr;
_size = new_arr_size;
}
}
/// <summary>
/// Filers out all items where should_keep(itm) returns false
/// </summary>
/// <param name="should_keep"></param>
public void Cleanup(Func<T, bool> should_keep)
{
for (var idx = 0; idx < _length; idx++)
{
var v = Pop();
if (should_keep(v))
{
Unshift(v);
}
}
}
public IEnumerator<T> GetEnumerator()
{
while (!IsEmpty)
yield return Pop();
}
IEnumerator IEnumerable.GetEnumerator()
{
return GetEnumerator();
}
}
}

View File

@ -0,0 +1,68 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Text;
using System.Threading.Tasks;
namespace Wabbajack.Common.CSP
{
public class RxBuffer<TIn, TOut> : LinkedList<TOut>, IBuffer<TOut>
{
private Subject<TIn> _inputSubject;
private IObservable<TOut> _outputObservable;
private bool _completed;
private int _maxSize;
public RxBuffer(int size, Func<IObservable<TIn>, IObservable<TOut>> transform) : base()
{
_maxSize = size;
_inputSubject = new Subject<TIn>();
_outputObservable = transform(_inputSubject);
_outputObservable.Subscribe(itm => AddFirst(itm), () => {
_completed = true;
});
}
public bool TransformAdd(TIn val)
{
_inputSubject.OnNext(val);
return _completed;
}
public static bool TransformAdd(IBuffer<TOut> buf, TIn itm)
{
return ((RxBuffer<TIn, TOut>) buf).TransformAdd(itm);
}
public void Finalize()
{
_inputSubject.OnCompleted();
}
public void Dispose()
{
throw new NotImplementedException();
}
public static void Finalize(IBuffer<TOut> buf)
{
((RxBuffer<TIn, TOut>)buf).Finalize();
}
public bool IsFull => Count >= _maxSize;
public bool IsEmpty => Count == 0;
public TOut Remove()
{
var ret = Last.Value;
RemoveLast();
return ret;
}
public void Add(TOut itm)
{
}
}
}

View File

@ -0,0 +1,50 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Runtime.CompilerServices;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
namespace Wabbajack.Common.CSP
{
public class TakeTaskHandler<T> : Handler<Action<bool, T>>
{
private readonly bool _blockable;
private TaskCompletionSource<(bool, T)> _tcs;
public TakeTaskHandler(TaskCompletionSource<T> tcs = null, bool blockable = true)
{
_blockable = blockable;
}
public TaskCompletionSource<(bool, T)> TaskCompletionSource
{
get
{
if (_tcs == null)
{
var new_tcs = new TaskCompletionSource<(bool, T)>();
Interlocked.CompareExchange(ref _tcs, new_tcs, null);
}
return _tcs;
}
}
public bool IsActive => true;
public bool IsBlockable => _blockable;
public uint LockId => 0;
public Task<(bool, T)> Task => TaskCompletionSource.Task;
public Action<bool, T> Commit()
{
return Handle;
}
private void Handle(bool is_open, T a)
{
TaskCompletionSource.SetResult((is_open, a));
}
}
}

View File

@ -0,0 +1,42 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Channels;
using System.Threading.Tasks;
namespace Wabbajack.Common
{
public static class ChannelStreams
{
public static async Task IntoChannel<T>(this IEnumerable<T> coll, Channel<T> dest, bool closeAtEnd = false)
{
foreach (var itm in coll)
{
await dest.Writer.WriteAsync(itm);
}
if (closeAtEnd)
dest.Writer.Complete();
}
public static Channel<T> AsChannel<T>(this IEnumerable<T> coll)
{
var chan = Channel.CreateUnbounded<T>();
coll.IntoChannel(chan, true);
return chan;
}
public static async Task<IEnumerable<T>> ToIEnumerable<T>(this Channel<T> src)
{
var buffer = new List<T>();
while (true)
{
var result = await src.Reader.ReadAsync();
buffer.Add(result);
}
return buffer;
}
}
}

View File

@ -88,6 +88,20 @@
<Compile Include="BSDiff.cs" />
<Compile Include="ChildProcessTracker.cs" />
<Compile Include="Consts.cs" />
<Compile Include="CSP\AChannel.cs" />
<Compile Include="CSP\Channel.cs" />
<Compile Include="CSP\CSPExtensions.cs" />
<Compile Include="CSP\FixedSizeBuffer.cs" />
<Compile Include="CSP\IBuffer.cs" />
<Compile Include="CSP\Handler.cs" />
<Compile Include="CSP\IChannel.cs" />
<Compile Include="CSP\EnumeratorBuffer.cs" />
<Compile Include="CSP\ManyToManyChannel.cs" />
<Compile Include="CSP\Pipelines.cs" />
<Compile Include="CSP\PutTaskHandler.cs" />
<Compile Include="CSP\RingBuffer.cs" />
<Compile Include="CSP\RxBuffer.cs" />
<Compile Include="CSP\TakeTaskHandler.cs" />
<Compile Include="DynamicIniData.cs" />
<Compile Include="Error States\ErrorResponse.cs" />
<Compile Include="Error States\GetResponse.cs" />
@ -109,6 +123,7 @@
<EmbeddedResource Include="7z.dll.gz" />
<EmbeddedResource Include="7z.exe.gz" />
<None Include="app.config" />
<None Include="CSP\CSP Readme.md" />
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\Compression.BSA\Compression.BSA.csproj">
@ -149,5 +164,6 @@
<Version>8.0.0</Version>
</PackageReference>
</ItemGroup>
<ItemGroup />
<Import Project="$(MSBuildToolsPath)\Microsoft.CSharp.targets" />
</Project>

View File

@ -0,0 +1,252 @@
using System;
using System.Linq;
using System.Reactive.Linq;
using System.Threading;
using System.Threading.Tasks;
using Alphaleonis.Win32.Filesystem;
using Microsoft.VisualStudio.TestTools.UnitTesting;
using Wabbajack.Common.CSP;
namespace Wabbajack.Test.CSP
{
[TestClass]
public class CSPTests
{
public TestContext TestContext { get; set; }
public void Log(string msg)
{
TestContext.WriteLine(msg);
}
[TestInitialize]
public void Startup()
{
}
/// <summary>
/// Test that we can put a value onto a channel without a buffer, and that the put is released once the
/// take finalizes
/// </summary>
/// <returns></returns>
[TestMethod]
public async Task TestTakePutBlocking()
{
var channel = Channel.Create<int>();
var ptask = channel.Put(1);
var (open, val) = await channel.Take();
Assert.AreEqual(1, val);
Assert.IsTrue(open);
Assert.IsTrue(await ptask);
}
/// <summary>
/// If we create a channel with a fixed buffer size, we can enqueue that number of items without blocking
/// We can then take those items later on.
/// </summary>
/// <returns></returns>
[TestMethod]
public async Task TestTakePutBuffered()
{
var channel = Channel.Create<int>(10);
foreach (var itm in Enumerable.Range(0, 10))
await channel.Put(itm);
foreach (var itm in Enumerable.Range(0, 10))
{
var (is_open, val) = await channel.Take();
Assert.AreEqual(itm, val);
Assert.IsTrue(is_open);
}
}
/// <summary>
/// We can convert a IEnumerable into a channel by inlining the enumerable into the channel's buffer.
/// </summary>
/// <returns></returns>
[TestMethod]
public async Task TestToChannel()
{
var channel = Enumerable.Range(0, 10).ToChannel();
foreach (var itm in Enumerable.Range(0, 10))
{
var (is_open, val) = await channel.Take();
Assert.AreEqual(itm, val);
Assert.IsTrue(is_open);
}
}
/// <summary>
/// TakeAll will continue to take from a channel as long as the channel is open. Once the channel closes
/// TakeAll returns a list of the items taken.
/// </summary>
/// <returns></returns>
[TestMethod]
public async Task TestTakeAll()
{
var results = await Enumerable.Range(0, 10).ToChannel().TakeAll();
CollectionAssert.AreEqual(Enumerable.Range(0, 10).ToList(), results);
}
/// <summary>
/// We can add Rx transforms as transforms inside a channel. This allows for cheap conversion and calcuation
/// to be performed in a channel without incuring the dispatch overhead of swapping values between threads.
/// These calculations happen inside the channel's lock, however, so be sure to keep these operations relatively
/// cheap.
/// </summary>
/// <returns></returns>
[TestMethod]
public async Task RxTransformInChannel()
{
var chan = Channel.Create<int, int>(1, o => o.Select(v => v + 1));
var finished = Enumerable.Range(0, 10).OntoChannel(chan);
foreach (var itm in Enumerable.Range(0, 10))
{
var (is_open, val) = await chan.Take();
Assert.AreEqual(itm + 1, val);
Assert.IsTrue(is_open);
}
await finished;
}
[TestMethod]
public async Task UnorderedPipeline()
{
var o = Channel.Create<string>(3);
var finished = Enumerable.Range(0, 3)
.ToChannel()
.UnorderedPipeline(1, o, obs => obs.Select(itm => itm.ToString()));
var results = (await o.TakeAll()).OrderBy(e => e).ToList();
var expected = Enumerable.Range(0, 3).Select(i => i.ToString()).OrderBy(e => e).ToList();
CollectionAssert.AreEqual(expected, results);
}
[TestMethod]
public async Task UnorderedPipelineWithParallelism()
{
// Do it a hundred times to try and catch rare deadlocks
var o = Channel.Create<string>(3);
var finished = Enumerable.Range(0, 1024)
.ToChannel()
.UnorderedPipeline(4, o, obs => obs.Select(itm => itm.ToString()));
var results = (await o.TakeAll()).OrderBy(e => e).ToList();
var expected = Enumerable.Range(0, 1024).Select(i => i.ToString()).OrderBy(e => e).ToList();
CollectionAssert.AreEqual(expected, results);
await finished;
}
[TestMethod]
public async Task UnorderedTaskPipeline()
{
// Do it a hundred times to try and catch rare deadlocks
var o = Channel.Create<int>(3);
var finished = Enumerable.Range(0, 1024)
.ToChannel()
.UnorderedPipeline(4, o, async v =>
{
await Task.Delay(1);
return v;
});
var results = (await o.TakeAll()).OrderBy(e => e).ToList();
var expected = Enumerable.Range(0, 1024).ToList();
CollectionAssert.AreEqual(expected, results);
await finished;
}
[TestMethod]
public async Task UnorderedThreadPipeline()
{
// Do it a hundred times to try and catch rare deadlocks
var o = Channel.Create<int>(3);
var finished = Enumerable.Range(0, 1024)
.ToChannel()
.UnorderedThreadedPipeline(4, o, v =>
{
Thread.Sleep(1);
return v;
});
var results = (await o.TakeAll()).OrderBy(e => e).ToList();
var expected = Enumerable.Range(0, 1024).ToList();
CollectionAssert.AreEqual(expected, results);
await finished;
}
[TestMethod]
public async Task ChannelStressTest()
{
var chan = Channel.Create<int>();
var putter = Task.Run(async () =>
{
for (var i = 0; i < 1000; i++)
{
var result = await chan.Put(i);
}
});
var taker = Task.Run(async () =>
{
try
{
for (var i = 0; i < 1000; i++)
{
var (is_open, val) = await chan.Take();
Assert.AreEqual(i, val);
}
}
finally
{
chan.Close();
}
});
await putter;
await taker;
}
[TestMethod]
public async Task ChannelStressWithBuffer()
{
var chan = Channel.Create<int>(1);
var putter = Task.Run(async () =>
{
for (var i = 0; i < 1000; i++)
{
await chan.Put(i);
}
});
var taker = Task.Run(async () =>
{
try
{
for (var i = 0; i < 1000; i++)
{
var (is_open, val) = await chan.Take();
Assert.AreEqual(i, val);
}
}
finally
{
chan.Close();
}
});
await putter;
await taker;
}
}
}

View File

@ -0,0 +1,243 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Linq;
using System.Security.Policy;
using System.Text;
using System.Threading.Tasks;
using Microsoft.VisualStudio.TestTools.UnitTesting;
using Wabbajack.Common.CSP;
namespace Wabbajack.Test.CSP
{
[TestClass]
public class ChannelTests
{
[TestMethod]
public async Task PutThenTakeNoBuffer()
{
var chan = Channel.Create<int>();
var putter = chan.Put(42);
var taker = chan.Take();
Assert.IsTrue(await putter);
Assert.AreEqual((true, 42), await taker);
}
[TestMethod]
public async Task TakeThenPushNoBuffer()
{
var chan = Channel.Create<int>();
var taker = chan.Take();
var putter = chan.Put(42);
Assert.IsTrue(await putter);
Assert.AreEqual((true, 42), await taker);
}
[TestMethod]
public async Task TakeFromBufferAfterPut()
{
var chan = Channel.Create<int>(1);
var putter = chan.Put(42);
var taker = chan.Take();
Assert.IsTrue(await putter);
Assert.AreEqual((true, 42), await taker);
}
[TestMethod]
public async Task TakeFromBufferBeforePut()
{
var chan = Channel.Create<int>(1);
var taker = chan.Take();
var putter = chan.Put(42);
Assert.IsTrue(await putter);
Assert.AreEqual((true, 42), await taker);
}
[TestMethod]
public async Task TakesAreReleasedAfterClose()
{
var chan = Channel.Create<int>();
var taker = chan.Take();
chan.Close();
Assert.AreEqual((false, 0), await taker);
}
[TestMethod]
public async Task ExpandingTransformsReleaseMultipleTakes()
{
var chan = Channel.Create<int, int>(1, i => i.SelectMany(len => Enumerable.Range(0, len)));
var take1 = chan.Take();
var take2 = chan.Take();
await chan.Put(2);
Assert.AreEqual((true, 0), await take1);
Assert.AreEqual((true, 1), await take2);
}
[TestMethod]
public async Task TransformsCanCloseChannel()
{
var chan = Channel.Create<int, int>(1, i => i.Take(1));
var take1 = chan.Take();
var take2 = chan.Take();
await chan.Put(1);
await chan.Put(2);
Assert.IsTrue(chan.IsClosed);
Assert.AreEqual((true, 1), await take1);
Assert.AreEqual((false, 0), await take2);
}
[TestMethod]
public async Task TransformsCanCloseDuringExpand()
{
var chan = Channel.Create<int, int>(1, i => i.SelectMany(len => Enumerable.Range(1, len)).Take(1));
var take1 = chan.Take();
var take2 = chan.Take();
await chan.Put(2);
Assert.IsTrue(chan.IsClosed);
Assert.AreEqual((true, 1), await take1);
Assert.AreEqual((false, 0), await take2);
}
[TestMethod]
public async Task TransformsCanFilterTakeFirst()
{
var chan = Channel.Create<int, int>(1, i => i.Where(x => x == 2));
var take1 = chan.Take();
var take2 = chan.Take();
await chan.Put(1);
await chan.Put(2);
chan.Close();
Assert.IsTrue(chan.IsClosed);
Assert.AreEqual((true, 2), await take1);
Assert.AreEqual((false, 0), await take2);
}
[TestMethod]
public async Task TransformsCanReturnNothingTakeFirst()
{
var chan = Channel.Create<int, int>(1, i => i.Take(0));
var take1 = chan.Take();
var take2 = chan.Take();
await chan.Put(1);
Assert.IsTrue(chan.IsClosed);
Assert.AreEqual((false, 0), await take1);
Assert.AreEqual((false, 0), await take2);
}
[TestMethod]
public async Task TransformsCanFilterTakeAfter()
{
var chan = Channel.Create<int, int>(1, i => i.Where(x => x == 2));
await chan.Put(1);
await chan.Put(2);
var take1 = chan.Take();
var take2 = chan.Take();
chan.Close();
Assert.IsTrue(chan.IsClosed);
Assert.AreEqual((true, 2), await take1);
Assert.AreEqual((false, 0), await take2);
}
[TestMethod]
public async Task TransformsCanReturnNothingTakeAfter()
{
var chan = Channel.Create<int, int>(1, i => i.Take(0));
await chan.Put(1);
var take1 = chan.Take();
var take2 = chan.Take();
Assert.IsTrue(chan.IsClosed);
Assert.AreEqual((false, 0), await take1);
Assert.AreEqual((false, 0), await take2);
}
[TestMethod]
public void TooManyTakesCausesException()
{
var chan = Channel.Create<int>();
Assert.ThrowsException<ManyToManyChannel<int, int>.TooManyHanldersException>(() =>
{
for (var x = 0; x < ManyToManyChannel<int, int>.MAX_QUEUE_SIZE + 1; x++)
chan.Take();
});
}
[TestMethod]
public void TooManyPutsCausesException()
{
var chan = Channel.Create<int>();
Assert.ThrowsException<ManyToManyChannel<int, int>.TooManyHanldersException>(() =>
{
for (var x = 0; x < ManyToManyChannel<int, int>.MAX_QUEUE_SIZE + 1; x++)
chan.Put(x);
});
}
[TestMethod]
public async Task BlockingPutsGoThroughTransform()
{
var chan = Channel.Create<int, int>(1, i => i.Take(2));
var put1 = chan.Put(1);
var put2 = chan.Put(2);
var put3 = chan.Put(3);
var put4 = chan.Put(4);
var take1 = chan.Take();
var take2 = chan.Take();
var take3 = chan.Take();
Assert.AreEqual((true, 1), await take1);
Assert.AreEqual((true, 2), await take2);
Assert.AreEqual((false, 0), await take3);
Assert.IsTrue(await put1);
Assert.IsTrue(await put2);
Assert.IsFalse(await put3);
Assert.IsFalse(await put4);
Assert.IsTrue(chan.IsClosed);
}
}
}

View File

@ -0,0 +1,19 @@
using System;
using System.Linq;
using Microsoft.VisualStudio.TestTools.UnitTesting;
using Wabbajack.Common;
namespace Wabbajack.Test.Wabbajack.Common.Tests
{
[TestClass]
public class ChannelStreamsTests
{
[TestMethod]
public void ToAndFromChannel()
{
var src = Enumerable.Range(0, 10).ToList();
var result = src.AsChannel().ToIEnumerable();
Assert.AreEqual(src, result);
}
}
}

View File

@ -93,6 +93,8 @@
</ItemGroup>
<ItemGroup>
<Compile Include="ACompilerTest.cs" />
<Compile Include="CSP\ChannelTests.cs" />
<Compile Include="CSP\CSPTests.cs" />
<Compile Include="DownloaderTests.cs" />
<Compile Include="EndToEndTests.cs" />
<Compile Include="Extensions.cs" />
@ -146,6 +148,7 @@
<Version>4.2.0</Version>
</PackageReference>
</ItemGroup>
<ItemGroup />
<Import Project="$(VSToolsPath)\TeamTest\Microsoft.TestTools.targets" Condition="Exists('$(VSToolsPath)\TeamTest\Microsoft.TestTools.targets')" />
<Import Project="$(MSBuildToolsPath)\Microsoft.CSharp.targets" />
</Project>