import datetime
import unittest
from copy import copy
from decimal import Decimal
from .. import (
HMACMD5Signature,
HMACSHA1Signature,
HMACSHA224Signature,
HMACSHA256Signature,
HMACSHA384Signature,
HMACSHA512Signature,
RequestHelper,
Signature,
error_codes,
sign_url,
signature_to_dict,
validate_signed_request_data,
)
from .base import log_info, parse_url_params, timestamp_to_human_readable
__title__ = "ska.tests.test_core"
__author__ = "Artur Barseghyan <artur.barseghyan@gmail.com>"
__copyright__ = "2013-2023 Artur Barseghyan"
__license__ = "GPL-2.0-only OR LGPL-2.1-or-later"
__all__ = (
"ExtraTest",
"ShortcutsTest",
"SignatureTest",
"URLHelperTest",
)
[docs]
class SignatureTest(unittest.TestCase):
"""Tests of `ska.Signature` class."""
[docs]
def setUp(self):
"""Set up."""
self.auth_user = "user"
self.secret_key = "secret"
self.signature_classes = (
HMACMD5Signature,
HMACSHA1Signature,
HMACSHA224Signature,
HMACSHA256Signature,
HMACSHA384Signature,
HMACSHA512Signature,
)
@log_info
def __test_01_signature_test(self, signature_cls=Signature):
"""Signature test."""
flow = []
flow.append(("Signature class", signature_cls))
# Generate signature
sig = signature_cls.generate_signature(
auth_user=self.auth_user, secret_key=self.secret_key
)
flow.append(("Valid until used", sig.valid_until))
flow.append(
(
"Valid until (human readable)",
timestamp_to_human_readable(sig.valid_until),
)
)
flow.append(("Signature generated", sig.signature))
flow.append(("Signature is expired", sig.is_expired()))
# Check if not expired
self.assertTrue(not sig.is_expired())
# Check if valid
validation_result = signature_cls.validate_signature(
signature=sig.signature,
auth_user=self.auth_user,
secret_key=self.secret_key,
valid_until=sig.valid_until,
return_object=True,
)
flow.append(("Signature is valid", validation_result.result))
flow.append(("Reason not valid", validation_result.reason))
self.assertTrue(validation_result.result)
return flow
[docs]
def test_01_signature_test(self):
"""Signature test."""
flow = []
for signature_cls in self.signature_classes:
flow += self.__test_01_signature_test(signature_cls=signature_cls)
return flow
@log_info
def __test_02_signature_test_with_positive_time_lapse(
self, signature_cls=Signature
):
"""Signature test with positive time-lapse.
When signature is made on a host that has a positive (greater) time
difference with server. In this particular example, the host time is
5 minutes ahead the server time.
"""
flow = []
flow.append(("Signature class", signature_cls))
datetime_time_lapse = signature_cls.datetime_to_unix_timestamp(
datetime.datetime.now() + datetime.timedelta(seconds=300)
)
flow.append(("Valid until used", datetime_time_lapse))
flow.append(
(
"Valid until used (human readable)",
timestamp_to_human_readable(datetime_time_lapse),
)
)
# Generate signature
sig = signature_cls.generate_signature(
auth_user=self.auth_user,
secret_key=self.secret_key,
valid_until=datetime_time_lapse,
)
flow.append(("Signature generated", sig.signature))
flow.append(("Signature is expired", sig.is_expired()))
# Check if not expired
self.assertTrue(not sig.is_expired())
# Check if valid
validation_result = signature_cls.validate_signature(
signature=sig.signature,
auth_user=self.auth_user,
secret_key=self.secret_key,
valid_until=sig.valid_until,
return_object=True,
)
flow.append(("Signature is valid", validation_result.result))
flow.append(("Reason not valid", validation_result.reason))
self.assertTrue(validation_result.result)
return flow
[docs]
def test_02_signature_test_with_positive_time_lapse(self):
"""Signature test with positive time-lapse.
When signature is made on a host that has a positive (greater) time
difference with server. In this particular example, the host time is
5 minutes ahead the server time.
"""
flow = []
for signature_cls in self.signature_classes:
flow += self.__test_02_signature_test_with_positive_time_lapse(
signature_cls=signature_cls
)
return flow
@log_info
def __test_03_signature_test_with_negative_time_lapse(
self, signature_cls=Signature
):
"""Fail test for signature test with negative time-lapse.
When signature is made on a host that has a negative (less) time
difference with server. In this particular example, the host time is
5 minutes behind the server time, which exceeds the signature
lifetime.
"""
flow = []
flow.append(("Signature class", signature_cls))
datetime_time_lapse = signature_cls.datetime_to_unix_timestamp(
datetime.datetime.now() - datetime.timedelta(seconds=300)
)
flow.append(("Valid until used", datetime_time_lapse))
flow.append(
(
"Valid until used (human readable)",
timestamp_to_human_readable(datetime_time_lapse),
)
)
# Generate signature
sig = signature_cls.generate_signature(
auth_user=self.auth_user,
secret_key=self.secret_key,
valid_until=datetime_time_lapse,
)
flow.append(("Signature generated", sig.signature))
flow.append(("Signature is expired", sig.is_expired()))
# Check if not expired
self.assertTrue(sig.is_expired())
# Check if valid
validation_result = signature_cls.validate_signature(
signature=sig.signature,
auth_user=self.auth_user,
secret_key=self.secret_key,
valid_until=sig.valid_until,
return_object=True,
)
flow.append(("Signature is valid", validation_result.result))
flow.append(("Reason not valid", validation_result.reason))
self.assertTrue(not validation_result.result)
return flow
[docs]
def test_03_signature_test_with_negative_time_lapse(self):
"""Fail test. Signature test with negative time-lapse.
When signature is made on
a host that has a negative (less) time difference with server. In this
particular example, the host time is 5 minutes behind the server time,
which exceeds the signature lifetime.
"""
flow = []
for signature_cls in self.signature_classes:
flow += self.__test_03_signature_test_with_negative_time_lapse(
signature_cls=signature_cls
)
return flow
@log_info
def __test_04_fail_signature_test(self, signature_cls=Signature):
"""Fail signature tests."""
flow = []
flow.append(("Signature class", signature_cls))
validation_result = signature_cls.validate_signature(
signature="EBS6ipiqRLa6TY5vxIvZU30FpnM=",
auth_user="fakeuser",
secret_key="fakesecret",
valid_until=1377997396.0,
return_object=True,
)
flow.append(("Valid until used", 1377997396.0))
flow.append(
(
"Valid until used (human readable)",
timestamp_to_human_readable(1377997396.0),
)
)
flow.append(("Signature generated", "EBS6ipiqRLa6TY5vxIvZU30FpnM="))
flow.append(("Signature is expired", True))
flow.append(("Signature is valid", validation_result.result))
flow.append(("Reason not valid", validation_result.reason))
self.assertTrue(not validation_result.result)
return flow
[docs]
def test_04_fail_signature_test(self):
"""Fail signature tests."""
flow = []
for signature_cls in self.signature_classes:
flow += self.__test_04_fail_signature_test(
signature_cls=signature_cls
)
return flow
@log_info
def __test_05_fail_signature_test_validation_result_class(
self, signature_cls=Signature
):
"""Fail signature tests for `ValidationResult` class."""
flow = []
flow.append(("Signature class", signature_cls))
validation_result = signature_cls.validate_signature(
signature="EBS6ipiqRLa6TY5vxIvZU30FpnM=",
auth_user="fakeuser",
secret_key="fakesecret",
valid_until=1377997396.0,
return_object=True,
)
self.assertIsInstance(validation_result.reason, map)
self.assertIsInstance(validation_result.errors, list)
self.assertIsInstance(validation_result.message, str)
self.assertIsInstance(" ".join(validation_result.reason), str)
self.assertIsInstance(" ".join(map(str, validation_result.errors)), str)
flow.append(validation_result.message)
self.assertIn(error_codes.INVALID_SIGNATURE, validation_result.errors)
self.assertIn(
error_codes.SIGNATURE_TIMESTAMP_EXPIRED, validation_result.errors
)
return flow
[docs]
def test_05_fail_signature_test_validation_result_class(self):
"""Fail signature tests of the `ValidationResult` class."""
flow = []
for signature_cls in self.signature_classes:
flow += self.__test_05_fail_signature_test_validation_result_class(
signature_cls=signature_cls
)
return flow
[docs]
class URLHelperTest(unittest.TestCase):
"""Tests of `ska.URLHelper` class."""
[docs]
def setUp(self):
"""Set up."""
self.auth_user = "user"
self.secret_key = "secret"
self.signature_classes = (
HMACMD5Signature,
HMACSHA1Signature,
HMACSHA224Signature,
HMACSHA256Signature,
HMACSHA384Signature,
HMACSHA512Signature,
)
@log_info
def __test_01_signature_to_url(self, signature_cls=Signature):
"""Signature test."""
flow = []
flow.append(("Signature class", signature_cls))
# Generate signature
signature = signature_cls.generate_signature(
auth_user=self.auth_user, secret_key=self.secret_key
)
request_helper = RequestHelper(
signature_param="signature",
auth_user_param="auth_user",
valid_until_param="valid_until",
signature_cls=signature_cls,
)
signed_endpoint_url = request_helper.signature_to_url(
signature=signature, endpoint_url="http://dev.example.com/api/"
)
flow.append(("URL generated", signed_endpoint_url))
# Now parsing back the URL params.
request_data = parse_url_params(signed_endpoint_url)
validation_result = request_helper.validate_request_data(
data=request_data, secret_key=self.secret_key
)
flow.append(("Signature is valid", validation_result.result))
flow.append(("Reason not valid", validation_result.reason))
self.assertTrue(validation_result.result)
return flow
[docs]
def test_01_signature_to_url(self):
"""Signature test."""
flow = []
for signature_cls in self.signature_classes:
flow += self.__test_01_signature_to_url(signature_cls=signature_cls)
return flow
@log_info
def __test_02_signature_to_url_fail(self, signature_cls=Signature):
"""Signature test. Fail test."""
flow = []
flow.append(("Signature class", signature_cls))
datetime_time_lapse = signature_cls.datetime_to_unix_timestamp(
datetime.datetime.now() - datetime.timedelta(seconds=300)
)
# Generate signature
signature = signature_cls.generate_signature(
auth_user=self.auth_user,
secret_key=self.secret_key,
valid_until=datetime_time_lapse,
)
request_helper = RequestHelper(
signature_param="signature",
auth_user_param="auth_user",
valid_until_param="valid_until",
signature_cls=signature_cls,
)
signed_endpoint_url = request_helper.signature_to_url(
signature=signature, endpoint_url="http://dev.example.com/api/"
)
flow.append(("URL generated", signed_endpoint_url))
# Now parsing back the URL params.
request_data = parse_url_params(signed_endpoint_url)
validation_result = request_helper.validate_request_data(
data=request_data, secret_key=self.secret_key
)
flow.append(("Signature is valid", validation_result.result))
flow.append(("Reason not valid", validation_result.reason))
self.assertTrue(not validation_result.result)
return flow
[docs]
def test_02_signature_to_url_fail(self):
"""Signature test. Fail test."""
flow = []
for signature_cls in self.signature_classes:
flow += self.__test_02_signature_to_url_fail(
signature_cls=signature_cls
)
return flow
[docs]
class ShortcutsTest(unittest.TestCase):
"""Tests for shortcut functions.
The following shortcut functions are tested: `sign_url`,
`signature_to_dict` and `validate_signed_request_data`.
"""
[docs]
def setUp(self):
"""Set up."""
self.auth_user = "user"
self.secret_key = "secret"
self.endpoint_url = "http://e.com/api/"
self.signature_classes = (
HMACMD5Signature,
HMACSHA1Signature,
HMACSHA224Signature,
HMACSHA256Signature,
HMACSHA384Signature,
HMACSHA512Signature,
)
@log_info
def __test_sign_url_validate_signed_request_data(
self, signature_cls=Signature
):
"""Tests for `sign_url` & `validate_signed_request_data`."""
flow = []
flow.append(("Signature class", signature_cls))
signed_url = sign_url(
auth_user=self.auth_user,
secret_key=self.secret_key,
url=self.endpoint_url,
)
flow.append(("URL generated", signed_url))
# Now parsing back the URL params and validate the signature data
request_data = parse_url_params(signed_url)
validation_result = validate_signed_request_data(
data=request_data, secret_key=self.secret_key
)
flow.append(("Signature is valid", validation_result.result))
flow.append(("Reason not valid", validation_result.reason))
self.assertTrue(validation_result.result)
return flow
[docs]
def test_01_sign_url_and_validate_signed_request_data(self):
"""Tests for ``sign_url`` & ``validate_signed_request_data``."""
flow = []
for signature_cls in self.signature_classes:
flow += self.__test_sign_url_validate_signed_request_data(
signature_cls=signature_cls
)
return flow
@log_info
def __test_sign_url_validate_signed_request_data_fail(
self, signature_cls=Signature
):
"""Fail tests for `sign_url` & `validate_signed_request_data`."""
flow = []
flow.append(("Signature class", signature_cls))
datetime_time_lapse = signature_cls.datetime_to_unix_timestamp(
datetime.datetime.now() - datetime.timedelta(seconds=300)
)
flow.append(("Valid until used", datetime_time_lapse))
flow.append(
(
"Valid until used (human readable)",
timestamp_to_human_readable(datetime_time_lapse),
)
)
signed_url = sign_url(
auth_user=self.auth_user,
secret_key=self.secret_key,
url=self.endpoint_url,
valid_until=datetime_time_lapse,
)
flow.append(("URL generated", signed_url))
# Now parsing back the URL params and validate the signature data
request_data = parse_url_params(signed_url)
validation_result = validate_signed_request_data(
data=request_data, secret_key=self.secret_key
)
flow.append(("Signature is valid", validation_result.result))
flow.append(("Reason not valid", validation_result.reason))
self.assertTrue(not validation_result.result)
return flow
[docs]
def test_02_sign_url_and_validate_signed_request_data_fail(self):
"""Fail tests for `sign_url` & `validate_signed_request_data`."""
flow = []
for signature_cls in self.signature_classes:
flow += self.__test_sign_url_validate_signed_request_data_fail(
signature_cls=signature_cls
)
return flow
@log_info
def __test_signature_to_dict_validate_signed_request_data(
self, signature_cls=Signature
):
"""
Tests for `signature_to_dict` & `validate_signed_request_data`."""
flow = []
flow.append(("Signature class", signature_cls))
signature_dict = signature_to_dict(
auth_user=self.auth_user,
secret_key=self.secret_key,
signature_cls=signature_cls,
)
flow.append(("Dictionary created", signature_dict))
# Now validate the signature data
validation_result = validate_signed_request_data(
data=signature_dict,
secret_key=self.secret_key,
signature_cls=signature_cls,
)
flow.append(("Signature is valid", validation_result.result))
flow.append(("Reason not valid", validation_result.reason))
self.assertTrue(validation_result.result)
return flow
[docs]
def test_03_signature_to_dict_and_validate_signed_request_data(self):
"""Tests for `signature_to_dict` & `validate_signed_request_data`."""
flow = []
for signature_cls in self.signature_classes:
flow += self.__test_signature_to_dict_validate_signed_request_data(
signature_cls=signature_cls
)
return flow
@log_info
def __test_sig_to_dict_var_types_validate_signed_request_data(
self, signature_cls=Signature
):
"""
Tests for `signature_to_dict` with complex data &
`validate_signed_request_data`.
"""
flow = []
flow.append(("Signature class", signature_cls))
signature_dict = signature_to_dict(
auth_user=self.auth_user,
secret_key=self.secret_key,
signature_cls=signature_cls,
extra={
"int_type": 10,
"float_type": 10.99,
"str_type": "Lorem ipsum",
"bytes_type": "Dolor sit".encode(),
"decimal_type": Decimal("10.99"),
"bool_type": True,
},
)
flow.append(("Dictionary created", signature_dict))
# Now validate the signature data
validation_result = validate_signed_request_data(
data=signature_dict,
secret_key=self.secret_key,
signature_cls=signature_cls,
)
flow.append(("Signature is valid", validation_result.result))
flow.append(("Reason not valid", validation_result.reason))
self.assertTrue(validation_result.result)
return flow
[docs]
def test_04_sig_to_dict_var_types_and_validate_signed_request_data(self):
"""Tests for `signature_to_dict` with complex data &
`validate_signed_request_data`.
"""
flow = []
for signature_cls in self.signature_classes:
flow.extend(
self.__test_sig_to_dict_var_types_validate_signed_request_data(
signature_cls=signature_cls
)
)
return flow