2019-11-17 23:48:32 +00:00
using System ;
using System.IO ;
2019-12-09 00:19:36 +00:00
using System.Reactive.Disposables ;
2020-01-10 04:55:57 +00:00
using System.Reactive.Linq ;
2019-11-17 23:48:32 +00:00
using System.Reactive.Subjects ;
using System.Threading ;
using System.Threading.Tasks ;
using Wabbajack.Common ;
2019-12-09 00:19:36 +00:00
using Wabbajack.Common.StatusFeed ;
2019-11-17 23:48:32 +00:00
using Wabbajack.VirtualFileSystem ;
namespace Wabbajack.Lib
{
public abstract class ABatchProcessor : IBatchProcessor
{
2020-01-11 20:20:14 +00:00
public WorkQueue Queue { get ; } = new WorkQueue ( ) ;
2019-11-17 23:48:32 +00:00
public Context VFS { get ; private set ; }
protected StatusUpdateTracker UpdateTracker { get ; private set ; }
2020-02-08 04:35:08 +00:00
private Subject < Percent > _percentCompleted { get ; } = new Subject < Percent > ( ) ;
2019-11-17 23:48:32 +00:00
/// <summary>
/// The current progress of the entire processing system on a scale of 0.0 to 1.0
/// </summary>
2020-02-08 04:35:08 +00:00
public IObservable < Percent > PercentCompleted = > _percentCompleted ;
2019-11-17 23:48:32 +00:00
2019-11-18 00:17:06 +00:00
private Subject < string > _textStatus { get ; } = new Subject < string > ( ) ;
2019-11-17 23:48:32 +00:00
/// <summary>
/// The current status of the processor as a text string
/// </summary>
2019-11-18 00:17:06 +00:00
public IObservable < string > TextStatus = > _textStatus ;
2019-11-17 23:48:32 +00:00
2019-11-18 00:17:06 +00:00
private Subject < CPUStatus > _queueStatus { get ; } = new Subject < CPUStatus > ( ) ;
public IObservable < CPUStatus > QueueStatus = > _queueStatus ;
2019-11-17 23:48:32 +00:00
2019-12-09 00:19:36 +00:00
private Subject < IStatusMessage > _logMessages { get ; } = new Subject < IStatusMessage > ( ) ;
public IObservable < IStatusMessage > LogMessages = > _logMessages ;
2019-11-18 00:17:06 +00:00
private Subject < bool > _isRunning { get ; } = new Subject < bool > ( ) ;
public IObservable < bool > IsRunning = > _isRunning ;
2019-11-17 23:48:32 +00:00
2019-12-03 20:44:52 +00:00
private int _configured ;
private int _started ;
2019-12-03 21:56:18 +00:00
private readonly CancellationTokenSource _cancel = new CancellationTokenSource ( ) ;
2019-12-03 20:44:52 +00:00
2019-12-09 00:19:36 +00:00
private readonly CompositeDisposable _subs = new CompositeDisposable ( ) ;
2020-01-08 01:57:00 +00:00
// WorkQueue settings
2020-01-10 04:55:57 +00:00
public BehaviorSubject < bool > ManualCoreLimit = new BehaviorSubject < bool > ( true ) ;
public BehaviorSubject < byte > MaxCores = new BehaviorSubject < byte > ( byte . MaxValue ) ;
2020-02-08 04:35:08 +00:00
public BehaviorSubject < Percent > TargetUsagePercent = new BehaviorSubject < Percent > ( Percent . One ) ;
2020-01-08 01:57:00 +00:00
2020-01-10 04:55:57 +00:00
protected void ConfigureProcessor ( int steps , IObservable < int > numThreads = null )
2019-11-17 23:48:32 +00:00
{
2019-12-03 20:44:52 +00:00
if ( 1 = = Interlocked . CompareExchange ( ref _configured , 1 , 1 ) )
{
2019-11-17 23:48:32 +00:00
throw new InvalidDataException ( "Can't configure a processor twice" ) ;
2019-12-03 20:44:52 +00:00
}
2020-01-11 20:20:14 +00:00
Queue . SetActiveThreadsObservable ( numThreads ) ;
2019-11-17 23:48:32 +00:00
UpdateTracker = new StatusUpdateTracker ( steps ) ;
2019-12-09 00:19:36 +00:00
Queue . Status . Subscribe ( _queueStatus )
. DisposeWith ( _subs ) ;
Queue . LogMessages . Subscribe ( _logMessages )
. DisposeWith ( _subs ) ;
2019-11-17 23:48:32 +00:00
UpdateTracker . Progress . Subscribe ( _percentCompleted ) ;
UpdateTracker . StepName . Subscribe ( _textStatus ) ;
VFS = new Context ( Queue ) { UpdateTracker = UpdateTracker } ;
}
2020-01-11 20:59:40 +00:00
/// <summary>
/// Gets the recommended maximum number of threads that should be used for the current machine.
/// This will either run a heavy processing job to do the measurement in the current folder, or refer to caches.
/// </summary>
/// <returns>Recommended maximum number of threads to use</returns>
2020-01-08 01:57:00 +00:00
public async Task < int > RecommendQueueSize ( )
{
const ulong GB = ( 1024 * 1024 * 1024 ) ;
// Most of the heavy lifting is done on the scratch disk, so we'll use the value from that disk
var memory = Utils . GetMemoryStatus ( ) ;
2020-01-21 20:10:13 +00:00
// Assume roughly 2GB of ram needed to extract each 7zip archive, and then leave 2GB for the OS. If calculation is lower or equal to 1 GB, use 1GB
2020-01-21 20:40:47 +00:00
var based_on_memory = Math . Max ( ( memory . ullTotalPhys - ( 2 * GB ) ) / ( 2 * GB ) , 1 ) ;
2020-01-08 01:57:00 +00:00
var scratch_size = await RecommendQueueSize ( Directory . GetCurrentDirectory ( ) ) ;
var result = Math . Min ( ( int ) based_on_memory , ( int ) scratch_size ) ;
Utils . Log ( $"Recommending a queue size of {result} based on disk performance, number of cores, and {((long)memory.ullTotalPhys).ToFileSizeString()} of system RAM" ) ;
return result ;
}
2020-01-11 20:59:40 +00:00
/// <summary>
/// Gets the recommended maximum number of threads that should be used for the current machine.
/// This will either run a heavy processing job to do the measurement in the specified folder, or refer to caches.
///
/// If the folder does not exist, it will be created, and not cleaned up afterwards.
/// </summary>
/// <param name="folder"></param>
/// <returns>Recommended maximum number of threads to use</returns>
public static async Task < int > RecommendQueueSize ( string folder )
{
if ( ! Directory . Exists ( folder ) )
Directory . CreateDirectory ( folder ) ;
using ( var queue = new WorkQueue ( ) )
{
Utils . Log ( $"Benchmarking {folder}" ) ;
var raw_speed = await Utils . TestDiskSpeed ( queue , folder ) ;
Utils . Log ( $"{raw_speed.ToFileSizeString()}/sec for {folder}" ) ;
int speed = ( int ) ( raw_speed / 1024 / 1024 ) ;
// Less than 100MB/sec, stick with two threads.
return speed < 100 ? 2 : Math . Min ( Environment . ProcessorCount , speed / 100 * 2 ) ;
}
}
/// <summary>
/// Constructs an observable of the number of threads to be used
///
/// Takes in a recommended amount (based off measuring the machine capabilities), and combines that with user preferences stored in subjects.
///
/// As user preferences change, the number of threads gets recalculated in the resulting observable
/// </summary>
/// <param name="recommendedCount">Maximum recommended number of threads</param>
/// <returns>Observable of number of threads to use based off recommendations and user preferences</returns>
2020-01-10 04:55:57 +00:00
public IObservable < int > ConstructDynamicNumThreads ( int recommendedCount )
{
return Observable . CombineLatest (
ManualCoreLimit ,
MaxCores ,
TargetUsagePercent ,
2020-01-11 23:17:55 +00:00
( manual , max , target ) = > CalculateThreadsToUse ( recommendedCount , manual , max , target ) ) ;
}
/// <summary>
/// Calculates the number of threads to use, based off recommended values and user preferences
/// </summary>
public static int CalculateThreadsToUse (
int recommendedCount ,
bool manual ,
byte manualMax ,
double targetUsage )
{
if ( manual )
{
if ( recommendedCount > manualMax )
2020-01-10 04:55:57 +00:00
{
2020-01-11 23:17:55 +00:00
Utils . Log ( $"Only using {manualMax} due to user preferences." ) ;
}
return Math . Max ( 1 , Math . Min ( manualMax , recommendedCount ) ) ;
}
else if ( targetUsage < 1.0d & & targetUsage > = 0d )
{
var ret = ( int ) Math . Ceiling ( recommendedCount * targetUsage ) ;
return Math . Max ( 1 , ret ) ;
}
return recommendedCount ;
2020-01-10 04:55:57 +00:00
}
2019-12-04 01:26:26 +00:00
protected abstract Task < bool > _Begin ( CancellationToken cancel ) ;
2019-11-17 23:48:32 +00:00
public Task < bool > Begin ( )
{
2019-12-03 20:44:52 +00:00
if ( 1 = = Interlocked . CompareExchange ( ref _started , 1 , 1 ) )
2019-11-17 23:48:32 +00:00
{
throw new InvalidDataException ( "Can't start the processor twice" ) ;
}
2019-12-04 01:26:26 +00:00
return Task . Run ( async ( ) = >
{
2019-11-17 23:48:32 +00:00
try
{
2019-12-04 01:26:26 +00:00
_isRunning . OnNext ( true ) ;
return await _Begin ( _cancel . Token ) ;
2019-11-17 23:48:32 +00:00
}
finally
{
2019-11-18 00:17:06 +00:00
_isRunning . OnNext ( false ) ;
2019-11-17 23:48:32 +00:00
}
} ) ;
}
2019-12-07 02:04:42 +00:00
public void Dispose ( )
2019-11-17 23:48:32 +00:00
{
2019-12-03 21:56:18 +00:00
_cancel . Cancel ( ) ;
2019-12-07 02:04:42 +00:00
Queue ? . Dispose ( ) ;
2019-11-18 00:17:06 +00:00
_isRunning . OnNext ( false ) ;
2019-11-17 23:48:32 +00:00
}
2020-01-10 04:55:57 +00:00
public void Add ( IDisposable disposable ) = > _subs . Add ( disposable ) ;
2019-11-17 23:48:32 +00:00
}
}