Passed
Push — 2.x ( d668b2...78a11b )
by Ramon
05:54
created

bika.lims.api.analysisservice.get_by_keyword()   A

Complexity

Conditions 2

Size

Total Lines 8
Code Lines 6

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 6
dl 0
loc 8
rs 10
c 0
b 0
f 0
cc 2
nop 2
1
# -*- coding: utf-8 -*-
2
#
3
# This file is part of SENAITE.CORE.
4
#
5
# SENAITE.CORE is free software: you can redistribute it and/or modify it under
6
# the terms of the GNU General Public License as published by the Free Software
7
# Foundation, version 2.
8
#
9
# This program is distributed in the hope that it will be useful, but WITHOUT
10
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
11
# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
12
# details.
13
#
14
# You should have received a copy of the GNU General Public License along with
15
# this program; if not, write to the Free Software Foundation, Inc., 51
16
# Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
17
#
18
# Copyright 2018-2021 by it's authors.
19
# Some rights reserved, see README and LICENSE.
20
21
import re
22
23
from bika.lims import api
24
from bika.lims import bikaMessageFactory as _
25
from bika.lims.browser.fields.uidreferencefield import get_backreferences
26
from bika.lims.catalog import SETUP_CATALOG
27
28
RX_SERVICE_KEYWORD = r"[^A-Za-z\w\d\-_]"
29
30
31
def get_calculation_dependants_for(service):
32
    """Collect all services which depend on this service
33
34
    :param service: Analysis Service Object/ZCatalog Brain
35
    :returns: List of services that depend on this service
36
    """
37
38
    def calc_dependants_gen(service, collector=None):
39
        """Generator for recursive resolution of dependant sevices.
40
        """
41
42
        # The UID of the service
43
        service_uid = api.get_uid(service)
44
45
        # maintain an internal dependency mapping
46
        if collector is None:
47
            collector = {}
48
49
        # Stop iteration if we processed this service already
50
        if service_uid in collector:
51
            raise StopIteration
52
53
        # Get the dependant calculations of the service
54
        # (calculations that use the service in their formula).
55
        calc_uids = get_backreferences(
56
            service, relationship="CalculationDependentServices")
57
58
        for calc_uid in calc_uids:
59
            # Get the calculation object
60
            calc = api.get_object_by_uid(calc_uid)
61
62
            # Get the Analysis Services which have this calculation assigned
63
            dep_service_uids = get_backreferences(
64
                calc, relationship='AnalysisServiceCalculation')
65
66
            for dep_service_uid in dep_service_uids:
67
                dep_service = api.get_object_by_uid(dep_service_uid)
68
69
                # remember the dependent service
70
                collector[dep_service_uid] = dep_service
71
72
                # yield the dependent service
73
                yield dep_service
74
75
                # check the dependants of the dependant services
76
                for ddep_service in calc_dependants_gen(
77
                        dep_service, collector=collector):
78
                    yield ddep_service
79
80
    dependants = {}
81
    service = api.get_object(service)
82
83
    for dep_service in calc_dependants_gen(service):
84
        # Skip the initial (requested) service
85
        if dep_service == service:
86
            continue
87
        uid = api.get_uid(dep_service)
88
        dependants[uid] = dep_service
89
90
    return dependants
91
92
93
def get_calculation_dependencies_for(service):
94
    """Calculation dependencies of this service and the calculation of each
95
    dependent service (recursively).
96
    """
97
98
    def calc_dependencies_gen(service, collector=None):
99
        """Generator for recursive dependency resolution.
100
        """
101
102
        # The UID of the service
103
        service_uid = api.get_uid(service)
104
105
        # maintain an internal dependency mapping
106
        if collector is None:
107
            collector = {}
108
109
        # Stop iteration if we processed this service already
110
        if service_uid in collector:
111
            raise StopIteration
112
113
        # Get the calculation of the service.
114
        # The calculation comes either from an assigned method or the user
115
        # has set a calculation manually (see content/analysisservice.py).
116
        calculation = service.getCalculation()
117
118
        # Stop iteration if there is no calculation
119
        if not calculation:
120
            raise StopIteration
121
122
        # The services used in this calculation.
123
        # These are the actual dependencies of the used formula.
124
        dep_services = calculation.getDependentServices()
125
        for dep_service in dep_services:
126
            # get the UID of the dependent service
127
            dep_service_uid = api.get_uid(dep_service)
128
129
            # remember the dependent service
130
            collector[dep_service_uid] = dep_service
131
132
            # yield the dependent service
133
            yield dep_service
134
135
            # check the dependencies of the dependent services
136
            for ddep_service in calc_dependencies_gen(dep_service,
137
                                                      collector=collector):
138
                yield ddep_service
139
140
    dependencies = {}
141
    for dep_service in calc_dependencies_gen(service):
142
        # Skip the initial (requested) service
143
        if dep_service == service:
144
            continue
145
        uid = api.get_uid(dep_service)
146
        dependencies[uid] = dep_service
147
148
    return dependencies
149
150
151
def get_service_dependencies_for(service):
152
    """Calculate the dependencies for the given service.
153
    """
154
155
    dependants = get_calculation_dependants_for(service)
156
    dependencies = get_calculation_dependencies_for(service)
157
158
    return {
159
        "dependencies": dependencies.values(),
160
        "dependants": dependants.values(),
161
    }
162
163
164
def check_keyword(keyword, instance=None):
165
    """Checks if the given service keyword is valid and unique. Returns an
166
    error message if not valid. None otherwise
167
    """
168
    # Ensure the format is valid
169
    if re.findall(RX_SERVICE_KEYWORD, keyword):
170
        return _("Validation failed: keyword contains invalid characters")
171
172
    # Ensure no other service with this keyword exists
173
    brains = get_by_keyword(keyword)
174
    if instance and len(brains) > 1:
175
        return _("Validation failed: keyword is already in use")
176
    elif brains:
177
        return _("Validation failed: keyword is already in use")
178
179
    # Ensure the keyword is not used in calculations
180
    ref = "[{}]".format(keyword)
181
    cat = api.get_tool(SETUP_CATALOG)
182
    for calculation in cat(portal_type="Calculation"):
183
        calculation = api.get_object(calculation)
184
        if ref in calculation.getFormula():
185
            return _("Validation failed: keyword is already in use by "
186
                     "calculation '{}'").format(api.get_title(calculation))
187
188
189
def get_by_keyword(keyword, full_objects=False):
190
    """Returns an Analysis Service object for the given keyword, if any
191
    """
192
    query = {"portal_type": "AnalysisService", "getKeyword": keyword}
193
    brains = api.search(query, SETUP_CATALOG)
194
    if full_objects:
195
        return [api.get_object(brain) for brain in brains]
196
    return brains
197