Passed
Push — 2.x ( fb2717...07f530 )
by Jordi
06:32
created

DynamicAnalysisSpec.get_specs()   B

Complexity

Conditions 6

Size

Total Lines 23
Code Lines 20

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 20
dl 0
loc 23
rs 8.4666
c 0
b 0
f 0
cc 6
nop 1
1
# -*- coding: utf-8 -*-
2
#
3
# This file is part of SENAITE.CORE.
4
#
5
# SENAITE.CORE is free software: you can redistribute it and/or modify it under
6
# the terms of the GNU General Public License as published by the Free Software
7
# Foundation, version 2.
8
#
9
# This program is distributed in the hope that it will be useful, but WITHOUT
10
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
11
# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
12
# details.
13
#
14
# You should have received a copy of the GNU General Public License along with
15
# this program; if not, write to the Free Software Foundation, Inc., 51
16
# Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
17
#
18
# Copyright 2018-2024 by it's authors.
19
# Some rights reserved, see README and LICENSE.
20
21
from collections import defaultdict
22
23
from bika.lims import _
24
from bika.lims import api
25
from bika.lims.catalog import SETUP_CATALOG
26
from openpyxl.reader.excel import load_workbook
27
from openpyxl.utils.exceptions import InvalidFileException
28
from plone.dexterity.content import Item
29
from plone.namedfile import field as namedfile
30
from plone.supermodel import model
31
from six import StringIO
32
from z3c.form.interfaces import NOT_CHANGED
33
from zope.interface import Invalid
34
from zope.interface import implementer
35
from zope.interface import invariant
36
37
REQUIRED_COLUMNS = [
38
    "Keyword",  # The Analysis Keyword
39
    "min",  # Lower Limit
40
    "max",  # Upper Limit
41
]
42
43
44
class IDynamicAnalysisSpec(model.Schema):
45
    """Dynamic Analysis Specification
46
    """
47
48
    specs_file = namedfile.NamedBlobFile(
49
        title=_(u"Specification File"),
50
        description=_(u"Only Excel files supported"),
51
        required=True)
52
53
    @invariant
54
    def validate_sepecs_file(data):
55
        """Checks the Excel file contains the required header columns
56
        """
57
        # return immediately if not changed
58
        if data.specs_file == NOT_CHANGED:
59
            return True
60
        fd = StringIO(data.specs_file.data)
61
        try:
62
            xls = load_workbook(fd)
63
        except (InvalidFileException, TypeError):
64
            raise Invalid(_(
65
                "Invalid specifications file detected. "
66
                "Please upload an Excel spreadsheet with at least "
67
                "the following columns defined: '{}'"
68
                .format(", ".join(REQUIRED_COLUMNS))))
69
        try:
70
            header_row = xls.worksheets[0].rows.next()
71
            header = map(lambda c: c.value, header_row)
72
        except (IndexError, AttributeError):
73
            raise Invalid(
74
                _("First sheet does not contain a valid column definition"))
75
        for col in REQUIRED_COLUMNS:
76
            if col not in header:
77
                raise Invalid(_("Column '{}' is missing".format(col)))
78
79
80
@implementer(IDynamicAnalysisSpec)
81
class DynamicAnalysisSpec(Item):
82
    """Dynamic Analysis Specification
83
    """
84
    _catalogs = [SETUP_CATALOG]
85
86
    def get_workbook(self):
87
        specs_file = self.specs_file
88
        if not specs_file:
89
            return None
90
        data = StringIO(specs_file.data)
91
        return load_workbook(data)
92
93
    def get_worksheets(self):
94
        wb = self.get_workbook()
95
        if wb is None:
96
            return []
97
        return wb.worksheets
98
99
    def get_primary_sheet(self):
100
        sheets = self.get_worksheets()
101
        if len(sheets) == 0:
102
            return None
103
        return sheets[0]
104
105
    def get_header(self):
106
        header = []
107
        ps = self.get_primary_sheet()
108
        if ps is None:
109
            return header
110
        for num, row in enumerate(ps.rows):
111
            if num > 0:
112
                break
113
            header = [cell.value for cell in row]
114
        return header
115
116
    def get_specs(self):
117
        ps = self.get_primary_sheet()
118
        if ps is None:
119
            return []
120
        keys = self.get_header()
121
        specs = []
122
123
        def get_cell_string_value(cell):
124
            value = cell.value
125
            if api.is_string(value):
126
                return value
127
            elif value is None:
128
                return None
129
            return str(value)
130
131
        for num, row in enumerate(ps.rows):
132
            # skip the header
133
            if num == 0:
134
                continue
135
            values = map(get_cell_string_value, row)
136
            data = dict(zip(keys, values))
137
            specs.append(data)
138
        return specs
139
140
    def get_by_keyword(self):
141
        specs = self.get_specs()
142
        groups = defaultdict(list)
143
        for spec in specs:
144
            groups[spec.get("Keyword")].append(spec)
145
        return groups
146