Passed
Push — master ( 1b98f4...c06457 )
by Ramon
09:26 queued 04:32
created

bika.lims.browser.fields.historyawarereferencefield   B

Complexity

Total Complexity 45

Size/Duplication

Total Lines 223
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
wmc 45
eloc 128
dl 0
loc 223
rs 8.8
c 0
b 0
f 0

10 Methods

Rating   Name   Duplication   Size   Complexity  
C HistoryAwareReferenceField.get() 0 28 9
A HistoryAwareReferenceField.unlink_version() 0 14 3
A HistoryAwareReferenceField.retrieve_version() 0 19 3
A HistoryAwareReferenceField.get_backreferences_for() 0 8 2
A HistoryAwareReferenceField.get_versioned_references_for() 0 21 3
B HistoryAwareReferenceField.preprocess_value() 0 16 6
A HistoryAwareReferenceField.del_reference() 0 8 1
A HistoryAwareReferenceField.link_version() 0 17 3
D HistoryAwareReferenceField.set() 0 37 13
A HistoryAwareReferenceField.add_reference() 0 14 2

How to fix   Complexity   

Complexity

Complex classes like bika.lims.browser.fields.historyawarereferencefield often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
# -*- coding: utf-8 -*-
2
#
3
# This file is part of SENAITE.CORE
4
#
5
# Copyright 2018 by it's authors.
6
# Some rights reserved. See LICENSE.rst, CONTRIBUTORS.rst.
7
8
from AccessControl import ClassSecurityInfo
9
from Acquisition import aq_base
10
from bika.lims import api
11
from bika.lims import logger
12
from Products.Archetypes.public import ReferenceField
13
from Products.Archetypes.Registry import registerField
14
15
VERSION_ID = "version_id"
16
REFERENCE_VERSIONS = "reference_versions"
17
18
19
class HistoryAwareReferenceField(ReferenceField):
20
    """Version aware references.
21
22
    Uses instance.reference_versions[uid] to record uid.version_id,
23
    to pin this reference to a specific version.
24
    """
25
    security = ClassSecurityInfo()
26
27
    def get_versioned_references_for(self, instance):
28
        """Returns the versioned references for the given instance
29
        """
30
        vrefs = []
31
32
        # Retrieve the referenced objects
33
        refs = instance.getRefs(relationship=self.relationship)
34
35
        ref_versions = getattr(instance, REFERENCE_VERSIONS, None)
36
        # No versions stored, return the original references
37
        if ref_versions is None:
38
            return refs
39
40
        for ref in refs:
41
            uid = api.get_uid(ref)
42
            # get the linked version to the reference
43
            version = ref_versions.get(uid)
44
            # append the versioned reference
45
            vrefs.append(self.retrieve_version(ref, version))
46
47
        return vrefs
48
49
    def retrieve_version(self, obj, version):
50
        """Retrieve the version of the object
51
        """
52
        current_version = getattr(obj, VERSION_ID, None)
53
54
        if current_version is None:
55
            # No initial version
56
            return obj
57
58
        if str(current_version) == str(version):
59
            # Same version
60
            return obj
61
62
        # Retrieve the object from the repository
63
        pr = api.get_tool("portal_repository")
64
        # bypass permission check to AccessPreviousVersions
65
        result = pr._retrieve(
66
            obj, selector=version, preserve=(), countPurged=True)
67
        return result.object
68
69
    def get_backreferences_for(self, instance):
70
        """Returns the backreferences for the given instance
71
72
        :returns: list of UIDs
73
        """
74
        rc = api.get_tool("reference_catalog")
75
        backreferences = rc.getReferences(instance, self.relationship)
76
        return map(lambda ref: ref.targetUID, backreferences)
77
78
    def preprocess_value(self, value, default=tuple()):
79
        """Preprocess the value for set
80
        """
81
        # empty value
82
        if not value:
83
            return default
84
85
        # list with one empty item
86
        if isinstance(value, (list, tuple)):
87
            if len(value) == 1 and not value[0]:
88
                return default
89
90
        if not isinstance(value, (list, tuple)):
91
            value = value,
92
93
        return value
94
95
    def link_version(self, source, target):
96
        """Link the current version of the target on the source
97
        """
98
        if not hasattr(target, VERSION_ID):
99
            # no initial version of this object!
100
            logger.warn("No iniatial version found for '{}'"
101
                        .format(repr(target)))
102
            return
103
104
        if not hasattr(source, REFERENCE_VERSIONS):
105
            source.reference_versions = {}
106
107
        target_uid = api.get_uid(target)
108
        # store the current version of the target on the source
109
        source.reference_versions[target_uid] = target.version_id
110
        # persist changes that occured referenced versions
111
        source._p_changed = 1
112
113
    def unlink_version(self, source, target):
114
        """Unlink the current version of the target from the source
115
        """
116
        if not hasattr(source, REFERENCE_VERSIONS):
117
            return
118
        target_uid = api.get_uid(target)
119
        if target_uid in source.reference_versions[target_uid]:
120
            # delete the version
121
            del source.reference_versions[target_uid]
122
            # persist changes that occured referenced versions
123
            source._p_changed = 1
124
        else:
125
            logger.warn("No version link found on '{}' -> '{}'"
126
                        .format(repr(source), repr(target)))
127
128
    def add_reference(self, source, target, **kwargs):
129
        """Add a new reference
130
        """
131
        # Tweak keyword arguments for addReference
132
        addRef_kw = kwargs.copy()
133
        addRef_kw.setdefault("referenceClass", self.referenceClass)
134
        if "schema" in addRef_kw:
135
            del addRef_kw["schema"]
136
        uid = api.get_uid(target)
137
        rc = api.get_tool("reference_catalog")
138
        # throws IndexError if uid is invalid
139
        rc.addReference(source, uid, self.relationship, **addRef_kw)
140
        # link the version of the reference
141
        self.link_version(source, target)
142
143
    def del_reference(self, source, target, **kwargs):
144
        """Remove existing reference
145
        """
146
        rc = api.get_tool("reference_catalog")
147
        uid = api.get_uid(target)
148
        rc.deleteReference(source, uid, self.relationship)
149
        # unlink the version of the reference
150
        self.link_version(source, target)
151
152
    @security.private
153
    def set(self, instance, value, **kwargs):
154
        """Set (multi-)references
155
        """
156
        value = self.preprocess_value(value)
157
        existing_uids = self.get_backreferences_for(instance)
158
159
        if not value and not existing_uids:
160
            logger.warning("Field and value is empty!")
161
            return
162
163
        if not self.multiValued and len(value) > 1:
164
            raise ValueError("Multiple values given for single valued field {}"
165
                             .format(repr(self)))
166
167
        set_uids = []
168
        for val in value:
169
            if api.is_uid(val):
170
                set_uids.append(val)
171
            elif api.is_object(val):
172
                set_uids.append(api.get_uid(val))
173
            else:
174
                logger.error("Target has no UID: %s/%s" % (val, value))
175
176
        sub = filter(lambda uid: uid not in set_uids, existing_uids)
0 ignored issues
show
introduced by
The variable set_uids does not seem to be defined for all execution paths.
Loading history...
177
        add = filter(lambda uid: uid not in existing_uids, set_uids)
178
179
        for uid in set(existing_uids + set_uids):
180
            # The object to link
181
            target = api.get_object(uid)
182
            # Add reference to object
183
            if uid in add:
184
                __traceback_info__ = (instance, uid, value, existing_uids)
185
                self.add_reference(instance, target, **kwargs)
186
            # Delete reference to object
187
            elif uid in sub:
188
                self.del_reference(instance, target, **kwargs)
189
190
    @security.private
191
    def get(self, instance, aslist=False, **kwargs):
192
        """Get (multi-)references
193
        """
194
        refs = self.get_versioned_references_for(instance)
195
196
        if not self.multiValued:
197
            if len(refs) > 1:
198
                logger.warning("Found {} references for non-multivalued "
199
                               "reference field '{}' of {}".format(
200
                                   len(refs), self.getName(), repr(instance)))
201
            if not aslist:
202
                if refs:
203
                    refs = refs[0]
204
                else:
205
                    refs = None
206
207
        if not self.referencesSortable or not hasattr(
208
                aq_base(instance), "at_ordered_refs"):
209
            return refs
210
211
        refs = instance.at_ordered_refs
212
        order = refs[self.relationship]
213
        if order is None:
214
            return refs
215
216
        by_uid = dict(map(lambda ob: (api.get_uid(ob), ob), refs))
217
        return [by_uid[uid] for uid in order if uid in by_uid]
218
219
220
registerField(HistoryAwareReferenceField,
221
              title="History Aware Reference",
222
              description="")
223