Make the Steam downloader parallel, add retries and multiple server support

This commit is contained in:
Timothy Baldridge 2022-01-09 21:09:47 -07:00
parent 25cd582797
commit 2e01293f58
2 changed files with 34 additions and 22 deletions

View File

@ -11,7 +11,7 @@ public static class CircuitBreaker
public static int DEFAULT_DELAY_MULTIPLIER = 2;
public static int DEFAULT_RETRIES = 5;
public static async ValueTask<TR> WithAutoRetryAsync<TR, TE>(ILogger logger, Func<ValueTask<TR>> f,
public static async ValueTask<TR> WithAutoRetryAsync<TR, TE>(ILogger logger, Func<Task<TR>> f,
TimeSpan? delay = null, int? multipler = null, int? maxRetries = null) where TE : Exception
{
var retries = 0;
@ -37,7 +37,7 @@ public static class CircuitBreaker
}
}
public static async ValueTask WithAutoRetryAsync<TE>(ILogger logger, Func<ValueTask> f, TimeSpan? delay = null,
public static async ValueTask WithAutoRetryAsync<TE>(ILogger logger, Func<Task> f, TimeSpan? delay = null,
int? multipler = null, int? maxRetries = null) where TE : Exception
{
var retries = 0;

View File

@ -419,7 +419,8 @@ public class Client : IDisposable
return _cdnServers[_random.Next(0, _cdnServers.Length)];
}
public async Task Download(uint appId, uint depotId, ulong manifest, DepotManifest.FileData fileData, AbsolutePath output, CancellationToken token)
public async Task Download(uint appId, uint depotId, ulong manifest, DepotManifest.FileData fileData, AbsolutePath output,
CancellationToken token, IJob? parentJob = null)
{
await LoadCDNServers();
@ -429,30 +430,41 @@ public class Client : IDisposable
await fileData.Chunks.OrderBy(c => c.Offset)
.PMapAll(async chunk =>
{
var client = RandomServer();
using var job = await _limiter.Begin($"Downloading chunk of {fileData.FileName}", chunk.CompressedLength, token);
var chunkId = chunk.ChunkID!.ToHex();
var uri = new UriBuilder
{
Host = client.Host,
Port = client.Port,
Scheme = client.Protocol.ToString(),
Path = $"depot/{depotId}/chunk/{chunkId}"
}.Uri;
async Task<DepotChunk> AttemptDownload(DepotManifest.ChunkData chunk)
{
var client = RandomServer();
using var job = await _limiter.Begin($"Downloading chunk of {fileData.FileName}",
chunk.CompressedLength, token);
var data = await _httpClient.GetByteArrayAsync(uri, token);
await job.Report(data.Length, token);
var chunkData = new DepotChunk(chunk, data);
chunkData.Process(depotKey);
var chunkId = chunk.ChunkID!.ToHex();
return chunkData;
}).Do(async data =>
var uri = new UriBuilder
{
Host = client.Host,
Port = client.Port,
Scheme = client.Protocol.ToString(),
Path = $"depot/{depotId}/chunk/{chunkId}"
}.Uri;
var data = await _httpClient.GetByteArrayAsync(uri, token);
await job.Report(data.Length, token);
if (parentJob != null)
await parentJob.Report(data.Length, token);
var chunkData = new DepotChunk(chunk, data);
chunkData.Process(depotKey);
return chunkData;
}
return await CircuitBreaker.WithAutoRetryAsync<DepotChunk, HttpRequestException>(_logger, () => AttemptDownload(chunk));
}).Do(async data =>
{
await os.WriteAsync(data.Data, token);
});
}
}