Total Complexity | 443 |
Total Lines | 2333 |
Duplicated Lines | 9.6 % |
Changes | 0 |
Duplicate code is one of the most pungent code smells. A rule that is often used is to re-structure code once it is duplicated in three or more places.
Common duplication problems and their corresponding solutions are:
Complex classes like bika.lims.exportimport.setupdata often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.
Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.
1 | # -*- coding: utf-8 -*- |
||
2 | # |
||
3 | # This file is part of SENAITE.CORE. |
||
4 | # |
||
5 | # SENAITE.CORE is free software: you can redistribute it and/or modify it under |
||
6 | # the terms of the GNU General Public License as published by the Free Software |
||
7 | # Foundation, version 2. |
||
8 | # |
||
9 | # This program is distributed in the hope that it will be useful, but WITHOUT |
||
10 | # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS |
||
11 | # FOR A PARTICULAR PURPOSE. See the GNU General Public License for more |
||
12 | # details. |
||
13 | # |
||
14 | # You should have received a copy of the GNU General Public License along with |
||
15 | # this program; if not, write to the Free Software Foundation, Inc., 51 |
||
16 | # Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. |
||
17 | # |
||
18 | # Copyright 2018-2019 by it's authors. |
||
19 | # Some rights reserved, see README and LICENSE. |
||
20 | |||
21 | import datetime |
||
22 | import os.path |
||
23 | import re |
||
24 | |||
25 | from pkg_resources import resource_filename |
||
26 | |||
27 | import transaction |
||
28 | from bika.lims import api |
||
29 | from bika.lims import bikaMessageFactory as _ |
||
30 | from bika.lims import logger |
||
31 | from bika.lims.exportimport.dataimport import SetupDataSetList as SDL |
||
32 | from bika.lims.idserver import renameAfterCreation |
||
33 | from bika.lims.interfaces import ISetupDataSetList |
||
34 | from bika.lims.utils import getFromString |
||
35 | from bika.lims.utils import t |
||
36 | from bika.lims.utils import tmpID |
||
37 | from bika.lims.utils import to_unicode |
||
38 | from bika.lims.utils.analysis import create_analysis |
||
39 | from Products.Archetypes.event import ObjectInitializedEvent |
||
40 | from Products.CMFCore.utils import getToolByName |
||
41 | from Products.CMFPlone.utils import _createObjectByType |
||
42 | from Products.CMFPlone.utils import safe_unicode |
||
43 | from zope.event import notify |
||
44 | from zope.interface import implements |
||
45 | |||
46 | |||
def lookup(context, portal_type, **kwargs):
    """Return the first catalog object of ``portal_type`` matching ``kwargs``.

    The catalog is resolved through the archetype tool's catalog map for
    the given type, falling back to ``portal_catalog`` when the type has
    no mapping.  Raises IndexError when nothing matches.
    """
    archetype_tool = getToolByName(context, 'archetype_tool')
    mapped = archetype_tool.catalog_map.get(portal_type, [None])[0]
    catalog = getToolByName(context, mapped or 'portal_catalog')
    query = dict(kwargs, portal_type=portal_type)
    return catalog(**query)[0].getObject()
||
53 | |||
54 | |||
def check_for_required_columns(name, data, required):
    """Raise an Exception if any column in ``required`` is missing or
    empty in the ``data`` mapping; ``name`` identifies the sheet in the
    (translated) error message.
    """
    for col in required:
        if not data.get(col, None):
            msg = _("%s has no '%s' column." % (name, col))
            raise Exception(t(msg))
||
60 | |||
61 | |||
def Float(thing):
    """Coerce ``thing`` to a float, returning 0.0 when conversion fails.

    Besides ValueError (malformed strings), TypeError is caught as well so
    that None or other non-numeric inputs degrade to 0.0 instead of
    raising — the original caught only ValueError and crashed on None.
    """
    try:
        return float(thing)
    except (ValueError, TypeError):
        return 0.0
||
68 | |||
69 | |||
def read_file(path):
    """Return the binary contents of ``path``.

    If ``path`` itself is not a file, every allowed extension (lower and
    upper case) is appended in turn and the first existing candidate is
    read.  Raises IOError when no candidate exists — callers rely on
    IOError to skip missing attachments.

    Files are opened via context managers; the original leaked open file
    handles by calling ``open(...).read()`` without ever closing.
    """
    def _read(candidate):
        with open(candidate, "rb") as f:
            return f.read()

    if os.path.isfile(path):
        return _read(path)
    allowed_ext = ['pdf', 'jpg', 'jpeg', 'png', 'gif', 'ods', 'odt',
                   'xlsx', 'doc', 'docx', 'xls', 'csv', 'txt']
    allowed_ext += [e.upper() for e in allowed_ext]
    for e in allowed_ext:
        out = '%s.%s' % (path, e)
        if os.path.isfile(out):
            return _read(out)
    raise IOError("File not found: %s. Allowed extensions: %s" % (path, ','.join(allowed_ext)))
||
81 | |||
82 | |||
class SetupDataSetList(SDL):
    # Adapter that lists the setup datasets shipped with bika.lims;
    # registered for lookup through ISetupDataSetList.
    implements(ISetupDataSetList)

    def __call__(self):
        # Delegate to the base list, pinned to the bika.lims project so
        # only its bundled datasets are offered.
        return SDL.__call__(self, projectname="bika.lims")
||
89 | |||
90 | |||
class WorksheetImporter:

    """Use this as a base, for normal tabular data sheet imports.

    A subclass named ``Foo_Bar`` is bound to the workbook sheet
    "Foo Bar" (underscores map to spaces) and implements ``Import``.
    """

    def __init__(self, context):
        # Context supplied by the adapter machinery; the effective import
        # context is taken from ``lsd`` when the importer is invoked.
        self.adapter_context = context

    def __call__(self, lsd, workbook, dataset_project, dataset_name):
        # lsd: the load-setup-data view driving the import; it provides
        # the Plone context and the deferred-reference queue (see defer()).
        self.lsd = lsd
        self.context = lsd.context
        self.workbook = workbook
        # Sheet-name convention: class name with underscores as spaces.
        self.sheetname = self.__class__.__name__.replace("_", " ")
        self.worksheet = workbook.get_sheet_by_name(self.sheetname)
        self.dataset_project = dataset_project
        self.dataset_name = dataset_name
        if self.worksheet:
            logger.info("Loading {0}.{1}: {2}".format(
                self.dataset_project, self.dataset_name, self.sheetname))
            try:
                self.Import()
            except IOError:
                # The importer must omit the files not found inside the server filesystem (bika/lims/setupdata/test/
                # if the file is loaded from 'select existing file' or bika/lims/setupdata/uploaded if it's loaded from
                # 'Load from file') and finishes the import without errors. https://jira.bikalabs.com/browse/LIMS-1624
                warning = "Error while loading attached file from %s. The file will not be uploaded into the system."
                logger.warning(warning, self.sheetname)
                self.context.plone_utils.addPortalMessage("Error while loading some attached files. "
                                                          "The files weren't uploaded into the system.")
        else:
            logger.info("No records found: '{0}'".format(self.sheetname))

    def get_rows(self, startrow=3, worksheet=None):
        """Returns a generator for all rows in a sheet.
        Each row contains a dictionary where the key is the value of the
        first row of the sheet for each column.
        The data values are returned in utf-8 format.
        Starts to consume data from startrow
        """

        headers = []
        row_nr = 0
        worksheet = worksheet if worksheet else self.worksheet
        for row in worksheet.rows:  # .iter_rows():
            row_nr += 1
            if row_nr == 1:
                # First sheet row supplies the column names.
                # headers = [cell.internal_value for cell in row]
                headers = [cell.value for cell in row]
                continue
            # Checkpoint periodically so very large sheets do not blow
            # the transaction buffer.
            if row_nr % 1000 == 0:
                transaction.savepoint()
            if row_nr <= startrow:
                continue
            # row = [_c(cell.internal_value).decode('utf-8') for cell in row]
            new_row = []
            for cell in row:
                value = cell.value
                if value is None:
                    value = ''
                # Python 2: normalise unicode cells to utf-8 byte strings.
                if isinstance(value, unicode):
                    value = value.encode('utf-8')
                # Strip any space, \t, \n, or \r characters from the left-hand
                # side, right-hand side, or both sides of the string
                if isinstance(value, str):
                    value = value.strip(' \t\n\r')
                new_row.append(value)
            row = dict(zip(headers, new_row))

            # parse out addresses
            for add_type in ['Physical', 'Postal', 'Billing']:
                row[add_type] = {}
                if add_type + "_Address" in row:
                    for key in ['Address', 'City', 'State', 'District', 'Zip', 'Country']:
                        row[add_type][key] = str(row.get("%s_%s" % (add_type, key), ''))

            yield row

    def get_file_data(self, filename):
        # Resolve ``filename`` against the dataset's setupdata folder and
        # return its raw bytes, or None when missing or unreadable.
        if filename:
            try:
                path = resource_filename(
                    self.dataset_project,
                    "setupdata/%s/%s" % (self.dataset_name, filename))
                file_data = open(path, "rb").read()
            except:
                file_data = None
        else:
            file_data = None
        return file_data

    def to_bool(self, value):
        """ Converts a sheet string value to a boolean value.
        Needed because of utf-8 conversions
        """

        # Each normalisation step is best-effort; non-string inputs
        # simply fall through unchanged.
        try:
            value = value.lower()
        except:
            pass
        try:
            value = value.encode('utf-8')
        except:
            pass
        try:
            value = int(value)
        except:
            pass
        # After normalisation only 'true' or 1 count as truthy.
        if value in ('true', 1):
            return True
        else:
            return False

    def to_int(self, value, default=0):
        """ Converts a value o a int. Returns default if the conversion fails.
        """
        try:
            return int(value)
        except ValueError:
            try:
                return int(default)
            except:
                # Even the default was unconvertible; last-resort zero.
                return 0

    def to_float(self, value, default=0):
        """ Converts a value o a float. Returns default if the conversion fails.
        """
        try:
            return float(value)
        except ValueError:
            try:
                return float(default)
            except:
                # Even the default was unconvertible; last-resort zero.
                return 0.0

    def defer(self, **kwargs):
        # Queue a cross-sheet reference to be resolved after all sheets
        # have been imported (consumed from self.lsd.deferred).
        self.lsd.deferred.append(kwargs)

    def Import(self):
        """ Override this.
        XXX Simple generic sheet importer
        """

    def fill_addressfields(self, row, obj):
        """ Fills the address fields for the specified object if allowed:
        PhysicalAddress, PostalAddress, CountryState, BillingAddress
        """
        addresses = {}
        for add_type in ['Physical', 'Postal', 'Billing', 'CountryState']:
            addresses[add_type] = {}
            for key in ['Address', 'City', 'State', 'District', 'Zip', 'Country']:
                addresses[add_type][key.lower()] = str(row.get("%s_%s" % (add_type, key), ''))

        # When no explicit CountryState is given, fall back to the
        # physical address' country/state.
        if addresses['CountryState']['country'] == '' \
           and addresses['CountryState']['state'] == '':
            addresses['CountryState']['country'] = addresses['Physical']['country']
            addresses['CountryState']['state'] = addresses['Physical']['state']

        # Only set fields the target object actually supports.
        if hasattr(obj, 'setPhysicalAddress'):
            obj.setPhysicalAddress(addresses['Physical'])
        if hasattr(obj, 'setPostalAddress'):
            obj.setPostalAddress(addresses['Postal'])
        if hasattr(obj, 'setCountryState'):
            obj.setCountryState(addresses['CountryState'])
        if hasattr(obj, 'setBillingAddress'):
            obj.setBillingAddress(addresses['Billing'])

    def fill_contactfields(self, row, obj):
        """ Fills the contact fields for the specified object if allowed:
        EmailAddress, Phone, Fax, BusinessPhone, BusinessFax, HomePhone,
        MobilePhone
        """
        fieldnames = ['EmailAddress',
                      'Phone',
                      'Fax',
                      'BusinessPhone',
                      'BusinessFax',
                      'HomePhone',
                      'MobilePhone',
                      ]
        schema = obj.Schema()
        fields = dict([(field.getName(), field) for field in schema.fields()])
        for fieldname in fieldnames:
            try:
                field = fields[fieldname]
            except:
                # Field not in the schema; only log when the sheet
                # actually carried a value for it.
                if fieldname in row:
                    logger.info("Address field %s not found on %s"%(fieldname,obj))
                continue
            value = row.get(fieldname, '')
            field.set(obj, value)

    def get_object(self, catalog, portal_type, title=None, **kwargs):
        """This will return an object from the catalog.
        Logs a message and returns None if no object or multiple objects found.
        All keyword arguments are passed verbatim to the contentFilter
        """
        if not title and not kwargs:
            return None
        contentFilter = {"portal_type": portal_type}
        if title:
            contentFilter['title'] = to_unicode(title)
        contentFilter.update(kwargs)
        brains = catalog(contentFilter)
        if len(brains) > 1:
            logger.info("More than one object found for %s" % contentFilter)
            return None
        elif len(brains) == 0:
            # Analysis services get a second chance: look up by Keyword.
            if portal_type == 'AnalysisService':
                brains = catalog(portal_type=portal_type, getKeyword=title)
                if brains:
                    return brains[0].getObject()
            logger.info("No objects found for %s" % contentFilter)
            return None
        else:
            return brains[0].getObject()
||
306 | |||
307 | |||
class Sub_Groups(WorksheetImporter):

    def Import(self):
        """Create one SubGroup per titled row of the "Sub Groups" sheet."""
        container = self.context.bika_setup.bika_subgroups
        for record in self.get_rows(3):
            # Untitled rows carry nothing worth importing.
            if not record.get('title'):
                continue
            subgroup = _createObjectByType("SubGroup", container, tmpID())
            subgroup.edit(title=record['title'],
                          description=record['description'],
                          SortKey=record['SortKey'])
            subgroup.unmarkCreationFlag()
            renameAfterCreation(subgroup)
            notify(ObjectInitializedEvent(subgroup))
||
321 | |||
322 | |||
class Lab_Information(WorksheetImporter):

    def Import(self):
        # The "Lab Information" sheet is key/value shaped: one Field/Value
        # pair per row, all applied to the singleton laboratory object.
        laboratory = self.context.bika_setup.laboratory
        values = {}
        for row in self.get_rows(3):
            values[row['Field']] = row['Value']

        # Optional accreditation-body logo, resolved relative to the
        # dataset's setupdata folder.
        if values['AccreditationBodyLogo']:
            path = resource_filename(
                self.dataset_project,
                "setupdata/%s/%s" % (self.dataset_name,
                                     values['AccreditationBodyLogo']))
            try:
                file_data = read_file(path)
            except Exception as msg:
                file_data = None
                # NOTE(review): msg[0] (indexing an exception) is a
                # Python 2-only idiom; would raise on Python 3.
                logger.warning(msg[0] + " Error on sheet: " + self.sheetname)
        else:
            file_data = None

        laboratory.edit(
            Name=values['Name'],
            LabURL=values['LabURL'],
            Confidence=values['Confidence'],
            LaboratoryAccredited=self.to_bool(values['LaboratoryAccredited']),
            AccreditationBodyLong=values['AccreditationBodyLong'],
            AccreditationBody=values['AccreditationBody'],
            AccreditationBodyURL=values['AccreditationBodyURL'],
            Accreditation=values['Accreditation'],
            AccreditationReference=values['AccreditationReference'],
            AccreditationBodyLogo=file_data,
            TaxNumber=values['TaxNumber'],
        )
        self.fill_contactfields(values, laboratory)
        self.fill_addressfields(values, laboratory)
||
359 | |||
360 | |||
class Lab_Contacts(WorksheetImporter):

    def Import(self):
        """Import lab contacts from the "Lab Contacts" sheet.

        Per row: creates a LabContact, optionally attaches a signature
        image, defers the Department back-reference, and — when both
        Username and EmailAddress are present — registers a Plone member
        with the requested groups and roles.  Finally the "Lab
        Departments" sheet is re-read to assign department managers by
        username.

        Fix: the no-password warning used format index '{3}' with only
        three arguments, raising IndexError at runtime; now '{2}'.
        """
        folder = self.context.bika_setup.bika_labcontacts
        portal_groups = getToolByName(self.context, 'portal_groups')
        portal_registration = getToolByName(
            self.context, 'portal_registration')
        # Human-readable row counter for log messages; data starts at row 3.
        rownum = 2
        for row in self.get_rows(3):
            rownum += 1
            # A first name is mandatory; skip filler rows silently.
            if not row.get('Firstname', None):
                continue

            # Username already exists?
            username = row.get('Username', '')
            fullname = ('%s %s' % (row['Firstname'], row.get('Surname', ''))).strip()
            if username:
                username = safe_unicode(username).encode('utf-8')
                bsc = getToolByName(self.context, 'bika_setup_catalog')
                exists = [o.getObject() for o in bsc(portal_type="LabContact") if o.getObject().getUsername() == username]
                if exists:
                    error = "Lab Contact: username '{0}' in row {1} already exists. This contact will be omitted.".format(username, str(rownum))
                    logger.error(error)
                    continue

            # Is there a signature file defined? Try to get the file first.
            signature = None
            if row.get('Signature'):
                signature = self.get_file_data(row['Signature'])
                if not signature:
                    warning = "Lab Contact: Cannot load the signature file '{0}' for user '{1}'. The contact will be created, but without a signature image".format(row['Signature'], username)
                    logger.warning(warning)

            obj = _createObjectByType("LabContact", folder, tmpID())
            obj.edit(
                title=fullname,
                Salutation=row.get('Salutation', ''),
                Firstname=row['Firstname'],
                Surname=row.get('Surname', ''),
                JobTitle=row.get('JobTitle', ''),
                Username=row.get('Username', ''),
                Signature=signature
            )
            obj.unmarkCreationFlag()
            renameAfterCreation(obj)
            notify(ObjectInitializedEvent(obj))
            self.fill_contactfields(row, obj)
            self.fill_addressfields(row, obj)

            # Departments are imported by a later sheet, so resolve the
            # reference after all sheets have been processed.
            if row['Department_title']:
                self.defer(src_obj=obj,
                           src_field='Department',
                           dest_catalog='bika_setup_catalog',
                           dest_query={'portal_type': 'Department',
                                       'title': row['Department_title']}
                           )

            # Create Plone user
            if not row['Username']:
                warn = "Lab Contact: No username defined for user '{0}' in row {1}. Contact created, but without access credentials.".format(fullname, str(rownum))
                logger.warning(warn)
            if not row.get('EmailAddress', ''):
                warn = "Lab Contact: No Email defined for user '{0}' in row {1}. Contact created, but without access credentials.".format(fullname, str(rownum))
                logger.warning(warn)

            if (row['Username'] and row.get('EmailAddress', '')):
                username = safe_unicode(row['Username']).encode('utf-8')
                passw = row['Password']
                if not passw:
                    # Fall back to the username as password.
                    # (Was '{3}', which raised IndexError with three args.)
                    warn = "Lab Contact: No password defined for user '{0}' in row {1}. Password established automatically to '{2}'".format(username, str(rownum), username)
                    logger.warning(warn)
                    passw = username

                try:
                    member = portal_registration.addMember(
                        username,
                        passw,
                        properties={
                            'username': username,
                            'email': row['EmailAddress'],
                            'fullname': fullname}
                    )
                except Exception as msg:
                    # NOTE(review): message says "Client Contact" although
                    # this is the lab-contact importer; kept verbatim in
                    # case log consumers match on it.
                    logger.error("Client Contact: Error adding user (%s): %s" % (msg, username))
                    continue

                groups = row.get('Groups', '')
                if not groups:
                    warn = "Lab Contact: No groups defined for user '{0}' in row {1}. Group established automatically to 'Analysts'".format(username, str(rownum))
                    logger.warning(warn)
                    groups = 'Analysts'

                group_ids = [g.strip() for g in groups.split(',')]
                # Add user to all specified groups
                for group_id in group_ids:
                    group = portal_groups.getGroupById(group_id)
                    if group:
                        group.addMember(username)
                roles = row.get('Roles', '')
                if roles:
                    role_ids = [r.strip() for r in roles.split(',')]
                    # Add user to all specified roles
                    for role_id in role_ids:
                        member._addRole(role_id)
                # If user is in LabManagers, add Owner local role on clients
                # folder
                if 'LabManager' in group_ids:
                    self.context.clients.manage_setLocalRoles(
                        username, ['Owner', ])

        # Now we have the lab contacts registered, try to assign the managers
        # to each department if required
        sheet = self.workbook.get_sheet_by_name("Lab Departments")
        bsc = getToolByName(self.context, 'bika_setup_catalog')
        for row in self.get_rows(3, sheet):
            if row['title'] and row['LabContact_Username']:
                dept = self.get_object(bsc, "Department", row.get('title'))
                # Only assign a manager where none is set yet.
                if dept and not dept.getManager():
                    username = safe_unicode(row['LabContact_Username']).encode('utf-8')
                    exists = [o.getObject() for o in bsc(portal_type="LabContact") if o.getObject().getUsername() == username]
                    if exists:
                        dept.setManager(exists[0].UID())
||
483 | |||
class Lab_Departments(WorksheetImporter):

    def Import(self):
        """Create Department objects, linking each to its managing LabContact
        by username when one can be found."""
        container = self.context.bika_setup.bika_departments
        bsc = getToolByName(self.context, 'bika_setup_catalog')
        contacts = [brain.getObject() for brain in bsc(portal_type="LabContact")]
        for record in self.get_rows(3):
            if not record['title']:
                continue
            dept = _createObjectByType("Department", container, tmpID())
            dept.edit(title=record['title'],
                      description=record.get('description', ''))
            # First lab contact whose username matches the row, if any.
            manager = next((contact for contact in contacts
                            if contact.getUsername() == record['LabContact_Username']),
                           None)
            if manager is not None:
                dept.setManager(manager.UID())
            else:
                logger.info(
                    "Department: lookup of '%s' in LabContacts/Username failed." % record[
                        'LabContact_Username'])
            dept.unmarkCreationFlag()
            renameAfterCreation(dept)
            notify(ObjectInitializedEvent(dept))
||
509 | |||
510 | |||
class Lab_Products(WorksheetImporter):

    def Import(self):
        """Create a LabProduct for every row of the "Lab Products" sheet.

        Fixes: removed an unused ``context`` local, and added the
        ``unmarkCreationFlag`` call that every other importer in this
        module performs before renaming (the original left products
        flagged as still under creation).
        """
        # Refer to the default folder
        folder = self.context.bika_setup.bika_labproducts
        # Iterate through the rows
        for row in self.get_rows(3):
            # Create the LabProduct object
            obj = _createObjectByType('LabProduct', folder, tmpID())
            # Apply the row values
            obj.edit(
                title=row.get('title', 'Unknown'),
                description=row.get('description', ''),
                Volume=row.get('volume', 0),
                Unit=str(row.get('unit', 0)),
                Price=str(row.get('price', 0)),
            )
            # Clear the creation flag, then rename the new object
            obj.unmarkCreationFlag()
            renameAfterCreation(obj)
            notify(ObjectInitializedEvent(obj))
||
532 | |||
533 | |||
class Clients(WorksheetImporter):

    def Import(self):
        """Create a Client object per sheet row; Name and ClientID are
        mandatory.

        Fixes: validation now happens BEFORE the object is created, so a
        failing row no longer leaves an orphan temporary Client behind;
        and the error messages interpolate the identifying value instead
        of raising with a literal, unfilled '%s' placeholder.
        """
        folder = self.context.clients
        for row in self.get_rows(3):
            # Validate required columns up front.
            if not row['Name']:
                message = "Client %s has no Name" % row.get('ClientID', '')
                raise Exception(message)
            if not row['ClientID']:
                message = "Client %s has no Client ID" % row['Name']
                raise Exception(message)
            obj = _createObjectByType("Client", folder, tmpID())
            obj.edit(Name=row['Name'],
                     ClientID=row['ClientID'],
                     MemberDiscountApplies=row[
                         'MemberDiscountApplies'] and True or False,
                     BulkDiscount=row['BulkDiscount'] and True or False,
                     TaxNumber=row.get('TaxNumber', ''),
                     AccountNumber=row.get('AccountNumber', '')
                     )
            self.fill_contactfields(row, obj)
            self.fill_addressfields(row, obj)
            obj.unmarkCreationFlag()
            renameAfterCreation(obj)
            notify(ObjectInitializedEvent(obj))
||
559 | |||
560 | |||
class Client_Contacts(WorksheetImporter):

    def Import(self):
        # Creates a Contact inside each referenced Client, defers
        # CC-contact links until all contacts exist, and registers a
        # matching Plone member in the 'Clients' group when a username
        # is present.
        portal_groups = getToolByName(self.context, 'portal_groups')
        pc = getToolByName(self.context, 'portal_catalog')
        for row in self.get_rows(3):
            # Parent client is located by its name; rows pointing at an
            # unknown client are skipped with an error log.
            client = pc(portal_type="Client",
                        getName=row['Client_title'])
            if len(client) == 0:
                client_contact = "%(Firstname)s %(Surname)s" % row
                error = "Client invalid: '%s'. The Client Contact %s will not be uploaded."
                logger.error(error, row['Client_title'], client_contact)
                continue
            client = client[0].getObject()
            contact = _createObjectByType("Contact", client, tmpID())
            fullname = "%(Firstname)s %(Surname)s" % row
            pub_pref = [x.strip() for x in
                        row.get('PublicationPreference', '').split(",")]
            contact.edit(
                Salutation=row.get('Salutation', ''),
                Firstname=row.get('Firstname', ''),
                Surname=row.get('Surname', ''),
                Username=row['Username'],
                JobTitle=row.get('JobTitle', ''),
                Department=row.get('Department', ''),
                PublicationPreference=pub_pref,
                AttachmentsPermitted=row[
                    'AttachmentsPermitted'] and True or False,
            )
            self.fill_contactfields(row, contact)
            self.fill_addressfields(row, contact)
            contact.unmarkCreationFlag()
            renameAfterCreation(contact)
            notify(ObjectInitializedEvent(contact))
            # CC Contacts
            # Referenced contacts may appear later in the sheet, so the
            # links are resolved after the whole import.
            if row['CCContacts']:
                names = [x.strip() for x in row['CCContacts'].split(",")]
                for _fullname in names:
                    self.defer(src_obj=contact,
                               src_field='CCContact',
                               dest_catalog='portal_catalog',
                               dest_query={'portal_type': 'Contact',
                                           'getFullname': _fullname}
                               )
            ## Create Plone user
            username = safe_unicode(row['Username']).encode('utf-8')
            password = safe_unicode(row['Password']).encode('utf-8')
            if(username):
                try:
                    member = self.context.portal_registration.addMember(
                        username,
                        password,
                        properties={
                            'username': username,
                            'email': row['EmailAddress'],
                            'fullname': fullname}
                    )
                except Exception as msg:
                    logger.info("Error adding user (%s): %s" % (msg, username))
                # NOTE(review): the role/group assignment below runs even
                # when addMember raised — presumably to cover users that
                # already exist; confirm before changing.
                contact.aq_parent.manage_setLocalRoles(row['Username'], ['Owner', ])
                contact.reindexObject()
                # add user to Clients group
                group = portal_groups.getGroupById('Clients')
                group.addMember(username)
||
625 | |||
626 | |||
class Container_Types(WorksheetImporter):

    def Import(self):
        """Create a ContainerType for each titled row of the sheet."""
        target = self.context.bika_setup.bika_containertypes
        for record in self.get_rows(3):
            if not record['title']:
                continue
            ctype = _createObjectByType("ContainerType", target, tmpID())
            ctype.edit(title=record['title'],
                       description=record.get('description', ''))
            ctype.unmarkCreationFlag()
            renameAfterCreation(ctype)
            notify(ObjectInitializedEvent(ctype))
||
640 | |||
641 | |||
class Preservations(WorksheetImporter):

    def Import(self):
        """Create Preservation objects with their retention periods."""
        container = self.context.bika_setup.bika_preservations
        for record in self.get_rows(3):
            if not record['title']:
                continue
            pres = _createObjectByType("Preservation", container, tmpID())
            # Empty cells count as zero for each time component.
            retention = {}
            for unit in ('days', 'hours', 'minutes'):
                cell = record['RetentionPeriod_%s' % unit]
                retention[unit] = int(cell and cell or 0)
            pres.edit(title=record['title'],
                      description=record.get('description', ''),
                      RetentionPeriod=retention)
            pres.unmarkCreationFlag()
            renameAfterCreation(pres)
            notify(ObjectInitializedEvent(pres))
||
662 | |||
663 | |||
class Containers(WorksheetImporter):

    def Import(self):
        """Create Container objects, resolving their ContainerType and
        Preservation references through the setup catalog."""
        target = self.context.bika_setup.bika_containers
        bsc = getToolByName(self.context, 'bika_setup_catalog')
        for record in self.get_rows(3):
            if not record['title']:
                continue
            obj = _createObjectByType("Container", target, tmpID())
            obj.edit(
                title=record['title'],
                description=record.get('description', ''),
                Capacity=record.get('Capacity', 0),
                PrePreserved=self.to_bool(record['PrePreserved'])
            )
            if record['ContainerType_title']:
                ctype = self.get_object(
                    bsc, 'ContainerType', record.get('ContainerType_title', ''))
                if ctype:
                    obj.setContainerType(ctype)
            if record['Preservation_title']:
                preservation = self.get_object(
                    bsc, 'Preservation', record.get('Preservation_title', ''))
                if preservation:
                    obj.setPreservation(preservation)
            obj.unmarkCreationFlag()
            renameAfterCreation(obj)
            notify(ObjectInitializedEvent(obj))
||
690 | |||
691 | |||
class Suppliers(WorksheetImporter):

    def Import(self):
        """Create Supplier objects with banking/contact details for each
        named row of the "Suppliers" sheet."""
        container = self.context.bika_setup.bika_suppliers
        for record in self.get_rows(3):
            supplier = _createObjectByType("Supplier", container, tmpID())
            if record['Name']:
                supplier.edit(
                    Name=record.get('Name', ''),
                    TaxNumber=record.get('TaxNumber', ''),
                    AccountType=record.get('AccountType', {}),
                    AccountName=record.get('AccountName', {}),
                    AccountNumber=record.get('AccountNumber', ''),
                    BankName=record.get('BankName', ''),
                    BankBranch=record.get('BankBranch', ''),
                    SWIFTcode=record.get('SWIFTcode', ''),
                    IBN=record.get('IBN', ''),
                    NIB=record.get('NIB', ''),
                    Website=record.get('Website', ''),
                )
                self.fill_contactfields(record, supplier)
                self.fill_addressfields(record, supplier)
                supplier.unmarkCreationFlag()
                renameAfterCreation(supplier)
                notify(ObjectInitializedEvent(supplier))
||
717 | |||
718 | |||
class Supplier_Contacts(WorksheetImporter):

    def Import(self):
        """Create a SupplierContact inside each referenced Supplier."""
        bsc = getToolByName(self.context, 'bika_setup_catalog')
        for record in self.get_rows(3):
            # The parent supplier name and a first name are both mandatory.
            if not record['Supplier_Name'] or not record['Firstname']:
                continue
            brains = bsc(portal_type="Supplier",
                         Title=record['Supplier_Name'])
            if not brains:
                continue
            supplier = brains[0].getObject()
            contact = _createObjectByType("SupplierContact", supplier, tmpID())
            contact.edit(
                Firstname=record['Firstname'],
                Surname=record.get('Surname', ''),
                Username=record.get('Username')
            )
            self.fill_contactfields(record, contact)
            self.fill_addressfields(record, contact)
            contact.unmarkCreationFlag()
            renameAfterCreation(contact)
            notify(ObjectInitializedEvent(contact))
||
744 | |||
745 | |||
class Manufacturers(WorksheetImporter):

    def Import(self):
        """Create a Manufacturer for every titled row of the sheet."""
        target = self.context.bika_setup.bika_manufacturers
        for record in self.get_rows(3):
            maker = _createObjectByType("Manufacturer", target, tmpID())
            if record['title']:
                maker.edit(
                    title=record['title'],
                    description=record.get('description', '')
                )
                self.fill_addressfields(record, maker)
                maker.unmarkCreationFlag()
                renameAfterCreation(maker)
                notify(ObjectInitializedEvent(maker))
||
761 | |||
762 | |||
class Instrument_Types(WorksheetImporter):

    def Import(self):
        """Create an InstrumentType per row of the sheet."""
        target = self.context.bika_setup.bika_instrumenttypes
        for record in self.get_rows(3):
            itype = _createObjectByType("InstrumentType", target, tmpID())
            itype.edit(
                title=record['title'],
                description=record.get('description', ''))
            itype.unmarkCreationFlag()
            renameAfterCreation(itype)
            notify(ObjectInitializedEvent(itype))
||
775 | |||
776 | |||
class Instruments(WorksheetImporter):
    """Import instruments from the 'Instruments' worksheet into
    bika_setup/bika_instruments.

    Each instrument is linked to its InstrumentType, Manufacturer,
    Supplier and (optionally) default Method, and may carry a photo,
    an installation certificate and a user manual file.
    """

    def Import(self):
        folder = self.context.bika_setup.bika_instruments
        bsc = getToolByName(self.context, 'bika_setup_catalog')
        pc = getToolByName(self.context, 'portal_catalog')
        for row in self.get_rows(3):
            # Type, Supplier and Brand columns are mandatory; skip the
            # whole row when any of them is missing.
            if ('Type' not in row
                or 'Supplier' not in row
                    or 'Brand' not in row):
                logger.info("Unable to import '%s'. Missing supplier, manufacturer or type" % row.get('title',''))
                continue

            obj = _createObjectByType("Instrument", folder, tmpID())

            obj.edit(
                title=row.get('title', ''),
                AssetNumber=row.get('assetnumber', ''),
                description=row.get('description', ''),
                Type=row.get('Type', ''),
                Brand=row.get('Brand', ''),
                Model=row.get('Model', ''),
                SerialNo=row.get('SerialNo', ''),
                DataInterface=row.get('DataInterface', ''),
                Location=row.get('Location', ''),
                # NOTE: the spreadsheet column really is spelled
                # 'Instalationdate' -- keep reading it as-is.
                InstallationDate=row.get('Instalationdate', ''),
                UserManualID=row.get('UserManualID', ''),
            )
            # Resolve reference targets by title/name in the catalogs.
            instrumenttype = self.get_object(bsc, 'InstrumentType', title=row.get('Type'))
            manufacturer = self.get_object(bsc, 'Manufacturer', title=row.get('Brand'))
            supplier = self.get_object(bsc, 'Supplier', getName=row.get('Supplier', ''))
            method = self.get_object(pc, 'Method', title=row.get('Method'))
            obj.setInstrumentType(instrumenttype)
            obj.setManufacturer(manufacturer)
            obj.setSupplier(supplier)
            if method:
                obj.setMethods([method])
                obj.setMethod(method)

            # Attaching the instrument's photo
            if row.get('Photo', None):
                path = resource_filename(
                    self.dataset_project,
                    "setupdata/%s/%s" % (self.dataset_name,
                                         row['Photo'])
                )
                try:
                    file_data = read_file(path)
                    obj.setPhoto(file_data)
                except Exception as msg:
                    file_data = None
                    # NOTE(review): msg[0] relies on Python 2 exception
                    # indexing -- confirm before any Python 3 port.
                    logger.warning(msg[0] + " Error on sheet: " + self.sheetname)

            # Attaching the Installation Certificate if exists
            if row.get('InstalationCertificate', None):
                path = resource_filename(
                    self.dataset_project,
                    "setupdata/%s/%s" % (self.dataset_name,
                                         row['InstalationCertificate'])
                )
                try:
                    file_data = read_file(path)
                    obj.setInstallationCertificate(file_data)
                except Exception as msg:
                    logger.warning(msg[0] + " Error on sheet: " + self.sheetname)

            # Attaching the Instrument's manual if exists
            if row.get('UserManualFile', None):
                row_dict = {'DocumentID': row.get('UserManualID', 'manual'),
                            'DocumentVersion': '',
                            'DocumentLocation': '',
                            'DocumentType': 'Manual',
                            'File': row.get('UserManualFile', None)
                            }
                # NOTE(review): 'addDocument' is called unqualified with
                # ``self`` passed explicitly -- verify a module-level
                # helper with this name exists in the file.
                addDocument(self, row_dict, obj)
            obj.unmarkCreationFlag()
            renameAfterCreation(obj)
            notify(ObjectInitializedEvent(obj))
855 | |||
856 | |||
class Instrument_Validations(WorksheetImporter):
    """Import validation records from the 'Instrument Validations'
    worksheet and attach them to their instruments.
    """

    def Import(self):
        bsc = getToolByName(self.context, 'bika_setup_catalog')
        for row in self.get_rows(3):
            # Instrument and title are mandatory
            if not row.get('instrument', None) or not row.get('title', None):
                continue

            folder = self.get_object(bsc, 'Instrument', row.get('instrument'))
            if not folder:
                continue
            obj = _createObjectByType("InstrumentValidation", folder, tmpID())
            obj.edit(
                title=row['title'],
                DownFrom=row.get('downfrom', ''),
                DownTo=row.get('downto', ''),
                Validator=row.get('validator', ''),
                Considerations=row.get('considerations', ''),
                WorkPerformed=row.get('workperformed', ''),
                Remarks=row.get('remarks', ''),
                DateIssued=row.get('DateIssued', ''),
                ReportID=row.get('ReportID', '')
            )
            # Link the worker: match the row's 'Worker' column against the
            # full names of the active lab contacts.
            # (FIX: the original re-fetched 'bika_setup_catalog' here even
            # though it is already bound to ``bsc`` above.)
            lab_contacts = [o.getObject() for o in
                            bsc(portal_type="LabContact", is_active=True)]
            for contact in lab_contacts:
                if contact.getFullname() == row.get('Worker', ''):
                    obj.setWorker(contact.UID())
            obj.unmarkCreationFlag()
            renameAfterCreation(obj)
            notify(ObjectInitializedEvent(obj))
888 | |||
889 | |||
class Instrument_Calibrations(WorksheetImporter):
    """Import calibration records from the 'Instrument Calibrations'
    worksheet and attach them to their instruments.
    """

    def Import(self):
        bsc = getToolByName(self.context, 'bika_setup_catalog')
        for row in self.get_rows(3):
            # Instrument and title are mandatory
            if not row.get('instrument', None) or not row.get('title', None):
                continue

            folder = self.get_object(bsc, 'Instrument', row.get('instrument'))
            if not folder:
                continue
            obj = _createObjectByType("InstrumentCalibration", folder, tmpID())
            obj.edit(
                title=row['title'],
                DownFrom=row.get('downfrom', ''),
                DownTo=row.get('downto', ''),
                Calibrator=row.get('calibrator', ''),
                Considerations=row.get('considerations', ''),
                WorkPerformed=row.get('workperformed', ''),
                Remarks=row.get('remarks', ''),
                DateIssued=row.get('DateIssued', ''),
                ReportID=row.get('ReportID', '')
            )
            # Link the worker by full name among the active lab contacts.
            # BUGFIX: the original queried ``nactive_state='active'`` -- a
            # typo that is not a valid catalog index -- so the query was
            # unfiltered.  Use ``is_active=True`` as Instrument_Validations
            # does.
            lab_contacts = [o.getObject() for o in
                            bsc(portal_type="LabContact", is_active=True)]
            for contact in lab_contacts:
                if contact.getFullname() == row.get('Worker', ''):
                    obj.setWorker(contact.UID())
            obj.unmarkCreationFlag()
            renameAfterCreation(obj)
            notify(ObjectInitializedEvent(obj))
921 | |||
922 | |||
class Instrument_Certifications(WorksheetImporter):
    """Import certification records from the 'Instrument Certifications'
    worksheet, attaching the report file and linking the preparator and
    validator lab contacts.
    """

    def Import(self):
        bsc = getToolByName(self.context, 'bika_setup_catalog')
        for row in self.get_rows(3):
            if not row['instrument'] or not row['title']:
                continue

            folder = self.get_object(bsc, 'Instrument', row.get('instrument', ''))
            if not folder:
                continue
            obj = _createObjectByType("InstrumentCertification", folder, tmpID())
            # Default validity window: from today until one year from
            # today, unless explicit dates are given in the row.
            today = datetime.date.today()
            certificate_expire_date = today.strftime('%d/%m') + '/' + str(today.year + 1) \
                if row.get('validto', '') == '' else row.get('validto')
            certificate_start_date = today.strftime('%d/%m/%Y') \
                if row.get('validfrom', '') == '' else row.get('validfrom')
            obj.edit(
                title=row['title'],
                AssetNumber=row.get('assetnumber', ''),
                Date=row.get('date', ''),
                ValidFrom=certificate_start_date,
                ValidTo=certificate_expire_date,
                Agency=row.get('agency', ''),
                Remarks=row.get('remarks', ''),
            )
            # Attaching the Report Certificate if exists
            if row.get('report', None):
                path = resource_filename(
                    self.dataset_project,
                    "setupdata/%s/%s" % (self.dataset_name,
                                         row['report'])
                )
                try:
                    obj.setDocument(read_file(path))
                except Exception as msg:
                    logger.warning(msg[0] + " Error on sheet: " + self.sheetname)

            # Link preparator/validator lab contacts by full name.
            # BUGFIX: the original queried ``nactive_state='active'`` -- a
            # typo that is not a valid catalog index; use ``is_active=True``
            # as the sibling importers do.
            lab_contacts = [o.getObject() for o in
                            bsc(portal_type="LabContact", is_active=True)]
            for contact in lab_contacts:
                if contact.getFullname() == row.get('preparedby', ''):
                    obj.setPreparator(contact.UID())
                if contact.getFullname() == row.get('approvedby', ''):
                    obj.setValidator(contact.UID())
            obj.unmarkCreationFlag()
            renameAfterCreation(obj)
            notify(ObjectInitializedEvent(obj))
973 | |||
974 | |||
class Instrument_Documents(WorksheetImporter):
    """Import multifile documents from the 'Instrument Documents'
    worksheet and attach them to their instruments.
    """

    def Import(self):
        bsc = getToolByName(self.context, 'bika_setup_catalog')
        for row in self.get_rows(3):
            if not row.get('instrument', ''):
                continue
            folder = self.get_object(bsc, 'Instrument', row.get('instrument', ''))
            # NOTE(review): called unqualified (not ``self.addDocument``)
            # with ``self`` passed explicitly -- this resolves to a
            # module-level name, not this class's method; confirm a
            # module-level ``addDocument`` helper exists.
            addDocument(self, row, folder)

    def addDocument(self, row_dict, folder):
        """
        This function adds a multifile object to the instrument folder
        :param row_dict: the dictionary which contains the document information
        :param folder: the instrument object
        """
        if folder:
            # This content type need a file
            if row_dict.get('File', None):
                path = resource_filename(
                    self.dataset_project,
                    "setupdata/%s/%s" % (self.dataset_name,
                                         row_dict['File'])
                )
                # Best effort: on read failure keep going with an empty
                # file payload and log the problem.
                try:
                    file_data = read_file(path)
                except Exception as msg:
                    file_data = None
                    logger.warning(msg[0] + " Error on sheet: " + self.sheetname)

                # Obtain all created instrument documents content type
                catalog = getToolByName(self.context, 'bika_setup_catalog')
                documents_brains = catalog.searchResults({'portal_type': 'Multifile'})
                # If a the new document has the same DocumentID as a created document, this object won't be created.
                idAlreadyInUse = False
                for item in documents_brains:
                    if item.getObject().getDocumentID() == row_dict.get('DocumentID', ''):
                        warning = "The ID '%s' used for this document is already in use on instrument '%s', consequently " \
                                  "the file hasn't been upload." % (row_dict.get('DocumentID', ''), row_dict.get('instrument', ''))
                        self.context.plone_utils.addPortalMessage(warning)
                        idAlreadyInUse = True
                if not idAlreadyInUse:
                    obj = _createObjectByType("Multifile", folder, tmpID())
                    obj.edit(
                        DocumentID=row_dict.get('DocumentID', ''),
                        DocumentVersion=row_dict.get('DocumentVersion', ''),
                        DocumentLocation=row_dict.get('DocumentLocation', ''),
                        DocumentType=row_dict.get('DocumentType', ''),
                        File=file_data
                    )
                    obj.unmarkCreationFlag()
                    renameAfterCreation(obj)
                    notify(ObjectInitializedEvent(obj))
1028 | |||
1029 | |||
class Instrument_Maintenance_Tasks(WorksheetImporter):
    """Import maintenance tasks from the 'Instrument Maintenance Tasks'
    worksheet and attach them to their instruments.
    """

    def Import(self):
        bsc = getToolByName(self.context, 'bika_setup_catalog')
        for row in self.get_rows(3):
            # instrument, title and type are mandatory
            if not row['instrument'] or not row['title'] or not row['type']:
                continue

            folder = self.get_object(bsc, 'Instrument', row.get('instrument'))
            if not folder:
                continue
            obj = _createObjectByType("InstrumentMaintenanceTask", folder, tmpID())
            # Normalise the cost to two decimal places, falling back to
            # the raw value when it is not numeric.
            # BUGFIX: was a bare ``except:`` which also swallowed
            # SystemExit/KeyboardInterrupt; only formatting failures are
            # expected here.
            try:
                cost = "%.2f" % (row.get('cost', 0))
            except (TypeError, ValueError):
                cost = row.get('cost', '0.0')

            obj.edit(
                title=row['title'],
                description=row['description'],
                Type=row['type'],
                DownFrom=row.get('downfrom', ''),
                DownTo=row.get('downto', ''),
                Maintainer=row.get('maintaner', ''),
                Considerations=row.get('considerations', ''),
                WorkPerformed=row.get('workperformed', ''),
                Remarks=row.get('remarks', ''),
                Cost=cost,
                Closed=self.to_bool(row.get('closed'))
            )
            obj.unmarkCreationFlag()
            renameAfterCreation(obj)
            notify(ObjectInitializedEvent(obj))
1062 | |||
1063 | |||
class Instrument_Schedule(WorksheetImporter):
    """Import scheduled tasks from the 'Instrument Schedule' worksheet
    and attach them to their instruments.
    """

    def Import(self):
        bsc = getToolByName(self.context, 'bika_setup_catalog')
        for row in self.get_rows(3):
            # instrument, title and type are mandatory
            if not row['instrument'] or not row['title'] or not row['type']:
                continue
            folder = self.get_object(bsc, 'Instrument', row.get('instrument'))
            if folder:
                obj = _createObjectByType("InstrumentScheduledTask", folder, tmpID())
                # Build the scheduling criteria record; repetition is
                # enabled when either a repeat count > 1 or an end date
                # ('repeatuntil') is provided.
                criteria = [
                    {'fromenabled': row.get('date', None) is not None,
                     'fromdate': row.get('date', ''),
                     'repeatenabled': ((row['numrepeats'] and
                                        row['numrepeats'] > 1) or
                                       (row['repeatuntil'] and
                                        len(row['repeatuntil']) > 0)),
                     'repeatunit': row.get('numrepeats', ''),
                     'repeatperiod': row.get('periodicity', ''),
                     'repeatuntilenabled': (row['repeatuntil'] and
                                            len(row['repeatuntil']) > 0),
                     'repeatuntil': row.get('repeatuntil')}
                ]
                obj.edit(
                    title=row['title'],
                    Type=row['type'],
                    ScheduleCriteria=criteria,
                    Considerations=row.get('considerations', ''),
                )
                obj.unmarkCreationFlag()
                renameAfterCreation(obj)
                notify(ObjectInitializedEvent(obj))
1096 | |||
1097 | |||
class Sample_Matrices(WorksheetImporter):
    """Populate bika_setup/bika_samplematrices from the 'Sample
    Matrices' worksheet (title + optional description).
    """

    def Import(self):
        container = self.context.bika_setup.bika_samplematrices
        for row in self.get_rows(3):
            # Rows without a title are skipped
            if not row['title']:
                continue
            matrix = _createObjectByType("SampleMatrix", container, tmpID())
            matrix.edit(
                title=row['title'],
                description=row.get('description', '')
            )
            matrix.unmarkCreationFlag()
            renameAfterCreation(matrix)
            notify(ObjectInitializedEvent(matrix))
1113 | |||
1114 | |||
class Batch_Labels(WorksheetImporter):
    """Populate bika_setup/bika_batchlabels from the 'Batch Labels'
    worksheet (title only).
    """

    def Import(self):
        container = self.context.bika_setup.bika_batchlabels
        for row in self.get_rows(3):
            # Rows without a title are skipped
            if not row['title']:
                continue
            label = _createObjectByType("BatchLabel", container, tmpID())
            label.edit(title=row['title'])
            label.unmarkCreationFlag()
            renameAfterCreation(label)
            notify(ObjectInitializedEvent(label))
1126 | |||
1127 | |||
class Sample_Types(WorksheetImporter):
    """Import sample types from the 'Sample Types' worksheet into
    bika_setup/bika_sampletypes, resolving the referenced sample
    matrix, container type and (optionally) a single sample point.
    """

    def Import(self):
        folder = self.context.bika_setup.bika_sampletypes
        bsc = getToolByName(self.context, 'bika_setup_catalog')
        for row in self.get_rows(3):
            if not row['title']:
                continue
            obj = _createObjectByType("SampleType", folder, tmpID())
            samplematrix = self.get_object(bsc, 'SampleMatrix',
                                           row.get('SampleMatrix_title'))
            containertype = self.get_object(bsc, 'ContainerType',
                                            row.get('ContainerType_title'))
            # Retention period is captured in whole days only
            retentionperiod = {
                'days': row['RetentionPeriod'] if row['RetentionPeriod'] else 0,
                'hours': 0,
                'minutes': 0}
            obj.edit(
                title=row['title'],
                description=row.get('description', ''),
                RetentionPeriod=retentionperiod,
                Hazardous=self.to_bool(row['Hazardous']),
                SampleMatrix=samplematrix,
                Prefix=row['Prefix'],
                MinimumVolume=row['MinimumVolume'],
                ContainerType=containertype
            )
            # Optionally link one sample point by title
            samplepoint = self.get_object(bsc, 'SamplePoint',
                                          row.get('SamplePoint_title'))
            if samplepoint:
                obj.setSamplePoints([samplepoint, ])
            obj.unmarkCreationFlag()
            renameAfterCreation(obj)
            notify(ObjectInitializedEvent(obj))
1162 | |||
1163 | |||
class Sample_Points(WorksheetImporter):
    """Import sample points from the 'Sample Points' worksheet.

    Rows carrying a 'Client_title' are created inside that client's
    folder; the rest go to bika_setup/bika_samplepoints.
    """

    def Import(self):
        setup_folder = self.context.bika_setup.bika_samplepoints
        bsc = getToolByName(self.context, 'bika_setup_catalog')
        pc = getToolByName(self.context, 'portal_catalog')
        for row in self.get_rows(3):
            if not row['title']:
                continue
            if row['Client_title']:
                client_title = row['Client_title']
                client = pc(portal_type="Client", getName=client_title)
                if len(client) == 0:
                    error = "Sample Point %s: Client invalid: '%s'. The Sample point will not be uploaded."
                    logger.error(error, row['title'], client_title)
                    continue
                folder = client[0].getObject()
            else:
                folder = setup_folder

            # BUGFIX: the original called ``logger.log(msg, 'error')``,
            # but ``Logger.log`` takes (level, msg) -- the arguments were
            # reversed.  Use ``logger.error`` directly.
            if row['Latitude']:
                logger.error("Ignored SamplePoint Latitude")
            if row['Longitude']:
                logger.error("Ignored SamplePoint Longitude")

            obj = _createObjectByType("SamplePoint", folder, tmpID())
            obj.edit(
                title=row['title'],
                description=row.get('description', ''),
                Composite=self.to_bool(row['Composite']),
                Elevation=row['Elevation'],
            )
            # Optionally link one sample type by title
            sampletype = self.get_object(bsc, 'SampleType',
                                         row.get('SampleType_title'))
            if sampletype:
                obj.setSampleTypes([sampletype, ])
            obj.unmarkCreationFlag()
            renameAfterCreation(obj)
            notify(ObjectInitializedEvent(obj))
1203 | |||
1204 | |||
class Sample_Point_Sample_Types(WorksheetImporter):
    """Create the bidirectional SamplePoint <-> SampleType links listed
    in the 'Sample Point Sample Types' worksheet.
    """

    def Import(self):
        bsc = getToolByName(self.context, 'bika_setup_catalog')
        for row in self.get_rows(3):
            sampletype = self.get_object(bsc,
                                         'SampleType',
                                         row.get('SampleType_title'))
            samplepoint = self.get_object(bsc,
                                          'SamplePoint',
                                          row['SamplePoint_title'])
            # BUGFIX: require both ends of the relation.  The original
            # appended ``None`` to the sample point's SampleTypes when
            # the sample type lookup failed (and vice versa).
            if not sampletype or not samplepoint:
                continue

            sampletypes = samplepoint.getSampleTypes()
            if sampletype not in sampletypes:
                sampletypes.append(sampletype)
                samplepoint.setSampleTypes(sampletypes)

            samplepoints = sampletype.getSamplePoints()
            if samplepoint not in samplepoints:
                samplepoints.append(samplepoint)
                sampletype.setSamplePoints(samplepoints)
1227 | |||
class Storage_Locations(WorksheetImporter):
    """Import storage locations from the 'Storage Locations' worksheet
    into bika_setup/bika_storagelocations.  The Address column doubles
    as the object title and is mandatory.
    """

    def Import(self):
        setup_folder = self.context.bika_setup.bika_storagelocations
        # FIX: dropped the unused 'bika_setup_catalog' and
        # 'portal_catalog' lookups the original performed but never read.
        for row in self.get_rows(3):
            if not row['Address']:
                continue

            obj = _createObjectByType("StorageLocation", setup_folder, tmpID())
            obj.edit(
                title=row['Address'],
                SiteTitle=row['SiteTitle'],
                SiteCode=row['SiteCode'],
                SiteDescription=row['SiteDescription'],
                LocationTitle=row['LocationTitle'],
                LocationCode=row['LocationCode'],
                LocationDescription=row['LocationDescription'],
                LocationType=row['LocationType'],
                ShelfTitle=row['ShelfTitle'],
                ShelfCode=row['ShelfCode'],
                ShelfDescription=row['ShelfDescription'],
            )
            obj.unmarkCreationFlag()
            renameAfterCreation(obj)
            notify(ObjectInitializedEvent(obj))
1255 | |||
1256 | |||
class Sample_Conditions(WorksheetImporter):
    """Populate bika_setup/bika_sampleconditions from the 'Sample
    Conditions' worksheet (title + optional description).
    """

    def Import(self):
        container = self.context.bika_setup.bika_sampleconditions
        for row in self.get_rows(3):
            # Rows without a title are skipped
            if not row['title']:
                continue
            condition = _createObjectByType("SampleCondition", container, tmpID())
            condition.edit(
                title=row['title'],
                description=row.get('description', '')
            )
            condition.unmarkCreationFlag()
            renameAfterCreation(condition)
            notify(ObjectInitializedEvent(condition))
1271 | |||
1272 | |||
class Analysis_Categories(WorksheetImporter):
    """Import analysis categories.  A row is only imported when it has
    both a title and a resolvable department; otherwise the precise
    reason is logged.
    """

    def Import(self):
        folder = self.context.bika_setup.bika_analysiscategories
        bsc = getToolByName(self.context, 'bika_setup_catalog')
        for row in self.get_rows(3):
            department = None
            if row.get('Department_title', None):
                department = self.get_object(bsc, 'Department',
                                             row.get('Department_title'))
            if row.get('title', None) and department:
                obj = _createObjectByType("AnalysisCategory", folder, tmpID())
                obj.edit(
                    title=row['title'],
                    description=row.get('description', ''))
                obj.setDepartment(department)
                obj.unmarkCreationFlag()
                renameAfterCreation(obj)
                notify(ObjectInitializedEvent(obj))
            # BUGFIX below: the original messages read "Error in in ..."
            # (duplicated word) and lacked the space before "is wrong.".
            elif not row.get('title', None):
                logger.warning("Error in " + self.sheetname + ". Missing Title field")
            elif not row.get('Department_title', None):
                logger.warning("Error in " + self.sheetname + ". Department field missing.")
            else:
                # Title and Department_title present, but the department
                # could not be resolved in the catalog.
                logger.warning("Error in " + self.sheetname + ". Department "
                               + row.get('Department_title') + " is wrong.")
1299 | |||
1300 | |||
class Methods(WorksheetImporter):
    """Import methods from the 'Methods' worksheet into the portal's
    methods folder, optionally linking a calculation and attaching a
    method document file.
    """

    def Import(self):
        folder = self.context.methods
        bsc = getToolByName(self.context, 'bika_setup_catalog')
        for row in self.get_rows(3):
            if row['title']:
                calculation = self.get_object(bsc, 'Calculation', row.get('Calculation_title'))
                obj = _createObjectByType("Method", folder, tmpID())
                obj.edit(
                    title=row['title'],
                    description=row.get('description', ''),
                    Instructions=row.get('Instructions', ''),
                    ManualEntryOfResults=row.get('ManualEntryOfResults', True),
                    Calculation=calculation,
                    MethodID=row.get('MethodID', ''),
                    Accredited=row.get('Accredited', True),
                )
                # Obtain all created methods
                catalog = getToolByName(self.context, 'portal_catalog')
                methods_brains = catalog.searchResults({'portal_type': 'Method'})
                # If the new method reuses an existing MethodID, clear it
                # to keep IDs unique.
                # BUGFIX: the second access was ``methods.getObject.get``
                # (missing call parentheses), which raised AttributeError
                # whenever an existing method carried a MethodID.
                # NOTE(review): ``.get(...)`` / ``obj['MethodID']`` on AT
                # content objects look suspect -- confirm these accessors.
                for methods in methods_brains:
                    if methods.getObject().get('MethodID', '') != '' and methods.getObject().get('MethodID', '') == obj['MethodID']:
                        obj.edit(MethodID='')

                if row['MethodDocument']:
                    path = resource_filename(
                        self.dataset_project,
                        "setupdata/%s/%s" % (self.dataset_name,
                                             row['MethodDocument'])
                    )
                    try:
                        file_data = read_file(path)
                        obj.setMethodDocument(file_data)
                    except Exception as msg:
                        logger.warning(msg[0] + " Error on sheet: " + self.sheetname)

                obj.unmarkCreationFlag()
                renameAfterCreation(obj)
                notify(ObjectInitializedEvent(obj))
1342 | |||
1343 | |||
class Sampling_Deviations(WorksheetImporter):
    """Populate bika_setup/bika_samplingdeviations from the 'Sampling
    Deviations' worksheet (title + optional description).
    """

    def Import(self):
        container = self.context.bika_setup.bika_samplingdeviations
        for row in self.get_rows(3):
            # Rows without a title are skipped
            if not row['title']:
                continue
            deviation = _createObjectByType("SamplingDeviation", container, tmpID())
            deviation.edit(
                title=row['title'],
                description=row.get('description', '')
            )
            deviation.unmarkCreationFlag()
            renameAfterCreation(deviation)
            notify(ObjectInitializedEvent(deviation))
1358 | |||
1359 | |||
class Calculations(WorksheetImporter):
    """Import calculations (with their interim fields and dependent
    services) and afterwards assign default calculations to methods
    that do not have one yet.
    """

    def get_interim_fields(self):
        """Preload the 'Calculation Interim Fields' sheet into
        ``self.interim_fields``, keyed by calculation title.
        """
        # BUGFIX: initialise the mapping unconditionally so Import()
        # does not crash with AttributeError when the sheet is absent.
        self.interim_fields = {}
        sheetname = 'Calculation Interim Fields'
        worksheet = self.workbook.get_sheet_by_name(sheetname)
        if not worksheet:
            return
        rows = self.get_rows(3, worksheet=worksheet)
        for row in rows:
            calc_title = row['Calculation_title']
            if calc_title not in self.interim_fields.keys():
                self.interim_fields[calc_title] = []
            self.interim_fields[calc_title].append({
                'keyword': row['keyword'],
                'title': row.get('title', ''),
                'type': 'int',
                'hidden': ('hidden' in row and row['hidden']) and True or False,
                'value': row['value'],
                'unit': row['unit'] and row['unit'] or ''})

    def Import(self):
        self.get_interim_fields()
        folder = self.context.bika_setup.bika_calculations
        for row in self.get_rows(3):
            if not row['title']:
                continue
            calc_title = row['title']
            calc_interims = self.interim_fields.get(calc_title, [])
            formula = row['Formula']
            # Scan the formula for dependent service keywords ([KEY])
            keywords = re.compile(r"\[([^\.^\]]+)\]").findall(formula)
            # Interim field keywords are not service dependencies
            interim_keys = [k['keyword'] for k in calc_interims]
            dep_keywords = [k for k in keywords if k not in interim_keys]

            obj = _createObjectByType("Calculation", folder, tmpID())
            obj.edit(
                title=calc_title,
                description=row.get('description', ''),
                InterimFields=calc_interims,
                Formula=str(row['Formula'])
            )
            # Dependent services may not exist yet: defer the linking
            # until the end of the whole import.
            for kw in dep_keywords:
                self.defer(src_obj=obj,
                           src_field='DependentServices',
                           dest_catalog='bika_setup_catalog',
                           dest_query={'portal_type': 'AnalysisService',
                                       'getKeyword': kw}
                           )
            obj.unmarkCreationFlag()
            renameAfterCreation(obj)
            notify(ObjectInitializedEvent(obj))

        # Now we have the calculations registered, try to assign default
        # calcs to methods
        sheet = self.workbook.get_sheet_by_name("Methods")
        bsc = getToolByName(self.context, 'bika_setup_catalog')
        for row in self.get_rows(3, sheet):
            if row.get('title', '') and row.get('Calculation_title', ''):
                meth = self.get_object(bsc, "Method", row.get('title'))
                if meth and not meth.getCalculation():
                    calctit = safe_unicode(row['Calculation_title']).encode('utf-8')
                    calc = self.get_object(bsc, "Calculation", calctit)
                    if calc:
                        meth.setCalculation(calc.UID())
1428 | |||
1429 | class Analysis_Services(WorksheetImporter): |
||
1430 | |||
1431 | def load_interim_fields(self): |
||
1432 | # preload AnalysisService InterimFields sheet |
||
1433 | sheetname = 'AnalysisService InterimFields' |
||
1434 | worksheet = self.workbook.get_sheet_by_name(sheetname) |
||
1435 | if not worksheet: |
||
1436 | return |
||
1437 | self.service_interims = {} |
||
1438 | rows = self.get_rows(3, worksheet=worksheet) |
||
1439 | for row in rows: |
||
1440 | service_title = row['Service_title'] |
||
1441 | if service_title not in self.service_interims.keys(): |
||
1442 | self.service_interims[service_title] = [] |
||
1443 | self.service_interims[service_title].append({ |
||
1444 | 'keyword': row['keyword'], |
||
1445 | 'title': row.get('title', ''), |
||
1446 | 'type': 'int', |
||
1447 | 'value': row['value'], |
||
1448 | 'unit': row['unit'] and row['unit'] or ''}) |
||
1449 | |||
1450 | def load_result_options(self): |
||
1451 | bsc = getToolByName(self.context, 'bika_setup_catalog') |
||
1452 | sheetname = 'AnalysisService ResultOptions' |
||
1453 | worksheet = self.workbook.get_sheet_by_name(sheetname) |
||
1454 | if not worksheet: |
||
1455 | return |
||
1456 | for row in self.get_rows(3, worksheet=worksheet): |
||
1457 | service = self.get_object(bsc, 'AnalysisService', |
||
1458 | row.get('Service_title')) |
||
1459 | if not service: |
||
1460 | return |
||
1461 | sro = service.getResultOptions() |
||
1462 | sro.append({'ResultValue': row['ResultValue'], |
||
1463 | 'ResultText': row['ResultText']}) |
||
1464 | service.setResultOptions(sro) |
||
1465 | |||
    def load_service_uncertainties(self):
        """Preload the 'AnalysisService Uncertainties' sheet and write
        the uncertainty ranges onto their services in batches.
        """
        bsc = getToolByName(self.context, 'bika_setup_catalog')
        sheetname = 'AnalysisService Uncertainties'
        worksheet = self.workbook.get_sheet_by_name(sheetname)
        if not worksheet:
            return

        # Accumulate ranges per service UID and flush in batches so the
        # same service is not rewritten once per row.
        bucket = {}
        count = 0
        for row in self.get_rows(3, worksheet=worksheet):
            count += 1
            service = self.get_object(bsc, 'AnalysisService',
                                      row.get('Service_title'))
            if not service:
                warning = "Unable to load an Analysis Service uncertainty. Service '%s' not found." % row.get('Service_title')
                logger.warning(warning)
                continue
            service_uid = service.UID()
            if service_uid not in bucket:
                bucket[service_uid] = []
            bucket[service_uid].append(
                {'intercept_min': row['Range Min'],
                 'intercept_max': row['Range Max'],
                 'errorvalue': row['Uncertainty Value']}
            )
            # NOTE(review): ``count`` is never reset after a flush, so
            # past 500 rows the bucket is flushed on every iteration --
            # presumably ``count`` was meant to be reset with the bucket.
            if count > 500:
                self.write_bucket(bucket)
                bucket = {}
        if bucket:
            self.write_bucket(bucket)
1496 | |||
1497 | def get_methods(self, service_title, default_method): |
||
1498 | """ Return an array of objects of the type Method in accordance to the |
||
1499 | methods listed in the 'AnalysisService Methods' sheet and service |
||
1500 | set in the parameter service_title. |
||
1501 | If default_method is set, it will be included in the returned |
||
1502 | array. |
||
1503 | """ |
||
1504 | return self.get_relations(service_title, |
||
1505 | default_method, |
||
1506 | 'Method', |
||
1507 | 'portal_catalog', |
||
1508 | 'AnalysisService Methods', |
||
1509 | 'Method_title') |
||
1510 | |||
1511 | def get_instruments(self, service_title, default_instrument): |
||
1512 | """ Return an array of objects of the type Instrument in accordance to |
||
1513 | the instruments listed in the 'AnalysisService Instruments' sheet |
||
1514 | and service set in the parameter 'service_title'. |
||
1515 | If default_instrument is set, it will be included in the returned |
||
1516 | array. |
||
1517 | """ |
||
1518 | return self.get_relations(service_title, |
||
1519 | default_instrument, |
||
1520 | 'Instrument', |
||
1521 | 'bika_setup_catalog', |
||
1522 | 'AnalysisService Instruments', |
||
1523 | 'Instrument_title') |
||
1524 | |||
    def get_relations(self, service_title, default_obj, obj_type, catalog_name, sheet_name, column):
        """Return the objects of type *obj_type* whose titles are listed
        for *service_title* in the worksheet *sheet_name* (looked up by
        the *column* cell through the catalog *catalog_name*).

        When *default_obj* is given it is placed first in the result and
        never duplicated.
        """
        out_objects = [default_obj] if default_obj else []
        cat = getToolByName(self.context, catalog_name)
        worksheet = self.workbook.get_sheet_by_name(sheet_name)
        if not worksheet:
            return out_objects
        for row in self.get_rows(3, worksheet=worksheet):
            row_as_title = row.get('Service_title')
            if not row_as_title:
                # NOTE(review): a blank Service_title aborts the whole
                # scan, not just this row -- presumably a blank marks the
                # end of the data; confirm the sheet layout.
                return out_objects
            elif row_as_title != service_title:
                continue
            obj = self.get_object(cat, obj_type, row.get(column))
            if obj:
                # Do not duplicate the default object
                if default_obj and default_obj.UID() == obj.UID():
                    continue
                out_objects.append(obj)
        return out_objects
1548 | |||
def write_bucket(self, bucket):
    """Flush collected uncertainty rows onto their Analysis Services.

    ``bucket`` maps an AnalysisService UID to a list of uncertainty
    dicts; each service's existing uncertainties are extended with the
    new entries.
    """
    bsc = getToolByName(self.context, 'bika_setup_catalog')
    for uid, rows in bucket.items():
        service = bsc(UID=uid)[0].getObject()
        merged = list(service.getUncertainties()) + rows
        service.setUncertainties(merged)
1556 | |||
def Import(self):
    """Create one AnalysisService per row of the main sheet.

    Helper sheets are consumed first (interim fields) and last (result
    options, uncertainties); Method/Instrument relations come from the
    'AnalysisService Methods' / 'AnalysisService Instruments' sheets via
    get_methods()/get_instruments().
    """
    self.load_interim_fields()
    folder = self.context.bika_setup.bika_analysisservices
    bsc = getToolByName(self.context, 'bika_setup_catalog')
    pc = getToolByName(self.context, 'portal_catalog')
    for row in self.get_rows(3):
        if not row['title']:
            continue

        obj = _createObjectByType("AnalysisService", folder, tmpID())
        # Maximum turnaround time, split over three spreadsheet columns.
        MTA = {
            'days': self.to_int(row.get('MaxTimeAllowed_days',0),0),
            'hours': self.to_int(row.get('MaxTimeAllowed_hours',0),0),
            'minutes': self.to_int(row.get('MaxTimeAllowed_minutes',0),0),
        }
        category = self.get_object(bsc, 'AnalysisCategory', row.get('AnalysisCategory_title'))
        department = self.get_object(bsc, 'Department', row.get('Department_title'))
        container = self.get_object(bsc, 'Container', row.get('Container_title'))
        preservation = self.get_object(bsc, 'Preservation', row.get('Preservation_title'))

        # Analysis Service - Method considerations:
        # One Analysis Service can have 0 or n Methods associated (field
        # 'Methods' from the Schema).
        # If the Analysis Service has at least one method associated, then
        # one of those methods can be set as the defualt method (field
        # '_Method' from the Schema).
        #
        # To make it easier, if a DefaultMethod is declared in the
        # Analysis_Services spreadsheet, but the same AS has no method
        # associated in the Analysis_Service_Methods spreadsheet, then make
        # the assumption that the DefaultMethod set in the former has to be
        # associated to the AS although the relation is missing.
        defaultmethod = self.get_object(pc, 'Method', row.get('DefaultMethod_title'))
        methods = self.get_methods(row['title'], defaultmethod)
        if not defaultmethod and methods:
            defaultmethod = methods[0]

        # Analysis Service - Instrument considerations:
        # By default, an Analysis Services will be associated automatically
        # with several Instruments due to the Analysis Service - Methods
        # relation (an Instrument can be assigned to a Method and one Method
        # can have zero or n Instruments associated). There is no need to
        # set this assignment directly, the AnalysisService object will
        # find those instruments.
        # Besides this 'automatic' behavior, an Analysis Service can also
        # have 0 or n Instruments manually associated ('Instruments' field).
        # In this case, the attribute 'AllowInstrumentEntryOfResults' should
        # be set to True.
        #
        # To make it easier, if a DefaultInstrument is declared in the
        # Analysis_Services spreadsheet, but the same AS has no instrument
        # associated in the AnalysisService_Instruments spreadsheet, then
        # make the assumption the DefaultInstrument set in the former has
        # to be associated to the AS although the relation is missing and
        # the option AllowInstrumentEntryOfResults will be set to True.
        defaultinstrument = self.get_object(bsc, 'Instrument', row.get('DefaultInstrument_title'))
        instruments = self.get_instruments(row['title'], defaultinstrument)
        allowinstrentry = True if instruments else False
        if not defaultinstrument and instruments:
            defaultinstrument = instruments[0]

        # The manual entry of results can only be set to false if the value
        # for the attribute "InstrumentEntryOfResults" is False.
        allowmanualentry = True if not allowinstrentry else row.get('ManualEntryOfResults', True)

        # Analysis Service - Calculation considerations:
        # By default, the AnalysisService will use the Calculation associated
        # to the Default Method (the field "UseDefaultCalculation"==True).
        # If the Default Method for this AS doesn't have any Calculation
        # associated and the field "UseDefaultCalculation" is True, no
        # Calculation will be used for this AS ("_Calculation" field is
        # reserved and should not be set directly).
        #
        # To make it easier, if a Calculation is set by default in the
        # spreadsheet, then assume the UseDefaultCalculation has to be set
        # to False.
        deferredcalculation = self.get_object(bsc, 'Calculation', row.get('Calculation_title'))
        usedefaultcalculation = False if deferredcalculation else True
        _calculation = deferredcalculation if deferredcalculation else \
            (defaultmethod.getCalculation() if defaultmethod else None)

        obj.edit(
            title=row['title'],
            ShortTitle=row.get('ShortTitle', row['title']),
            description=row.get('description', ''),
            Keyword=row['Keyword'],
            PointOfCapture=row['PointOfCapture'].lower(),
            Category=category,
            Department=department,
            # First letter of the Attachment column ('p'ermitted by default).
            AttachmentOption=row.get('Attachment', '')[0].lower() if row.get('Attachment', '') else 'p',
            Unit=row['Unit'] and row['Unit'] or None,
            Precision=row['Precision'] and str(row['Precision']) or '0',
            ExponentialFormatPrecision=str(self.to_int(row.get('ExponentialFormatPrecision',7),7)),
            LowerDetectionLimit='%06f' % self.to_float(row.get('LowerDetectionLimit', '0.0'), 0),
            UpperDetectionLimit='%06f' % self.to_float(row.get('UpperDetectionLimit', '1000000000.0'), 1000000000.0),
            DetectionLimitSelector=self.to_bool(row.get('DetectionLimitSelector',0)),
            MaxTimeAllowed=MTA,
            Price="%02f" % Float(row['Price']),
            BulkPrice="%02f" % Float(row['BulkPrice']),
            VAT="%02f" % Float(row['VAT']),
            _Method=defaultmethod,
            Methods=methods,
            ManualEntryOfResults=allowmanualentry,
            InstrumentEntryOfResults=allowinstrentry,
            Instruments=instruments,
            Calculation=_calculation,
            UseDefaultCalculation=usedefaultcalculation,
            DuplicateVariation="%02f" % Float(row['DuplicateVariation']),
            Accredited=self.to_bool(row['Accredited']),
            # service_interims only exists if load_interim_fields found data.
            InterimFields=hasattr(self, 'service_interims') and self.service_interims.get(
                row['title'], []) or [],
            Separate=self.to_bool(row.get('Separate', False)),
            Container=container,
            Preservation=preservation,
            CommercialID=row.get('CommercialID', ''),
            ProtocolID=row.get('ProtocolID', '')
        )
        obj.unmarkCreationFlag()
        renameAfterCreation(obj)
        notify(ObjectInitializedEvent(obj))
    # These helpers need the services to exist, hence they run last.
    self.load_result_options()
    self.load_service_uncertainties()
1679 | |||
1680 | |||
class Analysis_Specifications(WorksheetImporter):
    """Import AnalysisSpec objects, grouped by owning client (or the lab)
    and specification title."""

    def resolve_service(self, row):
        """Return the AnalysisService for a row, matched by title first
        and by keyword as a fallback."""
        bsc = getToolByName(self.context, "bika_setup_catalog")
        service = bsc(
            portal_type="AnalysisService",
            title=safe_unicode(row["service"])
        )
        if not service:
            service = bsc(
                portal_type="AnalysisService",
                getKeyword=safe_unicode(row["service"])
            )
        service = service[0].getObject()
        return service

    def Import(self):
        """Bucket all rows by (client, title), then create one
        AnalysisSpec per bucket entry with its collected results ranges."""
        s_t = ""  # NOTE(review): unused local, kept as-is
        bucket = {}
        pc = getToolByName(self.context, "portal_catalog")
        bsc = getToolByName(self.context, "bika_setup_catalog")
        # collect up all values into the bucket
        for row in self.get_rows(3):
            # Accept either 'Title' or 'title' as the column header.
            title = row.get("Title", False)
            if not title:
                title = row.get("title", False)
            if not title:
                continue
            # "lab" is a sentinel for lab-wide (not client-specific) specs.
            parent = row["Client_title"] if row["Client_title"] else "lab"
            st = row["SampleType_title"] if row["SampleType_title"] else ""
            service = self.resolve_service(row)

            if parent not in bucket:
                bucket[parent] = {}
            if title not in bucket[parent]:
                bucket[parent][title] = {"sampletype": st, "resultsrange": []}
            bucket[parent][title]["resultsrange"].append({
                "keyword": service.getKeyword(),
                "min": row["min"] if row["min"] else "0",
                "max": row["max"] if row["max"] else "0",
                "error": row["error"] if row["error"] else "0"
            })
        # write objects.
        for parent in bucket.keys():
            for title in bucket[parent]:
                if parent == "lab":
                    folder = self.context.bika_setup.bika_analysisspecs
                else:
                    proxy = pc(portal_type="Client", getName=safe_unicode(parent))[0]
                    folder = proxy.getObject()
                st = bucket[parent][title]["sampletype"]
                resultsrange = bucket[parent][title]["resultsrange"]
                # st_uid is only defined (and only used) when st is truthy.
                if st:
                    st_uid = bsc(portal_type="SampleType", title=safe_unicode(st))[0].UID
                obj = _createObjectByType("AnalysisSpec", folder, tmpID())
                obj.edit(title=title)
                obj.setResultsRange(resultsrange)
                if st:
                    obj.setSampleType(st_uid)
                obj.unmarkCreationFlag()
                renameAfterCreation(obj)
                notify(ObjectInitializedEvent(obj))
1743 | |||
1744 | |||
class Analysis_Profiles(WorksheetImporter):
    """Create AnalysisProfile objects from the 'Analysis Profiles' sheet."""

    def load_analysis_profile_services(self):
        """Build self.profile_services: profile title -> [AnalysisService].

        Services are matched against Title first and Keyword second.
        """
        self.profile_services = {}
        worksheet = self.workbook.get_sheet_by_name('Analysis Profile Services')
        if not worksheet:
            return
        bsc = getToolByName(self.context, 'bika_setup_catalog')
        for row in self.get_rows(3, worksheet=worksheet):
            profile = row.get('Profile', '')
            if not profile or not row.get('Service', ''):
                continue
            # Match against Keyword or Title.
            # XXX We need a utility for this kind of thing.
            service = self.get_object(bsc, 'AnalysisService', row.get('Service'))
            if not service:
                service = bsc(portal_type='AnalysisService',
                              getKeyword=row['Service'])[0].getObject()
            self.profile_services.setdefault(profile, []).append(service)

    def Import(self):
        """Create one AnalysisProfile per row of the main sheet."""
        self.load_analysis_profile_services()
        folder = self.context.bika_setup.bika_analysisprofiles
        for row in self.get_rows(3):
            if not row['title']:
                continue
            obj = _createObjectByType("AnalysisProfile", folder, tmpID())
            obj.edit(
                title=row['title'],
                description=row.get('description', ''),
                ProfileKey=row['ProfileKey'],
                CommercialID=row.get('CommercialID', ''),
                AnalysisProfilePrice="%02f" % Float(row.get('AnalysisProfilePrice', '0.0')),
                AnalysisProfileVAT="%02f" % Float(row.get('AnalysisProfileVAT', '0.0')),
                UseAnalysisProfilePrice=row.get('UseAnalysisProfilePrice', False),
            )
            obj.setService(self.profile_services[row['title']])
            obj.unmarkCreationFlag()
            renameAfterCreation(obj)
            notify(ObjectInitializedEvent(obj))
1785 | |||
1786 | |||
class AR_Templates(WorksheetImporter):
    """Import ARTemplate objects plus their analyses and partitions from
    the companion 'AR Template Analyses'/'AR Template Partitions' sheets."""

    def load_artemplate_analyses(self):
        """Build self.artemplate_analyses: template title -> analysis dicts."""
        sheetname = 'AR Template Analyses'
        worksheet = self.workbook.get_sheet_by_name(sheetname)
        self.artemplate_analyses = {}
        if not worksheet:
            return
        bsc = getToolByName(self.context, 'bika_setup_catalog')
        for row in self.get_rows(3, worksheet=worksheet):
            # XXX service_uid is not a uid
            service = self.get_object(bsc, 'AnalysisService',
                                      row.get('service_uid'))
            if row['ARTemplate'] not in self.artemplate_analyses.keys():
                self.artemplate_analyses[row['ARTemplate']] = []
            self.artemplate_analyses[row['ARTemplate']].append(
                {'service_uid': service.UID(),
                 'partition': row['partition']
                 }
            )

    def load_artemplate_partitions(self):
        """Build self.artemplate_partitions: template title -> partition dicts."""
        sheetname = 'AR Template Partitions'
        worksheet = self.workbook.get_sheet_by_name(sheetname)
        self.artemplate_partitions = {}
        bsc = getToolByName(self.context, 'bika_setup_catalog')
        if not worksheet:
            return
        for row in self.get_rows(3, worksheet=worksheet):
            if row['ARTemplate'] not in self.artemplate_partitions.keys():
                self.artemplate_partitions[row['ARTemplate']] = []
            container = self.get_object(bsc, 'Container',
                                        row.get('container'))
            preservation = self.get_object(bsc, 'Preservation',
                                           row.get('preservation'))
            self.artemplate_partitions[row['ARTemplate']].append({
                'part_id': row['part_id'],
                'Container': container.Title(),
                'container_uid': container.UID(),
                'Preservation': preservation.Title(),
                'preservation_uid': preservation.UID()})

    def Import(self):
        """Create one ARTemplate per row, placed in the lab folder or the
        owning client's folder."""
        self.load_artemplate_analyses()
        self.load_artemplate_partitions()
        folder = self.context.bika_setup.bika_artemplates
        bsc = getToolByName(self.context, 'bika_setup_catalog')
        pc = getToolByName(self.context, 'portal_catalog')
        for row in self.get_rows(3):
            if not row['title']:
                continue
            analyses = self.artemplate_analyses[row['title']]
            client_title = row['Client_title'] or 'lab'
            if row['title'] in self.artemplate_partitions:
                partitions = self.artemplate_partitions[row['title']]
            else:
                # NOTE(review): these fallback keys are lowercase, unlike
                # the 'Container'/'container_uid' keys built by
                # load_artemplate_partitions -- confirm which key shape
                # setPartitions actually expects.
                partitions = [{'part_id': 'part-1',
                               'container': '',
                               'preservation': ''}]

            if client_title == 'lab':
                folder = self.context.bika_setup.bika_artemplates
            else:
                folder = pc(portal_type='Client',
                            getName=client_title)[0].getObject()

            sampletype = self.get_object(bsc, 'SampleType',
                                         row.get('SampleType_title'))
            samplepoint = self.get_object(bsc, 'SamplePoint',
                                          row.get('SamplePoint_title'))

            obj = _createObjectByType("ARTemplate", folder, tmpID())
            obj.edit(
                title=str(row['title']),
                description=row.get('description', ''),
                Remarks=row.get('Remarks', ''),)
            obj.setSampleType(sampletype)
            obj.setSamplePoint(samplepoint)
            obj.setPartitions(partitions)
            obj.setAnalyses(analyses)
            obj.unmarkCreationFlag()
            renameAfterCreation(obj)
            notify(ObjectInitializedEvent(obj))
1870 | |||
1871 | |||
class Reference_Definitions(WorksheetImporter):
    """Import ReferenceDefinition objects plus their expected results."""

    def load_reference_definition_results(self):
        """Populate self.results: definition title -> list of result dicts.

        Reads the 'Reference Definition Results' sheet, falling back to
        the legacy 'Reference Definition Values' sheet name.

        ``self.results`` is now always initialised, even when neither
        sheet exists; the previous code returned before assigning it in
        that case, which made Import() fail with AttributeError on
        ``self.results.get(...)``.  A redundant duplicate worksheet
        check was also removed.
        """
        self.results = {}
        worksheet = self.workbook.get_sheet_by_name('Reference Definition Results')
        if not worksheet:
            # Older workbooks used a different sheet title.
            worksheet = self.workbook.get_sheet_by_name('Reference Definition Values')
        if not worksheet:
            return
        bsc = getToolByName(self.context, 'bika_setup_catalog')
        for row in self.get_rows(3, worksheet=worksheet):
            title = row['ReferenceDefinition_title']
            if title not in self.results.keys():
                self.results[title] = []
            service = self.get_object(bsc, 'AnalysisService',
                                      row.get('service'))
            # Empty cells default to '0'.
            self.results[title].append({
                'uid': service.UID(),
                'result': row['result'] if row['result'] else '0',
                'min': row['min'] if row['min'] else '0',
                'max': row['max'] if row['max'] else '0'})

    def Import(self):
        """Create one ReferenceDefinition per row of the main sheet."""
        self.load_reference_definition_results()
        folder = self.context.bika_setup.bika_referencedefinitions
        for row in self.get_rows(3):
            if not row['title']:
                continue
            obj = _createObjectByType("ReferenceDefinition", folder, tmpID())
            obj.edit(
                title=row['title'],
                description=row.get('description', ''),
                Blank=self.to_bool(row['Blank']),
                ReferenceResults=self.results.get(row['title'], []),
                Hazardous=self.to_bool(row['Hazardous']))
            obj.unmarkCreationFlag()
            renameAfterCreation(obj)
            notify(ObjectInitializedEvent(obj))
1914 | |||
1915 | |||
class Worksheet_Templates(WorksheetImporter):
    """Create WorksheetTemplate objects with their layouts and services."""

    def load_wst_layouts(self):
        """Build self.wst_layouts: template title -> list of layout dicts."""
        self.wst_layouts = {}
        worksheet = self.workbook.get_sheet_by_name('Worksheet Template Layouts')
        if not worksheet:
            return
        for row in self.get_rows(3, worksheet=worksheet):
            key = row['WorksheetTemplate_title']
            self.wst_layouts.setdefault(key, []).append({
                'pos': row['pos'],
                'type': row['type'],
                'blank_ref': row['blank_ref'],
                'control_ref': row['control_ref'],
                'dup': row['dup']})

    def load_wst_services(self):
        """Build self.wst_services: template title -> list of service UIDs."""
        self.wst_services = {}
        worksheet = self.workbook.get_sheet_by_name('Worksheet Template Services')
        if not worksheet:
            return
        bsc = getToolByName(self.context, 'bika_setup_catalog')
        for row in self.get_rows(3, worksheet=worksheet):
            service = self.get_object(bsc, 'AnalysisService',
                                      row.get('service'))
            key = row['WorksheetTemplate_title']
            self.wst_services.setdefault(key, []).append(service.UID())

    def Import(self):
        """Create one WorksheetTemplate per row of the main sheet."""
        self.load_wst_services()
        self.load_wst_layouts()
        folder = self.context.bika_setup.bika_worksheettemplates
        for row in self.get_rows(3):
            if not row['title']:
                continue
            obj = _createObjectByType("WorksheetTemplate", folder, tmpID())
            obj.edit(title=row['title'],
                     description=row.get('description', ''),
                     Layout=self.wst_layouts[row['title']])
            obj.setService(self.wst_services[row['title']])
            obj.unmarkCreationFlag()
            renameAfterCreation(obj)
            notify(ObjectInitializedEvent(obj))
1967 | |||
1968 | |||
class Setup(WorksheetImporter):
    """Apply Field/Value rows from the 'Setup' sheet onto the matching
    fields of the bika_setup schema.

    Changes from the original: the bare ``except:`` clauses (which also
    swallowed SystemExit/KeyboardInterrupt) were narrowed to
    ``except Exception:``, and ``to_duration_value`` no longer indexes a
    ``map()`` result (which is an iterator on Python 3).
    """

    def get_field_value(self, field, value):
        """Convert a raw spreadsheet ``value`` to the form ``field``'s
        type expects.

        Returns None (and logs an error) when the field type has no
        converter registered or the conversion raises.
        """
        if value is None:
            return None
        converters = {
            "integer": self.to_integer_value,
            "fixedpoint": self.to_fixedpoint_value,
            "boolean": self.to_boolean_value,
            "string": self.to_string_value,
            "reference": self.to_reference_value,
            "duration": self.to_duration_value
        }
        try:
            # Unknown field types resolve to None and raise TypeError
            # here, which the handler below logs (best-effort import).
            return converters.get(field.type, None)(field, value)
        except Exception:
            logger.error("No valid type for Setup.{} ({}): {}"
                         .format(field.getName(), field.type, value))

    def to_integer_value(self, field, value):
        """Render an integer field value as a string."""
        return str(int(value))

    def to_fixedpoint_value(self, field, value):
        """Render a fixed-point field value as a string."""
        return str(float(value))

    def to_boolean_value(self, field, value):
        """Coerce a truthy/falsy cell into a boolean."""
        return self.to_bool(value)

    def to_string_value(self, field, value):
        """Render a string field value; vocabulary fields are resolved
        to their vocabulary key."""
        if field.vocabulary:
            return self.to_string_vocab_value(field, value)
        return value and str(value) or ""

    def to_reference_value(self, field, value):
        """Resolve a reference field value (an object title) to a UID.

        Raises ValueError when no object with that title is found.
        """
        if not value:
            return None

        brains = api.search({"title": to_unicode(value)})
        if brains:
            return api.get_uid(brains[0])

        msg = "No object found for Setup.{0} ({1}): {2}"
        msg = msg.format(field.getName(), field.type, value)
        logger.error(msg)
        raise ValueError(msg)

    def to_string_vocab_value(self, field, value):
        """Match ``value`` (case-insensitively) against a field's
        vocabulary keys or display values and return the key.

        Raises ValueError on an empty vocabulary or when no entry
        matches.
        """
        vocabulary = field.vocabulary
        if type(vocabulary) is str:
            # The vocabulary is the dotted name of a setup method.
            vocabulary = getFromString(api.get_setup(), vocabulary)
        else:
            vocabulary = vocabulary.items()

        if not vocabulary:
            raise ValueError("Empty vocabulary for {}".format(field.getName()))

        if type(vocabulary) in (tuple, list):
            vocabulary = {item[0]: item[1] for item in vocabulary}

        for key, val in vocabulary.items():
            key_low = str(to_utf8(key)).lower()
            val_low = str(to_utf8(val)).lower()
            value_low = str(value).lower()
            if key_low == value_low or val_low == value_low:
                return key
        raise ValueError("Vocabulary entry not found")

    def to_duration_value(self, field, values):
        """Assemble a days/hours/minutes dict for a duration field from
        the '<field>_days', '<field>_hours' and '<field>_minutes' keys.

        Uses a list comprehension instead of indexing a map() result so
        the code also works on Python 3, where map() is lazy.
        """
        keys = ["{}_{}".format(field.getName(), d)
                for d in ("days", "hours", "minutes")]
        return dict(
            days=api.to_int(values.get(keys[0], 0), 0),
            hours=api.to_int(values.get(keys[1], 0), 0),
            minutes=api.to_int(values.get(keys[2], 0), 0))

    def Import(self):
        """Read all Field/Value rows and write each converted value onto
        the corresponding bika_setup schema field (best-effort: failures
        are logged and skipped)."""
        values = {}
        for row in self.get_rows(3):
            values[row['Field']] = row['Value']

        bsetup = self.context.bika_setup
        bschema = bsetup.Schema()
        for field in bschema.fields():
            value = None
            field_name = field.getName()
            if field_name in values:
                value = self.get_field_value(field, values[field_name])
            elif field.type == "duration":
                # Duration fields span three columns; pass the whole map.
                value = self.get_field_value(field, values)

            if value is None:
                continue
            try:
                obj_field = bsetup.getField(field_name)
                obj_field.set(bsetup, str(value))
            except Exception:
                logger.error("No valid type for Setup.{} ({}): {}"
                             .format(field_name, field.type, value))
2068 | |||
2069 | |||
class ID_Prefixes(WorksheetImporter):
    """Read ID-formatting prefixes from the spreadsheet.

    NOTE(review): the final setIDFormatting call is commented out in the
    original, so this importer currently builds the prefix list without
    persisting it.
    """

    def Import(self):
        prefixes = self.context.bika_setup.getIDFormatting()
        for row in self.get_rows(3):
            # Drop any existing entry for this portal_type before
            # appending the fresh one from the sheet.
            prefixes = [p for p in prefixes
                        if p['portal_type'] != row['portal_type']]
            # The spreadsheet uses the literal 'none' to mean "no
            # separator" (it reads better to users than a blank cell).
            sep = row.get('separator', '-')
            if sep == 'none':
                sep = ''
            prefixes.append({'portal_type': row['portal_type'],
                             'padding': row['padding'],
                             'prefix': row['prefix'],
                             'separator': sep})
        # self.context.bika_setup.setIDFormatting(prefixes)
2087 | |||
2088 | |||
class Attachment_Types(WorksheetImporter):
    """Create one AttachmentType object per spreadsheet row."""

    def Import(self):
        folder = self.context.bika_setup.bika_attachmenttypes
        for row in self.get_rows(3):
            attachment_type = _createObjectByType("AttachmentType", folder, tmpID())
            attachment_type.edit(title=row['title'],
                                 description=row.get('description', ''))
            attachment_type.unmarkCreationFlag()
            renameAfterCreation(attachment_type)
            notify(ObjectInitializedEvent(attachment_type))
2101 | |||
2102 | |||
class Reference_Samples(WorksheetImporter):
    """Import ReferenceSample objects together with their expected
    results, reference analyses and analysis interim fields."""

    def load_reference_sample_results(self, sample):
        """Attach 'Reference Sample Results' rows to ``sample``.

        The worksheet lookup is cached on the instance so repeated calls
        (one per sample) do not re-fetch the sheet.
        """
        sheetname = 'Reference Sample Results'
        if not hasattr(self, 'results_worksheet'):
            worksheet = self.workbook.get_sheet_by_name(sheetname)
            if not worksheet:
                return
            self.results_worksheet = worksheet
        results = []
        bsc = getToolByName(self.context, 'bika_setup_catalog')
        for row in self.get_rows(3, worksheet=self.results_worksheet):
            if row['ReferenceSample_id'] != sample.getId():
                continue
            service = self.get_object(bsc, 'AnalysisService',
                                      row.get('AnalysisService_title'))
            if not service:
                warning = "Unable to load a reference sample result. Service %s not found."
                # BUGFIX: log the missing service title; the original
                # interpolated the sheet name into the "Service %s not
                # found." message.
                logger.warning(warning, row.get('AnalysisService_title'))
                continue
            results.append({
                'uid': service.UID(),
                'result': row['result'],
                'min': row['min'],
                'max': row['max']})
        sample.setReferenceResults(results)

    def load_reference_analyses(self, sample):
        """Create ReferenceAnalysis objects inside ``sample`` from the
        'Reference Analyses' sheet (worksheet lookup is cached)."""
        sheetname = 'Reference Analyses'
        if not hasattr(self, 'analyses_worksheet'):
            worksheet = self.workbook.get_sheet_by_name(sheetname)
            if not worksheet:
                return
            self.analyses_worksheet = worksheet
        bsc = getToolByName(self.context, 'bika_setup_catalog')
        for row in self.get_rows(3, worksheet=self.analyses_worksheet):
            if row['ReferenceSample_id'] != sample.getId():
                continue
            service = self.get_object(bsc, 'AnalysisService',
                                      row.get('AnalysisService_title'))
            # Analyses are keyed/named by service keyword
            obj = _createObjectByType("ReferenceAnalysis", sample, row['id'])
            obj.edit(title=row['id'],
                     ReferenceType=row['ReferenceType'],
                     Result=row['Result'],
                     Analyst=row['Analyst'],
                     Instrument=row['Instrument'],
                     Retested=row['Retested']
                     )
            obj.setService(service)
            # obj.setCreators(row['creator'])
            # obj.setCreationDate(row['created'])
            # self.set_wf_history(obj, row['workflow_history'])
            obj.unmarkCreationFlag()

            self.load_reference_analysis_interims(obj)

    def load_reference_analysis_interims(self, analysis):
        """Attach interim field rows from 'Reference Analysis Interims'
        to ``analysis`` (worksheet lookup is cached)."""
        sheetname = 'Reference Analysis Interims'
        if not hasattr(self, 'interim_worksheet'):
            worksheet = self.workbook.get_sheet_by_name(sheetname)
            if not worksheet:
                return
            self.interim_worksheet = worksheet
        bsc = getToolByName(self.context, 'bika_setup_catalog')
        interims = []
        for row in self.get_rows(3, worksheet=self.interim_worksheet):
            if row['ReferenceAnalysis_id'] != analysis.getId():
                continue
            interims.append({
                'keyword': row['keyword'],
                'title': row['title'],
                'value': row['value'],
                'unit': row['unit'],
                'hidden': row['hidden']})
        analysis.setInterimFields(interims)

    def Import(self):
        """Create one ReferenceSample (inside its Supplier) per row of
        the main sheet, then load its results and analyses."""
        bsc = getToolByName(self.context, 'bika_setup_catalog')
        for row in self.get_rows(3):
            if not row['id']:
                continue
            supplier = bsc(portal_type='Supplier',
                           getName=row.get('Supplier_title', ''))[0].getObject()
            obj = _createObjectByType("ReferenceSample", supplier, row['id'])
            ref_def = self.get_object(bsc, 'ReferenceDefinition',
                                      row.get('ReferenceDefinition_title'))
            ref_man = self.get_object(bsc, 'Manufacturer',
                                      row.get('Manufacturer_title'))
            obj.edit(title=row['id'],
                     description=row.get('description', ''),
                     Blank=self.to_bool(row['Blank']),
                     Hazardous=self.to_bool(row['Hazardous']),
                     CatalogueNumber=row['CatalogueNumber'],
                     LotNumber=row['LotNumber'],
                     Remarks=row['Remarks'],
                     ExpiryDate=row['ExpiryDate'],
                     DateSampled=row['DateSampled'],
                     DateReceived=row['DateReceived'],
                     DateOpened=row['DateOpened'],
                     DateExpired=row['DateExpired'],
                     DateDisposed=row['DateDisposed']
                     )
            obj.setReferenceDefinition(ref_def)
            obj.setManufacturer(ref_man)
            obj.unmarkCreationFlag()

            self.load_reference_sample_results(obj)
            self.load_reference_analyses(obj)
2212 | |||
2213 | class Analysis_Requests(WorksheetImporter): |
||
2214 | |||
def load_analyses(self, sample):
    """Create Analysis objects on their AnalysisRequests from the
    'Analyses' sheet.

    The worksheet lookup is cached on the instance.  NOTE(review): the
    ``sample`` argument is not used by the body -- every sheet row is
    processed and matched to its AR by 'AnalysisRequest_id'; confirm the
    caller's per-sample iteration is intentional.
    """
    sheetname = 'Analyses'
    if not hasattr(self, 'analyses_worksheet'):
        worksheet = self.workbook.get_sheet_by_name(sheetname)
        if not worksheet:
            return
        self.analyses_worksheet = worksheet
    bsc = getToolByName(self.context, 'bika_setup_catalog')
    bc = getToolByName(self.context, 'bika_catalog')
    for row in self.get_rows(3, worksheet=self.analyses_worksheet):
        service = bsc(portal_type='AnalysisService',
                      title=row['AnalysisService_title'])[0].getObject()
        # analyses are keyed/named by keyword
        ar = bc(portal_type='AnalysisRequest', id=row['AnalysisRequest_id'])[0].getObject()
        obj = create_analysis(
            ar, service,
            Result=row['Result'],
            ResultCaptureDate=row['ResultCaptureDate'],
            Analyst=row['Analyst'],
            Instrument=row['Instrument'],
            Retested=self.to_bool(row['Retested']),
            MaxTimeAllowed={
                'days': int(row.get('MaxTimeAllowed_days', 0)),
                'hours': int(row.get('MaxTimeAllowed_hours', 0)),
                'minutes': int(row.get('MaxTimeAllowed_minutes', 0)),
            },
        )

        # Append the new analysis to the AR's existing ones.
        analyses = ar.objectValues('Analyses')
        analyses = list(analyses)
        analyses.append(obj)
        ar.setAnalyses(analyses)
        obj.unmarkCreationFlag()

        self.load_analysis_interims(obj)
2250 | |||
2251 | View Code Duplication | def load_analysis_interims(self, analysis): |
|
2252 | sheetname = 'Reference Analysis Interims' |
||
2253 | if not hasattr(self, 'interim_worksheet'): |
||
2254 | worksheet = self.workbook.get_sheet_by_name(sheetname) |
||
2255 | if not worksheet: |
||
2256 | return |
||
2257 | self.interim_worksheet = worksheet |
||
2258 | bsc = getToolByName(self.context, 'bika_setup_catalog') |
||
2259 | interims = [] |
||
2260 | for row in self.get_rows(3, worksheet=self.interim_worksheet): |
||
2261 | if row['ReferenceAnalysis_id'] != analysis.getId(): |
||
2262 | continue |
||
2263 | interims.append({ |
||
2264 | 'keyword': row['keyword'], |
||
2265 | 'title': row['title'], |
||
2266 | 'value': row['value'], |
||
2267 | 'unit': row['unit'], |
||
2268 | 'hidden': row['hidden']}) |
||
2269 | analysis.setInterimFields(interims) |
||
2270 | |||
2271 | def Import(self): |
||
2272 | bc = getToolByName(self.context, 'bika_catalog') |
||
2273 | bsc = getToolByName(self.context, 'bika_setup_catalog') |
||
2274 | pc = getToolByName(self.context, 'portal_catalog') |
||
2275 | for row in self.get_rows(3): |
||
2276 | if not row['id']: |
||
2277 | continue |
||
2278 | client = pc(portal_type="Client", |
||
2279 | getName=row['Client_title'])[0].getObject() |
||
2280 | obj = _createObjectByType("AnalysisRequest", client, row['id']) |
||
2281 | contact = pc(portal_type="Contact", |
||
2282 | getFullname=row['Contact_Fullname'])[0].getObject() |
||
2283 | obj.edit( |
||
2284 | RequestID=row['id'], |
||
2285 | Contact=contact, |
||
2286 | CCEmails=row['CCEmails'], |
||
2287 | ClientOrderNumber=row['ClientOrderNumber'], |
||
2288 | InvoiceExclude=row['InvoiceExclude'], |
||
2289 | DateReceived=row['DateReceived'], |
||
2290 | DatePublished=row['DatePublished'], |
||
2291 | Remarks=row['Remarks'] |
||
2292 | ) |
||
2293 | if row['CCContact_Fullname']: |
||
2294 | contact = pc(portal_type="Contact", |
||
2295 | getFullname=row['CCContact_Fullname'])[0].getObject() |
||
2296 | obj.setCCContact(contact) |
||
2297 | if row['AnalysisProfile_title']: |
||
2298 | profile = pc(portal_type="AnalysisProfile", |
||
2299 | title=row['AnalysisProfile_title'].getObject()) |
||
2300 | obj.setProfile(profile) |
||
2301 | if row['ARTemplate_title']: |
||
2302 | template = pc(portal_type="ARTemplate", |
||
2303 | title=row['ARTemplate_title'])[0].getObject() |
||
2304 | obj.setProfile(template) |
||
2305 | |||
2306 | obj.unmarkCreationFlag() |
||
2307 | |||
2308 | self.load_analyses(obj) |
||
2309 | |||
2310 | |||
class Invoice_Batches(WorksheetImporter):
    """Import Invoice Batches from the worksheet into the site's
    invoices folder.

    Raises:
        Exception: if a row is missing its title, start date or end date.
    """

    def Import(self):
        folder = self.context.invoices
        for row in self.get_rows(3):
            # BUGFIX: validate the required fields BEFORE creating the
            # object, so an invalid row no longer leaves an orphan,
            # half-initialized InvoiceBatch behind in the folder.
            if not row['title']:
                message = _("InvoiceBatch has no Title")
                raise Exception(t(message))
            if not row['start']:
                message = _("InvoiceBatch has no Start Date")
                raise Exception(t(message))
            if not row['end']:
                message = _("InvoiceBatch has no End Date")
                raise Exception(t(message))
            obj = _createObjectByType("InvoiceBatch", folder, tmpID())
            obj.edit(
                title=row['title'],
                BatchStartDate=row['start'],
                BatchEndDate=row['end'],
            )
            renameAfterCreation(obj)
            notify(ObjectInitializedEvent(obj))