Passed
Push — master ( caa188...9d0ad3 )
by Ramon
11:25 queued 07:12
created

AttachmentsView.delete_attachment()   B

Complexity

Conditions 8

Size

Total Lines 52
Code Lines 29

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 29
dl 0
loc 52
rs 7.3173
c 0
b 0
f 0
cc 8
nop 2

How to fix   Long Method   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include:

1
# -*- coding: utf-8 -*-
2
#
3
# This file is part of SENAITE.CORE
4
#
5
# Copyright 2018 by its authors.
6
# Some rights reserved. See LICENSE.rst, CONTRIBUTORS.rst.
7
8
from bika.lims import api
9
from bika.lims import logger
10
from bika.lims.config import ATTACHMENT_REPORT_OPTIONS
11
from bika.lims.decorators import returns_json
12
from bika.lims.permissions import AddAttachment
13
from bika.lims.permissions import EditFieldResults
14
from bika.lims.permissions import EditResults
15
from BTrees.OOBTree import OOBTree
16
from plone import protect
17
from Products.Five.browser import BrowserView
18
from zope.annotation.interfaces import IAnnotations
19
from zope.interface import implements
20
from zope.publisher.interfaces import IPublishTraverse
21
22
23
# Annotation key under which AttachmentsView keeps its storage (an OOBTree)
# on the context, see the `storage` property of the view.
ATTACHMENTS_STORAGE = "bika.lims.browser.attachment"
24
25
# Workflow states of the view's context in which `is_ar_editable` reports the
# AR as editable, i.e. in which attachments may be updated.
EDITABLE_STATES = [
    'to_be_sampled',
    'to_be_preserved',
    'sample_due',
    'sample_received',
    'to_be_verified',
]
32
33
34
class AttachmentsView(BrowserView):
35
    """Attachments manage view
36
37
    This view is used in the Attachments viewlet displayed in ARs and WSs, but
38
    can be used as a general purpose multi-adapter for ARs and WSs to manage
39
    attachments.
40
    """
41
    implements(IPublishTraverse)
42
43
    def __init__(self, context, request):
44
        self.context = context
45
        self.request = request
46
        self.traverse_subpath = []
47
48
    def publishTraverse(self, request, name):
49
        """get called before __call__ for each path name
50
        """
51
        self.traverse_subpath.append(name)
52
        return self
53
54
    def __call__(self):
        """Route a submitted form to the matching `action_<name>` method

        The single path segment behind the view name selects the action, e.g.
        `.../update` is handled by `action_update`. Requests without the
        `submitted` form flag, with more or less than one path segment or with
        an unknown action name are redirected to the context URL.
        """
        protect.CheckAuthenticator(self.request.form)

        fallback_url = self.context.absolute_url()
        response = self.request.response

        # only submitted forms with exactly one additional path segment are
        # routed to a form action
        submitted = self.request.form.get("submitted", False)
        if not submitted or len(self.traverse_subpath) != 1:
            return response.redirect(fallback_url)

        # the path segment determines the endpoint
        segment = self.traverse_subpath[0]
        handler = getattr(self, "action_{}".format(segment), None)
        if handler is not None:
            return handler()

        logger.warn("AttachmentsView.__call__: Unknown action name '{}'"
                    .format(segment))
        return response.redirect(fallback_url)
80
81
    def action_update(self):
        """Form action endpoint to update the attachments

        Each record of the `attachments` form list is either deleted (if its
        `delete` flag is set) or updated with the remaining submitted values.
        The UIDs of the attachments that were kept are stored, in form order,
        as the new attachments order. Redirects to the context afterwards.
        """
        kept_uids = []

        for record in self.request.form.get("attachments", []):
            # the record is a form mapping, not a dictionary -> convert
            data = dict(record)

            attachment_uid = data.pop("UID")
            attachment = api.get_object_by_uid(attachment_uid)

            if data.pop("delete", False):
                # flagged for deletion: neither updated nor part of the order
                self.delete_attachment(attachment)
            else:
                kept_uids.append(attachment_uid)
                attachment.update(**data)
                attachment.reindexObject()

        # persist the order in the annotation storage
        self.set_attachments_order(kept_uids)

        # back to the default view
        return self.request.response.redirect(self.context.absolute_url())
113
114
    def action_add_to_ws(self):
        """Form action to add a new attachment in a worksheet

        The uploaded file is attached to the analysis given by the
        `analysis_uid` request parameter and/or to all analyses of the
        worksheet that belong to the service given by the `Service` request
        parameter. A separate Attachment object is created in the worksheet
        for every analysis. Redirects back to `manage_results` if the request
        came from there, otherwise to the worksheet.
        """
        ws = self.context
        form = self.request.form
        attachment_file = form.get('AttachmentFile_file', None)
        analysis_uid = self.request.get('analysis_uid', None)
        service_uid = self.request.get('Service', None)
        options = {
            'AttachmentType': form.get('AttachmentType', ''),
            'AttachmentKeys': form.get('AttachmentKeys', ''),
            'ReportOption': form.get('ReportOption', 'i'),
        }

        # nothing to do if the attachment file is missing
        if attachment_file is None:
            logger.warn("AttachmentsView.action_add_to_ws: "
                        "Attachment file is missing")
            return

        def attach_to(analysis):
            """Create a new attachment in the WS and link it to the analysis
            """
            attachment = self.create_attachment(
                ws, attachment_file, **options)
            uids = [other.UID() for other in analysis.getAttachment()]
            uids.append(attachment.UID())
            analysis.setAttachment(uids)
            # The metadata for getAttachmentUIDs need to get updated,
            # otherwise the attachments are not displayed
            # https://github.com/senaite/bika.lims/issues/521
            analysis.reindexObject()

        if analysis_uid:
            rc = api.get_tool("reference_catalog")
            analysis = rc.lookupObject(analysis_uid)
            if analysis is None:
                # do not create an orphaned attachment for an unknown UID
                logger.warn("AttachmentsView.action_add_to_ws: "
                            "Could not resolve analysis UID {}"
                            .format(analysis_uid))
            else:
                attach_to(analysis)

        if service_uid:
            workflow = api.get_tool('portal_workflow')

            # XXX: refactor out dependency to this view.
            view = api.get_view(
                "manage_results", context=self.context, request=self.request)

            for analysis in view._getAnalyses():
                if analysis.portal_type not in ('Analysis',
                                                'DuplicateAnalysis'):
                    continue
                if not analysis.getServiceUID() == service_uid:
                    continue
                review_state = workflow.getInfoFor(
                    analysis, 'review_state', '')
                if review_state not in ('assigned', 'sample_received',
                                        'to_be_verified'):
                    continue
                attach_to(analysis)

        # the referer header is optional, a plain item lookup would raise a
        # KeyError if the client did not send it
        referer = self.request.get('HTTP_REFERER', '') or ''
        if referer.endswith('manage_results'):
            self.request.response.redirect('{}/manage_results'.format(
                self.context.absolute_url()))
        else:
            self.request.response.redirect(self.context.absolute_url())
197
198
    def action_add(self):
        """Form action to add a new attachment

        The attachment is created in the parent of the context and its UID is
        appended to the stored attachments order. It is linked to the analysis
        given by the `Analysis` form value or, if none was given, to the
        context itself. Redirects back to `manage_results` if the request came
        from there, otherwise to the context.

        Code taken from bika.lims.content.addARAttachment.
        """
        form = self.request.form
        parent = api.get_parent(self.context)
        attachment_file = form.get('AttachmentFile_file', None)
        AttachmentType = form.get('AttachmentType', '')
        AttachmentKeys = form.get('AttachmentKeys', '')
        ReportOption = form.get('ReportOption', 'r')

        # nothing to do if the attachment file is missing
        if attachment_file is None:
            logger.warn("AttachmentsView.action_add: "
                        "Attachment file is missing")
            return

        # create attachment
        attachment = self.create_attachment(
            parent,
            attachment_file,
            AttachmentType=AttachmentType,
            AttachmentKeys=AttachmentKeys,
            ReportOption=ReportOption)

        # append the new UID to the end of the current order
        self.set_attachments_order(api.get_uid(attachment))

        # the attachment is either held by the selected analysis or by the
        # context itself
        analysis_uid = form.get("Analysis", None)
        if analysis_uid:
            holder = api.get_object_by_uid(analysis_uid)
        else:
            holder = self.context

        uids = [other.UID() for other in holder.getAttachment()]
        uids.append(attachment.UID())
        holder.setAttachment(uids)

        if analysis_uid:
            # The metadata for getAttachmentUIDs need to get updated,
            # otherwise the attachments are not displayed
            # https://github.com/senaite/bika.lims/issues/521
            holder.reindexObject()

            if api.get_workflow_status_of(holder) == 'attachment_due':
                api.do_transition_for(holder, 'attach')

        # the referer header is optional, a plain item lookup would raise a
        # KeyError if the client did not send it
        referer = self.request.get('HTTP_REFERER', '') or ''
        if referer.endswith('manage_results'):
            self.request.response.redirect('{}/manage_results'.format(
                self.context.absolute_url()))
        else:
            self.request.response.redirect(self.context.absolute_url())
258
259
    def create_attachment(self, container, attachment_file, **kw):
        """Create, fill and reindex a new Attachment inside the container

        The title is the `filename` of the given file object, or "Attachment"
        if the file object has no such attribute. All additional keywords are
        passed to the `edit` method of the new attachment.

        :returns: the new Attachment object
        """
        title = getattr(attachment_file, "filename", "Attachment")
        new_attachment = api.create(container, "Attachment", title=title)
        new_attachment.edit(AttachmentFile=attachment_file, **kw)
        new_attachment.processForm()
        new_attachment.reindexObject()
        message = "Created new Attachment {} in {}".format(
            repr(new_attachment), repr(container))
        logger.info(message)
        return new_attachment
270
271
    def delete_attachment(self, attachment):
        """Delete attachment from the AR or Analysis

        The attachment is first unlinked from its holding parent. It will be
        only deleted if it is not further referenced by another AR/Analysis.

        :returns: True if the attachment object was deleted, False if it is
            nowhere assigned or was retained because it is still referenced
        """
        parent = self._get_attachment_holder(attachment)
        if parent is None:
            logger.warn(
                "Attachment {} is nowhere assigned. This should never happen!"
                .format(repr(attachment)))
            return False

        # Set the attachments of the holding parent w/o the current attachment.
        # Work on a copy to not modify the list handed out by the accessor.
        attachments = list(parent.getAttachment())
        if attachment in attachments:
            attachments.remove(attachment)
        parent.setAttachment(attachments)

        if self._is_attachment_referenced(attachment):
            return False

        # Delete attachment finally
        client = api.get_parent(attachment)
        client.manage_delObjects([attachment.getId(), ])
        return True

    def _get_attachment_holder(self, attachment):
        """Get the AR or Analysis holding the attachment, None if unassigned
        """
        if attachment.getLinkedRequests():
            # Holding parent is an AR
            return attachment.getRequest()
        if attachment.getLinkedAnalyses():
            # Holding parent is an Analysis
            return attachment.getAnalysis()
        return None

    def _is_attachment_referenced(self, attachment):
        """Check and log if an Analysis or AR still references the attachment
        """
        referenced = False

        # Attachment is referenced by another Analysis
        if attachment.getLinkedAnalyses():
            holder = attachment.getAnalysis()
            logger.info("Attachment {} referenced by {} -> RETAIN"
                        .format(repr(attachment), repr(holder)))
            referenced = True

        # Attachment is referenced by another AR
        if attachment.getLinkedRequests():
            holder = attachment.getRequest()
            logger.info("Attachment {} referenced by {} -> RETAIN"
                        .format(repr(attachment), repr(holder)))
            referenced = True

        return referenced
323
324
    def global_attachments_allowed(self):
        """Returns the `AttachmentsPermitted` setting of the Bika Setup
        """
        return api.get_bika_setup().getAttachmentsPermitted()
329
330
    def global_ar_attachments_allowed(self):
        """Returns the `ARAttachmentsPermitted` setting of the Bika Setup
        """
        return api.get_bika_setup().getARAttachmentsPermitted()
335
336
    def global_analysis_attachments_allowed(self):
        """Returns the `AnalysisAttachmentsPermitted` setting of the Bika Setup
        """
        return api.get_bika_setup().getAnalysisAttachmentsPermitted()
341
342
    def get_attachment_size(self, attachment):
        """Get the human readable size of the attachment

        :param attachment: object providing `getAttachmentFile()`
        :returns: "<bytes> b" for sizes below 1024 bytes, otherwise
            "<whole kilobytes> Kb"; "0 b" if the attachment has no file
        """
        attachment_file = attachment.getAttachmentFile()
        size = attachment_file.get_size() if attachment_file else 0
        if size < 1024:
            return '%s b' % size
        # floor division reports whole kilobytes regardless of whether true
        # division is in effect
        return '%s Kb' % (size // 1024)
354
355
    def get_attachment_info(self, attachment):
        """Returns a dictionary of attachment information

        The `analysis` value is always empty here; `get_attachments` fills it
        for attachments of analyses. An attachment without a file yields an
        empty `name` and `Icon`, in line with `get_attachment_size`, which
        reports "0 b" for it.
        """
        attachment_file = attachment.getAttachmentFile()
        attachment_type = attachment.getAttachmentType()

        # the file may be missing -> fall back to empty values
        attachment_icon = getattr(attachment_file, "icon", '')
        if callable(attachment_icon):
            attachment_icon = attachment_icon()

        return {
            'keywords': attachment.getAttachmentKeys(),
            'size': self.get_attachment_size(attachment),
            'name': getattr(attachment_file, "filename", ''),
            'Icon': attachment_icon,
            'type': api.get_uid(attachment_type) if attachment_type else '',
            'absolute_url': attachment.absolute_url(),
            'UID': api.get_uid(attachment),
            'report_option': attachment.getReportOption(),
            'analysis': '',
        }
378
379
    def get_attachments(self):
380
        """Returns a list of attachments info dictionaries
381
382
        Original code taken from bika.lims.analysisrequest.view
383
        """
384
385
        attachments = []
386
387
        # process AR attachments
388
        for attachment in self.context.getAttachment():
389
            attachment_info = self.get_attachment_info(attachment)
390
            attachments.append(attachment_info)
391
392
        # process analyses attachments
393
        for analysis in self.context.getAnalyses(full_objects=True):
394
            for attachment in analysis.getAttachment():
395
                attachment_info = self.get_attachment_info(attachment)
396
                attachment_info["analysis"] = analysis.Title()
397
                attachment_info["analysis_uid"] = api.get_uid(analysis)
398
                attachments.append(attachment_info)
399
400
        return attachments
401
402
    def get_sorted_attachments(self):
403
        """Returns a sorted list of analysis info dictionaries
404
        """
405
        inf = float("inf")
406
        order = self.get_attachments_order()
407
        attachments = self.get_attachments()
408
409
        def att_cmp(att1, att2):
410
            _n1 = att1.get('UID')
411
            _n2 = att2.get('UID')
412
            _i1 = _n1 in order and order.index(_n1) + 1 or inf
413
            _i2 = _n2 in order and order.index(_n2) + 1 or inf
414
            return cmp(_i1, _i2)
415
416
        sorted_attachments = sorted(attachments, cmp=att_cmp)
417
        return sorted_attachments
418
419
    def get_attachment_types(self):
        """Returns the available attachment types

        Queries the `bika_setup_catalog` for AttachmentType objects in the
        `active` inactive_state, sorted ascending by `sortable_title`, and
        returns the catalog results.
        """
        query = {
            "portal_type": 'AttachmentType',
            "inactive_state": 'active',
            "sort_on": "sortable_title",
            "sort_order": "ascending",
        }
        catalog = api.get_tool("bika_setup_catalog")
        return catalog(**query)
428
429
    def get_attachment_report_options(self):
        """Returns the items of the configured ATTACHMENT_REPORT_OPTIONS
        """
        report_options = ATTACHMENT_REPORT_OPTIONS
        return report_options.items()
433
434
    def get_analyses(self):
435
        """Returns a list of analyses from the AR
436
        """
437
        analyses = self.context.getAnalyses(full_objects=True)
438
        return filter(self.is_analysis_attachment_allowed, analyses)
439
440
    def is_analysis_attachment_allowed(self, analysis):
        """Checks if attachments can be added to the analysis

        This is the case if the attachment option of the analysis is "p" or
        "r" and the analysis is not in the `retracted` workflow state.
        """
        option_allows = analysis.getAttachmentOption() in ["p", "r"]
        if not option_allows:
            return False
        return api.get_workflow_status_of(analysis) not in ["retracted"]
448
449
    def user_can_add_attachments(self):
        """Checks if the current logged in user is allowed to add attachments

        Always False if attachments are globally disabled, otherwise the
        result of the `AddAttachment` permission check on the context.
        """
        if self.global_attachments_allowed():
            membership = api.get_tool("portal_membership")
            return membership.checkPermission(AddAttachment, self.context)
        return False
457
458
    def user_can_update_attachments(self):
        """Checks if the current logged in user is allowed to update attachments

        The user needs either the `EditResults` or the `EditFieldResults`
        permission on the context.
        """
        has_permission = api.get_tool("portal_membership").checkPermission
        return has_permission(EditResults, self.context) or \
            has_permission(EditFieldResults, self.context)
465
466
    def user_can_delete_attachments(self):
        """Checks if the current logged in user is allowed to delete attachments

        Never allowed if the AR is not editable. Otherwise allowed for users
        who can add attachments and do not have the "Client" role on the
        context, and for users who can update attachments.
        """
        current_user = api.get_current_user()
        if not self.is_ar_editable():
            return False

        if self.user_can_add_attachments() and \
                not current_user.allowed(self.context, ["Client"]):
            return True
        return self.user_can_update_attachments()
476
477
    def is_ar_editable(self):
        """Checks if the workflow state of the context is an editable state

        Attachments may only be updated in one of the EDITABLE_STATES.
        """
        return api.get_workflow_status_of(self.context) in EDITABLE_STATES
482
483
    # ANNOTATION HANDLING
484
485
    def get_annotation(self):
        """Adapt the context to IAnnotations and return the adapter
        """
        annotations = IAnnotations(self.context)
        return annotations
489
490
    @property
    def storage(self):
        """The attachments settings storage kept in the context annotations

        An empty OOBTree is created and stored under the ATTACHMENTS_STORAGE
        key on first access.
        """
        annotations = self.get_annotation()
        store = annotations.get(ATTACHMENTS_STORAGE)
        if store is None:
            store = OOBTree()
            annotations[ATTACHMENTS_STORAGE] = store
        return store
498
499
    def flush(self):
        """Drop the complete attachments storage from the annotations

        Does nothing if no storage exists.
        """
        annotations = self.get_annotation()
        if annotations.get(ATTACHMENTS_STORAGE) is None:
            return
        del annotations[ATTACHMENTS_STORAGE]
505
506
    def set_attachments_order(self, order):
        """Store the attachments order in the storage

        :param order: either the complete list of UIDs, which replaces the
            stored order, or a single UID string, which is appended to the
            stored order
        """
        if isinstance(order, basestring):
            # a single UID extends the order that is already stored
            uids = self.storage.get("order", [])
            uids.append(order)
        else:
            uids = order
        self.storage.update({"order": uids})
515
516
    def get_attachments_order(self):
        """Returns the stored list of UIDs for sorting purposes

        The list is expected to reflect the order of the rows of the
        attachment listing viewlet. It is empty if no order was stored yet.
        """
        stored_order = self.storage.get("order", [])
        return stored_order
523
524
525
class ajaxAttachmentsView(AttachmentsView):
526
    """Ajax helpers for attachments
527
    """
528
529
    def __init__(self, context, request):
        """Initialize the view exactly like the base AttachmentsView
        """
        base = super(ajaxAttachmentsView, self)
        base.__init__(context, request)
531
532 View Code Duplication
    @returns_json
    def __call__(self):
        """Route the request to the matching `ajax_<name>` method

        The single path segment behind the view name selects the method.
        Answers with a 404 error result unless exactly one path segment was
        traversed, and with a 400 error result for unknown method names.
        """
        protect.CheckAuthenticator(self.request.form)

        segments = self.traverse_subpath
        if len(segments) != 1:
            return self.error("Not found", status=404)

        handler = getattr(self, "ajax_{}".format(segments[0]), None)
        if handler is not None:
            return handler()
        return self.error("Invalid function", status=400)
543
544
    def error(self, message, status=500, **kw):
545
        self.request.response.setStatus(status)
546
        result = {"success": False, "errors": message}
547
        result.update(kw)
548
        return result
549
550
    def ajax_delete_analysis_attachment(self):
        """Endpoint for attachment delete in WS

        Reads the `attachment_uid` form value and deletes the attachment via
        the `@@attachments_view` of the context.

        :returns: "success", "error" if no UID was given, or a message if the
            UID could not be resolved
        """
        uid = self.request.form.get("attachment_uid", None)
        if not uid:
            return "error"

        target = api.get_object_by_uid(uid, None)
        if target is None:
            return "Could not resolve attachment UID {}".format(uid)

        # the delete itself is delegated to the AttachmentsView
        attachments_view = self.context.restrictedTraverse("@@attachments_view")
        attachments_view.delete_attachment(target)
        return "success"
568