using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Text;
using System.Threading.Tasks;

namespace Wabbajack.Common.CSP
{
    /// <summary>
    /// A buffer whose contents are produced by running incoming items through an
    /// Rx pipeline. Values pushed with <c>TransformAdd</c> are fed to an input
    /// subject; whatever the transformed observable emits is stored at the front
    /// of the underlying linked list and later taken from the back by
    /// <see cref="Remove"/>, so items come out in the order they were emitted.
    /// </summary>
    /// <typeparam name="TIn">Type of the items pushed into the pipeline.</typeparam>
    /// <typeparam name="TOut">Type of the items emitted by the pipeline and stored in the buffer.</typeparam>
    public class RxBuffer<TIn, TOut> : LinkedList<TOut>, IBuffer<TOut>
    {
        private readonly Subject<TIn> _inputSubject;
        private readonly IObservable<TOut> _outputObservable;

        // Handle for the subscription created in the constructor; kept so that
        // Dispose can detach the buffer from the pipeline.
        private readonly IDisposable _subscription;

        private readonly int _maxSize;

        // Set by the pipeline's completion callback.
        private bool _completed;

        /// <summary>
        /// Creates the buffer and wires it to the pipeline built by <paramref name="transform"/>.
        /// </summary>
        /// <param name="size">Item count at which <see cref="IsFull"/> starts reporting true.</param>
        /// <param name="transform">
        /// Builds the output observable from the input observable. It is invoked once,
        /// and the result is subscribed to immediately.
        /// </param>
        /// <exception cref="ArgumentNullException"><paramref name="transform"/> is null.</exception>
        public RxBuffer(int size, Func<IObservable<TIn>, IObservable<TOut>> transform) : base()
        {
            if (transform == null)
            {
                throw new ArgumentNullException(nameof(transform));
            }

            _maxSize = size;
            _inputSubject = new Subject<TIn>();
            _outputObservable = transform(_inputSubject);
            _subscription = _outputObservable.Subscribe(
                itm => AddFirst(itm),
                () => { _completed = true; });
        }

        /// <summary>
        /// Pushes <paramref name="val"/> into the pipeline.
        /// </summary>
        /// <param name="val">The item to feed to the input subject.</param>
        /// <returns>True once the output observable has signalled completion; otherwise false.</returns>
        public bool TransformAdd(TIn val)
        {
            _inputSubject.OnNext(val);
            return _completed;
        }

        /// <summary>
        /// Static forwarder to the instance <c>TransformAdd</c>.
        /// </summary>
        /// <param name="buf">The buffer to push into; must actually be an <c>RxBuffer&lt;TIn, TOut&gt;</c>.</param>
        /// <param name="itm">The item to push.</param>
        /// <returns>The result of the instance <c>TransformAdd</c> call.</returns>
        /// <exception cref="InvalidCastException"><paramref name="buf"/> is not an <c>RxBuffer&lt;TIn, TOut&gt;</c>.</exception>
        public static bool TransformAdd(IBuffer<TOut> buf, TIn itm)
        {
            return ((RxBuffer<TIn, TOut>) buf).TransformAdd(itm);
        }

        /// <summary>
        /// Signals end-of-input to the pipeline by completing the input subject.
        /// </summary>
        /// <remarks>
        /// NOTE(review): the name collides with <c>Object.Finalize</c> and draws a compiler
        /// warning; it is kept because it is part of this type's public surface.
        /// </remarks>
        public void Finalize()
        {
            _inputSubject.OnCompleted();
        }

        /// <summary>
        /// Detaches the buffer from the pipeline and releases the input subject.
        /// Items already stored in the buffer are left in place.
        /// </summary>
        public void Dispose()
        {
            _subscription.Dispose();
            _inputSubject.Dispose();
        }

        /// <summary>
        /// Static forwarder to the instance <c>Finalize</c>.
        /// </summary>
        /// <param name="buf">The buffer to finalize; must actually be an <c>RxBuffer&lt;TIn, TOut&gt;</c>.</param>
        /// <exception cref="InvalidCastException"><paramref name="buf"/> is not an <c>RxBuffer&lt;TIn, TOut&gt;</c>.</exception>
        public static void Finalize(IBuffer<TOut> buf)
        {
            ((RxBuffer<TIn, TOut>) buf).Finalize();
        }

        /// <summary>
        /// True when the stored item count has reached the size given at construction.
        /// This is only reported; nothing in this class refuses items once it is true.
        /// </summary>
        public bool IsFull => Count >= _maxSize;

        /// <summary>True when no items are stored.</summary>
        public bool IsEmpty => Count == 0;

        /// <summary>
        /// Removes and returns the item at the back of the list, i.e. the oldest stored item.
        /// </summary>
        /// <returns>The removed item.</returns>
        /// <remarks>
        /// The buffer must not be empty: <c>Last</c> is null on an empty list, so check
        /// <see cref="IsEmpty"/> before calling.
        /// </remarks>
        public TOut Remove()
        {
            var ret = Last.Value;
            RemoveLast();
            return ret;
        }

        /// <summary>
        /// Does nothing. Items reach this buffer through <c>TransformAdd</c> and the
        /// pipeline subscription, not through this method.
        /// </summary>
        /// <param name="itm">Ignored.</param>
        public void Add(TOut itm)
        {
        }
    }
}