AxiosXrfCSVParser.parse_headerline()   F

Complexity

Conditions 44

Size

Total Lines 216
Code Lines 135

Duplication

Lines 23
Ratio 10.65 %

Importance

Changes 0
Metric Value
cc 44
eloc 135
nop 2
dl 23
loc 216
rs 0
c 0
b 0
f 0

How to fix

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include Extract Method.
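As a minimal sketch of Extract Method applied to the listing further down: the step that parse_headerline() repeats for almost every field (split the line, store the value, otherwise warn) could move into one small helper. The names `extract_header_value` and `store_header_field` are hypothetical and not part of SENAITE. A plain dict and a list of warnings stand in for the parser's `_header` and `warn()`. Unlike `splitLine()`, the sketch splits on the first ':' only, which is an assumption made so that values containing ':' stay intact.

```python
def extract_header_value(line):
    """Return the value part of a 'Name:,value' header line, or None.

    Hypothetical helper: it mirrors the split on ':' followed by the
    stripping of commas, quotes and whitespace that the listing repeats
    for each header field.
    """
    parts = line.split(':', 1)
    if len(parts) < 2:
        return None
    return parts[1].strip(',').replace('"', '').strip()


def store_header_field(header, key, line, warnings):
    """Store the value of `line` under `key`; record a warning if missing."""
    value = extract_header_value(line)
    if value is None:
        warnings.append('Unexpected header format')
        return False
    header[key] = value
    return True


# Usage with two sample lines from the docstring in the listing.
header, warnings = {}, []
store_header_field(header, 'R.M.S.', 'R.M.S.:,"0,035"', warnings)
store_header_field(header, 'Sample type', 'Sample type:,Pressed powder', warnings)
store_header_field(header, 'Broken', 'no separator here', warnings)
print(header)
print(warnings)
```

With a helper like this, each `if line.startswith(...)` branch shrinks to a single call, which is where most of the 135 code lines would go.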

Complexity

Complex classes like senaite.core.exportimport.instruments.panalytical.omnia.AxiosXrfCSVParser.parse_headerline() often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.
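For this method, the cohesive component is arguably the set of header fields that all share the same handling. A hedged sketch of extracting it into its own class follows. `HeaderFieldTable` is a hypothetical name, and the prefix/key pairs are copied from the `startswith()` checks in the listing below. The sketch strips double quotes from every value, which the original code skips for 'Sample type'.

```python
class HeaderFieldTable(object):
    """Hypothetical extracted class: maps header line prefixes to keys."""

    # (prefix, key) pairs copied from the startswith() checks in the listing.
    FIELDS = (
        ('R.M.S.', 'R.M.S.'),
        ('Result status', 'Result status'),
        ('Sum before normalization', 'Sum'),
        ('Normalised to', 'Normalized'),
        ('Sample type', 'Sample type'),
        ('Initial sample weight (g)', 'Initial sample weight'),
        ('Weight after pressing (g)', 'Weight after pressing'),
        ('Correction applied for medium', 'Correction medium'),
        ('Correction applied for film', 'Correction film'),
        ('Used Compound list', 'Used compound'),
        ('Results database:', 'Result database'),
    )

    def lookup(self, line):
        """Return the header key for `line`, or None if no prefix matches."""
        for prefix, key in self.FIELDS:
            if line.startswith(prefix):
                return key
        return None

    def parse(self, line):
        """Return (key, value) for a known header line, else None."""
        key = self.lookup(line)
        if key is None:
            return None
        parts = line.split(':')
        if len(parts) < 2:
            return key, None
        return key, parts[1].strip(',').replace('"', '').strip()


table = HeaderFieldTable()
print(table.parse('Sum before normalization:,"119,5 %"'))
print(table.parse('Results database:,omnian 2013'))
print(table.parse('29/11/2013 10:15:44'))
```

A table like this replaces eleven near-identical branches with one loop, so both the condition count and the duplicated lines reported above would drop. The 'Quantification of sample' and 'Results database in' lines need special handling and would stay outside the table.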

1
# -*- coding: utf-8 -*-
2
#
3
# This file is part of SENAITE.CORE.
4
#
5
# SENAITE.CORE is free software: you can redistribute it and/or modify it under
6
# the terms of the GNU General Public License as published by the Free Software
7
# Foundation, version 2.
8
#
9
# This program is distributed in the hope that it will be useful, but WITHOUT
10
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
11
# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
12
# details.
13
#
14
# You should have received a copy of the GNU General Public License along with
15
# this program; if not, write to the Free Software Foundation, Inc., 51
16
# Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
17
#
18
# Copyright 2018-2025 by its authors.
19
# Some rights reserved, see README and LICENSE.
20
21
from datetime import datetime
22
23
from bika.lims.utils import to_unicode
24
from senaite.core.exportimport.instruments.importer import \
25
    AnalysisResultsImporter
26
from senaite.core.exportimport.instruments.parser import \
27
    InstrumentCSVResultsFileParser
28
29
30
class AxiosXrfCSVMultiParser(InstrumentCSVResultsFileParser):
31
32
    def __init__(self, csv):
33
        InstrumentCSVResultsFileParser.__init__(self, csv)
34
        self._end_header = False
35
        self._columns = []
36
        self.columns_name = False  # True when the next line contains
37
                                   # the analysis column names
38
39
40
    def _parseline(self, line):
41
        # Process the line differently depending on whether it belongs to the header or the results block
42
        if self._end_header:
43
            return self.parse_resultline(line)
44
        else:
45
            sline = line.strip(',')
46
            return self.parse_headerline(sline)
47
48
    def splitLine(self, line):
49
        # In the header, split the line by ':' and then strip ','
50
        # Otherwise, split by ',' and strip surrounding whitespace
51
        if not self._end_header:
52
            sline = line.split(':')
53
            return [token.strip(',') for token in sline]
54
55
        return [token.strip() for token in line.split(',')]
56
57
    def csvDate2BikaDate(self,DateTime):
58
        # 11/03/2014 14:46:46 --> %d/%m/%Y %H:%M:%S
59
        dtobj = datetime.strptime(DateTime,"%d/%m/%Y %H:%M:%S")
60
        return dtobj.strftime("%Y%m%d %H:%M:%S")
61
62
    def parse_headerline(self, line):
63
        #Process incoming header line
64
        """11/03/2014 14:46:46
65
        PANalytical
66
        Results quantitative - Omnian 2013,
67
68
        Selected archive:,Omnian 2013
69
        Number of results selected:,4
70
        """
71
        
72
        # Save each header field (that we know) and its own value in the dict        
73
        if line.startswith('Results quantitative'):
74
            line = to_unicode(line)
75
            if len(self._header) == 0:
76
                self.err("Unexpected header format", numline=self._numline)
77
                return -1
78
79
            line = line.replace(',', "")
80
            splitted = line.split(' - ')
81
            self._header['Quantitative'] = splitted[1]
82
            return 1
83
84
        if line.startswith('Selected archive'):
85
            if len(self._header) == 0:
86
                self.err("No header found", numline=self._numline)
87
                return -1
88
89
            splitted = self.splitLine(line)
90
            if len(splitted) > 1:
91
                self._header['Archive'] = splitted[1].replace('"', '').strip()
92
            else:
93
                self.warn('Unexpected header format', numline=self._numline)
94
            return 0
95
96
        if line.startswith('Number of'):
97
            if len(self._header) == 0:
98
                self.err("No header found", numline=self._numline)
99
                return -1
100
101
            splitted = self.splitLine(line)
102
            if len(splitted) > 1:
103
                self._header['NumResults'] = splitted[1].replace('"', '').strip()
104
            else:
105
                self.warn('Unexpected header format', numline=self._numline)
106
            return 0
107
108
        if line.startswith('Seq.'):
109
            if len(self._header) == 0:
110
                self.err("No header found", numline=self._numline)
111
                return -1
112
            #Grab column names
113
            self._columns = line.split(',')
114
            self._end_header = True
115
            return 1
116
117
        else:
118
            self._header['Date'] = line
119
            return 1
120
121
122
123
    def parse_resultline(self, line):
124
        # Process incoming results line
125
        if not line.strip():
126
            return 0
127
        if line.startswith(',,'):
128
            return 0
129
130
        rawdict = {}
131
        # Split by ","
132
        splitted = self.splitLine(line.strip(";"))
133
134
        # Rejoin quoted values that were split on their decimal comma
135
        for idx, result in enumerate(splitted):
136
            if result.startswith('"'):
137
                # This is the first part of a quoted value,
138
                # so take the second part and join both
139
                result = (splitted[idx].strip('"') + "," + splitted[idx+1].strip('"'))
140
                splitted[idx] = result
141
                splitted.remove(splitted[idx+1])
142
                
143
        result_type = ''
144
        result_sum = ''
145
        for idx, result in enumerate(splitted):
146
            if self._columns[idx] == 'Result type':
147
                result_type = result
148
            elif self._columns[idx].startswith('Sample name'):
149
                rid = result
150
            elif self._columns[idx].startswith('Seq.'):
151
                pass
152
            elif self._columns[idx] == 'Sum':
153
                result_sum = result
154
            else:
155
                rawdict[self._columns[idx]] = {'DefaultResult':result_type,
156
                                               # Replace to obtain UK values from default
157
                                               'Concentration':result.replace(',','.'),
158
                                               'Sum':result_sum}
159
        try:
160
            rawdict['DateTime'] = {'DateTime':self.csvDate2BikaDate(self._header['Date']),
161
                                   'DefaultValue':'DateTime'}
162
        except Exception:
163
            pass
164
        if not rid:
The variable rid does not seem to be defined in case the for loop on line 145 is not entered. Are you sure this can never be the case?
165
            self.err("No Sample defined", numline=self._numline)
166
            return 0
167
168
        self._addRawResult(rid, rawdict, True)
169
        return 0
170
171
172
    def getAttachmentFileType(self):
173
        return "PANalytical - Omnia Axios XRF"
174
175
class AxiosXrfCSVParser(InstrumentCSVResultsFileParser):
176
177
    def __init__(self, csv):
178
        InstrumentCSVResultsFileParser.__init__(self, csv)
179
        self._end_header = False
180
        self._columns = []
181
        self.columns_name = False  # True when the next line contains
182
                                   # the analysis column names
183
184
    def _parseline(self, line):
185
        # Process the line differently depending on whether it belongs to the header or the results block
186
        if self._end_header:
187
            return self.parse_resultline(line)
188
        else:
189
            sline = line.strip(',')
190
            return self.parse_headerline(sline)
191
192
    def csvDate2BikaDate(self,DateTime):
193
        # 11/03/2014 14:46:46 --> %d/%m/%Y %H:%M:%S
194
        dtobj = datetime.strptime(DateTime,"%d/%m/%Y %H:%M:%S")
195
        return dtobj.strftime("%Y%m%d %H:%M:%S")
196
197
    def splitLine(self, line):
198
        # In the header, split the line by ':' and then strip ','
199
        # Otherwise, split by ',' and strip surrounding whitespace
200
        if not self._end_header:
201
            sline = line.split(':')
202
            return [token.strip(',') for token in sline]
203
204
        return [token.strip() for token in line.split(',')]
205
206
    def parse_headerline(self, line):
207
        #Process incoming header line
208
        r"""
209
        29/11/2013 10:15:44
210
        PANalytical
211
        "Quantification of sample ESFERA CINZA - 1g H3BO3 -  1:0,5 - NO PPC",
212
213
        R.M.S.:,"0,035"
214
        Result status:,
215
        Sum before normalization:,"119,5 %"
216
        Normalised to:,"100,0 %"
217
        Sample type:,Pressed powder
218
        Initial sample weight (g):,"2,000"
219
        Weight after pressing (g):,"3,000"
220
        Correction applied for medium:,No
221
        Correction applied for film:,No
222
        Used Compound list:,Oxides
223
        Results database:,omnian 2013
224
        Results database in:,c:\panalytical\superq\userdata
225
        """
226
227
        if line.startswith('"Quantification of sample') or line.startswith('Quantification of sample'):
228
            line = to_unicode(line)
229
            if len(self._header) == 0:
230
                self.warn('Unexpected header format', numline=self._numline)
231
                return -1
232
            # Remove the unimportant prefix and the double quotes to obtain
233
            # the bare sample name
234
            line = line.replace("Quantification of sample ", "")
235
            line = line.replace('"', "")
236
            splitted = line.split(' - ')
237
238
            if len(splitted) > 3:  # Maybe not needed; it could all be the sample's identifier...
239
                self._header['Sample'] = splitted[0].strip(' ')
240
                self._header['Quantity'] = splitted[1]
241
                self._header['????'] = splitted[2]# At present we
242
                                                  # don't know what
243
                                                  # is that
244
                self._header['PPC'] = splitted[3]
245
            
246
            elif len(splitted) == 1:
247
                self._header['Sample'] = splitted[0].replace('Quantification of sample','').strip(' ')
248
249
            else:
250
                self.warn('Unexpected header format', numline=self._numline)
251
            return 1
252
        # Save each header field (that we know) and its own value in the dict
253
        if line.startswith('R.M.S.'):
254
255
            if len(self._header) == 0:
256
                self.err("No header found", numline=self._numline)
257
                return -1
258
259
            splitted = self.splitLine(line)
260
            if len(splitted) > 1:
261
                self._header['R.M.S.'] = splitted[1].replace('"', '').strip()
262
            else:
263
                self.warn('Unexpected header format', numline=self._numline)
264
            return 0
265
266
        if line.startswith('Result status'):
This code seems to be duplicated in your project.
267
            if len(self._header) == 0:
268
                self.err("No header found", numline=self._numline)
269
                return -1
270
            splitted = self.splitLine(line)
271
            if len(splitted) > 1:
272
                self._header['Result status'] = splitted[1].replace('"', '').strip()
273
            else:
274
                self.warn('Unexpected header format', numline=self._numline)
275
276
            return 0
277
278
        if line.startswith('Sum before normalization'):
279
            if len(self._header) == 0:
280
                self.err("No header found", numline=self._numline)
281
                return -1
282
283
            splitted = self.splitLine(line)
284
            if len(splitted) > 1:
285
                self._header['Sum'] = splitted[1].replace('"', '').strip()
286
            else:
287
                self.warn('Unexpected header format', numline=self._numline)
288
289
            return 0
290
291
        if line.startswith('Normalised to'):
292
            if len(self._header) == 0:
293
                self.err("No header found", numline=self._numline)
294
                return -1
295
296
            splitted = self.splitLine(line)
297
            if len(splitted) > 1:
298
                self._header['Normalized'] = splitted[1].replace('"', '').strip()
299
            else:
300
                self.warn('Unexpected header format', numline=self._numline)
301
302
            return 0
303
304
        if line.startswith('Sample type'):
This code seems to be duplicated in your project.
305
            if len(self._header) == 0:
306
                self.err("No header found", numline=self._numline)
307
                return -1
308
309
            splitted = self.splitLine(line)
310
            if len(splitted) > 1:
311
                self._header['Sample type'] = splitted[1].strip()
312
            else:
313
                self.warn('Unexpected header format', numline=self._numline)
314
315
            return 0
316
317
        if line.startswith('Initial sample weight (g)'):
318
            if len(self._header) == 0:
319
                self.err("No header found", numline=self._numline)
320
                return -1
321
322
            splitted = self.splitLine(line)
323
            if len(splitted) > 1:
324
                self._header['Initial sample weight'] = splitted[1].replace('"', '').strip()
325
            else:
326
                self.warn('Unexpected header format', numline=self._numline)
327
328
            return 0
329
330
        if line.startswith('Weight after pressing (g)'):
331
            if len(self._header) == 0:
332
                self.err("No header found", numline=self._numline)
333
                return -1
334
335
            splitted = self.splitLine(line)
336
            if len(splitted) > 1:
337
                self._header['Weight after pressing'] = splitted[1].replace('"', '').strip()
338
            else:
339
                self.warn('Unexpected header format', numline=self._numline)
340
341
            return 0
342
343
        if line.startswith('Correction applied for medium'):
344
            if len(self._header) == 0:
345
                self.warn('Unexpected header format', numline=self._numline)
346
                return -1
347
348
            splitted = self.splitLine(line)
349
            if len(splitted) > 1:
350
                self._header['Correction medium'] = splitted[1].replace('"', '').strip()
351
            else:
352
                self.warn('Unexpected header format', numline=self._numline)
353
354
            return 0
355
356
        if line.startswith('Correction applied for film'):
357
            if len(self._header) == 0:
358
                self.err("No header found", numline=self._numline)
359
                return -1
360
361
            splitted = self.splitLine(line)
362
            if len(splitted) > 1:
363
                self._header['Correction film'] = splitted[1].replace('"', '').strip()
364
            else:
365
                self.warn('Unexpected header format', numline=self._numline)
366
367
            return 0
368
369
        if line.startswith('Used Compound list'):
370
            if len(self._header) == 0:
371
                self.err("No header found", numline=self._numline)
372
                return -1
373
374
            splitted = self.splitLine(line)
375
            if len(splitted) > 1:
376
                self._header['Used compound'] = splitted[1].replace('"', '').strip()
377
            else:
378
                self.warn('Unexpected header format', numline=self._numline)
379
380
            return 0
381
        if line.startswith('Results database:'):
382
            if len(self._header) == 0:
383
                self.err("No header found", numline=self._numline)
384
                return -1
385
386
            splitted = self.splitLine(line)
387
            if len(splitted) > 1:
388
                self._header['Result database'] = splitted[1].replace('"', '').strip()
389
            else:
390
                self.warn('Unexpected header format', numline=self._numline)
391
392
            return 0
393
394
       
395
        if self.columns_name:
396
            if len(self._header) == 0:
397
                self.err("No header found", numline=self._numline)
398
                return -1
399
400
            #Grab column names
401
            self._end_header = True
402
            self._columns = self.splitLine(line)
403
            return 1
404
405
        if line.startswith('Results database in'):
406
            if len(self._header) == 0:
407
                self.err("No header found", numline=self._numline)
408
                return -1
409
            
410
            splitted = self.splitLine(line)
411
            if len(splitted) > 2:
412
                self._header['Database path'] = splitted[1] + ':' + splitted[2]
413
                self.columns_name = True
414
            else:
415
                self.warn('Unexpected header format', numline=self._numline)
416
                
417
            return 1
418
            
419
        else:
420
            self._header['Date'] = line
421
            return 1
422
423
    def parse_resultline(self, line):
424
        # Process incoming results line
425
        if not line.strip():
426
            return 0
427
428
        rawdict = {}
429
        # Split by ","
430
        splitted = self.splitLine(line.strip(";"))
431
        # Check whether the first value is a sequence number
432
        try:
433
            int(splitted[0])
434
            rawdict["num"] = splitted[0]
435
            splitted = splitted[1:]
436
        except ValueError:
437
            pass
438
439
        # Enumerate the list to obtain: [(0,data0),(1,data1),...]
440
        e_splitted = list(enumerate(splitted))
441
        errors = ''
442
443
        com = False
444
        for idx, result in e_splitted:
445
            if result.startswith('"'):
446
                # This is the first part of a quoted value,
447
                # so take the second part and join both
448
                result = (e_splitted[idx][1].strip('"') + "," + e_splitted[idx+1][1].strip('"'))
449
                e_splitted[idx] = (idx,result)
450
                e_splitted.remove(e_splitted[idx+1])
451
                com = True
452
                rawdict[self._columns[idx]] = result
453
                conc = self._columns[idx] # Main value's name
454
                                
455
               
456
            elif com:  # The second part of a value was removed, so we
457
                    # need to decrement idx
458
                if len(self._columns) <= idx-1:
459
                    self.err("Orphan value in column ${index}",
460
                             mapping={"index":str(idx + 1)},
461
                             numline=self._numline)
462
                    break
463
                # We add and sync the result with its value's name
464
                rawdict[self._columns[idx-1]] = result
465
466
            else:
467
                if len(self._columns) <= idx:
468
                    self.err("Orphan value in column ${index}",
469
                             mapping={"index":str(idx + 1)},
470
                             numline=self._numline)
471
                    break
472
                rawdict[self._columns[idx]] = result
473
474
        aname = rawdict[self._columns[0]]  # The first column is the analysis name
475
        if not aname:
476
            self.err("No Analysis Name defined", numline=self._numline)
477
            return 0
478
        elif aname == "<H>":
479
            # <H> maybe is data error header? We need more examples...
480
            errors = rawdict.get('Compound')
481
            notes = rawdict.get('Calibration')
482
            rawdict['Notes'] = notes
483
484
        rid = self._header['Sample']
485
        if not rid:
486
            self.err("No Sample defined", numline=self._numline)
487
            return 0
488
489
        notes = rawdict.get('Notes', '')
490
        notes = "Notes: %s" % notes if notes else ''
491
        rawdict['DefaultResult'] = conc
The variable conc does not seem to be defined in case the for loop on line 444 is not entered. Are you sure this can never be the case?
492
        # Replace to obtain UK values from default
493
        rawdict[conc] = rawdict[conc].replace(',','.')
494
        rawdict['Remarks'] = ' '.join([errors, notes])
495
        rawres = self.getRawResults().get(rid, [])
496
        raw = rawres[0] if len(rawres) > 0 else {}
497
        raw[aname] = rawdict
498
        if 'DateTime' not in raw:
499
            try:
500
                raw['DateTime'] = {'DateTime':self.csvDate2BikaDate(self._header['Date']),
501
                                   'DefaultValue':'DateTime'}
502
            except Exception:
503
                pass
504
            
505
        self._addRawResult(rid, raw, True)
506
        return 0
507
508
509
    def getAttachmentFileType(self):
510
        return "PANalytical - Omnia Axios XRF"
511
512
513
class AxiosXrfImporter(AnalysisResultsImporter):
514
515
    def __init__(self, parser, context,  override,
516
                 allowed_ar_states=None, allowed_analysis_states=None,
517
                 instrument_uid=None):
518
        AnalysisResultsImporter.__init__(self, parser, context,
519
                                          override,
520
                                         allowed_ar_states,
521
                                         allowed_analysis_states,
522
                                         instrument_uid)
523
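The two "variable does not seem to be defined" notes in the listing (for rid and conc) point at the same pattern: a name is bound only inside a for loop and then read after it. One possible remedy, sketched here under the assumption that an empty string is an acceptable "not found" value, is to bind the name before the loop. The function `find_sample_id` is a hypothetical stand-in for the relevant part of parse_resultline(), not SENAITE code.

```python
def find_sample_id(columns, values):
    """Return the value under the 'Sample name' column, or '' if absent.

    Hypothetical stand-in for the loop in parse_resultline(): `rid` is
    bound before the loop, so a later `if not rid` check is always safe,
    even when the loop body never runs or no column matches.
    """
    rid = ''
    for column, value in zip(columns, values):
        if column.startswith('Sample name'):
            rid = value
    return rid


print(find_sample_id(['Seq.', 'Sample name', 'Sum'], ['1', 'S-001', '99,8']))
print(repr(find_sample_id([], [])))
```

The same approach would apply to conc: initialise it before the loop and report an error if it is still empty afterwards, instead of relying on the loop having assigned it.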