Completed
Push — master ( 65c723...b6585a )
by Jordi
76:39 queued 72:29
created

bika.lims.content.dynamic_analysisspec   A

Complexity

Total Complexity 22

Size/Duplication

Total Lines 108
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
wmc 22
eloc 83
dl 0
loc 108
rs 10
c 0
b 0
f 0

7 Methods

Rating   Name   Duplication   Size   Complexity  
A DynamicAnalysisSpec.get_worksheets() 0 5 2
A DynamicAnalysisSpec.get_workbook() 0 6 2
B IDynamicAnalysisSpec.validate_sepecs_file() 0 21 6
A DynamicAnalysisSpec.get_by_keyword() 0 6 2
A DynamicAnalysisSpec.get_header() 0 5 3
A DynamicAnalysisSpec.get_primary_sheet() 0 5 2
A DynamicAnalysisSpec.get_specs() 0 14 5
1
# -*- coding: utf-8 -*-
2
3
from collections import defaultdict
4
from StringIO import StringIO
5
6
from bika.lims import _
7
from bika.lims.catalog import SETUP_CATALOG
8
from openpyxl.reader.excel import load_workbook
9
from openpyxl.shared.exc import InvalidFileException
10
from plone.dexterity.content import Item
11
from plone.namedfile import field as namedfile
12
from plone.supermodel import model
13
from zope.interface import Invalid
14
from zope.interface import implementer
15
from zope.interface import invariant
16
17
REQUIRED_COLUMNS = [
18
    "Keyword",  # The Analysis Keyword
19
    "min",  # Lower Limit
20
    "max",  # Upper Limit
21
]
22
23
24
class IDynamicAnalysisSpec(model.Schema):
25
    """Dynamic Analysis Specification
26
    """
27
28
    specs_file = namedfile.NamedBlobFile(
29
        title=_(u"Specification File"),
30
        description=_(u"Only Excel files supported"),
31
        required=True)
32
33
    @invariant
34
    def validate_sepecs_file(data):
35
        """Checks the Excel file contains the required header columns
36
        """
37
        fd = StringIO(data.specs_file.data)
38
        try:
39
            xls = load_workbook(fd)
40
        except (InvalidFileException, TypeError):
41
            raise Invalid(_(
42
                "Invalid specifications file detected. "
43
                "Please upload an Excel spreadsheet with at least "
44
                "the following columns defined: '{}'"
45
                .format(", ".join(REQUIRED_COLUMNS))))
46
        try:
47
            header = map(lambda c: c.value, xls.worksheets[0].rows[0])
48
        except IndexError:
49
            raise Invalid(
50
                _("First sheet does not contain a valid column definition"))
51
        for col in REQUIRED_COLUMNS:
52
            if col not in header:
53
                raise Invalid(_("Column '{}' is missing".format(col)))
54
55
56
@implementer(IDynamicAnalysisSpec)
57
class DynamicAnalysisSpec(Item):
58
    """Dynamic Analysis Specification
59
    """
60
    _catalogs = [SETUP_CATALOG]
61
62
    def get_workbook(self):
63
        specs_file = self.specs_file
64
        if not specs_file:
65
            return None
66
        data = StringIO(specs_file.data)
67
        return load_workbook(data)
68
69
    def get_worksheets(self):
70
        wb = self.get_workbook()
71
        if wb is None:
72
            return []
73
        return wb.worksheets
74
75
    def get_primary_sheet(self):
76
        sheets = self.get_worksheets()
77
        if len(sheets) == 0:
78
            return None
79
        return sheets[0]
80
81
    def get_header(self):
82
        ps = self.get_primary_sheet()
83
        if ps is None:
84
            return []
85
        return map(lambda cell: cell.value, ps.rows[0])
86
87
    def get_specs(self):
88
        ps = self.get_primary_sheet()
89
        if ps is None:
90
            return []
91
        keys = self.get_header()
92
        specs = []
93
        for num, row in enumerate(ps.rows):
94
            # skip the header
95
            if num == 0:
96
                continue
97
            values = map(lambda cell: cell.value, row)
98
            data = dict(zip(keys, values))
99
            specs.append(data)
100
        return specs
101
102
    def get_by_keyword(self):
103
        specs = self.get_specs()
104
        groups = defaultdict(list)
105
        for spec in specs:
106
            groups[spec.get("Keyword")].append(spec)
107
        return groups
108