wabbajack/Wabbajack.Common.CSP/ManyToManyChannel.cs
Timothy Baldridge 133fa2febd
Async VFS Implementation (#171)
* New VFS implementation can load/save/analyze files. All in a immutable, lock free (mostly) async parallel manner.

* VFS indexing is complete

* Can stage files

* Can extract VirtualFiles into PortableFiles and create contexts from PortableFiles

* Code cleanup
2019-11-14 15:22:53 -07:00

391 lines
12 KiB
C#

using System;
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;
namespace Wabbajack.Common.CSP
{
/// <summary>
/// An almost 1:1 port of Clojure's core.async channels
/// </summary>
public class ManyToManyChannel<TIn, TOut> : AChannel<TIn, TOut>
{
public const int MAX_QUEUE_SIZE = 1024;
private RingBuffer<Handler<Action<bool, TOut>>> _takes = new RingBuffer<Handler<Action<bool, TOut>>>(8);
private RingBuffer<(Handler<Action<bool>>, TIn)> _puts = new RingBuffer<(Handler<Action<bool>>, TIn)>(8);
private IBuffer<TOut> _buf;
private Func<IBuffer<TOut>, TIn, bool> _add;
private Action<IBuffer<TOut>> _finalize;
private Func<TIn, TOut> _converter;
volatile bool _isClosed = false;
public ManyToManyChannel(Func<TIn, TOut> converter)
{
_buf = null;
_add = null;
_finalize = null;
_converter = converter;
}
public ManyToManyChannel(Func<TIn, TOut> converter, Func<IBuffer<TOut>, TIn, bool> add, Action<IBuffer<TOut>> finalize, IBuffer<TOut> buffer)
{
_buf = buffer;
_add = add;
_finalize = finalize;
_converter = converter;
}
private static bool IsActiveTake(Handler<Action<bool, TOut>> handler)
{
return handler.IsActive;
}
private static bool IsActivePut((Handler<Action<bool>>, TIn) input)
{
return input.Item1.IsActive;
}
/// <summary>
/// Tries to put a put into the channel
/// </summary>
/// <param name="val"></param>
/// <param name="handler"></param>
/// <returns>(result_type, w)</returns>
public override (AsyncResult, bool) Put(TIn val, Handler<Action<bool>> handler)
{
Monitor.Enter(this);
if (_isClosed)
{
Monitor.Exit(this);
return (AsyncResult.Completed, false);
}
if (_buf != null && !_buf.IsFull && !_takes.IsEmpty)
{
var put_cb = LockIfActiveCommit(handler);
if (put_cb != null)
{
var is_done = _add(_buf, val);
if (!_buf.IsEmpty)
{
var take_cbs = GetTakersForBuffer();
if (is_done)
Abort();
Monitor.Exit(this);
foreach (var action in take_cbs)
{
Task.Run(action);
}
return (AsyncResult.Completed, true);
}
if (is_done)
Abort();
Monitor.Exit(this);
return (AsyncResult.Closed, false);
}
Monitor.Exit(this);
return (AsyncResult.Canceled, false);
}
var (put_cb2, take_cb) = GetCallbacks(handler, _takes);
if (put_cb2 != null && take_cb != null)
{
Monitor.Exit(this);
Task.Run(() => take_cb(true, _converter(val)));
return (AsyncResult.Completed, true);
}
if (_buf != null && !_buf.IsFull)
{
if (LockIfActiveCommit(handler) != null)
{
if (_add(_buf, val))
{
Abort();
}
Monitor.Exit(this);
return (AsyncResult.Completed, true);
}
Monitor.Exit(this);
return (AsyncResult.Canceled, true);
}
if (handler.IsActive && handler.IsBlockable)
{
if (_puts.Length >= MAX_QUEUE_SIZE)
{
Monitor.Exit(this);
throw new TooManyHanldersException();
}
_puts.UnboundedUnshift((handler, val));
}
Monitor.Exit(this);
return (AsyncResult.Enqueued, true);
}
public override (AsyncResult, TOut) Take(Handler<Action<bool, TOut>> handler)
{
Monitor.Enter(this);
Cleanup();
if (_buf != null && !_buf.IsEmpty)
{
var take_cb = LockIfActiveCommit(handler);
if (take_cb != null)
{
var val = _buf.Remove();
var (is_done, cbs) = GetPuttersForBuffer();
if (is_done)
Abort();
Monitor.Exit(this);
foreach (var cb in cbs)
Task.Run(() => cb(true));
return (AsyncResult.Completed, val);
}
Monitor.Exit(this);
return (AsyncResult.Canceled, default);
}
var (take_cb2, put_cb, val2, found) = FindMatchingPut(handler);
if (take_cb2 != null && put_cb != null)
{
Monitor.Exit(this);
Task.Run(() => put_cb(true));
return (AsyncResult.Completed, _converter(val2));
}
if (_isClosed)
{
if (_buf != null && found)
_add(_buf, val2);
var has_val = _buf != null && !_buf.IsEmpty;
var take_cb3 = LockIfActiveCommit(handler);
if (take_cb3 != null)
{
var val = has_val ? _buf.Remove() : default;
Monitor.Exit(this);
return has_val ? (AsyncResult.Completed, val) : (AsyncResult.Closed, default);
}
Monitor.Exit(this);
return (AsyncResult.Closed, default);
}
if (handler.IsBlockable)
{
if (_takes.Length >= MAX_QUEUE_SIZE)
{
Monitor.Exit(this);
throw new TooManyHanldersException();
}
_takes.UnboundedUnshift(handler);
}
Monitor.Exit(this);
return (AsyncResult.Enqueued, default);
}
public override bool IsClosed => _isClosed;
public override void Close()
{
Monitor.Enter(this);
Cleanup();
if (_isClosed)
{
Monitor.Exit(this);
return;
}
_isClosed = true;
if (_buf != null && _puts.IsEmpty)
_finalize(_buf);
var cbs = _buf == null? new List<Action>() : GetTakersForBuffer();
while (!_takes.IsEmpty)
{
var take_cb = LockIfActiveCommit(_takes.Pop());
if (take_cb != null)
cbs.Add(() => take_cb(false, default));
}
Monitor.Exit(this);
foreach (var cb in cbs)
Task.Run(cb);
}
private (Action<bool, TOut>, Action<bool>, TIn, bool) FindMatchingPut(Handler<Action<bool, TOut>> handler)
{
while (!_puts.IsEmpty)
{
var (found, val) = _puts.Peek();
var (handler_cb, put_cb, handler_active, put_active) = LockIfActiveCommit(handler, found);
if (handler_active && put_active)
{
_puts.Pop();
return (handler_cb, put_cb, val, true);
}
if (!put_active)
{
_puts.Pop();
continue;
}
return (null, null, default, false);
}
return (null, null, default, false);
}
private (bool, List<Action<bool>>) GetPuttersForBuffer()
{
List<Action<bool>> acc = new List<Action<bool>>();
while (!_puts.IsEmpty)
{
var (putter, val) = _puts.Pop();
var cb = LockIfActiveCommit(putter);
if (cb != null)
{
acc.Add(cb);
}
var is_done = _add(_buf, val);
if (is_done || _buf.IsFull || _puts.IsEmpty)
return (is_done, acc);
}
return (false, acc);
}
private void Cleanup()
{
_takes.Cleanup(IsActiveTake);
_puts.Cleanup(IsActivePut);
}
private (T1, T2) GetCallbacks<T1, T2>(Handler<T1> handler, RingBuffer<Handler<T2>> queue)
{
while (!queue.IsEmpty)
{
var found = queue.Peek();
var (handler_cb, found_cb, handler_valid, found_valid) = LockIfActiveCommit(handler, found);
if (handler_valid && found_valid)
{
queue.Pop();
return (handler_cb, found_cb);
}
if (handler_valid)
{
queue.Pop();
}
else
{
return (default, default);
}
}
return (default, default);
}
private void Abort()
{
while (!_puts.IsEmpty)
{
var (handler, val) = _puts.Pop();
var put_cb = LockIfActiveCommit(handler);
if (put_cb != null)
{
Task.Run(() => put_cb(false));
}
}
_puts.Cleanup(x => false);
Close();
}
private List<Action> GetTakersForBuffer()
{
List<Action> ret = new List<Action>();
while (!_buf.IsEmpty && !_takes.IsEmpty)
{
var taker = _takes.Pop();
var take_cp = LockIfActiveCommit(taker);
if (take_cp != null)
{
var val = _buf.Remove();
ret.Add(() => take_cp(true, val));
}
}
return ret;
}
[MethodImpl(MethodImplOptions.AggressiveInlining)]
private static T LockIfActiveCommit<T>(Handler<T> handler)
{
lock (handler)
{
return handler.IsActive ? handler.Commit() : default;
}
}
[MethodImpl(MethodImplOptions.AggressiveInlining)]
private static (T1, T2, bool, bool) LockIfActiveCommit<T1, T2>(Handler<T1> handler1, Handler<T2> handler2)
{
if (handler1.LockId < handler2.LockId)
{
Monitor.Enter(handler1);
Monitor.Enter(handler2);
}
else
{
Monitor.Enter(handler2);
Monitor.Enter(handler1);
}
if (handler1.IsActive && handler2.IsActive)
{
var ret1 = (handler1.Commit(), handler2.Commit(), true, true);
Monitor.Exit(handler1);
Monitor.Exit(handler2);
return ret1;
}
var ret2 = (default(T1), default(T2), handler1.IsActive, handler2.IsActive);
Monitor.Exit(handler1);
Monitor.Exit(handler2);
return ret2;
}
public class TooManyHanldersException : Exception
{
public override string ToString()
{
return $"No more than {MAX_QUEUE_SIZE} pending operations allowed on a single channel.";
}
}
}
}