# Copyright 2014, 2015, Nik Kinkel
# See LICENSE for licensing information
'''
.. topic:: Details
    NetStatus handles downloading network status documents and serving requests
    for server descriptors (needed for building paths and circuits). When
    instantiated, NetStatus immediately tries to download the most
    recent copies of the network consensus and the set of current server
    descriptors. NetStatus also takes care of automatically updating local
    copies of network status documents.
    NetStatus:
        - Uses V2Dir directory caches when possible, only falling back to
          directory authorities if we don't know about any caches.
        - Tries to use a cached copy of the consensus from the filesystem
          to initially figure out the V2Dir caches.
        - Writes a copy of the consensus to the filesystem for later use.
        - Uses stem to represent the consensus, consensus entries, and
          server descriptors.
        - Does the actual document parsing in separate worker threads.
        - Schedules the next document download.
        - Handles incoming requests for server descriptors by returning
          a deferred that fires when NetStatus has a good set of server
          descriptors.
'''
import io
import logging
import random
import zlib
from twisted.internet import defer, threads
from twisted.web.client import getPage
from stem import Flag
from stem.descriptor import parse_file
from stem.descriptor import DocumentHandler
from stem.descriptor.networkstatus import NetworkStatusDocumentV3
from stem.descriptor.remote import get_authorities
from oppy.netstatus import definitions as DEF
# how long (in seconds) we'll wait before downloading fresh network status
# documents
# NOTE: this is wrong according to tor-spec. We should actually be checking
#       the times on the consensus and choosing a random time within some
#       window.
DEFAULT_DOWNLOAD_INTERVAL = 3600
class NetStatus(object):
    '''Download consensus and server descriptor documents.'''

    def __init__(self):
        '''Immediately start downloading network status documents.

        Upon instantiation, NetStatus will immediately begin trying to
        download new network status documents. Incoming requests for
        descriptors (usually caused by requests to get a Path) will be
        added to a callback chain and called back when we have a good set of
        router descriptors.
        '''
        logging.debug("Starting NetStatus.")
        # _initial tracks whether we're on the "initial" download of the
        # network docs. if so, the descriptor request chain is fired when we
        # first have descriptors available.
        self._initial = True
        # chain of requests for descriptors that will get called back when we
        # get the first set of network docs. after that, this is no longer
        # used.
        self._descriptor_request_stack = defer.Deferred()
        # _endpoints is a list of V2Dir directory caches we can choose from to
        # try getting network docs. upon instantiation, we check if there is
        # a "cached-consensus" file to pull them from, otherwise _endpoints is
        # initialized to the set of directory authorities.
        self._endpoints = self._getInitialEndpoints()
        self._consensus = None
        self._descriptors = None
        self._getDocuments()

    def getDescriptors(self):
        '''Return a deferred which will callback with a dict mapping
        fingerprint->RelayDescriptor when we get a set of good descriptors.

        Called back immediately if we already have server descriptors.

        :returns: **twisted.internet.defer.Deferred** that fires with a dict
            that maps fingerprints->RelayDescriptors
        '''
        d = defer.Deferred()
        # if we have descriptors, immediately callback
        if self._descriptors is not None:
            d.callback(self._descriptors)
        # if we don't have descriptors yet, add this request to the
        # descriptor request chain and callback when we get docs downloaded
        else:
            def serveDescriptor(result):
                d.callback(result)
                return result
            self._descriptor_request_stack.addCallback(serveDescriptor)
        return d

    @defer.inlineCallbacks
    def _getDocuments(self):
        '''Download and parse network status documents.

        Asynchronously download and parse a fresh consensus and set of server
        descriptors. The parsing is done using stem in a separate thread.

        If this is the first time we've attempted to download documents,
        callback the descriptor request chain to start satisfying pending
        requests for server descriptors as soon as we have a good set of
        descriptors.
        '''
        logging.debug("Starting document downloader.")
        self._consensus = yield self._downloadConsensus()
        logging.info("Got fresh consensus.")
        self._descriptors = yield self._downloadDescriptors()
        logging.info("Got fresh server descriptors.")
        # if this is the first set of documents we've downloaded, start the
        # descriptor_request_stack callback chain to satisfy pending requests
        # for descriptors
        if self._initial is True:
            self._descriptor_request_stack.callback(self._descriptors)
            self._initial = False
        # schedule the next download time
        self._scheduleDownload()

    @defer.inlineCallbacks
    def _downloadConsensus(self):
        '''Download, parse, and cache the current consensus.

        Try random V2Dir directory caches if we know about any already,
        falling back to directory authorities if we have to. Actual consensus
        parsing is done in a separate thread.

        .. note: If this download fails for any reason (timeout, 503, zlib
            error, etc.), it will be immediately retried with another random
            V2Dir choice.

        :returns: **twisted.internet.defer.Deferred** that fires with a
            NetworkStatusDocumentV3.
        '''
        logging.debug("Starting consensus download.")
        consensus = None
        # retry forever, picking a fresh random endpoint on each failure.
        # a loop (rather than a recursive call) keeps repeated failures from
        # growing the generator/deferred chain without bound.
        while consensus is None:
            try:
                endpoint = random.choice(self._endpoints)
                addr = ("http://" + str(endpoint.address) + ":"
                        + str(endpoint.dir_port))
                logging.debug("Starting consensus download from {}"
                              .format(addr))
                raw = yield getPage(str(addr + DEF.CONSENSUS_URL))
                consensus = yield threads.deferToThread(
                    self._processConsensus, raw)
            except Exception as e:
                logging.debug("Error downloading consensus: {}.".format(e))
                logging.debug("Retrying consensus download.")
        defer.returnValue(consensus)

    @defer.inlineCallbacks
    def _downloadDescriptors(self):
        '''Download and parse the full set of server descriptors.

        Choose a random V2Dir cache to attempt a download from. Parse using
        stem in a separate thread.

        .. note:: We currently just download *all* server descriptors at
            once. This is probably not the best way to get descriptors,
            and these requests should be split up over multiple V2Dir
            caches.

        :returns: **twisted.internet.defer.Deferred** that fires with a dict
            mapping fingerprints->RelayDescriptors.
        '''
        descriptors = None
        # retry forever on failure, same strategy as _downloadConsensus()
        while descriptors is None:
            try:
                endpoint = random.choice(self._endpoints)
                addr = ("http://" + str(endpoint.address) + ":"
                        + str(endpoint.dir_port))
                logging.debug("Downloading descriptors from {}".format(addr))
                raw = yield getPage(str(addr + DEF.DESCRIPTORS_URL))
                descriptors = yield threads.deferToThread(
                    self._processDescriptors, raw)
            except Exception as e:
                logging.debug("Error downloading descriptors: {}.".format(e))
                logging.debug("Retrying descriptors download.")
        defer.returnValue(descriptors)

    def _processConsensus(self, raw):
        '''Decompress consensus, parse, write to "cached-consensus" and
        choose a new set of endpoints to use for the next download.

        .. note: This is run in a separate worker thread using
            twisted.internet.threads.deferToThread() because consensus
            parsing can take a while.

        :param str raw: compressed consensus bytes
        :returns: stem.descriptors.networkstatus.NetworkStatusDocumentV3
        '''
        raw = zlib.decompress(raw)
        consensus = NetworkStatusDocumentV3(raw)
        self._cacheConsensus(consensus)
        logging.debug("Wrote cached-consensus.")
        self._endpoints = self._extractV2DirEndpoints(consensus)
        logging.debug("Found {} V2Dir endpoints.".format(len(self._endpoints)))
        return consensus

    def _processDescriptors(self, raw):
        '''Decompress and parse descriptors, then build a dict mapping
        fingerprint -> RelayDescriptor for all relays found in both the
        network consensus and the server descriptor set.

        We throw away any relays that are not found in the network consensus.
        We also add a new attribute 'flags' to each RelayDescriptor. 'flags'
        is an attribute of RouterStatusEntry's found in the consensus, and
        adding them here simplifies path selection. 'flags' is a set of
        unicode strings.

        .. note: This runs in a separate worker thread using
            twisted.internet.threads.deferToThread() because parsing tends to
            take a while.

        :param str raw: compressed server descriptor bytes
        :returns: **dict** mapping fingerprint -> RelayDescriptor for every
            relay found in both the current network consensus and the set
            of server descriptors.
        '''
        raw = zlib.decompress(raw)
        gen = parse_file(
            io.BytesIO(raw),
            DEF.STEM_DESCRIPTORS_TYPE,
            validate=True,
            document_handler=DocumentHandler.DOCUMENT,
        )
        descriptors = {}
        # only use descriptors that are also found in the consensus, and
        # also add the 'flags' attribute, a set of unicode strings describing
        # the flags a given RelayDescriptor has
        for relay in gen:
            try:
                flags = set(self._consensus.routers[relay.fingerprint].flags)
                relay.flags = flags
                descriptors[relay.fingerprint] = relay
            # skip any relays not found in the consensus
            except KeyError:
                pass
        return descriptors

    def _cacheConsensus(self, consensus):
        '''Dump a copy of the consensus to the "cached-consensus" file.

        Failure to write the cache is non-fatal (logged only): the cache is
        just an optimization for choosing initial endpoints on next startup.

        :param stem.descriptor.networkstatus.NetworkStatusDocumentV3 consensus:
            fresh consensus to cache on filesystem
        '''
        try:
            with open(DEF.CONSENSUS_CACHE_FILE, 'w') as f:
                f.write(str(consensus))
        except Exception as e:
            logging.debug("Failed to write 'cached-consensus': {}.".format(e))

    def _extractV2DirEndpoints(self, consensus):
        '''Find a new set of V2Dir directory caches to use from the current
        consensus.

        :param stem.descriptors.networkstatus.NetworkStatusDocumentV3 consensus:
            fresh consensus to grab V2Dir caches from
        :return: **list, stem.descriptor.router_status_entry.RouterStatusEntry**
            RouterStatusEntry's that have the 'V2Dir' flag
        '''
        return [relay for relay in consensus.routers.values()
                if Flag.V2DIR in relay.flags]

    def _getInitialEndpoints(self):
        '''Get an initial set of servers to download the network status
        documents from.

        First try reading from the "cached-consensus" file. If this isn't
        successful for any reason fall back to using the directory
        authorities.

        This is only called on instantiation, and any future downloads will
        already have a fresh set of V2Dir endpoints.

        .. note: We just use the directory authorities defined in stem.

        :returns: **list** containing either RouterStatusEntry objects with
            the 'V2Dir' flag or DirectoryAuthorities
        '''
        endpoints = None
        try:
            with open(DEF.CONSENSUS_CACHE_FILE, 'rb') as f:
                data = f.read()
            old_consensus = NetworkStatusDocumentV3(data)
            endpoints = self._extractV2DirEndpoints(old_consensus)
            msg = "Found {} V2Dir endpoints in cached-consensus."
            logging.debug(msg.format(len(endpoints)))
        except (IOError, ValueError) as e:
            logging.debug("Error reading from cached-consensus: {}".format(e))
            logging.debug("Falling back to directory authorities.")
        # wrap in list() so random.choice() works even where .values()
        # returns a view rather than a list
        return list(endpoints) if endpoints else list(get_authorities().values())

    def _scheduleDownload(self):
        '''Schedule the next network status document download.

        .. note: This is incorrect. We should actually be calculating the
            correct time according to the parameters defined in Tor's
            **dir-spec**, but for now we just schedule a download an hour in
            the future of the previous download.
        '''
        # imported here (not at module level) to avoid installing/importing
        # the reactor as a side effect of importing this module
        from twisted.internet import reactor
        # XXX this is a bug, we should be choosing according to parameters
        #     in the tor `dir-spec`
        seconds = DEFAULT_DOWNLOAD_INTERVAL
        reactor.callLater(seconds, self._getDocuments)
        msg = "Scheduled next consensus download in {} seconds."
        logging.debug(msg.format(seconds))