crafty-4/app/classes/models/users.py

305 lines
11 KiB
Python

import os
import sys
import logging
import datetime
from typing import Optional, List, Union
from app.classes.shared.helpers import helper
from app.classes.shared.console import console
from app.classes.models.roles import Roles, roles_helper
logger = logging.getLogger(__name__)
peewee_logger = logging.getLogger('peewee')
peewee_logger.setLevel(logging.INFO)
try:
from peewee import *
from playhouse.shortcuts import model_to_dict
from enum import Enum
import yaml
except ModuleNotFoundError as e:
logger.critical("Import Error: Unable to load {} module".format(e.name), exc_info=True)
console.critical("Import Error: Unable to load {} module".format(e.name))
sys.exit(1)
database = SqliteDatabase(helper.db_path, pragmas={
'journal_mode': 'wal',
'cache_size': -1024 * 10})
#************************************************************************************************
# Users Class
#************************************************************************************************
class Users(Model):
user_id = AutoField()
created = DateTimeField(default=datetime.datetime.now)
last_login = DateTimeField(default=datetime.datetime.now)
last_update = DateTimeField(default=datetime.datetime.now)
last_ip = CharField(default="")
username = CharField(default="", unique=True, index=True)
password = CharField(default="")
email = CharField(default="default@example.com")
enabled = BooleanField(default=True)
superuser = BooleanField(default=False)
lang = CharField(default="en_EN")
support_logs = CharField(default = '')
valid_tokens_from = DateTimeField(default=datetime.datetime.now)
class Meta:
table_name = "users"
database = database
# ************************************************************************************************
# API Keys Class
# ************************************************************************************************
class ApiKeys(Model):
token_id = AutoField()
name = CharField(default='', unique=True, index=True)
created = DateTimeField(default=datetime.datetime.now)
user = ForeignKeyField(Users, backref='api_token', index=True)
server_permissions = CharField(default='00000000')
crafty_permissions = CharField(default='000')
superuser = BooleanField(default=False)
class Meta:
table_name = 'api_keys'
database = database
#************************************************************************************************
# User Roles Class
#************************************************************************************************
class User_Roles(Model):
user_id = ForeignKeyField(Users, backref='user_role')
role_id = ForeignKeyField(Roles, backref='user_role')
class Meta:
table_name = 'user_roles'
primary_key = CompositeKey('user_id', 'role_id')
database = database
#************************************************************************************************
# Users Helpers
#************************************************************************************************
class helper_users:
@staticmethod
def get_by_id(user_id):
return Users.get_by_id(user_id)
@staticmethod
def get_all_users():
query = Users.select().where(Users.username != "system")
return query
@staticmethod
def get_user_lang_by_id(user_id):
return Users.get(Users.user_id == user_id).lang
@staticmethod
def get_user_id_by_name(username):
try:
return (Users.get(Users.username == username)).user_id
except DoesNotExist:
return None
@staticmethod
def user_query(user_id):
user_query = Users.select().where(Users.user_id == user_id)
return user_query
@staticmethod
def get_user(user_id):
if user_id == 0:
return {
'user_id': 0,
'created': '10/24/2019, 11:34:00',
'last_login': '10/24/2019, 11:34:00',
'last_update': '10/24/2019, 11:34:00',
'last_ip': "127.27.23.89",
'username': "SYSTEM",
'password': None,
'email': "default@example.com",
'enabled': True,
'superuser': True,
'roles': [],
'servers': [],
'support_logs': '',
}
user = model_to_dict(Users.get(Users.user_id == user_id))
if user:
# I know it should apply it without setting it but I'm just making sure
user = users_helper.add_user_roles(user)
return user
else:
#logger.debug("user: ({}) {}".format(user_id, {}))
return {}
def check_system_user(user_id):
try:
Users.get(Users.user_id == user_id).user_id == user_id
return True
except:
return False
@staticmethod
def get_user_model(user_id: str) -> Users:
user = Users.get(Users.user_id == user_id)
user = users_helper.add_user_roles(user)
return user
@staticmethod
def add_user(username: str, password: Optional[str] = None, email: Optional[str] = None, enabled: bool = True, superuser: bool = False) -> str:
if password is not None:
pw_enc = helper.encode_pass(password)
else:
pw_enc = None
user_id = Users.insert({
Users.username: username.lower(),
Users.password: pw_enc,
Users.email: email,
Users.enabled: enabled,
Users.superuser: superuser,
Users.created: helper.get_time_as_string()
}).execute()
return user_id
@staticmethod
def update_user(user_id, up_data=None):
if up_data is None:
up_data = {}
if up_data:
Users.update(up_data).where(Users.user_id == user_id).execute()
@staticmethod
def get_super_user_list():
final_users = []
super_users = Users.select().where(Users.superuser == True)
for suser in super_users:
if suser.user_id not in final_users:
final_users.append(suser.user_id)
return final_users
@staticmethod
def remove_user(user_id):
with database.atomic():
User_Roles.delete().where(User_Roles.user_id == user_id).execute()
user = Users.get(Users.user_id == user_id)
return user.delete_instance()
@staticmethod
def set_support_path(user_id, support_path):
Users.update(support_logs = support_path).where(Users.user_id == user_id).execute()
@staticmethod
def user_id_exists(user_id):
if not users_helper.get_user(user_id):
return False
return True
#************************************************************************************************
# User_Roles Methods
#************************************************************************************************
@staticmethod
def get_or_create(user_id, role_id):
return User_Roles.get_or_create(user_id=user_id, role_id=role_id)
@staticmethod
def get_user_roles_id(user_id):
roles_list = []
roles = User_Roles.select().where(User_Roles.user_id == user_id)
for r in roles:
roles_list.append(roles_helper.get_role(r.role_id)['role_id'])
return roles_list
@staticmethod
def get_user_roles_names(user_id):
roles_list = []
roles = User_Roles.select().where(User_Roles.user_id == user_id)
for r in roles:
roles_list.append(roles_helper.get_role(r.role_id)['role_name'])
return roles_list
@staticmethod
def add_role_to_user(user_id, role_id):
User_Roles.insert({
User_Roles.user_id: user_id,
User_Roles.role_id: role_id
}).execute()
@staticmethod
def add_user_roles(user: Union[dict, Users]):
if type(user) == dict:
user_id = user['user_id']
else:
user_id = user.user_id
# I just copied this code from get_user, it had those TODOs & comments made by mac - Lukas
roles_query = User_Roles.select().join(Roles, JOIN.INNER).where(User_Roles.user_id == user_id)
# TODO: this query needs to be narrower
roles = set()
for r in roles_query:
roles.add(r.role_id.role_id)
if type(user) == dict:
user['roles'] = roles
else:
user.roles = roles
#logger.debug("user: ({}) {}".format(user_id, user))
return user
@staticmethod
def user_role_query(user_id):
user_query = User_Roles.select().where(User_Roles.user_id == user_id)
query = Roles.select().where(Roles.role_id == -1)
for u in user_query:
query = query + Roles.select().where(Roles.role_id == u.role_id)
return query
@staticmethod
def delete_user_roles(user_id, removed_roles):
User_Roles.delete().where(User_Roles.user_id == user_id).where(User_Roles.role_id.in_(removed_roles)).execute()
@staticmethod
def remove_roles_from_role_id(role_id):
User_Roles.delete().where(User_Roles.role_id == role_id).execute()
# ************************************************************************************************
# ApiKeys Methods
# ************************************************************************************************
@staticmethod
def get_user_api_keys(user_id: str):
return ApiKeys.select().where(ApiKeys.user_id == user_id).execute()
@staticmethod
def get_user_api_key(key_id: str) -> ApiKeys:
return ApiKeys.get(ApiKeys.token_id == key_id)
@staticmethod
def add_user_api_key(name: str, user_id: str, superuser: bool = False, server_permissions_mask: Optional[str] = None, crafty_permissions_mask: Optional[str] = None):
return ApiKeys.insert({
ApiKeys.name: name,
ApiKeys.user_id: user_id,
**({ApiKeys.server_permissions: server_permissions_mask} if server_permissions_mask is not None else {}),
**({ApiKeys.crafty_permissions: crafty_permissions_mask} if crafty_permissions_mask is not None else {}),
ApiKeys.superuser: superuser
}).execute()
@staticmethod
def delete_user_api_keys(user_id: str):
ApiKeys.delete().where(ApiKeys.user_id == user_id).execute()
@staticmethod
def delete_user_api_key(key_id: str):
ApiKeys.delete().where(ApiKeys.token_id == key_id).execute()
users_helper = helper_users()