ppp_core.Router.request_from_answer()   A
last analyzed

Complexity

Conditions 1

Size

Total Lines 3

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 2
CRAP Score 1
Metric Value
cc 1
dl 0
loc 3
ccs 2
cts 2
cp 1
crap 1
rs 10
1
"""Router of the PPP core."""
2
3 1
import time
4 1
import json
5 1
import logging
6 1
import requests
7 1
import operator
8 1
import itertools
9 1
import functools
10 1
import traceback
11 1
import importlib
12 1
import threading
13 1
import ppp_libmodule
14 1
from ppp_datamodel import AbstractNode
15 1
from ppp_datamodel.communication import Request, Response, TraceItem
16 1
from .config import CoreConfig
17 1
from .exceptions import ClientError, BadGateway
18 1
from . import verbose_log
19
20 1
s = lambda x:x if isinstance(x, str) else x.decode()
21
22 1
DEFAULT_ACCURACY = 0
23 1
DEFAULT_RELEVANCE = 0
24
25 1
try:
26 1
    level = getattr(logging, CoreConfig().loglevel.upper(), None)
27 1
except ppp_libmodule.exceptions.InvalidConfig:
28 1
    pass
29
else:
30
    if not isinstance(level, int):
31
        logger.error('Invalid log level: %s' % self.config.loglevel)
32
    else:
33
        logging.basicConfig(level=level)
34 1
logger = logging.getLogger('router')
35
36 1
def freeze(obj):
37
    if isinstance(obj, dict):
38
        return frozenset((freeze(x), freeze(y)) for (x,y) in obj.items())
39
    elif isinstance(obj, list):
40
        return tuple(map(freeze, obj))
41
    elif isinstance(obj, set):
42
        return frozenset(map(freeze, obj))
43
    elif isinstance(obj, TraceItem):
44
        return freeze(obj.as_dict())
45
    elif isinstance(obj, (str, tuple, frozenset, int, float)):
46
        return obj
47
    else:
48
        raise Exception(obj)
49
50 1
def answer_id(answer):
51 1
    return (answer.language, answer.tree,
52
            frozenset(answer.measures.items()))
53 1
def remove_duplicates(reference, new):
54 1
    result = []
55 1
    for x in new:
56 1
        id_ = answer_id(x)
57 1
        if id_ in reference:
58 1
            continue
59 1
        reference.append(id_)
60 1
        result.append(x)
61 1
    return result
62
63
64 1
class Router:
65 1
    def __init__(self, request):
66 1
        self.id = request.id
67 1
        self.language = request.language
68 1
        assert isinstance(request.tree, AbstractNode)
69 1
        self.tree = request.tree
70 1
        logger.info('Request: %s' % self.tree)
71 1
        self.measures = request.measures
72 1
        self.trace = request.trace
73 1
        self.config = CoreConfig()
74
75 1
    def answer(self):
76 1
        start_time = time.time()
77 1
        answer_ids = []
78 1
        answers = []
79 1
        new_answers = [Response(self.language, self.tree,
80
                                self.measures, self.trace)]
81 1
        for i in range(0, self.config.nb_passes):
82
            # Perform the pass
83 1
            requests = list(map(self.request_from_answer, new_answers))
84 1
            new_answers = []
85 1
            for request in requests:
86 1
                new_answers.extend(self.one_pass(request))
87
            # Remove duplicates, and update the answer list
88 1
            new_answers = list(remove_duplicates(answer_ids, new_answers))
89 1
            answers.extend(new_answers)
90
        # TODO: should sort according to accuracy too
91 1
        answers = sorted(answers,
92
                      key=lambda x:x.measures.get('relevance', DEFAULT_RELEVANCE),
93
                      reverse=True)
94
95
        # Log answers in an other thread, so it does not add a delay.
96 1
        end_time = time.time()
97 1
        threading.Thread(target=verbose_log.log_answers,
98
                args=(self.config, answers, start_time, end_time)).start()
99
100 1
        return answers
101
102 1
    def request_from_answer(self, answer):
103 1
        return Request(self.id, answer.language, answer.tree,
104
                       answer.measures, answer.trace)
105
106 1
    def one_pass(self, request):
107
        # First make all requests so modules can prepare their answer
108
        # while we send requests to other modules
109 1
        streams = self._get_streams(request)
110 1
        answers = map(self._stream_reader, streams)
111 1
        answers = map(self._process_answers, answers)
112 1
        answers = itertools.chain(*list(answers)) # Flatten answers lists
113 1
        answers = itertools.chain(self._get_python(request), answers)
114 1
        answers = filter(bool, answers) # Eliminate None values
115 1
        return answers
116
117 1
    def _get_python_class(self, url):
118 1
        (module_path, class_path) = url.split(':')
119 1
        cls = importlib.import_module(module_path)
120 1
        for token in class_path.split('.'):
121 1
            cls = getattr(cls, token)
122 1
        return cls
123
124 1
    def _get_python(self, request):
125 1
        for module in self.config.modules:
126 1
            if module.should_send(request) and module.method == 'python':
127 1
                try:
128 1
                    obj = self._get_python_class(module.url)(request)
129 1
                    for answer in obj.answer():
130 1
                        yield answer
131 1
                except KeyboardInterrupt:
132
                    raise
133 1
                except Exception:
134 1
                    tb = traceback.format_exc()
135 1
                    logger.error('Error in module %s\n: %s' % (module, tb))
136
137 1
    def _get_streams(self, request):
138 1
        headers = {'Content-type': 'application/json',
139
                   'Accept': 'application/json'}
140 1
        payload = request.as_json()
141 1
        getter = functools.partial(requests.post, stream=True,
142
                                   headers=headers, data=payload)
143 1
        streams = []
144 1
        for module in self.config.modules:
145 1
            try:
146 1
                if module.should_send(request) and module.method == 'http':
147 1
                    streams.append((module, getter(module.url)))
148
            except requests.exceptions.ConnectionError as exc: # pragma: no cover
149
                logger.warning('Module %s could not be queried: %s' %
150
                                (module, exc.args[0]))
151
                pass
152 1
        return streams
153
154 1
    def _stream_reader(self, stream):
155 1
        (module, stream) = stream
156 1
        if stream.status_code != 200:
157
            logger.warning('Module %s returned %d: %s' %
158
                            (module, stream.status_code, stream.content))
159
            return None
160
        else:
161 1
            try:
162 1
                return (module, json.loads(s(stream.content)))
163
            except ValueError:
164
                logger.warning('Module %s returned %d: %s' %
165
                                (module, stream.status_code, stream.content))
166
                return None
167
            finally:
168 1
                stream.close()
169
170 1
    def _process_answers(self, t):
171 1
        if t:
172 1
            (module, answers) = t
173 1
            answers = map(Response.from_dict, answers)
174 1
            return list(map(functools.partial(self._process_answer, module), answers))
175
        else:
176
            return []
177
178 1
    def _process_answer(self, module, answer):
179 1
        missing = {'accuracy', 'relevance'} - set(answer.measures)
180 1
        if missing:
181
            logger.warning('Missing mandatory measures from module %s: %r' %
182
                             (module, missing))
183 1
        accuracy = answer.measures.get('accuracy', DEFAULT_ACCURACY)
184 1
        relevance = answer.measures.get('relevance', DEFAULT_RELEVANCE)
185 1
        if accuracy < 0 or accuracy > 1:
186 1
            logger.warning('Module %s answered with invalid accuracy: %r' %
187
                    (module, accuracy))
188 1
            return None
189 1
        answer.measures['accuracy'] = accuracy
190 1
        answer.measures['relevance'] = relevance * module.coefficient
191
        return answer
192