import logging

from app.classes.models.servers import Servers
from app.classes.models.roles import Roles
from app.classes.models.users import User_Roles, users_helper, ApiKeys, Users
from app.classes.shared.helpers import helper
from app.classes.shared.permission_helper import permission_helper

try:
    from peewee import (
        SqliteDatabase,
        Model,
        ForeignKeyField,
        CharField,
        CompositeKey,
        JOIN,
    )
    from enum import Enum

except ModuleNotFoundError as e:
    helper.auto_installer_fix(e)

logger = logging.getLogger(__name__)
peewee_logger = logging.getLogger("peewee")
peewee_logger.setLevel(logging.INFO)
database = SqliteDatabase(
    helper.db_path, pragmas={"journal_mode": "wal", "cache_size": -1024 * 10}
)


# **********************************************************************************
#                                   Role Servers Class
# **********************************************************************************
class Role_Servers(Model):
    """Link row between a role and a server, carrying a permission mask."""

    role_id = ForeignKeyField(Roles, backref="role_server")
    server_id = ForeignKeyField(Servers, backref="role_server")
    # String mask with one character per Enum_Permissions_Server member;
    # "1" at a member's index means the permission is granted.
    permissions = CharField(default="00000000")

    class Meta:
        table_name = "role_servers"
        # One row per (role, server) pair.
        primary_key = CompositeKey("role_id", "server_id")
        database = database


# **********************************************************************************
#                                   Servers Permissions Class
# **********************************************************************************
class Enum_Permissions_Server(Enum):
    """Server permissions; each value is the index of its flag in a mask string."""

    Commands = 0
    Terminal = 1
    Logs = 2
    Schedule = 3
    Backup = 4
    Files = 5
    Config = 6
    Players = 7


class Permissions_Servers:
    """Static helpers for reading and writing per-role server permissions."""

    @staticmethod
    def get_or_create(role_id, server, permissions_mask):
        """Fetch the Role_Servers row for (role_id, server), creating it if absent.

        The mask is only used when a new row is created. It is passed through
        ``defaults`` rather than as a lookup filter: filtering on it would miss
        an existing row that has a different mask and then fail on insert
        because of the composite primary key.

        Returns:
            The ``(instance, created)`` tuple from peewee's ``get_or_create``.
        """
        return Role_Servers.get_or_create(
            role_id=role_id,
            server_id=server,
            defaults={"permissions": permissions_mask},
        )

    @staticmethod
    def get_permissions_list():
        """Return every Enum_Permissions_Server member, in definition order."""
        return list(Enum_Permissions_Server.__members__.values())

    @staticmethod
    def get_permissions(permissions_mask):
        """Return the permission members that are set to "1" in the mask."""
        return [
            permission
            for permission in Enum_Permissions_Server.__members__.values()
            if server_permissions.has_permission(permissions_mask, permission)
        ]

    @staticmethod
    def has_permission(permission_mask, permission_tested: Enum_Permissions_Server):
        """Return True when the mask character for the permission is "1"."""
        return permission_mask[permission_tested.value] == "1"

    @staticmethod
    def set_permission(
        permission_mask, permission_tested: Enum_Permissions_Server, value
    ):
        """Return a copy of the mask with the permission's flag set to str(value)."""
        # Strings are immutable, so edit a list of characters and re-join.
        flags = list(permission_mask)
        flags[permission_tested.value] = str(value)
        return "".join(flags)

    @staticmethod
    def get_permission(permission_mask, permission_tested: Enum_Permissions_Server):
        """Return the raw mask character stored for the permission."""
        return permission_mask[permission_tested.value]

    @staticmethod
    def get_token_permissions(permissions_mask, api_permissions_mask):
        """Return the permissions for which ``both_have_perm`` accepts both masks."""
        return [
            permission
            for permission in Enum_Permissions_Server.__members__.values()
            if permission_helper.both_have_perm(
                permissions_mask, api_permissions_mask, permission
            )
        ]

    # **********************************************************************************
    #                                   Role_Servers Methods
    # **********************************************************************************
    @staticmethod
    def get_role_servers_from_role_id(roleid):
        """Return a query of the Role_Servers rows belonging to the role."""
        return Role_Servers.select().where(Role_Servers.role_id == roleid)

    @staticmethod
    def get_servers_from_role(role_id):
        """Return a query of the role's Role_Servers rows, inner-joined to Servers."""
        return (
            Role_Servers.select()
            .join(Servers, JOIN.INNER)
            .where(Role_Servers.role_id == role_id)
        )

    @staticmethod
    def get_roles_from_server(server_id):
        """Return a query of the server's Role_Servers rows, inner-joined to Roles."""
        return (
            Role_Servers.select()
            .join(Roles, JOIN.INNER)
            .where(Role_Servers.server_id == server_id)
        )

    @staticmethod
    def add_role_server(server_id, role_id, rs_permissions="00000000"):
        """Insert a Role_Servers row and return the result of the insert."""
        return Role_Servers.insert(
            {
                Role_Servers.server_id: server_id,
                Role_Servers.role_id: role_id,
                Role_Servers.permissions: rs_permissions,
            }
        ).execute()

    @staticmethod
    def get_permissions_mask(role_id, server_id):
        """Return the stored permission mask for the (role, server) pair.

        Uses ``.get()``, so a missing row raises peewee's ``DoesNotExist``.
        """
        role_server = (
            Role_Servers.select()
            .where(Role_Servers.role_id == role_id)
            .where(Role_Servers.server_id == server_id)
            .get()
        )
        return role_server.permissions

    @staticmethod
    def get_server_roles(server_id):
        """Return the ``role_id`` attribute of every Role_Servers row of the server."""
        rows = (
            Role_Servers.select().where(Role_Servers.server_id == server_id).execute()
        )
        return [row.role_id for row in rows]

    @staticmethod
    def get_role_permissions_list(role_id):
        """Return the permissions granted by the first row found for the role.

        Only one Role_Servers row is consulted (no server filter is applied);
        when the role has no row at all, the all-zero mask is used.
        """
        permissions_mask = "00000000"
        role_server = Role_Servers.get_or_none(Role_Servers.role_id == role_id)
        if role_server is not None:
            permissions_mask = role_server.permissions
        return server_permissions.get_permissions(permissions_mask)

    @staticmethod
    def update_role_permission(role_id, server_id, permissions_mask):
        """Overwrite the permission mask of the (role, server) row and save it.

        Uses ``.get()``, so a missing row raises peewee's ``DoesNotExist``.
        """
        role_server = (
            Role_Servers.select()
            .where(Role_Servers.role_id == role_id)
            .where(Role_Servers.server_id == server_id)
            .get()
        )
        role_server.permissions = permissions_mask
        role_server.save()

    @staticmethod
    def delete_roles_permissions(role_id, removed_servers=None):
        """Delete the role's rows for the given servers; return the delete result.

        With ``removed_servers`` omitted the server filter is an empty
        collection.
        """
        if removed_servers is None:
            removed_servers = {}
        return (
            Role_Servers.delete()
            .where(Role_Servers.role_id == role_id)
            .where(Role_Servers.server_id.in_(removed_servers))
            .execute()
        )

    @staticmethod
    def remove_roles_of_server(server_id):
        """Delete every Role_Servers row of the server inside a transaction."""
        with database.atomic():
            return (
                Role_Servers.delete()
                .where(Role_Servers.server_id == server_id)
                .execute()
            )

    @staticmethod
    def get_user_id_permissions_mask(user_id, server_id: str):
        """Look the user up by id and return their permission mask on the server."""
        user = users_helper.get_user_model(user_id)
        return server_permissions.get_user_permissions_mask(user, server_id)

    @staticmethod
    def get_user_permissions_mask(user: Users, server_id: str):
        """Return the user's permission mask on the server.

        Superusers get an all-"1" mask. Other users get the mask of the first
        matching row among their roles, or an all-"0" mask when none of their
        roles is linked to the server.
        """
        mask_length = len(server_permissions.get_permissions_list())
        if user.superuser:
            return "1" * mask_length

        roles_list = users_helper.get_user_roles_id(user.user_id)
        role_server = (
            Role_Servers.select()
            .where(Role_Servers.role_id.in_(roles_list))
            .where(Role_Servers.server_id == server_id)
            .execute()
        )
        try:
            return role_server[0].permissions
        except IndexError:
            # No role of this user is attached to the server.
            return "0" * mask_length

    @staticmethod
    def get_server_user_list(server_id):
        """Return the ids of users who can reach the server, without duplicates.

        Collects the users of every role linked to the server, then appends
        all superusers. Order of first appearance is preserved.
        """
        final_users = []
        server_roles = Role_Servers.select().where(Role_Servers.server_id == server_id)
        super_users = Users.select().where(
            Users.superuser == True  # pylint: disable=singleton-comparison
        )
        for role in server_roles:
            users = User_Roles.select().where(User_Roles.role_id == role.role_id)
            for user in users:
                if user.user_id.user_id not in final_users:
                    final_users.append(user.user_id.user_id)
        for suser in super_users:
            if suser.user_id not in final_users:
                final_users.append(suser.user_id)
        return final_users

    @staticmethod
    def get_user_id_permissions_list(user_id, server_id: str):
        """Look the user up by id and return their permission list on the server."""
        user = users_helper.get_user_model(user_id)
        return server_permissions.get_user_permissions_list(user, server_id)

    @staticmethod
    def get_user_permissions_list(user: Users, server_id: str):
        """Return the user's permissions on the server as enum members."""
        if user.superuser:
            return server_permissions.get_permissions_list()
        permissions_mask = server_permissions.get_user_permissions_mask(
            user, server_id
        )
        return server_permissions.get_permissions(permissions_mask)

    @staticmethod
    def get_api_key_id_permissions_list(key_id, server_id: str):
        """Look the API key up by token id and return its permission list."""
        key = ApiKeys.get(ApiKeys.token_id == key_id)
        return server_permissions.get_api_key_permissions_list(key, server_id)

    @staticmethod
    def get_api_key_permissions_list(key: ApiKeys, server_id: str):
        """Return the permissions an API key has on the server.

        A superuser key owned by a superuser gets every permission. Otherwise
        the owner's mask is combined with the key's own ``server_permissions``
        mask via ``permission_helper.combine_masks`` and the resulting mask is
        expanded into enum members.
        """
        user = key.user
        if user.superuser and key.superuser:
            return server_permissions.get_permissions_list()

        # Reuse the user-mask helper instead of repeating the role lookup:
        # it reads ``user.user_id`` (the model is not subscriptable) and
        # copes with a user who has no role on this server, where indexing
        # the empty result used to raise IndexError.
        user_permissions_mask = server_permissions.get_user_permissions_mask(
            user, server_id
        )
        key_permissions_mask = key.server_permissions
        permissions_mask = permission_helper.combine_masks(
            user_permissions_mask, key_permissions_mask
        )
        return server_permissions.get_permissions(permissions_mask)


server_permissions = Permissions_Servers()