Passed
Push — master ( 986f62...fb6d1b )
by Alexander
03:53
created

tcms.telemetry.api._sort_by_failing_rate()   A

Complexity

Conditions 1

Size

Total Lines 2
Code Lines 2

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 2
dl 0
loc 2
rs 10
c 0
b 0
f 0
cc 1
nop 1
1
from django.db.models import Count
2
from django.utils.translation import ugettext_lazy as _
3
from modernrpc.core import rpc_method
4
5
from tcms.testcases.models import TestCase, TestCaseStatus
6
from tcms.testruns.models import TestExecution, TestExecutionStatus
7
8
9
@rpc_method(name='Testing.breakdown')
def breakdown(query=None):
    """
    .. function:: XML-RPC Testing.breakdown(query)

        Perform a search and return the statistics for the selected test cases

        :param query: Field lookups for :class:`tcms.testcases.models.TestCase`
        :type query: dict
        :return: Object, containing the statistics for the selected test cases:
                 ``count`` (manual / automated / all), ``priorities`` and
                 ``categories``
        :rtype: dict
    """
    if query is None:
        query = {}

    test_cases = TestCase.objects.filter(**query)

    manual_count = test_cases.filter(is_automated=False).count()
    automated_count = test_cases.filter(is_automated=True).count()
    count = {
        'manual': manual_count,
        'automated': automated_count,
        # derived from the two counts above instead of issuing a third query
        'all': manual_count + automated_count,
    }

    priorities = _get_field_count_map(test_cases, 'priority', 'priority__value')
    categories = _get_field_count_map(test_cases, 'category', 'category__name')

    return {
        'count': count,
        'priorities': priorities,
        'categories': categories,
    }
43
44
45
def _get_field_count_map(test_cases, expression, field):
    """
    Count the given test cases per value of ``field``, split by case status.

    :param test_cases: query set of test cases to aggregate
    :param expression: expression passed to :class:`django.db.models.Count`
    :param field: field name used both for grouping (``values()``) and as
                  the key of the resulting maps
    :return: dict with two entries - one keyed by the name of the confirmed
             status and one keyed by the translated string ``OTHER`` - each
             mapping a ``field`` value to its count
    :rtype: dict
    """
    confirmed = TestCaseStatus.get_confirmed()

    def count_per_field(query_set):
        # group by ``field`` and count rows within each group
        return query_set.values(field).annotate(count=Count(expression))

    query_set_confirmed = count_per_field(
        test_cases.filter(case_status=confirmed))
    query_set_not_confirmed = count_per_field(
        test_cases.exclude(case_status=confirmed))

    return {
        confirmed.name: _map_query_set(query_set_confirmed, field),
        str(_('OTHER')): _map_query_set(query_set_not_confirmed, field),
    }
62
63
64
def _map_query_set(query_set, field):
65
    return {entry[field]: entry['count'] for entry in query_set}
66
67
68
@rpc_method(name='Testing.status_matrix')
def status_matrix(query=None):
    """
    .. function:: XML-RPC Testing.status_matrix(query)

        Perform a search and return the data needed to visualize the status
        matrix of test cases and their test executions per test run

        :param query: Field lookups for :class:`tcms.testruns.models.TestExecution`
        :type query: dict
        :return: Object with ``data`` - a list of rows, one per test case,
                 each holding ``tc_id``, ``tc_summary`` and the list of its
                 ``executions`` - and ``columns`` - a mapping of run ID to
                 run summary
        :rtype: dict
    """
    if query is None:
        query = {}

    data_set = []
    columns = {}
    # row currently being filled; None until the first execution is seen
    row = None
    for test_execution in TestExecution.objects.filter(**query).only(
            'case_id', 'run_id', 'case__summary', 'status'
    ).order_by('case_id', 'run_id'):

        columns[test_execution.run.run_id] = test_execution.run.summary
        test_execution_response = {
            'pk': test_execution.pk,
            'class': test_execution.status.color_code(),
            'run_id': test_execution.run_id,
        }

        # executions arrive ordered by case_id, so a change of case_id
        # always means a new row has to be started
        if row is not None and test_execution.case_id == row['tc_id']:
            row['executions'].append(test_execution_response)
        else:
            row = {
                'tc_id': test_execution.case_id,
                'tc_summary': test_execution.case.summary,
                'executions': [test_execution_response],
            }
            data_set.append(row)

    return {
        'data': data_set,
        'columns': columns
    }
118
119
120
@rpc_method(name='Testing.execution_trends')
def execution_trends(query=None):
    """
    .. function:: XML-RPC Testing.execution_trends(query)

        Perform a search and return the number of test executions per
        status colour code for every test run matched by the query

        :param query: Field lookups for :class:`tcms.testruns.models.TestExecution`
        :type query: dict
        :return: Object with ``categories`` - the list of run IDs in
                 ascending order, ``data_set`` - a mapping of status colour
                 code to a list of counts (one per run) and ``colors`` - the
                 distinct status colours
        :rtype: dict
    """
    if query is None:
        query = {}

    data_set = {}
    colors = []
    for status in TestExecutionStatus.objects.all():
        data_set[status.color_code()] = []

        color = status.color()
        if color not in colors:
            colors.append(color)

    categories = []
    count = {}
    # ID of the run whose executions are currently being counted
    current_run_id = None
    for test_execution in TestExecution.objects.filter(**query).order_by('run_id'):
        if test_execution.run_id != current_run_id:
            # a new run starts: flush the counts of the previous one, if any
            if current_run_id is not None:
                _append_status_counts_to_result(count, data_set)

            count = {}
            current_run_id = test_execution.run_id
            categories.append(current_run_id)

        # count every execution, including the first one of each run
        status = test_execution.status.color_code()
        count[status] = count.get(status, 0) + 1

    # append the result for the last run
    if current_run_id is not None:
        _append_status_counts_to_result(count, data_set)

    return {
        'categories': categories,
        'data_set': data_set,
        'colors': colors
    }
166
167
168
def _append_status_counts_to_result(count, result):
    """
    Append one data point per chart status to ``result``.

    :param count: mapping of status to the number of executions counted for
                  it; statuses missing from the mapping count as 0
    :param result: mapping of status to a list of counts; mutated in place
    :raises KeyError: if ``result`` has no list for one of
                      ``TestExecutionStatus.chart_status_names``
    """
    for status in TestExecutionStatus.chart_status_names:
        result[status].append(count.get(status, 0))
172
173
174
@rpc_method(name='Testing.test_case_health')
def test_case_health(query=None):
    """
    .. function:: XML-RPC Testing.test_case_health(query)

        Perform a search and return the test cases with the highest ratio
        of failed test executions

        :param query: Field lookups for :class:`tcms.testruns.models.TestExecution`
        :type query: dict
        :return: List of at most 30 objects, sorted by failing rate in
                 descending order, each holding ``case_id``,
                 ``case_summary`` and ``count`` (``all`` and ``fail``).
                 Test cases without failed executions are left out
        :rtype: list
    """
    if query is None:
        query = {}

    all_test_executions = TestExecution.objects.filter(**query)

    test_executions = _get_count_for(all_test_executions)
    failed_test_executions = _get_count_for(all_test_executions.filter(
        status__name__in=TestExecutionStatus.failure_status_names))

    data = {
        te['case_id']: {
            'case_id': te['case_id'],
            'case_summary': te['case__summary'],
            'count': {
                'all': te['count'],
                'fail': 0
            }
        }
        for te in test_executions
    }

    _count_test_executions(data, failed_test_executions, 'fail')

    # remove all with 100% success rate, because they are not interesting
    _remove_all_excellent_executions(data)

    data = sorted(data.values(), key=_sort_by_failing_rate, reverse=True)

    # slicing is a no-op for shorter lists, no length check needed
    return data[:30]
209
210
211
def _remove_all_excellent_executions(data):
212
    for key in dict.fromkeys(data):
213
        if data[key]['count']['fail'] == 0:
214
            data.pop(key)
215
216
217
def _count_test_executions(data, test_executions, status):
218
219
    for te in test_executions:
220
        data[te['case_id']]['count'][status] = te['count']
221
222
223
def _sort_by_failing_rate(element):
224
    return element['count']['fail'] / element['count']['all']
225
226
227
def _get_count_for(test_executions):
    """
    Count the given test executions per test case.

    :param test_executions: query set of test executions
    :return: query set of dicts with ``case_id``, ``case__summary`` and
             ``count`` keys, ordered by ``case_id``
    """
    per_case = test_executions.values('case_id', 'case__summary')
    return per_case.annotate(count=Count('case_id')).order_by('case_id')
231