Total Complexity | 418 |
Total Lines | 2217 |
Duplicated Lines | 8.84 % |
Changes | 0 |
Duplicated code is one of the most pungent code smells. A commonly used rule of thumb is to restructure code once it is duplicated in three or more places.
Common duplication problems, and corresponding solutions are:
Complex classes like bika.lims.exportimport.setupdata often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.
Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.
1 | # -*- coding: utf-8 -*- |
||
2 | # |
||
3 | # This file is part of SENAITE.CORE |
||
4 | # |
||
5 | # Copyright 2018 by its authors. |
||
6 | # Some rights reserved. See LICENSE.rst, CONTRIBUTORS.rst. |
||
7 | |||
8 | from bika.lims.exportimport.dataimport import SetupDataSetList as SDL |
||
9 | from bika.lims.idserver import renameAfterCreation |
||
10 | from bika.lims.interfaces import ISetupDataSetList |
||
11 | from Products.CMFPlone.utils import safe_unicode, _createObjectByType |
||
12 | from bika.lims.utils import tmpID, to_unicode |
||
13 | from bika.lims.utils import to_utf8 |
||
14 | from bika.lims import bikaMessageFactory as _ |
||
15 | from bika.lims.utils import t |
||
16 | from Products.CMFCore.utils import getToolByName |
||
17 | from bika.lims import logger |
||
18 | from bika.lims.utils.analysis import create_analysis |
||
19 | from zope.interface import implements |
||
20 | from pkg_resources import resource_filename |
||
21 | import datetime |
||
22 | import os.path |
||
23 | import re |
||
24 | import transaction |
||
25 | |||
26 | |||
def lookup(context, portal_type, **kwargs):
    """Return the first catalog object of *portal_type* matching *kwargs*.

    The catalog is resolved through the archetype tool's catalog map,
    falling back to ``portal_catalog`` when the type has no mapping.
    """
    archetype_tool = getToolByName(context, 'archetype_tool')
    catalog_id = archetype_tool.catalog_map.get(portal_type, [None])[0] or 'portal_catalog'
    catalog = getToolByName(context, catalog_id)
    kwargs['portal_type'] = portal_type
    brains = catalog(**kwargs)
    # NOTE: raises IndexError when nothing matches, as in the original.
    return brains[0].getObject()
||
33 | |||
34 | |||
def check_for_required_columns(name, data, required):
    """Raise an exception naming the first *required* column missing
    (or empty) in the *data* mapping for sheet *name*.
    """
    for column in required:
        if data.get(column, None):
            continue
        message = _("%s has no '%s' column." % (name, column))
        raise Exception(t(message))
||
40 | |||
41 | |||
def Float(thing):
    """Coerce *thing* to a float, returning 0.0 when conversion fails.

    Catches TypeError as well as ValueError so that non-string/non-number
    inputs (e.g. None from an empty spreadsheet cell) also yield 0.0
    instead of raising.
    """
    try:
        f = float(thing)
    except (ValueError, TypeError):
        f = 0.0
    return f
||
48 | |||
49 | |||
def read_file(path):
    """Return the binary contents of *path*.

    If *path* itself does not exist, each allowed extension (lower and
    upper case) is appended in turn and the first existing variant is read.

    :raises IOError: when neither the bare path nor any extension variant
        points to an existing file.

    Files are opened with a context manager so handles are always closed
    (the original leaked the file object).
    """
    if os.path.isfile(path):
        with open(path, "rb") as f:
            return f.read()
    allowed_ext = ['pdf', 'jpg', 'jpeg', 'png', 'gif', 'ods', 'odt',
                   'xlsx', 'doc', 'docx', 'xls', 'csv', 'txt']
    allowed_ext += [e.upper() for e in allowed_ext]
    for e in allowed_ext:
        out = '%s.%s' % (path, e)
        if os.path.isfile(out):
            with open(out, "rb") as f:
                return f.read()
    raise IOError("File not found: %s. Allowed extensions: %s" % (path, ','.join(allowed_ext)))
||
61 | |||
62 | |||
class SetupDataSetList(SDL):
    """Setup data set list bound to the ``bika.lims`` project."""

    implements(ISetupDataSetList)

    def __call__(self):
        # Delegate to the base implementation with the project name pinned.
        return SDL.__call__(self, projectname="bika.lims")
||
69 | |||
70 | |||
class WorksheetImporter:

    """Use this as a base, for normal tabular data sheet imports.

    Subclasses override Import(). The worksheet consumed is derived from
    the subclass name with underscores replaced by spaces (see __call__),
    e.g. ``Lab_Contacts`` reads the "Lab Contacts" sheet.
    """

    def __init__(self, context):
        self.adapter_context = context

    def __call__(self, lsd, workbook, dataset_project, dataset_name):
        # lsd is expected to provide .context and a .deferred list (used by
        # defer()) — presumably the load-setup-data controller; confirm.
        self.lsd = lsd
        self.context = lsd.context
        self.workbook = workbook
        # Sheet name derived from the class name: underscores become spaces.
        self.sheetname = self.__class__.__name__.replace("_", " ")
        self.worksheet = workbook.get_sheet_by_name(self.sheetname)
        self.dataset_project = dataset_project
        self.dataset_name = dataset_name
        if self.worksheet:
            logger.info("Loading {0}.{1}: {2}".format(
                self.dataset_project, self.dataset_name, self.sheetname))
            try:
                self.Import()
            except IOError:
                # The importer must omit the files not found inside the server filesystem (bika/lims/setupdata/test/
                # if the file is loaded from 'select existing file' or bika/lims/setupdata/uploaded if it's loaded from
                # 'Load from file') and finishes the import without errors. https://jira.bikalabs.com/browse/LIMS-1624
                warning = "Error while loading attached file from %s. The file will not be uploaded into the system."
                logger.warning(warning, self.sheetname)
                self.context.plone_utils.addPortalMessage("Error while loading some attached files. "
                                                          "The files weren't uploaded into the system.")
        else:
            logger.info("No records found: '{0}'".format(self.sheetname))

    def get_rows(self, startrow=3, worksheet=None):
        """Returns a generator for all rows in a sheet.
        Each row contains a dictionary where the key is the value of the
        first row of the sheet for each column.
        The data values are returned in utf-8 format.
        Starts to consume data from startrow
        """

        headers = []
        row_nr = 0
        worksheet = worksheet if worksheet else self.worksheet
        for row in worksheet.rows:  # .iter_rows():
            row_nr += 1
            if row_nr == 1:
                # First row supplies the dict keys for every data row.
                # headers = [cell.internal_value for cell in row]
                headers = [cell.value for cell in row]
                continue
            # Checkpoint the transaction periodically so very large sheets
            # don't accumulate an unbounded uncommitted state.
            if row_nr % 1000 == 0:
                transaction.savepoint()
            if row_nr <= startrow:
                continue
            # row = [_c(cell.internal_value).decode('utf-8') for cell in row]
            new_row = []
            for cell in row:
                value = cell.value
                if value is None:
                    value = ''
                # Normalize unicode cells to utf-8 byte strings (Python 2).
                if isinstance(value, unicode):
                    value = value.encode('utf-8')
                # Strip any space, \t, \n, or \r characters from the left-hand
                # side, right-hand side, or both sides of the string
                if isinstance(value, str):
                    value = value.strip(' \t\n\r')
                new_row.append(value)
            row = dict(zip(headers, new_row))

            # parse out addresses
            # Collapse '<Type>_<Key>' columns into a nested dict per type so
            # fill_addressfields-style consumers can use them directly.
            for add_type in ['Physical', 'Postal', 'Billing']:
                row[add_type] = {}
                if add_type + "_Address" in row:
                    for key in ['Address', 'City', 'State', 'District', 'Zip', 'Country']:
                        row[add_type][key] = str(row.get("%s_%s" % (add_type, key), ''))

            yield row

    def get_file_data(self, filename):
        """Return the binary contents of *filename* from this dataset's
        setupdata directory, or None when it cannot be read.
        """
        if filename:
            try:
                path = resource_filename(
                    self.dataset_project,
                    "setupdata/%s/%s" % (self.dataset_name, filename))
                file_data = open(path, "rb").read()
            except:
                # Missing/unreadable files are tolerated; callers treat None
                # as "no attachment".
                file_data = None
        else:
            file_data = None
        return file_data

    def to_bool(self, value):
        """ Converts a sheet string value to a boolean value.
        Needed because of utf-8 conversions
        """

        # Each coercion is best-effort: cell values may arrive as unicode,
        # str or int depending on how the spreadsheet typed them.
        try:
            value = value.lower()
        except:
            pass
        try:
            value = value.encode('utf-8')
        except:
            pass
        try:
            value = int(value)
        except:
            pass
        # Only the literal string 'true' or the integer 1 count as True.
        if value in ('true', 1):
            return True
        else:
            return False

    def to_int(self, value, default=0):
        """ Converts a value o a int. Returns default if the conversion fails.
        """
        try:
            return int(value)
        except ValueError:
            try:
                return int(default)
            except:
                # Even the default was unusable; fall back to 0.
                return 0

    def to_float(self, value, default=0):
        """ Converts a value o a float. Returns default if the conversion fails.
        """
        try:
            return float(value)
        except ValueError:
            try:
                return float(default)
            except:
                # Even the default was unusable; fall back to 0.0.
                return 0.0

    def defer(self, **kwargs):
        # Queue a cross-sheet reference for resolution after all sheets are
        # imported; the queue lives on the lsd object passed to __call__.
        self.lsd.deferred.append(kwargs)

    def Import(self):
        """ Override this.
        XXX Simple generic sheet importer
        """

    def fill_addressfields(self, row, obj):
        """ Fills the address fields for the specified object if allowed:
        PhysicalAddress, PostalAddress, CountryState, BillingAddress
        """
        addresses = {}
        for add_type in ['Physical', 'Postal', 'Billing', 'CountryState']:
            addresses[add_type] = {}
            for key in ['Address', 'City', 'State', 'District', 'Zip', 'Country']:
                addresses[add_type][key.lower()] = str(row.get("%s_%s" % (add_type, key), ''))

        # Fall back to the physical address for country/state when no
        # explicit CountryState columns were provided.
        if addresses['CountryState']['country'] == '' \
            and addresses['CountryState']['state'] == '':
            addresses['CountryState']['country'] = addresses['Physical']['country']
            addresses['CountryState']['state'] = addresses['Physical']['state']

        # Only set each address kind if the target object supports it.
        if hasattr(obj, 'setPhysicalAddress'):
            obj.setPhysicalAddress(addresses['Physical'])
        if hasattr(obj, 'setPostalAddress'):
            obj.setPostalAddress(addresses['Postal'])
        if hasattr(obj, 'setCountryState'):
            obj.setCountryState(addresses['CountryState'])
        if hasattr(obj, 'setBillingAddress'):
            obj.setBillingAddress(addresses['Billing'])

    def fill_contactfields(self, row, obj):
        """ Fills the contact fields for the specified object if allowed:
        EmailAddress, Phone, Fax, BusinessPhone, BusinessFax, HomePhone,
        MobilePhone
        """
        fieldnames = ['EmailAddress',
                      'Phone',
                      'Fax',
                      'BusinessPhone',
                      'BusinessFax',
                      'HomePhone',
                      'MobilePhone',
                      ]
        schema = obj.Schema()
        fields = dict([(field.getName(), field) for field in schema.fields()])
        for fieldname in fieldnames:
            try:
                field = fields[fieldname]
            except:
                # Field not in the schema: only worth logging if the sheet
                # actually supplied a value for it.
                if fieldname in row:
                    logger.info("Address field %s not found on %s"%(fieldname,obj))
                continue
            value = row.get(fieldname, '')
            field.set(obj, value)

    def get_object(self, catalog, portal_type, title=None, **kwargs):
        """This will return an object from the catalog.
        Logs a message and returns None if no object or multiple objects found.
        All keyword arguments are passed verbatim to the contentFilter
        """
        if not title and not kwargs:
            return None
        contentFilter = {"portal_type": portal_type}
        if title:
            contentFilter['title'] = to_unicode(title)
        contentFilter.update(kwargs)
        brains = catalog(contentFilter)
        if len(brains) > 1:
            logger.info("More than one object found for %s" % contentFilter)
            return None
        elif len(brains) == 0:
            if portal_type == 'AnalysisService':
                # Analysis services may be referenced by keyword rather than
                # title; retry on getKeyword before giving up.
                brains = catalog(portal_type=portal_type, getKeyword=title)
                if brains:
                    return brains[0].getObject()
            logger.info("No objects found for %s" % contentFilter)
            return None
        else:
            return brains[0].getObject()
||
286 | |||
287 | |||
class Sub_Groups(WorksheetImporter):
    """Imports sub-groups from the 'Sub Groups' sheet."""

    def Import(self):
        folder = self.context.bika_setup.bika_subgroups
        for row in self.get_rows(3):
            if 'title' in row and row['title']:
                obj = _createObjectByType("SubGroup", folder, tmpID())
                # Use .get() for the optional columns so a sheet missing a
                # 'description' or 'SortKey' column does not abort the whole
                # import with a KeyError (matches the sibling importers).
                obj.edit(title=row['title'],
                         description=row.get('description', ''),
                         SortKey=row.get('SortKey', ''))
                obj.unmarkCreationFlag()
                renameAfterCreation(obj)
||
300 | |||
301 | |||
class Lab_Information(WorksheetImporter):
    """Populates the singleton laboratory object from the
    'Lab Information' sheet (one Field/Value pair per row).
    """

    def Import(self):
        laboratory = self.context.bika_setup.laboratory
        # This sheet is a two-column Field/Value list rather than one row
        # per object, so fold it into a single dict first.
        values = {}
        for row in self.get_rows(3):
            values[row['Field']] = row['Value']

        if values['AccreditationBodyLogo']:
            path = resource_filename(
                self.dataset_project,
                "setupdata/%s/%s" % (self.dataset_name,
                                     values['AccreditationBodyLogo']))
            try:
                file_data = read_file(path)
            except Exception as msg:
                # The logo is optional: log and carry on without it.
                # NOTE(review): msg[0] relies on Python 2 exception indexing.
                file_data = None
                logger.warning(msg[0] + " Error on sheet: " + self.sheetname)
        else:
            file_data = None

        laboratory.edit(
            Name=values['Name'],
            LabURL=values['LabURL'],
            Confidence=values['Confidence'],
            LaboratoryAccredited=self.to_bool(values['LaboratoryAccredited']),
            AccreditationBodyLong=values['AccreditationBodyLong'],
            AccreditationBody=values['AccreditationBody'],
            AccreditationBodyURL=values['AccreditationBodyURL'],
            Accreditation=values['Accreditation'],
            AccreditationReference=values['AccreditationReference'],
            AccreditationBodyLogo=file_data,
            TaxNumber=values['TaxNumber'],
        )
        # Contact/address columns in the Field/Value dict are applied via
        # the shared base-class helpers.
        self.fill_contactfields(values, laboratory)
        self.fill_addressfields(values, laboratory)
||
338 | |||
339 | |||
class Lab_Contacts(WorksheetImporter):
    """Imports lab contacts, creates their Plone users/groups/roles, and
    assigns department managers from the 'Lab Departments' sheet.
    """

    def Import(self):
        folder = self.context.bika_setup.bika_labcontacts
        portal_groups = getToolByName(self.context, 'portal_groups')
        portal_registration = getToolByName(
            self.context, 'portal_registration')
        # Track the spreadsheet row number for meaningful log messages.
        rownum = 2
        for row in self.get_rows(3):
            rownum += 1
            if not row.get('Firstname', None):
                continue

            # Username already exists?
            username = row.get('Username', '')
            fullname = ('%s %s' % (row['Firstname'], row.get('Surname', ''))).strip()
            if username:
                username = safe_unicode(username).encode('utf-8')
                bsc = getToolByName(self.context, 'bika_setup_catalog')
                exists = [o.getObject() for o in bsc(portal_type="LabContact") if o.getObject().getUsername() == username]
                if exists:
                    error = "Lab Contact: username '{0}' in row {1} already exists. This contact will be omitted.".format(username, str(rownum))
                    logger.error(error)
                    continue

            # Is there a signature file defined? Try to get the file first.
            signature = None
            if row.get('Signature'):
                signature = self.get_file_data(row['Signature'])
                if not signature:
                    warning = "Lab Contact: Cannot load the signature file '{0}' for user '{1}'. The contact will be created, but without a signature image".format(row['Signature'], username)
                    logger.warning(warning)

            obj = _createObjectByType("LabContact", folder, tmpID())
            obj.edit(
                title=fullname,
                Salutation=row.get('Salutation', ''),
                Firstname=row['Firstname'],
                Surname=row.get('Surname', ''),
                JobTitle=row.get('JobTitle', ''),
                Username=row.get('Username', ''),
                Signature=signature
            )
            obj.unmarkCreationFlag()
            renameAfterCreation(obj)
            self.fill_contactfields(row, obj)
            self.fill_addressfields(row, obj)

            if row['Department_title']:
                # The department may not exist yet; defer the reference.
                self.defer(src_obj=obj,
                           src_field='Department',
                           dest_catalog='bika_setup_catalog',
                           dest_query={'portal_type': 'Department',
                                       'title': row['Department_title']}
                           )

            # Create Plone user
            if not row['Username']:
                warn = "Lab Contact: No username defined for user '{0}' in row {1}. Contact created, but without access credentials.".format(fullname, str(rownum))
                logger.warning(warn)
            if not row.get('EmailAddress', ''):
                warn = "Lab Contact: No Email defined for user '{0}' in row {1}. Contact created, but without access credentials.".format(fullname, str(rownum))
                logger.warning(warn)

            if(row['Username'] and row.get('EmailAddress','')):
                username = safe_unicode(row['Username']).encode('utf-8')
                passw = row['Password']
                if not passw:
                    # BUGFIX: the placeholder used to be '{3}', which is out
                    # of range for this 3-argument format() call and raised
                    # IndexError whenever a contact had no password.
                    warn = "Lab Contact: No password defined for user '{0}' in row {1}. Password established automatically to '{2}'".format(username, str(rownum), username)
                    logger.warning(warn)
                    passw = username

                try:
                    member = portal_registration.addMember(
                        username,
                        passw,
                        properties={
                            'username': username,
                            'email': row['EmailAddress'],
                            'fullname': fullname}
                    )
                except Exception as msg:
                    # BUGFIX: this message previously said "Client Contact"
                    # inside the lab-contact importer; corrected the label.
                    logger.error("Lab Contact: Error adding user (%s): %s" % (msg, username))
                    continue

                groups = row.get('Groups', '')
                if not groups:
                    warn = "Lab Contact: No groups defined for user '{0}' in row {1}. Group established automatically to 'Analysts'".format(username, str(rownum))
                    logger.warning(warn)
                    groups = 'Analysts'

                group_ids = [g.strip() for g in groups.split(',')]
                # Add user to all specified groups
                for group_id in group_ids:
                    group = portal_groups.getGroupById(group_id)
                    if group:
                        group.addMember(username)
                roles = row.get('Roles', '')
                if roles:
                    role_ids = [r.strip() for r in roles.split(',')]
                    # Add user to all specified roles
                    for role_id in role_ids:
                        member._addRole(role_id)
                # If user is in LabManagers, add Owner local role on clients
                # folder
                if 'LabManager' in group_ids:
                    self.context.clients.manage_setLocalRoles(
                        username, ['Owner', ])

        # Now we have the lab contacts registered, try to assign the managers
        # to each department if required
        sheet = self.workbook.get_sheet_by_name("Lab Departments")
        bsc = getToolByName(self.context, 'bika_setup_catalog')
        for row in self.get_rows(3, sheet):
            if row['title'] and row['LabContact_Username']:
                dept = self.get_object(bsc, "Department", row.get('title'))
                if dept and not dept.getManager():
                    username = safe_unicode(row['LabContact_Username']).encode('utf-8')
                    exists = [o.getObject() for o in bsc(portal_type="LabContact") if o.getObject().getUsername() == username]
                    if exists:
                        dept.setManager(exists[0].UID())
||
461 | |||
class Lab_Departments(WorksheetImporter):
    """Imports lab departments and links each one to its manager, looked
    up among the existing lab contacts by username.
    """

    def Import(self):
        folder = self.context.bika_setup.bika_departments
        bsc = getToolByName(self.context, 'bika_setup_catalog')
        # Materialize the contacts once; the manager lookup below scans them
        # for every department row.
        lab_contacts = [brain.getObject()
                        for brain in bsc(portal_type="LabContact")]
        for row in self.get_rows(3):
            if not row['title']:
                continue
            obj = _createObjectByType("Department", folder, tmpID())
            obj.edit(title=row['title'],
                     description=row.get('description', ''))
            # Locate the manager by username among existing lab contacts.
            manager = None
            for contact in lab_contacts:
                if contact.getUsername() == row['LabContact_Username']:
                    manager = contact
                    break
            if manager is not None:
                obj.setManager(manager.UID())
            else:
                message = "Department: lookup of '%s' in LabContacts/Username failed." % row[
                    'LabContact_Username']
                logger.info(message)
            obj.unmarkCreationFlag()
            renameAfterCreation(obj)
||
486 | |||
487 | |||
class Lab_Products(WorksheetImporter):
    """Imports lab products (consumables with volume, unit and price)."""

    def Import(self):
        # Refer to the default folder
        folder = self.context.bika_setup.bika_labproducts
        # Iterate through the rows
        for row in self.get_rows(3):
            # Create the LabProduct object
            obj = _createObjectByType('LabProduct', folder, tmpID())
            # Apply the row values
            obj.edit(
                title=row.get('title', 'Unknown'),
                description=row.get('description', ''),
                Volume=row.get('volume', 0),
                Unit=str(row.get('unit', 0)),
                Price=str(row.get('price', 0)),
            )
            # FIX: clear the creation flag before renaming, consistent with
            # every other importer in this module (it was omitted here).
            obj.unmarkCreationFlag()
            # Rename the new object
            renameAfterCreation(obj)
||
508 | |||
509 | |||
class Clients(WorksheetImporter):
    """Imports client organisations. Name and ClientID are mandatory;
    a missing value aborts the whole import with an exception.
    """

    def Import(self):
        folder = self.context.clients
        for row in self.get_rows(3):
            obj = _createObjectByType("Client", folder, tmpID())
            if not row['Name']:
                # FIX: interpolate the row data so the error identifies the
                # offending record (the %s placeholder was never filled).
                message = "Client %s has no Name" % row.get('ClientID', '')
                raise Exception(message)
            if not row['ClientID']:
                message = "Client %s has no Client ID" % row.get('Name', '')
                raise Exception(message)
            obj.edit(Name=row['Name'],
                     ClientID=row['ClientID'],
                     MemberDiscountApplies=row[
                         'MemberDiscountApplies'] and True or False,
                     BulkDiscount=row['BulkDiscount'] and True or False,
                     TaxNumber=row.get('TaxNumber', ''),
                     AccountNumber=row.get('AccountNumber', '')
                     )
            self.fill_contactfields(row, obj)
            self.fill_addressfields(row, obj)
            obj.unmarkCreationFlag()
            renameAfterCreation(obj)
||
534 | |||
535 | |||
class Client_Contacts(WorksheetImporter):
    """Imports client contacts, creates matching Plone users, and grants
    each user the 'Owner' local role on its client plus membership of the
    'Clients' group.
    """

    def Import(self):
        portal_groups = getToolByName(self.context, 'portal_groups')
        pc = getToolByName(self.context, 'portal_catalog')
        for row in self.get_rows(3):
            client = pc(portal_type="Client",
                        getName=row['Client_title'])
            if len(client) == 0:
                # No matching client: skip the contact entirely.
                client_contact = "%(Firstname)s %(Surname)s" % row
                error = "Client invalid: '%s'. The Client Contact %s will not be uploaded."
                logger.error(error, row['Client_title'], client_contact)
                continue
            client = client[0].getObject()
            # Contacts are created inside their client container.
            contact = _createObjectByType("Contact", client, tmpID())
            fullname = "%(Firstname)s %(Surname)s" % row
            pub_pref = [x.strip() for x in
                        row.get('PublicationPreference', '').split(",")]
            contact.edit(
                Salutation=row.get('Salutation', ''),
                Firstname=row.get('Firstname', ''),
                Surname=row.get('Surname', ''),
                Username=row['Username'],
                JobTitle=row.get('JobTitle', ''),
                Department=row.get('Department', ''),
                PublicationPreference=pub_pref,
                AttachmentsPermitted=row[
                    'AttachmentsPermitted'] and True or False,
            )
            self.fill_contactfields(row, contact)
            self.fill_addressfields(row, contact)
            contact.unmarkCreationFlag()
            renameAfterCreation(contact)
            # CC Contacts
            if row['CCContacts']:
                names = [x.strip() for x in row['CCContacts'].split(",")]
                for _fullname in names:
                    # The referenced contacts may not exist yet; defer the
                    # CCContact reference until all sheets are imported.
                    self.defer(src_obj=contact,
                               src_field='CCContact',
                               dest_catalog='portal_catalog',
                               dest_query={'portal_type': 'Contact',
                                           'getFullname': _fullname}
                               )
            ## Create Plone user
            username = safe_unicode(row['Username']).encode('utf-8')
            password = safe_unicode(row['Password']).encode('utf-8')
            if(username):
                try:
                    member = self.context.portal_registration.addMember(
                        username,
                        password,
                        properties={
                            'username': username,
                            'email': row['EmailAddress'],
                            'fullname': fullname}
                    )
                except Exception as msg:
                    # Best-effort: the user may already exist; log and keep
                    # going so the contact itself is still wired up below.
                    logger.info("Error adding user (%s): %s" % (msg, username))
                contact.aq_parent.manage_setLocalRoles(row['Username'], ['Owner', ])
                contact.reindexObject()
                # add user to Clients group
                group = portal_groups.getGroupById('Clients')
                group.addMember(username)
||
599 | |||
600 | |||
class Container_Types(WorksheetImporter):
    """Imports container types from the 'Container Types' sheet."""

    def Import(self):
        target = self.context.bika_setup.bika_containertypes
        for record in self.get_rows(3):
            title = record['title']
            if not title:
                continue
            obj = _createObjectByType("ContainerType", target, tmpID())
            obj.edit(title=title,
                     description=record.get('description', ''))
            obj.unmarkCreationFlag()
            renameAfterCreation(obj)
||
613 | |||
614 | |||
class Preservations(WorksheetImporter):
    """Imports preservations together with their retention periods."""

    def Import(self):
        folder = self.context.bika_setup.bika_preservations
        for record in self.get_rows(3):
            if not record['title']:
                continue
            obj = _createObjectByType("Preservation", folder, tmpID())
            # Build the retention-period dict; empty cells count as 0.
            retention = {}
            for unit in ('days', 'hours', 'minutes'):
                raw = record['RetentionPeriod_%s' % unit]
                retention[unit] = int(raw or 0)

            obj.edit(title=record['title'],
                     description=record.get('description', ''),
                     RetentionPeriod=retention)
            obj.unmarkCreationFlag()
            renameAfterCreation(obj)
||
634 | |||
635 | |||
class Containers(WorksheetImporter):
    """Imports sample containers and links each to its container type and
    preservation when those columns are provided.
    """

    def Import(self):
        folder = self.context.bika_setup.bika_containers
        bsc = getToolByName(self.context, 'bika_setup_catalog')
        for record in self.get_rows(3):
            if not record['title']:
                continue
            obj = _createObjectByType("Container", folder, tmpID())
            obj.edit(
                title=record['title'],
                description=record.get('description', ''),
                Capacity=record.get('Capacity', 0),
                PrePreserved=self.to_bool(record['PrePreserved'])
            )
            if record['ContainerType_title']:
                ctype = self.get_object(
                    bsc, 'ContainerType', record.get('ContainerType_title', ''))
                if ctype:
                    obj.setContainerType(ctype)
            if record['Preservation_title']:
                preservation = self.get_object(
                    bsc, 'Preservation', record.get('Preservation_title', ''))
                if preservation:
                    obj.setPreservation(preservation)
            obj.unmarkCreationFlag()
            renameAfterCreation(obj)
||
661 | |||
662 | |||
class Suppliers(WorksheetImporter):
    """Imports suppliers with their banking and contact details."""

    def Import(self):
        folder = self.context.bika_setup.bika_suppliers
        # Columns copied verbatim with '' as the default value.
        text_fields = ('Name', 'TaxNumber', 'AccountNumber', 'BankName',
                       'BankBranch', 'SWIFTcode', 'IBN', 'NIB', 'Website')
        for record in self.get_rows(3):
            obj = _createObjectByType("Supplier", folder, tmpID())
            if record['Name']:
                values = dict((name, record.get(name, ''))
                              for name in text_fields)
                # These two default to {} (as in the original importer).
                values['AccountType'] = record.get('AccountType', {})
                values['AccountName'] = record.get('AccountName', {})
                obj.edit(**values)
                self.fill_contactfields(record, obj)
                self.fill_addressfields(record, obj)
                obj.unmarkCreationFlag()
                renameAfterCreation(obj)
||
687 | |||
688 | |||
class Supplier_Contacts(WorksheetImporter):
    """Imports supplier contacts and files each one under its supplier."""

    def Import(self):
        bsc = getToolByName(self.context, 'bika_setup_catalog')
        for record in self.get_rows(3):
            # Both the supplier name and a first name are mandatory.
            if not (record['Supplier_Name'] and record['Firstname']):
                continue
            brains = bsc(portal_type="Supplier",
                         Title=record['Supplier_Name'])
            if not brains:
                continue
            supplier = brains[0].getObject()
            obj = _createObjectByType("SupplierContact", supplier, tmpID())
            obj.edit(
                Firstname=record['Firstname'],
                Surname=record.get('Surname', ''),
                Username=record.get('Username')
            )
            self.fill_contactfields(record, obj)
            self.fill_addressfields(record, obj)
            obj.unmarkCreationFlag()
            renameAfterCreation(obj)
||
713 | |||
714 | |||
class Manufacturers(WorksheetImporter):
    """Imports instrument manufacturers."""

    def Import(self):
        folder = self.context.bika_setup.bika_manufacturers
        for record in self.get_rows(3):
            obj = _createObjectByType("Manufacturer", folder, tmpID())
            if not record['title']:
                # NOTE: mirrors the original flow — an unedited stub object
                # is still created for rows without a title.
                continue
            obj.edit(
                title=record['title'],
                description=record.get('description', '')
            )
            self.fill_addressfields(record, obj)
            obj.unmarkCreationFlag()
            renameAfterCreation(obj)
||
729 | |||
730 | |||
class Instrument_Types(WorksheetImporter):
    """Imports instrument types.

    Rows without a title are skipped, consistent with the other importers
    in this module (e.g. Container_Types, Preservations).
    """

    def Import(self):
        folder = self.context.bika_setup.bika_instrumenttypes
        for row in self.get_rows(3):
            # FIX: skip empty rows instead of creating a nameless
            # InstrumentType (previously every row produced an object and a
            # missing 'title' column raised KeyError).
            if not row.get('title'):
                continue
            obj = _createObjectByType("InstrumentType", folder, tmpID())
            obj.edit(
                title=row['title'],
                description=row.get('description', ''))
            obj.unmarkCreationFlag()
            renameAfterCreation(obj)
||
742 | |||
743 | |||
class Instruments(WorksheetImporter):
    """Imports instruments, links them to their type, manufacturer,
    supplier and method, and attaches photo, installation certificate and
    user manual files where provided.
    """

    def Import(self):
        folder = self.context.bika_setup.bika_instruments
        bsc = getToolByName(self.context, 'bika_setup_catalog')
        pc = getToolByName(self.context, 'portal_catalog')
        for row in self.get_rows(3):
            # Type, Supplier and Brand columns are mandatory for linking.
            if ('Type' not in row
                or 'Supplier' not in row
                or 'Brand' not in row):
                logger.info("Unable to import '%s'. Missing supplier, manufacturer or type" % row.get('title',''))
                continue

            obj = _createObjectByType("Instrument", folder, tmpID())

            obj.edit(
                title=row.get('title', ''),
                AssetNumber=row.get('assetnumber', ''),
                description=row.get('description', ''),
                Type=row.get('Type', ''),
                Brand=row.get('Brand', ''),
                Model=row.get('Model', ''),
                SerialNo=row.get('SerialNo', ''),
                DataInterface=row.get('DataInterface', ''),
                Location=row.get('Location', ''),
                InstallationDate=row.get('Instalationdate', ''),
                UserManualID=row.get('UserManualID', ''),
            )
            # Resolve referenced setup objects; get_object returns None
            # when the lookup fails. NOTE(review): the setters below are
            # then called with None — presumably they tolerate it; confirm.
            instrumenttype = self.get_object(bsc, 'InstrumentType', title=row.get('Type'))
            manufacturer = self.get_object(bsc, 'Manufacturer', title=row.get('Brand'))
            supplier = self.get_object(bsc, 'Supplier', getName=row.get('Supplier', ''))
            method = self.get_object(pc, 'Method', title=row.get('Method'))
            obj.setInstrumentType(instrumenttype)
            obj.setManufacturer(manufacturer)
            obj.setSupplier(supplier)
            if method:
                obj.setMethods([method])
                obj.setMethod(method)

            # Attaching the instrument's photo
            if row.get('Photo', None):
                path = resource_filename(
                    self.dataset_project,
                    "setupdata/%s/%s" % (self.dataset_name,
                                         row['Photo'])
                )
                try:
                    file_data = read_file(path)
                    obj.setPhoto(file_data)
                except Exception as msg:
                    # Photo is optional: log and continue without it.
                    file_data = None
                    logger.warning(msg[0] + " Error on sheet: " + self.sheetname)

            # Attaching the Installation Certificate if exists
            if row.get('InstalationCertificate', None):
                path = resource_filename(
                    self.dataset_project,
                    "setupdata/%s/%s" % (self.dataset_name,
                                         row['InstalationCertificate'])
                )
                try:
                    file_data = read_file(path)
                    obj.setInstallationCertificate(file_data)
                except Exception as msg:
                    logger.warning(msg[0] + " Error on sheet: " + self.sheetname)

            # Attaching the Instrument's manual if exists
            if row.get('UserManualFile', None):
                row_dict = {'DocumentID': row.get('UserManualID', 'manual'),
                            'DocumentVersion': '',
                            'DocumentLocation': '',
                            'DocumentType': 'Manual',
                            'File': row.get('UserManualFile', None)
                            }
                # NOTE(review): addDocument is not defined in this view of
                # the file — presumably a module-level helper defined later;
                # confirm it exists.
                addDocument(self, row_dict, obj)
            obj.unmarkCreationFlag()
            renameAfterCreation(obj)
||
821 | |||
822 | |||
class Instrument_Validations(WorksheetImporter):
    """Create InstrumentValidation records inside each referenced instrument."""

    def Import(self):
        bsc = getToolByName(self.context, 'bika_setup_catalog')
        for row in self.get_rows(3):
            # both the instrument reference and a title are mandatory
            if not (row.get('instrument', None) and row.get('title', None)):
                continue

            instrument = self.get_object(bsc, 'Instrument', row.get('instrument'))
            if not instrument:
                continue

            validation = _createObjectByType("InstrumentValidation", instrument, tmpID())
            validation.edit(
                title=row['title'],
                DownFrom=row.get('downfrom', ''),
                DownTo=row.get('downto', ''),
                Validator=row.get('validator', ''),
                Considerations=row.get('considerations', ''),
                WorkPerformed=row.get('workperformed', ''),
                Remarks=row.get('remarks', ''),
                DateIssued=row.get('DateIssued', ''),
                ReportID=row.get('ReportID', '')
            )
            # Assign the worker by matching the full name of an active contact
            worker_name = row.get('Worker', '')
            for brain in bsc(portal_type="LabContact", inactive_state='active'):
                contact = brain.getObject()
                if contact.getFullname() == worker_name:
                    validation.setWorker(contact.UID())
            validation.unmarkCreationFlag()
            renameAfterCreation(validation)
||
853 | |||
854 | |||
class Instrument_Calibrations(WorksheetImporter):
    """Create InstrumentCalibration records inside each referenced instrument."""

    def Import(self):
        bsc = getToolByName(self.context, 'bika_setup_catalog')
        for row in self.get_rows(3):
            if not row.get('instrument', None) or not row.get('title', None):
                continue

            folder = self.get_object(bsc, 'Instrument', row.get('instrument'))
            if folder:
                obj = _createObjectByType("InstrumentCalibration", folder, tmpID())
                obj.edit(
                    title=row['title'],
                    DownFrom=row.get('downfrom', ''),
                    DownTo=row.get('downto', ''),
                    Calibrator=row.get('calibrator', ''),
                    Considerations=row.get('considerations', ''),
                    WorkPerformed=row.get('workperformed', ''),
                    Remarks=row.get('remarks', ''),
                    DateIssued=row.get('DateIssued', ''),
                    ReportID=row.get('ReportID', '')
                )
                # Getting instrument lab contacts.
                # BUGFIX: index name was misspelled 'nactive_state'; the
                # sibling Instrument_Validations importer uses the correct
                # 'inactive_state' catalog index.
                lab_contacts = [o.getObject() for o in bsc(portal_type="LabContact", inactive_state='active')]
                for contact in lab_contacts:
                    if contact.getFullname() == row.get('Worker', ''):
                        obj.setWorker(contact.UID())
                obj.unmarkCreationFlag()
                renameAfterCreation(obj)
||
885 | |||
886 | |||
class Instrument_Certifications(WorksheetImporter):
    """Create InstrumentCertification records, optionally attaching the
    certificate report file and linking preparator/validator contacts.
    """

    def Import(self):
        bsc = getToolByName(self.context, 'bika_setup_catalog')
        for row in self.get_rows(3):
            if not row['instrument'] or not row['title']:
                continue

            folder = self.get_object(bsc, 'Instrument', row.get('instrument', ''))
            if folder:
                obj = _createObjectByType("InstrumentCertification", folder, tmpID())
                # Default validity window: from today until one year from today,
                # unless the sheet supplies explicit validfrom/validto values.
                today = datetime.date.today()
                certificate_expire_date = today.strftime('%d/%m') + '/' + str(today.year + 1) \
                    if row.get('validto', '') == '' else row.get('validto')
                certificate_start_date = today.strftime('%d/%m/%Y') \
                    if row.get('validfrom', '') == '' else row.get('validfrom')
                obj.edit(
                    title=row['title'],
                    AssetNumber=row.get('assetnumber', ''),
                    Date=row.get('date', ''),
                    ValidFrom=certificate_start_date,
                    ValidTo=certificate_expire_date,
                    Agency=row.get('agency', ''),
                    Remarks=row.get('remarks', ''),
                )
                # Attaching the Report Certificate if exists
                if row.get('report', None):
                    path = resource_filename(
                        self.dataset_project,
                        "setupdata/%s/%s" % (self.dataset_name,
                                             row['report'])
                    )
                    try:
                        obj.setDocument(read_file(path))
                    except Exception as msg:
                        logger.warning(msg[0] + " Error on sheet: " + self.sheetname)

                # Getting lab contacts.
                # BUGFIX: index name was misspelled 'nactive_state'; use the
                # 'inactive_state' catalog index as in Instrument_Validations.
                lab_contacts = [o.getObject() for o in bsc(portal_type="LabContact", inactive_state='active')]
                for contact in lab_contacts:
                    if contact.getFullname() == row.get('preparedby', ''):
                        obj.setPreparator(contact.UID())
                    if contact.getFullname() == row.get('approvedby', ''):
                        obj.setValidator(contact.UID())
                obj.unmarkCreationFlag()
                renameAfterCreation(obj)
||
936 | |||
937 | |||
class Instrument_Documents(WorksheetImporter):
    """Attach Multifile documents to instruments via the module-level
    ``addDocument`` helper."""

    def Import(self):
        bsc = getToolByName(self.context, 'bika_setup_catalog')
        for row in self.get_rows(3):
            instrument_title = row.get('instrument', '')
            if not instrument_title:
                continue
            instrument = self.get_object(bsc, 'Instrument', instrument_title)
            addDocument(self, row, instrument)
||
947 | |||
def addDocument(self, row_dict, folder):
    """
    This function adds a multifile object to the instrument folder.

    Reads the file named in ``row_dict['File']`` from the dataset's
    setupdata directory, then creates a Multifile inside *folder* unless
    another Multifile with the same DocumentID already exists (in that
    case a portal warning is shown instead).

    :param row_dict: the dictionary which contains the document information
                     (DocumentID, DocumentVersion, DocumentLocation,
                     DocumentType, File)
    :param folder: the instrument object; nothing happens when falsy
    """
    if folder:
        # This content type need a file; rows without one are ignored
        if row_dict.get('File', None):
            path = resource_filename(
                self.dataset_project,
                "setupdata/%s/%s" % (self.dataset_name,
                                     row_dict['File'])
            )
            try:
                file_data = read_file(path)
            except Exception as msg:
                # NOTE(review): on read failure the Multifile is still
                # created below with File=None -- confirm this is intended.
                file_data = None
                logger.warning(msg[0] + " Error on sheet: " + self.sheetname)

            # Obtain all previously created Multifile documents
            catalog = getToolByName(self.context, 'bika_setup_catalog')
            documents_brains = catalog.searchResults({'portal_type': 'Multifile'})
            # If the new document has the same DocumentID as an existing
            # document, the object won't be created.
            idAlreadyInUse = False
            for item in documents_brains:
                if item.getObject().getDocumentID() == row_dict.get('DocumentID', ''):
                    warning = "The ID '%s' used for this document is already in use on instrument '%s', consequently " \
                              "the file hasn't been upload." % (row_dict.get('DocumentID', ''), row_dict.get('instrument', ''))
                    self.context.plone_utils.addPortalMessage(warning)
                    idAlreadyInUse = True
            if not idAlreadyInUse:
                obj = _createObjectByType("Multifile", folder, tmpID())
                obj.edit(
                    DocumentID=row_dict.get('DocumentID', ''),
                    DocumentVersion=row_dict.get('DocumentVersion', ''),
                    DocumentLocation=row_dict.get('DocumentLocation', ''),
                    DocumentType=row_dict.get('DocumentType', ''),
                    File=file_data
                )
                obj.unmarkCreationFlag()
                renameAfterCreation(obj)
||
990 | |||
991 | |||
class Instrument_Maintenance_Tasks(WorksheetImporter):
    """Create InstrumentMaintenanceTask records inside each instrument."""

    def Import(self):
        bsc = getToolByName(self.context, 'bika_setup_catalog')
        for row in self.get_rows(3):
            # instrument, title and type are all mandatory
            if not row['instrument'] or not row['title'] or not row['type']:
                continue

            folder = self.get_object(bsc, 'Instrument', row.get('instrument'))
            if folder:
                obj = _createObjectByType("InstrumentMaintenanceTask", folder, tmpID())
                # Normalize the cost to two decimals when it is numeric,
                # otherwise keep the raw cell value.
                # BUGFIX: narrowed from a bare 'except:' which also swallowed
                # KeyboardInterrupt/SystemExit.
                try:
                    cost = "%.2f" % (row.get('cost', 0))
                except (TypeError, ValueError):
                    cost = row.get('cost', '0.0')

                obj.edit(
                    title=row['title'],
                    description=row['description'],
                    Type=row['type'],
                    DownFrom=row.get('downfrom', ''),
                    DownTo=row.get('downto', ''),
                    # NOTE: 'maintaner' is the (misspelled) spreadsheet column
                    Maintainer=row.get('maintaner', ''),
                    Considerations=row.get('considerations', ''),
                    WorkPerformed=row.get('workperformed', ''),
                    Remarks=row.get('remarks', ''),
                    Cost=cost,
                    Closed=self.to_bool(row.get('closed'))
                )
                obj.unmarkCreationFlag()
                renameAfterCreation(obj)
||
1023 | |||
1024 | |||
class Instrument_Schedule(WorksheetImporter):

    def Import(self):
        """Create InstrumentScheduledTask objects inside each instrument.

        Builds a single ScheduleCriteria record per row describing the start
        date and the optional repetition settings.

        NOTE(review): 'numrepeats' and 'repeatuntil' are read with direct
        indexing, so the sheet must always provide those columns -- confirm
        against the template spreadsheet.
        """
        bsc = getToolByName(self.context, 'bika_setup_catalog')
        for row in self.get_rows(3):
            # instrument, title and type are all mandatory
            if not row['instrument'] or not row['title'] or not row['type']:
                continue
            folder = self.get_object(bsc, 'Instrument', row.get('instrument'))
            if folder:
                obj = _createObjectByType("InstrumentScheduledTask", folder, tmpID())
                # repetition is enabled when either a repeat count > 1 or a
                # non-empty 'repeat until' date is present
                criteria = [
                    {'fromenabled': row.get('date', None) is not None,
                     'fromdate': row.get('date', ''),
                     'repeatenabled': ((row['numrepeats'] and
                                        row['numrepeats'] > 1) or
                                       (row['repeatuntil'] and
                                        len(row['repeatuntil']) > 0)),
                     'repeatunit': row.get('numrepeats', ''),
                     'repeatperiod': row.get('periodicity', ''),
                     'repeatuntilenabled': (row['repeatuntil'] and
                                            len(row['repeatuntil']) > 0),
                     'repeatuntil': row.get('repeatuntil')}
                ]
                obj.edit(
                    title=row['title'],
                    Type=row['type'],
                    ScheduleCriteria=criteria,
                    Considerations=row.get('considerations', ''),
                )
                obj.unmarkCreationFlag()
                renameAfterCreation(obj)
||
1056 | |||
1057 | |||
class Sample_Matrices(WorksheetImporter):
    """Populate bika_setup.bika_samplematrices from the worksheet rows."""

    def Import(self):
        container = self.context.bika_setup.bika_samplematrices
        for row in self.get_rows(3):
            title = row['title']
            if not title:
                continue
            matrix = _createObjectByType("SampleMatrix", container, tmpID())
            matrix.edit(
                title=title,
                description=row.get('description', '')
            )
            matrix.unmarkCreationFlag()
            renameAfterCreation(matrix)
||
1072 | |||
1073 | |||
class Batch_Labels(WorksheetImporter):
    """Create one BatchLabel for every row with a non-empty 'title'."""

    def Import(self):
        container = self.context.bika_setup.bika_batchlabels
        for row in self.get_rows(3):
            label_title = row['title']
            if not label_title:
                continue
            label = _createObjectByType("BatchLabel", container, tmpID())
            label.edit(title=label_title)
            label.unmarkCreationFlag()
            renameAfterCreation(label)
||
1084 | |||
1085 | |||
class Sample_Types(WorksheetImporter):
    """Import sample types, linking their matrix, container type and
    optionally an initial sample point."""

    def Import(self):
        container = self.context.bika_setup.bika_sampletypes
        bsc = getToolByName(self.context, 'bika_setup_catalog')
        for row in self.get_rows(3):
            if not row['title']:
                continue
            sample_type = _createObjectByType("SampleType", container, tmpID())
            matrix = self.get_object(bsc, 'SampleMatrix',
                                     row.get('SampleMatrix_title'))
            container_type = self.get_object(bsc, 'ContainerType',
                                             row.get('ContainerType_title'))
            # the retention period is expressed in whole days only
            retention = {
                'days': row['RetentionPeriod'] if row['RetentionPeriod'] else 0,
                'hours': 0,
                'minutes': 0}
            sample_type.edit(
                title=row['title'],
                description=row.get('description', ''),
                RetentionPeriod=retention,
                Hazardous=self.to_bool(row['Hazardous']),
                SampleMatrix=matrix,
                Prefix=row['Prefix'],
                MinimumVolume=row['MinimumVolume'],
                ContainerType=container_type
            )
            point = self.get_object(bsc, 'SamplePoint',
                                    row.get('SamplePoint_title'))
            if point:
                sample_type.setSamplePoints([point, ])
            sample_type.unmarkCreationFlag()
            renameAfterCreation(sample_type)
||
1119 | |||
1120 | |||
class Sample_Points(WorksheetImporter):
    """Import sample points into the setup folder, or into a client folder
    when the row names a client."""

    def Import(self):
        setup_folder = self.context.bika_setup.bika_samplepoints
        bsc = getToolByName(self.context, 'bika_setup_catalog')
        pc = getToolByName(self.context, 'portal_catalog')
        for row in self.get_rows(3):
            if not row['title']:
                continue
            # Client-owned sample points are created inside the client folder
            if row['Client_title']:
                client_title = row['Client_title']
                client = pc(portal_type="Client", getName=client_title)
                if len(client) == 0:
                    error = "Sample Point %s: Client invalid: '%s'. The Sample point will not be uploaded."
                    logger.error(error, row['title'], client_title)
                    continue
                folder = client[0].getObject()
            else:
                folder = setup_folder

            # BUGFIX: logger.log(msg, 'error') passed a string where the
            # logging API expects an integer level as first argument, which
            # raises at runtime; use logger.error instead.
            if row['Latitude']:
                logger.error("Ignored SamplePoint Latitude")
            if row['Longitude']:
                logger.error("Ignored SamplePoint Longitude")

            obj = _createObjectByType("SamplePoint", folder, tmpID())
            obj.edit(
                title=row['title'],
                description=row.get('description', ''),
                Composite=self.to_bool(row['Composite']),
                Elevation=row['Elevation'],
            )
            sampletype = self.get_object(bsc, 'SampleType',
                                         row.get('SampleType_title'))
            if sampletype:
                obj.setSampleTypes([sampletype, ])
            obj.unmarkCreationFlag()
            renameAfterCreation(obj)
||
1159 | |||
1160 | |||
class Sample_Point_Sample_Types(WorksheetImporter):
    """Create the bidirectional SamplePoint <-> SampleType relations."""

    def Import(self):
        bsc = getToolByName(self.context, 'bika_setup_catalog')
        for row in self.get_rows(3):
            sampletype = self.get_object(bsc,
                                         'SampleType',
                                         row.get('SampleType_title'))
            samplepoint = self.get_object(bsc,
                                          'SamplePoint',
                                          row['SamplePoint_title'])
            # BUGFIX: require both ends before linking; previously a resolved
            # sample point with an unresolved sample type appended None into
            # the sample point's SampleTypes relation (and vice versa).
            if not sampletype or not samplepoint:
                continue

            sampletypes = samplepoint.getSampleTypes()
            if sampletype not in sampletypes:
                sampletypes.append(sampletype)
                samplepoint.setSampleTypes(sampletypes)

            samplepoints = sampletype.getSamplePoints()
            if samplepoint not in samplepoints:
                samplepoints.append(samplepoint)
                sampletype.setSamplePoints(samplepoints)
||
1183 | |||
class Storage_Locations(WorksheetImporter):
    """Create StorageLocation objects carrying the site / location / shelf
    hierarchy fields.  Rows without an Address are skipped.
    """

    def Import(self):
        # IDIOM: dropped the unused 'bika_setup_catalog' and 'portal_catalog'
        # lookups the original performed but never read.
        setup_folder = self.context.bika_setup.bika_storagelocations
        for row in self.get_rows(3):
            if not row['Address']:
                continue

            obj = _createObjectByType("StorageLocation", setup_folder, tmpID())
            obj.edit(
                title=row['Address'],
                SiteTitle=row['SiteTitle'],
                SiteCode=row['SiteCode'],
                SiteDescription=row['SiteDescription'],
                LocationTitle=row['LocationTitle'],
                LocationCode=row['LocationCode'],
                LocationDescription=row['LocationDescription'],
                LocationType=row['LocationType'],
                ShelfTitle=row['ShelfTitle'],
                ShelfCode=row['ShelfCode'],
                ShelfDescription=row['ShelfDescription'],
            )
            obj.unmarkCreationFlag()
            renameAfterCreation(obj)
||
1210 | |||
1211 | |||
class Sample_Conditions(WorksheetImporter):
    """Create one SampleCondition per row that carries a 'Title'."""

    def Import(self):
        container = self.context.bika_setup.bika_sampleconditions
        for row in self.get_rows(3):
            title = row['Title']
            if not title:
                continue
            condition = _createObjectByType("SampleCondition", container, tmpID())
            condition.edit(
                title=title,
                description=row.get('Description', '')
            )
            condition.unmarkCreationFlag()
            renameAfterCreation(condition)
||
1225 | |||
1226 | |||
class Analysis_Categories(WorksheetImporter):
    """Import analysis categories; each row requires both a title and an
    existing Department, otherwise a warning explains what is missing."""

    def Import(self):
        folder = self.context.bika_setup.bika_analysiscategories
        bsc = getToolByName(self.context, 'bika_setup_catalog')
        for row in self.get_rows(3):
            department = None
            if row.get('Department_title', None):
                department = self.get_object(bsc, 'Department',
                                             row.get('Department_title'))
            if row.get('title', None) and department:
                obj = _createObjectByType("AnalysisCategory", folder, tmpID())
                obj.edit(
                    title=row['title'],
                    description=row.get('description', ''))
                obj.setDepartment(department)
                obj.unmarkCreationFlag()
                renameAfterCreation(obj)
            # BUGFIX: log messages tidied ("Error in in" duplication and the
            # missing space before "is wrong.")
            elif not row.get('title', None):
                logger.warning("Error in " + self.sheetname + ". Missing Title field")
            elif not row.get('Department_title', None):
                logger.warning("Error in " + self.sheetname + ". Department field missing.")
            else:
                logger.warning("Error in " + self.sheetname + ". Department "
                               + row.get('Department_title') + " is wrong.")
||
1252 | |||
1253 | |||
class Methods(WorksheetImporter):
    """Import analysis methods, with an optional default calculation and an
    optional method document file."""

    def Import(self):
        folder = self.context.methods
        bsc = getToolByName(self.context, 'bika_setup_catalog')
        for row in self.get_rows(3):
            if not row['title']:
                continue
            calculation = self.get_object(bsc, 'Calculation', row.get('Calculation_title'))
            obj = _createObjectByType("Method", folder, tmpID())
            obj.edit(
                title=row['title'],
                description=row.get('description', ''),
                Instructions=row.get('Instructions', ''),
                ManualEntryOfResults=row.get('ManualEntryOfResults', True),
                Calculation=calculation,
                MethodID=row.get('MethodID', ''),
                Accredited=row.get('Accredited', True),
            )
            # MethodID must be unique: blank it when another method already
            # uses the same id.
            # BUGFIX: the previous check used 'methods.getObject.get(...)'
            # (missing call parentheses) plus dict-style access on Archetypes
            # objects, both of which raise instead of comparing.  Use the
            # schema-generated 'getMethodID' accessor instead -- TODO confirm
            # accessor name against the Method schema.
            method_id = row.get('MethodID', '')
            if method_id:
                catalog = getToolByName(self.context, 'portal_catalog')
                for brain in catalog.searchResults({'portal_type': 'Method'}):
                    existing = brain.getObject()
                    if existing != obj and existing.getMethodID() == method_id:
                        obj.edit(MethodID='')
                        break

            # Attach the method document, if the row names one
            if row.get('MethodDocument', None):
                path = resource_filename(
                    self.dataset_project,
                    "setupdata/%s/%s" % (self.dataset_name,
                                         row['MethodDocument'])
                )
                try:
                    obj.setMethodDocument(read_file(path))
                except Exception as msg:
                    logger.warning(msg[0] + " Error on sheet: " + self.sheetname)

            obj.unmarkCreationFlag()
            renameAfterCreation(obj)
||
1294 | |||
1295 | |||
class Sampling_Deviations(WorksheetImporter):
    """Create one SamplingDeviation per row that carries a 'title'."""

    def Import(self):
        container = self.context.bika_setup.bika_samplingdeviations
        for row in self.get_rows(3):
            title = row['title']
            if not title:
                continue
            deviation = _createObjectByType("SamplingDeviation", container, tmpID())
            deviation.edit(
                title=title,
                description=row.get('description', '')
            )
            deviation.unmarkCreationFlag()
            renameAfterCreation(deviation)
||
1309 | |||
1310 | |||
class Calculations(WorksheetImporter):
    """Import calculations with their interim fields, defer linking of the
    dependent services named in each formula, then assign default
    calculations to methods."""

    def get_interim_fields(self):
        """Preload the 'Calculation Interim Fields' sheet into
        ``self.interim_fields`` keyed by calculation title."""
        # BUGFIX: initialise the mapping before the worksheet check so that
        # Import() never hits an AttributeError when the sheet is absent.
        self.interim_fields = {}
        sheetname = 'Calculation Interim Fields'
        worksheet = self.workbook.get_sheet_by_name(sheetname)
        if not worksheet:
            return
        rows = self.get_rows(3, worksheet=worksheet)
        for row in rows:
            calc_title = row['Calculation_title']
            if calc_title not in self.interim_fields.keys():
                self.interim_fields[calc_title] = []
            self.interim_fields[calc_title].append({
                'keyword': row['keyword'],
                'title': row.get('title', ''),
                'type': 'int',
                'hidden': ('hidden' in row and row['hidden']) and True or False,
                'value': row['value'],
                'unit': row['unit'] and row['unit'] or ''})

    def Import(self):
        self.get_interim_fields()
        folder = self.context.bika_setup.bika_calculations
        for row in self.get_rows(3):
            if not row['title']:
                continue
            calc_title = row['title']
            calc_interims = self.interim_fields.get(calc_title, [])
            formula = row['Formula']
            # scan formula for dep services: [Keyword] placeholders
            keywords = re.compile(r"\[([^\.^\]]+)\]").findall(formula)
            # interim field keywords are not service dependencies
            interim_keys = [k['keyword'] for k in calc_interims]
            dep_keywords = [k for k in keywords if k not in interim_keys]

            obj = _createObjectByType("Calculation", folder, tmpID())
            obj.edit(
                title=calc_title,
                description=row.get('description', ''),
                InterimFields=calc_interims,
                Formula=str(row['Formula'])
            )
            # Dependent services may not exist yet, so defer the linkage
            for kw in dep_keywords:
                self.defer(src_obj=obj,
                           src_field='DependentServices',
                           dest_catalog='bika_setup_catalog',
                           dest_query={'portal_type': 'AnalysisService',
                                       'getKeyword': kw}
                           )
            obj.unmarkCreationFlag()
            renameAfterCreation(obj)

        # Now we have the calculations registered, try to assign default calcs
        # to methods
        sheet = self.workbook.get_sheet_by_name("Methods")
        bsc = getToolByName(self.context, 'bika_setup_catalog')
        for row in self.get_rows(3, sheet):
            if row.get('title', '') and row.get('Calculation_title', ''):
                meth = self.get_object(bsc, "Method", row.get('title'))
                if meth and not meth.getCalculation():
                    calctit = safe_unicode(row['Calculation_title']).encode('utf-8')
                    calc = self.get_object(bsc, "Calculation", calctit)
                    if calc:
                        meth.setCalculation(calc.UID())
1377 | |||
1378 | |||
1379 | class Analysis_Services(WorksheetImporter): |
||
1380 | |||
1381 | def load_interim_fields(self): |
||
1382 | # preload AnalysisService InterimFields sheet |
||
1383 | sheetname = 'AnalysisService InterimFields' |
||
1384 | worksheet = self.workbook.get_sheet_by_name(sheetname) |
||
1385 | if not worksheet: |
||
1386 | return |
||
1387 | self.service_interims = {} |
||
1388 | rows = self.get_rows(3, worksheet=worksheet) |
||
1389 | for row in rows: |
||
1390 | service_title = row['Service_title'] |
||
1391 | if service_title not in self.service_interims.keys(): |
||
1392 | self.service_interims[service_title] = [] |
||
1393 | self.service_interims[service_title].append({ |
||
1394 | 'keyword': row['keyword'], |
||
1395 | 'title': row.get('title', ''), |
||
1396 | 'type': 'int', |
||
1397 | 'value': row['value'], |
||
1398 | 'unit': row['unit'] and row['unit'] or ''}) |
||
1399 | |||
1400 | def load_result_options(self): |
||
1401 | bsc = getToolByName(self.context, 'bika_setup_catalog') |
||
1402 | sheetname = 'AnalysisService ResultOptions' |
||
1403 | worksheet = self.workbook.get_sheet_by_name(sheetname) |
||
1404 | if not worksheet: |
||
1405 | return |
||
1406 | for row in self.get_rows(3, worksheet=worksheet): |
||
1407 | service = self.get_object(bsc, 'AnalysisService', |
||
1408 | row.get('Service_title')) |
||
1409 | if not service: |
||
1410 | return |
||
1411 | sro = service.getResultOptions() |
||
1412 | sro.append({'ResultValue': row['ResultValue'], |
||
1413 | 'ResultText': row['ResultText']}) |
||
1414 | service.setResultOptions(sro) |
||
1415 | |||
1416 | def load_service_uncertainties(self): |
||
1417 | bsc = getToolByName(self.context, 'bika_setup_catalog') |
||
1418 | sheetname = 'AnalysisService Uncertainties' |
||
1419 | worksheet = self.workbook.get_sheet_by_name(sheetname) |
||
1420 | if not worksheet: |
||
1421 | return |
||
1422 | |||
1423 | bucket = {} |
||
1424 | count = 0 |
||
1425 | for row in self.get_rows(3, worksheet=worksheet): |
||
1426 | count += 1 |
||
1427 | service = self.get_object(bsc, 'AnalysisService', |
||
1428 | row.get('Service_title')) |
||
1429 | if not service: |
||
1430 | warning = "Unable to load an Analysis Service uncertainty. Service '%s' not found." % row.get('Service_title') |
||
1431 | logger.warning(warning) |
||
1432 | continue |
||
1433 | service_uid = service.UID() |
||
1434 | if service_uid not in bucket: |
||
1435 | bucket[service_uid] = [] |
||
1436 | bucket[service_uid].append( |
||
1437 | {'intercept_min': row['Range Min'], |
||
1438 | 'intercept_max': row['Range Max'], |
||
1439 | 'errorvalue': row['Uncertainty Value']} |
||
1440 | ) |
||
1441 | if count > 500: |
||
1442 | self.write_bucket(bucket) |
||
1443 | bucket = {} |
||
1444 | if bucket: |
||
1445 | self.write_bucket(bucket) |
||
1446 | |||
1447 | def get_methods(self, service_title, default_method): |
||
1448 | """ Return an array of objects of the type Method in accordance to the |
||
1449 | methods listed in the 'AnalysisService Methods' sheet and service |
||
1450 | set in the parameter service_title. |
||
1451 | If default_method is set, it will be included in the returned |
||
1452 | array. |
||
1453 | """ |
||
1454 | return self.get_relations(service_title, |
||
1455 | default_method, |
||
1456 | 'Method', |
||
1457 | 'portal_catalog', |
||
1458 | 'AnalysisService Methods', |
||
1459 | 'Method_title') |
||
1460 | |||
1461 | def get_instruments(self, service_title, default_instrument): |
||
1462 | """ Return an array of objects of the type Instrument in accordance to |
||
1463 | the instruments listed in the 'AnalysisService Instruments' sheet |
||
1464 | and service set in the parameter 'service_title'. |
||
1465 | If default_instrument is set, it will be included in the returned |
||
1466 | array. |
||
1467 | """ |
||
1468 | return self.get_relations(service_title, |
||
1469 | default_instrument, |
||
1470 | 'Instrument', |
||
1471 | 'bika_setup_catalog', |
||
1472 | 'AnalysisService Instruments', |
||
1473 | 'Instrument_title') |
||
1474 | |||
1475 | def get_relations(self, service_title, default_obj, obj_type, catalog_name, sheet_name, column): |
||
1476 | """ Return an array of objects of the specified type in accordance to |
||
1477 | the object titles defined in the sheet specified in 'sheet_name' and |
||
1478 | service set in the paramenter 'service_title'. |
||
1479 | If a default_obj is set, it will be included in the returned array. |
||
1480 | """ |
||
1481 | out_objects = [default_obj] if default_obj else [] |
||
1482 | cat = getToolByName(self.context, catalog_name) |
||
1483 | worksheet = self.workbook.get_sheet_by_name(sheet_name) |
||
1484 | if not worksheet: |
||
1485 | return out_objects |
||
1486 | for row in self.get_rows(3, worksheet=worksheet): |
||
1487 | row_as_title = row.get('Service_title') |
||
1488 | if not row_as_title: |
||
1489 | return out_objects |
||
1490 | elif row_as_title != service_title: |
||
1491 | continue |
||
1492 | obj = self.get_object(cat, obj_type, row.get(column)) |
||
1493 | if obj: |
||
1494 | if default_obj and default_obj.UID() == obj.UID(): |
||
1495 | continue |
||
1496 | out_objects.append(obj) |
||
1497 | return out_objects |
||
1498 | |||
1499 | def write_bucket(self, bucket): |
||
1500 | bsc = getToolByName(self.context, 'bika_setup_catalog') |
||
1501 | for service_uid, uncertainties in bucket.items(): |
||
1502 | obj = bsc(UID=service_uid)[0].getObject() |
||
1503 | _uncert = list(obj.getUncertainties()) |
||
1504 | _uncert.extend(uncertainties) |
||
1505 | obj.setUncertainties(_uncert) |
||
1506 | |||
    def Import(self):
        """Create one AnalysisService per spreadsheet row.

        Loads companion data first (interim fields), resolves the related
        setup objects (category, department, container, preservation,
        methods, instruments, calculation), creates the service and finally
        loads result options and uncertainties from their own sheets.
        """
        self.load_interim_fields()
        folder = self.context.bika_setup.bika_analysisservices
        bsc = getToolByName(self.context, 'bika_setup_catalog')
        pc = getToolByName(self.context, 'portal_catalog')
        for row in self.get_rows(3):
            if not row['title']:
                continue

            obj = _createObjectByType("AnalysisService", folder, tmpID())
            # Maximum turnaround time, split across three spreadsheet columns.
            MTA = {
                'days': self.to_int(row.get('MaxTimeAllowed_days',0),0),
                'hours': self.to_int(row.get('MaxTimeAllowed_hours',0),0),
                'minutes': self.to_int(row.get('MaxTimeAllowed_minutes',0),0),
            }
            category = self.get_object(bsc, 'AnalysisCategory', row.get('AnalysisCategory_title'))
            department = self.get_object(bsc, 'Department', row.get('Department_title'))
            container = self.get_object(bsc, 'Container', row.get('Container_title'))
            preservation = self.get_object(bsc, 'Preservation', row.get('Preservation_title'))

            # Analysis Service - Method considerations:
            # One Analysis Service can have 0 or n Methods associated (field
            # 'Methods' from the Schema).
            # If the Analysis Service has at least one method associated, then
            # one of those methods can be set as the defualt method (field
            # '_Method' from the Schema).
            #
            # To make it easier, if a DefaultMethod is declared in the
            # Analysis_Services spreadsheet, but the same AS has no method
            # associated in the Analysis_Service_Methods spreadsheet, then make
            # the assumption that the DefaultMethod set in the former has to be
            # associated to the AS although the relation is missing.
            defaultmethod = self.get_object(pc, 'Method', row.get('DefaultMethod_title'))
            methods = self.get_methods(row['title'], defaultmethod)
            if not defaultmethod and methods:
                defaultmethod = methods[0]

            # Analysis Service - Instrument considerations:
            # By default, an Analysis Services will be associated automatically
            # with several Instruments due to the Analysis Service - Methods
            # relation (an Instrument can be assigned to a Method and one Method
            # can have zero or n Instruments associated). There is no need to
            # set this assignment directly, the AnalysisService object will
            # find those instruments.
            # Besides this 'automatic' behavior, an Analysis Service can also
            # have 0 or n Instruments manually associated ('Instruments' field).
            # In this case, the attribute 'AllowInstrumentEntryOfResults' should
            # be set to True.
            #
            # To make it easier, if a DefaultInstrument is declared in the
            # Analysis_Services spreadsheet, but the same AS has no instrument
            # associated in the AnalysisService_Instruments spreadsheet, then
            # make the assumption the DefaultInstrument set in the former has
            # to be associated to the AS although the relation is missing and
            # the option AllowInstrumentEntryOfResults will be set to True.
            defaultinstrument = self.get_object(bsc, 'Instrument', row.get('DefaultInstrument_title'))
            instruments = self.get_instruments(row['title'], defaultinstrument)
            allowinstrentry = True if instruments else False
            if not defaultinstrument and instruments:
                defaultinstrument = instruments[0]

            # The manual entry of results can only be set to false if the value
            # for the attribute "InstrumentEntryOfResults" is False.
            allowmanualentry = True if not allowinstrentry else row.get('ManualEntryOfResults', True)

            # Analysis Service - Calculation considerations:
            # By default, the AnalysisService will use the Calculation associated
            # to the Default Method (the field "UseDefaultCalculation"==True).
            # If the Default Method for this AS doesn't have any Calculation
            # associated and the field "UseDefaultCalculation" is True, no
            # Calculation will be used for this AS ("_Calculation" field is
            # reserved and should not be set directly).
            #
            # To make it easier, if a Calculation is set by default in the
            # spreadsheet, then assume the UseDefaultCalculation has to be set
            # to False.
            deferredcalculation = self.get_object(bsc, 'Calculation', row.get('Calculation_title'))
            usedefaultcalculation = False if deferredcalculation else True
            _calculation = deferredcalculation if deferredcalculation else \
                (defaultmethod.getCalculation() if defaultmethod else None)

            obj.edit(
                title=row['title'],
                ShortTitle=row.get('ShortTitle', row['title']),
                description=row.get('description', ''),
                Keyword=row['Keyword'],
                PointOfCapture=row['PointOfCapture'].lower(),
                Category=category,
                Department=department,
                # 'p' (permitted) is the fallback attachment option.
                AttachmentOption=row.get('Attachment', '')[0].lower() if row.get('Attachment', '') else 'p',
                Unit=row['Unit'] and row['Unit'] or None,
                Precision=row['Precision'] and str(row['Precision']) or '0',
                ExponentialFormatPrecision=str(self.to_int(row.get('ExponentialFormatPrecision',7),7)),
                LowerDetectionLimit='%06f' % self.to_float(row.get('LowerDetectionLimit', '0.0'), 0),
                UpperDetectionLimit='%06f' % self.to_float(row.get('UpperDetectionLimit', '1000000000.0'), 1000000000.0),
                DetectionLimitSelector=self.to_bool(row.get('DetectionLimitSelector',0)),
                MaxTimeAllowed=MTA,
                Price="%02f" % Float(row['Price']),
                BulkPrice="%02f" % Float(row['BulkPrice']),
                VAT="%02f" % Float(row['VAT']),
                _Method=defaultmethod,
                Methods=methods,
                ManualEntryOfResults=allowmanualentry,
                InstrumentEntryOfResults=allowinstrentry,
                Instruments=instruments,
                Calculation=_calculation,
                UseDefaultCalculation=usedefaultcalculation,
                DuplicateVariation="%02f" % Float(row['DuplicateVariation']),
                Accredited=self.to_bool(row['Accredited']),
                # service_interims is only present when load_interim_fields
                # found its worksheet.
                InterimFields=hasattr(self, 'service_interims') and self.service_interims.get(
                    row['title'], []) or [],
                Separate=self.to_bool(row.get('Separate', False)),
                Container=container,
                Preservation=preservation,
                CommercialID=row.get('CommercialID', ''),
                ProtocolID=row.get('ProtocolID', '')
            )
            obj.unmarkCreationFlag()
            renameAfterCreation(obj)
        self.load_result_options()
        self.load_service_uncertainties()
||
1628 | |||
1629 | |||
class Analysis_Specifications(WorksheetImporter):
    """Imports AnalysisSpec objects, grouped by owner (a Client, or the lab
    itself) and by title, from the Analysis Specifications worksheet.
    """

    def resolve_service(self, row):
        """Return the AnalysisService named by the row's 'service' column,
        matching first by title and falling back to keyword.
        """
        bsc = getToolByName(self.context, "bika_setup_catalog")
        service = bsc(
            portal_type="AnalysisService",
            title=safe_unicode(row["service"])
        )
        if not service:
            service = bsc(
                portal_type="AnalysisService",
                getKeyword=safe_unicode(row["service"])
            )
        service = service[0].getObject()
        return service

    def Import(self):
        s_t = ""  # NOTE(review): appears unused in the visible code
        bucket = {}
        pc = getToolByName(self.context, "portal_catalog")
        bsc = getToolByName(self.context, "bika_setup_catalog")
        # collect up all values into the bucket:
        # bucket[client-or-lab][spec title] -> sampletype + results ranges
        for row in self.get_rows(3):
            title = row.get("Title", False)
            if not title:
                title = row.get("title", False)
            if not title:
                continue
            parent = row["Client_title"] if row["Client_title"] else "lab"
            st = row["SampleType_title"] if row["SampleType_title"] else ""
            service = self.resolve_service(row)

            if parent not in bucket:
                bucket[parent] = {}
            if title not in bucket[parent]:
                bucket[parent][title] = {"sampletype": st, "resultsrange": []}
            bucket[parent][title]["resultsrange"].append({
                "keyword": service.getKeyword(),
                "min": row["min"] if row["min"] else "0",
                "max": row["max"] if row["max"] else "0",
                "error": row["error"] if row["error"] else "0"
            })
        # write objects.
        for parent in bucket.keys():
            for title in bucket[parent]:
                # Lab-wide specs live in the setup folder; client specs in
                # the client object itself.
                if parent == "lab":
                    folder = self.context.bika_setup.bika_analysisspecs
                else:
                    proxy = pc(portal_type="Client", getName=safe_unicode(parent))[0]
                    folder = proxy.getObject()
                st = bucket[parent][title]["sampletype"]
                resultsrange = bucket[parent][title]["resultsrange"]
                if st:
                    st_uid = bsc(portal_type="SampleType", title=safe_unicode(st))[0].UID
                obj = _createObjectByType("AnalysisSpec", folder, tmpID())
                obj.edit(title=title)
                obj.setResultsRange(resultsrange)
                if st:
                    obj.setSampleType(st_uid)
                obj.unmarkCreationFlag()
                renameAfterCreation(obj)
||
1691 | |||
1692 | |||
class Analysis_Profiles(WorksheetImporter):
    """Imports AnalysisProfile objects; their service lists come from the
    companion 'Analysis Profile Services' worksheet.
    """

    def load_analysis_profile_services(self):
        """Populate self.profile_services: profile title -> list of
        AnalysisService objects, matched by title or keyword.
        """
        sheetname = 'Analysis Profile Services'
        worksheet = self.workbook.get_sheet_by_name(sheetname)
        self.profile_services = {}
        if not worksheet:
            return
        bsc = getToolByName(self.context, 'bika_setup_catalog')
        for row in self.get_rows(3, worksheet=worksheet):
            if not row.get('Profile','') or not row.get('Service',''):
                continue
            if row['Profile'] not in self.profile_services.keys():
                self.profile_services[row['Profile']] = []
            # Here we match againts Keyword or Title.
            # XXX We need a utility for this kind of thing.
            service = self.get_object(bsc, 'AnalysisService', row.get('Service'))
            if not service:
                service = bsc(portal_type='AnalysisService',
                              getKeyword=row['Service'])[0].getObject()
            self.profile_services[row['Profile']].append(service)

    def Import(self):
        """Create an AnalysisProfile per spreadsheet row and attach the
        services collected for it.
        """
        self.load_analysis_profile_services()
        folder = self.context.bika_setup.bika_analysisprofiles
        for row in self.get_rows(3):
            if row['title']:
                obj = _createObjectByType("AnalysisProfile", folder, tmpID())
                obj.edit(title=row['title'],
                         description=row.get('description', ''),
                         ProfileKey=row['ProfileKey'],
                         CommercialID=row.get('CommercialID', ''),
                         AnalysisProfilePrice="%02f" % Float(row.get('AnalysisProfilePrice', '0.0')),
                         AnalysisProfileVAT="%02f" % Float(row.get('AnalysisProfileVAT', '0.0')),
                         UseAnalysisProfilePrice=row.get('UseAnalysisProfilePrice', False)
                         )
                # KeyError here means the profile had no rows in the
                # services sheet.
                obj.setService(self.profile_services[row['title']])
                obj.unmarkCreationFlag()
                renameAfterCreation(obj)
||
1732 | |||
1733 | |||
class AR_Templates(WorksheetImporter):
    """Imports ARTemplate objects; analyses and partitions come from the
    companion 'AR Template Analyses' / 'AR Template Partitions' sheets.
    """

    def load_artemplate_analyses(self):
        """Populate self.artemplate_analyses: template title -> list of
        {'service_uid', 'partition'} dicts.
        """
        sheetname = 'AR Template Analyses'
        worksheet = self.workbook.get_sheet_by_name(sheetname)
        self.artemplate_analyses = {}
        if not worksheet:
            return
        bsc = getToolByName(self.context, 'bika_setup_catalog')
        for row in self.get_rows(3, worksheet=worksheet):
            # XXX service_uid is not a uid
            service = self.get_object(bsc, 'AnalysisService',
                                      row.get('service_uid'))
            if row['ARTemplate'] not in self.artemplate_analyses.keys():
                self.artemplate_analyses[row['ARTemplate']] = []
            self.artemplate_analyses[row['ARTemplate']].append(
                {'service_uid': service.UID(),
                 'partition': row['partition']
                 }
            )

    def load_artemplate_partitions(self):
        """Populate self.artemplate_partitions: template title -> list of
        partition dicts (container/preservation titles and UIDs).
        """
        sheetname = 'AR Template Partitions'
        worksheet = self.workbook.get_sheet_by_name(sheetname)
        self.artemplate_partitions = {}
        bsc = getToolByName(self.context, 'bika_setup_catalog')
        if not worksheet:
            return
        for row in self.get_rows(3, worksheet=worksheet):
            if row['ARTemplate'] not in self.artemplate_partitions.keys():
                self.artemplate_partitions[row['ARTemplate']] = []
            container = self.get_object(bsc, 'Container',
                                        row.get('container'))
            preservation = self.get_object(bsc, 'Preservation',
                                           row.get('preservation'))
            self.artemplate_partitions[row['ARTemplate']].append({
                'part_id': row['part_id'],
                'Container': container.Title(),
                'container_uid': container.UID(),
                'Preservation': preservation.Title(),
                'preservation_uid': preservation.UID()})

    def Import(self):
        """Create an ARTemplate per spreadsheet row, in the lab setup folder
        or inside the named Client.
        """
        self.load_artemplate_analyses()
        self.load_artemplate_partitions()
        folder = self.context.bika_setup.bika_artemplates
        bsc = getToolByName(self.context, 'bika_setup_catalog')
        pc = getToolByName(self.context, 'portal_catalog')
        for row in self.get_rows(3):
            if not row['title']:
                continue
            analyses = self.artemplate_analyses[row['title']]
            client_title = row['Client_title'] or 'lab'
            if row['title'] in self.artemplate_partitions:
                partitions = self.artemplate_partitions[row['title']]
            else:
                # NOTE(review): these fallback keys are lower-case
                # ('container'/'preservation') while the loader above emits
                # 'Container'/'Preservation' — confirm which the consumer
                # of setPartitions expects.
                partitions = [{'part_id': 'part-1',
                               'container': '',
                               'preservation': ''}]

            if client_title == 'lab':
                folder = self.context.bika_setup.bika_artemplates
            else:
                folder = pc(portal_type='Client',
                            getName=client_title)[0].getObject()

            sampletype = self.get_object(bsc, 'SampleType',
                                         row.get('SampleType_title'))
            samplepoint = self.get_object(bsc, 'SamplePoint',
                                          row.get('SamplePoint_title'))

            obj = _createObjectByType("ARTemplate", folder, tmpID())
            obj.edit(
                title=str(row['title']),
                description=row.get('description', ''),
                Remarks=row.get('Remarks', ''),)
            obj.setSampleType(sampletype)
            obj.setSamplePoint(samplepoint)
            obj.setPartitions(partitions)
            obj.setAnalyses(analyses)
            obj.unmarkCreationFlag()
            renameAfterCreation(obj)
||
1816 | |||
1817 | |||
class Reference_Definitions(WorksheetImporter):
    """Imports ReferenceDefinition objects together with their expected
    reference results, which live in a companion worksheet.
    """

    def load_reference_definition_results(self):
        """Populate self.results: definition title -> list of reference
        result dicts, read from the 'Reference Definition Results' sheet
        (falling back to the legacy 'Reference Definition Values' name).

        self.results is always initialised — even when neither sheet is
        present — so Import() can safely call self.results.get().
        (The original only assigned it after the sheet lookups, which
        risked an AttributeError, and carried a dead duplicate
        'if not worksheet' check.)
        """
        self.results = {}
        sheetname = 'Reference Definition Results'
        worksheet = self.workbook.get_sheet_by_name(sheetname)
        if not worksheet:
            sheetname = 'Reference Definition Values'
            worksheet = self.workbook.get_sheet_by_name(sheetname)
        if not worksheet:
            return
        bsc = getToolByName(self.context, 'bika_setup_catalog')
        for row in self.get_rows(3, worksheet=worksheet):
            if row['ReferenceDefinition_title'] not in self.results:
                self.results[row['ReferenceDefinition_title']] = []
            service = self.get_object(bsc, 'AnalysisService',
                                      row.get('service'))
            self.results[
                row['ReferenceDefinition_title']].append({
                    'uid': service.UID(),
                    'result': row['result'] if row['result'] else '0',
                    'min': row['min'] if row['min'] else '0',
                    'max': row['max'] if row['max'] else '0'})

    def Import(self):
        """Create a ReferenceDefinition per spreadsheet row, attaching the
        result ranges collected for its title (empty list when none).
        """
        self.load_reference_definition_results()
        folder = self.context.bika_setup.bika_referencedefinitions
        for row in self.get_rows(3):
            if not row['title']:
                continue
            obj = _createObjectByType("ReferenceDefinition", folder, tmpID())
            obj.edit(
                title=row['title'],
                description=row.get('description', ''),
                Blank=self.to_bool(row['Blank']),
                ReferenceResults=self.results.get(row['title'], []),
                Hazardous=self.to_bool(row['Hazardous']))
            obj.unmarkCreationFlag()
            renameAfterCreation(obj)
||
1859 | |||
1860 | |||
class Worksheet_Templates(WorksheetImporter):
    """Imports WorksheetTemplate objects; layouts and service lists come
    from two companion worksheets.
    """

    def load_wst_layouts(self):
        """Populate self.wst_layouts: template title -> list of layout
        position dicts (pos/type/blank_ref/control_ref/dup).
        """
        sheetname = 'Worksheet Template Layouts'
        worksheet = self.workbook.get_sheet_by_name(sheetname)
        self.wst_layouts = {}
        if not worksheet:
            return
        for row in self.get_rows(3, worksheet=worksheet):
            if row['WorksheetTemplate_title'] \
               not in self.wst_layouts.keys():
                self.wst_layouts[
                    row['WorksheetTemplate_title']] = []
            self.wst_layouts[
                row['WorksheetTemplate_title']].append({
                    'pos': row['pos'],
                    'type': row['type'],
                    'blank_ref': row['blank_ref'],
                    'control_ref': row['control_ref'],
                    'dup': row['dup']})

    def load_wst_services(self):
        """Populate self.wst_services: template title -> list of
        AnalysisService UIDs.
        """
        sheetname = 'Worksheet Template Services'
        worksheet = self.workbook.get_sheet_by_name(sheetname)
        self.wst_services = {}
        if not worksheet:
            return
        bsc = getToolByName(self.context, 'bika_setup_catalog')
        for row in self.get_rows(3, worksheet=worksheet):
            service = self.get_object(bsc, 'AnalysisService',
                                      row.get('service'))
            if row['WorksheetTemplate_title'] not in self.wst_services.keys():
                self.wst_services[row['WorksheetTemplate_title']] = []
            self.wst_services[
                row['WorksheetTemplate_title']].append(service.UID())

    def Import(self):
        """Create a WorksheetTemplate per spreadsheet row.

        KeyError on wst_layouts/wst_services means the title had no rows
        in the corresponding companion sheet.
        """
        self.load_wst_services()
        self.load_wst_layouts()
        folder = self.context.bika_setup.bika_worksheettemplates
        for row in self.get_rows(3):
            if row['title']:
                obj = _createObjectByType("WorksheetTemplate", folder, tmpID())
                obj.edit(
                    title=row['title'],
                    description=row.get('description', ''),
                    Layout=self.wst_layouts[row['title']])
                obj.setService(self.wst_services[row['title']])
                obj.unmarkCreationFlag()
                renameAfterCreation(obj)
||
1911 | |||
1912 | |||
class Setup(WorksheetImporter):
    """Applies site-wide settings from a two-column (Field, Value) sheet
    onto the bika_setup object.
    """

    def Import(self):
        """Read every Field/Value pair and push them into bika_setup.edit().

        Missing optional fields fall back to sensible defaults via
        values.get(); required fields raise KeyError, which surfaces a
        broken spreadsheet early.
        """
        # (The original also fetched bika_setup_catalog here, but never
        # used it — removed.)
        values = {}
        for row in self.get_rows(3):
            values[row['Field']] = row['Value']

        # Default sample lifetime, split across three spreadsheet fields.
        DSL = {
            'days': int(values['DefaultSampleLifetime_days'] and values['DefaultSampleLifetime_days'] or 0),
            'hours': int(values['DefaultSampleLifetime_hours'] and values['DefaultSampleLifetime_hours'] or 0),
            'minutes': int(values['DefaultSampleLifetime_minutes'] and values['DefaultSampleLifetime_minutes'] or 0),
        }
        self.context.bika_setup.edit(
            PasswordLifetime=int(values['PasswordLifetime']),
            AutoLogOff=int(values['AutoLogOff']),
            ShowPricing=values.get('ShowPricing', True),
            Currency=values['Currency'],
            DefaultCountry=values.get('DefaultCountry', ''),
            MemberDiscount=str(Float(values['MemberDiscount'])),
            VAT=str(Float(values['VAT'])),
            MinimumResults=int(values['MinimumResults']),
            SamplingWorkflowEnabled=values['SamplingWorkflowEnabled'],
            ScheduleSamplingEnabled=values.get('ScheduleSamplingEnabled', 0),
            CategoriseAnalysisServices=self.to_bool(
                values['CategoriseAnalysisServices']),
            EnableAnalysisRemarks=self.to_bool(
                values.get('EnableAnalysisRemarks', '')),
            ARImportOption=values['ARImportOption'],
            # Attachment options are stored as a single lower-case letter.
            ARAttachmentOption=values['ARAttachmentOption'][0].lower(),
            AnalysisAttachmentOption=values[
                'AnalysisAttachmentOption'][0].lower(),
            DefaultSampleLifetime=DSL,
            AutoPrintStickers=values.get('AutoPrintStickers','receive').lower(),
            AutoStickerTemplate=values.get('AutoStickerTemplate', 'Code_128_1x48mm.pt'),
            YearInPrefix=self.to_bool(values['YearInPrefix']),
            SampleIDPadding=int(values['SampleIDPadding']),
            ARIDPadding=int(values['ARIDPadding']),
            ExternalIDServer=self.to_bool(values['ExternalIDServer']),
            IDServerURL=values['IDServerURL'],
        )
||
1954 | |||
1955 | |||
class ID_Prefixes(WorksheetImporter):
    """Merges worksheet rows into the site's ID-formatting prefix list.

    NOTE(review): the final setIDFormatting() call is commented out in the
    original, so this importer currently computes the merged prefix list
    but never persists it.
    """

    def Import(self):
        prefixes = self.context.bika_setup.getIDFormatting()
        for row in self.get_rows(3):
            portal_type = row['portal_type']
            # Drop any existing entry for this portal_type before re-adding.
            prefixes = [entry for entry in prefixes
                        if entry['portal_type'] != portal_type]
            # The spreadsheet uses the literal string 'none' to mean
            # "no separator".
            sep = row.get('separator', '-')
            if sep == 'none':
                sep = ''
            prefixes.append({'portal_type': portal_type,
                             'padding': row['padding'],
                             'prefix': row['prefix'],
                             'separator': sep})
        #self.context.bika_setup.setIDFormatting(prefixes)
||
1973 | |||
1974 | |||
class Attachment_Types(WorksheetImporter):
    """Imports AttachmentType objects from the spreadsheet."""

    def Import(self):
        container = self.context.bika_setup.bika_attachmenttypes
        for row in self.get_rows(3):
            attachment_type = _createObjectByType(
                "AttachmentType", container, tmpID())
            attachment_type.edit(
                title=row['title'],
                description=row.get('description', ''))
            attachment_type.unmarkCreationFlag()
            renameAfterCreation(attachment_type)
||
1986 | |||
1987 | |||
class Reference_Samples(WorksheetImporter):
    """Imports ReferenceSample objects (created inside their Supplier),
    together with their expected results, reference analyses and interim
    values, which live in companion worksheets keyed by ReferenceSample_id.
    """

    def load_reference_sample_results(self, sample):
        """Attach the expected reference results for *sample*, read from
        the 'Reference Sample Results' sheet (cached after first lookup).
        """
        sheetname = 'Reference Sample Results'
        if not hasattr(self, 'results_worksheet'):
            worksheet = self.workbook.get_sheet_by_name(sheetname)
            if not worksheet:
                return
            self.results_worksheet = worksheet
        results = []
        bsc = getToolByName(self.context, 'bika_setup_catalog')
        for row in self.get_rows(3, worksheet=self.results_worksheet):
            if row['ReferenceSample_id'] != sample.getId():
                continue
            service = self.get_object(bsc, 'AnalysisService',
                                      row.get('AnalysisService_title'))
            if not service:
                warning = "Unable to load a reference sample result. Service %s not found."
                # Fix: log the missing service title; the original passed
                # the sheet name into the %s placeholder by mistake.
                logger.warning(warning, row.get('AnalysisService_title'))
                continue
            results.append({
                'uid': service.UID(),
                'result': row['result'],
                'min': row['min'],
                'max': row['max']})
        sample.setReferenceResults(results)

    def load_reference_analyses(self, sample):
        """Create the ReferenceAnalysis objects recorded for *sample* in
        the 'Reference Analyses' sheet (cached after first lookup), and
        load each one's interim values.
        """
        sheetname = 'Reference Analyses'
        if not hasattr(self, 'analyses_worksheet'):
            worksheet = self.workbook.get_sheet_by_name(sheetname)
            if not worksheet:
                return
            self.analyses_worksheet = worksheet
        bsc = getToolByName(self.context, 'bika_setup_catalog')
        for row in self.get_rows(3, worksheet=self.analyses_worksheet):
            if row['ReferenceSample_id'] != sample.getId():
                continue
            service = self.get_object(bsc, 'AnalysisService',
                                      row.get('AnalysisService_title'))
            # Analyses are keyed/named by service keyword
            obj = _createObjectByType("ReferenceAnalysis", sample, row['id'])
            obj.edit(title=row['id'],
                     ReferenceType=row['ReferenceType'],
                     Result=row['Result'],
                     Analyst=row['Analyst'],
                     Instrument=row['Instrument'],
                     Retested=row['Retested']
                     )
            obj.setService(service)
            # obj.setCreators(row['creator'])
            # obj.setCreationDate(row['created'])
            # self.set_wf_history(obj, row['workflow_history'])
            obj.unmarkCreationFlag()

            self.load_reference_analysis_interims(obj)

    def load_reference_analysis_interims(self, analysis):
        """Set the interim field values for *analysis* from the
        'Reference Analysis Interims' sheet (cached after first lookup).
        """
        sheetname = 'Reference Analysis Interims'
        if not hasattr(self, 'interim_worksheet'):
            worksheet = self.workbook.get_sheet_by_name(sheetname)
            if not worksheet:
                return
            self.interim_worksheet = worksheet
        interims = []
        for row in self.get_rows(3, worksheet=self.interim_worksheet):
            if row['ReferenceAnalysis_id'] != analysis.getId():
                continue
            interims.append({
                'keyword': row['keyword'],
                'title': row['title'],
                'value': row['value'],
                'unit': row['unit'],
                'hidden': row['hidden']})
        analysis.setInterimFields(interims)

    def Import(self):
        """Create a ReferenceSample per spreadsheet row inside its Supplier,
        then load its results and analyses from the companion sheets.
        """
        bsc = getToolByName(self.context, 'bika_setup_catalog')
        for row in self.get_rows(3):
            if not row['id']:
                continue
            supplier = bsc(portal_type='Supplier',
                           getName=row.get('Supplier_title', ''))[0].getObject()
            obj = _createObjectByType("ReferenceSample", supplier, row['id'])
            ref_def = self.get_object(bsc, 'ReferenceDefinition',
                                      row.get('ReferenceDefinition_title'))
            ref_man = self.get_object(bsc, 'Manufacturer',
                                      row.get('Manufacturer_title'))
            obj.edit(title=row['id'],
                     description=row.get('description', ''),
                     Blank=self.to_bool(row['Blank']),
                     Hazardous=self.to_bool(row['Hazardous']),
                     CatalogueNumber=row['CatalogueNumber'],
                     LotNumber=row['LotNumber'],
                     Remarks=row['Remarks'],
                     ExpiryDate=row['ExpiryDate'],
                     DateSampled=row['DateSampled'],
                     DateReceived=row['DateReceived'],
                     DateOpened=row['DateOpened'],
                     DateExpired=row['DateExpired'],
                     DateDisposed=row['DateDisposed']
                     )
            obj.setReferenceDefinition(ref_def)
            obj.setManufacturer(ref_man)
            obj.unmarkCreationFlag()

            self.load_reference_sample_results(obj)
            self.load_reference_analyses(obj)
||
2097 | |||
class Analysis_Requests(WorksheetImporter):
    """Imports AnalysisRequest objects (created inside their Client),
    together with their analyses and interim values from companion sheets.
    """

    def load_analyses(self, sample):
        """Create the analyses recorded for each AR in the 'Analyses' sheet
        (cached after first lookup) and append them to the AR.
        """
        sheetname = 'Analyses'
        if not hasattr(self, 'analyses_worksheet'):
            worksheet = self.workbook.get_sheet_by_name(sheetname)
            if not worksheet:
                return
            self.analyses_worksheet = worksheet
        bsc = getToolByName(self.context, 'bika_setup_catalog')
        bc = getToolByName(self.context, 'bika_catalog')
        for row in self.get_rows(3, worksheet=self.analyses_worksheet):
            service = bsc(portal_type='AnalysisService',
                          title=row['AnalysisService_title'])[0].getObject()
            # analyses are keyed/named by keyword
            ar = bc(portal_type='AnalysisRequest', id=row['AnalysisRequest_id'])[0].getObject()
            obj = create_analysis(
                ar, service,
                Result=row['Result'],
                ResultCaptureDate=row['ResultCaptureDate'],
                Analyst=row['Analyst'],
                Instrument=row['Instrument'],
                Retested=self.to_bool(row['Retested']),
                MaxTimeAllowed={
                    'days': int(row.get('MaxTimeAllowed_days', 0)),
                    'hours': int(row.get('MaxTimeAllowed_hours', 0)),
                    'minutes': int(row.get('MaxTimeAllowed_minutes', 0)),
                },
            )

            analyses = ar.objectValues('Analyses')
            analyses = list(analyses)
            analyses.append(obj)
            ar.setAnalyses(analyses)
            obj.unmarkCreationFlag()

            self.load_analysis_interims(obj)

    def load_analysis_interims(self, analysis):
        """Set the interim field values for *analysis*.

        NOTE(review): this reads the 'Reference Analysis Interims' sheet
        and matches on 'ReferenceAnalysis_id' — apparently copied from the
        Reference_Samples importer. Confirm whether regular analyses should
        have their own sheet/column names.
        """
        sheetname = 'Reference Analysis Interims'
        if not hasattr(self, 'interim_worksheet'):
            worksheet = self.workbook.get_sheet_by_name(sheetname)
            if not worksheet:
                return
            self.interim_worksheet = worksheet
        interims = []
        for row in self.get_rows(3, worksheet=self.interim_worksheet):
            if row['ReferenceAnalysis_id'] != analysis.getId():
                continue
            interims.append({
                'keyword': row['keyword'],
                'title': row['title'],
                'value': row['value'],
                'unit': row['unit'],
                'hidden': row['hidden']})
        analysis.setInterimFields(interims)

    def Import(self):
        """Create an AnalysisRequest per spreadsheet row inside its Client,
        resolve its contacts/profile/template, then load its analyses.
        """
        bc = getToolByName(self.context, 'bika_catalog')
        pc = getToolByName(self.context, 'portal_catalog')
        for row in self.get_rows(3):
            if not row['id']:
                continue
            client = pc(portal_type="Client",
                        getName=row['Client_title'])[0].getObject()
            obj = _createObjectByType("AnalysisRequest", client, row['id'])
            contact = pc(portal_type="Contact",
                         getFullname=row['Contact_Fullname'])[0].getObject()
            obj.edit(
                RequestID=row['id'],
                Contact=contact,
                CCEmails=row['CCEmails'],
                ClientOrderNumber=row['ClientOrderNumber'],
                InvoiceExclude=row['InvoiceExclude'],
                DateReceived=row['DateReceived'],
                DatePublished=row['DatePublished'],
                Remarks=row['Remarks']
            )
            if row['CCContact_Fullname']:
                contact = pc(portal_type="Contact",
                             getFullname=row['CCContact_Fullname'])[0].getObject()
                obj.setCCContact(contact)
            if row['AnalysisProfile_title']:
                # Fix: the original called .getObject() on the title string
                # (misplaced parenthesis), which would raise AttributeError;
                # the catalog brain is what must be dereferenced.
                profile = pc(portal_type="AnalysisProfile",
                             title=row['AnalysisProfile_title'])[0].getObject()
                obj.setProfile(profile)
            if row['ARTemplate_title']:
                template = pc(portal_type="ARTemplate",
                              title=row['ARTemplate_title'])[0].getObject()
                # NOTE(review): the template is passed to setProfile(),
                # same as the profile above — confirm a setTemplate()
                # accessor was not intended here.
                obj.setProfile(template)

            obj.unmarkCreationFlag()

            self.load_analyses(obj)
||
2194 | |||
2195 | |||
class Invoice_Batches(WorksheetImporter):
    """Imports InvoiceBatch objects into the site's invoices folder."""

    def Import(self):
        """Create an InvoiceBatch per spreadsheet row.

        Raises Exception when a required column (title, start, end) is
        empty. Validation now happens BEFORE object creation, so a bad row
        no longer leaves an orphaned temporary object behind (the original
        created the object first and then raised).
        """
        folder = self.context.invoices
        for row in self.get_rows(3):
            if not row['title']:
                message = _("InvoiceBatch has no Title")
                raise Exception(t(message))
            if not row['start']:
                message = _("InvoiceBatch has no Start Date")
                raise Exception(t(message))
            if not row['end']:
                message = _("InvoiceBatch has no End Date")
                raise Exception(t(message))
            obj = _createObjectByType("InvoiceBatch", folder, tmpID())
            obj.edit(
                title=row['title'],
                BatchStartDate=row['start'],
                BatchEndDate=row['end'],
            )
            # NOTE(review): other importers call obj.unmarkCreationFlag()
            # before renameAfterCreation(); confirm whether it is needed
            # here as well.
            renameAfterCreation(obj)
||
2217 |