Total Complexity | 58 |
Total Lines | 399 |
Duplicated Lines | 17.54 % |
Changes | 0 |
Duplicate code is one of the most pungent code smells. A rule that is often used is to re-structure code once it is duplicated in three or more places.
Common duplication problems and their corresponding solutions are:
Complex classes like bika.lims.browser.workflow.analysisrequest often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.
Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.
1 | # -*- coding: utf-8 -*- |
||
2 | # |
||
3 | # This file is part of SENAITE.CORE. |
||
4 | # |
||
5 | # SENAITE.CORE is free software: you can redistribute it and/or modify it under |
||
6 | # the terms of the GNU General Public License as published by the Free Software |
||
7 | # Foundation, version 2. |
||
8 | # |
||
9 | # This program is distributed in the hope that it will be useful, but WITHOUT |
||
10 | # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS |
||
11 | # FOR A PARTICULAR PURPOSE. See the GNU General Public License for more |
||
12 | # details. |
||
13 | # |
||
14 | # You should have received a copy of the GNU General Public License along with |
||
15 | # this program; if not, write to the Free Software Foundation, Inc., 51 |
||
16 | # Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. |
||
17 | # |
||
18 | # Copyright 2018-2025 by its authors. |
||
19 | # Some rights reserved, see README and LICENSE. |
||
20 | |||
21 | from bika.lims import api |
||
22 | from bika.lims import bikaMessageFactory as _ |
||
23 | from bika.lims.browser.workflow import RequestContextAware |
||
24 | from bika.lims.browser.workflow import WorkflowActionGenericAdapter |
||
25 | from bika.lims.content.analysisspec import ResultsRangeDict |
||
26 | from bika.lims.interfaces import IAnalysisRequest |
||
27 | from bika.lims.interfaces import IWorkflowActionUIDsAdapter |
||
28 | from DateTime import DateTime |
||
29 | from zope.interface import implements |
||
30 | |||
31 | |||
class WorkflowActionCopyToNewAdapter(RequestContextAware):
    """Adapter in charge of Analysis Requests 'copy_to_new' action
    """
    implements(IWorkflowActionUIDsAdapter)

    def __call__(self, action, uids):
        """Redirects to the `ar_add` view, passing the number of selected
        UIDs as `ar_count` and the comma-joined UIDs as `copy_from`

        :param action: id of the action that was triggered (not used here)
        :param uids: list of UIDs of the selected items
        :returns: the result of `self.redirect`
        """
        # NOTE: the parameter separator must be a literal "&". Written as
        # "&copy_from", HTML-aware tooling may decode "&copy" as the
        # copyright sign, which silently drops the `copy_from` parameter
        url = "{}/ar_add?ar_count={}&copy_from={}".format(
            self.back_url, len(uids), ",".join(uids))
        return self.redirect(redirect_url=url)
||
41 | |||
42 | |||
class WorkflowActionPrintStickersAdapter(RequestContextAware):
    """Adapter in charge of Analysis Requests 'print_stickers' action
    """
    implements(IWorkflowActionUIDsAdapter)

    def __call__(self, action, uids):
        """Redirects to the `sticker` view, with the comma-joined UIDs
        passed in as the `items` request parameter
        """
        base_url = self.back_url
        items = ",".join(uids)
        target = "{}/sticker?items={}".format(base_url, items)
        return self.redirect(redirect_url=target)
||
51 | |||
52 | |||
class WorkflowActionCreatePartitionsAdapter(RequestContextAware):
    """Adapter in charge of Analysis Requests 'create_partitions' action
    """
    implements(IWorkflowActionUIDsAdapter)

    def __call__(self, action, uids):
        """Redirects to the `partition_magic` view, with the comma-joined
        UIDs passed in as the `uids` request parameter

        :param action: id of the action that was triggered (not used here)
        :param uids: list of UIDs of the selected items
        :returns: the result of `self.redirect`
        """
        url = "{}/partition_magic?uids={}".format(self.back_url, ",".join(uids))
        return self.redirect(redirect_url=url)
||
61 | |||
62 | |||
class WorkflowActionPublishAdapter(RequestContextAware):
    """Adapter in charge of Analysis Requests 'publish'-like actions
    """
    implements(IWorkflowActionUIDsAdapter)

    def __call__(self, action, uids):
        """Redirects to the `samples/publish` view located at the portal
        root, with the comma-joined UIDs passed in as `items`
        """
        items = ",".join(uids)
        # The publish view hangs from the portal root, not from back_url
        base_url = api.get_url(api.get_portal())
        target = "{}/samples/publish?items={}".format(base_url, items)
        return self.redirect(redirect_url=target)
||
74 | |||
75 | |||
class WorkflowActionRejectAdapter(WorkflowActionGenericAdapter):
    """Adapter in charge of Analysis Requests 'reject' action
    """

    def __call__(self, action, objects):
        """Redirects to the `reject_samples` view if any of the objects is
        an Analysis Request. Otherwise, triggers the action for the objects
        passed in and redirects to the success page
        """
        samples = [obj for obj in objects if IAnalysisRequest.providedBy(obj)]
        if samples:
            # Action reject applies to samples. Redirect to Sample Reject view
            sample_uids = ",".join([api.get_uid(obj) for obj in samples])
            target = "{}/reject_samples?uids={}".format(
                self.back_url, sample_uids)
            return self.redirect(redirect_url=target)

        # Generic transition if reject applies to other types (e.g. Analysis)
        done = self.do_action(action, objects)
        if done:
            # Redirect the user to success page
            done_ids = ", ".join([api.get_id(obj) for obj in done])
            message = _("Rejected items: {}").format(done_ids)
            return self.success(done, message=message)

        return self.redirect(message=_("No changes made."), level="warning")
||
98 | |||
99 | |||
class WorkflowActionReceiveAdapter(WorkflowActionGenericAdapter):
    """Adapter in charge of Analysis Request receive action
    """

    def __call__(self, action, objects):
        """Triggers the action for the objects passed in and redirects to
        the partitioning view, to the stickers auto-print view or to the
        success page, in this order of precedence
        """
        done = self.do_action(action, objects)
        if not done:
            return self.redirect(message=_("No changes made"), level="warning")

        # Note the check is done against all objects passed in
        to_partition = [
            obj for obj in objects if self.is_auto_partition_required(obj)
        ]
        if to_partition:
            # Redirect to the partitioning view
            joined = ",".join([api.get_uid(obj) for obj in to_partition])
            target = "{}/partition_magic?uids={}".format(self.back_url, joined)
            return self.redirect(redirect_url=target)

        if self.is_auto_print_stickers_enabled():
            # Redirect to the auto-print stickers view
            joined = ",".join([api.get_uid(obj) for obj in done])
            target = "{}/sticker?autoprint=1&items={}".format(
                self.back_url, joined)
            return self.redirect(redirect_url=target)

        # Redirect the user to success page
        return self.success(done)

    def is_auto_partition_required(self, brain_or_object):
        """Returns a true value if the object passed in is an Analysis
        Request with a template assigned that has auto-partition set
        """
        sample = api.get_object(brain_or_object)
        if IAnalysisRequest.providedBy(sample):
            template = sample.getTemplate()
            if not template:
                # Keep returning the (falsy) template value as-is
                return template
            return template.getAutoPartition()
        return False

    def is_auto_print_stickers_enabled(self):
        """Returns whether "receive" is present in the auto print stickers
        setting from setup
        """
        auto_print = self.context.bika_setup.getAutoPrintStickers()
        return "receive" in auto_print
||
138 | |||
139 | |||
class WorkflowActionInvalidateAdapter(WorkflowActionGenericAdapter):
    """Adapter in charge of Analysis Request invalidate action
    """

    def __call__(self, action, objects):
        """Redirects to the `invalidate_samples` view if any of the objects
        is an Analysis Request. Otherwise, triggers the action for the
        objects passed in and redirects to the success page
        """
        samples = [obj for obj in objects if IAnalysisRequest.providedBy(obj)]
        if samples:
            # Redirect to the samples invalidation view
            sample_uids = ",".join([api.get_uid(obj) for obj in samples])
            target = "%s/invalidate_samples?uids=%s" % (
                self.back_url, sample_uids)
            return self.redirect(redirect_url=target)

        # Generic transition if invalidation applies to other types
        done = self.do_action(action, objects)
        if done:
            # Redirect the user to success page
            done_ids = ", ".join([api.get_id(obj) for obj in done])
            message = _("Invalidated items: {}").format(done_ids)
            return self.success(done, message=message)

        return self.redirect(message=_("No changes made"), level="warning")
||
162 | |||
163 | |||
class WorkflowActionPrintSampleAdapter(WorkflowActionGenericAdapter):
    """Adapter in charge of Analysis Request print_sample action
    """

    def __call__(self, action, objects):
        """Sets the printed time for the objects passed in and redirects to
        the success page with those that were updated
        """
        # Keep only the objects for which the printed time was updated
        printed = filter(self.set_printed_time, objects)
        if printed:
            # Redirect the user to success page
            return self.success(printed)
        return self.redirect(message=_("No changes made"), level="warning")

    def set_printed_time(self, sample):
        """Sets the current time as the date printed of the last results
        report ("ARReport") the sample contains. Returns False if the sample
        is not in "published" status or has no reports. True otherwise
        """
        status = api.get_workflow_status_of(sample)
        if status != "published":
            return False

        report_ids = sample.objectIds("ARReport")
        if not report_ids:
            return False

        # The last id returned is considered the last report
        latest = sample.get(report_ids[-1])
        latest.setDatePrinted(DateTime())
        sample.reindexObject(idxs=["getPrinted"])
        return True
||
191 | |||
192 | |||
class WorkflowActionSampleAdapter(WorkflowActionGenericAdapter):
    """Adapter in charge of Analysis Request sample action
    """

    def __call__(self, action, objects):
        """Assigns the Sampler and Date Sampled to the objects passed in,
        triggers the action and redirects to the success page. Stops and
        redirects with a warning as soon as one object lacks a value
        """
        # Assign the Sampler and DateSampled
        try:
            for sample in objects:
                self.set_sampler_info(sample)
        except ValueError as error:
            return self.redirect(message=str(error), level="warning")

        # Trigger "sample" transition
        done = self.do_action(action, objects)
        if done:
            # Redirect the user to success page
            return self.success(done)

        message = _("Could not transition samples to the sampled state")
        return self.redirect(message=message, level="warning")

    def set_sampler_info(self, sample):
        """Updates the Sampler and the Date Sampled of the sample with the
        values obtained through `get_form_value`, unless both are already
        set. Returns True on success and raises a ValueError if there is no
        value for the Sampler or for the Date Sampled
        """
        if sample.getSampler() and sample.getDateSampled():
            # Nothing to do, both values are already set
            return True

        # The values might come from the request when the "Sample"
        # transition is triggered from the samples listing view (form keys
        # == column names of the listing)
        sample_id = None

        # Sampler, with the current value as the fallback
        new_sampler = self.get_form_value(
            "getSampler", sample, sample.getSampler())
        if not new_sampler:
            sample_id = api.get_id(sample)
            raise ValueError(_("Sampler required for sample %s" % sample_id))

        # Date sampled, with the current value as the fallback
        new_date = self.get_form_value(
            "getDateSampled", sample, sample.getDateSampled())
        if not new_date:
            sample_id = api.get_id(sample)
            raise ValueError(
                _("Sample date required for sample %s" % sample_id))

        # Store the values
        sample.setSampler(new_sampler)
        sample.setDateSampled(new_date)
        return True
||
246 | |||
247 | |||
class WorkflowActionPreserveAdapter(WorkflowActionGenericAdapter):
    """Adapter in charge of Analysis Request preserve action
    """

    def __call__(self, action, objects):
        """Assigns the Preserver and Date Preserved to the objects passed
        in, triggers the action for those successfully updated and redirects
        to the success page

        :param action: id of the action to trigger
        :param objects: objects the action applies to
        :returns: the result of `self.success` or, if nothing was updated or
            transitioned, the result of `self.redirect` with a warning
        """
        # Assign the Preserver and DatePreserved
        transitioned = filter(self.set_preserver_info, objects)
        if not transitioned:
            return self.redirect(message=_("No changes made"), level="warning")

        # Trigger "preserve" transition
        transitioned = self.do_action(action, transitioned)
        if not transitioned:
            return self.redirect(message=_("No changes made"), level="warning")

        # Redirect the user to success page
        return self.success(transitioned)

    def set_preserver_info(self, sample):
        """Updates the Preserver and the Date Preserved of the sample with
        the values obtained through `get_form_value`, unless both are
        already set

        :param sample: the object to update
        :returns: True if both values were already set or have been
            assigned. False if either of the two values is missing
        """
        if sample.getPreserver() and sample.getDatePreserved():
            # Preserver and Date Preserved already set. This is correct
            return True

        # NOTE(review): the form key is "Preserver", while the rest of the
        # lookups in this module use the getter name - confirm the key
        preserver = self.get_form_value("Preserver", sample,
                                        sample.getPreserver())
        # The sample goes as second argument and the current value as the
        # default, same as for the preserver above
        preserved = self.get_form_value("getDatePreserved", sample,
                                        sample.getDatePreserved())
        if not all([preserver, preserved]):
            return False

        sample.setPreserver(preserver)
        # Mutator that pairs with the `getDatePreserved` accessor used above
        sample.setDatePreserved(DateTime(preserved))
        return True
||
283 | |||
284 | |||
class WorkflowActionScheduleSamplingAdapter(WorkflowActionGenericAdapter):
    """Adapter in charge of Analysis request schedule sampling action
    """

    def __call__(self, action, objects):
        """Assigns the scheduled Sampler and the Sampling Date to the
        objects passed in, triggers the action for those successfully
        updated and redirects to the success page

        :param action: id of the action to trigger
        :param objects: objects the action applies to
        :returns: the result of `self.success` or, if nothing was updated or
            transitioned, the result of `self.redirect` with a warning
        """
        # Assign the scheduled Sampler and Sampling Date
        transitioned = filter(self.set_sampling_info, objects)
        if not transitioned:
            return self.redirect(message=_("No changes made"), level="warning")

        # Trigger "schedule_sampling" transition
        transitioned = self.do_action(action, transitioned)
        if not transitioned:
            return self.redirect(message=_("No changes made"), level="warning")

        # Redirect the user to success page
        return self.success(transitioned)

    def set_sampling_info(self, sample):
        """Updates the scheduled Sampling sampler and the Sampling Date of
        the sample with the values obtained through `get_form_value`, unless
        both are already set

        :param sample: the object to update
        :returns: True if both values were already set or have been
            assigned. False if either of the two values is missing
        """
        if sample.getScheduledSamplingSampler() and sample.getSamplingDate():
            # Both values already set, nothing to do
            return True

        sampler = self.get_form_value("getScheduledSamplingSampler", sample,
                                      sample.getScheduledSamplingSampler())
        # The sample goes as second argument and the current value as the
        # default, same as for the sampler above
        sampled = self.get_form_value("getSamplingDate", sample,
                                      sample.getSamplingDate())
        if not all([sampler, sampled]):
            return False

        sample.setScheduledSamplingSampler(sampler)
        sample.setSamplingDate(DateTime(sampled))
        return True
||
319 | |||
320 | |||
class WorkflowActionSaveAnalysesAdapter(WorkflowActionGenericAdapter):
    """Adapter in charge of "save analyses" action in Analysis Request.
    """

    def __call__(self, action, objects):
        """The objects passed in are Analysis Services and the context is the
        Analysis Request

        :param action: id of the action that was triggered (not used here)
        :param objects: not used, the UIDs are read from the request instead
        :returns: the result of `self.success` or, if the context is not an
            Analysis Request, the result of `self.redirect` with a warning
        """
        sample = self.context
        if not IAnalysisRequest.providedBy(sample):
            return self.redirect(message=_("No changes made"), level="warning")

        # NOTE: https://github.com/senaite/senaite.core/issues/1276
        #
        # Explicitly lookup the UIDs from the request, because the default
        # behavior of the method `get_uids` in `WorkflowActionGenericAdapter`
        # falls back to the UID of the current context if no UIDs were
        # submitted, which is in that case an `AnalysisRequest`.
        uids = self.get_uids_from_request()
        services = [api.get_object(uid) for uid in uids]

        # Get form values
        form = self.request.form
        prices = form.get("Price", [None])[0]
        hidden = [{
            "uid": api.get_uid(service), "hidden": self.is_hidden(service)
        } for service in services]

        # Do not overwrite default result ranges set through sample
        # specification field unless the edition of specs at analysis
        # level is explicitly allowed
        specs = []
        if self.is_ar_specs_enabled:
            specs = [self.get_specs(service) for service in services]

        # Set new analyses to the sample
        sample.setAnalysisServicesSettings(hidden)
        sample.setAnalyses(uids, prices=prices, specs=specs, hidden=hidden)

        # Just in case new analyses have been added while the Sample was in a
        # "non-open" state (e.g. "to_be_verified")
        self.do_action("rollback_to_receive", [sample])

        # Reindex the analyses that have been added
        for analysis in sample.objectValues("Analysis"):
            analysis.reindexObject()

        # Reindex the Sample
        sample.reindexObject()

        # Redirect the user to success page. The result is returned, same as
        # the rest of adapters from this module do
        return self.success([sample])

    @property
    def is_ar_specs_enabled(self):
        """Returns whether the assignment of specs at analysis level within
        sample context is enabled or not
        """
        setup = api.get_setup()
        return setup.getEnableARSpecs()

    def is_hidden(self, service):
        """Returns whether the value for the UID of the given service in the
        "Hidden" mapping from the request form is "on"
        """
        uid = api.get_uid(service)
        hidden_ans = self.request.form.get("Hidden", {})
        return hidden_ans.get(uid, "") == "on"

    def get_specs(self, service):
        """Returns the analysis specs available in the request for the given
        service. For each key, the value from the request form is used if
        not empty. Otherwise, the value the ResultsRangeDict already holds
        for that key is kept
        """
        uid = api.get_uid(service)
        keyword = service.getKeyword()
        specs = ResultsRangeDict(keyword=keyword, uid=uid).copy()
        for key in specs.keys():
            # Each form value is expected to be a list with a mapping of
            # service uid -> value as the first item
            specs_value = self.request.form.get(key, [{}])[0].get(uid, None)
            specs[key] = specs_value or specs.get(key)
        return specs
||
399 |