2020-01-11 20:20:14 +00:00
using System ;
2019-07-22 22:17:46 +00:00
using System.Collections.Concurrent ;
using System.Collections.Generic ;
using System.Linq ;
2020-01-10 04:55:57 +00:00
using System.Reactive.Disposables ;
2019-11-05 04:14:31 +00:00
using System.Reactive.Linq ;
using System.Reactive.Subjects ;
2020-01-11 23:17:45 +00:00
using System.Runtime.CompilerServices ;
2019-07-22 22:17:46 +00:00
using System.Threading ;
2019-12-04 01:26:26 +00:00
using System.Threading.Tasks ;
2020-01-10 04:55:57 +00:00
using DynamicData ;
2019-12-04 04:12:08 +00:00
using Wabbajack.Common.StatusFeed ;
2019-07-22 22:17:46 +00:00
2020-01-11 23:17:45 +00:00
[assembly: InternalsVisibleTo("Wabbajack.Test")]
2019-07-22 22:17:46 +00:00
namespace Wabbajack.Common
{
2019-11-20 23:39:03 +00:00
/// <summary>
/// A pool of asynchronous workers that pull queued jobs from <see cref="Queue"/> and run them.
/// The number of active workers follows an observable, so it can grow or shrink while the queue is in use.
/// </summary>
public class WorkQueue : IDisposable
{
    /// <summary>Jobs that have been queued and are waiting for a worker to take them.</summary>
    internal AsyncBlockingCollection<Func<Task>> Queue = new AsyncBlockingCollection<Func<Task>>();

    /// <summary>Value of <see cref="CpuId"/> when the current async flow has not been assigned a worker ID.</summary>
    public const int UnassignedCpuId = 0;

    // Flows with the async context; each worker stores its own ID here at the top of ThreadBody.
    private static readonly AsyncLocal<int> _cpuId = new AsyncLocal<int>();

    /// <summary>
    /// Worker ID of the current async flow, or <see cref="UnassignedCpuId"/> when none has been set.
    /// </summary>
    public int CpuId => _cpuId.Value;

    /// <summary>True when the current async flow has a <see cref="WorkQueue"/> recorded in <see cref="AsyncLocalCurrentQueue"/>.</summary>
    public static bool WorkerThread => AsyncLocalCurrentQueue.Value != null;

    /// <summary>Instance-level view of <see cref="WorkerThread"/>.</summary>
    public bool IsWorkerThread => WorkerThread;

    /// <summary>The queue whose worker started the current async flow; set by each worker when it starts.</summary>
    internal static readonly AsyncLocal<WorkQueue> AsyncLocalCurrentQueue = new AsyncLocal<WorkQueue>();

    private readonly Subject<CPUStatus> _Status = new Subject<CPUStatus>();

    /// <summary>Emits one <see cref="CPUStatus"/> for every call to <see cref="Report"/>.</summary>
    public IObservable<CPUStatus> Status => _Status;

    private int _nextCpuID = 1; // Start at 1, as 0 is "Unassigned"

    /// <summary>Running worker tasks, keyed by the worker's CPU ID. Mutated only while <c>_lock</c> is held.</summary>
    internal Dictionary<int, Task> _tasks = new Dictionary<int, Task>();

    /// <summary>The most recent worker count requested by the driving observable.</summary>
    public int DesiredNumWorkers { get; private set; } = 0;

    private readonly CancellationTokenSource _shutdown = new CancellationTokenSource();
    private readonly CompositeDisposable _disposables = new CompositeDisposable();

    // This is currently a lie, as it wires to the Utils singleton stream. It is still good to have,
    // so that logic related to a single WorkQueue can subscribe to this dummy member; if/when we
    // implement log messages in a non-singleton fashion, they will already be wired up properly.
    public IObservable<IStatusMessage> LogMessages => Utils.LogMessages;

    // Guards _tasks, _nextCpuID and DesiredNumWorkers while workers are added or trimmed.
    private readonly AsyncLock _lock = new AsyncLock();

    // Element order matches what is pushed everywhere: (current worker count, desired worker count).
    private readonly BehaviorSubject<(int CurrentCPUs, int DesiredCPUs)> _cpuCountSubj = new BehaviorSubject<(int CurrentCPUs, int DesiredCPUs)>((0, 0));

    /// <summary>
    /// Emits the current and desired worker counts, starting with (0, 0), each time workers are added
    /// or a worker trims itself.
    /// </summary>
    public IObservable<(int CurrentCPUs, int DesiredCPUs)> CurrentCpuCount => _cpuCountSubj;

    private readonly Subject<IObservable<int>> _activeNumThreadsObservable = new Subject<IObservable<int>>();

    /// <summary>Timeout handed to each <c>Queue.TryTake</c> call made by a worker. Defaults to 200 ms.</summary>
    public TimeSpan PollMS = TimeSpan.FromMilliseconds(200);

    /// <summary>
    /// Creates a WorkQueue with the given number of threads
    /// </summary>
    /// <param name="threadCount">Number of threads for the WorkQueue to have. Null represents default, which is the Processor count of the machine.</param>
    public WorkQueue(int? threadCount = null)
        : this(Observable.Return(threadCount ?? Environment.ProcessorCount))
    {
    }

    /// <summary>
    /// Creates a WorkQueue whose number of threads is determined by the given observable
    /// </summary>
    /// <param name="numThreads">Driving observable that determines how many threads should be actively pulling jobs from the queue. A null observable falls back to the machine's processor count.</param>
    public WorkQueue(IObservable<int> numThreads)
    {
        // Hook onto the number of active threads subject, and subscribe to it for changes
        _activeNumThreadsObservable
            // Select the latest driving observable
            .Select(x => x ?? Observable.Return(Environment.ProcessorCount))
            .Switch()
            .DistinctUntilChanged()
            // Add new threads if it increases
            .SelectTask(AddNewThreadsIfNeeded)
            .Subscribe()
            .DisposeWith(_disposables);

        // Set the incoming driving observable to be active
        SetActiveThreadsObservable(numThreads);
    }

    /// <summary>
    /// Sets the driving observable that determines how many threads should be actively pulling jobs from the queue
    /// </summary>
    /// <param name="numThreads">Driving observable that determines how many threads should be actively pulling jobs from the queue</param>
    public void SetActiveThreadsObservable(IObservable<int> numThreads)
    {
        _activeNumThreadsObservable.OnNext(numThreads);
    }

    /// <summary>
    /// Records the new desired worker count and starts workers until the pool is at least that large.
    /// Shrinking is not done here; surplus workers remove themselves inside <see cref="ThreadBody"/>.
    /// </summary>
    /// <param name="desired">Number of workers that should be running.</param>
    private async Task AddNewThreadsIfNeeded(int desired)
    {
        using (await _lock.Wait())
        {
            DesiredNumWorkers = desired;
            while (DesiredNumWorkers > _tasks.Count)
            {
                var cpuID = _nextCpuID++;
                _tasks[cpuID] = Task.Run(async () =>
                {
                    await ThreadBody(cpuID);
                });
            }

            _cpuCountSubj.OnNext((_tasks.Count, DesiredNumWorkers));
        }
    }

    /// <summary>
    /// Worker loop: repeatedly takes a job from <see cref="Queue"/> and awaits it, until the queue is
    /// shut down or this worker is surplus to <see cref="DesiredNumWorkers"/>.
    /// </summary>
    /// <param name="cpuID">ID assigned to this worker; also its key in <c>_tasks</c>.</param>
    private async Task ThreadBody(int cpuID)
    {
        _cpuId.Value = cpuID;
        AsyncLocalCurrentQueue.Value = this;

        try
        {
            while (true)
            {
                Report("Waiting", Percent.Zero, false);
                if (_shutdown.IsCancellationRequested) return;

                Func<Task> f;
                bool got;
                try
                {
                    // NOTE(review): presumably yields got == false when PollMS elapses with no job — confirm against AsyncBlockingCollection.
                    (got, f) = await Queue.TryTake(PollMS, _shutdown.Token);
                }
                catch (Exception)
                {
                    // Any failure while taking from the queue is treated as a shutdown signal.
                    throw new OperationCanceledException();
                }

                if (got)
                {
                    try
                    {
                        await f();
                    }
                    catch (Exception jobError) when (!_shutdown.IsCancellationRequested)
                    {
                        // A failing job must not end the worker: it would stay registered in _tasks,
                        // so the pool would silently run with fewer workers than it reports.
                        Utils.Error(jobError, "Error in WorkQueue thread.");
                    }
                }

                // Check if we're currently trimming threads
                if (DesiredNumWorkers >= _tasks.Count) continue;

                // Noticed that we may need to shut down, lock and check again
                using (await _lock.Wait())
                {
                    // Check if another thread shut down before this one and got us back to the desired amount already
                    if (DesiredNumWorkers >= _tasks.Count) continue;

                    // Shutdown
                    if (!_tasks.Remove(cpuID))
                    {
                        Utils.Error($"Could not remove thread from workpool with CPU ID {cpuID}");
                    }

                    Report("Shutting down", Percent.Zero, false);
                    _cpuCountSubj.OnNext((_tasks.Count, DesiredNumWorkers));
                    return;
                }
            }
        }
        catch (OperationCanceledException)
        {
            // Cancellation ends the worker without logging.
        }
        catch (Exception ex)
        {
            Utils.Error(ex, "Error in WorkQueue thread.");
        }
    }

    /// <summary>
    /// Publishes a status update on <see cref="Status"/>, tagged with the worker ID of the calling async flow.
    /// </summary>
    /// <param name="msg">Status text.</param>
    /// <param name="progress">Progress value to report.</param>
    /// <param name="isWorking">Whether the worker is processing a job; the worker loop passes false for its own "Waiting" and "Shutting down" updates.</param>
    public void Report(string msg, Percent progress, bool isWorking = true)
    {
        _Status.OnNext(
            new CPUStatus
            {
                ProgressPercent = progress,
                Msg = msg,
                ID = _cpuId.Value,
                IsWorking = isWorking
            });
    }

    /// <summary>Adds a job to the queue for a worker to pick up.</summary>
    /// <param name="a">The asynchronous job to run.</param>
    public void QueueTask(Func<Task> a)
    {
        Queue.Add(a);
    }

    /// <summary>
    /// Signals the workers to stop, disposes the thread-count subscription, and disposes the job queue.
    /// </summary>
    public void Dispose()
    {
        _shutdown.Cancel();
        _disposables.Dispose();
        Queue?.Dispose();
    }
}
2019-11-05 04:14:31 +00:00
/// <summary>
/// A single status update for one worker, as published by <c>WorkQueue.Report</c>.
/// </summary>
public class CPUStatus
{
    /// <summary>Progress value supplied with the update.</summary>
    public Percent ProgressPercent { get; internal set; }

    /// <summary>Status text supplied with the update.</summary>
    public string Msg { get; internal set; }

    /// <summary>Worker ID the update was reported under.</summary>
    public int ID { get; internal set; }

    /// <summary>Whether the update was reported as active work.</summary>
    public bool IsWorking { get; internal set; }
}
2019-11-20 23:39:03 +00:00
}