# Source code for stream.stream

# Copyright 2014, 2015, Nik Kinkel
# See LICENSE for licensing information

'''
.. topic:: Details

    Streams are the interface between local requests coming from
    OppySOCKSProtocol instances and circuits. Streams are responsible for:

        - Initiating a connection request (i.e. a RelayBeginCell) on behalf
          of a local application
        - Passing data from circuits to local applications and vice versa
        - Informing OppySOCKSProtocol instances (and thus, the client
          application) when a remote resource closes the stream
        - Informing the circuit when the local application closes the stream
        - Splitting up data to be written to the network into chunks that
          can fit into a RelayData cell
        - Doing some rudimentary flow-control

'''
import logging

from twisted.internet import defer

from oppy.cell.definitions import MAX_RPAYLOAD_LEN
from oppy.shared import circuit_manager


SENDME_THRESHOLD = 450
STREAM_WINDOW_INIT = 500
STREAM_WINDOW_SIZE = 50


class Stream(object):
    '''Represent a Tor Stream.

    A stream sits between one SOCKS protocol instance and one circuit. Data
    moving in either direction is buffered on a DeferredQueue, and two
    windows (deliver and package) provide rudimentary flow control.
    '''

    def __init__(self, request, socks):
        '''
        :param oppy.util.exitrequest.ExitRequest request: connection request
            for this stream
        :param oppy.socks.socks.OppySOCKSProtocol socks: socks protocol
            instance this stream should relay data to and from
        '''
        self.stream_id = None
        self._read_queue = defer.DeferredQueue()
        self._write_queue = defer.DeferredQueue()
        self._read_deferred = None
        self._write_deferred = None
        self.request = request
        self.socks = socks
        self._deliver_window = STREAM_WINDOW_INIT
        self._package_window = STREAM_WINDOW_INIT
        self.circuit = None
        # Initialised before the circuit request is made: _registerNewStream
        # reads this flag, and it may run as soon as the callback is added.
        self._closed = False
        self._circuit_request = circuit_manager.requestOpenCircuit(self)
        self._circuit_request.addCallback(self._registerNewStream)

    def _registerNewStream(self, circuit):
        '''Register this stream with its circuit, initiate a connection
        request, and begin listening for data from the network.

        Called when this stream receives a suitable open circuit. If the
        stream was already closed while the circuit request was pending,
        the circuit is ignored and nothing is registered.

        :param oppy.circuit.circuit.Circuit circuit: open circuit suitable
            for use on this stream
        '''
        self._circuit_request = None
        if self._closed:
            # The local application went away before a circuit was ready;
            # registering now would open a stream nobody is listening to.
            logging.debug("Stream closed before its circuit was ready.")
            return
        self.circuit = circuit
        # notify circuit it has a new stream
        # NOTE: circuit sets this stream's stream_id
        self.circuit.registerStream(self)
        # tell the circuit to setup this stream (i.e. send a RELAY_BEGIN cell)
        self.circuit.initiateStream(self)
        # start listening for incoming cells from our circuit
        self._pollReadQueue()

    @staticmethod
    def _chunkRelayData(data, chunk_len=None):
        '''Split *data* into chunks that can fit inside a RelayData cell.

        :param str data: data to split
        :param int chunk_len: maximum length of each chunk; defaults to
            MAX_RPAYLOAD_LEN when omitted
        :returns: **list, str** list of pieces of data split into sizes
            that fit into a RelayData cell (empty list for empty *data*)
        '''
        if chunk_len is None:
            chunk_len = MAX_RPAYLOAD_LEN
        # range (not xrange) so this works on both Python 2 and Python 3
        return [data[i:i + chunk_len]
                for i in range(0, len(data), chunk_len)]

    def recvData(self, data):
        '''Put data received from the network on this stream's read queue.

        Called when the circuit attached to this stream passes data to this
        stream.

        :param str data: data passed in from circuit to write to this
            stream's attached SOCKS protocol
        '''
        self._read_queue.put(data)

    def writeData(self, data):
        '''Split *data* into chunks that can fit in a RelayData cell, and
        put each chunk on this stream's write queue.

        Called when the local application attached to this stream sends
        data to the network.

        :param str data: data passed in from this stream's attached SOCKS
            protocol to write to this stream's circuit
        '''
        for chunk in Stream._chunkRelayData(data):
            self._write_queue.put(chunk)

    def _pollWriteQueue(self):
        '''Pull a chunk of data from this stream's write queue and, when
        the data is ready, write it to the attached circuit.
        '''
        self._write_deferred = self._write_queue.get()
        self._write_deferred.addCallback(self._writeData)

    def _pollReadQueue(self):
        '''Pull a chunk of data from this stream's read queue and, when
        the data is ready, write it to the attached SOCKS protocol instance.
        '''
        self._read_deferred = self._read_queue.get()
        self._read_deferred.addCallback(self._recvData)

    def _writeData(self, data):
        '''Write *data* to the circuit attached to this stream and
        decrement the packaging window.

        :param str data: data received from attached SOCKS protocol
            instance to be written to the attached circuit
        '''
        self.circuit.writeData(data, self.stream_id)
        self._decPackageWindow()

    def _recvData(self, data):
        '''Receive *data* from the attached circuit and hand off to the
        attached SOCKS protocol instance. Decrement this stream's deliver
        window.

        :param str data: data received from attached circuit, to be
            written to the attached SOCKS protocol instance
        '''
        self.socks.writeData(data)
        self._decDeliverWindow()

    def _decDeliverWindow(self):
        '''Decrement this stream's deliver window and initiate sending a
        sendme cell if the deliver window drops too low.

        If the deliver window is <= SENDME_THRESHOLD, tell the attached
        circuit to send a sendme cell on behalf of this stream and credit
        the window with STREAM_WINDOW_SIZE. Always resumes polling the
        read queue afterwards.
        '''
        # XXX we should be checking how many cells we have left to flush
        #     here before just blindly writing a RELAY_SENDME
        self._deliver_window -= 1
        if self._deliver_window <= SENDME_THRESHOLD:
            self.circuit.sendStreamSendMe(self.stream_id)
            self._deliver_window += STREAM_WINDOW_SIZE
        self._pollReadQueue()

    def _decPackageWindow(self):
        '''Decrement this stream's package window and, if we still can,
        listen for more data from the attached SOCKS protocol instance.

        If the package window <= 0, we need to wait until we receive a
        sendme cell before writing anymore local data from this stream to
        the attached circuit.
        '''
        self._package_window -= 1
        if self._package_window > 0:
            self._pollWriteQueue()
        else:
            # None marks the "buffering" state that incrementPackageWindow
            # checks for before resuming writes.
            self._write_deferred = None

    def incrementPackageWindow(self):
        '''Increment this stream's package window and, if the package
        window is now above zero and this stream was in a buffering state,
        begin listening for local data again.

        Called by the attached circuit when it receives a sendme cell for
        this stream.
        '''
        self._package_window += STREAM_WINDOW_SIZE
        # if we were buffering, we're now free to send data again
        if self._write_deferred is None and self._package_window > 0:
            self._pollWriteQueue()

    def streamConnected(self):
        '''Begin listening for local data from the attached SOCKS protocol
        to write to this stream's circuit.

        Called when the attached circuit receives a RelayConnected cell for
        this stream's RelayBegin request.
        '''
        self._pollWriteQueue()

    def closeFromCircuit(self):
        '''Called when this stream is closed by the circuit.

        This can be caused by receiving a RelayEnd cell, the circuit being
        torn down, or the connection going down. We do not need to send a
        RelayEnd cell ourselves if the circuit closed this stream.

        Logs the close and notifies the attached SOCKS protocol. This
        method does not call back into the circuit.
        '''
        self._closed = True
        msg = "Stream {} closing from circuit {}"
        msg = msg.format(self.stream_id, self.circuit.circuit_id)
        logging.debug(msg)
        self.socks.closeFromStream()

    def closeFromSOCKS(self):
        '''Called when the attached SOCKS protocol object is done with this
        stream.

        If a circuit is attached, unregister this stream from it
        (presumably the circuit sends the RelayEnd cell as part of that;
        verify against the circuit implementation). If no circuit has been
        attached yet, just mark the stream closed so a circuit that arrives
        later is ignored by _registerNewStream.
        '''
        self._closed = True
        if self.circuit is None:
            # The circuit request is still pending, so there is nothing to
            # unregister and no circuit_id to report.
            logging.debug("Stream closing from SOCKS before a circuit "
                          "was attached.")
            return
        msg = "Stream {} on circuit {} closing from SOCKS."
        msg = msg.format(self.stream_id, self.circuit.circuit_id)
        logging.debug(msg)
        self.circuit.unregisterStream(self)