Passed
Push — 2.x ( 3a8d9c...f90226 )
by Ramon
06:47
created

senaite.core.content.dynamicanalysisspec   A

Complexity

Total Complexity 25

Size/Duplication

Total Lines 147
Duplicated Lines 68.03 %

Importance

Changes 0
Metric Value
wmc 25
eloc 101
dl 100
loc 147
rs 10
c 0
b 0
f 0

7 Methods

Rating   Name   Duplication   Size   Complexity  
B IDynamicAnalysisSpecSchema.validate_specs_file() 25 25 7
A DynamicAnalysisSpec.get_worksheets() 5 5 2
A DynamicAnalysisSpec.get_header() 10 10 4
B DynamicAnalysisSpec.get_specs() 23 23 6
A DynamicAnalysisSpec.get_by_keyword() 6 6 2
A DynamicAnalysisSpec.get_primary_sheet() 5 5 2
A DynamicAnalysisSpec.get_workbook() 6 6 2

How to fix   Duplicated Code   

Duplicated Code

Duplicate code is one of the most pungent code smells. A rule that is often used is to re-structure code once it is duplicated in three or more places.

Common duplication problems, and corresponding solutions are:

1
# -*- coding: utf-8 -*-
2
#
3
# This file is part of SENAITE.CORE.
4
#
5
# SENAITE.CORE is free software: you can redistribute it and/or modify it under
6
# the terms of the GNU General Public License as published by the Free Software
7
# Foundation, version 2.
8
#
9
# This program is distributed in the hope that it will be useful, but WITHOUT
10
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
11
# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
12
# details.
13
#
14
# You should have received a copy of the GNU General Public License along with
15
# this program; if not, write to the Free Software Foundation, Inc., 51
16
# Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
17
#
18
# Copyright 2018-2024 by it's authors.
19
# Some rights reserved, see README and LICENSE.
20
21
from collections import defaultdict
22
23
from bika.lims import senaiteMessageFactory as _
24
from bika.lims import api
25
from senaite.core.catalog import SETUP_CATALOG
26
from senaite.core.content.base import Container
27
from senaite.core.interfaces import IDynamicAnalysisSpec
28
from openpyxl.reader.excel import load_workbook
29
from openpyxl.utils.exceptions import InvalidFileException
30
from plone.namedfile import field as namedfile
31
from plone.supermodel import model
32
from six import StringIO
33
from z3c.form.interfaces import NOT_CHANGED
34
from zope.interface import Invalid
35
from zope.interface import implementer
36
from zope.interface import invariant
37
38
REQUIRED_COLUMNS = [
39
    "Keyword",  # The Analysis Keyword
40
    "min",  # Lower Limit
41
    "max",  # Upper Limit
42
]
43
44
45 View Code Duplication
class IDynamicAnalysisSpecSchema(model.Schema):
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
46
    """Dynamic Analysis Specification
47
    """
48
49
    specs_file = namedfile.NamedBlobFile(
50
        title=_(u"Specification File"),
51
        description=_(u"Only Excel files supported"),
52
        required=True)
53
54
    @invariant
55
    def validate_specs_file(data):
56
        """Checks the Excel file contains the required header columns
57
        """
58
        # return immediately if not changed
59
        if data.specs_file == NOT_CHANGED:
60
            return True
61
        fd = StringIO(data.specs_file.data)
62
        try:
63
            xls = load_workbook(fd)
64
        except (InvalidFileException, TypeError):
65
            raise Invalid(_(
66
                "Invalid specifications file detected. "
67
                "Please upload an Excel spreadsheet with at least "
68
                "the following columns defined: '{}'"
69
                .format(", ".join(REQUIRED_COLUMNS))))
70
        try:
71
            header_row = xls.worksheets[0].rows.next()
72
            header = map(lambda c: c.value, header_row)
73
        except (IndexError, AttributeError):
74
            raise Invalid(
75
                _("First sheet does not contain a valid column definition"))
76
        for col in REQUIRED_COLUMNS:
77
            if col not in header:
78
                raise Invalid(_("Column '{}' is missing".format(col)))
79
80
81 View Code Duplication
@implementer(IDynamicAnalysisSpec, IDynamicAnalysisSpecSchema)
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
82
class DynamicAnalysisSpec(Container):
83
    """Dynamic Analysis Specification
84
    """
85
    _catalogs = [SETUP_CATALOG]
86
87
    def get_workbook(self):
88
        specs_file = self.specs_file
89
        if not specs_file:
90
            return None
91
        data = StringIO(specs_file.data)
92
        return load_workbook(data)
93
94
    def get_worksheets(self):
95
        wb = self.get_workbook()
96
        if wb is None:
97
            return []
98
        return wb.worksheets
99
100
    def get_primary_sheet(self):
101
        sheets = self.get_worksheets()
102
        if len(sheets) == 0:
103
            return None
104
        return sheets[0]
105
106
    def get_header(self):
107
        header = []
108
        ps = self.get_primary_sheet()
109
        if ps is None:
110
            return header
111
        for num, row in enumerate(ps.rows):
112
            if num > 0:
113
                break
114
            header = [cell.value for cell in row]
115
        return header
116
117
    def get_specs(self):
118
        ps = self.get_primary_sheet()
119
        if ps is None:
120
            return []
121
        keys = self.get_header()
122
        specs = []
123
124
        def get_cell_string_value(cell):
125
            value = cell.value
126
            if api.is_string(value):
127
                return value
128
            elif value is None:
129
                return None
130
            return str(value)
131
132
        for num, row in enumerate(ps.rows):
133
            # skip the header
134
            if num == 0:
135
                continue
136
            values = map(get_cell_string_value, row)
137
            data = dict(zip(keys, values))
138
            specs.append(data)
139
        return specs
140
141
    def get_by_keyword(self):
142
        specs = self.get_specs()
143
        groups = defaultdict(list)
144
        for spec in specs:
145
            groups[spec.get("Keyword")].append(spec)
146
        return groups
147