Passed
Push — 2.x ( bd408b...df866e )
by Jordi
06:03
created

bika.lims.browser.fields.uidreferencefield   A

Complexity

Total Complexity 39

Size/Duplication

Total Lines 305
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
wmc 39
eloc 142
dl 0
loc 305
rs 9.28
c 0
b 0
f 0

8 Methods

Rating   Name   Duplication   Size   Complexity  
A UIDReferenceField.link_reference() 0 14 3
A UIDReferenceField.unlink_reference() 0 20 3
A UIDReferenceField.get_relationship_key() 0 6 2
A UIDReferenceField.set() 0 27 4
A UIDReferenceField.get_uid() 0 16 3
A UIDReferenceField._set_backreferences() 0 37 5
B UIDReferenceField.get() 0 32 6
A UIDReferenceField.getRaw() 0 23 5

3 Functions

Rating   Name   Duplication   Size   Complexity  
A _get_catalog_for_uid() 0 15 2
A get_storage() 0 5 2
A get_backreferences() 0 39 4
1
# -*- coding: utf-8 -*-
2
#
3
# This file is part of SENAITE.CORE.
4
#
5
# SENAITE.CORE is free software: you can redistribute it and/or modify it under
6
# the terms of the GNU General Public License as published by the Free Software
7
# Foundation, version 2.
8
#
9
# This program is distributed in the hope that it will be useful, but WITHOUT
10
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
11
# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
12
# details.
13
#
14
# You should have received a copy of the GNU General Public License along with
15
# this program; if not, write to the Free Software Foundation, Inc., 51
16
# Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
17
#
18
# Copyright 2018-2021 by it's authors.
19
# Some rights reserved, see README and LICENSE.
20
21
import six
22
23
from AccessControl import ClassSecurityInfo
24
from Products.Archetypes.Field import Field, StringField
25
from bika.lims import logger
26
from bika.lims import api
27
from bika.lims.interfaces.field import IUIDReferenceField
28
from persistent.list import PersistentList
29
from persistent.dict import PersistentDict
30
from zope.annotation.interfaces import IAnnotations
31
from zope.interface import implements
32
33
BACKREFS_STORAGE = "bika.lims.browser.fields.uidreferencefield.backreferences"
34
35
36
class ReferenceException(Exception):
37
    pass
38
39
40
class UIDReferenceField(StringField):
41
    """A field that stores References as UID values.  This acts as a drop-in
42
    replacement for Archetypes' ReferenceField.  A relationship is required
43
    but if one is not provided, it will be composed from a concatenation
44
    of `portal_type` + `fieldname`.
45
    """
46
    _properties = Field._properties.copy()
47
    _properties.update({
48
        'type': 'uidreference',
49
        'default': '',
50
        'default_content_type': 'text/plain',
51
        'relationship': '',
52
    })
53
54
    implements(IUIDReferenceField)
55
56
    security = ClassSecurityInfo()
57
58
    def get_relationship_key(self, context):
59
        """Return the configured relationship key or generate a new one
60
        """
61
        if not self.relationship:
62
            return context.portal_type + self.getName()
63
        return self.relationship
64
65
    def link_reference(self, source, target):
66
        """Link the target to the source
67
        """
68
        target_uid = api.get_uid(target)
69
        # get the annotation storage key
70
        key = self.get_relationship_key(target)
71
        # get all backreferences from the source
72
        # N.B. only like this we get the persistent mapping!
73
        backrefs = get_backreferences(source, relationship=None)
74
        if key not in backrefs:
75
            backrefs[key] = PersistentList()
76
        if target_uid not in backrefs[key]:
77
            backrefs[key].append(target_uid)
78
        return True
79
80
    def unlink_reference(self, source, target):
81
        """Unlink the target from the source
82
        """
83
        target_uid = api.get_uid(target)
84
        # get the storage key
85
        key = self.get_relationship_key(target)
86
        # get all backreferences from the source
87
        # N.B. only like this we get the persistent mapping!
88
        backrefs = get_backreferences(source, relationship=None)
89
        if key not in backrefs:
90
            logger.warn(
91
                "Referenced object {} has no backreferences for the key {}"
92
                .format(repr(source), key))
93
            return False
94
        if target_uid not in backrefs[key]:
95
            logger.warn("Target {} was not linked by {}"
96
                        .format(repr(target), repr(source)))
97
            return False
98
        backrefs[key].remove(target_uid)
99
        return True
100
101
    @security.public
102
    def get_uid(self, context, value, default=""):
103
        """Takes a brain or object (or UID), and returns a UID.
104
105
        :param context: context is the object who's schema contains this field.
106
        :type context: BaseContent
107
        :param value: Brain, object, or UID.
108
        :type value: Any
109
        :return: resolved UID.
110
        :rtype: string
111
        """
112
        if api.is_object(value):
113
            value = api.get_uid(value)
114
        elif not api.is_uid(value):
115
            value = default
116
        return value
117
118
    @security.public
119
    def get(self, context, **kwargs):
120
        """Grab the stored value, and resolve object(s) from UID catalog.
121
122
        :param context: context is the object who's schema contains this field.
123
        :type context: BaseContent
124
        :param kwargs: kwargs are passed directly to the underlying get.
125
        :type kwargs: dict
126
        :return: object or list of objects for multiValued fields.
127
        :rtype: BaseContent | list[BaseContent]
128
        """
129
        uids = StringField.get(self, context, **kwargs)
130
        if not isinstance(uids, list):
131
            uids = [uids]
132
133
        # Do a direct search for all brains at once
134
        uc = api.get_tool("uid_catalog")
135
        references = uc(UID=uids)
136
137
        # Keep the original order of items
138
        references = sorted(references, key=lambda it: uids.index(it.UID))
139
140
        # Return objects by default
141
        full_objects = kwargs.pop("full_objects", True)
142
        if full_objects:
143
            references = [api.get_object(ref) for ref in references]
144
145
        if self.multiValued:
146
            return references
147
        elif references:
148
            return references[0]
149
        return None
150
151
    @security.public
152
    def getRaw(self, context, aslist=False, **kwargs):
153
        """Grab the stored value, and return it directly as UIDs.
154
155
        :param context: context is the object who's schema contains this field.
156
        :type context: BaseContent
157
        :param aslist: Forces a single-valued field to return a list type.
158
        :type aslist: bool
159
        :param kwargs: kwargs are passed directly to the underlying get.
160
        :type kwargs: dict
161
        :return: UID or list of UIDs for multiValued fields.
162
        :rtype: string | list[string]
163
        """
164
        value = StringField.get(self, context, **kwargs)
165
        if not value:
166
            return [] if self.multiValued else None
167
        if self.multiValued:
168
            ret = value
169
        else:
170
            ret = self.get_uid(context, value)
171
            if aslist:
172
                ret = [ret]
173
        return ret
174
175
    def _set_backreferences(self, context, items, **kwargs):
176
        """Set the back references on the linked items
177
178
        This will set an annotation storage on the referenced items which point
179
        to the current context.
180
        """
181
182
        # Don't set any references during initialization.
183
        # This might cause a recursion error when calling `getRaw` to fetch the
184
        # current set UIDs!
185
        initializing = kwargs.get('_initializing_', False)
186
        if initializing:
187
            return
188
189
        # current set UIDs
190
        raw = self.getRaw(context) or []
191
        # handle single reference fields
192
        if isinstance(raw, six.string_types):
193
            raw = [raw, ]
194
        cur = set(raw)
195
        # UIDs to be set
196
        uids = set(map(api.get_uid, items))
197
        # removed UIDs
198
        removed = cur.difference(uids)
199
        # missing UIDs
200
        missing = uids.difference(cur)
201
202
        # Unlink removed UIDs from the source
203
        uc = api.get_tool("uid_catalog")
204
        for brain in uc(UID=list(removed)):
205
            source = api.get_object(brain)
206
            self.unlink_reference(source, context)
207
208
        # Link missing UIDs
209
        for brain in uc(UID=list(missing)):
210
            target = api.get_object(brain)
211
            self.link_reference(target, context)
212
213
    @security.public
214
    def set(self, context, value, **kwargs):
215
        """Accepts a UID, brain, or an object (or a list of any of these),
216
        and stores a UID or list of UIDS.
217
218
        :param context: context is the object who's schema contains this field.
219
        :type context: BaseContent
220
        :param value: A UID, brain or object (or a sequence of these).
221
        :type value: Any
222
        :param kwargs: kwargs are passed directly to the underlying get.
223
        :type kwargs: dict
224
        :return: None
225
        """
226
        if not isinstance(value, (list, tuple)):
227
            value = [value]
228
229
        # Extract uids and remove empties
230
        uids = [self.get_uid(context, item) for item in value]
231
        uids = filter(api.is_uid, uids)
232
233
        # Back-reference current object to referenced objects
234
        self._set_backreferences(context, uids, **kwargs)
235
236
        # Store the referenced objects as uids
237
        if not self.multiValued:
238
            uids = uids[0] if uids else ""
239
        StringField.set(self, context, uids, **kwargs)
240
241
242
def get_storage(context):
243
    annotation = IAnnotations(context)
244
    if annotation.get(BACKREFS_STORAGE) is None:
0 ignored issues
show
Comprehensibility Best Practice introduced by
The variable BACKREFS_STORAGE does not seem to be defined.
Loading history...
245
        annotation[BACKREFS_STORAGE] = PersistentDict()
246
    return annotation[BACKREFS_STORAGE]
247
248
249
def _get_catalog_for_uid(uid):
250
    at = api.get_tool('archetype_tool')
251
    uc = api.get_tool('uid_catalog')
252
    pc = api.get_tool('portal_catalog')
253
    # get uid_catalog brain for uid
254
    ub = uc(UID=uid)[0]
255
    # get portal_type of brain
256
    pt = ub.portal_type
257
    # get the registered catalogs for portal_type
258
    cats = at.getCatalogsByType(pt)
259
    # try avoid 'portal_catalog'; XXX multiple catalogs in setuphandlers.py?
260
    cats = [cat for cat in cats if cat != pc]
261
    if cats:
262
        return cats[0]
263
    return pc
264
265
266
def get_backreferences(context, relationship=None, as_brains=None):
267
    """Return all objects which use a UIDReferenceField to reference context.
268
269
    :param context: The object which is the target of references.
270
    :param relationship: The relationship name of the UIDReferenceField.
271
    :param as_brains: Requests that this function returns only catalog brains.
272
        as_brains can only be used if a relationship has been specified.
273
274
    This function can be called with or without specifying a relationship.
275
276
    - If a relationship is provided, the return value will be a list of items
277
      which reference the context using the provided relationship.
278
279
      If relationship is provided, then you can request that the backrefs
280
      should be returned as catalog brains.  If you do not specify as_brains,
281
      the raw list of UIDs will be returned.
282
283
    - If the relationship is not provided, then the entire set of
284
      backreferences to the context object is returned (by reference) as a
285
      dictionary.  This value can then be modified in-place, to edit the stored
286
      backreferences.
287
    """
288
289
    instance = context.aq_base
290
    raw_backrefs = get_storage(instance)
291
292
    if not relationship:
293
        assert not as_brains, "You cannot use as_brains with no relationship"
294
        return raw_backrefs
295
296
    backrefs = list(raw_backrefs.get(relationship, []))
297
    if not backrefs:
298
        return []
299
300
    if not as_brains:
301
        return backrefs
302
303
    cat = _get_catalog_for_uid(backrefs[0])
304
    return cat(UID=backrefs)
305