import time
from base64 import b64encode
from datetime import datetime, timedelta
from typing import Any, Callable, Dict, List, Optional, Union
from . import error_codes
from .defaults import SIGNATURE_LIFETIME, TIMESTAMP_FORMAT
from .error_codes import ErrorCode
from .helpers import sorted_urlencode
__author__ = "Artur Barseghyan <artur.barseghyan@gmail.com>"
__copyright__ = "2013-2023 Artur Barseghyan"
__license__ = "GPL-2.0-only OR LGPL-2.1-or-later"
__all__ = (
"AbstractSignature",
"SignatureValidationResult",
)
# ****************************************************************************
# ****************************************************************************
# ******************************* Signature **********************************
# ****************************************************************************
# ****************************************************************************
[docs]
class SignatureValidationResult:
"""
Signature validation result container.
If signature validation result is True, things like this would work:
>>> res = SignatureValidationResult(result=True)
>>> print bool(res)
True
>>> res = SignatureValidationResult(
>>> result=False,
>>> reason=[error_codes.INVALID_SIGNATURE,]
>>> )
>>> print bool(res)
False
"""
# __slots__ = ('result', 'reason', 'errors')
def __init__(
self,
result: bool,
errors: Optional[List[Union[ErrorCode, Any]]] = None,
) -> None:
"""Constructor."""
self.result = result
self.errors = errors if errors else {}
def __str__(self) -> str:
return str(self.result)
__repr__ = __str__
def __bool__(self) -> bool:
return self.result
__nonzero__ = __bool__
@property
def message(self) -> str:
"""Human readable message of all errors.
:return:
"""
return " ".join(map(str, self.errors))
@property
def reason(self) -> map:
"""Reason.
For backwards compatibility. Returns list of text messages.
:return:
"""
return map(str, self.errors)
[docs]
class AbstractSignature:
"""Abstract class for signature generation and validation.
Based on symmetric keys.
:param signature:
:param auth_user:
:param valid_until:
"""
__slots__ = (
"signature",
"auth_user",
"valid_until",
"extra",
)
def __init__(
self,
signature: bytes,
auth_user: str,
valid_until: Union[float, str],
extra: Optional[Dict[str, Union[bytes, str, float, int]]] = None,
) -> None:
"""Constructor."""
self.signature = signature
self.auth_user = auth_user
self.valid_until = valid_until
self.extra = extra if extra else {}
def __str__(self) -> str:
return self.signature.decode()
__repr__ = __str__
def __bool__(self) -> bool:
return not self.is_expired()
__nonzero__ = __bool__
[docs]
@classmethod
def validate_signature(
cls,
signature: Union[str, bytes],
auth_user: str,
secret_key: str,
valid_until: Union[str, float],
extra: Optional[Dict[str, Union[bytes, str, float, int]]] = None,
return_object: bool = False,
value_dumper: Optional[Callable] = None,
quoter: Optional[Callable] = None,
) -> Union[SignatureValidationResult, bool]:
"""Validates the signature.
:param signature:
:param auth_user:
:param secret_key:
:param valid_until: Unix timestamp.
:param extra: Extra arguments to be validated.
:param return_object: If set to True, an instance of
``SignatureValidationResult`` is returned.
:param value_dumper:
:param quoter:
:return:
:example:
>>> Signature.validate_signature(
>>> 'EBS6ipiqRLa6TY5vxIvZU30FpnM=',
>>> 'user',
>>> 'your-secret-key',
>>> '1377997396.0'
>>> )
False
"""
if isinstance(signature, str):
signature = signature.encode()
if not extra:
extra = {}
sig = cls.generate_signature(
auth_user=auth_user,
secret_key=secret_key,
valid_until=valid_until,
extra=extra,
value_dumper=value_dumper,
quoter=quoter,
)
if not return_object:
return sig.signature == signature and not sig.is_expired()
else:
result = sig.signature == signature and not sig.is_expired()
errors = []
if sig.signature != signature:
errors.append(error_codes.INVALID_SIGNATURE)
if sig.is_expired():
errors.append(error_codes.SIGNATURE_TIMESTAMP_EXPIRED)
return SignatureValidationResult(result, errors)
[docs]
def is_expired(self) -> bool:
"""Checks if current signature is expired.
Returns True if signature is expired and False otherwise.
:return:
:example:
>>> # Generating the signature
>>> sig = Signature.generate_signature('user', 'your-secret-key')
>>> sig.is_expired()
False
"""
now = datetime.now()
valid_util = self.__class__.unix_timestamp_to_date(self.valid_until)
# Expires > now is a valid condition here.
res = valid_util > now
# But we actually check agains is expired, so it's the opposite.
return not res
[docs]
@classmethod
def get_base(
cls,
auth_user: str,
timestamp: Union[float, str],
extra: Optional[Dict[str, Union[bytes, str, float, int]]] = None,
value_dumper: Optional[Callable] = None,
quoter: Optional[Callable] = None,
) -> bytes:
"""Get base string.
Add something here so that timestamp to signature conversion is not
that obvious.
:param auth_user:
:param timestamp:
:param extra:
:param value_dumper:
:param quoter:
"""
if not extra:
extra = {}
_base = [str(timestamp), auth_user]
if extra:
urlencoded_extra = sorted_urlencode(
extra,
value_dumper=value_dumper,
quoter=quoter,
)
if urlencoded_extra:
_base.append(urlencoded_extra)
return ("_".join(_base)).encode()
[docs]
@staticmethod
def make_secret_key(secret_key: str) -> bytes:
"""The secret key how its' supposed to be used in generate signature.
:param secret_key:
:return:
"""
return secret_key.encode() # return b64encode(secret_key)
[docs]
@classmethod
def make_hash(
cls,
auth_user: str,
secret_key: str,
valid_until: Union[str, float] = None,
extra: Optional[Dict[str, Union[bytes, str, float, int]]] = None,
value_dumper: Optional[Callable] = None,
quoter: Optional[Callable] = None,
) -> bytes:
"""Make hash.
You should implement this method in your signature class.
:param auth_user:
:param secret_key:
:param valid_until: Unix timestamp, valid until.
:param extra: Additional variables to be added.
:param value_dumper:
:param quoter:
:return:
"""
raise NotImplementedError("You should implement this method!")
[docs]
@classmethod
def generate_signature(
cls,
auth_user: str,
secret_key: str,
valid_until: Optional[Union[float, str]] = None,
lifetime: int = SIGNATURE_LIFETIME,
extra: Optional[Dict[str, Union[bytes, str, float, int]]] = None,
value_dumper: Optional[Callable] = None,
quoter: Optional[Callable] = None,
) -> "AbstractSignature":
"""Generates the signature.
If timestamp is given, the signature is created using the given
timestamp. Otherwise current time is used.
:param auth_user:
:param secret_key:
:param valid_until: Unix timestamp, valid until.
:param lifetime: Lifetime of the signature in seconds.
:param extra: Additional variables to be added.
:param value_dumper:
:param quoter:
:return:
:example:
>>> sig = Signature.generate_signature('user', 'your-secret-key')
EBS6ipiqRLa6TY5vxIvZU30FpnM=
"""
if not extra:
extra = {}
if not valid_until:
valid_until = time.mktime(
(datetime.now() + timedelta(seconds=lifetime)).timetuple()
)
else:
try:
cls.unix_timestamp_to_date(valid_until)
except Exception:
return None # Something went wrong
signature = b64encode(
cls.make_hash(
auth_user,
secret_key,
valid_until,
extra,
value_dumper=value_dumper,
quoter=quoter,
)
)
return cls(
signature=signature,
auth_user=auth_user,
valid_until=valid_until,
extra=extra,
)
[docs]
@staticmethod
def datetime_to_timestamp(dtv: datetime) -> Optional[str]:
"""Human readable datetime according to the format specified.
Format is specified in ``TIMESTAMP_FORMAT``.
:param dtv:
:return:
"""
try:
return dtv.strftime(TIMESTAMP_FORMAT)
except Exception:
pass
[docs]
@staticmethod
def datetime_to_unix_timestamp(dtv: datetime) -> Optional[float]:
"""Convert ``datetime.datetime`` to Unix timestamp.
:param dtv:
:return: Unix timestamp.
"""
try:
return time.mktime(dtv.timetuple())
except Exception:
pass
[docs]
@classmethod
def timestamp_to_date(
cls, timestamp: Union[float, str], fail_silently: bool = True
) -> Union[datetime, None]:
"""Converts the given timestamp to date.
If ``fail_silently`` is set to False, raises exceptions if timestamp
is not valid timestamp (according to the format we have specified in
the ``TIMESTAMP_FORMAT``). Mainly used internally.
:param timestamp:
:param fail_silently:
:return:
"""
try:
return datetime.strptime(timestamp, TIMESTAMP_FORMAT)
except Exception as err:
if fail_silently is not True:
raise err
else:
return None
[docs]
@classmethod
def unix_timestamp_to_date(
cls, timestamp: Union[float, str], fail_silently: bool = True
) -> Union[datetime, None]:
"""Converts the given Unix timestamp to date.
If ``fail_silently`` is set to False, raises exceptions if timestamp
is not valid timestamp.
:param timestamp: UNIX timestamp. Possible to parse to float.
:param fail_silently:
:return:
"""
try:
return datetime.fromtimestamp(float(timestamp))
except Exception as err:
if fail_silently is not True:
raise err
else:
return None