using System; using System.Linq; using System.Reactive; using System.Reactive.Concurrency; using System.Reactive.Disposables; using System.Reactive.Linq; using System.Threading.Tasks; namespace Wabbajack { public static class RxExt { /// /// Convenience function that discards events that are null /// /// /// /// Source events that are not null public static IObservable NotNull(this IObservable source) where T : class { return source.Where(u => u != null); } /// /// Converts any observable to type Unit. Useful for when you care that a signal occurred, /// but don't care about what its value is downstream. /// /// An observable that returns Unit anytime the source signal fires an event. public static IObservable Unit(this IObservable source) { return source.Select(_ => System.Reactive.Unit.Default); } /// /// Convenience operator to subscribe to the source observable, only when a second "switch" observable is on. /// When the switch is on, the source will be subscribed to, and its updates passed through. /// When the switch is off, the subscription to the source observable will be stopped, and no signal will be published. /// /// Source observable to subscribe to if on /// On/Off signal of whether to subscribe to source observable /// Observable that publishes data from source, if the switch is on. public static IObservable FlowSwitch(this IObservable source, IObservable filterSwitch) { return filterSwitch .DistinctUntilChanged() .Select(on => { if (on) { return source; } else { return Observable.Empty(); } }) .Switch(); } /// /// Convenience operator to subscribe to the source observable, only when a second "switch" observable is on. /// When the switch is on, the source will be subscribed to, and its updates passed through. /// When the switch is off, the subscription to the source observable will be stopped, and no signal will be published. /// /// Source observable to subscribe to if on /// On/Off signal of whether to subscribe to source observable /// Value to fire when switching off /// Observable that publishes data from source, if the switch is on. public static IObservable FlowSwitch(this IObservable source, IObservable filterSwitch, T valueWhenOff) { return filterSwitch .DistinctUntilChanged() .Select(on => { if (on) { return source; } else { return Observable.Return(valueWhenOff); } }) .Switch(); } /// Inspiration: /// http://reactivex.io/documentation/operators/debounce.html /// https://stackoverflow.com/questions/20034476/how-can-i-use-reactive-extensions-to-throttle-events-using-a-max-window-size public static IObservable Debounce(this IObservable source, TimeSpan interval, IScheduler scheduler = null) { scheduler = scheduler ?? Scheduler.Default; return Observable.Create(o => { var hasValue = false; bool throttling = false; T value = default; var dueTimeDisposable = new SerialDisposable(); void internalCallback() { if (hasValue) { // We have another value that came in to fire. // Reregister for callback dueTimeDisposable.Disposable = scheduler.Schedule(interval, internalCallback); o.OnNext(value); value = default; hasValue = false; } else { // Nothing to do, throttle is complete. throttling = false; } } return source.Subscribe( onNext: (x) => { if (!throttling) { // Fire initial value o.OnNext(x); // Mark that we're throttling throttling = true; // Register for callback when throttle is complete dueTimeDisposable.Disposable = scheduler.Schedule(interval, internalCallback); } else { // In the middle of throttle // Save value and return hasValue = true; value = x; } }, onError: o.OnError, onCompleted: o.OnCompleted); }); } public static IObservable SelectTask(this IObservable source, Func task) { return source .SelectMany(async i => { await task(i).ConfigureAwait(false); return System.Reactive.Unit.Default; }); } public static IObservable SelectTask(this IObservable source, Func task) { return source .SelectMany(async _ => { await task().ConfigureAwait(false); return System.Reactive.Unit.Default; }); } public static IObservable SelectTask(this IObservable source, Func> task) { return source .SelectMany(_ => task()); } public static IObservable SelectTask(this IObservable source, Func> task) { return source .SelectMany(x => task(x)); } public static IObservable DoTask(this IObservable source, Func task) { return source .SelectMany(async (x) => { await task(x).ConfigureAwait(false); return x; }); } public static IObservable WhereCastable(this IObservable source) where R : class where T : class { return source .Select(x => x as R) .NotNull(); } public static IObservable Invert(this IObservable source) { return source.Select(x => !x); } public static IObservable<(T Previous, T Current)> Pairwise(this IObservable source) { T prevStorage = default; return source.Select(i => { var prev = prevStorage; prevStorage = i; return (prev, i); }); } public static IObservable DelayInitial(this IObservable source, TimeSpan delay, IScheduler scheduler) { return source.FlowSwitch( Observable.Return(System.Reactive.Unit.Default) .Delay(delay, scheduler) .Select(_ => true) .StartWith(false)); } public static IObservable DisposeOld(this IObservable source) where T : IDisposable { return source .StartWith(default(T)) .Pairwise() .Do(x => { if (x.Previous != null) { x.Previous.Dispose(); } }) .Select(x => x.Current); } } }