Passed
Push — master ( 1b98f4...c06457 )
by Ramon
09:26 queued 04:32
created

Calculation.setTestParameters()   A

Complexity

Conditions 5

Size

Total Lines 24
Code Lines 15

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 15
dl 0
loc 24
rs 9.1832
c 0
b 0
f 0
cc 5
nop 2
1
# -*- coding: utf-8 -*-
2
#
3
# This file is part of SENAITE.CORE
4
#
5
# Copyright 2018 by it's authors.
6
# Some rights reserved. See LICENSE.rst, CONTRIBUTORS.rst.
7
8
import importlib
9
import inspect
10
import math
11
import re
12
13
import transaction
14
from AccessControl import ClassSecurityInfo
15
from bika.lims import api
16
from bika.lims import bikaMessageFactory as _
17
from bika.lims.api import get_object_by_uid
18
from bika.lims.browser.fields import InterimFieldsField
19
from bika.lims.browser.fields.uidreferencefield import UIDReferenceField
20
from bika.lims.browser.fields.uidreferencefield import get_backreferences
21
from bika.lims.browser.widgets import RecordsWidget as BikaRecordsWidget
22
from bika.lims.config import PROJECTNAME
23
from bika.lims.content.bikaschema import BikaSchema
24
from bika.lims.interfaces import IDeactivable
25
from bika.lims.interfaces.calculation import ICalculation
26
from Products.Archetypes.atapi import BaseFolder
27
from Products.Archetypes.atapi import ReferenceWidget
28
from Products.Archetypes.atapi import Schema
29
from Products.Archetypes.atapi import TextAreaWidget
30
from Products.Archetypes.atapi import TextField
31
from Products.Archetypes.atapi import registerType
32
from Products.ATContentTypes.lib.historyaware import HistoryAwareMixin
33
from Products.ATExtensions.field import RecordsField
34
from Products.CMFCore.utils import getToolByName
35
from Products.CMFCore.WorkflowCore import WorkflowException
36
from Products.CMFPlone.utils import safe_unicode
37
from zope.interface import implements
38
39
40
# Schema of the Calculation content type: a copy of the common BikaSchema
# extended with the calculation specific fields declared below.
schema = BikaSchema.copy() + Schema((

    # Interim values the formula can refer to by keyword
    InterimFieldsField(
        'InterimFields',
        widget=BikaRecordsWidget(
            label=_("Calculation Interim Fields"),
            description=_(
                "Define interim fields such as vessel mass, dilution factors, "
                "should your calculation require them. The field title "
                "specified here will be used as column headers and field "
                "descriptors where the interim fields are displayed. If "
                "'Apply wide' is enabled the field will be shown in a "
                "selection box on the top of the worksheet, allowing to apply "
                "a specific value to all the corresponding fields on the "
                "sheet."),
        ),
    ),

    # Hidden reference to the analysis services used by the formula; it is
    # written by `Calculation.setFormula`
    UIDReferenceField(
        'DependentServices',
        required=1,
        multiValued=1,
        allowed_types=('AnalysisService',),
        widget=ReferenceWidget(
            checkbox_bound=0,
            visible=False,
            label=_("Dependent Analyses"),
        ),
    ),

    # Module/function pairs that are made available to the formula
    RecordsField(
        'PythonImports',
        required=False,
        subfields=('module', 'function'),
        subfield_labels={'module': _('Module'), 'function': _('Function')},
        subfield_readonly={'module': False, 'function': False},
        subfield_types={'module': 'string', 'function': 'string'},
        default=[
            {'module': 'math', 'function': 'ceil'},
            {'module': 'math', 'function': 'floor'},
        ],
        subfield_validators={
            'module': 'importvalidator',
        },
        widget=BikaRecordsWidget(
            label=_("Additional Python Libraries"),
            description=_(
                "If your formula needs a special function from an external "
                "Python library, you can import it here. E.g. if you want to "
                "use the 'floor' function from the Python 'math' module, "
                "you add 'math' to the Module field and 'floor' to the "
                "function field. The equivalent in Python would be 'from math "
                "import floor'. In your calculation you could use then "
                "'floor([Ca] + [Mg])'. "
            ),
            allowDelete=True,
        ),
    ),

    # The formula text; keywords are enclosed in square brackets
    TextField(
        'Formula',
        validators=('formulavalidator',),
        default_content_type='text/plain',
        allowable_content_types=('text/plain',),
        widget=TextAreaWidget(
            label=_("Calculation Formula"),
            description=_(
                "<p>The formula you type here will be dynamically calculated "
                "when an analysis using this calculation is displayed.</p>"
                "<p>To enter a Calculation, use standard maths operators,  "
                "+ - * / ( ), and all keywords available, both from other "
                "Analysis Services and the Interim Fields specified here, "
                "as variables. Enclose them in square brackets [ ].</p>"
                "<p>E.g, the calculation for Total Hardness, the total of "
                "Calcium (ppm) and Magnesium (ppm) ions in water, is entered "
                "as [Ca] + [Mg], where Ca and MG are the keywords for those "
                "two Analysis Services.</p>"),
        ),
    ),

    # Keyword/value pairs used to try out the formula; the rows are built
    # by `Calculation.setTestParameters`
    RecordsField(
        'TestParameters',
        required=False,
        subfields=('keyword', 'value'),
        subfield_labels={'keyword': _('Keyword'), 'value': _('Value')},
        subfield_readonly={'keyword': True, 'value': False},
        subfield_types={'keyword': 'string', 'value': 'float'},
        default=[{'keyword': '', 'value': 0}],
        widget=BikaRecordsWidget(
            label=_("Test Parameters"),
            description=_("To test the calculation, enter values here for all "
                          "calculation parameters.  This includes Interim "
                          "fields defined above, as well as any services that "
                          "this calculation depends on to calculate results."),
            allowDelete=False,
        ),
    ),

    # Outcome of the formula evaluated with the test parameters; it is
    # written by `Calculation.setTestResult`
    TextField(
        'TestResult',
        default_content_type='text/plain',
        allowable_content_types=('text/plain',),
        widget=TextAreaWidget(
            label=_('Test Result'),
            description=_("The result after the calculation has taken place "
                          "with test values.  You will need to save the "
                          "calculation before this value will be calculated."),
        ),
    ),

))

# Make the title and description widgets of the copied base schema visible
schema['title'].widget.visible = True
schema['description'].widget.visible = True
154
155
156
class Calculation(BaseFolder, HistoryAwareMixin):
157
    """Calculation for Analysis Results
158
    """
159
    implements(ICalculation, IDeactivable)
160
161
    security = ClassSecurityInfo()
162
    displayContentsTab = False
163
    schema = schema
0 ignored issues
show
Comprehensibility Best Practice introduced by
The variable schema does not seem to be defined.
Loading history...
164
165
    _at_rename_after_creation = True
166
167
    def _renameAfterCreation(self, check_auto_id=False):
        """Hand the renaming of this object over to the ID server.

        :param check_auto_id: accepted, but not used by this implementation
        """
        # Imported at call time rather than at module load time
        from bika.lims import idserver
        idserver.renameAfterCreation(self)
170
171
    def at_post_create_script(self):
        """Save an initial version of a freshly created calculation.

        This method is only called once after object creation. The hook is
        called in `processForm` before the `ObjectInitializedEvent` is fired.
        """
        if hasattr(self, "version_id"):
            # There is a version already, nothing to do
            return

        # Ensure we have an initial version
        # https://github.com/senaite/senaite.core/pull/1260
        repository = api.get_tool("portal_repository")
        repository.save(obj=self, comment="First version")
183
184
    def setInterimFields(self, value):
        """Store the interim fields and sync new ones to dependant services.

        Every row is stored as a plain dictionary that always has a 'value'
        key (0 when the incoming row has none). Rows whose keyword is not
        yet present in the interim fields of a service that uses this
        calculation are appended to the interim fields of that service, in
        the order they appear in `value`.

        :param value: sequence of interim records (dict-like, read through
            their 'keyword' key); None is handled like an empty sequence
        """
        rows = list(value) if value else []

        # Normalize the rows to store: plain dicts with a default value
        new_value = []
        for row in rows:
            record = dict(row)
            record.setdefault('value', 0)
            new_value.append(record)

        # update all service interims
        for service in self.getCalculationDependants():
            # get the interims of the dependant service
            service_interims = service.getInterimFields()
            # keywords the service has already
            known = set(i.get("keyword") for i in service_interims)
            # sync new interims from the calculation -> service; only the
            # first row of a keyword is taken, in the order of the rows
            missing = []
            for row in rows:
                keyword = row.get("keyword")
                if keyword in known:
                    continue
                known.add(keyword)
                missing.append(row)
            if missing:
                service_interims.extend(missing)
                service.setInterimFields(service_interims)

        self.getField('InterimFields').set(self, new_value)
214
215
    def setFormula(self, Formula=None):
        """Store the formula and set the dependent services found in it.

        The keywords are the tokens enclosed in square brackets, e.g. `Ca`
        and `Mg` in `[Ca] + [Mg]`. The analysis services the setup catalog
        returns for these keywords are stored as dependent services.

        :param Formula: text of the formula; with None, the dependent
            services are reset and None is stored as formula
        """
        catalog = getToolByName(self, 'bika_setup_catalog')

        if Formula is None:
            self.setDependentServices(None)
            self.getField('Formula').set(self, Formula)
            return

        # Bracketed tokens that contain neither a dot nor a caret
        keyword_pattern = re.compile(r"\[([^.^\]]+)\]")
        keywords = keyword_pattern.findall(Formula)
        query = {
            'portal_type': 'AnalysisService',
            'getKeyword': keywords,
        }
        services = []
        for brain in catalog(**query):
            services.append(brain.getObject())
        self.getField('DependentServices').set(self, services)
        self.getField('Formula').set(self, Formula)
229
230
    def getMinifiedFormula(self):
        """Return the current formula collapsed onto a single line.

        The lines of the formula are joined with a single space. Spaces
        inside a line are kept as they are.
        """
        lines = self.getFormula().splitlines()
        return " ".join(lines)
236
237
    def getCalculationDependencies(self, flat=False, deps=None):
        """Recursively collect all service dependencies of this calculation.

        The dependencies are the dependent services of this calculation
        plus, recursively, the dependencies of the calculations assigned to
        those services.

        With a false `flat`, the return value is a single dictionary keyed
        by service UID; every value is an empty dictionary:

            {service_UID1: {},
             service_UID2: {},
             service_UID3: {},
            }

        With a true `flat`, the return value is a list of AnalysisService
        objects. The dependencies of a service come before the service
        itself, and a service that is reached more than once is listed
        more than once.

        :param flat: return a list of services instead of a dictionary
        :param deps: accumulator of the recursion; the result is added to
            it when given
        :returns: dictionary or list, as described above
        """
        if deps is None:
            # Decide by truthiness, like the loop below does. Testing for
            # identity with True made a truthy value such as 1 end up
            # calling `append` on a dictionary.
            deps = [] if flat else {}

        for service in self.getDependentServices():
            calc = service.getCalculation()
            if calc:
                # The nested call fills the very same accumulator
                calc.getCalculationDependencies(flat, deps)
            if flat:
                deps.append(service)
            else:
                deps[service.UID()] = {}
        return deps
263
264
    def getCalculationDependants(self, deps=None):
        """Return a flat list of the services that depend on this calculation.

        Only the services whose Calculation UIDReferenceField points to
        this calculation are considered. The services referenced in the
        Formula of the calculation play no role here.

        :param deps: list the services are appended to; a new list is used
            when omitted
        :returns: the list of services
        """
        if deps is None:
            deps = []

        service_uids = get_backreferences(self, 'AnalysisServiceCalculation')
        # Resolve all the objects before the first one is processed
        services = [get_object_by_uid(uid) for uid in service_uids]

        for service in services:
            calc = service.getCalculation()
            other_calc = bool(calc) and calc.UID() != self.UID()
            if other_calc:
                # Collect the dependants of the other calculation first
                calc.getCalculationDependants(deps)
            deps.append(service)
        return deps
283
284
    def setTestParameters(self, form_value):
        """Populate the TestParameters field with one row per keyword.

        The rows are built from the keywords of all InterimFields of this
        calculation, followed by the keywords of all its dependent
        services. A value found in `form_value` for a keyword is kept
        (the first match wins); otherwise interims get their own value and
        services get an empty string.

        NOTE(review): the original docstring states this is called from
        the objectmodified subscriber; the caller is not visible here.

        :param form_value: sequence of records (dict-like, read through
            their 'keyword' and 'value' keys); None is handled like an
            empty sequence
        """
        # First submitted value of every keyword, collected in one pass
        submitted = {}
        for record in form_value or ():
            submitted.setdefault(record.get('keyword'), record.get('value'))

        params = []

        # Set default/existing values for InterimField keywords
        for interim in self.getInterimFields():
            keyword = interim.get('keyword')
            params.append({
                'keyword': keyword,
                'value': submitted.get(keyword, interim.get('value')),
            })

        # Set existing/blank values for service keywords
        for service in self.getDependentServices():
            keyword = service.getKeyword()
            params.append({
                'keyword': keyword,
                'value': submitted.get(keyword, ''),
            })

        self.Schema().getField('TestParameters').set(self, params)
308
309
    # noinspection PyUnusedLocal
    def setTestResult(self, form_value):
        """Calculate the formula with the TestParameters and store the
        outcome in the TestResult field.

        The bracketed keywords of the minified formula are substituted by
        the test parameter values and the resulting expression is
        evaluated. When this fails, a description of the error is stored
        instead of a result. An empty formula flushes the field.

        :param form_value: not used; the result is always recalculated
        """
        def describe(error):
            # Exceptions raised without arguments have an empty `args`;
            # indexing it would raise from within the error handling
            return str(error.args[0]) if error.args else str(error)

        # Create mapping from TestParameters
        mapping = {x['keyword']: x['value'] for x in self.getTestParameters()}
        # Gather up and parse formula
        formula = self.getMinifiedFormula()
        test_result_field = self.Schema().getField('TestResult')

        # Flush the TestResult field and return
        if not formula:
            return test_result_field.set(self, "")

        # Turn the [keyword] placeholders into str.format fields.
        # NOTE(review): runs of two spaces are removed, not collapsed to
        # one; confirm that this is intended.
        formula = formula.replace('[', '{').replace(']', '}').replace('  ', '')

        try:
            formula = formula.format(**mapping)
            # NOTE(review): `eval` runs text entered by users. The globals
            # from `_getGlobals` disable the regular builtins, which is not
            # a sandbox; keep the edit permission restricted.
            result = eval(formula, self._getGlobals())
        except TypeError as e:
            # non-numeric arguments in interim mapping?
            result = "TypeError: {}".format(describe(e))
        except ZeroDivisionError as e:
            result = "Division by 0: {}".format(describe(e))
        except KeyError as e:
            result = "Key Error: {}".format(describe(e))
        except ImportError as e:
            result = "Import Error: {}".format(describe(e))
        except Exception as e:
            result = "Unspecified exception: {}".format(describe(e))
        test_result_field.set(self, str(result))
345
346
    def _getGlobals(self, **kwargs):
347
        """Return the globals dictionary for the formula calculation
348
        """
349
        # Default globals
350
        globs = {
351
            "__builtins__": None,
352
            "all": all,
353
            "any": any,
354
            "bool": bool,
355
            "chr": chr,
356
            "cmp": cmp,
357
            "complex": complex,
358
            "divmod": divmod,
359
            "enumerate": enumerate,
360
            "float": float,
361
            "format": format,
362
            "frozenset": frozenset,
363
            "hex": hex,
364
            "int": int,
365
            "len": len,
366
            "list": list,
367
            "long": long,
0 ignored issues
show
Comprehensibility Best Practice introduced by
Undefined variable 'long'
Loading history...
368
            "math": math,
369
            "max": max,
370
            "min": min,
371
            "oct": oct,
372
            "ord": ord,
373
            "pow": pow,
374
            "range": range,
375
            "reversed": reversed,
376
            "round": round,
377
            "str": str,
378
            "sum": sum,
379
            "tuple": tuple,
380
            "xrange": xrange,
0 ignored issues
show
Comprehensibility Best Practice introduced by
Undefined variable 'xrange'
Loading history...
381
        }
382
        # Update with keyword arguments
383
        globs.update(kwargs)
384
        # Update with additional Python libraries
385
        for imp in self.getPythonImports():
386
            mod = imp["module"]
387
            func = imp["function"]
388
            member = self._getModuleMember(mod, func)
389
            if member is None:
390
                raise ImportError(
391
                    "Could not find member {} of module {}".format(
392
                        func, mod))
393
            globs[func] = member
394
        return globs
395
396
    def _getModuleMember(self, dotted_name, member):
397
        """Get the member object of a module.
398
399
        :param dotted_name: The dotted name of the module, e.g. 'scipy.special'
400
        :type dotted_name: string
401
        :param member: The name of the member function, e.g. 'gammaincinv'
402
        :type member: string
403
        :returns: member object or None
404
        """
405
        try:
406
            mod = importlib.import_module(dotted_name)
407
        except ImportError:
408
            return None
409
410
        members = dict(inspect.getmembers(mod))
411
        return members.get(member)
412
413
    def workflow_script_activate(self):
        """Refuse the activation while dependent services are inactive.

        When at least one of the dependent services of this calculation is
        not active, an error message listing the titles of these services
        is added, the current transaction is aborted and the exception
        below is raised.

        :raises WorkflowException: when a dependent service is inactive
        """
        pu = getToolByName(self, 'plone_utils')
        # A calculation cannot be re-activated if services it depends on
        # are deactivated.
        inactive_services = [
            service.Title() for service in self.getDependentServices()
            if not api.is_active(service)]
        if not inactive_services:
            return

        msg = _("Cannot activate calculation, because the following "
                "service dependencies are inactive: ${inactive_services}",
                mapping={'inactive_services': safe_unicode(
                    ", ".join(inactive_services))})
        pu.addPortalMessage(msg, 'error')
        transaction.get().abort()
        raise WorkflowException
431
432
    def workflow_script_deactivate(self):
        """Refuse the deactivation while active services use this calculation.

        All the active analysis services of the setup catalog are checked.
        When the calculation of at least one of them is this calculation,
        an error message listing the titles of these services is added,
        the current transaction is aborted and the exception below is
        raised.

        :raises WorkflowException: when an active service uses it
        """
        catalog = getToolByName(self, 'bika_setup_catalog')
        plone_utils = getToolByName(self, 'plone_utils')

        # A calculation cannot be deactivated if active services are using it.
        calc_services = []
        for brain in catalog(portal_type="AnalysisService", is_active=True):
            service = brain.getObject()
            calc = service.getCalculation()
            if not calc or calc.UID() != self.UID():
                continue
            calc_services.append(service.Title())

        if not calc_services:
            return

        titles = safe_unicode(", ".join(calc_services))
        msg = _(
            'Cannot deactivate calculation, because it is in use by the '
            'following services: ${calc_services}',
            mapping={'calc_services': titles})
        plone_utils.addPortalMessage(msg, 'error')
        transaction.get().abort()
        raise WorkflowException
452
453
454
# Register the Calculation class as an Archetypes type of this project
registerType(Calculation, PROJECTNAME)
455