Total Complexity | 64 |
Total Lines | 398 |
Duplicated Lines | 9.8 % |
Changes | 0 |
Duplicate code is one of the most pungent code smells. A rule that is often used is to re-structure code once it is duplicated in three or more places.
Common duplication problems, and corresponding solutions are:
Complex classes like bika.lims.browser.workflow.analysisrequest often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.
Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.
1 | from email.mime.multipart import MIMEMultipart |
||
2 | from email.mime.text import MIMEText |
||
3 | from string import Template |
||
4 | |||
5 | from DateTime import DateTime |
||
6 | from Products.CMFPlone.utils import safe_unicode |
||
7 | from bika.lims import api |
||
8 | from bika.lims import bikaMessageFactory as _ |
||
9 | from bika.lims import logger |
||
10 | from bika.lims.browser.workflow import WorkflowActionGenericAdapter, \ |
||
11 | RequestContextAware |
||
12 | from bika.lims.content.analysisspec import ResultsRangeDict |
||
13 | from bika.lims.interfaces import IAnalysisRequest, IWorkflowActionUIDsAdapter |
||
14 | from bika.lims.utils import encode_header |
||
15 | from bika.lims.utils import t |
||
16 | from email.Utils import formataddr |
||
17 | from zope.component.interfaces import implements |
||
18 | |||
19 | |||
20 | class WorkflowActionCopyToNewAdapter(RequestContextAware): |
||
21 | """Adapter in charge of Analysis Requests 'copy_to_new' action |
||
22 | """ |
||
23 | implements(IWorkflowActionUIDsAdapter) |
||
24 | |||
25 | def __call__(self, action, uids): |
||
26 | url = "{}/ar_add?ar_count={}©_from={}".format( |
||
27 | self.back_url, len(uids), ",".join(uids)) |
||
28 | return self.redirect(redirect_url=url) |
||
29 | |||
30 | |||
31 | class WorkflowActionPrintStickersAdapter(RequestContextAware): |
||
32 | """Adapter in charge of Analysis Requests 'print_stickers' action |
||
33 | """ |
||
34 | implements(IWorkflowActionUIDsAdapter) |
||
35 | |||
36 | def __call__(self, action, uids): |
||
37 | url = "{}/sticker?template={}&items={}".format(self.back_url, |
||
38 | self.context.bika_setup.getAutoStickerTemplate(), ",".join(uids)) |
||
39 | return self.redirect(redirect_url=url) |
||
40 | |||
41 | |||
42 | class WorkflowActionCreatePartitionsAdapter(RequestContextAware): |
||
43 | """Adapter in charge of Analysis Requests 'copy_to_new' action |
||
44 | """ |
||
45 | implements(IWorkflowActionUIDsAdapter) |
||
46 | |||
47 | def __call__(self, action, uids): |
||
48 | url = "{}/partition_magic?uids={}".format(self.back_url, ",".join(uids)) |
||
49 | return self.redirect(redirect_url=url) |
||
50 | |||
51 | |||
52 | class WorkflowActionPublishAdapter(RequestContextAware): |
||
53 | """Adapter in charge of Analysis Requests 'publish'-like actions |
||
54 | """ |
||
55 | implements(IWorkflowActionUIDsAdapter) |
||
56 | |||
57 | def __call__(self, action, uids): |
||
58 | purl = self.context.portal_url() |
||
59 | uids = ",".join(uids) |
||
60 | url = "{}/analysisrequests/publish?items={}".format(purl, uids) |
||
61 | return self.redirect(redirect_url=url) |
||
62 | |||
63 | |||
64 | class WorkflowActionReceiveAdapter(WorkflowActionGenericAdapter): |
||
65 | """Adapter in charge of Analysis Request receive action |
||
66 | """ |
||
67 | |||
68 | def __call__(self, action, objects): |
||
69 | transitioned = self.do_action(action, objects) |
||
70 | if not transitioned: |
||
71 | return self.redirect(message=_("No changes made"), level="warning") |
||
72 | |||
73 | auto_partitions = filter(self.is_auto_partition_required, objects) |
||
74 | if auto_partitions: |
||
75 | # Redirect to the partitioning view |
||
76 | uids = ",".join(map(api.get_uid, auto_partitions)) |
||
77 | url = "{}/partition_magic?uids={}".format(self.back_url, uids) |
||
78 | return self.redirect(redirect_url=url) |
||
79 | |||
80 | if self.is_auto_print_stickers_enabled(): |
||
81 | # Redirect to the auto-print stickers view |
||
82 | uids = ",".join(map(api.get_uid, transitioned)) |
||
83 | sticker_template = self.context.bika_setup.getAutoStickerTemplate() |
||
84 | url = "{}/sticker?autoprint=1&template={}&items={}".format( |
||
85 | self.back_url, sticker_template, uids) |
||
86 | return self.redirect(redirect_url=url) |
||
87 | |||
88 | # Redirect the user to success page |
||
89 | return self.success(transitioned) |
||
90 | |||
91 | def is_auto_partition_required(self, brain_or_object): |
||
92 | """Returns whether the passed in object needs to be partitioned |
||
93 | """ |
||
94 | obj = api.get_object(brain_or_object) |
||
95 | if not IAnalysisRequest.providedBy(obj): |
||
96 | return False |
||
97 | template = obj.getTemplate() |
||
98 | return template and template.getAutoPartition() |
||
99 | |||
100 | def is_auto_print_stickers_enabled(self): |
||
101 | """Returns whether the auto print of stickers on reception is enabled |
||
102 | """ |
||
103 | return "receive" in self.context.bika_setup.getAutoPrintStickers() |
||
104 | |||
105 | |||
106 | class WorkflowActionInvalidateAdapter(WorkflowActionGenericAdapter): |
||
107 | """Adapter in charge of Analysis Request invalidate action |
||
108 | """ |
||
109 | |||
110 | def __call__(self, action, objects): |
||
111 | transitioned = self.do_action(action, objects) |
||
112 | if not transitioned: |
||
113 | return self.redirect(message=_("No changes made"), level="warning") |
||
114 | |||
115 | # Need to notify client contacts? |
||
116 | if not self.context.bika_setup.getNotifyOnARRetract(): |
||
117 | return self.success(transitioned) |
||
118 | |||
119 | # Alert the client contacts who ordered the results, stating that a |
||
120 | # possible mistake has been picked up and is under investigation. |
||
121 | for sample in transitioned: |
||
122 | self.notify_ar_retract(sample) |
||
123 | |||
124 | # Redirect the user to success page |
||
125 | return self.success(transitioned) |
||
126 | |||
127 | def notify_ar_retract(self, sample): |
||
128 | """Sends an email notification to sample's client contact if the sample |
||
129 | passed in has a retest associated |
||
130 | """ |
||
131 | retest = sample.getRetest() |
||
132 | if not retest: |
||
133 | logger.warn("No retest found for {}. And it should!" |
||
134 | .format(api.get_id(sample))) |
||
135 | return |
||
136 | |||
137 | # Email fields |
||
138 | sample_id = api.get_id(sample) |
||
139 | subject = t(_("Erroneous result publication from {}").format(sample_id)) |
||
140 | lab_address = api.get_bika_setup().laboratory.getPrintAddress() |
||
141 | emails_lab = self.get_lab_managers_formatted_emails() |
||
142 | emails_sample = self.get_sample_contacts_formatted_emails(sample) |
||
143 | recipients = list(set(emails_lab + emails_sample)) |
||
144 | |||
145 | msg = MIMEMultipart("related") |
||
146 | msg["Subject"] = subject |
||
147 | msg["From"] = self.get_laboratory_formatted_email() |
||
148 | msg["To"] = ", ".join(recipients) |
||
149 | body = Template("Some errors have been detected in the results report " |
||
150 | "published from the Sample $sample_link. The Sample " |
||
151 | "$retest_link has been created automatically and the " |
||
152 | "previous has been invalidated.<br/>" |
||
153 | "The possible mistake has been picked up and is under " |
||
154 | "investigation.<br/><br/>" |
||
155 | "$lab_address").safe_substitute( |
||
156 | dict(sample_link=self.get_html_link(sample), |
||
157 | retest_link=self.get_html_link(retest), |
||
158 | lab_address = "<br/>".join(lab_address))) |
||
159 | msg_txt = MIMEText(safe_unicode(body).encode('utf-8'), _subtype='html') |
||
160 | msg.preamble = 'This is a multi-part MIME message.' |
||
161 | msg.attach(msg_txt) |
||
162 | |||
163 | # Send the email |
||
164 | try: |
||
165 | host = api.get_tool("MailHost") |
||
166 | host.send(msg.as_string(), immediate=True) |
||
167 | except Exception as err_msg: |
||
168 | message = _("Unable to send an email to alert lab " |
||
169 | "client contacts that the Sample has been " |
||
170 | "retracted: ${error}", |
||
171 | mapping={'error': safe_unicode(err_msg)}) |
||
172 | self.context.plone_utils.addPortalMessage(message, 'warning') |
||
173 | |||
174 | def get_formatted_email(self, email_name): |
||
175 | """Formats a email |
||
176 | """ |
||
177 | return formataddr((encode_header(email_name[0]), email_name[1])) |
||
178 | |||
179 | def get_laboratory_formatted_email(self): |
||
180 | """Returns the laboratory email formatted |
||
181 | """ |
||
182 | lab = api.get_bika_setup().laboratory |
||
183 | return self.get_formatted_email((lab.getEmailAddress(), lab.getName())) |
||
184 | |||
185 | def get_lab_managers_formatted_emails(self): |
||
186 | """Returns a list with lab managers formatted emails |
||
187 | """ |
||
188 | users = api.get_users_by_roles("LabManager") |
||
189 | users = map(lambda user: (user.getProperty("email"), |
||
190 | user.getProperty("fullname")), users) |
||
191 | return map(self.get_formatted_email, users) |
||
192 | |||
193 | def get_contact_formatted_email(self, contact): |
||
194 | """Returns a string with the formatted email for the given contact |
||
195 | """ |
||
196 | contact_name = contact.Title() |
||
197 | contact_email = contact.getEmailAddress() |
||
198 | return self.get_formatted_email((contact_email, contact_name)) |
||
199 | |||
200 | def get_sample_contacts_formatted_emails(self, sample): |
||
201 | """Returns a list with the formatted emails from sample contacts |
||
202 | """ |
||
203 | contacts = list(set([sample.getContact()] + sample.getCCContact())) |
||
204 | return map(self.get_contact_formatted_email, contacts) |
||
205 | |||
206 | def get_html_link(self, obj): |
||
207 | """Returns an html formatted link for the given object |
||
208 | """ |
||
209 | return "<a href='{}'>{}</a>".format(api.get_url(obj), api.get_id(obj)) |
||
210 | |||
211 | |||
212 | class WorkflowActionPrintSampleAdapter(WorkflowActionGenericAdapter): |
||
213 | """Adapter in charge of Analysis Request print_sample action |
||
214 | """ |
||
215 | |||
216 | def __call__(self, action, objects): |
||
217 | # Update printed times |
||
218 | transitioned = filter(lambda obj: self.set_printed_time(obj), objects) |
||
219 | if not transitioned: |
||
220 | return self.redirect(message=_("No changes made"), level="warning") |
||
221 | |||
222 | # Redirect the user to success page |
||
223 | return self.success(transitioned) |
||
224 | |||
225 | def set_printed_time(self, sample): |
||
226 | """Updates the printed time of the last results report from the sample |
||
227 | """ |
||
228 | if api.get_workflow_status_of(sample) != "published": |
||
229 | return False |
||
230 | reports = sample.objectValues("ARReport") |
||
231 | reports = sorted(reports, key=lambda report: report.getDatePublished()) |
||
232 | last_report = reports[-1] |
||
233 | if not last_report.getDatePrinted(): |
||
234 | last_report.setDatePrinted(DateTime()) |
||
235 | sample.reindexObject(idxs=["getPrinted"]) |
||
236 | return True |
||
237 | |||
238 | |||
239 | class WorkflowActionSampleAdapter(WorkflowActionGenericAdapter): |
||
240 | """Adapter in charge of Analysis Request sample action |
||
241 | """ |
||
242 | |||
243 | View Code Duplication | def __call__(self, action, objects): |
|
|
|||
244 | # Assign the Sampler and DateSampled |
||
245 | transitioned = filter(lambda obj: self.set_sampler_info(obj), objects) |
||
246 | if not transitioned: |
||
247 | return self.redirect(message=_("No changes made"), level="warning") |
||
248 | |||
249 | # Trigger "sample" transition |
||
250 | transitioned = self.do_action(action, transitioned) |
||
251 | if not transitioned: |
||
252 | return self.redirect(message=_("No changes made"), level="warning") |
||
253 | |||
254 | # Redirect the user to success page |
||
255 | return self.success(transitioned) |
||
256 | |||
257 | def set_sampler_info(self, sample): |
||
258 | """Updates the Sampler and the Sample Date with the values provided in |
||
259 | the request. If neither Sampler nor SampleDate are present in the |
||
260 | request, returns False |
||
261 | """ |
||
262 | if sample.getSampler() and sample.getDateSampled(): |
||
263 | # Sampler and Date Sampled already set. This is correct |
||
264 | return True |
||
265 | sampler = self.get_form_value("Sampler", sample, sample.getSampler()) |
||
266 | sampled = self.get_form_value("getDateSampled", sample, |
||
267 | sample.getDateSampled()) |
||
268 | if not all([sampler, sampled]): |
||
269 | return False |
||
270 | sample.setSampler(sampler) |
||
271 | sample.setDateSampled(DateTime(sampled)) |
||
272 | return True |
||
273 | |||
274 | |||
275 | class WorkflowActionPreserveAdapter(WorkflowActionGenericAdapter): |
||
276 | """Adapter in charge of Analysis Request preserve action |
||
277 | """ |
||
278 | |||
279 | View Code Duplication | def __call__(self, action, objects): |
|
280 | # Assign the Preserver and DatePreserved |
||
281 | transitioned = filter(lambda obj: self.set_preserver_info(obj), objects) |
||
282 | if not transitioned: |
||
283 | return self.redirect(message=_("No changes made"), level="warning") |
||
284 | |||
285 | # Trigger "preserve" transition |
||
286 | transitioned = self.do_action(action, transitioned) |
||
287 | if not transitioned: |
||
288 | return self.redirect(message=_("No changes made"), level="warning") |
||
289 | |||
290 | # Redirect the user to success page |
||
291 | return self.success(transitioned) |
||
292 | |||
293 | def set_preserver_info(self, sample): |
||
294 | """Updates the Preserver and the Date Preserved with the values provided |
||
295 | in the request. If neither Preserver nor DatePreserved are present in |
||
296 | the request, returns False |
||
297 | """ |
||
298 | if sample.getPreserver() and sample.getDatePreserved(): |
||
299 | # Preserver and Date Preserved already set. This is correct |
||
300 | return True |
||
301 | preserver = self.get_form_value("Preserver", sample, |
||
302 | sample.getPreserver()) |
||
303 | preserved = self.get_form_value("getDatePreserved", |
||
304 | sample.getDatePreserved()) |
||
305 | if not all([preserver, preserved]): |
||
306 | return False |
||
307 | sample.setPreserver(preserver) |
||
308 | sample.setDatePreserver(DateTime(preserved)) |
||
309 | return True |
||
310 | |||
311 | |||
312 | class WorkflowActionScheduleSamplingAdapter(WorkflowActionGenericAdapter): |
||
313 | """Adapter in charge of Analysis request schedule sampling action |
||
314 | """ |
||
315 | |||
316 | View Code Duplication | def __call__(self, action, objects): |
|
317 | # Assign the scheduled Sampler and Sampling Date |
||
318 | transitioned = filter(lambda obj: self.set_sampling_info(obj), objects) |
||
319 | if not transitioned: |
||
320 | return self.redirect(message=_("No changes made"), level="warning") |
||
321 | |||
322 | # Trigger "schedule_sampling" transition |
||
323 | transitioned = self.do_action(action, transitioned) |
||
324 | if not transitioned: |
||
325 | return self.redirect(message=_("No changes made"), level="warning") |
||
326 | |||
327 | # Redirect the user to success page |
||
328 | return self.success(transitioned) |
||
329 | |||
330 | def set_sampling_info(self, sample): |
||
331 | """Updates the scheduled Sampling sampler and the Sampling Date with the |
||
332 | values provided in the request. If neither Sampling sampler nor Sampling |
||
333 | Date are present in the request, returns False |
||
334 | """ |
||
335 | if sample.getScheduledSamplingSampler() and sample.getSamplingDate(): |
||
336 | return True |
||
337 | sampler = self.get_form_value("getScheduledSamplingSampler", sample, |
||
338 | sample.getScheduledSamplingSampler()) |
||
339 | sampled = self.get_form_value("getSamplingDate", |
||
340 | sample.getSamplingDate()) |
||
341 | if not all([sampler, sampled]): |
||
342 | return False |
||
343 | sample.setScheduledSamplingSampler(sampler) |
||
344 | sample.setSamplingDate(DateTime(sampled)) |
||
345 | return True |
||
346 | |||
347 | class WorkflowActionSaveAnalysesAdapter(WorkflowActionGenericAdapter): |
||
348 | """Adapter in charge of "save analyses" action in Analysis Request. |
||
349 | """ |
||
350 | |||
351 | def __call__(self, action, services): |
||
352 | """The objects passed in are Analysis Services and the context is the |
||
353 | Analysis Request |
||
354 | """ |
||
355 | sample = self.context |
||
356 | if not IAnalysisRequest.providedBy(sample): |
||
357 | return self.redirect(message=_("No changes made"), level="warning") |
||
358 | |||
359 | # Get form values |
||
360 | form = self.request.form |
||
361 | prices = form.get("Price", [None])[0] |
||
362 | hidden = map(lambda o: {"uid": o, "hidden": self.is_hidden(o)}, services) |
||
363 | specs = map(lambda service: self.get_specs(service), services) |
||
364 | |||
365 | # Set new analyses to the sample |
||
366 | uids = map(api.get_uid, services) |
||
367 | sample.setAnalysisServicesSettings(hidden) |
||
368 | sample.setAnalyses(uids, prices=prices, specs=specs) |
||
369 | |||
370 | # Just in case new analyses have been added while the Sample was in a |
||
371 | # "non-open" state (e.g. "to_be_verified") |
||
372 | self.do_action("rollback_to_receive", [sample]) |
||
373 | |||
374 | # Reindex the analyses that have been added |
||
375 | for analysis in sample.objectValues("Analysis"): |
||
376 | analysis.reindexObject() |
||
377 | |||
378 | # Redirect the user to success page |
||
379 | self.success([sample]) |
||
380 | |||
381 | def is_hidden(self, service): |
||
382 | """Returns whether the request Hidden param for the given obj is True |
||
383 | """ |
||
384 | uid = api.get_uid(service) |
||
385 | hidden_ans = self.request.form.get("Hidden", {}) |
||
386 | return hidden_ans.get(uid, "") == "on" |
||
387 | |||
388 | def get_specs(self, service): |
||
389 | """Returns the analysis specs available in the request for the given uid |
||
390 | """ |
||
391 | uid = api.get_uid(service) |
||
392 | keyword = service.getKeyword() |
||
393 | specs = ResultsRangeDict(keyword=keyword, uid=uid).copy() |
||
394 | for key in specs.keys(): |
||
395 | specs_value = self.request.form.get(key, [{}])[0].get(uid, None) |
||
396 | specs[key] = specs_value or specs.get(key) |
||
397 | return specs |
||
398 |