Total Complexity | 460 |
Total Lines | 2428 |
Duplicated Lines | 5.93 % |
Changes | 0 |
Duplicate code is one of the most pungent code smells. A rule that is often used is to re-structure code once it is duplicated in three or more places.
Common duplication problems, and corresponding solutions are:
Complex classes like senaite.core.exportimport.setupdata often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.
Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.
1 | # -*- coding: utf-8 -*- |
||
2 | # |
||
3 | # This file is part of SENAITE.CORE. |
||
4 | # |
||
5 | # SENAITE.CORE is free software: you can redistribute it and/or modify it under |
||
6 | # the terms of the GNU General Public License as published by the Free Software |
||
7 | # Foundation, version 2. |
||
8 | # |
||
9 | # This program is distributed in the hope that it will be useful, but WITHOUT |
||
10 | # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS |
||
11 | # FOR A PARTICULAR PURPOSE. See the GNU General Public License for more |
||
12 | # details. |
||
13 | # |
||
14 | # You should have received a copy of the GNU General Public License along with |
||
15 | # this program; if not, write to the Free Software Foundation, Inc., 51 |
||
16 | # Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. |
||
17 | # |
||
18 | # Copyright 2018-2025 by it's authors. |
||
19 | # Some rights reserved, see README and LICENSE. |
||
20 | |||
21 | import datetime |
||
22 | import os.path |
||
23 | import re |
||
24 | |||
25 | import transaction |
||
26 | from bika.lims import api |
||
27 | from bika.lims import bikaMessageFactory as _ |
||
28 | from bika.lims import logger |
||
29 | from senaite.core.idserver import renameAfterCreation |
||
30 | from bika.lims.interfaces import ISetupDataSetList |
||
31 | from bika.lims.utils import getFromString |
||
32 | from senaite.core.i18n import translate as t |
||
33 | from bika.lims.utils import tmpID |
||
34 | from bika.lims.utils import to_unicode |
||
35 | from bika.lims.utils import to_utf8 |
||
36 | from bika.lims.utils.analysis import create_analysis |
||
37 | from pkg_resources import resource_filename |
||
38 | from Products.Archetypes.event import ObjectInitializedEvent |
||
39 | from Products.CMFCore.utils import getToolByName |
||
40 | from Products.CMFPlone.utils import _createObjectByType |
||
41 | from Products.CMFPlone.utils import safe_unicode |
||
42 | from senaite.core.catalog import CLIENT_CATALOG |
||
43 | from senaite.core.catalog import CONTACT_CATALOG |
||
44 | from senaite.core.catalog import SENAITE_CATALOG |
||
45 | from senaite.core.catalog import SETUP_CATALOG |
||
46 | from senaite.core.exportimport.dataimport import SetupDataSetList as SDL |
||
47 | from senaite.core.schema.addressfield import BILLING_ADDRESS |
||
48 | from senaite.core.schema.addressfield import PHYSICAL_ADDRESS |
||
49 | from senaite.core.schema.addressfield import POSTAL_ADDRESS |
||
50 | from zope.event import notify |
||
51 | from zope.interface import implements |
||
52 | |||
53 | UID_CATALOG = "uid_catalog" |
||
54 | |||
55 | |||
56 | def get_addresses_from_row(row): |
||
57 | """Fills the address fields for the specified object if allowed: |
||
58 | PhysicalAddress, PostalAddress, CountryState, BillingAddress |
||
59 | """ |
||
60 | types = [PHYSICAL_ADDRESS, POSTAL_ADDRESS, BILLING_ADDRESS] |
||
61 | keys = ["Address", "City", "Zip", "Country"] |
||
62 | |||
63 | address_list = [] |
||
64 | for address_type in types: |
||
65 | address_item = {"type": address_type} |
||
66 | for key in keys: |
||
67 | field_name = "%s_%s" % (address_type.capitalize(), key) |
||
68 | value = str(row.get(field_name, "")) |
||
69 | if value: |
||
70 | address_item.update({key.lower(): value}) |
||
71 | if len(address_item.keys()) > 1: |
||
72 | address_list.append(address_item) |
||
73 | return address_list |
||
74 | |||
75 | |||
76 | def lookup(context, portal_type, **kwargs): |
||
77 | at = getToolByName(context, 'archetype_tool') |
||
78 | catalog = at.catalog_map.get(portal_type, [None])[0] or UID_CATALOG |
||
79 | catalog = getToolByName(context, catalog) |
||
80 | kwargs['portal_type'] = portal_type |
||
81 | return catalog(**kwargs)[0].getObject() |
||
82 | |||
83 | |||
84 | def check_for_required_columns(name, data, required): |
||
85 | for column in required: |
||
86 | if not data.get(column, None): |
||
87 | message = _("%s has no '%s' column." % (name, column)) |
||
88 | raise Exception(t(message)) |
||
89 | |||
90 | |||
91 | def Float(thing): |
||
92 | try: |
||
93 | f = float(thing) |
||
94 | except ValueError: |
||
95 | f = 0.0 |
||
96 | return f |
||
97 | |||
98 | |||
99 | def read_file(path): |
||
100 | if os.path.isfile(path): |
||
101 | return open(path, "rb").read() |
||
102 | allowed_ext = ['pdf', 'jpg', 'jpeg', 'png', 'gif', 'ods', 'odt', |
||
103 | 'xlsx', 'doc', 'docx', 'xls', 'csv', 'txt'] |
||
104 | allowed_ext += [e.upper() for e in allowed_ext] |
||
105 | for e in allowed_ext: |
||
106 | out = '%s.%s' % (path, e) |
||
107 | if os.path.isfile(out): |
||
108 | return open(out, "rb").read() |
||
109 | raise IOError("File not found: %s. Allowed extensions: %s" % |
||
110 | (path, ','.join(allowed_ext))) |
||
111 | |||
112 | |||
113 | class SetupDataSetList(SDL): |
||
114 | |||
115 | implements(ISetupDataSetList) |
||
116 | |||
117 | def __call__(self): |
||
118 | return SDL.__call__(self, projectname="bika.lims") |
||
119 | |||
120 | |||
121 | class WorksheetImporter(object): |
||
122 | |||
123 | """Use this as a base, for normal tabular data sheet imports. |
||
124 | """ |
||
125 | |||
126 | def __init__(self, context): |
||
127 | self.adapter_context = context |
||
128 | |||
129 | def __call__(self, lsd, workbook, dataset_project, dataset_name): |
||
130 | self.lsd = lsd |
||
131 | self.context = lsd.context |
||
132 | self.workbook = workbook |
||
133 | self.sheetname = self.__class__.__name__.replace("_", " ") |
||
134 | try: |
||
135 | self.worksheet = workbook[self.sheetname] |
||
136 | except KeyError: |
||
137 | self.worksheet = None |
||
138 | self.dataset_project = dataset_project |
||
139 | self.dataset_name = dataset_name |
||
140 | if self.worksheet: |
||
141 | logger.info("Loading {0}.{1}: {2}".format( |
||
142 | self.dataset_project, self.dataset_name, self.sheetname)) |
||
143 | try: |
||
144 | self.Import() |
||
145 | except IOError: |
||
146 | # The importer must omit the files not found inside the server filesystem (bika/lims/setupdata/test/ |
||
147 | # if the file is loaded from 'select existing file' or bika/lims/setupdata/uploaded if it's loaded from |
||
148 | # 'Load from file') and finishes the import without errors. https://jira.bikalabs.com/browse/LIMS-1624 |
||
149 | warning = "Error while loading attached file from %s. The file will not be uploaded into the system." |
||
150 | logger.warning(warning, self.sheetname) |
||
151 | self.context.plone_utils.addPortalMessage("Error while loading some attached files. " |
||
152 | "The files weren't uploaded into the system.") |
||
153 | else: |
||
154 | logger.info("No records found: '{0}'".format(self.sheetname)) |
||
155 | |||
156 | def get_rows(self, startrow=3, worksheet=None): |
||
157 | """Returns a generator for all rows in a sheet. |
||
158 | Each row contains a dictionary where the key is the value of the |
||
159 | first row of the sheet for each column. |
||
160 | The data values are returned in utf-8 format. |
||
161 | Starts to consume data from startrow |
||
162 | """ |
||
163 | |||
164 | headers = [] |
||
165 | row_nr = 0 |
||
166 | worksheet = worksheet if worksheet else self.worksheet |
||
167 | for row in worksheet.rows: # .iter_rows(): |
||
168 | row_nr += 1 |
||
169 | if row_nr == 1: |
||
170 | # headers = [cell.internal_value for cell in row] |
||
171 | headers = [cell.value for cell in row] |
||
172 | continue |
||
173 | if row_nr % 1000 == 0: |
||
174 | transaction.savepoint() |
||
175 | if row_nr <= startrow: |
||
176 | continue |
||
177 | # row = [_c(cell.internal_value).decode('utf-8') for cell in row] |
||
178 | new_row = [] |
||
179 | for cell in row: |
||
180 | value = cell.value |
||
181 | if value is None: |
||
182 | value = '' |
||
183 | if isinstance(value, unicode): |
||
|
|||
184 | value = value.encode('utf-8') |
||
185 | # Strip any space, \t, \n, or \r characters from the left-hand |
||
186 | # side, right-hand side, or both sides of the string |
||
187 | if isinstance(value, str): |
||
188 | value = value.strip(' \t\n\r') |
||
189 | new_row.append(value) |
||
190 | row = dict(zip(headers, new_row)) |
||
191 | |||
192 | # parse out addresses |
||
193 | for add_type in ['Physical', 'Postal', 'Billing']: |
||
194 | row[add_type] = {} |
||
195 | if add_type + "_Address" in row: |
||
196 | for key in ['Address', 'City', 'State', 'District', 'Zip', 'Country']: |
||
197 | row[add_type][key] = str( |
||
198 | row.get("%s_%s" % (add_type, key), '')) |
||
199 | |||
200 | yield row |
||
201 | |||
202 | def get_file_data(self, filename): |
||
203 | if filename: |
||
204 | try: |
||
205 | path = resource_filename( |
||
206 | self.dataset_project, |
||
207 | "setupdata/%s/%s" % (self.dataset_name, filename)) |
||
208 | file_data = open(path, "rb").read() |
||
209 | except Exception: |
||
210 | file_data = None |
||
211 | else: |
||
212 | file_data = None |
||
213 | return file_data |
||
214 | |||
215 | def to_bool(self, value): |
||
216 | """ Converts a sheet string value to a boolean value. |
||
217 | Needed because of utf-8 conversions |
||
218 | """ |
||
219 | |||
220 | try: |
||
221 | value = value.lower() |
||
222 | except Exception: |
||
223 | pass |
||
224 | try: |
||
225 | value = value.encode('utf-8') |
||
226 | except Exception: |
||
227 | pass |
||
228 | try: |
||
229 | value = int(value) |
||
230 | except Exception: |
||
231 | pass |
||
232 | if value in ('true', 1): |
||
233 | return True |
||
234 | else: |
||
235 | return False |
||
236 | |||
237 | def to_int(self, value, default=0): |
||
238 | """ Converts a value o a int. Returns default if the conversion fails. |
||
239 | """ |
||
240 | try: |
||
241 | return int(value) |
||
242 | except ValueError: |
||
243 | try: |
||
244 | return int(default) |
||
245 | except Exception: |
||
246 | return 0 |
||
247 | |||
248 | def to_float(self, value, default=0): |
||
249 | """ Converts a value o a float. Returns default if the conversion fails. |
||
250 | """ |
||
251 | try: |
||
252 | return float(value) |
||
253 | except ValueError: |
||
254 | try: |
||
255 | return float(default) |
||
256 | except Exception: |
||
257 | return 0.0 |
||
258 | |||
259 | def defer(self, **kwargs): |
||
260 | self.lsd.deferred.append(kwargs) |
||
261 | |||
262 | def Import(self): |
||
263 | """ Override this. |
||
264 | XXX Simple generic sheet importer |
||
265 | """ |
||
266 | |||
267 | def fill_addressfields(self, row, obj): |
||
268 | """ Fills the address fields for the specified object if allowed: |
||
269 | PhysicalAddress, PostalAddress, CountryState, BillingAddress |
||
270 | """ |
||
271 | addresses = {} |
||
272 | for add_type in ['Physical', 'Postal', 'Billing', 'CountryState']: |
||
273 | addresses[add_type] = {} |
||
274 | for key in ['Address', 'City', 'State', 'District', 'Zip', 'Country']: |
||
275 | addresses[add_type][key.lower()] = str( |
||
276 | row.get("%s_%s" % (add_type, key), '')) |
||
277 | |||
278 | if addresses['CountryState']['country'] == '' \ |
||
279 | and addresses['CountryState']['state'] == '': |
||
280 | addresses['CountryState']['country'] = addresses['Physical']['country'] |
||
281 | addresses['CountryState']['state'] = addresses['Physical']['state'] |
||
282 | |||
283 | if hasattr(obj, 'setPhysicalAddress'): |
||
284 | obj.setPhysicalAddress(addresses['Physical']) |
||
285 | if hasattr(obj, 'setPostalAddress'): |
||
286 | obj.setPostalAddress(addresses['Postal']) |
||
287 | if hasattr(obj, 'setCountryState'): |
||
288 | obj.setCountryState(addresses['CountryState']) |
||
289 | if hasattr(obj, 'setBillingAddress'): |
||
290 | obj.setBillingAddress(addresses['Billing']) |
||
291 | |||
292 | def fill_contactfields(self, row, obj): |
||
293 | """ Fills the contact fields for the specified object if allowed: |
||
294 | EmailAddress, Phone, Fax, BusinessPhone, BusinessFax, HomePhone, |
||
295 | MobilePhone |
||
296 | """ |
||
297 | fieldnames = ['EmailAddress', |
||
298 | 'Phone', |
||
299 | 'Fax', |
||
300 | 'BusinessPhone', |
||
301 | 'BusinessFax', |
||
302 | 'HomePhone', |
||
303 | 'MobilePhone', |
||
304 | ] |
||
305 | schema = obj.Schema() |
||
306 | fields = dict([(field.getName(), field) for field in schema.fields()]) |
||
307 | for fieldname in fieldnames: |
||
308 | try: |
||
309 | field = fields[fieldname] |
||
310 | except Exception: |
||
311 | if fieldname in row: |
||
312 | logger.info("Address field %s not found on %s" % |
||
313 | (fieldname, obj)) |
||
314 | continue |
||
315 | value = row.get(fieldname, '') |
||
316 | field.set(obj, value) |
||
317 | |||
318 | def get_object(self, catalog, portal_type, title=None, **kwargs): |
||
319 | """This will return an object from the catalog. |
||
320 | Logs a message and returns None if no object or multiple objects found. |
||
321 | All keyword arguments are passed verbatim to the contentFilter |
||
322 | """ |
||
323 | if not title and not kwargs: |
||
324 | return None |
||
325 | contentFilter = {"portal_type": portal_type} |
||
326 | if title: |
||
327 | contentFilter['title'] = to_unicode(title) |
||
328 | contentFilter.update(kwargs) |
||
329 | brains = catalog(contentFilter) |
||
330 | if len(brains) > 1: |
||
331 | logger.info("More than one object found for %s" % contentFilter) |
||
332 | return None |
||
333 | elif len(brains) == 0: |
||
334 | if portal_type == 'AnalysisService': |
||
335 | brains = catalog(portal_type=portal_type, getKeyword=title) |
||
336 | if brains: |
||
337 | return brains[0].getObject() |
||
338 | logger.info("No objects found for %s" % contentFilter) |
||
339 | return None |
||
340 | else: |
||
341 | return brains[0].getObject() |
||
342 | |||
343 | |||
344 | class Sub_Groups(WorksheetImporter): |
||
345 | |||
346 | def Import(self): |
||
347 | |||
348 | container = self.context.setup.subgroups |
||
349 | for row in self.get_rows(3): |
||
350 | title = row.get("title") |
||
351 | if not title: |
||
352 | continue |
||
353 | api.create(container, "SubGroup", |
||
354 | title=title, description=row.get("description"), |
||
355 | SortKey=row.get("SortKey")) |
||
356 | |||
357 | |||
358 | class Lab_Information(WorksheetImporter): |
||
359 | |||
360 | def Import(self): |
||
361 | laboratory = self.context.bika_setup.laboratory |
||
362 | values = {} |
||
363 | for row in self.get_rows(3): |
||
364 | values[row['Field']] = row['Value'] |
||
365 | |||
366 | if values['AccreditationBodyLogo']: |
||
367 | path = resource_filename( |
||
368 | self.dataset_project, |
||
369 | "setupdata/%s/%s" % (self.dataset_name, |
||
370 | values['AccreditationBodyLogo'])) |
||
371 | try: |
||
372 | file_data = read_file(path) |
||
373 | except Exception as msg: |
||
374 | file_data = None |
||
375 | logger.warning(msg[0] + " Error on sheet: " + self.sheetname) |
||
376 | else: |
||
377 | file_data = None |
||
378 | |||
379 | laboratory.edit( |
||
380 | Name=values['Name'], |
||
381 | LabURL=values['LabURL'], |
||
382 | Confidence=values['Confidence'], |
||
383 | LaboratoryAccredited=self.to_bool(values['LaboratoryAccredited']), |
||
384 | AccreditationBodyLong=values['AccreditationBodyLong'], |
||
385 | AccreditationBody=values['AccreditationBody'], |
||
386 | AccreditationBodyURL=values['AccreditationBodyURL'], |
||
387 | Accreditation=values['Accreditation'], |
||
388 | AccreditationReference=values['AccreditationReference'], |
||
389 | AccreditationBodyLogo=file_data, |
||
390 | TaxNumber=values['TaxNumber'], |
||
391 | ) |
||
392 | self.fill_contactfields(values, laboratory) |
||
393 | self.fill_addressfields(values, laboratory) |
||
394 | |||
395 | |||
396 | class Lab_Contacts(WorksheetImporter): |
||
397 | |||
398 | def Import(self): |
||
399 | folder = self.context.bika_setup.bika_labcontacts |
||
400 | portal_groups = getToolByName(self.context, 'portal_groups') |
||
401 | portal_registration = getToolByName( |
||
402 | self.context, 'portal_registration') |
||
403 | rownum = 2 |
||
404 | for row in self.get_rows(3): |
||
405 | rownum += 1 |
||
406 | if not row.get('Firstname', None): |
||
407 | continue |
||
408 | |||
409 | # Username already exists? |
||
410 | username = row.get('Username', '') |
||
411 | fullname = ('%s %s' % |
||
412 | (row['Firstname'], row.get('Surname', ''))).strip() |
||
413 | if username: |
||
414 | username = safe_unicode(username).encode('utf-8') |
||
415 | bsc = getToolByName(self.context, SETUP_CATALOG) |
||
416 | exists = [o.getObject() for o in bsc( |
||
417 | portal_type="LabContact") if o.getObject().getUsername() == username] |
||
418 | if exists: |
||
419 | error = "Lab Contact: username '{0}' in row {1} already exists. This contact will be omitted.".format( |
||
420 | username, str(rownum)) |
||
421 | logger.error(error) |
||
422 | continue |
||
423 | |||
424 | # Is there a signature file defined? Try to get the file first. |
||
425 | signature = None |
||
426 | if row.get('Signature'): |
||
427 | signature = self.get_file_data(row['Signature']) |
||
428 | if not signature: |
||
429 | warning = "Lab Contact: Cannot load the signature file '{0}' for user '{1}'. The contact will be created, but without a signature image".format( |
||
430 | row['Signature'], username) |
||
431 | logger.warning(warning) |
||
432 | |||
433 | obj = _createObjectByType("LabContact", folder, tmpID()) |
||
434 | obj.edit( |
||
435 | title=fullname, |
||
436 | Salutation=row.get('Salutation', ''), |
||
437 | Firstname=row['Firstname'], |
||
438 | Surname=row.get('Surname', ''), |
||
439 | JobTitle=row.get('JobTitle', ''), |
||
440 | Username=row.get('Username', ''), |
||
441 | Signature=signature |
||
442 | ) |
||
443 | obj.unmarkCreationFlag() |
||
444 | renameAfterCreation(obj) |
||
445 | notify(ObjectInitializedEvent(obj)) |
||
446 | self.fill_contactfields(row, obj) |
||
447 | self.fill_addressfields(row, obj) |
||
448 | |||
449 | if row['Department_title']: |
||
450 | self.defer(src_obj=obj, |
||
451 | src_field='Department', |
||
452 | dest_catalog=SETUP_CATALOG, |
||
453 | dest_query={'portal_type': 'Department', |
||
454 | 'title': row['Department_title']} |
||
455 | ) |
||
456 | |||
457 | # Create Plone user |
||
458 | if not row['Username']: |
||
459 | warn = "Lab Contact: No username defined for user '{0}' in row {1}. Contact created, but without access credentials.".format( |
||
460 | fullname, str(rownum)) |
||
461 | logger.warning(warn) |
||
462 | if not row.get('EmailAddress', ''): |
||
463 | warn = "Lab Contact: No Email defined for user '{0}' in row {1}. Contact created, but without access credentials.".format( |
||
464 | fullname, str(rownum)) |
||
465 | logger.warning(warn) |
||
466 | |||
467 | if (row['Username'] and row.get('EmailAddress', '')): |
||
468 | username = safe_unicode(row['Username']).encode('utf-8') |
||
469 | passw = row['Password'] |
||
470 | if not passw: |
||
471 | passw = username |
||
472 | warn = ("Lab Contact: No password defined for user '{0}' in row {1}." |
||
473 | " Password established automatically to '{2}'").format(username, str(rownum), passw) |
||
474 | logger.warning(warn) |
||
475 | |||
476 | try: |
||
477 | member = portal_registration.addMember( |
||
478 | username, |
||
479 | passw, |
||
480 | properties={ |
||
481 | 'username': username, |
||
482 | 'email': row['EmailAddress'], |
||
483 | 'fullname': fullname} |
||
484 | ) |
||
485 | except Exception as msg: |
||
486 | logger.error( |
||
487 | "Client Contact: Error adding user (%s): %s" % (msg, username)) |
||
488 | continue |
||
489 | |||
490 | groups = row.get('Groups', '') |
||
491 | if not groups: |
||
492 | warn = "Lab Contact: No groups defined for user '{0}' in row {1}. Group established automatically to 'Analysts'".format( |
||
493 | username, str(rownum)) |
||
494 | logger.warning(warn) |
||
495 | groups = 'Analysts' |
||
496 | |||
497 | group_ids = [g.strip() for g in groups.split(',')] |
||
498 | # Add user to all specified groups |
||
499 | for group_id in group_ids: |
||
500 | group = portal_groups.getGroupById(group_id) |
||
501 | if group: |
||
502 | group.addMember(username) |
||
503 | roles = row.get('Roles', '') |
||
504 | if roles: |
||
505 | role_ids = [r.strip() for r in roles.split(',')] |
||
506 | # Add user to all specified roles |
||
507 | for role_id in role_ids: |
||
508 | member._addRole(role_id) |
||
509 | # If user is in LabManagers, add Owner local role on clients |
||
510 | # folder |
||
511 | if 'LabManager' in group_ids: |
||
512 | self.context.clients.manage_setLocalRoles( |
||
513 | username, ['Owner', ]) |
||
514 | |||
515 | # Now we have the lab contacts registered, try to assign the managers |
||
516 | # to each department if required |
||
517 | sheet = self.workbook["Lab Departments"] |
||
518 | bsc = getToolByName(self.context, SETUP_CATALOG) |
||
519 | for row in self.get_rows(3, sheet): |
||
520 | if row['title'] and row['LabContact_Username']: |
||
521 | dept = self.get_object(bsc, "Department", row.get('title')) |
||
522 | if dept and not dept.getManager(): |
||
523 | username = safe_unicode( |
||
524 | row['LabContact_Username']).encode('utf-8') |
||
525 | exists = [o.getObject() for o in bsc( |
||
526 | portal_type="LabContact") if o.getObject().getUsername() == username] |
||
527 | if exists: |
||
528 | dept.setManager(exists[0].UID()) |
||
529 | |||
530 | |||
531 | class Lab_Departments(WorksheetImporter): |
||
532 | """Import Lab Departments |
||
533 | """ |
||
534 | |||
535 | def Import(self): |
||
536 | setup = api.get_senaite_setup() |
||
537 | container = setup.departments |
||
538 | cat = getToolByName(self.context, CONTACT_CATALOG) |
||
539 | lab_contacts = [o.getObject() for o in cat(portal_type="LabContact")] |
||
540 | for row in self.get_rows(3): |
||
541 | title = row.get("title") |
||
542 | description = row.get("description") |
||
543 | username = row.get("LabContact_Username") |
||
544 | manager = None |
||
545 | |||
546 | if not title: |
||
547 | continue |
||
548 | |||
549 | obj = api.create(container, |
||
550 | "Department", |
||
551 | title=title, |
||
552 | description=description) |
||
553 | |||
554 | for contact in lab_contacts: |
||
555 | if contact.getUsername() == username: |
||
556 | manager = contact |
||
557 | break |
||
558 | if manager: |
||
559 | obj.setManager(manager.UID()) |
||
560 | else: |
||
561 | message = "Department: lookup of '%s' in LabContacts" \ |
||
562 | "/Username failed." % username |
||
563 | logger.info(message) |
||
564 | |||
565 | |||
566 | class Lab_Products(WorksheetImporter): |
||
567 | |||
568 | def Import(self): |
||
569 | # Refer to the default folder |
||
570 | container = self.context.setup.labproducts |
||
571 | # Iterate through the rows |
||
572 | for row in self.get_rows(3): |
||
573 | title = row.get("title") |
||
574 | if not title: |
||
575 | continue |
||
576 | api.create(container, "LabProduct", |
||
577 | title=title, description=row.get("description")) |
||
578 | |||
579 | |||
580 | class Clients(WorksheetImporter): |
||
581 | |||
582 | def Import(self): |
||
583 | folder = self.context.clients |
||
584 | for row in self.get_rows(3): |
||
585 | obj = _createObjectByType("Client", folder, tmpID()) |
||
586 | if not row['Name']: |
||
587 | message = "Client %s has no Name" |
||
588 | raise Exception(message) |
||
589 | if not row['ClientID']: |
||
590 | message = "Client %s has no Client ID" |
||
591 | raise Exception(message) |
||
592 | obj.edit(Name=row['Name'], |
||
593 | ClientID=row['ClientID'], |
||
594 | MemberDiscountApplies=row[ |
||
595 | 'MemberDiscountApplies'] and True or False, |
||
596 | BulkDiscount=row['BulkDiscount'] and True or False, |
||
597 | TaxNumber=row.get('TaxNumber', ''), |
||
598 | AccountNumber=row.get('AccountNumber', '') |
||
599 | ) |
||
600 | self.fill_contactfields(row, obj) |
||
601 | self.fill_addressfields(row, obj) |
||
602 | obj.unmarkCreationFlag() |
||
603 | renameAfterCreation(obj) |
||
604 | notify(ObjectInitializedEvent(obj)) |
||
605 | |||
606 | |||
607 | class Client_Contacts(WorksheetImporter): |
||
608 | |||
609 | def Import(self): |
||
610 | portal_groups = getToolByName(self.context, 'portal_groups') |
||
611 | cat = api.get_tool(CLIENT_CATALOG) |
||
612 | for row in self.get_rows(3): |
||
613 | client = cat(portal_type="Client", |
||
614 | getName=row['Client_title']) |
||
615 | if len(client) == 0: |
||
616 | client_contact = "%(Firstname)s %(Surname)s" % row |
||
617 | error = "Client invalid: '%s'. The Client Contact %s will not be uploaded." |
||
618 | logger.error(error, row['Client_title'], client_contact) |
||
619 | continue |
||
620 | client = client[0].getObject() |
||
621 | contact = _createObjectByType("Contact", client, tmpID()) |
||
622 | fullname = "%(Firstname)s %(Surname)s" % row |
||
623 | pub_pref = [x.strip() for x in |
||
624 | row.get('PublicationPreference', '').split(",")] |
||
625 | contact.edit( |
||
626 | Salutation=row.get('Salutation', ''), |
||
627 | Firstname=row.get('Firstname', ''), |
||
628 | Surname=row.get('Surname', ''), |
||
629 | Username=row['Username'], |
||
630 | JobTitle=row.get('JobTitle', ''), |
||
631 | Department=row.get('Department', ''), |
||
632 | PublicationPreference=pub_pref, |
||
633 | ) |
||
634 | self.fill_contactfields(row, contact) |
||
635 | self.fill_addressfields(row, contact) |
||
636 | contact.unmarkCreationFlag() |
||
637 | renameAfterCreation(contact) |
||
638 | notify(ObjectInitializedEvent(contact)) |
||
639 | # CC Contacts |
||
640 | if row['CCContacts']: |
||
641 | names = [x.strip() for x in row['CCContacts'].split(",")] |
||
642 | for _fullname in names: |
||
643 | self.defer(src_obj=contact, |
||
644 | src_field='CCContact', |
||
645 | dest_catalog=CONTACT_CATALOG, |
||
646 | dest_query={'portal_type': 'Contact', |
||
647 | 'getFullname': _fullname} |
||
648 | ) |
||
649 | # Create Plone user |
||
650 | username = safe_unicode(row['Username']).encode('utf-8') |
||
651 | password = safe_unicode(row['Password']).encode('utf-8') |
||
652 | if (username): |
||
653 | try: |
||
654 | self.context.portal_registration.addMember( |
||
655 | username, |
||
656 | password, |
||
657 | properties={ |
||
658 | 'username': username, |
||
659 | 'email': row['EmailAddress'], |
||
660 | 'fullname': fullname} |
||
661 | ) |
||
662 | except Exception as msg: |
||
663 | logger.info("Error adding user (%s): %s" % (msg, username)) |
||
664 | contact.aq_parent.manage_setLocalRoles( |
||
665 | row['Username'], ['Owner', ]) |
||
666 | contact.reindexObject() |
||
667 | # add user to Clients group |
||
668 | group = portal_groups.getGroupById('Clients') |
||
669 | group.addMember(username) |
||
670 | |||
671 | |||
672 | class Container_Types(WorksheetImporter): |
||
673 | |||
674 | def Import(self): |
||
675 | container = self.context.setup.containertypes |
||
676 | for row in self.get_rows(3): |
||
677 | title = row.get("title") |
||
678 | if not title: |
||
679 | continue |
||
680 | |||
681 | api.create(container, "ContainerType", |
||
682 | title=title, description=row.get("description")) |
||
683 | |||
684 | |||
685 | class Preservations(WorksheetImporter): |
||
686 | |||
687 | def Import(self): |
||
688 | container = self.context.setup.samplepreservations |
||
689 | for row in self.get_rows(3): |
||
690 | title = row.get("title") |
||
691 | if not title: |
||
692 | continue |
||
693 | |||
694 | api.create(container, "SamplePreservation", |
||
695 | title=title, description=row.get("description")) |
||
696 | |||
697 | |||
698 | class Containers(WorksheetImporter): |
||
699 | |||
700 | def Import(self): |
||
701 | bsc = getToolByName(self.context, SETUP_CATALOG) |
||
702 | container = self.context.setup.samplecontainers |
||
703 | for row in self.get_rows(3): |
||
704 | title = row.get("title") |
||
705 | if not title: |
||
706 | continue |
||
707 | |||
708 | description = row.get("description", "") |
||
709 | capacity = row.get("Capacity", 0) |
||
710 | pre_preserved = self.to_bool(row["PrePreserved"]) |
||
711 | containertype = None |
||
712 | container_type_title = row.get("ContainerType_title", "") |
||
713 | |||
714 | if container_type_title: |
||
715 | containertype = self.get_object( |
||
716 | bsc, "ContainerType", container_type_title) |
||
717 | |||
718 | api.create(container, "SampleContainer", |
||
719 | title=title, |
||
720 | description=description, |
||
721 | capacity=capacity, |
||
722 | pre_preserved=pre_preserved, |
||
723 | containertype=containertype) |
||
724 | |||
725 | |||
726 | class Suppliers(WorksheetImporter): |
||
727 | |||
728 | def Import(self): |
||
729 | container = self.context.setup.suppliers |
||
730 | for row in self.get_rows(3): |
||
731 | title = row.get("Name") |
||
732 | if not title: |
||
733 | continue |
||
734 | |||
735 | api.create(container, "Supplier", |
||
736 | title=title, |
||
737 | description=row.get("description"), |
||
738 | tax_number=row.get("TaxNumber"), |
||
739 | phone=row.get("Phone", ""), |
||
740 | fax=row.get("Fax", ""), |
||
741 | email=row.get("EmailAddress", ""), |
||
742 | account_type=row.get("AccountType", {}), |
||
743 | account_name=row.get("AccountName", {}), |
||
744 | account_number=row.get("AccountNumber", ''), |
||
745 | bank_name=row.get("BankName", ""), |
||
746 | bank_branch=row.get("BankBranch", ""), |
||
747 | swift_code=row.get("SWIFTcode", ""), |
||
748 | iban=row.get("IBN", ""), |
||
749 | nib=row.get("NIB", ""), |
||
750 | website=row.get("Website", ""), |
||
751 | address=get_addresses_from_row(row)) |
||
752 | |||
753 | |||
754 | class Supplier_Contacts(WorksheetImporter): |
||
755 | |||
756 | def Import(self): |
||
757 | bsc = getToolByName(self.context, SETUP_CATALOG) |
||
758 | for row in self.get_rows(3): |
||
759 | if not row['Supplier_Name']: |
||
760 | continue |
||
761 | if not row['Firstname']: |
||
762 | continue |
||
763 | folder = bsc(portal_type="Supplier", |
||
764 | Title=row['Supplier_Name']) |
||
765 | if not folder: |
||
766 | continue |
||
767 | folder = folder[0].getObject() |
||
768 | obj = _createObjectByType("SupplierContact", folder, tmpID()) |
||
769 | obj.edit( |
||
770 | Firstname=row['Firstname'], |
||
771 | Surname=row.get('Surname', ''), |
||
772 | Username=row.get('Username') |
||
773 | ) |
||
774 | self.fill_contactfields(row, obj) |
||
775 | self.fill_addressfields(row, obj) |
||
776 | obj.unmarkCreationFlag() |
||
777 | renameAfterCreation(obj) |
||
778 | notify(ObjectInitializedEvent(obj)) |
||
779 | |||
780 | |||
781 | class Manufacturers(WorksheetImporter): |
||
782 | |||
783 | def Import(self): |
||
784 | container = self.context.setup.manufacturers |
||
785 | for row in self.get_rows(3): |
||
786 | title = row.get("title") |
||
787 | if not title: |
||
788 | continue |
||
789 | api.create(container, "Manufacturer", |
||
790 | title=title, description=row.get("description")) |
||
791 | |||
792 | |||
793 | class Instrument_Types(WorksheetImporter): |
||
794 | |||
795 | def Import(self): |
||
796 | container = self.context.setup.instrumenttypes |
||
797 | for row in self.get_rows(3): |
||
798 | title = row.get("title") |
||
799 | if not title: |
||
800 | continue |
||
801 | api.create(container, "InstrumentType", |
||
802 | title=title, description=row.get("description")) |
||
803 | |||
804 | |||
805 | class Instruments(WorksheetImporter): |
||
806 | |||
807 | def Import(self): |
||
808 | folder = self.context.bika_setup.bika_instruments |
||
809 | bsc = getToolByName(self.context, SETUP_CATALOG) |
||
810 | for row in self.get_rows(3): |
||
811 | if ('Type' not in row |
||
812 | or 'Supplier' not in row |
||
813 | or 'Brand' not in row): |
||
814 | logger.info( |
||
815 | "Unable to import '%s'. Missing supplier, manufacturer or type" % row.get('title', '')) |
||
816 | continue |
||
817 | |||
818 | obj = _createObjectByType("Instrument", folder, tmpID()) |
||
819 | |||
820 | obj.edit( |
||
821 | title=row.get('title', ''), |
||
822 | AssetNumber=row.get('assetnumber', ''), |
||
823 | description=row.get('description', ''), |
||
824 | Type=row.get('Type', ''), |
||
825 | Brand=row.get('Brand', ''), |
||
826 | Model=row.get('Model', ''), |
||
827 | SerialNo=row.get('SerialNo', ''), |
||
828 | DataInterface=row.get('DataInterface', ''), |
||
829 | Location=row.get('Location', ''), |
||
830 | InstallationDate=row.get('Instalationdate', ''), |
||
831 | UserManualID=row.get('UserManualID', ''), |
||
832 | ) |
||
833 | instrumenttype = self.get_object( |
||
834 | bsc, 'InstrumentType', title=row.get('Type')) |
||
835 | manufacturer = self.get_object( |
||
836 | bsc, 'Manufacturer', title=row.get('Brand')) |
||
837 | supplier = self.get_object( |
||
838 | bsc, 'Supplier', title=row.get('Supplier', '')) |
||
839 | method = self.get_object(bsc, 'Method', title=row.get('Method')) |
||
840 | obj.setInstrumentType(instrumenttype) |
||
841 | obj.setManufacturer(manufacturer) |
||
842 | obj.setSupplier(supplier) |
||
843 | if method: |
||
844 | obj.setMethods([method]) |
||
845 | obj.setMethod(method) |
||
846 | |||
847 | # Attaching the instrument's photo |
||
848 | View Code Duplication | if row.get('Photo', None): |
|
849 | path = resource_filename( |
||
850 | self.dataset_project, |
||
851 | "setupdata/%s/%s" % (self.dataset_name, |
||
852 | row['Photo']) |
||
853 | ) |
||
854 | try: |
||
855 | file_data = read_file(path) |
||
856 | obj.setPhoto(file_data) |
||
857 | except Exception as msg: |
||
858 | file_data = None |
||
859 | logger.warning( |
||
860 | msg[0] + " Error on sheet: " + self.sheetname) |
||
861 | |||
862 | # Attaching the Installation Certificate if exists |
||
863 | View Code Duplication | if row.get('InstalationCertificate', None): |
|
864 | path = resource_filename( |
||
865 | self.dataset_project, |
||
866 | "setupdata/%s/%s" % (self.dataset_name, |
||
867 | row['InstalationCertificate']) |
||
868 | ) |
||
869 | try: |
||
870 | file_data = read_file(path) |
||
871 | obj.setInstallationCertificate(file_data) |
||
872 | except Exception as msg: |
||
873 | logger.warning( |
||
874 | msg[0] + " Error on sheet: " + self.sheetname) |
||
875 | |||
876 | # Attaching the Instrument's manual if exists |
||
877 | if row.get('UserManualFile', None): |
||
878 | row_dict = {'DocumentID': row.get('UserManualID', 'manual'), |
||
879 | 'DocumentVersion': '', |
||
880 | 'DocumentLocation': '', |
||
881 | 'DocumentType': 'Manual', |
||
882 | 'File': row.get('UserManualFile', None) |
||
883 | } |
||
884 | addDocument(self, row_dict, obj) |
||
885 | obj.unmarkCreationFlag() |
||
886 | renameAfterCreation(obj) |
||
887 | notify(ObjectInitializedEvent(obj)) |
||
888 | |||
889 | |||
890 | View Code Duplication | class Instrument_Validations(WorksheetImporter): |
|
891 | |||
892 | def Import(self): |
||
893 | bsc = getToolByName(self.context, SETUP_CATALOG) |
||
894 | for row in self.get_rows(3): |
||
895 | if not row.get('instrument', None) or not row.get('title', None): |
||
896 | continue |
||
897 | |||
898 | folder = self.get_object(bsc, 'Instrument', row.get('instrument')) |
||
899 | if folder: |
||
900 | obj = _createObjectByType( |
||
901 | "InstrumentValidation", folder, tmpID()) |
||
902 | obj.edit( |
||
903 | title=row['title'], |
||
904 | DownFrom=row.get('downfrom', ''), |
||
905 | DownTo=row.get('downto', ''), |
||
906 | Validator=row.get('validator', ''), |
||
907 | Considerations=row.get('considerations', ''), |
||
908 | WorkPerformed=row.get('workperformed', ''), |
||
909 | Remarks=row.get('remarks', ''), |
||
910 | DateIssued=row.get('DateIssued', ''), |
||
911 | ReportID=row.get('ReportID', '') |
||
912 | ) |
||
913 | # Getting lab contacts |
||
914 | bsc = getToolByName(self.context, SETUP_CATALOG) |
||
915 | lab_contacts = [o.getObject() for o in bsc( |
||
916 | portal_type="LabContact", is_active=True)] |
||
917 | for contact in lab_contacts: |
||
918 | if contact.getFullname() == row.get('Worker', ''): |
||
919 | obj.setWorker(contact.UID()) |
||
920 | obj.unmarkCreationFlag() |
||
921 | renameAfterCreation(obj) |
||
922 | notify(ObjectInitializedEvent(obj)) |
||
923 | |||
924 | |||
925 | View Code Duplication | class Instrument_Calibrations(WorksheetImporter): |
|
926 | |||
927 | def Import(self): |
||
928 | bsc = getToolByName(self.context, SETUP_CATALOG) |
||
929 | for row in self.get_rows(3): |
||
930 | if not row.get('instrument', None) or not row.get('title', None): |
||
931 | continue |
||
932 | |||
933 | folder = self.get_object(bsc, 'Instrument', row.get('instrument')) |
||
934 | if folder: |
||
935 | obj = _createObjectByType( |
||
936 | "InstrumentCalibration", folder, tmpID()) |
||
937 | obj.edit( |
||
938 | title=row['title'], |
||
939 | DownFrom=row.get('downfrom', ''), |
||
940 | DownTo=row.get('downto', ''), |
||
941 | Calibrator=row.get('calibrator', ''), |
||
942 | Considerations=row.get('considerations', ''), |
||
943 | WorkPerformed=row.get('workperformed', ''), |
||
944 | Remarks=row.get('remarks', ''), |
||
945 | DateIssued=row.get('DateIssued', ''), |
||
946 | ReportID=row.get('ReportID', '') |
||
947 | ) |
||
948 | # Gettinginstrument lab contacts |
||
949 | bsc = getToolByName(self.context, SETUP_CATALOG) |
||
950 | lab_contacts = [o.getObject() for o in bsc( |
||
951 | portal_type="LabContact", nactive_state='active')] |
||
952 | for contact in lab_contacts: |
||
953 | if contact.getFullname() == row.get('Worker', ''): |
||
954 | obj.setWorker(contact.UID()) |
||
955 | obj.unmarkCreationFlag() |
||
956 | renameAfterCreation(obj) |
||
957 | notify(ObjectInitializedEvent(obj)) |
||
958 | |||
959 | |||
960 | class Instrument_Certifications(WorksheetImporter): |
||
961 | |||
962 | def Import(self): |
||
963 | bsc = getToolByName(self.context, SETUP_CATALOG) |
||
964 | for row in self.get_rows(3): |
||
965 | if not row['instrument'] or not row['title']: |
||
966 | continue |
||
967 | |||
968 | folder = self.get_object( |
||
969 | bsc, 'Instrument', row.get('instrument', '')) |
||
970 | if folder: |
||
971 | obj = _createObjectByType( |
||
972 | "InstrumentCertification", folder, tmpID()) |
||
973 | today = datetime.date.today() |
||
974 | certificate_expire_date = today.strftime('%d/%m') + '/' + str(today.year+1) \ |
||
975 | if row.get('validto', '') == '' else row.get('validto') |
||
976 | certificate_start_date = today.strftime('%d/%m/%Y') \ |
||
977 | if row.get('validfrom', '') == '' else row.get('validfrom') |
||
978 | obj.edit( |
||
979 | title=row['title'], |
||
980 | AssetNumber=row.get('assetnumber', ''), |
||
981 | Date=row.get('date', ''), |
||
982 | ValidFrom=certificate_start_date, |
||
983 | ValidTo=certificate_expire_date, |
||
984 | Agency=row.get('agency', ''), |
||
985 | Remarks=row.get('remarks', ''), |
||
986 | ) |
||
987 | # Attaching the Report Certificate if exists |
||
988 | View Code Duplication | if row.get('report', None): |
|
989 | path = resource_filename( |
||
990 | self.dataset_project, |
||
991 | "setupdata/%s/%s" % (self.dataset_name, |
||
992 | row['report']) |
||
993 | ) |
||
994 | try: |
||
995 | file_data = read_file(path) |
||
996 | obj.setDocument(file_data) |
||
997 | except Exception as msg: |
||
998 | file_data = None |
||
999 | logger.warning( |
||
1000 | msg[0] + " Error on sheet: " + self.sheetname) |
||
1001 | |||
1002 | # Getting lab contacts |
||
1003 | bsc = getToolByName(self.context, SETUP_CATALOG) |
||
1004 | lab_contacts = [o.getObject() for o in bsc( |
||
1005 | portal_type="LabContact", nactive_state='active')] |
||
1006 | for contact in lab_contacts: |
||
1007 | if contact.getFullname() == row.get('preparedby', ''): |
||
1008 | obj.setPreparator(contact.UID()) |
||
1009 | if contact.getFullname() == row.get('approvedby', ''): |
||
1010 | obj.setValidator(contact.UID()) |
||
1011 | obj.unmarkCreationFlag() |
||
1012 | renameAfterCreation(obj) |
||
1013 | notify(ObjectInitializedEvent(obj)) |
||
1014 | |||
1015 | |||
1016 | class Instrument_Documents(WorksheetImporter): |
||
1017 | |||
1018 | def Import(self): |
||
1019 | bsc = getToolByName(self.context, SETUP_CATALOG) |
||
1020 | for row in self.get_rows(3): |
||
1021 | if not row.get('instrument', ''): |
||
1022 | continue |
||
1023 | folder = self.get_object( |
||
1024 | bsc, 'Instrument', row.get('instrument', '')) |
||
1025 | addDocument(self, row, folder) |
||
1026 | |||
1027 | |||
1028 | def addDocument(self, row_dict, folder): |
||
1029 | """ |
||
1030 | This function adds a multifile object to the instrument folder |
||
1031 | :param row_dict: the dictionary which contains the document information |
||
1032 | :param folder: the instrument object |
||
1033 | """ |
||
1034 | if folder: |
||
1035 | # This content type need a file |
||
1036 | if row_dict.get('File', None): |
||
1037 | path = resource_filename( |
||
1038 | self.dataset_project, |
||
1039 | "setupdata/%s/%s" % (self.dataset_name, |
||
1040 | row_dict['File']) |
||
1041 | ) |
||
1042 | try: |
||
1043 | file_data = read_file(path) |
||
1044 | except Exception as msg: |
||
1045 | file_data = None |
||
1046 | logger.warning(msg[0] + " Error on sheet: " + self.sheetname) |
||
1047 | |||
1048 | # Obtain all created instrument documents content type |
||
1049 | catalog = getToolByName(self.context, SETUP_CATALOG) |
||
1050 | documents_brains = catalog.searchResults( |
||
1051 | {'portal_type': 'Multifile'}) |
||
1052 | # If a the new document has the same DocumentID as a created document, this object won't be created. |
||
1053 | idAlreadyInUse = False |
||
1054 | for item in documents_brains: |
||
1055 | if item.getObject().getDocumentID() == row_dict.get('DocumentID', ''): |
||
1056 | warning = "The ID '%s' used for this document is already in use on instrument '%s', consequently " \ |
||
1057 | "the file hasn't been upload." % (row_dict.get( |
||
1058 | 'DocumentID', ''), row_dict.get('instrument', '')) |
||
1059 | self.context.plone_utils.addPortalMessage(warning) |
||
1060 | idAlreadyInUse = True |
||
1061 | if not idAlreadyInUse: |
||
1062 | obj = _createObjectByType("Multifile", folder, tmpID()) |
||
1063 | obj.edit( |
||
1064 | DocumentID=row_dict.get('DocumentID', ''), |
||
1065 | DocumentVersion=row_dict.get('DocumentVersion', ''), |
||
1066 | DocumentLocation=row_dict.get('DocumentLocation', ''), |
||
1067 | DocumentType=row_dict.get('DocumentType', ''), |
||
1068 | File=file_data |
||
1069 | ) |
||
1070 | obj.unmarkCreationFlag() |
||
1071 | renameAfterCreation(obj) |
||
1072 | notify(ObjectInitializedEvent(obj)) |
||
1073 | |||
1074 | |||
1075 | class Instrument_Maintenance_Tasks(WorksheetImporter): |
||
1076 | |||
1077 | def Import(self): |
||
1078 | bsc = getToolByName(self.context, SETUP_CATALOG) |
||
1079 | for row in self.get_rows(3): |
||
1080 | if not row['instrument'] or not row['title'] or not row['type']: |
||
1081 | continue |
||
1082 | |||
1083 | folder = self.get_object(bsc, 'Instrument', row.get('instrument')) |
||
1084 | if folder: |
||
1085 | obj = _createObjectByType( |
||
1086 | "InstrumentMaintenanceTask", folder, tmpID()) |
||
1087 | try: |
||
1088 | cost = "%.2f" % (row.get('cost', 0)) |
||
1089 | except Exception: |
||
1090 | cost = row.get('cost', '0.0') |
||
1091 | |||
1092 | obj.edit( |
||
1093 | title=row['title'], |
||
1094 | description=row['description'], |
||
1095 | Type=row['type'], |
||
1096 | DownFrom=row.get('downfrom', ''), |
||
1097 | DownTo=row.get('downto', ''), |
||
1098 | Maintainer=row.get('maintaner', ''), |
||
1099 | Considerations=row.get('considerations', ''), |
||
1100 | WorkPerformed=row.get('workperformed', ''), |
||
1101 | Remarks=row.get('remarks', ''), |
||
1102 | Cost=cost, |
||
1103 | Closed=self.to_bool(row.get('closed')) |
||
1104 | ) |
||
1105 | obj.unmarkCreationFlag() |
||
1106 | renameAfterCreation(obj) |
||
1107 | notify(ObjectInitializedEvent(obj)) |
||
1108 | |||
1109 | |||
1110 | class Instrument_Schedule(WorksheetImporter): |
||
1111 | |||
1112 | def Import(self): |
||
1113 | bsc = getToolByName(self.context, SETUP_CATALOG) |
||
1114 | for row in self.get_rows(3): |
||
1115 | if not row['instrument'] or not row['title'] or not row['type']: |
||
1116 | continue |
||
1117 | folder = self.get_object(bsc, 'Instrument', row.get('instrument')) |
||
1118 | if folder: |
||
1119 | obj = _createObjectByType( |
||
1120 | "InstrumentScheduledTask", folder, tmpID()) |
||
1121 | criteria = [ |
||
1122 | {'fromenabled': row.get('date', None) is not None, |
||
1123 | 'fromdate': row.get('date', ''), |
||
1124 | 'repeatenabled': ((row['numrepeats'] and |
||
1125 | row['numrepeats'] > 1) or |
||
1126 | (row['repeatuntil'] and |
||
1127 | len(row['repeatuntil']) > 0)), |
||
1128 | 'repeatunit': row.get('numrepeats', ''), |
||
1129 | 'repeatperiod': row.get('periodicity', ''), |
||
1130 | 'repeatuntilenabled': (row['repeatuntil'] and |
||
1131 | len(row['repeatuntil']) > 0), |
||
1132 | 'repeatuntil': row.get('repeatuntil')} |
||
1133 | ] |
||
1134 | obj.edit( |
||
1135 | title=row['title'], |
||
1136 | Type=row['type'], |
||
1137 | ScheduleCriteria=criteria, |
||
1138 | Considerations=row.get('considerations', ''), |
||
1139 | ) |
||
1140 | obj.unmarkCreationFlag() |
||
1141 | renameAfterCreation(obj) |
||
1142 | notify(ObjectInitializedEvent(obj)) |
||
1143 | |||
1144 | |||
1145 | class Sample_Matrices(WorksheetImporter): |
||
1146 | |||
1147 | def Import(self): |
||
1148 | container = self.context.setup.samplematrices |
||
1149 | for row in self.get_rows(3): |
||
1150 | title = row.get("title") |
||
1151 | if not title: |
||
1152 | continue |
||
1153 | api.create(container, "SampleMatrix", |
||
1154 | title=title, description=row.get("description")) |
||
1155 | |||
1156 | |||
1157 | class Batch_Labels(WorksheetImporter): |
||
1158 | |||
1159 | def Import(self): |
||
1160 | container = self.context.setup.batchlabels |
||
1161 | for row in self.get_rows(3): |
||
1162 | title = row.get("title") |
||
1163 | if not title: |
||
1164 | continue |
||
1165 | api.create(container, "BatchLabel", title=title) |
||
1166 | |||
1167 | |||
1168 | class Sample_Types(WorksheetImporter): |
||
1169 | |||
1170 | def Import(self): |
||
1171 | container = self.context.setup.sampletypes |
||
1172 | sc = api.get_tool(SETUP_CATALOG) |
||
1173 | |||
1174 | for row in self.get_rows(3): |
||
1175 | title = row.get("title") |
||
1176 | if not title: |
||
1177 | continue |
||
1178 | |||
1179 | obj = api.create(container, "SampleType", title=title, |
||
1180 | description=row.get("description")) |
||
1181 | |||
1182 | samplematrix = self.get_object( |
||
1183 | sc, 'SampleMatrix', row.get('SampleMatrix_title')) |
||
1184 | containertype = self.get_object( |
||
1185 | sc, 'ContainerType', row.get('ContainerType_title')) |
||
1186 | |||
1187 | if samplematrix: |
||
1188 | obj.setSampleMatrix(samplematrix) |
||
1189 | if containertype: |
||
1190 | obj.setContainerType(containertype) |
||
1191 | |||
1192 | obj.setHazardous(self.to_bool(row['Hazardous'])) |
||
1193 | obj.setPrefix(row['Prefix']) |
||
1194 | obj.setMinimumVolume(row['MinimumVolume']) |
||
1195 | obj.setRetentionPeriod({ |
||
1196 | 'days': row['RetentionPeriod'] or 0, |
||
1197 | 'hours': 0, |
||
1198 | 'minutes': 0}) |
||
1199 | |||
1200 | samplepoint = self.get_object(sc, 'SamplePoint', |
||
1201 | row.get('SamplePoint_title')) |
||
1202 | if samplepoint: |
||
1203 | samplepoint.setSampleTypes([obj, ]) |
||
1204 | obj.reindexObject() |
||
1205 | |||
1206 | |||
1207 | class Sample_Points(WorksheetImporter): |
||
1208 | |||
1209 | def Import(self): |
||
1210 | setup_folder = self.context.setup.samplepoints |
||
1211 | bsc = getToolByName(self.context, SETUP_CATALOG) |
||
1212 | cat = api.get_tool(CLIENT_CATALOG) |
||
1213 | for row in self.get_rows(3): |
||
1214 | if not row['title']: |
||
1215 | continue |
||
1216 | if row['Client_title']: |
||
1217 | client_title = row['Client_title'] |
||
1218 | client = cat(portal_type="Client", getName=client_title) |
||
1219 | if len(client) == 0: |
||
1220 | error = "Sample Point %s: Client invalid: '%s'. The Sample point will not be uploaded." |
||
1221 | logger.error(error, row['title'], client_title) |
||
1222 | continue |
||
1223 | folder = client[0].getObject() |
||
1224 | else: |
||
1225 | folder = setup_folder |
||
1226 | |||
1227 | if row['Latitude']: |
||
1228 | logger.log("Ignored SamplePoint Latitude", 'error') |
||
1229 | if row['Longitude']: |
||
1230 | logger.log("Ignored SamplePoint Longitude", 'error') |
||
1231 | |||
1232 | obj = api.create(folder, "SamplePoint", title=row['title'], |
||
1233 | description=row.get('description', '')) |
||
1234 | obj.setComposite(self.to_bool(row["Composite"])) |
||
1235 | obj.setElevation(row["Elevation"]) |
||
1236 | sampletype = self.get_object(bsc, 'SampleType', |
||
1237 | row.get('SampleType_title')) |
||
1238 | if sampletype: |
||
1239 | obj.setSampleTypes([sampletype, ]) |
||
1240 | obj.reindexObject() |
||
1241 | |||
1242 | |||
1243 | class Sample_Point_Sample_Types(WorksheetImporter): |
||
1244 | |||
1245 | def Import(self): |
||
1246 | bsc = getToolByName(self.context, SETUP_CATALOG) |
||
1247 | for row in self.get_rows(3): |
||
1248 | sampletype = self.get_object(bsc, |
||
1249 | 'SampleType', |
||
1250 | row.get('SampleType_title')) |
||
1251 | samplepoint = self.get_object(bsc, |
||
1252 | 'SamplePoint', |
||
1253 | row['SamplePoint_title']) |
||
1254 | if samplepoint: |
||
1255 | sampletypes = samplepoint.getSampleTypes() |
||
1256 | if sampletype not in sampletypes: |
||
1257 | sampletypes.append(sampletype) |
||
1258 | samplepoint.setSampleTypes(sampletypes) |
||
1259 | |||
1260 | |||
1261 | class Storage_Locations(WorksheetImporter): |
||
1262 | |||
1263 | def Import(self): |
||
1264 | container = self.context.setup.storagelocations |
||
1265 | for row in self.get_rows(3): |
||
1266 | address = row.get('Address') |
||
1267 | if not address: |
||
1268 | continue |
||
1269 | |||
1270 | api.create(container, "StorageLocation", |
||
1271 | title=address, |
||
1272 | SiteTitle=row.get('SiteTitle'), |
||
1273 | SiteCode=row.get('SiteCode'), |
||
1274 | SiteDescription=row.get('SiteDescription'), |
||
1275 | LocationTitle=row.get('LocationTitle'), |
||
1276 | LocationCode=row.get('LocationCode'), |
||
1277 | LocationDescription=row.get('LocationDescription'), |
||
1278 | LocationType=row.get('LocationType'), |
||
1279 | ShelfTitle=row.get('ShelfTitle'), |
||
1280 | ShelfCode=row.get('ShelfCode'), |
||
1281 | ShelfDescription=row.get('ShelfDescription')) |
||
1282 | |||
1283 | |||
1284 | class Sample_Conditions(WorksheetImporter): |
||
1285 | |||
1286 | def Import(self): |
||
1287 | container = self.context.setup.sampleconditions |
||
1288 | for row in self.get_rows(3): |
||
1289 | title = row.get("title") |
||
1290 | if not title: |
||
1291 | continue |
||
1292 | |||
1293 | description = row.get("description") |
||
1294 | api.create(container, "SampleCondition", |
||
1295 | title=title, |
||
1296 | description=description) |
||
1297 | |||
1298 | |||
1299 | class Analysis_Categories(WorksheetImporter): |
||
1300 | |||
1301 | def Import(self): |
||
1302 | container = self.context.setup.analysiscategories |
||
1303 | setup_tool = getToolByName(self.context, SETUP_CATALOG) |
||
1304 | for row in self.get_rows(3): |
||
1305 | title = row.get("title") |
||
1306 | if not title: |
||
1307 | logger.warning("Error in in {}. Missing Title field." |
||
1308 | .format(self.sheetname)) |
||
1309 | continue |
||
1310 | |||
1311 | department_title = row.get("Department_title", None) |
||
1312 | if not department_title: |
||
1313 | logger.warning("Error in {}. Department field missing." |
||
1314 | .format(self.sheetname)) |
||
1315 | continue |
||
1316 | |||
1317 | department = self.get_object(setup_tool, "Department", |
||
1318 | title=department_title) |
||
1319 | if not department: |
||
1320 | logger.warning("Error in {}. Department '{}' is wrong." |
||
1321 | .format(self.sheetname, department_title)) |
||
1322 | continue |
||
1323 | |||
1324 | description = row.get("description", "") |
||
1325 | comments = row.get("comments", "") |
||
1326 | api.create(container, "AnalysisCategory", |
||
1327 | title=title, |
||
1328 | description=description, |
||
1329 | comments=comments, |
||
1330 | department=department) |
||
1331 | |||
1332 | |||
1333 | class Methods(WorksheetImporter): |
||
1334 | |||
1335 | def Import(self): |
||
1336 | folder = self.context.methods |
||
1337 | bsc = getToolByName(self.context, SETUP_CATALOG) |
||
1338 | for row in self.get_rows(3): |
||
1339 | if row['title']: |
||
1340 | calculation = self.get_object( |
||
1341 | bsc, 'Calculation', row.get('Calculation_title')) |
||
1342 | obj = _createObjectByType("Method", folder, tmpID()) |
||
1343 | obj.edit( |
||
1344 | title=row['title'], |
||
1345 | description=row.get('description', ''), |
||
1346 | Instructions=row.get('Instructions', ''), |
||
1347 | ManualEntryOfResults=row.get('ManualEntryOfResults', True), |
||
1348 | Calculation=calculation, |
||
1349 | MethodID=row.get('MethodID', ''), |
||
1350 | Accredited=row.get('Accredited', True), |
||
1351 | ) |
||
1352 | # Obtain all created methods |
||
1353 | methods_brains = bsc.searchResults({'portal_type': 'Method'}) |
||
1354 | # If a the new method has the same MethodID as a created method, remove MethodID value. |
||
1355 | for methods in methods_brains: |
||
1356 | if methods.getObject().get('MethodID', '') != '' and methods.getObject.get('MethodID', '') == obj['MethodID']: |
||
1357 | obj.edit(MethodID='') |
||
1358 | |||
1359 | View Code Duplication | if row['MethodDocument']: |
|
1360 | path = resource_filename( |
||
1361 | self.dataset_project, |
||
1362 | "setupdata/%s/%s" % (self.dataset_name, |
||
1363 | row['MethodDocument']) |
||
1364 | ) |
||
1365 | try: |
||
1366 | file_data = read_file(path) |
||
1367 | obj.setMethodDocument(file_data) |
||
1368 | except Exception as msg: |
||
1369 | logger.warning( |
||
1370 | msg[0] + " Error on sheet: " + self.sheetname) |
||
1371 | |||
1372 | obj.unmarkCreationFlag() |
||
1373 | renameAfterCreation(obj) |
||
1374 | notify(ObjectInitializedEvent(obj)) |
||
1375 | |||
1376 | |||
1377 | class Sampling_Deviations(WorksheetImporter): |
||
1378 | |||
1379 | def Import(self): |
||
1380 | container = self.context.setup.samplingdeviations |
||
1381 | for row in self.get_rows(3): |
||
1382 | title = row.get("title") |
||
1383 | if not title: |
||
1384 | continue |
||
1385 | api.create(container, "SamplingDeviation", |
||
1386 | title=title, description=row.get("description")) |
||
1387 | |||
1388 | |||
1389 | class Calculations(WorksheetImporter): |
||
1390 | |||
1391 | def get_interim_fields(self): |
||
1392 | # preload Calculation Interim Fields sheet |
||
1393 | sheetname = 'Calculation Interim Fields' |
||
1394 | worksheet = self.workbook[sheetname] |
||
1395 | if not worksheet: |
||
1396 | return |
||
1397 | self.interim_fields = {} |
||
1398 | rows = self.get_rows(3, worksheet=worksheet) |
||
1399 | for row in rows: |
||
1400 | calc_title = row['Calculation_title'] |
||
1401 | if calc_title not in self.interim_fields.keys(): |
||
1402 | self.interim_fields[calc_title] = [] |
||
1403 | self.interim_fields[calc_title].append({ |
||
1404 | 'keyword': row['keyword'], |
||
1405 | 'title': row.get('title', ''), |
||
1406 | 'type': 'int', |
||
1407 | 'hidden': ('hidden' in row and row['hidden']) and True or False, |
||
1408 | 'value': row['value'], |
||
1409 | 'unit': row['unit'] and row['unit'] or ''}) |
||
1410 | |||
1411 | def Import(self): |
||
1412 | self.get_interim_fields() |
||
1413 | folder = self.context.bika_setup.bika_calculations |
||
1414 | for row in self.get_rows(3): |
||
1415 | if not row['title']: |
||
1416 | continue |
||
1417 | calc_title = row['title'] |
||
1418 | calc_interims = self.interim_fields.get(calc_title, []) |
||
1419 | formula = row['Formula'] |
||
1420 | # scan formula for dep services |
||
1421 | keywords = re.compile(r"\[([^\.^\]]+)\]").findall(formula) |
||
1422 | # remove interims from deps |
||
1423 | interim_keys = [k['keyword'] for k in calc_interims] |
||
1424 | dep_keywords = [k for k in keywords if k not in interim_keys] |
||
1425 | |||
1426 | obj = _createObjectByType("Calculation", folder, tmpID()) |
||
1427 | obj.edit( |
||
1428 | title=calc_title, |
||
1429 | description=row.get('description', ''), |
||
1430 | InterimFields=calc_interims, |
||
1431 | Formula=str(row['Formula']) |
||
1432 | ) |
||
1433 | for kw in dep_keywords: |
||
1434 | self.defer(src_obj=obj, |
||
1435 | src_field='DependentServices', |
||
1436 | dest_catalog=SETUP_CATALOG, |
||
1437 | dest_query={'portal_type': 'AnalysisService', |
||
1438 | 'getKeyword': kw} |
||
1439 | ) |
||
1440 | obj.unmarkCreationFlag() |
||
1441 | renameAfterCreation(obj) |
||
1442 | notify(ObjectInitializedEvent(obj)) |
||
1443 | |||
1444 | # Now we have the calculations registered, try to assign default calcs |
||
1445 | # to methods |
||
1446 | sheet = self.workbook["Methods"] |
||
1447 | bsc = getToolByName(self.context, SETUP_CATALOG) |
||
1448 | for row in self.get_rows(3, sheet): |
||
1449 | if row.get('title', '') and row.get('Calculation_title', ''): |
||
1450 | meth = self.get_object(bsc, "Method", row.get('title')) |
||
1451 | if meth and not meth.getCalculation(): |
||
1452 | calctit = safe_unicode( |
||
1453 | row['Calculation_title']).encode('utf-8') |
||
1454 | calc = self.get_object(bsc, "Calculation", calctit) |
||
1455 | if calc: |
||
1456 | meth.setCalculation(calc.UID()) |
||
1457 | |||
1458 | |||
1459 | class Analysis_Services(WorksheetImporter): |
||
1460 | |||
1461 | def load_interim_fields(self): |
||
1462 | # preload AnalysisService InterimFields sheet |
||
1463 | sheetname = 'AnalysisService InterimFields' |
||
1464 | worksheet = self.workbook[sheetname] |
||
1465 | if not worksheet: |
||
1466 | return |
||
1467 | self.service_interims = {} |
||
1468 | rows = self.get_rows(3, worksheet=worksheet) |
||
1469 | for row in rows: |
||
1470 | service_title = row['Service_title'] |
||
1471 | if service_title not in self.service_interims.keys(): |
||
1472 | self.service_interims[service_title] = [] |
||
1473 | self.service_interims[service_title].append({ |
||
1474 | 'keyword': row['keyword'], |
||
1475 | 'title': row.get('title', ''), |
||
1476 | 'type': 'int', |
||
1477 | 'value': row['value'], |
||
1478 | 'unit': row['unit'] and row['unit'] or ''}) |
||
1479 | |||
1480 | def load_result_options(self): |
||
1481 | bsc = getToolByName(self.context, SETUP_CATALOG) |
||
1482 | sheetname = 'AnalysisService ResultOptions' |
||
1483 | worksheet = self.workbook[sheetname] |
||
1484 | if not worksheet: |
||
1485 | return |
||
1486 | for row in self.get_rows(3, worksheet=worksheet): |
||
1487 | service = self.get_object(bsc, 'AnalysisService', |
||
1488 | row.get('Service_title')) |
||
1489 | if not service: |
||
1490 | return |
||
1491 | sro = service.getResultOptions() |
||
1492 | sro.append({'ResultValue': row['ResultValue'], |
||
1493 | 'ResultText': row['ResultText']}) |
||
1494 | service.setResultOptions(sro) |
||
1495 | |||
1496 | def load_service_uncertainties(self): |
||
1497 | bsc = getToolByName(self.context, SETUP_CATALOG) |
||
1498 | sheetname = 'Analysis Service Uncertainties' |
||
1499 | worksheet = self.workbook[sheetname] |
||
1500 | if not worksheet: |
||
1501 | return |
||
1502 | |||
1503 | bucket = {} |
||
1504 | count = 0 |
||
1505 | for row in self.get_rows(3, worksheet=worksheet): |
||
1506 | count += 1 |
||
1507 | service = self.get_object(bsc, 'AnalysisService', |
||
1508 | row.get('Service_title')) |
||
1509 | if not service: |
||
1510 | warning = "Unable to load an Analysis Service uncertainty. Service '%s' not found." % row.get( |
||
1511 | 'Service_title') |
||
1512 | logger.warning(warning) |
||
1513 | continue |
||
1514 | service_uid = service.UID() |
||
1515 | if service_uid not in bucket: |
||
1516 | bucket[service_uid] = [] |
||
1517 | bucket[service_uid].append( |
||
1518 | {'intercept_min': row['Range Min'], |
||
1519 | 'intercept_max': row['Range Max'], |
||
1520 | 'errorvalue': row['Uncertainty Value']} |
||
1521 | ) |
||
1522 | if count > 500: |
||
1523 | self.write_bucket(bucket) |
||
1524 | bucket = {} |
||
1525 | if bucket: |
||
1526 | self.write_bucket(bucket) |
||
1527 | |||
1528 | def get_methods(self, service_title, default_method): |
||
1529 | """ Return an array of objects of the type Method in accordance to the |
||
1530 | methods listed in the 'AnalysisService Methods' sheet and service |
||
1531 | set in the parameter service_title. |
||
1532 | If default_method is set, it will be included in the returned |
||
1533 | array. |
||
1534 | """ |
||
1535 | return self.get_relations(service_title, |
||
1536 | default_method, |
||
1537 | 'Method', |
||
1538 | SETUP_CATALOG, |
||
1539 | 'AnalysisService Methods', |
||
1540 | 'Method_title') |
||
1541 | |||
1542 | def get_instruments(self, service_title, default_instrument): |
||
1543 | """ Return an array of objects of the type Instrument in accordance to |
||
1544 | the instruments listed in the 'AnalysisService Instruments' sheet |
||
1545 | and service set in the parameter 'service_title'. |
||
1546 | If default_instrument is set, it will be included in the returned |
||
1547 | array. |
||
1548 | """ |
||
1549 | return self.get_relations(service_title, |
||
1550 | default_instrument, |
||
1551 | 'Instrument', |
||
1552 | SETUP_CATALOG, |
||
1553 | 'AnalysisService Instruments', |
||
1554 | 'Instrument_title') |
||
1555 | |||
1556 | def get_relations(self, service_title, default_obj, obj_type, catalog_name, sheet_name, column): |
||
1557 | """ Return an array of objects of the specified type in accordance to |
||
1558 | the object titles defined in the sheet specified in 'sheet_name' and |
||
1559 | service set in the paramenter 'service_title'. |
||
1560 | If a default_obj is set, it will be included in the returned array. |
||
1561 | """ |
||
1562 | out_objects = [default_obj] if default_obj else [] |
||
1563 | cat = getToolByName(self.context, catalog_name) |
||
1564 | worksheet = self.workbook[sheet_name] |
||
1565 | if not worksheet: |
||
1566 | return out_objects |
||
1567 | for row in self.get_rows(3, worksheet=worksheet): |
||
1568 | row_as_title = row.get('Service_title') |
||
1569 | if not row_as_title: |
||
1570 | return out_objects |
||
1571 | elif row_as_title != service_title: |
||
1572 | continue |
||
1573 | obj = self.get_object(cat, obj_type, row.get(column)) |
||
1574 | if obj: |
||
1575 | if default_obj and default_obj.UID() == obj.UID(): |
||
1576 | continue |
||
1577 | out_objects.append(obj) |
||
1578 | return out_objects |
||
1579 | |||
1580 | def write_bucket(self, bucket): |
||
1581 | bsc = getToolByName(self.context, SETUP_CATALOG) |
||
1582 | for service_uid, uncertainties in bucket.items(): |
||
1583 | obj = bsc(UID=service_uid)[0].getObject() |
||
1584 | _uncert = list(obj.getUncertainties()) |
||
1585 | _uncert.extend(uncertainties) |
||
1586 | obj.setUncertainties(_uncert) |
||
1587 | |||
1588 | def Import(self): |
||
1589 | self.load_interim_fields() |
||
1590 | folder = self.context.bika_setup.bika_analysisservices |
||
1591 | bsc = getToolByName(self.context, SETUP_CATALOG) |
||
1592 | for row in self.get_rows(3): |
||
1593 | if not row['title']: |
||
1594 | continue |
||
1595 | |||
1596 | obj = _createObjectByType("AnalysisService", folder, tmpID()) |
||
1597 | MTA = { |
||
1598 | 'days': self.to_int(row.get('MaxTimeAllowed_days', 0), 0), |
||
1599 | 'hours': self.to_int(row.get('MaxTimeAllowed_hours', 0), 0), |
||
1600 | 'minutes': self.to_int(row.get('MaxTimeAllowed_minutes', 0), 0), |
||
1601 | } |
||
1602 | category = self.get_object( |
||
1603 | bsc, 'AnalysisCategory', row.get('AnalysisCategory_title')) |
||
1604 | department = self.get_object( |
||
1605 | bsc, 'Department', row.get('Department_title')) |
||
1606 | container = self.get_object( |
||
1607 | bsc, 'SampleContainer', row.get('Container_title')) |
||
1608 | preservation = self.get_object( |
||
1609 | bsc, 'SamplePreservation', row.get('Preservation_title')) |
||
1610 | |||
1611 | # Analysis Service - Method considerations: |
||
1612 | # One Analysis Service can have 0 or n Methods associated (field |
||
1613 | # 'Methods' from the Schema). |
||
1614 | # If the Analysis Service has at least one method associated, then |
||
1615 | # one of those methods can be set as the defualt method (field |
||
1616 | # '_Method' from the Schema). |
||
1617 | # |
||
1618 | # To make it easier, if a DefaultMethod is declared in the |
||
1619 | # Analysis_Services spreadsheet, but the same AS has no method |
||
1620 | # associated in the Analysis_Service_Methods spreadsheet, then make |
||
1621 | # the assumption that the DefaultMethod set in the former has to be |
||
1622 | # associated to the AS although the relation is missing. |
||
1623 | defaultmethod = self.get_object( |
||
1624 | bsc, 'Method', row.get('DefaultMethod_title')) |
||
1625 | methods = self.get_methods(row['title'], defaultmethod) |
||
1626 | if not defaultmethod and methods: |
||
1627 | defaultmethod = methods[0] |
||
1628 | |||
1629 | # Analysis Service - Instrument considerations: |
||
1630 | # By default, an Analysis Services will be associated automatically |
||
1631 | # with several Instruments due to the Analysis Service - Methods |
||
1632 | # relation (an Instrument can be assigned to a Method and one Method |
||
1633 | # can have zero or n Instruments associated). There is no need to |
||
1634 | # set this assignment directly, the AnalysisService object will |
||
1635 | # find those instruments. |
||
1636 | # Besides this 'automatic' behavior, an Analysis Service can also |
||
1637 | # have 0 or n Instruments manually associated ('Instruments' field). |
||
1638 | # In this case, the attribute 'AllowInstrumentEntryOfResults' should |
||
1639 | # be set to True. |
||
1640 | # |
||
1641 | # To make it easier, if a DefaultInstrument is declared in the |
||
1642 | # Analysis_Services spreadsheet, but the same AS has no instrument |
||
1643 | # associated in the AnalysisService_Instruments spreadsheet, then |
||
1644 | # make the assumption the DefaultInstrument set in the former has |
||
1645 | # to be associated to the AS although the relation is missing and |
||
1646 | # the option AllowInstrumentEntryOfResults will be set to True. |
||
1647 | defaultinstrument = self.get_object( |
||
1648 | bsc, 'Instrument', row.get('DefaultInstrument_title')) |
||
1649 | instruments = self.get_instruments(row['title'], defaultinstrument) |
||
1650 | allowinstrentry = True if instruments else False |
||
1651 | if not defaultinstrument and instruments: |
||
1652 | defaultinstrument = instruments[0] |
||
1653 | |||
1654 | # The manual entry of results can only be set to false if the value |
||
1655 | # for the attribute "InstrumentEntryOfResults" is False. |
||
1656 | allowmanualentry = True if not allowinstrentry else row.get( |
||
1657 | 'ManualEntryOfResults', True) |
||
1658 | |||
1659 | # Analysis Service - Calculation considerations: |
||
1660 | # By default, the AnalysisService will use the Calculation associated |
||
1661 | # to the Default Method (the field "UseDefaultCalculation"==True). |
||
1662 | # If the Default Method for this AS doesn't have any Calculation |
||
1663 | # associated and the field "UseDefaultCalculation" is True, no |
||
1664 | # Calculation will be used for this AS ("_Calculation" field is |
||
1665 | # reserved and should not be set directly). |
||
1666 | # |
||
1667 | # To make it easier, if a Calculation is set by default in the |
||
1668 | # spreadsheet, then assume the UseDefaultCalculation has to be set |
||
1669 | # to False. |
||
1670 | deferredcalculation = self.get_object( |
||
1671 | bsc, 'Calculation', row.get('Calculation_title')) |
||
1672 | usedefaultcalculation = False if deferredcalculation else True |
||
1673 | _calculation = deferredcalculation if deferredcalculation else \ |
||
1674 | (defaultmethod.getCalculation() if defaultmethod else None) |
||
1675 | |||
1676 | obj.edit( |
||
1677 | title=row['title'], |
||
1678 | ShortTitle=row.get('ShortTitle', row['title']), |
||
1679 | description=row.get('description', ''), |
||
1680 | Keyword=row['Keyword'], |
||
1681 | PointOfCapture=row['PointOfCapture'].lower(), |
||
1682 | Category=category, |
||
1683 | Department=department, |
||
1684 | Unit=row['Unit'] and row['Unit'] or None, |
||
1685 | Precision=row['Precision'] and str(row['Precision']) or '0', |
||
1686 | ExponentialFormatPrecision=str(self.to_int( |
||
1687 | row.get('ExponentialFormatPrecision', 7), 7)), |
||
1688 | LowerDetectionLimit='%06f' % self.to_float( |
||
1689 | row.get('LowerDetectionLimit', '0.0'), 0), |
||
1690 | UpperDetectionLimit='%06f' % self.to_float( |
||
1691 | row.get('UpperDetectionLimit', '1000000000.0'), 1000000000.0), |
||
1692 | DetectionLimitSelector=self.to_bool( |
||
1693 | row.get('DetectionLimitSelector', 0)), |
||
1694 | MaxTimeAllowed=MTA, |
||
1695 | Price="%02f" % Float(row['Price']), |
||
1696 | BulkPrice="%02f" % Float(row['BulkPrice']), |
||
1697 | VAT="%02f" % Float(row['VAT']), |
||
1698 | _Method=defaultmethod, |
||
1699 | Methods=methods, |
||
1700 | ManualEntryOfResults=allowmanualentry, |
||
1701 | InstrumentEntryOfResults=allowinstrentry, |
||
1702 | Instruments=instruments, |
||
1703 | Calculation=_calculation, |
||
1704 | UseDefaultCalculation=usedefaultcalculation, |
||
1705 | DuplicateVariation="%02f" % Float(row['DuplicateVariation']), |
||
1706 | Accredited=self.to_bool(row['Accredited']), |
||
1707 | InterimFields=hasattr(self, 'service_interims') and self.service_interims.get( |
||
1708 | row['title'], []) or [], |
||
1709 | Separate=self.to_bool(row.get('Separate', False)), |
||
1710 | Container=container, |
||
1711 | Preservation=preservation, |
||
1712 | CommercialID=row.get('CommercialID', ''), |
||
1713 | ProtocolID=row.get('ProtocolID', '') |
||
1714 | ) |
||
1715 | obj.unmarkCreationFlag() |
||
1716 | renameAfterCreation(obj) |
||
1717 | notify(ObjectInitializedEvent(obj)) |
||
1718 | self.load_result_options() |
||
1719 | self.load_service_uncertainties() |
||
1720 | |||
1721 | |||
1722 | class Analysis_Specifications(WorksheetImporter): |
||
1723 | |||
1724 | def resolve_service(self, row): |
||
1725 | bsc = getToolByName(self.context, SETUP_CATALOG) |
||
1726 | service = bsc( |
||
1727 | portal_type="AnalysisService", |
||
1728 | title=safe_unicode(row["service"]) |
||
1729 | ) |
||
1730 | if not service: |
||
1731 | service = bsc( |
||
1732 | portal_type="AnalysisService", |
||
1733 | getKeyword=safe_unicode(row["service"]) |
||
1734 | ) |
||
1735 | service = service[0].getObject() |
||
1736 | return service |
||
1737 | |||
1738 | def Import(self): |
||
1739 | bucket = {} |
||
1740 | client_catalog = getToolByName(self.context, CLIENT_CATALOG) |
||
1741 | setup_catalog = getToolByName(self.context, SETUP_CATALOG) |
||
1742 | # collect up all values into the bucket |
||
1743 | for row in self.get_rows(3): |
||
1744 | title = row.get("Title", False) |
||
1745 | if not title: |
||
1746 | title = row.get("title", False) |
||
1747 | if not title: |
||
1748 | continue |
||
1749 | parent = row["Client_title"] if row["Client_title"] else "lab" |
||
1750 | st = row["SampleType_title"] if row["SampleType_title"] else "" |
||
1751 | service = self.resolve_service(row) |
||
1752 | |||
1753 | if parent not in bucket: |
||
1754 | bucket[parent] = {} |
||
1755 | if title not in bucket[parent]: |
||
1756 | bucket[parent][title] = {"sampletype": st, "resultsrange": []} |
||
1757 | bucket[parent][title]["resultsrange"].append({ |
||
1758 | "keyword": service.getKeyword(), |
||
1759 | "min": row["min"] if row["min"] else "0", |
||
1760 | "max": row["max"] if row["max"] else "0", |
||
1761 | }) |
||
1762 | # write objects. |
||
1763 | for parent in bucket.keys(): |
||
1764 | for title in bucket[parent]: |
||
1765 | if parent == "lab": |
||
1766 | folder = self.context.bika_setup.bika_analysisspecs |
||
1767 | else: |
||
1768 | proxy = client_catalog( |
||
1769 | portal_type="Client", getName=safe_unicode(parent))[0] |
||
1770 | folder = proxy.getObject() |
||
1771 | st = bucket[parent][title]["sampletype"] |
||
1772 | resultsrange = bucket[parent][title]["resultsrange"] |
||
1773 | if st: |
||
1774 | st_uid = setup_catalog( |
||
1775 | portal_type="SampleType", title=safe_unicode(st))[0].UID |
||
1776 | obj = _createObjectByType("AnalysisSpec", folder, tmpID()) |
||
1777 | obj.edit(title=title) |
||
1778 | obj.setResultsRange(resultsrange) |
||
1779 | if st: |
||
1780 | obj.setSampleType(st_uid) |
||
1781 | obj.unmarkCreationFlag() |
||
1782 | renameAfterCreation(obj) |
||
1783 | notify(ObjectInitializedEvent(obj)) |
||
1784 | |||
1785 | |||
1786 | class Analysis_Profiles(WorksheetImporter): |
||
1787 | |||
1788 | def load_analysis_profile_services(self): |
||
1789 | sheetname = 'Analysis Profile Services' |
||
1790 | worksheet = self.workbook[sheetname] |
||
1791 | self.profile_services = {} |
||
1792 | if not worksheet: |
||
1793 | return |
||
1794 | bsc = getToolByName(self.context, SETUP_CATALOG) |
||
1795 | for row in self.get_rows(3, worksheet=worksheet): |
||
1796 | if not row.get('Profile', '') or not row.get('Service', ''): |
||
1797 | continue |
||
1798 | if row['Profile'] not in self.profile_services.keys(): |
||
1799 | self.profile_services[row['Profile']] = [] |
||
1800 | # Here we match againts Keyword or Title. |
||
1801 | # XXX We need a utility for this kind of thing. |
||
1802 | service = self.get_object( |
||
1803 | bsc, 'AnalysisService', row.get('Service')) |
||
1804 | if not service: |
||
1805 | service = bsc(portal_type='AnalysisService', |
||
1806 | getKeyword=row['Service'])[0].getObject() |
||
1807 | self.profile_services[row['Profile']].append(service) |
||
1808 | |||
1809 | def Import(self): |
||
1810 | self.load_analysis_profile_services() |
||
1811 | folder = self.context.setup.analysisprofiles |
||
1812 | for row in self.get_rows(3): |
||
1813 | title = row.get("title", "") |
||
1814 | description = row.get("description", "") |
||
1815 | profile_key = row.get("ProfileKey", "") |
||
1816 | commercial_id = row.get("CommercialID", "") |
||
1817 | analysis_profile_price = row.get("AnalysisProfilePrice") |
||
1818 | analysis_profile_vat = row.get("AnalysisProfileVAT") |
||
1819 | use_analysis_profile_price = row.get("UseAnalysisProfilePrice") |
||
1820 | if title: |
||
1821 | obj = api.create(folder, "AnalysisProfile") |
||
1822 | api.edit(obj, |
||
1823 | title=api.safe_unicode(title), |
||
1824 | description=api.safe_unicode(description), |
||
1825 | profile_key=api.safe_unicode(profile_key), |
||
1826 | commercial_id=api.safe_unicode(commercial_id), |
||
1827 | analysis_profile_price=api.to_float( |
||
1828 | analysis_profile_price, 0.0), |
||
1829 | analysis_profile_vat=api.to_float( |
||
1830 | analysis_profile_vat, 0.0), |
||
1831 | use_analysis_profile_price=bool( |
||
1832 | use_analysis_profile_price)) |
||
1833 | # set the services |
||
1834 | obj.setServices(self.profile_services[row["title"]]) |
||
1835 | |||
1836 | |||
1837 | class Sample_Templates(WorksheetImporter): |
||
1838 | |||
1839 | def load_sampletemplate_services(self): |
||
1840 | sheetname = "Sample Template Services" |
||
1841 | worksheet = self.workbook[sheetname] |
||
1842 | if not worksheet: |
||
1843 | return |
||
1844 | sc = api.get_tool(SETUP_CATALOG) |
||
1845 | self.services = {} |
||
1846 | for row in self.get_rows(3, worksheet=worksheet): |
||
1847 | keyword = row.get("keyword") |
||
1848 | service = self.get_object(sc, "AnalysisService", keyword) |
||
1849 | part_id = row.get("part_id", "") |
||
1850 | title = row.get("SampleTemplate") |
||
1851 | if title not in self.services: |
||
1852 | self.services[title] = [] |
||
1853 | self.services[title].append({ |
||
1854 | "uid": api.get_uid(service), |
||
1855 | "part_id": part_id, |
||
1856 | }) |
||
1857 | |||
1858 | def load_sampletemplate_partitions(self): |
||
1859 | sheetname = "Sample Template Partitions" |
||
1860 | worksheet = self.workbook[sheetname] |
||
1861 | if not worksheet: |
||
1862 | return |
||
1863 | |||
1864 | sc = api.get_tool(SETUP_CATALOG) |
||
1865 | self.partitions = {} |
||
1866 | for row in self.get_rows(3, worksheet=worksheet): |
||
1867 | title = row.get("SampleTemplate") |
||
1868 | container = row.get("container") |
||
1869 | preservation = row.get("preservation") |
||
1870 | sampletype = row.get("sampletype") |
||
1871 | part_id = row.get("part_id") |
||
1872 | if title not in self.partitions: |
||
1873 | self.partitions[title] = [] |
||
1874 | container = self.get_object(sc, "SampleContainer", container) |
||
1875 | preservation = self.get_object( |
||
1876 | sc, "SamplePreservation", preservation) |
||
1877 | sampletype = self.get_object(sc, "SampleType", sampletype) |
||
1878 | self.partitions[title].append({ |
||
1879 | "part_id": part_id, |
||
1880 | "container": api.get_uid(container) if container else "", |
||
1881 | "preservation": api.get_uid(preservation) if preservation else "", |
||
1882 | "sampletype": api.get_uid(sampletype) if sampletype else "", |
||
1883 | }) |
||
1884 | |||
1885 | def Import(self): |
||
1886 | self.load_sampletemplate_services() |
||
1887 | self.load_sampletemplate_partitions() |
||
1888 | |||
1889 | setup = api.get_senaite_setup() |
||
1890 | folder = setup.sampletemplates |
||
1891 | sc = api.get_tool(SETUP_CATALOG) |
||
1892 | |||
1893 | for row in self.get_rows(3): |
||
1894 | title = row.get("title") |
||
1895 | if not title: |
||
1896 | continue |
||
1897 | services = self.services.get(title) |
||
1898 | client_title = row.get("Client_title") or "lab" |
||
1899 | partitions = self.partitions.get(title, []) |
||
1900 | if client_title == "lab": |
||
1901 | folder = setup.sampletemplates |
||
1902 | else: |
||
1903 | client = api.search({ |
||
1904 | "portal_type": "Client", |
||
1905 | "getName": client_title |
||
1906 | }, CLIENT_CATALOG) |
||
1907 | if len(client) == 1: |
||
1908 | folder = api.get_object(client[0]) |
||
1909 | |||
1910 | sampletype = self.get_object( |
||
1911 | sc, 'SampleType', row.get('SampleType_title')) |
||
1912 | samplepoint = self.get_object( |
||
1913 | sc, 'SamplePoint', row.get('SamplePoint_title')) |
||
1914 | |||
1915 | obj = api.create(folder, "SampleTemplate", title=title) |
||
1916 | obj.setSampleType(sampletype) |
||
1917 | obj.setSamplePoint(samplepoint) |
||
1918 | obj.setPartitions(partitions) |
||
1919 | obj.setServices(services) |
||
1920 | |||
1921 | |||
1922 | class Reference_Definitions(WorksheetImporter): |
||
1923 | |||
1924 | def load_reference_definition_results(self): |
||
1925 | sheetname = 'Reference Definition Results' |
||
1926 | worksheet = self.workbook[sheetname] |
||
1927 | if not worksheet: |
||
1928 | sheetname = 'Reference Definition Values' |
||
1929 | worksheet = self.workbook[sheetname] |
||
1930 | if not worksheet: |
||
1931 | return |
||
1932 | self.results = {} |
||
1933 | if not worksheet: |
||
1934 | return |
||
1935 | bsc = getToolByName(self.context, SETUP_CATALOG) |
||
1936 | for row in self.get_rows(3, worksheet=worksheet): |
||
1937 | if row['ReferenceDefinition_title'] not in self.results.keys(): |
||
1938 | self.results[row['ReferenceDefinition_title']] = [] |
||
1939 | service = self.get_object(bsc, 'AnalysisService', |
||
1940 | row.get('service')) |
||
1941 | self.results[ |
||
1942 | row['ReferenceDefinition_title']].append({ |
||
1943 | 'uid': service.UID(), |
||
1944 | 'result': row['result'] if row['result'] else '0', |
||
1945 | 'min': row['min'] if row['min'] else '0', |
||
1946 | 'max': row['max'] if row['max'] else '0'}) |
||
1947 | |||
1948 | def Import(self): |
||
1949 | self.load_reference_definition_results() |
||
1950 | folder = self.context.bika_setup.bika_referencedefinitions |
||
1951 | for row in self.get_rows(3): |
||
1952 | if not row['title']: |
||
1953 | continue |
||
1954 | obj = _createObjectByType("ReferenceDefinition", folder, tmpID()) |
||
1955 | obj.edit( |
||
1956 | title=row['title'], |
||
1957 | description=row.get('description', ''), |
||
1958 | Blank=self.to_bool(row['Blank']), |
||
1959 | ReferenceResults=self.results.get(row['title'], []), |
||
1960 | Hazardous=self.to_bool(row['Hazardous'])) |
||
1961 | obj.unmarkCreationFlag() |
||
1962 | renameAfterCreation(obj) |
||
1963 | notify(ObjectInitializedEvent(obj)) |
||
1964 | |||
1965 | |||
1966 | class Worksheet_Templates(WorksheetImporter): |
||
1967 | |||
1968 | def __init__(self, context): |
||
1969 | super(Worksheet_Templates, self).__init__(context) |
||
1970 | self.wst_layouts = {} |
||
1971 | self.wst_services = {} |
||
1972 | |||
1973 | def load_definitions(self): |
||
1974 | reference_query = { |
||
1975 | "portal_type": "ReferenceDefinition", |
||
1976 | "is_active": True, |
||
1977 | } |
||
1978 | definitions = {} |
||
1979 | brains = api.search(reference_query, SETUP_CATALOG) |
||
1980 | for brain in brains: |
||
1981 | definitions[api.get_title(brain)] = api.get_uid(brain) |
||
1982 | return definitions |
||
1983 | |||
1984 | def load_wst_layouts(self): |
||
1985 | definitions = self.load_definitions() |
||
1986 | sheetname = "Worksheet Template Layouts" |
||
1987 | worksheet = self.workbook[sheetname] |
||
1988 | if not worksheet: |
||
1989 | return |
||
1990 | for row in self.get_rows(3, worksheet=worksheet): |
||
1991 | wst_title = row.get("WorksheetTemplate_title") |
||
1992 | if wst_title not in self.wst_layouts.keys(): |
||
1993 | self.wst_layouts[wst_title] = [] |
||
1994 | |||
1995 | ref_proxy = None |
||
1996 | dup = row.get("dup", None) |
||
1997 | dup = int(dup) if dup else None |
||
1998 | analysis_type = row.get("type", "a") |
||
1999 | blank_uid = None |
||
2000 | control_uid = None |
||
2001 | |||
2002 | # check control/blank fields if it 'Title' or 'UID' |
||
2003 | if analysis_type in ["b", "Blank"]: |
||
2004 | blank_ref = row.get("blank_ref", "") |
||
2005 | if api.is_uid(blank_ref): |
||
2006 | blank_uid = blank_ref |
||
2007 | else: |
||
2008 | blank_uid = definitions.get(blank_ref, None) |
||
2009 | ref_proxy = blank_uid or None |
||
2010 | elif analysis_type in ["c", "Control"]: |
||
2011 | control_ref = row.get("control_ref", "") |
||
2012 | if api.is_uid(control_ref): |
||
2013 | control_uid = control_ref |
||
2014 | else: |
||
2015 | control_uid = definitions.get(control_ref, None) |
||
2016 | ref_proxy = control_uid or None |
||
2017 | |||
2018 | if analysis_type not in ["d", "Duplicate"]: |
||
2019 | dup = None |
||
2020 | |||
2021 | self.wst_layouts[wst_title].append( |
||
2022 | { |
||
2023 | "pos": int(row["pos"]), |
||
2024 | "type": analysis_type[0].lower(), # if 'type' is full word |
||
2025 | "blank_ref": [blank_uid] if blank_uid else [], |
||
2026 | "control_ref": [control_uid] if blank_uid else [], |
||
2027 | "reference_proxy": ref_proxy, |
||
2028 | "dup": dup, |
||
2029 | "dup_proxy": dup, |
||
2030 | } |
||
2031 | ) |
||
2032 | |||
2033 | def load_wst_services(self): |
||
2034 | sheetname = "Worksheet Template Services" |
||
2035 | worksheet = self.workbook[sheetname] |
||
2036 | if not worksheet: |
||
2037 | return |
||
2038 | bsc = getToolByName(self.context, SETUP_CATALOG) |
||
2039 | for row in self.get_rows(3, worksheet=worksheet): |
||
2040 | wst_title = row.get("WorksheetTemplate_title") |
||
2041 | if wst_title not in self.wst_services.keys(): |
||
2042 | self.wst_services[wst_title] = [] |
||
2043 | service = self.get_object(bsc, "AnalysisService", |
||
2044 | row.get("service")) |
||
2045 | if service: |
||
2046 | self.wst_services[wst_title].append(service.UID()) |
||
2047 | |||
2048 | def Import(self): |
||
2049 | self.load_wst_services() |
||
2050 | self.load_wst_layouts() |
||
2051 | folder = self.context.setup.worksheettemplates |
||
2052 | for row in self.get_rows(3): |
||
2053 | title = row.get("title") |
||
2054 | if not title: |
||
2055 | continue |
||
2056 | |||
2057 | obj = api.create(folder, "WorksheetTemplate", |
||
2058 | title=title, |
||
2059 | description=row.get("description", "")) |
||
2060 | if title in self.wst_layouts.keys(): |
||
2061 | obj.setTemplateLayout(self.wst_layouts[title]) |
||
2062 | if title in self.wst_services.keys(): |
||
2063 | obj.setServices(self.wst_services[title]) |
||
2064 | |||
2065 | |||
2066 | class Setup(WorksheetImporter): |
||
2067 | |||
2068 | def get_field_value(self, field, value): |
||
2069 | if value is None: |
||
2070 | return None |
||
2071 | converters = { |
||
2072 | "integer": self.to_integer_value, |
||
2073 | "fixedpoint": self.to_fixedpoint_value, |
||
2074 | "boolean": self.to_boolean_value, |
||
2075 | "string": self.to_string_value, |
||
2076 | "reference": self.to_reference_value, |
||
2077 | "duration": self.to_duration_value |
||
2078 | } |
||
2079 | try: |
||
2080 | return converters.get(field.type, None)(field, value) |
||
2081 | except Exception: |
||
2082 | logger.error("No valid type for Setup.{} ({}): {}" |
||
2083 | .format(field.getName(), field.type, value)) |
||
2084 | |||
2085 | def to_integer_value(self, field, value): |
||
2086 | return str(int(value)) |
||
2087 | |||
2088 | def to_fixedpoint_value(self, field, value): |
||
2089 | return str(float(value)) |
||
2090 | |||
2091 | def to_boolean_value(self, field, value): |
||
2092 | return self.to_bool(value) |
||
2093 | |||
2094 | def to_string_value(self, field, value): |
||
2095 | if field.vocabulary: |
||
2096 | return self.to_string_vocab_value(field, value) |
||
2097 | return value and str(value) or "" |
||
2098 | |||
2099 | def to_reference_value(self, field, value): |
||
2100 | if not value: |
||
2101 | return None |
||
2102 | |||
2103 | brains = api.search({"title": to_unicode(value)}) |
||
2104 | if brains: |
||
2105 | return api.get_uid(brains[0]) |
||
2106 | |||
2107 | msg = "No object found for Setup.{0} ({1}): {2}" |
||
2108 | msg = msg.format(field.getName(), field.type, value) |
||
2109 | logger.error(msg) |
||
2110 | raise ValueError(msg) |
||
2111 | |||
2112 | def to_string_vocab_value(self, field, value): |
||
2113 | vocabulary = field.vocabulary |
||
2114 | if type(vocabulary) is str: |
||
2115 | vocabulary = getFromString(api.get_setup(), vocabulary) |
||
2116 | else: |
||
2117 | vocabulary = vocabulary.items() |
||
2118 | |||
2119 | if not vocabulary: |
||
2120 | raise ValueError("Empty vocabulary for {}".format(field.getName())) |
||
2121 | |||
2122 | if type(vocabulary) in (tuple, list): |
||
2123 | vocabulary = {item[0]: item[1] for item in vocabulary} |
||
2124 | |||
2125 | for key, val in vocabulary.items(): |
||
2126 | key_low = str(to_utf8(key)).lower() |
||
2127 | val_low = str(to_utf8(val)).lower() |
||
2128 | value_low = str(value).lower() |
||
2129 | if key_low == value_low or val_low == value_low: |
||
2130 | return key |
||
2131 | raise ValueError("Vocabulary entry not found") |
||
2132 | |||
2133 | def to_duration_value(self, field, values): |
||
2134 | duration = ["days", "hours", "minutes"] |
||
2135 | duration = map(lambda d: "{}_{}".format(field.getName(), d), duration) |
||
2136 | return dict( |
||
2137 | days=api.to_int(values.get(duration[0], 0), 0), |
||
2138 | hours=api.to_int(values.get(duration[1], 0), 0), |
||
2139 | minutes=api.to_int(values.get(duration[2], 0), 0)) |
||
2140 | |||
2141 | def Import(self): |
||
2142 | values = {} |
||
2143 | for row in self.get_rows(3): |
||
2144 | values[row['Field']] = row['Value'] |
||
2145 | |||
2146 | bsetup = self.context.bika_setup |
||
2147 | bschema = bsetup.Schema() |
||
2148 | for field in bschema.fields(): |
||
2149 | value = None |
||
2150 | field_name = field.getName() |
||
2151 | if field_name in values: |
||
2152 | value = self.get_field_value(field, values[field_name]) |
||
2153 | elif field.type == "duration": |
||
2154 | value = self.get_field_value(field, values) |
||
2155 | |||
2156 | if value is None: |
||
2157 | continue |
||
2158 | try: |
||
2159 | obj_field = bsetup.getField(field_name) |
||
2160 | obj_field.set(bsetup, str(value)) |
||
2161 | except Exception: |
||
2162 | logger.error("No valid type for Setup.{} ({}): {}" |
||
2163 | .format(field_name, field.type, value)) |
||
2164 | |||
2165 | |||
2166 | class ID_Prefixes(WorksheetImporter): |
||
2167 | |||
2168 | def Import(self): |
||
2169 | prefixes = self.context.bika_setup.getIDFormatting() |
||
2170 | for row in self.get_rows(3): |
||
2171 | # remove existing prefix from list |
||
2172 | prefixes = [p for p in prefixes |
||
2173 | if p['portal_type'] != row['portal_type']] |
||
2174 | # The spreadsheet will contain 'none' for user's visual stuff, but it means 'no separator' |
||
2175 | separator = row.get('separator', '-') |
||
2176 | separator = '' if separator == 'none' else separator |
||
2177 | # add new prefix to list |
||
2178 | prefixes.append({'portal_type': row['portal_type'], |
||
2179 | 'padding': row['padding'], |
||
2180 | 'prefix': row['prefix'], |
||
2181 | 'separator': separator}) |
||
2182 | # self.context.bika_setup.setIDFormatting(prefixes) |
||
2183 | |||
2184 | |||
2185 | class Attachment_Types(WorksheetImporter): |
||
2186 | |||
2187 | def Import(self): |
||
2188 | container = self.context.setup.attachmenttypes |
||
2189 | for row in self.get_rows(3): |
||
2190 | title = row.get("title") |
||
2191 | if not title: |
||
2192 | continue |
||
2193 | |||
2194 | api.create(container, "AttachmentType", |
||
2195 | title=title, description=row.get("description")) |
||
2196 | |||
2197 | |||
2198 | class Reference_Samples(WorksheetImporter): |
||
2199 | |||
2200 | def load_reference_sample_results(self, sample): |
||
2201 | sheetname = 'Reference Sample Results' |
||
2202 | if not hasattr(self, 'results_worksheet'): |
||
2203 | worksheet = self.workbook[sheetname] |
||
2204 | if not worksheet: |
||
2205 | return |
||
2206 | self.results_worksheet = worksheet |
||
2207 | results = [] |
||
2208 | bsc = getToolByName(self.context, SETUP_CATALOG) |
||
2209 | for row in self.get_rows(3, worksheet=self.results_worksheet): |
||
2210 | if row['ReferenceSample_id'] != sample.getId(): |
||
2211 | continue |
||
2212 | service = self.get_object(bsc, 'AnalysisService', |
||
2213 | row.get('AnalysisService_title')) |
||
2214 | if not service: |
||
2215 | warning = "Unable to load a reference sample result. Service %s not found." |
||
2216 | logger.warning(warning, sheetname) |
||
2217 | continue |
||
2218 | results.append({ |
||
2219 | 'uid': service.UID(), |
||
2220 | 'result': row['result'], |
||
2221 | 'min': row['min'], |
||
2222 | 'max': row['max']}) |
||
2223 | sample.setReferenceResults(results) |
||
2224 | |||
2225 | def load_reference_analyses(self, sample): |
||
2226 | sheetname = 'Reference Analyses' |
||
2227 | if not hasattr(self, 'analyses_worksheet'): |
||
2228 | worksheet = self.workbook[sheetname] |
||
2229 | if not worksheet: |
||
2230 | return |
||
2231 | self.analyses_worksheet = worksheet |
||
2232 | bsc = getToolByName(self.context, SETUP_CATALOG) |
||
2233 | for row in self.get_rows(3, worksheet=self.analyses_worksheet): |
||
2234 | if row['ReferenceSample_id'] != sample.getId(): |
||
2235 | continue |
||
2236 | service = self.get_object(bsc, 'AnalysisService', |
||
2237 | row.get('AnalysisService_title')) |
||
2238 | # Analyses are keyed/named by service keyword |
||
2239 | obj = _createObjectByType("ReferenceAnalysis", sample, row['id']) |
||
2240 | obj.edit(title=row['id'], |
||
2241 | ReferenceType=row['ReferenceType'], |
||
2242 | Result=row['Result'], |
||
2243 | Analyst=row['Analyst'], |
||
2244 | Instrument=row['Instrument'], |
||
2245 | Retested=row['Retested'] |
||
2246 | ) |
||
2247 | obj.setService(service) |
||
2248 | # obj.setCreators(row['creator']) |
||
2249 | # obj.setCreationDate(row['created']) |
||
2250 | # self.set_wf_history(obj, row['workflow_history']) |
||
2251 | obj.unmarkCreationFlag() |
||
2252 | |||
2253 | self.load_reference_analysis_interims(obj) |
||
2254 | |||
2255 | View Code Duplication | def load_reference_analysis_interims(self, analysis): |
|
2256 | sheetname = 'Reference Analysis Interims' |
||
2257 | if not hasattr(self, 'interim_worksheet'): |
||
2258 | worksheet = self.workbook[sheetname] |
||
2259 | if not worksheet: |
||
2260 | return |
||
2261 | self.interim_worksheet = worksheet |
||
2262 | interims = [] |
||
2263 | for row in self.get_rows(3, worksheet=self.interim_worksheet): |
||
2264 | if row['ReferenceAnalysis_id'] != analysis.getId(): |
||
2265 | continue |
||
2266 | interims.append({ |
||
2267 | 'keyword': row['keyword'], |
||
2268 | 'title': row['title'], |
||
2269 | 'value': row['value'], |
||
2270 | 'unit': row['unit'], |
||
2271 | 'hidden': row['hidden']}) |
||
2272 | analysis.setInterimFields(interims) |
||
2273 | |||
2274 | def Import(self): |
||
2275 | bsc = getToolByName(self.context, SETUP_CATALOG) |
||
2276 | for row in self.get_rows(3): |
||
2277 | if not row['id']: |
||
2278 | continue |
||
2279 | supplier = bsc(portal_type='Supplier', |
||
2280 | getName=row.get('Supplier_title', ''))[0].getObject() |
||
2281 | obj = _createObjectByType("ReferenceSample", supplier, row['id']) |
||
2282 | ref_def = self.get_object(bsc, 'ReferenceDefinition', |
||
2283 | row.get('ReferenceDefinition_title')) |
||
2284 | ref_man = self.get_object(bsc, 'Manufacturer', |
||
2285 | row.get('Manufacturer_title')) |
||
2286 | obj.edit(title=row['id'], |
||
2287 | description=row.get('description', ''), |
||
2288 | Blank=self.to_bool(row['Blank']), |
||
2289 | Hazardous=self.to_bool(row['Hazardous']), |
||
2290 | CatalogueNumber=row['CatalogueNumber'], |
||
2291 | LotNumber=row['LotNumber'], |
||
2292 | Remarks=row['Remarks'], |
||
2293 | ExpiryDate=row['ExpiryDate'], |
||
2294 | DateSampled=row['DateSampled'], |
||
2295 | DateReceived=row['DateReceived'], |
||
2296 | DateOpened=row['DateOpened'], |
||
2297 | DateExpired=row['DateExpired'], |
||
2298 | DateDisposed=row['DateDisposed'] |
||
2299 | ) |
||
2300 | obj.setReferenceDefinition(ref_def) |
||
2301 | obj.setManufacturer(ref_man) |
||
2302 | obj.unmarkCreationFlag() |
||
2303 | |||
2304 | self.load_reference_sample_results(obj) |
||
2305 | self.load_reference_analyses(obj) |
||
2306 | |||
2307 | |||
2308 | class Analysis_Requests(WorksheetImporter): |
||
2309 | |||
2310 | def load_analyses(self, sample): |
||
2311 | sheetname = 'Analyses' |
||
2312 | if not hasattr(self, 'analyses_worksheet'): |
||
2313 | worksheet = self.workbook[sheetname] |
||
2314 | if not worksheet: |
||
2315 | return |
||
2316 | self.analyses_worksheet = worksheet |
||
2317 | bsc = getToolByName(self.context, SETUP_CATALOG) |
||
2318 | bc = getToolByName(self.context, SENAITE_CATALOG) |
||
2319 | for row in self.get_rows(3, worksheet=self.analyses_worksheet): |
||
2320 | service = bsc(portal_type='AnalysisService', |
||
2321 | title=row['AnalysisService_title'])[0].getObject() |
||
2322 | # analyses are keyed/named by keyword |
||
2323 | ar = bc(portal_type='AnalysisRequest', |
||
2324 | id=row['AnalysisRequest_id'])[0].getObject() |
||
2325 | obj = create_analysis( |
||
2326 | ar, service, |
||
2327 | Result=row['Result'], |
||
2328 | ResultCaptureDate=row['ResultCaptureDate'], |
||
2329 | Analyst=row['Analyst'], |
||
2330 | Instrument=row['Instrument'], |
||
2331 | Retested=self.to_bool(row['Retested']), |
||
2332 | MaxTimeAllowed={ |
||
2333 | 'days': int(row.get('MaxTimeAllowed_days', 0)), |
||
2334 | 'hours': int(row.get('MaxTimeAllowed_hours', 0)), |
||
2335 | 'minutes': int(row.get('MaxTimeAllowed_minutes', 0)), |
||
2336 | }, |
||
2337 | ) |
||
2338 | |||
2339 | analyses = ar.objectValues('Analyses') |
||
2340 | analyses = list(analyses) |
||
2341 | analyses.append(obj) |
||
2342 | ar.setAnalyses(analyses) |
||
2343 | obj.unmarkCreationFlag() |
||
2344 | |||
2345 | self.load_analysis_interims(obj) |
||
2346 | |||
2347 | View Code Duplication | def load_analysis_interims(self, analysis): |
|
2348 | sheetname = 'Reference Analysis Interims' |
||
2349 | if not hasattr(self, 'interim_worksheet'): |
||
2350 | worksheet = self.workbook[sheetname] |
||
2351 | if not worksheet: |
||
2352 | return |
||
2353 | self.interim_worksheet = worksheet |
||
2354 | interims = [] |
||
2355 | for row in self.get_rows(3, worksheet=self.interim_worksheet): |
||
2356 | if row['ReferenceAnalysis_id'] != analysis.getId(): |
||
2357 | continue |
||
2358 | interims.append({ |
||
2359 | 'keyword': row['keyword'], |
||
2360 | 'title': row['title'], |
||
2361 | 'value': row['value'], |
||
2362 | 'unit': row['unit'], |
||
2363 | 'hidden': row['hidden']}) |
||
2364 | analysis.setInterimFields(interims) |
||
2365 | |||
2366 | def Import(self): |
||
2367 | client_cat = api.get_tool(CLIENT_CATALOG) |
||
2368 | contact_cat = api.get_tool(CONTACT_CATALOG) |
||
2369 | setup_cat = api.get_tool(SETUP_CATALOG) |
||
2370 | for row in self.get_rows(3): |
||
2371 | if not row['id']: |
||
2372 | continue |
||
2373 | client = client_cat(portal_type="Client", |
||
2374 | getName=row['Client_title'])[0].getObject() |
||
2375 | obj = _createObjectByType("AnalysisRequest", client, row['id']) |
||
2376 | contact = contact_cat(portal_type="Contact", |
||
2377 | getFullname=row['Contact_Fullname'])[0].getObject() |
||
2378 | obj.edit( |
||
2379 | RequestID=row['id'], |
||
2380 | Contact=contact, |
||
2381 | CCEmails=row['CCEmails'], |
||
2382 | ClientOrderNumber=row['ClientOrderNumber'], |
||
2383 | InvoiceExclude=row['InvoiceExclude'], |
||
2384 | DateReceived=row['DateReceived'], |
||
2385 | DatePublished=row['DatePublished'], |
||
2386 | Remarks=row['Remarks'] |
||
2387 | ) |
||
2388 | if row['CCContact_Fullname']: |
||
2389 | contact = contact_cat(portal_type="Contact", |
||
2390 | getFullname=row['CCContact_Fullname'])[0].getObject() |
||
2391 | obj.setCCContact(contact) |
||
2392 | if row['AnalysisProfile_title']: |
||
2393 | profiles = setup_cat(portal_type="AnalysisProfile", |
||
2394 | title=row['AnalysisProfile_title'])[0].getObject() |
||
2395 | obj.setProfiles([profiles]) |
||
2396 | if row['ARTemplate_title']: |
||
2397 | template = setup_cat(portal_type="ARTemplate", |
||
2398 | title=row['ARTemplate_title'])[0].getObject() |
||
2399 | obj.setTemplate(template) |
||
2400 | |||
2401 | obj.unmarkCreationFlag() |
||
2402 | |||
2403 | self.load_analyses(obj) |
||
2404 | |||
2405 | |||
2406 | class Invoice_Batches(WorksheetImporter): |
||
2407 | |||
2408 | def Import(self): |
||
2409 | folder = self.context.invoices |
||
2410 | for row in self.get_rows(3): |
||
2411 | obj = _createObjectByType("InvoiceBatch", folder, tmpID()) |
||
2412 | if not row['title']: |
||
2413 | message = _("InvoiceBatch has no Title") |
||
2414 | raise Exception(t(message)) |
||
2415 | if not row['start']: |
||
2416 | message = _("InvoiceBatch has no Start Date") |
||
2417 | raise Exception(t(message)) |
||
2418 | if not row['end']: |
||
2419 | message = _("InvoiceBatch has no End Date") |
||
2420 | raise Exception(t(message)) |
||
2421 | obj.edit( |
||
2422 | title=row['title'], |
||
2423 | BatchStartDate=row['start'], |
||
2424 | BatchEndDate=row['end'], |
||
2425 | ) |
||
2426 | renameAfterCreation(obj) |
||
2427 | notify(ObjectInitializedEvent(obj)) |
||
2428 |