2022-05-09 23:08:49 +00:00
|
|
|
import typing as t
|
|
|
|
from jsonschema import ValidationError, validate
|
|
|
|
import orjson
|
2022-05-07 21:07:55 +00:00
|
|
|
from playhouse.shortcuts import model_to_dict
|
|
|
|
from app.classes.web.base_api_handler import BaseApiHandler
|
|
|
|
|
2022-05-09 23:08:49 +00:00
|
|
|
create_role_schema = {
|
|
|
|
"type": "object",
|
|
|
|
"properties": {
|
|
|
|
"name": {
|
|
|
|
"type": "string",
|
|
|
|
"minLength": 1,
|
|
|
|
},
|
|
|
|
"servers": {
|
|
|
|
"type": "array",
|
|
|
|
"items": {
|
|
|
|
"type": "object",
|
|
|
|
"properties": {
|
|
|
|
"server_id": {
|
|
|
|
"type": "integer",
|
|
|
|
"minimum": 1,
|
|
|
|
},
|
|
|
|
"permissions": {
|
|
|
|
"type": "string",
|
|
|
|
"pattern": "^[01]{8}$", # 8 bits, see EnumPermissionsServer
|
|
|
|
},
|
|
|
|
},
|
|
|
|
"required": ["server_id", "permissions"],
|
|
|
|
},
|
|
|
|
},
|
|
|
|
},
|
|
|
|
"required": ["name"],
|
|
|
|
"additionalProperties": False,
|
|
|
|
}
|
|
|
|
|
2022-05-07 21:07:55 +00:00
|
|
|
|
|
|
|
class ApiRolesIndexHandler(BaseApiHandler):
|
|
|
|
def get(self):
|
|
|
|
auth_data = self.authenticate_user()
|
|
|
|
if not auth_data:
|
|
|
|
return
|
|
|
|
(
|
|
|
|
_,
|
|
|
|
_,
|
|
|
|
_,
|
|
|
|
superuser,
|
|
|
|
_,
|
|
|
|
) = auth_data
|
|
|
|
|
|
|
|
# GET /api/v2/roles?ids=true
|
|
|
|
get_only_ids = self.get_query_argument("ids", None) == "true"
|
|
|
|
|
|
|
|
if not superuser:
|
|
|
|
return self.finish_json(400, {"status": "error", "error": "NOT_AUTHORIZED"})
|
|
|
|
|
|
|
|
self.finish_json(
|
|
|
|
200,
|
|
|
|
{
|
|
|
|
"status": "ok",
|
|
|
|
"data": self.controller.roles.get_all_role_ids()
|
|
|
|
if get_only_ids
|
|
|
|
else [model_to_dict(r) for r in self.controller.roles.get_all_roles()],
|
|
|
|
},
|
|
|
|
)
|
2022-05-09 23:08:49 +00:00
|
|
|
|
|
|
|
def post(self):
|
|
|
|
auth_data = self.authenticate_user()
|
|
|
|
if not auth_data:
|
|
|
|
return
|
|
|
|
(
|
|
|
|
_,
|
|
|
|
_,
|
|
|
|
_,
|
|
|
|
superuser,
|
|
|
|
user,
|
|
|
|
) = auth_data
|
|
|
|
|
|
|
|
if not superuser:
|
|
|
|
return self.finish_json(400, {"status": "error", "error": "NOT_AUTHORIZED"})
|
|
|
|
|
|
|
|
try:
|
|
|
|
data = orjson.loads(self.request.body) # pylint: disable=no-member
|
|
|
|
except orjson.decoder.JSONDecodeError as e: # pylint: disable=no-member
|
|
|
|
return self.finish_json(
|
|
|
|
400, {"status": "error", "error": "INVALID_JSON", "error_data": str(e)}
|
|
|
|
)
|
|
|
|
|
|
|
|
try:
|
|
|
|
validate(data, create_role_schema)
|
|
|
|
except ValidationError as e:
|
|
|
|
return self.finish_json(
|
|
|
|
400,
|
|
|
|
{
|
|
|
|
"status": "error",
|
|
|
|
"error": "INVALID_JSON_SCHEMA",
|
|
|
|
"error_data": str(e),
|
|
|
|
},
|
|
|
|
)
|
|
|
|
|
|
|
|
role_name = data["name"]
|
|
|
|
|
|
|
|
# Get the servers
|
|
|
|
servers_dict = {server["server_id"]: server for server in data["servers"]}
|
|
|
|
server_ids = (
|
2022-05-18 10:05:58 +00:00
|
|
|
(
|
|
|
|
{server["server_id"] for server in data["servers"]}
|
|
|
|
& set(self.controller.get_all_server_ids())
|
|
|
|
) # Only allow existing servers
|
2022-05-09 23:08:49 +00:00
|
|
|
if "servers" in data
|
|
|
|
else set()
|
|
|
|
)
|
|
|
|
servers: t.List[dict] = [servers_dict[server_id] for server_id in server_ids]
|
|
|
|
|
|
|
|
if self.controller.roles.get_roleid_by_name(role_name) is not None:
|
|
|
|
return self.finish_json(
|
|
|
|
400, {"status": "error", "error": "ROLE_NAME_ALREADY_EXISTS"}
|
|
|
|
)
|
|
|
|
|
|
|
|
role_id = self.controller.roles.add_role_advanced(role_name, servers)
|
|
|
|
|
|
|
|
self.controller.management.add_to_audit_log(
|
|
|
|
user["user_id"],
|
|
|
|
f"created role {role_name} (RID:{role_id})",
|
|
|
|
server_id=0,
|
|
|
|
source_ip=self.get_remote_ip(),
|
|
|
|
)
|
|
|
|
|
|
|
|
self.finish_json(
|
|
|
|
200,
|
|
|
|
{"status": "ok", "data": {"role_id": role_id}},
|
|
|
|
)
|