Total Complexity | 41 |
Total Lines | 263 |
Duplicated Lines | 22.05 % |
Changes | 0 |
Duplicate code is one of the most pungent code smells. A rule that is often used is to re-structure code once it is duplicated in three or more places.
Common duplication problems, and corresponding solutions are:
Complex classes like bika.lims.exportimport.instruments.generic.two_dimension often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.
Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.
1 | # This file is part of Bika LIMS |
||
2 | # |
||
3 | # Copyright 2011-2016 by it's authors. |
||
4 | # Some rights reserved. See LICENSE.txt, AUTHORS.txt. |
||
5 | |||
6 | """ 2-Dimensional-CSV |
||
7 | """ |
||
8 | from bika.lims import bikaMessageFactory as _ |
||
9 | from bika.lims import api |
||
10 | import json |
||
11 | import re |
||
12 | from bika.lims.exportimport.instruments.utils import \ |
||
13 | (get_instrument_import_search_criteria, |
||
14 | get_instrument_import_override, |
||
15 | get_instrument_import_ar_allowed_states) |
||
16 | from bika.lims.exportimport.instruments.resultsimport import \ |
||
17 | InstrumentCSVResultsFileParser, AnalysisResultsImporter |
||
18 | import traceback |
||
19 | from bika.lims.catalog import CATALOG_ANALYSIS_REQUEST_LISTING |
||
20 | |||
21 | title = "2-Dimensional-CSV" |
||
22 | |||
23 | |||
def Import(context, request):
    """ Read Dimensional-CSV analysis results

    Reads the uploaded instrument results file from the request form,
    parses it with TwoDimensionCSVParser and pushes the parsed results
    through TwoDimensionImporter.

    :param context: the Plone/Zope context the import runs in
    :param request: the HTTP request carrying the submitted import form
    :returns: JSON string {'errors': [...], 'log': [...], 'warns': [...]}
    """
    form = request.form
    # TODO form['file'] sometimes returns a list
    infile = form['instrument_results_file'][0] if \
        isinstance(form['instrument_results_file'], list) else \
        form['instrument_results_file']
    artoapply = form['artoapply']
    override = form['results_override']
    sample = form.get('sample', 'requestid')
    instrument = form.get('instrument', None)
    errors = []
    logs = []

    # Load the most suitable parser according to file extension/options/etc...
    if not hasattr(infile, 'filename'):
        errors.append(_("No file selected"))
    parser = TwoDimensionCSVParser(infile)
    status = get_instrument_import_ar_allowed_states(artoapply)
    over = get_instrument_import_override(override)
    sam = get_instrument_import_search_criteria(sample)
    importer = TwoDimensionImporter(parser=parser,
                                    context=context,
                                    idsearchcriteria=sam,
                                    allowed_ar_states=status,
                                    allowed_analysis_states=None,
                                    override=over,
                                    instrument_uid=instrument,
                                    form=form)
    tbex = ''
    try:
        importer.process()
    except Exception:
        # Fix: was a bare ``except:``, which would also swallow SystemExit
        # and KeyboardInterrupt.  The traceback is reported to the user.
        tbex = traceback.format_exc()
    errors = importer.errors
    logs = importer.logs
    warns = importer.warns
    if tbex:
        errors.append(tbex)

    results = {'errors': errors, 'log': logs, 'warns': warns}

    return json.dumps(results)
69 | |||
70 | |||
def is_keyword(kw):
    """Return the number of setup-catalog entries whose keyword is ``kw``.

    Truthy when ``kw`` exists as an analysis keyword, falsy otherwise.
    """
    catalog = api.get_tool('bika_setup_catalog')
    brains = catalog(getKeyword=kw)
    return len(brains)
||
74 | |||
75 | |||
def find_analyses(ar_or_sample):
    """Return the full analysis objects of the AR matching the given id.

    The id is first looked up as an AnalysisRequest id; if nothing matches
    it is retried as a sample id.  Exactly one catalog match is required,
    otherwise an empty list is returned.

    This function and the is_keyword function should probably be in
    resultsimport.py or somewhere central where they can be used by other
    instrument interfaces.
    """
    catalog = api.get_tool(CATALOG_ANALYSIS_REQUEST_LISTING)
    brains = catalog(portal_type='AnalysisRequest', id=ar_or_sample)
    if not brains:
        brains = catalog(portal_type='AnalysisRequest',
                         getSampleID=ar_or_sample)
    if len(brains) != 1:
        return []
    request = brains[0].getObject()
    return request.getAnalyses(full_objects=True)
||
93 | |||
94 | |||
def get_interims_keywords(analysis):
    """Return the interim field keywords defined on ``analysis``.

    Fix: use a list comprehension instead of ``map(lambda ...)`` so the
    result is always a list (on Python 3, ``map`` returns a lazy iterator).
    """
    interims = api.safe_getattr(analysis, 'getInterimFields')
    return [item['keyword'] for item in interims]
||
98 | |||
99 | |||
def find_analysis_interims(ar_or_sample):
    """Collect the distinct interim-field keywords over all analyses of
    the given AR/sample id.

    This function is used to find keywords that are not on the analysis
    but keywords that are on the interim fields.

    This function and the is_keyword function should probably be in
    resultsimport.py or somewhere central where they can be used by other
    instrument interfaces.
    """
    unique_keywords = set()
    for analysis in find_analyses(ar_or_sample):
        unique_keywords.update(get_interims_keywords(analysis))
    return list(unique_keywords)
||
113 | |||
114 | |||
def find_kw(ar_or_sample, kw):
    """Resolve an interim-field keyword to its analysis keyword.

    Returns the keyword of the first analysis of the given AR/sample that
    carries ``kw`` among its interim fields, or None when no analysis does.

    This function and the is_keyword function should probably be in
    resultsimport.py or somewhere central where they can be used by other
    instrument interfaces.
    """
    match = next((analysis for analysis in find_analyses(ar_or_sample)
                  if kw in get_interims_keywords(analysis)), None)
    return match.getKeyword() if match is not None else None
||
127 | |||
128 | |||
class TwoDimensionCSVParser(InstrumentCSVResultsFileParser):
    """Parser for the generic 2-dimensional CSV results layout.

    Header line:  ``<id-column>, Keyword1, Keyword2, ..., end``
    Result lines: ``<AR-or-Sample-ID>, value1, value2, ...``
    A line whose first cell is ``end`` terminates the results section.
    """

    # NOTE(review): this constant is not referenced inside this class;
    # presumably consumed elsewhere -- confirm before removing.
    QUANTITATIONRESULTS_NUMERICHEADERS = ('Title8', 'Title9', 'Title31',
                                          'Title32', 'Title41', 'Title42',
                                          'Title43',)

    def __init__(self, csv):
        InstrumentCSVResultsFileParser.__init__(self, csv)
        self._end_header = False  # True once the keyword header was read
        self._keywords = []  # keywords taken from the header line
        self._quantitationresultsheader = []
        self._numline = 0  # line counter used in error messages

    def _parseline(self, line):
        # Dispatch: everything before the header terminator is header,
        # everything after it is result data.
        if self._end_header:
            return self.parse_resultsline(line)
        return self.parse_headerline(line)

    def parse_headerline(self, line):
        """ Parses header lines

        Keywords example:
        Keyword1, Keyword2, Keyword3, ..., end
        """
        if self._end_header is True:
            # Header already processed
            return 0

        splitted = [token.strip() for token in line.split(',')]
        if splitted[-1] == 'end':
            self._keywords = splitted[1:-1]  # exclude the word end
            self._end_header = True
        return 0

    def parse_resultsline(self, line):
        """ Parses result lines
        """
        splitted = [token.strip() for token in line.split(',')]

        # A line starting with 'end' terminates the results section
        if splitted[0] == 'end':
            return 0

        # Skip lines made up entirely of empty cells
        blank_line = [i for i in splitted if i != '']
        if len(blank_line) == 0:
            return 0

        quantitation = {}
        list_of_interim_results = []
        # list_of_interim_results is a list that will have interim fields on
        # the current line so that we don't have to call self._addRawResult
        # for the same interim fields, ultimately we want a dict that looks
        # like quantitation = {'AR': 'AP-0001-R01', 'interim1': 83.12, 'interim2': 22.3}
        # self._addRawResult(quantitation['AR'],
        #                    values={kw: quantitation},
        #                    override=False)
        # We use will one of the interims to find the analysis in this case new_kw which becomes kw
        # kw is the analysis keyword which sometimes we have to find using the interim field
        # because we have the result of the interim field and not of the analysis

        found = False  # This is just a flag used to check values in list_of_interim_results
        clean_splitted = splitted[1:-1]  # First value on the line is AR
        for i in range(len(clean_splitted)):
            token = clean_splitted[i]
            if i < len(self._keywords):
                quantitation['AR'] = splitted[0]
                # quantitation['AN'] = self._keywords[i]
                quantitation['DefaultResult'] = 'resultValue'
                quantitation['resultValue'] = token
            elif token:
                # More value columns than header keywords
                self.err("Orphan value in column ${index} (${token})",
                         mapping={"index": str(i + 1),
                                  "token": token},
                         numline=self._numline, line=line)

            # Normalize the raw cell value into a numeric result
            result = quantitation[quantitation['DefaultResult']]
            column_name = quantitation['DefaultResult']
            result = self.get_result(column_name, result, line)
            quantitation[quantitation['DefaultResult']] = result

            # Strip non-word characters from the header keyword
            kw = re.sub(r"\W", "", self._keywords[i])
            if not is_keyword(kw):
                # Not an analysis keyword: try to resolve it as an interim
                # field keyword of one of the AR's analyses
                new_kw = find_kw(quantitation['AR'], kw)
                if new_kw:
                    quantitation[kw] = quantitation['resultValue']
                    del quantitation['resultValue']
                    for interim_res in list_of_interim_results:
                        if kw in interim_res:
                            # Interim field already in quantitation dict
                            found = True
                            break
                    # NOTE(review): ``found`` is only reset after a
                    # successful _addRawResult below; after this ``continue``
                    # it stays True for following columns -- verify intended.
                    if found:
                        continue
                    interims = find_analysis_interims(quantitation['AR'])
                    # pairing headers(keywords) and their values(results) per line
                    keyword_value_dict = dict(zip(self._keywords, clean_splitted))
                    for interim in interims:
                        if interim in keyword_value_dict:
                            quantitation[interim] = keyword_value_dict[interim]
                    list_of_interim_results.append(quantitation)
                    kw = new_kw
            kw = re.sub(r"\W", "", kw)

            self._addRawResult(quantitation['AR'],
                               values={kw: quantitation},
                               override=False)
            quantitation = {}
            found = False

    def get_result(self, column_name, result, line):
        """Convert a raw cell value to a float result.

        Values starting with '--', empty strings and 'ND' map to 0.0;
        negative numbers are clamped to 0.0.  Non-numeric values are
        reported via self.err and yield None.
        """
        result = str(result)
        if result.startswith('--') or result == '' or result == 'ND':
            return 0.0

        if api.is_floatable(result):
            result = api.to_float(result)
            # and/or idiom: result when positive, else 0.0
            return result > 0.0 and result or 0.0
        self.err("No valid number ${result} in column (${column_name})",
                 mapping={"result": result,
                          "column_name": column_name},
                 numline=self._numline, line=line)
        return
||
250 | |||
251 | |||
252 | |||
class TwoDimensionImporter(AnalysisResultsImporter):
    """Results importer for the 2-Dimensional-CSV interface.

    Thin wrapper around AnalysisResultsImporter that additionally accepts
    the submitted ``form``.
    """

    def __init__(self, parser, context, idsearchcriteria, override,
                 allowed_ar_states=None, allowed_analysis_states=None,
                 instrument_uid='', form=None):
        # NOTE(review): ``form`` is accepted but neither stored nor
        # forwarded here -- confirm whether anything relies on it.
        AnalysisResultsImporter.__init__(
            self, parser, context, idsearchcriteria, override,
            allowed_ar_states, allowed_analysis_states, instrument_uid)
||
263 |