crafty-4/app/classes/models/server_permissions.py
Zedifus 2a512d7273 Fix files to conform with new Black pylintrc
Mostly just breaking up strings and comments into new lines
Some strings don't require 'f' but keeping in for readability
with the rest of the concatenated string
2022-03-23 06:06:13 +00:00

286 lines
10 KiB
Python

import logging
from app.classes.models.servers import Servers
from app.classes.models.roles import Roles
from app.classes.models.users import User_Roles, users_helper, ApiKeys, Users
from app.classes.shared.helpers import helper
from app.classes.shared.permission_helper import permission_helper
try:
from peewee import (
SqliteDatabase,
Model,
ForeignKeyField,
CharField,
CompositeKey,
JOIN,
)
from enum import Enum
except ModuleNotFoundError as e:
helper.auto_installer_fix(e)
# Logger for this module.
logger = logging.getLogger(__name__)
# Raise the "peewee" logger to INFO so its DEBUG-level records are dropped.
peewee_logger = logging.getLogger("peewee")
peewee_logger.setLevel(logging.INFO)
# SQLite database object for helper.db_path, used by the Role_Servers model.
# Pragmas: write-ahead-log journal mode and cache_size = -1024 * 10.
# NOTE(review): SQLite documents a negative cache_size as a size in KiB, which
# would make this roughly 10 MiB -- confirm that is the intended budget.
database = SqliteDatabase(
helper.db_path, pragmas={"journal_mode": "wal", "cache_size": -1024 * 10}
)
# **********************************************************************************
# Role Servers Class
# **********************************************************************************
class Role_Servers(Model):
    """Link row that gives one role a permission mask on one server.

    Rows live in the ``role_servers`` table and are keyed by the pair
    (``role_id``, ``server_id``).
    """

    # Role side of the link; exposed on ``Roles`` through the ``role_server`` backref.
    role_id = ForeignKeyField(Roles, backref="role_server")
    # Server side of the link; exposed on ``Servers`` through the ``role_server`` backref.
    server_id = ForeignKeyField(Servers, backref="role_server")
    # Permission mask stored as a string of flag characters; all "0" by default.
    permissions = CharField(default="00000000")

    class Meta:
        # Bind the model to the module-level SQLite database object.
        database = database
        table_name = "role_servers"
        # No surrogate id column: the (role, server) pair is the primary key.
        primary_key = CompositeKey("role_id", "server_id")
# **********************************************************************************
# Servers Permissions Class
# **********************************************************************************
class Enum_Permissions_Server(Enum):
    """Per-server permissions.

    Each member's value is the character position of that permission inside
    a permission mask string (see ``Permissions_Servers.has_permission``,
    which indexes the mask with ``permission_tested.value``).
    """

    # Positions 0-3 of the mask.
    Commands = 0
    Terminal = 1
    Logs = 2
    Schedule = 3

    # Positions 4-7 of the mask.
    Backup = 4
    Files = 5
    Config = 6
    Players = 7
class Permissions_Servers:
@staticmethod
def get_or_create(role_id, server, permissions_mask):
    """Look up the role/server link with exactly these values, creating it if needed.

    The call is forwarded to ``Role_Servers.get_or_create`` and its result is
    returned unchanged (peewee documents that as an ``(instance, created)``
    tuple).
    """
    lookup = {
        "role_id": role_id,
        "server_id": server,
        "permissions": permissions_mask,
    }
    return Role_Servers.get_or_create(**lookup)
@staticmethod
def get_permissions_list():
    """Return every ``Enum_Permissions_Server`` member, in declaration order."""
    return list(Enum_Permissions_Server.__members__.values())
@staticmethod
def get_permissions(permissions_mask):
    """Return the permissions that are switched on in ``permissions_mask``.

    Each ``Enum_Permissions_Server`` member is checked with
    ``server_permissions.has_permission``; the members that pass are returned
    in declaration order.
    """
    return [
        permission
        for permission in Enum_Permissions_Server.__members__.values()
        if server_permissions.has_permission(permissions_mask, permission)
    ]
@staticmethod
def has_permission(permission_mask, permission_tested: Enum_Permissions_Server):
    """Tell whether ``permission_tested`` is enabled in ``permission_mask``.

    The mask is indexed with the permission's enum value; the permission
    counts as enabled only when that character is exactly "1".
    """
    flag = permission_mask[permission_tested.value]
    return flag == "1"
@staticmethod
def set_permission(
    permission_mask, permission_tested: Enum_Permissions_Server, value
):
    """Return a copy of ``permission_mask`` with one permission's flag replaced.

    The character at the permission's enum value is overwritten with
    ``str(value)``; the input mask itself is not modified.
    """
    position = permission_tested.value
    flags = [*permission_mask]
    flags[position] = str(value)
    return "".join(flags)
@staticmethod
def get_permission(permission_mask, permission_tested: Enum_Permissions_Server):
    """Return the raw mask character stored for ``permission_tested``."""
    position = permission_tested.value
    return permission_mask[position]
@staticmethod
def get_token_permissions(permissions_mask, api_permissions_mask):
    """Return the permissions accepted for both masks.

    Each ``Enum_Permissions_Server`` member is kept when
    ``permission_helper.both_have_perm(permissions_mask, api_permissions_mask,
    member)`` returns a truthy value; declaration order is preserved.
    """
    return [
        permission
        for permission in Enum_Permissions_Server.__members__.values()
        if permission_helper.both_have_perm(
            permissions_mask, api_permissions_mask, permission
        )
    ]
# **********************************************************************************
# Role_Servers Methods
# **********************************************************************************
@staticmethod
def get_role_servers_from_role_id(roleid):
    """Return the select query for all ``Role_Servers`` rows of role ``roleid``.

    The query is built but not executed here.
    """
    belongs_to_role = Role_Servers.role_id == roleid
    return Role_Servers.select().where(belongs_to_role)
@staticmethod
def get_servers_from_role(role_id):
    """Return the query for ``Role_Servers`` rows of ``role_id``, inner-joined to ``Servers``.

    The query is built but not executed here.
    """
    joined = Role_Servers.select().join(Servers, JOIN.INNER)
    return joined.where(Role_Servers.role_id == role_id)
@staticmethod
def get_roles_from_server(server_id):
    """Return the query for ``Role_Servers`` rows of ``server_id``, inner-joined to ``Roles``.

    The query is built but not executed here.
    """
    joined = Role_Servers.select().join(Roles, JOIN.INNER)
    return joined.where(Role_Servers.server_id == server_id)
@staticmethod
def add_role_server(server_id, role_id, rs_permissions="00000000"):
    """Insert a role/server link and return the result of executing the insert.

    ``rs_permissions`` is the permission mask stored on the new row; it
    defaults to all flags "0".
    """
    new_row = {
        Role_Servers.server_id: server_id,
        Role_Servers.role_id: role_id,
        Role_Servers.permissions: rs_permissions,
    }
    return Role_Servers.insert(new_row).execute()
@staticmethod
def get_permissions_mask(role_id, server_id):
    """Return the permission mask stored for the (``role_id``, ``server_id``) link.

    The row is fetched with ``.get()``; peewee raises
    ``Role_Servers.DoesNotExist`` from that call when no row matches.
    """
    link = (
        Role_Servers.select()
        .where(
            Role_Servers.role_id == role_id,
            Role_Servers.server_id == server_id,
        )
        .get()
    )
    return link.permissions
@staticmethod
def get_server_roles(server_id):
    """Return a list with the ``role_id`` attribute of every row linked to ``server_id``."""
    linked_rows = (
        Role_Servers.select().where(Role_Servers.server_id == server_id).execute()
    )
    return [row.role_id for row in linked_rows]
@staticmethod
def get_role_permissions_list(role_id):
    """Return the enabled permissions of one ``Role_Servers`` row found for ``role_id``.

    The row comes from ``Role_Servers.get_or_none``; when there is none, the
    all-"0" mask is used instead. The mask is decoded with
    ``server_permissions.get_permissions``.
    """
    link = Role_Servers.get_or_none(Role_Servers.role_id == role_id)
    mask = "00000000" if link is None else link.permissions
    return server_permissions.get_permissions(mask)
@staticmethod
def update_role_permission(role_id, server_id, permissions_mask):
    """Overwrite the permission mask of the (``role_id``, ``server_id``) link.

    The row is fetched with ``.get()`` (peewee raises
    ``Role_Servers.DoesNotExist`` when no row matches), its ``permissions``
    field is replaced and the row is saved. Nothing is returned.
    """
    link = (
        Role_Servers.select()
        .where(
            Role_Servers.role_id == role_id,
            Role_Servers.server_id == server_id,
        )
        .get()
    )
    link.permissions = permissions_mask
    link.save()
@staticmethod
def delete_roles_permissions(role_id, removed_servers=None):
    """Delete the links between ``role_id`` and each server in ``removed_servers``.

    ``removed_servers`` is a collection of server ids used as the value list
    of an ``IN`` filter. When it is omitted (``None``) an empty list is used,
    so no row matches and nothing is deleted.

    Returns the result of executing the delete query.
    """
    if removed_servers is None:
        # Must be a list, not a dict: peewee only expands list/tuple/set
        # values into an IN (...) value list.
        removed_servers = []
    return (
        Role_Servers.delete()
        .where(Role_Servers.role_id == role_id)
        .where(Role_Servers.server_id.in_(removed_servers))
        .execute()
    )
@staticmethod
def remove_roles_of_server(server_id):
    """Delete every ``Role_Servers`` row of ``server_id`` and return the delete's result.

    The delete is executed inside a ``database.atomic()`` block.
    """
    purge = Role_Servers.delete().where(Role_Servers.server_id == server_id)
    with database.atomic():
        return purge.execute()
@staticmethod
def get_user_id_permissions_mask(user_id, server_id: str):
    """Resolve ``user_id`` to a user model and return that user's mask on ``server_id``.

    The model comes from ``users_helper.get_user_model``; the mask from
    ``server_permissions.get_user_permissions_mask``.
    """
    return server_permissions.get_user_permissions_mask(
        users_helper.get_user_model(user_id), server_id
    )
@staticmethod
def get_user_permissions_mask(user: Users, server_id: str):
    """Return the permission mask string ``user`` has on ``server_id``.

    * Superusers get a mask of "1" repeated once per known permission.
    * Otherwise the mask of the first ``Role_Servers`` row that matches one
      of the user's roles and the server is returned.
    * When no such row exists the mask is "0" repeated once per known
      permission.
    """
    if user.superuser:
        return "1" * len(server_permissions.get_permissions_list())

    role_ids = users_helper.get_user_roles_id(user.user_id)
    in_user_roles = Role_Servers.role_id.in_(role_ids)
    on_this_server = Role_Servers.server_id == server_id
    rows = Role_Servers.select().where(in_user_roles).where(on_this_server).execute()
    try:
        # Only the first matching row is consulted.
        return rows[0].permissions
    except IndexError:
        return "0" * len(server_permissions.get_permissions_list())
@staticmethod
def get_server_user_list(server_id):
    """Return the ids of the users associated with ``server_id``, without duplicates.

    Users reached through any role linked to the server are listed first,
    in the order they are encountered, followed by every superuser that is
    not already in the list.
    """
    user_ids = []

    linked_roles = Role_Servers.select().where(Role_Servers.server_id == server_id)
    for link in linked_roles:
        memberships = User_Roles.select().where(User_Roles.role_id == link.role_id)
        for membership in memberships:
            member_id = membership.user_id.user_id
            if member_id not in user_ids:
                user_ids.append(member_id)

    # pylint: disable=singleton-comparison
    superusers = Users.select().where(Users.superuser == True)
    for superuser in superusers:
        if superuser.user_id not in user_ids:
            user_ids.append(superuser.user_id)

    return user_ids
@staticmethod
def get_user_id_permissions_list(user_id, server_id: str):
    """Resolve ``user_id`` to a user model and return that user's permissions on ``server_id``.

    The model comes from ``users_helper.get_user_model``; the list from
    ``server_permissions.get_user_permissions_list``.
    """
    return server_permissions.get_user_permissions_list(
        users_helper.get_user_model(user_id), server_id
    )
@staticmethod
def get_user_permissions_list(user: Users, server_id: str):
    """Return the list of permissions ``user`` has on ``server_id``.

    Superusers get every permission. For other users the mask from
    ``server_permissions.get_user_permissions_mask`` is decoded with
    ``server_permissions.get_permissions``.
    """
    if user.superuser:
        return server_permissions.get_permissions_list()

    mask = server_permissions.get_user_permissions_mask(user, server_id)
    return server_permissions.get_permissions(mask)
@staticmethod
def get_api_key_id_permissions_list(key_id, server_id: str):
    """Load the API key whose ``token_id`` is ``key_id`` and return its permissions on ``server_id``.

    The key is fetched with ``ApiKeys.get``; the list comes from
    ``server_permissions.get_api_key_permissions_list``.
    """
    api_key = ApiKeys.get(ApiKeys.token_id == key_id)
    return server_permissions.get_api_key_permissions_list(api_key, server_id)
@staticmethod
def get_api_key_permissions_list(key: ApiKeys, server_id: str):
    """Return the permissions an API key may use on ``server_id``.

    * When both the key's user and the key itself are superuser, every
      permission is returned.
    * Otherwise the user's mask (taken from the first ``Role_Servers`` row
      matching one of the user's roles and the server) is combined with the
      key's own ``server_permissions`` mask through
      ``permission_helper.combine_masks`` and the result is decoded.
    * When none of the user's roles is linked to the server, the user's mask
      falls back to all "1" for a superuser and all "0" otherwise, matching
      what ``get_user_permissions_mask`` reports in those cases, instead of
      raising ``IndexError``.
    """
    user = key.user
    if user.superuser and key.superuser:
        return server_permissions.get_permissions_list()

    # ``user`` is read by attribute everywhere else (``user.superuser`` above,
    # ``user.user_id`` in get_user_permissions_mask); subscripting it with
    # ["user_id"] was inconsistent with that usage.
    roles_list = users_helper.get_user_roles_id(user.user_id)
    role_server = (
        Role_Servers.select()
        .where(Role_Servers.role_id.in_(roles_list))
        .where(Role_Servers.server_id == server_id)
        .execute()
    )
    try:
        # Only the first matching row is consulted.
        user_permissions_mask = role_server[0].permissions
    except IndexError:
        # No role of this user is linked to the server.
        mask_length = len(server_permissions.get_permissions_list())
        user_permissions_mask = ("1" if user.superuser else "0") * mask_length

    key_permissions_mask = key.server_permissions
    permissions_mask = permission_helper.combine_masks(
        user_permissions_mask, key_permissions_mask
    )
    return server_permissions.get_permissions(permissions_mask)
# Shared module-level instance; the static methods of Permissions_Servers call
# one another through this name (e.g. server_permissions.has_permission).
server_permissions = Permissions_Servers()