Passed
Push — master ( b60410...062e8c )
by Jordi
05:05
created

bika.lims.exportimport.instruments.panalytical.omnia   F

Complexity

Total Complexity 97

Size/Duplication

Total Lines 524
Duplicated Lines 29.2 %

Importance

Changes 0
Metric Value
wmc 97
eloc 330
dl 153
loc 524
rs 2
c 0
b 0
f 0

15 Methods

Rating  Name                                             Duplication  Size  Complexity
C       AxiosXrfCSVMultiParser.parse_headerline()        22           58    11
F       AxiosXrfCSVParser.parse_headerline()             131          216   44
A       AxiosXrfCSVMultiParser.getAttachmentFileType()   0            2     1
A       AxiosXrfCSVParser.splitLine()                    0            8     2
A       AxiosXrfCSVMultiParser._parseline()              0            7     2
A       AxiosXrfCSVMultiParser.__init__()                0            5     1
D       AxiosXrfCSVMultiParser.parse_resultline()        0            49    12
F       AxiosXrfCSVParser.parse_resultline()             0            84    15
A       AxiosXrfCSVParser.__init__()                     0            5     1
A       AxiosXrfCSVParser.csvDate2BikaDate()             0            4     1
A       AxiosXrfCSVParser._parseline()                   0            7     2
A       AxiosXrfImporter.__init__()                      0            8     1
A       AxiosXrfCSVParser.getAttachmentFileType()        0            2     1
A       AxiosXrfCSVMultiParser.csvDate2BikaDate()        0            4     1
A       AxiosXrfCSVMultiParser.splitLine()               0            8     2

How to fix

Duplicated Code

Duplicate code is one of the most pungent code smells. A rule that is often used is to re-structure code once it is duplicated in three or more places.

Common duplication problems, and corresponding solutions are:
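In this file, most of the flagged duplication sits in the parse_headerline() branches, which differ only in the line prefix they test and the header key they fill. A minimal sketch of one possible fix follows. The lookup table HEADER_FIELDS and the helper parse_header_field() are hypothetical names that do not exist in the module; the prefixes and keys are copied from AxiosXrfCSVParser.parse_headerline().

```python
# Hypothetical mapping from line prefix to header key, copied from the
# prefixes and keys used in AxiosXrfCSVParser.parse_headerline().
HEADER_FIELDS = [
    ("R.M.S.", "R.M.S."),
    ("Result status", "Result status"),
    ("Sum before normalization", "Sum"),
    ("Normalised to", "Normalized"),
    ("Sample type", "Sample type"),
    ("Initial sample weight (g)", "Initial sample weight"),
    ("Weight after pressing (g)", "Weight after pressing"),
    ("Correction applied for medium", "Correction medium"),
    ("Correction applied for film", "Correction film"),
    ("Used Compound list", "Used compound"),
    ("Results database:", "Result database"),
]


def parse_header_field(line, header):
    """Store the value of a known 'Name:,value' header line in `header`.

    Returns True if the line matched a known prefix, False otherwise.
    """
    for prefix, key in HEADER_FIELDS:
        if not line.startswith(prefix):
            continue
        # Same splitting as splitLine() in header mode: split on ':'
        # and strip the surrounding commas from each token.
        tokens = [token.strip(",") for token in line.split(":")]
        if len(tokens) > 1:
            header[key] = tokens[1].replace('"', "").strip()
        return True
    return False


header = {}
parse_header_field('R.M.S.:,"0,035"', header)
parse_header_field("Sample type:,Pressed powder", header)
print(header)
```

The sketch leaves out the err() and warn() calls of the original branches; in a real refactoring they would be issued once inside the helper instead of once per branch.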

Complexity

Tip: Before tackling complexity, make sure that you eliminate any duplication first. This can often reduce the size of classes significantly.

Complex classes like bika.lims.exportimport.instruments.panalytical.omnia often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common way to find such a component is to look for fields or methods that share the same prefix or suffix.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.
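In this module the two parser classes share __init__(), _parseline(), splitLine(), csvDate2BikaDate() and getAttachmentFileType() line for line, so pulling those methods up into a common base class is one plausible starting point. The sketch below is an illustration under assumptions, not the project's design: AxiosXrfBaseParser is a hypothetical name, InstrumentCSVResultsFileParser is replaced by a stand-in so that the code runs without SENAITE installed, and the subclass bodies are placeholders for the real format-specific logic.

```python
from datetime import datetime


class InstrumentCSVResultsFileParser(object):
    """Stand-in for the real bika.lims class (assumption: only the
    attributes used in this sketch are modelled)."""

    def __init__(self, csv):
        self._csv = csv
        self._header = {}
        self._numline = 0


class AxiosXrfBaseParser(InstrumentCSVResultsFileParser):
    """Hypothetical shared base class for both Axios XRF parsers."""

    def __init__(self, csv):
        InstrumentCSVResultsFileParser.__init__(self, csv)
        self._end_header = False
        self._columns = []
        self.columns_name = False

    def _parseline(self, line):
        # Dispatch to the header or the results handler.
        if not self._end_header:
            return self.parse_headerline(line.strip(","))
        return self.parse_resultline(line)

    def splitLine(self, line):
        # Header lines are 'Name:,value'; result lines are comma separated.
        if not self._end_header:
            return [token.strip(",") for token in line.split(":")]
        return [token.strip() for token in line.split(",")]

    def csvDate2BikaDate(self, value):
        dtobj = datetime.strptime(value, "%d/%m/%Y %H:%M:%S")
        return dtobj.strftime("%Y%m%d %H:%M:%S")

    def getAttachmentFileType(self):
        return "PANalytical - Omnia Axios XRF"

    def parse_headerline(self, line):
        raise NotImplementedError

    def parse_resultline(self, line):
        raise NotImplementedError


class AxiosXrfCSVParser(AxiosXrfBaseParser):
    def parse_headerline(self, line):
        # Placeholder: the single-sample header logic would go here.
        self._header["Date"] = line
        return 1

    def parse_resultline(self, line):
        # Placeholder: the single-sample results logic would go here.
        return 0


class AxiosXrfCSVMultiParser(AxiosXrfBaseParser):
    def parse_headerline(self, line):
        # Placeholder: the multi-sample header logic would go here.
        self._header["Date"] = line
        return 1

    def parse_resultline(self, line):
        # Placeholder: the multi-sample results logic would go here.
        return 0
```

With this layout each subclass keeps only parse_headerline() and parse_resultline(), which are the two methods that actually differ between the file formats.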

1
# -*- coding: utf-8 -*-
2
#
3
# This file is part of SENAITE.CORE.
4
#
5
# SENAITE.CORE is free software: you can redistribute it and/or modify it under
6
# the terms of the GNU General Public License as published by the Free Software
7
# Foundation, version 2.
8
#
9
# This program is distributed in the hope that it will be useful, but WITHOUT
10
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
11
# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
12
# details.
13
#
14
# You should have received a copy of the GNU General Public License along with
15
# this program; if not, write to the Free Software Foundation, Inc., 51
16
# Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
17
#
18
# Copyright 2018-2019 by its authors.
19
# Some rights reserved, see README and LICENSE.
20
21
""" Omnia Axios XRF
22
"""
23
from datetime import datetime
24
from bika.lims.utils import to_unicode
25
from bika.lims import bikaMessageFactory as _
26
from bika.lims.exportimport.instruments.resultsimport import \
27
    AnalysisResultsImporter, InstrumentCSVResultsFileParser
28
29
class AxiosXrfCSVMultiParser(InstrumentCSVResultsFileParser):
30
31
    def __init__(self, csv):
32
        InstrumentCSVResultsFileParser.__init__(self, csv)
33
        self._end_header = False
34
        self._columns = []
35
        self.columns_name = False # True when the next line contains
36
                                  # the analytes' column names
37
38
39
    def _parseline(self, line):
40
        # Process the line differently depending on whether it belongs to the header or the results block
41
        if self._end_header == False:
42
            sline = line.strip(',')
43
            return self.parse_headerline(sline)
44
        else:
45
            return self.parse_resultline(line)
46
47
    def splitLine(self, line):
48
        # In the header, split the line by ':' and then strip ','
49
        # Otherwise, split by ',' and strip blank spaces
50
        if self._end_header == False:
51
            sline = line.split(':')
52
            return [token.strip(',') for token in sline]
53
54
        return [token.strip() for token in line.split(',')]
55
56
    def csvDate2BikaDate(self,DateTime):
57
        # Input format: 11/03/2014 14:46:46 (%d/%m/%Y %H:%M:%S)
58
        dtobj = datetime.strptime(DateTime,"%d/%m/%Y %H:%M:%S")
59
        return dtobj.strftime("%Y%m%d %H:%M:%S")
60
61
    def parse_headerline(self, line):
62
        #Process incoming header line
63
        """11/03/2014 14:46:46
64
        PANalytical
65
        Results quantitative - Omnian 2013,
66
67
        Selected archive:,Omnian 2013
68
        Number of results selected:,4
69
        """
70
        
71
        # Save each header field (that we know) and its own value in the dict        
72
        if line.startswith('Results quantitative'):
73
            line = to_unicode(line)
74
            if len(self._header) == 0:
75
                self.err("Unexpected header format", numline=self._numline)
76
                return -1
77
78
            line = line.replace(',', "")
79
            splitted = line.split(' - ')
80
            self._header['Quantitative'] = splitted[1]
81
            return 1
82
83
        if line.startswith('Selected archive'):
Duplication: This code seems to be duplicated in your project.
84
            if len(self._header) == 0:
85
                self.err("No header found", numline=self._numline)
86
                return -1
87
88
            splitted = self.splitLine(line)
89
            if len(splitted) > 1:
90
                self._header['Archive'] = splitted[1].replace('"', '').strip()
91
            else:
92
                self.warn('Unexpected header format', numline=self._numline)
93
            return 0
94
95
        if line.startswith('Number of'):
Duplication: This code seems to be duplicated in your project.
96
            if len(self._header) == 0:
97
                self.err("No header found", numline=self._numline)
98
                return -1
99
100
            splitted = self.splitLine(line)
101
            if len(splitted) > 1:
102
                self._header['NumResults'] = splitted[1].replace('"', '').strip()
103
            else:
104
                self.warn('Unexpected header format', numline=self._numline)
105
            return 0
106
107
        if line.startswith('Seq.'):
108
            if len(self._header) == 0:
109
                self.err("No header found", numline=self._numline)
110
                return -1
111
            #Grab column names
112
            self._columns = line.split(',')
113
            self._end_header = True
114
            return 1
115
116
        else:
117
            self._header['Date'] = line
118
            return 1
119
120
121
122
    def parse_resultline(self, line):
123
        # Process incoming results line
124
        if not line.strip():
125
            return 0
126
        if line.startswith(',,'):
127
            return 0
128
129
        rawdict = {}
130
        # Split by ","
131
        splitted = self.splitLine(line.strip(";"))
132
133
        errors = ''
134
135
        # Rejoin values that were split on a ',' inside double quotes
136
        for idx, result in enumerate(splitted):
137
            if result.startswith('"'):
138
                # This is the first part of a quoted value, so we take
139
                # the second part and join both
140
                result = (splitted[idx].strip('"') + "," + splitted[idx+1].strip('"'))
141
                splitted[idx] = result
142
                del splitted[idx+1]  # remove by position, not by value
143
                
144
        result_type = ''
145
        result_sum = ''
146
        for idx, result in enumerate(splitted):
147
            if self._columns[idx] == 'Result type':
148
                result_type = result
149
            elif self._columns[idx].startswith('Sample name'):
150
                    rid = result
151
            elif self._columns[idx].startswith('Seq.'):
152
                pass
153
            elif self._columns[idx] == 'Sum':
154
                    result_sum = result
155
            else:
156
                rawdict[self._columns[idx]] = {'DefaultResult':result_type,
157
                                               # Use a decimal point instead of a decimal comma
158
                                               'Concentration':result.replace(',','.'),
159
                                               'Sum':result_sum}
160
        try:
161
            rawdict['DateTime'] = {'DateTime':self.csvDate2BikaDate(self._header['Date']),
162
                                   'DefaultValue':'DateTime'}
163
        except:
164
            pass
165
        if not rid:
Issue: The variable rid does not seem to be defined in case the for loop on line 146 is not entered. Are you sure this can never be the case?
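One way to address this warning is to bind the name before the loop, so that it is defined even when the loop never runs or no column matches. The sketch below is illustrative only: find_sample_id() and its arguments are hypothetical and merely mirror the column scan in parse_resultline().

```python
def find_sample_id(columns, values):
    """Return the value under the 'Sample name' column, or '' if absent."""
    rid = ""  # Bound up front, so the check after the loop cannot fail.
    for column, value in zip(columns, values):
        if column.startswith("Sample name"):
            rid = value
    return rid


print(find_sample_id(["Seq.", "Sample name", "Sum"], ["1", "S-001", "99,8"]))
print(repr(find_sample_id([], [])))
```

The same default would also cover the case the checker does not mention: a row whose columns contain no 'Sample name' entry at all.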
166
            self.err("No Sample defined", numline=self._numline)
167
            return 0
168
169
        self._addRawResult(rid, rawdict, True)
170
        return 0
171
172
173
    def getAttachmentFileType(self):
174
        return "PANalytical - Omnia Axios XRF"
175
176
class AxiosXrfCSVParser(InstrumentCSVResultsFileParser):
177
178
    def __init__(self, csv):
179
        InstrumentCSVResultsFileParser.__init__(self, csv)
180
        self._end_header = False
181
        self._columns = []
182
        self.columns_name = False # True when the next line contains
183
                                  # the analytes' column names
184
185
    def _parseline(self, line):
186
        # Process the line differently depending on whether it belongs to the header or the results block
187
        if self._end_header == False:
188
            sline = line.strip(',')
189
            return self.parse_headerline(sline)
190
        else:
191
            return self.parse_resultline(line)
192
193
    def csvDate2BikaDate(self,DateTime):
194
        # Input format: 11/03/2014 14:46:46 (%d/%m/%Y %H:%M:%S)
195
        dtobj = datetime.strptime(DateTime,"%d/%m/%Y %H:%M:%S")
196
        return dtobj.strftime("%Y%m%d %H:%M:%S")
197
198
    def splitLine(self, line):
199
        # In the header, split the line by ':' and then strip ','
200
        # Otherwise, split by ',' and strip blank spaces
201
        if self._end_header == False:
202
            sline = line.split(':')
203
            return [token.strip(',') for token in sline]
204
205
        return [token.strip() for token in line.split(',')]
206
207
    def parse_headerline(self, line):
208
        #Process incoming header line
209
        """
210
        29/11/2013 10:15:44
211
        PANalytical
212
        "Quantification of sample ESFERA CINZA - 1g H3BO3 -  1:0,5 - NO PPC",
213
214
        R.M.S.:,"0,035"
215
        Result status:,
216
        Sum before normalization:,"119,5 %"
217
        Normalised to:,"100,0 %"
218
        Sample type:,Pressed powder
219
        Initial sample weight (g):,"2,000"
220
        Weight after pressing (g):,"3,000"
221
        Correction applied for medium:,No
222
        Correction applied for film:,No
223
        Used Compound list:,Oxides
224
        Results database:,omnian 2013
225
        Results database in:,c:\panalytical\superq\userdata
226
        """
227
228
        if line.startswith('"Quantification of sample') or line.startswith('Quantification of sample'):
229
            line = to_unicode(line)
230
            if len(self._header) == 0:
231
                self.warn('Unexpected header format', numline=self._numline)
232
                return -1
233
            # Remove the unimportant text and the double quotes to obtain
234
            # the bare sample name
235
            line = line.replace("Quantification of sample ", "")
236
            line = line.replace('"', "")
237
            splitted = line.split(' - ')
238
239
            if len(splitted) > 3:  # Maybe we don't need this; it could all be the sample's identifier...
240
                self._header['Sample'] = splitted[0].strip(' ')
241
                self._header['Quantity'] = splitted[1]
242
                self._header['????'] = splitted[2]# At present we
243
                                                  # don't know what
244
                                                  # that is
245
                self._header['PPC'] = splitted[3]
246
            
247
            elif len(splitted) == 1:
248
                self._header['Sample'] = splitted[0].replace('Quantification of sample','').strip(' ')
249
250
            else:
251
                self.warn('Unexpected header format', numline=self._numline)
252
            return 1
253
        # Save each header field (that we know) and its own value in the dict
254
        if line.startswith('R.M.S.'):
Duplication: This code seems to be duplicated in your project.
255
256
            if len(self._header) == 0:
257
                self.err("No header found", numline=self._numline)
258
                return -1
259
260
            splitted = self.splitLine(line)
261
            if len(splitted) > 1:
262
                self._header['R.M.S.'] = splitted[1].replace('"', '').strip()
263
            else:
264
                self.warn('Unexpected header format', numline=self._numline)
265
            return 0
266
267
        if line.startswith('Result status'):
Duplication: This code seems to be duplicated in your project.
268
            if len(self._header) == 0:
269
                self.err("No header found", numline=self._numline)
270
271
            splitted = self.splitLine(line)
272
            if len(splitted) > 1:
273
                self._header['Result status'] = splitted[1].replace('"', '').strip()
274
            else:
275
                self.warn('Unexpected header format', numline=self._numline)
276
277
            return 0
278
279
        if line.startswith('Sum before normalization'):
Duplication: This code seems to be duplicated in your project.
280
            if len(self._header) == 0:
281
                self.err("No header found", numline=self._numline)
282
                return -1
283
284
            splitted = self.splitLine(line)
285
            if len(splitted) > 1:
286
                self._header['Sum'] = splitted[1].replace('"', '').strip()
287
            else:
288
                self.warn('Unexpected header format', numline=self._numline)
289
290
            return 0
291
292
        if line.startswith('Normalised to'):
Duplication: This code seems to be duplicated in your project.
293
            if len(self._header) == 0:
294
                self.err("No header found", numline=self._numline)
295
                return -1
296
297
            splitted = self.splitLine(line)
298
            if len(splitted) > 1:
299
                self._header['Normalized'] = splitted[1].replace('"', '').strip()
300
            else:
301
                self.warn('Unexpected header format', numline=self._numline)
302
303
            return 0
304
305
        if line.startswith('Sample type'):
Duplication: This code seems to be duplicated in your project.
306
            if len(self._header) == 0:
307
                self.err("No header found", numline=self._numline)
308
                return -1
309
310
            splitted = self.splitLine(line)
311
            if len(splitted) > 1:
312
                self._header['Sample type'] = splitted[1].strip()
313
            else:
314
                self.warn('Unexpected header format', numline=self._numline)
315
316
            return 0
317
318
        if line.startswith('Initial sample weight (g)'):
Duplication: This code seems to be duplicated in your project.
319
            if len(self._header) == 0:
320
                self.err("No header found", numline=self._numline)
321
                return -1
322
323
            splitted = self.splitLine(line)
324
            if len(splitted) > 1:
325
                self._header['Initial sample weight'] = splitted[1].replace('"', '').strip()
326
            else:
327
                self.warn('Unexpected header format', numline=self._numline)
328
329
            return 0
330
331
        if line.startswith('Weight after pressing (g)'):
Duplication: This code seems to be duplicated in your project.
332
            if len(self._header) == 0:
333
                self.err("No header found", numline=self._numline)
334
                return -1
335
336
            splitted = self.splitLine(line)
337
            if len(splitted) > 1:
338
                self._header['Weight after pressing'] = splitted[1].replace('"', '').strip()
339
            else:
340
                self.warn('Unexpected header format', numline=self._numline)
341
342
            return 0
343
344
        if line.startswith('Correction applied for medium'):
Duplication: This code seems to be duplicated in your project.
345
            if len(self._header) == 0:
346
                self.warn('Unexpected header format', numline=self._numline)
347
                return -1
348
349
            splitted = self.splitLine(line)
350
            if len(splitted) > 1:
351
                self._header['Correction medium'] = splitted[1].replace('"', '').strip()
352
            else:
353
                self.warn('Unexpected header format', numline=self._numline)
354
355
            return 0
356
357
        if line.startswith('Correction applied for film'):
Duplication: This code seems to be duplicated in your project.
358
            if len(self._header) == 0:
359
                self.err("No header found", numline=self._numline)
360
                return -1
361
362
            splitted = self.splitLine(line)
363
            if len(splitted) > 1:
364
                self._header['Correction film'] = splitted[1].replace('"', '').strip()
365
            else:
366
                self.warn('Unexpected header format', numline=self._numline)
367
368
            return 0
369
370
        if line.startswith('Used Compound list'):
Duplication: This code seems to be duplicated in your project.
371
            if len(self._header) == 0:
372
                self.err("No header found", numline=self._numline)
373
                return -1
374
375
            splitted = self.splitLine(line)
376
            if len(splitted) > 1:
377
                self._header['Used compound'] = splitted[1].replace('"', '').strip()
378
            else:
379
                self.warn('Unexpected header format', numline=self._numline)
380
381
            return 0
382
        if line.startswith('Results database:'):
Duplication: This code seems to be duplicated in your project.
383
            if len(self._header) == 0:
384
                self.err("No header found", numline=self._numline)
385
                return -1
386
387
            splitted = self.splitLine(line)
388
            if len(splitted) > 1:
389
                self._header['Result database'] = splitted[1].replace('"', '').strip()
390
            else:
391
                self.warn('Unexpected header format', numline=self._numline)
392
393
            return 0
394
395
       
396
        if self.columns_name:
397
            if len(self._header) == 0:
398
                self.err("No header found", numline=self._numline)
399
                return -1
400
401
            #Grab column names
402
            self._end_header = True
403
            self._columns = self.splitLine(line)
404
            return 1
405
406
        if line.startswith('Results database in'):
407
            if len(self._header) == 0:
408
                self.err("No header found", numline=self._numline)
409
                return -1
410
            
411
            splitted = self.splitLine(line)
412
            if len(splitted) > 1:
413
                self._header['Database path'] = splitted[1]+splitted[2]
414
                self.columns_name = True
415
            else:
416
                self.warn('Unexpected header format', numline=self._numline)
417
                
418
            return 1
419
            
420
        else:
421
            self._header['Date'] = line
422
            return 1
423
424
    def parse_resultline(self, line):
425
        # Process incoming results line
426
        if not line.strip():
427
            return 0
428
429
        rawdict = {}
430
        # Split by ","
431
        splitted = self.splitLine(line.strip(";"))
432
        # Check whether the first value is a sequence number
433
        try:
434
            int(splitted[0])
435
            rawdict["num"] = splitted[0]
436
            splitted = splitted[1:]
437
        except ValueError:
438
            pass
439
440
        # Enumerate the list to obtain: [(0,data0),(1,data1),...]
441
        e_splitted = list(enumerate(splitted))
442
        errors = ''
443
444
        com = False
445
        for idx, result in e_splitted:
446
            if result.startswith('"'):
447
                # This is the first part of a quoted value, so we take
448
                # the second part and join both
449
                result = (e_splitted[idx][1].strip('"') + "," + e_splitted[idx+1][1].strip('"'))
450
                e_splitted[idx] = (idx,result)
451
                e_splitted.remove(e_splitted[idx+1])
452
                com = True
453
                rawdict[self._columns[idx]] = result
454
                conc = self._columns[idx] # Main value's name
455
                                
456
               
457
            elif com:# We have removed the value's 2nd part, so we
458
                    # need to decrement idx
459
                if len(self._columns) <= idx-1:
460
                    self.err("Orphan value in column ${index}",
461
                             mapping={"index":str(idx + 1)},
462
                             numline=self._numline)
463
                    break
464
                # We add and sync the result with its value's name
465
                rawdict[self._columns[idx-1]] = result
466
467
            else:
468
                if len(self._columns) <= idx:
469
                    self.err("Orphan value in column ${index}",
470
                             mapping={"index":str(idx + 1)},
471
                             numline=self._numline)
472
                    break
473
                rawdict[self._columns[idx]] = result
474
475
        aname = rawdict[self._columns[0]]# The first column is the analyte name
476
        if not aname:
477
            self.err("No Analysis Name defined", numline=self._numline)
478
            return 0
479
        elif aname == "<H>":
480
            # <H> may be a data error header? We need more examples...
481
            errors = rawdict.get('Compound')
482
            notes = rawdict.get('Calibration')
483
            rawdict['Notes'] = notes
484
485
        rid = self._header['Sample']
486
        if not rid:
487
            self.err("No Sample defined", numline=self._numline)
488
            return 0
489
490
        notes = rawdict.get('Notes', '')
491
        notes = "Notes: %s" % notes if notes else ''
492
        rawdict['DefaultResult'] = conc
Issue: The variable conc does not seem to be defined in case the for loop on line 445 is not entered. Are you sure this can never be the case?
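In the listing, conc is only bound inside the branch that rejoins a quoted value, so it also stays undefined for a row without quoted fields. One alternative, sketched here under assumptions, is to let the standard csv module keep quoted values such as "0,035" whole and then pick the concentration by column name. The helper names and the column name "Concentration" are hypothetical; the real column headings of the instrument file are not shown in the source.

```python
import csv


def split_result_line(line):
    """Split a results line; quoted values such as "0,035" stay whole."""
    return [token.strip() for token in next(csv.reader([line]))]


def concentration(columns, line, column_name="Concentration"):
    """Return the value of `column_name` with a decimal point, or None."""
    row = dict(zip(columns, split_result_line(line)))
    value = row.get(column_name)  # None if the column is missing
    return value.replace(",", ".") if value is not None else None


columns = ["Compound", "Concentration", "Calibration"]
print(concentration(columns, 'Na2O,"0,035",Calibrated'))
print(concentration(columns[:1], "Na2O"))
```

Because the lookup returns None when the column is missing, the caller can report an error instead of hitting an unbound name.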
493
        # Use a decimal point instead of a decimal comma
494
        rawdict[conc] = rawdict[conc].replace(',','.')
495
        rawdict['Remarks'] = ' '.join([errors, notes])
496
        rawres = self.getRawResults().get(rid, [])
497
        raw = rawres[0] if len(rawres) > 0 else {}
498
        raw[aname] = rawdict
499
        if not 'DateTime' in raw:
500
            try:
501
                raw['DateTime'] = {'DateTime':self.csvDate2BikaDate(self._header['Date']),
502
                                   'DefaultValue':'DateTime'}
503
            except:
504
                pass
505
            
506
        self._addRawResult(rid, raw, True)
507
        return 0
508
509
510
    def getAttachmentFileType(self):
511
        return "PANalytical - Omnia Axios XRF"
512
513
514
class AxiosXrfImporter(AnalysisResultsImporter):
515
516
    def __init__(self, parser, context,  override,
517
                 allowed_ar_states=None, allowed_analysis_states=None,
518
                 instrument_uid=None):
519
        AnalysisResultsImporter.__init__(self, parser, context,
520
                                          override,
521
                                         allowed_ar_states,
522
                                         allowed_analysis_states,
523
                                         instrument_uid)
524