Parallel downloading of Steam files

This commit is contained in:
Timothy Baldridge 2022-01-09 18:05:47 -07:00
parent e775798a03
commit 25cd582797
3 changed files with 36 additions and 18 deletions

View File

@ -1,6 +1,7 @@
using System.CommandLine;
using System.CommandLine.Invocation;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using FluentFTP.Helpers;
using Microsoft.Extensions.Logging;
@ -81,9 +82,9 @@ public class SteamDownloadFile : IVerb
return 1;
}
_logger.LogInformation("File has is {Size} and {ChunkCount} chunks", fileData.TotalSize.FileSizeToString(), fileData.Chunks.Count);
_logger.LogInformation("File is {Size} and {ChunkCount} chunks", fileData.TotalSize.FileSizeToString(), fileData.Chunks.Count);
await _client.Download(appId, depotManifest!.DepotID, steamManifest!.Manifest, fileData, output);
await _client.Download(appId, depotManifest!.DepotID, steamManifest!.Manifest, fileData, output, CancellationToken.None);
_logger.LogInformation("File downloaded");

View File

@ -6,6 +6,7 @@ using Microsoft.VisualBasic.CompilerServices;
using SteamKit2;
using SteamKit2.CDN;
using SteamKit2.Internal;
using Wabbajack.Common;
using Wabbajack.DTOs.Interventions;
using Wabbajack.DTOs.JsonConverters;
using Wabbajack.Hashing.xxHash64;
@ -14,6 +15,7 @@ using Wabbajack.Networking.Steam.DTOs;
using Wabbajack.Networking.Steam.UserInterventions;
using Wabbajack.Paths;
using Wabbajack.Paths.IO;
using Wabbajack.RateLimiter;
namespace Wabbajack.Networking.Steam;
@ -39,7 +41,8 @@ public class Client : IDisposable
public TaskCompletionSource _licenseRequest = new();
private readonly SteamApps _steamApps;
private readonly DTOSerializer _dtos;
private IReadOnlyCollection<Server> _cdnServers = Array.Empty<Server>();
private Server[] _cdnServers = Array.Empty<Server>();
private readonly IResource<HttpClient> _limiter;
public SteamApps.LicenseListCallback.License[] Licenses { get; private set; }
@ -57,12 +60,13 @@ public class Client : IDisposable
public Client(ILogger<Client> logger, HttpClient client, ITokenProvider<SteamLoginState> token,
IUserInterventionHandler interventionHandler, DTOSerializer dtos)
IUserInterventionHandler interventionHandler, DTOSerializer dtos, IResource<HttpClient> limiter)
{
_logger = logger;
_httpClient = client;
_dtos = dtos;
_interventionHandler = interventionHandler;
_limiter = limiter;
_client = new SteamClient(SteamConfiguration.Create(c =>
{
c.WithProtocolTypes(ProtocolTypes.WebSocket);
@ -354,12 +358,12 @@ public class Client : IDisposable
return AppInfo[appId].AppInfo;
}
public async Task<IReadOnlyCollection<Server>> LoadCDNServers()
public async Task<Server[]> LoadCDNServers()
{
if (_cdnServers.Count > 0) return _cdnServers;
if (_cdnServers.Length > 0) return _cdnServers;
_logger.LogInformation("Loading CDN servers");
_cdnServers = await ContentServerDirectoryService.LoadAsync(_client.Configuration);
_logger.LogInformation("{Count} servers found", _cdnServers.Count);
_cdnServers = (await ContentServerDirectoryService.LoadAsync(_client.Configuration)).ToArray();
_logger.LogInformation("{Count} servers found", _cdnServers.Length);
return _cdnServers;
}
@ -408,21 +412,31 @@ public class Client : IDisposable
return result.DepotKey;
}
public async Task Download(uint appId, uint depotId, ulong manifest, DepotManifest.FileData fileData, AbsolutePath output)
private static readonly Random _random = new();
private Server RandomServer()
{
return _cdnServers[_random.Next(0, _cdnServers.Length)];
}
public async Task Download(uint appId, uint depotId, ulong manifest, DepotManifest.FileData fileData, AbsolutePath output, CancellationToken token)
{
await LoadCDNServers();
var client = _cdnServers.First();
var depotKey = await GetDepotKey(depotId, appId);
await using var os = output.Open(FileMode.Create, FileAccess.Write, FileShare.Read);
foreach (var chunk in fileData.Chunks.OrderBy(c => c.Offset))
await fileData.Chunks.OrderBy(c => c.Offset)
.PMapAll(async chunk =>
{
var client = RandomServer();
using var job = await _limiter.Begin($"Downloading chunk of {fileData.FileName}", chunk.CompressedLength, token);
var chunkId = chunk.ChunkID!.ToHex();
var uri = new UriBuilder()
var uri = new UriBuilder
{
Host = client.Host,
Port = client.Port,
@ -430,13 +444,15 @@ public class Client : IDisposable
Path = $"depot/{depotId}/chunk/{chunkId}"
}.Uri;
var data = await _httpClient.GetByteArrayAsync(uri);
var data = await _httpClient.GetByteArrayAsync(uri, token);
await job.Report(data.Length, token);
var chunkData = new DepotChunk(chunk, data);
chunkData.Process(depotKey);
await os.WriteAsync(chunkData.Data);
}
return chunkData;
}).Do(async data =>
{
await os.WriteAsync(data.Data, token);
});
}
}

View File

@ -12,6 +12,7 @@
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\Wabbajack.Common\Wabbajack.Common.csproj" />
<ProjectReference Include="..\Wabbajack.DTOs\Wabbajack.DTOs.csproj" />
<ProjectReference Include="..\Wabbajack.Networking.Http.Interfaces\Wabbajack.Networking.Http.Interfaces.csproj" />
<ProjectReference Include="..\Wabbajack.Paths.IO\Wabbajack.Paths.IO.csproj" />