| Total Complexity | 66 |
| Total Lines | 408 |
| Duplicated Lines | 8.09 % |
| Changes | 0 | ||
Duplicate code is one of the most pungent code smells. A rule that is often used is to re-structure code once it is duplicated in three or more places.
Common duplication problems, and corresponding solutions are:
Complex classes like bika.health.setupdata often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.
Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.
| 1 | # -*- coding: utf-8 -*- |
||
| 2 | # |
||
| 3 | # This file is part of SENAITE.HEALTH. |
||
| 4 | # |
||
| 5 | # SENAITE.HEALTH is free software: you can redistribute it and/or modify it |
||
| 6 | # under the terms of the GNU General Public License as published by the Free |
||
| 7 | # Software Foundation, version 2. |
||
| 8 | # |
||
| 9 | # This program is distributed in the hope that it will be useful, but WITHOUT |
||
| 10 | # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS |
||
| 11 | # FOR A PARTICULAR PURPOSE. See the GNU General Public License for more |
||
| 12 | # details. |
||
| 13 | # |
||
| 14 | # You should have received a copy of the GNU General Public License along with |
||
| 15 | # this program; if not, write to the Free Software Foundation, Inc., 51 |
||
| 16 | # Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. |
||
| 17 | # |
||
| 18 | # Copyright 2018-2019 by it's authors. |
||
| 19 | # Some rights reserved, see README and LICENSE. |
||
| 20 | |||
| 21 | import os.path |
||
| 22 | import transaction |
||
| 23 | from Products.CMFCore.utils import getToolByName |
||
| 24 | from Products.CMFPlone.utils import _createObjectByType |
||
| 25 | from pkg_resources import resource_filename |
||
| 26 | from zope.interface import implements |
||
| 27 | |||
| 28 | from bika.health import logger |
||
| 29 | from bika.lims.exportimport.dataimport import SetupDataSetList as SDL |
||
| 30 | from bika.lims.exportimport.setupdata import WorksheetImporter |
||
| 31 | from bika.lims.idserver import renameAfterCreation |
||
| 32 | from bika.lims.interfaces import ISetupDataSetList |
||
| 33 | from bika.lims.utils import tmpID |
||
| 34 | |||
| 35 | |||
| 36 | class SetupDataSetList(SDL): |
||
| 37 | |||
| 38 | implements(ISetupDataSetList) |
||
| 39 | |||
| 40 | def __call__(self): |
||
| 41 | return SDL.__call__(self, projectname="bika.health") |
||
| 42 | |||
| 43 | |||
| 44 | class Symptoms(WorksheetImporter): |
||
| 45 | |||
| 46 | def Import(self): |
||
| 47 | folder = self.context.bika_setup.bika_symptoms |
||
| 48 | rows = self.get_rows(3) |
||
| 49 | for row in rows: |
||
| 50 | _id = folder.invokeFactory('Symptom', id=tmpID()) |
||
| 51 | obj = folder[_id] |
||
| 52 | if row['Title']: |
||
| 53 | obj.edit(Code=row.get('Code', ''), |
||
| 54 | title=row['Title'], |
||
| 55 | description=row.get('Description', ''), |
||
| 56 | Gender=row.get('Gender', 'dk'), |
||
| 57 | SeverityAllowed=self.to_bool(row.get('SeverityAllowed', 1))) |
||
| 58 | obj.unmarkCreationFlag() |
||
| 59 | renameAfterCreation(obj) |
||
| 60 | |||
| 61 | |||
| 62 | class Treatments(WorksheetImporter): |
||
| 63 | |||
| 64 | def Import(self): |
||
| 65 | folder = self.context.bika_setup.bika_treatments |
||
| 66 | for row in self.get_rows(3): |
||
| 67 | obj = _createObjectByType('Treatment', folder, tmpID()) |
||
| 68 | if row['title']: |
||
| 69 | obj.edit(title=row['title'], |
||
| 70 | description=row.get('description', ''), |
||
| 71 | Type=row.get('Type', 'active'), |
||
| 72 | Procedure=row.get('Procedure', ''), |
||
| 73 | Care=row.get('Care', ''), |
||
| 74 | SubjectiveClinicalFindings=row.get('SubjectiveClinicalFindings', ''), |
||
| 75 | ObjectiveClinicalFindings=row.get('ObjectiveClinicalFindings', ''),) |
||
| 76 | obj.unmarkCreationFlag() |
||
| 77 | renameAfterCreation(obj) |
||
| 78 | |||
| 79 | |||
| 80 | class Aetiologic_Agents(WorksheetImporter): |
||
| 81 | |||
| 82 | def get_subtypes(self): |
||
| 83 | sheetname = 'Aetiologic Agents Subtypes' |
||
| 84 | worksheet = self.workbook.get_sheet_by_name(sheetname) |
||
| 85 | if not worksheet: |
||
| 86 | return |
||
| 87 | subtypes = {} |
||
| 88 | rows = self.get_rows(3, worksheet=worksheet) |
||
| 89 | for row in rows: |
||
| 90 | ae_title = row['aetiologicagent_title'] |
||
| 91 | if ae_title not in subtypes.keys(): |
||
| 92 | subtypes[ae_title] = [] |
||
| 93 | subtypes[ae_title].append({'Subtype': row.get('Subtype', ''), |
||
| 94 | 'SubtypeRemarks': row.get('SubtypeRemarks', '')}) |
||
| 95 | return subtypes |
||
| 96 | |||
| 97 | def Import(self): |
||
| 98 | subtypes = self.get_subtypes() |
||
| 99 | folder = self.context.bika_setup.bika_aetiologicagents |
||
| 100 | for row in self.get_rows(3): |
||
| 101 | obj = _createObjectByType('AetiologicAgent', folder, tmpID()) |
||
| 102 | if not row['title']: |
||
| 103 | continue |
||
| 104 | ae_title = row['title'] |
||
| 105 | ae_subtypes = subtypes.get(ae_title, []) |
||
| 106 | obj.edit(title=row['title'], |
||
| 107 | description=row.get('description', ''), |
||
| 108 | AetiologicAgentSubtypes=ae_subtypes) |
||
| 109 | obj.unmarkCreationFlag() |
||
| 110 | renameAfterCreation(obj) |
||
| 111 | |||
| 112 | |||
| 113 | class Case_Syndromic_Classifications(WorksheetImporter): |
||
| 114 | |||
| 115 | def Import(self): |
||
| 116 | folder = self.context.bika_setup.bika_casesyndromicclassifications |
||
| 117 | for row in self.get_rows(3): |
||
| 118 | obj = _createObjectByType('CaseSyndromicClassification', folder, tmpID()) |
||
| 119 | if row['title']: |
||
| 120 | obj.edit(title=row['title'], |
||
| 121 | description=row.get('description', ''),) |
||
| 122 | obj.unmarkCreationFlag() |
||
| 123 | renameAfterCreation(obj) |
||
| 124 | |||
| 125 | |||
| 126 | class Drugs(WorksheetImporter): |
||
| 127 | |||
| 128 | def Import(self): |
||
| 129 | folder = self.context.bika_setup.bika_drugs |
||
| 130 | for row in self.get_rows(3): |
||
| 131 | obj = _createObjectByType('Drug', folder, tmpID()) |
||
| 132 | if row['title']: |
||
| 133 | obj.edit(title=row['title'], |
||
| 134 | description=row.get('description', ''), |
||
| 135 | Category=row.get('Category', ''), |
||
| 136 | Indications=row.get('Indications', ''), |
||
| 137 | Posology=row.get('Posology', ''), |
||
| 138 | SideEffects=row.get('SideEffects', ''), |
||
| 139 | Preservation=row.get('Preservation', ''),) |
||
| 140 | obj.unmarkCreationFlag() |
||
| 141 | renameAfterCreation(obj) |
||
| 142 | |||
| 143 | |||
| 144 | class Drug_Prohibitions(WorksheetImporter): |
||
| 145 | |||
| 146 | def Import(self): |
||
| 147 | folder = self.context.bika_setup.bika_drugprohibitions |
||
| 148 | for row in self.get_rows(3): |
||
| 149 | obj = _createObjectByType('DrugProhibition', folder, tmpID()) |
||
| 150 | if row['title']: |
||
| 151 | obj.edit(title=row['title'], |
||
| 152 | description=row.get('description', ''),) |
||
| 153 | obj.unmarkCreationFlag() |
||
| 154 | renameAfterCreation(obj) |
||
| 155 | |||
| 156 | |||
| 157 | class Identifier_Types(WorksheetImporter): |
||
| 158 | |||
| 159 | def Import(self): |
||
| 160 | folder = self.context.bika_setup.bika_identifiertypes |
||
| 161 | for row in self.get_rows(3): |
||
| 162 | obj = _createObjectByType('IdentifierType', folder, tmpID()) |
||
| 163 | if row['title']: |
||
| 164 | obj.edit(title=row['title'], |
||
| 165 | description=row.get('description', ''),) |
||
| 166 | obj.unmarkCreationFlag() |
||
| 167 | renameAfterCreation(obj) |
||
| 168 | |||
| 169 | |||
| 170 | class Immunizations(WorksheetImporter): |
||
| 171 | |||
| 172 | def Import(self): |
||
| 173 | folder = self.context.bika_setup.bika_immunizations |
||
| 174 | for row in self.get_rows(3): |
||
| 175 | obj = _createObjectByType('Immunization', folder, tmpID()) |
||
| 176 | if row['title']: |
||
| 177 | obj.edit(title=row['title'], |
||
| 178 | description=row.get('description', ''), |
||
| 179 | Form=row.get('Form', 'active'), |
||
| 180 | RelevantFacts=row.get('RelevantFacts', ''), |
||
| 181 | GeographicalDistribution=row.get('GeographicalDistribution', ''), |
||
| 182 | Transmission=row.get('Transmission', ''), |
||
| 183 | Symptoms=row.get('Symptoms', ''), |
||
| 184 | Risk=row.get('Risk', ''), |
||
| 185 | Treatment=row.get('Treatment', ''), |
||
| 186 | Prevention=row.get('Prevention', ''),) |
||
| 187 | obj.unmarkCreationFlag() |
||
| 188 | renameAfterCreation(obj) |
||
| 189 | |||
| 190 | |||
| 191 | class Vaccination_Centers(WorksheetImporter): |
||
| 192 | |||
| 193 | def Import(self): |
||
| 194 | folder = self.context.bika_setup.bika_vaccinationcenters |
||
| 195 | for row in self.get_rows(3): |
||
| 196 | obj = _createObjectByType("VaccinationCenter", folder, tmpID()) |
||
| 197 | if row['Name']: |
||
| 198 | obj.edit( |
||
| 199 | Name=row.get('Name', ''), |
||
| 200 | TaxNumber=row.get('TaxNumber', ''), |
||
| 201 | AccountType=row.get('AccountType', {}), |
||
| 202 | AccountName=row.get('AccountName', {}), |
||
| 203 | AccountNumber=row.get('AccountNumber', ''), |
||
| 204 | BankName=row.get('BankName', ''), |
||
| 205 | BankBranch=row.get('BankBranch', ''), |
||
| 206 | ) |
||
| 207 | self.fill_contactfields(row, obj) |
||
| 208 | self.fill_addressfields(row, obj) |
||
| 209 | obj.unmarkCreationFlag() |
||
| 210 | renameAfterCreation(obj) |
||
| 211 | |||
| 212 | |||
| 213 | class Case_Outcomes(WorksheetImporter): |
||
| 214 | |||
| 215 | View Code Duplication | def Import(self): |
|
|
|
|||
| 216 | folder = self.context.bika_setup.bika_caseoutcomes |
||
| 217 | rows = self.get_rows(3) |
||
| 218 | for row in rows: |
||
| 219 | if row['title']: |
||
| 220 | _id = folder.invokeFactory('CaseOutcome', id=tmpID()) |
||
| 221 | obj = folder[_id] |
||
| 222 | obj.edit(title=row['title'], |
||
| 223 | description=row.get('description', '')) |
||
| 224 | obj.unmarkCreationFlag() |
||
| 225 | renameAfterCreation(obj) |
||
| 226 | |||
| 227 | |||
| 228 | class Case_Statuses(WorksheetImporter): |
||
| 229 | |||
| 230 | View Code Duplication | def Import(self): |
|
| 231 | folder = self.context.bika_setup.bika_casestatuses |
||
| 232 | rows = self.get_rows(3) |
||
| 233 | for row in rows: |
||
| 234 | if row['title']: |
||
| 235 | _id = folder.invokeFactory('CaseStatus', id=tmpID()) |
||
| 236 | obj = folder[_id] |
||
| 237 | obj.edit(title=row['title'], |
||
| 238 | description=row.get('description', '')) |
||
| 239 | obj.unmarkCreationFlag() |
||
| 240 | renameAfterCreation(obj) |
||
| 241 | |||
| 242 | |||
| 243 | class Diseases(WorksheetImporter): |
||
| 244 | |||
| 245 | def Import(self): |
||
| 246 | folder = self.context.bika_setup.bika_diseases |
||
| 247 | rows = self.get_rows(3) |
||
| 248 | for row in rows: |
||
| 249 | _id = folder.invokeFactory('Disease', id=tmpID()) |
||
| 250 | obj = folder[_id] |
||
| 251 | if row['Title']: |
||
| 252 | obj.edit(ICDCode=row.get('ICDCode', ''), |
||
| 253 | title=row['Title'], |
||
| 254 | description=row.get('Description', '')) |
||
| 255 | obj.unmarkCreationFlag() |
||
| 256 | renameAfterCreation(obj) |
||
| 257 | |||
| 258 | |||
| 259 | class Doctors(WorksheetImporter): |
||
| 260 | |||
| 261 | def Import(self): |
||
| 262 | folder = self.context.doctors |
||
| 263 | rows = self.get_rows(3) |
||
| 264 | for row in rows: |
||
| 265 | if not row['Firstname']: |
||
| 266 | continue |
||
| 267 | |||
| 268 | _id = folder.invokeFactory('Doctor', id=tmpID()) |
||
| 269 | obj = folder[_id] |
||
| 270 | Fullname = (row['Firstname'] + " " + row.get('Surname', '')).strip() |
||
| 271 | obj.edit(title=Fullname, |
||
| 272 | Salutation = row.get('Salutation', ''), |
||
| 273 | Firstname = row.get('Firstname', ''), |
||
| 274 | Surname = row.get('Surname', ''), |
||
| 275 | JobTitle = row.get('JobTitle', ''), |
||
| 276 | Department = row.get('Department', ''), |
||
| 277 | DoctorID = row.get('DoctorID', ''), |
||
| 278 | PublicationPreference = row.get('PublicationPreference','').split(","), |
||
| 279 | AttachmentsPermitted = self.to_bool(row.get('AttachmentsPermitted','True')) |
||
| 280 | ) |
||
| 281 | self.fill_contactfields(row, obj) |
||
| 282 | self.fill_addressfields(row, obj) |
||
| 283 | obj.unmarkCreationFlag() |
||
| 284 | renameAfterCreation(obj) |
||
| 285 | |||
| 286 | |||
| 287 | class Ethnicities(WorksheetImporter): |
||
| 288 | |||
| 289 | View Code Duplication | def Import(self): |
|
| 290 | folder = self.context.bika_setup.bika_ethnicities |
||
| 291 | rows = self.get_rows(3) |
||
| 292 | for row in rows: |
||
| 293 | _id = folder.invokeFactory('Ethnicity', id=tmpID()) |
||
| 294 | obj = folder[_id] |
||
| 295 | if row.get('Title', None): |
||
| 296 | obj.edit(title=row['Title'], |
||
| 297 | description=row.get('Description', '')) |
||
| 298 | obj.unmarkCreationFlag() |
||
| 299 | renameAfterCreation(obj) |
||
| 300 | |||
| 301 | |||
| 302 | class Patients(WorksheetImporter): |
||
| 303 | |||
| 304 | def Import(self): |
||
| 305 | folder = self.context.patients |
||
| 306 | rows = self.get_rows(3) |
||
| 307 | for row in rows: |
||
| 308 | if not row['Firstname'] or not row['PrimaryReferrer']: |
||
| 309 | continue |
||
| 310 | pc = getToolByName(self.context, 'portal_catalog') |
||
| 311 | client = pc(portal_type='Client', Title=row['PrimaryReferrer']) |
||
| 312 | if len(client) == 0: |
||
| 313 | error = "Primary referrer invalid: '%s'. Patient '%s %s' will not be uploaded" |
||
| 314 | logger.error(error, row['PrimaryReferrer'], row['Firstname'], row.get('Surname', '')) |
||
| 315 | continue |
||
| 316 | |||
| 317 | client = client[0].getObject() |
||
| 318 | |||
| 319 | # Getting an existing ethnicity |
||
| 320 | bsc = getToolByName(self.context, 'bika_setup_catalog') |
||
| 321 | ethnicity = bsc(portal_type='Ethnicity', Title=row.get('Ethnicity', '')) |
||
| 322 | if len(ethnicity) == 0: |
||
| 323 | raise IndexError("Invalid ethnicity: '%s'" % row['Ethnicity']) |
||
| 324 | ethnicity = ethnicity[0].getObject() |
||
| 325 | |||
| 326 | _id = folder.invokeFactory('Patient', id=tmpID()) |
||
| 327 | obj = folder[_id] |
||
| 328 | obj.unmarkCreationFlag() |
||
| 329 | renameAfterCreation(obj) |
||
| 330 | Fullname = (row['Firstname'] + " " + row.get('Surname', '')).strip() |
||
| 331 | obj.edit(PatientID=row.get('PatientID'), |
||
| 332 | title=Fullname, |
||
| 333 | ClientPatientID = row.get('ClientPatientID', ''), |
||
| 334 | Salutation = row.get('Salutation', ''), |
||
| 335 | Firstname = row.get('Firstname', ''), |
||
| 336 | Surname = row.get('Surname', ''), |
||
| 337 | PrimaryReferrer = client.UID(), |
||
| 338 | Gender = row.get('Gender', 'dk'), |
||
| 339 | Age = row.get('Age', ''), |
||
| 340 | BirthDate = row.get('BirthDate', ''), |
||
| 341 | BirthDateEstimated =self.to_bool(row.get('BirthDateEstimated','False')), |
||
| 342 | BirthPlace = row.get('BirthPlace', ''), |
||
| 343 | # TODO Ethnicity_Obj -> Ethnicity on health v319 |
||
| 344 | Ethnicity_Obj=ethnicity.UID(), |
||
| 345 | Citizenship =row.get('Citizenship', ''), |
||
| 346 | MothersName = row.get('MothersName', ''), |
||
| 347 | CivilStatus =row.get('CivilStatus', ''), |
||
| 348 | Anonymous = self.to_bool(row.get('Anonymous','False')) |
||
| 349 | ) |
||
| 350 | self.fill_contactfields(row, obj) |
||
| 351 | self.fill_addressfields(row, obj) |
||
| 352 | if 'Photo' in row and row['Photo']: |
||
| 353 | try: |
||
| 354 | path = resource_filename(self.dataset_project, |
||
| 355 | "setupdata/%s/%s" \ |
||
| 356 | % (self.dataset_name, row['Photo'])) |
||
| 357 | file_data = open(path, "rb").read() if os.path.isfile(path) \ |
||
| 358 | else open(path+'.jpg', "rb").read() |
||
| 359 | obj.setPhoto(file_data) |
||
| 360 | except: |
||
| 361 | logger.error("Unable to load Photo %s"%row['Photo']) |
||
| 362 | |||
| 363 | if 'Feature' in row and row['Feature']: |
||
| 364 | try: |
||
| 365 | path = resource_filename(self.dataset_project, |
||
| 366 | "setupdata/%s/%s" \ |
||
| 367 | % (self.dataset_name, row['Feature'])) |
||
| 368 | file_data = open(path, "rb").read() if os.path.isfile(path) \ |
||
| 369 | else open(path+'.pdf', "rb").read() |
||
| 370 | obj.setFeature(file_data) |
||
| 371 | except: |
||
| 372 | logger.error("Unable to load Feature %s"%row['Feature']) |
||
| 373 | |||
| 374 | obj.unmarkCreationFlag() |
||
| 375 | transaction.savepoint(optimistic=True) |
||
| 376 | if row.get('PatientID'): |
||
| 377 | # To maintain the patient spreadsheet's IDs, we cannot do a 'renameaftercreation()' |
||
| 378 | if obj.getPatientID() != row.get('PatientID'): |
||
| 379 | transaction.savepoint(optimistic=True) |
||
| 380 | obj.aq_inner.aq_parent.manage_renameObject(obj.id, row.get('PatientID')) |
||
| 381 | else: |
||
| 382 | renameAfterCreation(obj) |
||
| 383 | |||
| 384 | |||
| 385 | class Insurance_Companies(WorksheetImporter): |
||
| 386 | |||
| 387 | def Import(self): |
||
| 388 | folder = self.context.bika_setup.bika_insurancecompanies |
||
| 389 | for row in self.get_rows(3): |
||
| 390 | obj = _createObjectByType("InsuranceCompany", folder, tmpID()) |
||
| 391 | if row.get('Name', None): |
||
| 392 | obj.edit( |
||
| 393 | Name=row.get('Name', ''), |
||
| 394 | EmailAddress=row.get('EmailAddress', ''), |
||
| 395 | Phone=row.get('Phone', ''), |
||
| 396 | Fax=row.get('Fax', ''), |
||
| 397 | TaxNumber=row.get('TaxNumber', ''), |
||
| 398 | AccountType=row.get('AccountType', {}), |
||
| 399 | AccountName=row.get('AccountName', {}), |
||
| 400 | AccountNumber=row.get('AccountNumber', ''), |
||
| 401 | BankName=row.get('BankName', ''), |
||
| 402 | BankBranch=row.get('BankBranch', ''), |
||
| 403 | ) |
||
| 404 | self.fill_contactfields(row, obj) |
||
| 405 | self.fill_addressfields(row, obj) |
||
| 406 | obj.unmarkCreationFlag() |
||
| 407 | renameAfterCreation(obj) |
||
| 408 |