diff --git a/Wabbajack.Common/IEnumerableExtensions.cs b/Wabbajack.Common/IEnumerableExtensions.cs index ee8141fa..3958df0d 100644 --- a/Wabbajack.Common/IEnumerableExtensions.cs +++ b/Wabbajack.Common/IEnumerableExtensions.cs @@ -58,7 +58,14 @@ public static class IEnumerableExtensions return data; } - public static IEnumerable> Partition(this IEnumerable coll, int size) + /// + /// Splits the collection into `size` parts + /// + /// + /// + /// + /// + public static IEnumerable> Partition(this IEnumerable coll, int count) { var asList = coll.ToList(); @@ -70,7 +77,30 @@ public static class IEnumerableExtensions } } - return Enumerable.Range(0, size).Select(offset => SkipEnumerable(asList, offset, size)); + return Enumerable.Range(0, count).Select(offset => SkipEnumerable(asList, offset, count)); + } + + /// + /// Split the collection into `size` parts + /// + /// + /// + /// + /// + public static IEnumerable> Batch(this IEnumerable coll, int size) + { + List current = new(); + foreach (var itm in coll) + { + current.Add(itm); + if (current.Count == size) + { + yield return current; + current = new List(); + } + } + if (current.Count > 0) + yield return current; } diff --git a/Wabbajack.Networking.WabbajackClientApi/Client.cs b/Wabbajack.Networking.WabbajackClientApi/Client.cs index d32d7d9f..6fe37c3c 100644 --- a/Wabbajack.Networking.WabbajackClientApi/Client.cs +++ b/Wabbajack.Networking.WabbajackClientApi/Client.cs @@ -93,7 +93,7 @@ public class Client { _logger.LogError("HTTP Error: {Result}", result); await SendMetric("rebound", "Error", false); - Environment.Exit(0); + // Environment.Exit(0); } } diff --git a/Wabbajack.Server/AppSettings.cs b/Wabbajack.Server/AppSettings.cs index 5c51ebeb..6b16a846 100644 --- a/Wabbajack.Server/AppSettings.cs +++ b/Wabbajack.Server/AppSettings.cs @@ -1,4 +1,5 @@ -using Microsoft.Extensions.Configuration; +using Amazon.S3; +using Microsoft.Extensions.Configuration; using Wabbajack.Paths; namespace Wabbajack.BuildServer; @@ -26,8 +27,6 @@ public class AppSettings public string DiscordKey { get; set; } - public string AuthoredFilesFolder { get; set; } - public string PatchesFilesFolder { get; set; } public string MirrorFilesFolder { get; set; } public string NexusCacheFolder { get; set; } @@ -37,6 +36,19 @@ public class AppSettings public CouchDBSetting CesiDB { get; set; } public CouchDBSetting MetricsDB { get; set; } + + public S3Settings AuthoredFilesS3 { get; set; } +} + +public class S3Settings +{ + public string AccessKey { get; set; } + public string SecretKey { get; set; } + public string ServiceURL { get; set; } + + public string BucketName { get; set; } + + public string BucketCacheFile { get; set; } } public class CouchDBSetting diff --git a/Wabbajack.Server/Controllers/AuthorControls.cs b/Wabbajack.Server/Controllers/AuthorControls.cs index 0608c9c6..71cb8c64 100644 --- a/Wabbajack.Server/Controllers/AuthorControls.cs +++ b/Wabbajack.Server/Controllers/AuthorControls.cs @@ -147,7 +147,7 @@ public class AuthorControls : ControllerBase public async Task HomePage() { var user = User.FindFirstValue(ClaimTypes.Name); - var files = (await _authorFiles.AllAuthoredFiles()) + var files = _authorFiles.AllDefinitions .Where(af => af.Definition.Author == user) .Select(af => new { diff --git a/Wabbajack.Server/Controllers/AuthoredFiles.cs b/Wabbajack.Server/Controllers/AuthoredFiles.cs index 6abd80ed..107b7956 100644 --- a/Wabbajack.Server/Controllers/AuthoredFiles.cs +++ b/Wabbajack.Server/Controllers/AuthoredFiles.cs @@ -68,8 +68,7 @@ public class AuthoredFiles : ControllerBase $"Hashes don't match for index {index}. Sizes ({ms.Length} vs {part.Size}). Hashes ({hash} vs {part.Hash}"); ms.Position = 0; - await using var partStream = await _authoredFiles.CreatePart(definition.MungedName, (int)index); - await ms.CopyToAsync(partStream, token); + await _authoredFiles.WritePart(definition.MungedName, (int) index, ms); return Ok(part.Hash.ToBase64()); } @@ -123,7 +122,7 @@ public class AuthoredFiles : ControllerBase public async Task DeleteUpload(string serverAssignedUniqueId) { var user = User.FindFirstValue(ClaimTypes.Name); - var definition = (await _authoredFiles.AllAuthoredFiles()) + var definition = _authoredFiles.AllDefinitions .First(f => f.Definition.ServerAssignedUniqueId == serverAssignedUniqueId) .Definition; if (definition.Author != user) @@ -145,12 +144,12 @@ public class AuthoredFiles : ControllerBase [Route("")] public async Task UploadedFilesGet() { - var files = await _authoredFiles.AllAuthoredFiles(); + var files = _authoredFiles.AllDefinitions + .ToArray(); var response = _authoredFilesTemplate(new { Files = files.OrderByDescending(f => f.Updated).ToArray(), - TotalSpace = _authoredFiles.TotalSpace.Bytes().Humanize("#.##"), - FreeSpace = _authoredFiles.FreeSpace.Bytes().Humanize("#.##") + UsedSpace = _authoredFiles.UsedSpace.Bytes().Humanize("#.##"), }); return new ContentResult { @@ -172,10 +171,13 @@ public class AuthoredFiles : ControllerBase Response.Headers.ContentType = new StringValues("application/octet-stream"); Response.Headers.ContentLength = definition.Size; Response.Headers.ETag = definition.MungedName + "_direct"; - foreach (var part in definition.Parts) + + foreach (var part in definition.Parts.OrderBy(p => p.Index)) { - await using var partStream = await _authoredFiles.StreamForPart(mungedName, (int)part.Index); - await partStream.CopyToAsync(Response.Body); + await _authoredFiles.StreamForPart(mungedName, (int)part.Index, async stream => + { + await stream.CopyToAsync(Response.Body); + }); } } } \ No newline at end of file diff --git a/Wabbajack.Server/DataModels/AuthorFiles.cs b/Wabbajack.Server/DataModels/AuthorFiles.cs index 5f76efb9..74726b9f 100644 --- a/Wabbajack.Server/DataModels/AuthorFiles.cs +++ b/Wabbajack.Server/DataModels/AuthorFiles.cs @@ -1,6 +1,11 @@ +using System.Collections.Concurrent; +using System.Diagnostics; using System.IO.Compression; using System.Web; +using Amazon.S3; +using Amazon.S3.Model; using Microsoft.Extensions.Logging; +using Microsoft.IO; using Wabbajack.BuildServer; using Wabbajack.Common; using Wabbajack.DTOs.CDN; @@ -16,88 +21,207 @@ public class AuthorFiles private readonly ILogger _logger; private readonly AppSettings _settings; private readonly DTOSerializer _dtos; - private Dictionary _byServerId = new(); + private ConcurrentDictionary _byServerId = new(); + private readonly IAmazonS3 _s3; + private readonly ConcurrentDictionary _fileCache; + private readonly string _bucketName; + private ConcurrentDictionary _allObjects = new(); + private HashSet _mangledNames; + private readonly RecyclableMemoryStreamManager _streamPool; + private readonly HttpClient _httpClient; - public AbsolutePath AuthorFilesLocation => _settings.AuthoredFilesFolder.ToAbsolutePath(); - - public AuthorFiles(ILogger logger, AppSettings settings, DTOSerializer dtos) + private Uri _baseUri => new($"https://r2.wabbajack.org/"); + + public AuthorFiles(ILogger logger, AppSettings settings, DTOSerializer dtos, IAmazonS3 s3, HttpClient client) { + _httpClient = client; + _s3 = s3; _logger = logger; _settings = settings; _dtos = dtos; + _fileCache = new ConcurrentDictionary(); + _bucketName = settings.AuthoredFilesS3.BucketName; + _ = PrimeCache(); + _streamPool = new RecyclableMemoryStreamManager(); } - public IEnumerable AllDefinitions => AuthorFilesLocation.EnumerateFiles("definition.json.gz"); - - /// - /// Total unused space available for authored files - /// - public long FreeSpace => new DriveInfo(AuthorFilesLocation.ToString()).AvailableFreeSpace; - - /// - /// Total space available for authored files - /// - public long TotalSpace => new DriveInfo(AuthorFilesLocation.ToString()).TotalSize; - - /// - /// - /// - /// - - public async Task AllAuthoredFiles() + private async Task PrimeCache() { - var defs = new List(); - foreach (var file in AllDefinitions) + try { - defs.Add(new FileDefinitionMetadata + var cacheFile = _settings.AuthoredFilesS3.BucketCacheFile.ToAbsolutePath(); + if (!cacheFile.FileExists()) { - Definition = await ReadDefinition(file), - Updated = file.LastModifiedUtc() + var allObjects = await AllObjects().ToArrayAsync(); + foreach (var obje in allObjects) + { + _allObjects.TryAdd(obje.Key.ToRelativePath(), obje.LastModified.ToFileTimeUtc()); + } + SaveBucketCacheFile(cacheFile); + } + else + { + LoadBucketCacheFile(cacheFile); + } + + + _mangledNames = _allObjects + .Where(f => f.Key.EndsWith("definition.json.gz")) + .Select(f => f.Key.Parent) + .ToHashSet(); + + await Parallel.ForEachAsync(_mangledNames, async (name, _) => + { + if (!_allObjects.TryGetValue(name.Combine("definition.json.gz"), out var value)) + return; + + _logger.LogInformation("Priming {Name}", name); + var definition = await PrimeDefinition(name); + var metadata = new FileDefinitionMetadata() + { + Definition = definition, + Updated = DateTime.FromFileTimeUtc(value) + }; + _fileCache.TryAdd(definition.MungedName, metadata); + _byServerId.TryAdd(definition.ServerAssignedUniqueId!, definition); }); + + _logger.LogInformation("Finished priming cache, {Count} files {Size} GB cached", _fileCache.Count, + _fileCache.Sum(s => s.Value.Definition.Size) / (1024 * 1024 * 1024)); + + } + catch (Exception ex) + { + _logger.LogCritical(ex, "Failed to prime cache"); + } + } + + private void SaveBucketCacheFile(AbsolutePath cacheFile) + { + using var file = cacheFile.Open(FileMode.Create, FileAccess.Write); + using var sw = new StreamWriter(file); + foreach(var entry in _allObjects) + { + sw.WriteLine($"{entry.Key}||{entry.Value}"); + } + } + + private void LoadBucketCacheFile(AbsolutePath cacheFile) + { + using var file = cacheFile.Open(FileMode.Open, FileAccess.Read); + using var sr = new StreamReader(file); + while (!sr.EndOfStream) + { + var line = sr.ReadLine(); + var parts = line!.Split("||"); + _allObjects.TryAdd(parts[0].ToRelativePath(), long.Parse(parts[1])); + } + } + + private async Task PrimeDefinition(RelativePath name) + { + var uri = _baseUri + $"{name}/definition.json.gz"; + using var response = await _httpClient.GetAsync(uri); + return await ReadDefinition(await response.Content.ReadAsStreamAsync()); + } + + private async IAsyncEnumerable AllObjects() + { + var sw = Stopwatch.StartNew(); + var total = 0; + _logger.Log(LogLevel.Information, "Listing all objects in S3"); + var results = await _s3.ListObjectsV2Async(new ListObjectsV2Request() + { + BucketName = _bucketName, + }); + TOP: + total += results.S3Objects.Count; + _logger.Log(LogLevel.Information, "Got {S3ObjectsCount} objects, {Total} total", results.S3Objects.Count, total); + foreach (var result in results.S3Objects) + { + yield return result; } - _byServerId = defs.ToDictionary(f => f.Definition.ServerAssignedUniqueId!, f => f.Definition); - return defs.ToArray(); + if (results.IsTruncated) + { + results = await _s3.ListObjectsV2Async(new ListObjectsV2Request + { + ContinuationToken = results.NextContinuationToken, + BucketName = _bucketName, + }); + goto TOP; + } + _logger.LogInformation("Finished listing all objects in S3 in {Elapsed}", sw.Elapsed); } - public async Task StreamForPart(string mungedName, int part) + public IEnumerable AllDefinitions => _fileCache.Values; + + /// + /// Used space in bytes + /// + public long UsedSpace => _fileCache.Sum(s => s.Value.Definition.Size); + + public async Task StreamForPart(string mungedName, int part, Func func) { - return AuthorFilesLocation.Combine(mungedName, "parts", part.ToString()).Open(FileMode.Open); + var definition = _fileCache[mungedName].Definition; + + if (part >= definition.Parts.Length) + throw new ArgumentOutOfRangeException(nameof(part)); + + var uri = _baseUri + $"{mungedName}/parts/{part}"; + using var response = await _httpClient.GetAsync(uri); + await func(await response.Content.ReadAsStreamAsync()); } - public async Task CreatePart(string mungedName, int part) + public async Task WritePart(string mungedName, int part, Stream ms) { - return AuthorFilesLocation.Combine(mungedName, "parts", part.ToString()).Open(FileMode.Create, FileAccess.Write, FileShare.None); + await _s3.PutObjectAsync(new PutObjectRequest + { + BucketName = _bucketName, + Key = mungedName.ToRelativePath().Combine("parts", part.ToString()).ToString().Replace("\\", "/"), + InputStream = ms, + DisablePayloadSigning = true + }); } public async Task WriteDefinition(FileDefinition definition) { - var path = AuthorFilesLocation.Combine(definition.MungedName, "definition.json.gz"); - path.Parent.CreateDirectory(); - path.Parent.Combine("parts").CreateDirectory(); - await using var ms = new MemoryStream(); await using (var gz = new GZipStream(ms, CompressionLevel.Optimal, true)) { await _dtos.Serialize(definition, gz); } - - await path.WriteAllBytesAsync(ms.ToArray()); + ms.Position = 0; + + await _s3.PutObjectAsync(new PutObjectRequest + { + BucketName = _bucketName, + Key = definition.MungedName.ToRelativePath().Combine("definition.json.gz").ToString().Replace("\\", "/"), + InputStream = ms, + DisablePayloadSigning = true + }); + _fileCache.TryAdd(definition.MungedName, new FileDefinitionMetadata + { + Definition = definition, + Updated = DateTime.UtcNow + }); + _byServerId.TryAdd(definition.ServerAssignedUniqueId!, definition); } public async Task ReadDefinition(string mungedName) { - return await ReadDefinition(AuthorFilesLocation.Combine(mungedName, "definition.json.gz")); + return _fileCache[mungedName].Definition; } public bool IsDefinition(string mungedName) { - return AuthorFilesLocation.Combine(mungedName, "definition.json.gz").FileExists(); + return _fileCache.ContainsKey(mungedName); } - private async Task ReadDefinition(AbsolutePath file) + + private async Task ReadDefinition(Stream stream) { - var gz = new GZipStream(new MemoryStream(await file.ReadAllBytesAsync()), CompressionMode.Decompress); + var gz = new GZipStream(stream, CompressionMode.Decompress); var definition = (await _dtos.DeserializeAsync(gz))!; return definition; } @@ -111,15 +235,33 @@ public class AuthorFiles public async Task DeleteFile(FileDefinition definition) { - var folder = AuthorFilesLocation.Combine(definition.MungedName); - folder.DeleteDirectory(); + var allFiles = _allObjects.Where(f => f.Key.TopParent.ToString() == definition.MungedName) + .Select(f => f.Key).ToList(); + foreach (var batch in allFiles.Batch(512)) + { + var batchedArray = batch.ToHashSet(); + _logger.LogInformation("Deleting {Count} files for prefix {Prefix}", batchedArray.Count, definition.MungedName); + await _s3.DeleteObjectsAsync(new DeleteObjectsRequest + { + BucketName = _bucketName, + + Objects = batchedArray.Select(f => new KeyVersion + { + Key = f.ToString().Replace("\\", "/") + }).ToList() + }); + foreach (var key in batchedArray) + { + _allObjects.TryRemove(key, out _); + } + } + + _byServerId.TryRemove(definition.ServerAssignedUniqueId!, out _); + _fileCache.TryRemove(definition.MungedName, out _); } - public async Task ReadDefinitionForServerId(string serverAssignedUniqueId) + public async ValueTask ReadDefinitionForServerId(string serverAssignedUniqueId) { - if (_byServerId.TryGetValue(serverAssignedUniqueId, out var found)) - return found; - await AllAuthoredFiles(); return _byServerId[serverAssignedUniqueId]; } diff --git a/Wabbajack.Server/Resources/Reports/AuthoredFiles.html b/Wabbajack.Server/Resources/Reports/AuthoredFiles.html index 42a4ed20..fe85202d 100644 --- a/Wabbajack.Server/Resources/Reports/AuthoredFiles.html +++ b/Wabbajack.Server/Resources/Reports/AuthoredFiles.html @@ -11,7 +11,7 @@

Authored Files:

-

{{$.FreeSpace}} remaining of {{$.TotalSpace}}

+

{{$.UsedSpace}}

diff --git a/Wabbajack.Server/Startup.cs b/Wabbajack.Server/Startup.cs index 30ba2c14..57923c89 100644 --- a/Wabbajack.Server/Startup.cs +++ b/Wabbajack.Server/Startup.cs @@ -5,6 +5,8 @@ using System.Runtime.InteropServices; using System.Text.Json; using System.Text.Json.Serialization; using System.Threading.Tasks; +using Amazon.Runtime; +using Amazon.S3; using cesi.DTOs; using CouchDB.Driver; using CouchDB.Driver.Options; @@ -39,10 +41,10 @@ using Wabbajack.Server.Services; using Wabbajack.Services.OSIntegrated.TokenProviders; using Wabbajack.Networking.WabbajackClientApi; using Wabbajack.Paths.IO; -using Wabbajack.Server.DTOs; using Wabbajack.VFS; using YamlDotNet.Serialization.NamingConventions; using Client = Wabbajack.Networking.GitHub.Client; +using Metric = Wabbajack.Server.DTOs.Metric; namespace Wabbajack.Server; @@ -93,6 +95,16 @@ public class Startup services.AddSingleton(); services.AddAllSingleton(); services.AddDownloadDispatcher(useLoginDownloaders:false, useProxyCache:false); + services.AddSingleton(s => + { + var appSettings = s.GetRequiredService(); + var settings = new BasicAWSCredentials(appSettings.AuthoredFilesS3.AccessKey, + appSettings.AuthoredFilesS3.SecretKey); + return new AmazonS3Client(settings, new AmazonS3Config + { + ServiceURL = appSettings.AuthoredFilesS3.ServiceURL, + }); + }); services.AddTransient(s => { var settings = s.GetRequiredService(); @@ -243,5 +255,7 @@ public class Startup // Trigger the internal update code app.ApplicationServices.GetRequiredService(); app.ApplicationServices.GetRequiredService(); + + app.ApplicationServices.GetRequiredService(); } } \ No newline at end of file diff --git a/Wabbajack.Server/Wabbajack.Server.csproj b/Wabbajack.Server/Wabbajack.Server.csproj index 9abb308e..61f73c13 100644 --- a/Wabbajack.Server/Wabbajack.Server.csproj +++ b/Wabbajack.Server/Wabbajack.Server.csproj @@ -12,6 +12,7 @@ + @@ -22,6 +23,7 @@ +