More deadlock bugfixes

This commit is contained in:
Timothy Baldridge 2019-11-09 17:22:28 -07:00
parent 081dea2368
commit e9c2ababec
5 changed files with 100 additions and 15 deletions

View File

@ -4,6 +4,7 @@ using System.Linq;
using System.Reactive.Subjects; using System.Reactive.Subjects;
using System.Text; using System.Text;
using System.Threading.Tasks; using System.Threading.Tasks;
using System.Windows.Forms.VisualStyles;
namespace Wabbajack.Common.CSP namespace Wabbajack.Common.CSP
{ {
@ -157,7 +158,6 @@ namespace Wabbajack.Common.CSP
if (!await to.Put(pval)) if (!await to.Put(pval))
break; break;
} }
if (propagateClose) to.Close();
break; break;
} }
} }
@ -166,6 +166,13 @@ namespace Wabbajack.Common.CSP
await Task.WhenAll(Enumerable.Range(0, parallelism) await Task.WhenAll(Enumerable.Range(0, parallelism)
.Select(idx => Task.Run(Pump))); .Select(idx => Task.Run(Pump)));
if (propagateClose)
{
from.Close();
to.Close();
}
} }
} }
} }

View File

@ -88,16 +88,13 @@ namespace Wabbajack.Common.CSP
{ {
Task.Run(action); Task.Run(action);
} }
} return (AsyncResult.Completed, true);
else
{
if (is_done)
Abort();
Monitor.Exit(this);
return (AsyncResult.Closed, false);
} }
return (AsyncResult.Completed, true); if (is_done)
Abort();
Monitor.Exit(this);
return (AsyncResult.Closed, false);
} }
Monitor.Exit(this); Monitor.Exit(this);
return (AsyncResult.Canceled, false); return (AsyncResult.Canceled, false);
@ -198,9 +195,12 @@ namespace Wabbajack.Common.CSP
if (handler.IsBlockable) if (handler.IsBlockable)
{ {
if (_takes.Length >= MAX_QUEUE_SIZE) if (_takes.Length >= MAX_QUEUE_SIZE)
{
Monitor.Exit(this);
throw new TooManyHanldersException(); throw new TooManyHanldersException();
_takes.Unshift(handler); }
_takes.Unshift(handler);
} }
Monitor.Exit(this); Monitor.Exit(this);
return (AsyncResult.Enqueued, default); return (AsyncResult.Enqueued, default);
@ -221,7 +221,8 @@ namespace Wabbajack.Common.CSP
_isClosed = true; _isClosed = true;
if (_buf != null && _puts.IsEmpty) if (_buf != null && _puts.IsEmpty)
_finalize(_buf); _finalize(_buf);
var cbs = GetTakersForBuffer();
var cbs = _buf == null? new List<Action>() : GetTakersForBuffer();
while (!_takes.IsEmpty) while (!_takes.IsEmpty)
{ {

View File

@ -36,7 +36,7 @@ namespace Wabbajack.Common.CSP
private void Handle(bool val) private void Handle(bool val)
{ {
_tcs.SetResult(val); TaskCompletionSource.SetResult(val);
} }
} }
} }

View File

@ -10,7 +10,7 @@ using System.Windows.Forms;
namespace Wabbajack.Common.CSP namespace Wabbajack.Common.CSP
{ {
public struct RingBuffer<T> : IEnumerable<T> public class RingBuffer<T> : IEnumerable<T>
{ {
private int _size; private int _size;
private int _length; private int _length;

View File

@ -109,9 +109,86 @@ namespace Wabbajack.Test
.UnorderedPipeline(1, o, obs => obs.Select(itm => itm.ToString())); .UnorderedPipeline(1, o, obs => obs.Select(itm => itm.ToString()));
var results = (await o.TakeAll()).OrderBy(e => e).ToList(); var results = (await o.TakeAll()).OrderBy(e => e).ToList();
var expected = Enumerable.Range(0, 3).Select(i => i.ToString()).ToList(); var expected = Enumerable.Range(0, 3).Select(i => i.ToString()).OrderBy(e => e).ToList();
CollectionAssert.AreEqual(expected, results); CollectionAssert.AreEqual(expected, results);
} }
[TestMethod]
public async Task UnorderedPipelineWithParallelism()
{
// Do it a hundred times to try and catch rare deadlocks
var o = Channel.Create<string>(3);
var finished = Enumerable.Range(0, 1024)
.ToChannel()
.UnorderedPipeline(4, o, obs => obs.Select(itm => itm.ToString()));
var results = (await o.TakeAll()).OrderBy(e => e).ToList();
var expected = Enumerable.Range(0, 1024).Select(i => i.ToString()).OrderBy(e => e).ToList();
CollectionAssert.AreEqual(expected, results);
await finished;
}
[TestMethod]
public async Task ChannelStressTest()
{
var chan = Channel.Create<int>();
var putter = Task.Run(async () =>
{
for (var i = 0; i < 1000; i++)
await chan.Put(i);
});
var taker = Task.Run(async () =>
{
try
{
for (var i = 0; i < 1000; i++)
{
var (is_open, val) = await chan.Take();
Assert.AreEqual(i, val);
}
}
finally
{
chan.Close();
}
});
await putter;
await taker;
}
[TestMethod]
public async Task ChannelStressWithBuffer()
{
var chan = Channel.Create<int>(1);
var putter = Task.Run(async () =>
{
for (var i = 0; i < 1000; i++)
await chan.Put(i);
});
var taker = Task.Run(async () =>
{
try
{
for (var i = 0; i < 1000; i++)
{
var (is_open, val) = await chan.Take();
Assert.AreEqual(i, val);
}
}
finally
{
chan.Close();
}
});
await putter;
await taker;
}
} }
} }