WIP channels

This commit is contained in:
Timothy Baldridge 2019-11-08 23:37:05 -07:00
parent 2fb857a093
commit a59e39deaa
19 changed files with 1035 additions and 0 deletions

View File

@ -0,0 +1,38 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace Wabbajack.Common.CSP
{
public abstract class AChannel<TIn, TOut> : IChannel<TIn, TOut>
{
public abstract bool IsClosed { get; }
public abstract void Close();
public abstract Box<bool> Put(TIn val, Handler<Action<bool>> handler);
public abstract Box<TOut> Take(Handler<Action<Box<TOut>>> handler);
public Task<TOut> Take(bool onCaller)
{
var tcs = new TaskCompletionSource<TOut>();
var handler = new TakeTaskHandler<TOut>(tcs);
var result = Take(handler);
if (result.IsSet)
tcs.SetResult(result.Value);
return handler.Task;
}
public Task<bool> Put(TIn val, bool onCaller)
{
var tcs = new TaskCompletionSource<bool>();
var handler = new PutTaskHandler<bool>(tcs);
var result = Put(val, handler);
if (result.IsSet)
tcs.SetResult(result.Value);
return tcs.Task;
}
}
}

View File

@ -0,0 +1,42 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using System.Threading.Tasks.Sources;
namespace Wabbajack.Common.CSP
{
public struct Box<T>
{
public T Value;
public bool IsSet;
public Box(T value)
{
Value = value;
IsSet = true;
}
public static Box<T> Empty = new Box<T>();
}
class test : IValueTaskSource
{
public ValueTaskSourceStatus GetStatus(short token)
{
throw new NotImplementedException();
}
public void OnCompleted(Action<object> continuation, object state, short token, ValueTaskSourceOnCompletedFlags flags)
{
throw new NotImplementedException();
}
public void GetResult(short token)
{
throw new NotImplementedException();
}
}
}

View File

@ -0,0 +1,42 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace Wabbajack.Common.CSP
{
public static class CSPExtensions
{
public static async Task OntoChannel<TIn, TOut>(this IEnumerable<TIn> coll, IChannel<TIn, TOut> chan)
{
foreach (var val in coll)
{
if (!await chan.Put(val)) break;
}
}
/// <summary>
/// Turns a IEnumerable collection into a channel. Note, computation of the enumerable will happen inside
/// the lock of the channel, so try to keep the work of the enumerable light.
/// </summary>
/// <typeparam name="T"></typeparam>
/// <param name="coll">Collection to spool out of the channel.</param>
/// <returns></returns>
public static IChannel<T, T> ToChannel<T>(this IEnumerable<T> coll)
{
return Channel.Create(coll.GetEnumerator());
}
public static async Task<List<TOut>> TakeAll<TOut, TIn>(this IChannel<TIn, TOut> chan)
{
List<TOut> acc = new List<TOut>();
while (true)
{
acc.Add(await chan.Take());
}
return acc;
}
}
}

View File

@ -0,0 +1,51 @@
using System;
using System.Collections.Generic;
using System.Data;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace Wabbajack.Common.CSP
{
public static class Channel
{
/// <summary>
/// Creates a channel without a buffer, and with no conversion function. This provides a syncronization
/// point, where all puts are matched 1:1 with takes.
/// </summary>
/// <param name="bufferSize"></param>
/// <typeparam name="T">The type of values transferred by the channel</typeparam>
/// <returns>A new channel</returns>
public static IChannel<T, T> Create<T>()
{
return new ManyToManyChannel<T, T>(x => x);
}
/// <summary>
/// Creates a channel with a given enumerator as the starting buffer. Values will not be puttable into this channel
/// and it will start closed. This is a easy way to spool a collection onto a channel. Note: the enumerator will be
/// run inside the channel's lock, so it may not be wise to pass in an enumerator that performs heavy computation.
/// </summary>
/// <param name="e">A IEnumerator to use as the contents of the channel</param>
/// <typeparam name="T">The type of values transferred by the channel</typeparam>
/// <returns>A new channel</returns>
public static IChannel<T, T> Create<T>(IEnumerator<T> e)
{
var chan = new ManyToManyChannel<T, T>(x => x, (_, __) => false, _ => {}, new EnumeratorBuffer<T>(e));
chan.Close();
return chan;
}
public static IChannel<T, T> Create<T>(int buffer_size)
{
var buffer = new FixedSizeBuffer<T>(buffer_size);
return new ManyToManyChannel<T, T>(x => x, (buff, itm) =>
{
buff.Add(itm);
return false;
},
b => {}, buffer);
}
}
}

View File

@ -0,0 +1,39 @@
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace Wabbajack.Common.CSP
{
class EnumeratorBuffer<T> : IBuffer<T>
{
private readonly IEnumerator<T> _enumerator;
private bool _empty;
public EnumeratorBuffer(IEnumerator<T> enumerator)
{
_enumerator = enumerator;
_empty = !_enumerator.MoveNext();
}
public void Dispose()
{
}
public bool IsFull => true;
public bool IsEmpty => _empty;
public T Remove()
{
var val = _enumerator.Current;
_empty = !_enumerator.MoveNext();
return val;
}
public void Add(T itm)
{
throw new InvalidDataException();
}
}
}

View File

@ -0,0 +1,36 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace Wabbajack.Common.CSP
{
public class FixedSizeBuffer<T> : IBuffer<T>
{
private int _size;
private RingBuffer<T> _buffer;
public FixedSizeBuffer(int size)
{
_size = size;
_buffer = new RingBuffer<T>(size);
}
public void Dispose()
{
}
public bool IsFull => _buffer.Length >= _size;
public bool IsEmpty => _buffer.IsEmpty;
public T Remove()
{
return _buffer.Pop();
}
public void Add(T itm)
{
_buffer.UnboundedUnshift(itm);
}
}
}

View File

@ -0,0 +1,29 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Runtime.InteropServices;
using System.Text;
using System.Threading.Tasks;
namespace Wabbajack.Common.CSP
{
public class FnHandler<T> : Handler<T>
{
private readonly bool _blockable;
private T _f;
public FnHandler(T f, bool blockable=false)
{
_f = f;
_blockable = blockable;
}
public bool IsActive => true;
public bool IsBlockable => _blockable;
public uint LockId => 0;
public T Commit()
{
return _f;
}
}
}

View File

@ -0,0 +1,32 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace Wabbajack.Common.CSP
{
public interface Handler<T>
{
/// <summary>
/// Returns true if this handler has a callback, must work without a lock
/// </summary>
bool IsActive { get; }
/// <summary>
/// Returns true if this handler may be blocked, otherwise it must not block
/// </summary>
bool IsBlockable { get; }
/// <summary>
/// A unique id for lock aquisition order, 0 if no lock
/// </summary>
uint LockId { get; }
/// <summary>
/// Commit to fulfilling its end of the transfer, returns cb, must be called within a lock
/// </summary>
/// <returns>A callback</returns>
T Commit();
}
}

View File

@ -0,0 +1,17 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using System.Windows.Forms.VisualStyles;
namespace Wabbajack.Common.CSP
{
public interface IBuffer<T> : IDisposable
{
bool IsFull { get; }
bool IsEmpty { get; }
T Remove();
void Add(T itm);
}
}

View File

@ -0,0 +1,19 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace Wabbajack.Common.CSP
{
public interface IChannel<TIn, TOut>
{
bool IsClosed { get; }
void Close();
Box<bool> Put(TIn val, Handler<Action<bool>> handler);
Box<TOut> Take(Handler<Action<Box<TOut>>> handler);
Task<TOut> Take(bool onCaller = true);
Task<bool> Put(TIn val, bool onCaller = true);
}
}

View File

@ -0,0 +1,384 @@
using System;
using System.Buffers;
using System.Collections.Generic;
using System.Linq;
using System.Runtime.CompilerServices;
using System.ServiceModel;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Sources;
using System.Windows.Forms;
using System.Windows.Forms.VisualStyles;
using ICSharpCode.SharpZipLib.Zip;
using YamlDotNet.Serialization.NodeTypeResolvers;
namespace Wabbajack.Common.CSP
{
/// <summary>
/// An almost 1:1 port of Clojure's core.async channels
/// </summary>
public class ManyToManyChannel<TIn, TOut> : AChannel<TIn, TOut>
{
public const int MAX_QUEUE_SIZE = 1024;
private RingBuffer<Handler<Action<Box<TOut>>>> _takes = new RingBuffer<Handler<Action<Box<TOut>>>>(8);
private RingBuffer<(Handler<Action<bool>>, TIn)> _puts = new RingBuffer<(Handler<Action<bool>>, TIn)>(8);
private IBuffer<TOut> _buf;
private Func<IBuffer<TOut>, TIn, bool> _add;
private Action<IBuffer<TOut>> _finalize;
private Func<TIn, TOut> _converter;
bool _isClosed = false;
public ManyToManyChannel(Func<TIn, TOut> converter)
{
_buf = null;
_add = null;
_finalize = null;
_converter = converter;
}
public ManyToManyChannel(Func<TIn, TOut> converter, Func<IBuffer<TOut>, TIn, bool> add, Action<IBuffer<TOut>> finalize, IBuffer<TOut> buffer)
{
_buf = buffer;
_add = add;
_finalize = finalize;
_converter = converter;
}
private static bool IsActiveTake(Handler<Action<Box<TOut>>> handler)
{
return handler.IsActive;
}
private static bool IsActivePut((Handler<Action<bool>>, TIn) input)
{
return input.Item1.IsActive;
}
public override Box<bool> Put(TIn val, Handler<Action<bool>> handler)
{
Monitor.Enter(this);
if (_isClosed)
{
Monitor.Exit(this);
return new Box<bool>(false);
}
if (_buf != null && !_buf.IsFull && !_takes.IsEmpty)
{
var put_cb = LockIfActiveCommit(handler);
if (put_cb != null)
{
var is_done = _add(_buf, val);
if (!_buf.IsEmpty)
{
var take_cbs = GetTakersForBuffer();
if (is_done)
Abort();
Monitor.Exit(this);
foreach (var action in take_cbs)
{
Task.Run(() => action());
}
}
else
{
if (is_done)
Abort();
Monitor.Exit(this);
return Box<bool>.Empty;
}
return new Box<bool>(true);
}
Monitor.Exit(this);
return Box<bool>.Empty;
}
var (put_cb2, take_cb) = GetCallbacks(handler, _takes);
if (put_cb2 != null && take_cb != null)
{
Monitor.Exit(this);
Task.Run(() => take_cb(new Box<TOut>(_converter(val))));
return new Box<bool>(true);
}
if (_buf != null && !_buf.IsFull)
{
if (LockIfActiveCommit(handler) != null)
{
if (_add(_buf, val))
{
Abort();
}
Monitor.Exit(this);
return new Box<bool>(true);
}
Monitor.Exit(this);
return Box<bool>.Empty;
}
if (handler.IsActive && handler.IsBlockable)
{
if (_puts.Length >= MAX_QUEUE_SIZE)
{
Monitor.Exit(this);
throw new TooManyHanldersException();
}
_puts.Unshift((handler, val));
}
Monitor.Exit(this);
return Box<bool>.Empty;
}
public override Box<TOut> Take(Handler<Action<Box<TOut>>> handler)
{
Monitor.Enter(this);
Cleanup();
if (_buf != null && !_buf.IsEmpty)
{
var take_cb = LockIfActiveCommit(handler);
if (take_cb != null)
{
var val = _buf.Remove();
var (is_done, cbs) = GetPuttersForBuffer();
if (is_done)
Abort();
Monitor.Exit(this);
foreach (var cb in cbs)
Task.Run(() => cb(true));
return new Box<TOut>(val);
}
Monitor.Exit(this);
return Box<TOut>.Empty;
}
var (take_cb2, put_cb, val2, found) = FindMatchingPut(handler);
if (take_cb2 != null && put_cb != null)
{
Monitor.Exit(this);
Task.Run(() => put_cb(true));
return new Box<TOut>(_converter(val2));
}
if (_isClosed)
{
if (_buf != null && found)
_add(_buf, val2);
var has_val = _buf != null && !_buf.IsEmpty;
var take_cb3 = LockIfActiveCommit(handler);
if (take_cb3 != null)
{
var val = has_val ? _buf.Remove() : default;
Monitor.Exit(this);
return has_val ? new Box<TOut>(val) : Box<TOut>.Empty;
}
Monitor.Exit(this);
return Box<TOut>.Empty;
}
if (handler.IsBlockable)
{
if (_takes.Length >= MAX_QUEUE_SIZE)
throw new TooManyHanldersException();
_takes.Unshift(handler);
}
Monitor.Exit(this);
return Box<TOut>.Empty;
}
public override bool IsClosed => _isClosed;
public override void Close()
{
Monitor.Enter(this);
Cleanup();
if (_isClosed)
{
Monitor.Exit(this);
return;
}
_isClosed = true;
if (_buf != null && _puts.IsEmpty)
_finalize(_buf);
}
private (Action<Box<TOut>>, Action<bool>, TIn, bool) FindMatchingPut(Handler<Action<Box<TOut>>> handler)
{
while (!_puts.IsEmpty)
{
var (found, val) = _puts.Peek();
var (handler_cb, put_cb, handler_active, put_active) = LockIfActiveCommit(handler, found);
if (handler_active && put_active)
{
_puts.Pop();
return (handler_cb, put_cb, val, true);
}
if (!put_active)
{
_puts.Pop();
continue;
}
return (null, null, default, false);
}
return (null, null, default, false);
}
private (bool, List<Action<bool>>) GetPuttersForBuffer()
{
List<Action<bool>> acc = new List<Action<bool>>();
while (!_puts.IsEmpty)
{
var (putter, val) = _puts.Pop();
var cb = LockIfActiveCommit(putter);
if (cb != null)
{
acc.Add(cb);
}
var is_done = _add(_buf, val);
if (is_done || _buf.IsFull || _puts.IsEmpty)
return (is_done, acc);
}
return (false, acc);
}
private void Cleanup()
{
_takes.Cleanup(IsActiveTake);
_puts.Cleanup(IsActivePut);
}
private (T1, T2) GetCallbacks<T1, T2>(Handler<T1> handler, RingBuffer<Handler<T2>> queue)
{
while (!queue.IsEmpty)
{
var found = queue.Peek();
var (handler_cb, found_cb, handler_valid, found_valid) = LockIfActiveCommit(handler, found);
if (handler_valid && found_valid)
{
queue.Pop();
return (handler_cb, found_cb);
}
if (handler_valid)
{
queue.Pop();
}
else
{
return (default, default);
}
}
return (default, default);
}
private void Abort()
{
while (!_puts.IsEmpty)
{
var (handler, val) = _puts.Pop();
var put_cb = LockIfActiveCommit(handler);
if (put_cb != null)
{
Task.Run(() => put_cb(true));
}
}
_puts.Cleanup(x => false);
Close();
}
private List<Action> GetTakersForBuffer()
{
List<Action> ret = new List<Action>();
while (!_buf.IsEmpty && !_takes.IsEmpty)
{
var taker = _takes.Pop();
var take_cp = LockIfActiveCommit(taker);
if (take_cp != null)
{
var val = _buf.Remove();
ret.Add(() => take_cp(new Box<TOut>(val)));
}
}
return ret;
}
[MethodImpl(MethodImplOptions.AggressiveInlining)]
private static T IfActiveCommit<T>(Handler<T> handler)
{
return handler.IsActive ? handler.Commit() : default;
}
[MethodImpl(MethodImplOptions.AggressiveInlining)]
private static T LockIfActiveCommit<T>(Handler<T> handler)
{
lock (handler)
{
return handler.IsActive ? handler.Commit() : default;
}
}
[MethodImpl(MethodImplOptions.AggressiveInlining)]
private static (T1, T2, bool, bool) LockIfActiveCommit<T1, T2>(Handler<T1> handler1, Handler<T2> handler2)
{
if (handler1.LockId < handler2.LockId)
{
Monitor.Enter(handler1);
Monitor.Enter(handler2);
}
else
{
Monitor.Enter(handler2);
Monitor.Enter(handler1);
}
if (handler1.IsActive && handler2.IsActive)
{
var ret1 = (handler1.Commit(), handler2.Commit(), true, true);
Monitor.Exit(handler1);
Monitor.Exit(handler2);
return ret1;
}
var ret2 = (default(T1), default(T2), handler1.IsActive, handler2.IsActive);
Monitor.Exit(handler1);
Monitor.Exit(handler2);
return ret2;
}
public class TooManyHanldersException : Exception
{
public override string ToString()
{
return $"No more than {MAX_QUEUE_SIZE} pending operations allowed on a single channel.";
}
}
}
}

View File

@ -0,0 +1,33 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace Wabbajack.Common.CSP
{
class PutTaskHandler<T> : Handler<Action<bool>>
{
private readonly bool _blockable;
private readonly TaskCompletionSource<bool> _tcs;
public PutTaskHandler(TaskCompletionSource<bool> tcs, bool blockable = true)
{
_blockable = blockable;
_tcs = tcs ?? new TaskCompletionSource<bool>();
}
public bool IsActive => true;
public bool IsBlockable => _blockable;
public uint LockId => 0;
public Action<bool> Commit()
{
return Handle;
}
private void Handle(bool val)
{
_tcs.SetResult(val);
}
}
}

View File

@ -0,0 +1,108 @@
using System;
using System.CodeDom.Compiler;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using System.Windows;
using System.Windows.Forms;
namespace Wabbajack.Common.CSP
{
public struct RingBuffer<T>
{
private int _size;
private int _length;
private int _tail;
private int _head;
private T[] _arr;
public RingBuffer(int size = 8)
{
_size = size;
_arr = new T[size];
_tail = 0;
_length = 0;
_head = 0;
}
public T Pop()
{
if (_length == 0) return default;
var val = _arr[_tail];
_arr[_tail] = default;
_tail = (_tail + 1) % _size;
_length -= 1;
return val;
}
public T Peek()
{
if (_length == 0) return default;
return _arr[_tail];
}
public void Unshift(T x)
{
_arr[_head] = x;
_head = (_head + 1) % _size;
_length += 1;
}
public void UnboundedUnshift(T x)
{
if (_length == _size)
Resize();
Unshift(x);
}
public bool IsEmpty => _length == 0;
public int Length => _length;
private void Resize()
{
var new_arr_size = _size * 2;
var new_arr = new T[new_arr_size];
if (_tail < _head)
{
Array.Copy(_arr, _tail, new_arr, 0, _length);
_tail = 0;
_head = _length;
_arr = new_arr;
_size = new_arr_size;
}
else if (_tail > _head)
{
Array.Copy(_arr, _tail, new_arr, 0, _length - _tail);
Array.Copy(_arr, 0, new_arr, (_length - _tail), _head);
_tail = 0;
_head = _length;
_arr = new_arr;
_size = new_arr_size;
}
else
{
_tail = 0;
_head = 0;
_arr = new_arr;
_size = new_arr_size;
}
}
/// <summary>
/// Filers out all items where should_keep(itm) returns false
/// </summary>
/// <param name="should_keep"></param>
public void Cleanup(Func<T, bool> should_keep)
{
for (var idx = 0; idx < _length; idx++)
{
var v = Pop();
if (should_keep(v))
{
Unshift(v);
}
}
}
}
}

View File

@ -0,0 +1,38 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Runtime.CompilerServices;
using System.Text;
using System.Threading.Tasks;
namespace Wabbajack.Common.CSP
{
public class TakeTaskHandler<T> : Handler<Action<Box<T>>>
{
private readonly bool _blockable;
private readonly TaskCompletionSource<T> _tcs;
public TakeTaskHandler(TaskCompletionSource<T> tcs = null, bool blockable = true)
{
_blockable = blockable;
_tcs = tcs ?? new TaskCompletionSource<T>();
}
public bool IsActive => true;
public bool IsBlockable => _blockable;
public uint LockId => 0;
public Task<T> Task => _tcs.Task;
public Action<Box<T>> Commit()
{
return Handle;
}
private void Handle(Box<T> a)
{
if (a.IsSet)
_tcs.SetResult(a.Value);
_tcs.SetCanceled();
}
}
}

View File

@ -0,0 +1,42 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Channels;
using System.Threading.Tasks;
namespace Wabbajack.Common
{
public static class ChannelStreams
{
public static async Task IntoChannel<T>(this IEnumerable<T> coll, Channel<T> dest, bool closeAtEnd = false)
{
foreach (var itm in coll)
{
await dest.Writer.WriteAsync(itm);
}
if (closeAtEnd)
dest.Writer.Complete();
}
public static Channel<T> AsChannel<T>(this IEnumerable<T> coll)
{
var chan = Channel.CreateUnbounded<T>();
coll.IntoChannel(chan, true);
return chan;
}
public static async Task<IEnumerable<T>> ToIEnumerable<T>(this Channel<T> src)
{
var buffer = new List<T>();
while (true)
{
var result = await src.Reader.ReadAsync();
buffer.Add(result);
}
return buffer;
}
}
}

View File

@ -88,6 +88,20 @@
<Compile Include="BSDiff.cs" />
<Compile Include="ChildProcessTracker.cs" />
<Compile Include="Consts.cs" />
<Compile Include="CSP\AChannel.cs" />
<Compile Include="CSP\Box.cs" />
<Compile Include="CSP\Channel.cs" />
<Compile Include="CSP\CSPExtensions.cs" />
<Compile Include="CSP\FixedSizeBuffer.cs" />
<Compile Include="CSP\FnHandler.cs" />
<Compile Include="CSP\IBuffer.cs" />
<Compile Include="CSP\Handler.cs" />
<Compile Include="CSP\IChannel.cs" />
<Compile Include="CSP\EnumeratorBuffer.cs" />
<Compile Include="CSP\ManyToManyChannel.cs" />
<Compile Include="CSP\PutTaskHandler.cs" />
<Compile Include="CSP\RingBuffer.cs" />
<Compile Include="CSP\TakeTaskHandler.cs" />
<Compile Include="DynamicIniData.cs" />
<Compile Include="Error States\ErrorResponse.cs" />
<Compile Include="Error States\GetResponse.cs" />
@ -147,5 +161,6 @@
<Version>8.0.0</Version>
</PackageReference>
</ItemGroup>
<ItemGroup />
<Import Project="$(MSBuildToolsPath)\Microsoft.CSharp.targets" />
</Project>

View File

@ -0,0 +1,50 @@
using System;
using System.Linq;
using System.Threading.Tasks;
using Microsoft.VisualStudio.TestTools.UnitTesting;
using Wabbajack.Common.CSP;
namespace Wabbajack.Test
{
[TestClass]
public class CSPTests
{
[TestMethod]
public async Task TestTakePutBlocking()
{
var channel = Channel.Create<int>();
var ptask = channel.Put(1);
Assert.AreEqual(1, await channel.Take());
Assert.IsTrue(await ptask);
}
[TestMethod]
public async Task TestTakePutBuffered()
{
var channel = Channel.Create<int>(10);
foreach (var itm in Enumerable.Range(0, 10))
await channel.Put(itm);
foreach (var itm in Enumerable.Range(0, 10))
Assert.AreEqual(itm, await channel.Take());
}
[TestMethod]
public async Task TestToChannel()
{
var channel = Enumerable.Range(0, 10).ToChannel();
foreach (var itm in Enumerable.Range(0, 10))
Assert.AreEqual(itm, await channel.Take());
}
[TestMethod]
public async Task TestTakeAll()
{
var results = await Enumerable.Range(0, 10).ToChannel().TakeAll();
Assert.AreEqual(Enumerable.Range(0, 10).ToList(), results);
}
}
}

View File

@ -0,0 +1,19 @@
using System;
using System.Linq;
using Microsoft.VisualStudio.TestTools.UnitTesting;
using Wabbajack.Common;
namespace Wabbajack.Test.Wabbajack.Common.Tests
{
[TestClass]
public class ChannelStreamsTests
{
[TestMethod]
public void ToAndFromChannel()
{
var src = Enumerable.Range(0, 10).ToList();
var result = src.AsChannel().ToIEnumerable();
Assert.AreEqual(src, result);
}
}
}

View File

@ -93,6 +93,7 @@
</ItemGroup>
<ItemGroup>
<Compile Include="ACompilerTest.cs" />
<Compile Include="CSPTests.cs" />
<Compile Include="DownloaderTests.cs" />
<Compile Include="EndToEndTests.cs" />
<Compile Include="Extensions.cs" />