From 2e01293f58965140b0a5ba1ec1e81c20535a4de2 Mon Sep 17 00:00:00 2001 From: Timothy Baldridge Date: Sun, 9 Jan 2022 21:09:47 -0700 Subject: [PATCH] Make the Steam downloader parallel, add retries and multiple server support --- Wabbajack.Common/CircuitBreaker.cs | 4 +-- Wabbajack.Networking.Steam/Client.cs | 52 +++++++++++++++++----------- 2 files changed, 34 insertions(+), 22 deletions(-) diff --git a/Wabbajack.Common/CircuitBreaker.cs b/Wabbajack.Common/CircuitBreaker.cs index 9b50d354..83bc9e4c 100644 --- a/Wabbajack.Common/CircuitBreaker.cs +++ b/Wabbajack.Common/CircuitBreaker.cs @@ -11,7 +11,7 @@ public static class CircuitBreaker public static int DEFAULT_DELAY_MULTIPLIER = 2; public static int DEFAULT_RETRIES = 5; - public static async ValueTask WithAutoRetryAsync(ILogger logger, Func> f, + public static async ValueTask WithAutoRetryAsync(ILogger logger, Func> f, TimeSpan? delay = null, int? multipler = null, int? maxRetries = null) where TE : Exception { var retries = 0; @@ -37,7 +37,7 @@ public static class CircuitBreaker } } - public static async ValueTask WithAutoRetryAsync(ILogger logger, Func f, TimeSpan? delay = null, + public static async ValueTask WithAutoRetryAsync(ILogger logger, Func f, TimeSpan? delay = null, int? multipler = null, int? maxRetries = null) where TE : Exception { var retries = 0; diff --git a/Wabbajack.Networking.Steam/Client.cs b/Wabbajack.Networking.Steam/Client.cs index cc7f26e4..19011990 100644 --- a/Wabbajack.Networking.Steam/Client.cs +++ b/Wabbajack.Networking.Steam/Client.cs @@ -419,7 +419,8 @@ public class Client : IDisposable return _cdnServers[_random.Next(0, _cdnServers.Length)]; } - public async Task Download(uint appId, uint depotId, ulong manifest, DepotManifest.FileData fileData, AbsolutePath output, CancellationToken token) + public async Task Download(uint appId, uint depotId, ulong manifest, DepotManifest.FileData fileData, AbsolutePath output, + CancellationToken token, IJob? parentJob = null) { await LoadCDNServers(); @@ -429,30 +430,41 @@ public class Client : IDisposable await fileData.Chunks.OrderBy(c => c.Offset) .PMapAll(async chunk => - { - var client = RandomServer(); - using var job = await _limiter.Begin($"Downloading chunk of {fileData.FileName}", chunk.CompressedLength, token); - - var chunkId = chunk.ChunkID!.ToHex(); - - - var uri = new UriBuilder { - Host = client.Host, - Port = client.Port, - Scheme = client.Protocol.ToString(), - Path = $"depot/{depotId}/chunk/{chunkId}" - }.Uri; + async Task AttemptDownload(DepotManifest.ChunkData chunk) + { + var client = RandomServer(); + using var job = await _limiter.Begin($"Downloading chunk of {fileData.FileName}", + chunk.CompressedLength, token); - var data = await _httpClient.GetByteArrayAsync(uri, token); - await job.Report(data.Length, token); - var chunkData = new DepotChunk(chunk, data); - chunkData.Process(depotKey); + var chunkId = chunk.ChunkID!.ToHex(); - return chunkData; - }).Do(async data => + + var uri = new UriBuilder + { + Host = client.Host, + Port = client.Port, + Scheme = client.Protocol.ToString(), + Path = $"depot/{depotId}/chunk/{chunkId}" + }.Uri; + + var data = await _httpClient.GetByteArrayAsync(uri, token); + await job.Report(data.Length, token); + + if (parentJob != null) + await parentJob.Report(data.Length, token); + + var chunkData = new DepotChunk(chunk, data); + chunkData.Process(depotKey); + + return chunkData; + } + + return await CircuitBreaker.WithAutoRetryAsync(_logger, () => AttemptDownload(chunk)); + }).Do(async data => { await os.WriteAsync(data.Data, token); + }); } } \ No newline at end of file