1
|
|
|
# -*- coding: utf-8 -*- |
2
|
|
|
|
3
|
|
|
import io |
4
|
|
|
import time |
5
|
|
|
from mock import MagicMock |
6
|
|
|
|
7
|
|
|
from attachments.models import Attachment |
8
|
|
|
from attachments import views as attachment_views |
9
|
|
|
|
10
|
|
|
from django.http import HttpRequest |
11
|
|
|
from django.middleware.csrf import get_token |
12
|
|
|
from django.db.models import FieldDoesNotExist |
13
|
|
|
|
14
|
|
|
from tcms.management.models import Product |
15
|
|
|
from tcms.core.utils import request_host_link |
16
|
|
|
|
17
|
|
|
|
18
|
|
|
QUERY_DISTINCT = 1 |
19
|
|
|
|
20
|
|
|
ACCEPTABLE_BOOL_VALUES = ('0', '1', 0, 1, True, False) |
21
|
|
|
|
22
|
|
|
|
23
|
|
|
def parse_bool_value(value): |
24
|
|
|
if value in ACCEPTABLE_BOOL_VALUES: |
25
|
|
|
if value == '0': |
26
|
|
|
return False |
27
|
|
|
if value == '1': |
28
|
|
|
return True |
29
|
|
|
return value |
30
|
|
|
raise ValueError('Unacceptable bool value.') |
31
|
|
|
|
32
|
|
|
|
33
|
|
|
def pre_check_product(values): |
34
|
|
|
if isinstance(values, dict): |
35
|
|
|
if not values.get('product'): |
36
|
|
|
raise ValueError('No product given.') |
37
|
|
|
product_str = values['product'] |
38
|
|
|
else: |
39
|
|
|
product_str = values |
40
|
|
|
|
41
|
|
|
if isinstance(product_str, str): |
42
|
|
|
if not product_str: |
43
|
|
|
raise ValueError('Got empty product name.') |
44
|
|
|
return Product.objects.get(name=product_str) |
45
|
|
|
if isinstance(product_str, int): |
46
|
|
|
return Product.objects.get(pk=product_str) |
47
|
|
|
raise ValueError('The type of product is not recognizable.') |
48
|
|
|
|
49
|
|
|
|
50
|
|
|
def _lookup_fields_in_model(cls, fields): |
51
|
|
|
"""Lookup ManyToMany fields in current table and related tables. For |
52
|
|
|
distinct duplicate rows when using inner join |
53
|
|
|
|
54
|
|
|
@param cls: table model class |
55
|
|
|
@type cls: subclass of django.db.models.Model |
56
|
|
|
@param fields: fields in where condition. |
57
|
|
|
@type fields: list |
58
|
|
|
@return: whether use distinct or not |
59
|
|
|
@rtype: bool |
60
|
|
|
|
61
|
|
|
Example: |
62
|
|
|
cls is TestRun (<class 'tcms.testruns.models.TestRun'>) |
63
|
|
|
fields is 'plan__case__is_automated' |
64
|
|
|
| | |----- Normal Field in TestCase |
65
|
|
|
| |--------------- ManyToManyKey in TestPlan |
66
|
|
|
|--------------------- ForeignKey in TestRun |
67
|
|
|
|
68
|
|
|
1. plan is a ForeignKey field of TestRun and it will trigger getting the |
69
|
|
|
related model TestPlan by django orm framework. |
70
|
|
|
2. case is a ManyToManyKey field of TestPlan and it will trigger using |
71
|
|
|
INNER JOIN to join TestCase, here will be many duplicated rows. |
72
|
|
|
3. is_automated is a local field of TestCase only filter the rows (where |
73
|
|
|
condition). |
74
|
|
|
|
75
|
|
|
So this method will find out that case is a m2m field and notice the |
76
|
|
|
outter method use distinct to avoid duplicated rows. |
77
|
|
|
""" |
78
|
|
|
for field_name in fields: |
79
|
|
|
try: |
80
|
|
|
field = cls._meta.get_field(field_name) |
81
|
|
|
if field.is_relation and field.many_to_many: |
82
|
|
|
yield True |
83
|
|
|
else: |
84
|
|
|
if getattr(field, 'remote_field', None): |
85
|
|
|
cls = field.remote_field.model |
86
|
|
|
except FieldDoesNotExist: |
87
|
|
|
pass |
88
|
|
|
|
89
|
|
|
|
90
|
|
|
def _need_distinct_m2m_rows(cls, fields): |
91
|
|
|
"""Check whether the query string has ManyToMany field or not, return |
92
|
|
|
False if the query string is empty. |
93
|
|
|
|
94
|
|
|
@param cls: table model class |
95
|
|
|
@type cls: subclass of django.db.models.Model |
96
|
|
|
@param fields: fields in where condition. |
97
|
|
|
@type fields: list |
98
|
|
|
@return: whether use distinct or not |
99
|
|
|
@rtype: bool |
100
|
|
|
""" |
101
|
|
|
return next(_lookup_fields_in_model(cls, fields), False) if fields else False |
102
|
|
|
|
103
|
|
|
|
104
|
|
|
def distinct_m2m_rows(cls, values, op_type): |
105
|
|
|
"""By django model field looking up syntax, loop values and check the |
106
|
|
|
condition if there is a multi-tables query. |
107
|
|
|
|
108
|
|
|
@param cls: table model class |
109
|
|
|
@type cls: subclass of django.db.models.Model |
110
|
|
|
@param values: fields in where condition. |
111
|
|
|
@type values: dict |
112
|
|
|
@return: QuerySet |
113
|
|
|
@rtype: django.db.models.query.QuerySet |
114
|
|
|
""" |
115
|
|
|
flag = False |
116
|
|
|
for field in values.keys(): |
117
|
|
|
if '__' in field: |
118
|
|
|
if _need_distinct_m2m_rows(cls, field.split('__')): |
119
|
|
|
flag = True |
120
|
|
|
break |
121
|
|
|
|
122
|
|
|
qs = cls.objects.filter(**values) |
123
|
|
|
if op_type == QUERY_DISTINCT: |
124
|
|
|
return qs.distinct() if flag else qs |
125
|
|
|
raise TypeError('Not implement op type %s' % op_type) |
126
|
|
|
|
127
|
|
|
|
128
|
|
|
def distinct_filter(cls, values): |
129
|
|
|
return distinct_m2m_rows(cls, values, op_type=QUERY_DISTINCT) |
130
|
|
|
|
131
|
|
|
|
132
|
|
|
def get_attachments_for(request, obj): |
133
|
|
|
host_link = request_host_link(request) |
134
|
|
|
result = [] |
135
|
|
|
for attachment in Attachment.objects.attachments_for_object(obj): |
136
|
|
|
result.append({ |
137
|
|
|
'pk': attachment.pk, |
138
|
|
|
'url': host_link + attachment.attachment_file.url, |
139
|
|
|
'owner_pk': attachment.creator.pk, |
140
|
|
|
'owner_username': attachment.creator.username, |
141
|
|
|
'date': attachment.created.isoformat(), |
142
|
|
|
}) |
143
|
|
|
return result |
144
|
|
|
|
145
|
|
|
|
146
|
|
|
def encode_multipart(csrf_token, filename, b64content): |
147
|
|
|
""" |
148
|
|
|
Build a multipart/form-data body with generated random boundary |
149
|
|
|
suitable for parsing by django.http.request.HttpRequest and |
150
|
|
|
the parser classes related to it! |
151
|
|
|
|
152
|
|
|
.. note:: |
153
|
|
|
|
154
|
|
|
``\\r\\n`` are expected! Do not change! |
155
|
|
|
""" |
156
|
|
|
boundary = '----------%s' % int(time.time() * 1000) |
157
|
|
|
data = ['--%s' % boundary] |
158
|
|
|
|
159
|
|
|
data.append('Content-Disposition: form-data; name="csrfmiddlewaretoken"\r\n') |
160
|
|
|
data.append(csrf_token) |
161
|
|
|
data.append('--%s' % boundary) |
162
|
|
|
|
163
|
|
|
data.append('Content-Disposition: form-data; name="attachment_file"; filename="%s"' % filename) |
164
|
|
|
data.append('Content-Type: application/octet-stream') |
165
|
|
|
data.append('Content-Transfer-Encoding: base64') |
166
|
|
|
data.append('Content-Length: %d\r\n' % len(b64content)) |
167
|
|
|
data.append(b64content) |
168
|
|
|
|
169
|
|
|
data.append('--%s--\r\n' % boundary) |
170
|
|
|
return '\r\n'.join(data), boundary |
171
|
|
|
|
172
|
|
|
|
173
|
|
|
def request_for_upload(user, filename, b64content): |
174
|
|
|
""" |
175
|
|
|
Return a request object containing all fields necessary for file |
176
|
|
|
upload as if it was sent by the browser. |
177
|
|
|
""" |
178
|
|
|
request = HttpRequest() |
179
|
|
|
request.user = user |
180
|
|
|
request.method = 'POST' |
181
|
|
|
request.content_type = 'multipart/form-data' |
182
|
|
|
# because attachment.views.add_attachment() calls messages.success() |
183
|
|
|
request._messages = MagicMock() |
184
|
|
|
|
185
|
|
|
data, boundary = encode_multipart( |
186
|
|
|
get_token(request), |
187
|
|
|
filename, |
188
|
|
|
b64content |
189
|
|
|
) |
190
|
|
|
|
191
|
|
|
request.META['CONTENT_TYPE'] = 'multipart/form-data; boundary=%s' % boundary |
192
|
|
|
request.META['CONTENT_LENGTH'] = len(data) |
193
|
|
|
request._stream = io.BytesIO(data.encode()) |
194
|
|
|
|
195
|
|
|
# manually parse the input data and populate data attributes |
196
|
|
|
request._read_started = False |
197
|
|
|
request._load_post_and_files() |
198
|
|
|
request.POST = request._post |
199
|
|
|
request.FILES = request._files |
200
|
|
|
|
201
|
|
|
return request |
202
|
|
|
|
203
|
|
|
|
204
|
|
|
def add_attachment(obj_id, app_model, user, filename, b64content): |
205
|
|
|
""" |
206
|
|
|
High-level function which performs the attachment process |
207
|
|
|
by constructing an HttpRequest object and passing it to |
208
|
|
|
attachments.views.add_attachment() as if it came from the browser. |
209
|
|
|
""" |
210
|
|
|
request = request_for_upload(user, filename, b64content) |
211
|
|
|
app, model = app_model.split('.') |
212
|
|
|
response = attachment_views.add_attachment(request, app, model, obj_id) |
213
|
|
|
if response.status_code == 404: |
214
|
|
|
raise Exception("Adding attachment to %s(%d) failed" % (app_model, obj_id)) |
215
|
|
|
|