using System; using System.Collections.Generic; using System.Linq; using System.Reactive.Subjects; using System.Threading; using System.Threading.Tasks; namespace Wabbajack.Common.CSP { public static class Pipelines { /// /// Creates a pipeline that takes items from `from` transforms them with the pipeline given by `transform` and puts /// the resulting values onto `to`. The pipeline may create 0 or more items for every input item and they will be /// spooled onto `to` in a undefined order. `n` determines how many parallel tasks will be running at once. Each of /// these tasks maintains its own transformation pipeline, so `transform` will be called once for every `n`. Completing /// a `transform` pipeline has no effect. /// /// /// /// /// /// /// /// /// /// /// public static async Task UnorderedPipeline( this IReadPort from, int parallelism, IWritePort to, Func, IObservable> transform, bool propagateClose = true) { async Task Pump() { var pipeline = new Subject(); var buffer = new List(); var dest = transform(pipeline); dest.Subscribe(itm => buffer.Add(itm)); while (true) { var (is_open, tval) = await from.Take(); if (is_open) { pipeline.OnNext(tval); foreach (var pval in buffer) { var is_put_open = await to.Put(pval); if (is_put_open) continue; if (propagateClose) @from.Close(); return; } buffer.Clear(); } else { pipeline.OnCompleted(); if (buffer.Count > 0) { foreach (var pval in buffer) if (!await to.Put(pval)) break; } break; } } } await Task.WhenAll(Enumerable.Range(0, parallelism) .Select(idx => Task.Run(Pump))); if (propagateClose) { from.Close(); to.Close(); } } public static async Task UnorderedPipeline( this IReadPort from, IWritePort to, Func> f, bool propagateClose = true) { await UnorderedPipeline(from, Environment.ProcessorCount, to, f, propagateClose); } public static async Task UnorderedPipeline( this IReadPort from, int parallelism, IWritePort to, Func> f, bool propagateClose = true) { async Task Pump() { while (true) { var (is_open, job) = await from.Take(); if (!is_open) break; var putIsOpen = await to.Put(await f(job)); if (!putIsOpen) return; } } await Task.WhenAll(Enumerable.Range(0, parallelism) .Select(idx => Task.Run(Pump))); if (propagateClose) { from.Close(); to.Close(); } } public static IReadPort UnorderedPipelineRx( this IReadPort from, Func, IObservable> f, bool propagateClose = true) { var parallelism = Environment.ProcessorCount; var to = Channel.Create(parallelism * 2); var pipeline = from.UnorderedPipeline(parallelism, to, f); return to; } public static IReadPort UnorderedPipelineSync( this IReadPort from, Func f, bool propagateClose = true) { var parallelism = Environment.ProcessorCount; var to = Channel.Create(parallelism * 2); async Task Pump() { while (true) { var (is_open, job) = await from.Take(); if (!is_open) break; try { var putIsOpen = await to.Put(f(job)); if (!putIsOpen) return; } catch (Exception) { // ignored } } } Task.Run(async () => { await Task.WhenAll(Enumerable.Range(0, parallelism) .Select(idx => Task.Run(Pump))); if (propagateClose) { from.Close(); to.Close(); } }); return to; } public static async Task UnorderedThreadedPipeline( this IReadPort from, int parallelism, IWritePort to, Func f, bool propagateClose = true) { Task Pump() { var tcs = new TaskCompletionSource(); var th = new Thread(() => { while (true) { var (is_open, job) = from.Take().Result; if (!is_open) break; var putIsOpen = to.Put(f(job)).Result; if (!putIsOpen) return; } tcs.SetResult(true); }) {Priority = ThreadPriority.BelowNormal}; th.Start(); return tcs.Task; } await Task.WhenAll(Enumerable.Range(0, parallelism) .Select(idx => Task.Run(Pump))); if (propagateClose) { from.Close(); to.Close(); } } } }