Passed
Push — master ( f42093...226b52 )
by Ramon
04:24
created

DefaultReferenceWidgetVocabulary.catalog_name()   A

Complexity

Conditions 1

Size

Total Lines 6
Code Lines 4

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 4
dl 0
loc 6
rs 10
c 0
b 0
f 0
cc 1
nop 1
1
# -*- coding: utf-8 -*-
2
#
3
# This file is part of SENAITE.CORE.
4
#
5
# SENAITE.CORE is free software: you can redistribute it and/or modify it under
6
# the terms of the GNU General Public License as published by the Free Software
7
# Foundation, version 2.
8
#
9
# This program is distributed in the hope that it will be useful, but WITHOUT
10
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
11
# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
12
# details.
13
#
14
# You should have received a copy of the GNU General Public License along with
15
# this program; if not, write to the Free Software Foundation, Inc., 51
16
# Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
17
#
18
# Copyright 2018-2019 by its authors.
19
# Some rights reserved, see README and LICENSE.
20
21
import json
22
import six
23
24
from zope.interface import implements
25
26
from bika.lims import api
27
from bika.lims import logger
28
from bika.lims.interfaces import IReferenceWidgetVocabulary
29
from bika.lims.utils import get_client
30
from bika.lims.utils import to_unicode as _u
31
from bika.lims.utils import to_utf8 as _c
32
33
34
class DefaultReferenceWidgetVocabulary(object):
    """Default vocabulary for reference widgets

    Builds a catalog query from the parameters found in the request
    ("base_query", "search_query", "search_fields", "searchTerm", "minLength",
    "force_all" and "catalog_name") and, when called, returns the results of
    running that query against the catalog.
    """
    implements(IReferenceWidgetVocabulary)

    def __init__(self, context, request):
        self.context = context
        self.request = request

    @property
    def search_fields(self):
        """Returns the object field names to search against

        The names are read from the JSON-encoded "search_fields" request
        parameter. An empty list is returned if the parameter is missing or
        empty.
        """
        search_fields = self.request.get("search_fields", None)
        if not search_fields:
            return []

        return json.loads(_u(search_fields))

    @property
    def search_field(self):
        """Returns the field name to search for

        This is the first of the search fields from the request, or "Title"
        when the request does not provide any.
        """
        search_fields = self.search_fields
        if not search_fields:
            return "Title"
        return search_fields[0]

    @property
    def search_term(self):
        """Returns the search term ("searchTerm" request parameter),
        lowercased and without leading/trailing whitespace
        """
        search_term = _c(self.request.get("searchTerm", ""))
        return search_term.lower().strip()

    @property
    def minimum_length(self):
        """Minimum required length of the search term

        Read from the "minLength" request parameter; 0 if the parameter is
        missing or cannot be converted to an integer.
        """
        min_length = self.request.get("minLength", 0)
        return api.to_int(min_length, 0)

    @property
    def force_all(self):
        """Returns whether all records must be displayed if no match is found

        True when the "force_all" request parameter is "1" or "true"
        (case-insensitive). Non-string values are evaluated by truthiness
        instead of raising an AttributeError on `.lower()`.
        """
        force_all = self.request.get("force_all", "")
        if not isinstance(force_all, six.string_types):
            return bool(force_all)
        return force_all.lower() in ["1", "true"]

    @property
    def catalog_name(self):
        """Returns the catalog name to be used for the search

        Read from the "catalog_name" request parameter, "portal_catalog" if
        the parameter is missing or empty.
        """
        catalog_name = self.request.get("catalog_name", None)
        return catalog_name or "portal_catalog"

    @property
    def base_query(self):
        """Returns the base query to use. This is, the query with the basic
        filtering criteria to be used as the baseline. Search criteria defined
        in base_query are restrictive (AND statements, not included in OR-like)
        """
        return self.get_query_from_request("base_query")

    @property
    def search_query(self):
        """Returns the search query ("search_query" request parameter)
        """
        return self.get_query_from_request("search_query")

    def to_utf8(self, data):
        """
        Convert unicode values to strings even if they belong to lists or dicts.
        :param data: an object.
        :return: The object with all unicode values converted to string.
        """
        # if this is a unicode string, return its utf-8 encoded representation
        if isinstance(data, six.text_type):
            return data.encode('utf-8')

        # if this is a list of values, return list of string values
        if isinstance(data, list):
            return [self.to_utf8(item) for item in data]

        # if this is a dictionary, return dictionary of string keys and values
        if isinstance(data, dict):
            return {
                self.to_utf8(key): self.to_utf8(value)
                for key, value in six.iteritems(data)
            }

        # if it's anything else, return it in its original form
        return data

    def get_query_from_request(self, name):
        """Returns the query inferred from the request

        :param name: name of the request parameter holding the JSON query
        :return: the decoded query, with unicode values converted to utf-8
        :raises ValueError: if the parameter value is not valid JSON
        """
        # A parameter that is present but empty is handled like a missing one,
        # otherwise json.loads("") would raise a ValueError
        query = self.request.get(name, None) or "{}"
        # json.loads does unicode conversion, which will fail in the catalog
        # search for some cases. So we need to convert the strings to utf8
        # https://github.com/senaite/senaite.core/issues/443
        query = json.loads(query)
        return self.to_utf8(query)

    def get_raw_query(self):
        """Returns the raw query to use for current search, based on the
        base query + update query
        """
        # Each property access decodes the request again and returns a new
        # object, so search_query can be merged in without an extra copy
        query = self.base_query.copy()
        query.update(self.search_query)

        # Add sorting criteria
        sorting = self.resolve_sorting(query)
        query.update(sorting)

        # Check if sort_on is an index and if is sortable. Otherwise, assume
        # the sorting must be done manually
        catalog = api.get_tool(self.catalog_name)
        sort_on = query.get("sort_on", None)
        if sort_on and not self.is_sortable_index(sort_on, catalog):
            del query["sort_on"]
        return query

    def resolve_sorting(self, query):
        """Resolves the sorting criteria for the given query

        :param query: dict that may contain "sidx"/"sort_on", "sord"/
            "sort_order" and "sort_limit"/"limit" keys
        :return: dict with "sort_on", "sort_order" and (if not zero)
            "sort_limit" keys, or an empty dict if the query does not define
            any field to sort on
        """
        sorting = {}

        # Sort on. "sidx" takes precedence over "sort_on"
        sort_on = query.get("sidx", None)
        sort_on = sort_on or query.get("sort_on", None)
        if sort_on == "Title":
            sort_on = "sortable_title"
        if sort_on:
            sorting["sort_on"] = sort_on

            # Sort order. "sord" takes precedence over "sort_order"
            sort_order = query.get("sord", None)
            sort_order = sort_order or query.get("sort_order", None)
            if sort_order in ["desc", "reverse", "rev", "descending"]:
                sorting["sort_order"] = "descending"
            else:
                sorting["sort_order"] = "ascending"

            # Sort limit
            sort_limit = query.get("sort_limit", query.get("limit"))
            sort_limit = api.to_int(sort_limit, default=30)
            if sort_limit:
                sorting["sort_limit"] = sort_limit

        return sorting

    def is_sortable_index(self, index_name, catalog):
        """Returns whether the index is sortable

        Only indexes with meta type "FieldIndex" or "DateIndex" are
        considered sortable.
        """
        index = self.get_index(index_name, catalog)
        if not index:
            return False
        return index.meta_type in ["FieldIndex", "DateIndex"]

    def get_index(self, field_name, catalog):
        """Returns the index of the catalog for the given field_name, if any
        """
        index = catalog.Indexes.get(field_name, None)
        if not index and field_name == "Title":
            # Legacy
            return self.get_index("sortable_title", catalog)
        return index

    def search(self, query, search_term, search_field, catalog):
        """Performs a search against the catalog and returns the brains

        :param query: catalog query. If a search term is given, the criteria
            for the search field is added to this same dict (in place)
        :param search_term: the term to search for. If empty, the query is
            run as it is
        :param search_field: name of the field/index to search the term in
        :param catalog: the catalog tool to search against
        :return: the results from the catalog, or an empty list if the index
            for the search field does not exist or its type is not supported
        """
        logger.info("Reference Widget Catalog: {}".format(catalog.id))
        if not search_term:
            return catalog(query)

        index = self.get_index(search_field, catalog)
        if not index:
            logger.warn("*** Index not found: '{}'".format(search_field))
            return []

        meta = index.meta_type
        if meta == "TextIndexNG3":
            query[index.id] = "{}*".format(search_term)

        elif meta == "ZCTextIndex":
            # Field name first, index type second: same order used by the
            # "not supported" message below
            logger.warn("*** Field '{}' ({}). Better use TextIndexNG3"
                        .format(search_field, meta))
            query[index.id] = "{}*".format(search_term)

        elif meta in ["FieldIndex", "KeywordIndex"]:
            logger.warn("*** Field '{}' ({}). Better use TextIndexNG3"
                        .format(search_field, meta))
            # No trailing wildcard for these index types
            query[index.id] = search_term

        else:
            logger.warn("*** Index '{}' ({}) not supported"
                        .format(search_field, meta))
            return []

        logger.info("Reference Widget Query: {}".format(repr(query)))
        return catalog(query)

    def __call__(self):
        """Runs the search defined by the request and returns the brains

        Returns an empty list if the search term is shorter than the minimum
        length or if there is no query to search with.
        """
        # If search term, check if its length is above the minLength
        search_term = self.search_term
        if search_term and len(search_term) < self.minimum_length:
            return []

        # Get the raw query to use
        # Raw query is built from base query baseline, including additional
        # parameters defined in the request and the search query as well
        query = self.get_raw_query()
        if not query:
            return []

        # Do the search
        logger.info("Reference Widget Raw Query: {}".format(repr(query)))
        catalog = api.get_tool(self.catalog_name)
        brains = self.search(query, search_term, self.search_field, catalog)

        # If no matches, then just base_query alone ("show all if no match")
        # NOTE(review): this fallback is built straight from base_query, so
        # neither the non-sortable "sort_on" removal nor any criteria a
        # subclass adds in get_raw_query apply to it - confirm this is wanted
        if not brains and self.force_all:
            query = self.base_query.copy()
            sorting = self.resolve_sorting(query)
            query.update(sorting)
            brains = catalog(query)

        logger.info("Returned objects: {}".format(len(brains)))
        return brains
263
264
265
class ClientAwareReferenceWidgetVocabulary(DefaultReferenceWidgetVocabulary):
    """Vocabulary that narrows the search down to the current Client

    When the context is, belongs or is associated to a Client and the query
    asks for any of the client-bound portal types, a filter by the UID of
    that Client is added to the query.
    """

    # portal_types that might be bound to a client
    client_bound_types = [
        "Contact",
        "Batch",
        "AnalysisProfile",
        "AnalysisSpec",
        "ARTemplate",
        "SamplePoint",
    ]

    def get_raw_query(self):
        """Returns the raw query from the base class, with the filter by
        client added when the query and the context require so
        """
        parent = super(ClientAwareReferenceWidgetVocabulary, self)
        raw_query = parent.get_raw_query()

        # Nothing to inject if none of the types searched is client-bound
        if not self.is_client_aware(raw_query):
            return raw_query

        # Nothing to inject either if there is no client (or no UID) around
        client = get_client(self.context)
        uid = api.get_uid(client) if client else None
        if not uid:
            return raw_query

        # Apply the search criteria for this client
        # Contact is the only object bound to a Client that is stored
        # in portal_catalog. And in this catalog, getClientUID does not
        # exist, rather getParentUID
        if "Contact" in self.get_portal_types(raw_query):
            raw_query["getParentUID"] = [uid]
        else:
            raw_query["getClientUID"] = [uid, ""]

        return raw_query

    def is_client_aware(self, query):
        """Returns True if at least one of the portal types from the query
        is a client-bound type, so a filter by client is required
        """
        requested = set(self.get_portal_types(query))
        return not requested.isdisjoint(self.client_bound_types)

    def get_portal_types(self, query):
        """Returns the value of "portal_type" from the query passed-in. A
        single portal type given as a string is returned wrapped in a list;
        an empty list is returned if the query has no "portal_type"
        """
        value = query.get("portal_type", [])
        return [value] if isinstance(value, six.string_types) else value
318