using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

namespace Wabbajack.Common.CSP
{
    /// <summary>
    /// Extension and helper methods for moving values between collections, channels
    /// (<c>IReadPort</c> / <c>IWritePort</c>) and tasks.
    /// </summary>
    public static class CSPExtensions
    {
        /// <summary>
        /// Puts every value of <paramref name="coll"/> onto <paramref name="chan"/>, in order,
        /// stopping early as soon as a put reports that the channel is no longer open.
        /// </summary>
        /// <typeparam name="T">Type of the values being transferred.</typeparam>
        /// <param name="coll">Values to write.</param>
        /// <param name="chan">Channel to write the values to.</param>
        /// <returns>A task that completes once all values are written or the channel refuses a put.</returns>
        public static async Task OntoChannel<T>(this IEnumerable<T> coll, IWritePort<T> chan)
        {
            foreach (var val in coll)
            {
                if (!await chan.Put(val))
                {
                    break;
                }
            }
        }

        /// <summary>
        /// Turns a IEnumerable collection into a channel. Note, computation of the enumerable will happen inside
        /// the lock of the channel, so try to keep the work of the enumerable light.
        /// </summary>
        /// <typeparam name="T">Type of the values in the collection.</typeparam>
        /// <param name="coll">Collection to spool out of the channel.</param>
        /// <returns>The read side of the channel created over the collection's enumerator.</returns>
        public static IReadPort<T> ToChannel<T>(this IEnumerable<T> coll)
        {
            var chan = Channel.Create(coll.GetEnumerator());

            // The channel is closed right after creation.
            // NOTE(review): presumably it still hands out the enumerator's values before
            // reporting closed to readers - confirm against Channel.Create.
            chan.Close();
            return chan;
        }

        /// <summary>
        /// Creates a new channel whose values are the results of applying <paramref name="f"/>
        /// to each value taken from <paramref name="from"/>. The transfer loop runs on a
        /// <see cref="Task.Run(Func{Task})"/> task that is not returned to the caller, so an
        /// exception thrown by <paramref name="f"/> ends the loop without being observed.
        /// </summary>
        /// <typeparam name="TIn">Type of the values read from the source channel.</typeparam>
        /// <typeparam name="TOut">Type of the values written to the returned channel.</typeparam>
        /// <param name="from">Source channel.</param>
        /// <param name="f">Asynchronous transform applied to every value.</param>
        /// <param name="propagateClose">
        /// If true, both the source and the returned channel are closed when the loop ends
        /// for any reason (either side closed, or an exception).
        /// </param>
        /// <returns>The read side of the channel carrying the transformed values.</returns>
        public static IReadPort<TOut> Select<TIn, TOut>(this IReadPort<TIn> from, Func<TIn, Task<TOut>> f, bool propagateClose = true)
        {
            // NOTE(review): 4 is presumably the buffer size of the new channel - confirm against Channel.Create.
            var to = Channel.Create<TOut>(4);

            Task.Run(async () =>
            {
                try
                {
                    while (true)
                    {
                        var (isSourceOpen, val) = await from.Take();
                        if (!isSourceOpen)
                        {
                            break;
                        }

                        var isDestOpen = await to.Put(await f(val));
                        if (!isDestOpen)
                        {
                            break;
                        }
                    }
                }
                finally
                {
                    if (propagateClose)
                    {
                        from.Close();
                        to.Close();
                    }
                }
            });

            return to;
        }

        /// <summary>
        /// Runs <paramref name="f"/> for every value of <paramref name="coll"/> through
        /// <c>UnorderedPipeline</c>, passing <see cref="Environment.ProcessorCount"/> as its
        /// first argument (presumably the degree of parallelism - confirm against UnorderedPipeline).
        /// </summary>
        /// <typeparam name="T">Type of the values in the collection.</typeparam>
        /// <param name="coll">Values to process.</param>
        /// <param name="f">Asynchronous action invoked for each value.</param>
        /// <returns>A task that completes when the pipeline call completes.</returns>
        public static async Task UnorderedParallelDo<T>(this IEnumerable<T> coll, Func<T, Task> f)
        {
            // The pipeline needs an output value per item; a constant `true` is written to a sink channel.
            var sink = Channel.CreateSink<bool>();

            await coll.ToChannel()
                .UnorderedPipeline(Environment.ProcessorCount, sink, async itm =>
                {
                    await f(itm);
                    return true;
                });
        }

        /// <summary>
        /// Takes all the values from chan, once the channel closes returns a List of the values taken.
        /// </summary>
        /// <typeparam name="T">Type of the values in the channel.</typeparam>
        /// <param name="chan">Channel to drain.</param>
        /// <returns>The values taken, in the order they were received.</returns>
        public static async Task<List<T>> TakeAll<T>(this IReadPort<T> chan)
        {
            var acc = new List<T>();

            while (true)
            {
                var (open, val) = await chan.Take();
                if (!open)
                {
                    break;
                }

                acc.Add(val);
            }

            return acc;
        }

        /// <summary>
        /// Pipes values from `from` into `to`
        /// </summary>
        /// <typeparam name="T">Type of the values being piped.</typeparam>
        /// <param name="from">source channel</param>
        /// <param name="to">destination channel</param>
        /// <param name="closeOnFinished">If true, will close the other channel when one channel closes</param>
        /// <returns>A task that completes once either channel reports it is closed.</returns>
        public static async Task Pipe<T>(this IReadPort<T> from, IWritePort<T> to, bool closeOnFinished = true)
        {
            while (true)
            {
                var (isFromOpen, val) = await from.Take();
                if (!isFromOpen)
                {
                    // Source is exhausted: optionally close the destination.
                    if (closeOnFinished)
                    {
                        to.Close();
                    }

                    break;
                }

                var isToOpen = await to.Put(val);
                if (isToOpen)
                {
                    continue;
                }

                // Destination refused the value: optionally close the source.
                if (closeOnFinished)
                {
                    from.Close();
                }

                break;
            }
        }

        /// <summary>
        /// Runs <paramref name="action"/> on a newly started dedicated thread with
        /// <see cref="ThreadPriority.BelowNormal"/> priority and exposes its outcome as a task.
        /// </summary>
        /// <typeparam name="T">Type of the value produced by <paramref name="action"/>.</typeparam>
        /// <param name="action">Function to execute on the new thread.</param>
        /// <returns>
        /// A task that completes with the function's result, or faults with the exception it threw.
        /// </returns>
        public static Task<T> ThreadedTask<T>(Func<T> action)
        {
            // RunContinuationsAsynchronously keeps awaiters from resuming inline on the
            // low-priority worker thread when the result is set.
            var src = new TaskCompletionSource<T>(TaskCreationOptions.RunContinuationsAsynchronously);

            var th = new Thread(() =>
            {
                try
                {
                    src.SetResult(action());
                }
                catch (Exception ex)
                {
                    src.SetException(ex);
                }
            })
            {
                Priority = ThreadPriority.BelowNormal
            };

            th.Start();
            return src.Task;
        }

        /// <summary>
        /// Runs <paramref name="action"/> on a newly started dedicated thread with
        /// <see cref="ThreadPriority.BelowNormal"/> priority and exposes its completion as a task.
        /// </summary>
        /// <param name="action">Action to execute on the new thread.</param>
        /// <returns>
        /// A task that completes when the action returns, or faults with the exception it threw.
        /// </returns>
        public static Task ThreadedTask(Action action)
        {
            // RunContinuationsAsynchronously keeps awaiters from resuming inline on the
            // low-priority worker thread when the result is set.
            var src = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);

            var th = new Thread(() =>
            {
                try
                {
                    action();
                    src.SetResult(true);
                }
                catch (Exception ex)
                {
                    src.SetException(ex);
                }
            })
            {
                Priority = ThreadPriority.BelowNormal
            };

            th.Start();
            return src.Task;
        }
    }
}