Other variants of pipelines

This commit is contained in:
Timothy Baldridge 2019-11-10 15:36:04 -07:00
parent f66427c2ea
commit 6639e9946b
4 changed files with 193 additions and 70 deletions

View File

@ -103,76 +103,6 @@ namespace Wabbajack.Common.CSP
}
}*/
/// <summary>
/// Creates a pipeline that takes items from `from` transforms them with the pipeline given by `transform` and puts
/// the resulting values onto `to`. The pipeline may create 0 or more items for every input item and they will be
/// spooled onto `to` in a undefined order. `n` determines how many parallel tasks will be running at once. Each of
/// these tasks maintains its own transformation pipeline, so `transform` will be called once for every `n`. Completing
/// a `transform` pipeline has no effect.
/// </summary>
/// <typeparam name="TInSrc"></typeparam>
/// <typeparam name="TOutSrc"></typeparam>
/// <typeparam name="TInDest"></typeparam>
/// <typeparam name="TOutDest"></typeparam>
/// <param name="from"></param>
/// <param name="parallelism"></param>
/// <param name="to"></param>
/// <param name="transform"></param>
/// <param name="propagateClose"></param>
/// <returns></returns>
public static async Task UnorderedPipeline<TInSrc, TOutSrc, TInDest, TOutDest>(
this IChannel<TInSrc, TOutSrc> from,
int parallelism,
IChannel<TInDest, TOutDest> to,
Func<IObservable<TOutSrc>, IObservable<TInDest>> transform,
bool propagateClose = true)
{
async Task Pump()
{
var pipeline = new Subject<TOutSrc>();
var buffer = new List<TInDest>();
var dest = transform(pipeline);
dest.Subscribe(itm => buffer.Add(itm));
while (true)
{
var (is_open, tval) = await from.Take();
if (is_open)
{
pipeline.OnNext(tval);
foreach (var pval in buffer)
{
var is_put_open = await to.Put(pval);
if (is_put_open) continue;
if (propagateClose) @from.Close();
return;
}
buffer.Clear();
}
else
{
pipeline.OnCompleted();
if (buffer.Count > 0)
{
foreach (var pval in buffer)
if (!await to.Put(pval))
break;
}
break;
}
}
}
await Task.WhenAll(Enumerable.Range(0, parallelism)
.Select(idx => Task.Run(Pump)));
if (propagateClose)
{
from.Close();
to.Close();
}
}
}
}

View File

@ -0,0 +1,153 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Subjects;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
namespace Wabbajack.Common.CSP
{
public static class Pipelines
{
/// <summary>
/// Creates a pipeline that takes items from `from` transforms them with the pipeline given by `transform` and puts
/// the resulting values onto `to`. The pipeline may create 0 or more items for every input item and they will be
/// spooled onto `to` in a undefined order. `n` determines how many parallel tasks will be running at once. Each of
/// these tasks maintains its own transformation pipeline, so `transform` will be called once for every `n`. Completing
/// a `transform` pipeline has no effect.
/// </summary>
/// <typeparam name="TInSrc"></typeparam>
/// <typeparam name="TOutSrc"></typeparam>
/// <typeparam name="TInDest"></typeparam>
/// <typeparam name="TOutDest"></typeparam>
/// <param name="from"></param>
/// <param name="parallelism"></param>
/// <param name="to"></param>
/// <param name="transform"></param>
/// <param name="propagateClose"></param>
/// <returns></returns>
public static async Task UnorderedPipeline<TInSrc, TOutSrc, TInDest, TOutDest>(
this IChannel<TInSrc, TOutSrc> from,
int parallelism,
IChannel<TInDest, TOutDest> to,
Func<IObservable<TOutSrc>, IObservable<TInDest>> transform,
bool propagateClose = true)
{
async Task Pump()
{
var pipeline = new Subject<TOutSrc>();
var buffer = new List<TInDest>();
var dest = transform(pipeline);
dest.Subscribe(itm => buffer.Add(itm));
while (true)
{
var (is_open, tval) = await from.Take();
if (is_open)
{
pipeline.OnNext(tval);
foreach (var pval in buffer)
{
var is_put_open = await to.Put(pval);
if (is_put_open) continue;
if (propagateClose) @from.Close();
return;
}
buffer.Clear();
}
else
{
pipeline.OnCompleted();
if (buffer.Count > 0)
{
foreach (var pval in buffer)
if (!await to.Put(pval))
break;
}
break;
}
}
}
await Task.WhenAll(Enumerable.Range(0, parallelism)
.Select(idx => Task.Run(Pump)));
if (propagateClose)
{
from.Close();
to.Close();
}
}
public static async Task UnorderedPipeline<TInSrc, TOutSrc, TInDest, TOutDest>(
this IChannel<TInSrc, TOutSrc> from,
int parallelism,
IChannel<TInDest, TOutDest> to,
Func<TOutSrc, Task<TInDest>> f,
bool propagateClose = true)
{
async Task Pump()
{
while (true)
{
var (is_open, job) = await from.Take();
if (!is_open) break;
var putIsOpen = await to.Put(await f(job));
if (!putIsOpen) return;
}
}
await Task.WhenAll(Enumerable.Range(0, parallelism)
.Select(idx => Task.Run(Pump)));
if (propagateClose)
{
from.Close();
to.Close();
}
}
public static async Task UnorderedThreadedPipeline<TInSrc, TOutSrc, TInDest, TOutDest>(
this IChannel<TInSrc, TOutSrc> from,
int parallelism,
IChannel<TInDest, TOutDest> to,
Func<TOutSrc, TInDest> f,
bool propagateClose = true)
{
Task Pump()
{
var tcs = new TaskCompletionSource<bool>();
var th = new Thread(() =>
{
while (true)
{
var (is_open, job) = from.Take().Result;
if (!is_open) break;
var putIsOpen = to.Put(f(job)).Result;
if (!putIsOpen) return;
}
tcs.SetResult(true);
}) {Priority = ThreadPriority.BelowNormal};
th.Start();
return tcs.Task;
}
await Task.WhenAll(Enumerable.Range(0, parallelism)
.Select(idx => Task.Run(Pump)));
if (propagateClose)
{
from.Close();
to.Close();
}
}
}
}

View File

@ -99,6 +99,7 @@
<Compile Include="CSP\IChannel.cs" />
<Compile Include="CSP\EnumeratorBuffer.cs" />
<Compile Include="CSP\ManyToManyChannel.cs" />
<Compile Include="CSP\Pipelines.cs" />
<Compile Include="CSP\PutTaskHandler.cs" />
<Compile Include="CSP\RingBuffer.cs" />
<Compile Include="CSP\RxBuffer.cs" />

View File

@ -1,6 +1,7 @@
using System;
using System.Linq;
using System.Reactive.Linq;
using System.Threading;
using System.Threading.Tasks;
using Alphaleonis.Win32.Filesystem;
using Microsoft.VisualStudio.TestTools.UnitTesting;
@ -144,6 +145,44 @@ namespace Wabbajack.Test.CSP
await finished;
}
[TestMethod]
public async Task UnorderedTaskPipeline()
{
// Do it a hundred times to try and catch rare deadlocks
var o = Channel.Create<int>(3);
var finished = Enumerable.Range(0, 1024)
.ToChannel()
.UnorderedPipeline(4, o, async v =>
{
await Task.Delay(1);
return v;
});
var results = (await o.TakeAll()).OrderBy(e => e).ToList();
var expected = Enumerable.Range(0, 1024).ToList();
CollectionAssert.AreEqual(expected, results);
await finished;
}
[TestMethod]
public async Task UnorderedThreadPipeline()
{
// Do it a hundred times to try and catch rare deadlocks
var o = Channel.Create<int>(3);
var finished = Enumerable.Range(0, 1024)
.ToChannel()
.UnorderedThreadedPipeline(4, o, v =>
{
Thread.Sleep(1);
return v;
});
var results = (await o.TakeAll()).OrderBy(e => e).ToList();
var expected = Enumerable.Range(0, 1024).ToList();
CollectionAssert.AreEqual(expected, results);
await finished;
}
[TestMethod]
public async Task ChannelStressTest()
{