Total Complexity | 47 |
Total Lines | 257 |
Duplicated Lines | 20.23 % |
Changes | 0 |
Duplicate code is one of the most pungent code smells. A rule that is often used is to re-structure code once it is duplicated in three or more places.
Common duplication problems and their corresponding solutions are:
Complex classes like bika.lims.workflow.analysisrequest.guards often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.
Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.
1 | # -*- coding: utf-8 -*- |
||
2 | # |
||
3 | # This file is part of SENAITE.CORE. |
||
4 | # |
||
5 | # SENAITE.CORE is free software: you can redistribute it and/or modify it under |
||
6 | # the terms of the GNU General Public License as published by the Free Software |
||
7 | # Foundation, version 2. |
||
8 | # |
||
9 | # This program is distributed in the hope that it will be useful, but WITHOUT |
||
10 | # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS |
||
11 | # FOR A PARTICULAR PURPOSE. See the GNU General Public License for more |
||
12 | # details. |
||
13 | # |
||
14 | # You should have received a copy of the GNU General Public License along with |
||
15 | # this program; if not, write to the Free Software Foundation, Inc., 51 |
||
16 | # Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. |
||
17 | # |
||
18 | # Copyright 2018-2024 by it's authors. |
||
19 | # Some rights reserved, see README and LICENSE. |
||
20 | |||
21 | from bika.lims import api |
||
22 | from bika.lims.interfaces import IInternalUse |
||
23 | from bika.lims.interfaces import IRejected |
||
24 | from bika.lims.interfaces import IRetracted |
||
25 | from bika.lims.interfaces import ISubmitted |
||
26 | from bika.lims.interfaces import IVerified |
||
27 | from bika.lims.workflow import isTransitionAllowed |
||
28 | |||
# Workflow states of analyses to be omitted in regular transitions.
# Kept as a list so that it can be concatenated to other lists of states.
ANALYSIS_DETACHED_STATES = ['cancelled', 'rejected', 'retracted']
||
31 | |||
32 | |||
def guard_no_sampling_workflow(analysis_request):
    """Returns whether the transition "no_sampling_workflow" can be performed
    or not. Returns True when the analysis request does not require sampling,
    that is, when its getSamplingRequired() evaluates to False
    """
    return not analysis_request.getSamplingRequired()
||
38 | |||
39 | |||
def guard_to_be_sampled(analysis_request):
    """Returns whether the transition "to_be_sampled" can be performed or not.
    Returns the value of getSamplingRequired() of the analysis request, so the
    transition is allowed when sampling is required for it
    """
    return analysis_request.getSamplingRequired()
||
45 | |||
46 | |||
def guard_schedule_sampling(analysis_request):
    """Returns whether the transition "schedule_sampling" can be performed or
    not. Returns the value of getScheduleSamplingEnabled() from the setup
    object (bika_setup) reachable from the analysis request
    """
    setup = analysis_request.bika_setup
    return setup.getScheduleSamplingEnabled()
||
52 | |||
53 | |||
def guard_create_partitions(analysis_request):
    """Returns True if partitions can be created using the analysis request
    passed in as the source. Returns False when the analysis request is itself
    a partition
    """
    # Do not allow the creation of partitions from partitions
    return not analysis_request.isPartition()
||
63 | |||
64 | |||
def guard_submit(analysis_request):
    """Returns whether the transition "submit" can be performed or not.
    Returns True if there is at least one analysis in a non-detached state and
    none of the analyses considered is still in 'assigned', 'unassigned' or
    'registered' status. Analyses for internal use are left out of the check
    unless all non-detached analyses are for internal use
    """
    # Discard detached analyses. A real list is built (instead of relying on
    # filter/map) because the collection is traversed twice below and a lazy
    # iterator would already be exhausted on the second pass
    analyses = [
        analysis
        for analysis in analysis_request.getAnalyses(full_objects=True)
        if api.get_workflow_status_of(analysis) not in ANALYSIS_DETACHED_STATES
    ]

    # If not all analyses are for internal use, rely on "regular" analyses
    omit_internals = not all(
        IInternalUse.providedBy(analysis) for analysis in analyses)

    # Statuses that mean the analysis has not been submitted yet
    pending_states = ('assigned', 'unassigned', 'registered')

    analyses_ready = False
    for analysis in analyses:
        # Omit analyses for internal use
        if omit_internals and IInternalUse.providedBy(analysis):
            continue

        if api.get_workflow_status_of(analysis) in pending_states:
            return False

        analyses_ready = True
    return analyses_ready
||
91 | |||
92 | |||
def guard_verify(analysis_request):
    """Returns whether the transition "verify" can be performed or not.
    Returns True if there is at least one analysis in a non-detached state and
    all the analyses considered provide IVerified. Analyses for internal use
    are left out of the check unless all non-detached analyses are for
    internal use
    """
    # Discard detached analyses. A real list is built (instead of relying on
    # filter/map) because the collection is traversed twice below and a lazy
    # iterator would already be exhausted on the second pass
    analyses = [
        analysis
        for analysis in analysis_request.getAnalyses(full_objects=True)
        if api.get_workflow_status_of(analysis) not in ANALYSIS_DETACHED_STATES
    ]

    # If not all analyses are for internal use, rely on "regular" analyses
    omit_internals = not all(
        IInternalUse.providedBy(analysis) for analysis in analyses)

    analyses_ready = False
    for analysis in analyses:
        # Omit analyses for internal use
        if omit_internals and IInternalUse.providedBy(analysis):
            continue

        # All analyses must be in verified (or further) status
        if not IVerified.providedBy(analysis):
            return False

        analyses_ready = True
    return analyses_ready
||
119 | |||
120 | |||
def guard_prepublish(analysis_request):
    """Returns whether 'prepublish' transition can be performed or not.
    Returns False if the analysis request is for internal use. Otherwise,
    returns True if at least one of the analyses of the sample has been
    submitted and has not been retracted or rejected. Returns False in any
    other case
    """
    if IInternalUse.providedBy(analysis_request):
        return False

    for analysis in analysis_request.getAnalyses():
        analysis = api.get_object(analysis)

        # Retracted and rejected analyses do not count
        if IRetracted.providedBy(analysis) or IRejected.providedBy(analysis):
            continue

        if ISubmitted.providedBy(analysis):
            return True

    return False
||
139 | |||
140 | |||
def guard_publish(analysis_request):
    """Returns whether the transition "publish" can be performed or not.
    Returns True if the analysis request is not labeled for internal use. The
    contained analyses are not inspected here
    """
    # Note nothing else is checked because this transition is only available
    # when sample is in verified status
    return not IInternalUse.providedBy(analysis_request)
||
151 | |||
152 | |||
def guard_rollback_to_receive(analysis_request):
    """Returns whether 'rollback_to_receive' transition can be performed or
    not. Returns True if the analysis request has at least one analysis in
    'assigned' or 'unassigned' status. Also returns True when every analysis
    is in 'retracted' or 'rejected' status, which includes the case of an
    analysis request without analyses. Otherwise, returns False
    """
    valid_states = ('unassigned', 'assigned')
    skip_states = ('retracted', 'rejected')

    analyses = analysis_request.getAnalyses()
    skipped = 0
    for analysis in analyses:
        status = api.get_workflow_status_of(api.get_object(analysis))
        if status in valid_states:
            return True
        if status in skip_states:
            skipped += 1

    # No analysis in a valid state: allow only if all of them were skipped
    return len(analyses) == skipped
||
170 | |||
171 | |||
def guard_cancel(analysis_request):
    """Returns whether 'cancel' transition can be performed or not. Returns
    True only if the transition "cancel" is allowed for all the direct
    partitions and all the analyses directly bound to this analysis request
    are in "unassigned", "registered" or in a detached status
    """
    # Ask to partitions
    for partition in analysis_request.getDescendants(all_descendants=False):
        if not isTransitionAllowed(partition, "cancel"):
            return False

    # Look through analyses. We've checked the partitions already, so there is
    # no need to look through analyses from partitions again, but through the
    # analyses directly bound to the current Analysis Request.
    cancellable_states = ["unassigned", "registered"]

    # also consider the detached states as cancellable. A local list is
    # extended, so the module-level constant is left untouched
    cancellable_states.extend(ANALYSIS_DETACHED_STATES)

    for analysis in analysis_request.objectValues("Analysis"):
        if api.get_workflow_status_of(analysis) not in cancellable_states:
            return False

    return True
||
194 | |||
195 | |||
def guard_reinstate(analysis_request):
    """Returns whether 'reinstate' transition can be performed or not. Returns
    True if the analysis request has no parent analysis request or if the
    parent is not in "cancelled" status. When the parent is cancelled, returns
    whether the transition "reinstate" is allowed for the parent
    """
    parent = analysis_request.getParentAnalysisRequest()
    if not parent:
        # No parent analysis request, nothing else to check
        return True

    if api.get_workflow_status_of(parent) != "cancelled":
        return True

    # The parent is cancelled: rely on whether it can be reinstated
    return isTransitionAllowed(parent, "reinstate")
||
207 | |||
208 | |||
def guard_sample(analysis_request):
    """Returns whether 'sample' transition can be performed or not. Returns
    True if the analysis request has both the DateSampled and the Sampler set.
    Otherwise, returns whether the current user has the role "Sampler" in the
    context of the analysis request
    """
    date_sampled = analysis_request.getDateSampled()
    if date_sampled and analysis_request.getSampler():
        return True

    current_user = api.get_current_user()
    return "Sampler" in current_user.getRolesInContext(analysis_request)
||
219 | |||
220 | |||
def guard_reject(analysis_request):
    """Returns whether 'reject' transition can be performed or not. Returns
    the value of isRejectionWorkflowEnabled() from the setup object
    (bika_setup) reachable from the analysis request
    """
    setup = analysis_request.bika_setup
    return setup.isRejectionWorkflowEnabled()
||
226 | |||
227 | |||
def guard_detach(analysis_request):
    """Returns whether 'detach' transition can be performed or not. Returns
    the value of isPartition() of the sample passed in
    """
    # Detach transition can only be done to partitions
    return analysis_request.isPartition()
||
234 | |||
235 | |||
def guard_dispatch(sample):
    """Checks if the dispatch transition is allowed

    We prevent dispatching when one analysis is assigned to a worksheet, that
    is, when at least one of the analyses of the sample is in "assigned"
    status. Returns True otherwise, also when the sample has no analyses.
    """
    return not any(
        api.get_workflow_status_of(analysis) == "assigned"
        for analysis in sample.getAnalyses()
    )
||
245 | |||
246 | |||
def guard_restore(sample):
    """Checks if the restore transition is allowed

    Always returns True; the sample passed in is not inspected.
    """
    return True
||
251 | |||
252 | |||
def guard_multi_results(sample):
    """Checks if the multi results action is allowed

    Always returns True; the sample passed in is not inspected.
    """
    return True
||
257 |