Passed
Push — 2.x ( 6a1f51...1afad9 )
by Jordi
05:47
created

senaite.core.schema.uidreferencefield   D

Complexity

Total Complexity 59

Size/Duplication

Total Lines 340
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
wmc 59
eloc 178
dl 0
loc 340
rs 4.08
c 0
b 0
f 0

3 Functions

Rating   Name   Duplication   Size   Complexity  
B on_object_created() 0 29 6
A get_backref_storage() 0 7 2
A get_backrefs() 0 18 3

14 Methods

Rating   Name   Duplication   Size   Complexity  
A UIDReferenceField.__init__() 0 6 2
A UIDReferenceField.is_initializing() 0 14 2
A UIDReferenceField.get_raw() 0 10 1
A UIDReferenceField.to_list() 0 12 5
A UIDReferenceField.get_object() 0 9 2
A UIDReferenceField.link_backref() 0 25 4
B UIDReferenceField._validate() 0 21 7
A UIDReferenceField.unlink_backref() 0 29 4
A UIDReferenceField.get_relationship_key() 0 10 1
B UIDReferenceField.set() 0 43 6
A UIDReferenceField.get_allowed_types() 0 11 3
A UIDReferenceField.get() 0 7 1
A UIDReferenceField.get_uid() 0 10 2
B UIDReferenceField._get() 0 28 8

How to fix   Complexity   

Complexity

Complex classes like senaite.core.schema.uidreferencefield often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
# -*- coding: utf-8 -*-
2
3
import six
4
5
from Acquisition import aq_base
6
from bika.lims import api
7
from persistent.dict import PersistentDict
8
from persistent.list import PersistentList
9
from plone.uuid.interfaces import ATTRIBUTE_NAME
10
from senaite.core import logger
11
from senaite.core.interfaces import IHaveUIDReferences
12
from senaite.core.schema.fields import BaseField
13
from senaite.core.schema.interfaces import IUIDReferenceField
14
from zope.annotation.interfaces import IAnnotations
15
from zope.component import adapter
16
from zope.interface import alsoProvides
17
from zope.interface import implementer
18
from zope.lifecycleevent.interfaces import IObjectCreatedEvent
19
from zope.schema import ASCIILine
20
from zope.schema import List
21
22
# Annotation key under which the backreference storage of an object is kept
# (see `get_backref_storage`)
BACKREFS_STORAGE = "senaite.core.schema.uidreferencefield.backreferences"
23
24
25
@adapter(IHaveUIDReferences, IObjectCreatedEvent)
def on_object_created(object, event):
    """Link backreferences after the object was created

    This is necessary, because objects during initialization have no UID set!
    https://community.plone.org/t/accessing-uid-in-dexterity-field-setter/14468

    Therefore, and only for the creation case, we need to link the back
    references in this handler for all UID reference fields.

    :param object: the created object providing `IHaveUIDReferences`
    :param event: the object created event (not inspected by this handler)
    """
    # only the field objects are needed here, not their names
    for field in api.get_fields(object).values():
        if not IUIDReferenceField.providedBy(field):
            continue

        # after creation, we only need to link backreferences
        value = field.get(object)

        if api.is_object(value):
            # single valued reference field
            references = [value]
        elif isinstance(value, list):
            # multi valued reference field
            references = value
        else:
            # nothing referenced by this field
            continue

        for ref in references:
            # pass the arguments to the logger, so that the message is only
            # formatted when the record is actually emitted
            logger.info("Adding back reference from %s -> %s", ref, object)
            field.link_backref(ref, object)
54
55
56
def get_backrefs(context, relationship, as_objects=False):
    """Return backreferences of the context

    :param context: the object to look up the backreferences for
    :param relationship: storage key of the relationship to look up
    :param as_objects: when `True`, resolve the stored UIDs via
        `api.get_object`
    :returns: List of UIDs (or objects, when `as_objects` is `True`) that are
        linked by the relationship
    """
    # the annotation storage is looked up on the unwrapped context
    storage = get_backref_storage(aq_base(context))

    # always hand out a copy, never the persistent list itself
    uids = list(storage.get(relationship, []))

    if uids and as_objects is True:
        return [api.get_object(uid) for uid in uids]

    return uids
74
75
76
def get_backref_storage(context):
    """Get the annotation storage for backreferences of the context

    The storage is kept in the annotations of the context below the
    module-level `BACKREFS_STORAGE` key and is created on first access.

    :param context: the object holding the backreferences
    :returns: the stored mapping (a `PersistentDict` when newly created)
    """
    annotations = IAnnotations(context)
    storage = annotations.get(BACKREFS_STORAGE)
    if storage is None:
        # first access: create the storage and remember it on the context
        storage = annotations[BACKREFS_STORAGE] = PersistentDict()
    return storage
83
84
85
@implementer(IUIDReferenceField)
class UIDReferenceField(List, BaseField):
    """Stores UID references to other objects

    The field value is kept as a list of UIDs. For every referenced object, the
    UID of the referencing object is additionally remembered in the
    backreference annotation storage of the referenced object.
    """

    value_type = ASCIILine(title=u"UID")

    def __init__(self, allowed_types=None, multi_valued=True, **kw):
        """Initialize the field

        :param allowed_types: portal type(s) that may be referenced
        :param multi_valued: when False, getters return a single value
        :param kw: further keyword arguments passed to the base field
        """
        if allowed_types is None:
            allowed_types = ()
        self.allowed_types = allowed_types
        self.multi_valued = multi_valued
        super(UIDReferenceField, self).__init__(**kw)

    def is_initializing(self, object):
        """Checks if the object is initialized at the moment

        Background:

        Objects being created do not have a UID set.
        Therefore, the backreferences need to be postponed to an event handler.

        https://community.plone.org/t/accessing-uid-in-dexterity-field-setter/14468

        :param object: the object to check
        :returns: True when the object has no UID attribute set yet
        """
        return getattr(object, ATTRIBUTE_NAME, None) is None

    def to_list(self, value, filter_empty=True):
        """Ensure the value is a list

        :param value: string, object, None or an iterable of values
        :param filter_empty: when True, falsy items are removed
        :returns: list of values. With `filter_empty=False`, an iterable that
            is not a string/object/None is passed through unchanged.
        """
        if isinstance(value, six.string_types):
            value = [value]
        elif api.is_object(value):
            value = [value]
        elif value is None:
            value = []
        if filter_empty:
            # NOTE: `filter` returns a lazy iterator on Python 3 (and a tuple
            #       for tuples on Python 2). Always build a real list instead.
            value = [item for item in value if item]
        return value

    def get_relationship_key(self, context):
        """Relationship key used for backreferences

        The key used for the annotation storage on the referenced object to
        remember the current object UID.

        :param context: the object holding this field
        :returns: storage key to lookup back references, built from the portal
            type of the context and the `__name__` of the field
        """
        portal_type = api.get_portal_type(context)
        return "%s.%s" % (portal_type, self.__name__)

    def get_uid(self, value):
        """Value -> UID

        :param value: object/UID/SuperModel
        :returns: UID or None if `api.get_uid` raised an `APIError`
        """
        try:
            return api.get_uid(value)
        except api.APIError:
            return None

    def get_object(self, value):
        """Value -> object

        :param value: value to resolve via `api.get_object`
        :returns: Object or None if `api.get_object` raised an `APIError`
        """
        try:
            return api.get_object(value)
        except api.APIError:
            return None

    def get_allowed_types(self):
        """Returns the allowed reference types

        :returns: tuple of allowed_types
        """
        allowed_types = self.allowed_types
        if not allowed_types:
            allowed_types = ()
        elif isinstance(allowed_types, six.string_types):
            # a single type given as string
            allowed_types = (allowed_types, )
        return allowed_types

    def set(self, object, value):
        """Set UID reference

        Stores the UIDs on the object and keeps the backreferences of the
        newly added/removed references in sync.

        :param object: the object the references are set on
        :param value: object/UID/SuperModel
        :type value: list/tuple/str
        """

        # always mark the object if references are set
        # NOTE: there might be multiple UID reference field set on this object!
        if value:
            alsoProvides(object, IHaveUIDReferences)

        # always handle all values internally as a list
        value = self.to_list(value)

        # convert to UIDs and skip values that can not be resolved
        uids = []
        for v in value:
            uid = self.get_uid(v)
            if uid is None:
                continue
            uids.append(uid)

        # current set UIDs
        # (the raw value of a single valued field is a UID or None)
        existing = self.to_list(self.get_raw(object))

        # filter out new/removed UIDs and resolve them to objects
        added_uids = [u for u in uids if u not in existing]
        added_objs = [
            obj for obj in map(self.get_object, added_uids) if obj]

        removed_uids = [u for u in existing if u not in uids]
        removed_objs = [
            obj for obj in map(self.get_object, removed_uids) if obj]

        # link backreferences of new uids
        for added_obj in added_objs:
            self.link_backref(added_obj, object)

        # unlink backreferences of removed UIDs
        for removed_obj in removed_objs:
            self.unlink_backref(removed_obj, object)

        super(UIDReferenceField, self).set(object, uids)

    def unlink_backref(self, source, target):
        """Remove backreference from the source to the target

        :param source: the object where the backref is stored (our reference)
        :param target: the object where the backref points to (our object)
        :returns: True when the backref was removed, False otherwise
        :raises ValueError: when the target has no UID set yet
        """

        # This should be actually not possible
        if self.is_initializing(target):
            raise ValueError("Objects in initialization state "
                             "can not have existing back references!")

        target_uid = self.get_uid(target)
        # get the storage key
        key = self.get_relationship_key(target)
        # get all backreferences from the source
        backrefs = get_backref_storage(source)
        if key not in backrefs:
            logger.warning(
                "Referenced object %r has no backreferences for the key %s",
                source, key)
            return False
        if target_uid not in backrefs[key]:
            logger.warning(
                "Target %r was not linked by %r", target, source)
            return False
        backrefs[key].remove(target_uid)
        return True

    def link_backref(self, source, target):
        """Add backreference from the source to the target

        :param source: the object where the backref is stored (our reference)
        :param target: the object where the backref points to (our object)
        :returns: True when the backref was written (or already existed),
            False when linking was postponed because the target has no UID yet
        """

        # Object is initializing and don't have an UID!
        # -> Postpone to set back references in event handler
        if self.is_initializing(target):
            logger.info("Object is in initialization state. "
                        "Back references will be set in event handler")
            return False

        target_uid = api.get_uid(target)
        # get the annotation storage key
        key = self.get_relationship_key(target)
        # get all backreferences
        backrefs = get_backref_storage(source)
        if key not in backrefs:
            backrefs[key] = PersistentList()
        # avoid duplicate entries for the same target
        if target_uid not in backrefs[key]:
            backrefs[key].append(target_uid)
        return True

    def get(self, object):
        """Get referenced objects

        :param object: the object holding the field value
        :returns: list of referenced objects for multi valued fields,
            otherwise a single object or None
        """
        return self._get(object, as_objects=True)

    def get_raw(self, object):
        """Get referenced UIDs

        NOTE: Called from the data manager `query` method
              to get the widget value

        :param object: the object holding the field value
        :returns: list of referenced UIDs for multi valued fields,
            otherwise a single UID or None
        """
        return self._get(object, as_objects=False)

    def _get(self, object, as_objects=False):
        """Returns single/multi value

        :param object: the object holding the field value
        :param as_objects: Flag for UID/object returns
        :returns: list of UIDs/objects for multi valued fields, otherwise the
            first UID/object or None if nothing is referenced
        """

        # when creating a new object the context is the container
        # which does not have the field
        if self.interface and not self.interface.providedBy(object):
            if self.multi_valued:
                return []
            return None

        uids = super(UIDReferenceField, self).get(object)

        if not uids:
            uids = []

        if as_objects is True:
            # NOTE: build a real list, because the result is measured and
            #       indexed below (`filter` is a lazy iterator on Python 3)
            uids = [obj for obj in map(self.get_object, uids) if obj]

        if self.multi_valued:
            return uids
        if len(uids) == 0:
            return None
        return uids[0]

    def _validate(self, value):
        """Validator when called from form submission

        :param value: list of UIDs to validate
        :raises ValueError: when a single valued field gets more than one
            value, a value is not a UID or a referenced type is not allowed
        """
        super(UIDReferenceField, self)._validate(value)
        # check if the fields accepts single values only
        if not self.multi_valued and len(value) > 1:
            raise ValueError("Single valued field accepts at most 1 value")

        # check for valid UIDs
        for uid in value:
            if not api.is_uid(uid):
                raise ValueError("Invalid UID: '%s'" % uid)

        # check if the type is allowed
        allowed_types = self.get_allowed_types()
        if allowed_types:
            # UIDs that can not be resolved to an object are not type checked
            objs = [obj for obj in map(self.get_object, value) if obj]
            types = set(map(api.get_portal_type, objs))
            if not types.issubset(allowed_types):
                raise ValueError("Only the following types are allowed: %s"
                                 % ",".join(allowed_types))
340