Source code for crypto.util

# Copyright 2014, 2015, Nik Kinkel
# See LICENSE for licensing information

'''
.. topic:: Details

    Crypto utility functions. Includes:

        - Constant time string comparisons
        - Wrappers around encryption/decryption operations
        - The "recognized" check for incoming cells
        - A couple methods for verifying TLS certificate properties (signatures
          and times)

'''
import hashlib
import hmac
import struct

from datetime import datetime
from itertools import izip

import OpenSSL

from Crypto.Cipher import AES
from Crypto.Util import asn1, Counter

from oppy.cell.cell import Cell
from oppy.cell.definitions import RECOGNIZED, EMPTY_DIGEST
from oppy.cell.fixedlen import EncryptedCell
from oppy.crypto.exceptions import UnrecognizedCell


[docs]def constantStrEqual(str1, str2): '''Do a constant-time comparison of str1 and str2, returning **True** if they are equal, **False** otherwise. Use built-in hmac.compare_digest if it's available, otherwise custom constant-time comparison. :param str str1: first string to compare :param str str2: second string to compare :returns: **bool** **True** if str1 == str2, **False** otherwise ''' try: from hmac import compare_digest return compare_digest(str1, str2) # use our own constant time equality comparison if no stdlib version except ImportError: pass if len(str1) != len(str2): # we've already failed at this point, but loop through # all chars anyway res = 1 comp1 = bytearray(str2) comp2 = bytearray(str2) else: res = 0 comp1 = bytearray(str1) comp2 = bytearray(str2) for a, b in izip(comp1, comp2): res |= a ^ b return res == 0
[docs]def constantStrAllZero(s): '''Check if *s* consists of all zero bytes. :param str s: string to check :returns: **bool** **True** if *s* contains all zero bytes, **False** otherwise ''' return constantStrEqual(s, '\x00' * len(s))
[docs]def makeAES128CTRCipher(key, initial_value=0): '''Create and return a new AES128-CTR cipher instance. :param str key: key to use for this cipher :param initial_value: initial_value to use :returns: **Crypto.Cipher.AES.AES** ''' ctr = Counter.new(128, initial_value=initial_value) return AES.new(key, AES.MODE_CTR, counter=ctr)
[docs]def makeHMACSHA256(msg, key): '''Make a new HMAC-SHA256 with *msg* and *key* and return digest byte string. :param str msg: msg :param str key: key to use :returns: **str** HMAC digest ''' t = hmac.new(msg=msg, key=key, digestmod=hashlib.sha256) return t.digest()
[docs]def makePayloadWithDigest(payload, digest=EMPTY_DIGEST): '''Make a new payload with *digest* inserted in the correct position. :param str payload: payload in which to insert digest :param str digest: digest to insert :returns: **str** payload with digest inserted into correct position ''' DIGEST_START = 5 DIGEST_END = 9 return payload[:DIGEST_START] + digest + payload[DIGEST_END:]
[docs]def encryptCellToTarget(cell, crypt_path, target=2, early=False): '''Encrypt *cell* to the *target* relay in *crypt_path* and update the appropriate forward digest. :param cell cell: cell to encrypt :param list crypt_path: list of RelayCrypto instances available for encryption :param int target: target node to encrypt to :param bool early: if **True**, use a RELAY_EARLY cmd instead of a RELAY cmd :returns: **oppy.cell.fixedlen.EncryptedCell** ''' assert target >= 0 and target < len(crypt_path) assert cell.rheader.digest == EMPTY_DIGEST # 1) update f_digest with cell payload bytes crypt_path[target].forward_digest.update(cell.getPayload()) # 2) insert first four bytes into new digest position cell.rheader.digest = crypt_path[target].forward_digest.digest()[:4] # 3) encrypt payload payload = cell.getPayload() for node in xrange(target + 1): payload = crypt_path[node].forward_cipher.encrypt(payload) # 4) return encrypted relay cell with new payload return EncryptedCell.make(cell.header.circ_id, payload, early=early)
[docs]def cellRecognized(payload, relay_crypto): '''Return **True** if this payload is *recognized*. .. note:: See tor-spec Section 6.1 for details about what it means for a cell to be *recognized*. :param str payload: payload to check if recognized :param oppy.crypto.relaycrypto.RelayCrypto relay_crypto: RelayCrypto instance to use for checking if payload is recognized :returns: **bool** **True** if this payload is recognized, **False** otherwise ''' test_payload = payload recognized = test_payload[2:4] digest = test_payload[5:9] test_payload = makePayloadWithDigest(test_payload) if recognized != RECOGNIZED: return False test_digest = relay_crypto.backward_digest.copy() test_digest.update(test_payload) # no danger of timing attack here since we just # drop the cell if it's not recognized return test_digest.digest()[:4] == digest
[docs]def decryptCellUntilRecognized(cell, crypt_path, origin=2): '''Decrypt *cell* until it is recognized or we've tried all RelayCrypto's in *crypt_path*. Attempt to decrypt the cell one hop at a time. Stop if the cell is recognized. Raise an exception if the cell is not recognized at all. :param cell cell: cell to decrypt :param list, oppy.crypto.relaycrypto.RelayCrypto crypt_path: list of RelayCrypto instances to use for decryption :param int origin: the originating hop we think this cell came from :returns: the concrete RelayCell type of this decrypted cell ''' assert 0 <= origin <= 2, 'We can only handle 3-hop paths' recognized = False payload = cell.getPayload() for node in xrange(len(crypt_path)): relay_crypto = crypt_path[node] payload = relay_crypto.backward_cipher.decrypt(payload) if cellRecognized(payload, relay_crypto): recognized = True origin = node break if recognized: updated_payload = makePayloadWithDigest(payload) crypt_path[origin].backward_digest.update(updated_payload) if cell.header.link_version < 4: cid = struct.pack('!H', cell.header.circ_id) else: cid = struct.pack('!I', cell.header.circ_id) cmd = struct.pack('!B', cell.header.cmd) dec = Cell.parse(cid + cmd + payload) return (dec, origin) else: raise UnrecognizedCell()
[docs]def verifyCertSig(id_cert, cert_to_verify, algo='sha1'): '''Verify that the SSL certificate *id_cert* has signed the TLS cert *cert_to_verify*. :param id_cert: Identification Certificate :type id_cert: OpenSSL.crypto.X509 :param cert_to_verify: certificate to verify signature on :type cert_to_verify: OpenSSL.crypto.X509 :param algo: algorithm to use for certificate verification :type algo: str :returns: **bool** **True** if the signature of *cert_to_verify* can be verified from *id_cert*, **False** otherwise ''' cert_to_verify_ASN1 = OpenSSL.crypto.dump_certificate( OpenSSL.crypto.FILETYPE_ASN1, cert_to_verify) der = asn1.DerSequence() der.decode(cert_to_verify_ASN1) cert_to_verify_DER = der[0] cert_to_verify_ALGO = der[1] cert_to_verify_SIG = der[2] sig_DER = asn1.DerObject() sig_DER.decode(cert_to_verify_SIG) sig = sig_DER.payload # first byte is number of unused bytes. should be zero assert sig[0] == '\x00' sig = sig[1:] try: OpenSSL.crypto.verify(id_cert, sig, cert_to_verify_DER, algo) return True except OpenSSL.crypto.Error: return False # XXX should we check that the time is not later than the current time?
[docs]def validCertTime(cert): '''Verify that TLS certificate *cert*'s time is not earlier than cert.notBefore and not later than cert.notAfter. :param OpenSSL.crypto.X509 cert: TLS Certificate to verify times of :returns: **bool** **True** if cert.notBefore < now < cert.notAfter, **False** otherwise ''' now = datetime.now() validAfter = datetime.strptime(cert.get_notBefore(), '%Y%m%d%H%M%SZ') validUntil = datetime.strptime(cert.get_notAfter(), '%Y%m%d%H%M%SZ') return validAfter < now < validUntil