Passed
Push — master ( 02661d...c746f8 )
by Ramon
19:25 queued 13:16
created

guard_submit()   B

Complexity

Conditions 6

Size

Total Lines 26
Code Lines 15

Duplication

Lines 26
Ratio 100 %

Importance

Changes 0
Metric Value
eloc 15
dl 26
loc 26
rs 8.6666
c 0
b 0
f 0
cc 6
nop 1
1
# -*- coding: utf-8 -*-
2
#
3
# This file is part of SENAITE.CORE.
4
#
5
# SENAITE.CORE is free software: you can redistribute it and/or modify it under
6
# the terms of the GNU General Public License as published by the Free Software
7
# Foundation, version 2.
8
#
9
# This program is distributed in the hope that it will be useful, but WITHOUT
10
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
11
# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
12
# details.
13
#
14
# You should have received a copy of the GNU General Public License along with
15
# this program; if not, write to the Free Software Foundation, Inc., 51
16
# Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
17
#
18
# Copyright 2018-2019 by it's authors.
19
# Some rights reserved, see README and LICENSE.
20
21
from bika.lims import api
22
from bika.lims.interfaces import IVerified, IInternalUse
23
from bika.lims.workflow import isTransitionAllowed
24
25
# States to be omitted in regular transitions
26
ANALYSIS_DETACHED_STATES = ['cancelled', 'rejected', 'retracted']
27
28
29
def guard_no_sampling_workflow(analysis_request):
30
    """Returns whether the transition "no_sampling_workflow" can be performed
31
    or not. Returns True when Sampling Workflow is not enabled in setup
32
    """
33
    return not analysis_request.getSamplingRequired()
34
35
36
def guard_to_be_sampled(analysis_request):
37
    """Returns whether the transition "to_be_sampled" can be performed or not.
38
    Returns True if Sampling Workflow is enabled for the analysis request
39
    """
40
    return analysis_request.getSamplingRequired()
41
42
43
def guard_schedule_sampling(analysis_request):
44
    """Return whether the transition "schedule_sampling" can be performed or not
45
    Returns True only when the schedule sampling workflow is enabled in setup.
46
    """
47
    return analysis_request.bika_setup.getScheduleSamplingEnabled()
48
49
50
def guard_create_partitions(analysis_request):
51
    """Returns true if partitions can be created using the analysis request
52
    passed in as the source.
53
    """
54
    if analysis_request.isPartition():
55
        # Do not allow the creation of partitions from partitions
56
        return False
57
58
    # Allow only the creation of partitions if all analyses from the Analysis
59
    # Request are in unassigned state. Otherwise, we could end up with
60
    # inconsistencies, because original analyses are deleted when the partition
61
    # is created. Note here we exclude analyses from children (partitions).
62
    analyses = analysis_request.objectValues("Analysis")
63
    for analysis in analyses:
64
        if api.get_workflow_status_of(analysis) != "unassigned":
65
            return False
66
    return analyses and True or False
67
68
69 View Code Duplication
def guard_submit(analysis_request):
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
70
    """Return whether the transition "submit" can be performed or not.
71
    Returns True if there is at least one analysis in a non-detached state and
72
    all analyses in a non-detached analyses have been submitted.
73
    """
74
    # Discard detached analyses
75
    analyses = analysis_request.getAnalyses(full_objects=True)
76
    analyses = filter(lambda an: api.get_workflow_status_of(an) not in
77
                                 ANALYSIS_DETACHED_STATES, analyses)
78
79
    # If not all analyses are for internal use, rely on "regular" analyses
80
    internals = map(IInternalUse.providedBy, analyses)
81
    omit_internals = not all(internals)
82
83
    analyses_ready = False
84
    for analysis in analyses:
85
        # Omit analyses for internal use
86
        if omit_internals and IInternalUse.providedBy(analysis):
87
            continue
88
89
        analysis_status = api.get_workflow_status_of(analysis)
90
        if analysis_status in ['assigned', 'unassigned', 'registered']:
91
            return False
92
93
        analyses_ready = True
94
    return analyses_ready
95
96
97 View Code Duplication
def guard_verify(analysis_request):
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
98
    """Returns whether the transition "verify" can be performed or not.
99
    Returns True if at there is at least one analysis in a non-dettached state
100
    and all analyses in a non-detached state are in "verified" state.
101
    """
102
    # Discard detached analyses
103
    analyses = analysis_request.getAnalyses(full_objects=True)
104
    analyses = filter(lambda an: api.get_workflow_status_of(an) not in
105
                                 ANALYSIS_DETACHED_STATES, analyses)
106
107
    # If not all analyses are for internal use, rely on "regular" analyses
108
    internals = map(IInternalUse.providedBy, analyses)
109
    omit_internals = not all(internals)
110
111
    analyses_ready = False
112
    for analysis in analyses:
113
        # Omit analyses for internal use
114
        if omit_internals and IInternalUse.providedBy(analysis):
115
            continue
116
117
        # All analyses must be in verified (or further) status
118
        if not IVerified.providedBy(analysis):
119
            return False
120
121
        analyses_ready = True
122
    return analyses_ready
123
124
125
def guard_prepublish(analysis_request):
126
    """Returns whether 'prepublish' transition can be perform or not. Returns
127
    True if the analysis request has at least one analysis in 'verified' or in
128
    'to_be_verified' status. Otherwise, return False
129
    """
130
    if IInternalUse.providedBy(analysis_request):
131
        return False
132
133
    valid_states = ['verified', 'to_be_verified']
134
    for analysis in analysis_request.getAnalyses():
135
        analysis = api.get_object(analysis)
136
        if api.get_workflow_status_of(analysis) in valid_states:
137
            return True
138
    return False
139
140
141
def guard_publish(analysis_request):
142
    """Returns whether the transition "publish" can be performed or not.
143
    Returns True if the analysis request is not labeled for internal use or if
144
    at least one of the contained analyses is not for internal use
145
    """
146
    if IInternalUse.providedBy(analysis_request):
147
        return False
148
    # Note we return True without checking anything else because this
149
    # transition is only available when sample is in verified status
150
    return True
151
152
153
def guard_rollback_to_receive(analysis_request):
154
    """Return whether 'rollback_to_receive' transition can be performed or not.
155
    Returns True if the analysis request has at least one analysis in 'assigned'
156
    or 'unassigned' status. Otherwise, returns False
157
    """
158
    skipped = 0
159
    valid_states = ['unassigned', 'assigned']
160
    skip_states = ['retracted', 'rejected']
161
    analyses = analysis_request.getAnalyses()
162
    for analysis in analyses:
163
        analysis = api.get_object(analysis)
164
        status = api.get_workflow_status_of(analysis)
165
        if status in valid_states:
166
            return True
167
        elif status in skip_states:
168
            skipped += 1
169
    return len(analyses) == skipped
170
171
172
def guard_cancel(analysis_request):
173
    """Returns whether 'cancel' transition can be performed or not. Returns
174
    True only if all analyses are in "unassigned" status
175
    """
176
    # Ask to partitions
177
    for partition in analysis_request.getDescendants(all_descendants=False):
178
        if not isTransitionAllowed(partition, "cancel"):
179
            return False
180
181
    # Look through analyses. We've checked the partitions already, so there is
182
    # no need to look through analyses from partitions again, but through the
183
    # analyses directly bound to the current Analysis Request.
184
    cancellable_states = ["unassigned", "registered"]
185
    for analysis in analysis_request.objectValues("Analysis"):
186
        if api.get_workflow_status_of(analysis) not in cancellable_states:
187
            return False
188
189
    return True
190
191
192
def guard_reinstate(analysis_request):
193
    """Returns whether 'reinstate" transition can be performed or not. Returns
194
    True only if this is not a partition or the parent analysis request can be
195
    reinstated or is not in a cancelled state
196
    """
197
    parent = analysis_request.getParentAnalysisRequest()
198
    if not parent:
199
        return True
200
    if api.get_workflow_status_of(parent) != "cancelled":
201
        return True
202
    return isTransitionAllowed(parent, "reinstate")
203
204
205
def guard_sample(analysis_request):
206
    """Returns whether 'sample' transition can be performed or not. Returns
207
    True only if the analysis request has the DateSampled and Sampler set or if
208
    the user belongs to the Samplers group
209
    """
210
    if analysis_request.getDateSampled() and analysis_request.getSampler():
211
        return True
212
213
    current_user = api.get_current_user()
214
    return "Sampler" in current_user.getRolesInContext(analysis_request)
215
216
217
def guard_reject(analysis_request):
218
    """Returns whether 'reject' transition can be performed or not. Returns
219
    True only if setup's isRejectionWorkflowEnabled is True
220
    """
221
    return analysis_request.bika_setup.isRejectionWorkflowEnabled()
222