Source code for connection.connection

# Copyright 2014, 2015, Nik Kinkel
# See LICENSE for licensing information

'''
.. topic:: Details

    Connection objects represent TLS connections to Tor entry nodes. Connection
    objects have a few important jobs:

        - Do the initial handshake to authenticate the entry node and negotiate
          a Link Protocl version (currently oppy only supports Link Protocol
          Version 3)
        - Extract cells from incoming data streams and pass them to the
          appropriate circuit (based on circuit ID)
        - Write cells from circuits to entry nodes
        - Notify all associated circuits when the connection goes down

'''
import logging

from twisted.internet.protocol import Protocol

from oppy.cell.cell import Cell
from oppy.cell.definitions import PADDING_CMD_IDS
from oppy.cell.exceptions import NotEnoughBytes

from oppy.connection.handshake.v3 import V3FSM
from oppy.connection.handshake.exceptions import (
    BadHandshakeState,
    HandshakeFailed,
    UnexpectedCell,
)
from oppy.util.tools import enum


ConnState = enum(
    PENDING=0,
    OPEN=1,
)


[docs]class Connection(Protocol): '''A TLS connection to an entry node.''' def __init__(self, relay): ''' :param stem.descriptor.server_descriptor.RelayDescriptor relay: relay we should create a connection to ''' logging.debug('Creating connection to {0}'.format(relay.address)) # map all circuits using this connections self._circuit_map = {} self._buffer = '' self._relay = relay self._handshake = None self._state = ConnState.PENDING self._cell_queue = []
[docs] def writeCell(self, cell): '''Write a cell to this connections transport. If this connection is not yet open, append to the cell_queue to be written when the connection opens up. :param cell cell: cell to write ''' if self._state == ConnState.OPEN: self.transport.write(cell.getBytes()) else: self._cell_queue.append(cell)
[docs] def dataReceived(self, data): '''We received data from the remote connection. Extract cells from the data stream and send them along to be processed. :param str data: data received from remote end ''' self._buffer += data while Cell.enoughDataForCell(self._buffer): try: cell = Cell.parse(self._buffer, encrypted=True) self._deliverCell(cell) self._buffer = self._buffer[len(cell):] # this shouldn't happen and if it does, it's probably a bug except NotEnoughBytes as e: logging.debug(str(e)) break # XXX remove len(unimplemented cell bytes) from buffer except NotImplementedError: logging.debug("Received a cell we can't handle yet.") logging.debug('buffer contents:\n') logging.debug([ord(i) for i in self._buffer]) raise except (BadHandshakeState, HandshakeFailed, UnexpectedCell) as e: logging.warning(e) self.closeConnection() self._buffer = '' break
def _deliverCell(self, cell): '''Deliver *cell* either to an appropriate circuit or, if the handshake is not complete, to the handshake fsm. :param cell cell: incoming cell to deliver ''' # just drop any padding cells if cell.header.cmd in PADDING_CMD_IDS: return if self._state == ConnState.OPEN: self._recvCircuitCell(cell) else: self._recvHandshakeCell(cell) def _recvCircuitCell(self, cell): '''Process an incoming cell destined for a circuit. If we have a reference to the circuit with the ID indicated in the cell, hand off this cell to that circuit. If we have no reference to the circuit, drop the cell. :param cell cell: incoming cell ''' try: self._circuit_map[cell.header.circ_id].recvCell(cell) # drop cells to circuits we don't know about except KeyError: msg = "Connection {} received a cell to nonexistent circuit:" msg += "{}." msg = msg.format(self._relay.address, cell.header.circ_id) logging.debug(msg) def _recvHandshakeCell(self, cell): '''Process an incoming cell as part of this connection's handshake. Immediately write any response from the handshake fsm to this connection's transport. If any errors occur in the handshake, tear down this connection. If the handshake is completed after receiving this cell, transition this connection's state to ConnState.OPEN and flush this connection's queued up cells. :param cell cell: incoming handshake cell ''' try: response = self._handshake.recvCell(cell) except Exception as e: logging.debug(str(e)) self.closeConnection() return # send a handshake response if required if response is not None: self.transport.write(response.getBytes()) # empty our queue if we're finished with the handshake if self._handshake.isDone(): self._state = ConnState.OPEN msg = 'Completed handshake on connection: {0}' msg = msg.format(self._relay.address) logging.debug(msg) self._emptyQueue() self._handshake = None def _emptyQueue(self): '''Write any cells in this connection's queue to this connection's transport and remove them from the queue. ''' for cell in self._cell_queue: self.writeCell(cell) self._cell_queue = None
[docs] def connectionMade(self): '''Initial TLS connection made, immediately start the connection handshake. ''' logging.debug('Connection made to {0}.'.format(self._relay.address)) self._handshake = V3FSM(self.transport) cell = self._handshake.getInitiatingCell() self.transport.write(cell.getBytes())
[docs] def addNewCircuit(self, circuit): '''Add new a new circuit to the circuit map for this connection. Raise a ValueError if *circuit.circuit_id* already exists in self._circuit_map - that means something went very wrong. :param oppy.circuit.circuit.Circuit circuit: circuit to add to this connection's circuit map ''' if circuit.circuit_id in self._circuit_map: msg = "Circuit with id {} already exists on connection to {}" msg = msg.format(circuit.circuit_id, self._relay.address) raise ValueError(msg) self._circuit_map[circuit.circuit_id] = circuit
[docs] def closeConnection(self): '''Close this connection and all associated circuits; notify the connection pool. ''' from oppy.shared import connection_pool logging.debug("Closing connection to {}.".format(self._relay.address)) self._destroyAllCircuits() connection_pool.removeConnection(self._relay.fingerprint) self.transport.abortConnection()
[docs] def connectionLost(self, reason): '''Connection to relay has been lost; close this connection and all associated circuits; notify connection pool. :param reason reason: reason this connection was lost ''' from oppy.shared import connection_pool msg = "Connection to {} lost: {}." logging.warning(msg.format(self._relay.address, reason)) self._destroyAllCircuits() connection_pool.removeConnection(self._relay.fingerprint)
def _destroyAllCircuits(self): '''Destroy all circuits associated with this connection. ''' for circuit in self._circuit_map.values(): circuit.destroyCircuitFromConnection()
[docs] def circuitDestroyed(self, circuit_id): '''The circuit with *circuit_id* has been destroyed. Remove this circuit from this connection's circuit map if we know about it. If there are no remaining circuit's using this connection, ask the connection pool if this connection should be closed and, if so, close this connection. :param int circuit_id: id of the circuit that was destroyed ''' from oppy.shared import connection_pool try: del self._circuit_map[circuit_id] except KeyError: msg = "Connection to {} was notified that circuit {} was destroyed" msg += ", but connection has no reference to circuit {}." msg = msg.format(self._relay.address, circuit_id, circuit_id) logging.debug(msg) if len(self._circuit_map) == 0: fprint = self._relay.fingerprint if connection_pool.shouldDestroyConnection(fprint) is True: connection_pool.removeConnection(fprint) self.closeConnection()