Merge pull request #453 from wabbajack-tools/fix-uploader

Fix uploader
This commit is contained in:
Timothy Baldridge 2020-01-30 16:46:34 -07:00 committed by GitHub
commit b40fab7aa7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
14 changed files with 231 additions and 99 deletions

View File

@ -22,5 +22,7 @@ namespace Wabbajack.BuildServer
public string BunnyCDN_Password { get; set; }
public string SqlConnection { get; set; }
public int MaxJobs { get; set; } = 2;
}
}

View File

@ -4,6 +4,7 @@ using GraphQL.Types;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Extensions.Logging;
using Wabbajack.BuildServer.GraphQL;
using Wabbajack.BuildServer.Model.Models;
using Wabbajack.BuildServer.Models;
namespace Wabbajack.BuildServer.Controllers
@ -12,15 +13,18 @@ namespace Wabbajack.BuildServer.Controllers
[ApiController]
public class GraphQL : AControllerBase<GraphQL>
{
public GraphQL(ILogger<GraphQL> logger, DBContext db) : base(logger, db)
private SqlService _sql;
public GraphQL(ILogger<GraphQL> logger, DBContext db, SqlService sql) : base(logger, db)
{
_sql = sql;
}
[HttpPost]
public async Task<IActionResult> Post([FromBody] GraphQLQuery query)
{
var inputs = query.Variables.ToInputs();
var schema = new Schema {Query = new Query(Db), Mutation = new Mutation(Db)};
var schema = new Schema {Query = new Query(Db, _sql), Mutation = new Mutation(Db)};
var result = await new DocumentExecuter().ExecuteAsync(_ =>
{

View File

@ -1,11 +1,14 @@
using System;
using System.Collections.Generic;
using System.Data.SqlTypes;
using System.Linq;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Authorization;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Extensions.Logging;
using MongoDB.Driver;
using MongoDB.Driver.Linq;
using Wabbajack.BuildServer.Model.Models;
using Wabbajack.BuildServer.Models;
using Wabbajack.Common;
using Wabbajack.Lib.ModListRegistry;
@ -16,8 +19,11 @@ namespace Wabbajack.BuildServer.Controllers
[Route("/metrics")]
public class MetricsController : AControllerBase<MetricsController>
{
public MetricsController(ILogger<MetricsController> logger, DBContext db) : base(logger, db)
private SqlService _sql;
public MetricsController(ILogger<MetricsController> logger, DBContext db, SqlService sql) : base(logger, db)
{
_sql = sql;
}
[HttpGet]
@ -29,10 +35,20 @@ namespace Wabbajack.BuildServer.Controllers
return new Result { Timestamp = date};
}
[Authorize]
[HttpGet]
[Route("transfer")]
public async Task<string> Transfer()
{
var all_metrics = await Db.Metrics.AsQueryable().ToListAsync();
await _sql.IngestAllMetrics(all_metrics);
return "done";
}
private async Task Log(DateTime timestamp, string action, string subject, string metricsKey = null)
{
Logger.Log(LogLevel.Information, $"Log - {timestamp} {action} {subject} {metricsKey}");
await Db.Metrics.InsertOneAsync(new Metric
await _sql.IngestMetric(new Metric
{
Timestamp = timestamp, Action = action, Subject = subject, MetricsKey = metricsKey
});

View File

@ -1,10 +1,13 @@
using System;
using System.Linq;
using System.Collections.Generic;
using System.Data.SqlTypes;
using GraphQL;
using GraphQL.Types;
using GraphQLParser.AST;
using MongoDB.Driver;
using MongoDB.Driver.Linq;
using Wabbajack.BuildServer.Model.Models;
using Wabbajack.BuildServer.Models;
using Wabbajack.Common;
@ -12,7 +15,7 @@ namespace Wabbajack.BuildServer.GraphQL
{
public class Query : ObjectGraphType
{
public Query(DBContext db)
public Query(DBContext db, SqlService sql)
{
Field<ListGraphType<JobType>>("unfinishedJobs", resolve: context =>
{
@ -68,7 +71,15 @@ namespace Wabbajack.BuildServer.GraphQL
resolve: async context =>
{
var group = context.GetArgument<string>("metric_type");
return await Metric.Report(db, group);
var data = (await sql.MetricsReport(group))
.GroupBy(m => m.Subject)
.Select(g => new MetricResult
{
SeriesName = g.Key,
Labels = g.Select(m => m.Date.ToString()).ToList(),
Values = g.Select(m => m.Count).ToList()
});
return data;
});
}
}

View File

@ -32,7 +32,7 @@ namespace Wabbajack.BuildServer
public void StartJobRunners()
{
if (!Settings.JobRunner) return;
for (var idx = 0; idx < 2; idx++)
for (var idx = 0; idx < Settings.MaxJobs; idx++)
{
Task.Run(async () =>
{

View File

@ -16,11 +16,11 @@ namespace Wabbajack.BuildServer.Models
public class Metric
{
[BsonId]
public ObjectId Id;
public DateTime Timestamp;
public string Action;
public string Subject;
public string MetricsKey;
public ObjectId Id { get; set; }
public DateTime Timestamp { get; set; }
public string Action { get; set; }
public string Subject { get; set; }
public string MetricsKey { get; set; }
public static async Task<IEnumerable<MetricResult>> Report(DBContext db, string grouping)

View File

@ -0,0 +1,11 @@
using System;
namespace Wabbajack.BuildServer.Model.Models.Results
{
public class AggregateMetric
{
public DateTime Date { get; set; }
public string Subject { get; set; }
public int Count { get; set; }
}
}

View File

@ -7,6 +7,7 @@ using System.Linq;
using System.Threading.Tasks;
using Dapper;
using Microsoft.Extensions.Configuration;
using Wabbajack.BuildServer.Model.Models.Results;
using Wabbajack.BuildServer.Models;
using Wabbajack.Common;
using Wabbajack.VirtualFileSystem;
@ -16,15 +17,20 @@ namespace Wabbajack.BuildServer.Model.Models
public class SqlService
{
private IConfiguration _configuration;
private IDbConnection _conn;
private AppSettings _settings;
public SqlService(AppSettings configuration)
public SqlService(AppSettings settings)
{
_conn = new SqlConnection(configuration.SqlConnection);
_conn.Open();
_settings = settings;
}
public IDbConnection Connection { get => _conn; }
private async Task<SqlConnection> Open()
{
var conn = new SqlConnection(_settings.SqlConnection);
await conn.OpenAsync();
return conn;
}
public async Task MergeVirtualFile(VirtualFile vfile)
{
@ -36,7 +42,8 @@ namespace Wabbajack.BuildServer.Model.Models
files = files.DistinctBy(f => f.Hash).ToList();
contents = contents.DistinctBy(c => (c.Parent, c.Path)).ToList();
await Connection.ExecuteAsync("dbo.MergeIndexedFiles", new {Files = files.ToDataTable(), Contents = contents.ToDataTable()},
await using var conn = await Open();
await conn.ExecuteAsync("dbo.MergeIndexedFiles", new {Files = files.ToDataTable(), Contents = contents.ToDataTable()},
commandType: CommandType.StoredProcedure);
}
@ -72,7 +79,8 @@ namespace Wabbajack.BuildServer.Model.Models
public async Task<bool> HaveIndexdFile(string hash)
{
var row = await Connection.QueryAsync(@"SELECT * FROM IndexedFile WHERE Hash = @Hash",
await using var conn = await Open();
var row = await conn.QueryAsync(@"SELECT * FROM IndexedFile WHERE Hash = @Hash",
new {Hash = BitConverter.ToInt64(hash.FromBase64())});
return row.Any();
}
@ -97,9 +105,9 @@ namespace Wabbajack.BuildServer.Model.Models
/// <returns></returns>
public async Task<IndexedVirtualFile> AllArchiveContents(long hash)
{
var files = await Connection.QueryAsync<ArchiveContentsResult>(@"
SELECT 0 as Parent, i.Hash, i.Size, null as Path FROM IndexedFile WHERE Hash = @Hash
await using var conn = await Open();
var files = await conn.QueryAsync<ArchiveContentsResult>(@"
SELECT 0 as Parent, i.Hash, i.Size, null as Path FROM IndexedFile i WHERE Hash = @Hash
UNION ALL
SELECT a.Parent, i.Hash, i.Size, a.Path FROM AllArchiveContent a
LEFT JOIN IndexedFile i ON i.Hash = a.Child
@ -110,15 +118,54 @@ namespace Wabbajack.BuildServer.Model.Models
List<IndexedVirtualFile> Build(long parent)
{
return grouped[parent].Select(f => new IndexedVirtualFile
if (grouped.TryGetValue(parent, out var children))
{
Name = f.Path,
Hash = BitConverter.GetBytes(f.Hash).ToBase64(),
Size = f.Size,
Children = Build(f.Hash)
}).ToList();
return children.Select(f => new IndexedVirtualFile
{
Name = f.Path,
Hash = BitConverter.GetBytes(f.Hash).ToBase64(),
Size = f.Size,
Children = Build(f.Hash)
}).ToList();
}
return new List<IndexedVirtualFile>();
}
return Build(0).First();
}
public async Task IngestAllMetrics(IEnumerable<Metric> allMetrics)
{
await using var conn = await Open();
await conn.ExecuteAsync(@"INSERT INTO dbo.Metrics (Timestamp, Action, Subject, MetricsKey) VALUES (@Timestamp, @Action, @Subject, @MetricsKey)", allMetrics);
}
public async Task IngestMetric(Metric metric)
{
await using var conn = await Open();
await conn.ExecuteAsync(@"INSERT INTO dbo.Metrics (Timestamp, Action, Subject, MetricsKey) VALUES (@Timestamp, @Action, @Subject, @MetricsKey)", metric);
}
public async Task<IEnumerable<AggregateMetric>> MetricsReport(string action)
{
await using var conn = await Open();
return (await conn.QueryAsync<AggregateMetric>(@"
SELECT d.Date, d.GroupingSubject as Subject, Count(*) as Count FROM
(select DISTINCT CONVERT(date, Timestamp) as Date, GroupingSubject, Action, MetricsKey from dbo.Metrics) m
RIGHT OUTER JOIN
(SELECT CONVERT(date, DATEADD(DAY, number + 1, dbo.MinMetricDate())) as Date, GroupingSubject, Action
FROM master..spt_values
CROSS JOIN (
SELECT DISTINCT GroupingSubject, Action FROM dbo.Metrics
WHERE MetricsKey is not null
AND Subject != 'Default'
AND TRY_CONVERT(uniqueidentifier, Subject) is null) as keys
WHERE type = 'P'
AND DATEADD(DAY, number+1, dbo.MinMetricDate()) < dbo.MaxMetricDate()) as d
ON m.Date = d.Date AND m.GroupingSubject = d.GroupingSubject AND m.Action = d.Action
WHERE d.Action = @action
group by d.Date, d.GroupingSubject, d.Action
ORDER BY d.Date, d.GroupingSubject, d.Action", new {Action = action}))
.ToList();
}
}
}

View File

@ -209,7 +209,7 @@ namespace Wabbajack.Common
{
var info = new ProcessStartInfo
{
FileName = "innounp.exe",
FileName = @"Extractors\innounp.exe",
Arguments = $"-t \"{v}\" ",
RedirectStandardError = true,
RedirectStandardInput = true,

View File

@ -1,8 +1,10 @@
using System;
using System.IO;
using System.Linq;
using System.Net;
using System.Net.Http;
using System.Reactive.Linq;
using System.Threading;
using System.Threading.Tasks;
using Wabbajack.Common;
using File = Alphaleonis.Win32.Filesystem.File;
@ -12,10 +14,10 @@ namespace Wabbajack.Lib.FileUploader
{
public class AuthorAPI
{
public static IObservable<bool> HaveAuthorAPIKey => Utils.HaveEncryptedJsonObservable("author-api-key");
public static IObservable<bool> HaveAuthorAPIKey => Utils.HaveEncryptedJsonObservable("author-api-key.txt");
public static IObservable<string> AuthorAPIKey => HaveAuthorAPIKey.Where(h => h)
.Select(_ => File.ReadAllText(Path.Combine(Consts.LocalAppDataPath, "author-api-key")));
.Select(_ => File.ReadAllText(Path.Combine(Consts.LocalAppDataPath, "author-api-key.txt")));
public static string GetAPIKey()
@ -26,66 +28,79 @@ namespace Wabbajack.Lib.FileUploader
public static readonly Uri UploadURL = new Uri("https://build.wabbajack.org/upload_file");
public static long BLOCK_SIZE = (long)1024 * 1024 * 8;
public static Task<string> UploadFile(WorkQueue queue, string filename)
public static long BLOCK_SIZE = (long)1024 * 1024 * 2;
public static int MAX_CONNECTIONS = 8;
public static Task<string> UploadFile(WorkQueue queue, string filename, Action<double> progressFn)
{
var tcs = new TaskCompletionSource<string>();
queue.QueueTask(async () =>
Task.Run(async () =>
{
var client = new HttpClient();
var handler = new HttpClientHandler {MaxConnectionsPerServer = MAX_CONNECTIONS};
var client = new HttpClient(handler);
var fsize = new FileInfo(filename).Length;
client.DefaultRequestHeaders.Add("X-API-KEY", AuthorAPI.GetAPIKey());
var response = await client.PutAsync(UploadURL+$"/{Path.GetFileName(filename)}/start", new StringContent(""));
if (!response.IsSuccessStatusCode)
{
tcs.SetResult("FAILED");
tcs.SetException(new Exception($"Start Error: {response.StatusCode} {response.ReasonPhrase}"));
return;
}
var key = await response.Content.ReadAsStringAsync();
using (var iqueue = new WorkQueue(8))
long sent = 0;
using (var iqueue = new WorkQueue(MAX_CONNECTIONS))
{
iqueue.Report("Starting Upload", 1);
await Enumerable.Range(0, (int)(fsize / BLOCK_SIZE))
.PMap(iqueue, async block_idx =>
{
if (tcs.Task.IsFaulted) return;
var block_offset = block_idx * BLOCK_SIZE;
var block_size = block_offset + BLOCK_SIZE > fsize
? fsize - block_offset
: BLOCK_SIZE;
Interlocked.Add(ref sent, block_size);
progressFn((double)sent / fsize);
await Enumerable.Range(0, (int)(fsize / BLOCK_SIZE))
.PMap(iqueue, async block_idx =>
int retries = 0;
using (var fs = File.OpenRead(filename))
{
var block_offset = block_idx * BLOCK_SIZE;
var block_size = block_offset + BLOCK_SIZE > fsize
? fsize - block_offset
: BLOCK_SIZE;
fs.Position = block_offset;
var data = new byte[block_size];
await fs.ReadAsync(data, 0, data.Length);
using (var fs = File.OpenRead(filename))
response = await client.PutAsync(UploadURL + $"/{key}/data/{block_offset}",
new ByteArrayContent(data));
if (!response.IsSuccessStatusCode)
{
fs.Position = block_offset;
var data = new byte[block_size];
await fs.ReadAsync(data, 0, data.Length);
response = await client.PutAsync(UploadURL + $"/{key}/data/{block_offset}",
new ByteArrayContent(data));
if (!response.IsSuccessStatusCode)
{
tcs.SetResult("FAILED");
return;
}
var val = long.Parse(await response.Content.ReadAsStringAsync());
if (val != block_offset + data.Length)
{
tcs.SetResult("Sync Error");
return;
}
tcs.SetException(new Exception($"Put Error: {response.StatusCode} {response.ReasonPhrase}"));
return;
}
});
var val = long.Parse(await response.Content.ReadAsStringAsync());
if (val != block_offset + data.Length)
{
tcs.SetResult($"Sync Error {val} vs {block_offset + data.Length}");
tcs.SetException(new Exception($"Sync Error {val} vs {block_offset + data.Length}"));
}
}
});
}
response = await client.PutAsync(UploadURL + $"/{key}/finish", new StringContent(""));
if (response.IsSuccessStatusCode)
tcs.SetResult(await response.Content.ReadAsStringAsync());
else
tcs.SetResult("FAILED");
if (!tcs.Task.IsFaulted)
{
progressFn(1.0);
response = await client.PutAsync(UploadURL + $"/{key}/finish", new StringContent(""));
if (response.IsSuccessStatusCode)
tcs.SetResult(await response.Content.ReadAsStringAsync());
else
tcs.SetException(new Exception($"Finalization Error: {response.StatusCode} {response.ReasonPhrase}"));
}
progressFn(0.0);
});
return tcs.Task;

View File

@ -245,7 +245,7 @@ namespace Wabbajack
.ToGuiProperty(this, nameof(ErrorTooltip));
}
public ICommand ConstructTypicalPickerCommand()
public ICommand ConstructTypicalPickerCommand(IObservable<bool> canExecute = null)
{
return ReactiveCommand.Create(
execute: () =>
@ -280,7 +280,7 @@ namespace Wabbajack
}
if (dlg.ShowDialog() != CommonFileDialogResult.Ok) return;
TargetPath = dlg.FileName;
});
}, canExecute: canExecute);
}
}
}

View File

@ -6,6 +6,8 @@ using System.Reactive.Subjects;
using System.Threading.Tasks;
using System.Windows;
using System.Windows.Documents;
using System.Windows.Input;
using Alphaleonis.Win32.Filesystem;
using Microsoft.WindowsAPICodePack.Shell.PropertySystem;
using ReactiveUI;
@ -20,39 +22,57 @@ namespace Wabbajack
{
public class AuthorFilesVM : BackNavigatingVM
{
public Visibility IsVisible { get; }
private readonly ObservableAsPropertyHelper<Visibility> _isVisible;
public Visibility IsVisible => _isVisible.Value;
[Reactive]
public string SelectedFile { get; set; }
public IReactiveCommand SelectFile { get; }
private readonly ObservableAsPropertyHelper<string> _selectedFile;
public ICommand SelectFile { get; }
public ICommand HyperlinkCommand { get; }
public IReactiveCommand Upload { get; }
[Reactive]
public double UploadProgress { get; set; }
[Reactive] public double UploadProgress { get; set; }
[Reactive] public string FinalUrl { get; set; }
private WorkQueue Queue = new WorkQueue(1);
public FilePickerVM Picker { get;}
private Subject<bool> _isUploading = new Subject<bool>();
private IObservable<bool> IsUploading { get; }
public AuthorFilesVM(SettingsVM vm) : base(vm.MWVM)
{
var sub = new Subject<double>();
Queue.Status.Select(s => (double)s.ProgressPercent).Subscribe(v =>
{
UploadProgress = v;
});
IsVisible = AuthorAPI.HasAPIKey ? Visibility.Visible : Visibility.Collapsed;
SelectFile = ReactiveCommand.Create(() =>
{
var fod = UIUtils.OpenFileDialog("*|*");
if (fod != null)
SelectedFile = fod;
});
IsUploading = _isUploading;
Picker = new FilePickerVM(this);
_isVisible = AuthorAPI.HaveAuthorAPIKey.Select(h => h ? Visibility.Visible : Visibility.Collapsed)
.ToProperty(this, x => x.IsVisible);
SelectFile = Picker.ConstructTypicalPickerCommand(IsUploading.StartWith(false).Select(u => !u));
HyperlinkCommand = ReactiveCommand.Create(() => Clipboard.SetText(FinalUrl));
Upload = ReactiveCommand.Create(async () =>
{
SelectedFile = await AuthorAPI.UploadFile(Queue, SelectedFile);
});
_isUploading.OnNext(true);
try
{
FinalUrl = await AuthorAPI.UploadFile(Queue, Picker.TargetPath,
progress => UploadProgress = progress);
}
catch (Exception ex)
{
FinalUrl = ex.ToString();
}
finally
{
_isUploading.OnNext(false);
}
}, IsUploading.StartWith(false).Select(u => !u)
.CombineLatest(Picker.WhenAnyValue(t => t.TargetPath).Select(f => f != null),
(a, b) => a && b));
}
}
}

View File

@ -24,6 +24,7 @@
<RowDefinition></RowDefinition>
<RowDefinition></RowDefinition>
<RowDefinition></RowDefinition>
<RowDefinition></RowDefinition>
</Grid.RowDefinitions>
<Grid.ColumnDefinitions>
<ColumnDefinition Width="300"></ColumnDefinition>
@ -36,10 +37,15 @@
FontSize="20"
FontWeight="Bold"
Text="File Uploader" />
<TextBlock Margin="5" Grid.Row="1" Grid.ColumnSpan="2" Text="{Binding AuthorFile.SelectedFile}"></TextBlock>
<TextBlock Margin="5" Grid.Row="1" Grid.ColumnSpan="2" Text="{Binding AuthorFile.Picker.TargetPath}"></TextBlock>
<ProgressBar Margin="5" Grid.Row="2" Grid.ColumnSpan="2" Value="{Binding AuthorFile.UploadProgress, Mode=OneWay}" Minimum="0" Maximum="1"></ProgressBar>
<Button Margin="5" Grid.Row="3" Grid.Column="0" Command="{Binding AuthorFile.SelectFile, Mode=OneTime}">Select</Button>
<Button Margin="5" Grid.Row="3" Grid.Column="1" Command="{Binding AuthorFile.Upload}">Upload</Button>
<TextBlock Margin="5" Grid.Row="3" Grid.ColumnSpan="2">
<Hyperlink Command="{Binding AuthorFile.HyperlinkCommand}">
<TextBlock Text="{Binding AuthorFile.FinalUrl}"></TextBlock>
</Hyperlink>
</TextBlock>
<Button Margin="5" Grid.Row="4" Grid.Column="0" Command="{Binding AuthorFile.SelectFile, Mode=OneTime}">Select</Button>
<Button Margin="5" Grid.Row="4" Grid.Column="1" Command="{Binding AuthorFile.Upload}">Upload</Button>
</Grid>
</Border>
</rxui:ReactiveUserControl>