// Source: wabbajack/Wabbajack.RateLimiter/Resource.cs (132 lines, 3.8 KiB, C#)

2021-09-27 12:42:46 +00:00
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
2021-09-27 12:42:46 +00:00
using System.Diagnostics;
using System.Linq;
using System.Runtime.Versioning;
2021-09-27 12:42:46 +00:00
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;
2021-10-23 16:51:17 +00:00
namespace Wabbajack.RateLimiter;
/// <summary>
/// A shared resource that limits how many jobs may run at once (<see cref="MaxTasks"/>) and
/// throttles the amount of work reported per second (<see cref="MaxThroughput"/>).
/// </summary>
/// <typeparam name="T">Marker type distinguishing one resource from another.</typeparam>
public class Resource<T> : IResource<T>
{
    // Capacity of the queue between Report callers and the throttling loop.
    private const int ReportQueueCapacity = 10;

    // Created eagerly so Report never observes a null channel, even while the
    // settings-getter constructor is still loading its limits.
    private readonly Channel<PendingReport> _channel = Channel.CreateBounded<PendingReport>(ReportQueueCapacity);

    private readonly ConcurrentDictionary<ulong, Job<T>> _tasks = new();

    // Completes once MaxTasks / MaxThroughput are known and _semaphore exists.
    // Faults or is cancelled if the settings getter fails or never gets to run.
    private readonly Task _initialized;

    // Assigned before _initialized completes; Begin awaits _initialized before touching it.
    private SemaphoreSlim _semaphore = null!;

    private ulong _nextId;
    private long _totalUsed;

    /// <summary>All jobs currently tracked by this resource, whether started or still waiting.</summary>
    public IEnumerable<IJob> Jobs => _tasks.Values;

    /// <summary>Creates a resource with limits that are known up front.</summary>
    /// <param name="humanName">Display name; <c>"&lt;unknown&gt;"</c> when null.</param>
    /// <param name="maxTasks">Maximum concurrent jobs; defaults to <see cref="Environment.ProcessorCount"/>.</param>
    /// <param name="maxThroughput">Size units per second; <see cref="long.MaxValue"/> or 0 disables throttling.</param>
    /// <param name="token">Cancels the background throttling loop.</param>
    public Resource(string? humanName = null, int? maxTasks = null, long maxThroughput = long.MaxValue, CancellationToken? token = null)
    {
        Name = humanName ?? "<unknown>";
        MaxTasks = maxTasks ?? Environment.ProcessorCount;
        MaxThroughput = maxThroughput;
        _semaphore = new SemaphoreSlim(MaxTasks);
        _initialized = Task.CompletedTask;
        _ = StartTask(token ?? CancellationToken.None);
    }

    /// <summary>Creates a resource whose limits are loaded asynchronously in the background.</summary>
    /// <param name="humanName">Display name.</param>
    /// <param name="settingGetter">Produces the task and throughput limits.</param>
    /// <param name="token">Cancels the settings load and the background throttling loop.</param>
    /// <remarks>
    /// <see cref="Begin"/> waits for the limits to load, and rethrows the getter's failure if
    /// loading fails, instead of dereferencing a semaphore that does not exist yet.
    /// </remarks>
    public Resource(string humanName, Func<Task<(int MaxTasks, long MaxThroughput)>> settingGetter, CancellationToken? token = null)
    {
        Name = humanName;
        var ct = token ?? CancellationToken.None;
        _initialized = Task.Run(async () =>
        {
            var (maxTasks, maxThroughput) = await settingGetter();
            MaxTasks = maxTasks;
            MaxThroughput = maxThroughput;
            _semaphore = new SemaphoreSlim(MaxTasks);
        }, ct);
        _ = StartAfterInitialization(ct);
    }

    public int MaxTasks { get; set; }
    public long MaxThroughput { get; set; }
    public string Name { get; }

    /// <summary>
    /// Registers a new job and waits until one of the <see cref="MaxTasks"/> slots is free.
    /// </summary>
    /// <param name="jobTitle">Description stored on the job.</param>
    /// <param name="size">Size stored on the job.</param>
    /// <param name="token">Cancels the wait for a slot.</param>
    /// <returns>The job, marked as started.</returns>
    /// <exception cref="OperationCanceledException">The wait was cancelled.</exception>
    public async ValueTask<Job<T>> Begin(string jobTitle, long size, CancellationToken token)
    {
        var id = Interlocked.Increment(ref _nextId);
        var job = new Job<T>
        {
            ID = id,
            Description = jobTitle,
            Size = size,
            Resource = this
        };

        // Register before waiting so the job is counted as pending in StatusReport.
        _tasks.TryAdd(id, job);
        try
        {
            await _initialized.WaitAsync(token);
            await _semaphore.WaitAsync(token);
        }
        catch
        {
            // The caller never receives the job, so nobody could Finish it; do not leave it
            // behind as a permanently pending entry.
            _tasks.TryRemove(id, out _);
            throw;
        }

        job.Started = true;
        return job;
    }

    /// <summary>Records progress on a job without applying any throttling.</summary>
    public void ReportNoWait(Job<T> job, int processedSize)
    {
        job.Current += processedSize;
        Interlocked.Add(ref _totalUsed, processedSize);
    }

    /// <summary>
    /// Stops tracking the job and frees its slot. Calling this again for the same job, or for a
    /// job that never started, is a no-op as far as the slot count is concerned.
    /// </summary>
    public void Finish(Job<T> job)
    {
        if (!_tasks.TryRemove(job.ID, out _))
        {
            return;
        }

        // Only a started job holds a slot; releasing otherwise would raise the effective limit.
        if (job.Started)
        {
            _semaphore.Release();
        }
    }

    /// <summary>
    /// Queues a progress report and completes once the throttling loop has accounted for it,
    /// which delays the caller enough to respect <see cref="MaxThroughput"/>.
    /// </summary>
    /// <param name="job">The job the report belongs to.</param>
    /// <param name="size">Amount of work being reported.</param>
    /// <param name="token">Cancels both the enqueue and the wait for the throttle.</param>
    public async ValueTask Report(Job<T> job, int size, CancellationToken token)
    {
        // Run the caller's continuation off the throttling loop so one slow caller cannot
        // stall every other report.
        var tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
        await _channel.Writer.WriteAsync(new PendingReport
        {
            Job = job,
            Size = size,
            Result = tcs
        }, token);

        // Honour the caller's token here too, so a stopped loop cannot hang the caller forever.
        await tcs.Task.WaitAsync(token);
    }

    /// <summary>Snapshot of running jobs, pending jobs and total reported size.</summary>
    public StatusReport StatusReport
    {
        get
        {
            // Single pass so a job that starts mid-enumeration is counted exactly once.
            var running = 0;
            var pending = 0;
            foreach (var entry in _tasks)
            {
                if (entry.Value.Started)
                {
                    running++;
                }
                else
                {
                    pending++;
                }
            }

            return new(running, pending, Interlocked.Read(ref _totalUsed));
        }
    }

    // Starts the throttling loop once the asynchronously loaded limits are available.
    private async Task StartAfterInitialization(CancellationToken token)
    {
        await _initialized;
        await StartTask(token);
    }

    // Throttling loop: handles queued reports one at a time, delaying each by
    // size / MaxThroughput seconds before releasing the reporter.
    private async ValueTask StartTask(CancellationToken token)
    {
        try
        {
            await foreach (var item in _channel.Reader.ReadAllAsync(token))
            {
                Interlocked.Add(ref _totalUsed, item.Size);

                var limit = MaxThroughput;
                // No limit configured, or nothing to wait for. Non-positive values are also
                // skipped: they would make Task.Delay throw and kill the loop.
                if (limit is long.MaxValue or <= 0 || item.Size <= 0)
                {
                    item.Result.TrySetResult();
                    continue;
                }

                try
                {
                    await Task.Delay(TimeSpan.FromSeconds((double) item.Size / limit), token);
                }
                catch (OperationCanceledException)
                {
                    item.Result.TrySetCanceled(token);
                    throw;
                }

                item.Result.TrySetResult();
            }
        }
        finally
        {
            // The loop is ending; release reporters whose items are still queued.
            while (_channel.Reader.TryRead(out var orphan))
            {
                orphan.Result.TrySetCanceled(token);
            }
        }
    }

    // One queued progress report together with the signal that releases its reporter.
    private readonly struct PendingReport
    {
        public Job<T> Job { get; init; }
        public int Size { get; init; }
        public TaskCompletionSource Result { get; init; }
    }
}