using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

namespace Wabbajack.Common
{
    /// <summary>
    /// Extension methods for running asynchronous work over sequences, and for
    /// materializing or transforming <see cref="IAsyncEnumerable{T}"/> streams.
    /// </summary>
    public static class AsyncParallelExtensions
    {
        /// <summary>
        /// Maps <paramref name="coll"/> through <paramref name="mapFn"/> using
        /// <see cref="Parallel.ForEachAsync{TSource}(IEnumerable{TSource}, ParallelOptions, Func{TSource, CancellationToken, ValueTask})"/>
        /// and streams the results through a bounded channel.
        /// </summary>
        /// <param name="coll">Items to process.</param>
        /// <param name="options">Parallelism and cancellation settings passed to <c>Parallel.ForEachAsync</c>.</param>
        /// <param name="mapFn">Asynchronous mapping function invoked once per item.</param>
        /// <returns>
        /// A stream of results in the order they are written to the channel, which is
        /// not necessarily the order of <paramref name="coll"/>.
        /// </returns>
        /// <remarks>
        /// The work starts as soon as this method is called, not when the result is enumerated.
        /// If the parallel loop faults or is cancelled, the exception is surfaced to the
        /// consumer of the returned stream instead of silently ending it.
        /// </remarks>
        public static IAsyncEnumerable<TOut> PMap<TIn, TOut>(this IEnumerable<TIn> coll, ParallelOptions options,
            Func<TIn, Task<TOut>> mapFn)
        {
            // ParallelOptions.MaxDegreeOfParallelism defaults to -1 ("no explicit limit"), which
            // Channel.CreateBounded rejects; fall back to the processor count in that case.
            var capacity = options.MaxDegreeOfParallelism > 0
                ? options.MaxDegreeOfParallelism
                : Environment.ProcessorCount;
            var queue = Channel.CreateBounded<TOut>(capacity);

            var work = Parallel.ForEachAsync(coll, options, async (x, token) =>
            {
                var result = await mapFn(x);
                await queue.Writer.WriteAsync(result, token);
            });

            // Intentionally not awaited: the returned task never faults because every
            // outcome of the loop is forwarded to the channel.
            _ = CompleteWhenDone(work, queue.Writer);

            return queue.Reader.ReadAllAsync();

            static async Task CompleteWhenDone(Task work, ChannelWriter<TOut> writer)
            {
                try
                {
                    await work;
                    writer.TryComplete();
                }
                catch (Exception ex)
                {
                    // Forward the failure (or cancellation) to the reader.
                    writer.TryComplete(ex);
                }
            }
        }

        /// <summary>
        /// Runs <paramref name="mapFn"/> for every item of <paramref name="coll"/> via
        /// <c>Parallel.ForEachAsync</c> and completes when all invocations have finished.
        /// </summary>
        /// <param name="coll">Items to process.</param>
        /// <param name="options">Parallelism and cancellation settings passed to <c>Parallel.ForEachAsync</c>.</param>
        /// <param name="mapFn">Asynchronous action invoked once per item.</param>
        public static async Task PDo<TIn>(this IEnumerable<TIn> coll, ParallelOptions options, Func<TIn, Task> mapFn)
        {
            await Parallel.ForEachAsync(coll, options, async (x, token) => await mapFn(x));
        }

        /// <summary>
        /// Invokes <paramref name="mapFn"/> for every item up front (no parallelism limit)
        /// and awaits all of the resulting tasks with <see cref="Task.WhenAll(IEnumerable{Task})"/>.
        /// </summary>
        /// <param name="coll">Items to process.</param>
        /// <param name="mapFn">Asynchronous action invoked once per item.</param>
        public static async Task PDoAll<TIn>(this IEnumerable<TIn> coll, Func<TIn, Task> mapFn)
        {
            var tasks = coll.Select(mapFn).ToList();
            await Task.WhenAll(tasks);
        }

        /// <summary>
        /// Invokes <paramref name="mapFn"/> for every item up front (no parallelism limit)
        /// and yields the results in source order as each task is awaited.
        /// </summary>
        /// <param name="coll">Items to process.</param>
        /// <param name="mapFn">Asynchronous mapping function invoked once per item.</param>
        /// <returns>The mapped results, in the order of <paramref name="coll"/>.</returns>
        public static async IAsyncEnumerable<TOut> PMapAll<TIn, TOut>(this IEnumerable<TIn> coll,
            Func<TIn, Task<TOut>> mapFn)
        {
            // ToList forces every task to be created before the first one is awaited.
            var tasks = coll.Select(mapFn).ToList();
            foreach (var itm in tasks)
            {
                yield return await itm;
            }
        }

        /// <summary>Drains <paramref name="coll"/> into a new <see cref="List{T}"/>.</summary>
        public static async Task<List<T>> ToList<T>(this IAsyncEnumerable<T> coll)
        {
            List<T> lst = new();
            await foreach (var itm in coll)
            {
                lst.Add(itm);
            }

            return lst;
        }

        /// <summary>Drains <paramref name="coll"/> into a new array.</summary>
        public static async Task<T[]> ToArray<T>(this IAsyncEnumerable<T> coll)
        {
            var lst = await ToList(coll);
            return lst.ToArray();
        }

        /// <summary>Drains <paramref name="coll"/> into a list exposed as a read-only collection.</summary>
        public static async Task<IReadOnlyCollection<T>> ToReadOnlyCollection<T>(this IAsyncEnumerable<T> coll)
        {
            return await ToList(coll);
        }

        /// <summary>
        /// Drains <paramref name="coll"/> into a new <see cref="HashSet{T}"/>, optionally keeping
        /// only the items accepted by <paramref name="filter"/>.
        /// </summary>
        /// <param name="coll">Stream to drain.</param>
        /// <param name="filter">Optional predicate; when <c>null</c> every item is added.</param>
        public static async Task<HashSet<T>> ToHashSet<T>(this IAsyncEnumerable<T> coll, Predicate<T>? filter = default)
        {
            HashSet<T> lst = new();
            var source = filter is null ? coll : Where(coll, filter);
            await foreach (var itm in source)
            {
                lst.Add(itm);
            }

            return lst;
        }

        /// <summary>Awaits <paramref name="fn"/> for each item of <paramref name="coll"/>, one at a time.</summary>
        public static async Task Do<T>(this IAsyncEnumerable<T> coll, Func<T, Task> fn)
        {
            await foreach (var itm in coll)
            {
                await fn(itm);
            }
        }

        /// <summary>Invokes <paramref name="fn"/> for each item of <paramref name="coll"/>, one at a time.</summary>
        public static async Task Do<T>(this IAsyncEnumerable<T> coll, Action<T> fn)
        {
            await foreach (var itm in coll)
            {
                fn(itm);
            }
        }

        /// <summary>
        /// Drains <paramref name="coll"/> into a dictionary keyed by <paramref name="kSelector"/>.
        /// </summary>
        /// <exception cref="ArgumentException">Two items produce the same key.</exception>
        public static async Task<Dictionary<TK, T>> ToDictionary<T, TK>(this IAsyncEnumerable<T> coll,
            Func<T, TK> kSelector)
            where TK : notnull
        {
            Dictionary<TK, T> dict = new();
            await foreach (var itm in coll)
            {
                dict.Add(kSelector(itm), itm);
            }

            return dict;
        }

        /// <summary>
        /// Drains <paramref name="coll"/> into a dictionary whose keys come from
        /// <paramref name="kSelector"/> and values from <paramref name="vSelector"/>.
        /// </summary>
        /// <exception cref="ArgumentException">Two items produce the same key.</exception>
        public static async Task<Dictionary<TK, TV>> ToDictionary<TIn, TK, TV>(this IAsyncEnumerable<TIn> coll,
            Func<TIn, TK> kSelector, Func<TIn, TV> vSelector)
            where TK : notnull
        {
            Dictionary<TK, TV> dict = new();
            await foreach (var itm in coll)
            {
                dict.Add(kSelector(itm), vSelector(itm));
            }

            return dict;
        }

        /// <summary>Lazily yields the items of <paramref name="coll"/> for which <paramref name="p"/> returns true.</summary>
        public static async IAsyncEnumerable<T> Where<T>(this IAsyncEnumerable<T> coll, Predicate<T> p)
        {
            await foreach (var itm in coll)
            {
                if (p(itm))
                {
                    yield return itm;
                }
            }
        }

        /// <summary>
        /// Lazily maps a synchronous sequence through an asynchronous function, awaiting
        /// each result before moving to the next item.
        /// </summary>
        public static async IAsyncEnumerable<TOut> SelectAsync<TIn, TOut>(this IEnumerable<TIn> coll,
            Func<TIn, Task<TOut>> fn)
        {
            foreach (var itm in coll)
            {
                yield return await fn(itm);
            }
        }

        /// <summary>
        /// Lazily flattens the sequences asynchronously produced by <paramref name="fn"/>
        /// for each item of a synchronous sequence.
        /// </summary>
        public static async IAsyncEnumerable<TOut> SelectMany<TIn, TOut>(this IEnumerable<TIn> coll,
            Func<TIn, Task<IEnumerable<TOut>>> fn)
        {
            foreach (var itm in coll)
            {
                foreach (var inner in await fn(itm))
                {
                    yield return inner;
                }
            }
        }

        /// <summary>
        /// Lazily maps an asynchronous stream through an asynchronous function, awaiting
        /// each result before pulling the next item.
        /// </summary>
        public static async IAsyncEnumerable<TOut> Select<TIn, TOut>(this IAsyncEnumerable<TIn> coll,
            Func<TIn, Task<TOut>> fn)
        {
            await foreach (var itm in coll)
            {
                yield return await fn(itm);
            }
        }

        /// <summary>
        /// Lazily flattens the sequences produced by <paramref name="fn"/> for each item
        /// of an asynchronous stream.
        /// </summary>
        public static async IAsyncEnumerable<TOut> SelectMany<TIn, TOut>(this IAsyncEnumerable<TIn> coll,
            Func<TIn, IEnumerable<TOut>> fn)
        {
            await foreach (var itm in coll)
            {
                foreach (var inner in fn(itm))
                {
                    yield return inner;
                }
            }
        }
    }
}