Source code for cell.util
# Copyright 2014, 2015, Nik Kinkel and David Johnston
# See LICENSE for licensing information
import struct
import ipaddress
import oppy.cell.definitions as DEF
import oppy.util.tools as tools
from oppy.cell.exceptions import BadLinkSpecifier
[docs]class LinkSpecifier(object):
'''.. note:: tor-spec, Section 5.1.2'''
def __init__(self, relay, legacy=False):
'''
:param stem.descriptor.server_descriptor.RelayDescriptor relay:
relay descriptor that describes the relay for this link
specifier
:param bool legacy: if **True**, make a legacy link specifier.
make an IPv4 or IPv6 link specifier otherwise according to
the relay's public IP address.
'''
if legacy is True:
self.lstype = DEF.LSTYPE_LEGACY
self.lslen = DEF.LSLEN_LEGACY
self.lspec = tools.signingKeyToSHA1(relay.signing_key)
else:
addr = ipaddress.ip_address(unicode(relay.address))
port = relay.or_port
if isinstance(addr, ipaddress.IPv4Address):
self.lstype = DEF.LSTYPE_IPv4
self.lslen = DEF.LSLEN_IPv4
else:
self.lstype = DEF.LSTYPE_IPv6
self.lslen = DEF.LSLEN_IPv6
self.lspec = addr.packed + struct.pack('!H', port)
if len(self.lspec) != self.lslen:
raise BadLinkSpecifier()
[docs] def getBytes(self):
'''Build and construct the raw byte string represented by this
link specifier.
:returns: **str** raw byte string this link specifier represents
'''
ret = struct.pack('!B', self.lstype)
ret += struct.pack('!B', self.lslen)
ret += self.lspec
return ret
def __len__(self):
# lstype and lslen fields are one byte each
return 1 + 1 + self.lslen
TLV_ADDR_TYPE_LEN = 1
TLV_ADDR_LEN_LEN = 1
TLV_ERROR_TRANSIENT = 0xF0
TLV_ERROR_NONTRANSIENT = 0xF1
# XXX what about TTL?
[docs]class TLVTriple(object):
'''.. note:: tor-spec, Section 6.4
.. todo:: Handle the hostname type properly. TLVTriple's currently
don't know how to deal with a hostname type. Additionally, they
don't know how to handle a TTL or the various errors that can
occur.
'''
def __init__(self, addr):
'''
:param str addr: IP address for this TLVTriple
'''
addr = ipaddress.ip_address(addr)
if isinstance(addr, ipaddress.IPv4Address):
self.addr_type = DEF.IPv4_ADDR_TYPE
self.addr_len = DEF.IPv4_ADDR_LEN
elif isinstance(addr, ipaddress.IPv6Address):
self.addr_type = DEF.IPv6_ADDR_TYPE
self.addr_len = DEF.IPv6_ADDR_LEN
else:
msg = 'TLVTriple can only handle IPv4 and IPv6 type/length/value '
msg += 'triples for now.'
raise ValueError(msg)
self.value = addr.packed
# XXX addr_len is currently ignored because we can only currently handle
# IPv4 and IPv6 TLV's and not hostnames. When RELAY_RESOLVE/D cells
# are implemented, this should be changed to handle hostnames of
# different lengths
@staticmethod
[docs] def parse(data, offset):
'''Parse and extract TLVTriple fields from a byte string.
:param str data: byte string to parse
:param int offset: offset in str data where we should start
reading
:returns: :class:`~oppy.cell.util.TLVTriple`
'''
addr_type = struct.unpack('!B', data[offset:offset +
TLV_ADDR_TYPE_LEN])[0]
offset += TLV_ADDR_TYPE_LEN
# use addr_len for hostname types
addr_len = struct.unpack('!B', data[offset:offset +
TLV_ADDR_LEN_LEN])[0]
offset += TLV_ADDR_LEN_LEN
if addr_type == DEF.IPv4_ADDR_TYPE:
value = data[offset:offset + DEF.IPv4_ADDR_LEN]
offset += DEF.IPv4_ADDR_LEN
elif addr_type == DEF.IPv6_ADDR_TYPE:
value = data[offset:offset + DEF.IPv6_ADDR_LEN]
offset += DEF.IPv6_ADDR_LEN
else:
msg = "TLVTriple can't parse type {0} yet.".format(addr_type)
raise ValueError(msg)
return TLVTriple(value)
# XXX handle hostname types and errors properly
[docs] def getBytes(self):
'''Construct and return the raw byte string this TLVTriple
represents.
:returns: **str** raw byte string this TLVTriple represents.
'''
ret = struct.pack('!B', self.addr_type)
ret += struct.pack('!B', self.addr_len)
ret += self.value
return ret
def __len__(self):
return TLV_ADDR_TYPE_LEN + TLV_ADDR_LEN_LEN + len(self.value)