mirror of
https://gitlab.com/crafty-controller/crafty-4.git
synced 2024-08-30 18:23:09 +00:00
23ecef6826
These are not compatible with peewee querys, see MR !233 for details
389 lines
12 KiB
Python
389 lines
12 KiB
Python
import logging
|
|
import datetime
|
|
from typing import Optional, Union
|
|
|
|
from app.classes.models.roles import Roles, roles_helper
|
|
from app.classes.shared.helpers import helper
|
|
|
|
try:
|
|
from peewee import (
|
|
SqliteDatabase,
|
|
Model,
|
|
ForeignKeyField,
|
|
CharField,
|
|
AutoField,
|
|
DateTimeField,
|
|
BooleanField,
|
|
CompositeKey,
|
|
DoesNotExist,
|
|
JOIN,
|
|
)
|
|
from playhouse.shortcuts import model_to_dict
|
|
|
|
except ModuleNotFoundError as e:
|
|
helper.auto_installer_fix(e)
|
|
|
|
logger = logging.getLogger(__name__)
|
|
peewee_logger = logging.getLogger("peewee")
|
|
peewee_logger.setLevel(logging.INFO)
|
|
database = SqliteDatabase(
|
|
helper.db_path, pragmas={"journal_mode": "wal", "cache_size": -1024 * 10}
|
|
)
|
|
|
|
# **********************************************************************************
|
|
# Users Class
|
|
# **********************************************************************************
|
|
class Users(Model):
|
|
user_id = AutoField()
|
|
created = DateTimeField(default=datetime.datetime.now)
|
|
last_login = DateTimeField(default=datetime.datetime.now)
|
|
last_update = DateTimeField(default=datetime.datetime.now)
|
|
last_ip = CharField(default="")
|
|
username = CharField(default="", unique=True, index=True)
|
|
password = CharField(default="")
|
|
email = CharField(default="default@example.com")
|
|
enabled = BooleanField(default=True)
|
|
superuser = BooleanField(default=False)
|
|
lang = CharField(default="en_EN")
|
|
support_logs = CharField(default="")
|
|
valid_tokens_from = DateTimeField(default=datetime.datetime.now)
|
|
server_order = CharField(default="")
|
|
preparing = BooleanField(default=False)
|
|
|
|
class Meta:
|
|
table_name = "users"
|
|
database = database
|
|
|
|
|
|
# **********************************************************************************
|
|
# API Keys Class
|
|
# **********************************************************************************
|
|
class ApiKeys(Model):
|
|
token_id = AutoField()
|
|
name = CharField(default="", unique=True, index=True)
|
|
created = DateTimeField(default=datetime.datetime.now)
|
|
user_id = ForeignKeyField(Users, backref="api_token", index=True)
|
|
server_permissions = CharField(default="00000000")
|
|
crafty_permissions = CharField(default="000")
|
|
superuser = BooleanField(default=False)
|
|
|
|
class Meta:
|
|
table_name = "api_keys"
|
|
database = database
|
|
|
|
|
|
# **********************************************************************************
|
|
# User Roles Class
|
|
# **********************************************************************************
|
|
class User_Roles(Model):
|
|
user_id = ForeignKeyField(Users, backref="user_role")
|
|
role_id = ForeignKeyField(Roles, backref="user_role")
|
|
|
|
class Meta:
|
|
table_name = "user_roles"
|
|
primary_key = CompositeKey("user_id", "role_id")
|
|
database = database
|
|
|
|
|
|
# **********************************************************************************
|
|
# Users Helpers
|
|
# **********************************************************************************
|
|
class helper_users:
|
|
@staticmethod
|
|
def get_by_id(user_id):
|
|
return Users.get_by_id(user_id)
|
|
|
|
@staticmethod
|
|
def get_all_users():
|
|
query = Users.select().where(Users.username != "system")
|
|
return query
|
|
|
|
@staticmethod
|
|
def get_user_lang_by_id(user_id):
|
|
return Users.get(Users.user_id == user_id).lang
|
|
|
|
@staticmethod
|
|
def get_user_id_by_name(username):
|
|
try:
|
|
return (Users.get(Users.username == username)).user_id
|
|
except DoesNotExist:
|
|
return None
|
|
|
|
@staticmethod
|
|
def user_query(user_id):
|
|
user_query = Users.select().where(Users.user_id == user_id)
|
|
return user_query
|
|
|
|
@staticmethod
|
|
def get_user(user_id):
|
|
if user_id == 0:
|
|
return {
|
|
"user_id": 0,
|
|
"created": "10/24/2019, 11:34:00",
|
|
"last_login": "10/24/2019, 11:34:00",
|
|
"last_update": "10/24/2019, 11:34:00",
|
|
"last_ip": "127.27.23.89",
|
|
"username": "SYSTEM",
|
|
"password": None,
|
|
"email": "default@example.com",
|
|
"enabled": True,
|
|
"superuser": True,
|
|
"roles": [],
|
|
"servers": [],
|
|
"support_logs": "",
|
|
}
|
|
user = model_to_dict(Users.get(Users.user_id == user_id))
|
|
|
|
if user:
|
|
# I know it should apply it without setting it but I'm just making sure
|
|
user = users_helper.add_user_roles(user)
|
|
return user
|
|
else:
|
|
# logger.debug("user: ({}) {}".format(user_id, {}))
|
|
return {}
|
|
|
|
@staticmethod
|
|
def check_system_user(user_id):
|
|
try:
|
|
result = Users.get(Users.user_id == user_id).user_id == user_id
|
|
if result:
|
|
return True
|
|
except:
|
|
return False
|
|
|
|
@staticmethod
|
|
def get_user_model(user_id: str) -> Users:
|
|
user = Users.get(Users.user_id == user_id)
|
|
user = users_helper.add_user_roles(user)
|
|
return user
|
|
|
|
@staticmethod
|
|
def add_user(
|
|
username: str,
|
|
password: str = None,
|
|
email: Optional[str] = None,
|
|
enabled: bool = True,
|
|
superuser: bool = False,
|
|
) -> str:
|
|
if password is not None:
|
|
pw_enc = helper.encode_pass(password)
|
|
else:
|
|
pw_enc = None
|
|
user_id = Users.insert(
|
|
{
|
|
Users.username: username.lower(),
|
|
Users.password: pw_enc,
|
|
Users.email: email,
|
|
Users.enabled: enabled,
|
|
Users.superuser: superuser,
|
|
Users.created: helper.get_time_as_string(),
|
|
}
|
|
).execute()
|
|
return user_id
|
|
|
|
@staticmethod
|
|
def add_rawpass_user(
|
|
username: str,
|
|
password: str = None,
|
|
email: Optional[str] = None,
|
|
enabled: bool = True,
|
|
superuser: bool = False,
|
|
) -> str:
|
|
user_id = Users.insert(
|
|
{
|
|
Users.username: username.lower(),
|
|
Users.password: password,
|
|
Users.email: email,
|
|
Users.enabled: enabled,
|
|
Users.superuser: superuser,
|
|
Users.created: helper.get_time_as_string(),
|
|
}
|
|
).execute()
|
|
return user_id
|
|
|
|
@staticmethod
|
|
def update_user(user_id, up_data=None):
|
|
if up_data is None:
|
|
up_data = {}
|
|
if up_data:
|
|
Users.update(up_data).where(Users.user_id == user_id).execute()
|
|
|
|
@staticmethod
|
|
def update_server_order(user_id, user_server_order):
|
|
Users.update(server_order=user_server_order).where(
|
|
Users.user_id == user_id
|
|
).execute()
|
|
|
|
@staticmethod
|
|
def get_server_order(user_id):
|
|
return Users.select().where(Users.user_id == user_id)
|
|
|
|
@staticmethod
|
|
def get_super_user_list():
|
|
final_users = []
|
|
super_users = Users.select().where(
|
|
Users.superuser == True # pylint: disable=singleton-comparison
|
|
)
|
|
for suser in super_users:
|
|
if suser.user_id not in final_users:
|
|
final_users.append(suser.user_id)
|
|
return final_users
|
|
|
|
@staticmethod
|
|
def remove_user(user_id):
|
|
with database.atomic():
|
|
User_Roles.delete().where(User_Roles.user_id == user_id).execute()
|
|
user = Users.get(Users.user_id == user_id)
|
|
return user.delete_instance()
|
|
|
|
@staticmethod
|
|
def set_support_path(user_id, support_path):
|
|
Users.update(support_logs=support_path).where(
|
|
Users.user_id == user_id
|
|
).execute()
|
|
|
|
@staticmethod
|
|
def set_prepare(user_id):
|
|
Users.update(preparing=True).where(Users.user_id == user_id).execute()
|
|
|
|
@staticmethod
|
|
def stop_prepare(user_id):
|
|
Users.update(preparing=False).where(Users.user_id == user_id).execute()
|
|
|
|
@staticmethod
|
|
def clear_support_status():
|
|
Users.update(preparing=False).where(
|
|
Users.preparing == True # pylint: disable=singleton-comparison
|
|
).execute()
|
|
|
|
@staticmethod
|
|
def user_id_exists(user_id):
|
|
if not users_helper.get_user(user_id):
|
|
return False
|
|
return True
|
|
|
|
# **********************************************************************************
|
|
# User_Roles Methods
|
|
# **********************************************************************************
|
|
|
|
@staticmethod
|
|
def get_or_create(user_id, role_id):
|
|
return User_Roles.get_or_create(user_id=user_id, role_id=role_id)
|
|
|
|
@staticmethod
|
|
def get_user_roles_id(user_id):
|
|
roles_list = []
|
|
roles = User_Roles.select().where(User_Roles.user_id == user_id)
|
|
for r in roles:
|
|
roles_list.append(roles_helper.get_role(r.role_id)["role_id"])
|
|
return roles_list
|
|
|
|
@staticmethod
|
|
def get_user_roles_names(user_id):
|
|
roles_list = []
|
|
roles = User_Roles.select().where(User_Roles.user_id == user_id)
|
|
for r in roles:
|
|
roles_list.append(roles_helper.get_role(r.role_id)["role_name"])
|
|
return roles_list
|
|
|
|
@staticmethod
|
|
def add_role_to_user(user_id, role_id):
|
|
User_Roles.insert(
|
|
{User_Roles.user_id: user_id, User_Roles.role_id: role_id}
|
|
).execute()
|
|
|
|
@staticmethod
|
|
def add_user_roles(user: Union[dict, Users]):
|
|
if isinstance(user, dict):
|
|
user_id = user["user_id"]
|
|
else:
|
|
user_id = user.user_id
|
|
|
|
# I just copied this code from get_user,
|
|
# it had those TODOs & comments made by mac - Lukas
|
|
|
|
roles_query = (
|
|
User_Roles.select()
|
|
.join(Roles, JOIN.INNER)
|
|
.where(User_Roles.user_id == user_id)
|
|
)
|
|
# TODO: this query needs to be narrower
|
|
roles = set()
|
|
for r in roles_query:
|
|
roles.add(r.role_id.role_id)
|
|
|
|
if isinstance(user, dict):
|
|
user["roles"] = roles
|
|
else:
|
|
user.roles = roles
|
|
|
|
# logger.debug("user: ({}) {}".format(user_id, user))
|
|
return user
|
|
|
|
@staticmethod
|
|
def user_role_query(user_id):
|
|
user_query = User_Roles.select().where(User_Roles.user_id == user_id)
|
|
query = Roles.select().where(Roles.role_id == -1)
|
|
for u in user_query:
|
|
query = query + Roles.select().where(Roles.role_id == u.role_id)
|
|
return query
|
|
|
|
@staticmethod
|
|
def delete_user_roles(user_id, removed_roles):
|
|
User_Roles.delete().where(User_Roles.user_id == user_id).where(
|
|
User_Roles.role_id.in_(removed_roles)
|
|
).execute()
|
|
|
|
@staticmethod
|
|
def remove_roles_from_role_id(role_id):
|
|
User_Roles.delete().where(User_Roles.role_id == role_id).execute()
|
|
|
|
# **********************************************************************************
|
|
# ApiKeys Methods
|
|
# **********************************************************************************
|
|
|
|
@staticmethod
|
|
def get_user_api_keys(user_id: str):
|
|
return ApiKeys.select().where(ApiKeys.user_id == user_id).execute()
|
|
|
|
@staticmethod
|
|
def get_user_api_key(key_id: str) -> ApiKeys:
|
|
return ApiKeys.get(ApiKeys.token_id == key_id)
|
|
|
|
@staticmethod
|
|
def add_user_api_key(
|
|
name: str,
|
|
user_id: str,
|
|
superuser: bool = False,
|
|
server_permissions_mask: Optional[str] = None,
|
|
crafty_permissions_mask: Optional[str] = None,
|
|
):
|
|
return ApiKeys.insert(
|
|
{
|
|
ApiKeys.name: name,
|
|
ApiKeys.user_id: user_id,
|
|
**(
|
|
{ApiKeys.server_permissions: server_permissions_mask}
|
|
if server_permissions_mask is not None
|
|
else {}
|
|
),
|
|
**(
|
|
{ApiKeys.crafty_permissions: crafty_permissions_mask}
|
|
if crafty_permissions_mask is not None
|
|
else {}
|
|
),
|
|
ApiKeys.superuser: superuser,
|
|
}
|
|
).execute()
|
|
|
|
@staticmethod
|
|
def delete_user_api_keys(user_id: str):
|
|
ApiKeys.delete().where(ApiKeys.user_id == user_id).execute()
|
|
|
|
@staticmethod
|
|
def delete_user_api_key(key_id: str):
|
|
ApiKeys.delete().where(ApiKeys.token_id == key_id).execute()
|
|
|
|
|
|
users_helper = helper_users()
|