crafty-4/app/classes/web/routes/api/roles/index.py

175 lines
5.0 KiB
Python
Raw Normal View History

2022-05-09 23:08:49 +00:00
import typing as t
from jsonschema import ValidationError, validate
import orjson
from playhouse.shortcuts import model_to_dict
from app.classes.web.base_api_handler import BaseApiHandler
2022-05-09 23:08:49 +00:00
create_role_schema = {
"type": "object",
"properties": {
"name": {
"type": "string",
"minLength": 1,
2024-06-21 18:00:06 +00:00
"pattern": r"^[^,\[\]]*$",
2022-05-09 23:08:49 +00:00
},
"servers": {
"type": "array",
"items": {
"type": "object",
"properties": {
"server_id": {
2024-03-13 21:47:07 +00:00
"type": "string",
2022-05-09 23:08:49 +00:00
"minimum": 1,
},
"permissions": {
"type": "string",
2024-06-21 18:00:06 +00:00
"pattern": r"^[01]{8}$", # 8 bits, see EnumPermissionsServer
2022-05-09 23:08:49 +00:00
},
},
"required": ["server_id", "permissions"],
},
},
2023-08-16 17:34:52 +00:00
"manager": {"type": ["integer", "null"]},
2022-05-09 23:08:49 +00:00
},
"additionalProperties": False,
2023-08-16 17:34:52 +00:00
"minProperties": 1,
}
basic_create_role_schema = {
"type": "object",
"properties": {
"name": {
"type": "string",
"minLength": 1,
},
"servers": {
"type": "array",
"items": {
"type": "object",
"properties": {
"server_id": {
2024-03-13 21:47:07 +00:00
"type": "string",
2023-08-16 17:34:52 +00:00
"minimum": 1,
},
"permissions": {
"type": "string",
"pattern": "^[01]{8}$", # 8 bits, see EnumPermissionsServer
},
},
"required": ["server_id", "permissions"],
},
},
},
"additionalProperties": False,
"minProperties": 1,
2022-05-09 23:08:49 +00:00
}
class ApiRolesIndexHandler(BaseApiHandler):
def get(self):
auth_data = self.authenticate_user()
if not auth_data:
return
(
_,
_,
_,
superuser,
_,
2024-05-10 23:31:20 +00:00
_,
) = auth_data
# GET /api/v2/roles?ids=true
get_only_ids = self.get_query_argument("ids", None) == "true"
if not superuser:
return self.finish_json(400, {"status": "error", "error": "NOT_AUTHORIZED"})
self.finish_json(
200,
{
"status": "ok",
"data": (
self.controller.roles.get_all_role_ids()
if get_only_ids
else [
model_to_dict(r) for r in self.controller.roles.get_all_roles()
]
),
},
)
2022-05-09 23:08:49 +00:00
def post(self):
auth_data = self.authenticate_user()
if not auth_data:
return
(
_,
_,
_,
superuser,
user,
2024-05-10 23:31:20 +00:00
_,
2022-05-09 23:08:49 +00:00
) = auth_data
if not superuser:
return self.finish_json(400, {"status": "error", "error": "NOT_AUTHORIZED"})
try:
data = orjson.loads(self.request.body)
except orjson.JSONDecodeError as e:
2022-05-09 23:08:49 +00:00
return self.finish_json(
400, {"status": "error", "error": "INVALID_JSON", "error_data": str(e)}
)
try:
2023-08-16 17:34:52 +00:00
if auth_data[4]["superuser"]:
validate(data, create_role_schema)
else:
validate(data, basic_create_role_schema)
2022-05-09 23:08:49 +00:00
except ValidationError as e:
return self.finish_json(
400,
{
"status": "error",
"error": "INVALID_JSON_SCHEMA",
"error_data": str(e),
},
)
role_name = data["name"]
2023-08-16 17:34:52 +00:00
manager = data.get("manager", None)
if manager == self.controller.users.get_id_by_name("SYSTEM") or manager == 0:
manager = None
2022-05-09 23:08:49 +00:00
# Get the servers
servers_dict = {server["server_id"]: server for server in data["servers"]}
server_ids = (
2022-05-18 10:05:58 +00:00
(
{server["server_id"] for server in data["servers"]}
2022-05-30 05:36:25 +00:00
& set(self.controller.servers.get_all_server_ids())
2022-05-18 10:05:58 +00:00
) # Only allow existing servers
2022-05-09 23:08:49 +00:00
if "servers" in data
else set()
)
servers: t.List[dict] = [servers_dict[server_id] for server_id in server_ids]
if self.controller.roles.get_roleid_by_name(role_name) is not None:
return self.finish_json(
400, {"status": "error", "error": "ROLE_NAME_ALREADY_EXISTS"}
)
2023-08-16 17:34:52 +00:00
role_id = self.controller.roles.add_role_advanced(role_name, servers, manager)
2022-05-09 23:08:49 +00:00
self.controller.management.add_to_audit_log(
user["user_id"],
f"created role {role_name} (RID:{role_id})",
server_id=None,
2022-05-09 23:08:49 +00:00
source_ip=self.get_remote_ip(),
)
self.finish_json(
200,
{"status": "ok", "data": {"role_id": role_id}},
)