Refactored to use ValueTask and lazy task creation

This commit is contained in:
Timothy Baldridge 2019-11-09 07:49:00 -07:00
parent a59e39deaa
commit 67dfaa3581
10 changed files with 212 additions and 60 deletions

View File

@ -1,7 +1,9 @@
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
namespace Wabbajack.Common.CSP
@ -10,27 +12,73 @@ namespace Wabbajack.Common.CSP
{
public abstract bool IsClosed { get; }
public abstract void Close();
public abstract Box<bool> Put(TIn val, Handler<Action<bool>> handler);
public abstract Box<TOut> Take(Handler<Action<Box<TOut>>> handler);
public abstract (AsyncResult, bool) Put(TIn val, Handler<Action<bool>> handler);
public abstract (AsyncResult, TOut) Take(Handler<Action<TOut>> handler);
public Task<TOut> Take(bool onCaller)
private Task<(bool, TOut)> _take_cancelled_task;
private Task<(bool, TOut)> TakeCancelledTask
{
var tcs = new TaskCompletionSource<TOut>();
var handler = new TakeTaskHandler<TOut>(tcs);
var result = Take(handler);
if (result.IsSet)
tcs.SetResult(result.Value);
return handler.Task;
get
{
if (_take_cancelled_task == null)
_take_cancelled_task = Task.FromCanceled<(bool, TOut)>(CancellationToken.None);
return _take_cancelled_task;
}
}
public Task<bool> Put(TIn val, bool onCaller)
private Task<bool> _put_cancelled_task;
private Task<bool> PutCancelledTask
{
var tcs = new TaskCompletionSource<bool>();
var handler = new PutTaskHandler<bool>(tcs);
var result = Put(val, handler);
if (result.IsSet)
tcs.SetResult(result.Value);
return tcs.Task;
get
{
if (_put_cancelled_task == null)
_put_cancelled_task = Task.FromCanceled<bool>(CancellationToken.None);
return _put_cancelled_task;
}
}
public ValueTask<(bool, TOut)> Take(bool onCaller)
{
var handler = new TakeTaskHandler<TOut>();
var (resultType, val) = Take(handler);
switch (resultType)
{
case AsyncResult.Closed:
return new ValueTask<(bool, TOut)>((false, default));
case AsyncResult.Completed:
return new ValueTask<(bool, TOut)>((true, val));
case AsyncResult.Enqueued:
return new ValueTask<(bool, TOut)>(handler.TaskCompletionSource.Task);
case AsyncResult.Canceled:
return new ValueTask<(bool, TOut)>(TakeCancelledTask);
default:
// Should never happen
throw new InvalidDataException();
}
}
public ValueTask<bool> Put(TIn val, bool onCaller)
{
var handler = new PutTaskHandler<bool>();
var (resultType, putResult) = Put(val, handler);
switch (resultType)
{
case AsyncResult.Completed:
return new ValueTask<bool>(putResult);
case AsyncResult.Canceled:
return new ValueTask<bool>(PutCancelledTask);
case AsyncResult.Closed:
return new ValueTask<bool>(false);
case AsyncResult.Enqueued:
return new ValueTask<bool>(handler.TaskCompletionSource.Task);
default:
// Should never happen
throw new InvalidDataException();
}
}

View File

@ -0,0 +1,42 @@
### Overview of CSP (Communicating Sequential Processes) for the C# programmer
#### What is CSP?
Communicating Sequential processes is a programming model invented in 1978 by Tony Hoare, who described a process
of computation where hundreds or thousands of small processes communicate via channels. Think of this process like
a assembly line. Each worker in the factory is a process, and the conveyor belts are the channels. The workers don't need
to know where a part came from, or where it's going, they simply take one item off the belt, perform an operation and pass
the item on to another belt. This analogy works quite well, and the following observations about a factory also apply to
CSP:
* Multiple workers can pull from the same belt (channel/queue)
* Multiple workers can put work onto the belt
* Belts can buffer items, for slow consumers, but at some point they backup and block the writer
* A worker can pull/push to multiple belts.
#### What does this look like in C#?
The basic unit of CSP in this library is the channel:
```
var chan = Channel.Create()
```
Without any other parameters this creates a channel with a size of 0, so every pending put must be matched
1:1 with a take. This creates a syncronization point. Channels are fully async and thread-safe:
```
public async Task TestTakePutBlocking()
{
var channel = Channel.Create<int>();
// Channel size is 0, so we can't await, because we'd never complete
var ptask = channel.Put(1);
// Now the put is dispatched to the scheduler because we've taken the value
var (open, val) = await channel.Take();
Assert.AreEqual(1, val);
Assert.IsTrue(open);
Assert.IsTrue(await ptask);
}
```

View File

@ -34,7 +34,11 @@ namespace Wabbajack.Common.CSP
List<TOut> acc = new List<TOut>();
while (true)
{
acc.Add(await chan.Take());
var (open, val) = await chan.Take();
if (!open) break;
acc.Add(val);
}
return acc;
}

View File

@ -3,6 +3,7 @@ using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using System.Threading.Tasks.Sources;
namespace Wabbajack.Common.CSP
{
@ -29,4 +30,5 @@ namespace Wabbajack.Common.CSP
/// <returns>A callback</returns>
T Commit();
}
}

View File

@ -6,14 +6,36 @@ using System.Threading.Tasks;
namespace Wabbajack.Common.CSP
{
public enum AsyncResult : int
{
/// <summary>
/// The channel was closed, so the returned value is meaningless
/// </summary>
Closed,
/// <summary>
/// The handler was canceled, so the returned value is meaningless
/// </summary>
Canceled,
/// <summary>
/// The callback was enqueued into the pending operations buffer, return value is useless
/// </summary>
Enqueued,
/// <summary>
/// The operation passed on the current thread, return the current value as the response value
/// </summary>
Completed
}
public interface IChannel<TIn, TOut>
{
bool IsClosed { get; }
void Close();
Box<bool> Put(TIn val, Handler<Action<bool>> handler);
Box<TOut> Take(Handler<Action<Box<TOut>>> handler);
Task<TOut> Take(bool onCaller = true);
Task<bool> Put(TIn val, bool onCaller = true);
(AsyncResult, bool) Put(TIn val, Handler<Action<bool>> handler);
(AsyncResult, TOut) Take(Handler<Action<TOut>> handler);
ValueTask<(bool, TOut)> Take(bool onCaller = true);
ValueTask<bool> Put(TIn val, bool onCaller = true);
}
}

View File

@ -23,7 +23,7 @@ namespace Wabbajack.Common.CSP
{
public const int MAX_QUEUE_SIZE = 1024;
private RingBuffer<Handler<Action<Box<TOut>>>> _takes = new RingBuffer<Handler<Action<Box<TOut>>>>(8);
private RingBuffer<Handler<Action<TOut>>> _takes = new RingBuffer<Handler<Action<TOut>>>(8);
private RingBuffer<(Handler<Action<bool>>, TIn)> _puts = new RingBuffer<(Handler<Action<bool>>, TIn)>(8);
private IBuffer<TOut> _buf;
private Func<IBuffer<TOut>, TIn, bool> _add;
@ -47,7 +47,7 @@ namespace Wabbajack.Common.CSP
_converter = converter;
}
private static bool IsActiveTake(Handler<Action<Box<TOut>>> handler)
private static bool IsActiveTake(Handler<Action<TOut>> handler)
{
return handler.IsActive;
}
@ -57,13 +57,19 @@ namespace Wabbajack.Common.CSP
return input.Item1.IsActive;
}
public override Box<bool> Put(TIn val, Handler<Action<bool>> handler)
/// <summary>
/// Tries to put a put into the channel
/// </summary>
/// <param name="val"></param>
/// <param name="handler"></param>
/// <returns>(result_type, w)</returns>
public override (AsyncResult, bool) Put(TIn val, Handler<Action<bool>> handler)
{
Monitor.Enter(this);
if (_isClosed)
{
Monitor.Exit(this);
return new Box<bool>(false);
return (AsyncResult.Completed, false);
}
if (_buf != null && !_buf.IsFull && !_takes.IsEmpty)
@ -80,7 +86,7 @@ namespace Wabbajack.Common.CSP
Monitor.Exit(this);
foreach (var action in take_cbs)
{
Task.Run(() => action());
Task.Run(action);
}
}
else
@ -88,12 +94,13 @@ namespace Wabbajack.Common.CSP
if (is_done)
Abort();
Monitor.Exit(this);
return Box<bool>.Empty;
return (AsyncResult.Closed, false);
}
return new Box<bool>(true);
return (AsyncResult.Completed, true);
}
Monitor.Exit(this);
return Box<bool>.Empty;
return (AsyncResult.Canceled, false);
}
var (put_cb2, take_cb) = GetCallbacks(handler, _takes);
@ -101,8 +108,8 @@ namespace Wabbajack.Common.CSP
if (put_cb2 != null && take_cb != null)
{
Monitor.Exit(this);
Task.Run(() => take_cb(new Box<TOut>(_converter(val))));
return new Box<bool>(true);
Task.Run(() => take_cb(_converter(val)));
return (AsyncResult.Completed, true);
}
if (_buf != null && !_buf.IsFull)
@ -114,10 +121,10 @@ namespace Wabbajack.Common.CSP
Abort();
}
Monitor.Exit(this);
return new Box<bool>(true);
return (AsyncResult.Completed, true);
}
Monitor.Exit(this);
return Box<bool>.Empty;
return (AsyncResult.Canceled, true);
}
if (handler.IsActive && handler.IsBlockable)
@ -130,10 +137,10 @@ namespace Wabbajack.Common.CSP
_puts.Unshift((handler, val));
}
Monitor.Exit(this);
return Box<bool>.Empty;
return (AsyncResult.Enqueued, true);
}
public override Box<TOut> Take(Handler<Action<Box<TOut>>> handler)
public override (AsyncResult, TOut) Take(Handler<Action<TOut>> handler)
{
Monitor.Enter(this);
Cleanup();
@ -154,11 +161,11 @@ namespace Wabbajack.Common.CSP
foreach (var cb in cbs)
Task.Run(() => cb(true));
return new Box<TOut>(val);
return (AsyncResult.Completed, val);
}
Monitor.Exit(this);
return Box<TOut>.Empty;
return (AsyncResult.Canceled, default);
}
var (take_cb2, put_cb, val2, found) = FindMatchingPut(handler);
@ -167,7 +174,7 @@ namespace Wabbajack.Common.CSP
{
Monitor.Exit(this);
Task.Run(() => put_cb(true));
return new Box<TOut>(_converter(val2));
return (AsyncResult.Completed, _converter(val2));
}
if (_isClosed)
@ -182,10 +189,10 @@ namespace Wabbajack.Common.CSP
{
var val = has_val ? _buf.Remove() : default;
Monitor.Exit(this);
return has_val ? new Box<TOut>(val) : Box<TOut>.Empty;
return has_val ? (AsyncResult.Completed, val) : (AsyncResult.Closed, default);
}
Monitor.Exit(this);
return Box<TOut>.Empty;
return (AsyncResult.Closed, default);
}
if (handler.IsBlockable)
@ -196,7 +203,7 @@ namespace Wabbajack.Common.CSP
}
Monitor.Exit(this);
return Box<TOut>.Empty;
return (AsyncResult.Enqueued, default);
}
public override bool IsClosed => _isClosed;
@ -216,7 +223,7 @@ namespace Wabbajack.Common.CSP
_finalize(_buf);
}
private (Action<Box<TOut>>, Action<bool>, TIn, bool) FindMatchingPut(Handler<Action<Box<TOut>>> handler)
private (Action<TOut>, Action<bool>, TIn, bool) FindMatchingPut(Handler<Action<TOut>> handler)
{
while (!_puts.IsEmpty)
{
@ -319,7 +326,7 @@ namespace Wabbajack.Common.CSP
if (take_cp != null)
{
var val = _buf.Remove();
ret.Add(() => take_cp(new Box<TOut>(val)));
ret.Add(() => take_cp(val));
}
}

View File

@ -9,12 +9,21 @@ namespace Wabbajack.Common.CSP
class PutTaskHandler<T> : Handler<Action<bool>>
{
private readonly bool _blockable;
private readonly TaskCompletionSource<bool> _tcs;
private TaskCompletionSource<bool> _tcs;
public PutTaskHandler(TaskCompletionSource<bool> tcs, bool blockable = true)
public PutTaskHandler(bool blockable = true)
{
_blockable = blockable;
_tcs = tcs ?? new TaskCompletionSource<bool>();
}
public TaskCompletionSource<bool> TaskCompletionSource
{
get
{
if (_tcs == null)
_tcs = new TaskCompletionSource<bool>();
return _tcs;
}
}
public bool IsActive => true;

View File

@ -7,32 +7,39 @@ using System.Threading.Tasks;
namespace Wabbajack.Common.CSP
{
public class TakeTaskHandler<T> : Handler<Action<Box<T>>>
public class TakeTaskHandler<T> : Handler<Action<T>>
{
private readonly bool _blockable;
private readonly TaskCompletionSource<T> _tcs;
private TaskCompletionSource<(bool, T)> _tcs;
public TakeTaskHandler(TaskCompletionSource<T> tcs = null, bool blockable = true)
{
_blockable = blockable;
_tcs = tcs ?? new TaskCompletionSource<T>();
}
public TaskCompletionSource<(bool, T)> TaskCompletionSource
{
get
{
if (_tcs == null)
_tcs = new TaskCompletionSource<(bool, T)>();
return _tcs;
}
}
public bool IsActive => true;
public bool IsBlockable => _blockable;
public uint LockId => 0;
public Task<T> Task => _tcs.Task;
public Action<Box<T>> Commit()
public Task<(bool, T)> Task => TaskCompletionSource.Task;
public Action<T> Commit()
{
return Handle;
}
private void Handle(Box<T> a)
private void Handle(T a)
{
if (a.IsSet)
_tcs.SetResult(a.Value);
_tcs.SetCanceled();
TaskCompletionSource.SetResult((true, a));
}
}
}

View File

@ -121,6 +121,7 @@
<EmbeddedResource Include="7z.dll.gz" />
<EmbeddedResource Include="7z.exe.gz" />
<None Include="app.config" />
<None Include="CSP\CSP Readme.md" />
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\Compression.BSA\Compression.BSA.csproj">

View File

@ -14,9 +14,11 @@ namespace Wabbajack.Test
{
var channel = Channel.Create<int>();
var ptask = channel.Put(1);
Assert.AreEqual(1, await channel.Take());
Assert.IsTrue(await ptask);
var (open, val) = await channel.Take();
Assert.AreEqual(1, val);
Assert.IsTrue(open);
Assert.IsTrue(await ptask);
}
[TestMethod]
@ -27,7 +29,11 @@ namespace Wabbajack.Test
await channel.Put(itm);
foreach (var itm in Enumerable.Range(0, 10))
Assert.AreEqual(itm, await channel.Take());
{
var (is_open, val) = await channel.Take();
Assert.AreEqual(itm, val);
Assert.IsTrue(is_open);
}
}
[TestMethod]
@ -36,7 +42,11 @@ namespace Wabbajack.Test
var channel = Enumerable.Range(0, 10).ToChannel();
foreach (var itm in Enumerable.Range(0, 10))
Assert.AreEqual(itm, await channel.Take());
{
var (is_open, val) = await channel.Take();
Assert.AreEqual(itm, val);
Assert.IsTrue(is_open);
}
}
[TestMethod]
@ -44,7 +54,7 @@ namespace Wabbajack.Test
{
var results = await Enumerable.Range(0, 10).ToChannel().TakeAll();
Assert.AreEqual(Enumerable.Range(0, 10).ToList(), results);
CollectionAssert.AreEqual(Enumerable.Range(0, 10).ToList(), results);
}
}
}