Total Complexity | 66 |
Total Lines | 408 |
Duplicated Lines | 8.09 % |
Changes | 0 |
Duplicate code is one of the most pungent code smells. A rule that is often used is to re-structure code once it is duplicated in three or more places.
Common duplication problems, and corresponding solutions are:
Complex classes like bika.health.setupdata often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.
Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.
1 | # -*- coding: utf-8 -*- |
||
2 | # |
||
3 | # This file is part of SENAITE.HEALTH. |
||
4 | # |
||
5 | # SENAITE.HEALTH is free software: you can redistribute it and/or modify it |
||
6 | # under the terms of the GNU General Public License as published by the Free |
||
7 | # Software Foundation, version 2. |
||
8 | # |
||
9 | # This program is distributed in the hope that it will be useful, but WITHOUT |
||
10 | # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS |
||
11 | # FOR A PARTICULAR PURPOSE. See the GNU General Public License for more |
||
12 | # details. |
||
13 | # |
||
14 | # You should have received a copy of the GNU General Public License along with |
||
15 | # this program; if not, write to the Free Software Foundation, Inc., 51 |
||
16 | # Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. |
||
17 | # |
||
18 | # Copyright 2018-2019 by it's authors. |
||
19 | # Some rights reserved, see README and LICENSE. |
||
20 | |||
21 | import os.path |
||
22 | import transaction |
||
23 | from Products.CMFCore.utils import getToolByName |
||
24 | from Products.CMFPlone.utils import _createObjectByType |
||
25 | from pkg_resources import resource_filename |
||
26 | from zope.interface import implements |
||
27 | |||
28 | from bika.health import logger |
||
29 | from bika.lims.exportimport.dataimport import SetupDataSetList as SDL |
||
30 | from bika.lims.exportimport.setupdata import WorksheetImporter |
||
31 | from bika.lims.idserver import renameAfterCreation |
||
32 | from bika.lims.interfaces import ISetupDataSetList |
||
33 | from bika.lims.utils import tmpID |
||
34 | |||
35 | |||
36 | class SetupDataSetList(SDL): |
||
37 | |||
38 | implements(ISetupDataSetList) |
||
39 | |||
40 | def __call__(self): |
||
41 | return SDL.__call__(self, projectname="bika.health") |
||
42 | |||
43 | |||
44 | class Symptoms(WorksheetImporter): |
||
45 | |||
46 | def Import(self): |
||
47 | folder = self.context.bika_setup.bika_symptoms |
||
48 | rows = self.get_rows(3) |
||
49 | for row in rows: |
||
50 | _id = folder.invokeFactory('Symptom', id=tmpID()) |
||
51 | obj = folder[_id] |
||
52 | if row['Title']: |
||
53 | obj.edit(Code=row.get('Code', ''), |
||
54 | title=row['Title'], |
||
55 | description=row.get('Description', ''), |
||
56 | Gender=row.get('Gender', 'dk'), |
||
57 | SeverityAllowed=self.to_bool(row.get('SeverityAllowed', 1))) |
||
58 | obj.unmarkCreationFlag() |
||
59 | renameAfterCreation(obj) |
||
60 | |||
61 | |||
62 | class Treatments(WorksheetImporter): |
||
63 | |||
64 | def Import(self): |
||
65 | folder = self.context.bika_setup.bika_treatments |
||
66 | for row in self.get_rows(3): |
||
67 | obj = _createObjectByType('Treatment', folder, tmpID()) |
||
68 | if row['title']: |
||
69 | obj.edit(title=row['title'], |
||
70 | description=row.get('description', ''), |
||
71 | Type=row.get('Type', 'active'), |
||
72 | Procedure=row.get('Procedure', ''), |
||
73 | Care=row.get('Care', ''), |
||
74 | SubjectiveClinicalFindings=row.get('SubjectiveClinicalFindings', ''), |
||
75 | ObjectiveClinicalFindings=row.get('ObjectiveClinicalFindings', ''),) |
||
76 | obj.unmarkCreationFlag() |
||
77 | renameAfterCreation(obj) |
||
78 | |||
79 | |||
80 | class Aetiologic_Agents(WorksheetImporter): |
||
81 | |||
82 | def get_subtypes(self): |
||
83 | sheetname = 'Aetiologic Agents Subtypes' |
||
84 | worksheet = self.workbook.get_sheet_by_name(sheetname) |
||
85 | if not worksheet: |
||
86 | return |
||
87 | subtypes = {} |
||
88 | rows = self.get_rows(3, worksheet=worksheet) |
||
89 | for row in rows: |
||
90 | ae_title = row['aetiologicagent_title'] |
||
91 | if ae_title not in subtypes.keys(): |
||
92 | subtypes[ae_title] = [] |
||
93 | subtypes[ae_title].append({'Subtype': row.get('Subtype', ''), |
||
94 | 'SubtypeRemarks': row.get('SubtypeRemarks', '')}) |
||
95 | return subtypes |
||
96 | |||
97 | def Import(self): |
||
98 | subtypes = self.get_subtypes() |
||
99 | folder = self.context.bika_setup.bika_aetiologicagents |
||
100 | for row in self.get_rows(3): |
||
101 | obj = _createObjectByType('AetiologicAgent', folder, tmpID()) |
||
102 | if not row['title']: |
||
103 | continue |
||
104 | ae_title = row['title'] |
||
105 | ae_subtypes = subtypes.get(ae_title, []) |
||
106 | obj.edit(title=row['title'], |
||
107 | description=row.get('description', ''), |
||
108 | AetiologicAgentSubtypes=ae_subtypes) |
||
109 | obj.unmarkCreationFlag() |
||
110 | renameAfterCreation(obj) |
||
111 | |||
112 | |||
113 | class Case_Syndromic_Classifications(WorksheetImporter): |
||
114 | |||
115 | def Import(self): |
||
116 | folder = self.context.bika_setup.bika_casesyndromicclassifications |
||
117 | for row in self.get_rows(3): |
||
118 | obj = _createObjectByType('CaseSyndromicClassification', folder, tmpID()) |
||
119 | if row['title']: |
||
120 | obj.edit(title=row['title'], |
||
121 | description=row.get('description', ''),) |
||
122 | obj.unmarkCreationFlag() |
||
123 | renameAfterCreation(obj) |
||
124 | |||
125 | |||
126 | class Drugs(WorksheetImporter): |
||
127 | |||
128 | def Import(self): |
||
129 | folder = self.context.bika_setup.bika_drugs |
||
130 | for row in self.get_rows(3): |
||
131 | obj = _createObjectByType('Drug', folder, tmpID()) |
||
132 | if row['title']: |
||
133 | obj.edit(title=row['title'], |
||
134 | description=row.get('description', ''), |
||
135 | Category=row.get('Category', ''), |
||
136 | Indications=row.get('Indications', ''), |
||
137 | Posology=row.get('Posology', ''), |
||
138 | SideEffects=row.get('SideEffects', ''), |
||
139 | Preservation=row.get('Preservation', ''),) |
||
140 | obj.unmarkCreationFlag() |
||
141 | renameAfterCreation(obj) |
||
142 | |||
143 | |||
144 | class Drug_Prohibitions(WorksheetImporter): |
||
145 | |||
146 | def Import(self): |
||
147 | folder = self.context.bika_setup.bika_drugprohibitions |
||
148 | for row in self.get_rows(3): |
||
149 | obj = _createObjectByType('DrugProhibition', folder, tmpID()) |
||
150 | if row['title']: |
||
151 | obj.edit(title=row['title'], |
||
152 | description=row.get('description', ''),) |
||
153 | obj.unmarkCreationFlag() |
||
154 | renameAfterCreation(obj) |
||
155 | |||
156 | |||
157 | class Identifier_Types(WorksheetImporter): |
||
158 | |||
159 | def Import(self): |
||
160 | folder = self.context.bika_setup.bika_identifiertypes |
||
161 | for row in self.get_rows(3): |
||
162 | obj = _createObjectByType('IdentifierType', folder, tmpID()) |
||
163 | if row['title']: |
||
164 | obj.edit(title=row['title'], |
||
165 | description=row.get('description', ''),) |
||
166 | obj.unmarkCreationFlag() |
||
167 | renameAfterCreation(obj) |
||
168 | |||
169 | |||
170 | class Immunizations(WorksheetImporter): |
||
171 | |||
172 | def Import(self): |
||
173 | folder = self.context.bika_setup.bika_immunizations |
||
174 | for row in self.get_rows(3): |
||
175 | obj = _createObjectByType('Immunization', folder, tmpID()) |
||
176 | if row['title']: |
||
177 | obj.edit(title=row['title'], |
||
178 | description=row.get('description', ''), |
||
179 | Form=row.get('Form', 'active'), |
||
180 | RelevantFacts=row.get('RelevantFacts', ''), |
||
181 | GeographicalDistribution=row.get('GeographicalDistribution', ''), |
||
182 | Transmission=row.get('Transmission', ''), |
||
183 | Symptoms=row.get('Symptoms', ''), |
||
184 | Risk=row.get('Risk', ''), |
||
185 | Treatment=row.get('Treatment', ''), |
||
186 | Prevention=row.get('Prevention', ''),) |
||
187 | obj.unmarkCreationFlag() |
||
188 | renameAfterCreation(obj) |
||
189 | |||
190 | |||
191 | class Vaccination_Centers(WorksheetImporter): |
||
192 | |||
193 | def Import(self): |
||
194 | folder = self.context.bika_setup.bika_vaccinationcenters |
||
195 | for row in self.get_rows(3): |
||
196 | obj = _createObjectByType("VaccinationCenter", folder, tmpID()) |
||
197 | if row['Name']: |
||
198 | obj.edit( |
||
199 | Name=row.get('Name', ''), |
||
200 | TaxNumber=row.get('TaxNumber', ''), |
||
201 | AccountType=row.get('AccountType', {}), |
||
202 | AccountName=row.get('AccountName', {}), |
||
203 | AccountNumber=row.get('AccountNumber', ''), |
||
204 | BankName=row.get('BankName', ''), |
||
205 | BankBranch=row.get('BankBranch', ''), |
||
206 | ) |
||
207 | self.fill_contactfields(row, obj) |
||
208 | self.fill_addressfields(row, obj) |
||
209 | obj.unmarkCreationFlag() |
||
210 | renameAfterCreation(obj) |
||
211 | |||
212 | |||
213 | class Case_Outcomes(WorksheetImporter): |
||
214 | |||
215 | View Code Duplication | def Import(self): |
|
|
|||
216 | folder = self.context.bika_setup.bika_caseoutcomes |
||
217 | rows = self.get_rows(3) |
||
218 | for row in rows: |
||
219 | if row['title']: |
||
220 | _id = folder.invokeFactory('CaseOutcome', id=tmpID()) |
||
221 | obj = folder[_id] |
||
222 | obj.edit(title=row['title'], |
||
223 | description=row.get('description', '')) |
||
224 | obj.unmarkCreationFlag() |
||
225 | renameAfterCreation(obj) |
||
226 | |||
227 | |||
228 | class Case_Statuses(WorksheetImporter): |
||
229 | |||
230 | View Code Duplication | def Import(self): |
|
231 | folder = self.context.bika_setup.bika_casestatuses |
||
232 | rows = self.get_rows(3) |
||
233 | for row in rows: |
||
234 | if row['title']: |
||
235 | _id = folder.invokeFactory('CaseStatus', id=tmpID()) |
||
236 | obj = folder[_id] |
||
237 | obj.edit(title=row['title'], |
||
238 | description=row.get('description', '')) |
||
239 | obj.unmarkCreationFlag() |
||
240 | renameAfterCreation(obj) |
||
241 | |||
242 | |||
243 | class Diseases(WorksheetImporter): |
||
244 | |||
245 | def Import(self): |
||
246 | folder = self.context.bika_setup.bika_diseases |
||
247 | rows = self.get_rows(3) |
||
248 | for row in rows: |
||
249 | _id = folder.invokeFactory('Disease', id=tmpID()) |
||
250 | obj = folder[_id] |
||
251 | if row['Title']: |
||
252 | obj.edit(ICDCode=row.get('ICDCode', ''), |
||
253 | title=row['Title'], |
||
254 | description=row.get('Description', '')) |
||
255 | obj.unmarkCreationFlag() |
||
256 | renameAfterCreation(obj) |
||
257 | |||
258 | |||
259 | class Doctors(WorksheetImporter): |
||
260 | |||
261 | def Import(self): |
||
262 | folder = self.context.doctors |
||
263 | rows = self.get_rows(3) |
||
264 | for row in rows: |
||
265 | if not row['Firstname']: |
||
266 | continue |
||
267 | |||
268 | _id = folder.invokeFactory('Doctor', id=tmpID()) |
||
269 | obj = folder[_id] |
||
270 | Fullname = (row['Firstname'] + " " + row.get('Surname', '')).strip() |
||
271 | obj.edit(title=Fullname, |
||
272 | Salutation = row.get('Salutation', ''), |
||
273 | Firstname = row.get('Firstname', ''), |
||
274 | Surname = row.get('Surname', ''), |
||
275 | JobTitle = row.get('JobTitle', ''), |
||
276 | Department = row.get('Department', ''), |
||
277 | DoctorID = row.get('DoctorID', ''), |
||
278 | PublicationPreference = row.get('PublicationPreference','').split(","), |
||
279 | AttachmentsPermitted = self.to_bool(row.get('AttachmentsPermitted','True')) |
||
280 | ) |
||
281 | self.fill_contactfields(row, obj) |
||
282 | self.fill_addressfields(row, obj) |
||
283 | obj.unmarkCreationFlag() |
||
284 | renameAfterCreation(obj) |
||
285 | |||
286 | |||
287 | class Ethnicities(WorksheetImporter): |
||
288 | |||
289 | View Code Duplication | def Import(self): |
|
290 | folder = self.context.bika_setup.bika_ethnicities |
||
291 | rows = self.get_rows(3) |
||
292 | for row in rows: |
||
293 | _id = folder.invokeFactory('Ethnicity', id=tmpID()) |
||
294 | obj = folder[_id] |
||
295 | if row.get('Title', None): |
||
296 | obj.edit(title=row['Title'], |
||
297 | description=row.get('Description', '')) |
||
298 | obj.unmarkCreationFlag() |
||
299 | renameAfterCreation(obj) |
||
300 | |||
301 | |||
302 | class Patients(WorksheetImporter): |
||
303 | |||
304 | def Import(self): |
||
305 | folder = self.context.patients |
||
306 | rows = self.get_rows(3) |
||
307 | for row in rows: |
||
308 | if not row['Firstname'] or not row['PrimaryReferrer']: |
||
309 | continue |
||
310 | pc = getToolByName(self.context, 'portal_catalog') |
||
311 | client = pc(portal_type='Client', Title=row['PrimaryReferrer']) |
||
312 | if len(client) == 0: |
||
313 | error = "Primary referrer invalid: '%s'. Patient '%s %s' will not be uploaded" |
||
314 | logger.error(error, row['PrimaryReferrer'], row['Firstname'], row.get('Surname', '')) |
||
315 | continue |
||
316 | |||
317 | client = client[0].getObject() |
||
318 | |||
319 | # Getting an existing ethnicity |
||
320 | bsc = getToolByName(self.context, 'bika_setup_catalog') |
||
321 | ethnicity = bsc(portal_type='Ethnicity', Title=row.get('Ethnicity', '')) |
||
322 | if len(ethnicity) == 0: |
||
323 | raise IndexError("Invalid ethnicity: '%s'" % row['Ethnicity']) |
||
324 | ethnicity = ethnicity[0].getObject() |
||
325 | |||
326 | _id = folder.invokeFactory('Patient', id=tmpID()) |
||
327 | obj = folder[_id] |
||
328 | obj.unmarkCreationFlag() |
||
329 | renameAfterCreation(obj) |
||
330 | Fullname = (row['Firstname'] + " " + row.get('Surname', '')).strip() |
||
331 | obj.edit(PatientID=row.get('PatientID'), |
||
332 | title=Fullname, |
||
333 | ClientPatientID = row.get('ClientPatientID', ''), |
||
334 | Salutation = row.get('Salutation', ''), |
||
335 | Firstname = row.get('Firstname', ''), |
||
336 | Surname = row.get('Surname', ''), |
||
337 | PrimaryReferrer = client.UID(), |
||
338 | Gender = row.get('Gender', 'dk'), |
||
339 | Age = row.get('Age', ''), |
||
340 | BirthDate = row.get('BirthDate', ''), |
||
341 | BirthDateEstimated =self.to_bool(row.get('BirthDateEstimated','False')), |
||
342 | BirthPlace = row.get('BirthPlace', ''), |
||
343 | # TODO Ethnicity_Obj -> Ethnicity on health v319 |
||
344 | Ethnicity_Obj=ethnicity.UID(), |
||
345 | Citizenship =row.get('Citizenship', ''), |
||
346 | MothersName = row.get('MothersName', ''), |
||
347 | CivilStatus =row.get('CivilStatus', ''), |
||
348 | Anonymous = self.to_bool(row.get('Anonymous','False')) |
||
349 | ) |
||
350 | self.fill_contactfields(row, obj) |
||
351 | self.fill_addressfields(row, obj) |
||
352 | if 'Photo' in row and row['Photo']: |
||
353 | try: |
||
354 | path = resource_filename(self.dataset_project, |
||
355 | "setupdata/%s/%s" \ |
||
356 | % (self.dataset_name, row['Photo'])) |
||
357 | file_data = open(path, "rb").read() if os.path.isfile(path) \ |
||
358 | else open(path+'.jpg', "rb").read() |
||
359 | obj.setPhoto(file_data) |
||
360 | except: |
||
361 | logger.error("Unable to load Photo %s"%row['Photo']) |
||
362 | |||
363 | if 'Feature' in row and row['Feature']: |
||
364 | try: |
||
365 | path = resource_filename(self.dataset_project, |
||
366 | "setupdata/%s/%s" \ |
||
367 | % (self.dataset_name, row['Feature'])) |
||
368 | file_data = open(path, "rb").read() if os.path.isfile(path) \ |
||
369 | else open(path+'.pdf', "rb").read() |
||
370 | obj.setFeature(file_data) |
||
371 | except: |
||
372 | logger.error("Unable to load Feature %s"%row['Feature']) |
||
373 | |||
374 | obj.unmarkCreationFlag() |
||
375 | transaction.savepoint(optimistic=True) |
||
376 | if row.get('PatientID'): |
||
377 | # To maintain the patient spreadsheet's IDs, we cannot do a 'renameaftercreation()' |
||
378 | if obj.getPatientID() != row.get('PatientID'): |
||
379 | transaction.savepoint(optimistic=True) |
||
380 | obj.aq_inner.aq_parent.manage_renameObject(obj.id, row.get('PatientID')) |
||
381 | else: |
||
382 | renameAfterCreation(obj) |
||
383 | |||
384 | |||
385 | class Insurance_Companies(WorksheetImporter): |
||
386 | |||
387 | def Import(self): |
||
388 | folder = self.context.bika_setup.bika_insurancecompanies |
||
389 | for row in self.get_rows(3): |
||
390 | obj = _createObjectByType("InsuranceCompany", folder, tmpID()) |
||
391 | if row.get('Name', None): |
||
392 | obj.edit( |
||
393 | Name=row.get('Name', ''), |
||
394 | EmailAddress=row.get('EmailAddress', ''), |
||
395 | Phone=row.get('Phone', ''), |
||
396 | Fax=row.get('Fax', ''), |
||
397 | TaxNumber=row.get('TaxNumber', ''), |
||
398 | AccountType=row.get('AccountType', {}), |
||
399 | AccountName=row.get('AccountName', {}), |
||
400 | AccountNumber=row.get('AccountNumber', ''), |
||
401 | BankName=row.get('BankName', ''), |
||
402 | BankBranch=row.get('BankBranch', ''), |
||
403 | ) |
||
404 | self.fill_contactfields(row, obj) |
||
405 | self.fill_addressfields(row, obj) |
||
406 | obj.unmarkCreationFlag() |
||
407 | renameAfterCreation(obj) |
||
408 |