AccountManager/app.py

561 lines
18 KiB
Python
Raw Permalink Normal View History

2021-12-04 19:35:28 +00:00
from flask import Flask, render_template, request, redirect, url_for, jsonify, abort
from flask.wrappers import Response
from flask_login.utils import login_url
from flask_sqlalchemy import SQLAlchemy
from flask_bootstrap import Bootstrap
from flask_login import LoginManager, UserMixin, login_user, login_required, logout_user, current_user
from bcrypt import checkpw, hashpw, gensalt
import re
import random
import string
import hashlib
import time
import json
import resources as res
class Resources():
def __init__(self) -> None:
self.LOGO = res.LOGO or 'logo/logo.png'
self.PRIVACY_POLICY = res.PRIVACY_POLICY
self.TERMS_OF_USE = res.TERMS_OF_USE
resources = Resources()
# Application instance
app = Flask(__name__)
# SQL instance
db = SQLAlchemy()
# Login instance
login_manager = LoginManager()
class UserModel(UserMixin):
def __init__(self, id, username, gm_level):
self.id = id
self.username = username
self.gm_level = gm_level
def __repr__(self):
return '<User %r>' % self.username
def get_id(self):
return self.id
def is_authenticated(self):
return True
def is_active(self):
return True
def is_anonymous(self):
return False
# Activity data
class Activity():
def __init__(self, start, end, map_id):
self.start = start
self.end = end
self.map_id = map_id
class CharacterLog():
def __init__(self, character_id):
self.character_id = character_id
self.activity_log = [] # List of activities
self.last_logout = 0
self.last_activity = None
def register_login(self, start, map_id):
self.last_activity = Activity(start, None, map_id)
def register_logout(self, end):
if not self.last_activity:
return
self.last_activity.end = end
self.activity_log.append(self.last_activity)
self.last_activity = None
def get_play_time(self):
total_time = 0
for activity in self.activity_log:
total_time += activity.end - activity.start
return total_time
def get_play_time_during(self, start, end):
total_time = 0
for activity in self.activity_log:
if activity.start >= start and activity.end <= end:
total_time += activity.end - activity.start
return total_time
def get_play_time_during_in_map(self, start, end, map_id):
total_time = 0
for activity in self.activity_log:
if activity.start >= start and activity.end <= end and activity.map_id == map_id:
total_time += activity.end - activity.start
return total_time
def __repr__(self):
return '<CharacterLog %r>' % self.character_id
class Dataset():
def __init__(self, label, data, borderColor, borderWidth):
self.label = label
self.data = data
self.borderColor = borderColor
self.borderWidth = borderWidth
def toJSON(self):
return json.dumps(self, default=lambda o: o.__dict__,
sort_keys=True, indent=4)
class ActivityData():
def __init__(self, labels, title, datasets):
self.labels = labels
self.title = title
self.datasets = datasets
def toJSON(self):
return json.dumps(self, default=lambda o: o.__dict__,
sort_keys=True, indent=4)
activitie_data = {}
# Color definitions per map_id, alpha always 1
map_colors = {
# 1000: Gray
1000: 'rgba(200, 200, 200, 1)',
# 1100: Green
1100: 'rgba(0, 255, 0, 1)',
# 1200: Blue
1200: 'rgba(54, 162, 235, 1)',
# 1300: Dark Red
1300: 'rgba(255, 99, 132, 1)',
# 1400: Purple
1400: 'rgba(153, 102, 255, 1)',
# 1600: Orange
1600: 'rgba(255, 204, 0, 1)',
# 1700: Light Blue
1700: 'rgba(75, 192, 192, 1)',
# 1800: Light Red
1800: 'rgba(102, 0, 204, 1)',
# 1900: Yellow
1900: 'rgba(255, 159, 64, 1)',
# 2000: Black
2000: 'rgba(0, 0, 0, 1)',
}
@app.route('/load_activities', methods=['GET', 'POST'])
@login_required
def load_activities():
if current_user.gm_level != 9:
abort(403)
return
global activitie_data
connection = db.engine
epoch_time = int(time.time())
# Calculate the time 7 days ago
last_time = epoch_time - (7 * 24 * 60 * 60)
query = "SELECT character_id, activity, time, map_id FROM activity_log WHERE time > %s"
result = connection.execute(query, (last_time)).all()
# Get all unique characters, create a new CharacterLog for each and put it in a dictionary
characters = {}
for row in result:
if row[0] not in characters:
characters[row[0]] = CharacterLog(row[0])
# Loop through the results, if activity = 0, then it's a login, if activity = 1, then it's a logout
for row in result:
if row[1] == 0:
characters[row[0]].register_login(row[2], row[3])
else:
characters[row[0]].register_logout(row[2])
# Create activity data for the last 7 days
# Titled: Sessions in the last 7 days
# Data: Number of sessions
labels = []
data = []
for i in range(7):
labels.append(time.strftime("%a", time.localtime(epoch_time - (i * 24 * 60 * 60))))
data.append(0)
for character in characters:
character_data = characters[character]
for activity in character_data.activity_log:
index = (activity.start - last_time) // (24 * 60 * 60)
data[index] += 1
# Flip labels
labels.reverse()
# Insert the data into the dictionary
activitie_data["sessions"] = ActivityData(labels, "Sessions in the last 7 days", [Dataset("Sessions", data, "rgba(255, 99, 132, 1)", 3)])
# Create activity data for total play time over the last 7 days
# Titled: Total play time in the last 7 days
# Data: Total play time in seconds
labels = []
data = []
for i in range(7):
labels.append(time.strftime("%a", time.localtime(epoch_time - (i * 24 * 60 * 60))))
data.append(0)
# Get the play time for the current day
for character in characters:
character_data = characters[character]
begin = epoch_time - ((i + 1) * 24 * 60 * 60)
end = begin + (24 * 60 * 60)
play_time = character_data.get_play_time_during(begin, end) / (60 * 60)
data[i] += play_time
# Flip labels and data
labels.reverse()
data.reverse()
# Insert the data into the dictionary
activitie_data["play_time"] = ActivityData(labels, "Total play time in the last 7 days", [Dataset("Play time", data, "rgba(255, 99, 132, 1)", 3)])
# Create activity data for each unique map played in the last 7 days
# Titled: Maps played in the last 7 days
# Data: Number of times played
labels = []
maps = {} # { 'map_id': [...] }
# Get all unique maps
for character in characters:
character_data = characters[character]
for activity in character_data.activity_log:
# If the map_id is not devicible by 100, skip it
if activity.map_id % 100 != 0:
continue
if activity.map_id not in maps:
maps[activity.map_id] = []
# Make the array of length 7
for i in range(7):
maps[activity.map_id].append(0)
for i in range(7):
labels.append(time.strftime("%a", time.localtime(epoch_time - (i * 24 * 60 * 60))))
# Get the total time played in each map over this day
for map_id in maps:
begin = epoch_time - ((i + 1) * 24 * 60 * 60)
end = begin + (24 * 60 * 60)
for character in characters:
character_data = characters[character]
play_time = character_data.get_play_time_during_in_map(begin, end, map_id) / (60 * 60)
maps[map_id][6 - i] += play_time
# Flip labels and data
labels.reverse()
# Insert the data into the dictionary, one dataset per map
datasets = []
for map_id in maps:
color = map_colors[map_id]
datasets.append(Dataset(map_id, maps[map_id], color, 3))
activitie_data["zone_play_time"] = ActivityData(labels, "Maps played in the last 7 days", datasets)
# Return 200
return "OK", 200
@app.route('/activity_data/<name>', methods=['GET', 'POST'])
@login_required
def activity_data(name):
if current_user.gm_level != 9:
abort(403)
return
global activitie_data
# Get the entry from the dictionary
# Convert it to json and return it
# In the format of:
# {
# "labels": [...]
# "title": "..."
# "datasets": [{...}, {...}]
# }
# Cannot use jsonify because it doesn't support the Dataset class
return activitie_data[name].toJSON()
# User loader
@login_manager.user_loader
def load_user(user_id):
connection = db.engine
query = "SELECT name, gm_level FROM accounts WHERE id = %s;"
account_result = connection.execute(query, (user_id)).fetchone()
if account_result is None:
return None
return UserModel(user_id, account_result[0], account_result[1])
# Index route. Allow both GET and POST requests. Render the "account_creation" page.
@app.route('/activate', defaults={'key': None}, methods=['GET', 'POST'])
@app.route('/activate/<key>', methods=['GET', 'POST'])
def account_creation(key):
# If we get a POST request, collect the form data:
# play_key, account_name, account_password, & account_repeat_password
if request.method == 'POST':
play_key = request.form['play_key']
account_name = request.form['account_name']
account_password = request.form['account_password']
account_repeat_password = request.form['account_repeat_password']
agree = request.form['agree'] if 'agree' in request.form else False
# If the user didn't agree to the terms, return an error.
if not agree:
return render_template('public/account_creation.html', error="You must agree to the Privacy Policy and Terms of Service to continue.", resources=resources)
# If the passwords don't match, return an error.
if account_password != account_repeat_password:
return render_template('public/account_creation.html', error="Passwords don't match!", resources=resources)
connection = db.engine
# Check if the play_key is valid.
query = "SELECT id, key_uses, active FROM play_keys WHERE key_string = %s"
key_result = connection.execute(query, (play_key)).fetchone()
# If the play key is not active, or keyUses is 0, return an error.
if not key_result or key_result['active'] == False or key_result['key_uses'] == 0:
return render_template('public/account_creation.html', error="Invalid play key!", resources=resources)
# Check if the username only contains alphanumeric characters, no spaces, or special characters.
if not re.match("^[A-Za-z0-9_-]*$", account_name):
return render_template('public/account_creation.html', error="Username contains invalid characters!", resources=resources)
# Check if the username is no more than 32 characters long.
if len(account_name) > 32:
return render_template('public/account_creation.html', error="Username is too long!", resources=resources)
# Check if the username is already taken.
query = "SELECT COUNT(*) FROM accounts WHERE name = %s;"
username_taken = connection.execute(query, (account_name))
# If the username is taken, return an error.
if username_taken.fetchone()[0] > 0:
return render_template('public/account_creation.html', error="Username already taken!", resources=resources)
# Decrement keyUses from the play_keys table by id.
query = 'UPDATE play_keys SET key_uses = ' + str(key_result['key_uses'] - 1) + ' WHERE id = ' + str(key_result['id']) + ';'
connection.execute(query)
# Hash the password.
password_hash = hashpw(account_password.encode('utf-8'), gensalt(prefix=b"2a"))
# Insert the new account into the database.
query = 'INSERT INTO accounts (name, password, play_key_id) VALUES (%s, %s, ' + str(key_result['id']) + ');'
connection.execute(query, (account_name, password_hash))
# Notify the user of the successful account creation.
return render_template('public/account_creation.html', message="Successfully created account '{}'!".format(account_name), resources=resources)
key = key if key else ""
return render_template('public/account_creation.html', key=key, resources=resources)
@login_manager.unauthorized_handler
def unauthorized_redirect():
return redirect(url_for('login'))
@app.route('/logout', methods=['GET', 'POST'])
def logout():
logout_user()
return redirect(url_for('login'))
@app.route('/', methods=['GET', 'POST'])
@app.route('/login', methods=['GET', 'POST'])
def login():
if current_user.is_authenticated:
if current_user.gm_level == 9:
return redirect(url_for('dashboard'))
else:
return redirect(url_for('data_download'))
if request.method == 'POST':
username = request.form['account_name']
password = request.form['account_password']
remember_me = request.form['remember_me'] if 'remember_me' in request.form else False
# Check if the username is valid.
connection = db.engine
query = "SELECT id, password, gm_level FROM accounts WHERE name = %s;"
account_result = connection.execute(query, (username)).fetchone()
# If the username is not valid, return an error.
if not account_result:
return render_template('private/login.html', error="Incorrect username or password!", resources=resources)
password_hash = account_result['password']
gm_level = account_result['gm_level']
# Support the old password style
old_password = hashlib.sha512((password + username).encode('utf-8')).hexdigest()
try:
# If the password doesn't match the account name, return an error.
if old_password != password_hash and not checkpw(password.encode('utf-8'), password_hash.encode('utf-8')):
return render_template('private/login.html', error="Incorrect username or password!", resources=resources)
except:
return render_template('private/login.html', error="Incorrect username or password!", resources=resources)
# Login user
login_user(UserModel(account_result['id'], username, gm_level), remember=remember_me)
# Redirect to the dashboard.
if gm_level == 9:
return redirect(url_for('dashboard'))
else:
return redirect(url_for('data_download'))
return render_template('private/login.html', resources=resources)
# Data download page
@app.route('/data_download', methods=['GET', 'POST'])
@login_required
def data_download():
if request.method == 'POST':
# Get the data from the form.
character_name = request.form['character_name']
# Check if the character exists.
connection = db.engine
query = "SELECT id FROM charinfo WHERE name = %s AND account_id = %s;"
character_result = connection.execute(query, (character_name, current_user.id)).fetchone()
# If the character doesn't exist, return an error.
if not character_result:
return render_template('private/data_download.html', error="Character '{}' does not exist!".format(character_name), resources=resources)
# Get the character's data.
query = "SELECT xml_data FROM charxml WHERE id = %s;"
xml_data = connection.execute(query, (character_result['id'])).fetchone()
# If the character doesn't have any data, return an error.
if not xml_data:
return render_template('private/data_download.html', error="Character '{}' does not have any data!".format(character_name), resources=resources)
# Return the character's data.
return Response(xml_data['xml_data'], mimetype='text/xml', headers={"Content-disposition": "attachment; filename=" + character_name + ".xml"})
return render_template('private/data_download.html', resources=resources)
# Key creation page
@app.route('/dashboard', methods=['GET', 'POST'])
@login_required
def dashboard():
if current_user.gm_level != 9:
return redirect(url_for('data_download'))
if request.method == 'POST':
connection = db.engine
key_count = request.form['key_count']
# Generate keys in the format AAAA-AAAA-AAAA-AAAA. 4 segments of uppercase letters and numbers.
key_list = []
for i in range(int(key_count)):
key = ""
for j in range(4):
key += ''.join(random.choice(string.ascii_uppercase + string.digits) for _ in range(4)) + '-'
# Remove last dash
key = key[:-1]
key_list.append(key)
# Insert the keys into the database.
for key in key_list:
query = "INSERT INTO play_keys (key_string, active) VALUES (%s, 1);"
connection.execute(query, (key))
# Notify the user of the successful key creation.
return render_template('private/dashboard.html', message="Successfully created {} keys!".format(key_count), keys=key_list, resources=resources)
return render_template('private/dashboard.html', current_user=current_user, resources=resources)
"""
App configuration
Some properties are in env variables.
"""
from os import getenv
def run_app():
secret_key = getenv('SECRET_KEY')
db_url = getenv('DB_URL')
# If either of the env variables are not set, attempt to read them from credentials.py.
if not secret_key or not db_url:
from credentials import SECRET_KEY, DB_URL
secret_key = SECRET_KEY
db_url = DB_URL
app.config['DEBUG'] = True
app.config['SECRET_KEY'] = secret_key
app.config['SQLALCHEMY_DATABASE_URI'] = db_url
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False
app.config['SQLALCHEMY_ECHO'] = False
app.config['WTF_CSRF_ENABLED'] = False
# Setup SQL
db.init_app(app)
# Setup Login
login_manager.init_app(app)
# Setup Bootstrap
Bootstrap(app)
app.run(host='0.0.0.0', port=5000)
if __name__ == '__main__':
run_app()