Passed
Push — master ( d8e2ec...90ae0b )
by Jordi
10:07 queued 04:19
created

AxiosXrfCSVMultiParser._parseline()   A

Complexity

Conditions 2

Size

Total Lines 7
Code Lines 5

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 5
dl 0
loc 7
rs 10
c 0
b 0
f 0
cc 2
nop 2
1
# -*- coding: utf-8 -*-
2
#
3
# This file is part of SENAITE.CORE
4
#
5
# Copyright 2018 by it's authors.
6
# Some rights reserved. See LICENSE.rst, CONTRIBUTORS.rst.
7
8
""" Omnia Axios XRF
9
"""
10
from datetime import datetime
11
from bika.lims.utils import to_unicode
12
from bika.lims import bikaMessageFactory as _
13
from bika.lims.exportimport.instruments.resultsimport import \
14
    AnalysisResultsImporter, InstrumentCSVResultsFileParser
15
16
class AxiosXrfCSVMultiParser(InstrumentCSVResultsFileParser):
17
18
    def __init__(self, csv):
19
        InstrumentCSVResultsFileParser.__init__(self, csv)
20
        self._end_header = False
21
        self._columns = []
22
        self.columns_name = False #To know if the next line contains
23
                                  #analytic's columns name
24
25
26
    def _parseline(self, line):
27
        # Process the line differenly if it pertains at header or results block
28
        if self._end_header == False:
29
            sline = line.strip(',')
30
            return self.parse_headerline(sline)
31
        else:
32
            return self.parse_resultline(line)
33
34
    def splitLine(self, line):
35
        # If pertains at header it split the line by ':' and then remove ','
36
        # Else split by ',' and remove blank spaces
37
        if self._end_header == False:
38
            sline = line.split(':')
39
            return [token.strip(',') for token in sline]
40
41
        return [token.strip() for token in line.split(',')]
42
43
    def csvDate2BikaDate(self,DateTime):
44
    #11/03/2014 14:46:46 --> %d/%m/%Y %H:%M %p
45
        dtobj = datetime.strptime(DateTime,"%d/%m/%Y %H:%M:%S")
46
        return dtobj.strftime("%Y%m%d %H:%M:%S")
47
48
    def parse_headerline(self, line):
49
        #Process incoming header line
50
        """11/03/2014 14:46:46
51
        PANalytical
52
        Results quantitative - Omnian 2013,
53
54
        Selected archive:,Omnian 2013
55
        Number of results selected:,4
56
        """
57
        
58
        # Save each header field (that we know) and its own value in the dict        
59
        if line.startswith('Results quantitative'):
60
            line = to_unicode(line)
61
            if len(self._header) == 0:
62
                self.err("Unexpected header format", numline=self._numline)
63
                return -1
64
65
            line = line.replace(',', "")
66
            splitted = line.split(' - ')
67
            self._header['Quantitative'] = splitted[1]
68
            return 1
69
70 View Code Duplication
        if line.startswith('Selected archive'):
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
71
            if len(self._header) == 0:
72
                self.err("No header found", numline=self._numline)
73
                return -1
74
75
            splitted = self.splitLine(line)
76
            if len(splitted) > 1:
77
                self._header['Archive'] = splitted[1].replace('"', '').strip()
78
            else:
79
                self.warn('Unexpected header format', numline=self._numline)
80
            return 0
81
82 View Code Duplication
        if line.startswith('Number of'):
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
83
            if len(self._header) == 0:
84
                self.err("No header found", numline=self._numline)
85
                return -1
86
87
            splitted = self.splitLine(line)
88
            if len(splitted) > 1:
89
                self._header['NumResults'] = splitted[1].replace('"', '').strip()
90
            else:
91
                self.warn('Unexpected header format', numline=self._numline)
92
            return 0
93
94
        if line.startswith('Seq.'):
95
            if len(self._header) == 0:
96
                self.err("No header found", numline=self._numline)
97
                return -1
98
            #Grab column names
99
            self._columns = line.split(',')
100
            self._end_header = True
101
            return 1
102
103
        else:
104
            self._header['Date'] = line
105
            return 1
106
107
108
109
    def parse_resultline(self, line):
110
        # Process incoming results line
111
        if not line.strip():
112
            return 0
113
        if line.startswith(',,'):
114
            return 0
115
116
        rawdict = {}
117
        # Split by ","
118
        splitted = self.splitLine(line.strip(";"))
119
120
        errors = ''
121
122
        # Adjunt separated values from split by ','
123
        for idx, result in enumerate(splitted):
124
            if result.startswith('"'):
125
                # It means that is the value's firts part
126
                # Consequently we take second part and append both
127
                result = (splitted[idx].strip('"') + "," + splitted[idx+1].strip('"'))
128
                splitted[idx] = result
129
                splitted.remove(splitted[idx+1])
130
                
131
        result_type = ''
132
        result_sum = ''
133
        for idx, result in enumerate(splitted):
134
            if self._columns[idx] == 'Result type':
135
                result_type = result
136
            elif self._columns[idx].startswith('Sample name'):
137
                    rid = result
138
            elif self._columns[idx].startswith('Seq.'):
139
                pass
140
            elif self._columns[idx] == 'Sum':
141
                    result_sum = result
142
            else:
143
                rawdict[self._columns[idx]] = {'DefaultResult':result_type,
144
                                               # Replace to obtain UK values from default
145
                                               'Concentration':result.replace(',','.'),
146
                                               'Sum':result_sum}
147
        try:
148
            rawdict['DateTime'] = {'DateTime':self.csvDate2BikaDate(self._header['Date']),
149
                                   'DefaultValue':'DateTime'}
150
        except:
151
            pass
152
        if not rid:
0 ignored issues
show
introduced by
The variable rid does not seem to be defined in case the for loop on line 133 is not entered. Are you sure this can never be the case?
Loading history...
153
            self.err("No Sample defined", numline=self._numline)
154
            return 0
155
156
        self._addRawResult(rid, rawdict, True)
157
        return 0
158
159
160
    def getAttachmentFileType(self):
161
        return "PANalytical - Omnia Axios XRF"
162
163
class AxiosXrfCSVParser(InstrumentCSVResultsFileParser):
164
165
    def __init__(self, csv):
166
        InstrumentCSVResultsFileParser.__init__(self, csv)
167
        self._end_header = False
168
        self._columns = []
169
        self.columns_name = False #To know if the next line contains
170
                                  #analytic's columns name
171
172
    def _parseline(self, line):
173
        # Process the line differenly if it pertains at header or results block
174
        if self._end_header == False:
175
            sline = line.strip(',')
176
            return self.parse_headerline(sline)
177
        else:
178
            return self.parse_resultline(line)
179
180
    def csvDate2BikaDate(self,DateTime):
181
    #11/03/2014 14:46:46 --> %d/%m/%Y %H:%M %p
182
        dtobj = datetime.strptime(DateTime,"%d/%m/%Y %H:%M:%S")
183
        return dtobj.strftime("%Y%m%d %H:%M:%S")
184
185
    def splitLine(self, line):
186
        # If pertains at header it split the line by ':' and then remove ','
187
        # Else split by ',' and remove blank spaces
188
        if self._end_header == False:
189
            sline = line.split(':')
190
            return [token.strip(',') for token in sline]
191
192
        return [token.strip() for token in line.split(',')]
193
194
    def parse_headerline(self, line):
195
        #Process incoming header line
196
        """
197
        29/11/2013 10:15:44
198
        PANalytical
199
        "Quantification of sample ESFERA CINZA - 1g H3BO3 -  1:0,5 - NO PPC",
200
201
        R.M.S.:,"0,035"
202
        Result status:,
203
        Sum before normalization:,"119,5 %"
204
        Normalised to:,"100,0 %"
205
        Sample type:,Pressed powder
206
        Initial sample weight (g):,"2,000"
207
        Weight after pressing (g):,"3,000"
208
        Correction applied for medium:,No
209
        Correction applied for film:,No
210
        Used Compound list:,Oxides
211
        Results database:,omnian 2013
212
        Results database in:,c:\panalytical\superq\userdata
213
        """
214
215
        if line.startswith('"Quantification of sample') or line.startswith('Quantification of sample'):
216
            line = to_unicode(line)
217
            if len(self._header) == 0:
218
                self.warn('Unexpected header format', numline=self._numline)
219
                return -1
220
            # Remove non important string and double comas to obtein
221
            # the sample name free
222
            line = line.replace("Quantification of sample ", "")
223
            line = line.replace('"', "")
224
            splitted = line.split(' - ')
225
226
            if len(splitted) > 3:# Maybe we don't need this, i could be all the sample's identifier...
227
                self._header['Sample'] = splitted[0].strip(' ')
228
                self._header['Quantity'] = splitted[1]
229
                self._header['????'] = splitted[2]# At present we
230
                                                  # don't know what
231
                                                  # is that
232
                self._header['PPC'] = splitted[3]
233
            
234
            elif len(splitted) == 1:
235
                self._header['Sample'] = splitted[0].replace('Quantification of sample','').strip(' ')
236
237
            else:
238
                self.warn('Unexpected header format', numline=self._numline)
239
            return 1
240
        # Save each header field (that we know) and its own value in the dict
241 View Code Duplication
        if line.startswith('R.M.S.'):
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
242
243
            if len(self._header) == 0:
244
                self.err("No header found", numline=self._numline)
245
                return -1
246
247
            splitted = self.splitLine(line)
248
            if len(splitted) > 1:
249
                self._header['R.M.S.'] = splitted[1].replace('"', '').strip()
250
            else:
251
                self.warn('Unexpected header format', numline=self._numline)
252
            return 0
253
254 View Code Duplication
        if line.startswith('Result status'):
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
255
            if len(self._header) == 0:
256
                self.err("No header found", numline=self._numline)
257
258
            splitted = self.splitLine(line)
259
            if len(splitted) > 1:
260
                self._header['Result status'] = splitted[1].replace('"', '').strip()
261
            else:
262
                self.warn('Unexpected header format', numline=self._numline)
263
264
            return 0
265
266 View Code Duplication
        if line.startswith('Sum before normalization'):
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
267
            if len(self._header) == 0:
268
                self.err("No header found", numline=self._numline)
269
                return -1
270
271
            splitted = self.splitLine(line)
272
            if len(splitted) > 1:
273
                self._header['Sum'] = splitted[1].replace('"', '').strip()
274
            else:
275
                self.warn('Unexpected header format', numline=self._numline)
276
277
            return 0
278
279 View Code Duplication
        if line.startswith('Normalised to'):
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
280
            if len(self._header) == 0:
281
                self.err("No header found", numline=self._numline)
282
                return -1
283
284
            splitted = self.splitLine(line)
285
            if len(splitted) > 1:
286
                self._header['Normalized'] = splitted[1].replace('"', '').strip()
287
            else:
288
                self.warn('Unexpected header format', numline=self._numline)
289
290
            return 0
291
292 View Code Duplication
        if line.startswith('Sample type'):
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
293
            if len(self._header) == 0:
294
                self.err("No header found", numline=self._numline)
295
                return -1
296
297
            splitted = self.splitLine(line)
298
            if len(splitted) > 1:
299
                self._header['Sample type'] = splitted[1].strip()
300
            else:
301
                self.warn('Unexpected header format', numline=self._numline)
302
303
            return 0
304
305 View Code Duplication
        if line.startswith('Initial sample weight (g)'):
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
306
            if len(self._header) == 0:
307
                self.err("No header found", numline=self._numline)
308
                return -1
309
310
            splitted = self.splitLine(line)
311
            if len(splitted) > 1:
312
                self._header['Initial sample weight'] = splitted[1].replace('"', '').strip()
313
            else:
314
                self.warn('Unexpected header format', numline=self._numline)
315
316
            return 0
317
318 View Code Duplication
        if line.startswith('Weight after pressing (g)'):
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
319
            if len(self._header) == 0:
320
                self.err("No header found", numline=self._numline)
321
                return -1
322
323
            splitted = self.splitLine(line)
324
            if len(splitted) > 1:
325
                self._header['Weight after pressing'] = splitted[1].replace('"', '').strip()
326
            else:
327
                self.warn('Unexpected header format', numline=self._numline)
328
329
            return 0
330
331 View Code Duplication
        if line.startswith('Correction applied for medium'):
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
332
            if len(self._header) == 0:
333
                self.warn('Unexpected header format', numline=self._numline)
334
                return -1
335
336
            splitted = self.splitLine(line)
337
            if len(splitted) > 1:
338
                self._header['Correction medium'] = splitted[1].replace('"', '').strip()
339
            else:
340
                self.warn('Unexpected header format', numline=self._numline)
341
342
            return 0
343
344 View Code Duplication
        if line.startswith('Correction applied for film'):
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
345
            if len(self._header) == 0:
346
                self.err("No header found", numline=self._numline)
347
                return -1
348
349
            splitted = self.splitLine(line)
350
            if len(splitted) > 1:
351
                self._header['Correction film'] = splitted[1].replace('"', '').strip()
352
            else:
353
                self.warn('Unexpected header format', numline=self._numline)
354
355
            return 0
356
357 View Code Duplication
        if line.startswith('Used Compound list'):
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
358
            if len(self._header) == 0:
359
                self.err("No header found", numline=self._numline)
360
                return -1
361
362
            splitted = self.splitLine(line)
363
            if len(splitted) > 1:
364
                self._header['Used compound'] = splitted[1].replace('"', '').strip()
365
            else:
366
                self.warn('Unexpected header format', numline=self._numline)
367
368
            return 0
369 View Code Duplication
        if line.startswith('Results database:'):
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
370
            if len(self._header) == 0:
371
                self.err("No header found", numline=self._numline)
372
                return -1
373
374
            splitted = self.splitLine(line)
375
            if len(splitted) > 1:
376
                self._header['Result database'] = splitted[1].replace('"', '').strip()
377
            else:
378
                self.warn('Unexpected header format', numline=self._numline)
379
380
            return 0
381
382
       
383
        if self.columns_name:
384
            if len(self._header) == 0:
385
                self.err("No header found", numline=self._numline)
386
                return -1
387
388
            #Grab column names
389
            self._end_header = True
390
            self._columns = self.splitLine(line)
391
            return 1
392
393
        if line.startswith('Results database in'):
394
            if len(self._header) == 0:
395
                self.err("No header found", numline=self._numline)
396
                return -1
397
            
398
            splitted = self.splitLine(line)
399
            if len(splitted) > 1:
400
                self._header['Database path'] = splitted[1]+splitted[2]
401
                self.columns_name = True
402
            else:
403
                self.warn('Unexpected header format', numline=self._numline)
404
                
405
            return 1
406
            
407
        else:
408
            self._header['Date'] = line
409
            return 1
410
411
    def parse_resultline(self, line):
412
        # Process incoming results line
413
        if not line.strip():
414
            return 0
415
416
        rawdict = {}
417
        # Split by ","
418
        splitted = self.splitLine(line.strip(";"))
419
        # Look to know if the first value is an enumerate field
420
        try:
421
            int(splitted[0])
422
            rawdict["num"] = splitted[0]
423
            splitted = splitted[1:]
424
        except ValueError:
425
            pass
426
427
        # Enumerate the list to obtain: [(0,data0),(1,data1),...]
428
        e_splitted = list(enumerate(splitted))
429
        errors = ''
430
431
        com = False
432
        for idx, result in e_splitted:
433
            if result.startswith('"'):
434
                # It means that is the first value part
435
                # Consequently we take second part and append both
436
                result = (e_splitted[idx][1].strip('"') + "," + e_splitted[idx+1][1].strip('"'))
437
                e_splitted[idx] = (idx,result)
438
                e_splitted.remove(e_splitted[idx+1])
439
                com = True
440
                rawdict[self._columns[idx]] = result
441
                conc = self._columns[idx] # Main value's name
442
                                
443
               
444
            elif com:# We have rm the 2nd part value, consequently we
445
                    # need to decrement idx
446
                if len(self._columns) <= idx-1:
447
                    self.err("Orphan value in column ${index}",
448
                             mapping={"index":str(idx + 1)},
449
                             numline=self._numline)
450
                    break
451
                # We add and sync the result with its value's name
452
                rawdict[self._columns[idx-1]] = result
453
454
            else:
455
                if len(self._columns) <= idx:
456
                    self.err("Orphan value in column ${index}",
457
                             mapping={"index":str(idx + 1)},
458
                             numline=self._numline)
459
                    break
460
                rawdict[self._columns[idx]] = result
461
462
        aname = rawdict[self._columns[0]]# The fisrt column is analytic name  
463
        if not aname:
464
            self.err("No Analysis Name defined", numline=self._numline)
465
            return 0
466
        elif aname == "<H>":
467
            # <H> maybe is data error header? We need more examples...
468
            errors = rawdict.get('Compound')
469
            notes = rawdict.get('Calibration')
470
            rawdict['Notes'] = notes
471
472
        rid = self._header['Sample']
473
        if not rid:
474
            self.err("No Sample defined", numline=self._numline)
475
            return 0
476
477
        notes = rawdict.get('Notes', '')
478
        notes = "Notes: %s" % notes if notes else ''
479
        rawdict['DefaultResult'] = conc
0 ignored issues
show
introduced by
The variable conc does not seem to be defined in case the for loop on line 432 is not entered. Are you sure this can never be the case?
Loading history...
480
        # Replace to obtain UK values from default
481
        rawdict[conc] = rawdict[conc].replace(',','.')
482
        rawdict['Remarks'] = ' '.join([errors, notes])
483
        rawres = self.getRawResults().get(rid, [])
484
        raw = rawres[0] if len(rawres) > 0 else {}
485
        raw[aname] = rawdict
486
        if not 'DateTime' in raw:
487
            try:
488
                raw['DateTime'] = {'DateTime':self.csvDate2BikaDate(self._header['Date']),
489
                                   'DefaultValue':'DateTime'}
490
            except:
491
                pass
492
            
493
        self._addRawResult(rid, raw, True)
494
        return 0
495
496
497
    def getAttachmentFileType(self):
498
        return "PANalytical - Omnia Axios XRF"
499
500
501
class AxiosXrfImporter(AnalysisResultsImporter):
502
503
    def __init__(self, parser, context,  override,
504
                 allowed_ar_states=None, allowed_analysis_states=None,
505
                 instrument_uid=None):
506
        AnalysisResultsImporter.__init__(self, parser, context,
507
                                          override,
508
                                         allowed_ar_states,
509
                                         allowed_analysis_states,
510
                                         instrument_uid)
511