Completed
Push — verbose-log ( bcf960 )
by Valentin
01:50
created

ppp_core.Router   A

Complexity

Total Complexity 31

Size/Duplication

Total Lines 128
Duplicated Lines 0 %

Test Coverage

Coverage 0%
Metric Value
dl 0
loc 128
ccs 0
cts 93
cp 0
rs 9.8
wmc 31

10 Methods

Rating   Name   Duplication   Size   Complexity  
A request_from_answer() 0 3 1
A _process_answer() 0 14 4
A _process_answers() 0 7 2
B answer() 0 26 4
A _get_python_class() 0 6 2
A _stream_reader() 0 15 3
B _get_streams() 0 16 5
A one_pass() 0 10 1
B _get_python() 0 12 7
A __init__() 0 9 2
1
"""Router of the PPP core."""
2
3 1
import time
4 1
import json
5 1
import logging
6 1
import requests
7 1
import operator
8 1
import itertools
9 1
import functools
10 1
import traceback
11 1
import importlib
12 1
import threading
13 1
import ppp_libmodule
14 1
from ppp_datamodel import AbstractNode
15 1
from ppp_datamodel.communication import Request, Response, TraceItem
16 1
from .config import CoreConfig
17 1
from .exceptions import ClientError, BadGateway
18 1
from . import verbose_log
19
20
def s(x):
    """Return *x* as a ``str``.

    A ``str`` is returned unchanged; anything else (presumably the ``bytes``
    body of an HTTP response) is converted with its ``decode()`` method,
    which for ``bytes`` means UTF-8.
    """
    return x if isinstance(x, str) else x.decode()
21
22
# Fallback values used when a module's answer lacks the 'accuracy' or
# 'relevance' measure (read in Router._process_answer and Router.answer).
DEFAULT_ACCURACY, DEFAULT_RELEVANCE = 0, 0
24
25
logger = logging.getLogger('router')

# Apply the log level from the core configuration, if one can be loaded at
# import time. ``logger`` is defined above so the error branch can use it.
try:
    _config_loglevel = CoreConfig().loglevel
except ppp_libmodule.exceptions.InvalidConfig:
    # No valid configuration available; keep logging's defaults.
    pass
else:
    level = getattr(logging, _config_loglevel.upper(), None)
    if not isinstance(level, int):
        # Report the configured value itself (there is no ``self`` here).
        logger.error('Invalid log level: %s' % _config_loglevel)
    else:
        logging.basicConfig(level=level)
35
36
def freeze(obj):
    """Return an immutable equivalent of *obj*, converting containers recursively.

    - ``dict``      -> ``frozenset`` of ``(frozen_key, frozen_value)`` pairs
    - ``list``      -> ``tuple`` of frozen items
    - ``set``       -> ``frozenset`` of frozen items
    - ``str``, ``bytes``, ``tuple``, ``frozenset``, ``int`` (incl. ``bool``),
      ``float`` and ``None`` -> returned as-is (the contents of tuples and
      frozensets are not re-frozen)
    - ``TraceItem`` -> frozen form of ``obj.as_dict()``

    :raises TypeError: for any other type (still an ``Exception`` subclass,
        so existing ``except Exception`` handlers keep working).
    """
    if isinstance(obj, dict):
        return frozenset((freeze(k), freeze(v)) for (k, v) in obj.items())
    elif isinstance(obj, list):
        return tuple(map(freeze, obj))
    elif isinstance(obj, set):
        return frozenset(map(freeze, obj))
    elif obj is None or isinstance(obj, (str, bytes, tuple, frozenset, int, float)):
        # Already immutable leaves; checked before TraceItem since they are
        # by far the most common values.
        return obj
    elif isinstance(obj, TraceItem):
        return freeze(obj.as_dict())
    else:
        raise TypeError('Cannot freeze object of type %s: %r'
                        % (type(obj).__name__, obj))
49
50
def answer_id(answer):
    """Build the identity tuple used to detect duplicate answers.

    Two answers get equal identities when their language, tree and measures
    match; measures are compared as an unordered set of ``(key, value)``
    items.
    """
    language, tree = answer.language, answer.tree
    return language, tree, frozenset(answer.measures.items())
53
def remove_duplicates(reference, new):
    """Return the items of *new* whose ``answer_id`` is not in *reference*.

    *reference* is a list of identities and is extended in place with the
    identity of every kept item, so duplicates within *new* itself are
    dropped as well. The order of *new* is preserved.
    """
    kept = []
    for candidate in new:
        key = answer_id(candidate)
        if key not in reference:
            reference.append(key)
            kept.append(candidate)
    return kept
62
63
64
class Router:
    """Dispatch a request to the configured modules and collect their answers.

    Modules come from ``CoreConfig().modules``. A module whose ``method`` is
    ``'http'`` receives the request as a JSON POST; one whose ``method`` is
    ``'python'`` is imported from its ``url`` and run in-process. Answers of
    one pass are turned into requests for the next pass, ``nb_passes`` times.
    """

    def __init__(self, request):
        """Copy the fields of *request* and load the core configuration.

        :param request: the incoming ``Request``; its ``tree`` must be an
            ``AbstractNode``.
        """
        self.id = request.id
        self.language = request.language
        # NOTE(review): ``assert`` is stripped under ``python -O``; kept so
        # callers still see AssertionError on a bad tree.
        assert isinstance(request.tree, AbstractNode)
        self.tree = request.tree
        logger.info('Request: %s', self.tree)
        self.measures = request.measures
        self.trace = request.trace
        self.config = CoreConfig()

    def answer(self):
        """Run every pass and return the new answers, most relevant first.

        The first pass uses the original request; each later pass sends one
        request per answer produced by the previous pass. Answers whose
        ``answer_id`` was already seen are dropped. The result is also handed
        to ``verbose_log.log_answers`` in a background thread.

        :returns: list of ``Response``, sorted (stably) by decreasing
            ``relevance`` measure.
        """
        start_time = time.time()
        answer_ids = []
        answers = []
        new_answers = [Response(self.language, self.tree,
                                self.measures, self.trace)]
        for _ in range(self.config.nb_passes):
            # Perform the pass. Named ``pending`` so the module-level
            # ``requests`` library is not shadowed.
            pending = [self.request_from_answer(a) for a in new_answers]
            new_answers = []
            for request in pending:
                new_answers.extend(self.one_pass(request))
            # Remove duplicates, and update the answer list
            new_answers = remove_duplicates(answer_ids, new_answers)
            answers.extend(new_answers)
        # TODO: should sort according to accuracy too
        answers.sort(
            key=lambda x: x.measures.get('relevance', DEFAULT_RELEVANCE),
            reverse=True)

        # Log answers in another thread, so it does not add a delay.
        end_time = time.time()
        threading.Thread(target=verbose_log.log_answers,
                         args=(self.config, answers, start_time, end_time)).start()

        return answers

    def request_from_answer(self, answer):
        """Build the follow-up ``Request`` for *answer*, keeping this request's id."""
        return Request(self.id, answer.language, answer.tree,
                       answer.measures, answer.trace)

    def one_pass(self, request):
        """Query every module once with *request*; return an iterator of answers.

        All HTTP requests are sent before any response is read, so remote
        modules can work concurrently. Answers of in-process modules come
        first and are produced lazily as the iterator is consumed. Falsy
        entries (rejected answers are ``None``) are filtered out.
        """
        # First make all requests so modules can prepare their answer
        # while we send requests to other modules
        streams = self._get_streams(request)
        http_answers = [self._process_answers(self._stream_reader(stream))
                        for stream in streams]
        answers = itertools.chain(self._get_python(request),
                                  itertools.chain.from_iterable(http_answers))
        return filter(bool, answers)  # Eliminate None values

    def _get_python_class(self, url):
        """Resolve ``'package.module:Attr.SubAttr'`` to the object it names."""
        (module_path, class_path) = url.split(':')
        obj = importlib.import_module(module_path)
        for token in class_path.split('.'):
            obj = getattr(obj, token)
        return obj

    def _get_python(self, request):
        """Yield answers from every in-process (``'python'``) module.

        Each module's ``url`` names a class; it is instantiated with *request*
        and the items of its ``answer()`` are yielded. If a module raises, the
        traceback is logged and that module's remaining answers are skipped.
        """
        for module in self.config.modules:
            if not (module.should_send(request) and module.method == 'python'):
                continue
            try:
                obj = self._get_python_class(module.url)(request)
                for answer in obj.answer():
                    yield answer
            except Exception:
                # KeyboardInterrupt / SystemExit derive from BaseException,
                # so they are not caught here and still propagate.
                tb = traceback.format_exc()
                logger.error('Error in module %s\n: %s' % (module, tb))

    def _get_streams(self, request):
        """POST *request* as JSON to every HTTP module that accepts it.

        :returns: list of ``(module, response)`` pairs; responses are opened
            with ``stream=True`` so their bodies are read later.
        """
        headers = {'Content-type': 'application/json',
                   'Accept': 'application/json'}
        payload = request.as_json()
        # NOTE(review): no ``timeout`` is passed, so an unresponsive module
        # can block this pass indefinitely — confirm whether one is wanted.
        getter = functools.partial(requests.post, stream=True,
                                   headers=headers, data=payload)
        streams = []
        for module in self.config.modules:
            try:
                if module.should_send(request) and module.method == 'http':
                    streams.append((module, getter(module.url)))
            except requests.exceptions.RequestException as exc:  # pragma: no cover
                # One unreachable or misconfigured module (connection error,
                # timeout, bad URL, ...) must not abort the whole request.
                logger.warning('Module %s could not be queried: %s' %
                               (module, exc))
        return streams

    def _stream_reader(self, stream):
        """Read one module's HTTP response and decode its JSON body.

        :param stream: a ``(module, response)`` pair from ``_get_streams``.
        :returns: ``(module, decoded_json)``, or ``None`` if the status is not
            200 or the body cannot be decoded as JSON.
        """
        (module, response) = stream
        try:
            if response.status_code != 200:
                logger.warning('Module %s returned %d: %s' %
                               (module, response.status_code, response.content))
                return None
            try:
                # UnicodeDecodeError is a ValueError subclass, so an
                # undecodable body is handled like invalid JSON.
                return (module, json.loads(s(response.content)))
            except ValueError:
                logger.warning('Module %s returned %d: %s' %
                               (module, response.status_code, response.content))
                return None
        finally:
            # With ``stream=True`` the connection is only released once the
            # response is closed; do it on every path, including errors.
            response.close()

    def _process_answers(self, t):
        """Turn a ``(module, list_of_dicts)`` pair into checked answers.

        :param t: result of ``_stream_reader``; a falsy value gives ``[]``.
        :returns: list of ``Response`` objects, or ``None`` for rejected ones.
        """
        if not t:
            return []
        (module, answers) = t
        return [self._process_answer(module, Response.from_dict(a))
                for a in answers]

    def _process_answer(self, module, answer):
        """Validate *answer*'s measures and weight its relevance by *module*.

        Missing ``accuracy``/``relevance`` are logged and replaced by the
        defaults. Relevance is multiplied by ``module.coefficient`` in place.

        :returns: *answer*, or ``None`` if accuracy is not a number in [0, 1].
        """
        missing = {'accuracy', 'relevance'} - set(answer.measures)
        if missing:
            logger.warning('Missing mandatory measures from module %s: %r' %
                           (module, missing))
        accuracy = answer.measures.get('accuracy', DEFAULT_ACCURACY)
        relevance = answer.measures.get('relevance', DEFAULT_RELEVANCE)
        try:
            # Chained comparison is also False for NaN.
            valid = 0 <= accuracy <= 1
        except TypeError:
            # Non-numeric accuracy sent by the module.
            valid = False
        if not valid:
            logger.warning('Module %s answered with invalid accuracy: %r' %
                           (module, accuracy))
            return None
        answer.measures['accuracy'] = accuracy
        answer.measures['relevance'] = relevance * module.coefficient
        return answer
192