Source code for wheezy.security.crypto.ticket

""" ``crypto`` module.
"""

from base64 import b64decode, b64encode
from binascii import Error as BinError
from hmac import new as hmac_new
from os import urandom
from struct import pack, unpack
from time import time
from warnings import warn

from wheezy.security.crypto.comp import (
    aes128,
    block_size,
    decrypt,
    digest_size,
    encrypt,
    sha1,
)
from wheezy.security.crypto.padding import pad, unpad

BASE64_ALTCHARS = b"-~"
EMPTY = b""
EPOCH = 1317212745


[docs]def ensure_strong_key(key, digestmod): """Translates a given key to a computed strong key of length 3 * digestmode.digest_size suitable for encryption, e.g. with digestmod set to ``sha1`` returns 480 bit (60 bytes) key. """ hmac = hmac_new(key, key, digestmod) k1 = hmac.digest() hmac.update(k1) k2 = hmac.digest() hmac.update(k2) return k1 + k2 + hmac.digest()
def timestamp(): return int(time()) - EPOCH
[docs]class Ticket(object): """Protects sensitive information (e.g. user id). Default policy applies verification and encryption. Verification is provided by ``hmac`` initialized with ``sha1`` digestmod. Encryption is provided if available, by default it attempts to use AES cypher. """ __slots__ = ("cypher", "max_age", "hmac", "digest_size", "block_size") def __init__( self, max_age=900, salt="", digestmod=None, cypher=aes128, options=None ): self.max_age = max_age if not digestmod: warn( "Ticket: digestmod is not specified, fallback to sha1", stacklevel=2, ) digestmod = sha1 options = options or {} key = (salt + options.get("CRYPTO_VALIDATION_KEY", "")).encode( "latin1" ) key = ensure_strong_key(key, digestmod) self.hmac = hmac_new(key, digestmod=digestmod) self.digest_size = digest_size(digestmod) if cypher: key = (salt + options.get("CRYPTO_ENCRYPTION_KEY", "")).encode( "latin1" ) key = ensure_strong_key(key, digestmod) self.cypher = cypher(key) self.block_size = block_size(self.cypher()) else: self.cypher = None warn("Ticket: cypher not available", stacklevel=2)
[docs] def encode(self, value, encoding="UTF-8"): """Encode ``value`` according to ticket policy.""" value = value.encode(encoding) expires = pack("<i", timestamp() + self.max_age) noise = urandom(12) value = EMPTY.join((noise[:4], expires, noise[4:8], value, noise[8:])) if self.cypher: value = encrypt(self.cypher(), pad(value, self.block_size)) return b64encode(self.sign(value) + value, BASE64_ALTCHARS).decode( "latin1" )
[docs] def decode(self, value, encoding="UTF-8"): """Decode ``value`` according to ticket policy.""" if len(value) < 48: return (None, None) try: value = b64decode(value.encode("latin1"), BASE64_ALTCHARS) except (TypeError, BinError): return (None, None) signature = value[: self.digest_size] value = value[self.digest_size :] if signature != self.sign(value): return (None, None) if self.cypher: if len(value) % self.block_size != 0: return (None, None) value = unpad(decrypt(self.cypher(), value), self.block_size) if value is None: return (None, None) if len(value) < 16: # pragma: nocover return (None, None) expires, value = value[4:8], value[12:-4] time_left = unpack("<i", expires)[0] - timestamp() if time_left < 0 or time_left > self.max_age: return (None, None) try: return (value.decode(encoding), time_left) except UnicodeDecodeError: return (None, None)
[docs] def sign(self, value): """Compute hmac digest.""" h = self.hmac.copy() h.update(value) return h.digest()