Passed
Push — master ( c06457...7bdfa8 )
by Ramon
09:30 queued 04:44
created

ipt()   A

Complexity

Conditions 2

Size

Total Lines 12
Code Lines 4

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 4
dl 0
loc 12
rs 10
c 0
b 0
f 0
cc 2
nop 1
1
# -*- coding: utf-8 -*-
2
#
3
# This file is part of SENAITE.CORE
4
#
5
# Copyright 2018 by it's authors.
6
# Some rights reserved. See LICENSE.rst, CONTRIBUTORS.rst.
7
8
import importlib
9
import inspect
10
import math
11
import re
12
13
import transaction
14
from AccessControl import ClassSecurityInfo
15
from bika.lims import api
16
from bika.lims import bikaMessageFactory as _
17
from bika.lims.api import get_object_by_uid
18
from bika.lims.browser.fields import InterimFieldsField
19
from bika.lims.browser.fields.uidreferencefield import UIDReferenceField
20
from bika.lims.browser.fields.uidreferencefield import get_backreferences
21
from bika.lims.browser.widgets import RecordsWidget as BikaRecordsWidget
22
from bika.lims.config import PROJECTNAME
23
from bika.lims.content.bikaschema import BikaSchema
24
from bika.lims.interfaces import IDeactivable
25
from bika.lims.interfaces.calculation import ICalculation
26
from Products.Archetypes.atapi import BaseFolder
27
from Products.Archetypes.atapi import ReferenceWidget
28
from Products.Archetypes.atapi import Schema
29
from Products.Archetypes.atapi import TextAreaWidget
30
from Products.Archetypes.atapi import TextField
31
from Products.Archetypes.atapi import registerType
32
from Products.ATContentTypes.lib.historyaware import HistoryAwareMixin
33
from Products.ATExtensions.field import RecordsField
34
from Products.CMFCore.utils import getToolByName
35
from Products.CMFCore.WorkflowCore import WorkflowException
36
from Products.CMFPlone.utils import safe_unicode
37
from zope.interface import implements
38
39
40
schema = BikaSchema.copy() + Schema((
41
42
    InterimFieldsField(
43
        'InterimFields',
44
        widget=BikaRecordsWidget(
45
            label=_("Calculation Interim Fields"),
46
            description=_(
47
                "Define interim fields such as vessel mass, dilution factors, "
48
                "should your calculation require them. The field title "
49
                "specified here will be used as column headers and field "
50
                "descriptors where the interim fields are displayed. If "
51
                "'Apply wide' is enabled the field will be shown in a "
52
                "selection box on the top of the worksheet, allowing to apply "
53
                "a specific value to all the corresponding fields on the "
54
                "sheet."),
55
        )
56
    ),
57
58
    UIDReferenceField(
59
        'DependentServices',
60
        required=1,
61
        multiValued=1,
62
        allowed_types=('AnalysisService',),
63
        widget=ReferenceWidget(
64
            checkbox_bound=0,
65
            visible=False,
66
            label=_("Dependent Analyses"),
67
        ),
68
    ),
69
70
    RecordsField(
71
        'PythonImports',
72
        required=False,
73
        subfields=('module', 'function'),
74
        subfield_labels={'module': _('Module'), 'function': _('Function')},
75
        subfield_readonly={'module': False, 'function': False},
76
        subfield_types={'module': 'string', 'function': 'string'},
77
        default=[
78
            {'module': 'math', 'function': 'ceil'},
79
            {'module': 'math', 'function': 'floor'},
80
        ],
81
        subfield_validators={
82
            'module': 'importvalidator',
83
        },
84
        widget=BikaRecordsWidget(
85
            label=_("Additional Python Libraries"),
86
            description=_(
87
                "If your formula needs a special function from an external "
88
                "Python library, you can import it here. E.g. if you want to "
89
                "use the 'floor' function from the Python 'math' module, "
90
                "you add 'math' to the Module field and 'floor' to the "
91
                "function field. The equivalent in Python would be 'from math "
92
                "import floor'. In your calculation you could use then "
93
                "'floor([Ca] + [Mg])'. "
94
            ),
95
            allowDelete=True,
96
        ),
97
    ),
98
99
    TextField(
100
        'Formula',
101
        validators=('formulavalidator',),
102
        default_content_type='text/plain',
103
        allowable_content_types=('text/plain',),
104
        widget=TextAreaWidget(
105
            label=_("Calculation Formula"),
106
            description=_(
107
                "<p>The formula you type here will be dynamically calculated "
108
                "when an analysis using this calculation is displayed.</p>"
109
                "<p>To enter a Calculation, use standard maths operators,  "
110
                "+ - * / ( ), and all keywords available, both from other "
111
                "Analysis Services and the Interim Fields specified here, "
112
                "as variables. Enclose them in square brackets [ ].</p>"
113
                "<p>E.g, the calculation for Total Hardness, the total of "
114
                "Calcium (ppm) and Magnesium (ppm) ions in water, is entered "
115
                "as [Ca] + [Mg], where Ca and MG are the keywords for those "
116
                "two Analysis Services.</p>"),
117
        )
118
    ),
119
120
    RecordsField(
121
        'TestParameters',
122
        required=False,
123
        subfields=('keyword', 'value'),
124
        subfield_labels={'keyword': _('Keyword'), 'value': _('Value')},
125
        subfield_readonly={'keyword': True, 'value': False},
126
        subfield_types={'keyword': 'string', 'value': 'float'},
127
        default=[{'keyword': '', 'value': 0}],
128
        widget=BikaRecordsWidget(
129
            label=_("Test Parameters"),
130
            description=_("To test the calculation, enter values here for all "
131
                          "calculation parameters.  This includes Interim "
132
                          "fields defined above, as well as any services that "
133
                          "this calculation depends on to calculate results."),
134
            allowDelete=False,
135
        ),
136
    ),
137
138
    TextField(
139
        'TestResult',
140
        default_content_type='text/plain',
141
        allowable_content_types=('text/plain',),
142
        widget=TextAreaWidget(
143
            label=_('Test Result'),
144
            description=_("The result after the calculation has taken place "
145
                          "with test values.  You will need to save the "
146
                          "calculation before this value will be calculated."),
147
        )
148
    ),
149
150
))
151
152
schema['title'].widget.visible = True
153
schema['description'].widget.visible = True
154
155
156
class Calculation(BaseFolder, HistoryAwareMixin):
157
    """Calculation for Analysis Results
158
    """
159
    implements(ICalculation, IDeactivable)
160
161
    security = ClassSecurityInfo()
162
    displayContentsTab = False
163
    schema = schema
0 ignored issues
show
Comprehensibility Best Practice introduced by
The variable schema does not seem to be defined.
Loading history...
164
165
    _at_rename_after_creation = True
166
167
    def _renameAfterCreation(self, check_auto_id=False):
168
        from bika.lims.idserver import renameAfterCreation
169
        renameAfterCreation(self)
170
171
    def setInterimFields(self, value):
172
        new_value = []
173
174
        for x in range(len(value)):
175
            row = dict(value[x])
176
            keys = row.keys()
177
            if 'value' not in keys:
178
                row['value'] = 0
179
            new_value.append(row)
180
181
        # extract the keywords from the new calculation interims
182
        calculation_interim_keys = map(lambda i: i.get("keyword"), value)
183
184
        # update all service interims
185
        for service in self.getCalculationDependants():
186
            # get the interims of the dependant service
187
            service_interims = service.getInterimFields()
188
            # extract the keywords from the service interims
189
            service_interim_keys = map(lambda i: i.get("keyword"),
190
                                       service_interims)
191
            # sync new interims from the calculation -> service
192
            new_interims = set(calculation_interim_keys).difference(
193
                set(service_interim_keys))
194
            for key in new_interims:
195
                new_interim = value[calculation_interim_keys.index(key)]
196
                service_interims.append(new_interim)
197
            if new_interims:
198
                service.setInterimFields(service_interims)
199
200
        self.getField('InterimFields').set(self, new_value)
201
202
    def setFormula(self, Formula=None):
203
        """Set the Dependent Services from the text of the calculation Formula
204
        """
205
        bsc = getToolByName(self, 'bika_setup_catalog')
206
        if Formula is None:
207
            self.setDependentServices(None)
208
            self.getField('Formula').set(self, Formula)
209
        else:
210
            keywords = re.compile(r"\[([^.^\]]+)\]").findall(Formula)
211
            brains = bsc(portal_type='AnalysisService',
212
                         getKeyword=keywords)
213
            services = [brain.getObject() for brain in brains]
214
            self.getField('DependentServices').set(self, services)
215
            self.getField('Formula').set(self, Formula)
216
217
    def getMinifiedFormula(self):
218
        """Return the current formula value as text.
219
        The result will have newlines and additional spaces stripped out.
220
        """
221
        value = " ".join(self.getFormula().splitlines())
222
        return value
223
224
    def getCalculationDependencies(self, flat=False, deps=None):
225
        """ Recursively calculates all dependencies of this calculation.
226
            The return value is dictionary of dictionaries (of dictionaries...)
227
228
            {service_UID1:
229
                {service_UID2:
230
                    {service_UID3: {},
231
                     service_UID4: {},
232
                    },
233
                },
234
            }
235
236
            set flat=True to get a simple list of AnalysisService objects
237
        """
238
        if deps is None:
239
            deps = [] if flat is True else {}
240
241
        for service in self.getDependentServices():
242
            calc = service.getCalculation()
243
            if calc:
244
                calc.getCalculationDependencies(flat, deps)
245
            if flat:
246
                deps.append(service)
247
            else:
248
                deps[service.UID()] = {}
249
        return deps
250
251
    def getCalculationDependants(self, deps=None):
252
        """Return a flat list of services who depend on this calculation.
253
254
        This refers only to services who's Calculation UIDReferenceField have
255
        the value set to point to this calculation.
256
257
        It has nothing to do with the services referenced in the calculation's
258
        Formula.
259
        """
260
        if deps is None:
261
            deps = []
262
        backrefs = get_backreferences(self, 'AnalysisServiceCalculation')
263
        services = map(get_object_by_uid, backrefs)
264
        for service in services:
265
            calc = service.getCalculation()
266
            if calc and calc.UID() != self.UID():
267
                calc.getCalculationDependants(deps)
268
            deps.append(service)
269
        return deps
270
271
    def setTestParameters(self, form_value):
272
        """This is called from the objectmodified subscriber, to ensure
273
        correct population of the test-parameters field.
274
        It collects Keywords for all services that are direct dependencies of
275
        this calculatioin, and all of this calculation's InterimFields,
276
        and gloms them together.
277
        """
278
        params = []
279
280
        # Set default/existing values for InterimField keywords
281
        for interim in self.getInterimFields():
282
            keyword = interim.get('keyword')
283
            ex = [x.get('value') for x in form_value if
284
                  x.get('keyword') == keyword]
285
            params.append({'keyword': keyword,
286
                           'value': ex[0] if ex else interim.get('value')})
287
        # Set existing/blank values for service keywords
288
        for service in self.getDependentServices():
289
            keyword = service.getKeyword()
290
            ex = [x.get('value') for x in form_value if
291
                  x.get('keyword') == keyword]
292
            params.append({'keyword': keyword,
293
                           'value': ex[0] if ex else ''})
294
        self.Schema().getField('TestParameters').set(self, params)
295
296
    # noinspection PyUnusedLocal
297
    def setTestResult(self, form_value):
298
        """Calculate formula with TestParameters and enter result into
299
         TestResult field.
300
        """
301
        # Create mapping from TestParameters
302
        mapping = {x['keyword']: x['value'] for x in self.getTestParameters()}
303
        # Gather up and parse formula
304
        formula = self.getMinifiedFormula()
305
        test_result_field = self.Schema().getField('TestResult')
306
307
        # Flush the TestResult field and return
308
        if not formula:
309
            return test_result_field.set(self, "")
310
311
        formula = formula.replace('[', '{').replace(']', '}').replace('  ', '')
312
        result = 'Failure'
313
314
        try:
315
            # print "pre: {}".format(formula)
316
            formula = formula.format(**mapping)
317
            # print "formatted: {}".format(formula)
318
            result = eval(formula, self._getGlobals())
319
            # print "result: {}".format(result)
320
        except TypeError as e:
321
            # non-numeric arguments in interim mapping?
322
            result = "TypeError: {}".format(str(e.args[0]))
323
        except ZeroDivisionError as e:
324
            result = "Division by 0: {}".format(str(e.args[0]))
325
        except KeyError as e:
326
            result = "Key Error: {}".format(str(e.args[0]))
327
        except ImportError as e:
328
            result = "Import Error: {}".format(str(e.args[0]))
329
        except Exception as e:
330
            result = "Unspecified exception: {}".format(str(e.args[0]))
331
        test_result_field.set(self, str(result))
332
333
    def _getGlobals(self, **kwargs):
334
        """Return the globals dictionary for the formula calculation
335
        """
336
        # Default globals
337
        globs = {
338
            "__builtins__": None,
339
            "all": all,
340
            "any": any,
341
            "bool": bool,
342
            "chr": chr,
343
            "cmp": cmp,
344
            "complex": complex,
345
            "divmod": divmod,
346
            "enumerate": enumerate,
347
            "float": float,
348
            "format": format,
349
            "frozenset": frozenset,
350
            "hex": hex,
351
            "int": int,
352
            "len": len,
353
            "list": list,
354
            "long": long,
0 ignored issues
show
Comprehensibility Best Practice introduced by
Undefined variable 'long'
Loading history...
355
            "math": math,
356
            "max": max,
357
            "min": min,
358
            "oct": oct,
359
            "ord": ord,
360
            "pow": pow,
361
            "range": range,
362
            "reversed": reversed,
363
            "round": round,
364
            "str": str,
365
            "sum": sum,
366
            "tuple": tuple,
367
            "xrange": xrange,
0 ignored issues
show
Comprehensibility Best Practice introduced by
Undefined variable 'xrange'
Loading history...
368
        }
369
        # Update with keyword arguments
370
        globs.update(kwargs)
371
        # Update with additional Python libraries
372
        for imp in self.getPythonImports():
373
            mod = imp["module"]
374
            func = imp["function"]
375
            member = self._getModuleMember(mod, func)
376
            if member is None:
377
                raise ImportError(
378
                    "Could not find member {} of module {}".format(
379
                        func, mod))
380
            globs[func] = member
381
        return globs
382
383
    def _getModuleMember(self, dotted_name, member):
384
        """Get the member object of a module.
385
386
        :param dotted_name: The dotted name of the module, e.g. 'scipy.special'
387
        :type dotted_name: string
388
        :param member: The name of the member function, e.g. 'gammaincinv'
389
        :type member: string
390
        :returns: member object or None
391
        """
392
        try:
393
            mod = importlib.import_module(dotted_name)
394
        except ImportError:
395
            return None
396
397
        members = dict(inspect.getmembers(mod))
398
        return members.get(member)
399
400
    def workflow_script_activate(self):
401
        pu = getToolByName(self, 'plone_utils')
402
        # A calculation cannot be re-activated if services it depends on
403
        # are deactivated.
404
        services = self.getDependentServices()
405
        inactive_services = []
406
        for service in services:
407
            if not api.is_active(service):
408
                inactive_services.append(service.Title())
409
        if inactive_services:
410
            msg = _("Cannot activate calculation, because the following "
411
                    "service dependencies are inactive: ${inactive_services}",
412
                    mapping={'inactive_services': safe_unicode(
413
                        ", ".join(inactive_services))})
414
            pu.addPortalMessage(msg, 'error')
415
            transaction.get().abort()
416
            raise WorkflowException
417
418
    def workflow_script_deactivate(self):
419
        bsc = getToolByName(self, 'bika_setup_catalog')
420
        pu = getToolByName(self, 'plone_utils')
421
        # A calculation cannot be deactivated if active services are using it.
422
        services = bsc(portal_type="AnalysisService", is_active=True)
423
        calc_services = []
424
        for service in services:
425
            service = service.getObject()
426
            calc = service.getCalculation()
427
            if calc and calc.UID() == self.UID():
428
                calc_services.append(service.Title())
429
        if calc_services:
430
            msg = _(
431
                'Cannot deactivate calculation, because it is in use by the '
432
                'following services: ${calc_services}',
433
                mapping={
434
                    'calc_services': safe_unicode(", ".join(calc_services))})
435
            pu.addPortalMessage(msg, 'error')
436
            transaction.get().abort()
437
            raise WorkflowException
438
439
440
registerType(Calculation, PROJECTNAME)
441