crafty-4/app/classes/controllers/roles_controller.py
Andrew 552a2e26c2 Working role managers.
TODO: Add super user select option
2022-08-20 08:49:38 -04:00

185 lines
6.7 KiB
Python

import logging
import typing as t
from app.classes.models.roles import HelperRoles
from app.classes.models.server_permissions import PermissionsServers, RoleServers
from app.classes.shared.helpers import Helpers
logger = logging.getLogger(__name__)
class RolesController:
def __init__(self, users_helper, roles_helper):
self.users_helper = users_helper
self.roles_helper = roles_helper
@staticmethod
def get_all_roles():
return HelperRoles.get_all_roles()
@staticmethod
def get_all_role_ids():
return HelperRoles.get_all_role_ids()
@staticmethod
def get_roleid_by_name(role_name):
return HelperRoles.get_roleid_by_name(role_name)
@staticmethod
def get_role(role_id):
return HelperRoles.get_role(role_id)
@staticmethod
def update_role(role_id: str, role_data=None, permissions_mask: str = "00000000"):
if role_data is None:
role_data = {}
base_data = RolesController.get_role_with_servers(role_id)
up_data = {}
added_servers = set()
removed_servers = set()
for key in role_data:
if key == "role_id":
continue
if key == "servers":
added_servers = set(role_data["servers"]).difference(
set(base_data["servers"])
)
removed_servers = set(base_data["servers"]).difference(
set(role_data["servers"])
)
elif base_data[key] != role_data[key]:
up_data[key] = role_data[key]
up_data["last_update"] = Helpers.get_time_as_string()
logger.debug(
f"role: {role_data} +server:{added_servers} -server{removed_servers}"
)
for server in added_servers:
PermissionsServers.get_or_create(role_id, server, permissions_mask)
for server in base_data["servers"]:
PermissionsServers.update_role_permission(role_id, server, permissions_mask)
# TODO: This is horribly inefficient and we should be using bulk queries
# but im going for functionality at this point
PermissionsServers.delete_roles_permissions(role_id, removed_servers)
if up_data:
HelperRoles.update_role(role_id, up_data)
@staticmethod
def add_role(role_name, manager):
return HelperRoles.add_role(role_name, manager)
@staticmethod
def get_managed_roles(exec_user_id):
return HelperRoles.get_managed_roles(exec_user_id)
class RoleServerJsonType(t.TypedDict):
server_id: t.Union[str, int]
permissions: str
@staticmethod
def get_server_ids_and_perms_from_role(
role_id: t.Union[str, int]
) -> t.List[RoleServerJsonType]:
# FIXME: somehow retrieve only the server ids, not the whole servers
return [
{
"server_id": role_servers.server_id.server_id,
"permissions": role_servers.permissions,
}
for role_servers in (
RoleServers.select(
RoleServers.server_id, RoleServers.permissions
).where(RoleServers.role_id == role_id)
)
]
@staticmethod
def add_role_advanced(
name: str,
servers: t.Iterable[RoleServerJsonType],
manager: int,
) -> int:
"""Add a role with a name and a list of servers
Args:
name (str): The new role's name
servers (t.List[RoleServerJsonType]): The new role's servers
Returns:
int: The new role's ID
"""
role_id: t.Final[int] = HelperRoles.add_role(name, manager)
for server in servers:
PermissionsServers.get_or_create(
role_id, server["server_id"], server["permissions"]
)
return role_id
@staticmethod
def update_role_advanced(
role_id: t.Union[str, int],
role_name: t.Optional[str],
servers: t.Optional[t.Iterable[RoleServerJsonType]],
) -> None:
"""Update a role with a name and a list of servers
Args:
role_id (t.Union[str, int]): The ID of the role to be modified
role_name (t.Optional[str]): An optional new name for the role
servers (t.Optional[t.Iterable[RoleServerJsonType]]): An optional list of servers for the role
""" # pylint: disable=line-too-long
logger.debug(f"updating role {role_id} with advanced options")
if servers is not None:
base_data = RolesController.get_role_with_servers(role_id)
server_ids = {server["server_id"] for server in servers}
server_permissions_map = {
server["server_id"]: server["permissions"] for server in servers
}
added_servers = server_ids.difference(set(base_data["servers"]))
removed_servers = set(base_data["servers"]).difference(server_ids)
same_servers = server_ids.intersection(set(base_data["servers"]))
logger.debug(
f"role: {role_id} +server:{added_servers} -server{removed_servers}"
)
for server_id in added_servers:
PermissionsServers.get_or_create(
role_id, server_id, server_permissions_map[server_id]
)
if len(removed_servers) != 0:
PermissionsServers.delete_roles_permissions(role_id, removed_servers)
for server_id in same_servers:
PermissionsServers.update_role_permission(
role_id, server_id, server_permissions_map[server_id]
)
if role_name is not None:
up_data = {
"role_name": role_name,
"last_update": Helpers.get_time_as_string(),
}
# TODO: do the last_update on the db side
HelperRoles.update_role(role_id, up_data)
def remove_role(self, role_id):
role_data = RolesController.get_role_with_servers(role_id)
PermissionsServers.delete_roles_permissions(role_id, role_data["servers"])
self.users_helper.remove_roles_from_role_id(role_id)
return self.roles_helper.remove_role(role_id)
@staticmethod
def role_id_exists(role_id):
return HelperRoles.role_id_exists(role_id)
@staticmethod
def get_role_with_servers(role_id):
role = HelperRoles.get_role(role_id)
if role:
server_ids = PermissionsServers.get_server_ids_from_role(role_id)
role["servers"] = server_ids
# logger.debug("role: ({}) {}".format(role_id, role))
return role
# logger.debug("role: ({}) {}".format(role_id, {}))
return {}