Completed
Push — 2.x ( af72e2...15b24b )
by Jordi
29:53 queued 12:54
created

bika.lims.browser.workflow.client   A

Complexity

Total Complexity 17

Size/Duplication

Total Lines 144
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
wmc 17
eloc 83
dl 0
loc 144
rs 10
c 0
b 0
f 0

7 Methods

Rating   Name   Duplication   Size   Complexity  
A WorkflowActionDownloadReportsAdapter.download() 0 9 1
A WorkflowActionPublishSamplesAdapter.get_sample_uids_in_report() 0 5 1
A WorkflowActionDownloadReportsAdapter.create_archive() 0 8 3
A WorkflowActionDownloadReportsAdapter.get_pdf() 0 7 2
A WorkflowActionPublishSamplesAdapter.__call__() 0 29 4
B WorkflowActionDownloadReportsAdapter.__call__() 0 27 5
A WorkflowActionPublishSamplesAdapter.publish_sample() 0 8 1
1
# -*- coding: utf-8 -*-
2
#
3
# This file is part of SENAITE.CORE.
4
#
5
# SENAITE.CORE is free software: you can redistribute it and/or modify it under
6
# the terms of the GNU General Public License as published by the Free Software
7
# Foundation, version 2.
8
#
9
# This program is distributed in the hope that it will be useful, but WITHOUT
10
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
11
# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
12
# details.
13
#
14
# You should have received a copy of the GNU General Public License along with
15
# this program; if not, write to the Free Software Foundation, Inc., 51
16
# Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
17
#
18
# Copyright 2018-2021 by its authors.
19
# Some rights reserved, see README and LICENSE.
20
21
import itertools
22
import tempfile
23
import zipfile
24
25
from bika.lims import _
26
from bika.lims import api
27
from bika.lims.browser.workflow import RequestContextAware
28
from bika.lims.interfaces import IWorkflowActionUIDsAdapter
29
from bika.lims.workflow import doActionFor
30
from ZODB.POSException import POSKeyError
31
from zope.interface import implements
32
33
34
class WorkflowActionDownloadReportsAdapter(RequestContextAware):
    """Adapter in charge of downloading the PDFs of the selected reports

    A single PDF is sent as-is; several PDFs are bundled in a ZIP archive.
    """
    implements(IWorkflowActionUIDsAdapter)

    def __call__(self, action, uids):
        """Send the PDFs of the reports with the given UIDs to the browser

        :param action: the workflow action id (not used by this adapter)
        :param uids: UIDs of the selected report objects
        :returns: the result of `download`, or the result of `redirect`
            when none of the PDFs could be loaded
        """
        # get the selected ARReport objects
        reports = [api.get_object_by_uid(uid) for uid in uids]

        pdfs = []

        for report in reports:
            sample = report.getAnalysisRequest()
            sample_id = api.get_id(sample)
            pdf = self.get_pdf(report)
            if pdf is None:
                # Keep the message id static and pass the variable part as
                # mapping, otherwise the message can never be translated
                self.add_status_message(
                    _("Could not load PDF for sample ${sample_id}",
                      mapping={"sample_id": sample_id}), "warning")
                continue
            # NOTE(review): this sets the filename on the object returned
            # by `report.getPdf()` itself -- confirm this is intended
            pdf.filename = "{}.pdf".format(sample_id)
            pdfs.append(pdf)

        if not pdfs:
            # Nothing to download: go back to where the user came from
            # instead of delivering an empty ZIP archive
            referer = self.request.get_header("referer")
            return self.redirect(redirect_url=referer)

        if len(pdfs) == 1:
            pdf = pdfs[0]
            filename = pdf.filename
            return self.download(pdf.data, filename, type="application/pdf")

        # the temporary file is removed when the `with` block is left
        with self.create_archive(pdfs) as archive:
            archive_name = "Reports-{}.zip".format(self.context.getName())
            data = archive.read()
            return self.download(data, archive_name, type="application/zip")

    def create_archive(self, pdfs):
        """Create a ZIP archive with the given PDFs

        :param pdfs: objects providing the attributes `filename` and `data`
        :returns: an open named temporary file containing the ZIP archive,
            positioned at the start. The caller has to close it.
        """
        archive = tempfile.NamedTemporaryFile(suffix=".zip")
        try:
            # Write through the already open file handle. Opening the
            # temporary file a second time by name fails on some platforms.
            with zipfile.ZipFile(
                    archive.file, "w", zipfile.ZIP_DEFLATED) as zf:
                for pdf in pdfs:
                    zf.writestr(pdf.filename, pdf.data)
            # rewind, so that the caller can read the archive right away
            archive.seek(0)
        except Exception:
            # do not leave the temporary file behind on failure
            archive.close()
            raise
        return archive

    def download(self, data, filename, type="application/zip"):
        """Write the data as a file attachment to the response

        :param data: the raw content to send
        :param filename: the file name suggested to the browser
        :param type: the MIME type used for the Content-Type header
        """
        response = self.request.response
        # Send the filename as a quoted string, so that names containing
        # spaces, commas or semicolons do not break the header
        quoted = filename.replace("\\", "\\\\").replace('"', '\\"')
        response.setHeader("Content-Disposition",
                           'attachment; filename="{}"'.format(quoted))
        response.setHeader("Content-Type", "{}; charset=utf-8".format(type))
        response.setHeader("Content-Length", len(data))
        response.setHeader("Cache-Control", "no-store")
        response.setHeader("Pragma", "no-cache")
        response.write(data)

    def get_pdf(self, report):
        """Get the report PDF

        :param report: the report object to get the PDF from
        :returns: the value of `report.getPdf()`, or None if the call
            raised a POSKeyError or a TypeError
        """
        try:
            return report.getPdf()
        except (POSKeyError, TypeError):
            return None
93
94
95
class WorkflowActionPublishSamplesAdapter(RequestContextAware):
    """Adapter in charge of the client 'publish_samples' action
    """
    implements(IWorkflowActionUIDsAdapter)

    def __call__(self, action, uids):
        """Publish all samples contained in the selected reports

        :param action: the workflow action id (not used by this adapter)
        :param uids: UIDs of the selected report objects
        :returns: the result of the redirect back to the referer
        """
        published = []

        # get the selected ARReport objects
        reports = [api.get_object_by_uid(uid) for uid in uids]
        # get all the contained sample UIDs of the generated PDFs
        sample_uids = [
            self.get_sample_uids_in_report(report) for report in reports]
        # uniquify the UIDs of the contained samples
        unique_sample_uids = set(itertools.chain.from_iterable(sample_uids))

        # publish all the contained samples of the selected reports
        for uid in unique_sample_uids:
            sample = api.get_object_by_uid(uid)
            if self.publish_sample(sample):
                published.append(sample)

        # generate a status message of the published sample IDs
        message = _("No items published")
        if published:
            # Keep the message id static and pass the variable part as
            # mapping, otherwise the message can never be translated
            sample_ids = ", ".join(map(api.get_id, published))
            message = _("Published ${items}", mapping={"items": sample_ids})

        # add the status message for the response
        self.add_status_message(message, "info")

        # redirect back
        referer = self.request.get_header("referer")
        return self.redirect(redirect_url=referer)

    def get_sample_uids_in_report(self, report):
        """Return a list of contained sample UIDs

        :param report: the report object to read the metadata from
        :returns: the value stored under the key "contained_requests" in
            the report metadata, or an empty list if there is none
        """
        metadata = report.getMetadata() or {}
        return metadata.get("contained_requests", [])

    def publish_sample(self, sample):
        """Set status to prepublished/published/republished

        The transition depends on the current workflow status of the
        sample: "publish" for verified samples, "republish" for published
        samples and "prepublish" for any other status.

        :param sample: the sample to transition
        :returns: the success flag returned by `doActionFor`
        """
        status = api.get_workflow_status_of(sample)
        transitions = {"verified": "publish", "published": "republish"}
        transition = transitions.get(status, "prepublish")
        # only the success flag is of interest here
        succeed, _message = doActionFor(sample, transition)
        return succeed
144