tests.LoggerTest.testLog()   B
last analyzed

Complexity

Conditions 5

Size

Total Lines 19

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 17
CRAP Score 5
Metric Value
cc 5
dl 0
loc 19
ccs 17
cts 17
cp 1
crap 5
rs 8.5455
1
"""Test HTTP capabilities of the logger."""
2
3 1
import json
4 1
import sqlite3
5 1
import tempfile
6
7 1
from ppp_logger.logger import make_responses_forest, freeze
8 1
from ppp_logger import app
9 1
from ppp_libmodule.tests import PPPTestCase
10
11 1
def R(x):
    """Return a node dictionary of type 'resource' whose value is *x*."""
    return {'type': 'resource', 'value': x}
12 1
def to_trace(x):
    """Return a trace entry derived from the response dictionary *x*.

    The result is a shallow copy of *x* (nested objects are shared with
    the original) in which:

    * ``'module'`` is set to the length of ``x['tree']['value']``,
      rendered as a string;
    * the ``'trace'`` key is removed.

    *x* itself is left untouched. A ``KeyError`` is raised if *x* has no
    ``'tree'``/``'value'`` or no ``'trace'`` key.
    """
    entry = x.copy()  # shallow: the 'tree' and 'measures' objects stay shared
    value = entry['tree']['value']
    entry.update(module=str(len(value)))
    entry.pop('trace')  # no default on purpose: a missing key must raise
    return entry
17 1
def striptraces(obj):
    """Return a copy of *obj* with every ``'trace'`` dictionary key removed.

    Lists, tuples and dictionaries are rebuilt recursively; any other
    object is returned as is (not copied).
    """
    if isinstance(obj, dict):
        return {
            key: striptraces(value)
            for key, value in obj.items()
            if key != 'trace'
        }
    if isinstance(obj, list):
        return [striptraces(item) for item in obj]
    if isinstance(obj, tuple):
        return tuple(striptraces(item) for item in obj)
    return obj
26 1
def m(x):
    """Return a response dictionary without trace: ``R(x)`` as tree, empty measures."""
    return {'tree': R(x), 'measures': {}}
27
28 1
def build_responses():
    """Build the flat list of nine responses used as a test fixture.

    The ``'trace'`` of each response lists its ancestors (outermost
    first), which encodes the following forest:

    one
    +- two
    |  +- three
    |  |  +- four
    |  +- five
    +- six
    |  +- seven
    eight
    nine

    Every trace is finally terminated by an entry describing the
    response itself.
    """
    def response(name, *ancestors):
        # One response whose trace holds a trace entry per ancestor.
        return {
            'tree': R(name),
            'measures': {},
            'trace': [to_trace(ancestor) for ancestor in ancestors],
        }

    one = response('one')
    two = response('two', one)
    three = response('three', one, two)
    four = response('four', one, two, three)
    five = response('five', one, two)
    six = response('six', one)
    seven = response('seven', one, six)
    eight = response('eight')
    nine = response('nine')

    responses = [one, two, three, four, five, six, seven, eight, nine]
    # Done only after all ancestor entries were built, so that no response
    # is copied with its own entry already present in its trace.
    for item in responses:
        own_entry = {
            'tree': item['tree'],
            'measures': item['measures'],
            'module': str(len(item['tree']['value'])),
        }
        item['trace'].append(own_entry)
    return responses
54
55 1
def notrace(obj):
    """Return True if no dictionary nested anywhere in *obj* has a 'trace' key.

    Lists and tuples are searched element by element, dictionaries value
    by value; any other object trivially contains no trace.
    """
    if isinstance(obj, (list, tuple)):
        return all(map(notrace, obj))
    elif isinstance(obj, dict):
        # Recurse into the values: iterating the dictionary itself would
        # only yield its keys, and nested traces would go unnoticed.
        return 'trace' not in obj and all(map(notrace, obj.values()))
    else:
        return True
62
63 1
class LoggerTest(PPPTestCase(app)):
    """Tests of the logger application, backed by a temporary SQLite file."""

    # NOTE(review): presumably the name of the environment variable from
    # which the application reads its configuration -- confirm against
    # PPPTestCase.
    config_var = 'PPP_LOGGER_CONFIG'

    def setUp(self):
        """Create a temporary database file and point the config at it."""
        self.fd = tempfile.NamedTemporaryFile('w+')
        self.config = '{"database_url": "sqlite:///%s"}' % self.fd.name
        super(LoggerTest, self).setUp()

    def tearDown(self):
        """Run the parent teardown, then close the temporary file."""
        super(LoggerTest, self).tearDown()
        self.fd.close()

    def testTree(self):
        """The flat response list is rebuilt into the expected forest."""
        # Only the last assertEqual would be enough, actually.
        # But the previous assertions make it easier to debug (more precise
        # information on what is going wrong).
        tree = make_responses_forest(build_responses())
        tree = striptraces(tree)
        self.assertTrue(notrace(tree), tree)
        self.assertEqual(len(tree), 3, [x['tree']['value'] for (x, y) in tree])
        self.assertEqual(tree, [
            (m('one'), [
                (m('two'), [
                    (m('three'), [
                        (m('four'), [
                        ]),
                    ]),
                    (m('five'), [
                    ]),
                ]),
                (m('six'), [
                    (m('seven'), [
                    ]),
                ]),
            ]),
            (m('eight'), [
            ]),
            (m('nine'), [
            ]),
        ])

    def testLog(self):
        """Logged responses are stored with the right parent links."""
        question = 'question?'
        responses = build_responses()
        q = {'id': 'foo', 'question': question,
             'responses': responses}
        self.assertStatusInt(q, 200)
        conn = sqlite3.connect(self.fd.name)
        try:
            rows = conn.execute('SELECT response_id, parent_response_id, response_tree FROM responses;').fetchall()
        finally:
            # Using the connection as a context manager would only commit
            # or roll back the transaction; it has to be closed explicitly.
            conn.close()
        # Index the rows by response id, so that parents can be looked up.
        r = {
            row[0]: {'id': row[0], 'parent': row[1],
                     'tree': json.loads(row[2])}
            for row in rows
        }
        self.assertEqual(len(r), 9, r)
        for x in r.values():
            if x['tree']['value'] in ('three', 'five'):
                self.assertIsNotNone(x['parent'])
                parent = r[x['parent']]
                self.assertEqual(parent['tree']['value'], 'two')

    def testTraceItemMissingFromResponses(self):
        """A trace item that is not itself a response is accepted."""
        # This was a bug caused by the WebUI putting the input in the trace
        # without putting it as a response
        i = r'''{"id":"1417024784682-682-12-webui","question":"sqrt(2)","responses":[{"measures":{"accuracy":0.5,"relevance":0.1},"trace":[{"measures":{"accuracy":1,"relevance":0},"module":"input","tree":{"type":"sentence","value":"sqrt(2)"}},{"measures":{"accuracy":0.5,"relevance":0.1},"module":"spell-checker","tree":{"value":"sq rt 2","type":"sentence"}}],"tree":{"value":"sq rt 2","type":"sentence"},"language":"en"}]}'''
        q = json.loads(i)
        self.assertStatusInt(q, 200)
127