diff --git a/Wabbajack.Common/CSP/CSPExtensions.cs b/Wabbajack.Common/CSP/CSPExtensions.cs index 75a45daa..e4543370 100644 --- a/Wabbajack.Common/CSP/CSPExtensions.cs +++ b/Wabbajack.Common/CSP/CSPExtensions.cs @@ -4,6 +4,7 @@ using System.Linq; using System.Reactive.Subjects; using System.Text; using System.Threading.Tasks; +using System.Windows.Forms.VisualStyles; namespace Wabbajack.Common.CSP { @@ -157,7 +158,6 @@ namespace Wabbajack.Common.CSP if (!await to.Put(pval)) break; } - if (propagateClose) to.Close(); break; } } @@ -165,7 +165,14 @@ namespace Wabbajack.Common.CSP await Task.WhenAll(Enumerable.Range(0, parallelism) .Select(idx => Task.Run(Pump))); - + + if (propagateClose) + { + from.Close(); + to.Close(); + } + } } + } diff --git a/Wabbajack.Common/CSP/ManyToManyChannel.cs b/Wabbajack.Common/CSP/ManyToManyChannel.cs index 8223b5d2..b5dbc2ce 100644 --- a/Wabbajack.Common/CSP/ManyToManyChannel.cs +++ b/Wabbajack.Common/CSP/ManyToManyChannel.cs @@ -88,16 +88,13 @@ namespace Wabbajack.Common.CSP { Task.Run(action); } - } - else - { - if (is_done) - Abort(); - Monitor.Exit(this); - return (AsyncResult.Closed, false); + return (AsyncResult.Completed, true); } - return (AsyncResult.Completed, true); + if (is_done) + Abort(); + Monitor.Exit(this); + return (AsyncResult.Closed, false); } Monitor.Exit(this); return (AsyncResult.Canceled, false); @@ -198,9 +195,12 @@ namespace Wabbajack.Common.CSP if (handler.IsBlockable) { if (_takes.Length >= MAX_QUEUE_SIZE) + { + Monitor.Exit(this); throw new TooManyHanldersException(); - _takes.Unshift(handler); + } + _takes.Unshift(handler); } Monitor.Exit(this); return (AsyncResult.Enqueued, default); @@ -221,7 +221,8 @@ namespace Wabbajack.Common.CSP _isClosed = true; if (_buf != null && _puts.IsEmpty) _finalize(_buf); - var cbs = GetTakersForBuffer(); + + var cbs = _buf == null? new List() : GetTakersForBuffer(); while (!_takes.IsEmpty) { diff --git a/Wabbajack.Common/CSP/PutTaskHandler.cs b/Wabbajack.Common/CSP/PutTaskHandler.cs index 71e52a2c..0f0da275 100644 --- a/Wabbajack.Common/CSP/PutTaskHandler.cs +++ b/Wabbajack.Common/CSP/PutTaskHandler.cs @@ -36,7 +36,7 @@ namespace Wabbajack.Common.CSP private void Handle(bool val) { - _tcs.SetResult(val); + TaskCompletionSource.SetResult(val); } } } diff --git a/Wabbajack.Common/CSP/RingBuffer.cs b/Wabbajack.Common/CSP/RingBuffer.cs index 206e7954..c55f1034 100644 --- a/Wabbajack.Common/CSP/RingBuffer.cs +++ b/Wabbajack.Common/CSP/RingBuffer.cs @@ -10,7 +10,7 @@ using System.Windows.Forms; namespace Wabbajack.Common.CSP { - public struct RingBuffer : IEnumerable + public class RingBuffer : IEnumerable { private int _size; private int _length; diff --git a/Wabbajack.Test/CSPTests.cs b/Wabbajack.Test/CSPTests.cs index 84491bea..7c5ae6c1 100644 --- a/Wabbajack.Test/CSPTests.cs +++ b/Wabbajack.Test/CSPTests.cs @@ -109,9 +109,86 @@ namespace Wabbajack.Test .UnorderedPipeline(1, o, obs => obs.Select(itm => itm.ToString())); var results = (await o.TakeAll()).OrderBy(e => e).ToList(); - var expected = Enumerable.Range(0, 3).Select(i => i.ToString()).ToList(); + var expected = Enumerable.Range(0, 3).Select(i => i.ToString()).OrderBy(e => e).ToList(); CollectionAssert.AreEqual(expected, results); } + + [TestMethod] + public async Task UnorderedPipelineWithParallelism() + { + // Do it a hundred times to try and catch rare deadlocks + var o = Channel.Create(3); + var finished = Enumerable.Range(0, 1024) + .ToChannel() + .UnorderedPipeline(4, o, obs => obs.Select(itm => itm.ToString())); + + var results = (await o.TakeAll()).OrderBy(e => e).ToList(); + var expected = Enumerable.Range(0, 1024).Select(i => i.ToString()).OrderBy(e => e).ToList(); + CollectionAssert.AreEqual(expected, results); + await finished; + } + + [TestMethod] + public async Task ChannelStressTest() + { + var chan = Channel.Create(); + + var putter = Task.Run(async () => + { + for (var i = 0; i < 1000; i++) + await chan.Put(i); + }); + + var taker = Task.Run(async () => + { + try + { + for (var i = 0; i < 1000; i++) + { + var (is_open, val) = await chan.Take(); + Assert.AreEqual(i, val); + } + } + finally + { + chan.Close(); + } + }); + + await putter; + await taker; + } + + [TestMethod] + public async Task ChannelStressWithBuffer() + { + var chan = Channel.Create(1); + + var putter = Task.Run(async () => + { + for (var i = 0; i < 1000; i++) + await chan.Put(i); + }); + + var taker = Task.Run(async () => + { + try + { + for (var i = 0; i < 1000; i++) + { + var (is_open, val) = await chan.Take(); + Assert.AreEqual(i, val); + } + } + finally + { + chan.Close(); + } + }); + + await putter; + await taker; + } } }