Source code for ska.__init__

__title__ = 'ska'
__version__ = '1.3'
__build__ = 0x00000D
__author__ = 'Artur Barseghyan'
__copyright__ = 'Copyright (c) 2013 Artur Barseghyan'
__license__ = 'GPL 2.0/LGPL 2.1'
__all__ = (
    'sign_url', 'signature_to_dict', 'validate_signed_request_data', 'extract_signed_request_data',
    'Signature', 'RequestHelper', 'SignatureValidationResult',
    )

import datetime
import time
import hmac
from base64 import b64decode, b64encode
from hashlib import sha1

from six import PY3

try:
    from six.moves.urllib.parse import urlencode
except ImportError as e:
    if PY3:
        from urllib.parse import urlencode
    else:
        from urllib import urlencode

from ska.helpers import (
    dict_keys, dict_to_ordered_list, sorted_urlencode, extract_signed_data as extract_signed_data
    )
from ska.exceptions import InvalidData, ImproperlyConfigured
from ska.defaults import (
    SIGNATURE_LIFETIME, TIMESTAMP_FORMAT, DEFAULT_URL_SUFFIX,
    DEFAULT_SIGNATURE_PARAM, DEFAULT_AUTH_USER_PARAM, DEFAULT_VALID_UNTIL_PARAM,
    DEFAULT_EXTRA_PARAM
    )

_ = lambda x: x # For future integrations with gettext

# *******************************************************************************************************
# ******************************************* Signature *************************************************
# *******************************************************************************************************

[docs]class SignatureValidationResult(object): """ Signature validation result container. If signature validation result is True, things like this would work >>> res = SignatureValidationResult(result=True) >>> print bool(res) True >>> res = SignatureValidationResult(result=False, reason=_("Invalid signature")) >>> print bool(res) False """ #__slots__ = ('result', 'reason') def __init__(self, result, reason=[]): self.result = result self.reason = reason def __str__(self): return str(self.result) __unicode__ = __str__ __repr__ = __str__ def __bool__(self): return self.result __nonzero__ = __bool__
[docs]class Signature(object): """ Signature generation and validation based on symmetric keys. :param str signature: :param str auth_user: :param float|str valid_until: """ __slots__ = ('signature', 'auth_user', 'valid_until', 'extra') def __init__(self, signature, auth_user, valid_until, extra={}): self.signature = signature self.auth_user = auth_user self.valid_until = valid_until self.extra = extra def __str__(self): return self.signature __unicode__ = __str__ __repr__ = __str__ def __bool__(self): return self.result __nonzero__ = __bool__ @classmethod
[docs] def validate_signature(cls, signature, auth_user, secret_key, valid_until, extra={}, return_object=False): """ Validates the signature. :param str signature: :param str auth_user: :param str secret_key: :param float|str valid_until: Unix timestamp. :param dict extra: Extra arguments to be validated. If ``extra_keys`` is given, the ``extra`` is stripped to the "white listed" keys only. Otherwise - the entire ``extra`` dictionary is considered to be used. :param bool return_object: If set to True, an instance of ``SignatureValidationResult`` is returned. :return bool: :example: >>> Signature.validate_signature( 'EBS6ipiqRLa6TY5vxIvZU30FpnM=', 'user', 'your-secret-key', '1377997396.0' ) False """ if isinstance(signature, str): signature = signature.encode() sig = cls.generate_signature( auth_user = auth_user, secret_key = secret_key, valid_until = valid_until, extra = extra ) if not return_object: return sig.signature == signature and not sig.is_expired() else: result = (sig.signature == signature and not sig.is_expired()) reason = [] if sig.signature != signature: reason.append(_("Invalid signature!")) if sig.is_expired(): reason.append(_("Signature timestamp expired!")) return SignatureValidationResult(result, reason)
[docs] def is_expired(self): """ Checks if current signature is expired. Returns True if signature is expired and False otherwise. :return bool: :example: >>> sig = Signature.generate_signature('user', 'your-secret-key') # Generating the signature >>> sig.is_expired() False """ now = datetime.datetime.now() valid_util = Signature.unix_timestamp_to_date(self.valid_until) # Expires > now is a valid condition here. res = valid_util > now # But we actually check agains is expired, so it's the opposite. return not res
@classmethod
[docs] def get_base(cls, auth_user, timestamp, extra={}): """ Add something here so that timestamp to signature conversion is not that obvious. :param string auth_user: :param int timestamp: :param dict extra: :param list extra_keys: """ l = [str(timestamp), auth_user] if extra: urlencoded_extra = sorted_urlencode(extra) if urlencoded_extra: l.append(urlencoded_extra) return ("_".join(l)).encode()
@staticmethod
[docs] def make_secret_key(secret_key): """ The secret key how its' supposed to be used in generate signature. :param str secret_key: :return str: """ return secret_key.encode() #return b64encode(secret_key)
@classmethod
[docs] def generate_signature(cls, auth_user, secret_key, valid_until=None, lifetime=SIGNATURE_LIFETIME, extra={}): """ Generates the signature. If timestamp is given, the signature is created using the given timestamp. Otherwise current time is used. :param str auth_user: :param str secret_key: :param float|str valid_until: Unix timestamp, valid until. :param int lifetime: Lifetime of the signature in seconds. :param dict extra: Additional variables to be added. :return str: :example: >>> sig = Signature.generate_signature('user', 'your-secret-key') EBS6ipiqRLa6TY5vxIvZU30FpnM= """ if not valid_until: valid_until = time.mktime( (datetime.datetime.now() + datetime.timedelta(seconds=lifetime)).timetuple() ) else: try: cls.unix_timestamp_to_date(valid_until) except Exception as e: return None # Something went wrong raw_hmac = hmac.new( Signature.make_secret_key(secret_key), cls.get_base(auth_user, valid_until, extra=extra), sha1 ).digest() signature = b64encode(raw_hmac) return Signature(signature=signature, auth_user=auth_user, valid_until=valid_until, extra=extra)
@staticmethod
[docs] def datetime_to_timestamp(dt): """ Human readable datetime according to the format specified in ``TIMESTAMP_FORMAT``. :param datetime.datetime dt: :return str: """ try: return dt.strftime(TIMESTAMP_FORMAT) except Exception as e: pass
@staticmethod
[docs] def datetime_to_unix_timestamp(dt): """ Converts ``datetime.datetime`` to Unix timestamp. :param datetime.datetime dt: :return float: Unix timestamp. """ try: return time.mktime(dt.timetuple()) except Exception as e: pass
@classmethod
[docs] def timestamp_to_date(cls, timestamp, fail_silently=True): """ Converts the given timestamp to date. If ``fail_silently`` is set to False, raises exceptions if timestamp is not valid timestamp (according to the format we have specified in the ``TIMESTAMP_FORMAT``). Mainly used internally. :param str timestamp: :param bool fail_silently: :return str: """ try: return datetime.datetime.strptime(timestamp, TIMESTAMP_FORMAT) except Exception as e: if fail_silently is not True: raise e else: return None
@classmethod
[docs] def unix_timestamp_to_date(cls, timestamp, fail_silently=True): """ Converts the given Unix timestamp to date. If ``fail_silently`` is set to False, raises exceptions if timestamp is not valid timestamp. :param float|str timestamp: UNIX timestamp. Parsable to float. :param bool fail_silently: :return str: """ try: return datetime.datetime.fromtimestamp(float(timestamp)) except Exception as e: if fail_silently is not True: raise e else: return None # ******************************************************************************************************* # **************************************** Request helper *********************************************** # *******************************************************************************************************
[docs]class RequestHelper(object): """ Request helper for easy put/extract of signature params from URLs. :param str signature_param: :param str auth_user_param: :param str valid_until_param: :param str extra_keys_param: """ def __init__(self, signature_param=DEFAULT_SIGNATURE_PARAM, auth_user_param=DEFAULT_AUTH_USER_PARAM, valid_until_param=DEFAULT_VALID_UNTIL_PARAM, extra_param=DEFAULT_EXTRA_PARAM): self.signature_param = signature_param self.auth_user_param = auth_user_param self.valid_until_param = valid_until_param self.extra_param = extra_param
[docs] def signature_to_url(self, signature, endpoint_url='', suffix=DEFAULT_URL_SUFFIX): """ URL encodes the signature params. :param ska.Signature signature: :param str endpoint_url: :param str suffix: Suffix to add after the ``endpoint_url`` and before the appended signature params. :return str: :example: Required imports. >>> from ska import Signature, RequestHelper Generate signature. >>> signature = Signature.generate_signature( >>> auth_user = 'user', >>> secret_key = 'your-secret-key' >>> ) Create a request helper. >>> request_helper = RequestHelper( >>> signature_param = 'signature', >>> auth_user_param = 'auth_user', >>> valid_until_param = 'valid_until' >>> ) Appending signature params to the endpoint URL. >>> url = request_helper.signature_to_url( >>> signature = signature, >>> endpoint_url = 'http://e.com/api/' >>> ) http://e.com/api/?valid_until=1378045287.0&auth_user=user&signature=YlZpLFsjUKBalL4x5trhkeEgqE8%3D """ params = { self.signature_param: signature.signature, self.auth_user_param: signature.auth_user, self.valid_until_param: signature.valid_until, self.extra_param: dict_keys(signature.extra, return_string=True), } # Make some check that params used do not overlap with names reserved (`auth_user`, `signature`, etc). params.update(signature.extra) return "{0}{1}{2}".format(endpoint_url, suffix, urlencode(params))
[docs] def signature_to_dict(self, signature): """ Puts signature into a dictionary, which can later on be used to send when sending (POST) requests to the server. :param ska.Signature signature: :return dict: :example: Required imports. >>> from ska import Signature, RequestHelper Generate signature. >>> signature = Signature.generate_signature( >>> auth_user = 'user', >>> secret_key = 'your-secret-key' >>> ) Create a request helper. >>> request_helper = RequestHelper( >>> signature_param = 'signature', >>> auth_user_param = 'auth_user', >>> valid_until_param = 'valid_until' >>> ) Appending signature params to the endpoint URL. >>> signed_dict = request_helper.signature_to_dict( >>> signature = signature >>> ) { 'signature': 'YlZpLFsjUKBalL4x5trhkeEgqE8=', 'auth_user': 'user', 'valid_until': '1378045287.0' } """ d = { self.signature_param: signature.signature, self.auth_user_param: signature.auth_user, self.valid_until_param: signature.valid_until, self.extra_param: dict_keys(signature.extra, return_string=True), } d.update(signature.extra) return d
[docs] def validate_request_data(self, data, secret_key): """ Validates the request data. :param dict data: :param str secret_key: :return ska.SignatureValidationResult: :example: If your imaginary ``HttpRequest`` object has ``GET`` property (dict), then you would validate the request data as follows. Create a ``RequestHelper`` object with param names expected. Required imports. >>> from ska import RequestHelper Create a request helper. >>> request_helper = RequestHelper( >>> signature_param = 'signature', >>> auth_user_param = 'auth_user', >>> valid_until_param = 'valid_until' >>> ) Validate the request data. >>> validation_result = request_helper.validate_request_data( >>> data = request.GET, >>> secret_key = 'your-secret-key' >>> ) """ signature = data.get(self.signature_param, '') auth_user = data.get(self.auth_user_param, '') valid_until = data.get(self.valid_until_param, '') extra = extract_signed_data(data=data, extra=data.get(self.extra_param, '').split(',')) validation_result = Signature.validate_signature( signature = signature, auth_user = auth_user, secret_key = secret_key, valid_until = valid_until, return_object = True, extra = extra ) return validation_result
[docs] def extract_signed_data(self, data, secret_key=None, validate=False, fail_silently=False): """ Extracts signed data from the request. """ if validate: if not secret_key: if fail_silently: return {} raise ImproperlyConfigured("You should provide `secret_key` if `validate` is set to True.") validation_result = self.validate_request_data(data, secret_key) if not validation_result.result: if fail_silently: return {} raise InvalidData(validation_result.reason) return extract_signed_data(data=data, extra=data.get(self.extra_param, '').split(',')) # ******************************************************************************************************* # ************************************ Shortcut functions *********************************************** # *******************************************************************************************************
[docs]def sign_url(auth_user, secret_key, valid_until=None, lifetime=SIGNATURE_LIFETIME, url='', \ suffix=DEFAULT_URL_SUFFIX, signature_param=DEFAULT_SIGNATURE_PARAM, \ auth_user_param=DEFAULT_AUTH_USER_PARAM, valid_until_param=DEFAULT_VALID_UNTIL_PARAM, \ extra={}, extra_param=DEFAULT_EXTRA_PARAM): """ Signs the URL. :param str auth_user: Username of the user making the request. :param str secret_key: The shared secret key. :param float|str valid_until: Unix timestamp. If not given, generated automatically (now + lifetime). :param int lifetime: Signature lifetime in seconds. :param str url: URL to be signed. :param str suffix: Suffix to add after the ``endpoint_url`` and before the appended signature params. :param str signature_param: Name of the GET param name which would hold the generated signature value. :param str auth_user_param: Name of the GET param name which would hold the ``auth_user`` value. :param str valid_until_param: Name of the GET param name which would hold the ``valid_until`` value. :param dict extra: Extra variables to add to the request. :param str extra_param: Name of the GET param name which would hold the ``extra_keys`` value. :return str: :example: Required imports. >>> from ska import sign_url Producing a signed URL. >>> signed_url = sign_url( >>> auth_user='user', secret_key='your-secret_key', lifetime=120, \ >>> url='http://e.com/api/', signature_param=DEFAULT_SIGNATURE_PARAM, >>> auth_user_param=DEFAULT_AUTH_USER_PARAM, valid_until_param=DEFAULT_VALID_UNTIL_PARAM, >>> extra = {'provider': 'service1.example.com', 'email': 'john.doe@mail.example.com'}, >>> extra_param = DEFAULT_EXTRA_PARAM >>> ) http://e.com/api/?valid_until=1378045287.0&auth_user=user&signature=YlZpLFsjUKBalL4x5trhkeEgqE8%3D """ if lifetime is None: lifetime=SIGNATURE_LIFETIME assert isinstance(lifetime, int) signature = Signature.generate_signature( auth_user = auth_user, secret_key = secret_key, valid_until = valid_until, lifetime = lifetime, extra = extra ) request_helper = RequestHelper( signature_param = signature_param, auth_user_param = auth_user_param, valid_until_param = valid_until_param, extra_param = extra_param ) signed_url = request_helper.signature_to_url( signature = signature, endpoint_url = url, suffix = suffix, ) return signed_url
[docs]def signature_to_dict(auth_user, secret_key, valid_until=None, lifetime=SIGNATURE_LIFETIME, \ signature_param=DEFAULT_SIGNATURE_PARAM, auth_user_param=DEFAULT_AUTH_USER_PARAM, \ valid_until_param=DEFAULT_VALID_UNTIL_PARAM, extra={}, \ extra_param=DEFAULT_EXTRA_PARAM): """ Returns a dictionary containing the signature data params. :param str auth_user: Username of the user making the request. :param str secret_key: The shared secret key. :param float|str valid_until: Unix timestamp. If not given, generated automatically (now + lifetime). :param int lifetime: Signature lifetime in seconds. :param str signature_param: Name of the (for example POST) param name which would hold the generated ``signature`` value. :param str auth_user_param: Name of the (for example POST) param name which would hold the ``auth_user`` value. :param str valid_until_param: Name of the (for example POST) param name which would hold the ``valid_until`` value. :return str: :example: Required imports. >>> from ska import signature_to_dict Producing a dictionary with signature data. >>> signature_dict = signature_to_dict( >>> auth_user='user', secret_key='your-secret_key', lifetime=120, \ >>> signature_param=DEFAULT_SIGNATURE_PARAM, auth_user_param=DEFAULT_AUTH_USER_PARAM, \ >>> valid_until_param=DEFAULT_VALID_UNTIL_PARAM >>> ) { 'signature': 'YlZpLFsjUKBalL4x5trhkeEgqE8=', 'auth_user': 'user', 'valid_until': '1378045287.0' } """ if lifetime is None: lifetime=SIGNATURE_LIFETIME assert isinstance(lifetime, int) signature = Signature.generate_signature( auth_user = auth_user, secret_key = secret_key, valid_until = valid_until, lifetime = lifetime, extra = extra ) request_helper = RequestHelper( signature_param = signature_param, auth_user_param = auth_user_param, valid_until_param = valid_until_param, extra_param = extra_param ) signature_dict = request_helper.signature_to_dict( signature = signature ) return signature_dict
[docs]def validate_signed_request_data(data, secret_key, signature_param=DEFAULT_SIGNATURE_PARAM, \ auth_user_param=DEFAULT_AUTH_USER_PARAM, \ valid_until_param=DEFAULT_VALID_UNTIL_PARAM, \ extra_param=DEFAULT_EXTRA_PARAM): """ Validates the signed request data. :param dict data: Dictionary holding the (HTTP) request (for example GET or POST) data. :param str secret_key: The shared secret key. :param str signature_param: Name of the (for example GET or POST) param name which holds the ``signature`` value. :param str auth_user_param: Name of the (for example GET or POST) param name which holds the ``auth_user`` value. :param str valid_until_param: Name of the (foe example GET or POST) param name which holds the ``valid_until`` value. :return ska.SignatureValidationResult: A ``ska.SignatureValidationResult`` object with the following properties: - `result` (bool): True if data is valid. False otherwise. - `reason` (list): List of strings, indicating validation errors. Empty list in case if `result` is True. """ request_helper = RequestHelper( signature_param = signature_param, auth_user_param = auth_user_param, valid_until_param = valid_until_param, extra_param = extra_param ) validation_result = request_helper.validate_request_data( data = data, secret_key = secret_key ) return validation_result
[docs]def extract_signed_request_data(data, secret_key=None, signature_param=DEFAULT_SIGNATURE_PARAM, \ auth_user_param=DEFAULT_AUTH_USER_PARAM, \ valid_until_param=DEFAULT_VALID_UNTIL_PARAM, \ extra_param=DEFAULT_EXTRA_PARAM, validate=False, fail_silently=False): """ Validates the signed request data. :param dict data: Dictionary holding the (HTTP) request (for example GET or POST) data. :param str secret_key: The shared secret key. :param str signature_param: Name of the (for example GET or POST) param name which holds the ``signature`` value. :param str auth_user_param: Name of the (for example GET or POST) param name which holds the ``auth_user`` value. :param str valid_until_param: Name of the (foe example GET or POST) param name which holds the ``valid_until`` value. :param str extra_param: Name of the (foe example GET or POST) param name which holds the ``extra`` value. :param bool validate: If set to True, request data is validated before returning the result. :param bool fail_silently: If set to True, exceptions are ommitted. :return dict: Dictionary with signed request data. """ request_helper = RequestHelper( signature_param = signature_param, auth_user_param = auth_user_param, valid_until_param = valid_until_param, extra_param = extra_param ) return request_helper.extract_signed_data( data, secret_key = secret_key, validate = validate, fail_silently = fail_silently )
Read the Docs v: 1.3
Versions
latest
1.3
1.2
1.1
1.0
0.9
0.8
0.7
0.6
0.5
0.4
Downloads
On Read the Docs
Project Home
Builds

Free document hosting provided by Read the Docs.