Passed
Pull Request — 2.x (#1864)
by Ramon
05:30
created

UIDReferenceField.unlink_bref()   A

Complexity

Conditions 3

Size

Total Lines 23
Code Lines 15

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 15
dl 0
loc 23
rs 9.65
c 0
b 0
f 0
cc 3
nop 3
1
# -*- coding: utf-8 -*-
2
3
import six
4
5
from bika.lims import api
6
from persistent.dict import PersistentDict
7
from persistent.list import PersistentList
8
from senaite.core import logger
9
from senaite.core.schema.fields import BaseField
10
from senaite.core.schema.interfaces import IUIDReferenceField
11
from zope.annotation.interfaces import IAnnotations
12
from zope.interface import implementer
13
from zope.schema import ASCIILine
14
from Acquisition import aq_base
15
from zope.schema import List
16
from zope.schema.interfaces import IFromUnicode
17
18
BACKREFS_STORAGE = "senaite.core.schema.uidreferencefield.backreferences"
19
20
21
def get_brefs(context, relationship, as_objects=False):
22
    """Return backreferences of the context
23
24
    :returns: List of UIDs that are linked by the relationship
25
    """
26
    context = aq_base(context)
27
    # get the bref annotation storage of the context
28
    brefs = get_bref_storage(context)
29
    # get the referenced UIDs
30
    bref_uids = list(brefs.get(relationship, []))
31
32
    if not bref_uids:
33
        return []
34
35
    if as_objects is True:
36
        return [api.get_object(uid) for uid in bref_uids]
37
38
    return bref_uids
39
40
41
def get_bref_storage(context):
42
    """Get the annotation storage for backreferences of the context
43
    """
44
    annotation = IAnnotations(context)
45
    if annotation.get(BACKREFS_STORAGE) is None:
0 ignored issues
show
Comprehensibility Best Practice introduced by
The variable BACKREFS_STORAGE does not seem to be defined.
Loading history...
46
        annotation[BACKREFS_STORAGE] = PersistentDict()
47
    return annotation[BACKREFS_STORAGE]
48
49
50
@implementer(IUIDReferenceField, IFromUnicode)
51
class UIDReferenceField(List, BaseField):
52
    """Stores UID references to other objects
53
    """
54
55
    value_type = ASCIILine(title=u"UID")
56
57
    def __init__(self, allowed_types=None, multi_valued=True, **kw):
58
        if allowed_types is None:
59
            allowed_types = ()
60
        self.allowed_types = allowed_types
61
        self.multi_valued = multi_valued
62
        super(UIDReferenceField, self).__init__(**kw)
63
64
    def get_relationship_key(self, context):
65
        """Relationship key used for backreferences
66
67
        The key used for the annotation storage on the referenced object to
68
        remember the current object UID.
69
70
        :returns: storage key to lookup back references
71
        """
72
        portal_type = api.get_portal_type(context)
73
        return "%s.%s" % (portal_type, self.__name__)
74
75
    def get_uid(self, value):
76
        """Value -> UID
77
78
        :parm value: object/UID/SuperModel
79
        :returns: UID
80
        """
81
        try:
82
            return api.get_uid(value)
83
        except api.APIError:
84
            return None
85
86
    def get_object(self, value):
87
        """Value -> object
88
89
        :returns: Object or None
90
        """
91
        try:
92
            return api.get_object(value)
93
        except api.APIError:
94
            return None
95
96
    def get_allowed_types(self):
97
        """Returns the allowed reference types
98
99
        :returns: tuple of allowed_types
100
        """
101
        allowed_types = self.allowed_types
102
        if not allowed_types:
103
            allowed_types = ()
104
        elif isinstance(allowed_types, six.string_types):
105
            allowed_types = (allowed_types, )
106
        return allowed_types
107
108
    def set(self, object, value):
109
        """Set UID reference
110
111
        :param object: the instance of the field
112
        :param value: object/UID/SuperModel
113
        :type value: list/tuple/str
114
        """
115
116
        # always handle all values internally as a list
117
        if isinstance(value, six.string_types):
118
            value = [value]
119
        elif api.is_object(value):
120
            value = [value]
121
122
        # check if the fields accepts single values only
123
        if not self.multi_valued and len(value) > 1:
124
            raise TypeError("Single valued field accepts at most 1 value")
125
126
        # check if the type is allowed
127
        allowed_types = self.get_allowed_types()
128
        if allowed_types:
129
            objs = filter(None, map(self.get_object, value))
130
            types = set(map(api.get_portal_type, objs))
131
            if not types.issubset(allowed_types):
132
                raise TypeError("Only the following types are allowed: %s"
133
                                % ",".join(allowed_types))
134
135
        # convert to UIDs
136
        uids = []
137
        for v in value:
138
            uid = self.get_uid(v)
139
            if uid is None:
140
                continue
141
            uids.append(uid)
142
143
        # current set UIDs
144
        existing = self.get_raw(object)
145
146
        # filter out new/removed UIDs
147
        added_uids = [u for u in uids if u not in existing]
148
        added_objs = filter(None, map(self.get_object, added_uids))
149
150
        removed_uids = [u for u in existing if u not in uids]
151
        removed_objs = filter(None, map(self.get_object, removed_uids))
152
153
        # link backreferences of new uids
154
        for added_obj in added_objs:
155
            self.link_bref(added_obj, object)
156
157
        # unlink backreferences of removed UIDs
158
        for removed_obj in removed_objs:
159
            self.unlink_bref(removed_obj, object)
160
161
        super(UIDReferenceField, self).set(object, uids)
162
163
    def unlink_bref(self, source, target):
164
        """Remove backreference from the source to the target
165
166
        :param source: the object where the bref is stored (our reference)
167
        :param target: the object where the bref points to (our object)
168
        :returns: True when the bref was removed, False otherwise
169
        """
170
        target_uid = self.get_uid(target)
171
        # get the storage key
172
        key = self.get_relationship_key(target)
173
        # get all backreferences from the source
174
        brefs = get_bref_storage(source)
175
        if key not in brefs:
176
            logger.warn(
177
                "Referenced object {} has no backreferences for the key {}"
178
                .format(repr(source), key))
179
            return False
180
        if target_uid not in brefs[key]:
181
            logger.warn("Target {} was not linked by {}"
182
                        .format(repr(target), repr(source)))
183
            return False
184
        brefs[key].remove(target_uid)
185
        return True
186
187
    def link_bref(self, source, target):
188
        """Add backreference from the source to the target
189
190
        :param source: the object where the bref is stored (our reference)
191
        :param target: the object where the bref points to (our object)
192
        :returns: True when the bref was written
193
        """
194
        target_uid = api.get_uid(target)
195
        # get the annotation storage key
196
        key = self.get_relationship_key(target)
197
        # get all backreferences
198
        brefs = get_bref_storage(source)
199
        if key not in brefs:
200
            brefs[key] = PersistentList()
201
        if target_uid not in brefs[key]:
202
            brefs[key].append(target_uid)
203
        return True
204
205
    def get(self, object):
206
        """Get referenced objects
207
208
        :param object: instance of the field
209
        :returns: list of referenced objects
210
        """
211
        return self._get(object, as_objects=True)
212
213
    def get_raw(self, object):
214
        """Get referenced UIDs
215
216
        :param object: instance of the field
217
        :returns: list of referenced UIDs
218
        """
219
        return self._get(object, as_objects=False)
220
221
    def _get(self, object, as_objects=False):
222
        """Returns single/multi value
223
224
        :param object: instance of the field
225
        :param as_objects: Flag for UID/object returns
226
        :returns: list of referenced UIDs
227
        """
228
        uids = super(UIDReferenceField, self).get(object)
229
230
        if not uids:
231
            uids = []
232
233
        if as_objects is True:
234
            uids = filter(None, map(self.get_object, uids))
235
236
        if self.multi_valued:
237
            return uids
238
        if len(uids) == 0:
239
            return None
240
        return uids[0]
241
242
    def fromUnicode(self, value):
243
        self.validate(value)
244
        return value
245