using System; using System.Collections.Generic; using System.Data; using System.Linq; using System.Reactive.Linq; namespace Wabbajack.Common.CSP { public static class Channel { /// /// Creates a channel without a buffer, and with no conversion function. This provides a syncronization /// point, where all puts are matched 1:1 with takes. /// /// /// The type of values transferred by the channel /// A new channel public static IChannel Create() { return new ManyToManyChannel(x => x); } /// /// Creates a channel with a given enumerator as the starting buffer. Values will not be puttable into this channel /// and it will start closed. This is a easy way to spool a collection onto a channel. Note: the enumerator will be /// run inside the channel's lock, so it may not be wise to pass in an enumerator that performs heavy computation. /// /// A IEnumerator to use as the contents of the channel /// The type of values transferred by the channel /// A new channel public static IChannel Create(IEnumerator e) { var chan = new ManyToManyChannel(x => x, (_, __) => false, _ => {}, new EnumeratorBuffer(e)); chan.Close(); return chan; } public static IChannel Create(int buffer_size) { var buffer = new FixedSizeBuffer(buffer_size); return new ManyToManyChannel(x => x, (buff, itm) => { buff.Add(itm); return false; }, b => {}, buffer); } public static IChannel Create(int buffer_size, Func, IObservable> transform) { var buf = new RxBuffer(buffer_size, transform); return ChannelForRxBuf(buf); } private static ManyToManyChannel ChannelForRxBuf(RxBuffer buf) { return new ManyToManyChannel(null, RxBuffer.TransformAdd, (final) => ((RxBuffer)final).Dispose(), buf); } /// /// Creates a channel that discards every value /// /// /// public static IChannel CreateSink() { var buf = new RxBuffer(1, e => e.Where(itm => false)); return ChannelForRxBuf(buf); } } }