From 2869de79c7b40bc9d464a43f6ee8e143ec0a18b7 Mon Sep 17 00:00:00 2001 From: Timothy Baldridge Date: Wed, 8 Jan 2020 21:42:25 -0700 Subject: [PATCH] Add Job manager and Nexus updater job --- Wabbajack.BuildServer/AppSettings.cs | 15 +++ Wabbajack.BuildServer/Extensions.cs | 11 ++ Wabbajack.BuildServer/GraphQL/JobType.cs | 1 + Wabbajack.BuildServer/JobManager.cs | 123 ++++++++++++++++++ .../Models/JobQueue/AJobPayload.cs | 6 +- .../Models/Jobs/GetNexusUpdatesJob.cs | 75 +++++++++++ Wabbajack.BuildServer/Models/Jobs/IndexJob.cs | 17 +-- Wabbajack.BuildServer/Startup.cs | 5 + Wabbajack.BuildServer/appsettings.json | 4 + 9 files changed, 244 insertions(+), 13 deletions(-) create mode 100644 Wabbajack.BuildServer/AppSettings.cs create mode 100644 Wabbajack.BuildServer/JobManager.cs create mode 100644 Wabbajack.BuildServer/Models/Jobs/GetNexusUpdatesJob.cs diff --git a/Wabbajack.BuildServer/AppSettings.cs b/Wabbajack.BuildServer/AppSettings.cs new file mode 100644 index 00000000..fc579416 --- /dev/null +++ b/Wabbajack.BuildServer/AppSettings.cs @@ -0,0 +1,15 @@ +using Microsoft.Extensions.Configuration; + +namespace Wabbajack.BuildServer +{ + public class AppSettings + { + public AppSettings(IConfiguration config) + { + config.Bind("WabbajackSettings", this); + } + + public string DownloadDir { get; set; } + public string ArchiveDir { get; set; } + } +} diff --git a/Wabbajack.BuildServer/Extensions.cs b/Wabbajack.BuildServer/Extensions.cs index bfd4a1a5..a5c5d550 100644 --- a/Wabbajack.BuildServer/Extensions.cs +++ b/Wabbajack.BuildServer/Extensions.cs @@ -2,8 +2,11 @@ using System.Linq; using System.Linq.Expressions; using System.Threading.Tasks; +using Microsoft.AspNetCore.Builder; +using Microsoft.Extensions.Logging; using MongoDB.Driver; using MongoDB.Driver.Linq; +using Wabbajack.Common; namespace Wabbajack.BuildServer { @@ -13,5 +16,13 @@ namespace Wabbajack.BuildServer { return (await coll.AsQueryable().Where(expr).Take(1).ToListAsync()).FirstOrDefault(); } + + public static void UseJobManager(this IApplicationBuilder b) + { + var manager = (JobManager)b.ApplicationServices.GetService(typeof(JobManager)); + var tsk = manager.JobScheduler(); + + manager.StartJobRunners(); + } } } diff --git a/Wabbajack.BuildServer/GraphQL/JobType.cs b/Wabbajack.BuildServer/GraphQL/JobType.cs index da176fad..e2bbb1d7 100644 --- a/Wabbajack.BuildServer/GraphQL/JobType.cs +++ b/Wabbajack.BuildServer/GraphQL/JobType.cs @@ -9,6 +9,7 @@ namespace Wabbajack.BuildServer.GraphQL { Name = "Job"; Field(x => x.Id, type: typeof(IdGraphType)).Description("Unique Id of the Job"); + Field(x => x.Payload.Description).Description("Description of the job's behavior"); Field(x => x.Created, type: typeof(DateTimeGraphType)).Description("Creation time of the Job"); Field(x => x.Started, type: typeof(DateTimeGraphType)).Description("Started time of the Job"); Field(x => x.Ended, type: typeof(DateTimeGraphType)).Description("Ended time of the Job"); diff --git a/Wabbajack.BuildServer/JobManager.cs b/Wabbajack.BuildServer/JobManager.cs new file mode 100644 index 00000000..bd2644d0 --- /dev/null +++ b/Wabbajack.BuildServer/JobManager.cs @@ -0,0 +1,123 @@ +using System; +using System.Threading.Tasks; +using Microsoft.Extensions.Logging; +using MongoDB.Driver; +using MongoDB.Driver.Linq; +using Wabbajack.BuildServer.Models; +using Wabbajack.BuildServer.Models.JobQueue; +using Wabbajack.BuildServer.Models.Jobs; +using Wabbajack.Common; + +namespace Wabbajack.BuildServer +{ + public class JobManager + { + protected readonly ILogger Logger; + protected readonly DBContext Db; + protected readonly AppSettings Settings; + + public JobManager(ILogger logger, DBContext db, AppSettings settings) + { + Db = db; + Logger = logger; + Settings = settings; + } + + public void StartJobRunners() + { + for (var idx = 0; idx < 2; idx++) + { + Task.Run(async () => + { + while (true) + { + try + { + var job = await Job.GetNext(Db); + if (job == null) + { + await Task.Delay(5000); + continue; + } + + Logger.Log(LogLevel.Information, $"Starting Job: {job.Payload.Description}"); + JobResult result; + try + { + result = await job.Payload.Execute(Db, Settings); + } + catch (Exception ex) + { + Logger.Log(LogLevel.Error, ex, $"Error while running job: {job.Payload.Description}"); + result = JobResult.Error(ex); + } + + await Job.Finish(Db, job, result); + } + catch (Exception ex) + { + Logger.Log(LogLevel.Error, ex, $"Error getting or updating Job"); + + } + } + }); + } + } + + public async Task JobScheduler() + { + Utils.LogMessages.Subscribe(msg => Logger.Log(LogLevel.Information, msg.ToString())); + while (true) + { + await KillOrphanedJobs(); + await PollNexusMods(); + await Task.Delay(10000); + } + } + + private async Task KillOrphanedJobs() + { + try + { + var started = await Db.Jobs.AsQueryable() + .Where(j => j.Started != null && j.Ended == null) + .ToListAsync(); + foreach (var job in started) + { + var runtime = DateTime.Now - job.Started; + if (runtime > TimeSpan.FromMinutes(30)) + { + await Job.Finish(Db, job, JobResult.Error(new Exception($"Timeout after {runtime.Value.TotalMinutes}"))); + } + } + } + catch (Exception ex) + { + Logger.Log(LogLevel.Error, ex, "Error in JobScheduler when scheduling GetNexusUpdatesJob"); + } + } + + private async Task PollNexusMods() + { + try + { + var updaters = await Db.Jobs.AsQueryable() + .Where(j => j.Payload is GetNexusUpdatesJob) + .Where(j => j.Started == null) + .OrderBy(j => j.Created) + .ToListAsync(); + if (updaters.Count == 0) + { + await Db.Jobs.InsertOneAsync(new Job + { + Payload = new GetNexusUpdatesJob() + }); + } + } + catch (Exception ex) + { + Logger.Log(LogLevel.Error, ex, "Error in JobScheduler when scheduling GetNexusUpdatesJob"); + } + } + } +} diff --git a/Wabbajack.BuildServer/Models/JobQueue/AJobPayload.cs b/Wabbajack.BuildServer/Models/JobQueue/AJobPayload.cs index 48b37775..d108fba4 100644 --- a/Wabbajack.BuildServer/Models/JobQueue/AJobPayload.cs +++ b/Wabbajack.BuildServer/Models/JobQueue/AJobPayload.cs @@ -12,8 +12,8 @@ namespace Wabbajack.BuildServer.Models.JobQueue { public static List KnownSubTypes = new List { - typeof(IndexJob) - + typeof(IndexJob), + typeof(GetNexusUpdatesJob) }; public static Dictionary TypeToName { get; set; } public static Dictionary NameToType { get; set; } @@ -24,7 +24,7 @@ namespace Wabbajack.BuildServer.Models.JobQueue public virtual bool UsesNexus { get; } = false; - public abstract Task Execute(DBContext db); + public abstract Task Execute(DBContext db, AppSettings settings); static AJobPayload() { diff --git a/Wabbajack.BuildServer/Models/Jobs/GetNexusUpdatesJob.cs b/Wabbajack.BuildServer/Models/Jobs/GetNexusUpdatesJob.cs new file mode 100644 index 00000000..54028fab --- /dev/null +++ b/Wabbajack.BuildServer/Models/Jobs/GetNexusUpdatesJob.cs @@ -0,0 +1,75 @@ +using System.Collections.Generic; +using System.Linq; +using System.Threading.Tasks; +using Wabbajack.BuildServer.Models.JobQueue; +using Wabbajack.Common; +using Wabbajack.Lib.NexusApi; +using MongoDB.Driver; + + +namespace Wabbajack.BuildServer.Models.Jobs +{ + public class GetNexusUpdatesJob : AJobPayload + { + public override string Description => "Poll the nexus for updated mods, and clean any references to those mods"; + + public override async Task Execute(DBContext db, AppSettings settings) + { + var api = await NexusApiClient.Get(); + + var gameTasks = GameRegistry.Games.Values + .Where(game => game.NexusName != null) + .Select(async game => + { + return (game, + mods: await api.Get>( + $"https://api.nexusmods.com/v1/games/{game.NexusName}/mods/updated.json?period=1m")); + }) + .Select(async rTask => + { + var (game, mods) = await rTask; + return mods.Select(mod => new { game = game, mod = mod }); + }).ToList(); + + Utils.Log($"Getting update list for {gameTasks.Count} games"); + + var purge = (await Task.WhenAll(gameTasks)) + .SelectMany(i => i) + .ToList(); + + Utils.Log($"Found {purge.Count} updated mods in the last month"); + using (var queue = new WorkQueue()) + { + var collected = await purge.Select(d => + { + var a = d.mod.latest_file_update.AsUnixTime(); + // Mod activity could hide files + var b = d.mod.latest_mod_activity.AsUnixTime(); + + return new {Game = d.game.NexusName, Date = (a > b ? a : b), ModId = d.mod.mod_id.ToString()}; + }).PMap(queue, async t => + { + var resultA = await db.NexusModInfos.DeleteManyAsync(f => + f.Game == t.Game && f.ModId == t.ModId && f.LastCheckedUTC <= t.Date); + var resultB = await db.NexusModFiles.DeleteManyAsync(f => + f.Game == t.Game && f.ModId == t.ModId && f.LastCheckedUTC <= t.Date); + var resultC = await db.NexusFileInfos.DeleteManyAsync(f => + f.Game == t.Game && f.ModId == t.ModId && f.LastCheckedUTC <= t.Date); + + return resultA.DeletedCount + resultB.DeletedCount + resultC.DeletedCount; + }); + + Utils.Log($"Purged {collected.Sum()} cache entries"); + } + + return JobResult.Success(); + } + + class UpdatedMod + { + public long mod_id; + public long latest_file_update; + public long latest_mod_activity; + } + } +} diff --git a/Wabbajack.BuildServer/Models/Jobs/IndexJob.cs b/Wabbajack.BuildServer/Models/Jobs/IndexJob.cs index 69c679b1..857a7500 100644 --- a/Wabbajack.BuildServer/Models/Jobs/IndexJob.cs +++ b/Wabbajack.BuildServer/Models/Jobs/IndexJob.cs @@ -19,13 +19,10 @@ namespace Wabbajack.BuildServer.Models.Jobs public class IndexJob : AJobPayload { public Archive Archive { get; set; } - public override string Description { get; } = "Validate and index an archive"; + public override string Description => $"Index ${Archive.State.PrimaryKey} and save the download/file state"; public override bool UsesNexus { get => Archive.State is NexusDownloader.State; } - public override async Task Execute(DBContext db) + public override async Task Execute(DBContext db, AppSettings settings) { - - /* - var pk = new List(); pk.Add(AbstractDownloadState.TypeToName[Archive.State.GetType()]); pk.AddRange(Archive.State.PrimaryKey); @@ -38,13 +35,13 @@ namespace Wabbajack.BuildServer.Models.Jobs string fileName = Archive.Name; string folder = Guid.NewGuid().ToString(); Utils.Log($"Indexer is downloading {fileName}"); - var downloadDest = Path.Combine(Server.Config.Indexer.DownloadDir, folder, fileName); + var downloadDest = Path.Combine(settings.DownloadDir, folder, fileName); await Archive.State.Download(downloadDest); using (var queue = new WorkQueue()) { var vfs = new Context(queue, true); - await vfs.AddRoot(Path.Combine(Server.Config.Indexer.DownloadDir, folder)); + await vfs.AddRoot(Path.Combine(settings.DownloadDir, folder)); var archive = vfs.Index.ByRootPath.First(); var converted = ConvertArchive(new List(), archive.Value); try @@ -63,15 +60,15 @@ namespace Wabbajack.BuildServer.Models.Jobs IsValid = true }); - var to_path = Path.Combine(Server.Config.Indexer.ArchiveDir, + var to_path = Path.Combine(settings.ArchiveDir, $"{Path.GetFileName(fileName)}_{archive.Value.Hash.FromBase64().ToHex()}_{Path.GetExtension(fileName)}"); if (File.Exists(to_path)) File.Delete(downloadDest); else File.Move(downloadDest, to_path); - Utils.DeleteDirectory(Path.Combine(Server.Config.Indexer.DownloadDir, folder)); + Utils.DeleteDirectory(Path.Combine(settings.DownloadDir, folder)); } -*/ + return JobResult.Success(); } diff --git a/Wabbajack.BuildServer/Startup.cs b/Wabbajack.BuildServer/Startup.cs index 8853f069..416997ad 100644 --- a/Wabbajack.BuildServer/Startup.cs +++ b/Wabbajack.BuildServer/Startup.cs @@ -48,6 +48,8 @@ namespace Wabbajack.BuildServer }); services.AddSingleton(); + services.AddSingleton(); + services.AddSingleton(); services.AddControllers(o => { @@ -55,6 +57,8 @@ namespace Wabbajack.BuildServer { o.SerializerSettings.ReferenceLoopHandling = ReferenceLoopHandling.Ignore; }); + + } // This method gets called by the runtime. Use this method to configure the HTTP request pipeline. @@ -79,6 +83,7 @@ namespace Wabbajack.BuildServer }); app.UseRouting(); + app.UseJobManager(); app.UseAuthentication(); app.UseAuthorization(); app.UseFileServer(new FileServerOptions diff --git a/Wabbajack.BuildServer/appsettings.json b/Wabbajack.BuildServer/appsettings.json index f93c14d6..39adbc69 100644 --- a/Wabbajack.BuildServer/appsettings.json +++ b/Wabbajack.BuildServer/appsettings.json @@ -26,5 +26,9 @@ "Metrics": "metrics" } }, + "WabbajackSettings": { + "DownloadDir": "c:\\tmp\\downloads", + "ArchiveDir": "c:\\archives" + }, "AllowedHosts": "*" }