crafty-4/app/classes/models/server_permissions.py

270 lines
9.7 KiB
Python
Raw Normal View History

from enum import Enum
2021-09-08 22:01:10 +00:00
import logging
from peewee import (
ForeignKeyField,
CharField,
CompositeKey,
JOIN,
)
2021-09-08 22:01:10 +00:00
from app.classes.models.base_model import BaseModel
2021-09-08 22:01:10 +00:00
from app.classes.models.servers import Servers
from app.classes.models.roles import Roles
from app.classes.models.users import UserRoles, HelperUsers, ApiKeys, Users
from app.classes.shared.permission_helper import PermissionHelper
2021-09-08 22:01:10 +00:00
2022-03-08 04:40:44 +00:00
logger = logging.getLogger(__name__)
2021-09-08 22:01:10 +00:00
# **********************************************************************************
2021-09-08 22:01:10 +00:00
# Role Servers Class
# **********************************************************************************
class RoleServers(BaseModel):
role_id = ForeignKeyField(Roles, backref="role_server")
server_id = ForeignKeyField(Servers, backref="role_server")
2021-09-08 22:01:10 +00:00
permissions = CharField(default="00000000")
class Meta:
table_name = "role_servers"
primary_key = CompositeKey("role_id", "server_id")
2021-09-08 22:01:10 +00:00
# **********************************************************************************
2021-09-08 22:01:10 +00:00
# Servers Permissions Class
# **********************************************************************************
class EnumPermissionsServer(Enum):
COMMANDS = 0
TERMINAL = 1
LOGS = 2
SCHEDULE = 3
BACKUP = 4
FILES = 5
CONFIG = 6
PLAYERS = 7
2021-09-08 22:01:10 +00:00
class PermissionsServers:
2021-09-08 22:01:10 +00:00
@staticmethod
def get_or_create(role_id, server, permissions_mask):
return RoleServers.get_or_create(
role_id=role_id, server_id=server, permissions=permissions_mask
)
2021-09-08 22:01:10 +00:00
@staticmethod
def get_permissions_list():
permissions_list = []
for member in EnumPermissionsServer.__members__.items():
2021-09-08 22:01:10 +00:00
permissions_list.append(member[1])
return permissions_list
@staticmethod
def get_permissions(permissions_mask):
permissions_list = []
for member in EnumPermissionsServer.__members__.items():
if PermissionsServers.has_permission(permissions_mask, member[1]):
2021-09-08 22:01:10 +00:00
permissions_list.append(member[1])
return permissions_list
@staticmethod
def has_permission(permission_mask, permission_tested: EnumPermissionsServer):
return permission_mask[permission_tested.value] == "1"
2021-09-08 22:01:10 +00:00
@staticmethod
def set_permission(
permission_mask, permission_tested: EnumPermissionsServer, value
):
2021-09-08 22:01:10 +00:00
list_perms = list(permission_mask)
list_perms[permission_tested.value] = str(value)
permission_mask = "".join(list_perms)
2021-09-08 22:01:10 +00:00
return permission_mask
@staticmethod
def get_permission(permission_mask, permission_tested: EnumPermissionsServer):
2021-09-08 22:01:10 +00:00
return permission_mask[permission_tested.value]
2022-01-15 00:23:50 +00:00
@staticmethod
def get_token_permissions(permissions_mask, api_permissions_mask):
permissions_list = []
for member in EnumPermissionsServer.__members__.items():
if PermissionHelper.both_have_perm(
permissions_mask, api_permissions_mask, member[1]
):
2022-01-15 00:23:50 +00:00
permissions_list.append(member[1])
return permissions_list
# **********************************************************************************
2021-09-08 22:01:10 +00:00
# Role_Servers Methods
# **********************************************************************************
2021-09-08 22:01:10 +00:00
@staticmethod
def get_role_servers_from_role_id(roleid):
return RoleServers.select().where(RoleServers.role_id == roleid)
2021-09-08 22:01:10 +00:00
@staticmethod
def get_servers_from_role(role_id):
return (
RoleServers.select()
.join(Servers, JOIN.INNER)
.where(RoleServers.role_id == role_id)
)
2021-09-08 22:01:10 +00:00
@staticmethod
def get_roles_from_server(server_id):
return (
RoleServers.select()
.join(Roles, JOIN.INNER)
.where(RoleServers.server_id == server_id)
)
2021-09-08 22:01:10 +00:00
@staticmethod
def add_role_server(server_id, role_id, rs_permissions="00000000"):
servers = RoleServers.insert(
{
RoleServers.server_id: server_id,
RoleServers.role_id: role_id,
RoleServers.permissions: rs_permissions,
}
).execute()
2021-09-08 22:01:10 +00:00
return servers
2021-09-08 22:01:10 +00:00
@staticmethod
def get_permissions_mask(role_id, server_id):
permissions_mask = ""
role_server = (
RoleServers.select()
.where(RoleServers.role_id == role_id)
.where(RoleServers.server_id == server_id)
.get()
)
2021-09-08 22:01:10 +00:00
permissions_mask = role_server.permissions
return permissions_mask
2021-11-29 21:22:46 +00:00
@staticmethod
def get_server_roles(server_id):
role_list = []
roles = RoleServers.select().where(RoleServers.server_id == server_id).execute()
2021-11-29 21:22:46 +00:00
for role in roles:
role_list.append(role.role_id)
return role_list
2021-09-08 22:01:10 +00:00
@staticmethod
def get_role_permissions_list(role_id):
permissions_mask = "00000000"
role_server = RoleServers.get_or_none(RoleServers.role_id == role_id)
if role_server is not None:
2021-09-29 20:08:43 +00:00
permissions_mask = role_server.permissions
permissions_list = PermissionsServers.get_permissions(permissions_mask)
2021-09-08 22:01:10 +00:00
return permissions_list
@staticmethod
def update_role_permission(role_id, server_id, permissions_mask):
role_server = (
RoleServers.select()
.where(RoleServers.role_id == role_id)
.where(RoleServers.server_id == server_id)
.get()
)
2021-09-08 22:01:10 +00:00
role_server.permissions = permissions_mask
RoleServers.save(role_server)
2021-09-08 22:01:10 +00:00
@staticmethod
2022-01-15 00:23:50 +00:00
def delete_roles_permissions(role_id, removed_servers=None):
if removed_servers is None:
removed_servers = {}
return (
RoleServers.delete()
.where(RoleServers.role_id == role_id)
.where(RoleServers.server_id.in_(removed_servers))
.execute()
)
2021-09-08 22:01:10 +00:00
2021-09-25 21:59:01 +00:00
@staticmethod
def remove_roles_of_server(server_id):
return RoleServers.delete().where(RoleServers.server_id == server_id).execute()
2021-09-25 21:59:01 +00:00
2021-09-08 22:01:10 +00:00
@staticmethod
2022-01-15 00:23:50 +00:00
def get_user_id_permissions_mask(user_id, server_id: str):
user = HelperUsers.get_user_model(user_id)
return PermissionsServers.get_user_permissions_mask(user, server_id)
2022-01-15 00:23:50 +00:00
@staticmethod
def get_user_permissions_mask(user: Users, server_id: str):
if user.superuser:
permissions_mask = "1" * len(PermissionsServers.get_permissions_list())
2022-01-15 00:23:50 +00:00
else:
roles_list = HelperUsers.get_user_roles_id(user.user_id)
role_server = (
RoleServers.select()
.where(RoleServers.role_id.in_(roles_list))
.where(RoleServers.server_id == server_id)
.execute()
)
try:
permissions_mask = role_server[0].permissions
except IndexError:
permissions_mask = "0" * len(PermissionsServers.get_permissions_list())
2022-01-15 00:23:50 +00:00
return permissions_mask
@staticmethod
def get_server_user_list(server_id):
final_users = []
server_roles = RoleServers.select().where(RoleServers.server_id == server_id)
super_users = Users.select().where(
Users.superuser == True # pylint: disable=singleton-comparison
)
for role in server_roles:
users = UserRoles.select().where(UserRoles.role_id == role.role_id)
for user in users:
if user.user_id.user_id not in final_users:
final_users.append(user.user_id.user_id)
for suser in super_users:
if suser.user_id not in final_users:
final_users.append(suser.user_id)
return final_users
2022-01-15 00:23:50 +00:00
@staticmethod
def get_user_id_permissions_list(user_id, server_id: str):
user = HelperUsers.get_user_model(user_id)
return PermissionsServers.get_user_permissions_list(user, server_id)
2021-09-08 22:01:10 +00:00
2022-01-15 00:23:50 +00:00
@staticmethod
def get_user_permissions_list(user: Users, server_id: str):
if user.superuser:
permissions_list = PermissionsServers.get_permissions_list()
2021-09-08 22:01:10 +00:00
else:
permissions_mask = PermissionsServers.get_user_permissions_mask(
user, server_id
)
permissions_list = PermissionsServers.get_permissions(permissions_mask)
2021-09-08 22:01:10 +00:00
return permissions_list
2022-01-15 00:23:50 +00:00
@staticmethod
def get_api_key_id_permissions_list(key_id, server_id: str):
key = ApiKeys.get(ApiKeys.token_id == key_id)
return PermissionsServers.get_api_key_permissions_list(key, server_id)
2022-01-15 00:23:50 +00:00
@staticmethod
def get_api_key_permissions_list(key: ApiKeys, server_id: str):
user = HelperUsers.get_user(key.user_id)
if user["superuser"] and key.superuser:
return PermissionsServers.get_permissions_list()
2022-01-15 00:23:50 +00:00
else:
roles_list = HelperUsers.get_user_roles_id(user["user_id"])
role_server = (
RoleServers.select()
.where(RoleServers.role_id.in_(roles_list))
.where(RoleServers.server_id == server_id)
.execute()
)
2022-04-13 01:52:40 +00:00
try:
user_permissions_mask = role_server[0].permissions
except:
if user["superuser"]:
user_permissions_mask = "11111111"
else:
user_permissions_mask = "00000000"
key_permissions_mask = key.server_permissions
permissions_mask = PermissionHelper.combine_masks(
user_permissions_mask, key_permissions_mask
)
permissions_list = PermissionsServers.get_permissions(permissions_mask)
2022-01-15 00:23:50 +00:00
return permissions_list