# Copyright 2014, 2015, Nik Kinkel
# See LICENSE for licensing information
'''
.. topic:: Details
NetStatus handles downloading network status documents and serving requests
for server descriptors (needed for building paths and circuits). When
instantiated, NetStatus immediately tries to download the most
recent copies of the network consensus and the set of current server
descriptors. NetStatus also takes care of automatically updating local
copies of network status documents.
NetStatus:
- Uses V2Dir directory caches when possible, only falling back to
directory authorities if we don't know about any caches.
- Tries to use a cached copy of the consensus from the filesystem
to initially figure out the V2Dir caches.
- Writes a copy of the consensus to the filesystem for later use.
- Uses stem to represent the consensus, consensus entries, and
server descriptors.
- Does the actual document parsing in separate worker threads.
- Schedules the next document download
- Handles incoming requests for server descriptors by returning
a deferred that fires when NetStatus has a good set of server
descriptors
'''
import io
import logging
import random
import zlib
from twisted.internet import defer, threads
from twisted.web.client import getPage
from stem import Flag
from stem.descriptor import parse_file
from stem.descriptor import DocumentHandler
from stem.descriptor.networkstatus import NetworkStatusDocumentV3
from stem.descriptor.remote import get_authorities
from oppy.netstatus import definitions as DEF
# how long we'll wait before downloading fresh network status documents
# NOTE: this is wrong according to tor-spec. We should actually be checking
# the times on the consensus and choosing a random time within some
# window.
DEFAULT_DOWNLOAD_INTERVAL = 3600
class NetStatus(object):
    '''Download and refresh the network consensus and server descriptors.

    On instantiation, NetStatus immediately begins downloading fresh
    network status documents, preferring random V2Dir directory caches
    and falling back to the directory authorities when no caches are
    known. Parsed consensus documents are cached on the filesystem for
    reuse across runs, and the next download cycle is scheduled
    automatically.
    '''

    def __init__(self):
        '''Immediately start downloading network status documents.

        Upon instantiation, NetStatus will immediately begin trying to
        download new network status documents. Incoming requests for
        descriptors (usually caused by requests to get a Path) will be
        added to a callback chain and called back when we have a good set
        of router descriptors.
        '''
        logging.debug("Starting NetStatus.")
        # self._initial tracks whether we're on the "initial" download of the
        # network docs. if so, the path request callback is fired when we
        # have descriptors available.
        self._initial = True
        # chain of requests for descriptors that will get called back when we
        # get the first set of network docs. after that, this is no longer used
        self._descriptor_request_stack = defer.Deferred()
        # endpoints is a list of V2Dir directory caches we can choose from to
        # try getting network docs. upon instantiation, we check if there is
        # a "cached-consensus" file to pull them from, otherwise endpoints is
        # initialized to the set of directory authorities
        self._endpoints = self._getInitialEndpoints()
        self._consensus = None
        self._descriptors = None
        self._getDocuments()

    def getDescriptors(self):
        '''Return a deferred which will callback with a dict mapping
        fingerprint->RelayDescriptor when we get a set of good descriptors.

        Called back immediately if we already have server descriptors.

        :returns: **twisted.internet.defer.Deferred** that fires with a dict
            that maps fingerprints->RelayDescriptors
        '''
        d = defer.Deferred()
        # if we have descriptors, immediately callback
        if self._descriptors is not None:
            d.callback(self._descriptors)
        # if we don't have descriptors yet, add this request to the
        # descriptor request chain and callback when we get docs downloaded
        else:
            def serveDescriptor(result):
                d.callback(result)
                # return the result unchanged so later callbacks in the
                # chain also receive the descriptor dict
                return result
            self._descriptor_request_stack.addCallback(serveDescriptor)
        return d

    @defer.inlineCallbacks
    def _getDocuments(self):
        '''Download and parse network status documents.

        Asynchronously download and parse a fresh consensus and set of
        server descriptors. The parsing is done using stem in a separate
        thread. If this is the first time we've attempted to download
        documents, callback the descriptor request chain to start
        satisfying pending requests for server descriptors as soon as we
        have a good set of descriptors.
        '''
        logging.debug("Starting document downloader.")
        # the consensus must be downloaded first: descriptor processing
        # needs self._consensus to look up flags and filter relays
        self._consensus = yield self._downloadConsensus()
        logging.info("Got fresh consensus.")
        self._descriptors = yield self._downloadDescriptors()
        logging.info("Got fresh server descriptors.")
        # if this is the first set of documents we've downloaded, start the
        # descriptor_request_stack callback chain to satisfy requests for
        # descriptors that arrived before we had any
        if self._initial is True:
            self._descriptor_request_stack.callback(self._descriptors)
            self._initial = False
        # schedule the next download time
        self._scheduleDownload()

    @defer.inlineCallbacks
    def _downloadConsensus(self):
        '''Download, parse, and cache the current consensus.

        Try random V2Dir directory caches if we know about any already,
        falling back to directory authorities if we have to. Actual
        consensus parsing is done in a separate thread.

        .. note: If this download fails for any reason (timeout, 503, zlib
            error, etc.), it will be immediately retried with another random
            V2Dir choice.

        :returns: **twisted.internet.defer.Deferred** that fires with a
            NetworkStatusDocumentV3.
        '''
        try:
            logging.debug("Starting consensus download.")
            d = random.choice(self._endpoints)
            addr = "http://" + str(d.address) + ":" + str(d.dir_port)
            logging.debug("Starting consensus download from {}".format(addr))
            raw = yield getPage(str(addr + DEF.CONSENSUS_URL))
            # parsing is CPU-heavy, so defer it to a worker thread to
            # avoid blocking the reactor
            consensus = yield threads.deferToThread(self._processConsensus,
                                                    raw)
            defer.returnValue(consensus)
        except Exception as e:
            logging.debug("Error downloading consensus: {}.".format(e))
            logging.debug("Retrying consensus download.")
            # immediately retry the download on error with another
            # randomly chosen endpoint
            ret = yield self._downloadConsensus()
            defer.returnValue(ret)

    @defer.inlineCallbacks
    def _downloadDescriptors(self):
        '''Download and parse the full set of server descriptors.

        Choose a random V2Dir cache to attempt a download from. Parse
        using stem in a separate thread.

        .. note:: We currently just download *all* server descriptors at
            once. This is probably not the best way to get descriptors,
            and these requests should be split up over multiple V2Dir
            caches.

        :returns: **twisted.internet.defer.Deferred** that fires with a dict
            mapping fingerprints->RelayDescriptors.
        '''
        try:
            d = random.choice(self._endpoints)
            addr = "http://" + str(d.address) + ":" + str(d.dir_port)
            logging.debug("Downloading descriptors from {}".format(addr))
            raw = yield getPage(str(addr + DEF.DESCRIPTORS_URL))
            # parse in a worker thread; the descriptor set is large
            descriptors = yield threads.deferToThread(self._processDescriptors,
                                                      raw)
            defer.returnValue(descriptors)
        except Exception as e:
            logging.debug("Error downloading descriptors: {}.".format(e))
            logging.debug("Retrying descriptors download.")
            # immediately retry on failure
            ret = yield self._downloadDescriptors()
            defer.returnValue(ret)

    def _processConsensus(self, raw):
        '''Decompress consensus, parse, write to "cached-consensus" and
        choose a new set of endpoints to use for the next download.

        .. note: This is run in a separate worker thread using
            twisted.internet.threads.deferToThread() because consensus
            parsing can take a while.

        :param str raw: compressed consensus bytes
        :returns: stem.descriptors.networkstatus.NetworkStatusDocumentV3
        '''
        raw = zlib.decompress(raw)
        consensus = NetworkStatusDocumentV3(raw)
        self._cacheConsensus(consensus)
        logging.debug("Wrote cached-consensus.")
        # refresh the endpoint pool so future downloads use V2Dir caches
        # from this (current) consensus
        self._endpoints = self._extractV2DirEndpoints(consensus)
        logging.debug("Found {} V2Dir endpoints.".format(len(self._endpoints)))
        return consensus

    def _processDescriptors(self, raw):
        '''Decompress and parse descriptors, then build a dict mapping
        fingerprint -> RelayDescriptor for all relays found in both the
        network consensus and the server descriptor set.

        We throw away any relays that are not found in the network
        consensus. We also add a new attribute 'flags' to each
        RelayDescriptor. 'flags' is an attribute of RouterStatusEntry's
        found in the consensus, and adding them here simplifies path
        selection. 'flags' is a set of unicode strings.

        .. note: This runs in a separate worker thread using
            twisted.internet.threads.deferToThread() because parsing tends
            to take a while.

        :param str raw: compressed server descriptor bytes
        :returns: **dict** mapping fingerprint -> RelayDescriptor for every
            relay found in both the current network consensus and the set
            of server descriptors.
        '''
        raw = zlib.decompress(raw)
        gen = parse_file(
            io.BytesIO(raw),
            DEF.STEM_DESCRIPTORS_TYPE,
            validate=True,
            document_handler=DocumentHandler.DOCUMENT,
        )
        descriptors = {}
        # only use descriptors that are also found in the consensus, and
        # also add the 'flags' attribute, a set of unicode strings describing
        # the flags a given RelayDescriptor has
        for relay in gen:
            try:
                flags = set(self._consensus.routers[relay.fingerprint].flags)
                relay.flags = flags
                descriptors[relay.fingerprint] = relay
            # skip any relays not found in the consensus
            except KeyError:
                pass
        return descriptors

    def _cacheConsensus(self, consensus):
        '''Dump a copy of the consensus to the "cached-consensus" file.

        A write failure is logged but not raised: caching is best-effort
        and a failure should not abort the download cycle.

        :param stem.descriptor.networkstatus.NetworkStatusDocumentV3 consensus:
            fresh consensus to cache on filesystem
        '''
        try:
            with open(DEF.CONSENSUS_CACHE_FILE, 'w') as f:
                f.write(str(consensus))
        except Exception as e:
            logging.debug("Failed to write 'cached-consensus': {}.".format(e))

    def _extractV2DirEndpoints(self, consensus):
        '''Find a new set of V2Dir directory caches to use from the current
        consensus.

        :param stem.descriptors.networkstatus.NetworkStatusDocumentV3 consensus:
            fresh consensus to grab V2Dir caches from
        :return: **list, stem.descriptor.router_status_entry.RouterStatusEntry**
            RouterStatusEntry's that have the 'V2Dir' flag
        '''
        endpoints = set()
        for relay in consensus.routers.values():
            if Flag.V2DIR in relay.flags:
                endpoints.add(relay)
        return list(endpoints)

    def _getInitialEndpoints(self):
        '''Get an initial set of servers to download the network status
        documents from.

        First try reading from the "cached-consensus" file. If this isn't
        successful for any reason fallback to using the directory
        authorities. This is only called on instantiation, and any future
        downloads will already have a fresh set of V2Dir endpoints.

        .. note: We just use the directory authorities defined in stem.

        :returns: **list** containing either RouterStatusEntry objects with
            the 'V2Dir' flag or DirectoryAuthorities
        '''
        endpoints = None
        try:
            with open(DEF.CONSENSUS_CACHE_FILE, 'rb') as f:
                data = f.read()
            old_consensus = NetworkStatusDocumentV3(data)
            endpoints = self._extractV2DirEndpoints(old_consensus)
            msg = "Found {} V2Dir endpoints in cached-consensus."
            logging.debug(msg.format(len(endpoints)))
        # IOError: no/unreadable cache file; ValueError: stem failed to
        # validate or parse the cached document
        except (IOError, ValueError) as e:
            logging.debug("Error reading from cached-consensus: {}".format(e))
            logging.debug("Falling back to directory authorities.")
        # an empty endpoint list also falls back to the authorities
        return list(endpoints) if endpoints else get_authorities().values()

    def _scheduleDownload(self):
        '''Schedule the next network status document download.

        .. note: This is incorrect. We should actually be calculating the
            correct time according to the parameters defined in Tor's
            **dir-spec**, but for now we just schedule a download an hour in
            the future of the previous download.
        '''
        # imported here to avoid installing/importing the reactor at
        # module import time
        from twisted.internet import reactor
        # XXX this is a bug, we should be choosing according to parameters
        # in the tor `dir-spec`
        seconds = DEFAULT_DOWNLOAD_INTERVAL
        reactor.callLater(seconds, self._getDocuments)
        msg = "Scheduled next consensus download in {} seconds."
        logging.debug(msg.format(seconds))