Passed
Push — master ( d8e2ec...90ae0b )
by Jordi
10:07 queued 04:19
created

AgilentMasshunterParser.parse_headerline()   F

Complexity

Conditions 40

Size

Total Lines 191
Code Lines 122

Duplication

Lines 11
Ratio 5.76 %

Importance

Changes 0
Metric Value
eloc 122
dl 11
loc 191
rs 0
c 0
b 0
f 0
cc 40
nop 2

How to fix   Long Method    Complexity   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

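Commonly applied refactorings include Extract Method; when many parameters or temporary variables are involved, Replace Temp with Query, Introduce Parameter Object, or Preserve Whole Object.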

Complexity

Complex classes like build.bika.lims.exportimport.instruments.agilent.masshunter.masshunter.AgilentMasshunterParser.parse_headerline() often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
# -*- coding: utf-8 -*-
2
#
3
# This file is part of SENAITE.CORE
4
#
5
# Copyright 2018 by its authors.
6
# Some rights reserved. See LICENSE.rst, CONTRIBUTORS.rst.
7
8
""" Agilent's 'Masshunter'
9
"""
10
from bika.lims import bikaMessageFactory as _
11
from datetime import datetime
12
import json
13
import re
14
from bika.lims.exportimport.instruments.resultsimport import \
15
    InstrumentCSVResultsFileParser, AnalysisResultsImporter
16
import traceback
17
18
title = "Agilent - Masshunter"
19
20
21 View Code Duplication
def Import(context, request):
    """ Read Agilent Masshunter analysis results

    Reads the uploaded file and the import options from ``request.form``,
    runs an ``AgilentMasshunterImporter`` over it and reports the outcome.

    Form keys read:
      - ``instrument_results_file``: the uploaded file (or a list whose
        first item is the uploaded file)
      - ``artoapply``: ``'received'`` restricts the allowed states to
        ``sample_received``; any other value allows ``sample_received``,
        ``attachment_due`` and ``to_be_verified``
      - ``results_override``: ``'nooverride'``, ``'override'`` or
        ``'overrideempty'``
      - ``instrument`` (optional): passed on as ``instrument_uid``

    :returns: a JSON string with the keys ``errors``, ``log`` and ``warns``
    """
    form = request.form
    # form['instrument_results_file'] sometimes arrives as a list; the
    # uploaded file is then its first item
    infile = form['instrument_results_file']
    if isinstance(infile, list):
        infile = infile[0]
    artoapply = form['artoapply']
    override = form['results_override']
    instrument = form.get('instrument', None)

    errors = []
    logs = []
    # Always bound, so the result can be built when nothing was imported
    warns = []

    # Only build a parser when a file was really uploaded; otherwise the
    # "No file selected" error below is what gets reported
    parser = None
    if hasattr(infile, 'filename'):
        parser = AgilentMasshunterParser(infile)
    else:
        errors.append(_("No file selected"))

    if parser is not None:
        # Analysis Request states the results may be applied to
        if artoapply == 'received':
            status = ['sample_received']
        else:
            status = ['sample_received', 'attachment_due', 'to_be_verified']

        # Pair of flags handed to the importer as ``override``
        if override == 'override':
            over = [True, False]
        elif override == 'overrideempty':
            over = [True, True]
        else:
            over = [False, False]

        importer = AgilentMasshunterImporter(parser=parser,
                                             context=context,
                                             allowed_ar_states=status,
                                             allowed_analysis_states=None,
                                             override=over,
                                             instrument_uid=instrument)
        tbex = ''
        try:
            importer.process()
        except Exception:
            # Report the failure to the caller instead of propagating it
            tbex = traceback.format_exc()
        errors = importer.errors
        logs = importer.logs
        warns = importer.warns
        if tbex:
            errors.append(tbex)

    results = {'errors': errors, 'log': logs, 'warns': warns}

    return json.dumps(results)
78
79
80
class AgilentMasshunterParser(InstrumentCSVResultsFileParser):
    """ Line parser for Agilent Masshunter result files

    Lines are split on ``COMMAS``. Until a line with an empty first field
    is seen, every line is handled as a ``key,value`` header line; all the
    following lines are handled as part of the quantitation results table.
    """

    HEADERKEY_ANALYSISTIME = 'Analysis Time'
    HEADERKEY_ANALYSTNAME = 'Analyst Name'
    HEADERKEY_BATCHDATAPATH = 'Batch Data Path'
    HEADERKEY_BATCHNAME = 'Batch Name'
    HEADERKEY_BATCHSTATE = 'Batch State'
    HEADERKEY_LASTCALIBRATION = 'Calibration Last Updated Time'
    HEADERKEY_REPORTGENERATIONTIME = 'Report Generation Time'
    HEADERKEY_REPORTGENERATORNAME = 'Report Generator Name'
    HEADERKEY_REPORTRESULTSDATAPATH = 'Report Results Data Path'
    HEADERKEY_SCHEMAVERSION = 'SchemaVersion'
    HEADERKEY_QUANTBATCHVERSION = 'Quant Batch Version'
    HEADERKEY_QUANTREPORTVERSION = 'Quant Report Version'

    # Result columns converted to float. The repeated entry is kept as it
    # was; the tuple is only used for membership tests in this class.
    QUANTITATIONRESULTS_NUMERICHEADERS = ('CalculatedConcentration',
                                          'FinalConcentration',
                                          'FinalConcentration')
    COMMAS = ','

    # Header keys whose value is stored as the raw string, first
    # occurrence only
    _PLAIN_HEADERKEYS = (HEADERKEY_ANALYSTNAME,
                         HEADERKEY_BATCHDATAPATH,
                         HEADERKEY_BATCHNAME,
                         HEADERKEY_BATCHSTATE,
                         HEADERKEY_LASTCALIBRATION,
                         HEADERKEY_REPORTGENERATIONTIME,
                         HEADERKEY_REPORTGENERATORNAME,
                         HEADERKEY_REPORTRESULTSDATAPATH,
                         HEADERKEY_SCHEMAVERSION,
                         HEADERKEY_QUANTBATCHVERSION,
                         HEADERKEY_QUANTREPORTVERSION)

    def __init__(self, csv):
        InstrumentCSVResultsFileParser.__init__(self, csv)
        # Becomes True once the header section is over
        self._end_header = False
        # Column names of the quantitation results table
        self._quantitationresultsheader = []
        self._numline = 0

    def _parseline(self, line):
        """ Dispatches the line to the header or to the results parser
        """
        if not self._end_header:
            return self.parse_headerline(line)
        return self.parse_quantitationesultsline(line)

    def parse_headerline(self, line):
        r""" Parses header lines

            Keys handled and sample values:

            Analysis Time                   7/13/2017 9:55
            Analyst Name                    MassHunter01\Agilent
            Batch Data Path                 D:\MassHunter\GCMS\QuantResults
            Batch Name                      20170711 Sample Workup
            Batch State                     Processed
            Calibration Last Updated Time   6/29/2017 15:57
            Report Generation Time          1/1/0001 12:00:00 AM
            Report Generator Name           None
            Report Results Data Path        None
            SchemaVersion                   65586
            Quant Batch Version             B.08.00
            Quant Report Version            B.08.00

            'Analysis Time' is stored as a datetime, the other keys as the
            raw string. A line whose first field is empty ends the header.
            Lines with an unknown key are ignored.

            Returns -1 if the header ends before any header value was
            stored, 0 otherwise.
        """
        if self._end_header:
            # Header already processed
            return 0

        splitted = [token.strip() for token in line.split(self.COMMAS)]
        key = splitted[0]
        # A line without separator has no value field at all
        value = splitted[1] if len(splitted) > 1 else ''

        if key == self.HEADERKEY_ANALYSISTIME:
            self._parse_analysistime(value, line)

        elif key in self._PLAIN_HEADERKEYS:
            self._store_headervalue(key, value, line)

        elif key == '':
            # Blank line: the header section is over
            self._end_header = True
            if len(self._header) == 0:
                self.err("No header found", numline=self._numline)
                return -1

        return 0

    def _parse_analysistime(self, value, line):
        """ Stores the 'Analysis Time' header value as a datetime
        """
        if not value:
            self.warn("Output Time not found or empty",
                      numline=self._numline, line=line)
            return

        try:
            d = datetime.strptime(value, "%m/%d/%Y %H:%M")
        except ValueError:
            self.err("Invalid Output Time format",
                     numline=self._numline, line=line)
        else:
            self._header[self.HEADERKEY_ANALYSISTIME] = d

    def _store_headervalue(self, key, value, line):
        """ Stores a plain header value unless the key is already stored
        """
        if key in self._header:
            self.warn("Header File Data Name already found. Discarding",
                      numline=self._numline, line=line)
            return

        if value:
            self._header[key] = value
        else:
            self.warn("File Data Name not found or empty",
                      numline=self._numline, line=line)

    def parse_quantitationesultsline(self, line):
        """ Parses quantitation result lines
            Please see samples/GC-MS output.txt
            [MS Quantitative Results] section

            A line starting with 'SampleID' defines the column names. Any
            other non-blank line is a result row: its values are mapped to
            the column names, the 'FinalConcentration' value is normalised
            with zeroValueDefaultInstrumentResults, 'AcqDateTime' is
            converted to a datetime and the row is added as a raw result
            keyed by 'DataFileName'.

            Always returns 0.
        """
        # Separator rows hold nothing but commas and whitespace
        if not line.replace(self.COMMAS, '').strip():
            return 0

        if line.startswith('SampleID'):
            self._end_header = True
            self._quantitationresultsheader = [token.strip() for token
                                               in line.split(self.COMMAS)
                                               if token.strip()]
            return 0

        columns = self._quantitationresultsheader
        splitted = [token.strip() for token in line.split(self.COMMAS)]
        quantitation = {'DefaultResult': 'FinalConcentration'}
        for colname in columns:
            quantitation[colname] = ''

        for i, token in enumerate(splitted):
            if i < len(columns):
                colname = columns[i]
                if colname in self.QUANTITATIONRESULTS_NUMERICHEADERS:
                    try:
                        quantitation[colname] = float(token)
                    except ValueError:
                        self.warn(
                            "No valid number ${token} in column "
                            "${index} (${column_name})",
                            mapping={"token": token,
                                     "index": str(i + 1),
                                     "column_name": colname},
                            numline=self._numline, line=line)
                        # Keep the raw text when it is not a number
                        quantitation[colname] = token
                else:
                    quantitation[colname] = token

            elif token:
                # More values in the row than known column names
                self.err("Orphan value in column ${index} (${token})",
                         mapping={"index": str(i+1),
                                  "token": token},
                         numline=self._numline, line=line)

        column_name = quantitation['DefaultResult']
        result = self.zeroValueDefaultInstrumentResults(
            column_name, quantitation[column_name], line)
        quantitation[column_name] = result

        try:
            d = datetime.strptime(quantitation['AcqDateTime'],
                                  "%m/%d/%Y %H:%M")
        except ValueError:
            # Report the row and skip it rather than abort the parsing
            self.err("Invalid AcqDateTime format",
                     numline=self._numline, line=line)
            return 0
        quantitation['AcqDateTime'] = d

        # The compound name without non-word characters is the result key
        val = re.sub(r"\W", "", quantitation['Compound'])
        self._addRawResult(quantitation['DataFileName'],
                           values={val: quantitation},
                           override=False)
        return 0

    def zeroValueDefaultInstrumentResults(self, column_name, result, line):
        """ Normalises the default result value

            Returns 0.0 for an empty value, for 'ND', for a value starting
            with '--' and for a negative number. Returns the value as float
            otherwise, or None (after reporting an error) if it is not a
            number.
        """
        result = str(result)
        if result.startswith('--') or result == '' or result == 'ND':
            return 0.0

        try:
            result = float(result)
        except ValueError:
            self.err(
                "No valid number ${result} in column (${column_name})",
                mapping={"result": result,
                         "column_name": column_name},
                numline=self._numline, line=line)
            return None

        if result < 0.0:
            result = 0.0
        return result
380
381
382
class AgilentMasshunterImporter(AnalysisResultsImporter):
    """ Results importer for Agilent Masshunter

    Only forwards its constructor arguments to AnalysisResultsImporter.
    """

    def __init__(self, parser, context, override,
                 allowed_ar_states=None, allowed_analysis_states=None,
                 instrument_uid=''):
        # Positional order expected by the base class constructor
        base_args = (parser, context, override, allowed_ar_states,
                     allowed_analysis_states, instrument_uid)
        AnalysisResultsImporter.__init__(self, *base_args)
391