diff --git a/Wabbajack.Common/CSP/AChannel.cs b/Wabbajack.Common/CSP/AChannel.cs new file mode 100644 index 00000000..c7d36ac5 --- /dev/null +++ b/Wabbajack.Common/CSP/AChannel.cs @@ -0,0 +1,38 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; + +namespace Wabbajack.Common.CSP +{ + public abstract class AChannel : IChannel + { + public abstract bool IsClosed { get; } + public abstract void Close(); + public abstract Box Put(TIn val, Handler> handler); + public abstract Box Take(Handler>> handler); + + public Task Take(bool onCaller) + { + var tcs = new TaskCompletionSource(); + var handler = new TakeTaskHandler(tcs); + var result = Take(handler); + if (result.IsSet) + tcs.SetResult(result.Value); + return handler.Task; + } + + public Task Put(TIn val, bool onCaller) + { + var tcs = new TaskCompletionSource(); + var handler = new PutTaskHandler(tcs); + var result = Put(val, handler); + if (result.IsSet) + tcs.SetResult(result.Value); + return tcs.Task; + } + + + } +} diff --git a/Wabbajack.Common/CSP/Box.cs b/Wabbajack.Common/CSP/Box.cs new file mode 100644 index 00000000..356aa9f5 --- /dev/null +++ b/Wabbajack.Common/CSP/Box.cs @@ -0,0 +1,42 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; +using System.Threading.Tasks.Sources; + +namespace Wabbajack.Common.CSP +{ + public struct Box + { + public T Value; + public bool IsSet; + + public Box(T value) + { + Value = value; + IsSet = true; + } + + + public static Box Empty = new Box(); + } + + class test : IValueTaskSource + { + public ValueTaskSourceStatus GetStatus(short token) + { + throw new NotImplementedException(); + } + + public void OnCompleted(Action continuation, object state, short token, ValueTaskSourceOnCompletedFlags flags) + { + throw new NotImplementedException(); + } + + public void GetResult(short token) + { + throw new NotImplementedException(); + } + } +} diff --git a/Wabbajack.Common/CSP/CSPExtensions.cs b/Wabbajack.Common/CSP/CSPExtensions.cs new file mode 100644 index 00000000..ce23bc54 --- /dev/null +++ b/Wabbajack.Common/CSP/CSPExtensions.cs @@ -0,0 +1,42 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; + +namespace Wabbajack.Common.CSP +{ + public static class CSPExtensions + { + public static async Task OntoChannel(this IEnumerable coll, IChannel chan) + { + foreach (var val in coll) + { + if (!await chan.Put(val)) break; + } + } + + /// + /// Turns a IEnumerable collection into a channel. Note, computation of the enumerable will happen inside + /// the lock of the channel, so try to keep the work of the enumerable light. + /// + /// + /// Collection to spool out of the channel. + /// + public static IChannel ToChannel(this IEnumerable coll) + { + return Channel.Create(coll.GetEnumerator()); + } + + + public static async Task> TakeAll(this IChannel chan) + { + List acc = new List(); + while (true) + { + acc.Add(await chan.Take()); + } + return acc; + } + } +} diff --git a/Wabbajack.Common/CSP/Channel.cs b/Wabbajack.Common/CSP/Channel.cs new file mode 100644 index 00000000..90c1a3de --- /dev/null +++ b/Wabbajack.Common/CSP/Channel.cs @@ -0,0 +1,51 @@ +using System; +using System.Collections.Generic; +using System.Data; +using System.Linq; +using System.Text; +using System.Threading.Tasks; + +namespace Wabbajack.Common.CSP +{ + public static class Channel + { + /// + /// Creates a channel without a buffer, and with no conversion function. This provides a syncronization + /// point, where all puts are matched 1:1 with takes. + /// + /// + /// The type of values transferred by the channel + /// A new channel + public static IChannel Create() + { + return new ManyToManyChannel(x => x); + } + + /// + /// Creates a channel with a given enumerator as the starting buffer. Values will not be puttable into this channel + /// and it will start closed. This is a easy way to spool a collection onto a channel. Note: the enumerator will be + /// run inside the channel's lock, so it may not be wise to pass in an enumerator that performs heavy computation. + /// + /// A IEnumerator to use as the contents of the channel + /// The type of values transferred by the channel + /// A new channel + public static IChannel Create(IEnumerator e) + { + var chan = new ManyToManyChannel(x => x, (_, __) => false, _ => {}, new EnumeratorBuffer(e)); + chan.Close(); + return chan; + } + + + public static IChannel Create(int buffer_size) + { + var buffer = new FixedSizeBuffer(buffer_size); + return new ManyToManyChannel(x => x, (buff, itm) => + { + buff.Add(itm); + return false; + }, + b => {}, buffer); + } + } +} diff --git a/Wabbajack.Common/CSP/EnumeratorBuffer.cs b/Wabbajack.Common/CSP/EnumeratorBuffer.cs new file mode 100644 index 00000000..46697255 --- /dev/null +++ b/Wabbajack.Common/CSP/EnumeratorBuffer.cs @@ -0,0 +1,39 @@ +using System; +using System.Collections.Generic; +using System.IO; +using System.Linq; +using System.Text; +using System.Threading.Tasks; + +namespace Wabbajack.Common.CSP +{ + class EnumeratorBuffer : IBuffer + { + private readonly IEnumerator _enumerator; + private bool _empty; + + public EnumeratorBuffer(IEnumerator enumerator) + { + _enumerator = enumerator; + _empty = !_enumerator.MoveNext(); + } + + public void Dispose() + { + } + + public bool IsFull => true; + public bool IsEmpty => _empty; + public T Remove() + { + var val = _enumerator.Current; + _empty = !_enumerator.MoveNext(); + return val; + } + + public void Add(T itm) + { + throw new InvalidDataException(); + } + } +} diff --git a/Wabbajack.Common/CSP/FixedSizeBuffer.cs b/Wabbajack.Common/CSP/FixedSizeBuffer.cs new file mode 100644 index 00000000..249f95a8 --- /dev/null +++ b/Wabbajack.Common/CSP/FixedSizeBuffer.cs @@ -0,0 +1,36 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; + +namespace Wabbajack.Common.CSP +{ + public class FixedSizeBuffer : IBuffer + { + private int _size; + private RingBuffer _buffer; + + public FixedSizeBuffer(int size) + { + _size = size; + _buffer = new RingBuffer(size); + } + + public void Dispose() + { + } + + public bool IsFull => _buffer.Length >= _size; + public bool IsEmpty => _buffer.IsEmpty; + public T Remove() + { + return _buffer.Pop(); + } + + public void Add(T itm) + { + _buffer.UnboundedUnshift(itm); + } + } +} diff --git a/Wabbajack.Common/CSP/FnHandler.cs b/Wabbajack.Common/CSP/FnHandler.cs new file mode 100644 index 00000000..8ffc5ba4 --- /dev/null +++ b/Wabbajack.Common/CSP/FnHandler.cs @@ -0,0 +1,29 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Runtime.InteropServices; +using System.Text; +using System.Threading.Tasks; + +namespace Wabbajack.Common.CSP +{ + public class FnHandler : Handler + { + private readonly bool _blockable; + private T _f; + + public FnHandler(T f, bool blockable=false) + { + _f = f; + _blockable = blockable; + } + + public bool IsActive => true; + public bool IsBlockable => _blockable; + public uint LockId => 0; + public T Commit() + { + return _f; + } + } +} diff --git a/Wabbajack.Common/CSP/Handler.cs b/Wabbajack.Common/CSP/Handler.cs new file mode 100644 index 00000000..b5cdf199 --- /dev/null +++ b/Wabbajack.Common/CSP/Handler.cs @@ -0,0 +1,32 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; + +namespace Wabbajack.Common.CSP +{ + public interface Handler + { + /// + /// Returns true if this handler has a callback, must work without a lock + /// + bool IsActive { get; } + + /// + /// Returns true if this handler may be blocked, otherwise it must not block + /// + bool IsBlockable { get; } + + /// + /// A unique id for lock aquisition order, 0 if no lock + /// + uint LockId { get; } + + /// + /// Commit to fulfilling its end of the transfer, returns cb, must be called within a lock + /// + /// A callback + T Commit(); + } +} diff --git a/Wabbajack.Common/CSP/IBuffer.cs b/Wabbajack.Common/CSP/IBuffer.cs new file mode 100644 index 00000000..32145549 --- /dev/null +++ b/Wabbajack.Common/CSP/IBuffer.cs @@ -0,0 +1,17 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; +using System.Windows.Forms.VisualStyles; + +namespace Wabbajack.Common.CSP +{ + public interface IBuffer : IDisposable + { + bool IsFull { get; } + bool IsEmpty { get; } + T Remove(); + void Add(T itm); + } +} diff --git a/Wabbajack.Common/CSP/IChannel.cs b/Wabbajack.Common/CSP/IChannel.cs new file mode 100644 index 00000000..98824f8b --- /dev/null +++ b/Wabbajack.Common/CSP/IChannel.cs @@ -0,0 +1,19 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; + +namespace Wabbajack.Common.CSP +{ + public interface IChannel + { + bool IsClosed { get; } + void Close(); + + Box Put(TIn val, Handler> handler); + Box Take(Handler>> handler); + Task Take(bool onCaller = true); + Task Put(TIn val, bool onCaller = true); + } +} diff --git a/Wabbajack.Common/CSP/ManyToManyChannel.cs b/Wabbajack.Common/CSP/ManyToManyChannel.cs new file mode 100644 index 00000000..e78ac9f0 --- /dev/null +++ b/Wabbajack.Common/CSP/ManyToManyChannel.cs @@ -0,0 +1,384 @@ +using System; +using System.Buffers; +using System.Collections.Generic; +using System.Linq; +using System.Runtime.CompilerServices; +using System.ServiceModel; +using System.Text; +using System.Threading; +using System.Threading.Tasks; +using System.Threading.Tasks.Sources; +using System.Windows.Forms; +using System.Windows.Forms.VisualStyles; +using ICSharpCode.SharpZipLib.Zip; +using YamlDotNet.Serialization.NodeTypeResolvers; + +namespace Wabbajack.Common.CSP +{ + /// + /// An almost 1:1 port of Clojure's core.async channels + /// + + public class ManyToManyChannel : AChannel + { + public const int MAX_QUEUE_SIZE = 1024; + + private RingBuffer>>> _takes = new RingBuffer>>>(8); + private RingBuffer<(Handler>, TIn)> _puts = new RingBuffer<(Handler>, TIn)>(8); + private IBuffer _buf; + private Func, TIn, bool> _add; + private Action> _finalize; + private Func _converter; + bool _isClosed = false; + + public ManyToManyChannel(Func converter) + { + _buf = null; + _add = null; + _finalize = null; + _converter = converter; + } + + public ManyToManyChannel(Func converter, Func, TIn, bool> add, Action> finalize, IBuffer buffer) + { + _buf = buffer; + _add = add; + _finalize = finalize; + _converter = converter; + } + + private static bool IsActiveTake(Handler>> handler) + { + return handler.IsActive; + } + + private static bool IsActivePut((Handler>, TIn) input) + { + return input.Item1.IsActive; + } + + public override Box Put(TIn val, Handler> handler) + { + Monitor.Enter(this); + if (_isClosed) + { + Monitor.Exit(this); + return new Box(false); + } + + if (_buf != null && !_buf.IsFull && !_takes.IsEmpty) + { + var put_cb = LockIfActiveCommit(handler); + if (put_cb != null) + { + var is_done = _add(_buf, val); + if (!_buf.IsEmpty) + { + var take_cbs = GetTakersForBuffer(); + if (is_done) + Abort(); + Monitor.Exit(this); + foreach (var action in take_cbs) + { + Task.Run(() => action()); + } + } + else + { + if (is_done) + Abort(); + Monitor.Exit(this); + return Box.Empty; + } + return new Box(true); + } + Monitor.Exit(this); + return Box.Empty; + } + + var (put_cb2, take_cb) = GetCallbacks(handler, _takes); + + if (put_cb2 != null && take_cb != null) + { + Monitor.Exit(this); + Task.Run(() => take_cb(new Box(_converter(val)))); + return new Box(true); + } + + if (_buf != null && !_buf.IsFull) + { + if (LockIfActiveCommit(handler) != null) + { + if (_add(_buf, val)) + { + Abort(); + } + Monitor.Exit(this); + return new Box(true); + } + Monitor.Exit(this); + return Box.Empty; + } + + if (handler.IsActive && handler.IsBlockable) + { + if (_puts.Length >= MAX_QUEUE_SIZE) + { + Monitor.Exit(this); + throw new TooManyHanldersException(); + } + _puts.Unshift((handler, val)); + } + Monitor.Exit(this); + return Box.Empty; + } + + public override Box Take(Handler>> handler) + { + Monitor.Enter(this); + Cleanup(); + + if (_buf != null && !_buf.IsEmpty) + { + var take_cb = LockIfActiveCommit(handler); + if (take_cb != null) + { + var val = _buf.Remove(); + var (is_done, cbs) = GetPuttersForBuffer(); + + if (is_done) + Abort(); + + Monitor.Exit(this); + + foreach (var cb in cbs) + Task.Run(() => cb(true)); + + return new Box(val); + + } + Monitor.Exit(this); + return Box.Empty; + } + + var (take_cb2, put_cb, val2, found) = FindMatchingPut(handler); + + if (take_cb2 != null && put_cb != null) + { + Monitor.Exit(this); + Task.Run(() => put_cb(true)); + return new Box(_converter(val2)); + } + + if (_isClosed) + { + if (_buf != null && found) + _add(_buf, val2); + + var has_val = _buf != null && !_buf.IsEmpty; + var take_cb3 = LockIfActiveCommit(handler); + + if (take_cb3 != null) + { + var val = has_val ? _buf.Remove() : default; + Monitor.Exit(this); + return has_val ? new Box(val) : Box.Empty; + } + Monitor.Exit(this); + return Box.Empty; + } + + if (handler.IsBlockable) + { + if (_takes.Length >= MAX_QUEUE_SIZE) + throw new TooManyHanldersException(); + _takes.Unshift(handler); + + } + Monitor.Exit(this); + return Box.Empty; + } + + public override bool IsClosed => _isClosed; + + public override void Close() + { + Monitor.Enter(this); + Cleanup(); + if (_isClosed) + { + Monitor.Exit(this); + return; + } + + _isClosed = true; + if (_buf != null && _puts.IsEmpty) + _finalize(_buf); + } + + private (Action>, Action, TIn, bool) FindMatchingPut(Handler>> handler) + { + while (!_puts.IsEmpty) + { + var (found, val) = _puts.Peek(); + var (handler_cb, put_cb, handler_active, put_active) = LockIfActiveCommit(handler, found); + + if (handler_active && put_active) + { + _puts.Pop(); + return (handler_cb, put_cb, val, true); + } + + if (!put_active) + { + _puts.Pop(); + continue; + } + + return (null, null, default, false); + + } + + return (null, null, default, false); + } + + private (bool, List>) GetPuttersForBuffer() + { + List> acc = new List>(); + + while (!_puts.IsEmpty) + { + var (putter, val) = _puts.Pop(); + var cb = LockIfActiveCommit(putter); + if (cb != null) + { + acc.Add(cb); + } + + var is_done = _add(_buf, val); + if (is_done || _buf.IsFull || _puts.IsEmpty) + return (is_done, acc); + } + + return (false, acc); + } + + private void Cleanup() + { + _takes.Cleanup(IsActiveTake); + _puts.Cleanup(IsActivePut); + } + + private (T1, T2) GetCallbacks(Handler handler, RingBuffer> queue) + { + while (!queue.IsEmpty) + { + var found = queue.Peek(); + var (handler_cb, found_cb, handler_valid, found_valid) = LockIfActiveCommit(handler, found); + if (handler_valid && found_valid) + { + queue.Pop(); + return (handler_cb, found_cb); + } + + if (handler_valid) + { + queue.Pop(); + } + else + { + return (default, default); + } + } + + return (default, default); + } + + private void Abort() + { + while (!_puts.IsEmpty) + { + var (handler, val) = _puts.Pop(); + var put_cb = LockIfActiveCommit(handler); + if (put_cb != null) + { + Task.Run(() => put_cb(true)); + } + } + _puts.Cleanup(x => false); + Close(); + } + + private List GetTakersForBuffer() + { + List ret = new List(); + while (!_buf.IsEmpty && !_takes.IsEmpty) + { + var taker = _takes.Pop(); + var take_cp = LockIfActiveCommit(taker); + if (take_cp != null) + { + var val = _buf.Remove(); + ret.Add(() => take_cp(new Box(val))); + } + } + + return ret; + } + + [MethodImpl(MethodImplOptions.AggressiveInlining)] + private static T IfActiveCommit(Handler handler) + { + return handler.IsActive ? handler.Commit() : default; + } + + [MethodImpl(MethodImplOptions.AggressiveInlining)] + private static T LockIfActiveCommit(Handler handler) + { + lock (handler) + { + return handler.IsActive ? handler.Commit() : default; + } + } + + + [MethodImpl(MethodImplOptions.AggressiveInlining)] + private static (T1, T2, bool, bool) LockIfActiveCommit(Handler handler1, Handler handler2) + { + if (handler1.LockId < handler2.LockId) + { + Monitor.Enter(handler1); + Monitor.Enter(handler2); + } + else + { + Monitor.Enter(handler2); + Monitor.Enter(handler1); + } + + if (handler1.IsActive && handler2.IsActive) + { + var ret1 = (handler1.Commit(), handler2.Commit(), true, true); + Monitor.Exit(handler1); + Monitor.Exit(handler2); + return ret1; + } + + var ret2 = (default(T1), default(T2), handler1.IsActive, handler2.IsActive); + Monitor.Exit(handler1); + Monitor.Exit(handler2); + return ret2; + } + + public class TooManyHanldersException : Exception + { + public override string ToString() + { + return $"No more than {MAX_QUEUE_SIZE} pending operations allowed on a single channel."; + } + + } + } + + +} diff --git a/Wabbajack.Common/CSP/PutTaskHandler.cs b/Wabbajack.Common/CSP/PutTaskHandler.cs new file mode 100644 index 00000000..cd24a66d --- /dev/null +++ b/Wabbajack.Common/CSP/PutTaskHandler.cs @@ -0,0 +1,33 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; + +namespace Wabbajack.Common.CSP +{ + class PutTaskHandler : Handler> + { + private readonly bool _blockable; + private readonly TaskCompletionSource _tcs; + + public PutTaskHandler(TaskCompletionSource tcs, bool blockable = true) + { + _blockable = blockable; + _tcs = tcs ?? new TaskCompletionSource(); + } + + public bool IsActive => true; + public bool IsBlockable => _blockable; + public uint LockId => 0; + public Action Commit() + { + return Handle; + } + + private void Handle(bool val) + { + _tcs.SetResult(val); + } + } +} diff --git a/Wabbajack.Common/CSP/RingBuffer.cs b/Wabbajack.Common/CSP/RingBuffer.cs new file mode 100644 index 00000000..ea04333a --- /dev/null +++ b/Wabbajack.Common/CSP/RingBuffer.cs @@ -0,0 +1,108 @@ +using System; +using System.CodeDom.Compiler; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; +using System.Windows; +using System.Windows.Forms; + +namespace Wabbajack.Common.CSP +{ + public struct RingBuffer + { + private int _size; + private int _length; + private int _tail; + private int _head; + private T[] _arr; + + public RingBuffer(int size = 8) + { + _size = size; + _arr = new T[size]; + _tail = 0; + _length = 0; + _head = 0; + } + + public T Pop() + { + if (_length == 0) return default; + var val = _arr[_tail]; + _arr[_tail] = default; + _tail = (_tail + 1) % _size; + _length -= 1; + return val; + } + + public T Peek() + { + if (_length == 0) return default; + return _arr[_tail]; + } + + public void Unshift(T x) + { + _arr[_head] = x; + _head = (_head + 1) % _size; + _length += 1; + } + + public void UnboundedUnshift(T x) + { + if (_length == _size) + Resize(); + Unshift(x); + } + + public bool IsEmpty => _length == 0; + public int Length => _length; + + private void Resize() + { + var new_arr_size = _size * 2; + var new_arr = new T[new_arr_size]; + if (_tail < _head) + { + Array.Copy(_arr, _tail, new_arr, 0, _length); + _tail = 0; + _head = _length; + _arr = new_arr; + _size = new_arr_size; + } + else if (_tail > _head) + { + Array.Copy(_arr, _tail, new_arr, 0, _length - _tail); + Array.Copy(_arr, 0, new_arr, (_length - _tail), _head); + _tail = 0; + _head = _length; + _arr = new_arr; + _size = new_arr_size; + } + else + { + _tail = 0; + _head = 0; + _arr = new_arr; + _size = new_arr_size; + } + } + + /// + /// Filers out all items where should_keep(itm) returns false + /// + /// + public void Cleanup(Func should_keep) + { + for (var idx = 0; idx < _length; idx++) + { + var v = Pop(); + if (should_keep(v)) + { + Unshift(v); + } + } + } + } +} diff --git a/Wabbajack.Common/CSP/TakeTaskHandler.cs b/Wabbajack.Common/CSP/TakeTaskHandler.cs new file mode 100644 index 00000000..566aa8d7 --- /dev/null +++ b/Wabbajack.Common/CSP/TakeTaskHandler.cs @@ -0,0 +1,38 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Runtime.CompilerServices; +using System.Text; +using System.Threading.Tasks; + +namespace Wabbajack.Common.CSP +{ + public class TakeTaskHandler : Handler>> + { + private readonly bool _blockable; + private readonly TaskCompletionSource _tcs; + + public TakeTaskHandler(TaskCompletionSource tcs = null, bool blockable = true) + { + _blockable = blockable; + _tcs = tcs ?? new TaskCompletionSource(); + } + + + public bool IsActive => true; + public bool IsBlockable => _blockable; + public uint LockId => 0; + public Task Task => _tcs.Task; + public Action> Commit() + { + return Handle; + } + + private void Handle(Box a) + { + if (a.IsSet) + _tcs.SetResult(a.Value); + _tcs.SetCanceled(); + } + } +} diff --git a/Wabbajack.Common/ChannelStreams.cs b/Wabbajack.Common/ChannelStreams.cs new file mode 100644 index 00000000..28bc99a5 --- /dev/null +++ b/Wabbajack.Common/ChannelStreams.cs @@ -0,0 +1,42 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Channels; +using System.Threading.Tasks; + +namespace Wabbajack.Common +{ + public static class ChannelStreams + { + public static async Task IntoChannel(this IEnumerable coll, Channel dest, bool closeAtEnd = false) + { + foreach (var itm in coll) + { + await dest.Writer.WriteAsync(itm); + } + + if (closeAtEnd) + dest.Writer.Complete(); + } + + public static Channel AsChannel(this IEnumerable coll) + { + var chan = Channel.CreateUnbounded(); + coll.IntoChannel(chan, true); + return chan; + } + + public static async Task> ToIEnumerable(this Channel src) + { + var buffer = new List(); + while (true) + { + var result = await src.Reader.ReadAsync(); + buffer.Add(result); + } + + return buffer; + } + } +} diff --git a/Wabbajack.Common/Wabbajack.Common.csproj b/Wabbajack.Common/Wabbajack.Common.csproj index 9485a93e..72dcf1d4 100644 --- a/Wabbajack.Common/Wabbajack.Common.csproj +++ b/Wabbajack.Common/Wabbajack.Common.csproj @@ -88,6 +88,20 @@ + + + + + + + + + + + + + + @@ -147,5 +161,6 @@ 8.0.0 + \ No newline at end of file diff --git a/Wabbajack.Test/CSPTests.cs b/Wabbajack.Test/CSPTests.cs new file mode 100644 index 00000000..2d1aeca2 --- /dev/null +++ b/Wabbajack.Test/CSPTests.cs @@ -0,0 +1,50 @@ +using System; +using System.Linq; +using System.Threading.Tasks; +using Microsoft.VisualStudio.TestTools.UnitTesting; +using Wabbajack.Common.CSP; + +namespace Wabbajack.Test +{ + [TestClass] + public class CSPTests + { + [TestMethod] + public async Task TestTakePutBlocking() + { + var channel = Channel.Create(); + var ptask = channel.Put(1); + Assert.AreEqual(1, await channel.Take()); + Assert.IsTrue(await ptask); + + } + + [TestMethod] + public async Task TestTakePutBuffered() + { + var channel = Channel.Create(10); + foreach (var itm in Enumerable.Range(0, 10)) + await channel.Put(itm); + + foreach (var itm in Enumerable.Range(0, 10)) + Assert.AreEqual(itm, await channel.Take()); + } + + [TestMethod] + public async Task TestToChannel() + { + var channel = Enumerable.Range(0, 10).ToChannel(); + + foreach (var itm in Enumerable.Range(0, 10)) + Assert.AreEqual(itm, await channel.Take()); + } + + [TestMethod] + public async Task TestTakeAll() + { + var results = await Enumerable.Range(0, 10).ToChannel().TakeAll(); + + Assert.AreEqual(Enumerable.Range(0, 10).ToList(), results); + } + } +} diff --git a/Wabbajack.Test/Wabbajack.Common.Tests/ChannelStreamsTests.cs b/Wabbajack.Test/Wabbajack.Common.Tests/ChannelStreamsTests.cs new file mode 100644 index 00000000..a53e0b7f --- /dev/null +++ b/Wabbajack.Test/Wabbajack.Common.Tests/ChannelStreamsTests.cs @@ -0,0 +1,19 @@ +using System; +using System.Linq; +using Microsoft.VisualStudio.TestTools.UnitTesting; +using Wabbajack.Common; + +namespace Wabbajack.Test.Wabbajack.Common.Tests +{ + [TestClass] + public class ChannelStreamsTests + { + [TestMethod] + public void ToAndFromChannel() + { + var src = Enumerable.Range(0, 10).ToList(); + var result = src.AsChannel().ToIEnumerable(); + Assert.AreEqual(src, result); + } + } +} diff --git a/Wabbajack.Test/Wabbajack.Test.csproj b/Wabbajack.Test/Wabbajack.Test.csproj index 6079c766..1ee2fd18 100644 --- a/Wabbajack.Test/Wabbajack.Test.csproj +++ b/Wabbajack.Test/Wabbajack.Test.csproj @@ -93,6 +93,7 @@ +