Passed
Push to master (955ce5...625fd4) by Jordi, created 05:59

bika.lims.exportimport.instruments.generic.two_dimension   A

Complexity

Total Complexity 41

Size/Duplication

Total Lines 263
Duplicated Lines 22.05 %

Importance

Changes 0
Metric Value
wmc 41
eloc 173
dl 58
loc 263
rs 9.1199
c 0
b 0
f 0

6 Methods

Rating  Name                                       Duplication  Size  Complexity
A       TwoDimensionCSVParser.__init__()           0            6     1
A       TwoDimensionImporter.__init__()            0            8     1
A       TwoDimensionCSVParser.get_result()         13           13    5
A       TwoDimensionCSVParser.parse_headerline()   0            15    3
A       TwoDimensionCSVParser._parseline()         0            4     2
D       TwoDimensionCSVParser.parse_resultsline()  0            73    13

6 Functions

Rating  Name                      Duplication  Size  Complexity
A       get_interims_keywords()   0            3     2
A       find_kw()                 0            12    3
A       is_keyword()              0            3     1
A       find_analysis_interims()  0            13    2
A       find_analyses()           0            17    3
B       Import()                  45           45    5

How to fix

Duplicated Code

Duplicate code is one of the most pungent code smells. A common rule of thumb is to restructure code once it is duplicated in three or more places.

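In this module, the largest flagged duplication is Import() (45 lines), whose tail runs the importer and serialises errors, logs and warnings to JSON. One possible way to remove the repetition, sketched here rather than taken from bika.lims, is to move that tail into a helper that each instrument interface calls. The names run_importer and FakeImporter are hypothetical; the sketch assumes only that an importer exposes process(), errors, logs and warns, as the code in this file does.

```python
import json
import traceback


def run_importer(importer, errors=None):
    """Hypothetical shared helper: run an importer, return the JSON report.

    Assumes only what Import() relies on: process(), errors, logs, warns.
    """
    collected = list(errors or [])
    tbex = ''
    try:
        importer.process()
    except Exception:
        # Report the failure in the JSON result instead of propagating it
        tbex = traceback.format_exc()
    collected.extend(importer.errors)
    if tbex:
        collected.append(tbex)
    return json.dumps({'errors': collected,
                       'log': list(importer.logs),
                       'warns': list(importer.warns)})


class FakeImporter(object):
    """Stand-in for a results importer, for illustration only."""

    def __init__(self, fail=False):
        self.errors, self.logs, self.warns = [], [], []
        self._fail = fail

    def process(self):
        if self._fail:
            raise ValueError("boom")
        self.logs.append("1 result imported")


report = json.loads(run_importer(FakeImporter()))
failed = json.loads(run_importer(FakeImporter(fail=True),
                                 ["No file selected"]))
```

With such a helper, each Import() would only need to read the form, build its parser and importer, and return the helper's result.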

Complexity

Tip: Before tackling complexity, eliminate any duplication first. This can often reduce the size of classes significantly.
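For example, get_result() is the other member flagged as duplicated (13 lines). It depends on the parser only for error reporting, so it could be hoisted into a plain function shared by several parsers before any complexity work starts. The sketch below is a stand-in under stated assumptions: normalize_result is a hypothetical name, and float() replaces api.is_floatable and api.to_float so that the example runs without bika.lims.

```python
def normalize_result(raw):
    """Hypothetical shared version of get_result().

    Returns a non-negative float, or None when raw is not a valid number.
    """
    text = str(raw)
    # Placeholders that the original treats as "no result"
    if text.startswith('--') or text in ('', 'ND'):
        return 0.0
    try:
        value = float(text)
    except ValueError:
        # The caller decides how to report the problem (e.g. self.err)
        return None
    # Negative results are clamped to 0.0, as in the original
    return value if value > 0.0 else 0.0


values = [normalize_result(v) for v in ('83.12', '-1.5', 'ND', '--', 'abc')]
```

Returning None for invalid input keeps the error message in the parser, where the line number and raw line are available.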

Complex classes, such as those in bika.lims.exportimport.instruments.generic.two_dimension, often do many different things. To break such a class down, identify a cohesive component within it. A common way to find one is to look for fields or methods that share the same prefix or suffix.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.
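In this module, the find_* functions (find_analyses, find_analysis_interims, find_kw) share a prefix and all take ar_or_sample, which makes them a candidate for Extract Class. A minimal sketch, assuming a hypothetical InterimLookup class and plain dicts in place of the catalog-backed analysis objects:

```python
class InterimLookup(object):
    """Hypothetical extracted class: interim-field lookups for one AR.

    `analyses` stands in for the objects returned by find_analyses(); here
    each one is a dict with a 'keyword' and a list of 'interims' keywords.
    """

    def __init__(self, analyses):
        self._analyses = list(analyses)

    def interim_keywords(self):
        """Unique interim keywords across all analyses, sorted."""
        keywords = set()
        for analysis in self._analyses:
            keywords.update(analysis['interims'])
        return sorted(keywords)

    def analysis_keyword_for(self, interim_kw):
        """Keyword of the first analysis that has interim_kw, else None."""
        for analysis in self._analyses:
            if interim_kw in analysis['interims']:
                return analysis['keyword']
        return None


lookup = InterimLookup([
    {'keyword': 'Ca', 'interims': ['reading', 'factor']},
    {'keyword': 'Mg', 'interims': ['factor', 'blank']},
])
```

A possible side benefit is that the analyses are fetched once per AR, whereas find_kw() and find_analysis_interims() each call find_analyses() again for every column they handle.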

# This file is part of Bika LIMS
#
# Copyright 2011-2016 by its authors.
# Some rights reserved. See LICENSE.txt, AUTHORS.txt.

""" 2-Dimensional-CSV
"""
from bika.lims import bikaMessageFactory as _
from bika.lims import api
import json
import re
from bika.lims.exportimport.instruments.utils import \
    (get_instrument_import_search_criteria,
     get_instrument_import_override,
     get_instrument_import_ar_allowed_states)
from bika.lims.exportimport.instruments.resultsimport import \
    InstrumentCSVResultsFileParser, AnalysisResultsImporter
import traceback
from bika.lims.catalog import CATALOG_ANALYSIS_REQUEST_LISTING

title = "2-Dimensional-CSV"


# Duplication issue: this code seems to be duplicated in your project.
def Import(context, request):
    """ Read 2-Dimensional-CSV analysis results
    """
    form = request.form
    # TODO form['file'] sometimes returns a list
    infile = form['instrument_results_file'][0] if \
        isinstance(form['instrument_results_file'], list) else \
        form['instrument_results_file']
    artoapply = form['artoapply']
    override = form['results_override']
    sample = form.get('sample', 'requestid')
    instrument = form.get('instrument', None)
    errors = []

    # Load the most suitable parser according to file extension/options/etc...
    if not hasattr(infile, 'filename'):
        errors.append(_("No file selected"))
    parser = TwoDimensionCSVParser(infile)
    status = get_instrument_import_ar_allowed_states(artoapply)
    over = get_instrument_import_override(override)
    sam = get_instrument_import_search_criteria(sample)
    importer = TwoDimensionImporter(parser=parser,
                                    context=context,
                                    idsearchcriteria=sam,
                                    allowed_ar_states=status,
                                    allowed_analysis_states=None,
                                    override=over,
                                    instrument_uid=instrument,
                                    form=form)
    tbex = ''
    try:
        importer.process()
    except Exception:
        tbex = traceback.format_exc()
    # Extend instead of rebinding, so "No file selected" is not discarded
    errors.extend(importer.errors)
    logs = importer.logs
    warns = importer.warns
    if tbex:
        errors.append(tbex)

    results = {'errors': errors, 'log': logs, 'warns': warns}

    return json.dumps(results)


def is_keyword(kw):
    """ Returns the number of bika_setup_catalog entries that match the
        keyword; a non-zero value means the keyword is known.
    """
    bsc = api.get_tool('bika_setup_catalog')
    return len(bsc(getKeyword=kw))


def find_analyses(ar_or_sample):
    """ Returns the analyses of the Analysis Request that matches the given
        ID or, failing that, the given Sample ID. Returns an empty list
        unless exactly one Analysis Request is found.

        This function and the is_keyword function should probably be in
        resultsimport.py or somewhere central where they can be used by
        other instrument interfaces.
    """
    bc = api.get_tool(CATALOG_ANALYSIS_REQUEST_LISTING)
    ar = bc(portal_type='AnalysisRequest', id=ar_or_sample)
    if len(ar) == 0:
        ar = bc(portal_type='AnalysisRequest', getSampleID=ar_or_sample)
    if len(ar) == 1:
        obj = ar[0].getObject()
        analyses = obj.getAnalyses(full_objects=True)
        return analyses
    return []


def get_interims_keywords(analysis):
    """ Returns the keywords of the interim fields of the analysis
    """
    interims = api.safe_getattr(analysis, 'getInterimFields')
    return [item['keyword'] for item in interims]


def find_analysis_interims(ar_or_sample):
    """ Returns the unique keywords of the interim fields of all the
        analyses found for the given Analysis Request ID or Sample ID.

        This function and the is_keyword function should probably be in
        resultsimport.py or somewhere central where they can be used by
        other instrument interfaces.
    """
    interim_fields = list()
    for analysis in find_analyses(ar_or_sample):
        keywords = get_interims_keywords(analysis)
        interim_fields.extend(keywords)
    return list(set(interim_fields))


def find_kw(ar_or_sample, kw):
    """ Returns the keyword of the first analysis that has kw among its
        interim fields, or None if no analysis does. It is used for
        keywords that are not on the analysis itself but on its interim
        fields.

        This function and the is_keyword function should probably be in
        resultsimport.py or somewhere central where they can be used by
        other instrument interfaces.
    """
    for analysis in find_analyses(ar_or_sample):
        if kw in get_interims_keywords(analysis):
            return analysis.getKeyword()
    return None


class TwoDimensionCSVParser(InstrumentCSVResultsFileParser):

    QUANTITATIONRESULTS_NUMERICHEADERS = ('Title8', 'Title9', 'Title31',
                                          'Title32', 'Title41', 'Title42',
                                          'Title43',)

    def __init__(self, csv):
        InstrumentCSVResultsFileParser.__init__(self, csv)
        self._end_header = False
        self._keywords = []
        self._quantitationresultsheader = []
        self._numline = 0

    def _parseline(self, line):
        if self._end_header:
            return self.parse_resultsline(line)
        return self.parse_headerline(line)

    def parse_headerline(self, line):
        """ Parses header lines

            Keywords example:
            Keyword1, Keyword2, Keyword3, ..., end
        """
        if self._end_header is True:
            # Header already processed
            return 0

        splitted = [token.strip() for token in line.split(',')]
        if splitted[-1] == 'end':
            # exclude the first column and the word end
            self._keywords = splitted[1:-1]
            self._end_header = True
        return 0

    def parse_resultsline(self, line):
        """ Parses result lines
        """
        splitted = [token.strip() for token in line.split(',')]

        if splitted[0] == 'end':
            return 0

        non_empty = [i for i in splitted if i != '']
        if len(non_empty) == 0:
            return 0

        list_of_interim_results = []
        # list_of_interim_results is a list that will have interim fields on
        # the current line so that we don't have to call self._addRawResult
        # for the same interim fields, ultimately we want a dict that looks
        # like quantitation = {'AR': 'AP-0001-R01', 'interim1': 83.12, 'interim2': 22.3}
        # self._addRawResult(quantitation['AR'],
        #                    values={kw: quantitation},
        #                    override=False)
        # We will use one of the interims to find the analysis, in this case
        # new_kw, which becomes kw.
        # kw is the analysis keyword which sometimes we have to find using the interim field
        # because we have the result of the interim field and not of the analysis

        clean_splitted = splitted[1:-1]  # First value on the line is AR
        for i in range(len(clean_splitted)):
            # Start every column with a fresh dict and flag, also after a
            # "continue" below
            quantitation = {}
            found = False  # Set when kw is already in list_of_interim_results
            token = clean_splitted[i]
            if i >= len(self._keywords):
                # Column without a keyword: report the value, if any, and skip
                if token:
                    self.err("Orphan value in column ${index} (${token})",
                             mapping={"index": str(i + 1),
                                      "token": token},
                             numline=self._numline, line=line)
                continue

            quantitation['AR'] = splitted[0]
            # quantitation['AN'] = self._keywords[i]
            quantitation['DefaultResult'] = 'resultValue'
            quantitation['resultValue'] = token

            result = quantitation[quantitation['DefaultResult']]
            column_name = quantitation['DefaultResult']
            result = self.get_result(column_name, result, line)
            quantitation[quantitation['DefaultResult']] = result

            kw = re.sub(r"\W", "", self._keywords[i])
            if not is_keyword(kw):
                new_kw = find_kw(quantitation['AR'], kw)
                if new_kw:
                    quantitation[kw] = quantitation['resultValue']
                    del quantitation['resultValue']
                    for interim_res in list_of_interim_results:
                        if kw in interim_res:
                            # Interim field already in a previous result dict
                            found = True
                            break
                    if found:
                        continue
                    interims = find_analysis_interims(quantitation['AR'])
                    # pairing headers(keywords) and their values(results) per line
                    keyword_value_dict = dict(zip(self._keywords, clean_splitted))
                    for interim in interims:
                        if interim in keyword_value_dict:
                            quantitation[interim] = keyword_value_dict[interim]
                            list_of_interim_results.append(quantitation)
                    kw = new_kw
                    kw = re.sub(r"\W", "", kw)

            self._addRawResult(quantitation['AR'],
                               values={kw: quantitation},
                               override=False)
        return 0

    # Duplication issue: this code seems to be duplicated in your project.
    def get_result(self, column_name, result, line):
        result = str(result)
        if result.startswith('--') or result == '' or result == 'ND':
            return 0.0

        if api.is_floatable(result):
            result = api.to_float(result)
            # Negative results are reported as 0.0
            return result if result > 0.0 else 0.0
        self.err("No valid number ${result} in column (${column_name})",
                 mapping={"result": result,
                          "column_name": column_name},
                 numline=self._numline, line=line)
        return None


class TwoDimensionImporter(AnalysisResultsImporter):

    def __init__(self, parser, context, idsearchcriteria, override,
                 allowed_ar_states=None, allowed_analysis_states=None,
                 instrument_uid='', form=None):
        AnalysisResultsImporter.__init__(self, parser, context,
                                         idsearchcriteria,
                                         override, allowed_ar_states,
                                         allowed_analysis_states,
                                         instrument_uid)
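
The file layout the parser expects can be read off parse_headerline() and parse_resultsline(): a header line ending in the word end whose inner columns are keywords, followed by result lines whose first column is the AR ID and whose last column is discarded. The standalone sketch below pairs keywords with values for such a file. The sample data, the AR column label and the parse_two_dimension name are made up for illustration, and the catalog lookups and interim-field handling of the real parser are left out.

```python
def parse_two_dimension(text):
    """Return ({ar_id: {keyword: value}}, orphans) for a 2-Dimensional-CSV."""
    keywords, results, orphans = None, {}, []
    for numline, line in enumerate(text.splitlines(), 1):
        tokens = [token.strip() for token in line.split(',')]
        if keywords is None:
            # Header: the first column is dropped, the last is the word "end"
            if tokens[-1] == 'end':
                keywords = tokens[1:-1]
            continue
        if tokens[0] == 'end' or not any(tokens):
            # Closing marker or blank line
            continue
        values = tokens[1:-1]
        row = results.setdefault(tokens[0], {})
        for index, value in enumerate(values):
            if index < len(keywords):
                row[keywords[index]] = value
            elif value:
                # Value in a column that has no keyword in the header
                orphans.append((numline, index + 1, value))
    return results, orphans


sample = """AR, Ca, Mg, end
AP-0001-R01, 83.12, 22.3, end
AP-0002-R01, 1.5, ND, 7, end
end
"""
results, orphans = parse_two_dimension(sample)
```

Separating the pairing of keywords and values from the catalog lookups in this way is one option for bringing down the complexity of parse_resultsline(), the only member rated D.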