# Mirror of https://gitlab.com/crafty-controller/crafty-4.git
# Synced 2024-08-30 18:23:09 +00:00 (242 lines, 8.2 KiB, Python)
import os
|
|
import sys
|
|
import logging
|
|
import datetime
|
|
|
|
from app.classes.shared.helpers import helper
|
|
from app.classes.shared.console import console
|
|
|
|
from app.classes.models.roles import Roles, roles_helper
|
|
|
|
# Module-level logger for this file, plus a handle on the 'peewee' logger.
# Capping the latter at INFO filters out any DEBUG records it emits.
logger = logging.getLogger(__name__)
peewee_logger = logging.getLogger('peewee')
peewee_logger.setLevel(logging.INFO)

# Third-party imports are guarded so that a missing package is reported on
# both the log and the console before the process exits with status 1.
try:
    from peewee import *
    from playhouse.shortcuts import model_to_dict
    from enum import Enum
    import yaml

except ModuleNotFoundError as err:
    missing_module_msg = f"Import Error: Unable to load {err.name} module"
    logger.critical(missing_module_msg, exc_info=True)
    console.critical(missing_module_msg)
    sys.exit(1)

# Shared SQLite handle used by every model declared in this module.
database = SqliteDatabase(
    helper.db_path,
    pragmas={
        # Write-ahead logging journal mode.
        'journal_mode': 'wal',
        # SQLite reads a negative cache_size as a size in KiB rather than a
        # page count, so this asks for roughly 10 MiB of page cache.
        'cache_size': -1024 * 10,
    },
)
|
|
|
|
#************************************************************************************************
|
|
# Users Class
|
|
#************************************************************************************************
|
|
class Users(Model):
    """Peewee model for one account row in the ``users`` table."""

    # Auto-incrementing integer primary key.
    user_id = AutoField()

    # The three timestamps receive the ``now`` callable itself (not its
    # result), so the default is computed per row rather than once at import.
    created = DateTimeField(default=datetime.datetime.now)
    last_login = DateTimeField(default=datetime.datetime.now)
    last_update = DateTimeField(default=datetime.datetime.now)

    last_ip = CharField(default="")

    # Login name; must be unique and is indexed for lookups.
    username = CharField(default="", unique=True, index=True)

    # Stored password value (helper_users.add_user stores the output of
    # helper.encode_pass here).
    password = CharField(default="")

    enabled = BooleanField(default=True)
    superuser = BooleanField(default=False)

    # Unique, indexed API token. Original author's note: this definition may
    # need to be revisited.
    api_token = CharField(default="", unique=True, index=True)

    class Meta:
        table_name = "users"
        database = database
|
|
|
|
#************************************************************************************************
|
|
# User Roles Class
|
|
#************************************************************************************************
|
|
class User_Roles(Model):
    """Link table joining ``Users`` rows to ``Roles`` rows.

    Each row records that one user holds one role; the pair
    ``(user_id, role_id)`` is the composite primary key, so a given
    user/role combination can be stored at most once.
    """

    # Both sides expose the reverse relation under the name ``user_role``.
    user_id = ForeignKeyField(Users, backref='user_role')
    role_id = ForeignKeyField(Roles, backref='user_role')

    class Meta:
        table_name = 'user_roles'
        primary_key = CompositeKey('user_id', 'role_id')
        database = database
|
|
|
|
#************************************************************************************************
|
|
# Users Helpers
|
|
#************************************************************************************************
|
|
class helper_users:
    """Static query helpers for the ``Users`` and ``User_Roles`` tables.

    Every method is a ``@staticmethod``; internal cross-calls go through the
    class name so the class does not depend on a module-level instance
    having been created first.
    """

    @staticmethod
    def get_by_id(user_id):
        """Return the ``Users`` row whose primary key is *user_id*."""
        return Users.get_by_id(user_id)

    @staticmethod
    def get_all_users():
        """Return a select query over every ``Users`` row."""
        return Users.select()

    @staticmethod
    def get_user_id_by_name(username):
        """Return the id of the user named *username*.

        The reserved name ``"SYSTEM"`` maps to id ``0`` without touching the
        database. Returns ``None`` when no such user exists.
        """
        if username == "SYSTEM":
            return 0
        try:
            return Users.get(Users.username == username).user_id
        except DoesNotExist:
            return None

    @staticmethod
    def get_user_by_api_token(token: str):
        """Return the user owning *token* as a dict with a ``roles`` entry.

        Returns an empty dict when no user has that token.
        """
        # A single lookup replaces the former exists()-then-get() pair.
        user = Users.get_or_none(Users.api_token == token)
        if user is None:
            return {}
        return helper_users.add_user_roles(model_to_dict(user))

    @staticmethod
    def user_query(user_id):
        """Return a select query restricted to the row with id *user_id*."""
        return Users.select().where(Users.user_id == user_id)

    @staticmethod
    def get_user(user_id):
        """Return the user with id *user_id* as a dict with a ``roles`` entry.

        Id ``0`` is the built-in SYSTEM pseudo-user and is answered with a
        fixed dict instead of a database lookup.

        Raises:
            DoesNotExist: if no row has that id (raised by ``Users.get``).
        """
        if user_id == 0:
            return {
                'user_id': 0,
                'created': None,
                'last_login': None,
                'last_update': None,
                'last_ip': "127.27.23.89",
                'username': "SYSTEM",
                'password': None,
                'enabled': True,
                'superuser': False,
                'api_token': None,
                'roles': [],
                'servers': [],
            }

        # Users.get() raises rather than returning a falsy value, so the old
        # "else: return {}" branch could never run and has been dropped.
        user = model_to_dict(Users.get(Users.user_id == user_id))
        return helper_users.add_user_roles(user)

    @staticmethod
    def add_user(username, password=None, api_token=None, enabled=True, superuser=False):
        """Insert a new user and return the value of the insert's ``execute()``.

        Args:
            username: login name; stored lower-cased.
            password: plain-text password, encoded with ``helper.encode_pass``;
                ``None`` stores no password.
            api_token: explicit 32-character token, or ``None`` to generate a
                fresh unused one.
            enabled: initial value of the ``enabled`` flag.
            superuser: initial value of the ``superuser`` flag.

        Raises:
            ValueError: if *api_token* is given but is not a 32-character str.
        """
        # Reject a bad token when EITHER check fails. The previous "and"
        # accepted any str regardless of length and raised TypeError for
        # values without a length.
        if api_token is not None:
            if not isinstance(api_token, str) or len(api_token) != 32:
                raise ValueError("API token must be a 32 character string")

        # NOTE(review): Users.password is declared without null=True, so a
        # None password may be rejected by the database - confirm.
        pw_enc = helper.encode_pass(password) if password is not None else None

        if api_token is None:
            api_token = helper_users.new_api_token()

        user_id = Users.insert({
            Users.username: username.lower(),
            Users.password: pw_enc,
            Users.api_token: api_token,
            Users.enabled: enabled,
            Users.superuser: superuser,
            Users.created: helper.get_time_as_string()
        }).execute()
        return user_id

    @staticmethod
    def update_user(user_id, up_data=None):
        """Apply the field/value mapping *up_data* to the user *user_id*.

        Does nothing when *up_data* is ``None`` or empty. The default is
        ``None`` rather than a shared mutable ``{}``.
        """
        if up_data:
            Users.update(up_data).where(Users.user_id == user_id).execute()

    @staticmethod
    def remove_user(user_id):
        """Delete the user and all of its role links in one transaction.

        Returns the result of ``delete_instance()`` on the user row.
        """
        with database.atomic():
            # Remove the link rows first, then the user row itself.
            User_Roles.delete().where(User_Roles.user_id == user_id).execute()
            user = Users.get(Users.user_id == user_id)
            return user.delete_instance()

    @staticmethod
    def user_id_exists(user_id):
        """Return ``True`` if *user_id* resolves to a user, else ``False``."""
        # get_user() raises DoesNotExist for an unknown id; without this
        # handler the method could never return False.
        try:
            return bool(helper_users.get_user(user_id))
        except DoesNotExist:
            return False

    @staticmethod
    def new_api_token():
        """Return a 32-character token not currently assigned to any user."""
        while True:
            token = helper.random_string_generator(32)
            in_use = Users.select(Users.user_id).where(Users.api_token == token).exists()
            if not in_use:
                return token

    #************************************************************************************************
    #                                   User_Roles Methods
    #************************************************************************************************

    @staticmethod
    def get_or_create(user_id, role_id):
        """Fetch the ``User_Roles`` link for the pair, creating it if absent."""
        return User_Roles.get_or_create(user_id=user_id, role_id=role_id)

    @staticmethod
    def get_user_roles_id(user_id):
        """Return a list with the ``role_id`` of every role linked to the user."""
        links = User_Roles.select().where(User_Roles.user_id == user_id)
        return [roles_helper.get_role(link.role_id)['role_id'] for link in links]

    @staticmethod
    def get_user_roles_names(user_id):
        """Return a list with the ``role_name`` of every role linked to the user."""
        links = User_Roles.select().where(User_Roles.user_id == user_id)
        return [roles_helper.get_role(link.role_id)['role_name'] for link in links]

    @staticmethod
    def add_role_to_user(user_id, role_id):
        """Insert a ``User_Roles`` link between *user_id* and *role_id*."""
        User_Roles.insert({
            User_Roles.user_id: user_id,
            User_Roles.role_id: role_id
        }).execute()

    @staticmethod
    def add_user_roles(user):
        """Store the set of the user's role ids under ``user['roles']``.

        *user* is mutated in place and also returned.
        """
        # isinstance() also accepts dict subclasses, unlike the exact type
        # comparison used before.
        if isinstance(user, dict):
            user_id = user['user_id']
        else:
            # NOTE(review): the id is read as an attribute here, but the
            # result is still stored with item assignment below, which a
            # plain model instance may not support - confirm intended use.
            user_id = user.user_id

        # This code was copied from get_user together with its TODO.
        # TODO: this query needs to be narrower
        roles_query = User_Roles.select().join(Roles, JOIN.INNER).where(User_Roles.user_id == user_id)
        roles = {link.role_id.role_id for link in roles_query}

        user['roles'] = roles
        return user

    @staticmethod
    def user_role_query(user_id):
        """Return a ``Roles`` query matching every role linked to *user_id*.

        The query matches no rows when the user has no roles.
        """
        # The previous loop overwrote the query on each iteration, so only
        # the last linked role survived. A sub-select covers all of them.
        linked_role_ids = User_Roles.select(User_Roles.role_id).where(User_Roles.user_id == user_id)
        return Roles.select().where(Roles.role_id.in_(linked_role_ids))

    @staticmethod
    def delete_user_roles(user_id, removed_roles):
        """Remove the links between *user_id* and each role in *removed_roles*."""
        User_Roles.delete().where(
            (User_Roles.user_id == user_id) & (User_Roles.role_id.in_(removed_roles))
        ).execute()

    @staticmethod
    def remove_roles_from_role_id(role_id):
        """Remove every user link that points at *role_id*."""
        User_Roles.delete().where(User_Roles.role_id == role_id).execute()
|
|
|
|
|
|
# Shared module-level instance through which the helper_users methods can be
# called.
users_helper = helper_users()