Passed
Push — master ( 0b6604...4c3d35 )
by Ramon
04:28
created

DefaultReferenceWidgetVocabulary.__call__()   B

Complexity

Conditions 6

Size

Total Lines 27
Code Lines 17

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 17
dl 0
loc 27
rs 8.6166
c 0
b 0
f 0
cc 6
nop 1
1
# -*- coding: utf-8 -*-
2
#
3
# This file is part of SENAITE.CORE.
4
#
5
# SENAITE.CORE is free software: you can redistribute it and/or modify it under
6
# the terms of the GNU General Public License as published by the Free Software
7
# Foundation, version 2.
8
#
9
# This program is distributed in the hope that it will be useful, but WITHOUT
10
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
11
# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
12
# details.
13
#
14
# You should have received a copy of the GNU General Public License along with
15
# this program; if not, write to the Free Software Foundation, Inc., 51
16
# Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
17
#
18
# Copyright 2018-2019 by its authors.
19
# Some rights reserved, see README and LICENSE.
20
21
import json
22
23
from bika.lims import api
24
from bika.lims import logger
25
from bika.lims.interfaces import IReferenceWidgetVocabulary, IAnalysisRequest
26
from bika.lims.utils import to_unicode as _u
27
from bika.lims.utils import to_utf8 as _c
28
from zope.interface import implements
29
30
31
class DefaultReferenceWidgetVocabulary(object):
    """Vocabulary that resolves the records to display in a reference widget

    The catalog query is built from the parameters found in the request
    ("base_query", "search_query", "searchTerm", "search_fields",
    "catalog_name", "minLength" and "force_all"). Calling the instance
    performs the catalog search and returns its result.
    """
    implements(IReferenceWidgetVocabulary)

    def __init__(self, context, request):
        self.context = context
        self.request = request

    @property
    def search_fields(self):
        """Returns the object field names to search against

        The value is decoded from the JSON request parameter
        "search_fields". Returns an empty list if the parameter is missing
        or empty.
        """
        search_fields = self.request.get("search_fields", None)
        if not search_fields:
            return []

        return json.loads(_u(search_fields))

    @property
    def search_field(self):
        """Returns the field name to search for

        This is the first of the search fields, or "Title" if the request
        does not define any.
        """
        search_fields = self.search_fields
        if not search_fields:
            return "Title"
        return search_fields[0]

    @property
    def search_term(self):
        """Returns the search term

        The request parameter "searchTerm", lowercased and stripped of
        leading and trailing whitespace.
        """
        search_term = _c(self.request.get("searchTerm", ""))
        return search_term.lower().strip()

    @property
    def minimum_length(self):
        """Minimum required length of the search term

        Read from the request parameter "minLength"; 0 is passed to
        api.to_int as the fallback value.
        """
        min_length = self.request.get("minLength", 0)
        return api.to_int(min_length, 0)

    @property
    def force_all(self):
        """Returns whether all records must be displayed if no match is found

        True only if the request parameter "force_all" is "1" or "true"
        (case-insensitive).
        """
        force_all = self.request.get("force_all", "").lower()
        return force_all in ("1", "true")

    @property
    def catalog_name(self):
        """Returns the catalog name to be used for the search

        Falls back to "portal_catalog" if the request parameter
        "catalog_name" is missing or empty.
        """
        catalog_name = self.request.get("catalog_name", None)
        return catalog_name or "portal_catalog"

    @property
    def base_query(self):
        """Returns the base query to use. This is, the query with the basic
        filtering criteria to be used as the baseline. Search criteria defined
        in base_query are restrictive (AND statements, not included in OR-like)
        """
        return self.get_query_from_request("base_query")

    @property
    def search_query(self):
        """Returns the search query, as found in the request parameter
        "search_query"
        """
        return self.get_query_from_request("search_query")

    def to_utf8(self, data):
        """
        Convert unicode values to strings even if they belong to lists or dicts.
        :param data: an object.
        :return: The object with all unicode values converted to string.
        """
        # if this is a unicode string, return its string representation
        if isinstance(data, unicode):
            return data.encode('utf-8')

        # if this is a list of values, return list of string values
        if isinstance(data, list):
            return [self.to_utf8(item) for item in data]

        # if this is a dictionary, return dictionary of string keys and values
        if isinstance(data, dict):
            return {
                self.to_utf8(key): self.to_utf8(value)
                for key, value in data.iteritems()
            }

        # if it's anything else, return it in its original form
        return data

    def get_query_from_request(self, name):
        """Returns the query inferred from the request

        :param name: name of the request parameter holding the JSON query
        :return: the decoded query, with all unicode values utf-8 encoded
        """
        # Fall back to an empty query if the parameter is missing or empty,
        # because json.loads raises a ValueError for an empty string
        query = self.request.get(name, None) or "{}"
        # json.loads does unicode conversion, which will fail in the catalog
        # search for some cases. So we need to convert the strings to utf8
        # https://github.com/senaite/senaite.core/issues/443
        query = json.loads(query)
        return self.to_utf8(query)

    def get_raw_query(self):
        """Returns the raw query to use for current search, based on the
        base query + update query

        The search query overrides the keys it shares with the base query.
        The sorting criteria are added afterwards, and "sort_on" is removed
        again if it does not refer to a sortable index of the catalog.
        """
        query = self.base_query.copy()
        query.update(self.search_query)

        # Add sorting criteria
        query.update(self.resolve_sorting(query))

        # Check if sort_on is an index and if is sortable. Otherwise, assume
        # the sorting must be done manually
        catalog = api.get_tool(self.catalog_name)
        sort_on = query.get("sort_on", None)
        if sort_on and not self.is_sortable_index(sort_on, catalog):
            del query["sort_on"]
        return query

    def resolve_sorting(self, query):
        """Resolves the sorting criteria for the given query

        :param query: dict that may contain the keys "sidx"/"sort_on",
            "sord"/"sort_order" and "sort_limit"/"limit"
        :return: dict with "sort_on", "sort_order" and, if not zero,
            "sort_limit". Empty if the query defines no sort field.
        """
        sorting = {}

        # Sort on ("sidx" takes precedence over "sort_on")
        sort_on = query.get("sidx", None) or query.get("sort_on", None)
        # "Title" is sorted by means of the "sortable_title" index
        sort_on = "sortable_title" if sort_on == "Title" else sort_on
        if sort_on:
            sorting["sort_on"] = sort_on

            # Sort order ("sord" takes precedence over "sort_order")
            sort_order = query.get("sord", None)
            sort_order = sort_order or query.get("sort_order", None)
            if sort_order in ["desc", "reverse", "rev", "descending"]:
                sorting["sort_order"] = "descending"
            else:
                sorting["sort_order"] = "ascending"

            # Sort limit
            # NOTE(review): presumably api.to_int returns 30 when the value
            # is missing or not convertible -- confirm against the api
            sort_limit = query.get("sort_limit", query.get("limit"))
            sort_limit = api.to_int(sort_limit, default=30)
            if sort_limit:
                sorting["sort_limit"] = sort_limit

        return sorting

    def is_sortable_index(self, index_name, catalog):
        """Returns whether the index is sortable

        Only indexes of type FieldIndex or DateIndex are considered
        sortable.
        """
        index = self.get_index(index_name, catalog)
        if not index:
            return False
        return index.meta_type in ["FieldIndex", "DateIndex"]

    def get_index(self, field_name, catalog):
        """Returns the index of the catalog for the given field_name, if any
        """
        index = catalog.Indexes.get(field_name, None)
        if not index and field_name == "Title":
            # Legacy
            return self.get_index("sortable_title", catalog)
        return index

    def search(self, query, search_term, search_field, catalog):
        """Performs a search against the catalog and returns the brains

        :param query: catalog query. The search criterion is added to this
            same dict, so the dict passed in gets modified
        :param search_term: term to search for. If empty, the query is used
            as it is
        :param search_field: name of the index to search the term against
        :param catalog: catalog to search against
        :return: the result of the catalog search, or an empty list if the
            index does not exist or its type is not supported
        """
        logger.info("Reference Widget Catalog: {}".format(catalog.id))
        if not search_term:
            return catalog(query)

        index = self.get_index(search_field, catalog)
        if not index:
            logger.warn("*** Index not found: '{}'".format(search_field))
            return []

        meta = index.meta_type
        if meta == "TextIndexNG3":
            # Trailing wildcard, to match terms that start with search_term
            query[index.id] = "{}*".format(search_term)

        elif meta == "ZCTextIndex":
            logger.warn("*** Field '{}' ({}). Better use TextIndexNG3"
                        .format(search_field, meta))
            query[index.id] = "{}*".format(search_term)

        elif meta in ["FieldIndex", "KeywordIndex"]:
            logger.warn("*** Field '{}' ({}). Better use TextIndexNG3"
                        .format(search_field, meta))
            # No wildcard here: the term is passed to the index as it is
            query[index.id] = search_term

        else:
            logger.warn("*** Index '{}' ({}) not supported"
                        .format(search_field, meta))
            return []

        logger.info("Reference Widget Query: {}".format(repr(query)))
        return catalog(query)

    def __call__(self):
        """Performs the search and returns the matching brains

        Returns an empty list if the search term is shorter than the
        minimum length or if the raw query is empty. If the search returns
        no results and "force_all" is set, the base query alone is used.
        """
        # If search term, check if its length is above the minLength
        search_term = self.search_term
        if search_term and len(search_term) < self.minimum_length:
            return []

        # Get the raw query to use
        # Raw query is built from base query baseline, including additional
        # parameters defined in the request and the search query as well
        query = self.get_raw_query()
        if not query:
            return []

        # Do the search
        logger.info("Reference Widget Raw Query: {}".format(repr(query)))
        catalog = api.get_tool(self.catalog_name)
        brains = self.search(query, search_term, self.search_field, catalog)

        # If no matches, then just base_query alone ("show all if no match")
        if not brains and self.force_all:
            query = self.base_query.copy()
            query.update(self.resolve_sorting(query))
            brains = catalog(query)

        logger.info("Returned objects: {}".format(len(brains)))
        return brains
260
261
262
class BatchReferenceWidgetVocabulary(DefaultReferenceWidgetVocabulary):
    """Reference widget vocabulary that adds a "getClientUID" criterion to
    the raw query when the context provides IAnalysisRequest
    """

    def get_raw_query(self):
        """Returns the raw query built by the base class, extended with the
        "getClientUID" criterion when the context provides IAnalysisRequest
        """
        raw = super(BatchReferenceWidgetVocabulary, self).get_raw_query()
        if not IAnalysisRequest.providedBy(self.context):
            return raw

        # Display the Batches from the Client the Sample belongs to and
        # those that do not belong to any Client
        client_uid = api.get_uid(self.context.getClient())
        raw["getClientUID"] = [client_uid, ""]
        return raw
275