2019-11-17 23:48:32 +00:00
using System ;
using System.IO ;
2019-12-09 00:19:36 +00:00
using System.Reactive.Disposables ;
2020-01-10 04:55:57 +00:00
using System.Reactive.Linq ;
2019-11-17 23:48:32 +00:00
using System.Reactive.Subjects ;
using System.Threading ;
using System.Threading.Tasks ;
using Wabbajack.Common ;
2019-12-09 00:19:36 +00:00
using Wabbajack.Common.StatusFeed ;
2019-11-17 23:48:32 +00:00
using Wabbajack.VirtualFileSystem ;
namespace Wabbajack.Lib
{
public abstract class ABatchProcessor : IBatchProcessor
{
2020-01-11 20:20:14 +00:00
/// <summary>
/// Work queue owned by this processor. It is handed to <see cref="VFS"/> in the constructor
/// and disposed in <see cref="Dispose"/>.
/// </summary>
public WorkQueue Queue { get; } = new WorkQueue();

/// <summary>
/// Number of threads to use for downloads. Defaults to the processor count, capped at 8.
/// </summary>
public int DownloadThreads { get; set; } = Math.Min(Environment.ProcessorCount, 8);

/// <summary>
/// Number of threads to use for disk work. Defaults to the processor count.
/// </summary>
public int DiskThreads { get; set; } = Environment.ProcessorCount;

/// <summary>
/// Defaults to true. Not read inside this class.
/// NOTE(review): presumably lowers the thread count when working on a spinning disk - confirm against derived processors.
/// </summary>
public bool ReduceHDDThreads { get; set; } = true;

/// <summary>
/// Defaults to false. Not read inside this class.
/// NOTE(review): presumably trades memory usage for speed in derived processors - confirm.
/// </summary>
public bool FavorPerfOverRam { get; set; } = false;

/// <summary>
/// Virtual file system context, created in the constructor on top of <see cref="Queue"/>.
/// </summary>
public Context VFS { get; }

/// <summary>
/// Step/progress tracker, created in the constructor and shared with <see cref="VFS"/>.
/// </summary>
protected StatusUpdateTracker UpdateTracker { get; }
2019-11-17 23:48:32 +00:00
2020-02-08 04:35:08 +00:00
// Backing subject for PercentCompleted; fed from UpdateTracker.Progress in the constructor.
private Subject<Percent> _percentCompleted { get; } = new Subject<Percent>();

/// <summary>
/// The current progress of the entire processing system on a scale of 0.0 to 1.0
/// </summary>
public IObservable<Percent> PercentCompleted => _percentCompleted;

// Backing subject for TextStatus; fed from UpdateTracker.StepName in the constructor.
private Subject<string> _textStatus { get; } = new Subject<string>();

/// <summary>
/// The current status of the processor as a text string
/// </summary>
public IObservable<string> TextStatus => _textStatus;

// Backing subject for QueueStatus; fed from Queue.Status in the constructor.
private Subject<CPUStatus> _queueStatus { get; } = new Subject<CPUStatus>();

/// <summary>
/// Status updates forwarded from the work queue.
/// </summary>
public IObservable<CPUStatus> QueueStatus => _queueStatus;

// Backing subject for LogMessages; fed from Queue.LogMessages in the constructor.
private Subject<IStatusMessage> _logMessages { get; } = new Subject<IStatusMessage>();

/// <summary>
/// Log messages forwarded from the work queue.
/// </summary>
public IObservable<IStatusMessage> LogMessages => _logMessages;

// Backing subject for IsRunning; pushed to by Begin() and Dispose().
private Subject<bool> _isRunning { get; } = new Subject<bool>();

/// <summary>
/// Emits true when the task started by Begin() begins executing, and false when it
/// finishes or when the processor is disposed.
/// </summary>
public IObservable<bool> IsRunning => _isRunning;
2019-11-17 23:48:32 +00:00
2019-12-03 20:44:52 +00:00
// Start flag; inspected through Interlocked in Begin().
private int _started;

// Source of the cancellation token passed to _Begin(); cancelled by Abort() and Dispose().
private readonly CancellationTokenSource _cancel = new CancellationTokenSource();

// Holds the queue subscriptions made in the constructor plus anything registered through Add().
private readonly CompositeDisposable _subs = new CompositeDisposable();

// WorkQueue settings

/// <summary>
/// When true, <see cref="MaxCores"/> caps the thread count; when false,
/// <see cref="TargetUsagePercent"/> scales it instead. Starts as true.
/// </summary>
public BehaviorSubject<bool> ManualCoreLimit = new BehaviorSubject<bool>(true);

/// <summary>
/// Upper bound on threads applied while <see cref="ManualCoreLimit"/> is true. Starts at byte.MaxValue.
/// </summary>
public BehaviorSubject<byte> MaxCores = new BehaviorSubject<byte>(byte.MaxValue);

/// <summary>
/// Fraction of the recommended thread count to use while <see cref="ManualCoreLimit"/> is false.
/// Starts at Percent.One.
/// </summary>
public BehaviorSubject<Percent> TargetUsagePercent = new BehaviorSubject<Percent>(Percent.One);

/// <summary>
/// Subject handed to the work queue as its active-thread-count observable; created in the constructor.
/// </summary>
public Subject<int> DesiredThreads { get; set; }
2020-01-08 01:57:00 +00:00
2020-04-04 18:26:14 +00:00
/// <summary>
/// Creates the update tracker and VFS context, wires the queue and tracker observables
/// into this processor's status subjects, and hands the queue its thread-count observable.
/// </summary>
/// <param name="steps">Step count passed to the <see cref="StatusUpdateTracker"/>.</param>
public ABatchProcessor(int steps)
{
    UpdateTracker = new StatusUpdateTracker(steps);
    VFS = new Context(Queue) { UpdateTracker = UpdateTracker };

    // Every forwarding subscription is tracked in _subs so they can all be released together.
    Queue.Status.Subscribe(_queueStatus)
        .DisposeWith(_subs);
    Queue.LogMessages.Subscribe(_logMessages)
        .DisposeWith(_subs);
    UpdateTracker.Progress.Subscribe(_percentCompleted)
        .DisposeWith(_subs);
    UpdateTracker.StepName.Subscribe(_textStatus)
        .DisposeWith(_subs);

    DesiredThreads = new Subject<int>();
    Queue.SetActiveThreadsObservable(DesiredThreads);
}
2020-09-12 20:23:03 +00:00
2020-01-11 20:59:40 +00:00
/// <summary>
/// Gets the recommended maximum number of threads that should be used for the current machine.
/// The result is the smaller of a memory-based limit and the disk-based limit obtained by
/// benchmarking <c>AbsolutePath.EntryPoint</c>.
/// </summary>
/// <returns>Recommended maximum number of threads to use (at least 1)</returns>
public async Task<int> RecommendQueueSize()
{
    const ulong GB = 1024 * 1024 * 1024;

    var memory = Utils.GetMemoryStatus();
    var totalPhysical = memory.ullTotalPhys;

    // Leave 2GB for the OS. The subtraction is unsigned, so guard it: on a machine with
    // less than 2GB of RAM it would otherwise wrap around to an enormous value.
    var availableForWork = totalPhysical > 2 * GB ? totalPhysical - (2 * GB) : 0;

    // Assume roughly 2GB of RAM is needed to extract each 7zip archive; never go below one thread.
    var basedOnMemory = Math.Max(availableForWork / (2 * GB), 1);

    // Most of the heavy lifting is done on the scratch disk, so we'll use the value from that disk
    var scratchSize = await RecommendQueueSize(AbsolutePath.EntryPoint);

    // Clamp before narrowing so the cast to int can never truncate.
    var result = Math.Min((int)Math.Min(basedOnMemory, (ulong)int.MaxValue), scratchSize);

    Utils.Log($"Recommending a queue size of {result} based on disk performance, number of cores, and {((long)memory.ullTotalPhys).ToFileSizeString()} of system RAM");
    return result;
}
2020-01-11 20:59:40 +00:00
/// <summary>
/// Recommends a maximum thread count from the measured throughput of the disk holding
/// <paramref name="folder"/>. The measurement is delegated to <c>Utils.TestDiskSpeed</c>
/// using a temporary work queue that is disposed when this method completes.
///
/// Per the original notes: if the folder does not exist, it will be created, and not cleaned up afterwards.
/// </summary>
/// <param name="folder">Folder on the disk to benchmark</param>
/// <returns>
/// 2 below 200 MB/s, the processor count capped at 8 below 600 MB/s, otherwise the processor count
/// </returns>
public static async Task<int> RecommendQueueSize(AbsolutePath folder)
{
    using var benchmarkQueue = new WorkQueue();

    Utils.Log($"Benchmarking {folder}");
    var bytesPerSecond = await Utils.TestDiskSpeed(benchmarkQueue, folder);
    Utils.Log($"{bytesPerSecond.ToFileSizeString()}/sec for {folder}");

    int megabytesPerSecond = (int)(bytesPerSecond / 1024 / 1024);
    int cores = Environment.ProcessorCount;

    // 600 and up is probably NVME or a really good SSD, so take off the reins
    if (megabytesPerSecond >= 600)
    {
        return cores;
    }

    // 200 to 599 looks like a SATA SSD, so stick with an 8 thread maximum
    if (megabytesPerSecond >= 200)
    {
        return Math.Min(cores, 8);
    }

    // Anything slower is probably a HDD, so we can't go higher than 2
    return 2;
}
/// <summary>
/// Constructs an observable of the number of threads to be used
///
/// Takes in a recommended amount (based off measuring the machine capabilities), and combines that
/// with the user preferences held in <see cref="ManualCoreLimit"/>, <see cref="MaxCores"/> and
/// <see cref="TargetUsagePercent"/>.
///
/// Whenever any of those preferences changes, the thread count is recalculated through
/// <see cref="CalculateThreadsToUse"/> and emitted by the resulting observable.
/// </summary>
/// <param name="recommendedCount">Maximum recommended number of threads</param>
/// <returns>Observable of number of threads to use based off recommendations and user preferences</returns>
public IObservable<int> ConstructDynamicNumThreads(int recommendedCount)
{
    return Observable.CombineLatest(
        ManualCoreLimit,
        MaxCores,
        TargetUsagePercent,
        (manual, max, target) => CalculateThreadsToUse(recommendedCount, manual, max, target.Value));
}
/// <summary>
/// Calculates the number of threads to use, based off recommended values and user preferences.
/// The result is never lower than 1.
/// </summary>
/// <param name="recommendedCount">Recommended maximum number of threads</param>
/// <param name="manual">True to cap by <paramref name="manualMax"/>; false to scale by <paramref name="targetUsage"/></param>
/// <param name="manualMax">User-chosen thread cap, applied only when <paramref name="manual"/> is true</param>
/// <param name="targetUsage">Fraction of the recommended count to use; only values in [0, 1) scale the result</param>
/// <returns>Number of threads to use, at least 1</returns>
public static int CalculateThreadsToUse(
    int recommendedCount,
    bool manual,
    byte manualMax,
    double targetUsage)
{
    if (manual)
    {
        if (recommendedCount > manualMax)
        {
            Utils.Log($"Only using {manualMax} due to user preferences.");
        }

        return Math.Max(1, Math.Min(manualMax, recommendedCount));
    }

    if (targetUsage < 1.0d && targetUsage >= 0d)
    {
        // Round up so that any non-zero fraction still yields a whole thread.
        var scaled = (int)Math.Ceiling(recommendedCount * targetUsage);
        return Math.Max(1, scaled);
    }

    // Full (or out-of-range) target usage: take the recommendation, with the same
    // floor of one thread that the other branches apply.
    return Math.Max(1, recommendedCount);
}
2019-12-04 01:26:26 +00:00
/// <summary>
/// Performs the processor's actual work. Implemented by derived classes and invoked by
/// <see cref="Begin"/> inside the task it starts.
/// </summary>
/// <param name="cancel">Token from this processor's cancellation source; signalled by Abort() and Dispose()</param>
/// <returns>Result of the run; returned unchanged as the result of <see cref="Begin"/></returns>
protected abstract Task<bool> _Begin(CancellationToken cancel);

/// <summary>
/// Starts the processor on a <see cref="Task.Run(Func{Task{bool}})"/> task. May only be called once per instance.
/// </summary>
/// <returns>Task that completes with the result of <see cref="_Begin"/></returns>
/// <exception cref="InvalidDataException">Thrown when the processor has already been started</exception>
public Task<bool> Begin()
{
    // Atomically flip the flag from 0 to 1. The previous value is returned, so anything
    // other than 0 means another call already started the processor.
    if (Interlocked.CompareExchange(ref _started, 1, 0) != 0)
    {
        throw new InvalidDataException("Can't start the processor twice");
    }

    Utils.Log("Starting Installer Task");
    return Task.Run(async () =>
    {
        try
        {
            Utils.Log("Installation has Started");
            _isRunning.OnNext(true);
            return await _Begin(_cancel.Token);
        }
        catch (Exception ex)
        {
            // The value returned by Metrics.Error is deliberately discarded; the failure is rethrown as-is.
            var _ = Metrics.Error(GetType(), ex);
            throw;
        }
        finally
        {
            try
            {
                Utils.Log("Vacuuming databases");
                HashCache.VacuumDatabase();
                PatchCache.VacuumDatabase();
                VirtualFile.VacuumDatabase();
                Utils.Log("Vacuuming completed");
            }
            finally
            {
                // Always report that we stopped, even if vacuuming throws.
                _isRunning.OnNext(false);
            }
        }
    });
}
2019-12-07 02:04:42 +00:00
/// <summary>
/// Cancels any running work, disposes the work queue, reports the processor as no longer
/// running, and releases every subscription tracked by this processor.
/// </summary>
public void Dispose()
{
    _cancel.Cancel();
    Queue?.Dispose();
    _isRunning.OnNext(false);

    // Released last so that tracked subscribers still observe the final notifications above.
    _subs.Dispose();
}
2020-01-10 04:55:57 +00:00
2020-07-27 21:33:45 +00:00
/// <summary>
/// Requests cancellation of the running work by cancelling the token source whose token
/// is passed to _Begin(). Nothing is disposed.
/// </summary>
public void Abort() => _cancel.Cancel();
2020-01-10 04:55:57 +00:00
/// <summary>
/// Registers a disposable with this processor by adding it to the same composite that
/// holds the queue subscriptions created in the constructor.
/// </summary>
/// <param name="disposable">Resource to track</param>
public void Add(IDisposable disposable) => _subs.Add(disposable);
2019-11-17 23:48:32 +00:00
}
}