Break the Channel interface into Read/Write/Close ports (#169)

This commit is contained in:
Timothy Baldridge 2019-11-11 22:14:04 -07:00 committed by GitHub
parent d9ca38cdff
commit 29e6d577d2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 103 additions and 54 deletions

View File

@ -0,0 +1,31 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace Wabbajack.Common.CSP
{
public enum AsyncResult : int
{
/// <summary>
/// The channel was closed, so the returned value is meaningless
/// </summary>
Closed,
/// <summary>
/// The handler was canceled, so the returned value is meaningless
/// </summary>
Canceled,
/// <summary>
/// The callback was enqueued into the pending operations buffer, return value is useless
/// </summary>
Enqueued,
/// <summary>
/// The operation passed on the current thread, return the current value as the response value
/// </summary>
Completed
}
}

View File

@ -9,7 +9,7 @@ namespace Wabbajack.Common.CSP
{ {
public static class CSPExtensions public static class CSPExtensions
{ {
public static async Task OntoChannel<TIn, TOut>(this IEnumerable<TIn> coll, IChannel<TIn, TOut> chan) public static async Task OntoChannel<T>(this IEnumerable<T> coll, IWritePort<T> chan)
{ {
foreach (var val in coll) foreach (var val in coll)
{ {
@ -24,14 +24,14 @@ namespace Wabbajack.Common.CSP
/// <typeparam name="T"></typeparam> /// <typeparam name="T"></typeparam>
/// <param name="coll">Collection to spool out of the channel.</param> /// <param name="coll">Collection to spool out of the channel.</param>
/// <returns></returns> /// <returns></returns>
public static IChannel<T, T> ToChannel<T>(this IEnumerable<T> coll) public static IReadPort<T> ToChannel<T>(this IEnumerable<T> coll)
{ {
var chan = Channel.Create(coll.GetEnumerator()); var chan = Channel.Create(coll.GetEnumerator());
chan.Close(); chan.Close();
return chan; return chan;
} }
public static IChannel<TOut, TOut> Select<TInSrc, TOutSrc, TOut>(this IChannel<TInSrc, TOutSrc> from, Func<TOutSrc, Task<TOut>> f, bool propagateClose = true) public static IReadPort<TOut> Select<TIn, TOut>(this IReadPort<TIn> from, Func<TIn, Task<TOut>> f, bool propagateClose = true)
{ {
var to = Channel.Create<TOut>(4); var to = Channel.Create<TOut>(4);
Task.Run(async () => Task.Run(async () =>
@ -81,9 +81,9 @@ namespace Wabbajack.Common.CSP
/// <typeparam name="TIn"></typeparam> /// <typeparam name="TIn"></typeparam>
/// <param name="chan"></param> /// <param name="chan"></param>
/// <returns></returns> /// <returns></returns>
public static async Task<List<TOut>> TakeAll<TOut, TIn>(this IChannel<TIn, TOut> chan) public static async Task<List<T>> TakeAll<T>(this IReadPort<T> chan)
{ {
List<TOut> acc = new List<TOut>(); List<T> acc = new List<T>();
while (true) while (true)
{ {
var (open, val) = await chan.Take(); var (open, val) = await chan.Take();
@ -106,7 +106,7 @@ namespace Wabbajack.Common.CSP
/// <param name="to">destination channel</param> /// <param name="to">destination channel</param>
/// <param name="closeOnFinished">Tf true, will close the other channel when one channel closes</param> /// <param name="closeOnFinished">Tf true, will close the other channel when one channel closes</param>
/// <returns></returns> /// <returns></returns>
public static async Task Pipe<TIn, TMid, TOut>(this IChannel<TIn, TMid> from, IChannel<TMid, TOut> to, bool closeOnFinished = true) public static async Task Pipe<T>(this IReadPort<T> from, IWritePort<T> to, bool closeOnFinished = true)
{ {
while (true) while (true)
{ {

View File

@ -6,36 +6,7 @@ using System.Threading.Tasks;
namespace Wabbajack.Common.CSP namespace Wabbajack.Common.CSP
{ {
public enum AsyncResult : int public interface IChannel<TIn, TOut> : IReadPort<TOut>, IWritePort<TIn>
{ {
/// <summary>
/// The channel was closed, so the returned value is meaningless
/// </summary>
Closed,
/// <summary>
/// The handler was canceled, so the returned value is meaningless
/// </summary>
Canceled,
/// <summary>
/// The callback was enqueued into the pending operations buffer, return value is useless
/// </summary>
Enqueued,
/// <summary>
/// The operation passed on the current thread, return the current value as the response value
/// </summary>
Completed
}
public interface IChannel<TIn, TOut>
{
bool IsClosed { get; }
void Close();
(AsyncResult, bool) Put(TIn val, Handler<Action<bool>> handler);
(AsyncResult, TOut) Take(Handler<Action<bool, TOut>> handler);
ValueTask<(bool, TOut)> Take(bool onCaller = true);
ValueTask<bool> Put(TIn val, bool onCaller = true);
} }
} }

View File

@ -0,0 +1,14 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace Wabbajack.Common.CSP
{
public interface ICloseable
{
bool IsClosed { get; }
void Close();
}
}

View File

@ -0,0 +1,15 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace Wabbajack.Common.CSP
{
public interface IReadPort<TOut> : ICloseable
{
ValueTask<(bool, TOut)> Take(bool onCaller = true);
(AsyncResult, TOut) Take(Handler<Action<bool, TOut>> handler);
}
}

View File

@ -0,0 +1,14 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace Wabbajack.Common.CSP
{
public interface IWritePort<TIn> : ICloseable
{
(AsyncResult, bool) Put(TIn val, Handler<Action<bool>> handler);
ValueTask<bool> Put(TIn val, bool onCaller = true);
}
}

View File

@ -29,17 +29,17 @@ namespace Wabbajack.Common.CSP
/// <param name="transform"></param> /// <param name="transform"></param>
/// <param name="propagateClose"></param> /// <param name="propagateClose"></param>
/// <returns></returns> /// <returns></returns>
public static async Task UnorderedPipeline<TInSrc, TOutSrc, TInDest, TOutDest>( public static async Task UnorderedPipeline<TIn, TOut>(
this IChannel<TInSrc, TOutSrc> from, this IReadPort<TIn> from,
int parallelism, int parallelism,
IChannel<TInDest, TOutDest> to, IWritePort<TOut> to,
Func<IObservable<TOutSrc>, IObservable<TInDest>> transform, Func<IObservable<TIn>, IObservable<TOut>> transform,
bool propagateClose = true) bool propagateClose = true)
{ {
async Task Pump() async Task Pump()
{ {
var pipeline = new Subject<TOutSrc>(); var pipeline = new Subject<TIn>();
var buffer = new List<TInDest>(); var buffer = new List<TOut>();
var dest = transform(pipeline); var dest = transform(pipeline);
dest.Subscribe(itm => buffer.Add(itm)); dest.Subscribe(itm => buffer.Add(itm));
while (true) while (true)
@ -82,20 +82,20 @@ namespace Wabbajack.Common.CSP
} }
public static async Task UnorderedPipeline<TInSrc, TOutSrc, TInDest, TOutDest>( public static async Task UnorderedPipeline<TIn, TOut>(
this IChannel<TInSrc, TOutSrc> from, this IReadPort<TIn> from,
IChannel<TInDest, TOutDest> to, IWritePort<TOut> to,
Func<TOutSrc, Task<TInDest>> f, Func<TIn, Task<TOut>> f,
bool propagateClose = true) bool propagateClose = true)
{ {
await UnorderedPipeline(from, Environment.ProcessorCount, to, f, propagateClose); await UnorderedPipeline(from, Environment.ProcessorCount, to, f, propagateClose);
} }
public static async Task UnorderedPipeline<TInSrc, TOutSrc, TInDest, TOutDest>( public static async Task UnorderedPipeline<TIn, TOut>(
this IChannel<TInSrc, TOutSrc> from, this IReadPort<TIn> from,
int parallelism, int parallelism,
IChannel<TInDest, TOutDest> to, IWritePort<TOut> to,
Func<TOutSrc, Task<TInDest>> f, Func<TIn, Task<TOut>> f,
bool propagateClose = true) bool propagateClose = true)
{ {
async Task Pump() async Task Pump()
@ -121,11 +121,11 @@ namespace Wabbajack.Common.CSP
} }
public static async Task UnorderedThreadedPipeline<TInSrc, TOutSrc, TInDest, TOutDest>( public static async Task UnorderedThreadedPipeline<TIn, TOut>(
this IChannel<TInSrc, TOutSrc> from, this IReadPort<TIn> from,
int parallelism, int parallelism,
IChannel<TInDest, TOutDest> to, IWritePort<TOut> to,
Func<TOutSrc, TInDest> f, Func<TIn, TOut> f,
bool propagateClose = true) bool propagateClose = true)
{ {
Task Pump() Task Pump()

View File

@ -57,6 +57,7 @@
</ItemGroup> </ItemGroup>
<ItemGroup> <ItemGroup>
<Compile Include="AChannel.cs" /> <Compile Include="AChannel.cs" />
<Compile Include="AsyncResult.cs" />
<Compile Include="Channel.cs" /> <Compile Include="Channel.cs" />
<Compile Include="Extensions.cs" /> <Compile Include="Extensions.cs" />
<Compile Include="EnumeratorBuffer.cs" /> <Compile Include="EnumeratorBuffer.cs" />
@ -64,6 +65,9 @@
<Compile Include="Handler.cs" /> <Compile Include="Handler.cs" />
<Compile Include="IBuffer.cs" /> <Compile Include="IBuffer.cs" />
<Compile Include="IChannel.cs" /> <Compile Include="IChannel.cs" />
<Compile Include="ICloseable.cs" />
<Compile Include="IReadPort.cs" />
<Compile Include="IWritePort.cs" />
<Compile Include="ManyToManyChannel.cs" /> <Compile Include="ManyToManyChannel.cs" />
<Compile Include="PIpelines.cs" /> <Compile Include="PIpelines.cs" />
<Compile Include="PutTaskHandler.cs" /> <Compile Include="PutTaskHandler.cs" />