bika.health.setupdata.Patients.Import()   F
last analyzed

Complexity

Conditions 16

Size

Total Lines 79
Code Lines 69

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 69
dl 0
loc 79
rs 2.4
c 0
b 0
f 0
cc 16
nop 1

How to fix   Long Method    Complexity   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include Extract Method: move a cohesive group of statements into a new, well-named method.

Complexity

Complex classes like bika.health.setupdata.Patients.Import() often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
# -*- coding: utf-8 -*-
2
#
3
# This file is part of SENAITE.HEALTH.
4
#
5
# SENAITE.HEALTH is free software: you can redistribute it and/or modify it
6
# under the terms of the GNU General Public License as published by the Free
7
# Software Foundation, version 2.
8
#
9
# This program is distributed in the hope that it will be useful, but WITHOUT
10
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
11
# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
12
# details.
13
#
14
# You should have received a copy of the GNU General Public License along with
15
# this program; if not, write to the Free Software Foundation, Inc., 51
16
# Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
17
#
18
# Copyright 2018-2019 by its authors.
19
# Some rights reserved, see README and LICENSE.
20
21
import os.path
22
import transaction
23
from Products.CMFCore.utils import getToolByName
24
from Products.CMFPlone.utils import _createObjectByType
25
from pkg_resources import resource_filename
26
from zope.interface import implements
27
28
from bika.health import logger
29
from bika.lims.exportimport.dataimport import SetupDataSetList as SDL
30
from bika.lims.exportimport.setupdata import WorksheetImporter
31
from bika.lims.idserver import renameAfterCreation
32
from bika.lims.interfaces import ISetupDataSetList
33
from bika.lims.utils import tmpID
34
35
36
class SetupDataSetList(SDL):
    """Setup data set list bound to the ``bika.health`` project.

    Declares that it provides ``ISetupDataSetList`` and delegates the actual
    work to the base ``SDL`` implementation.
    """

    implements(ISetupDataSetList)

    def __call__(self):
        """Return the result of the base call for project ``bika.health``."""
        project = "bika.health"
        # Explicit base-class call (rather than super) is kept on purpose:
        # nothing visible here guarantees SDL is a new-style class.
        return SDL.__call__(self, projectname=project)
42
43
44
class Symptoms(WorksheetImporter):
    """Imports ``Symptom`` objects into ``bika_setup.bika_symptoms``."""

    def Import(self):
        """Create one Symptom for every row of ``self.get_rows(3)`` with a Title.

        Rows whose 'Title' is empty are skipped *before* any object is
        created, so no untitled temporary objects are left in the folder.
        """
        folder = self.context.bika_setup.bika_symptoms
        for row in self.get_rows(3):
            if not row['Title']:
                continue
            _id = folder.invokeFactory('Symptom', id=tmpID())
            obj = folder[_id]
            obj.edit(Code=row.get('Code', ''),
                     title=row['Title'],
                     description=row.get('Description', ''),
                     Gender=row.get('Gender', 'dk'),
                     SeverityAllowed=self.to_bool(row.get('SeverityAllowed', 1)))
            obj.unmarkCreationFlag()
            # Replace the temporary id with the definitive one
            renameAfterCreation(obj)
60
61
62
class Treatments(WorksheetImporter):
    """Imports ``Treatment`` objects into ``bika_setup.bika_treatments``."""

    def Import(self):
        """Create one Treatment for every row of ``self.get_rows(3)`` with a title.

        Rows whose 'title' is empty are skipped *before* any object is
        created, so no untitled temporary objects are left in the folder.
        """
        folder = self.context.bika_setup.bika_treatments
        for row in self.get_rows(3):
            if not row['title']:
                continue
            obj = _createObjectByType('Treatment', folder, tmpID())
            obj.edit(title=row['title'],
                     description=row.get('description', ''),
                     Type=row.get('Type', 'active'),
                     Procedure=row.get('Procedure', ''),
                     Care=row.get('Care', ''),
                     SubjectiveClinicalFindings=row.get('SubjectiveClinicalFindings', ''),
                     ObjectiveClinicalFindings=row.get('ObjectiveClinicalFindings', ''),)
            obj.unmarkCreationFlag()
            # Replace the temporary id with the definitive one
            renameAfterCreation(obj)
78
79
80
class Aetiologic_Agents(WorksheetImporter):
    """Imports ``AetiologicAgent`` objects into
    ``bika_setup.bika_aetiologicagents``, together with their subtypes.
    """

    def get_subtypes(self):
        """Group the rows of the 'Aetiologic Agents Subtypes' sheet by agent.

        Returns a dict that maps each 'aetiologicagent_title' value to a list
        of ``{'Subtype': ..., 'SubtypeRemarks': ...}`` dicts. An empty dict is
        returned when the workbook lookup yields no worksheet, so callers can
        always use the result as a mapping.
        """
        sheetname = 'Aetiologic Agents Subtypes'
        worksheet = self.workbook.get_sheet_by_name(sheetname)
        subtypes = {}
        if not worksheet:
            return subtypes
        for row in self.get_rows(3, worksheet=worksheet):
            ae_title = row['aetiologicagent_title']
            subtypes.setdefault(ae_title, []).append(
                {'Subtype': row.get('Subtype', ''),
                 'SubtypeRemarks': row.get('SubtypeRemarks', '')})
        return subtypes

    def Import(self):
        """Create one AetiologicAgent for every row of ``self.get_rows(3)``
        that has a title, attaching the subtypes collected for that title.

        Rows whose 'title' is empty are skipped *before* any object is
        created, so no untitled temporary objects are left in the folder.
        """
        # Tolerate an overriding get_subtypes that still returns None
        subtypes = self.get_subtypes() or {}
        folder = self.context.bika_setup.bika_aetiologicagents
        for row in self.get_rows(3):
            ae_title = row['title']
            if not ae_title:
                continue
            obj = _createObjectByType('AetiologicAgent', folder, tmpID())
            obj.edit(title=ae_title,
                     description=row.get('description', ''),
                     AetiologicAgentSubtypes=subtypes.get(ae_title, []))
            obj.unmarkCreationFlag()
            # Replace the temporary id with the definitive one
            renameAfterCreation(obj)
111
112
113
class Case_Syndromic_Classifications(WorksheetImporter):
    """Imports ``CaseSyndromicClassification`` objects into
    ``bika_setup.bika_casesyndromicclassifications``.
    """

    def Import(self):
        """Create one object for every row of ``self.get_rows(3)`` with a title.

        Rows whose 'title' is empty are skipped *before* any object is
        created, so no untitled temporary objects are left in the folder.
        """
        folder = self.context.bika_setup.bika_casesyndromicclassifications
        for row in self.get_rows(3):
            if not row['title']:
                continue
            obj = _createObjectByType('CaseSyndromicClassification', folder, tmpID())
            obj.edit(title=row['title'],
                     description=row.get('description', ''),)
            obj.unmarkCreationFlag()
            # Replace the temporary id with the definitive one
            renameAfterCreation(obj)
124
125
126
class Drugs(WorksheetImporter):
    """Imports ``Drug`` objects into ``bika_setup.bika_drugs``."""

    def Import(self):
        """Create one Drug for every row of ``self.get_rows(3)`` with a title.

        Rows whose 'title' is empty are skipped *before* any object is
        created, so no untitled temporary objects are left in the folder.
        """
        folder = self.context.bika_setup.bika_drugs
        for row in self.get_rows(3):
            if not row['title']:
                continue
            obj = _createObjectByType('Drug', folder, tmpID())
            obj.edit(title=row['title'],
                     description=row.get('description', ''),
                     Category=row.get('Category', ''),
                     Indications=row.get('Indications', ''),
                     Posology=row.get('Posology', ''),
                     SideEffects=row.get('SideEffects', ''),
                     Preservation=row.get('Preservation', ''),)
            obj.unmarkCreationFlag()
            # Replace the temporary id with the definitive one
            renameAfterCreation(obj)
142
143
144
class Drug_Prohibitions(WorksheetImporter):
    """Imports ``DrugProhibition`` objects into
    ``bika_setup.bika_drugprohibitions``.
    """

    def Import(self):
        """Create one object for every row of ``self.get_rows(3)`` with a title.

        Rows whose 'title' is empty are skipped *before* any object is
        created, so no untitled temporary objects are left in the folder.
        """
        folder = self.context.bika_setup.bika_drugprohibitions
        for row in self.get_rows(3):
            if not row['title']:
                continue
            obj = _createObjectByType('DrugProhibition', folder, tmpID())
            obj.edit(title=row['title'],
                     description=row.get('description', ''),)
            obj.unmarkCreationFlag()
            # Replace the temporary id with the definitive one
            renameAfterCreation(obj)
155
156
157
class Identifier_Types(WorksheetImporter):
    """Imports ``IdentifierType`` objects into
    ``bika_setup.bika_identifiertypes``.
    """

    def Import(self):
        """Create one object for every row of ``self.get_rows(3)`` with a title.

        Rows whose 'title' is empty are skipped *before* any object is
        created, so no untitled temporary objects are left in the folder.
        """
        folder = self.context.bika_setup.bika_identifiertypes
        for row in self.get_rows(3):
            if not row['title']:
                continue
            obj = _createObjectByType('IdentifierType', folder, tmpID())
            obj.edit(title=row['title'],
                     description=row.get('description', ''),)
            obj.unmarkCreationFlag()
            # Replace the temporary id with the definitive one
            renameAfterCreation(obj)
168
169
170
class Immunizations(WorksheetImporter):
    """Imports ``Immunization`` objects into ``bika_setup.bika_immunizations``."""

    def Import(self):
        """Create one Immunization for every row of ``self.get_rows(3)`` with
        a title.

        Rows whose 'title' is empty are skipped *before* any object is
        created, so no untitled temporary objects are left in the folder.
        """
        folder = self.context.bika_setup.bika_immunizations
        for row in self.get_rows(3):
            if not row['title']:
                continue
            obj = _createObjectByType('Immunization', folder, tmpID())
            obj.edit(title=row['title'],
                     description=row.get('description', ''),
                     Form=row.get('Form', 'active'),
                     RelevantFacts=row.get('RelevantFacts', ''),
                     GeographicalDistribution=row.get('GeographicalDistribution', ''),
                     Transmission=row.get('Transmission', ''),
                     Symptoms=row.get('Symptoms', ''),
                     Risk=row.get('Risk', ''),
                     Treatment=row.get('Treatment', ''),
                     Prevention=row.get('Prevention', ''),)
            obj.unmarkCreationFlag()
            # Replace the temporary id with the definitive one
            renameAfterCreation(obj)
189
190
191
class Vaccination_Centers(WorksheetImporter):
    """Imports ``VaccinationCenter`` objects into
    ``bika_setup.bika_vaccinationcenters``.
    """

    def Import(self):
        """Create one VaccinationCenter for every row of ``self.get_rows(3)``
        with a Name.

        Rows whose 'Name' is empty are skipped *before* any object is
        created, so no unnamed temporary objects are left in the folder.
        """
        folder = self.context.bika_setup.bika_vaccinationcenters
        for row in self.get_rows(3):
            if not row['Name']:
                continue
            obj = _createObjectByType("VaccinationCenter", folder, tmpID())
            obj.edit(
                Name=row.get('Name', ''),
                TaxNumber=row.get('TaxNumber', ''),
                AccountType=row.get('AccountType', {}),
                AccountName=row.get('AccountName', {}),
                AccountNumber=row.get('AccountNumber', ''),
                BankName=row.get('BankName', ''),
                BankBranch=row.get('BankBranch', ''),
            )
            # Helpers not defined in this class; presumably inherited from
            # WorksheetImporter to copy contact/address columns onto obj
            self.fill_contactfields(row, obj)
            self.fill_addressfields(row, obj)
            obj.unmarkCreationFlag()
            # Replace the temporary id with the definitive one
            renameAfterCreation(obj)
211
212
213
class Case_Outcomes(WorksheetImporter):
    """Imports ``CaseOutcome`` objects into ``bika_setup.bika_caseoutcomes``."""

    def Import(self):
        """Create one CaseOutcome for every row of ``self.get_rows(3)`` that
        has a non-empty 'title'; other rows are ignored.
        """
        container = self.context.bika_setup.bika_caseoutcomes
        for record in self.get_rows(3):
            if not record['title']:
                continue
            new_id = container.invokeFactory('CaseOutcome', id=tmpID())
            outcome = container[new_id]
            outcome.edit(title=record['title'],
                         description=record.get('description', ''))
            outcome.unmarkCreationFlag()
            # Replace the temporary id with the definitive one
            renameAfterCreation(outcome)
226
227
228
class Case_Statuses(WorksheetImporter):
    """Imports ``CaseStatus`` objects into ``bika_setup.bika_casestatuses``."""

    def Import(self):
        """Create one CaseStatus for every row of ``self.get_rows(3)`` that
        has a non-empty 'title'; other rows are ignored.
        """
        container = self.context.bika_setup.bika_casestatuses
        for record in self.get_rows(3):
            if not record['title']:
                continue
            new_id = container.invokeFactory('CaseStatus', id=tmpID())
            status = container[new_id]
            status.edit(title=record['title'],
                        description=record.get('description', ''))
            status.unmarkCreationFlag()
            # Replace the temporary id with the definitive one
            renameAfterCreation(status)
241
242
243
class Diseases(WorksheetImporter):
    """Imports ``Disease`` objects into ``bika_setup.bika_diseases``."""

    def Import(self):
        """Create one Disease for every row of ``self.get_rows(3)`` with a Title.

        Rows whose 'Title' is empty are skipped *before* any object is
        created, so no untitled temporary objects are left in the folder.
        """
        folder = self.context.bika_setup.bika_diseases
        for row in self.get_rows(3):
            if not row['Title']:
                continue
            _id = folder.invokeFactory('Disease', id=tmpID())
            obj = folder[_id]
            obj.edit(ICDCode=row.get('ICDCode', ''),
                     title=row['Title'],
                     description=row.get('Description', ''))
            obj.unmarkCreationFlag()
            # Replace the temporary id with the definitive one
            renameAfterCreation(obj)
257
258
259
class Doctors(WorksheetImporter):
    """Imports ``Doctor`` objects into the ``doctors`` folder of the context."""

    def Import(self):
        """Create one Doctor for every row of ``self.get_rows(3)`` that has a
        non-empty 'Firstname'; other rows are ignored.

        The object title is the first name and surname joined by a space,
        with surrounding whitespace stripped.
        """
        container = self.context.doctors
        for record in self.get_rows(3):
            if not record['Firstname']:
                continue

            new_id = container.invokeFactory('Doctor', id=tmpID())
            doctor = container[new_id]
            full_name = record['Firstname'] + " " + record.get('Surname', '')
            fields = {
                'title': full_name.strip(),
                'Salutation': record.get('Salutation', ''),
                'Firstname': record.get('Firstname', ''),
                'Surname': record.get('Surname', ''),
                'JobTitle': record.get('JobTitle', ''),
                'Department': record.get('Department', ''),
                'DoctorID': record.get('DoctorID', ''),
                # Comma-separated cell value becomes a list of preferences
                'PublicationPreference':
                    record.get('PublicationPreference', '').split(","),
                'AttachmentsPermitted':
                    self.to_bool(record.get('AttachmentsPermitted', 'True')),
            }
            doctor.edit(**fields)
            # Helpers not defined in this class; presumably inherited from
            # WorksheetImporter to copy contact/address columns onto the object
            self.fill_contactfields(record, doctor)
            self.fill_addressfields(record, doctor)
            doctor.unmarkCreationFlag()
            # Replace the temporary id with the definitive one
            renameAfterCreation(doctor)
285
286
287
class Ethnicities(WorksheetImporter):
    """Imports ``Ethnicity`` objects into ``bika_setup.bika_ethnicities``."""

    def Import(self):
        """Create one Ethnicity for every row of ``self.get_rows(3)`` with a
        Title.

        Rows with a missing or empty 'Title' are skipped *before* any object
        is created, so no untitled temporary objects are left in the folder.
        """
        folder = self.context.bika_setup.bika_ethnicities
        for row in self.get_rows(3):
            if not row.get('Title', None):
                continue
            _id = folder.invokeFactory('Ethnicity', id=tmpID())
            obj = folder[_id]
            obj.edit(title=row['Title'],
                     description=row.get('Description', ''))
            obj.unmarkCreationFlag()
            # Replace the temporary id with the definitive one
            renameAfterCreation(obj)
300
301
302
class Patients(WorksheetImporter):
    """Imports ``Patient`` objects into the ``patients`` folder of the context."""

    def _read_dataset_file(self, filename, fallback_ext):
        """Return the binary content of a file shipped with the dataset.

        The path is resolved with ``resource_filename`` as
        ``setupdata/<dataset_name>/<filename>`` inside ``dataset_project``.
        When nothing exists at that path, ``fallback_ext`` is appended to it
        before opening. Any error raised while opening or reading the file is
        propagated to the caller.
        """
        path = resource_filename(
            self.dataset_project,
            "setupdata/%s/%s" % (self.dataset_name, filename))
        if not os.path.isfile(path):
            path += fallback_ext
        # Context manager guarantees the handle is closed, even on error
        with open(path, "rb") as handle:
            return handle.read()

    def Import(self):
        """Create one Patient for every usable row of ``self.get_rows(3)``.

        A row is skipped when 'Firstname' or 'PrimaryReferrer' is empty, or
        (after logging an error) when no Client with the given title is found
        in ``portal_catalog``.

        Raises:
            IndexError: when no Ethnicity with the row's 'Ethnicity' title is
                found in ``bika_setup_catalog``.
        """
        folder = self.context.patients
        # The catalog tools do not depend on the row: look them up once
        pc = getToolByName(self.context, 'portal_catalog')
        bsc = getToolByName(self.context, 'bika_setup_catalog')
        for row in self.get_rows(3):
            if not row['Firstname'] or not row['PrimaryReferrer']:
                continue
            clients = pc(portal_type='Client', Title=row['PrimaryReferrer'])
            if len(clients) == 0:
                error = "Primary referrer invalid: '%s'. Patient '%s %s' will not be uploaded"
                logger.error(error, row['PrimaryReferrer'], row['Firstname'], row.get('Surname', ''))
                continue
            client = clients[0].getObject()

            # Getting an existing ethnicity
            ethnicity_title = row.get('Ethnicity', '')
            ethnicities = bsc(portal_type='Ethnicity', Title=ethnicity_title)
            if len(ethnicities) == 0:
                # Use the value that was searched for, so a missing column
                # raises the intended IndexError rather than a KeyError
                raise IndexError("Invalid ethnicity: '%s'" % ethnicity_title)
            ethnicity = ethnicities[0].getObject()

            _id = folder.invokeFactory('Patient', id=tmpID())
            obj = folder[_id]
            # NOTE(review): the object is unmarked/renamed here and again at
            # the end of the iteration; kept as in the original because the
            # intent of the early rename cannot be confirmed from this file.
            obj.unmarkCreationFlag()
            renameAfterCreation(obj)
            Fullname = (row['Firstname'] + " " + row.get('Surname', '')).strip()
            obj.edit(PatientID=row.get('PatientID'),
                     title=Fullname,
                     ClientPatientID=row.get('ClientPatientID', ''),
                     Salutation=row.get('Salutation', ''),
                     Firstname=row.get('Firstname', ''),
                     Surname=row.get('Surname', ''),
                     PrimaryReferrer=client.UID(),
                     Gender=row.get('Gender', 'dk'),
                     Age=row.get('Age', ''),
                     BirthDate=row.get('BirthDate', ''),
                     BirthDateEstimated=self.to_bool(row.get('BirthDateEstimated', 'False')),
                     BirthPlace=row.get('BirthPlace', ''),
                     # TODO Ethnicity_Obj -> Ethnicity on health v319
                     Ethnicity_Obj=ethnicity.UID(),
                     Citizenship=row.get('Citizenship', ''),
                     MothersName=row.get('MothersName', ''),
                     CivilStatus=row.get('CivilStatus', ''),
                     Anonymous=self.to_bool(row.get('Anonymous', 'False'))
                     )
            # Helpers not defined in this class; presumably inherited from
            # WorksheetImporter to copy contact/address columns onto obj
            self.fill_contactfields(row, obj)
            self.fill_addressfields(row, obj)

            # Attachments are best-effort: a failure is logged and the
            # patient is still imported.
            if 'Photo' in row and row['Photo']:
                try:
                    obj.setPhoto(self._read_dataset_file(row['Photo'], '.jpg'))
                except Exception:
                    logger.error("Unable to load Photo %s" % row['Photo'])

            if 'Feature' in row and row['Feature']:
                try:
                    obj.setFeature(self._read_dataset_file(row['Feature'], '.pdf'))
                except Exception:
                    logger.error("Unable to load Feature %s" % row['Feature'])

            obj.unmarkCreationFlag()
            transaction.savepoint(optimistic=True)
            if row.get('PatientID'):
                # To maintain the patient spreadsheet's IDs, we cannot do a 'renameaftercreation()'
                if obj.getPatientID() != row.get('PatientID'):
                    transaction.savepoint(optimistic=True)
                    obj.aq_inner.aq_parent.manage_renameObject(obj.id, row.get('PatientID'))
            else:
                renameAfterCreation(obj)
383
384
385
class Insurance_Companies(WorksheetImporter):
    """Imports ``InsuranceCompany`` objects into
    ``bika_setup.bika_insurancecompanies``.
    """

    def Import(self):
        """Create one InsuranceCompany for every row of ``self.get_rows(3)``
        with a Name.

        Rows with a missing or empty 'Name' are skipped *before* any object
        is created, so no unnamed temporary objects are left in the folder.
        """
        folder = self.context.bika_setup.bika_insurancecompanies
        for row in self.get_rows(3):
            if not row.get('Name', None):
                continue
            obj = _createObjectByType("InsuranceCompany", folder, tmpID())
            obj.edit(
                Name=row.get('Name', ''),
                EmailAddress=row.get('EmailAddress', ''),
                Phone=row.get('Phone', ''),
                Fax=row.get('Fax', ''),
                TaxNumber=row.get('TaxNumber', ''),
                AccountType=row.get('AccountType', {}),
                AccountName=row.get('AccountName', {}),
                AccountNumber=row.get('AccountNumber', ''),
                BankName=row.get('BankName', ''),
                BankBranch=row.get('BankBranch', ''),
            )
            # Helpers not defined in this class; presumably inherited from
            # WorksheetImporter to copy contact/address columns onto obj
            self.fill_contactfields(row, obj)
            self.fill_addressfields(row, obj)
            obj.unmarkCreationFlag()
            # Replace the temporary id with the definitive one
            renameAfterCreation(obj)
408