tcms.telemetry.api   A
last analyzed

Complexity

Total Complexity 32

Size/Duplication

Total Lines 265
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
wmc 32
eloc 163
dl 0
loc 265
rs 9.84
c 0
b 0
f 0

12 Functions

Rating   Name   Duplication   Size   Complexity  
A _map_query_set() 0 2 1
A breakdown() 0 34 2
A _get_field_count_map() 0 14 1
A status_matrix() 0 51 4
A test_case_health() 0 34 4
A _append_status_counts_to_result() 0 9 2
C execution_trends() 0 58 9
A _remove_all_excellent_executions() 0 4 3
A _get_count_for() 0 5 1
A _count_test_executions() 0 3 2
A individual_test_case_health_simple() 0 14 2
A _sort_by_failing_rate() 0 2 1
1
from django.db.models import Count
2
from django.utils.translation import gettext_lazy as _
3
from modernrpc.auth.basic import http_basic_auth_login_required
4
from modernrpc.core import rpc_method
5
6
from tcms.testcases.models import TestCase
7
from tcms.testruns.models import TestExecution, TestExecutionStatus
8
9
10
@http_basic_auth_login_required
11
@rpc_method(name="Testing.breakdown")
12
def breakdown(query=None):
13
    """
14
    .. function:: RPC Testing.breakdown(query)
15
16
        Perform a search and return the statistics for the selected test cases
17
18
        :param query: Field lookups for :class:`tcms.testcases.models.TestCase`
19
        :type query: dict
20
        :return: Object, containing the statistics for the selected test cases
21
        :rtype: dict
22
    """
23
24
    if query is None:
25
        query = {}
26
27
    test_cases = TestCase.objects.filter(**query).distinct()
28
29
    manual_count = test_cases.filter(is_automated=False).count()
30
    automated_count = test_cases.filter(is_automated=True).count()
31
    count = {
32
        "manual": manual_count,
33
        "automated": automated_count,
34
        "all": manual_count + automated_count,
35
    }
36
37
    priorities = _get_field_count_map(test_cases, "priority", "priority__value")
38
    categories = _get_field_count_map(test_cases, "category", "category__name")
39
40
    return {
41
        "count": count,
42
        "priorities": priorities,
43
        "categories": categories,
44
    }
45
46
47
def _get_field_count_map(test_cases, expression, field):
48
    query_set_confirmed = (
49
        test_cases.filter(case_status__is_confirmed=True)
50
        .values(field)
51
        .annotate(count=Count(expression, distinct=True))
52
    )
53
    query_set_not_confirmed = (
54
        test_cases.exclude(case_status__is_confirmed=True)
55
        .values(field)
56
        .annotate(count=Count(expression, distinct=True))
57
    )
58
    return {
59
        str(_("CONFIRMED")): _map_query_set(query_set_confirmed, field),
60
        str(_("OTHER")): _map_query_set(query_set_not_confirmed, field),
61
    }
62
63
64
def _map_query_set(query_set, field):
65
    return {entry[field]: entry["count"] for entry in query_set}
66
67
68
@http_basic_auth_login_required
69
@rpc_method(name="Testing.status_matrix")
70
def status_matrix(query=None):
71
    """
72
    .. function:: RPC Testing.status_matrix(query)
73
74
        Perform a search and return data_set needed to visualize the status matrix
75
        of test plans, test cases and test executions
76
77
        :param query: Field lookups for :class:`tcms.testcases.models.TestPlan`
78
        :type query: dict
79
        :return: List, containing the information about the test executions
80
        :rtype: list
81
    """
82
    if query is None:
83
        query = {}
84
85
    data_set = []
86
    columns = {}
87
    row = {"tc_id": 0}
88
    for test_execution in (
89
        TestExecution.objects.filter(**query)
90
        .only("case_id", "run_id", "case__summary", "status")
91
        .order_by("case_id", "run_id")
92
    ):
93
94
        columns[test_execution.run_id] = test_execution.run.summary
95
        test_execution_response = {
96
            "pk": test_execution.pk,
97
            "color": test_execution.status.color,
98
            "run_id": test_execution.run_id,
99
            "plan_id": test_execution.run.plan_id,
100
        }
101
102
        if test_execution.case_id == row["tc_id"]:
103
            row["executions"].append(test_execution_response)
104
        else:
105
            data_set.append(row)
106
107
            row = {
108
                "tc_id": test_execution.case_id,
109
                "tc_summary": test_execution.case.summary,
110
                "executions": [test_execution_response],
111
            }
112
113
    # append the last row
114
    data_set.append(row)
115
116
    del data_set[0]
117
118
    return {"data": data_set, "columns": columns}
119
120
121
@http_basic_auth_login_required
122
@rpc_method(name="Testing.execution_trends")
123
def execution_trends(query=None):
124
125
    if query is None:
126
        query = {}
127
128
    data_set = {}
129
    categories = []
130
    colors = []
131
    count = {}
132
133
    for status in TestExecutionStatus.objects.all():
134
        data_set[status.name] = []
135
        colors.append(status.color)
136
    data_set[str(_("TOTAL"))] = []
0 ignored issues
show
Comprehensibility Best Practice introduced by
The variable str does not seem to be defined.
Loading history...
Comprehensibility Best Practice introduced by
The variable _ does not seem to be defined.
Loading history...
137
    colors.append("black")
138
139
    status_count = {
140
        "positive": 0,
141
        "negative": 0,
142
        "neutral": 0,
143
    }
144
    run_id = 0
145
    for test_execution in TestExecution.objects.filter(**query).order_by("run_id"):
146
        status = test_execution.status
147
148
        if status.weight > 0:
149
            status_count["positive"] += 1
150
        elif status.weight < 0:
151
            status_count["negative"] += 1
152
        else:
153
            status_count["neutral"] += 1
154
155
        if test_execution.run_id == run_id:
156
            if status.name in count:
157
                count[status.name] += 1
158
            else:
159
                count[status.name] = 1
160
161
        else:
162
            _append_status_counts_to_result(count, data_set)
163
164
            count = {status.name: 1}
165
            run_id = test_execution.run_id
166
            categories.append(run_id)
167
168
    # append the last result
169
    _append_status_counts_to_result(count, data_set)
170
171
    for _key, value in data_set.items():
172
        del value[0]
173
174
    return {
175
        "categories": categories,
176
        "data_set": data_set,
177
        "colors": colors,
178
        "status_count": status_count,
179
    }
180
181
182
def _append_status_counts_to_result(count, result):
183
    total = 0
184
    for status in TestExecutionStatus.objects.all():
185
        status_count = count.get(status.name, 0)
186
        result.get(status.name).append(status_count)
187
188
        total += status_count
189
190
    result.get(str(_("TOTAL"))).append(total)
191
192
193
@http_basic_auth_login_required
194
@rpc_method(name="Testing.test_case_health")
195
def test_case_health(query=None):
196
197
    if query is None:
198
        query = {}
199
200
    all_test_executions = TestExecution.objects.filter(**query)
201
202
    test_executions = _get_count_for(all_test_executions)
203
    failed_test_executions = _get_count_for(
204
        all_test_executions.filter(status__weight__lt=0)
205
    )
206
207
    data = {}
208
    for value in test_executions:
209
        data[value["case_id"]] = {
210
            "case_id": value["case_id"],
211
            "case_summary": value["case__summary"],
212
            "count": {"all": value["count"], "fail": 0},
213
        }
214
215
    _count_test_executions(data, failed_test_executions, "fail")
216
217
    # remove all with 100% success rate, because they are not interesting
218
    _remove_all_excellent_executions(data)
219
220
    data = list(data.values())
221
    data.sort(key=_sort_by_failing_rate, reverse=True)
222
223
    if len(data) > 30:
224
        data = data[:30]
225
226
    return data
227
228
229
@http_basic_auth_login_required
230
@rpc_method(name="Testing.individual_test_case_health")
231
def individual_test_case_health_simple(query=None):
232
233
    if query is None:
234
        query = {}
235
236
    res = (
237
        TestExecution.objects.filter(**query)
238
        .values("run__plan", "case_id", "status__name", "status__weight")
239
        .order_by("case", "run__plan", "status__weight")
240
    )
241
242
    return list(res)
243
244
245
def _remove_all_excellent_executions(data):
246
    for key in dict.fromkeys(data):
247
        if data[key]["count"]["fail"] == 0:
248
            data.pop(key)
249
250
251
def _count_test_executions(data, test_executions, status):
252
    for value in test_executions:
253
        data[value["case_id"]]["count"][status] = value["count"]
254
255
256
def _sort_by_failing_rate(element):
257
    return element["count"]["fail"] / element["count"]["all"]
258
259
260
def _get_count_for(test_executions):
261
    return (
262
        test_executions.values("case_id", "case__summary")
263
        .annotate(count=Count("case_id"))
264
        .order_by("case_id")
265
    )
266