RecordField.isRequired()   A
last analyzed

Complexity

Conditions 1

Size

Total Lines 5
Code Lines 2

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 2
dl 0
loc 5
rs 10
c 0
b 0
f 0
cc 1
nop 2
# -*- coding: utf-8 -*-
#
# This file is part of SENAITE.CORE.
#
# SENAITE.CORE is free software: you can redistribute it and/or modify it under
# the terms of the GNU General Public License as published by the Free Software
# Foundation, version 2.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
# details.
#
# You should have received a copy of the GNU General Public License along with
# this program; if not, write to the Free Software Foundation, Inc., 51
# Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
#
# Copyright 2018-2025 by its authors.
# Some rights reserved, see README and LICENSE.
21
import six
22
from types import DictType
23
from types import ListType
24
from types import StringType
25
from types import StringTypes
26
from types import TupleType
27
28
from AccessControl import ClassSecurityInfo
29
from App.class_init import InitializeClass
30
from bika.lims import api
31
from DateTime import DateTime
32
from Products.Archetypes.atapi import DisplayList
33
from Products.Archetypes.debug import log
34
from Products.Archetypes.Field import ObjectField
35
from Products.Archetypes.Field import decode
36
from Products.Archetypes.Field import encode
37
from Products.Archetypes.Registry import registerField
38
from Products.Archetypes.Registry import registerPropertyType
39
from Products.CMFCore.Expression import Expression
40
from Products.CMFCore.Expression import createExprContext
41
from Products.CMFCore.utils import getToolByName
42
from Products.PythonScripts.standard import html_quote
43
from senaite.core.browser.widgets.recordwidget import RecordWidget
44
45
# We have to define our own validation handling. The validation machinery is
# optional: when it cannot be imported, HAS_VALIDATION_CHAIN is 0 and the
# validation methods guarded by that flag are not defined on the field class.
try:
    from Products.validation import ValidationChain
    from Products.validation import UnknowValidatorError
    from Products.validation import FalseValidatorError
    from Products.validation.interfaces.IValidator \
         import IValidator, IValidationChain
    HAS_VALIDATION_CHAIN = 1
except ImportError:
    HAS_VALIDATION_CHAIN = 0
55
56
57
def providedBy(interface, obj):
    """Check whether `obj` provides `interface`.

    Uses `interface.providedBy` when the interface object exposes a truthy
    attribute of that name, and falls back to `interface.isImplementedBy`
    otherwise.

    :param interface: interface object to test against
    :param obj: object to check
    :returns: whatever the selected interface method returns
    """
    if getattr(interface, 'providedBy', None):
        return interface.providedBy(obj)
    return interface.isImplementedBy(obj)
61
62
63
class RecordField(ObjectField):
    """A field that stores a "record" (dictionary-like) construct

    The record is made of named subfields. Per-subfield settings (type,
    label, size, vocabulary, validators, ...) are looked up in the
    `subfield_*` mappings declared in `_properties`, keyed by subfield name.
    """
    _properties = ObjectField._properties.copy()
    _properties.update({
        "type": "record",
        "default": {},
        "subfields": (),
        "subfield_types": {},
        "subfield_vocabularies": {},
        "subfield_labels": {},
        "subfield_sizes": {},
        "subfield_widths": {},
        "subfield_maxlength": {},
        "required_subfields": (),
        "subfield_validators": {},
        "subfield_conditions": {},
        "subfield_descriptions": {},
        "innerJoin": ", ",
        "outerJoin": ", ",
        "widget": RecordWidget,
        })

    security = ClassSecurityInfo()

    security.declarePublic('getSubfields')
    def getSubfields(self):
        """the tuple of sub-fields"""
        return self.subfields

    security.declarePublic('getSubfieldType')
    def getSubfieldType(self, subfield):
        """
        optional type declaration
        default: string
        """
        return self.subfield_types.get(subfield, 'string')

    security.declarePublic('getSubfieldLabel')
    def getSubfieldLabel(self, subfield):
        """
        optional custom label for the subfield
        default: the capitalized id of the subfield
        """
        return self.subfield_labels.get(subfield, subfield.capitalize())

    security.declarePublic('getSubfieldDescription')
    def getSubfieldDescription(self, subfield):
        """
        optional custom description for the subfield
        default: None
        """
        return self.subfield_descriptions.get(subfield)

    def getSubfieldWidth(self, subfield, default=305):
        """Returns the optional width for the subfield

        Falls back to ten times the configured subfield size, and finally to
        `default`, when no positive width is configured.
        """
        width = self.subfield_widths.get(subfield)
        width = api.to_int(width, 0)
        if width > 0:
            return width

        # backwards compatibility: rely on subfield size
        # https://github.com/senaite/senaite.core/pull/2504
        size = self.getSubfieldSize(subfield, default=-1)
        size = api.to_int(size, 0)
        if size > 0:
            # guess a ratio of 10 between width and size
            return int(size * 10)

        return default

    def getSubfieldSize(self, subfield, default=40):
        """
        optional custom size for the subfield
        default: 40
        only effective for string type subfields
        """
        return self.subfield_sizes.get(subfield, default)

    def getSubfieldMaxlength(self, subfield):
        """
        optional custom maxlength size for the subfield
        default: 40
        only effective for string type subfields
        """
        return self.subfield_maxlength.get(subfield, 40)

    def isRequired(self, subfield):
        """
        looks whether subfield is included in the list of required subfields
        """
        return subfield in self.required_subfields

    def isSelection(self, subfield):
        """select box needed?

        True when a vocabulary is configured for the subfield.
        """
        return subfield in self.subfield_vocabularies

    security.declarePublic('testSubfieldCondition')
    def testSubfieldCondition(self, subfield, folder, portal, object):
        """Test the subfield condition.

        Returns True when no condition is configured for the subfield or when
        evaluating it raises an AttributeError; otherwise returns the result
        of evaluating the condition expression.
        """
        try:
            condition = self.subfield_conditions.get(subfield, None)
            if condition is None:
                return True
            # not read anywhere in this method; kept as traceback
            # annotation (presumably picked up by Zope's error reporting)
            __traceback_info__ = (folder, portal, object, condition)
            ec = createExprContext(folder, portal, object)
            return Expression(condition)(ec)
        except AttributeError:
            return True

    def getVocabularyFor(self, subfield, instance=None):
        """the vocabulary (DisplayList) for the subfield

        The configured vocabulary is either a DisplayList or the name of a
        method. A method name is looked up on the field first (called with
        `instance`), then on `instance` (called without arguments).

        :raises AttributeError: if no vocabulary is configured
        :raises TypeError: if the vocabulary does not resolve to a DisplayList
        """
        ## XXX rr: couldn't we just rely on the field's
        ## Vocabulary method here?
        vocab = self.subfield_vocabularies.get(subfield, None)
        if not vocab:
            raise AttributeError("No vocabulary found for {}".format(subfield))

        if isinstance(vocab, DisplayList):
            return vocab

        if isinstance(vocab, six.string_types):
            value = None
            method = getattr(self, vocab, None)
            if method and callable(method):
                value = method(instance)
            elif instance is not None:
                method = getattr(instance, vocab, None)
                if method and callable(method):
                    value = method()
            if not isinstance(value, DisplayList):
                msg = "{} is not a DisplayList {}".format(value, subfield)
                raise TypeError(msg)
            return value

        msg = "{} is not a string or DisplayList for {}".format(vocab, subfield)
        raise TypeError(msg)

    def getViewFor(self, instance, subfield, joinWith=', '):
        """
        formatted value of the subfield for display

        Tuple/list values are joined with `joinWith`; the result is HTML
        quoted before any subfield specific formatting is applied.
        """
        raw = self.getRaw(instance).get(subfield, '')
        if isinstance(raw, (tuple, list)):
            raw = joinWith.join(raw)
        # Prevent XSS attacks by quoting all user input
        raw = html_quote(str(raw))
        # this is now very specific
        if subfield == 'email':
            return self.hideEmail(raw, instance)
        if subfield == 'phone':
            return self.labelPhone(raw)
        if subfield == 'fax':
            return self.labelFax(raw)
        if subfield == 'homepage':
            return '<a href="%s">%s</a>' % (raw, raw)
        return raw

    def getSubfieldViews(self, instance, joinWith=', '):
        """
        list of subfield views for non-empty subfields
        """
        views = (self.getViewFor(instance, subfield, joinWith)
                 for subfield in self.getSubfields())
        return [view for view in views if view]

    # this is really special purpose and in no ways generic
    def hideEmail(self, email='', instance=None):
        """Render an email address for display

        Returns a masked, non-clickable form when the membership tool is
        unavailable or reports an anonymous user; a mailto link otherwise.
        """
        masked = 'email: ' + \
                 email.replace('@', ' (at) ').replace('.', ' (dot) ')
        membertool = getToolByName(instance, 'portal_membership', None)
        if membertool is None or membertool.isAnonymousUser():
            return masked
        return "<a href='mailto:%s'>%s</a>" % (email, email)

    def labelPhone(self, phone=''):
        """Prefix the value with the 'phone: ' label"""
        return 'phone: ' + phone

    def labelFax(self, fax=''):
        """Prefix the value with the 'fax: ' label"""
        return 'fax: ' + fax

    # enable also a string representation of a dictionary
    # to be passed in (external edit may need this)
    # store string values as unicode

    def set(self, instance, value, **kwargs):
        """Store the record on the instance

        String input is evaluated to obtain the mapping it represents; string
        values are decoded and 'datetime' subfields converted before storing.
        """
        if isinstance(value, six.string_types):
            # SECURITY NOTE(review): eval() executes arbitrary expressions.
            # If `value` can originate from untrusted input this is a code
            # execution risk; consider ast.literal_eval if only literals are
            # expected -- TODO confirm against callers before changing.
            try:
                value = eval(value)
                # more checks to add?
            except Exception:  # what to catch here?
                pass
        value = self._to_dict(value)
        value = self._decode_strings(value, instance, **kwargs)
        ObjectField.set(self, instance, value, **kwargs)

    def _to_dict(self, value):
        """Copy a non-dict object that exposes `keys` into a plain dict

        Anything else (including real dicts) is returned unchanged.
        """
        if not isinstance(value, dict) and hasattr(value, 'keys'):
            new_value = {}
            new_value.update(value)
            return new_value
        return value

    def _decode_strings(self, value, instance, **kwargs):
        """Decode string values and convert 'datetime' subfields

        NOTE: `value` is modified in place and returned (no copy is made).
        """
        new_value = value
        for k, v in value.items():
            if isinstance(v, six.string_types):
                nv = decode(v, instance, **kwargs)
                try:
                    new_value[k] = nv
                except AttributeError:  # Records don't provide __setitem__
                    setattr(new_value, k, nv)

            # convert datetimes
            if self.subfield_types.get(k, None) == 'datetime':
                try:
                    val = DateTime(v)
                except Exception:
                    # unparsable values are stored as None
                    val = None

                new_value[k] = val

        return new_value

    # Return strings using the site's encoding

    def get(self, instance, **kwargs):
        """Return the stored record with text values encoded"""
        value = ObjectField.get(self, instance, **kwargs)
        return self._encode_strings(value, instance, **kwargs)

    def _encode_strings(self, value, instance, **kwargs):
        """Encode text values of the record

        A falsy `value` is replaced by an empty dict.
        NOTE: a non-empty `value` is modified in place and returned.
        """
        value = value if value else {}
        new_value = value
        for k, v in value.items():
            if isinstance(v, six.text_type):
                nv = encode(v, instance, **kwargs)
                try:
                    new_value[k] = nv
                except AttributeError:  # Records don't provide __setitem__
                    setattr(new_value, k, nv)
        return new_value

    if HAS_VALIDATION_CHAIN:
        def _validationLayer(self):
            """
            Resolve that each validator is in the service. If validator is
            not, log a warning.

            We could replace strings with class refs and keep things impl
            the ivalidator in the list.

            Note: XXX this is not compat with aq_ things like scripts with __call__
            """
            # Build a fresh mapping instead of writing into the existing one:
            # the mapping may be the `{}` default declared in `_properties`
            # (presumably shared between fields -- verify against
            # ObjectField), and mutating it would leak chains across fields.
            validators = dict(self.subfield_validators)
            for subfield in self.getSubfields():
                validators[subfield] = self._subfieldValidationLayer(subfield)
            self.subfield_validators = validators

        def _subfieldValidationLayer(self, subfield):
            """
            for the individual subfields

            Normalizes the validators configured for `subfield` into a
            validation chain (or an empty tuple when validation is disabled
            for a required subfield). For non-required subfields an
            'isEmpty' sufficient validator is ensured at the front of the
            chain.

            :raises NotImplementedError: if validators are given as a dict
            """
            chainname = 'Validator_%s_%s' % (self.getName(), subfield)
            current_validators = self.subfield_validators.get(subfield, ())

            if type(current_validators) is DictType:
                msg = "Please use the new syntax with validation chains"
                raise NotImplementedError(msg)
            elif IValidationChain.providedBy(current_validators):
                validators = current_validators
            elif IValidator.providedBy(current_validators):
                validators = ValidationChain(
                    chainname, validators=current_validators)
            elif type(current_validators) in (TupleType, ListType, StringType):
                if len(current_validators):
                    # got a non empty list or string - create a chain
                    try:
                        validators = ValidationChain(
                            chainname, validators=current_validators)
                    except (UnknowValidatorError, FalseValidatorError) as msg:
                        log("WARNING: Disabling validation for %s/%s: %s" % (self.getName(), subfield, msg))
                        validators = ()
                else:
                    validators = ()
            else:
                log('WARNING: Unknow validation %s. Disabling!' % current_validators)
                validators = ()

            if subfield not in self.required_subfields:
                if validators == ():
                    validators = ValidationChain(chainname)
                # insert isEmpty validator at position 0 if the chain is
                # empty or its first validator is not isEmpty
                if not len(validators) or \
                        not validators[0][0].name == 'isEmpty':
                    validators.insertSufficient('isEmpty')

            return validators

        security.declarePublic('validate')
        def validate(self, value, instance, errors=None, **kwargs):
            """
            Validate passed-in value using all subfield validators.
            Return None if all validations pass; otherwise, return failed
            result returned by validator
            """
            # a fresh dict per call; a `{}` default would be shared between
            # all calls that omit `errors`
            if errors is None:
                errors = {}

            name = self.getName()
            if errors and name in errors:
                return True

            if self.required_subfields:
                for subfield in self.required_subfields:
                    sub_value = value.get(subfield, None)
                    res = self.validate_required(instance, sub_value, errors)
                    if res is not None:
                        return res

            # not touched yet
            if self.enforceVocabulary:
                res = self.validate_vocabulary(instance, value, errors)
                if res is not None:
                    return res

            # can this part stay like it is?
            res = instance.validate_field(name, value, errors)
            if res is not None:
                return res

            # this should work
            if self.subfield_validators:
                res = self.validate_validators(value, instance, errors, **kwargs)
                if res is not True:
                    return res

            # all ok
            return None

        def validate_validators(self, value, instance, errors, **kwargs):
            """Run the validators of every subfield against its value

            Returns True when all validators pass; otherwise the
            concatenation of the failure results returned by the validators.
            """
            result = True
            total = ''
            for subfield in self.getSubfields():
                subfield_validators = self.subfield_validators.get(subfield, None)
                if not subfield_validators:
                    # nothing to run; in particular do not re-append the
                    # failure of a previously validated subfield
                    continue
                result = subfield_validators(value.get(subfield),
                                             instance=instance,
                                             errors=errors,
                                             field=self,
                                             **kwargs
                                             )
                if result is not True:
                    total += result
            return total or result
418
419
420
# Finalize the class (applies the declarations made on `security`)
InitializeClass(RecordField)

# Make the field known to the Archetypes field registry
registerField(RecordField, title="Record", description="")

# Declare the property types of the field specific properties
registerPropertyType('subfields', 'lines', RecordField)
registerPropertyType('required_subfields', 'lines', RecordField)
registerPropertyType('subfield_validators', 'mapping', RecordField)
registerPropertyType('subfield_types', 'mapping', RecordField)
registerPropertyType('subfield_vocabularies', 'mapping', RecordField)
registerPropertyType('subfield_labels', 'mapping', RecordField)
registerPropertyType('subfield_sizes', 'mapping', RecordField)
registerPropertyType('subfield_maxlength', 'mapping', RecordField)
registerPropertyType('subfield_widths', 'mapping', RecordField)
registerPropertyType('innerJoin', 'string', RecordField)
registerPropertyType('outerJoin', 'string', RecordField)
436