Passed
Pull Request — 2.x (#1864)
by Ramon
07:09
created

senaite.core.schema.uidreferencefield   C

Complexity

Total Complexity 56

Size/Duplication

Total Lines 329
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
wmc 56
eloc 170
dl 0
loc 329
rs 5.5199
c 0
b 0
f 0

3 Functions

Rating   Name   Duplication   Size   Complexity  
B on_object_created() 0 29 6
A get_backref_storage() 0 7 2
A get_backrefs() 0 18 3

13 Methods

Rating   Name   Duplication   Size   Complexity  
A UIDReferenceField.get_raw() 0 10 1
A UIDReferenceField.__init__() 0 6 2
A UIDReferenceField.get_object() 0 9 2
A UIDReferenceField.link_backref() 0 25 4
B UIDReferenceField._validate() 0 21 7
A UIDReferenceField.is_initializing() 0 11 1
A UIDReferenceField.unlink_backref() 0 29 4
A UIDReferenceField.get_relationship_key() 0 10 1
C UIDReferenceField.set() 0 48 9
A UIDReferenceField.get_allowed_types() 0 11 3
A UIDReferenceField.get() 0 7 1
A UIDReferenceField.get_uid() 0 10 2
B UIDReferenceField._get() 0 28 8

How to fix   Complexity   

Complexity

Complex classes like senaite.core.schema.uidreferencefield often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
# -*- coding: utf-8 -*-
2
3
import six
4
5
from Acquisition import ImplicitAcquisitionWrapper
6
from Acquisition import aq_base
7
from bika.lims import api
8
from persistent.dict import PersistentDict
9
from persistent.list import PersistentList
10
from senaite.core import logger
11
from senaite.core.interfaces import IHaveUIDReferences
12
from senaite.core.schema.fields import BaseField
13
from senaite.core.schema.interfaces import IUIDReferenceField
14
from zope.annotation.interfaces import IAnnotations
15
from zope.component import adapter
16
from zope.interface import alsoProvides
17
from zope.interface import implementer
18
from zope.lifecycleevent.interfaces import IObjectCreatedEvent
19
from zope.schema import ASCIILine
20
from zope.schema import List
21
22
BACKREFS_STORAGE = "senaite.core.schema.uidreferencefield.backreferences"
23
24
25
@adapter(IHaveUIDReferences, IObjectCreatedEvent)
26
def on_object_created(object, event):
27
    """Link backreferences after the object was created
28
29
    This is necessary, because objects during initialization have no UID set!
30
    https://community.plone.org/t/accessing-uid-in-dexterity-field-setter/14468
31
32
    Therefore, and only for the creation case, we need to link the back
33
    references in this handler for all UID reference fields.
34
    """
35
    fields = api.get_fields(object)
36
    for name, field in fields.items():
37
        if not IUIDReferenceField.providedBy(field):
38
            continue
39
        # after creation, we only need to link backreferences
40
        value = field.get(object)
41
        # handle single valued reference fields
42
        if api.is_object(value):
43
            field.link_backref(value, object)
44
            logger.info(
45
                "Adding back reference from %s -> %s" % (
46
                    value, object))
47
        # handle multi valued reference fields
48
        elif isinstance(value, list):
49
            for ref in value:
50
                logger.info(
51
                    "Adding back reference from %s -> %s" % (
52
                        ref, object))
53
                field.link_backref(ref, object)
54
55
56
def get_backrefs(context, relationship, as_objects=False):
57
    """Return backreferences of the context
58
59
    :returns: List of UIDs that are linked by the relationship
60
    """
61
    context = aq_base(context)
62
    # get the backref annotation storage of the context
63
    backrefs = get_backref_storage(context)
64
    # get the referenced UIDs
65
    backref_uids = list(backrefs.get(relationship, []))
66
67
    if not backref_uids:
68
        return []
69
70
    if as_objects is True:
71
        return [api.get_object(uid) for uid in backref_uids]
72
73
    return backref_uids
74
75
76
def get_backref_storage(context):
77
    """Get the annotation storage for backreferences of the context
78
    """
79
    annotation = IAnnotations(context)
80
    if annotation.get(BACKREFS_STORAGE) is None:
0 ignored issues
show
Comprehensibility Best Practice introduced by
The variable BACKREFS_STORAGE does not seem to be defined.
Loading history...
81
        annotation[BACKREFS_STORAGE] = PersistentDict()
82
    return annotation[BACKREFS_STORAGE]
83
84
85
@implementer(IUIDReferenceField)
86
class UIDReferenceField(List, BaseField):
87
    """Stores UID references to other objects
88
    """
89
90
    value_type = ASCIILine(title=u"UID")
91
92
    def __init__(self, allowed_types=None, multi_valued=True, **kw):
93
        if allowed_types is None:
94
            allowed_types = ()
95
        self.allowed_types = allowed_types
96
        self.multi_valued = multi_valued
97
        super(UIDReferenceField, self).__init__(**kw)
98
99
    def is_initializing(self, object):
100
        """Checks if the object is initialized at the moment
101
102
        Background:
103
104
        Objects being created do not have a UID set.
105
        Therefore, ths backreferences need to be postponed to an event handler.
106
107
        https://community.plone.org/t/accessing-uid-in-dexterity-field-setter/14468
108
        """
109
        return isinstance(object, ImplicitAcquisitionWrapper)
110
111
    def get_relationship_key(self, context):
112
        """Relationship key used for backreferences
113
114
        The key used for the annotation storage on the referenced object to
115
        remember the current object UID.
116
117
        :returns: storage key to lookup back references
118
        """
119
        portal_type = api.get_portal_type(context)
120
        return "%s.%s" % (portal_type, self.__name__)
121
122
    def get_uid(self, value):
123
        """Value -> UID
124
125
        :parm value: object/UID/SuperModel
126
        :returns: UID
127
        """
128
        try:
129
            return api.get_uid(value)
130
        except api.APIError:
131
            return None
132
133
    def get_object(self, value):
134
        """Value -> object
135
136
        :returns: Object or None
137
        """
138
        try:
139
            return api.get_object(value)
140
        except api.APIError:
141
            return None
142
143
    def get_allowed_types(self):
144
        """Returns the allowed reference types
145
146
        :returns: tuple of allowed_types
147
        """
148
        allowed_types = self.allowed_types
149
        if not allowed_types:
150
            allowed_types = ()
151
        elif isinstance(allowed_types, six.string_types):
152
            allowed_types = (allowed_types, )
153
        return allowed_types
154
155
    def set(self, object, value):
156
        """Set UID reference
157
158
        :param object: the instance of the field
159
        :param value: object/UID/SuperModel
160
        :type value: list/tuple/str
161
        """
162
163
        # always mark the object if references are set
164
        # NOTE: there might be multiple UID reference field set on this object!
165
        if value:
166
            alsoProvides(object, IHaveUIDReferences)
167
168
        # always handle all values internally as a list
169
        if isinstance(value, six.string_types):
170
            value = [value]
171
        elif api.is_object(value):
172
            value = [value]
173
        elif value is None:
174
            value = []
175
176
        # convert to UIDs
177
        uids = []
178
        for v in value:
179
            uid = self.get_uid(v)
180
            if uid is None:
181
                continue
182
            uids.append(uid)
183
184
        # current set UIDs
185
        existing = self.get_raw(object)
186
187
        # filter out new/removed UIDs
188
        added_uids = [u for u in uids if u not in existing]
189
        added_objs = filter(None, map(self.get_object, added_uids))
190
191
        removed_uids = [u for u in existing if u not in uids]
192
        removed_objs = filter(None, map(self.get_object, removed_uids))
193
194
        # link backreferences of new uids
195
        for added_obj in added_objs:
196
            self.link_backref(added_obj, object)
197
198
        # unlink backreferences of removed UIDs
199
        for removed_obj in removed_objs:
200
            self.unlink_backref(removed_obj, object)
201
202
        super(UIDReferenceField, self).set(object, uids)
203
204
    def unlink_backref(self, source, target):
205
        """Remove backreference from the source to the target
206
207
        :param source: the object where the backref is stored (our reference)
208
        :param target: the object where the backref points to (our object)
209
        :returns: True when the backref was removed, False otherwise
210
        """
211
212
        # This should be actually not possible
213
        if self.is_initializing(target):
214
            raise ValueError("Objects in initialization state "
215
                             "can not have existing back references!")
216
217
        target_uid = self.get_uid(target)
218
        # get the storage key
219
        key = self.get_relationship_key(target)
220
        # get all backreferences from the source
221
        backrefs = get_backref_storage(source)
222
        if key not in backrefs:
223
            logger.warn(
224
                "Referenced object {} has no backreferences for the key {}"
225
                .format(repr(source), key))
226
            return False
227
        if target_uid not in backrefs[key]:
228
            logger.warn("Target {} was not linked by {}"
229
                        .format(repr(target), repr(source)))
230
            return False
231
        backrefs[key].remove(target_uid)
232
        return True
233
234
    def link_backref(self, source, target):
235
        """Add backreference from the source to the target
236
237
        :param source: the object where the backref is stored (our reference)
238
        :param target: the object where the backref points to (our object)
239
        :returns: True when the backref was written
240
        """
241
242
        # Object is initializing and don't have an UID!
243
        # -> Postpone to set back references in event handler
244
        if self.is_initializing(target):
245
            logger.info("Object is in initialization state. "
246
                        "Back references will be set in event handler")
247
            return
248
249
        target_uid = api.get_uid(target)
250
        # get the annotation storage key
251
        key = self.get_relationship_key(target)
252
        # get all backreferences
253
        backrefs = get_backref_storage(source)
254
        if key not in backrefs:
255
            backrefs[key] = PersistentList()
256
        if target_uid not in backrefs[key]:
257
            backrefs[key].append(target_uid)
258
        return True
259
260
    def get(self, object):
261
        """Get referenced objects
262
263
        :param object: instance of the field
264
        :returns: list of referenced objects
265
        """
266
        return self._get(object, as_objects=True)
267
268
    def get_raw(self, object):
269
        """Get referenced UIDs
270
271
        NOTE: Called from the data manager `query` method
272
              to get the widget value
273
274
        :param object: instance of the field
275
        :returns: list of referenced UIDs
276
        """
277
        return self._get(object, as_objects=False)
278
279
    def _get(self, object, as_objects=False):
280
        """Returns single/multi value
281
282
        :param object: instance of the field
283
        :param as_objects: Flag for UID/object returns
284
        :returns: list of referenced UIDs
285
        """
286
287
        # when creating a new object the context is the container
288
        # which does not have the field
289
        if self.interface and not self.interface.providedBy(object):
290
            if self.multi_valued:
291
                return []
292
            return None
293
294
        uids = super(UIDReferenceField, self).get(object)
295
296
        if not uids:
297
            uids = []
298
299
        if as_objects is True:
300
            uids = filter(None, map(self.get_object, uids))
301
302
        if self.multi_valued:
303
            return uids
304
        if len(uids) == 0:
305
            return None
306
        return uids[0]
307
308
    def _validate(self, value):
309
        """Validator when called from form submission
310
        """
311
        super(UIDReferenceField, self)._validate(value)
312
        # check if the fields accepts single values only
313
        if not self.multi_valued and len(value) > 1:
314
            raise ValueError("Single valued field accepts at most 1 value")
315
316
        # check for valid UIDs
317
        for uid in value:
318
            if not api.is_uid(uid):
319
                raise ValueError("Invalid UID: '%s'" % uid)
320
321
        # check if the type is allowed
322
        allowed_types = self.get_allowed_types()
323
        if allowed_types:
324
            objs = filter(None, map(self.get_object, value))
325
            types = set(map(api.get_portal_type, objs))
326
            if not types.issubset(allowed_types):
327
                raise ValueError("Only the following types are allowed: %s"
328
                                 % ",".join(allowed_types))
329