Passed
Push to 2.x ( c8e751...75e40c ) by Jordi
created 06:10

senaite.core.api.catalog (Rating: B)

Complexity

Total Complexity 46

Size/Duplication

Total Lines 307
Duplicated Lines 0 %

Importance

Changes 0
Metric  Value
wmc     46
eloc    138
dl      0
loc     307
rs      8.72
c       0
b       0
f       0

12 Functions

Rating   Name                      Duplication   Size   Complexity
A        is_catalog()              0             7      1
A        get_indexes()             0             8      1
A        get_catalog()             0             15     4
B        add_index()               0             21     6
A        del_index()               0             13     2
A        get_columns()             0             8      1
A        get_index()               0             12     2
F        to_searchable_text_qs()   0             91     19
A        add_zc_text_index()       0             30     4
A        add_column()              0             15     2
A        reindex_index()           0             15     2
A        del_column()              0             15     2

How to fix

Complexity

Complex classes like senaite.core.api.catalog often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.
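
For illustration, the only F-rated function above, to_searchable_text_qs(), together with its is_op/is_wc helpers, forms such a cohesive component: it only builds query strings and never touches the catalog itself. Below is a minimal sketch of the Extract Class idea, assuming a new, hypothetical module senaite.core.api.catalog_query (name and layout are illustrative, not part of the codebase):

# -*- coding: utf-8 -*-
# Hypothetical module: senaite.core.api.catalog_query (sketch only)
# Moves the searchable-text helpers out of the catalog API so that
# senaite.core.api.catalog keeps only the ZCatalog management code.

OPERATORS = ["AND", "OR"]
WILDCARDS = ["*", "?"]


def is_op(token):
    """Check if the token is a boolean operator (AND/OR)"""
    return token.upper() in OPERATORS


def is_wc(char):
    """Check if the character is a ZCTextIndex wildcard"""
    return char in WILDCARDS


# senaite.core.api.catalog.to_searchable_text_qs would then delegate here,
# keeping its public signature unchanged, e.g.:
#
#   from senaite.core.api import catalog_query
#
#   def to_searchable_text_qs(qs, op="AND", wildcard=True):
#       return catalog_query.to_searchable_text_qs(qs, op=op, wildcard=wildcard)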

# -*- coding: utf-8 -*-
#
# This file is part of SENAITE.CORE.
#
# SENAITE.CORE is free software: you can redistribute it and/or modify it under
# the terms of the GNU General Public License as published by the Free Software
# Foundation, version 2.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
# details.
#
# You should have received a copy of the GNU General Public License along with
# this program; if not, write to the Free Software Foundation, Inc., 51
# Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
#
# Copyright 2018-2023 by its authors.
# Some rights reserved, see README and LICENSE.

import re

import six
from six.moves.urllib.parse import unquote_plus

from bika.lims.api import APIError
from bika.lims.api import get_tool
from bika.lims.api import safe_unicode
from Products.CMFPlone.UnicodeSplitter import CaseNormalizer
from Products.CMFPlone.UnicodeSplitter import Splitter
from Products.ZCatalog.interfaces import IZCatalog
from Products.ZCTextIndex.ZCTextIndex import PLexicon


def get_catalog(name_or_obj):
    """Get the catalog by name or from the object

    :param name_or_obj: name of the catalog or a catalog object
    :returns: catalog object
    """
    if is_catalog(name_or_obj):
        return name_or_obj

    catalog = None
    if isinstance(name_or_obj, six.string_types):
        catalog = get_tool(name_or_obj)
    if not is_catalog(catalog):
        raise APIError("No catalog found for %s" % repr(name_or_obj))
    return catalog


def is_catalog(obj):
    """Checks if the given object is a catalog

    :param obj: An object
    :returns: True/False if the object is a ZCatalog object
    """
    return IZCatalog.providedBy(obj)


def get_indexes(catalog):
    """Return the indexes of the catalog

    :param catalog: Catalog object
    :returns: List of all index names
    """
    catalog = get_catalog(catalog)
    return catalog.indexes()


def get_columns(catalog):
    """Return the columns of the catalog

    :param catalog: Catalog object
    :returns: List of all column names
    """
    catalog = get_catalog(catalog)
    return catalog.schema()


def add_index(catalog, index, index_type, indexed_attrs=None):
    """Add an index to the catalog

    :param catalog: Catalog object
    :param index: Index id
    :param index_type: Type of the index, e.g. "FieldIndex"
    :param indexed_attrs: Object attribute(s) to index (optional)
    :returns: True when the index was added successfully, otherwise False
    """
    catalog = get_catalog(catalog)
    indexes = get_indexes(catalog)
    if index in indexes:
        return False
    if index_type == "ZCTextIndex":
        return add_zc_text_index(catalog, index, indexed_attrs=indexed_attrs)
    catalog.addIndex(index, index_type)
    # set indexed attribute
    index_obj = get_index(catalog, index)
    if indexed_attrs and hasattr(index_obj, "indexed_attrs"):
        if not isinstance(indexed_attrs, list):
            indexed_attrs = [indexed_attrs]
        index_obj.indexed_attrs = indexed_attrs
    return True


def del_index(catalog, index):
    """Delete an index from the catalog

    :param catalog: Catalog object
    :param index: Index id
    :returns: True when the index was deleted successfully, otherwise False
    """
    catalog = get_catalog(catalog)
    indexes = get_indexes(catalog)
    if index not in indexes:
        return False
    catalog.delIndex(index)
    return True


def get_index(catalog, index):
    """Get an index from the catalog

    :param catalog: Catalog object
    :param index: Index id
    :returns: Index object or None
    """
    catalog = get_catalog(catalog)
    indexes = get_indexes(catalog)
    if index not in indexes:
        return None
    return catalog.Indexes[index]


def add_zc_text_index(catalog, index, lex_id="Lexicon", indexed_attrs=None):
    """Add ZC text index to the catalog

    :param catalog: Catalog object
    :param index: Index id
    :param lex_id: Id of the lexicon to use (created if it does not exist)
    :param indexed_attrs: Object attribute(s) to index (optional)
    :returns: True when the index was added successfully, otherwise False
    """
    catalog = get_catalog(catalog)
    indexes = get_indexes(catalog)

    if index in indexes:
        return False

    # check if the lexicon exists
    lexicon = getattr(catalog, lex_id, None)
    if lexicon is None:
        # create the lexicon first
        splitter = Splitter()
        casenormalizer = CaseNormalizer()
        pipeline = [splitter, casenormalizer]
        lexicon = PLexicon(lex_id, "Lexicon", *pipeline)
        catalog._setObject(lex_id, lexicon)

    class extra(object):
        doc_attr = indexed_attrs if indexed_attrs else index
        lexicon_id = lex_id
        index_type = "Okapi BM25 Rank"

    catalog.addIndex(index, "ZCTextIndex", extra)
    return True


def reindex_index(catalog, index):
    """Reindex the index of the catalog

    :param catalog: Catalog object
    :param index: Index id
    :returns: True when the index was reindexed successfully, otherwise False
    """
    catalog = get_catalog(catalog)
    indexes = get_indexes(catalog)

    if index not in indexes:
        return False

    catalog.manage_reindexIndex(index)
    return True


def add_column(catalog, column):
    """Add a column to the catalog

    :param catalog: Catalog object
    :param column: Column name
    :returns: True when the column was added successfully, otherwise False
    """
    catalog = get_catalog(catalog)
    columns = get_columns(catalog)

    if column in columns:
        return False

    catalog.addColumn(column)
    return True


def del_column(catalog, column):
    """Delete a column from the catalog

    :param catalog: Catalog object
    :param column: Column name
    :returns: True when the column was deleted successfully, otherwise False
    """
    catalog = get_catalog(catalog)
    columns = get_columns(catalog)

    if column not in columns:
        return False

    catalog.delColumn(column)
    return True


def to_searchable_text_qs(qs, op="AND", wildcard=True):
    """Convert the query string for a searchable text index

    https://zope.readthedocs.io/en/latest/zopebook/SearchingZCatalog.html#searching-zctextindexes

    NOTE: we do not support parentheses, question marks or negated searches,
          because these quickly raise parse errors for ZCTextIndexes

    :param qs: search string
    :param op: operator for token concatenation
    :param wildcard: append `*` to the tokens
    :returns: searchable text string
    """
    OPERATORS = ["AND", "OR"]
    WILDCARDS = ["*", "?"]

    if op not in OPERATORS:
        op = "AND"
    if not isinstance(qs, six.string_types):
        return ""

    def is_op(token):
        return token.upper() in OPERATORS

    def is_wc(char):
        return char in WILDCARDS

    def append_op_after(index, token, tokens):
        # do not append an operator after the last token
        if index == len(tokens) - 1:
            return False
        # do not append an operator if the next token is an operator
        next_token = tokens[index + 1]
        if is_op(next_token):
            return False
        # append an operator (AND/OR) after this token
        return True

    # convert to unicode
    term = unquote_plus(safe_unicode(qs))

    # Wildcards at the beginning are not allowed and therefore removed!
    first_char = term[0] if len(term) > 0 else ""
    if is_wc(first_char):
        term = term.replace(first_char, "", 1)

    # splits the string on all characters that do not match the regex
    regex = r"[^\w\-\_\.\<\>\+\{\}\:\/\?\$]"

    # allow only words when searching just a single character
    if len(term) == 1:
        regex = r"[^\w]"

    tokens = re.split(regex, term, flags=re.U | re.I)

    # filter out all empty tokens
    tokens = list(filter(None, tokens))

    # cleanup starting operators
    while tokens and is_op(tokens[0]):
        tokens.pop(0)

    # cleanup any trailing operators
    while tokens and is_op(tokens[-1]):
        tokens.pop(-1)

    parts = []

    for num, token in enumerate(tokens):

        # retain wildcards at the end of a token
        last_token_char = token[-1] if len(token) > 0 else ""

        # append operators without changes and continue
        if is_op(token):
            parts.append(token.upper())
            continue

        # append wildcard to token
        if wildcard and not is_op(token) and not is_wc(last_token_char):
            token = token + "*"

        # append the token
        parts.append(token)

        # check if we need to append an operator after the current token
        if append_op_after(num, token, tokens):
            parts.append(op)

    # return the final querystring
    return u" ".join(parts)
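
For reference, a short usage sketch based on the listing above; the catalog name "portal_catalog" and the index/column name "my_field" are illustrative examples, not values required by the API:

from senaite.core.api import catalog as catapi

# resolve the catalog tool by name (raises APIError if nothing is found)
catalog = catapi.get_catalog("portal_catalog")

# add a hypothetical field index plus a metadata column and reindex it
catapi.add_index(catalog, "my_field", "FieldIndex")   # True, or False if it already exists
catapi.add_column(catalog, "my_field")
catapi.reindex_index(catalog, "my_field")

# build a ZCTextIndex query string from user input
catapi.to_searchable_text_qs("Fresh Funky Water")
# -> u"Fresh* AND Funky* AND Water*"
catapi.to_searchable_text_qs("Fresh Funky", op="OR", wildcard=False)
# -> u"Fresh OR Funky"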