using System;
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;
namespace Wabbajack.Common.CSP
{
///
/// An almost 1:1 port of Clojure's core.async channels
///
public class ManyToManyChannel : AChannel
{
public const int MAX_QUEUE_SIZE = 1024;
private RingBuffer>> _takes = new RingBuffer>>(8);
private RingBuffer<(Handler>, TIn)> _puts = new RingBuffer<(Handler>, TIn)>(8);
private IBuffer _buf;
private Func, TIn, bool> _add;
private Action> _finalize;
private Func _converter;
volatile bool _isClosed = false;
public ManyToManyChannel(Func converter)
{
_buf = null;
_add = null;
_finalize = null;
_converter = converter;
}
public ManyToManyChannel(Func converter, Func, TIn, bool> add, Action> finalize, IBuffer buffer)
{
_buf = buffer;
_add = add;
_finalize = finalize;
_converter = converter;
}
private static bool IsActiveTake(Handler> handler)
{
return handler.IsActive;
}
private static bool IsActivePut((Handler>, TIn) input)
{
return input.Item1.IsActive;
}
///
/// Tries to put a put into the channel
///
///
///
/// (result_type, w)
public override (AsyncResult, bool) Put(TIn val, Handler> handler)
{
Monitor.Enter(this);
if (_isClosed)
{
Monitor.Exit(this);
return (AsyncResult.Completed, false);
}
if (_buf != null && !_buf.IsFull && !_takes.IsEmpty)
{
var put_cb = LockIfActiveCommit(handler);
if (put_cb != null)
{
var is_done = _add(_buf, val);
if (!_buf.IsEmpty)
{
var take_cbs = GetTakersForBuffer();
if (is_done)
Abort();
Monitor.Exit(this);
foreach (var action in take_cbs)
{
Task.Run(action);
}
return (AsyncResult.Completed, true);
}
if (is_done)
Abort();
Monitor.Exit(this);
return (AsyncResult.Closed, false);
}
Monitor.Exit(this);
return (AsyncResult.Canceled, false);
}
var (put_cb2, take_cb) = GetCallbacks(handler, _takes);
if (put_cb2 != null && take_cb != null)
{
Monitor.Exit(this);
Task.Run(() => take_cb(true, _converter(val)));
return (AsyncResult.Completed, true);
}
if (_buf != null && !_buf.IsFull)
{
if (LockIfActiveCommit(handler) != null)
{
if (_add(_buf, val))
{
Abort();
}
Monitor.Exit(this);
return (AsyncResult.Completed, true);
}
Monitor.Exit(this);
return (AsyncResult.Canceled, true);
}
if (handler.IsActive && handler.IsBlockable)
{
if (_puts.Length >= MAX_QUEUE_SIZE)
{
Monitor.Exit(this);
throw new TooManyHanldersException();
}
_puts.UnboundedUnshift((handler, val));
}
Monitor.Exit(this);
return (AsyncResult.Enqueued, true);
}
public override (AsyncResult, TOut) Take(Handler> handler)
{
Monitor.Enter(this);
Cleanup();
if (_buf != null && !_buf.IsEmpty)
{
var take_cb = LockIfActiveCommit(handler);
if (take_cb != null)
{
var val = _buf.Remove();
var (is_done, cbs) = GetPuttersForBuffer();
if (is_done)
Abort();
Monitor.Exit(this);
foreach (var cb in cbs)
Task.Run(() => cb(true));
return (AsyncResult.Completed, val);
}
Monitor.Exit(this);
return (AsyncResult.Canceled, default);
}
var (take_cb2, put_cb, val2, found) = FindMatchingPut(handler);
if (take_cb2 != null && put_cb != null)
{
Monitor.Exit(this);
Task.Run(() => put_cb(true));
return (AsyncResult.Completed, _converter(val2));
}
if (_isClosed)
{
if (_buf != null && found)
_add(_buf, val2);
var has_val = _buf != null && !_buf.IsEmpty;
var take_cb3 = LockIfActiveCommit(handler);
if (take_cb3 != null)
{
var val = has_val ? _buf.Remove() : default;
Monitor.Exit(this);
return has_val ? (AsyncResult.Completed, val) : (AsyncResult.Closed, default);
}
Monitor.Exit(this);
return (AsyncResult.Closed, default);
}
if (handler.IsBlockable)
{
if (_takes.Length >= MAX_QUEUE_SIZE)
{
Monitor.Exit(this);
throw new TooManyHanldersException();
}
_takes.UnboundedUnshift(handler);
}
Monitor.Exit(this);
return (AsyncResult.Enqueued, default);
}
public override bool IsClosed => _isClosed;
public override void Close()
{
Monitor.Enter(this);
Cleanup();
if (_isClosed)
{
Monitor.Exit(this);
return;
}
_isClosed = true;
if (_buf != null && _puts.IsEmpty)
_finalize(_buf);
var cbs = _buf == null? new List() : GetTakersForBuffer();
while (!_takes.IsEmpty)
{
var take_cb = LockIfActiveCommit(_takes.Pop());
if (take_cb != null)
cbs.Add(() => take_cb(false, default));
}
Monitor.Exit(this);
foreach (var cb in cbs)
Task.Run(cb);
}
private (Action, Action, TIn, bool) FindMatchingPut(Handler> handler)
{
while (!_puts.IsEmpty)
{
var (found, val) = _puts.Peek();
var (handler_cb, put_cb, handler_active, put_active) = LockIfActiveCommit(handler, found);
if (handler_active && put_active)
{
_puts.Pop();
return (handler_cb, put_cb, val, true);
}
if (!put_active)
{
_puts.Pop();
continue;
}
return (null, null, default, false);
}
return (null, null, default, false);
}
private (bool, List>) GetPuttersForBuffer()
{
List> acc = new List>();
while (!_puts.IsEmpty)
{
var (putter, val) = _puts.Pop();
var cb = LockIfActiveCommit(putter);
if (cb != null)
{
acc.Add(cb);
}
var is_done = _add(_buf, val);
if (is_done || _buf.IsFull || _puts.IsEmpty)
return (is_done, acc);
}
return (false, acc);
}
private void Cleanup()
{
_takes.Cleanup(IsActiveTake);
_puts.Cleanup(IsActivePut);
}
private (T1, T2) GetCallbacks(Handler handler, RingBuffer> queue)
{
while (!queue.IsEmpty)
{
var found = queue.Peek();
var (handler_cb, found_cb, handler_valid, found_valid) = LockIfActiveCommit(handler, found);
if (handler_valid && found_valid)
{
queue.Pop();
return (handler_cb, found_cb);
}
if (handler_valid)
{
queue.Pop();
}
else
{
return (default, default);
}
}
return (default, default);
}
private void Abort()
{
while (!_puts.IsEmpty)
{
var (handler, val) = _puts.Pop();
var put_cb = LockIfActiveCommit(handler);
if (put_cb != null)
{
Task.Run(() => put_cb(false));
}
}
_puts.Cleanup(x => false);
Close();
}
private List GetTakersForBuffer()
{
List ret = new List();
while (!_buf.IsEmpty && !_takes.IsEmpty)
{
var taker = _takes.Pop();
var take_cp = LockIfActiveCommit(taker);
if (take_cp != null)
{
var val = _buf.Remove();
ret.Add(() => take_cp(true, val));
}
}
return ret;
}
[MethodImpl(MethodImplOptions.AggressiveInlining)]
private static T LockIfActiveCommit(Handler handler)
{
lock (handler)
{
return handler.IsActive ? handler.Commit() : default;
}
}
[MethodImpl(MethodImplOptions.AggressiveInlining)]
private static (T1, T2, bool, bool) LockIfActiveCommit(Handler handler1, Handler handler2)
{
if (handler1.LockId < handler2.LockId)
{
Monitor.Enter(handler1);
Monitor.Enter(handler2);
}
else
{
Monitor.Enter(handler2);
Monitor.Enter(handler1);
}
if (handler1.IsActive && handler2.IsActive)
{
var ret1 = (handler1.Commit(), handler2.Commit(), true, true);
Monitor.Exit(handler1);
Monitor.Exit(handler2);
return ret1;
}
var ret2 = (default(T1), default(T2), handler1.IsActive, handler2.IsActive);
Monitor.Exit(handler1);
Monitor.Exit(handler2);
return ret2;
}
public class TooManyHanldersException : Exception
{
public override string ToString()
{
return $"No more than {MAX_QUEUE_SIZE} pending operations allowed on a single channel.";
}
}
}
}