crafty-4/app/classes/web/base_handler.py

267 lines
8.8 KiB
Python

import logging
import re
import typing as t
import orjson
import nh3
import tornado.web
from app.classes.models.crafty_permissions import EnumPermissionsCrafty
from app.classes.models.users import ApiKeys
from app.classes.shared.helpers import Helpers
from app.classes.shared.file_helpers import FileHelpers
from app.classes.shared.main_controller import Controller
from app.classes.shared.translation import Translation
from app.classes.shared.main_models import DatabaseShortcuts
from app.classes.models.users import DoesNotExist
logger = logging.getLogger(__name__)
auth_log = logging.getLogger("auth")
bearer_pattern = re.compile(r"^Bearer ", flags=re.IGNORECASE)
class BaseHandler(tornado.web.RequestHandler):
nobleach = {bool, type(None)}
redactables = ("pass", "api")
helper: Helpers
controller: Controller
translator: Translation
file_helper: FileHelpers
# noinspection PyAttributeOutsideInit
def initialize(
self,
helper=None,
controller=None,
tasks_manager=None,
translator=None,
file_helper=None,
):
self.helper = helper
self.controller = controller
self.tasks_manager = tasks_manager
self.translator = translator
self.file_helper = file_helper
def set_default_headers(self) -> None:
"""
Fix CORS
"""
self.set_header("Access-Control-Allow-Origin", "*")
self.set_header(
"Access-Control-Allow-Headers",
"Content-Type, x-requested-with, Authorization",
)
self.set_header(
"Access-Control-Allow-Methods", "POST, GET, PUT, DELETE, OPTIONS"
)
def options(self, *_, **__):
"""
Fix CORS
"""
# no body
self.set_status(204)
self.finish()
def get_remote_ip(self):
remote_ip = (
self.request.headers.get("X-Real-IP")
or self.request.headers.get("X-Forwarded-For")
or self.request.remote_ip
)
return remote_ip
current_user: t.Tuple[t.Optional[ApiKeys], t.Dict[str, t.Any], t.Dict[str, t.Any]]
"""
A variable that contains the current user's data. Please see
Please only use this with routes using the `@tornado.web.authenticated` decorator.
"""
def get_current_user(
self,
) -> t.Optional[
t.Tuple[t.Optional[ApiKeys], t.Dict[str, t.Any], t.Dict[str, t.Any]]
]:
"""
Get the token's API key, the token's payload and user data.
Returns:
t.Optional[ApiKeys]: The API key of the token.
t.Dict[str, t.Any]: The token's payload.
t.Dict[str, t.Any]: The user's data from the database.
"""
try:
return self.controller.authentication.check(self.get_cookie("token"))
except DoesNotExist:
return None
def autobleach(self, name, text):
for r in self.redactables:
if r in name:
logger.debug(f"Auto-bleaching {name}: [**REDACTED**]")
break
logger.debug(f"Auto-bleaching {name}: {text}")
if type(text) in self.nobleach:
logger.debug("Auto-bleaching - bypass type")
return text
return nh3.clean(text) # pylint: disable=no-member
def get_argument(
self,
name: str,
default: t.Union[
None, str, tornado.web._ArgDefaultMarker
] = tornado.web._ARG_DEFAULT,
strip: bool = True,
) -> t.Optional[str]:
arg = self._get_argument(name, default, self.request.arguments, strip)
bleached = self.autobleach(name, arg)
if "&" in str(bleached):
bleached = bleached.replace("&", "&")
return bleached
def get_arguments(self, name: str, strip: bool = True) -> t.List[str]:
if not isinstance(strip, bool):
raise AssertionError
args = self._get_arguments(name, self.request.arguments, strip)
args_ret = []
for arg in args:
args_ret += self.autobleach(name, arg)
return args_ret
def access_denied(self, user: t.Optional[str], reason: t.Optional[str]):
ip = self.get_remote_ip()
route = self.request.path
if user is not None:
user_data = f"User {user} from IP {ip}"
else:
user_data = f"An unknown user from IP {ip}"
if reason:
ending = f"to the API route {route} because {reason}"
else:
ending = f"to the API route {route}"
logger.info(f"{user_data} was denied access {ending}")
self.finish_json(
403,
{
"status": "error",
"error": "ACCESS_DENIED",
"info": "You were denied access to the requested resource",
},
)
def _auth_get_api_token(self) -> t.Optional[str]:
"""Get an API token from the request
The API token is searched in the following order:
1. The `token` query parameter
2. The `Authorization` header
3. The `token` cookie
Returns:
t.Optional[str]: The API token or None if no token was found.
"""
logger.debug("Searching for specified token")
api_token = self.get_query_argument("token", None)
if api_token is None and self.request.headers.get("Authorization"):
api_token = bearer_pattern.sub(
"", self.request.headers.get("Authorization")
)
elif api_token is None:
api_token = self.get_cookie("token")
return api_token
def authenticate_user(
self,
) -> t.Optional[
t.Tuple[
t.List,
t.List[EnumPermissionsCrafty],
t.List[str],
bool,
t.Dict[str, t.Any],
]
]:
try:
api_key, _token_data, user = self.controller.authentication.check_err(
self._auth_get_api_token()
)
superuser = user["superuser"]
if api_key is not None:
superuser = superuser and api_key.full_access
exec_user_role = set()
if superuser:
authorized_servers = self.controller.servers.get_all_defined_servers()
exec_user_role.add("Super User")
exec_user_crafty_permissions = (
self.controller.crafty_perms.list_defined_crafty_permissions()
)
else:
if api_key is not None:
exec_user_crafty_permissions = (
self.controller.crafty_perms.get_api_key_permissions_list(
api_key
)
)
else:
exec_user_crafty_permissions = (
self.controller.crafty_perms.get_crafty_permissions_list(
user["user_id"]
)
)
logger.debug(user["roles"])
for r in user["roles"]:
role = self.controller.roles.get_role(r)
exec_user_role.add(role["role_name"])
authorized_servers = self.controller.servers.get_authorized_servers(
user["user_id"] # TODO: API key authorized servers?
)
authorized_servers = [
DatabaseShortcuts.get_data_obj(x.server_object)
for x in authorized_servers
]
logger.debug("Checking results")
if user:
return (
authorized_servers,
exec_user_crafty_permissions,
exec_user_role,
superuser,
user,
)
logging.debug("Auth unsuccessful")
auth_log.error(
f"Authentication attempted from {self.get_remote_ip()}. Invalid token"
)
self.access_denied(None, "the user provided an invalid token")
return None
except Exception as auth_exception:
auth_log.error(
f"Authentication attempted from {self.get_remote_ip()}."
f" Error: {auth_exception}"
)
logger.debug(
"An error occured while authenticating an API user:",
exc_info=auth_exception,
)
self.finish_json(
403,
{
"status": "error",
"error": "ACCESS_DENIED",
"info": "An error occured while authenticating the user",
},
)
return None
def finish_json(self, status: int, data: t.Dict[str, t.Any]):
self.set_status(status)
self.set_header("Content-Type", "application/json")
self.finish(orjson.dumps(data))