mirror of
https://gitlab.com/crafty-controller/crafty-4.git
synced 2024-08-30 18:23:09 +00:00
2a512d7273
Mostly just breaking up strings and comments into new lines Some strings dont require 'f' but keeping in for readability with the rest of the concatinated string
80 lines
2.4 KiB
Python
80 lines
2.4 KiB
Python
import logging
|
|
import time
|
|
from typing import Optional, Dict, Any, Tuple
|
|
|
|
from app.classes.models.users import users_helper, ApiKeys
|
|
from app.classes.shared.helpers import helper
|
|
|
|
try:
|
|
import jwt
|
|
from jwt import PyJWTError
|
|
|
|
except ModuleNotFoundError as e:
|
|
helper.auto_installer_fix(e)
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class Authentication:
|
|
def __init__(self):
|
|
self.secret = "my secret"
|
|
self.secret = helper.get_setting("apikey_secret", None)
|
|
|
|
if self.secret is None or self.secret == "random":
|
|
self.secret = helper.random_string_generator(64)
|
|
|
|
@staticmethod
|
|
def generate(user_id, extra=None):
|
|
if extra is None:
|
|
extra = {}
|
|
return jwt.encode(
|
|
{"user_id": user_id, "iat": int(time.time()), **extra},
|
|
authentication.secret,
|
|
algorithm="HS256",
|
|
)
|
|
|
|
@staticmethod
|
|
def read(token):
|
|
return jwt.decode(token, authentication.secret, algorithms=["HS256"])
|
|
|
|
@staticmethod
|
|
def check_no_iat(token) -> Optional[Dict[str, Any]]:
|
|
try:
|
|
return jwt.decode(token, authentication.secret, algorithms=["HS256"])
|
|
except PyJWTError as error:
|
|
logger.debug("Error while checking JWT token: ", exc_info=error)
|
|
return None
|
|
|
|
@staticmethod
|
|
def check(
|
|
token,
|
|
) -> Optional[Tuple[Optional[ApiKeys], Dict[str, Any], Dict[str, Any]]]:
|
|
try:
|
|
data = jwt.decode(token, authentication.secret, algorithms=["HS256"])
|
|
except PyJWTError as error:
|
|
logger.debug("Error while checking JWT token: ", exc_info=error)
|
|
return None
|
|
iat: int = data["iat"]
|
|
key: Optional[ApiKeys] = None
|
|
if "token_id" in data:
|
|
key_id = data["token_id"]
|
|
key = users_helper.get_user_api_key(key_id)
|
|
if key is None:
|
|
return None
|
|
user_id: str = data["user_id"]
|
|
user = users_helper.get_user(user_id)
|
|
# TODO: Have a cache or something so we don't constantly
|
|
# have to query the database
|
|
if int(user.get("valid_tokens_from").timestamp()) < iat:
|
|
# Success!
|
|
return key, data, user
|
|
else:
|
|
return None
|
|
|
|
@staticmethod
|
|
def check_bool(token) -> bool:
|
|
return authentication.check(token) is not None
|
|
|
|
|
|
authentication = Authentication()
|