Passed
Pull Request — 2.x (#1864)
by Ramon
05:13
created

UIDReferenceField.__init__()   A

Complexity

Conditions 2

Size

Total Lines 6
Code Lines 6

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 6
dl 0
loc 6
rs 10
c 0
b 0
f 0
cc 2
nop 4
1
# -*- coding: utf-8 -*-
2
3
import six
4
5
from bika.lims import api
6
from persistent.dict import PersistentDict
7
from persistent.list import PersistentList
8
from senaite.core import logger
9
from senaite.core.schema.fields import BaseField
10
from senaite.core.schema.interfaces import IUIDReferenceField
11
from zope.annotation.interfaces import IAnnotations
12
from zope.interface import implementer
13
from zope.schema import ASCIILine
14
from Acquisition import aq_base
15
from zope.schema import List
16
17
# Annotation key under which the backreference mapping is stored on an object
# (see `get_bref_storage`, which creates and returns that mapping).
BACKREFS_STORAGE = "senaite.core.schema.uidreferencefield.backreferences"
18
19
20
def get_brefs(context, relationship, as_objects=False):
    """Return backreferences of the context

    :param context: object whose backreference storage is looked up
    :param relationship: storage key of the relationship to look up
    :param as_objects: when exactly `True`, resolve each stored UID with
        `api.get_object` and return the results instead of the UIDs
    :returns: List of UIDs that are linked by the relationship (or the
        resolved objects when `as_objects` is True); an empty list when
        nothing is stored for the relationship
    """
    # work on the unwrapped object, without its acquisition wrapper
    context = aq_base(context)
    # get the bref annotation storage of the context
    brefs = get_bref_storage(context)
    # copy the stored UIDs, so callers never get hold of the stored list
    bref_uids = list(brefs.get(relationship, []))

    if not bref_uids:
        return []

    if as_objects is True:
        return [api.get_object(uid) for uid in bref_uids]

    return bref_uids
38
39
40
def get_bref_storage(context):
    """Get the annotation storage for backreferences of the context

    The storage lives in the annotations of the context under the
    `BACKREFS_STORAGE` key. It is created as an empty `PersistentDict` on
    first access, i.e. calling this function may write to the context.

    :param context: object that can be adapted to `IAnnotations`
    :returns: the mapping stored under `BACKREFS_STORAGE`
    """
    annotation = IAnnotations(context)
    if annotation.get(BACKREFS_STORAGE) is None:
        # lazily initialize the storage
        annotation[BACKREFS_STORAGE] = PersistentDict()
    return annotation[BACKREFS_STORAGE]
47
48
49
@implementer(IUIDReferenceField)
class UIDReferenceField(List, BaseField):
    """Stores UID references to other objects

    The field value is handled as a list of UIDs. Whenever a reference is
    added or removed through `set`, the UID of the referencing object is
    added to or removed from the backreference storage of the referenced
    object (see `link_bref` / `unlink_bref`).
    """

    value_type = ASCIILine(title=u"UID")

    def __init__(self, allowed_types=None, multi_valued=True, **kw):
        """Initialize the field

        :param allowed_types: portal type (string) or iterable of portal
            types that may be referenced; `None` or empty allows all types
        :param multi_valued: when False, at most one reference is accepted
            and the getters return a single value instead of a list
        :param kw: passed through to the base field constructors
        """
        if allowed_types is None:
            allowed_types = ()
        self.allowed_types = allowed_types
        self.multi_valued = multi_valued
        super(UIDReferenceField, self).__init__(**kw)

    def get_relationship_key(self, context):
        """Relationship key used for backreferences

        The key used for the annotation storage on the referenced object to
        remember the current object UID. It is built from the portal type
        of the context and the `__name__` of this field.

        :param context: the object holding this field
        :returns: storage key to lookup back references
        """
        portal_type = api.get_portal_type(context)
        return "%s.%s" % (portal_type, self.__name__)

    def get_uid(self, value):
        """Value -> UID

        :param value: object/UID/SuperModel
        :returns: UID or None if `api.get_uid` raised an `APIError`
        """
        try:
            return api.get_uid(value)
        except api.APIError:
            return None

    def get_object(self, value):
        """Value -> object

        :param value: object/UID/SuperModel
        :returns: Object or None if `api.get_object` raised an `APIError`
        """
        try:
            return api.get_object(value)
        except api.APIError:
            return None

    def get_allowed_types(self):
        """Returns the allowed reference types

        A single portal type given as string is wrapped into a tuple; any
        falsy setting is returned as an empty tuple.

        :returns: tuple of allowed_types
        """
        allowed_types = self.allowed_types
        if not allowed_types:
            allowed_types = ()
        elif isinstance(allowed_types, six.string_types):
            allowed_types = (allowed_types, )
        return allowed_types

    def set(self, object, value):
        """Set UID reference

        Values that cannot be resolved to a UID are silently dropped.
        Backreferences on newly referenced objects are linked and those on
        no longer referenced objects are unlinked before the UIDs are
        stored.

        :param object: the instance of the field
        :param value: object/UID/SuperModel
        :type value: list/tuple/str
        :raises TypeError: if more than one value is passed to a single
            valued field, or if a referenced object has a portal type that
            is not allowed
        """

        # always handle all values internally as a list
        if value is None:
            # `None` means "no references"; `len(None)` below would fail
            value = []
        elif isinstance(value, six.string_types):
            value = [value]
        elif api.is_object(value):
            value = [value]

        # check if the fields accepts single values only
        if not self.multi_valued and len(value) > 1:
            raise TypeError("Single valued field accepts at most 1 value")

        # check if the type is allowed
        allowed_types = self.get_allowed_types()
        if allowed_types:
            objs = filter(None, map(self.get_object, value))
            types = set(map(api.get_portal_type, objs))
            if not types.issubset(allowed_types):
                raise TypeError("Only the following types are allowed: %s"
                                % ",".join(allowed_types))

        # convert to UIDs
        uids = []
        for v in value:
            uid = self.get_uid(v)
            if uid is None:
                continue
            uids.append(uid)

        # current set UIDs
        # NOTE: for single valued fields `get_raw` returns either `None` or
        # one UID string. Normalize to a list, otherwise the membership
        # checks below would do substring tests / iterate over characters.
        existing = self.get_raw(object)
        if existing is None:
            existing = []
        elif isinstance(existing, six.string_types):
            existing = [existing]

        # filter out new/removed UIDs
        added_uids = [u for u in uids if u not in existing]
        added_objs = list(filter(None, map(self.get_object, added_uids)))

        removed_uids = [u for u in existing if u not in uids]
        removed_objs = list(filter(None, map(self.get_object, removed_uids)))

        # link backreferences of new uids
        for added_obj in added_objs:
            self.link_bref(added_obj, object)

        # unlink backreferences of removed UIDs
        for removed_obj in removed_objs:
            self.unlink_bref(removed_obj, object)

        super(UIDReferenceField, self).set(object, uids)

    def unlink_bref(self, source, target):
        """Remove backreference from the source to the target

        :param source: the object where the bref is stored (our reference)
        :param target: the object where the bref points to (our object)
        :returns: True when the bref was removed, False otherwise
        """
        target_uid = self.get_uid(target)
        # get the storage key
        key = self.get_relationship_key(target)
        # get all backreferences from the source
        brefs = get_bref_storage(source)
        if key not in brefs:
            logger.warn(
                "Referenced object {} has no backreferences for the key {}"
                .format(repr(source), key))
            return False
        if target_uid not in brefs[key]:
            logger.warn("Target {} was not linked by {}"
                        .format(repr(target), repr(source)))
            return False
        brefs[key].remove(target_uid)
        return True

    def link_bref(self, source, target):
        """Add backreference from the source to the target

        The UID of the target is appended to the list stored under the
        relationship key, unless it is already contained.

        :param source: the object where the bref is stored (our reference)
        :param target: the object where the bref points to (our object)
        :returns: True when the bref was written
        """
        target_uid = api.get_uid(target)
        # get the annotation storage key
        key = self.get_relationship_key(target)
        # get all backreferences
        brefs = get_bref_storage(source)
        if key not in brefs:
            brefs[key] = PersistentList()
        if target_uid not in brefs[key]:
            brefs[key].append(target_uid)
        return True

    def get(self, object):
        """Get referenced objects

        :param object: instance of the field
        :returns: list of referenced objects for multi valued fields,
            otherwise a single object or None
        """
        return self._get(object, as_objects=True)

    def get_raw(self, object):
        """Get referenced UIDs

        :param object: instance of the field
        :returns: list of referenced UIDs for multi valued fields,
            otherwise a single UID or None
        """
        return self._get(object, as_objects=False)

    def _get(self, object, as_objects=False):
        """Returns single/multi value

        :param object: instance of the field
        :param as_objects: Flag for UID/object returns
        :returns: for multi valued fields the list of referenced UIDs (or
            objects); for single valued fields the first entry or None
        """
        uids = super(UIDReferenceField, self).get(object)

        if not uids:
            uids = []

        if as_objects is True:
            # materialize the result: on Python 3 `filter` returns an
            # iterator, which supports neither `len()` nor indexing
            uids = list(filter(None, map(self.get_object, uids)))

        if self.multi_valued:
            return uids
        if not uids:
            return None
        return uids[0]
240