diff --git a/app/classes/shared/models.py b/app/classes/shared/models.py index 4b470eb8..3336f235 100644 --- a/app/classes/shared/models.py +++ b/app/classes/shared/models.py @@ -502,6 +502,43 @@ class db_shortcuts: return (Users.get(Users.username == username)).user_id except DoesNotExist: return None + + @staticmethod + def get_user_by_api_token(token: str): + query = Users.select().where(Users.api_token == token) + + if query.exists(): + user = model_to_dict(Users.get(Users.api_token == token)) + # I know it should apply it without setting it but I'm just making sure + user = db_shortcuts.add_user_roles(user) + return user + else: + return {} + + @staticmethod + def add_user_roles(user): + if type(user) == dict: + user_id = user['user_id'] + else: + user_id = user.user_id + + # I just copied this code from get_user, it had those TODOs & comments made by mac - Lukas + + roles_query = User_Roles.select().join(Roles, JOIN.INNER).where(User_Roles.user_id == user_id) + # TODO: this query needs to be narrower + roles = set() + for r in roles_query: + roles.add(r.role_id.role_id) + #servers_query = User_Servers.select().join(Servers, JOIN.INNER).where(User_Servers.user_id == user_id) + ## TODO: this query needs to be narrower + servers = set() + #for s in servers_query: + # servers.add(s.server_id.server_id) + user['roles'] = roles + #user['servers'] = servers + #logger.debug("user: ({}) {}".format(user_id, user)) + return user + @staticmethod def get_user(user_id): @@ -523,19 +560,8 @@ class db_shortcuts: user = model_to_dict(Users.get(Users.user_id == user_id)) if user: - roles_query = User_Roles.select().join(Roles, JOIN.INNER).where(User_Roles.user_id == user_id) - # TODO: this query needs to be narrower - roles = set() - for r in roles_query: - roles.add(r.role_id.role_id) - #servers_query = User_Servers.select().join(Servers, JOIN.INNER).where(User_Servers.user_id == user_id) - ## TODO: this query needs to be narrower - servers = set() - #for s in servers_query: - # servers.add(s.server_id.server_id) - user['roles'] = roles - #user['servers'] = servers - #logger.debug("user: ({}) {}".format(user_id, user)) + # I know it should apply it without setting it but I'm just making sure + user = db_shortcuts.add_user_roles(user) return user else: #logger.debug("user: ({}) {}".format(user_id, {})) diff --git a/app/classes/web/api_handler.py b/app/classes/web/api_handler.py index f0e57e07..933b6235 100644 --- a/app/classes/web/api_handler.py +++ b/app/classes/web/api_handler.py @@ -6,53 +6,63 @@ import tornado.escape import logging from app.classes.web.base_handler import BaseHandler -from app.classes.shared.models import Users +from app.classes.shared.models import db_shortcuts log = logging.getLogger(__name__) class ApiHandler(BaseHandler): - def return_response(self, data: dict): + def return_response(self, status: int, data: dict): # Define a standardized response + self.set_status(status) self.write(data) - def access_denied(self, user): - log.info("User %s was denied access to API route", user) - self.set_status(403) - self.finish(self.return_response(403, {'error':'ACCESS_DENIED', 'info':'You were denied access to the requested resource'})) + def access_denied(self, user, reason=''): + if reason: reason = ' because ' + reason + log.info("User %s from IP %s was denied access to the API route " + self.request.path + reason, user, self.get_remote_ip()) + self.finish(self.return_response(403, { + 'error':'ACCESS_DENIED', + 'info':'You were denied access to the requested resource' + })) - def authenticate_user(self): + def authenticate_user(self) -> bool: try: log.debug("Searching for specified token") # TODO: YEET THIS - user_data = Users.get(api_token=self.get_argument('token')) + user_data = db_shortcuts.get_user_by_api_token(self.get_argument('token')) log.debug("Checking results") if user_data: # Login successful! Check perms - log.info("User {} has authenticated to API".format(user_data.username)) + log.info("User {} has authenticated to API".format(user_data['username'])) # TODO: Role check + + return True # This is to set the "authenticated" else: logging.debug("Auth unsuccessful") - return self.access_denied("unknown") - except: - log.warning("Traceback occurred when authenticating user to API. Most likely wrong token") - return self.access_denied("unknown") - pass + self.access_denied("unknown", "the user provided an invalid token") + return False + except Exception as e: + log.warning("An error occured while authenticating an API user: %s", e) + self.access_denied("unknown"), "an error occured while authenticating the user" + return False class ServersStats(ApiHandler): def get(self): """Get details about all servers""" - self.authenticate_user() + authenticated = self.authenticate_user() + if not authenticated: return # Get server stats + # TODO Check perms self.finish(self.write({"servers": self.controller.stats.get_servers_stats()})) class NodeStats(ApiHandler): def get(self): """Get stats for particular node""" - self.authenticate_user() + authenticated = self.authenticate_user() + if not authenticated: return # Get node stats node_stats = self.controller.stats.get_node_stats() node_stats.pop("servers")