Final bits for CSP backend

This commit is contained in:
Timothy Baldridge
2019-11-10 20:47:25 -07:00
parent 6639e9946b
commit e61b203d4c
4 changed files with 54 additions and 14 deletions

View File

@ -3,6 +3,7 @@ using System.Collections.Generic;
using System.Linq;
using System.Reactive.Subjects;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using System.Windows.Forms.VisualStyles;
@ -84,24 +85,43 @@ namespace Wabbajack.Common.CSP
}
}
/*
private static void PipelineInner<TInSrc, TOutSrc, TInDest, TOutDest>(int n,
IChannel<TInSrc, TOutSrc> from,
Func<TOutSrc, Task<TInDest>> fn,
IChannel<TInDest, TOutDest> to,
bool closeOnFinished)
public static Task<T> ThreadedTask<T>(Func<T> action)
{
var jobs = Channel.Create<TOutSrc>(n);
var results = Channel.Create<TInDest>(n);
var src = new TaskCompletionSource<T>();
var th = new Thread(() =>
{
bool Process(TOutSrc val, )
try
{
if ()
src.SetResult(action());
}
}
}*/
catch (Exception ex)
{
src.SetException(ex);
}
}) {Priority = ThreadPriority.BelowNormal};
th.Start();
return src.Task;
}
public static Task ThreadedTask<T>(Action action)
{
var src = new TaskCompletionSource<bool>();
var th = new Thread(() =>
{
try
{
action();
src.SetResult(true);
}
catch (Exception ex)
{
src.SetException(ex);
}
})
{ Priority = ThreadPriority.BelowNormal };
th.Start();
return src.Task;
}
}

View File

@ -2,6 +2,7 @@
using System.Collections.Generic;
using System.Data;
using System.Linq;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Security.Cryptography;
using System.Text;
@ -53,7 +54,23 @@ namespace Wabbajack.Common.CSP
public static IChannel<TIn, TOut> Create<TIn, TOut>(int buffer_size, Func<IObservable<TIn>, IObservable<TOut>> transform)
{
var buf = new RxBuffer<TIn, TOut>(buffer_size, transform);
return ChannelForRxBuf(buf);
}
private static ManyToManyChannel<TIn, TOut> ChannelForRxBuf<TIn, TOut>(RxBuffer<TIn, TOut> buf)
{
return new ManyToManyChannel<TIn, TOut>(null, RxBuffer<TIn,TOut>.TransformAdd, RxBuffer<TIn, TOut>.Finalize, buf);
}
/// <summary>
/// Creates a channel that discards every value
/// </summary>
/// <typeparam name="TIn"></typeparam>
/// <returns></returns>
public static IChannel<TIn, TIn> CreateSink<TIn>()
{
var buf = new RxBuffer<TIn, TIn>(1, e => e.Where(itm => false));
return ChannelForRxBuf(buf);
}
}
}

View File

@ -85,6 +85,8 @@
<Reference Include="System.Xml" />
</ItemGroup>
<ItemGroup>
<Compile Include="AsyncWorkQueue.cs" />
<Compile Include="AsyncWorkQueueExtensions.cs" />
<Compile Include="BSDiff.cs" />
<Compile Include="ChildProcessTracker.cs" />
<Compile Include="Consts.cs" />

View File

@ -93,6 +93,7 @@
</ItemGroup>
<ItemGroup>
<Compile Include="ACompilerTest.cs" />
<Compile Include="CSP\AsyncWorkQueueTests.cs" />
<Compile Include="CSP\ChannelTests.cs" />
<Compile Include="CSP\CSPTests.cs" />
<Compile Include="DownloaderTests.cs" />