diff --git a/Wabbajack.Common/CSP/CSPExtensions.cs b/Wabbajack.Common/CSP/CSPExtensions.cs index cc59d58e..d91e1061 100644 --- a/Wabbajack.Common/CSP/CSPExtensions.cs +++ b/Wabbajack.Common/CSP/CSPExtensions.cs @@ -3,6 +3,7 @@ using System.Collections.Generic; using System.Linq; using System.Reactive.Subjects; using System.Text; +using System.Threading; using System.Threading.Tasks; using System.Windows.Forms.VisualStyles; @@ -84,24 +85,43 @@ namespace Wabbajack.Common.CSP } } - /* - private static void PipelineInner(int n, - IChannel from, - Func> fn, - IChannel to, - bool closeOnFinished) + public static Task ThreadedTask(Func action) { - var jobs = Channel.Create(n); - var results = Channel.Create(n); - + var src = new TaskCompletionSource(); + var th = new Thread(() => { - bool Process(TOutSrc val, ) + try { - if () - + src.SetResult(action()); } - } - }*/ + catch (Exception ex) + { + src.SetException(ex); + } + }) {Priority = ThreadPriority.BelowNormal}; + th.Start(); + return src.Task; + } + + public static Task ThreadedTask(Action action) + { + var src = new TaskCompletionSource(); + var th = new Thread(() => + { + try + { + action(); + src.SetResult(true); + } + catch (Exception ex) + { + src.SetException(ex); + } + }) + { Priority = ThreadPriority.BelowNormal }; + th.Start(); + return src.Task; + } } diff --git a/Wabbajack.Common/CSP/Channel.cs b/Wabbajack.Common/CSP/Channel.cs index 449a386b..cd4041ee 100644 --- a/Wabbajack.Common/CSP/Channel.cs +++ b/Wabbajack.Common/CSP/Channel.cs @@ -2,6 +2,7 @@ using System.Collections.Generic; using System.Data; using System.Linq; +using System.Reactive.Linq; using System.Reactive.Subjects; using System.Security.Cryptography; using System.Text; @@ -53,7 +54,23 @@ namespace Wabbajack.Common.CSP public static IChannel Create(int buffer_size, Func, IObservable> transform) { var buf = new RxBuffer(buffer_size, transform); + return ChannelForRxBuf(buf); + } + + private static ManyToManyChannel ChannelForRxBuf(RxBuffer buf) + { return new ManyToManyChannel(null, RxBuffer.TransformAdd, RxBuffer.Finalize, buf); } + + /// + /// Creates a channel that discards every value + /// + /// + /// + public static IChannel CreateSink() + { + var buf = new RxBuffer(1, e => e.Where(itm => false)); + return ChannelForRxBuf(buf); + } } } diff --git a/Wabbajack.Common/Wabbajack.Common.csproj b/Wabbajack.Common/Wabbajack.Common.csproj index d26b5e97..0333fff2 100644 --- a/Wabbajack.Common/Wabbajack.Common.csproj +++ b/Wabbajack.Common/Wabbajack.Common.csproj @@ -85,6 +85,8 @@ + + diff --git a/Wabbajack.Test/Wabbajack.Test.csproj b/Wabbajack.Test/Wabbajack.Test.csproj index a8407b4c..001b0865 100644 --- a/Wabbajack.Test/Wabbajack.Test.csproj +++ b/Wabbajack.Test/Wabbajack.Test.csproj @@ -93,6 +93,7 @@ +