Source code for ska.contrib.django.ska.backends.base

import logging
from typing import Dict, Optional, Union

from django.contrib.auth.hashers import make_password
from django.contrib.auth.models import User
from django.core.exceptions import PermissionDenied
from django.db import IntegrityError
from django.http import HttpRequest
from rest_framework.request import Request

from ..... import Signature, extract_signed_request_data
from .....defaults import (
    DEFAULT_AUTH_USER_PARAM,
    DEFAULT_EXTRA_PARAM,
    DEFAULT_SIGNATURE_PARAM,
    DEFAULT_VALID_UNTIL_PARAM,
)
from .....exceptions import ImproperlyConfigured, InvalidData
from .....helpers import get_callback_func
from ..models import Signature as SignatureModel
from ..settings import (
    DB_PERFORM_SIGNATURE_CHECK,
    DB_STORE_SIGNATURES,
    SECRET_KEY,
    USER_CREATE_CALLBACK,
    USER_GET_CALLBACK,
    USER_INFO_CALLBACK,
    USER_VALIDATE_CALLBACK,
)
from ..utils import get_provider_data

LOGGER = logging.getLogger(__file__)

__author__ = "Artur Barseghyan <artur.barseghyan@gmail.com>"
__copyright__ = "2013-2023 Artur Barseghyan"
__license__ = "GPL-2.0-only OR LGPL-2.1-or-later"
__all__ = ("BaseSkaAuthenticationBackend",)


[docs] class BaseSkaAuthenticationBackend(object): """Base authentication backend."""
[docs] def get_settings( self, request_data: Dict[str, Union[bytes, str, float, int]] = None, request: HttpRequest = None, **kwargs, ): """Get settings. :return: """ raise NotImplementedError( "You should implement this method in your authentication backend" )
[docs] def get_secret_key( self, request_data: Dict[str, Union[bytes, str, float, int]] = None, request: HttpRequest = None, **kwargs, ) -> str: """Get secret key. :return: """ raise NotImplementedError( "You should implement this method in your authentication backend" )
[docs] def get_request_data( self, request: Union[HttpRequest, Request], **kwargs ) -> Dict[str, str]: return request.GET.dict()
[docs] def authenticate( self, request: Union[HttpRequest, Request], **kwargs ) -> Optional[User]: """Authenticate. :param django.http.HttpRequest request: :return django.contrib.auth.models.User: Instance or None on failure. """ if request is None: LOGGER.debug("Request is None, skipping") return None request_data = self.get_request_data(request, **kwargs) provider_settings = self.get_settings(request_data, request, **kwargs) provider_data = get_provider_data(request_data, provider_settings) if provider_data: secret_key = provider_data["SECRET_KEY"] else: secret_key = self.get_secret_key(request_data, request, **kwargs) if not secret_key: secret_key = SECRET_KEY try: # If authentication/data validation failed. signed_request_data = extract_signed_request_data( data=request_data, secret_key=secret_key, signature_param=DEFAULT_SIGNATURE_PARAM, auth_user_param=DEFAULT_AUTH_USER_PARAM, valid_until_param=DEFAULT_VALID_UNTIL_PARAM, extra_param=DEFAULT_EXTRA_PARAM, validate=True, fail_silently=False, ) except (ImproperlyConfigured, InvalidData) as err: LOGGER.debug(str(err)) return None # Get the username from request. auth_user = request_data.get(DEFAULT_AUTH_USER_PARAM) signature = request_data.get(DEFAULT_SIGNATURE_PARAM) valid_until = request_data.get(DEFAULT_VALID_UNTIL_PARAM) # All other specific data is taken from signed request data email = signed_request_data.get("email", "") first_name = signed_request_data.get("first_name", "") last_name = signed_request_data.get("last_name", "") # Validate request callback. Created to allow adding custom logic to # the incoming authentication requests. The main purpose is to provide # a flexible way of raising exceptions if the incoming authentication # request shall be blocked (for instance, email or username is in # black-list or right the opposite - not in the white list). The only # aim of the `USER_VALIDATE_CALLBACK` is to raise a # ``django.core.PermissionDenied`` exception if request data is # invalid. In that case, the authentication flow will halt. All # other exceptions would simply be ignored (but logged) and if no # exception raised, the normal flow would be continued. user_validate_callback = provider_data.get( "USER_VALIDATE_CALLBACK", USER_VALIDATE_CALLBACK ) if user_validate_callback is not None: callback_func = get_callback_func(user_validate_callback) if callback_func: try: user_validate_callback_resp = callback_func( request=request, signed_request_data=signed_request_data, ) except PermissionDenied as err: LOGGER.debug(str(err)) raise err except Exception as err: LOGGER.debug(str(err)) # Storing the signatures to database if set to be so. if DB_STORE_SIGNATURES: token = SignatureModel( auth_user=auth_user, signature=signature, valid_until=Signature.unix_timestamp_to_date(valid_until), ) try: token.save() except IntegrityError: if DB_PERFORM_SIGNATURE_CHECK: # Token has already been used. Do not authenticate. return None # Try to get user. If doesn't exist - create. try: user = User._default_manager.get(username=auth_user) # User get callback user_get_callback = provider_data.get( "USER_GET_CALLBACK", USER_GET_CALLBACK ) if user_get_callback is not None: callback_func = get_callback_func(user_get_callback) if callback_func: try: user_get_callback_resp = callback_func( user, request=request, signed_request_data=signed_request_data, ) except Exception as err: LOGGER.debug(str(err)) except User.DoesNotExist: user = User._default_manager.create_user( username=auth_user, email=email, password=make_password(password=None), first_name=first_name, last_name=last_name, ) user.save() # User create callback user_create_callback = provider_data.get( "USER_CREATE_CALLBACK", USER_CREATE_CALLBACK ) if user_create_callback is not None: callback_func = get_callback_func(user_create_callback) if callback_func: try: user_create_callback_resp = callback_func( user, request=request, signed_request_data=signed_request_data, ) except Exception as err: LOGGER.debug(str(err)) # User info callback user_info_callback = provider_data.get( "USER_INFO_CALLBACK", USER_INFO_CALLBACK ) if user_info_callback is not None: callback_func = get_callback_func(user_info_callback) if callback_func: try: callback_func( user, request=request, signed_request_data=signed_request_data, ) except Exception as err: LOGGER.debug(str(err)) return user
[docs] def get_user(self, user_id: int) -> User: """Get user in the ``django.contrib.auth.models.User`` if exists. :param int user_id: :return django.contrib.auth.models.User: """ try: return User._default_manager.get(pk=user_id) except User.DoesNotExist: return None