crafty-4/app/classes/models/server_permissions.py

286 lines
10 KiB
Python
Raw Normal View History

2021-09-08 22:01:10 +00:00
import logging
from app.classes.models.servers import Servers
from app.classes.models.roles import Roles
from app.classes.models.users import User_Roles, users_helper, ApiKeys, Users
2022-03-08 04:40:44 +00:00
from app.classes.shared.helpers import helper
from app.classes.shared.permission_helper import permission_helper
2021-09-08 22:01:10 +00:00
try:
from peewee import (
SqliteDatabase,
Model,
ForeignKeyField,
CharField,
CompositeKey,
JOIN,
)
2021-09-08 22:01:10 +00:00
from enum import Enum
except ModuleNotFoundError as e:
2022-03-08 04:40:44 +00:00
helper.auto_installer_fix(e)
2021-09-08 22:01:10 +00:00
2022-03-08 04:40:44 +00:00
logger = logging.getLogger(__name__)
peewee_logger = logging.getLogger("peewee")
2022-03-08 04:40:44 +00:00
peewee_logger.setLevel(logging.INFO)
database = SqliteDatabase(
helper.db_path, pragmas={"journal_mode": "wal", "cache_size": -1024 * 10}
)
2021-09-08 22:01:10 +00:00
# **********************************************************************************
2021-09-08 22:01:10 +00:00
# Role Servers Class
# **********************************************************************************
2021-09-08 22:01:10 +00:00
class Role_Servers(Model):
role_id = ForeignKeyField(Roles, backref="role_server")
server_id = ForeignKeyField(Servers, backref="role_server")
2021-09-08 22:01:10 +00:00
permissions = CharField(default="00000000")
class Meta:
table_name = "role_servers"
primary_key = CompositeKey("role_id", "server_id")
2021-09-08 22:01:10 +00:00
database = database
# **********************************************************************************
2021-09-08 22:01:10 +00:00
# Servers Permissions Class
# **********************************************************************************
2021-09-08 22:01:10 +00:00
class Enum_Permissions_Server(Enum):
Commands = 0
Terminal = 1
Logs = 2
Schedule = 3
Backup = 4
Files = 5
Config = 6
Players = 7
class Permissions_Servers:
2021-09-08 22:01:10 +00:00
@staticmethod
def get_or_create(role_id, server, permissions_mask):
return Role_Servers.get_or_create(
role_id=role_id, server_id=server, permissions=permissions_mask
)
2021-09-08 22:01:10 +00:00
@staticmethod
def get_permissions_list():
permissions_list = []
for member in Enum_Permissions_Server.__members__.items():
permissions_list.append(member[1])
return permissions_list
@staticmethod
def get_permissions(permissions_mask):
permissions_list = []
for member in Enum_Permissions_Server.__members__.items():
if server_permissions.has_permission(permissions_mask, member[1]):
permissions_list.append(member[1])
return permissions_list
@staticmethod
def has_permission(permission_mask, permission_tested: Enum_Permissions_Server):
return permission_mask[permission_tested.value] == "1"
2021-09-08 22:01:10 +00:00
@staticmethod
def set_permission(
permission_mask, permission_tested: Enum_Permissions_Server, value
):
2021-09-08 22:01:10 +00:00
list_perms = list(permission_mask)
list_perms[permission_tested.value] = str(value)
permission_mask = "".join(list_perms)
2021-09-08 22:01:10 +00:00
return permission_mask
@staticmethod
def get_permission(permission_mask, permission_tested: Enum_Permissions_Server):
return permission_mask[permission_tested.value]
2022-01-15 00:23:50 +00:00
@staticmethod
def get_token_permissions(permissions_mask, api_permissions_mask):
permissions_list = []
for member in Enum_Permissions_Server.__members__.items():
if permission_helper.both_have_perm(
permissions_mask, api_permissions_mask, member[1]
):
2022-01-15 00:23:50 +00:00
permissions_list.append(member[1])
return permissions_list
# **********************************************************************************
2021-09-08 22:01:10 +00:00
# Role_Servers Methods
# **********************************************************************************
2021-09-08 22:01:10 +00:00
@staticmethod
def get_role_servers_from_role_id(roleid):
return Role_Servers.select().where(Role_Servers.role_id == roleid)
@staticmethod
def get_servers_from_role(role_id):
return (
Role_Servers.select()
.join(Servers, JOIN.INNER)
.where(Role_Servers.role_id == role_id)
)
2021-09-08 22:01:10 +00:00
@staticmethod
def get_roles_from_server(server_id):
return (
Role_Servers.select()
.join(Roles, JOIN.INNER)
.where(Role_Servers.server_id == server_id)
)
2021-09-08 22:01:10 +00:00
@staticmethod
def add_role_server(server_id, role_id, rs_permissions="00000000"):
servers = Role_Servers.insert(
{
Role_Servers.server_id: server_id,
Role_Servers.role_id: role_id,
Role_Servers.permissions: rs_permissions,
}
).execute()
2021-09-08 22:01:10 +00:00
return servers
2021-09-08 22:01:10 +00:00
@staticmethod
def get_permissions_mask(role_id, server_id):
permissions_mask = ""
role_server = (
Role_Servers.select()
.where(Role_Servers.role_id == role_id)
.where(Role_Servers.server_id == server_id)
.get()
)
2021-09-08 22:01:10 +00:00
permissions_mask = role_server.permissions
return permissions_mask
2021-11-29 21:22:46 +00:00
@staticmethod
def get_server_roles(server_id):
role_list = []
roles = (
Role_Servers.select().where(Role_Servers.server_id == server_id).execute()
)
2021-11-29 21:22:46 +00:00
for role in roles:
role_list.append(role.role_id)
return role_list
2021-09-08 22:01:10 +00:00
@staticmethod
def get_role_permissions_list(role_id):
permissions_mask = "00000000"
role_server = Role_Servers.get_or_none(Role_Servers.role_id == role_id)
if role_server is not None:
2021-09-29 20:08:43 +00:00
permissions_mask = role_server.permissions
2021-09-08 22:01:10 +00:00
permissions_list = server_permissions.get_permissions(permissions_mask)
return permissions_list
@staticmethod
def update_role_permission(role_id, server_id, permissions_mask):
role_server = (
Role_Servers.select()
.where(Role_Servers.role_id == role_id)
.where(Role_Servers.server_id == server_id)
.get()
)
2021-09-08 22:01:10 +00:00
role_server.permissions = permissions_mask
Role_Servers.save(role_server)
@staticmethod
2022-01-15 00:23:50 +00:00
def delete_roles_permissions(role_id, removed_servers=None):
if removed_servers is None:
removed_servers = {}
return (
Role_Servers.delete()
.where(Role_Servers.role_id == role_id)
.where(Role_Servers.server_id.in_(removed_servers))
.execute()
)
2021-09-08 22:01:10 +00:00
2021-09-25 21:59:01 +00:00
@staticmethod
def remove_roles_of_server(server_id):
with database.atomic():
return (
Role_Servers.delete()
.where(Role_Servers.server_id == server_id)
.execute()
)
2021-09-25 21:59:01 +00:00
2021-09-08 22:01:10 +00:00
@staticmethod
2022-01-15 00:23:50 +00:00
def get_user_id_permissions_mask(user_id, server_id: str):
user = users_helper.get_user_model(user_id)
return server_permissions.get_user_permissions_mask(user, server_id)
@staticmethod
def get_user_permissions_mask(user: Users, server_id: str):
if user.superuser:
permissions_mask = "1" * len(server_permissions.get_permissions_list())
2022-01-15 00:23:50 +00:00
else:
roles_list = users_helper.get_user_roles_id(user.user_id)
role_server = (
Role_Servers.select()
.where(Role_Servers.role_id.in_(roles_list))
.where(Role_Servers.server_id == server_id)
.execute()
)
try:
permissions_mask = role_server[0].permissions
except IndexError:
permissions_mask = "0" * len(server_permissions.get_permissions_list())
2022-01-15 00:23:50 +00:00
return permissions_mask
@staticmethod
def get_server_user_list(server_id):
final_users = []
server_roles = Role_Servers.select().where(Role_Servers.server_id == server_id)
# pylint: disable=singleton-comparison
super_users = Users.select().where(Users.superuser == True)
for role in server_roles:
users = User_Roles.select().where(User_Roles.role_id == role.role_id)
for user in users:
if user.user_id.user_id not in final_users:
final_users.append(user.user_id.user_id)
for suser in super_users:
if suser.user_id not in final_users:
final_users.append(suser.user_id)
return final_users
2022-01-15 00:23:50 +00:00
@staticmethod
def get_user_id_permissions_list(user_id, server_id: str):
user = users_helper.get_user_model(user_id)
return server_permissions.get_user_permissions_list(user, server_id)
2021-09-08 22:01:10 +00:00
2022-01-15 00:23:50 +00:00
@staticmethod
def get_user_permissions_list(user: Users, server_id: str):
if user.superuser:
2021-09-08 22:01:10 +00:00
permissions_list = server_permissions.get_permissions_list()
else:
permissions_mask = server_permissions.get_user_permissions_mask(
user, server_id
)
2021-09-08 22:01:10 +00:00
permissions_list = server_permissions.get_permissions(permissions_mask)
return permissions_list
2022-01-15 00:23:50 +00:00
@staticmethod
def get_api_key_id_permissions_list(key_id, server_id: str):
key = ApiKeys.get(ApiKeys.token_id == key_id)
return server_permissions.get_api_key_permissions_list(key, server_id)
@staticmethod
def get_api_key_permissions_list(key: ApiKeys, server_id: str):
user = key.user
if user.superuser and key.superuser:
return server_permissions.get_permissions_list()
else:
roles_list = users_helper.get_user_roles_id(user["user_id"])
role_server = (
Role_Servers.select()
.where(Role_Servers.role_id.in_(roles_list))
.where(Role_Servers.server_id == server_id)
.execute()
)
2022-01-15 00:23:50 +00:00
user_permissions_mask = role_server[0].permissions
key_permissions_mask = key.server_permissions
permissions_mask = permission_helper.combine_masks(
user_permissions_mask, key_permissions_mask
)
2022-01-15 00:23:50 +00:00
permissions_list = server_permissions.get_permissions(permissions_mask)
return permissions_list
server_permissions = Permissions_Servers()