crafty-4/app/classes/shared/authentication.py

95 lines
3.2 KiB
Python
Raw Normal View History

2022-01-15 00:23:50 +00:00
import logging
import time
from datetime import datetime
2022-01-15 00:23:50 +00:00
from typing import Optional, Dict, Any, Tuple
import jwt
from jwt import PyJWTError
2022-01-15 00:23:50 +00:00
from app.classes.models.users import HelperUsers, ApiKeys
from app.classes.controllers.management_controller import ManagementController
2022-03-08 04:40:44 +00:00
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
2022-01-15 00:23:50 +00:00
2022-01-15 00:23:50 +00:00
class Authentication:
    """Create and validate HS256-signed JSON Web Tokens.

    The signing secret is read through ``ManagementController`` when the
    object is constructed; when no usable secret is available a new random
    one is generated and stored.
    """

    # Layouts tried, in order, when ``valid_tokens_from`` arrives as a string.
    # The first entry is the format this class has always accepted; the
    # others cover the same layout written without fractional seconds and/or
    # without a UTC offset.
    _VALID_TOKENS_FROM_FORMATS = (
        "%Y-%m-%d %H:%M:%S.%f%z",
        "%Y-%m-%d %H:%M:%S%z",
        "%Y-%m-%d %H:%M:%S.%f",
        "%Y-%m-%d %H:%M:%S",
    )

    def __init__(self, helper):
        """Load the signing secret, generating and storing one if needed.

        Args:
            helper: Object providing ``random_string_generator(length)``,
                used to create a new secret.
        """
        self.helper = helper
        try:
            self.secret = ManagementController.get_crafty_api_key()
            # Treat any empty value (``""`` or ``None``) as "no secret yet".
            if not self.secret:
                self._store_new_secret()
        except Exception:  # deliberately broad, but no longer a bare except
            logger.debug(
                "Could not load the JWT secret; generating a new one",
                exc_info=True,
            )
            self._store_new_secret()

    def _store_new_secret(self):
        """Generate a fresh 64-character secret and persist it."""
        # NOTE(review): the strength of the secret depends on how
        # helper.random_string_generator is implemented - confirm it uses a
        # cryptographically secure source.
        self.secret = self.helper.random_string_generator(64)
        ManagementController.set_crafty_api_key(str(self.secret))

    @staticmethod
    def _coerce_datetime(value) -> Optional[datetime]:
        """Return ``value`` as a datetime-like object, or ``None``.

        Strings are parsed with each of ``_VALID_TOKENS_FROM_FORMATS``.
        Non-string values that expose ``timestamp()`` (such as ``datetime``
        instances) are returned unchanged. Anything else yields ``None``.
        """
        if isinstance(value, str):
            for fmt in Authentication._VALID_TOKENS_FROM_FORMATS:
                try:
                    return datetime.strptime(value, fmt)
                except ValueError:
                    continue
            return None
        if hasattr(value, "timestamp"):
            return value
        return None

    def generate(self, user_id, extra=None):
        """Return a signed token for ``user_id``.

        Args:
            user_id: Value stored in the ``user_id`` claim.
            extra: Optional mapping of additional claims. Its entries are
                merged last, so they override ``user_id`` and ``iat`` on a
                key collision.

        Returns:
            The result of ``jwt.encode`` for the assembled claims.
        """
        if extra is None:
            extra = {}
        return jwt.encode(
            {"user_id": user_id, "iat": int(time.time()), **extra},
            self.secret,
            algorithm="HS256",
        )

    def read(self, token):
        """Decode ``token`` and return its claims.

        Errors raised by ``jwt.decode`` propagate to the caller.
        """
        return jwt.decode(token, self.secret, algorithms=["HS256"])

    def check_no_iat(self, token) -> Optional[Dict[str, Any]]:
        """Decode ``token`` and return its claims, or ``None`` on a JWT error.

        Unlike ``check``, no user lookup and no ``iat`` comparison is done.
        """
        try:
            return jwt.decode(str(token), self.secret, algorithms=["HS256"])
        except PyJWTError as error:
            logger.debug("Error while checking JWT token: ", exc_info=error)
            return None

    def check(
        self,
        token,
    ) -> Optional[Tuple[Optional["ApiKeys"], Dict[str, Any], Dict[str, Any]]]:
        """Validate ``token`` against the stored user data.

        Returns:
            ``(key, data, user)`` when the token is accepted, where ``key``
            is the API key record referenced by the ``token_id`` claim (or
            ``None`` when the token has no such claim), ``data`` is the
            decoded claim set and ``user`` is the user record.
            ``None`` when the token cannot be decoded, lacks the ``iat`` or
            ``user_id`` claim, references an unknown API key or user, or was
            not issued after the user's ``valid_tokens_from`` time.
        """
        try:
            data = jwt.decode(str(token), self.secret, algorithms=["HS256"])
        except PyJWTError as error:
            logger.debug("Error while checking JWT token: ", exc_info=error)
            return None

        # A correctly signed token may still lack the claims needed below;
        # reject it rather than raising KeyError.
        iat = data.get("iat")
        user_id = data.get("user_id")
        if iat is None or user_id is None:
            logger.debug("JWT token is missing the 'iat' or 'user_id' claim")
            return None

        key: Optional[ApiKeys] = None
        if "token_id" in data:
            key = HelperUsers.get_user_api_key(data["token_id"])
            # Only tokens that reference an API key require one to exist.
            if key is None:
                return None

        # TODO: Have a cache or something so we don't constantly
        # have to query the database
        user = HelperUsers.get_user(user_id)
        if not user:
            logger.debug("JWT token references an unknown user")
            return None

        # The stored value may be a string or a datetime coming from the DB.
        valid_tokens_from = self._coerce_datetime(user.get("valid_tokens_from"))
        if valid_tokens_from is None:
            logger.debug("User has no usable 'valid_tokens_from' value")
            return None

        # Accept only tokens issued strictly after the cut-off.
        if int(valid_tokens_from.timestamp()) < iat:
            return key, data, user
        return None

    def check_err(
        self,
        token,
    ) -> Tuple[Optional["ApiKeys"], Dict[str, Any], Dict[str, Any]]:
        """Like ``check`` but raise instead of returning ``None``.

        Lets callers unpack the result directly without hitting
        "'NoneType' object is not iterable" on an invalid token.

        Raises:
            ValueError: If ``check`` rejects the token.
        """
        output = self.check(token)
        if output is None:
            raise ValueError("Invalid token")
        return output

    def check_bool(self, token) -> bool:
        """Return ``True`` when ``check`` accepts ``token``."""
        return self.check(token) is not None