Passed
Push — 2.x ( d2787e...873989 )
by Jordi
10:51 queued 04:42
created

UIDReferenceField.keep_backreferences()   A

Complexity

Conditions 3

Size

Total Lines 9
Code Lines 6

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 6
dl 0
loc 9
rs 10
c 0
b 0
f 0
cc 3
nop 1
1
# -*- coding: utf-8 -*-
2
3
import six
4
5
from Acquisition import aq_base
6
from bika.lims import api
7
from persistent.dict import PersistentDict
8
from persistent.list import PersistentList
9
from plone.behavior.interfaces import IBehavior
10
from plone.uuid.interfaces import ATTRIBUTE_NAME
11
from senaite.core import logger
12
from senaite.core.events.uidreference import UIDReferenceCreated
13
from senaite.core.events.uidreference import UIDReferenceDestroyed
14
from senaite.core.interfaces import IHaveUIDReferences
15
from senaite.core.schema.fields import BaseField
16
from senaite.core.schema.interfaces import IUIDReferenceField
17
from zope.annotation.interfaces import IAnnotations
18
from zope.event import notify
19
from zope.interface import alsoProvides
20
from zope.interface import implementer
21
from zope.schema import ASCIILine
22
from zope.schema import List
23
24
# Annotation key under which the back references of an object are stored
# (see `get_backref_storage`)
BACKREFS_STORAGE = "senaite.core.schema.uidreferencefield.backreferences"
25
26
27
def notify_reference_created(field, source, target):
    """Fire the event that announces a new reference between two objects

    :param field: the UID reference field that holds the reference
    :param source: the object that holds the reference
    :param target: the object that is referenced
    """
    event = UIDReferenceCreated(field, source, target)
    notify(event)
31
32
33
def notify_reference_destroyed(field, source, target):
    """Fire the event that announces a removed reference between two objects

    :param field: the UID reference field that held the reference
    :param source: the object that held the reference
    :param target: the object that was referenced
    """
    event = UIDReferenceDestroyed(field, source, target)
    notify(event)
37
38
39
def on_object_created(object, event):
    """Link the back references once the object has been created

    While an object is being initialized it has no UID set yet:
    https://community.plone.org/t/accessing-uid-in-dexterity-field-setter/14468

    For the creation case only, the back references of all UID reference
    fields of the object are therefore linked in this handler.

    :param object: the object that was created
    :param event: the event that triggered this handler (not used)
    """
    for _name, field in api.get_fields(object).items():
        if not IUIDReferenceField.providedBy(field):
            continue

        # after creation, only the back references need to be linked
        value = field.get(object)

        if api.is_object(value):
            # single valued reference field
            references = [value]
        elif isinstance(value, list):
            # multi valued reference field
            references = value
        else:
            # nothing referenced by this field
            continue

        for reference in references:
            if field.link_backref(reference, object):
                logger.info(
                    "Added back reference from %s -> %s" % (reference, object))
            notify_reference_created(field, object, reference)
67
68
69
def get_backrefs(context, relationship, as_objects=False):
    """Return the back references of the context for a relationship

    :param context: the object to read the back references from
    :param relationship: the relationship key to look up
    :param as_objects: when True, return the objects instead of the UIDs
    :returns: List of UIDs (or objects) that are linked by the relationship
    """
    # the back references are kept in the annotation storage of the
    # unwrapped context
    storage = get_backref_storage(aq_base(context))

    # always work on a copy of the stored UIDs
    uids = list(storage.get(relationship, []))

    if as_objects is True:
        return [api.get_object(uid) for uid in uids]

    return uids
87
88
89
def get_backref_storage(context):
    """Return the annotation storage that holds the back references

    The storage is created on first access if it does not exist yet.

    :param context: the object to get the back reference storage for
    :returns: the storage kept below the `BACKREFS_STORAGE` annotation key
    """
    annotations = IAnnotations(context)
    storage = annotations.get(BACKREFS_STORAGE)
    if storage is None:
        storage = annotations[BACKREFS_STORAGE] = PersistentDict()
    return storage
96
97
98
@implementer(IUIDReferenceField)
class UIDReferenceField(List, BaseField):
    """Stores UID references to other objects

    The references are stored as a list of UIDs. If a `relationship` is set
    for the field, the UID of the referencing object is additionally
    remembered in the annotation storage of each referenced object.
    """

    value_type = ASCIILine(title=u"UID")

    def __init__(self, allowed_types=None, multi_valued=True, **kw):
        """Initialize the field

        :param allowed_types: portal type(s) that may be referenced
        :param multi_valued: False if the field holds a single reference
        :param kw: further field arguments, passed on to the base classes.
            The optional `relationship` key enables back references.
        """
        if allowed_types is None:
            allowed_types = ()
        self.relationship = kw.get("relationship")
        self.allowed_types = allowed_types
        self.multi_valued = multi_valued
        super(UIDReferenceField, self).__init__(**kw)

    def is_initializing(self, object):
        """Checks if the object is initialized at the moment

        Background:

        Objects being created do not have a UID set.
        Therefore, the backreferences need to be postponed to an event
        handler.

        https://community.plone.org/t/accessing-uid-in-dexterity-field-setter/14468

        :param object: the object to check
        :returns: True if the object has no UID attribute set
        """
        return getattr(object, ATTRIBUTE_NAME, None) is None

    def to_list(self, value, filter_empty=True):
        """Ensure the value is a list

        :param value: single value (string/object), iterable or None
        :param filter_empty: remove all falsy items from the result
        :returns: always a new list
        """
        if isinstance(value, six.string_types):
            items = [value]
        elif api.is_object(value):
            items = [value]
        elif value is None:
            items = []
        else:
            # copy into a real list, so that tuples and other iterables
            # are returned as a list as well
            items = list(value)
        if filter_empty:
            # a comprehension instead of `filter`, which returns a lazy
            # iterator on Python 3 and a tuple for tuples on Python 2
            items = [item for item in items if item]
        return items

    def get_relationship_key(self, context):
        """Relationship key used for backreferences

        The key used for the annotation storage on the referenced object to
        remember the current object UID.

        Falls back to `<portal_type>.<fieldname>` if no relationship is set.

        :param context: the object that holds the reference
        :returns: storage key to lookup back references
        """
        relationship = getattr(self, "relationship", None)
        if not relationship:
            portal_type = api.get_portal_type(context)
            relationship = "%s.%s" % (portal_type, self.__name__)
        return relationship

    @property
    def keep_backreferences(self):
        """Returns whether this field must keep back references. Returns False
        if the value for property relationship is None or empty
        """
        relationship = getattr(self, "relationship", None)
        return bool(
            relationship and isinstance(relationship, six.string_types))

    def get_uid(self, value):
        """Value -> UID

        :param value: object/UID/SuperModel
        :returns: UID or None if no UID could be obtained
        """
        try:
            return api.get_uid(value)
        except api.APIError:
            return None

    def get_object(self, value):
        """Value -> object

        :param value: object/UID/SuperModel
        :returns: Object or None
        """
        try:
            return api.get_object(value)
        except api.APIError:
            return None

    def get_allowed_types(self):
        """Returns the allowed reference types

        :returns: tuple of allowed_types
        """
        allowed_types = self.allowed_types
        if not allowed_types:
            allowed_types = ()
        elif isinstance(allowed_types, six.string_types):
            allowed_types = (allowed_types, )
        return allowed_types

    def set(self, object, value):
        """Set UID reference

        Stores the UIDs of the given value and updates the back references
        of all newly added and removed references.

        :param object: the instance of the field
        :param value: object/UID/SuperModel
        :type value: list/tuple/str
        """

        # Object might be a behavior instead of the object itself
        object = self._get_content_object(object)

        # always mark the object if references are set
        # NOTE: there might be multiple UID reference field set on this object!
        if value:
            alsoProvides(object, IHaveUIDReferences)

        # always handle all values internally as a list
        value = self.to_list(value)

        # convert to UIDs and skip values without UID
        uids = [uid for uid in map(self.get_uid, value) if uid is not None]

        # current set UIDs
        existing = self.to_list(self.get_raw(object))

        # filter out new/removed UIDs and skip those that can not be resolved
        # to an object
        added_uids = [u for u in uids if u not in existing]
        added_objs = [o for o in map(self.get_object, added_uids) if o]

        removed_uids = [u for u in existing if u not in uids]
        removed_objs = [o for o in map(self.get_object, removed_uids) if o]

        # link backreferences of new uids
        # NOTE(review): the created event is only sent if a back reference
        # was written, while the destroyed event below is always sent.
        # Confirm that this asymmetry is intended.
        for added_obj in added_objs:
            if self.link_backref(added_obj, object):
                notify_reference_created(self, object, added_obj)

        # unlink backreferences of removed UIDs
        for removed_obj in removed_objs:
            self.unlink_backref(removed_obj, object)
            notify_reference_destroyed(self, object, removed_obj)

        super(UIDReferenceField, self).set(object, uids)

    def unlink_backref(self, source, target):
        """Remove backreference from the source to the target

        :param source: the object where the backref is stored (our reference)
        :param target: the object where the backref points to (our object)
        :returns: True when the backref was removed, False otherwise
        :raises ValueError: if the target has no UID set yet
        """
        # only unlink backreference if a `relationship` key is explicitly set
        if not self.keep_backreferences:
            return False

        # Target might be a behavior instead of the object itself
        target = self._get_content_object(target)

        # This should be actually not possible
        if self.is_initializing(target):
            raise ValueError("Objects in initialization state "
                             "can not have existing back references!")

        target_uid = self.get_uid(target)
        # get the storage key
        key = self.get_relationship_key(target)
        # get all backreferences from the source
        backrefs = get_backref_storage(source)
        if key not in backrefs:
            logger.warn(
                "Referenced object {} has no backreferences for the key {}"
                .format(repr(source), key))
            return False
        if target_uid not in backrefs[key]:
            logger.warn("Target {} was not linked by {}"
                        .format(repr(target), repr(source)))
            return False
        backrefs[key].remove(target_uid)
        return True

    def link_backref(self, source, target):
        """Add backreference from the source to the target

        :param source: the object where the backref is stored (our reference)
        :param target: the object where the backref points to (our object)
        :returns: True when the backref was written, False otherwise
        """
        # only link backreference if a `relationship` key is explicitly set
        if not self.keep_backreferences:
            return False

        # Target might be a behavior instead of the object itself
        target = self._get_content_object(target)

        # Object is initializing and don't have an UID!
        # -> Postpone to set back references in event handler
        if self.is_initializing(target):
            logger.info("Object is in initialization state. "
                        "Back references will be set in event handler")
            # explicit False (instead of None) to match the documented
            # boolean return value
            return False

        target_uid = api.get_uid(target)
        # get the annotation storage key
        key = self.get_relationship_key(target)
        # get all backreferences
        backrefs = get_backref_storage(source)
        if key not in backrefs:
            backrefs[key] = PersistentList()
        if target_uid not in backrefs[key]:
            backrefs[key].append(target_uid)
        return True

    def get(self, object):
        """Get referenced objects

        :param object: instance of the field
        :returns: list of referenced objects for multi valued fields,
            otherwise the referenced object or None
        """
        return self._get(object, as_objects=True)

    def get_raw(self, object):
        """Get referenced UIDs

        NOTE: Called from the data manager `query` method
              to get the widget value

        :param object: instance of the field
        :returns: list of referenced UIDs for multi valued fields,
            otherwise the referenced UID or None
        """
        return self._get(object, as_objects=False)

    def _get(self, object, as_objects=False):
        """Returns single/multi value

        :param object: instance of the field
        :param as_objects: Flag for UID/object returns
        :returns: referenced UIDs/objects for multi valued fields,
            otherwise the first UID/object or None if nothing is referenced
        """

        # Object might be a behavior instead of the object itself
        object = self._get_content_object(object)

        uids = super(UIDReferenceField, self).get(object)

        if not uids:
            uids = []
        elif not isinstance(uids, (list, tuple)):
            uids = [uids]

        if as_objects is True:
            # build a real list: `len()` and indexing below do not work on
            # the lazy iterator that `filter` returns on Python 3
            uids = [obj for obj in map(self.get_object, uids) if obj]

        if self.multi_valued:
            return uids
        if len(uids) == 0:
            return None
        return uids[0]

    def _validate(self, value):
        """Validator when called from form submission

        :param value: the UIDs to validate
        :raises ValueError: if more than one value is given for a single
            valued field, if a value is not a valid UID or if a referenced
            object is not of an allowed type
        """
        super(UIDReferenceField, self)._validate(value)
        # check if the fields accepts single values only
        if not self.multi_valued and len(value) > 1:
            raise ValueError("Single valued field accepts at most 1 value")

        # check for valid UIDs
        for uid in value:
            if not api.is_uid(uid):
                raise ValueError("Invalid UID: '%s'" % uid)

        # check if the type is allowed
        allowed_types = self.get_allowed_types()
        if allowed_types:
            # UIDs that can not be resolved to an object are skipped
            objs = [obj for obj in map(self.get_object, value) if obj]
            types = set(map(api.get_portal_type, objs))
            if not types.issubset(allowed_types):
                raise ValueError("Only the following types are allowed: %s"
                                 % ",".join(allowed_types))

    def _get_content_object(self, thing):
        """Returns the underlying content object

        :param thing: content object or behavior
        :returns: the content object itself or the context of the behavior
        :raises ValueError: if no content object could be found
        """
        if IBehavior.providedBy(thing):
            return self._get_content_object(thing.context)

        if api.is_dexterity_content(thing):
            return thing

        # Not all behaviors implement plone.behavior.interfaces.IBehavior
        context = getattr(thing, "context", None)
        # compare against None explicitly, because an object that defines a
        # length or truth value can be falsy although it is a valid context
        if context is not None:
            clazz = thing.__class__
            clazz = "{}.{}".format(clazz.__module__, clazz.__name__)
            logger.warn("{} does not implement IBehavior".format(clazz))
            return self._get_content_object(context)

        raise ValueError("Not a valid object: %s" % repr(thing))
401