Source code for circuit.circuitmanager

# Copyright 2014, 2015, Nik Kinkel
# See LICENSE for licensing information

'''
.. topic:: Details

    CircuitManager manages a pool of circuits. CircuitManager knows about all
    open and pending circuits and can make choices about how and when
    circuits are built and destroyed.

    Currently, it's up to CircuitManager to:

        - Create an initial pool of circuits (currently 4 IPv4 and 1 IPv6
          initial circuits by default)
        - Handle incoming requests for open circuits (made by streams) and
          assign suitable circuits to handle the requests
        - Decide when and if a circuit should be destroyed when its stream
          count drops to zero
        - Replenish the circuit pool when a circuit is destroyed and, if
          necessary, build new circuits to handle any (now orphaned) pending
          streams
        - Assign pending streams to newly opened circuits when the circuits
          complete their extension process
        - Destroy all open and pending circuits when oppy shuts down

'''
import logging
import random

from collections import namedtuple

from twisted.internet import defer

from oppy.circuit.circuit import Circuit, CType
from oppy.path.path import PathConstraints
from oppy.path.defaults import (
    DEFAULT_ENTRY_FLAGS,
    DEFAULT_MIDDLE_FLAGS,
    DEFAULT_EXIT_FLAGS,
)


PendingStream = namedtuple("PendingStream", (
    "stream",
    "deferred",
))


DEFAULT_OPEN_IPv4 = 4
DEFAULT_OPEN_IPv6 = 1


DEFAULT_IPv4_CONSTRAINTS = PathConstraints(
    # add guard here with 'fingerprint': 'value' arg for exit
    entry={'flags': DEFAULT_ENTRY_FLAGS, 'ntor': True},
    middle={'flags': DEFAULT_MIDDLE_FLAGS, 'ntor': True},
    exit={'flags': DEFAULT_EXIT_FLAGS, 'ntor': True},
)
DEFAULT_IPv6_CONSTRAINTS = PathConstraints(
    entry={'flags': DEFAULT_ENTRY_FLAGS, 'ntor': True},
    middle={'flags': DEFAULT_MIDDLE_FLAGS, 'ntor': True},
    exit={'flags': DEFAULT_EXIT_FLAGS, 'ntor': True,
          'exit_IPv6': True},
)


[docs]class CircuitManager(object): '''Manage a pool of circuits.''' def __init__(self): logging.debug("Creating circuit manager.") self._open_circuit_map = {} self._pending_circuit_map = {} self._pending_stream_pool = [] self._id_counter = 1 self._min_IPv4_count = DEFAULT_OPEN_IPv4 self._min_IPv6_count = DEFAULT_OPEN_IPv6 self._sent_open_message = False # create default circuit pool for i in xrange(self._min_IPv4_count): self._buildNewCircuit(DEFAULT_IPv4_CONSTRAINTS) for i in xrange(self._min_IPv6_count): self._buildNewCircuit(DEFAULT_IPv6_CONSTRAINTS)
[docs] def requestOpenCircuit(self, stream): '''Return a deferred that will fire with an open circuit that can handle the stream's request. There are three general cases to handle when a new open circuit request comes in: 1. An open circuit exists that can handle this request. In this case, choose a random circuit from the set of open circuits that can handle this request and immediately callback the deferred with the chosen open circuit. 2. A pending circuit exists that, when open, can handle this request. In this case, add the request to a pool of pending requests. Whenever a circuit opens, circuit manager checks all pending requests and assigns any pending requests that the newly opened circuit can handle to that circuit. So when a pending circuit opens that can handle this stream, the stream's deferred will be called back with the fresly opened circuit. 3. No open or pending circuits exist that can handle the request. In this case, begin building a new circuit that can. Add the request to a pending request pool and, when a circuit opens that can handle the request, callback this stream's deferred with the newly opened circuit. :param oppy.stream.stream.Stream stream: the stream that is requesting an open circuit :returns: **twisted.internet.defer.Deferred** which will fire when a circuit has opened that can handle this stream's request. ''' logging.debug("Circuit manager got an open circuit request.") request = stream.request d = defer.Deferred() # list of currently open circuits that can handle the given request open_candidates = self._getOpenCandidates(request) # choose a random open circuit for this request if we can if len(open_candidates) > 0: circuit_choice = random.choice(open_candidates) msg = "Assigning request to circuit {}." logging.debug(msg.format(circuit_choice.circuit_id)) d.callback(circuit_choice) else: # list of circuits currently being built that can handle the # given request pending_candidates = self._getPendingCandidates(request) # if we have no pending circuits that can handle this request, # start building a new one that can if len(pending_candidates) == 0: msg = "Building a new circuit to handle the new request." logging.debug(msg) self._buildNewCircuitForRequest(request) self._pending_stream_pool.append(PendingStream(stream, d)) return d
[docs] def shouldDestroyCircuit(self, circuit): '''Return **True** iff CircuitManager thinks the calling circuit should be destroyed. Circuits call shouldDestroyCircuit() when their number of open streams drops to zero. Since CircuitManager knows about all open and pending circuits, it can make an informed judgement about whether the calling circuit should be destroyed or remain open. Currently, CircuitManager maintains at least 4 open or pending IPv4 circuits and one open or pending IPv6 circuit. If the number of streams on any circuit drops to zero and it can be closed while still satisfying these basic constraints, then CircuitManager tells it to begin destroying itself (returns True). :param oppy.circuit.circuit.Circuit circuit: circuit to consider destroying. :returns: **bool** **True** if CircuitManager decides this circuit should be destroyed, **False** otherwise. ''' if circuit.ctype == CType.IPv4: if self._totalIPv4Count() - 1 < self._min_IPv4_count: return False return True if self._totalIPv6Count() - 1 < self._min_IPv6_count: return False return True
[docs] def circuitDestroyed(self, circuit): '''Circuits call circuitDestroyed() when they have cleaned up after themselves and closed. Circuits may need to do a number of things before references to them are completely removed. Exactly what steps a circuit needs to take before being fully destroyed depend on both the circuit's state and the state of the whole program. After a circuit has fully cleaned up after itself and taken all necessary closing actions, it calls CircuitManager.circuitDestroyed(). circuitDestroyed() does three things: 1. Remove the closed circuit from any internal maps. 2. Check the pool of pending requests. If any pending requests exist that cannot be handled by any open or pending circuits, begin building a new circuit to handle these orphaned requests. This can happen when a circuit is built to handle a particular request but is then destroyed/an error occurs while it is being built. 3. Check CircuitManager's basic open and pending circuit requirements to see if a new circuit of a certain type should be built. For instance, if an IPv4 circuit closes and the number of open and pending IPv4 circuits is below self._min_IPv4_count, a new IPv4 circuit will be built. :param circuit_id: id of circuit to be destroyed :type circuit_id: int ''' cid = circuit.circuit_id if cid not in self._pending_circuit_map and cid not in self._open_circuit_map: msg = "Circuit manager was notified that circuit {} was destroyed," msg += " but manager has no reference to this circuit." logging.debug(msg) return try: del self._pending_circuit_map[cid] msg = "Destroyed pending circuit {}.".format(cid) logging.debug(msg) except KeyError: del self._open_circuit_map[cid] msg = "Destroyed open circuit {}.".format(cid) logging.debug(msg) # assign any pending stream requests we can to open circuits for circ in self._open_circuit_map.values(): self._assignPossiblePendingRequests(circ) # build new circuits to handle any pending requests that can't # currently be satisfied. for pending_stream in self._pending_stream_pool: request = pending_stream.stream.request if len(self._getPendingCandidates(request)) == 0: msg = "After destroying circuit {}, a pending request has no " msg += "circuits that can handle it. Creating a new circuit." logging.debug(msg.format(cid)) self._buildNewCircuitForRequest(request) # if we've dropped below the default number of circuits that should # be open, start building a new circuit self._considerReplenishingCircuitPool()
[docs] def circuitOpened(self, circuit): '''Circuits call circuitOpened() when they have successfully completed their build process and are ready to handle incoming streams. When a circuit opens, CircuitManager: - removes it from the pending circuit map - adds it to the open circuit map - assigns any pending requests that this circuit can handle to this freshly opened circuit - if this is the first circuit successfully opened, send a nice message to the user letting them know oppy is ready to forward traffic :param oppy.circuit.circuit.Circuit circuit: circuit that has just opened. ''' msg = "Circuit manager notified that circuit {} opened." logging.debug(msg.format(circuit.circuit_id)) # remove circuit from pending map try: del self._pending_circuit_map[circuit.circuit_id] except KeyError: msg = "Circuit manager was notified circuit {} opened, but " msg += "manager has no reference to this circuit." logging.debug(msg.format(circuit.circuit_id)) return # add to open map self._open_circuit_map[circuit.circuit_id] = circuit # assign new circuit any pending streams it can handle self._assignPossiblePendingRequests(circuit) # send a nice message letting the user know we opened a circuit if # we haven't done so yet if self._sent_open_message is False: msg = "Circuit built successfully! oppy is ready to forward " msg += "traffic :)" logging.info(msg) self._sent_open_message = True
[docs] def destroyAllCircuits(self): '''Destroy all open and pending circuits **and** remove all pending requests. ''' msg = "Destroying all open and pending connections, circuits, and " msg += "streams." logging.debug(msg) self._pending_stream_pool = [] for circuit in self._open_circuit_map.values(): circuit.destroyCircuitFromManager() for circuit in self._pending_circuit_map.values(): circuit.destroyCircuitFromManager()
def _assignPossiblePendingRequests(self, circuit): '''Check all pending requests and assign any requests to *circuit* that it can handle. If a pending request is assigned to *circuit*, remove that request from the pending request pool. :param oppy.circuit.circuit.Circuit circuit: circuit to try assigning pending requests to ''' # register any pending streams that this circuit can handle for pending_stream in self._pending_stream_pool[:]: request = pending_stream.stream.request # if this circuit can handle a request, callback with this circuit if circuit.canHandleRequest(request): msg = "Assigning pending request to opened circuit {}." logging.debug(msg.format(circuit.circuit_id)) pending_stream.deferred.callback(circuit) self._pending_stream_pool.remove(pending_stream) def _buildNewCircuit(self, path_constraints): '''Build a new circuit, using a path that satisfies *path_constraints*. Assign an ID to the new circuit and add it to the pending circuit map. :param oppy.path.path.PathConstraints path_constraints: The path constraints that the new circuit's path should satisfy. ''' msg = "Building a new circuit with id {}." logging.debug(msg.format(self._id_counter)) new_circuit = Circuit(self._id_counter, path_constraints) self._pending_circuit_map[new_circuit.circuit_id] = new_circuit self._id_counter += 1 def _buildNewCircuitForRequest(self, request): '''Build a new circuit such that the circuit's exit relay claims to allow the *request*. :param oppy.util.exitrequest.ExitRequest request: The request that the new circuit's exit relay should allow. ''' # build a new circuit with default flags and ntor that has an exit # node that can handle request dest = request.addr + ':' + str(request.port) entry = {'flags': DEFAULT_ENTRY_FLAGS, 'ntor': True} middle = {'flags': DEFAULT_MIDDLE_FLAGS, 'ntor': True} if request.is_ipv4: exit = {'flags': DEFAULT_EXIT_FLAGS, 'ntor': True, 'exit_to_IP_and_port': dest} else: exit = {'flags': DEFAULT_EXIT_FLAGS, 'ntor': True, 'exit_IPv6': True, 'exit_to_IP_and_port': dest} constraints = PathConstraints(entry=entry, middle=middle, exit=exit) self._buildNewCircuit(constraints) def _considerReplenishingCircuitPool(self): '''Decide whether or not to build a new circuit - called when a circuit is destroyed. Check CircuitManager's basic requirements for open/pending circuits. If they are not satisfied, build a new circuit. ''' # check if we're below the threshold of either IPv4 or IPv6 circuits # we should have open or pending. if so, build a new circuit of # the required type if self._totalIPv4Count() < self._min_IPv4_count: msg = "Replenishing circuit pool with a new IPv4 circuit." logging.debug(msg) self._buildNewCircuit(DEFAULT_IPv4_CONSTRAINTS) if self._totalIPv6Count() < self._min_IPv6_count: msg = "Replenishing circuit pool with a new IPv6 circuit." logging.debug(msg) self._buildNewCircuit(DEFAULT_IPv6_CONSTRAINTS) def _openIPv4Count(self): '''Return the number of open IPv4 circuits. :returns: **int** number of open IPv4 circuits ''' return len([i for i in self._open_circuit_map.values() if i.ctype == CType.IPv4]) def _openIPv6Count(self): '''Return the number of open IPv6 circuits. :returns: **int** number of open IPv6 circuits. ''' return len([i for i in self._open_circuit_map.values() if i.ctype == CType.IPv6]) def _pendingIPv4Count(self): '''Return the number of pending IPv4 circuits. :returns: **int** number of pending IPv4 circuits. ''' return len([i for i in self._pending_circuit_map.values() if i.ctype == CType.IPv4]) def _pendingIPv6Count(self): '''Return the number of pending IPv6 circuits. :returns: **int** number of pending IPv6 circuits ''' return len([i for i in self._pending_circuit_map.values() if i.ctype == CType.IPv6]) def _totalIPv4Count(self): '''Return the total (open + pending) IPv4 circuits. :returns: **int** total IPv4 circuits ''' return self._openIPv4Count() + self._pendingIPv4Count() def _totalIPv6Count(self): '''Return the total (open + pending) IPv6 circuits. :returns: **int** total IPv6 circuits ''' return self._openIPv6Count() + self._pendingIPv6Count() def _getOpenCandidates(self, request): '''Return a list of circuits whose exit relay claims to allow the *request*. :returns: **list, oppy.circuit.circuit.Circuit** open circuits whose exit relay can handle the request ''' return [i for i in self._open_circuit_map.values() if i.canHandleRequest(request)] def _getPendingCandidates(self, request): '''Return a list of pending circuits that claim to handle the request. .. note:: Since a pending circuit may not yet have an exit relay, whether or not this circuit can *actually* handle the request is just an informed guess. It may turn out that it can't **actually** handle the request once the circuit is open. :returns: **list, oppy.circuit.circuit.Circuit** pending circuits that can (probably) handle the request ''' return [i for i in self._pending_circuit_map.values() if i.canHandleRequest(request)]