| Total Complexity | 58 |
| Total Lines | 399 |
| Duplicated Lines | 17.54 % |
| Changes | 0 | ||
Duplicate code is one of the most pungent code smells. A rule that is often used is to re-structure code once it is duplicated in three or more places.
Common duplication problems and their corresponding solutions are:
Complex classes like bika.lims.browser.workflow.analysisrequest often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.
Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.
| 1 | # -*- coding: utf-8 -*- |
||
| 2 | # |
||
| 3 | # This file is part of SENAITE.CORE. |
||
| 4 | # |
||
| 5 | # SENAITE.CORE is free software: you can redistribute it and/or modify it under |
||
| 6 | # the terms of the GNU General Public License as published by the Free Software |
||
| 7 | # Foundation, version 2. |
||
| 8 | # |
||
| 9 | # This program is distributed in the hope that it will be useful, but WITHOUT |
||
| 10 | # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS |
||
| 11 | # FOR A PARTICULAR PURPOSE. See the GNU General Public License for more |
||
| 12 | # details. |
||
| 13 | # |
||
| 14 | # You should have received a copy of the GNU General Public License along with |
||
| 15 | # this program; if not, write to the Free Software Foundation, Inc., 51 |
||
| 16 | # Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. |
||
| 17 | # |
||
| 18 | # Copyright 2018-2025 by its authors. |
||
| 19 | # Some rights reserved, see README and LICENSE. |
||
| 20 | |||
| 21 | from bika.lims import api |
||
| 22 | from bika.lims import bikaMessageFactory as _ |
||
| 23 | from bika.lims.browser.workflow import RequestContextAware |
||
| 24 | from bika.lims.browser.workflow import WorkflowActionGenericAdapter |
||
| 25 | from bika.lims.content.analysisspec import ResultsRangeDict |
||
| 26 | from bika.lims.interfaces import IAnalysisRequest |
||
| 27 | from bika.lims.interfaces import IWorkflowActionUIDsAdapter |
||
| 28 | from DateTime import DateTime |
||
| 29 | from zope.interface import implements |
||
| 30 | |||
| 31 | |||
class WorkflowActionCopyToNewAdapter(RequestContextAware):
    """Adapter in charge of Analysis Requests 'copy_to_new' action
    """
    implements(IWorkflowActionUIDsAdapter)

    def __call__(self, action, uids):
        """Redirects to the `ar_add` view, passing along the UIDs to copy

        :param action: id of the action that was triggered (not used here)
        :param uids: UIDs to hand over in the `copy_from` query parameter
        :returns: the result of `self.redirect` for the generated URL
        """
        # `ar_count` tells the add view how many items to create and
        # `copy_from` carries the comma-separated UIDs. The parameters must
        # be separated with a literal "&": "&copy" got rendered as the
        # copyright sign, which dropped the `copy_from` parameter
        url = "{}/ar_add?ar_count={}&copy_from={}".format(
            self.back_url, len(uids), ",".join(uids))
        return self.redirect(redirect_url=url)
||
| 41 | |||
| 42 | |||
class WorkflowActionPrintStickersAdapter(RequestContextAware):
    """Adapter that handles the 'print_stickers' action of Analysis Requests
    """
    implements(IWorkflowActionUIDsAdapter)

    def __call__(self, action, uids):
        """Redirects to the `sticker` view for the given UIDs

        :param action: id of the action that was triggered (not used here)
        :param uids: UIDs passed to the view as comma-separated `items`
        :returns: the result of `self.redirect` for the generated URL
        """
        base_url = self.back_url
        items = ",".join(uids)
        target = "{}/sticker?items={}".format(base_url, items)
        return self.redirect(redirect_url=target)
||
| 51 | |||
| 52 | |||
class WorkflowActionCreatePartitionsAdapter(RequestContextAware):
    """Adapter in charge of the Analysis Requests partition creation action
    """
    implements(IWorkflowActionUIDsAdapter)

    def __call__(self, action, uids):
        """Redirects to the `partition_magic` view for the given UIDs

        :param action: id of the action that was triggered (not used here)
        :param uids: UIDs passed to the view as comma-separated `uids`
        :returns: the result of `self.redirect` for the generated URL
        """
        url = "{}/partition_magic?uids={}".format(self.back_url, ",".join(uids))
        return self.redirect(redirect_url=url)
||
| 61 | |||
| 62 | |||
class WorkflowActionPublishAdapter(RequestContextAware):
    """Adapter that handles the 'publish'-like actions of Analysis Requests
    """
    implements(IWorkflowActionUIDsAdapter)

    def __call__(self, action, uids):
        """Redirects to the `samples/publish` view located at the portal root

        :param action: id of the action that was triggered (not used here)
        :param uids: UIDs passed to the view as comma-separated `items`
        :returns: the result of `self.redirect` for the generated URL
        """
        items = ",".join(uids)
        # The publish view is addressed from the portal URL rather than from
        # the URL the user comes from
        portal_url = api.get_url(api.get_portal())
        target = "{}/samples/publish?items={}".format(portal_url, items)
        return self.redirect(redirect_url=target)
||
| 74 | |||
| 75 | |||
class WorkflowActionRejectAdapter(WorkflowActionGenericAdapter):
    """Adapter that handles the 'reject' action of Analysis Requests
    """

    def __call__(self, action, objects):
        """Rejects the objects, delegating samples to the rejection view

        :param action: id of the transition to trigger
        :param objects: objects the action applies to
        :returns: the result of `self.redirect` or of `self.success`
        """
        samples = [obj for obj in objects if IAnalysisRequest.providedBy(obj)]
        if samples:
            # Samples are not transitioned here: the user is sent to the
            # Sample Reject view with the UIDs of the samples
            uids_str = ",".join([api.get_uid(sample) for sample in samples])
            url = "{}/reject_samples?uids={}".format(self.back_url, uids_str)
            return self.redirect(redirect_url=url)

        # No samples among the objects (e.g. Analysis): generic transition
        transitioned = self.do_action(action, objects)
        if transitioned:
            # Tell the user which items have been rejected
            ids = [api.get_id(obj) for obj in transitioned]
            message = _("Rejected items: {}").format(", ".join(ids))
            return self.success(transitioned, message=message)

        return self.redirect(message=_("No changes made."), level="warning")
||
| 98 | |||
| 99 | |||
class WorkflowActionReceiveAdapter(WorkflowActionGenericAdapter):
    """Adapter that handles the receive action of Analysis Requests
    """

    def __call__(self, action, objects):
        """Receives the objects and redirects to the applicable follow-up view

        :param action: id of the transition to trigger
        :param objects: objects the action applies to
        :returns: the result of `self.redirect` or of `self.success`
        """
        transitioned = self.do_action(action, objects)
        if not transitioned:
            return self.redirect(message=_("No changes made"), level="warning")

        # Partitioning takes precedence over the printing of stickers. The
        # check runs against all objects passed in, not only against those
        # that have been transitioned
        to_partition = [obj for obj in objects
                        if self.is_auto_partition_required(obj)]
        if to_partition:
            uids = ",".join([api.get_uid(obj) for obj in to_partition])
            url = "{}/partition_magic?uids={}".format(self.back_url, uids)
            return self.redirect(redirect_url=url)

        if self.is_auto_print_stickers_enabled():
            # Send the transitioned objects to the auto-print stickers view
            uids = ",".join([api.get_uid(obj) for obj in transitioned])
            url = "{}/sticker?autoprint=1&items={}".format(self.back_url, uids)
            return self.redirect(redirect_url=url)

        # Nothing else to do: success page
        return self.success(transitioned)

    def is_auto_partition_required(self, brain_or_object):
        """Tells whether the object passed in has to be partitioned

        :param brain_or_object: brain or object to check
        :returns: False if the object is not an Analysis Request; otherwise
            the (falsy) template when there is none, or the value of the
            template's `getAutoPartition`
        """
        sample = api.get_object(brain_or_object)
        if IAnalysisRequest.providedBy(sample):
            template = sample.getTemplate()
            return template and template.getAutoPartition()
        return False

    def is_auto_print_stickers_enabled(self):
        """Tells whether stickers are automatically printed on reception

        :returns: True if "receive" is in the value that `bika_setup`
            returns for `getAutoPrintStickers`
        """
        auto_print = self.context.bika_setup.getAutoPrintStickers()
        return "receive" in auto_print
||
| 138 | |||
| 139 | |||
class WorkflowActionInvalidateAdapter(WorkflowActionGenericAdapter):
    """Adapter that handles the invalidate action of Analysis Requests
    """

    def __call__(self, action, objects):
        """Invalidates the objects, delegating samples to the dedicated view

        :param action: id of the transition to trigger
        :param objects: objects the action applies to
        :returns: the result of `self.redirect` or of `self.success`
        """
        samples = [obj for obj in objects if IAnalysisRequest.providedBy(obj)]
        if samples:
            # Samples are not transitioned here: the user is sent to the
            # samples invalidation view with the UIDs of the samples
            uids = ",".join([api.get_uid(sample) for sample in samples])
            url = "%s/invalidate_samples?uids=%s" % (self.back_url, uids)
            return self.redirect(redirect_url=url)

        # No samples among the objects: generic transition
        transitioned = self.do_action(action, objects)
        if transitioned:
            # Tell the user which items have been invalidated
            ids = [api.get_id(obj) for obj in transitioned]
            message = _("Invalidated items: {}").format(", ".join(ids))
            return self.success(transitioned, message=message)

        return self.redirect(message=_("No changes made"), level="warning")
||
| 162 | |||
| 163 | |||
class WorkflowActionPrintSampleAdapter(WorkflowActionGenericAdapter):
    """Adapter that handles the print_sample action of Analysis Requests
    """

    def __call__(self, action, objects):
        """Stores the printed time for the objects passed in

        No workflow transition is triggered here: objects are considered as
        handled when their printed time was updated.

        :param action: id of the action that was triggered (not used here)
        :param objects: objects the action applies to
        :returns: the result of `self.redirect` if no object was updated,
            the result of `self.success` otherwise
        """
        printed = [obj for obj in objects if self.set_printed_time(obj)]
        if printed:
            return self.success(printed)
        return self.redirect(message=_("No changes made"), level="warning")

    def set_printed_time(self, sample):
        """Sets the current time as the date printed of the sample's report
        that comes last in the list of contained "ARReport" ids

        :param sample: the sample the report belongs to
        :returns: False if the sample is not in "published" status or has no
            reports; True otherwise
        """
        if api.get_workflow_status_of(sample) == "published":
            report_ids = sample.objectIds("ARReport")
            if report_ids:
                newest = sample.get(report_ids[-1])
                newest.setDatePrinted(DateTime())
                # Only the index "getPrinted" of the sample is refreshed
                sample.reindexObject(idxs=["getPrinted"])
                return True
        return False
||
| 191 | |||
| 192 | |||
class WorkflowActionSampleAdapter(WorkflowActionGenericAdapter):
    """Adapter in charge of Analysis Request sample action
    """

    def __call__(self, action, objects):
        """Assigns Sampler and Date Sampled to the objects and transitions them

        :param action: id of the transition to trigger
        :param objects: objects the action applies to
        :returns: the result of `self.redirect` (warning) if a required value
            is missing or nothing was transitioned; the result of
            `self.success` otherwise
        """
        # Assign the Sampler and DateSampled. Processing stops at the first
        # object without the required values; objects handled before it keep
        # the values that were assigned to them
        for obj in objects:
            try:
                self.set_sampler_info(obj)
            except ValueError as e:
                return self.redirect(message=str(e), level="warning")

        # Trigger "sample" transition
        transitioned = self.do_action(action, objects)
        if not transitioned:
            message = _("Could not transition samples to the sampled state")
            return self.redirect(message=message, level="warning")

        # Redirect the user to success page
        return self.success(transitioned)

    def set_sampler_info(self, sample):
        """Updates the Sampler and the Date Sampled with the values provided
        in the request

        :param sample: the sample to update
        :returns: True when both values are set, either because they were
            already or because they have been assigned here
        :raises ValueError: if no Sampler or no Date Sampled was resolved
        """
        if sample.getSampler() and sample.getDateSampled():
            # Sampler and Date Sampled already set. This is correct
            return True

        # Try to get the sampler and date sampled from the request.
        # This might happen when the "Sample" transition is triggered from the
        # samples listing view (form keys == column names of the listing)

        # try to get the sampler from the request
        sampler = self.get_form_value("getSampler", sample,
                                      sample.getSampler())
        if not sampler:
            sid = api.get_id(sample)
            # NOTE(review): the id is interpolated before the text reaches
            # `_`, so the message id differs per sample - confirm whether
            # this message is expected to be translatable
            raise ValueError(_("Sampler required for sample %s" % sid))

        # try to get the date sampled from the request
        sampled = self.get_form_value("getDateSampled", sample,
                                      sample.getDateSampled())
        if not sampled:
            sid = api.get_id(sample)
            raise ValueError(_("Sample date required for sample %s" % sid))

        # set the field values
        sample.setSampler(sampler)
        sample.setDateSampled(sampled)

        return True
||
| 246 | |||
| 247 | |||
class WorkflowActionPreserveAdapter(WorkflowActionGenericAdapter):
    """Adapter in charge of Analysis Request preserve action
    """

    def __call__(self, action, objects):
        """Assigns Preserver and Date Preserved and transitions the objects

        :param action: id of the transition to trigger
        :param objects: objects the action applies to
        :returns: the result of `self.redirect` (warning) if no object got
            its preservation values or nothing was transitioned; the result
            of `self.success` otherwise
        """
        # Assign the Preserver and DatePreserved. Only the objects with both
        # values set are considered for the transition
        candidates = [obj for obj in objects if self.set_preserver_info(obj)]
        if not candidates:
            return self.redirect(message=_("No changes made"), level="warning")

        # Trigger "preserve" transition
        transitioned = self.do_action(action, candidates)
        if not transitioned:
            return self.redirect(message=_("No changes made"), level="warning")

        # Redirect the user to success page
        return self.success(transitioned)

    def set_preserver_info(self, sample):
        """Updates the Preserver and the Date Preserved with the values
        provided in the request

        :param sample: the sample to update
        :returns: True when both values are set, either because they were
            already or because they have been assigned here; False if any
            of the two values was not resolved
        """
        if sample.getPreserver() and sample.getDatePreserved():
            # Preserver and Date Preserved already set. This is correct
            return True

        # NOTE(review): the other lookups in this module use accessor-like
        # form keys ("getSampler", "getDatePreserved"), while this one uses
        # "Preserver" - confirm against the form that submits the value
        preserver = self.get_form_value("Preserver", sample,
                                        sample.getPreserver())
        # The sample goes as second argument, as in the rest of lookups;
        # it was missing here, so the date was passed in its place
        preserved = self.get_form_value("getDatePreserved", sample,
                                        sample.getDatePreserved())
        if not all([preserver, preserved]):
            return False

        sample.setPreserver(preserver)
        # Mutator that pairs with `getDatePreserved`
        sample.setDatePreserved(DateTime(preserved))
        return True
||
| 283 | |||
| 284 | |||
class WorkflowActionScheduleSamplingAdapter(WorkflowActionGenericAdapter):
    """Adapter in charge of Analysis request schedule sampling action
    """

    def __call__(self, action, objects):
        """Assigns the scheduled sampler and date and transitions the objects

        :param action: id of the transition to trigger
        :param objects: objects the action applies to
        :returns: the result of `self.redirect` (warning) if no object got
            its sampling values or nothing was transitioned; the result of
            `self.success` otherwise
        """
        # Assign the scheduled Sampler and Sampling Date. Only the objects
        # with both values set are considered for the transition
        candidates = [obj for obj in objects if self.set_sampling_info(obj)]
        if not candidates:
            return self.redirect(message=_("No changes made"), level="warning")

        # Trigger "schedule_sampling" transition
        transitioned = self.do_action(action, candidates)
        if not transitioned:
            return self.redirect(message=_("No changes made"), level="warning")

        # Redirect the user to success page
        return self.success(transitioned)

    def set_sampling_info(self, sample):
        """Updates the scheduled Sampling sampler and the Sampling Date with
        the values provided in the request

        :param sample: the sample to update
        :returns: True when both values are set, either because they were
            already or because they have been assigned here; False if any
            of the two values was not resolved
        """
        if sample.getScheduledSamplingSampler() and sample.getSamplingDate():
            return True

        sampler = self.get_form_value("getScheduledSamplingSampler", sample,
                                      sample.getScheduledSamplingSampler())
        # The sample goes as second argument, as in the rest of lookups;
        # it was missing here, so the date was passed in its place
        sampled = self.get_form_value("getSamplingDate", sample,
                                      sample.getSamplingDate())
        if not all([sampler, sampled]):
            return False

        sample.setScheduledSamplingSampler(sampler)
        sample.setSamplingDate(DateTime(sampled))
        return True
||
| 319 | |||
| 320 | |||
class WorkflowActionSaveAnalysesAdapter(WorkflowActionGenericAdapter):
    """Adapter in charge of "save analyses" action in Analysis Request.
    """

    def __call__(self, action, objects):
        """The objects passed in are Analysis Services and the context is the
        Analysis Request

        :param action: id of the action that was triggered (not used here)
        :param objects: not used; services are resolved from the request
        :returns: the result of `self.redirect` (warning) if the context is
            not an Analysis Request; the result of `self.success` otherwise
        """
        sample = self.context
        if not IAnalysisRequest.providedBy(sample):
            return self.redirect(message=_("No changes made"), level="warning")

        # NOTE: https://github.com/senaite/senaite.core/issues/1276
        #
        # Explicitly lookup the UIDs from the request, because the default
        # behavior of the method `get_uids` in `WorkflowActionGenericAdapter`
        # falls back to the UID of the current context if no UIDs were
        # submitted, which is in that case an `AnalysisRequest`.
        uids = self.get_uids_from_request()
        services = [api.get_object(uid) for uid in uids]

        # Get form values
        form = self.request.form
        prices = form.get("Price", [None])[0]
        hidden = [
            {"uid": api.get_uid(service), "hidden": self.is_hidden(service)}
            for service in services
        ]

        # Do not overwrite default result ranges set through sample
        # specification field unless the edition of specs at analysis
        # level is explicitly allowed
        specs = []
        if self.is_ar_specs_enabled:
            specs = [self.get_specs(service) for service in services]

        # Set new analyses to the sample
        sample.setAnalysisServicesSettings(hidden)
        sample.setAnalyses(uids, prices=prices, specs=specs, hidden=hidden)

        # Just in case new analyses have been added while the Sample was in a
        # "non-open" state (e.g. "to_be_verified")
        self.do_action("rollback_to_receive", [sample])

        # Reindex the analyses the sample contains
        for analysis in sample.objectValues("Analysis"):
            analysis.reindexObject()

        # Reindex the Sample
        sample.reindexObject()

        # Redirect the user to success page. The result is returned to the
        # caller, as the rest of adapters from this module do
        return self.success([sample])

    @property
    def is_ar_specs_enabled(self):
        """Returns whether the assignment of specs at analysis level within
        sample context is enabled or not

        :returns: the value of `getEnableARSpecs` from the setup object
        """
        setup = api.get_setup()
        return setup.getEnableARSpecs()

    def is_hidden(self, service):
        """Returns whether the request Hidden param for the given obj is True

        :param service: object whose UID is looked up in the "Hidden" value
            of the form
        :returns: True if the value stored for the UID is "on"
        """
        uid = api.get_uid(service)
        hidden_ans = self.request.form.get("Hidden", {})
        return hidden_ans.get(uid, "") == "on"

    def get_specs(self, service):
        """Returns the analysis specs available in the request for the given
        service

        :param service: service to get the UID and keyword from
        :returns: copy of a `ResultsRangeDict` for the service, where each
            key has the value submitted for the service's UID or, if empty,
            the value the key had initially
        """
        uid = api.get_uid(service)
        keyword = service.getKeyword()
        specs = ResultsRangeDict(keyword=keyword, uid=uid).copy()
        for key in specs.keys():
            # Values are looked up in the first item of the form entry for
            # the key, that is indexed by the UID of the service
            specs_value = self.request.form.get(key, [{}])[0].get(uid, None)
            specs[key] = specs_value or specs.get(key)
        return specs
||
| 399 |