Passed
Push — 2.x ( 3a5aed...187ea7 )
by Jordi
07:51
created

Calculation.getCalculationDependents()   A

Complexity

Conditions 5

Size

Total Lines 19
Code Lines 11

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 11
dl 0
loc 19
rs 9.3333
c 0
b 0
f 0
cc 5
nop 2
1
# -*- coding: utf-8 -*-
2
#
3
# This file is part of SENAITE.CORE.
4
#
5
# SENAITE.CORE is free software: you can redistribute it and/or modify it under
6
# the terms of the GNU General Public License as published by the Free Software
7
# Foundation, version 2.
8
#
9
# This program is distributed in the hope that it will be useful, but WITHOUT
10
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
11
# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
12
# details.
13
#
14
# You should have received a copy of the GNU General Public License along with
15
# this program; if not, write to the Free Software Foundation, Inc., 51
16
# Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
17
#
18
# Copyright 2018-2025 by it's authors.
19
# Some rights reserved, see README and LICENSE.
20
21
import importlib
22
import inspect
23
import math
24
import re
25
26
import transaction
27
from AccessControl import ClassSecurityInfo
28
from bika.lims import api
29
from bika.lims import bikaMessageFactory as _
30
from bika.lims.api import get_object_by_uid
31
from bika.lims.browser.fields import InterimFieldsField
32
from bika.lims.browser.fields.uidreferencefield import UIDReferenceField
33
from bika.lims.browser.fields.uidreferencefield import get_backreferences
34
from bika.lims.browser.widgets import RecordsWidget as BikaRecordsWidget
35
from bika.lims.config import PROJECTNAME
36
from bika.lims.content.bikaschema import BikaSchema
37
from bika.lims.interfaces import IDeactivable
38
from bika.lims.interfaces.calculation import ICalculation
39
from Products.Archetypes.atapi import BaseFolder
40
from Products.Archetypes.atapi import Schema
41
from Products.Archetypes.atapi import TextAreaWidget
42
from Products.Archetypes.atapi import TextField
43
from Products.Archetypes.atapi import registerType
44
from Products.ATContentTypes.lib.historyaware import HistoryAwareMixin
45
from Products.CMFCore.utils import getToolByName
46
from Products.CMFCore.WorkflowCore import WorkflowException
47
from Products.CMFPlone.utils import safe_unicode
48
from senaite.core import logger
49
from senaite.core.browser.fields.records import RecordsField
50
from senaite.core.browser.widgets.referencewidget import ReferenceWidget
51
from senaite.core.catalog import SETUP_CATALOG
52
from zope.interface import implements
53
54
schema = BikaSchema.copy() + Schema((
55
56
    InterimFieldsField(
57
        'InterimFields',
58
        widget=BikaRecordsWidget(
59
            label=_("Calculation Interim Fields"),
60
            description=_(
61
                "Define interim fields such as vessel mass, dilution factors, "
62
                "should your calculation require them. The field title "
63
                "specified here will be used as column headers and field "
64
                "descriptors where the interim fields are displayed. If "
65
                "'Apply wide' is enabled the field will be shown in a "
66
                "selection box on the top of the worksheet, allowing to apply "
67
                "a specific value to all the corresponding fields on the "
68
                "sheet."),
69
        )
70
    ),
71
72
    # hidden field
73
    UIDReferenceField(
74
        "DependentServices",
75
        required=1,
76
        multiValued=1,
77
        relationship="CalculationDependentServices",
78
        allowed_types=("AnalysisService",),
79
        widget=ReferenceWidget(
80
            visible=False,
81
            label=_(
82
                "label_calculation_dependentservices",
83
                default="Dependent services"),
84
            description=_(
85
                "description_calculation_dependentservices",
86
                default="Dependent services of this calculation"),
87
            catalog=SETUP_CATALOG,
88
            query={
89
                "is_active": True,
90
                "sort_on": "sortable_title",
91
                "sort_order": "ascending"
92
            },
93
        ),
94
    ),
95
96
    RecordsField(
97
        'PythonImports',
98
        required=False,
99
        subfields=('module', 'function'),
100
        subfield_labels={'module': _('Module'), 'function': _('Function')},
101
        subfield_readonly={'module': False, 'function': False},
102
        subfield_types={'module': 'string', 'function': 'string'},
103
        default=[
104
            {'module': 'math', 'function': 'ceil'},
105
            {'module': 'math', 'function': 'floor'},
106
        ],
107
        subfield_validators={
108
            'module': 'importvalidator',
109
        },
110
        widget=BikaRecordsWidget(
111
            label=_("Additional Python Libraries"),
112
            description=_(
113
                "If your formula needs a special function from an external "
114
                "Python library, you can import it here. E.g. if you want to "
115
                "use the 'floor' function from the Python 'math' module, "
116
                "you add 'math' to the Module field and 'floor' to the "
117
                "function field. The equivalent in Python would be 'from math "
118
                "import floor'. In your calculation you could use then "
119
                "'floor([Ca] + [Mg])'. "
120
            ),
121
            allowDelete=True,
122
        ),
123
    ),
124
125
    TextField(
126
        'Formula',
127
        required=True,
128
        validators=('formulavalidator',),
129
        default_content_type='text/plain',
130
        allowable_content_types=('text/plain',),
131
        widget=TextAreaWidget(
132
            label=_("Calculation Formula"),
133
            description=_(
134
                "<p>The formula you type here will be dynamically calculated "
135
                "when an analysis using this calculation is displayed.</p>"
136
                "<p>To enter a Calculation, use standard maths operators,  "
137
                "+ - * / ( ), and all keywords available, both from other "
138
                "Analysis Services and the Interim Fields specified here, "
139
                "as variables. Enclose them in square brackets [ ].</p>"
140
                "<p>E.g, the calculation for Total Hardness, the total of "
141
                "Calcium (ppm) and Magnesium (ppm) ions in water, is entered "
142
                "as [Ca] + [Mg], where Ca and MG are the keywords for those "
143
                "two Analysis Services.</p>"),
144
        )
145
    ),
146
147
    RecordsField(
148
        'TestParameters',
149
        required=False,
150
        subfields=('keyword', 'value'),
151
        subfield_labels={'keyword': _('Keyword'), 'value': _('Value')},
152
        subfield_readonly={'keyword': True, 'value': False},
153
        subfield_types={'keyword': 'string', 'value': 'float'},
154
        default=[{'keyword': '', 'value': 0}],
155
        widget=BikaRecordsWidget(
156
            label=_("Test Parameters"),
157
            description=_("To test the calculation, enter values here for all "
158
                          "calculation parameters.  This includes Interim "
159
                          "fields defined above, as well as any services that "
160
                          "this calculation depends on to calculate results."),
161
            allowDelete=False,
162
        ),
163
    ),
164
165
    TextField(
166
        'TestResult',
167
        default_content_type='text/plain',
168
        allowable_content_types=('text/plain',),
169
        widget=TextAreaWidget(
170
            label=_('Test Result'),
171
            description=_("The result after the calculation has taken place "
172
                          "with test values.  You will need to save the "
173
                          "calculation before this value will be calculated."),
174
        )
175
    ),
176
177
))
178
179
schema['title'].widget.visible = True
180
schema['description'].widget.visible = True
181
182
183
class Calculation(BaseFolder, HistoryAwareMixin):
184
    """Calculation for Analysis Results
185
    """
186
    implements(ICalculation, IDeactivable)
187
188
    security = ClassSecurityInfo()
189
    displayContentsTab = False
190
    schema = schema
0 ignored issues
show
Comprehensibility Best Practice introduced by
The variable schema does not seem to be defined.
Loading history...
191
192
    _at_rename_after_creation = True
193
194
    def _renameAfterCreation(self, check_auto_id=False):
195
        from senaite.core.idserver import renameAfterCreation
196
        renameAfterCreation(self)
197
198
    def setInterimFields(self, value):
199
        new_value = []
200
201
        for x in range(len(value)):
202
            row = dict(value[x])
203
            keys = row.keys()
204
            if 'value' not in keys:
205
                row['value'] = 0
206
            new_value.append(row)
207
208
        # extract the keywords from the new calculation interims
209
        calculation_interim_keys = map(lambda i: i.get("keyword"), value)
210
211
        # update all service interims
212
        for service in self.getCalculationDependents():
213
            # get the interims of the dependent service
214
            service_interims = service.getInterimFields()
215
            # extract the keywords from the service interims
216
            service_interim_keys = map(lambda i: i.get("keyword"),
217
                                       service_interims)
218
            # sync new interims from the calculation -> service
219
            new_interims = set(calculation_interim_keys).difference(
220
                set(service_interim_keys))
221
            for key in new_interims:
222
                new_interim = value[calculation_interim_keys.index(key)]
223
                service_interims.append(new_interim)
224
            if new_interims:
225
                service.setInterimFields(service_interims)
226
227
        self.getField('InterimFields').set(self, new_value)
228
229
    def setFormula(self, Formula=None):
230
        """Set the Dependent Services from the text of the calculation Formula
231
        """
232
        bsc = getToolByName(self, 'senaite_catalog_setup')
233
        if Formula is None:
234
            self.setDependentServices(None)
235
            self.getField('Formula').set(self, Formula)
236
        else:
237
            keywords = re.compile(r"\[([^.^\]]+)\]").findall(Formula)
238
            brains = bsc(portal_type='AnalysisService',
239
                         getKeyword=keywords)
240
            services = [brain.getObject() for brain in brains]
241
            self.getField('DependentServices').set(self, services)
242
            self.getField('Formula').set(self, Formula)
243
244
    def getMinifiedFormula(self):
245
        """Return the current formula value as text.
246
        The result will have newlines and additional spaces stripped out.
247
        """
248
        value = " ".join(self.getFormula().splitlines())
249
        return value
250
251
    def getCalculationDependencies(self, flat=False, deps=None):
252
        """ Recursively calculates all dependencies of this calculation.
253
            The return value is dictionary of dictionaries (of dictionaries...)
254
255
            {service_UID1:
256
                {service_UID2:
257
                    {service_UID3: {},
258
                     service_UID4: {},
259
                    },
260
                },
261
            }
262
263
            set flat=True to get a simple list of AnalysisService objects
264
        """
265
        if deps is None:
266
            deps = [] if flat is True else {}
267
268
        def get_fetched(deps):
269
            if isinstance(deps, list):
270
                return map(api.get_uid, deps)
271
            if isinstance(deps, dict):
272
                fetched = deps.keys()
273
                for value in deps.values():
274
                    fetched.extend(get_fetched(value))
275
                return fetched
276
            return []
277
278
        # List of service uids that have been grabbed already. This is used to
279
        # prevent an infinite recursion error when the formula includes the
280
        # Keyword of the Service that includes the Calculation
281
        fetched = get_fetched(deps)
282
283
        for service in self.getDependentServices():
284
            if api.get_uid(service) in fetched:
285
                # Processed already. Omit to prevent recursion
286
                continue
287
288
            if flat:
289
                deps.append(service)
290
            else:
291
                deps[service.UID()] = {}
292
293
            calc = service.getCalculation()
294
            if calc:
295
                calc.getCalculationDependencies(flat, deps)
296
297
        if flat:
298
            # Remove duplicates
299
            deps = list(set(deps))
300
301
        return deps
302
303
    def getCalculationDependents(self, deps=None):
304
        """Return a flat list of services who depend on this calculation.
305
306
        This refers only to services who's Calculation UIDReferenceField have
307
        the value set to point to this calculation.
308
309
        It has nothing to do with the services referenced in the calculation's
310
        Formula.
311
        """
312
        if deps is None:
313
            deps = []
314
        backrefs = get_backreferences(self, "AnalysisServiceCalculation")
315
        services = map(get_object_by_uid, backrefs)
316
        for service in services:
317
            calc = service.getCalculation()
318
            if calc and calc.UID() != self.UID():
319
                calc.getCalculationDependents(deps)
320
            deps.append(service)
321
        return deps
322
323
    def setTestParameters(self, form_value):
324
        """This is called from the objectmodified subscriber, to ensure
325
        correct population of the test-parameters field.
326
        It collects Keywords for all services that are direct dependencies of
327
        this calculatioin, and all of this calculation's InterimFields,
328
        and gloms them together.
329
        """
330
        params = []
331
332
        # Set default/existing values for InterimField keywords
333
        for interim in self.getInterimFields():
334
            keyword = interim.get('keyword')
335
            ex = [x.get('value') for x in form_value if
336
                  x.get('keyword') == keyword]
337
            params.append({'keyword': keyword,
338
                           'value': ex[0] if ex else interim.get('value')})
339
        # Set existing/blank values for service keywords
340
        for service in self.getDependentServices():
341
            keyword = service.getKeyword()
342
            ex = [x.get('value') for x in form_value if
343
                  x.get('keyword') == keyword]
344
            params.append({'keyword': keyword,
345
                           'value': ex[0] if ex else ''})
346
        self.Schema().getField('TestParameters').set(self, params)
347
348
    # noinspection PyUnusedLocal
349
    def setTestResult(self, form_value):
350
        """Calculate formula with TestParameters and enter result into
351
         TestResult field.
352
        """
353
        # Create mapping from TestParameters
354
        mapping = {x['keyword']: x['value'] for x in self.getTestParameters()}
355
        # Gather up and parse formula
356
        formula = self.getMinifiedFormula()
357
        test_result_field = self.Schema().getField('TestResult')
358
359
        # Flush the TestResult field and return
360
        if not formula:
361
            return test_result_field.set(self, "")
362
363
        formula = formula.replace('[', '{').replace(']', '}').replace('  ', '')
364
        result = 'Failure'
365
366
        try:
367
            formula = formula.format(**mapping)
368
            result = eval(formula, self._getGlobals())
369
        except TypeError as e:
370
            # non-numeric arguments in interim mapping?
371
            result = "TypeError: {}".format(str(e.args[0]))
372
        except ZeroDivisionError as e:
373
            result = "Division by 0: {}".format(str(e.args[0]))
374
        except KeyError as e:
375
            result = "Key Error: {}".format(str(e.args[0]))
376
        except ImportError as e:
377
            result = "Import Error: {}".format(str(e.args[0]))
378
        except Exception as e:
379
            result = "Unspecified exception: {}".format(str(e.args[0]))
380
        test_result_field.set(self, str(result))
381
382
    def _getGlobals(self, **kwargs):
383
        """Return the globals dictionary for the formula calculation
384
        """
385
        # Default globals
386
        globs = {
387
            "__builtins__": None,
388
            "all": all,
389
            "any": any,
390
            "bool": bool,
391
            "chr": chr,
392
            "cmp": cmp,
0 ignored issues
show
Comprehensibility Best Practice introduced by
The variable cmp does not seem to be defined.
Loading history...
393
            "complex": complex,
394
            "divmod": divmod,
395
            "enumerate": enumerate,
396
            "float": float,
397
            "format": format,
398
            "frozenset": frozenset,
399
            "hex": hex,
400
            "int": int,
401
            "len": len,
402
            "list": list,
403
            "long": long,
0 ignored issues
show
Comprehensibility Best Practice introduced by
The variable long does not seem to be defined.
Loading history...
404
            "math": math,
405
            "max": max,
406
            "min": min,
407
            "oct": oct,
408
            "ord": ord,
409
            "pow": pow,
410
            "range": range,
411
            "reversed": reversed,
412
            "round": round,
413
            "str": str,
414
            "sum": sum,
415
            "tuple": tuple,
416
            "xrange": xrange,
0 ignored issues
show
Comprehensibility Best Practice introduced by
The variable xrange does not seem to be defined.
Loading history...
417
        }
418
        # Update with keyword arguments
419
        globs.update(kwargs)
420
        # Update with additional Python libraries
421
        for imp in self.getPythonImports():
422
            mod = imp["module"]
423
            func = imp["function"]
424
            member = self._getModuleMember(mod, func)
425
            if member is None:
426
                raise ImportError(
427
                    "Could not find member {} of module {}".format(
428
                        func, mod))
429
            globs[func] = member
430
        return globs
431
432
    def _getModuleMember(self, dotted_name, member):
433
        """Get the member object of a module.
434
435
        :param dotted_name: The dotted name of the module, e.g. 'scipy.special'
436
        :type dotted_name: string
437
        :param member: The name of the member function, e.g. 'gammaincinv'
438
        :type member: string
439
        :returns: member object or None
440
        """
441
        try:
442
            mod = importlib.import_module(dotted_name)
443
        except ImportError as e:
444
            logger.error("Cannot import module %s: %s" % (dotted_name, str(e)))
445
            return None
446
447
        members = dict(inspect.getmembers(mod))
448
        return members.get(member)
449
450
    def workflow_script_activate(self):
451
        pu = getToolByName(self, 'plone_utils')
452
        # A calculation cannot be re-activated if services it depends on
453
        # are deactivated.
454
        services = self.getDependentServices()
455
        inactive_services = []
456
        for service in services:
457
            if not api.is_active(service):
458
                inactive_services.append(service.Title())
459
        if inactive_services:
460
            msg = _("Cannot activate calculation, because the following "
461
                    "service dependencies are inactive: ${inactive_services}",
462
                    mapping={'inactive_services': safe_unicode(
463
                        ", ".join(inactive_services))})
464
            pu.addPortalMessage(msg, 'error')
465
            transaction.get().abort()
466
            raise WorkflowException
467
468
    def workflow_script_deactivate(self):
469
        bsc = getToolByName(self, 'senaite_catalog_setup')
470
        pu = getToolByName(self, 'plone_utils')
471
        # A calculation cannot be deactivated if active services are using it.
472
        services = bsc(portal_type="AnalysisService", is_active=True)
473
        calc_services = []
474
        for service in services:
475
            service = service.getObject()
476
            calc = service.getCalculation()
477
            if calc and calc.UID() == self.UID():
478
                calc_services.append(service.Title())
479
        if calc_services:
480
            msg = _(
481
                'Cannot deactivate calculation, because it is in use by the '
482
                'following services: ${calc_services}',
483
                mapping={
484
                    'calc_services': safe_unicode(", ".join(calc_services))})
485
            pu.addPortalMessage(msg, 'error')
486
            transaction.get().abort()
487
            raise WorkflowException
488
489
490
registerType(Calculation, PROJECTNAME)
491