crafty-4/app/classes/shared/authentication.py
Zedifus 45c3f73eca Revert "Revert "Merge branch 'dev' into 'master'""
This reverts the 4.4.1 release revert commit 29ce7a2cde.
2024-08-06 20:45:00 +01:00

95 lines
3.2 KiB
Python

import logging
import time
from datetime import datetime
from typing import Optional, Dict, Any, Tuple
import jwt
from jwt import PyJWTError
from app.classes.models.users import HelperUsers, ApiKeys
from app.classes.controllers.management_controller import ManagementController
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
class Authentication:
    """Create and validate HS256-signed JSON Web Tokens (JWTs).

    The signing secret is read through
    ``ManagementController.get_crafty_api_key()``. When no usable secret is
    stored, a new 64-character one is generated with the supplied helper and
    saved through ``ManagementController.set_crafty_api_key()``.
    """

    # Format tried first when ``valid_tokens_from`` is stored as a string.
    _VALID_FROM_FORMAT = "%Y-%m-%d %H:%M:%S.%f%z"

    def __init__(self, helper):
        """Load the signing secret, creating and storing one if needed.

        Args:
            helper: Object providing ``random_string_generator(length)``,
                which is used to create a new secret.
        """
        self.helper = helper
        try:
            self.secret = ManagementController.get_crafty_api_key()
            # A missing (None) secret is just as unusable as an empty one.
            if not self.secret:
                self.secret = self._create_secret()
        except Exception:
            # Best effort: fall back to a fresh secret, but no longer swallow
            # KeyboardInterrupt/SystemExit the way a bare ``except`` would,
            # and leave a trace of what went wrong.
            logger.warning(
                "Unable to load the stored API secret; generating a new one",
                exc_info=True,
            )
            self.secret = self._create_secret()

    def _create_secret(self):
        """Generate a new 64-character secret, persist it and return it."""
        secret = self.helper.random_string_generator(64)
        ManagementController.set_crafty_api_key(str(secret))
        return secret

    def _decode(self, token) -> Optional[Dict[str, Any]]:
        """Verify ``token`` and return its claims, or None when PyJWT rejects it."""
        try:
            return jwt.decode(str(token), self.secret, algorithms=["HS256"])
        except PyJWTError as error:
            logger.debug("Error while checking JWT token: ", exc_info=error)
            return None

    @classmethod
    def _valid_from_timestamp(cls, valid_tokens_from) -> Optional[int]:
        """Return ``valid_tokens_from`` as whole epoch seconds, or None.

        The value may be a string or an object with a ``timestamp()`` method
        (such as a ``datetime``). None is returned when it is missing or
        cannot be interpreted.
        """
        if isinstance(valid_tokens_from, str):
            try:
                valid_tokens_from = datetime.strptime(
                    valid_tokens_from, cls._VALID_FROM_FORMAT
                )
            except ValueError:
                # Not in the expected layout (for example no fractional
                # seconds or no UTC offset); try the ISO 8601 parser.
                try:
                    valid_tokens_from = datetime.fromisoformat(valid_tokens_from)
                except ValueError:
                    return None
        to_timestamp = getattr(valid_tokens_from, "timestamp", None)
        if not callable(to_timestamp):
            return None
        return int(to_timestamp())

    def generate(self, user_id, extra=None):
        """Return a signed token for ``user_id``.

        Args:
            user_id: Value stored under the ``user_id`` claim.
            extra: Optional mapping of additional claims. Its entries are
                merged last, so they override ``user_id`` and ``iat`` on a
                key clash.

        Returns:
            The encoded token produced by ``jwt.encode``.
        """
        if extra is None:
            extra = {}
        return jwt.encode(
            {"user_id": user_id, "iat": int(time.time()), **extra},
            self.secret,
            algorithm="HS256",
        )

    def read(self, token):
        """Decode ``token`` and return its claims.

        Unlike the ``check*`` methods, errors raised by ``jwt.decode`` are
        not caught here and propagate to the caller.
        """
        return jwt.decode(token, self.secret, algorithms=["HS256"])

    def check_no_iat(self, token) -> Optional[Dict[str, Any]]:
        """Return the claims of ``token`` if it decodes cleanly, else None.

        Only the decode step is performed; no API key or user lookup and no
        comparison of the ``iat`` claim takes place.
        """
        return self._decode(token)

    def check(
        self,
        token,
    ) -> Optional[Tuple[Optional[ApiKeys], Dict[str, Any], Dict[str, Any]]]:
        """Fully validate ``token``.

        A token is accepted when it decodes cleanly, carries ``iat`` and
        ``user_id`` claims, references an existing API key if it has a
        ``token_id`` claim, and was issued strictly after the user's
        ``valid_tokens_from`` moment.

        Returns:
            ``(key, data, user)`` on success, where ``key`` is the API key
            record (None when the token has no ``token_id``), ``data`` is the
            decoded claims and ``user`` is the user record. None when the
            token is rejected for any reason.
        """
        data = self._decode(token)
        if data is None:
            return None

        # Reject, rather than crash on, a signed token lacking required claims.
        iat = data.get("iat")
        user_id = data.get("user_id")
        if not isinstance(iat, (int, float)) or user_id is None:
            return None

        key: Optional[ApiKeys] = None
        if "token_id" in data:
            key = HelperUsers.get_user_api_key(data["token_id"])
            if key is None:
                return None

        # TODO: Have a cache or something so we don't constantly
        # have to query the database
        user = HelperUsers.get_user(user_id)
        if not user:
            return None

        # It's possible this will be a string or a dt coming from the DB.
        # If it cannot be interpreted at all the token is refused.
        valid_from = self._valid_from_timestamp(user.get("valid_tokens_from"))
        if valid_from is None:
            return None

        if valid_from < iat:
            # Success!
            return key, data, user
        return None

    def check_err(
        self,
        token,
    ) -> Tuple[Optional[ApiKeys], Dict[str, Any], Dict[str, Any]]:
        """Like ``check``, but raise instead of returning None.

        Raises:
            ValueError: If ``check`` rejects the token.
        """
        # Without this function there would be runtime exceptions like the following:
        # "None" object is not iterable
        output = self.check(token)
        if output is None:
            raise ValueError("Invalid token")
        return output

    def check_bool(self, token) -> bool:
        """Return True when ``check`` accepts ``token``, False otherwise."""
        return self.check(token) is not None