| Total Complexity | 54 | 
| Total Lines | 418 | 
| Duplicated Lines | 4.31 % | 
| Changes | 0 | ||
Duplicate code is one of the most pungent code smells. A rule that is often used is to re-structure code once it is duplicated in three or more places.
Common duplication problems and their corresponding solutions are:
Complex classes like bika.lims.browser.fields.aranalysesfield often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.
Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.
| 1 | # -*- coding: utf-8 -*-  | 
            ||
| 2 | #  | 
            ||
| 3 | # This file is part of SENAITE.CORE.  | 
            ||
| 4 | #  | 
            ||
| 5 | # SENAITE.CORE is free software: you can redistribute it and/or modify it under  | 
            ||
| 6 | # the terms of the GNU General Public License as published by the Free Software  | 
            ||
| 7 | # Foundation, version 2.  | 
            ||
| 8 | #  | 
            ||
| 9 | # This program is distributed in the hope that it will be useful, but WITHOUT  | 
            ||
| 10 | # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS  | 
            ||
| 11 | # FOR A PARTICULAR PURPOSE. See the GNU General Public License for more  | 
            ||
| 12 | # details.  | 
            ||
| 13 | #  | 
            ||
| 14 | # You should have received a copy of the GNU General Public License along with  | 
            ||
| 15 | # this program; if not, write to the Free Software Foundation, Inc., 51  | 
            ||
| 16 | # Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.  | 
            ||
| 17 | #  | 
            ||
| 18 | # Copyright 2018-2021 by it's authors.  | 
            ||
| 19 | # Some rights reserved, see README and LICENSE.  | 
            ||
| 20 | |||
| 21 | import itertools  | 
            ||
| 22 | |||
| 23 | from AccessControl import ClassSecurityInfo  | 
            ||
| 24 | from AccessControl import Unauthorized  | 
            ||
| 25 | from bika.lims import api  | 
            ||
| 26 | from bika.lims import logger  | 
            ||
| 27 | from bika.lims.api.security import check_permission  | 
            ||
| 28 | from bika.lims.catalog import CATALOG_ANALYSIS_LISTING  | 
            ||
| 29 | from bika.lims.catalog import SETUP_CATALOG  | 
            ||
| 30 | from bika.lims.interfaces import IAnalysis  | 
            ||
| 31 | from bika.lims.interfaces import IAnalysisService  | 
            ||
| 32 | from bika.lims.interfaces import IARAnalysesField  | 
            ||
| 33 | from bika.lims.interfaces import ISubmitted  | 
            ||
| 34 | from bika.lims.permissions import AddAnalysis  | 
            ||
| 35 | from bika.lims.utils.analysis import create_analysis  | 
            ||
| 36 | from Products.Archetypes.public import Field  | 
            ||
| 37 | from Products.Archetypes.public import ObjectField  | 
            ||
| 38 | from Products.Archetypes.Registry import registerField  | 
            ||
| 39 | from zope.interface import implements  | 
            ||
| 40 | |||
# Workflow states in which an existing analysis is ignored by
# ARAnalysesField.add_analysis, so that a new analysis can be created for the
# same service (e.g. to re-add an analysis that was retracted or cancelled).
DETACHED_STATES = ["cancelled", "retracted", "rejected"]
            ||
| 42 | |||
| 43 | |||
| 44 | """Field to manage Analyses on ARs  | 
            ||
| 45 | |||
| 46 | Please see the assigned doctest at tests/doctests/ARAnalysesField.rst  | 
            ||
| 47 | |||
| 48 | Run this test from the buildout directory:  | 
            ||
| 49 | |||
| 50 | bin/test test_textual_doctests -t ARAnalysesField  | 
            ||
| 51 | """  | 
            ||
| 52 | |||
| 53 | |||
class ARAnalysesField(ObjectField):
    """A field that stores Analyses instances

    `get` looks the analyses up in the analysis listing catalog, while `set`
    creates, updates and removes the Analysis objects that live inside the
    Analysis Request (and inside its ancestors/descendants).
    """
    implements(IARAnalysesField)

    security = ClassSecurityInfo()
    _properties = Field._properties.copy()
    _properties.update({
        "type": "analyses",
        "default": None,
    })

    security.declarePrivate('get')

    def get(self, instance, **kwargs):
        """Returns a list of Analyses assigned to this AR

        Return a list of catalog brains unless `full_objects=True` is passed.
        Other keyword arguments are passed to the analysis listing catalog,
        as long as they match the name of one of its indexes.

        :param instance: Analysis Request object
        :param kwargs: Keyword arguments to inject in the search query
        :returns: A list of Analysis Objects/Catalog Brains
        """
        # Do we need to return objects or brains
        full_objects = kwargs.get("full_objects", False)

        # Bail out parameters from kwargs that don't match with indexes
        catalog = api.get_tool(CATALOG_ANALYSIS_LISTING)
        indexes = set(catalog.indexes())
        query = {k: v for k, v in kwargs.items() if k in indexes}

        # Do the search against the catalog
        query["portal_type"] = "Analysis"
        query["getAncestorsUIDs"] = api.get_uid(instance)
        brains = catalog(query)
        if full_objects:
            return [api.get_object(brain) for brain in brains]
        return brains

    security.declarePrivate('set')

    def set(self, instance, items, prices=None, specs=None, hidden=None, **kw):
        """Set/Assign Analyses to this AR

        Analyses are added (or updated) for each service resolved from
        `items` and for the dependencies of those services. Analyses of the
        instance and its descendants that belong to any other service are
        removed, unless they provide `ISubmitted`.

        :param items: List of Analysis objects/brains, AnalysisService
                      objects/brains and/or Analysis Service uids
        :type items: list
        :param prices: Mapping of AnalysisService UID -> price
        :type prices: dict
        :param specs: List of AnalysisService UID -> Result Range mappings
        :type specs: list
        :param hidden: List of AnalysisService UID -> Hidden mappings
        :type hidden: list
        :raises TypeError: if `items` is neither a list nor a tuple
        :raises Unauthorized: if the AR is inactive or the current user does
                              not have the `AddAnalysis` permission
        :returns: None
        """
        if items is None:
            items = []

        # Bail out if the items is not a list type
        if not isinstance(items, (list, tuple)):
            raise TypeError(
                "Items parameter must be a tuple or list, got '{}'".format(
                    type(items)))

        # Bail out if the AR is inactive
        if not api.is_active(instance):
            raise Unauthorized("Inactive ARs can not be modified")

        # Bail out if the user has not the right permission
        if not check_permission(AddAnalysis, instance):
            raise Unauthorized("You do not have the '{}' permission"
                               .format(AddAnalysis))

        # Convert the items to a valid list of AnalysisServices, dismissing
        # those items that cannot be resolved to a service
        services = []
        for item in items:
            service = self._to_service(item)
            if service:
                services.append(service)

        # Calculate dependencies
        dependencies = list(itertools.chain.from_iterable(
            service.getServiceDependencies() for service in services))

        # Merge dependencies and services
        services = set(services + dependencies)

        # Modify existing AR specs with new form values of selected analyses
        specs = self.resolve_specs(instance, specs)

        # Add analyses
        params = dict(prices=prices, hidden=hidden, specs=specs)
        for service in services:
            self.add_analysis(instance, service, **params)

        # Get all analyses (those from descendants included). Work on a copy,
        # so the sequence returned by objectValues is never modified in place
        analyses = list(instance.objectValues("Analysis"))
        analyses.extend(self.get_analyses_from_descendants(instance))

        # Bail out those not in services list or submitted
        service_uids = set(api.get_uid(service) for service in services)
        to_remove = [
            analysis for analysis in analyses
            if analysis.getServiceUID() not in service_uids
            and not ISubmitted.providedBy(analysis)
        ]

        # Remove analyses
        for analysis in to_remove:
            self.remove_analysis(analysis)

    def resolve_specs(self, instance, results_ranges):
        """Returns a dictionary where the key is the service_uid and the value
        is its results range. The dictionary is made by extending the
        results_ranges passed-in with the Sample's ResultsRanges (a copy of the
        specifications initially set)

        :param instance: Analysis Request object
        :param results_ranges: list of results ranges (dict-like) or None
        :returns: dict of service uid -> results range
        """
        # Sample's Results ranges
        sample_rrs = instance.getResultsRange()

        # Ensure all subfields from specification are kept and missing values
        # for subfields are filled in accordance with the specs
        rrs = [self.resolve_range(rr, sample_rrs)
               for rr in results_ranges or []]

        # Append those from sample that are missing in the ranges passed-in
        service_uids = set(rr["uid"] for rr in rrs)
        rrs.extend(rr for rr in sample_rrs if rr["uid"] not in service_uids)

        # Create a dict for easy access to results ranges
        return dict((rr["uid"], rr) for rr in rrs)

    def resolve_range(self, result_range, sample_result_ranges):
        """Resolves the range by adding the uid if not present and filling the
        missing subfield values with those that come from the Sample
        specification if they are not present in the result_range passed-in

        :param result_range: results range (dict-like) to resolve
        :param sample_result_ranges: results ranges set at Sample level
        :returns: a copy of the results range, with the uid resolved
        """
        # Resolve result_range to make sure it contain uid subfield
        resolved = self.resolve_uid(result_range)
        uid = resolved.get("uid")
        if not uid:
            # Nothing to match against the Sample's results ranges
            return resolved

        for sample_rr in sample_result_ranges:
            if sample_rr.get("uid") == uid:
                # Keep same fields from sample, but give priority to the
                # values from the results range passed-in
                merged = sample_rr.copy()
                merged.update(resolved)
                return merged

        # Return the original with no changes
        return resolved

    def resolve_uid(self, result_range):
        """Resolves the uid key for the result_range passed in if it does not
        exist when contains a keyword

        :param result_range: results range (dict-like)
        :returns: a copy of the results range that always has an "uid" key
        """
        value = result_range.copy()
        uid = value.get("uid")
        if api.is_uid(uid) and uid != "0":
            return value

        # uid key does not exist or is not valid, try to infer from keyword
        keyword = value.get("keyword")
        if keyword:
            query = dict(portal_type="AnalysisService", getKeyword=keyword)
            brains = api.search(query, SETUP_CATALOG)
            # Only trust the keyword when it resolves to a single service
            if len(brains) == 1:
                uid = api.get_uid(brains[0])
        value["uid"] = uid
        return value

    def add_analysis(self, instance, service, **kwargs):
        """Adds an analysis for the given service to the instance, or updates
        the analyses for this service that already exist in the instance,
        its ancestors or its descendants

        :param instance: Analysis Request object
        :param service: Analysis Service object
        :param kwargs: supported keys are "specs" (dict of service uid ->
                       results range), "hidden" (list of dicts with "uid" and
                       "hidden" keys) and "prices" (dict of service uid ->
                       price)
        """
        service_uid = api.get_uid(service)

        # Ensure we have suitable parameters
        specs = kwargs.get("specs") or {}

        # Get the hidden status for the service
        hidden_settings = kwargs.get("hidden") or []
        matching = [d for d in hidden_settings if d.get("uid") == service_uid]
        explicit_hidden = matching[0].get("hidden") if matching else None
        # NOTE(review): a falsy explicit value (e.g. False) falls back to the
        # default from the service, so a service that is hidden by default
        # cannot be un-hidden from here. Kept as-is, confirm this is intended.
        hidden = explicit_hidden or service.getHidden()

        # Get the price for the service
        prices = kwargs.get("prices") or {}
        price = prices.get(service_uid) or service.getPrice()

        # Gets the analysis or creates the analysis for this service
        # Note this returns a list, because is possible to have multiple
        # partitions with same analysis.
        # Filter out analyses in detached states: this allows to re-add an
        # analysis that was retracted or cancelled
        analyses = [
            analysis for analysis in self.resolve_analyses(instance, service)
            if api.get_workflow_status_of(analysis) not in DETACHED_STATES
        ]

        if not analyses:
            # Create the analysis
            new_id = self.generate_analysis_id(instance, service)
            logger.info("Creating new analysis '{}'".format(new_id))
            analysis = create_analysis(instance, service, id=new_id)
            analyses.append(analysis)

        for analysis in analyses:
            # Set the hidden status
            analysis.setHidden(hidden)

            # Set the price of the Analysis
            analysis.setPrice(price)

            # Set the internal use status
            parent_sample = analysis.getRequest()
            analysis.setInternalUse(parent_sample.getInternalUse())

            # Set the result range to the analysis
            analysis_rr = specs.get(service_uid) or analysis.getResultsRange()
            analysis.setResultsRange(analysis_rr)
            analysis.reindexObject()

    def generate_analysis_id(self, instance, service):
        """Generate a new analysis ID

        The id is the keyword of the service, followed by a numeric suffix
        ("-1", "-2", ...) if an object with that id exists in the instance.

        :param instance: Analysis Request object
        :param service: Analysis Service object
        :returns: an id that is not in use inside the instance
        """
        # The ids in use do not change while looking for a free one
        existing_ids = set(instance.objectIds())
        keyword = service.getKeyword()
        new_id = keyword
        count = 1
        while new_id in existing_ids:
            new_id = "{}-{}".format(keyword, count)
            count += 1
        return new_id

    def remove_analysis(self, analysis):
        """Removes a given analysis from the instance

        Attachments that were linked to the analysis and are not linked to
        any other analysis afterwards are deleted as well.

        :param analysis: Analysis object to remove
        """
        # Remember assigned attachments
        # https://github.com/senaite/senaite.core/issues/1025
        attachments = analysis.getAttachment()
        analysis.setAttachment([])

        # If assigned to a worksheet, unassign it before deletion
        worksheet = analysis.getWorksheet()
        if worksheet:
            worksheet.removeAnalysis(analysis)

        # handle retest source deleted
        retest = analysis.getRetest()
        if retest:
            # unset reference link
            retest.setRetestOf(None)

        # Remove the analysis
        # Note the analysis might belong to a partition
        analysis.aq_parent.manage_delObjects(ids=[api.get_id(analysis)])

        # Remove orphaned attachments
        for attachment in attachments:
            if attachment.getLinkedAnalyses():
                # still linked to another analysis, keep it
                continue
            # only delete attachments which are no further linked
            logger.info(
                "Deleting attachment: {}".format(attachment.getId()))
            attachment_id = api.get_id(attachment)
            api.get_parent(attachment).manage_delObjects(ids=[attachment_id])

    def resolve_analyses(self, instance, service):
        """Resolves analyses for the service and instance
        It returns a list, cause for a given sample, multiple analyses for same
        service can exist due to the possibility of having multiple partitions

        Analyses found in an ancestor are moved (cut and paste) into the
        instance.

        :param instance: Analysis Request object
        :param service: Analysis Service object
        :returns: list of Analysis objects
        """
        # Does the analysis exists in this instance already?
        analyses = list(self.get_from_instance(instance, service))

        # Does the analysis exists in an ancestor?
        for ancestor_analysis in self.get_from_ancestor(instance, service):
            # Move the analysis into this instance. The ancestor's
            # analysis will be masked otherwise
            analysis_id = api.get_id(ancestor_analysis)
            logger.info("Analysis {} is from an ancestor".format(analysis_id))
            cp = ancestor_analysis.aq_parent.manage_cutObjects(analysis_id)
            instance.manage_pasteObjects(cp)
            analyses.append(instance._getOb(analysis_id))

        # Does the analysis exists in descendants?
        analyses.extend(self.get_from_descendant(instance, service))

        return analyses

    def get_analyses_from_descendants(self, instance):
        """Returns all the analyses from descendants

        :param instance: Analysis Request object
        :returns: list of Analysis objects
        """
        analyses = []
        for descendant in instance.getDescendants(all_descendants=True):
            analyses.extend(descendant.objectValues("Analysis"))
        return analyses

    def get_from_instance(self, instance, service):
        """Returns analyses for the given service from the instance

        :param instance: Analysis Request object
        :param service: Analysis Service object
        :returns: list of Analysis objects
        """
        service_uid = api.get_uid(service)
        # Filter those analyses for the same service. Note that a Sample can
        # contain more than one analysis for same service because of retests
        return [
            analysis for analysis in instance.objectValues("Analysis")
            if analysis.getServiceUID() == service_uid
        ]

    def get_from_ancestor(self, instance, service):
        """Returns analyses for the given service from ancestors

        The search stops at the closest ancestor that has analyses for the
        service.

        :param instance: Analysis Request object
        :param service: Analysis Service object
        :returns: list of Analysis objects
        """
        ancestor = instance.getParentAnalysisRequest()
        if not ancestor:
            return []

        analyses = self.get_from_instance(ancestor, service)
        return analyses or self.get_from_ancestor(ancestor, service)

    def get_from_descendant(self, instance, service):
        """Returns analyses for the given service from descendants

        :param instance: Analysis Request object
        :param service: Analysis Service object
        :returns: list of Analysis objects
        """
        analyses = []
        for descendant in instance.getDescendants():
            # Does the analysis exists in the current descendant?
            analyses.extend(self.get_from_instance(descendant, service))

            # Search in descendants from current descendant
            analyses.extend(self.get_from_descendant(descendant, service))

        return analyses

    def _to_service(self, thing):
        """Convert to Analysis Service

        :param thing: UID/Catalog Brain/Object/Something
        :returns: Analysis Service object or None
        """

        # Convert UIDs to objects
        if api.is_uid(thing):
            thing = api.get_object_by_uid(thing, None)

        # Bail out if the thing is not a valid object
        if not api.is_object(thing):
            logger.warn("'{}' is not a valid object!".format(repr(thing)))
            return None

        # Ensure we have an object here and not a brain
        obj = api.get_object(thing)

        if IAnalysisService.providedBy(obj):
            return obj

        if IAnalysis.providedBy(obj):
            return obj.getAnalysisService()

        # An object, but neither an Analysis nor AnalysisService?
        # This should never happen.
        portal_type = api.get_portal_type(obj)
        logger.error("ARAnalysesField doesn't accept objects from {} type. "
                     "The object will be dismissed.".format(portal_type))
        return None
            ||
| 413 | |||
| 414 | |||
# Register the field class in the Archetypes field registry
registerField(
    ARAnalysesField,
    title="Analyses",
    description="Manages Analyses of ARs",
)
            ||
| 418 |