mirror of
https://gitlab.com/crafty-controller/crafty-4.git
synced 2024-08-30 18:23:09 +00:00
225 lines
9.3 KiB
Python
225 lines
9.3 KiB
Python
import logging
|
|
|
|
from app.classes.models.servers import Servers
|
|
from app.classes.models.roles import Roles
|
|
from app.classes.models.users import User_Roles, users_helper, ApiKeys, Users
|
|
from app.classes.shared.helpers import helper
|
|
from app.classes.shared.permission_helper import permission_helper
|
|
|
|
try:
|
|
from peewee import SqliteDatabase, Model, ForeignKeyField, CharField, CompositeKey, JOIN
|
|
from enum import Enum
|
|
|
|
except ModuleNotFoundError as e:
|
|
helper.auto_installer_fix(e)
|
|
|
|
logger = logging.getLogger(__name__)

# peewee reports executed queries on its own 'peewee' logger at DEBUG level;
# pinning that logger to INFO keeps the SQL statements out of the log output.
peewee_logger = logging.getLogger('peewee')
peewee_logger.setLevel(logging.INFO)

# Single SQLite handle used by the models declared in this module.
database = SqliteDatabase(
    helper.db_path,
    pragmas={
        # Write-ahead logging journal mode.
        'journal_mode': 'wal',
        # A negative cache_size is interpreted by SQLite as KiB: -10240 -> ~10 MiB.
        'cache_size': -1024 * 10,
    },
)
|
|
|
|
#************************************************************************************************
|
|
# Role Servers Class
|
|
#************************************************************************************************
|
|
class Role_Servers(Model):
    """Peewee model for the ``role_servers`` table.

    Each row ties one role to one server and stores the permission mask
    string that applies to that pair.  The pair itself is the primary key.
    """

    # Foreign key to Roles; reachable from a role through ``role_server``.
    role_id = ForeignKeyField(Roles, backref='role_server')
    # Foreign key to Servers; reachable from a server through ``role_server``.
    server_id = ForeignKeyField(Servers, backref='role_server')
    # Permission mask string; a new row starts with eight '0' characters.
    permissions = CharField(default="00000000")

    class Meta:
        database = database
        table_name = 'role_servers'
        # A (role, server) pair may appear only once.
        primary_key = CompositeKey('role_id', 'server_id')
|
|
|
|
#************************************************************************************************
|
|
# Servers Permissions Class
|
|
#************************************************************************************************
|
|
class Enum_Permissions_Server(Enum):
    """Per-server permission slots.

    The value of each member is used elsewhere in this module as an index
    into a permission mask string, so the values must stay contiguous and
    start at 0.
    """

    Commands = 0
    Terminal = 1
    Logs = 2
    Schedule = 3
    Backup = 4
    Files = 5
    Config = 6
    Players = 7
|
|
|
|
class Permissions_Servers:
    """Static helpers for server permission masks and the ``role_servers`` table.

    A permission mask is a string indexed by ``Enum_Permissions_Server`` values:
    the character at position ``member.value`` is ``'1'`` when that permission
    is granted.  Every method is a ``staticmethod``; the class holds no state.
    """

    @staticmethod
    def get_or_create(role_id, server, permissions_mask):
        """Return the result of ``Role_Servers.get_or_create`` for the given values.

        NOTE(review): the mask is part of the lookup, so an existing
        (role, server) row carrying a *different* mask will not match and the
        subsequent insert will collide with the composite primary key.
        Confirm whether ``defaults={'permissions': ...}`` was intended.
        """
        return Role_Servers.get_or_create(role_id=role_id, server_id=server, permissions=permissions_mask)

    @staticmethod
    def get_permissions_list():
        """Return every ``Enum_Permissions_Server`` member, in declaration order."""
        return list(Enum_Permissions_Server.__members__.values())

    @staticmethod
    def get_permissions(permissions_mask):
        """Return the enum members whose slot in ``permissions_mask`` is ``'1'``."""
        return [
            permission
            for permission in Enum_Permissions_Server.__members__.values()
            if Permissions_Servers.has_permission(permissions_mask, permission)
        ]

    @staticmethod
    def has_permission(permission_mask, permission_tested: Enum_Permissions_Server):
        """Tell whether the slot for ``permission_tested`` holds ``'1'``.

        Raises ``IndexError`` when the mask is shorter than the slot index.
        """
        return permission_mask[permission_tested.value] == '1'

    @staticmethod
    def set_permission(permission_mask, permission_tested: Enum_Permissions_Server, value):
        """Return a copy of the mask with the tested slot replaced by ``str(value)``."""
        mask_chars = list(permission_mask)
        mask_chars[permission_tested.value] = str(value)
        return ''.join(mask_chars)

    @staticmethod
    def get_permission(permission_mask, permission_tested: Enum_Permissions_Server):
        """Return the raw character stored in the slot for ``permission_tested``."""
        return permission_mask[permission_tested.value]

    @staticmethod
    def get_token_permissions(permissions_mask, api_permissions_mask):
        """Return the members for which ``permission_helper.both_have_perm`` is true."""
        return [
            permission
            for permission in Enum_Permissions_Server.__members__.values()
            if permission_helper.both_have_perm(permissions_mask, api_permissions_mask, permission)
        ]

    #************************************************************************************************
    #                                   Role_Servers Methods
    #************************************************************************************************
    @staticmethod
    def get_role_servers_from_role_id(roleid):
        """Return a query over the ``Role_Servers`` rows of one role."""
        return Role_Servers.select().where(Role_Servers.role_id == roleid)

    @staticmethod
    def get_servers_from_role(role_id):
        """Return a query over one role's rows, inner-joined with ``Servers``."""
        return Role_Servers.select().join(Servers, JOIN.INNER).where(Role_Servers.role_id == role_id)

    @staticmethod
    def get_roles_from_server(server_id):
        """Return a query over one server's rows, inner-joined with ``Roles``."""
        return Role_Servers.select().join(Roles, JOIN.INNER).where(Role_Servers.server_id == server_id)

    @staticmethod
    def add_role_server(server_id, role_id, rs_permissions="00000000"):
        """Insert a (role, server) row and return the insert query's result."""
        return Role_Servers.insert({
            Role_Servers.server_id: server_id,
            Role_Servers.role_id: role_id,
            Role_Servers.permissions: rs_permissions,
        }).execute()

    @staticmethod
    def get_permissions_mask(role_id, server_id):
        """Return the stored mask for one (role, server) pair.

        Uses ``.get()``, so the model's ``DoesNotExist`` is raised when the
        pair has no row.
        """
        role_server = (
            Role_Servers.select()
            .where(Role_Servers.role_id == role_id)
            .where(Role_Servers.server_id == server_id)
            .get()
        )
        return role_server.permissions

    @staticmethod
    def get_server_roles(server_id):
        """Return the ``role_id`` attribute of every row attached to the server."""
        rows = Role_Servers.select().where(Role_Servers.server_id == server_id).execute()
        return [row.role_id for row in rows]

    @staticmethod
    def get_role_permissions_list(role_id):
        """Return the granted permissions of the first row found for ``role_id``.

        Falls back to the all-zero mask (an empty list) when the role has no row.
        """
        role_server = Role_Servers.get_or_none(Role_Servers.role_id == role_id)
        permissions_mask = '00000000' if role_server is None else role_server.permissions
        return Permissions_Servers.get_permissions(permissions_mask)

    @staticmethod
    def update_role_permission(role_id, server_id, permissions_mask):
        """Overwrite the mask stored for one (role, server) pair.

        Uses ``.get()``, so the model's ``DoesNotExist`` is raised when the
        pair has no row.
        """
        role_server = (
            Role_Servers.select()
            .where(Role_Servers.role_id == role_id)
            .where(Role_Servers.server_id == server_id)
            .get()
        )
        role_server.permissions = permissions_mask
        role_server.save()

    @staticmethod
    def delete_roles_permissions(role_id, removed_servers=None):
        """Delete the rows of ``role_id`` whose server is in ``removed_servers``.

        ``removed_servers`` defaults to an empty list, in which case nothing
        matches.  Returns the result of the delete query's ``execute()``.
        """
        if removed_servers is None:
            # An empty *list* (not a dict): peewee only expands list/tuple/set
            # values into an IN (...) clause.
            removed_servers = []
        return (
            Role_Servers.delete()
            .where(Role_Servers.role_id == role_id)
            .where(Role_Servers.server_id.in_(removed_servers))
            .execute()
        )

    @staticmethod
    def remove_roles_of_server(server_id):
        """Delete every row attached to ``server_id`` inside a transaction."""
        with database.atomic():
            return Role_Servers.delete().where(Role_Servers.server_id == server_id).execute()

    @staticmethod
    def get_user_id_permissions_mask(user_id, server_id: str):
        """Look the user up by id and return their mask for ``server_id``."""
        user = users_helper.get_user_model(user_id)
        return Permissions_Servers.get_user_permissions_mask(user, server_id)

    @staticmethod
    def _get_roles_permissions_mask(roles_list, server_id):
        """Return the mask of the first row matching any of the roles on the server.

        Returns an all-zero mask when none of the roles is attached to the server.

        NOTE(review): only the first matching row is used; masks of additional
        roles on the same server are not merged.
        """
        role_server = (
            Role_Servers.select()
            .where(Role_Servers.role_id.in_(roles_list))
            .where(Role_Servers.server_id == server_id)
            .execute()
        )
        try:
            return role_server[0].permissions
        except IndexError:
            return '0' * len(Permissions_Servers.get_permissions_list())

    @staticmethod
    def get_user_permissions_mask(user: Users, server_id: str):
        """Return the user's mask for the server; superusers get all ``'1'``."""
        if user.superuser:
            return '1' * len(Permissions_Servers.get_permissions_list())
        roles_list = users_helper.get_user_roles_id(user.user_id)
        return Permissions_Servers._get_roles_permissions_mask(roles_list, server_id)

    @staticmethod
    def get_server_user_list(server_id):
        """Return the de-duplicated user ids that can reach the server.

        Users holding a role attached to the server come first, followed by
        every superuser; each id appears once, in first-seen order.
        """
        final_users = []
        seen = set()  # O(1) membership companion to the ordered list
        server_roles = Role_Servers.select().where(Role_Servers.server_id == server_id)
        # pylint: disable=singleton-comparison
        super_users = Users.select().where(Users.superuser == True)
        for role in server_roles:
            users = User_Roles.select().where(User_Roles.role_id == role.role_id)
            for user in users:
                uid = user.user_id.user_id
                if uid not in seen:
                    seen.add(uid)
                    final_users.append(uid)
        for suser in super_users:
            if suser.user_id not in seen:
                seen.add(suser.user_id)
                final_users.append(suser.user_id)
        return final_users

    @staticmethod
    def get_user_id_permissions_list(user_id, server_id: str):
        """Look the user up by id and return their permission list for the server."""
        user = users_helper.get_user_model(user_id)
        return Permissions_Servers.get_user_permissions_list(user, server_id)

    @staticmethod
    def get_user_permissions_list(user: Users, server_id: str):
        """Return the user's granted permissions; superusers get every permission."""
        if user.superuser:
            return Permissions_Servers.get_permissions_list()
        permissions_mask = Permissions_Servers.get_user_permissions_mask(user, server_id)
        return Permissions_Servers.get_permissions(permissions_mask)

    @staticmethod
    def get_api_key_id_permissions_list(key_id, server_id: str):
        """Load the API key by ``token_id`` and return its permission list."""
        key = ApiKeys.get(ApiKeys.token_id == key_id)
        return Permissions_Servers.get_api_key_permissions_list(key, server_id)

    @staticmethod
    def get_api_key_permissions_list(key: ApiKeys, server_id: str):
        """Return the permissions an API key has on the server.

        When both the owning user and the key are superuser, every permission
        is returned.  Otherwise the owner's role mask for the server (all-zero
        when the owner has no role there) is combined with the key's own mask
        through ``permission_helper.combine_masks``.
        """
        user = key.user
        if user.superuser and key.superuser:
            return Permissions_Servers.get_permissions_list()
        # ``user`` is accessed by attribute, as in get_user_permissions_mask;
        # model instances are not subscriptable.
        roles_list = users_helper.get_user_roles_id(user.user_id)
        user_permissions_mask = Permissions_Servers._get_roles_permissions_mask(roles_list, server_id)
        key_permissions_mask = key.server_permissions
        permissions_mask = permission_helper.combine_masks(user_permissions_mask, key_permissions_mask)
        return Permissions_Servers.get_permissions(permissions_mask)
|
|
|
|
|
|
# Module-level instance of Permissions_Servers. Every method on the class is a
# staticmethod, so this object carries no state of its own.
server_permissions = Permissions_Servers()
|