Passed
Push — 2.x ( 763919...f9f320 )
by Ramon
06:41 queued 16s
created

get_backreferences()   A

Complexity

Conditions 4

Size

Total Lines 39
Code Lines 13

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 13
dl 0
loc 39
rs 9.75
c 0
b 0
f 0
cc 4
nop 3
1
# -*- coding: utf-8 -*-
2
#
3
# This file is part of SENAITE.CORE.
4
#
5
# SENAITE.CORE is free software: you can redistribute it and/or modify it under
6
# the terms of the GNU General Public License as published by the Free Software
7
# Foundation, version 2.
8
#
9
# This program is distributed in the hope that it will be useful, but WITHOUT
10
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
11
# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
12
# details.
13
#
14
# You should have received a copy of the GNU General Public License along with
15
# this program; if not, write to the Free Software Foundation, Inc., 51
16
# Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
17
#
18
# Copyright 2018-2021 by its authors.
19
# Some rights reserved, see README and LICENSE.
20
21
import six
22
23
from AccessControl import ClassSecurityInfo
24
from Products.Archetypes.Field import Field, StringField
25
from bika.lims import logger
26
from bika.lims import api
27
from bika.lims.interfaces.field import IUIDReferenceField
28
from persistent.list import PersistentList
29
from persistent.dict import PersistentDict
30
from zope.annotation.interfaces import IAnnotations
31
from zope.interface import implements
32
33
# Annotation key under which the backreference mapping of an object is stored
# (see `get_storage`, which reads/creates it through `IAnnotations`).
BACKREFS_STORAGE = "bika.lims.browser.fields.uidreferencefield.backreferences"
34
35
36
class ReferenceException(Exception):
    """Raised by `UIDReferenceField.get_uid` when a value is neither empty,
    a brain, a content object nor a UID.
    """
38
39
40
class UIDReferenceField(StringField):
    """A field that stores references as UID values.

    This acts as a drop-in replacement for Archetypes' ReferenceField.  If no
    `relationship` is configured on the field, a key is composed from the
    concatenation of `portal_type` + `fieldname` (see `get_relationship_key`).

    Whenever the field is set, the UID of the referencing object is recorded
    on (or removed from) each referenced object in the mapping returned by
    `get_backreferences`.
    """
    _properties = Field._properties.copy()
    _properties.update({
        'type': 'uidreference',
        'default': '',
        'default_content_type': 'text/plain',
        'relationship': '',
    })

    implements(IUIDReferenceField)

    security = ClassSecurityInfo()

    def get_relationship_key(self, context):
        """Return the configured relationship key or generate a new one

        :param context: object whose `portal_type` is used to build the key
            when the field has no `relationship` configured.
        :return: `self.relationship`, or `portal_type` + field name.
        :rtype: string
        """
        if not self.relationship:
            return context.portal_type + self.getName()
        return self.relationship

    def link_reference(self, source, target):
        """Link the target to the source

        Adds the UID of `target` to the backreferences kept on `source`,
        under the relationship key computed for `target`.

        :param source: the referenced object that holds the backreferences.
        :param target: the object whose UID gets recorded on the source.
        :return: always True.
        :rtype: bool
        """
        target_uid = api.get_uid(target)
        # get the annotation storage key
        key = self.get_relationship_key(target)
        # get all backreferences from the source
        # N.B. only like this we get the persistent mapping!
        backrefs = get_backreferences(source, relationship=None)
        if key not in backrefs:
            backrefs[key] = PersistentList()
        if target_uid not in backrefs[key]:
            backrefs[key].append(target_uid)
        return True

    def unlink_reference(self, source, target):
        """Unlink the target from the source

        Removes the UID of `target` from the backreferences kept on `source`.

        :param source: the referenced object that holds the backreferences.
        :param target: the object whose UID gets removed from the source.
        :return: True if the UID was removed, False if there was nothing
            to remove (a warning is logged in that case).
        :rtype: bool
        """
        target_uid = api.get_uid(target)
        # get the storage key
        key = self.get_relationship_key(target)
        # get all backreferences from the source
        # N.B. only like this we get the persistent mapping!
        backrefs = get_backreferences(source, relationship=None)
        if key not in backrefs:
            logger.warning(
                "Referenced object {} has no backreferences for the key {}"
                .format(repr(source), key))
            return False
        if target_uid not in backrefs[key]:
            logger.warning("Target {} was not linked by {}"
                           .format(repr(target), repr(source)))
            return False
        backrefs[key].remove(target_uid)
        return True

    @security.public
    def get_object(self, context, value):
        """Resolve a UID to an object.

        :param context: context is the object containing the field's schema.
        :type context: BaseContent
        :param value: A UID.
        :type value: string
        :return: Returns a Content object, or None if the value is empty or
            cannot be resolved (a warning is logged in the latter case).
        :rtype: BaseContent
        """
        if not value:
            return None
        obj = _get_object(context, value)
        if obj is None:
            logger.warning(
                "{}.{}: Resolving UIDReference failed for {}.  No object will "
                "be returned.".format(context, self.getName(), value))
        return obj

    @security.public
    def get_uid(self, context, value):
        """Takes a brain or object (or UID), and returns a UID.

        :param context: context is the object whose schema contains this field.
        :type context: BaseContent
        :param value: Brain, object, or UID.
        :type value: Any
        :return: resolved UID, or an empty string for an empty value.
        :rtype: string
        :raises ReferenceException: if the value is of an unsupported kind.
        """
        # Empty string or list with single empty string, are commonly
        # passed to us from form submissions
        if not value or value == ['']:
            ret = ''
        elif api.is_brain(value):
            ret = value.UID
        elif api.is_at_content(value) or api.is_dexterity_content(value):
            ret = value.UID()
        elif api.is_uid(value):
            ret = value
        else:
            raise ReferenceException("{}.{}: Cannot resolve UID for {}".format(
                context, self.getName(), value))
        return ret

    @security.public
    def get(self, context, **kwargs):
        """Grab the stored value, and resolve object(s) from UID catalog.

        :param context: context is the object whose schema contains this field.
        :type context: BaseContent
        :param kwargs: kwargs are passed directly to the underlying get.
            `full_objects=False` returns catalog brains instead of objects.
        :type kwargs: dict
        :return: object or list of objects for multiValued fields; None for
            a single-valued field without a resolvable reference.
        :rtype: BaseContent | list[BaseContent]
        """
        # Return objects by default
        full_objects = kwargs.get("full_objects", True)

        stored = StringField.get(self, context, **kwargs)
        if not isinstance(stored, (list, tuple)):
            stored = [stored]
        # Empty values cannot match anything, so drop them before searching
        uids = [uid for uid in stored if uid]
        if not uids:
            # Nothing stored: skip the catalog search altogether
            return [] if self.multiValued else None

        # Do a direct search for all brains at once
        uc = api.get_tool("uid_catalog")
        references = uc(UID=uids)

        # Keep the original order of items.  The position of the first
        # occurrence of each UID is looked up in a mapping instead of
        # scanning the list once per brain.
        positions = {}
        for position, uid in enumerate(uids):
            positions.setdefault(uid, position)
        references = sorted(references, key=lambda it: positions[it.UID])

        if full_objects:
            references = [api.get_object(ref) for ref in references]

        if self.multiValued:
            return references
        elif references:
            return references[0]
        return None

    @security.public
    def getRaw(self, context, aslist=False, **kwargs):
        """Grab the stored value, and return it directly as UIDs.

        :param context: context is the object whose schema contains this field.
        :type context: BaseContent
        :param aslist: Forces a single-valued field to return a list type.
        :type aslist: bool
        :param kwargs: kwargs are passed directly to the underlying get.
        :type kwargs: dict
        :return: UID or list of UIDs for multiValued fields.  An empty value
            yields [] for multiValued fields and None otherwise.
        :rtype: string | list[string]
        """
        value = StringField.get(self, context, **kwargs)
        if not value:
            return [] if self.multiValued else None
        if self.multiValued:
            ret = value
        else:
            ret = self.get_uid(context, value)
            if aslist:
                ret = [ret]
        return ret

    def _set_backreferences(self, context, items, **kwargs):
        """Set the back references on the linked items

        This will set an annotation storage on the referenced items which point
        to the current context.  Items that are currently stored in the field
        but are not part of `items` get unlinked.

        :param context: the object holding this field (the referencing object)
        :param items: the objects that are going to be referenced
        :param kwargs: nothing is done if `_initializing_` is set
        """

        # Don't set any references during initialization.
        # This might cause a recursion error when calling `getRaw` to fetch the
        # current set UIDs!
        if kwargs.get('_initializing_', False):
            return

        # current set UIDs
        raw = self.getRaw(context) or []
        # handle single reference fields
        if isinstance(raw, six.string_types):
            raw = [raw, ]
        current = set(raw)
        # UIDs to be set
        new = set(map(api.get_uid, items))

        # Unlink removed UIDs from the source
        for removed_uid in current.difference(new):
            source = api.get_object_by_uid(removed_uid, None)
            if source is None:
                logger.warning(
                    "UID {} does not exist anymore".format(removed_uid))
                continue
            self.unlink_reference(source, context)

        # Link backrefs
        for item in items:
            self.link_reference(item, context)

    @security.public
    def set(self, context, value, **kwargs):
        """Accepts a UID, brain, or an object (or a list of any of these),
        and stores a UID or list of UIDS.

        The back references on the previously and newly referenced objects
        are updated before the value is stored.

        :param context: context is the object whose schema contains this field.
        :type context: BaseContent
        :param value: A UID, brain or object (or a sequence of these).
        :type value: Any
        :param kwargs: kwargs are passed directly to the underlying set.
        :type kwargs: dict
        :return: None
        """
        if self.multiValued:
            if not value:
                value = []
            if not isinstance(value, (list, tuple)):
                value = [value, ]
            resolved = (self.get_object(context, val) for val in value if val)
            # Values that could not be resolved are skipped, so that the back
            # references match exactly the UIDs that get stored
            items = [obj for obj in resolved if obj]
            self._set_backreferences(context, items, **kwargs)
            uids = [self.get_uid(context, obj) for obj in items]
            StringField.set(self, context, uids, **kwargs)
            return

        # Sometimes we get given a list here with an empty string.
        # This is generated by html forms with empty values.
        # This is a single-valued field though, so:
        if isinstance(value, (list, tuple)) and value:
            if len(value) > 1:
                # the values might be objects or brains: stringify them first
                logger.warning(
                    "Found values '\'{}\'' for singleValued field <{}>.{} "
                    "- using only the first value in the list.".format(
                        '\',\''.join(map(str, value)),
                        context.UID(), self.getName()))
            value = value[0]

        ret = self.get_object(context, value)
        if ret:
            self._set_backreferences(context, [ret, ], **kwargs)
            uid = self.get_uid(context, ret)
        else:
            # The reference gets cleared: unlink the previously referenced
            # object as well, like the multi-valued branch does
            self._set_backreferences(context, [], **kwargs)
            uid = ''
        StringField.set(self, context, uid, **kwargs)
283
284
285
def _get_object(context, value):
    """Turn a brain, a content object or a UID into a content object.

    :param context: object passed to the `uid_catalog` tool lookup; it is
        also named in the warning logged for a broken reference.
    :type context: BaseContent
    :param value: a catalog brain, a content object or a UID.
    :type value: Any
    :return: the content object, or None when the value is empty, of an
        unsupported kind, or a UID without a match in the `uid_catalog`.
    :rtype: BaseContent
    """
    if not value:
        return None

    resolved = None
    if api.is_brain(value):
        resolved = api.get_object(value)
    elif api.is_object(value):
        resolved = value
    elif api.is_uid(value):
        catalog = api.get_tool('uid_catalog', context=context)
        matches = catalog(UID=value)
        if matches:
            resolved = matches[0].getObject()
        else:
            # Broken Reference!
            logger.warn("Reference on {} with UID {} is broken!"
                        .format(repr(context), value))
    return resolved
311
312
313
def get_storage(context):
    """Return the backreference storage of the context.

    The storage lives in the annotations of the context under the module
    level key `BACKREFS_STORAGE`.  A new `PersistentDict` is stored there
    first if the annotations hold no value (or None) for that key.

    :param context: object that gets adapted to `IAnnotations`.
    :return: the mapping stored under `BACKREFS_STORAGE`.
    """
    annotations = IAnnotations(context)
    storage = annotations.get(BACKREFS_STORAGE)
    if storage is None:
        storage = annotations[BACKREFS_STORAGE] = PersistentDict()
    return storage
318
319
320
def _get_catalog_for_uid(uid):
    """Return a catalog registered for the portal type of the given UID.

    The portal type is read from the first `uid_catalog` brain found for the
    UID.  The first catalog the `archetype_tool` returns for that type which
    is not the `portal_catalog` wins; the `portal_catalog` is the fallback.

    :param uid: UID that is looked up in the `uid_catalog`.
    :return: a catalog tool.
    """
    archetype_tool = api.get_tool('archetype_tool')
    uid_catalog = api.get_tool('uid_catalog')
    portal_catalog = api.get_tool('portal_catalog')

    # the portal_type comes from the uid_catalog brain of the uid
    brain = uid_catalog(UID=uid)[0]
    registered = archetype_tool.getCatalogsByType(brain.portal_type)

    # try avoid 'portal_catalog'; XXX multiple catalogs in setuphandlers.py?
    return next(
        (catalog for catalog in registered if catalog != portal_catalog),
        portal_catalog)
335
336
337
def get_backreferences(context, relationship=None, as_brains=None):
    """Return the backreferences stored for the context.

    :param context: The object which is the target of references.
    :param relationship: The relationship name of the UIDReferenceField.
    :param as_brains: Return catalog brains instead of UIDs.  This can only
        be used together with a relationship.

    Without a relationship, the whole backreference storage of the context is
    returned (by reference) as a mapping of relationship -> UIDs.  Changes
    made to that mapping in-place edit the stored backreferences.

    With a relationship, a new list with the UIDs stored for this
    relationship is returned.  If `as_brains` is set and there are UIDs, the
    result of a catalog search for these UIDs is returned instead; the
    catalog is chosen by means of the first UID.
    """
    storage = get_storage(context.aq_base)

    if not relationship:
        assert not as_brains, "You cannot use as_brains with no relationship"
        return storage

    uids = list(storage.get(relationship, []))
    if uids and as_brains:
        catalog = _get_catalog_for_uid(uids[0])
        return catalog(UID=uids)

    # either nothing is stored or the raw UIDs were requested
    return uids
376