Add Job manager and Nexus updater job

This commit is contained in:
Timothy Baldridge 2020-01-08 21:42:25 -07:00
parent 103fd23eba
commit d9585f8740
9 changed files with 244 additions and 13 deletions

View File

@ -0,0 +1,15 @@
using Microsoft.Extensions.Configuration;
namespace Wabbajack.BuildServer
{
public class AppSettings
{
public AppSettings(IConfiguration config)
{
config.Bind("WabbajackSettings", this);
}
public string DownloadDir { get; set; }
public string ArchiveDir { get; set; }
}
}

View File

@ -2,8 +2,11 @@
using System.Linq; using System.Linq;
using System.Linq.Expressions; using System.Linq.Expressions;
using System.Threading.Tasks; using System.Threading.Tasks;
using Microsoft.AspNetCore.Builder;
using Microsoft.Extensions.Logging;
using MongoDB.Driver; using MongoDB.Driver;
using MongoDB.Driver.Linq; using MongoDB.Driver.Linq;
using Wabbajack.Common;
namespace Wabbajack.BuildServer namespace Wabbajack.BuildServer
{ {
@ -13,5 +16,13 @@ namespace Wabbajack.BuildServer
{ {
return (await coll.AsQueryable().Where(expr).Take(1).ToListAsync()).FirstOrDefault(); return (await coll.AsQueryable().Where(expr).Take(1).ToListAsync()).FirstOrDefault();
} }
public static void UseJobManager(this IApplicationBuilder b)
{
var manager = (JobManager)b.ApplicationServices.GetService(typeof(JobManager));
var tsk = manager.JobScheduler();
manager.StartJobRunners();
}
} }
} }

View File

@ -9,6 +9,7 @@ namespace Wabbajack.BuildServer.GraphQL
{ {
Name = "Job"; Name = "Job";
Field(x => x.Id, type: typeof(IdGraphType)).Description("Unique Id of the Job"); Field(x => x.Id, type: typeof(IdGraphType)).Description("Unique Id of the Job");
Field(x => x.Payload.Description).Description("Description of the job's behavior");
Field(x => x.Created, type: typeof(DateTimeGraphType)).Description("Creation time of the Job"); Field(x => x.Created, type: typeof(DateTimeGraphType)).Description("Creation time of the Job");
Field(x => x.Started, type: typeof(DateTimeGraphType)).Description("Started time of the Job"); Field(x => x.Started, type: typeof(DateTimeGraphType)).Description("Started time of the Job");
Field(x => x.Ended, type: typeof(DateTimeGraphType)).Description("Ended time of the Job"); Field(x => x.Ended, type: typeof(DateTimeGraphType)).Description("Ended time of the Job");

View File

@ -0,0 +1,123 @@
using System;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
using MongoDB.Driver;
using MongoDB.Driver.Linq;
using Wabbajack.BuildServer.Models;
using Wabbajack.BuildServer.Models.JobQueue;
using Wabbajack.BuildServer.Models.Jobs;
using Wabbajack.Common;
namespace Wabbajack.BuildServer
{
public class JobManager
{
protected readonly ILogger<JobManager> Logger;
protected readonly DBContext Db;
protected readonly AppSettings Settings;
public JobManager(ILogger<JobManager> logger, DBContext db, AppSettings settings)
{
Db = db;
Logger = logger;
Settings = settings;
}
public void StartJobRunners()
{
for (var idx = 0; idx < 2; idx++)
{
Task.Run(async () =>
{
while (true)
{
try
{
var job = await Job.GetNext(Db);
if (job == null)
{
await Task.Delay(5000);
continue;
}
Logger.Log(LogLevel.Information, $"Starting Job: {job.Payload.Description}");
JobResult result;
try
{
result = await job.Payload.Execute(Db, Settings);
}
catch (Exception ex)
{
Logger.Log(LogLevel.Error, ex, $"Error while running job: {job.Payload.Description}");
result = JobResult.Error(ex);
}
await Job.Finish(Db, job, result);
}
catch (Exception ex)
{
Logger.Log(LogLevel.Error, ex, $"Error getting or updating Job");
}
}
});
}
}
public async Task JobScheduler()
{
Utils.LogMessages.Subscribe(msg => Logger.Log(LogLevel.Information, msg.ToString()));
while (true)
{
await KillOrphanedJobs();
await PollNexusMods();
await Task.Delay(10000);
}
}
private async Task KillOrphanedJobs()
{
try
{
var started = await Db.Jobs.AsQueryable()
.Where(j => j.Started != null && j.Ended == null)
.ToListAsync();
foreach (var job in started)
{
var runtime = DateTime.Now - job.Started;
if (runtime > TimeSpan.FromMinutes(30))
{
await Job.Finish(Db, job, JobResult.Error(new Exception($"Timeout after {runtime.Value.TotalMinutes}")));
}
}
}
catch (Exception ex)
{
Logger.Log(LogLevel.Error, ex, "Error in JobScheduler when scheduling GetNexusUpdatesJob");
}
}
private async Task PollNexusMods()
{
try
{
var updaters = await Db.Jobs.AsQueryable()
.Where(j => j.Payload is GetNexusUpdatesJob)
.Where(j => j.Started == null)
.OrderBy(j => j.Created)
.ToListAsync();
if (updaters.Count == 0)
{
await Db.Jobs.InsertOneAsync(new Job
{
Payload = new GetNexusUpdatesJob()
});
}
}
catch (Exception ex)
{
Logger.Log(LogLevel.Error, ex, "Error in JobScheduler when scheduling GetNexusUpdatesJob");
}
}
}
}

View File

@ -12,8 +12,8 @@ namespace Wabbajack.BuildServer.Models.JobQueue
{ {
public static List<Type> KnownSubTypes = new List<Type> public static List<Type> KnownSubTypes = new List<Type>
{ {
typeof(IndexJob) typeof(IndexJob),
typeof(GetNexusUpdatesJob)
}; };
public static Dictionary<Type, string> TypeToName { get; set; } public static Dictionary<Type, string> TypeToName { get; set; }
public static Dictionary<string, Type> NameToType { get; set; } public static Dictionary<string, Type> NameToType { get; set; }
@ -24,7 +24,7 @@ namespace Wabbajack.BuildServer.Models.JobQueue
public virtual bool UsesNexus { get; } = false; public virtual bool UsesNexus { get; } = false;
public abstract Task<JobResult> Execute(DBContext db); public abstract Task<JobResult> Execute(DBContext db, AppSettings settings);
static AJobPayload() static AJobPayload()
{ {

View File

@ -0,0 +1,75 @@
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Wabbajack.BuildServer.Models.JobQueue;
using Wabbajack.Common;
using Wabbajack.Lib.NexusApi;
using MongoDB.Driver;
namespace Wabbajack.BuildServer.Models.Jobs
{
public class GetNexusUpdatesJob : AJobPayload
{
public override string Description => "Poll the nexus for updated mods, and clean any references to those mods";
public override async Task<JobResult> Execute(DBContext db, AppSettings settings)
{
var api = await NexusApiClient.Get();
var gameTasks = GameRegistry.Games.Values
.Where(game => game.NexusName != null)
.Select(async game =>
{
return (game,
mods: await api.Get<List<UpdatedMod>>(
$"https://api.nexusmods.com/v1/games/{game.NexusName}/mods/updated.json?period=1m"));
})
.Select(async rTask =>
{
var (game, mods) = await rTask;
return mods.Select(mod => new { game = game, mod = mod });
}).ToList();
Utils.Log($"Getting update list for {gameTasks.Count} games");
var purge = (await Task.WhenAll(gameTasks))
.SelectMany(i => i)
.ToList();
Utils.Log($"Found {purge.Count} updated mods in the last month");
using (var queue = new WorkQueue())
{
var collected = await purge.Select(d =>
{
var a = d.mod.latest_file_update.AsUnixTime();
// Mod activity could hide files
var b = d.mod.latest_mod_activity.AsUnixTime();
return new {Game = d.game.NexusName, Date = (a > b ? a : b), ModId = d.mod.mod_id.ToString()};
}).PMap(queue, async t =>
{
var resultA = await db.NexusModInfos.DeleteManyAsync(f =>
f.Game == t.Game && f.ModId == t.ModId && f.LastCheckedUTC <= t.Date);
var resultB = await db.NexusModFiles.DeleteManyAsync(f =>
f.Game == t.Game && f.ModId == t.ModId && f.LastCheckedUTC <= t.Date);
var resultC = await db.NexusFileInfos.DeleteManyAsync(f =>
f.Game == t.Game && f.ModId == t.ModId && f.LastCheckedUTC <= t.Date);
return resultA.DeletedCount + resultB.DeletedCount + resultC.DeletedCount;
});
Utils.Log($"Purged {collected.Sum()} cache entries");
}
return JobResult.Success();
}
class UpdatedMod
{
public long mod_id;
public long latest_file_update;
public long latest_mod_activity;
}
}
}

View File

@ -19,13 +19,10 @@ namespace Wabbajack.BuildServer.Models.Jobs
public class IndexJob : AJobPayload public class IndexJob : AJobPayload
{ {
public Archive Archive { get; set; } public Archive Archive { get; set; }
public override string Description { get; } = "Validate and index an archive"; public override string Description => $"Index ${Archive.State.PrimaryKey} and save the download/file state";
public override bool UsesNexus { get => Archive.State is NexusDownloader.State; } public override bool UsesNexus { get => Archive.State is NexusDownloader.State; }
public override async Task<JobResult> Execute(DBContext db) public override async Task<JobResult> Execute(DBContext db, AppSettings settings)
{ {
/*
var pk = new List<object>(); var pk = new List<object>();
pk.Add(AbstractDownloadState.TypeToName[Archive.State.GetType()]); pk.Add(AbstractDownloadState.TypeToName[Archive.State.GetType()]);
pk.AddRange(Archive.State.PrimaryKey); pk.AddRange(Archive.State.PrimaryKey);
@ -38,13 +35,13 @@ namespace Wabbajack.BuildServer.Models.Jobs
string fileName = Archive.Name; string fileName = Archive.Name;
string folder = Guid.NewGuid().ToString(); string folder = Guid.NewGuid().ToString();
Utils.Log($"Indexer is downloading {fileName}"); Utils.Log($"Indexer is downloading {fileName}");
var downloadDest = Path.Combine(Server.Config.Indexer.DownloadDir, folder, fileName); var downloadDest = Path.Combine(settings.DownloadDir, folder, fileName);
await Archive.State.Download(downloadDest); await Archive.State.Download(downloadDest);
using (var queue = new WorkQueue()) using (var queue = new WorkQueue())
{ {
var vfs = new Context(queue, true); var vfs = new Context(queue, true);
await vfs.AddRoot(Path.Combine(Server.Config.Indexer.DownloadDir, folder)); await vfs.AddRoot(Path.Combine(settings.DownloadDir, folder));
var archive = vfs.Index.ByRootPath.First(); var archive = vfs.Index.ByRootPath.First();
var converted = ConvertArchive(new List<IndexedFile>(), archive.Value); var converted = ConvertArchive(new List<IndexedFile>(), archive.Value);
try try
@ -63,15 +60,15 @@ namespace Wabbajack.BuildServer.Models.Jobs
IsValid = true IsValid = true
}); });
var to_path = Path.Combine(Server.Config.Indexer.ArchiveDir, var to_path = Path.Combine(settings.ArchiveDir,
$"{Path.GetFileName(fileName)}_{archive.Value.Hash.FromBase64().ToHex()}_{Path.GetExtension(fileName)}"); $"{Path.GetFileName(fileName)}_{archive.Value.Hash.FromBase64().ToHex()}_{Path.GetExtension(fileName)}");
if (File.Exists(to_path)) if (File.Exists(to_path))
File.Delete(downloadDest); File.Delete(downloadDest);
else else
File.Move(downloadDest, to_path); File.Move(downloadDest, to_path);
Utils.DeleteDirectory(Path.Combine(Server.Config.Indexer.DownloadDir, folder)); Utils.DeleteDirectory(Path.Combine(settings.DownloadDir, folder));
} }
*/
return JobResult.Success(); return JobResult.Success();
} }

View File

@ -48,6 +48,8 @@ namespace Wabbajack.BuildServer
}); });
services.AddSingleton<DBContext>(); services.AddSingleton<DBContext>();
services.AddSingleton<JobManager>();
services.AddSingleton<AppSettings>();
services.AddControllers(o => services.AddControllers(o =>
{ {
@ -55,6 +57,8 @@ namespace Wabbajack.BuildServer
{ {
o.SerializerSettings.ReferenceLoopHandling = ReferenceLoopHandling.Ignore; o.SerializerSettings.ReferenceLoopHandling = ReferenceLoopHandling.Ignore;
}); });
} }
// This method gets called by the runtime. Use this method to configure the HTTP request pipeline. // This method gets called by the runtime. Use this method to configure the HTTP request pipeline.
@ -79,6 +83,7 @@ namespace Wabbajack.BuildServer
}); });
app.UseRouting(); app.UseRouting();
app.UseJobManager();
app.UseAuthentication(); app.UseAuthentication();
app.UseAuthorization(); app.UseAuthorization();
app.UseFileServer(new FileServerOptions app.UseFileServer(new FileServerOptions

View File

@ -26,5 +26,9 @@
"Metrics": "metrics" "Metrics": "metrics"
} }
}, },
"WabbajackSettings": {
"DownloadDir": "c:\\tmp\\downloads",
"ArchiveDir": "c:\\archives"
},
"AllowedHosts": "*" "AllowedHosts": "*"
} }