Passed
Push — master ( ae226f...980100 )
by Alexander
03:01
created

tcms.rpc.utils   A

Complexity

Total Complexity 31

Size/Duplication

Total Lines 215
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
wmc 31
eloc 108
dl 0
loc 215
rs 9.92
c 0
b 0
f 0

10 Functions

Rating   Name   Duplication   Size   Complexity  
B pre_check_product() 0 15 6
B _lookup_fields_in_model() 0 38 6
A add_attachment() 0 11 2
A encode_multipart() 0 25 1
A distinct_filter() 0 2 1
A _need_distinct_m2m_rows() 0 12 2
B distinct_m2m_rows() 0 22 6
A get_attachments_for() 0 12 2
A parse_bool_value() 0 8 4
A request_for_upload() 0 29 1
1
# -*- coding: utf-8 -*-
2
3
import io
4
import time
5
from mock import MagicMock
6
7
from attachments.models import Attachment
8
from attachments import views as attachment_views
9
10
from django.http import HttpRequest
11
from django.middleware.csrf import get_token
12
from django.db.models import FieldDoesNotExist
13
14
from tcms.management.models import Product
15
from tcms.core.utils import request_host_link
16
17
18
QUERY_DISTINCT = 1
19
20
ACCEPTABLE_BOOL_VALUES = ('0', '1', 0, 1, True, False)
21
22
23
def parse_bool_value(value):
24
    if value in ACCEPTABLE_BOOL_VALUES:
25
        if value == '0':
26
            return False
27
        if value == '1':
28
            return True
29
        return value
30
    raise ValueError('Unacceptable bool value.')
31
32
33
def pre_check_product(values):
34
    if isinstance(values, dict):
35
        if not values.get('product'):
36
            raise ValueError('No product given.')
37
        product_str = values['product']
38
    else:
39
        product_str = values
40
41
    if isinstance(product_str, str):
42
        if not product_str:
43
            raise ValueError('Got empty product name.')
44
        return Product.objects.get(name=product_str)
45
    if isinstance(product_str, int):
46
        return Product.objects.get(pk=product_str)
47
    raise ValueError('The type of product is not recognizable.')
48
49
50
def _lookup_fields_in_model(cls, fields):
51
    """Lookup ManyToMany fields in current table and related tables. For
52
    distinct duplicate rows when using inner join
53
54
    @param cls: table model class
55
    @type cls: subclass of django.db.models.Model
56
    @param fields: fields in where condition.
57
    @type fields: list
58
    @return: whether use distinct or not
59
    @rtype: bool
60
61
    Example:
62
        cls is TestRun (<class 'tcms.testruns.models.TestRun'>)
63
        fields is 'plan__case__is_automated'
64
                    |     |         |----- Normal Field in TestCase
65
                    |     |--------------- ManyToManyKey in TestPlan
66
                    |--------------------- ForeignKey in TestRun
67
68
    1. plan is a ForeignKey field of TestRun and it will trigger getting the
69
    related model TestPlan by django orm framework.
70
    2. case is a ManyToManyKey field of TestPlan and it will trigger using
71
    INNER JOIN to join TestCase, here will be many duplicated rows.
72
    3. is_automated is a local field of TestCase only filter the rows (where
73
    condition).
74
75
    So this method will find out that case is a m2m field and notice the
76
    outter method use distinct to avoid duplicated rows.
77
    """
78
    for field_name in fields:
79
        try:
80
            field = cls._meta.get_field(field_name)
81
            if field.is_relation and field.many_to_many:
82
                yield True
83
            else:
84
                if getattr(field, 'remote_field', None):
85
                    cls = field.remote_field.model
86
        except FieldDoesNotExist:
87
            pass
88
89
90
def _need_distinct_m2m_rows(cls, fields):
91
    """Check whether the query string has ManyToMany field or not, return
92
    False if the query string is empty.
93
94
    @param cls: table model class
95
    @type cls: subclass of django.db.models.Model
96
    @param fields: fields in where condition.
97
    @type fields: list
98
    @return: whether use distinct or not
99
    @rtype: bool
100
    """
101
    return next(_lookup_fields_in_model(cls, fields), False) if fields else False
102
103
104
def distinct_m2m_rows(cls, values, op_type):
105
    """By django model field looking up syntax, loop values and check the
106
    condition if there is a multi-tables query.
107
108
    @param cls: table model class
109
    @type cls: subclass of django.db.models.Model
110
    @param values: fields in where condition.
111
    @type values: dict
112
    @return: QuerySet
113
    @rtype: django.db.models.query.QuerySet
114
    """
115
    flag = False
116
    for field in values.keys():
117
        if '__' in field:
118
            if _need_distinct_m2m_rows(cls, field.split('__')):
119
                flag = True
120
                break
121
122
    qs = cls.objects.filter(**values)
123
    if op_type == QUERY_DISTINCT:
124
        return qs.distinct() if flag else qs
125
    raise TypeError('Not implement op type %s' % op_type)
126
127
128
def distinct_filter(cls, values):
129
    return distinct_m2m_rows(cls, values, op_type=QUERY_DISTINCT)
130
131
132
def get_attachments_for(request, obj):
133
    host_link = request_host_link(request)
134
    result = []
135
    for attachment in Attachment.objects.attachments_for_object(obj):
136
        result.append({
137
            'pk': attachment.pk,
138
            'url': host_link + attachment.attachment_file.url,
139
            'owner_pk': attachment.creator.pk,
140
            'owner_username': attachment.creator.username,
141
            'date': attachment.created.isoformat(),
142
        })
143
    return result
144
145
146
def encode_multipart(csrf_token, filename, b64content):
147
    """
148
        Build a multipart/form-data body with generated random boundary
149
        suitable for parsing by django.http.request.HttpRequest and
150
        the parser classes related to it!
151
152
        .. note::
153
154
            ``\\r\\n`` are expected! Do not change!
155
    """
156
    boundary = '----------%s' % int(time.time() * 1000)
157
    data = ['--%s' % boundary]
158
159
    data.append('Content-Disposition: form-data; name="csrfmiddlewaretoken"\r\n')
160
    data.append(csrf_token)
161
    data.append('--%s' % boundary)
162
163
    data.append('Content-Disposition: form-data; name="attachment_file"; filename="%s"' % filename)
164
    data.append('Content-Type: application/octet-stream')
165
    data.append('Content-Transfer-Encoding: base64')
166
    data.append('Content-Length: %d\r\n' % len(b64content))
167
    data.append(b64content)
168
169
    data.append('--%s--\r\n' % boundary)
170
    return '\r\n'.join(data), boundary
171
172
173
def request_for_upload(user, filename, b64content):
174
    """
175
        Return a request object containing all fields necessary for file
176
        upload as if it was sent by the browser.
177
    """
178
    request = HttpRequest()
179
    request.user = user
180
    request.method = 'POST'
181
    request.content_type = 'multipart/form-data'
182
    # because attachment.views.add_attachment() calls messages.success()
183
    request._messages = MagicMock()
184
185
    data, boundary = encode_multipart(
186
        get_token(request),
187
        filename,
188
        b64content
189
    )
190
191
    request.META['CONTENT_TYPE'] = 'multipart/form-data; boundary=%s' % boundary
192
    request.META['CONTENT_LENGTH'] = len(data)
193
    request._stream = io.BytesIO(data.encode())
194
195
    # manually parse the input data and populate data attributes
196
    request._read_started = False
197
    request._load_post_and_files()
198
    request.POST = request._post
199
    request.FILES = request._files
200
201
    return request
202
203
204
def add_attachment(obj_id, app_model, user, filename, b64content):
205
    """
206
        High-level function which performs the attachment process
207
        by constructing an HttpRequest object and passing it to
208
        attachments.views.add_attachment() as if it came from the browser.
209
    """
210
    request = request_for_upload(user, filename, b64content)
211
    app, model = app_model.split('.')
212
    response = attachment_views.add_attachment(request, app, model, obj_id)
213
    if response.status_code == 404:
214
        raise Exception("Adding attachment to %s(%d) failed" % (app_model, obj_id))
215