Passed
Pull Request — 2.x (#1873)
by Jordi
08:36 queued 03:21
created

senaite.core.schema.uidreferencefield   F

Complexity

Total Complexity 61

Size/Duplication

Total Lines 349
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
wmc 61
eloc 183
dl 0
loc 349
rs 3.52
c 0
b 0
f 0

3 Functions

Rating   Name   Duplication   Size   Complexity  
B on_object_created() 0 29 6
A get_backref_storage() 0 7 2
A get_backrefs() 0 18 3

14 Methods

Rating   Name   Duplication   Size   Complexity  
A UIDReferenceField.get_raw() 0 10 1
A UIDReferenceField.__init__() 0 6 2
A UIDReferenceField.to_list() 0 12 5
A UIDReferenceField.get_object() 0 9 2
A UIDReferenceField.link_backref() 0 29 5
B UIDReferenceField._validate() 0 21 7
A UIDReferenceField.is_initializing() 0 14 2
B UIDReferenceField.unlink_backref() 0 33 5
A UIDReferenceField.get_relationship_key() 0 10 1
B UIDReferenceField.set() 0 43 6
A UIDReferenceField.get_allowed_types() 0 11 3
A UIDReferenceField.get() 0 7 1
A UIDReferenceField.get_uid() 0 10 2
B UIDReferenceField._get() 0 28 8

How to fix   Complexity   

Complexity

Complex classes like senaite.core.schema.uidreferencefield often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
# -*- coding: utf-8 -*-
2
3
import six
4
5
from Acquisition import aq_base
6
from bika.lims import api
7
from persistent.dict import PersistentDict
8
from persistent.list import PersistentList
9
from plone.behavior.interfaces import IBehavior
10
from plone.uuid.interfaces import ATTRIBUTE_NAME
11
from senaite.core import logger
12
from senaite.core.interfaces import IHaveUIDReferences
13
from senaite.core.schema.fields import BaseField
14
from senaite.core.schema.interfaces import IUIDReferenceField
15
from zope.annotation.interfaces import IAnnotations
16
from zope.component import adapter
17
from zope.interface import alsoProvides
18
from zope.interface import implementer
19
from zope.lifecycleevent.interfaces import IObjectCreatedEvent
20
from zope.schema import ASCIILine
21
from zope.schema import List
22
23
# Annotation key under which the backreference storage of an object is kept
# (see `get_backref_storage`)
BACKREFS_STORAGE = "senaite.core.schema.uidreferencefield.backreferences"
24
25
26
@adapter(IHaveUIDReferences, IObjectCreatedEvent)
def on_object_created(object, event):
    """Establish the backreferences of a freshly created object

    While an object is being initialized it has no UID set, so the UID
    reference fields cannot write their backreferences at that point:
    https://community.plone.org/t/accessing-uid-in-dexterity-field-setter/14468

    This handler therefore catches up on the linking, for the creation case
    only, for every UID reference field of the object.
    """
    message = "Adding back reference from %s -> %s"
    for _name, field in api.get_fields(object).items():
        # only UID reference fields carry backreferences
        if IUIDReferenceField.providedBy(field):
            refs = field.get(object)
            if api.is_object(refs):
                # single valued reference field
                field.link_backref(refs, object)
                logger.info(message % (refs, object))
            elif isinstance(refs, list):
                # multi valued reference field
                for ref in refs:
                    logger.info(message % (ref, object))
                    field.link_backref(ref, object)
55
56
57
def get_backrefs(context, relationship, as_objects=False):
    """Look up the backreferences stored on the context

    :param context: object whose backreference storage is read
    :param relationship: storage key of the relationship to look up
    :param as_objects: when True, resolve the UIDs via `api.get_object`
    :returns: List of UIDs (or objects) that are linked by the relationship
    """
    # the storage is looked up on the unwrapped (acquisition-free) context
    storage = get_backref_storage(aq_base(context))
    # copy the stored UIDs into a plain list
    uids = list(storage.get(relationship, []))

    if len(uids) == 0:
        return []

    if as_objects is not True:
        return uids

    return [api.get_object(uid) for uid in uids]
75
76
77
def get_backref_storage(context):
    """Return the backreference annotation storage of the context

    The storage is created as an empty `PersistentDict` on first access.
    """
    annotations = IAnnotations(context)
    storage = annotations.get(BACKREFS_STORAGE)
    if storage is None:
        # first access: set up an empty storage
        annotations[BACKREFS_STORAGE] = PersistentDict()
        storage = annotations[BACKREFS_STORAGE]
    return storage
84
85
86
@implementer(IUIDReferenceField)
class UIDReferenceField(List, BaseField):
    """Stores UID references to other objects

    The value is handed to the base field as a list of UIDs. Whenever the
    value changes through `set`, the backreferences kept in the annotation
    storage of the referenced objects are updated accordingly.
    """

    value_type = ASCIILine(title=u"UID")

    def __init__(self, allowed_types=None, multi_valued=True, **kw):
        """Initialize the field

        :param allowed_types: portal type name(s) that may be referenced
        :param multi_valued: False if the field holds a single reference only
        :param kw: keyword arguments passed on to the base classes
        """
        if allowed_types is None:
            allowed_types = ()
        self.allowed_types = allowed_types
        self.multi_valued = multi_valued
        super(UIDReferenceField, self).__init__(**kw)

    def is_initializing(self, object):
        """Checks if the object is initialized at the moment

        Background:

        Objects being created do not have a UID set.
        Therefore, the backreferences need to be postponed to an event handler.

        https://community.plone.org/t/accessing-uid-in-dexterity-field-setter/14468

        :returns: True if the object has no UID attribute value yet
        """
        uid = getattr(object, ATTRIBUTE_NAME, None)
        return uid is None

    def to_list(self, value, filter_empty=True):
        """Ensure the value is a list

        :param value: string, object, None or an iterable of those
        :param filter_empty: when True, falsy items are dropped and a new
            list is returned
        :returns: list of values
        """
        if isinstance(value, six.string_types) or api.is_object(value):
            value = [value]
        elif value is None:
            value = []
        if filter_empty:
            # NOTE: build a real list instead of using `filter`, which is a
            #       one-shot iterator on Python 3 and keeps tuples as tuples
            #       on Python 2
            value = [item for item in value if item]
        return value

    def get_relationship_key(self, context):
        """Relationship key used for backreferences

        The key used for the annotation storage on the referenced object to
        remember the current object UID.

        :returns: storage key "<portal_type>.<field name>" to lookup back
            references
        """
        portal_type = api.get_portal_type(context)
        return "%s.%s" % (portal_type, self.__name__)

    def get_uid(self, value):
        """Value -> UID

        :param value: object/UID/SuperModel
        :returns: UID or None if the API could not determine one
        """
        try:
            return api.get_uid(value)
        except api.APIError:
            return None

    def get_object(self, value):
        """Value -> object

        :returns: Object or None if the API could not resolve the value
        """
        try:
            return api.get_object(value)
        except api.APIError:
            return None

    def _get_objects(self, values):
        """Resolve the values to objects and drop everything unresolvable

        :returns: list of objects (always a real list, never an iterator)
        """
        objs = [self.get_object(value) for value in values]
        # same truthiness filter as `filter(None, ...)`
        return [obj for obj in objs if obj]

    def get_allowed_types(self):
        """Returns the allowed reference types

        :returns: tuple of allowed_types
        """
        allowed_types = self.allowed_types
        if not allowed_types:
            allowed_types = ()
        elif isinstance(allowed_types, six.string_types):
            # a single portal type name was given
            allowed_types = (allowed_types, )
        return allowed_types

    def set(self, object, value):
        """Set UID reference

        Stores the UIDs on the object and keeps the backreferences of the
        added/removed references in sync.

        :param object: the instance of the field
        :param value: object/UID/SuperModel
        :type value: list/tuple/str
        """

        # always mark the object if references are set
        # NOTE: there might be multiple UID reference field set on this object!
        if value:
            alsoProvides(object, IHaveUIDReferences)

        # always handle all values internally as a list
        value = self.to_list(value)

        # convert to UIDs and skip values without a UID
        uids = []
        for v in value:
            uid = self.get_uid(v)
            if uid is None:
                continue
            uids.append(uid)

        # current set UIDs
        existing = self.to_list(self.get_raw(object))

        # filter out new/removed UIDs and resolve them before anything is
        # written
        added_uids = [u for u in uids if u not in existing]
        added_objs = self._get_objects(added_uids)

        removed_uids = [u for u in existing if u not in uids]
        removed_objs = self._get_objects(removed_uids)

        # link backreferences of new uids
        for added_obj in added_objs:
            self.link_backref(added_obj, object)

        # unlink backreferences of removed UIDs
        for removed_obj in removed_objs:
            self.unlink_backref(removed_obj, object)

        super(UIDReferenceField, self).set(object, uids)

    def unlink_backref(self, source, target):
        """Remove backreference from the source to the target

        :param source: the object where the backref is stored (our reference)
        :param target: the object where the backref points to (our object)
        :returns: True when the backref was removed, False otherwise
        :raises ValueError: if the target has no UID set yet
        """

        # Target might be a behavior instead of the object itself
        if IBehavior.providedBy(target):
            target = target.context

        # This should be actually not possible
        if self.is_initializing(target):
            raise ValueError("Objects in initialization state "
                             "can not have existing back references!")

        target_uid = self.get_uid(target)
        # get the storage key
        key = self.get_relationship_key(target)
        # get all backreferences from the source
        backrefs = get_backref_storage(source)
        if key not in backrefs:
            logger.warning(
                "Referenced object {} has no backreferences for the key {}"
                .format(repr(source), key))
            return False
        if target_uid not in backrefs[key]:
            logger.warning("Target {} was not linked by {}"
                           .format(repr(target), repr(source)))
            return False
        backrefs[key].remove(target_uid)
        return True

    def link_backref(self, source, target):
        """Add backreference from the source to the target

        :param source: the object where the backref is stored (our reference)
        :param target: the object where the backref points to (our object)
        :returns: True when the backref is stored on the source, False when
            linking was skipped because the target has no UID set yet
        """

        # Target might be a behavior instead of the object itself
        if IBehavior.providedBy(target):
            target = target.context

        # Object is initializing and don't have an UID!
        # -> Postpone to set back references in event handler
        if self.is_initializing(target):
            logger.info("Object is in initialization state. "
                        "Back references will be set in event handler")
            return False

        target_uid = api.get_uid(target)
        # get the annotation storage key
        key = self.get_relationship_key(target)
        # get all backreferences
        backrefs = get_backref_storage(source)
        if key not in backrefs:
            backrefs[key] = PersistentList()
        # avoid duplicate entries for the same target
        if target_uid not in backrefs[key]:
            backrefs[key].append(target_uid)
        return True

    def get(self, object):
        """Get referenced objects

        :param object: instance of the field
        :returns: list of referenced objects for multi valued fields,
            otherwise the first referenced object or None
        """
        return self._get(object, as_objects=True)

    def get_raw(self, object):
        """Get referenced UIDs

        NOTE: Called from the data manager `query` method
              to get the widget value

        :param object: instance of the field
        :returns: list of referenced UIDs for multi valued fields,
            otherwise the first referenced UID or None
        """
        return self._get(object, as_objects=False)

    def _get(self, object, as_objects=False):
        """Returns single/multi value

        :param object: instance of the field
        :param as_objects: Flag for UID/object returns
        :returns: list of UIDs/objects for multi valued fields, otherwise
            the first UID/object or None
        """

        # when creating a new object the context is the container
        # which does not have the field
        if self.interface and not self.interface.providedBy(object):
            if self.multi_valued:
                return []
            return None

        uids = super(UIDReferenceField, self).get(object)

        if not uids:
            uids = []

        if as_objects is True:
            # resolved eagerly, so the result supports `len` and indexing
            uids = self._get_objects(uids)

        if self.multi_valued:
            return uids
        if not uids:
            return None
        return uids[0]

    def _validate(self, value):
        """Validator when called from form submission

        :param value: sequence of UIDs
        :raises ValueError: if a single valued field gets more than one
            value, a value is not a UID or a referenced type is not allowed
        """
        super(UIDReferenceField, self)._validate(value)
        # check if the fields accepts single values only
        if not self.multi_valued and len(value) > 1:
            raise ValueError("Single valued field accepts at most 1 value")

        # check for valid UIDs
        for uid in value:
            if not api.is_uid(uid):
                raise ValueError("Invalid UID: '%s'" % uid)

        # check if the type is allowed
        allowed_types = self.get_allowed_types()
        if allowed_types:
            objs = self._get_objects(value)
            types = set(map(api.get_portal_type, objs))
            if not types.issubset(allowed_types):
                raise ValueError("Only the following types are allowed: %s"
                                 % ",".join(allowed_types))
349