Source code for crypto.ntorhandshake

# Copyright 2014, 2015, Nik Kinkel
# See LICENSE for licensing information

'''
.. topic:: Details

    NTorHandsake objects provide the methods for doing the ntor handshake
    key derivations and crypto operations. NTorHandshakes objects do the
    following jobs:

        - Create temporary public/private Curve 25519 keys
        - Create the initial onion skin
        - Derive key material from a Created2 or Extended2 cell
        - Create and initialize a RelayCrypto object, ready for use by
          a circuit (RelayCrypto objects are just wrappers around AES128-CTR
          ciphers and SHA-1 running digests, initialized with the derived key
          material)


.. warning:: NTorHandshakes do not safely erase/clear memory of private keys.

'''
import base64
import hashlib
import hkdf

from nacl import c
from nacl.public import PrivateKey, PublicKey

from oppy.crypto import util
from oppy.crypto.exceptions import KeyDerivationFailed
from oppy.crypto.relaycrypto import RelayCrypto
from oppy.util import tools


PROTOID     = "ntor-curve25519-sha256-1"
T_MAC       = PROTOID + ":mac"
T_KEY       = PROTOID + ":key_extract"
T_VERIFY    = PROTOID + ":verify"
M_EXPAND    = PROTOID + ":key_expand"
SERVER_STR  = "Server"

CURVE25519_PUBKEY_LEN   = 32
DIGEST_LEN              = 20
KEY_LEN                 = 16
NTOR_ONIONSKIN_LEN      = (2 * CURVE25519_PUBKEY_LEN + DIGEST_LEN)


[docs]class NTorHandshake(object): def __init__(self, relay): ''' :param stem.descriptor.server_descriptor.RelayDescriptor relay: the relay that we're doing an ntor handshake with ''' self._signing_key = tools.signingKeyToSHA1(relay.signing_key) self._ntor_onion_key = base64.b64decode(relay.ntor_onion_key) self._secret_key = PrivateKey.generate() self._public_key = self._secret_key.public_key self.is_bad = False
[docs] def createOnionSkin(self): '''Build and return an *onion skin* to this handshake's relay. .. note:: See tor-spec Section 5.1.4 for more information. :returns: **str** raw byte string for this *onion skin* ''' b = self._signing_key b += self._ntor_onion_key b += bytes(self._public_key) assert len(b) == NTOR_ONIONSKIN_LEN return b
[docs] def deriveRelayCrypto(self, cell): '''Derive shared key material for this ntor handshake; create and return actual cipher and hash instances inside a RelayCrypto object. .. note:: See tor-spec Section 5.1.4, 5.2.2 for more details. :param cell cell: Created2 cell or Extended2 cell used to derive shared keys :returns: **oppy.crypto.relaycrypto.RelayCrypto** object initialized with the derived key material. ''' self.is_bad = False hdata = cell.hdata relay_pubkey = cell.hdata[: CURVE25519_PUBKEY_LEN] AUTH = hdata[CURVE25519_PUBKEY_LEN: CURVE25519_PUBKEY_LEN + DIGEST_LEN] secret_input = self._buildSecretInput(relay_pubkey) verify = util.makeHMACSHA256(msg=secret_input, key=T_VERIFY) auth_input = self._buildAuthInput(verify, relay_pubkey) auth_input = util.makeHMACSHA256(msg=auth_input, key=T_MAC) self.is_bad |= util.constantStrEqual(AUTH, auth_input) ret = self._makeRelayCrypto(secret_input) # don't fail until the very end to avoid leaking timing information if self.is_bad: raise KeyDerivationFailed() return ret
def _makeRelayCrypto(self, secret_input): '''Derive shared key material using HKDF from secret_input. :returns: **oppy.crypto.relaycrypto.RelayCrypto** initialized with shared key data ''' prk = hkdf.hkdf_extract(salt=T_KEY, input_key_material=secret_input, hash=hashlib.sha256) km = hkdf.hkdf_expand(pseudo_random_key=prk, info=M_EXPAND, length=72, hash=hashlib.sha256) df = km[: DIGEST_LEN] db = km[DIGEST_LEN : DIGEST_LEN * 2] kf = km[DIGEST_LEN * 2 : DIGEST_LEN * 2 + KEY_LEN] kb = km[DIGEST_LEN * 2 + KEY_LEN : DIGEST_LEN * 2 + KEY_LEN * 2] f_digest = hashlib.sha1(df) b_digest = hashlib.sha1(db) f_cipher = util.makeAES128CTRCipher(kf) b_cipher = util.makeAES128CTRCipher(kb) return RelayCrypto(forward_digest=f_digest, backward_digest=b_digest, forward_cipher=f_cipher, backward_cipher=b_cipher) def _buildAuthInput(self, verify, relay_pubkey): '''Build and return auth input as a byte string. .. note:: See tor-spec Section 5.1.4 for more details. :param str verify: the verification data derived from secret_input :param str relay_pubkey: the remote relay's CURVE_25519 public key received in the Created2/Extended2 cell :returns: **str** auth_input ''' b = verify b += self._signing_key b += self._ntor_onion_key b += relay_pubkey b += bytes(self._public_key) b += PROTOID b += SERVER_STR return b def _buildSecretInput(self, relay_pubkey): '''Build and return secret input as a byte string. .. note:: See tor-spec Section 5.1.4 for more details. :param relay_pubkey: the remote relay's CURVE_25519 public key received in the Created2/Extended2 cell :returns: **str** secret_input ''' b = self._scalarMult(relay_pubkey) b += self._scalarMult(self._ntor_onion_key) b += self._signing_key b += self._ntor_onion_key b += bytes(self._public_key) b += relay_pubkey b += PROTOID return b def _scalarMult(self, base): '''Perform base**self._secret_key. Set self.is_bad if the result is all zeros. .. note:: See tor-spec Section 5.1.4 for why this is an adequate replacement for checking that none of the EXP() operations produced the point at infinity. :returns: **str** result ''' # args are: exponent, base ret = c.crypto_scalarmult(bytes(self._secret_key), bytes(PublicKey(base))) self.is_bad |= util.constantStrAllZero(ret) return ret