Completed
Branch master (d329f3)
by Jordi
03:26
created

Analysis_Specifications.Import()   D

Complexity

Conditions 13

Size

Total Lines 53
Code Lines 49

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 49
dl 0
loc 53
rs 4.2
c 0
b 0
f 0
cc 13
nop 1

How to fix   Long Method    Complexity   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include:

Complexity

Complex classes like bika.health.setupdata.Analysis_Specifications.Import() often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
# -*- coding: utf-8 -*-
2
#
3
# This file is part of SENAITE.HEALTH
4
#
5
# Copyright 2018 by it's authors.
6
# Some rights reserved. See LICENSE.rst, CONTRIBUTORS.rst.
7
8
from Products.CMFCore.utils import getToolByName
9
from Products.CMFPlone.utils import safe_unicode, _createObjectByType
10
from bika.health import logger
11
from Products.CMFCore.utils import getToolByName
12
from bika.lims.exportimport.dataimport import SetupDataSetList as SDL
13
from bika.lims.exportimport.setupdata import WorksheetImporter
14
from bika.lims.idserver import renameAfterCreation
15
from bika.lims.interfaces import ISetupDataSetList
16
from bika.lims.utils import tmpID
17
from pkg_resources import resource_filename
18
from zope.interface import implements
19
import transaction, os.path
20
21
22
class SetupDataSetList(SDL):
23
24
    implements(ISetupDataSetList)
25
26
    def __call__(self):
27
        return SDL.__call__(self, projectname="bika.health")
28
29
30
class Symptoms(WorksheetImporter):
31
32
    def Import(self):
33
        folder = self.context.bika_setup.bika_symptoms
34
        rows = self.get_rows(3)
35
        for row in rows:
36
            _id = folder.invokeFactory('Symptom', id=tmpID())
37
            obj = folder[_id]
38
            if row['Title']:
39
                obj.edit(Code=row.get('Code', ''),
40
                         title=row['Title'],
41
                         description=row.get('Description', ''),
42
                         Gender=row.get('Gender', 'dk'),
43
                         SeverityAllowed=self.to_bool(row.get('SeverityAllowed', 1)))
44
                obj.unmarkCreationFlag()
45
                renameAfterCreation(obj)
46
47
48
class Treatments(WorksheetImporter):
49
50
    def Import(self):
51
        folder = self.context.bika_setup.bika_treatments
52
        for row in self.get_rows(3):
53
            obj = _createObjectByType('Treatment', folder, tmpID())
54
            if row['title']:
55
                obj.edit(title=row['title'],
56
                         description=row.get('description', ''),
57
                         Type=row.get('Type', 'active'),
58
                         Procedure=row.get('Procedure', ''),
59
                         Care=row.get('Care', ''),
60
                         SubjectiveClinicalFindings=row.get('SubjectiveClinicalFindings', ''),
61
                         ObjectiveClinicalFindings=row.get('ObjectiveClinicalFindings', ''),)
62
                obj.unmarkCreationFlag()
63
                renameAfterCreation(obj)
64
65
66
class Aetiologic_Agents(WorksheetImporter):
67
68
    def get_subtypes(self):
69
        sheetname = 'Aetiologic Agents Subtypes'
70
        worksheet = self.workbook.get_sheet_by_name(sheetname)
71
        if not worksheet:
72
            return
73
        subtypes = {}
74
        rows = self.get_rows(3, worksheet=worksheet)
75
        for row in rows:
76
            ae_title = row['aetiologicagent_title']
77
            if ae_title not in subtypes.keys():
78
                subtypes[ae_title] = []
79
            subtypes[ae_title].append({'Subtype': row.get('Subtype', ''),
80
                                       'SubtypeRemarks': row.get('SubtypeRemarks', '')})
81
        return subtypes
82
83
    def Import(self):
84
        subtypes = self.get_subtypes()
85
        folder = self.context.bika_setup.bika_aetiologicagents
86
        for row in self.get_rows(3):
87
            obj = _createObjectByType('AetiologicAgent', folder, tmpID())
88
            if not row['title']:
89
                continue
90
            ae_title = row['title']
91
            ae_subtypes = subtypes.get(ae_title, [])
92
            obj.edit(title=row['title'],
93
                     description=row.get('description', ''),
94
                     AetiologicAgentSubtypes=ae_subtypes)
95
            obj.unmarkCreationFlag()
96
            renameAfterCreation(obj)
97
98
99
class Case_Syndromic_Classifications(WorksheetImporter):
100
101
    def Import(self):
102
        folder = self.context.bika_setup.bika_casesyndromicclassifications
103
        for row in self.get_rows(3):
104
            obj = _createObjectByType('CaseSyndromicClassification', folder, tmpID())
105
            if row['title']:
106
                obj.edit(title=row['title'],
107
                         description=row.get('description', ''),)
108
                obj.unmarkCreationFlag()
109
                renameAfterCreation(obj)
110
111
112
class Drugs(WorksheetImporter):
113
114
    def Import(self):
115
        folder = self.context.bika_setup.bika_drugs
116
        for row in self.get_rows(3):
117
            obj = _createObjectByType('Drug', folder, tmpID())
118
            if row['title']:
119
                obj.edit(title=row['title'],
120
                         description=row.get('description', ''),
121
                         Category=row.get('Category', ''),
122
                         Indications=row.get('Indications', ''),
123
                         Posology=row.get('Posology', ''),
124
                         SideEffects=row.get('SideEffects', ''),
125
                         Preservation=row.get('Preservation', ''),)
126
                obj.unmarkCreationFlag()
127
                renameAfterCreation(obj)
128
129
130
class Drug_Prohibitions(WorksheetImporter):
131
132
    def Import(self):
133
        folder = self.context.bika_setup.bika_drugprohibitions
134
        for row in self.get_rows(3):
135
            obj = _createObjectByType('DrugProhibition', folder, tmpID())
136
            if row['title']:
137
                obj.edit(title=row['title'],
138
                         description=row.get('description', ''),)
139
                obj.unmarkCreationFlag()
140
                renameAfterCreation(obj)
141
142
143
class Identifier_Types(WorksheetImporter):
144
145
    def Import(self):
146
        folder = self.context.bika_setup.bika_identifiertypes
147
        for row in self.get_rows(3):
148
            obj = _createObjectByType('IdentifierType', folder, tmpID())
149
            if row['title']:
150
                obj.edit(title=row['title'],
151
                         description=row.get('description', ''),)
152
                obj.unmarkCreationFlag()
153
                renameAfterCreation(obj)
154
155
156
class Immunizations(WorksheetImporter):
157
158
    def Import(self):
159
        folder = self.context.bika_setup.bika_immunizations
160
        for row in self.get_rows(3):
161
            obj = _createObjectByType('Immunization', folder, tmpID())
162
            if row['title']:
163
                obj.edit(title=row['title'],
164
                         description=row.get('description', ''),
165
                         Form=row.get('Form', 'active'),
166
                         RelevantFacts=row.get('RelevantFacts', ''),
167
                         GeographicalDistribution=row.get('GeographicalDistribution', ''),
168
                         Transmission=row.get('Transmission', ''),
169
                         Symptoms=row.get('Symptoms', ''),
170
                         Risk=row.get('Risk', ''),
171
                         Treatment=row.get('Treatment', ''),
172
                         Prevention=row.get('Prevention', ''),)
173
                obj.unmarkCreationFlag()
174
                renameAfterCreation(obj)
175
176
177
class Vaccination_Centers(WorksheetImporter):
178
179
    def Import(self):
180
        folder = self.context.bika_setup.bika_vaccinationcenters
181
        for row in self.get_rows(3):
182
            obj = _createObjectByType("VaccinationCenter", folder, tmpID())
183
            if row['Name']:
184
                obj.edit(
185
                    Name=row.get('Name', ''),
186
                    TaxNumber=row.get('TaxNumber', ''),
187
                    AccountType=row.get('AccountType', {}),
188
                    AccountName=row.get('AccountName', {}),
189
                    AccountNumber=row.get('AccountNumber', ''),
190
                    BankName=row.get('BankName', ''),
191
                    BankBranch=row.get('BankBranch', ''),
192
                )
193
                self.fill_contactfields(row, obj)
194
                self.fill_addressfields(row, obj)
195
                obj.unmarkCreationFlag()
196
                renameAfterCreation(obj)
197
198
199
class Case_Outcomes(WorksheetImporter):
200
201 View Code Duplication
    def Import(self):
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
202
        folder = self.context.bika_setup.bika_caseoutcomes
203
        rows = self.get_rows(3)
204
        for row in rows:
205
            if row['title']:
206
                _id = folder.invokeFactory('CaseOutcome', id=tmpID())
207
                obj = folder[_id]
208
                obj.edit(title=row['title'],
209
                         description=row.get('description', ''))
210
                obj.unmarkCreationFlag()
211
                renameAfterCreation(obj)
212
213
214
class Case_Statuses(WorksheetImporter):
215
216 View Code Duplication
    def Import(self):
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
217
        folder = self.context.bika_setup.bika_casestatuses
218
        rows = self.get_rows(3)
219
        for row in rows:
220
            if row['title']:
221
                _id = folder.invokeFactory('CaseStatus', id=tmpID())
222
                obj = folder[_id]
223
                obj.edit(title=row['title'],
224
                         description=row.get('description', ''))
225
                obj.unmarkCreationFlag()
226
                renameAfterCreation(obj)
227
228
229
class Diseases(WorksheetImporter):
230
231
    def Import(self):
232
        folder = self.context.bika_setup.bika_diseases
233
        rows = self.get_rows(3)
234
        for row in rows:
235
            _id = folder.invokeFactory('Disease', id=tmpID())
236
            obj = folder[_id]
237
            if row['Title']:
238
                obj.edit(ICDCode=row.get('ICDCode', ''),
239
                         title=row['Title'],
240
                         description=row.get('Description', ''))
241
                obj.unmarkCreationFlag()
242
                renameAfterCreation(obj)
243
244
245
class Doctors(WorksheetImporter):
246
247
    def Import(self):
248
        folder = self.context.doctors
249
        rows = self.get_rows(3)
250
        for row in rows:
251
            if not row['Firstname']:
252
                continue
253
254
            _id = folder.invokeFactory('Doctor', id=tmpID())
255
            obj = folder[_id]
256
            Fullname = (row['Firstname'] + " " + row.get('Surname', '')).strip()
257
            obj.edit(title=Fullname,
258
                     Salutation = row.get('Salutation', ''),
259
                     Firstname = row.get('Firstname', ''),
260
                     Surname = row.get('Surname', ''),
261
                     JobTitle = row.get('JobTitle', ''),
262
                     Department = row.get('Department', ''),
263
                     DoctorID = row.get('DoctorID', ''),
264
                     PublicationPreference = row.get('PublicationPreference','').split(","),
265
                     AttachmentsPermitted = self.to_bool(row.get('AttachmentsPermitted','True'))
266
                     )
267
            self.fill_contactfields(row, obj)
268
            self.fill_addressfields(row, obj)
269
            obj.unmarkCreationFlag()
270
            renameAfterCreation(obj)
271
272
273
class Ethnicities(WorksheetImporter):
274
275 View Code Duplication
    def Import(self):
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
276
        folder = self.context.bika_setup.bika_ethnicities
277
        rows = self.get_rows(3)
278
        for row in rows:
279
            _id = folder.invokeFactory('Ethnicity', id=tmpID())
280
            obj = folder[_id]
281
            if row.get('Title', None):
282
                obj.edit(title=row['Title'],
283
                         description=row.get('Description', ''))
284
                obj.unmarkCreationFlag()
285
                renameAfterCreation(obj)
286
287
288
class Patients(WorksheetImporter):
289
290
    def Import(self):
291
        folder = self.context.patients
292
        rows = self.get_rows(3)
293
        for row in rows:
294
            if not row['Firstname'] or not row['PrimaryReferrer']:
295
                continue
296
            pc = getToolByName(self.context, 'portal_catalog')
297
            client = pc(portal_type='Client', Title=row['PrimaryReferrer'])
298
            if len(client) == 0:
299
                error = "Primary referrer invalid: '%s'. Patient '%s %s' will not be uploaded"
300
                logger.error(error, row['PrimaryReferrer'], row['Firstname'], row.get('Surname', ''))
301
                continue
302
303
            client = client[0].getObject()
304
305
            # Getting an existing ethnicity
306
            bsc = getToolByName(self.context, 'bika_setup_catalog')
307
            ethnicity = bsc(portal_type='Ethnicity', Title=row.get('Ethnicity', ''))
308
            if len(ethnicity) == 0:
309
                raise IndexError("Invalid ethnicity: '%s'" % row['Ethnicity'])
310
            ethnicity = ethnicity[0].getObject()
311
312
            _id = folder.invokeFactory('Patient', id=tmpID())
313
            obj = folder[_id]
314
            obj.unmarkCreationFlag()
315
            renameAfterCreation(obj)
316
            Fullname = (row['Firstname'] + " " + row.get('Surname', '')).strip()
317
            obj.edit(PatientID=row.get('PatientID'),
318
                     title=Fullname,
319
                     ClientPatientID = row.get('ClientPatientID', ''),
320
                     Salutation = row.get('Salutation', ''),
321
                     Firstname = row.get('Firstname', ''),
322
                     Surname = row.get('Surname', ''),
323
                     PrimaryReferrer = client.UID(),
324
                     Gender = row.get('Gender', 'dk'),
325
                     Age = row.get('Age', ''),
326
                     BirthDate = row.get('BirthDate', ''),
327
                     BirthDateEstimated =self.to_bool(row.get('BirthDateEstimated','False')),
328
                     BirthPlace = row.get('BirthPlace', ''),
329
                     # TODO Ethnicity_Obj -> Ethnicity on health v319
330
                     Ethnicity_Obj=ethnicity.UID(),
331
                     Citizenship =row.get('Citizenship', ''),
332
                     MothersName = row.get('MothersName', ''),
333
                     CivilStatus =row.get('CivilStatus', ''),
334
                     Anonymous = self.to_bool(row.get('Anonymous','False'))
335
                     )
336
            self.fill_contactfields(row, obj)
337
            self.fill_addressfields(row, obj)
338
            if 'Photo' in row and row['Photo']:
339
                try:
340
                    path = resource_filename(self.dataset_project,
341
                                             "setupdata/%s/%s" \
342
                                             % (self.dataset_name, row['Photo']))
343
                    file_data = open(path, "rb").read() if os.path.isfile(path) \
344
                        else open(path+'.jpg', "rb").read()
345
                    obj.setPhoto(file_data)
346
                except:
347
                    logger.error("Unable to load Photo %s"%row['Photo'])
348
349
            if 'Feature' in row and row['Feature']:
350
                try:
351
                    path = resource_filename(self.dataset_project,
352
                                             "setupdata/%s/%s" \
353
                                             % (self.dataset_name, row['Feature']))
354
                    file_data = open(path, "rb").read() if os.path.isfile(path) \
355
                        else open(path+'.pdf', "rb").read()
356
                    obj.setFeature(file_data)
357
                except:
358
                    logger.error("Unable to load Feature %s"%row['Feature'])
359
360
            obj.unmarkCreationFlag()
361
            transaction.savepoint(optimistic=True)
362
            if row.get('PatientID'):
363
                # To maintain the patient spreadsheet's IDs, we cannot do a 'renameaftercreation()'
364
                if obj.getPatientID() != row.get('PatientID'):
365
                    transaction.savepoint(optimistic=True)
366
                    obj.aq_inner.aq_parent.manage_renameObject(obj.id, row.get('PatientID'))
367
            else:
368
                renameAfterCreation(obj)
369
370
371
class Analysis_Specifications(WorksheetImporter):
372
373
    def Import(self):
374
        print "EOOO"
375
        s_t = ''
376
        c_t = 'lab'
377
        bucket = {}
378
        pc = getToolByName(self.context, 'portal_catalog')
379
        bsc = getToolByName(self.context, 'bika_setup_catalog')
380
        # collect up all values into the bucket
381
        for row in self.get_rows(3):
382
            c_t = row['Client_title'] if row['Client_title'] else 'lab'
383
            if c_t not in bucket:
384
                bucket[c_t] = {}
385
            s_t = row['SampleType_title'] if row['SampleType_title'] else s_t
386
            if s_t not in bucket[c_t]:
387
                bucket[c_t][s_t] = []
388
            service = bsc(portal_type='AnalysisService', title=row['service'])
389
            if not service:
390
                service = bsc(portal_type='AnalysisService',
391
                              getKeyword=row['service'])
392
            try:
393
                service = service[0].getObject()
394
                bucket[c_t][s_t].append({
395
                'keyword': service.getKeyword(),
396
                'min': row.get('min','0'),
397
                'max': row.get('max','0'),
398
                'minpanic': row.get('minpanic','0'),
399
                'maxpanic': row.get('maxpanic','0'),
400
                'error': row.get('error','0'),
401
                })
402
            except IndexError:
403
                warning = "Error with service name %s on sheet %s. Service not uploaded."
404
                logger.warning(warning, row.get('service', ''), self.sheetname)
405
        # write objects.
406
        for c_t in bucket:
407
            if c_t == 'lab':
408
                folder = self.context.bika_setup.bika_analysisspecs
409
            else:
410
                folder = pc(portal_type='Client', title=c_t)
411
                if (not folder or len(folder) != 1):
412
                    logger.warn("Client %s not found. Omiting client specifications." % c_t)
413
                    continue
414
                folder = folder[0].getObject()
415
            for s_t in bucket[c_t]:
416
                resultsrange = bucket[c_t][s_t]
417
                sampletype = bsc(portal_type='SampleType', title=s_t)[0]
418
                _id = folder.invokeFactory('AnalysisSpec', id=tmpID())
419
                obj = folder[_id]
420
                obj.edit(
421
                    title=sampletype.Title,
422
                    ResultsRange=resultsrange)
423
                obj.setSampleType(sampletype.UID)
424
                obj.unmarkCreationFlag()
425
                renameAfterCreation(obj)
426
427
class Insurance_Companies(WorksheetImporter):
428
429
    def Import(self):
430
        folder = self.context.bika_setup.bika_insurancecompanies
431
        for row in self.get_rows(3):
432
            obj = _createObjectByType("InsuranceCompany", folder, tmpID())
433
            if row.get('Name', None):
434
                obj.edit(
435
                    Name=row.get('Name', ''),
436
                    EmailAddress=row.get('EmailAddress', ''),
437
                    Phone=row.get('Phone', ''),
438
                    Fax=row.get('Fax', ''),
439
                    TaxNumber=row.get('TaxNumber', ''),
440
                    AccountType=row.get('AccountType', {}),
441
                    AccountName=row.get('AccountName', {}),
442
                    AccountNumber=row.get('AccountNumber', ''),
443
                    BankName=row.get('BankName', ''),
444
                    BankBranch=row.get('BankBranch', ''),
445
                )
446
                self.fill_contactfields(row, obj)
447
                self.fill_addressfields(row, obj)
448
                obj.unmarkCreationFlag()
449
                renameAfterCreation(obj)
450