Passed
Pull Request — 2.x (#1872)
by Jordi
04:50
created

senaite.core.api.catalog   A

Complexity

Total Complexity 26

Size/Duplication

Total Lines 195
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
wmc 26
eloc 94
dl 0
loc 195
rs 10
c 0
b 0
f 0

11 Functions

Rating   Name   Duplication   Size   Complexity  
A add_zc_text_index() 0 31 3
A add_column() 0 15 2
A is_catalog() 0 7 1
A get_catalog() 0 15 4
A get_indexes() 0 8 1
B add_index() 0 21 6
A del_index() 0 13 2
A reindex_index() 0 15 2
A get_columns() 0 8 1
A del_column() 0 15 2
A get_index() 0 12 2
1
# -*- coding: utf-8 -*-
2
3
import six
4
5
from bika.lims.api import get_tool
6
from Products.ZCatalog.interfaces import IZCatalog
7
from Products.ZCTextIndex.Lexicon import StopWordAndSingleCharRemover
8
from Products.CMFPlone.UnicodeSplitter import CaseNormalizer
9
from Products.CMFPlone.UnicodeSplitter import Splitter
10
from Products.ZCTextIndex.ZCTextIndex import PLexicon
11
from Products.ZCatalog.ZCatalog import ZCatalog
12
from bika.lims.api import APIError
13
14
15
def get_catalog(name_or_obj):
    """Resolve a catalog from its name or return the passed catalog

    :param name_or_obj: name of the catalog or a catalog object
    :returns: catalog object
    :raises APIError: when the value cannot be resolved to a catalog
    """
    # catalog objects are handed back untouched
    if is_catalog(name_or_obj):
        return name_or_obj

    # only string names are looked up as a tool
    is_name = isinstance(name_or_obj, six.string_types)
    tool = get_tool(name_or_obj) if is_name else None

    if is_catalog(tool):
        return tool
    raise APIError("No catalog found for %s" % repr(name_or_obj))
30
31
32
def is_catalog(obj):
    """Tell whether the given object provides the `IZCatalog` interface

    :param obj: Any object
    :returns: The result of `IZCatalog.providedBy` for the object
    """
    provides_catalog = IZCatalog.providedBy(obj)
    return provides_catalog
39
40
41
def get_indexes(catalog):
    """List the indexes of the catalog

    :param catalog: Catalog object or anything `get_catalog` can resolve
    :returns: Whatever the catalog's `indexes()` call returns
    """
    return get_catalog(catalog).indexes()
49
50
51
def get_columns(catalog):
    """List the columns of the catalog

    :param catalog: Catalog object or anything `get_catalog` can resolve
    :returns: Whatever the catalog's `schema()` call returns
    """
    return get_catalog(catalog).schema()
59
60
61
def add_index(catalog, index, index_type, indexed_attrs=None):
    """Add an index to the catalog

    :param catalog: Catalog object or catalog name
    :param index: Index id
    :param index_type: Type name of the index, passed to `catalog.addIndex`;
        "ZCTextIndex" is delegated to `add_zc_text_index`
    :param indexed_attrs: Optional attribute name or list of attribute names
        to set on the created index. Ignored for "ZCTextIndex" and for index
        objects that have no `indexed_attrs` attribute
    :returns: True when the index was added successfully otherwise False
    """
    catalog = get_catalog(catalog)
    indexes = get_indexes(catalog)
    if index in indexes:
        return False
    if index_type == "ZCTextIndex":
        return add_zc_text_index(catalog, index)
    catalog.addIndex(index, index_type)
    if not indexed_attrs:
        return True
    # Inspect the created index *object*: `index` itself is only the id, so
    # checking it for `indexed_attrs` would never match.
    index_obj = get_index(catalog, index)
    if hasattr(index_obj, "indexed_attrs"):
        if not isinstance(indexed_attrs, list):
            indexed_attrs = [indexed_attrs]
        index_obj.indexed_attrs = indexed_attrs
    return True
82
83
84
def del_index(catalog, index):
    """Remove an index from the catalog

    :param catalog: Catalog object or anything `get_catalog` can resolve
    :param index: Index id
    :returns: True when the index existed and was deleted, otherwise False
    """
    tool = get_catalog(catalog)
    if index in get_indexes(tool):
        tool.delIndex(index)
        return True
    # nothing to delete
    return False
97
98
99
def get_index(catalog, index):
    """Look up an index object in the catalog

    :param catalog: Catalog object or anything `get_catalog` can resolve
    :param index: Index id
    :returns: Index object, or None when the catalog has no such index
    """
    tool = get_catalog(catalog)
    if index in get_indexes(tool):
        return tool.Indexes[index]
    return None
111
112
113
def add_zc_text_index(catalog, index, lex_id="Lexicon"):
    """Add a ZCTextIndex to the catalog, creating its lexicon when missing

    :param catalog: Catalog object or anything `get_catalog` can resolve
    :param index: Index id, also used as the indexed attribute (`doc_attr`)
    :param lex_id: Id of the lexicon object inside the catalog
    :returns: True when the index was added successfully, otherwise False
    """
    tool = get_catalog(catalog)
    if index in get_indexes(tool):
        return False

    # the index refers to a lexicon by id, so make sure one is available
    if getattr(tool, lex_id, None) is None:
        new_lexicon = PLexicon(
            lex_id,
            "Lexicon",
            Splitter(),
            CaseNormalizer(),
            StopWordAndSingleCharRemover(),
        )
        tool._setObject(lex_id, new_lexicon)

    # settings container handed to `addIndex` as the extra argument
    class extra(object):
        index_type = "Okapi BM25 Rank"
        lexicon_id = lex_id
        doc_attr = index

    tool.addIndex(index, "ZCTextIndex", extra)
    return True
144
145
146
def reindex_index(catalog, index):
    """Reindex a single index of the catalog

    :param catalog: Catalog object or anything `get_catalog` can resolve
    :param index: Index id
    :returns: True when the index exists and the reindex was triggered,
        otherwise False
    """
    tool = get_catalog(catalog)
    if index in get_indexes(tool):
        tool.manage_reindexIndex(index)
        return True
    # unknown index
    return False
161
162
163
def add_column(catalog, column):
    """Add a column to the catalog

    :param catalog: Catalog object or anything `get_catalog` can resolve
    :param column: Column name
    :returns: True when the column was added, False when it already exists
    """
    tool = get_catalog(catalog)
    already_there = column in get_columns(tool)
    if not already_there:
        tool.addColumn(column)
    return not already_there
178
179
180
def del_column(catalog, column):
    """Remove a column from the catalog

    :param catalog: Catalog object or anything `get_catalog` can resolve
    :param column: Column name
    :returns: True when the column existed and was deleted, otherwise False
    """
    tool = get_catalog(catalog)
    if column in get_columns(tool):
        tool.delColumn(column)
        return True
    # no such column
    return False
195