Total Complexity | 443 |
Total Lines | 2324 |
Duplicated Lines | 7.19 % |
Changes | 0 |
Duplicate code is one of the most pungent code smells. A rule that is often used is to re-structure code once it is duplicated in three or more places.
Common duplication problems and their corresponding solutions are:
Complex classes like senaite.core.exportimport.setupdata often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.
Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.
1 | # -*- coding: utf-8 -*- |
||
2 | # |
||
3 | # This file is part of SENAITE.CORE. |
||
4 | # |
||
5 | # SENAITE.CORE is free software: you can redistribute it and/or modify it under |
||
6 | # the terms of the GNU General Public License as published by the Free Software |
||
7 | # Foundation, version 2. |
||
8 | # |
||
9 | # This program is distributed in the hope that it will be useful, but WITHOUT |
||
10 | # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS |
||
11 | # FOR A PARTICULAR PURPOSE. See the GNU General Public License for more |
||
12 | # details. |
||
13 | # |
||
14 | # You should have received a copy of the GNU General Public License along with |
||
15 | # this program; if not, write to the Free Software Foundation, Inc., 51 |
||
16 | # Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. |
||
17 | # |
||
18 | # Copyright 2018-2024 by it's authors. |
||
19 | # Some rights reserved, see README and LICENSE. |
||
20 | |||
21 | import datetime |
||
22 | import os.path |
||
23 | import re |
||
24 | |||
25 | import transaction |
||
26 | from bika.lims import api |
||
27 | from bika.lims import bikaMessageFactory as _ |
||
28 | from bika.lims import logger |
||
29 | from senaite.core.idserver import renameAfterCreation |
||
30 | from bika.lims.interfaces import ISetupDataSetList |
||
31 | from bika.lims.utils import getFromString |
||
32 | from senaite.core.i18n import translate as t |
||
33 | from bika.lims.utils import tmpID |
||
34 | from bika.lims.utils import to_unicode |
||
35 | from bika.lims.utils import to_utf8 |
||
36 | from bika.lims.utils.analysis import create_analysis |
||
37 | from pkg_resources import resource_filename |
||
38 | from Products.Archetypes.event import ObjectInitializedEvent |
||
39 | from Products.CMFCore.utils import getToolByName |
||
40 | from Products.CMFPlone.utils import _createObjectByType |
||
41 | from Products.CMFPlone.utils import safe_unicode |
||
42 | from senaite.core.catalog import CLIENT_CATALOG |
||
43 | from senaite.core.catalog import CONTACT_CATALOG |
||
44 | from senaite.core.catalog import SENAITE_CATALOG |
||
45 | from senaite.core.catalog import SETUP_CATALOG |
||
46 | from senaite.core.exportimport.dataimport import SetupDataSetList as SDL |
||
47 | from zope.event import notify |
||
48 | from zope.interface import implements |
||
49 | |||
50 | UID_CATALOG = "uid_catalog" |
||
51 | |||
52 | |||
def lookup(context, portal_type, **kwargs):
    """Return the first object of ``portal_type`` matching ``kwargs``.

    The catalog registered for the portal type in archetype_tool is
    used; falls back to the UID catalog when none is registered.
    Raises IndexError when nothing matches.
    """
    archetype_tool = getToolByName(context, 'archetype_tool')
    catalog_id = archetype_tool.catalog_map.get(portal_type, [None])[0] or UID_CATALOG
    catalog = getToolByName(context, catalog_id)
    kwargs['portal_type'] = portal_type
    brains = catalog(**kwargs)
    return brains[0].getObject()
||
59 | |||
60 | |||
def check_for_required_columns(name, data, required):
    """Raise a translated Exception for the first column in ``required``
    that is missing or empty in ``data``.
    """
    for column in required:
        if data.get(column, None):
            continue
        message = _("%s has no '%s' column." % (name, column))
        raise Exception(t(message))
||
66 | |||
67 | |||
def Float(thing):
    """Best-effort float conversion.

    Returns 0.0 instead of raising when ``thing`` cannot be converted.
    Catches TypeError as well as ValueError, so Float(None) no longer
    raises (the original only caught ValueError).
    """
    try:
        return float(thing)
    except (TypeError, ValueError):
        return 0.0
||
74 | |||
75 | |||
def read_file(path):
    """Return the binary contents of ``path``.

    If ``path`` itself does not exist, each allowed extension (lower and
    upper case) is appended in turn and the first existing variant is
    read. Raises IOError when no candidate file exists.

    Files are opened with a context manager so handles are closed
    promptly (the original leaked them via ``open(...).read()``).
    """
    if os.path.isfile(path):
        with open(path, "rb") as f:
            return f.read()
    allowed_ext = ['pdf', 'jpg', 'jpeg', 'png', 'gif', 'ods', 'odt',
                   'xlsx', 'doc', 'docx', 'xls', 'csv', 'txt']
    allowed_ext += [e.upper() for e in allowed_ext]
    for e in allowed_ext:
        out = '%s.%s' % (path, e)
        if os.path.isfile(out):
            with open(out, "rb") as f:
                return f.read()
    raise IOError("File not found: %s. Allowed extensions: %s" % (path, ','.join(allowed_ext)))
||
87 | |||
88 | |||
class SetupDataSetList(SDL):
    """Setup data set list bound to the bika.lims project."""

    implements(ISetupDataSetList)

    def __call__(self):
        # Delegate to the base implementation with the project pinned
        return SDL.__call__(self, projectname="bika.lims")
||
95 | |||
96 | |||
class WorksheetImporter:

    """Use this as a base, for normal tabular data sheet imports.
    """

    def __init__(self, context):
        # Remember the adapter context the importer was registered for
        self.adapter_context = context
104 | |||
105 | def __call__(self, lsd, workbook, dataset_project, dataset_name): |
||
106 | self.lsd = lsd |
||
107 | self.context = lsd.context |
||
108 | self.workbook = workbook |
||
109 | self.sheetname = self.__class__.__name__.replace("_", " ") |
||
110 | try: |
||
111 | self.worksheet = workbook[self.sheetname] |
||
112 | except KeyError: |
||
113 | self.worksheet = None |
||
114 | self.dataset_project = dataset_project |
||
115 | self.dataset_name = dataset_name |
||
116 | if self.worksheet: |
||
117 | logger.info("Loading {0}.{1}: {2}".format( |
||
118 | self.dataset_project, self.dataset_name, self.sheetname)) |
||
119 | try: |
||
120 | self.Import() |
||
121 | except IOError: |
||
122 | # The importer must omit the files not found inside the server filesystem (bika/lims/setupdata/test/ |
||
123 | # if the file is loaded from 'select existing file' or bika/lims/setupdata/uploaded if it's loaded from |
||
124 | # 'Load from file') and finishes the import without errors. https://jira.bikalabs.com/browse/LIMS-1624 |
||
125 | warning = "Error while loading attached file from %s. The file will not be uploaded into the system." |
||
126 | logger.warning(warning, self.sheetname) |
||
127 | self.context.plone_utils.addPortalMessage("Error while loading some attached files. " |
||
128 | "The files weren't uploaded into the system.") |
||
129 | else: |
||
130 | logger.info("No records found: '{0}'".format(self.sheetname)) |
||
131 | |||
132 | def get_rows(self, startrow=3, worksheet=None): |
||
133 | """Returns a generator for all rows in a sheet. |
||
134 | Each row contains a dictionary where the key is the value of the |
||
135 | first row of the sheet for each column. |
||
136 | The data values are returned in utf-8 format. |
||
137 | Starts to consume data from startrow |
||
138 | """ |
||
139 | |||
140 | headers = [] |
||
141 | row_nr = 0 |
||
142 | worksheet = worksheet if worksheet else self.worksheet |
||
143 | for row in worksheet.rows: # .iter_rows(): |
||
144 | row_nr += 1 |
||
145 | if row_nr == 1: |
||
146 | # headers = [cell.internal_value for cell in row] |
||
147 | headers = [cell.value for cell in row] |
||
148 | continue |
||
149 | if row_nr % 1000 == 0: |
||
150 | transaction.savepoint() |
||
151 | if row_nr <= startrow: |
||
152 | continue |
||
153 | # row = [_c(cell.internal_value).decode('utf-8') for cell in row] |
||
154 | new_row = [] |
||
155 | for cell in row: |
||
156 | value = cell.value |
||
157 | if value is None: |
||
158 | value = '' |
||
159 | if isinstance(value, unicode): |
||
|
|||
160 | value = value.encode('utf-8') |
||
161 | # Strip any space, \t, \n, or \r characters from the left-hand |
||
162 | # side, right-hand side, or both sides of the string |
||
163 | if isinstance(value, str): |
||
164 | value = value.strip(' \t\n\r') |
||
165 | new_row.append(value) |
||
166 | row = dict(zip(headers, new_row)) |
||
167 | |||
168 | # parse out addresses |
||
169 | for add_type in ['Physical', 'Postal', 'Billing']: |
||
170 | row[add_type] = {} |
||
171 | if add_type + "_Address" in row: |
||
172 | for key in ['Address', 'City', 'State', 'District', 'Zip', 'Country']: |
||
173 | row[add_type][key] = str(row.get("%s_%s" % (add_type, key), '')) |
||
174 | |||
175 | yield row |
||
176 | |||
177 | def get_file_data(self, filename): |
||
178 | if filename: |
||
179 | try: |
||
180 | path = resource_filename( |
||
181 | self.dataset_project, |
||
182 | "setupdata/%s/%s" % (self.dataset_name, filename)) |
||
183 | file_data = open(path, "rb").read() |
||
184 | except Exception: |
||
185 | file_data = None |
||
186 | else: |
||
187 | file_data = None |
||
188 | return file_data |
||
189 | |||
190 | def to_bool(self, value): |
||
191 | """ Converts a sheet string value to a boolean value. |
||
192 | Needed because of utf-8 conversions |
||
193 | """ |
||
194 | |||
195 | try: |
||
196 | value = value.lower() |
||
197 | except Exception: |
||
198 | pass |
||
199 | try: |
||
200 | value = value.encode('utf-8') |
||
201 | except Exception: |
||
202 | pass |
||
203 | try: |
||
204 | value = int(value) |
||
205 | except Exception: |
||
206 | pass |
||
207 | if value in ('true', 1): |
||
208 | return True |
||
209 | else: |
||
210 | return False |
||
211 | |||
212 | def to_int(self, value, default=0): |
||
213 | """ Converts a value o a int. Returns default if the conversion fails. |
||
214 | """ |
||
215 | try: |
||
216 | return int(value) |
||
217 | except ValueError: |
||
218 | try: |
||
219 | return int(default) |
||
220 | except Exception: |
||
221 | return 0 |
||
222 | |||
223 | def to_float(self, value, default=0): |
||
224 | """ Converts a value o a float. Returns default if the conversion fails. |
||
225 | """ |
||
226 | try: |
||
227 | return float(value) |
||
228 | except ValueError: |
||
229 | try: |
||
230 | return float(default) |
||
231 | except Exception: |
||
232 | return 0.0 |
||
233 | |||
234 | def defer(self, **kwargs): |
||
235 | self.lsd.deferred.append(kwargs) |
||
236 | |||
237 | def Import(self): |
||
238 | """ Override this. |
||
239 | XXX Simple generic sheet importer |
||
240 | """ |
||
241 | |||
242 | def fill_addressfields(self, row, obj): |
||
243 | """ Fills the address fields for the specified object if allowed: |
||
244 | PhysicalAddress, PostalAddress, CountryState, BillingAddress |
||
245 | """ |
||
246 | addresses = {} |
||
247 | for add_type in ['Physical', 'Postal', 'Billing', 'CountryState']: |
||
248 | addresses[add_type] = {} |
||
249 | for key in ['Address', 'City', 'State', 'District', 'Zip', 'Country']: |
||
250 | addresses[add_type][key.lower()] = str(row.get("%s_%s" % (add_type, key), '')) |
||
251 | |||
252 | if addresses['CountryState']['country'] == '' \ |
||
253 | and addresses['CountryState']['state'] == '': |
||
254 | addresses['CountryState']['country'] = addresses['Physical']['country'] |
||
255 | addresses['CountryState']['state'] = addresses['Physical']['state'] |
||
256 | |||
257 | if hasattr(obj, 'setPhysicalAddress'): |
||
258 | obj.setPhysicalAddress(addresses['Physical']) |
||
259 | if hasattr(obj, 'setPostalAddress'): |
||
260 | obj.setPostalAddress(addresses['Postal']) |
||
261 | if hasattr(obj, 'setCountryState'): |
||
262 | obj.setCountryState(addresses['CountryState']) |
||
263 | if hasattr(obj, 'setBillingAddress'): |
||
264 | obj.setBillingAddress(addresses['Billing']) |
||
265 | |||
266 | def fill_contactfields(self, row, obj): |
||
267 | """ Fills the contact fields for the specified object if allowed: |
||
268 | EmailAddress, Phone, Fax, BusinessPhone, BusinessFax, HomePhone, |
||
269 | MobilePhone |
||
270 | """ |
||
271 | fieldnames = ['EmailAddress', |
||
272 | 'Phone', |
||
273 | 'Fax', |
||
274 | 'BusinessPhone', |
||
275 | 'BusinessFax', |
||
276 | 'HomePhone', |
||
277 | 'MobilePhone', |
||
278 | ] |
||
279 | schema = obj.Schema() |
||
280 | fields = dict([(field.getName(), field) for field in schema.fields()]) |
||
281 | for fieldname in fieldnames: |
||
282 | try: |
||
283 | field = fields[fieldname] |
||
284 | except Exception: |
||
285 | if fieldname in row: |
||
286 | logger.info("Address field %s not found on %s"%(fieldname,obj)) |
||
287 | continue |
||
288 | value = row.get(fieldname, '') |
||
289 | field.set(obj, value) |
||
290 | |||
291 | def get_object(self, catalog, portal_type, title=None, **kwargs): |
||
292 | """This will return an object from the catalog. |
||
293 | Logs a message and returns None if no object or multiple objects found. |
||
294 | All keyword arguments are passed verbatim to the contentFilter |
||
295 | """ |
||
296 | if not title and not kwargs: |
||
297 | return None |
||
298 | contentFilter = {"portal_type": portal_type} |
||
299 | if title: |
||
300 | contentFilter['title'] = to_unicode(title) |
||
301 | contentFilter.update(kwargs) |
||
302 | brains = catalog(contentFilter) |
||
303 | if len(brains) > 1: |
||
304 | logger.info("More than one object found for %s" % contentFilter) |
||
305 | return None |
||
306 | elif len(brains) == 0: |
||
307 | if portal_type == 'AnalysisService': |
||
308 | brains = catalog(portal_type=portal_type, getKeyword=title) |
||
309 | if brains: |
||
310 | return brains[0].getObject() |
||
311 | logger.info("No objects found for %s" % contentFilter) |
||
312 | return None |
||
313 | else: |
||
314 | return brains[0].getObject() |
||
315 | |||
316 | |||
class Sub_Groups(WorksheetImporter):

    def Import(self):
        """Create a SubGroup for every row that has a title."""
        folder = self.context.bika_setup.bika_subgroups
        for row in self.get_rows(3):
            title = row.get('title')
            if not title:
                continue
            obj = _createObjectByType("SubGroup", folder, tmpID())
            obj.edit(title=title,
                     description=row['description'],
                     SortKey=row['SortKey'])
            obj.unmarkCreationFlag()
            renameAfterCreation(obj)
            notify(ObjectInitializedEvent(obj))
||
330 | |||
331 | |||
class Lab_Information(WorksheetImporter):

    def Import(self):
        """Populate the laboratory record from Field/Value pairs,
        optionally attaching the accreditation body logo."""
        laboratory = self.context.bika_setup.laboratory
        values = {}
        for row in self.get_rows(3):
            values[row['Field']] = row['Value']

        file_data = None
        if values['AccreditationBodyLogo']:
            path = resource_filename(
                self.dataset_project,
                "setupdata/%s/%s" % (self.dataset_name,
                                     values['AccreditationBodyLogo']))
            try:
                file_data = read_file(path)
            except Exception as msg:
                # Missing logo is not fatal; import the rest anyway
                logger.warning(msg[0] + " Error on sheet: " + self.sheetname)

        laboratory.edit(
            Name=values['Name'],
            LabURL=values['LabURL'],
            Confidence=values['Confidence'],
            LaboratoryAccredited=self.to_bool(values['LaboratoryAccredited']),
            AccreditationBodyLong=values['AccreditationBodyLong'],
            AccreditationBody=values['AccreditationBody'],
            AccreditationBodyURL=values['AccreditationBodyURL'],
            Accreditation=values['Accreditation'],
            AccreditationReference=values['AccreditationReference'],
            AccreditationBodyLogo=file_data,
            TaxNumber=values['TaxNumber'],
        )
        self.fill_contactfields(values, laboratory)
        self.fill_addressfields(values, laboratory)
||
368 | |||
369 | |||
class Lab_Contacts(WorksheetImporter):

    def Import(self):
        """Create LabContact objects and their Plone user accounts from
        the sheet, then assign department managers from the
        "Lab Departments" sheet.

        Decomposed into private helpers; the duplicated LabContact
        username lookup is factored into _get_labcontacts_by_username.
        """
        folder = self.context.bika_setup.bika_labcontacts
        portal_groups = getToolByName(self.context, 'portal_groups')
        portal_registration = getToolByName(
            self.context, 'portal_registration')
        rownum = 2
        for row in self.get_rows(3):
            rownum += 1
            if not row.get('Firstname', None):
                continue

            # Username already exists?
            username = row.get('Username', '')
            fullname = ('%s %s' % (row['Firstname'], row.get('Surname', ''))).strip()
            if username:
                username = safe_unicode(username).encode('utf-8')
                if self._get_labcontacts_by_username(username):
                    error = "Lab Contact: username '{0}' in row {1} already exists. This contact will be omitted.".format(username, str(rownum))
                    logger.error(error)
                    continue

            obj = self._create_contact(folder, row, fullname, username)

            if row['Department_title']:
                # Department may not exist yet; resolve after all sheets
                self.defer(src_obj=obj,
                           src_field='Department',
                           dest_catalog=SETUP_CATALOG,
                           dest_query={'portal_type': 'Department',
                                       'title': row['Department_title']}
                           )

            # Create Plone user
            self._create_user_account(row, fullname, rownum,
                                      portal_registration, portal_groups)

        # Now we have the lab contacts registered, try to assign the
        # managers to each department if required
        self._assign_department_managers()

    def _get_labcontacts_by_username(self, username):
        """Return the LabContact objects whose username matches."""
        bsc = getToolByName(self.context, SETUP_CATALOG)
        return [o.getObject() for o in bsc(portal_type="LabContact")
                if o.getObject().getUsername() == username]

    def _create_contact(self, folder, row, fullname, username):
        """Create and initialize one LabContact from a sheet row."""
        # Is there a signature file defined? Try to get the file first.
        signature = None
        if row.get('Signature'):
            signature = self.get_file_data(row['Signature'])
            if not signature:
                warning = "Lab Contact: Cannot load the signature file '{0}' for user '{1}'. The contact will be created, but without a signature image".format(row['Signature'], username)
                logger.warning(warning)

        obj = _createObjectByType("LabContact", folder, tmpID())
        obj.edit(
            title=fullname,
            Salutation=row.get('Salutation', ''),
            Firstname=row['Firstname'],
            Surname=row.get('Surname', ''),
            JobTitle=row.get('JobTitle', ''),
            Username=row.get('Username', ''),
            Signature=signature
        )
        obj.unmarkCreationFlag()
        renameAfterCreation(obj)
        notify(ObjectInitializedEvent(obj))
        self.fill_contactfields(row, obj)
        self.fill_addressfields(row, obj)
        return obj

    def _create_user_account(self, row, fullname, rownum,
                             portal_registration, portal_groups):
        """Register a Plone member for the contact and assign its
        groups, roles and client-folder owner role. Warns (and skips
        account creation) when username or email is missing."""
        if not row['Username']:
            warn = "Lab Contact: No username defined for user '{0}' in row {1}. Contact created, but without access credentials.".format(fullname, str(rownum))
            logger.warning(warn)
        if not row.get('EmailAddress', ''):
            warn = "Lab Contact: No Email defined for user '{0}' in row {1}. Contact created, but without access credentials.".format(fullname, str(rownum))
            logger.warning(warn)

        if not (row['Username'] and row.get('EmailAddress', '')):
            return

        username = safe_unicode(row['Username']).encode('utf-8')
        passw = row['Password']
        if not passw:
            # Fall back to the username as password, loudly
            passw = username
            warn = ("Lab Contact: No password defined for user '{0}' in row {1}."
                    " Password established automatically to '{2}'").format(username, str(rownum), passw)
            logger.warning(warn)

        try:
            member = portal_registration.addMember(
                username,
                passw,
                properties={
                    'username': username,
                    'email': row['EmailAddress'],
                    'fullname': fullname}
            )
        except Exception as msg:
            logger.error("Client Contact: Error adding user (%s): %s" % (msg, username))
            return

        groups = row.get('Groups', '')
        if not groups:
            warn = "Lab Contact: No groups defined for user '{0}' in row {1}. Group established automatically to 'Analysts'".format(username, str(rownum))
            logger.warning(warn)
            groups = 'Analysts'

        group_ids = [g.strip() for g in groups.split(',')]
        # Add user to all specified groups
        for group_id in group_ids:
            group = portal_groups.getGroupById(group_id)
            if group:
                group.addMember(username)
        roles = row.get('Roles', '')
        if roles:
            # Add user to all specified roles
            for role_id in [r.strip() for r in roles.split(',')]:
                member._addRole(role_id)
        # If user is in LabManagers, add Owner local role on clients
        # folder
        if 'LabManager' in group_ids:
            self.context.clients.manage_setLocalRoles(
                username, ['Owner', ])

    def _assign_department_managers(self):
        """Assign lab contacts as managers of their departments, driven
        by the "Lab Departments" sheet; existing managers are kept."""
        sheet = self.workbook["Lab Departments"]
        bsc = getToolByName(self.context, SETUP_CATALOG)
        for row in self.get_rows(3, sheet):
            if row['title'] and row['LabContact_Username']:
                dept = self.get_object(bsc, "Department", row.get('title'))
                if dept and not dept.getManager():
                    username = safe_unicode(row['LabContact_Username']).encode('utf-8')
                    exists = self._get_labcontacts_by_username(username)
                    if exists:
                        dept.setManager(exists[0].UID())
||
493 | |||
494 | |||
class Lab_Departments(WorksheetImporter):
    """Import Lab Departments
    """
    def Import(self):
        setup = api.get_senaite_setup()
        container = setup.departments
        cat = getToolByName(self.context, CONTACT_CATALOG)
        lab_contacts = [o.getObject() for o in cat(portal_type="LabContact")]
        for row in self.get_rows(3):
            title = row.get("title")
            if not title:
                continue

            obj = api.create(container,
                             "Department",
                             title=title,
                             description=row.get("description"))

            # Resolve the manager by its username among the lab contacts
            username = row.get("LabContact_Username")
            manager = next((contact for contact in lab_contacts
                            if contact.getUsername() == username), None)
            if manager is not None:
                obj.setManager(manager.UID())
            else:
                message = "Department: lookup of '%s' in LabContacts" \
                          "/Username failed." % username
                logger.info(message)
||
527 | |||
528 | |||
class Lab_Products(WorksheetImporter):

    def Import(self):
        """Create a LabProduct for every sheet row."""
        folder = self.context.bika_setup.bika_labproducts
        for row in self.get_rows(3):
            obj = _createObjectByType('LabProduct', folder, tmpID())
            values = dict(
                title=row.get('title', 'Unknown'),
                description=row.get('description', ''),
                Volume=row.get('volume', 0),
                Unit=str(row.get('unit', 0)),
                Price=str(row.get('price', 0)),
            )
            obj.edit(**values)
            renameAfterCreation(obj)
            notify(ObjectInitializedEvent(obj))
||
549 | |||
550 | |||
class Clients(WorksheetImporter):

    def Import(self):
        """Create a Client per sheet row.

        Name and ClientID are mandatory; the error messages now
        interpolate the row's other identifier (the originals raised
        with a literal, un-substituted '%s' placeholder).
        """
        folder = self.context.clients
        for row in self.get_rows(3):
            obj = _createObjectByType("Client", folder, tmpID())
            if not row['Name']:
                message = "Client %s has no Name" % row.get('ClientID', '')
                raise Exception(message)
            if not row['ClientID']:
                message = "Client %s has no Client ID" % row['Name']
                raise Exception(message)
            obj.edit(Name=row['Name'],
                     ClientID=row['ClientID'],
                     MemberDiscountApplies=row[
                         'MemberDiscountApplies'] and True or False,
                     BulkDiscount=row['BulkDiscount'] and True or False,
                     TaxNumber=row.get('TaxNumber', ''),
                     AccountNumber=row.get('AccountNumber', '')
                     )
            self.fill_contactfields(row, obj)
            self.fill_addressfields(row, obj)
            obj.unmarkCreationFlag()
            renameAfterCreation(obj)
            notify(ObjectInitializedEvent(obj))
||
576 | |||
577 | |||
class Client_Contacts(WorksheetImporter):

    def Import(self):
        """Create a Contact inside each referenced Client and the
        matching Plone user in the 'Clients' group."""
        portal_groups = getToolByName(self.context, 'portal_groups')
        cat = api.get_tool(CLIENT_CATALOG)
        for row in self.get_rows(3):
            brains = cat(portal_type="Client",
                         getName=row['Client_title'])
            if not brains:
                client_contact = "%(Firstname)s %(Surname)s" % row
                error = "Client invalid: '%s'. The Client Contact %s will not be uploaded."
                logger.error(error, row['Client_title'], client_contact)
                continue
            client = brains[0].getObject()
            contact = _createObjectByType("Contact", client, tmpID())
            fullname = "%(Firstname)s %(Surname)s" % row
            pub_pref = [x.strip() for x in
                        row.get('PublicationPreference', '').split(",")]
            contact.edit(
                Salutation=row.get('Salutation', ''),
                Firstname=row.get('Firstname', ''),
                Surname=row.get('Surname', ''),
                Username=row['Username'],
                JobTitle=row.get('JobTitle', ''),
                Department=row.get('Department', ''),
                PublicationPreference=pub_pref,
            )
            self.fill_contactfields(row, contact)
            self.fill_addressfields(row, contact)
            contact.unmarkCreationFlag()
            renameAfterCreation(contact)
            notify(ObjectInitializedEvent(contact))
            # CC Contacts
            if row['CCContacts']:
                for cc_fullname in [x.strip()
                                    for x in row['CCContacts'].split(",")]:
                    # The referenced contact may not exist yet; defer
                    self.defer(src_obj=contact,
                               src_field='CCContact',
                               dest_catalog=CONTACT_CATALOG,
                               dest_query={'portal_type': 'Contact',
                                           'getFullname': cc_fullname}
                               )
            ## Create Plone user
            username = safe_unicode(row['Username']).encode('utf-8')
            password = safe_unicode(row['Password']).encode('utf-8')
            if username:
                try:
                    self.context.portal_registration.addMember(
                        username,
                        password,
                        properties={
                            'username': username,
                            'email': row['EmailAddress'],
                            'fullname': fullname}
                    )
                except Exception as msg:
                    logger.info("Error adding user (%s): %s" % (msg, username))
                contact.aq_parent.manage_setLocalRoles(row['Username'], ['Owner', ])
                contact.reindexObject()
                # add user to Clients group
                group = portal_groups.getGroupById('Clients')
                group.addMember(username)
||
640 | |||
641 | |||
class Container_Types(WorksheetImporter):

    def Import(self):
        """Create a ContainerType for every titled row."""
        folder = self.context.bika_setup.bika_containertypes
        for row in self.get_rows(3):
            title = row['title']
            if not title:
                continue
            obj = _createObjectByType("ContainerType", folder, tmpID())
            obj.edit(title=title,
                     description=row.get('description', ''))
            obj.unmarkCreationFlag()
            renameAfterCreation(obj)
            notify(ObjectInitializedEvent(obj))
||
655 | |||
656 | |||
class Preservations(WorksheetImporter):

    def Import(self):
        """Create a SamplePreservation for every titled row."""
        container = self.context.setup.samplepreservations
        for row in self.get_rows(3):
            title = row.get("title")
            if title:
                api.create(container, "SamplePreservation",
                           title=title, description=row.get("description"))
||
668 | |||
669 | |||
class Containers(WorksheetImporter):

    def Import(self):
        """Create SampleContainer objects, linking container type and
        preservation when the referenced records can be resolved."""
        folder = self.context.bika_setup.sample_containers
        bsc = getToolByName(self.context, SETUP_CATALOG)
        for row in self.get_rows(3):
            if not row["title"]:
                continue
            obj = api.create(folder, "SampleContainer")
            obj.setTitle(row["title"])
            obj.setDescription(row.get("description", ""))
            obj.setCapacity(row.get("Capacity", 0))
            obj.setPrePreserved(self.to_bool(row["PrePreserved"]))
            if row["ContainerType_title"]:
                container_type = self.get_object(
                    bsc, "ContainerType", row.get("ContainerType_title", ""))
                if container_type:
                    obj.setContainerType(container_type)
            if row["Preservation_title"]:
                preservation = self.get_object(
                    bsc, "SamplePreservation",
                    row.get("Preservation_title", ""))
                if preservation:
                    obj.setPreservation(preservation)
||
693 | |||
694 | |||
class Suppliers(WorksheetImporter):

    def Import(self):
        """Create a Supplier per named row.

        NB: as in the original flow, the object is created even when
        'Name' is empty; only its initialization is skipped.
        """
        folder = self.context.bika_setup.bika_suppliers
        for row in self.get_rows(3):
            obj = _createObjectByType("Supplier", folder, tmpID())
            if not row['Name']:
                continue
            obj.edit(
                Name=row.get('Name', ''),
                TaxNumber=row.get('TaxNumber', ''),
                AccountType=row.get('AccountType', {}),
                AccountName=row.get('AccountName', {}),
                AccountNumber=row.get('AccountNumber', ''),
                BankName=row.get('BankName', ''),
                BankBranch=row.get('BankBranch', ''),
                SWIFTcode=row.get('SWIFTcode', ''),
                IBN=row.get('IBN', ''),
                NIB=row.get('NIB', ''),
                Website=row.get('Website', ''),
            )
            self.fill_contactfields(row, obj)
            self.fill_addressfields(row, obj)
            obj.unmarkCreationFlag()
            renameAfterCreation(obj)
            notify(ObjectInitializedEvent(obj))
||
720 | |||
721 | |||
class Supplier_Contacts(WorksheetImporter):

    def Import(self):
        """Create a SupplierContact inside the referenced Supplier."""
        bsc = getToolByName(self.context, SETUP_CATALOG)
        for row in self.get_rows(3):
            if not row['Supplier_Name']:
                continue
            if not row['Firstname']:
                continue
            brains = bsc(portal_type="Supplier",
                         Title=row['Supplier_Name'])
            if not brains:
                # Unknown supplier: silently skip, as before
                continue
            folder = brains[0].getObject()
            obj = _createObjectByType("SupplierContact", folder, tmpID())
            obj.edit(
                Firstname=row['Firstname'],
                Surname=row.get('Surname', ''),
                Username=row.get('Username')
            )
            self.fill_contactfields(row, obj)
            self.fill_addressfields(row, obj)
            obj.unmarkCreationFlag()
            renameAfterCreation(obj)
            notify(ObjectInitializedEvent(obj))
||
747 | |||
748 | |||
class Manufacturers(WorksheetImporter):

    def Import(self):
        """Create a Manufacturer per titled row.

        NB: as in the original flow, the object is created even when
        'title' is empty; only its initialization is skipped.
        """
        folder = self.context.bika_setup.bika_manufacturers
        for row in self.get_rows(3):
            obj = _createObjectByType("Manufacturer", folder, tmpID())
            if not row['title']:
                continue
            obj.edit(
                title=row['title'],
                description=row.get('description', '')
            )
            self.fill_addressfields(row, obj)
            obj.unmarkCreationFlag()
            renameAfterCreation(obj)
            notify(ObjectInitializedEvent(obj))
||
764 | |||
765 | |||
class Instrument_Types(WorksheetImporter):

    def Import(self):
        """Create an InstrumentType for every row."""
        folder = self.context.bika_setup.bika_instrumenttypes
        for row in self.get_rows(3):
            # NB: 'title' is accessed directly, so a missing column
            # raises, exactly as before
            obj = _createObjectByType("InstrumentType", folder, tmpID())
            obj.edit(title=row['title'],
                     description=row.get('description', ''))
            obj.unmarkCreationFlag()
            renameAfterCreation(obj)
            notify(ObjectInitializedEvent(obj))
||
778 | |||
779 | |||
class Instruments(WorksheetImporter):
    """Import instruments from the 'Instruments' worksheet.

    Each row creates an Instrument in the bika_instruments folder and links
    it to its InstrumentType (column 'Type'), Manufacturer ('Brand'),
    Supplier and default Method.  Optional columns attach a photo, an
    installation certificate and a user manual document.
    """

    def Import(self):
        folder = self.context.bika_setup.bika_instruments
        bsc = getToolByName(self.context, SETUP_CATALOG)
        for row in self.get_rows(3):
            # Type, Supplier and Brand are mandatory reference columns
            if ('Type' not in row
                    or 'Supplier' not in row
                    or 'Brand' not in row):
                logger.info("Unable to import '%s'. Missing supplier, manufacturer or type" % row.get('title', ''))
                continue

            obj = _createObjectByType("Instrument", folder, tmpID())

            obj.edit(
                title=row.get('title', ''),
                AssetNumber=row.get('assetnumber', ''),
                description=row.get('description', ''),
                Type=row.get('Type', ''),
                Brand=row.get('Brand', ''),
                Model=row.get('Model', ''),
                SerialNo=row.get('SerialNo', ''),
                DataInterface=row.get('DataInterface', ''),
                Location=row.get('Location', ''),
                # NOTE: 'Instalationdate' is the (misspelled) column name
                # used by the spreadsheet template; do not "fix" it here.
                InstallationDate=row.get('Instalationdate', ''),
                UserManualID=row.get('UserManualID', ''),
            )
            instrumenttype = self.get_object(bsc, 'InstrumentType', title=row.get('Type'))
            manufacturer = self.get_object(bsc, 'Manufacturer', title=row.get('Brand'))
            supplier = self.get_object(bsc, 'Supplier', title=row.get('Supplier', ''))
            method = self.get_object(bsc, 'Method', title=row.get('Method'))
            obj.setInstrumentType(instrumenttype)
            obj.setManufacturer(manufacturer)
            obj.setSupplier(supplier)
            if method:
                obj.setMethods([method])
                obj.setMethod(method)

            # Attaching the instrument's photo
            if row.get('Photo', None):
                path = resource_filename(
                    self.dataset_project,
                    "setupdata/%s/%s" % (self.dataset_name,
                                         row['Photo'])
                )
                try:
                    file_data = read_file(path)
                    obj.setPhoto(file_data)
                except Exception as msg:
                    file_data = None
                    # BUGFIX: exceptions are not indexable on Python 3
                    # (msg[0] raised TypeError); use str(msg) instead.
                    logger.warning(str(msg) + " Error on sheet: " + self.sheetname)

            # Attaching the Installation Certificate if exists
            if row.get('InstalationCertificate', None):
                path = resource_filename(
                    self.dataset_project,
                    "setupdata/%s/%s" % (self.dataset_name,
                                         row['InstalationCertificate'])
                )
                try:
                    file_data = read_file(path)
                    obj.setInstallationCertificate(file_data)
                except Exception as msg:
                    # BUGFIX: msg[0] -> str(msg), see above
                    logger.warning(str(msg) + " Error on sheet: " + self.sheetname)

            # Attaching the Instrument's manual if exists
            if row.get('UserManualFile', None):
                row_dict = {'DocumentID': row.get('UserManualID', 'manual'),
                            'DocumentVersion': '',
                            'DocumentLocation': '',
                            'DocumentType': 'Manual',
                            'File': row.get('UserManualFile', None)
                            }
                addDocument(self, row_dict, obj)
            obj.unmarkCreationFlag()
            renameAfterCreation(obj)
            notify(ObjectInitializedEvent(obj))
||
857 | |||
858 | |||
class Instrument_Validations(WorksheetImporter):
    """Create InstrumentValidation records inside their instrument.

    The 'Worker' column is matched against the full name of active lab
    contacts.
    """

    def Import(self):
        bsc = getToolByName(self.context, SETUP_CATALOG)
        # The lab contact list does not depend on the row: fetch it once
        # instead of re-querying the whole catalog per row (the original
        # also redundantly rebound 'bsc' inside the loop).
        lab_contacts = [o.getObject() for o in
                        bsc(portal_type="LabContact", is_active=True)]
        for row in self.get_rows(3):
            if not row.get('instrument', None) or not row.get('title', None):
                continue

            folder = self.get_object(bsc, 'Instrument', row.get('instrument'))
            if folder:
                obj = _createObjectByType("InstrumentValidation", folder, tmpID())
                obj.edit(
                    title=row['title'],
                    DownFrom=row.get('downfrom', ''),
                    DownTo=row.get('downto', ''),
                    Validator=row.get('validator', ''),
                    Considerations=row.get('considerations', ''),
                    WorkPerformed=row.get('workperformed', ''),
                    Remarks=row.get('remarks', ''),
                    DateIssued=row.get('DateIssued', ''),
                    ReportID=row.get('ReportID', '')
                )
                # Link the worker by full name
                for contact in lab_contacts:
                    if contact.getFullname() == row.get('Worker', ''):
                        obj.setWorker(contact.UID())
                obj.unmarkCreationFlag()
                renameAfterCreation(obj)
                notify(ObjectInitializedEvent(obj))
||
890 | |||
891 | |||
class Instrument_Calibrations(WorksheetImporter):
    """Create InstrumentCalibration records inside their instrument.

    The 'Worker' column is matched against the full name of active lab
    contacts.
    """

    def Import(self):
        bsc = getToolByName(self.context, SETUP_CATALOG)
        # BUGFIX: the query used the non-existing index
        # 'nactive_state=active' (silently ignored by the catalog);
        # filter on 'is_active' as Instrument_Validations does.
        # Also hoisted out of the loop: the result is row-invariant.
        lab_contacts = [o.getObject() for o in
                        bsc(portal_type="LabContact", is_active=True)]
        for row in self.get_rows(3):
            if not row.get('instrument', None) or not row.get('title', None):
                continue

            folder = self.get_object(bsc, 'Instrument', row.get('instrument'))
            if folder:
                obj = _createObjectByType("InstrumentCalibration", folder, tmpID())
                obj.edit(
                    title=row['title'],
                    DownFrom=row.get('downfrom', ''),
                    DownTo=row.get('downto', ''),
                    Calibrator=row.get('calibrator', ''),
                    Considerations=row.get('considerations', ''),
                    WorkPerformed=row.get('workperformed', ''),
                    Remarks=row.get('remarks', ''),
                    DateIssued=row.get('DateIssued', ''),
                    ReportID=row.get('ReportID', '')
                )
                # Link the worker by full name
                for contact in lab_contacts:
                    if contact.getFullname() == row.get('Worker', ''):
                        obj.setWorker(contact.UID())
                obj.unmarkCreationFlag()
                renameAfterCreation(obj)
                notify(ObjectInitializedEvent(obj))
||
923 | |||
924 | |||
class Instrument_Certifications(WorksheetImporter):
    """Create InstrumentCertification records inside their instrument.

    Missing validity dates default to a one-year window starting today.
    The optional 'report' column attaches the certificate file, and
    'preparedby'/'approvedby' are matched against lab contact full names.
    """

    def Import(self):
        bsc = getToolByName(self.context, SETUP_CATALOG)
        # BUGFIX: the query used the non-existing index
        # 'nactive_state=active'; filter on 'is_active' like the other
        # importers.  Hoisted out of the loop: the result is row-invariant.
        lab_contacts = [o.getObject() for o in
                        bsc(portal_type="LabContact", is_active=True)]
        for row in self.get_rows(3):
            if not row['instrument'] or not row['title']:
                continue

            folder = self.get_object(bsc, 'Instrument', row.get('instrument', ''))
            if folder:
                obj = _createObjectByType("InstrumentCertification", folder, tmpID())
                today = datetime.date.today()
                # Default to today .. today+1y when the sheet gives no dates
                certificate_expire_date = today.strftime('%d/%m') + '/' + str(today.year+1) \
                    if row.get('validto', '') == '' else row.get('validto')
                certificate_start_date = today.strftime('%d/%m/%Y') \
                    if row.get('validfrom', '') == '' else row.get('validfrom')
                obj.edit(
                    title=row['title'],
                    AssetNumber=row.get('assetnumber', ''),
                    Date=row.get('date', ''),
                    ValidFrom=certificate_start_date,
                    ValidTo=certificate_expire_date,
                    Agency=row.get('agency', ''),
                    Remarks=row.get('remarks', ''),
                )
                # Attaching the Report Certificate if exists
                if row.get('report', None):
                    path = resource_filename(
                        self.dataset_project,
                        "setupdata/%s/%s" % (self.dataset_name,
                                             row['report'])
                    )
                    try:
                        file_data = read_file(path)
                        obj.setDocument(file_data)
                    except Exception as msg:
                        file_data = None
                        # BUGFIX: exceptions are not indexable on Python 3
                        # (msg[0]); use str(msg) instead.
                        logger.warning(str(msg) + " Error on sheet: " + self.sheetname)

                # Link preparator and validator by full name
                for contact in lab_contacts:
                    if contact.getFullname() == row.get('preparedby', ''):
                        obj.setPreparator(contact.UID())
                    if contact.getFullname() == row.get('approvedby', ''):
                        obj.setValidator(contact.UID())
                obj.unmarkCreationFlag()
                renameAfterCreation(obj)
                notify(ObjectInitializedEvent(obj))
||
975 | |||
976 | |||
class Instrument_Documents(WorksheetImporter):
    """Attach multifile documents to existing instruments, one per row of
    the worksheet.
    """

    def Import(self):
        catalog = getToolByName(self.context, SETUP_CATALOG)
        for row in self.get_rows(3):
            instrument_title = row.get('instrument', '')
            if not instrument_title:
                continue
            instrument = self.get_object(catalog, 'Instrument', instrument_title)
            # Delegates to the shared addDocument() helper, which also
            # handles a missing (falsy) instrument.
            addDocument(self, row, instrument)
||
986 | |||
def addDocument(self, row_dict, folder):
    """Add a Multifile document object to an instrument folder.

    :param self: the running WorksheetImporter instance (gives access to
        context, dataset paths and the current sheet name)
    :param row_dict: the dictionary which contains the document information
    :param folder: the instrument object; falsy values are a no-op
    """
    if folder:
        # This content type need a file
        if row_dict.get('File', None):
            path = resource_filename(
                self.dataset_project,
                "setupdata/%s/%s" % (self.dataset_name,
                                     row_dict['File'])
            )
            try:
                file_data = read_file(path)
            except Exception as msg:
                file_data = None
                # BUGFIX: exceptions are not indexable on Python 3
                # (msg[0] raised TypeError); use str(msg) instead.
                logger.warning(str(msg) + " Error on sheet: " + self.sheetname)

            # Obtain all created instrument documents content type
            catalog = getToolByName(self.context, SETUP_CATALOG)
            documents_brains = catalog.searchResults({'portal_type': 'Multifile'})
            # If a the new document has the same DocumentID as a created document, this object won't be created.
            idAlreadyInUse = False
            for item in documents_brains:
                if item.getObject().getDocumentID() == row_dict.get('DocumentID', ''):
                    warning = "The ID '%s' used for this document is already in use on instrument '%s', consequently " \
                              "the file hasn't been upload." % (row_dict.get('DocumentID', ''), row_dict.get('instrument', ''))
                    self.context.plone_utils.addPortalMessage(warning)
                    idAlreadyInUse = True
                    # One duplicate is enough: stop scanning (and avoid
                    # stacking the same portal message repeatedly).
                    break
            if not idAlreadyInUse:
                obj = _createObjectByType("Multifile", folder, tmpID())
                obj.edit(
                    DocumentID=row_dict.get('DocumentID', ''),
                    DocumentVersion=row_dict.get('DocumentVersion', ''),
                    DocumentLocation=row_dict.get('DocumentLocation', ''),
                    DocumentType=row_dict.get('DocumentType', ''),
                    File=file_data
                )
                obj.unmarkCreationFlag()
                renameAfterCreation(obj)
                notify(ObjectInitializedEvent(obj))
||
1030 | |||
1031 | |||
class Instrument_Maintenance_Tasks(WorksheetImporter):
    """Create InstrumentMaintenanceTask objects inside their instrument."""

    def Import(self):
        catalog = getToolByName(self.context, SETUP_CATALOG)
        for row in self.get_rows(3):
            # instrument, title and type are all mandatory
            if not row['instrument'] or not row['title'] or not row['type']:
                continue

            instrument = self.get_object(catalog, 'Instrument', row.get('instrument'))
            if not instrument:
                continue
            task = _createObjectByType("InstrumentMaintenanceTask", instrument, tmpID())
            # Normalize the cost to two decimals; keep the raw cell value
            # when it is not numeric.
            try:
                cost = "%.2f" % (row.get('cost', 0))
            except Exception:
                cost = row.get('cost', '0.0')

            task.edit(
                title=row['title'],
                description=row['description'],
                Type=row['type'],
                DownFrom=row.get('downfrom', ''),
                DownTo=row.get('downto', ''),
                Maintainer=row.get('maintaner', ''),
                Considerations=row.get('considerations', ''),
                WorkPerformed=row.get('workperformed', ''),
                Remarks=row.get('remarks', ''),
                Cost=cost,
                Closed=self.to_bool(row.get('closed'))
            )
            task.unmarkCreationFlag()
            renameAfterCreation(task)
            notify(ObjectInitializedEvent(task))
||
1064 | |||
1065 | |||
class Instrument_Schedule(WorksheetImporter):
    """Create InstrumentScheduledTask objects inside their instrument,
    building the task's schedule criteria from the row's date/repeat
    columns.
    """

    def Import(self):
        bsc = getToolByName(self.context, SETUP_CATALOG)
        for row in self.get_rows(3):
            # instrument, title and type are all mandatory
            if not row['instrument'] or not row['title'] or not row['type']:
                continue
            folder = self.get_object(bsc, 'Instrument', row.get('instrument'))
            if folder:
                obj = _createObjectByType("InstrumentScheduledTask", folder, tmpID())
                # Single criteria record; repetition is enabled when either
                # a repeat count > 1 or a 'repeat until' date is present.
                # NOTE(review): direct row['...'] access assumes the
                # numrepeats/repeatuntil columns always exist in the sheet.
                criteria = [
                    {'fromenabled': row.get('date', None) is not None,
                     'fromdate': row.get('date', ''),
                     'repeatenabled': ((row['numrepeats'] and
                                        row['numrepeats'] > 1) or
                                       (row['repeatuntil'] and
                                        len(row['repeatuntil']) > 0)),
                     'repeatunit': row.get('numrepeats', ''),
                     'repeatperiod': row.get('periodicity', ''),
                     'repeatuntilenabled': (row['repeatuntil'] and
                                            len(row['repeatuntil']) > 0),
                     'repeatuntil': row.get('repeatuntil')}
                ]
                obj.edit(
                    title=row['title'],
                    Type=row['type'],
                    ScheduleCriteria=criteria,
                    Considerations=row.get('considerations', ''),
                )
                obj.unmarkCreationFlag()
                renameAfterCreation(obj)
                notify(ObjectInitializedEvent(obj))
||
1098 | |||
1099 | |||
class Sample_Matrices(WorksheetImporter):
    """Create SampleMatrix objects through the Dexterity setup container."""

    def Import(self):
        matrices = self.context.setup.samplematrices
        for row in self.get_rows(3):
            title = row.get("title")
            if not title:
                continue
            description = row.get("description")
            api.create(matrices, "SampleMatrix",
                       title=title,
                       description=description)
||
1110 | |||
1111 | |||
class Batch_Labels(WorksheetImporter):
    """Create a BatchLabel for every row that carries a title."""

    def Import(self):
        container = self.context.bika_setup.bika_batchlabels
        for row in self.get_rows(3):
            title = row['title']
            if not title:
                continue
            label = _createObjectByType("BatchLabel", container, tmpID())
            label.edit(title=title)
            label.unmarkCreationFlag()
            renameAfterCreation(label)
            notify(ObjectInitializedEvent(label))
||
1123 | |||
1124 | |||
class Sample_Types(WorksheetImporter):
    """Create SampleType objects and wire them to their matrix, container
    type and, optionally, an existing sample point.
    """

    def Import(self):
        container = self.context.bika_setup.bika_sampletypes
        catalog = getToolByName(self.context, SETUP_CATALOG)
        for row in self.get_rows(3):
            if not row['title']:
                continue
            sample_type = _createObjectByType("SampleType", container, tmpID())
            matrix = self.get_object(catalog, 'SampleMatrix',
                                     row.get('SampleMatrix_title'))
            container_type = self.get_object(catalog, 'ContainerType',
                                             row.get('ContainerType_title'))
            # Retention period is expressed in whole days only
            retention = {
                'days': row['RetentionPeriod'] if row['RetentionPeriod'] else 0,
                'hours': 0,
                'minutes': 0}
            sample_type.edit(
                title=row['title'],
                description=row.get('description', ''),
                RetentionPeriod=retention,
                Hazardous=self.to_bool(row['Hazardous']),
                SampleMatrix=matrix,
                Prefix=row['Prefix'],
                MinimumVolume=row['MinimumVolume'],
                ContainerType=container_type
            )
            sample_point = self.get_object(catalog, 'SamplePoint',
                                           row.get('SamplePoint_title'))
            if sample_point:
                sample_point.setSampleType([sample_type, ])
            sample_type.unmarkCreationFlag()
            renameAfterCreation(sample_type)
            notify(ObjectInitializedEvent(sample_type))
||
1159 | |||
1160 | |||
class Sample_Points(WorksheetImporter):
    """Create SamplePoint objects, either inside a client (when
    'Client_title' resolves to an existing Client) or in the setup folder.
    """

    def Import(self):
        setup_folder = self.context.bika_setup.bika_samplepoints
        bsc = getToolByName(self.context, SETUP_CATALOG)
        cat = api.get_tool(CLIENT_CATALOG)
        for row in self.get_rows(3):
            if not row['title']:
                continue
            if row['Client_title']:
                client_title = row['Client_title']
                client = cat(portal_type="Client", getName=client_title)
                if len(client) == 0:
                    error = "Sample Point %s: Client invalid: '%s'. The Sample point will not be uploaded."
                    logger.error(error, row['title'], client_title)
                    continue
                folder = client[0].getObject()
            else:
                folder = setup_folder

            # BUGFIX: logging.Logger.log() takes the numeric level as its
            # FIRST argument; the original passed (message, 'error'), which
            # is invalid.  Use logger.error() directly.
            if row['Latitude']:
                logger.error("Ignored SamplePoint Latitude")
            if row['Longitude']:
                logger.error("Ignored SamplePoint Longitude")

            obj = _createObjectByType("SamplePoint", folder, tmpID())
            obj.edit(
                title=row['title'],
                description=row.get('description', ''),
                Composite=self.to_bool(row['Composite']),
                Elevation=row['Elevation'],
            )
            sampletype = self.get_object(bsc, 'SampleType',
                                         row.get('SampleType_title'))
            if sampletype:
                obj.setSampleTypes([sampletype, ])
            obj.unmarkCreationFlag()
            renameAfterCreation(obj)
            notify(ObjectInitializedEvent(obj))
||
1200 | |||
1201 | |||
class Sample_Point_Sample_Types(WorksheetImporter):
    """Link additional sample types to existing sample points."""

    def Import(self):
        bsc = getToolByName(self.context, SETUP_CATALOG)
        for row in self.get_rows(3):
            sampletype = self.get_object(bsc,
                                         'SampleType',
                                         row.get('SampleType_title'))
            samplepoint = self.get_object(bsc,
                                          'SamplePoint',
                                          row['SamplePoint_title'])
            # BUGFIX: when the SampleType could not be resolved the
            # original appended None to the sample point's type list.
            if not samplepoint or not sampletype:
                continue
            sampletypes = samplepoint.getSampleTypes()
            if sampletype not in sampletypes:
                sampletypes.append(sampletype)
                samplepoint.setSampleTypes(sampletypes)
||
1218 | |||
1219 | |||
class Storage_Locations(WorksheetImporter):
    """Create StorageLocation objects; the 'Address' column doubles as the
    title and is mandatory.
    """

    def Import(self):
        container = self.context.bika_setup.bika_storagelocations
        for row in self.get_rows(3):
            if not row['Address']:
                continue

            location = _createObjectByType("StorageLocation", container, tmpID())
            location.edit(
                title=row['Address'],
                SiteTitle=row['SiteTitle'],
                SiteCode=row['SiteCode'],
                SiteDescription=row['SiteDescription'],
                LocationTitle=row['LocationTitle'],
                LocationCode=row['LocationCode'],
                LocationDescription=row['LocationDescription'],
                LocationType=row['LocationType'],
                ShelfTitle=row['ShelfTitle'],
                ShelfCode=row['ShelfCode'],
                ShelfDescription=row['ShelfDescription'],
            )
            location.unmarkCreationFlag()
            renameAfterCreation(location)
            notify(ObjectInitializedEvent(location))
||
1245 | |||
1246 | |||
class Sample_Conditions(WorksheetImporter):
    """Create SampleCondition objects through the Dexterity setup container."""

    def Import(self):
        conditions = self.context.setup.sampleconditions
        for row in self.get_rows(3):
            title = row.get("title")
            if not title:
                continue
            api.create(
                conditions, "SampleCondition",
                title=title,
                description=row.get("description"))
||
1260 | |||
1261 | |||
class Analysis_Categories(WorksheetImporter):
    """Create AnalysisCategory objects; each category requires a title and
    a resolvable Department.
    """

    def Import(self):
        folder = self.context.bika_setup.bika_analysiscategories
        bsc = getToolByName(self.context, SETUP_CATALOG)
        for row in self.get_rows(3):
            title = row.get('title', None)
            department_title = row.get('Department_title', None)
            department = None
            if department_title:
                department = self.get_object(bsc, 'Department', department_title)
            if title and department:
                obj = _createObjectByType("AnalysisCategory", folder, tmpID())
                obj.edit(
                    title=row['title'],
                    description=row.get('description', ''))
                obj.setDepartment(department)
                obj.unmarkCreationFlag()
                renameAfterCreation(obj)
                notify(ObjectInitializedEvent(obj))
            elif not title:
                # BUGFIX: message typo "Error in in" corrected
                logger.warning("Error in " + self.sheetname + ". Missing Title field")
            elif not department_title:
                logger.warning("Error in " + self.sheetname + ". Department field missing.")
            else:
                # Department column was filled but did not resolve.
                # BUGFIX: added the missing space before "is wrong."
                logger.warning("Error in " + self.sheetname + ". Department "
                               + department_title + " is wrong.")
||
1288 | |||
1289 | |||
class Methods(WorksheetImporter):
    """Import analysis methods, including their default calculation and an
    optional method document file.
    """

    def Import(self):
        folder = self.context.methods
        bsc = getToolByName(self.context, SETUP_CATALOG)
        for row in self.get_rows(3):
            if not row['title']:
                continue
            calculation = self.get_object(bsc, 'Calculation', row.get('Calculation_title'))
            obj = _createObjectByType("Method", folder, tmpID())
            obj.edit(
                title=row['title'],
                description=row.get('description', ''),
                Instructions=row.get('Instructions', ''),
                ManualEntryOfResults=row.get('ManualEntryOfResults', True),
                Calculation=calculation,
                MethodID=row.get('MethodID', ''),
                Accredited=row.get('Accredited', True),
            )
            # If the new method has the same MethodID as an existing one,
            # clear the MethodID to keep it unique.
            # BUGFIX: the original read 'methods.getObject.get(...)' —
            # missing call parentheses — so it compared a bound method and
            # could never match.
            methods_brains = bsc.searchResults({'portal_type': 'Method'})
            for brain in methods_brains:
                method_id = brain.getObject().get('MethodID', '')
                if method_id != '' and method_id == obj.get('MethodID', ''):
                    obj.edit(MethodID='')

            # BUGFIX: row.get() avoids a KeyError on sheets without a
            # MethodDocument column.
            if row.get('MethodDocument'):
                path = resource_filename(
                    self.dataset_project,
                    "setupdata/%s/%s" % (self.dataset_name,
                                         row['MethodDocument'])
                )
                try:
                    file_data = read_file(path)
                    obj.setMethodDocument(file_data)
                except Exception as msg:
                    # BUGFIX: exceptions are not indexable on Python 3
                    logger.warning(str(msg) + " Error on sheet: " + self.sheetname)

            obj.unmarkCreationFlag()
            renameAfterCreation(obj)
            notify(ObjectInitializedEvent(obj))
||
1330 | |||
1331 | |||
class Sampling_Deviations(WorksheetImporter):
    """Create a SamplingDeviation for every row that carries a title."""

    def Import(self):
        container = self.context.bika_setup.bika_samplingdeviations
        for row in self.get_rows(3):
            title = row['title']
            if not title:
                continue
            deviation = _createObjectByType("SamplingDeviation", container, tmpID())
            deviation.edit(
                title=title,
                description=row.get('description', '')
            )
            deviation.unmarkCreationFlag()
            renameAfterCreation(deviation)
            notify(ObjectInitializedEvent(deviation))
||
1346 | |||
1347 | |||
class Calculations(WorksheetImporter):
    """Import calculations together with their interim fields, defer the
    linkage to dependent services, and finally assign default calculations
    to methods.
    """

    def get_interim_fields(self):
        """Preload the 'Calculation Interim Fields' sheet into
        self.interim_fields, keyed by calculation title.
        """
        # BUGFIX: always define the attribute; the original returned early
        # without setting it when the sheet was missing/empty, which made
        # Import() fail with AttributeError.
        self.interim_fields = {}
        sheetname = 'Calculation Interim Fields'
        worksheet = self.workbook[sheetname]
        if not worksheet:
            return
        rows = self.get_rows(3, worksheet=worksheet)
        for row in rows:
            calc_title = row['Calculation_title']
            self.interim_fields.setdefault(calc_title, []).append({
                'keyword': row['keyword'],
                'title': row.get('title', ''),
                'type': 'int',
                'hidden': ('hidden' in row and row['hidden']) and True or False,
                'value': row['value'],
                'unit': row['unit'] and row['unit'] or ''})

    def Import(self):
        self.get_interim_fields()
        folder = self.context.bika_setup.bika_calculations
        for row in self.get_rows(3):
            if not row['title']:
                continue
            calc_title = row['title']
            calc_interims = self.interim_fields.get(calc_title, [])
            formula = row['Formula']
            # scan formula for dep services
            keywords = re.compile(r"\[([^\.^\]]+)\]").findall(formula)
            # remove interims from deps
            interim_keys = [k['keyword'] for k in calc_interims]
            dep_keywords = [k for k in keywords if k not in interim_keys]

            obj = _createObjectByType("Calculation", folder, tmpID())
            obj.edit(
                title=calc_title,
                description=row.get('description', ''),
                InterimFields=calc_interims,
                Formula=str(row['Formula'])
            )
            # Dependent services may not exist yet: defer the linkage
            for kw in dep_keywords:
                self.defer(src_obj=obj,
                           src_field='DependentServices',
                           dest_catalog=SETUP_CATALOG,
                           dest_query={'portal_type': 'AnalysisService',
                                       'getKeyword': kw}
                           )
            obj.unmarkCreationFlag()
            renameAfterCreation(obj)
            notify(ObjectInitializedEvent(obj))

        # Now we have the calculations registered, try to assign default calcs
        # to methods
        sheet = self.workbook["Methods"]
        bsc = getToolByName(self.context, SETUP_CATALOG)
        for row in self.get_rows(3, sheet):
            if row.get('title', '') and row.get('Calculation_title', ''):
                meth = self.get_object(bsc, "Method", row.get('title'))
                if meth and not meth.getCalculation():
                    calctit = safe_unicode(row['Calculation_title']).encode('utf-8')
                    calc = self.get_object(bsc, "Calculation", calctit)
                    if calc:
                        meth.setCalculation(calc.UID())
||
1415 | |||
1416 | |||
1417 | class Analysis_Services(WorksheetImporter): |
||
1418 | |||
1419 | def load_interim_fields(self): |
||
1420 | # preload AnalysisService InterimFields sheet |
||
1421 | sheetname = 'AnalysisService InterimFields' |
||
1422 | worksheet = self.workbook[sheetname] |
||
1423 | if not worksheet: |
||
1424 | return |
||
1425 | self.service_interims = {} |
||
1426 | rows = self.get_rows(3, worksheet=worksheet) |
||
1427 | for row in rows: |
||
1428 | service_title = row['Service_title'] |
||
1429 | if service_title not in self.service_interims.keys(): |
||
1430 | self.service_interims[service_title] = [] |
||
1431 | self.service_interims[service_title].append({ |
||
1432 | 'keyword': row['keyword'], |
||
1433 | 'title': row.get('title', ''), |
||
1434 | 'type': 'int', |
||
1435 | 'value': row['value'], |
||
1436 | 'unit': row['unit'] and row['unit'] or ''}) |
||
1437 | |||
1438 | def load_result_options(self): |
||
1439 | bsc = getToolByName(self.context, SETUP_CATALOG) |
||
1440 | sheetname = 'AnalysisService ResultOptions' |
||
1441 | worksheet = self.workbook[sheetname] |
||
1442 | if not worksheet: |
||
1443 | return |
||
1444 | for row in self.get_rows(3, worksheet=worksheet): |
||
1445 | service = self.get_object(bsc, 'AnalysisService', |
||
1446 | row.get('Service_title')) |
||
1447 | if not service: |
||
1448 | return |
||
1449 | sro = service.getResultOptions() |
||
1450 | sro.append({'ResultValue': row['ResultValue'], |
||
1451 | 'ResultText': row['ResultText']}) |
||
1452 | service.setResultOptions(sro) |
||
1453 | |||
1454 | def load_service_uncertainties(self): |
||
1455 | bsc = getToolByName(self.context, SETUP_CATALOG) |
||
1456 | sheetname = 'Analysis Service Uncertainties' |
||
1457 | worksheet = self.workbook[sheetname] |
||
1458 | if not worksheet: |
||
1459 | return |
||
1460 | |||
1461 | bucket = {} |
||
1462 | count = 0 |
||
1463 | for row in self.get_rows(3, worksheet=worksheet): |
||
1464 | count += 1 |
||
1465 | service = self.get_object(bsc, 'AnalysisService', |
||
1466 | row.get('Service_title')) |
||
1467 | if not service: |
||
1468 | warning = "Unable to load an Analysis Service uncertainty. Service '%s' not found." % row.get('Service_title') |
||
1469 | logger.warning(warning) |
||
1470 | continue |
||
1471 | service_uid = service.UID() |
||
1472 | if service_uid not in bucket: |
||
1473 | bucket[service_uid] = [] |
||
1474 | bucket[service_uid].append( |
||
1475 | {'intercept_min': row['Range Min'], |
||
1476 | 'intercept_max': row['Range Max'], |
||
1477 | 'errorvalue': row['Uncertainty Value']} |
||
1478 | ) |
||
1479 | if count > 500: |
||
1480 | self.write_bucket(bucket) |
||
1481 | bucket = {} |
||
1482 | if bucket: |
||
1483 | self.write_bucket(bucket) |
||
1484 | |||
1485 | def get_methods(self, service_title, default_method): |
||
1486 | """ Return an array of objects of the type Method in accordance to the |
||
1487 | methods listed in the 'AnalysisService Methods' sheet and service |
||
1488 | set in the parameter service_title. |
||
1489 | If default_method is set, it will be included in the returned |
||
1490 | array. |
||
1491 | """ |
||
1492 | return self.get_relations(service_title, |
||
1493 | default_method, |
||
1494 | 'Method', |
||
1495 | SETUP_CATALOG, |
||
1496 | 'AnalysisService Methods', |
||
1497 | 'Method_title') |
||
1498 | |||
1499 | def get_instruments(self, service_title, default_instrument): |
||
1500 | """ Return an array of objects of the type Instrument in accordance to |
||
1501 | the instruments listed in the 'AnalysisService Instruments' sheet |
||
1502 | and service set in the parameter 'service_title'. |
||
1503 | If default_instrument is set, it will be included in the returned |
||
1504 | array. |
||
1505 | """ |
||
1506 | return self.get_relations(service_title, |
||
1507 | default_instrument, |
||
1508 | 'Instrument', |
||
1509 | SETUP_CATALOG, |
||
1510 | 'AnalysisService Instruments', |
||
1511 | 'Instrument_title') |
||
1512 | |||
1513 | def get_relations(self, service_title, default_obj, obj_type, catalog_name, sheet_name, column): |
||
1514 | """ Return an array of objects of the specified type in accordance to |
||
1515 | the object titles defined in the sheet specified in 'sheet_name' and |
||
1516 | service set in the paramenter 'service_title'. |
||
1517 | If a default_obj is set, it will be included in the returned array. |
||
1518 | """ |
||
1519 | out_objects = [default_obj] if default_obj else [] |
||
1520 | cat = getToolByName(self.context, catalog_name) |
||
1521 | worksheet = self.workbook[sheet_name] |
||
1522 | if not worksheet: |
||
1523 | return out_objects |
||
1524 | for row in self.get_rows(3, worksheet=worksheet): |
||
1525 | row_as_title = row.get('Service_title') |
||
1526 | if not row_as_title: |
||
1527 | return out_objects |
||
1528 | elif row_as_title != service_title: |
||
1529 | continue |
||
1530 | obj = self.get_object(cat, obj_type, row.get(column)) |
||
1531 | if obj: |
||
1532 | if default_obj and default_obj.UID() == obj.UID(): |
||
1533 | continue |
||
1534 | out_objects.append(obj) |
||
1535 | return out_objects |
||
1536 | |||
1537 | def write_bucket(self, bucket): |
||
1538 | bsc = getToolByName(self.context, SETUP_CATALOG) |
||
1539 | for service_uid, uncertainties in bucket.items(): |
||
1540 | obj = bsc(UID=service_uid)[0].getObject() |
||
1541 | _uncert = list(obj.getUncertainties()) |
||
1542 | _uncert.extend(uncertainties) |
||
1543 | obj.setUncertainties(_uncert) |
||
1544 | |||
    def Import(self):
        """Create one AnalysisService per spreadsheet row.

        Loads the per-service interim fields first, then creates each
        service and resolves its relations: category, department,
        container, preservation, methods, instruments and calculation.
        Finally loads result options and uncertainty definitions from
        their own sheets.
        """
        self.load_interim_fields()
        folder = self.context.bika_setup.bika_analysisservices
        bsc = getToolByName(self.context, SETUP_CATALOG)
        for row in self.get_rows(3):
            # rows without a title are skipped (not treated as EOF)
            if not row['title']:
                continue

            obj = _createObjectByType("AnalysisService", folder, tmpID())
            # maximum turnaround time, split across three sheet columns
            MTA = {
                'days': self.to_int(row.get('MaxTimeAllowed_days',0),0),
                'hours': self.to_int(row.get('MaxTimeAllowed_hours',0),0),
                'minutes': self.to_int(row.get('MaxTimeAllowed_minutes',0),0),
            }
            category = self.get_object(bsc, 'AnalysisCategory', row.get('AnalysisCategory_title'))
            department = self.get_object(bsc, 'Department', row.get('Department_title'))
            container = self.get_object(bsc, 'SampleContainer', row.get('Container_title'))
            preservation = self.get_object(bsc, 'SamplePreservation', row.get('Preservation_title'))

            # Analysis Service - Method considerations:
            # One Analysis Service can have 0 or n Methods associated (field
            # 'Methods' from the Schema).
            # If the Analysis Service has at least one method associated, then
            # one of those methods can be set as the defualt method (field
            # '_Method' from the Schema).
            #
            # To make it easier, if a DefaultMethod is declared in the
            # Analysis_Services spreadsheet, but the same AS has no method
            # associated in the Analysis_Service_Methods spreadsheet, then make
            # the assumption that the DefaultMethod set in the former has to be
            # associated to the AS although the relation is missing.
            defaultmethod = self.get_object(bsc, 'Method', row.get('DefaultMethod_title'))
            methods = self.get_methods(row['title'], defaultmethod)
            if not defaultmethod and methods:
                defaultmethod = methods[0]

            # Analysis Service - Instrument considerations:
            # By default, an Analysis Services will be associated automatically
            # with several Instruments due to the Analysis Service - Methods
            # relation (an Instrument can be assigned to a Method and one Method
            # can have zero or n Instruments associated). There is no need to
            # set this assignment directly, the AnalysisService object will
            # find those instruments.
            # Besides this 'automatic' behavior, an Analysis Service can also
            # have 0 or n Instruments manually associated ('Instruments' field).
            # In this case, the attribute 'AllowInstrumentEntryOfResults' should
            # be set to True.
            #
            # To make it easier, if a DefaultInstrument is declared in the
            # Analysis_Services spreadsheet, but the same AS has no instrument
            # associated in the AnalysisService_Instruments spreadsheet, then
            # make the assumption the DefaultInstrument set in the former has
            # to be associated to the AS although the relation is missing and
            # the option AllowInstrumentEntryOfResults will be set to True.
            defaultinstrument = self.get_object(bsc, 'Instrument', row.get('DefaultInstrument_title'))
            instruments = self.get_instruments(row['title'], defaultinstrument)
            allowinstrentry = True if instruments else False
            if not defaultinstrument and instruments:
                defaultinstrument = instruments[0]

            # The manual entry of results can only be set to false if the value
            # for the attribute "InstrumentEntryOfResults" is False.
            allowmanualentry = True if not allowinstrentry else row.get('ManualEntryOfResults', True)

            # Analysis Service - Calculation considerations:
            # By default, the AnalysisService will use the Calculation associated
            # to the Default Method (the field "UseDefaultCalculation"==True).
            # If the Default Method for this AS doesn't have any Calculation
            # associated and the field "UseDefaultCalculation" is True, no
            # Calculation will be used for this AS ("_Calculation" field is
            # reserved and should not be set directly).
            #
            # To make it easier, if a Calculation is set by default in the
            # spreadsheet, then assume the UseDefaultCalculation has to be set
            # to False.
            deferredcalculation = self.get_object(bsc, 'Calculation', row.get('Calculation_title'))
            usedefaultcalculation = False if deferredcalculation else True
            _calculation = deferredcalculation if deferredcalculation else \
                (defaultmethod.getCalculation() if defaultmethod else None)

            obj.edit(
                title=row['title'],
                ShortTitle=row.get('ShortTitle', row['title']),
                description=row.get('description', ''),
                Keyword=row['Keyword'],
                PointOfCapture=row['PointOfCapture'].lower(),
                Category=category,
                Department=department,
                Unit=row['Unit'] and row['Unit'] or None,
                Precision=row['Precision'] and str(row['Precision']) or '0',
                ExponentialFormatPrecision=str(self.to_int(row.get('ExponentialFormatPrecision',7),7)),
                LowerDetectionLimit='%06f' % self.to_float(row.get('LowerDetectionLimit', '0.0'), 0),
                UpperDetectionLimit='%06f' % self.to_float(row.get('UpperDetectionLimit', '1000000000.0'), 1000000000.0),
                DetectionLimitSelector=self.to_bool(row.get('DetectionLimitSelector',0)),
                MaxTimeAllowed=MTA,
                Price="%02f" % Float(row['Price']),
                BulkPrice="%02f" % Float(row['BulkPrice']),
                VAT="%02f" % Float(row['VAT']),
                _Method=defaultmethod,
                Methods=methods,
                ManualEntryOfResults=allowmanualentry,
                InstrumentEntryOfResults=allowinstrentry,
                Instruments=instruments,
                Calculation=_calculation,
                UseDefaultCalculation=usedefaultcalculation,
                DuplicateVariation="%02f" % Float(row['DuplicateVariation']),
                Accredited=self.to_bool(row['Accredited']),
                # interims collected by load_interim_fields(), keyed by title
                InterimFields=hasattr(self, 'service_interims') and self.service_interims.get(
                    row['title'], []) or [],
                Separate=self.to_bool(row.get('Separate', False)),
                Container=container,
                Preservation=preservation,
                CommercialID=row.get('CommercialID', ''),
                ProtocolID=row.get('ProtocolID', '')
            )
            obj.unmarkCreationFlag()
            renameAfterCreation(obj)
            notify(ObjectInitializedEvent(obj))
        self.load_result_options()
        self.load_service_uncertainties()
||
1665 | |||
1666 | |||
class Analysis_Specifications(WorksheetImporter):
    """Imports analysis specifications (results ranges), grouped either
    under the lab setup folder or under a specific client.
    """

    def resolve_service(self, row):
        """Return the AnalysisService referenced by the row's 'service'
        cell, matching by title first and falling back to keyword.

        NOTE(review): assumes a match exists — raises IndexError when
        the referenced service cannot be found; verify sheet contents.
        """
        bsc = getToolByName(self.context, SETUP_CATALOG)
        service = bsc(
            portal_type="AnalysisService",
            title=safe_unicode(row["service"])
        )
        if not service:
            service = bsc(
                portal_type="AnalysisService",
                getKeyword=safe_unicode(row["service"])
            )
        service = service[0].getObject()
        return service

    def Import(self):
        """Collect all rows into a {parent: {title: spec}} bucket, then
        create one AnalysisSpec per (parent, title) pair.
        """
        bucket = {}
        client_catalog = getToolByName(self.context, CLIENT_CATALOG)
        setup_catalog = getToolByName(self.context, SETUP_CATALOG)
        # collect up all values into the bucket
        for row in self.get_rows(3):
            # accept either 'Title' or 'title' as the column header
            title = row.get("Title", False)
            if not title:
                title = row.get("title", False)
            if not title:
                continue
            # specs without a client go into the lab-wide folder
            parent = row["Client_title"] if row["Client_title"] else "lab"
            st = row["SampleType_title"] if row["SampleType_title"] else ""
            service = self.resolve_service(row)

            if parent not in bucket:
                bucket[parent] = {}
            if title not in bucket[parent]:
                bucket[parent][title] = {"sampletype": st, "resultsrange": []}
            bucket[parent][title]["resultsrange"].append({
                "keyword": service.getKeyword(),
                "min": row["min"] if row["min"] else "0",
                "max": row["max"] if row["max"] else "0",
            })
        # write objects.
        for parent in bucket.keys():
            for title in bucket[parent]:
                if parent == "lab":
                    folder = self.context.bika_setup.bika_analysisspecs
                else:
                    # NOTE(review): assumes the client exists — IndexError
                    # if the sheet references an unknown client name
                    proxy = client_catalog(
                        portal_type="Client", getName=safe_unicode(parent))[0]
                    folder = proxy.getObject()
                st = bucket[parent][title]["sampletype"]
                resultsrange = bucket[parent][title]["resultsrange"]
                if st:
                    st_uid = setup_catalog(
                        portal_type="SampleType", title=safe_unicode(st))[0].UID
                obj = _createObjectByType("AnalysisSpec", folder, tmpID())
                obj.edit(title=title)
                obj.setResultsRange(resultsrange)
                if st:
                    # st_uid is only defined when st is truthy (guard above)
                    obj.setSampleType(st_uid)
                obj.unmarkCreationFlag()
                renameAfterCreation(obj)
                notify(ObjectInitializedEvent(obj))
||
1729 | |||
1730 | |||
class Analysis_Profiles(WorksheetImporter):
    """Imports analysis profiles and their associated services."""

    def load_analysis_profile_services(self):
        """Map profile titles to lists of AnalysisService objects, read
        from the 'Analysis Profile Services' sheet.
        """
        sheetname = 'Analysis Profile Services'
        worksheet = self.workbook[sheetname]
        self.profile_services = {}
        if not worksheet:
            return
        bsc = getToolByName(self.context, SETUP_CATALOG)
        for row in self.get_rows(3, worksheet=worksheet):
            if not row.get('Profile','') or not row.get('Service',''):
                continue
            if row['Profile'] not in self.profile_services.keys():
                self.profile_services[row['Profile']] = []
            # Here we match againts Keyword or Title.
            # XXX We need a utility for this kind of thing.
            service = self.get_object(bsc, 'AnalysisService', row.get('Service'))
            if not service:
                # NOTE(review): assumes the keyword resolves — IndexError
                # if the referenced service does not exist
                service = bsc(portal_type='AnalysisService',
                              getKeyword=row['Service'])[0].getObject()
            self.profile_services[row['Profile']].append(service)

    def Import(self):
        """Create one AnalysisProfile per row with a title."""
        self.load_analysis_profile_services()
        folder = self.context.setup.analysisprofiles
        for row in self.get_rows(3):
            title = row.get("title", "")
            description = row.get("description", "")
            profile_key = row.get("ProfileKey", "")
            commercial_id = row.get("CommercialID", "")
            analysis_profile_price = row.get("AnalysisProfilePrice")
            analysis_profile_vat = row.get("AnalysisProfileVAT")
            use_analysis_profile_price = row.get("UseAnalysisProfilePrice")
            if title:
                obj = api.create(folder, "AnalysisProfile")
                api.edit(obj,
                         title=api.safe_unicode(title),
                         description=api.safe_unicode(description),
                         profile_key=api.safe_unicode(profile_key),
                         commercial_id=api.safe_unicode(commercial_id),
                         analysis_profile_price=api.to_float(
                             analysis_profile_price, 0.0),
                         analysis_profile_vat=api.to_float(
                             analysis_profile_vat, 0.0),
                         use_analysis_profile_price=bool(
                             use_analysis_profile_price))
                # set the services; fall back to an empty list so profiles
                # without rows in the services sheet do not raise KeyError
                obj.setServices(self.profile_services.get(title, []))
||
1779 | |||
1780 | |||
class Sample_Templates(WorksheetImporter):
    """Imports sample templates with their services and partitions."""

    def load_sampletemplate_services(self):
        """Map template titles to service reference dicts (uid/part_id).

        The mapping is initialized before the missing-worksheet early
        return so that Import() can always read ``self.services``
        (previously an AttributeError when the sheet was absent).
        """
        self.services = {}
        sheetname = "Sample Template Services"
        worksheet = self.workbook[sheetname]
        if not worksheet:
            return
        sc = api.get_tool(SETUP_CATALOG)
        for row in self.get_rows(3, worksheet=worksheet):
            keyword = row.get("keyword")
            service = self.get_object(sc, "AnalysisService", keyword)
            part_id = row.get("part_id", "")
            title = row.get("SampleTemplate")
            if title not in self.services:
                self.services[title] = []
            self.services[title].append({
                "uid": api.get_uid(service),
                "part_id": part_id,
            })

    def load_sampletemplate_partitions(self):
        """Map template titles to partition definition dicts.

        As above, ``self.partitions`` is initialized unconditionally so a
        missing worksheet cannot break Import().
        """
        self.partitions = {}
        sheetname = "Sample Template Partitions"
        worksheet = self.workbook[sheetname]
        if not worksheet:
            return

        sc = api.get_tool(SETUP_CATALOG)
        for row in self.get_rows(3, worksheet=worksheet):
            title = row.get("SampleTemplate")
            container = row.get("container")
            preservation = row.get("preservation")
            sampletype = row.get("sampletype")
            part_id = row.get("part_id")
            if title not in self.partitions:
                self.partitions[title] = []
            container = self.get_object(sc, "SampleContainer", container)
            preservation = self.get_object(sc, "SamplePreservation", preservation)
            sampletype = self.get_object(sc, "SampleType", sampletype)
            self.partitions[title].append({
                "part_id": part_id,
                "container": api.get_uid(container) if container else "",
                "preservation": api.get_uid(preservation) if preservation else "",
                "sampletype": api.get_uid(sampletype) if sampletype else "",
            })

    def Import(self):
        """Create one SampleTemplate per row, under the lab setup folder
        or the referenced client.
        """
        self.load_sampletemplate_services()
        self.load_sampletemplate_partitions()

        setup = api.get_senaite_setup()
        folder = setup.sampletemplates
        sc = api.get_tool(SETUP_CATALOG)

        for row in self.get_rows(3):
            title = row.get("title")
            if not title:
                continue
            services = self.services.get(title)
            client_title = row.get("Client_title") or "lab"
            partitions = self.partitions.get(title, [])
            if client_title == "lab":
                folder = setup.sampletemplates
            else:
                client = api.search({
                    "portal_type": "Client",
                    "getName": client_title
                }, CLIENT_CATALOG)
                # only use the client folder on an unambiguous match;
                # otherwise keep the previous folder
                if len(client) == 1:
                    folder = api.get_object(client[0])

            sampletype = self.get_object(
                sc, 'SampleType', row.get('SampleType_title'))
            samplepoint = self.get_object(
                sc, 'SamplePoint', row.get('SamplePoint_title'))

            obj = api.create(folder, "SampleTemplate", title=title)
            obj.setSampleType(sampletype)
            obj.setSamplePoint(samplepoint)
            obj.setPartitions(partitions)
            obj.setServices(services)
||
1863 | |||
1864 | |||
class Reference_Definitions(WorksheetImporter):
    """Imports reference definitions and their expected results."""

    def load_reference_definition_results(self):
        """Map reference definition titles to expected result dicts.

        Tries the 'Reference Definition Results' sheet first, then the
        legacy 'Reference Definition Values' name. ``self.results`` is
        initialized before any early return so Import() can always read
        it (previously an AttributeError when both sheets were absent);
        the duplicated dead `if not worksheet` check is also removed.
        """
        self.results = {}
        sheetname = 'Reference Definition Results'
        worksheet = self.workbook[sheetname]
        if not worksheet:
            # fall back to the legacy sheet name
            sheetname = 'Reference Definition Values'
            worksheet = self.workbook[sheetname]
        if not worksheet:
            return
        bsc = getToolByName(self.context, SETUP_CATALOG)
        for row in self.get_rows(3, worksheet=worksheet):
            title = row['ReferenceDefinition_title']
            if title not in self.results:
                self.results[title] = []
            # NOTE(review): assumes the service resolves — AttributeError
            # on .UID() if the referenced service does not exist
            service = self.get_object(bsc, 'AnalysisService',
                                      row.get('service'))
            self.results[title].append({
                'uid': service.UID(),
                'result': row['result'] if row['result'] else '0',
                'min': row['min'] if row['min'] else '0',
                'max': row['max'] if row['max'] else '0'})

    def Import(self):
        """Create one ReferenceDefinition per row with a title."""
        self.load_reference_definition_results()
        folder = self.context.bika_setup.bika_referencedefinitions
        for row in self.get_rows(3):
            if not row['title']:
                continue
            obj = _createObjectByType("ReferenceDefinition", folder, tmpID())
            obj.edit(
                title=row['title'],
                description=row.get('description', ''),
                Blank=self.to_bool(row['Blank']),
                ReferenceResults=self.results.get(row['title'], []),
                Hazardous=self.to_bool(row['Hazardous']))
            obj.unmarkCreationFlag()
            renameAfterCreation(obj)
            notify(ObjectInitializedEvent(obj))
||
1907 | |||
1908 | |||
class Worksheet_Templates(WorksheetImporter):
    """Imports worksheet templates with their layouts and services."""

    def load_wst_layouts(self):
        """Map WorksheetTemplate titles to lists of layout dicts."""
        sheetname = 'Worksheet Template Layouts'
        worksheet = self.workbook[sheetname]
        self.wst_layouts = {}
        if not worksheet:
            return
        for row in self.get_rows(3, worksheet=worksheet):
            title = row['WorksheetTemplate_title']
            if title not in self.wst_layouts:
                self.wst_layouts[title] = []
            self.wst_layouts[title].append({
                'pos': row['pos'],
                'type': row['type'],
                'blank_ref': row['blank_ref'],
                'control_ref': row['control_ref'],
                'dup': row['dup']})

    def load_wst_services(self):
        """Map WorksheetTemplate titles to lists of service UIDs."""
        sheetname = 'Worksheet Template Services'
        worksheet = self.workbook[sheetname]
        self.wst_services = {}
        if not worksheet:
            return
        bsc = getToolByName(self.context, SETUP_CATALOG)
        for row in self.get_rows(3, worksheet=worksheet):
            # NOTE(review): assumes the service resolves — AttributeError
            # on .UID() if the referenced service does not exist
            service = self.get_object(bsc, 'AnalysisService',
                                      row.get('service'))
            title = row['WorksheetTemplate_title']
            if title not in self.wst_services:
                self.wst_services[title] = []
            self.wst_services[title].append(service.UID())

    def Import(self):
        """Create one WorksheetTemplate per row with a title."""
        self.load_wst_services()
        self.load_wst_layouts()
        folder = self.context.bika_setup.bika_worksheettemplates
        for row in self.get_rows(3):
            if row['title']:
                obj = _createObjectByType("WorksheetTemplate", folder, tmpID())
                obj.edit(
                    title=row['title'],
                    description=row.get('description', ''),
                    # tolerate templates without layout/service rows in the
                    # auxiliary sheets (previously a KeyError)
                    Layout=self.wst_layouts.get(row['title'], []))
                obj.setService(self.wst_services.get(row['title'], []))
                obj.unmarkCreationFlag()
                renameAfterCreation(obj)
                notify(ObjectInitializedEvent(obj))
||
1960 | |||
1961 | |||
class Setup(WorksheetImporter):
    """Imports Field/Value pairs into the bika_setup schema, converting
    each value according to the schema field's type.
    """

    def get_field_value(self, field, value):
        """Convert a raw spreadsheet value to the field's native type.

        Returns None (and logs an error) when no converter matches the
        field type or conversion fails.
        """
        if value is None:
            return None
        converters = {
            "integer": self.to_integer_value,
            "fixedpoint": self.to_fixedpoint_value,
            "boolean": self.to_boolean_value,
            "string": self.to_string_value,
            "reference": self.to_reference_value,
            "duration": self.to_duration_value
        }
        try:
            return converters.get(field.type, None)(field, value)
        except Exception:
            logger.error("No valid type for Setup.{} ({}): {}"
                         .format(field.getName(), field.type, value))

    def to_integer_value(self, field, value):
        """Convert to the string form of an int."""
        return str(int(value))

    def to_fixedpoint_value(self, field, value):
        """Convert to the string form of a float."""
        return str(float(value))

    def to_boolean_value(self, field, value):
        """Convert to a boolean via the importer's to_bool helper."""
        return self.to_bool(value)

    def to_string_value(self, field, value):
        """Convert to a string, resolving vocabulary entries if any."""
        if field.vocabulary:
            return self.to_string_vocab_value(field, value)
        return value and str(value) or ""

    def to_reference_value(self, field, value):
        """Resolve a title to a UID; raise ValueError when not found."""
        if not value:
            return None

        brains = api.search({"title": to_unicode(value)})
        if brains:
            return api.get_uid(brains[0])

        msg = "No object found for Setup.{0} ({1}): {2}"
        msg = msg.format(field.getName(), field.type, value)
        logger.error(msg)
        raise ValueError(msg)

    def to_string_vocab_value(self, field, value):
        """Return the vocabulary key matching ``value`` against either
        keys or display values (case-insensitive).

        Raises ValueError for empty vocabularies or missing entries.
        """
        vocabulary = field.vocabulary
        if type(vocabulary) is str:
            # a string vocabulary names a factory to resolve on the setup
            vocabulary = getFromString(api.get_setup(), vocabulary)
        else:
            vocabulary = vocabulary.items()

        if not vocabulary:
            raise ValueError("Empty vocabulary for {}".format(field.getName()))

        if type(vocabulary) in (tuple, list):
            vocabulary = {item[0]: item[1] for item in vocabulary}

        for key, val in vocabulary.items():
            key_low = str(to_utf8(key)).lower()
            val_low = str(to_utf8(val)).lower()
            value_low = str(value).lower()
            if key_low == value_low or val_low == value_low:
                return key
        raise ValueError("Vocabulary entry not found")

    def to_duration_value(self, field, values):
        """Assemble a duration dict from the three '<field>_days/_hours/
        _minutes' spreadsheet columns.

        Builds the column names as a list (the previous `map(...)` result
        was indexed directly, which fails on Python 3 where map returns
        an iterator; a list behaves identically on Python 2).
        """
        duration = ["days", "hours", "minutes"]
        duration = ["{}_{}".format(field.getName(), d) for d in duration]
        return dict(
            days=api.to_int(values.get(duration[0], 0), 0),
            hours=api.to_int(values.get(duration[1], 0), 0),
            minutes=api.to_int(values.get(duration[2], 0), 0))

    def Import(self):
        """Read Field/Value rows and write each converted value onto the
        corresponding bika_setup schema field.
        """
        values = {}
        for row in self.get_rows(3):
            values[row['Field']] = row['Value']

        bsetup = self.context.bika_setup
        bschema = bsetup.Schema()
        for field in bschema.fields():
            value = None
            field_name = field.getName()
            if field_name in values:
                value = self.get_field_value(field, values[field_name])
            elif field.type == "duration":
                # duration fields span several columns; pass the whole map
                value = self.get_field_value(field, values)

            if value is None:
                continue
            try:
                obj_field = bsetup.getField(field_name)
                # NOTE(review): str() coerces every value, including the
                # duration dict — relies on the field's mutator accepting
                # a string representation; confirm before changing
                obj_field.set(bsetup, str(value))
            except Exception:
                logger.error("No valid type for Setup.{} ({}): {}"
                             .format(field_name, field.type, value))
||
2061 | |||
2062 | |||
class ID_Prefixes(WorksheetImporter):

    def Import(self):
        """Merge per-portal-type ID prefix rows into the current ID
        formatting list.

        NOTE: the call persisting the result stays commented out, so
        this importer currently computes but does not store anything.
        """
        prefixes = self.context.bika_setup.getIDFormatting()
        for row in self.get_rows(3):
            # drop any existing entry for this portal_type first
            prefixes = [item for item in prefixes
                        if item['portal_type'] != row['portal_type']]
            # The spreadsheet will contain 'none' for user's visual stuff, but it means 'no separator'
            separator = row.get('separator', '-')
            if separator == 'none':
                separator = ''
            # add the fresh entry for this portal_type
            prefixes.append({
                'portal_type': row['portal_type'],
                'padding': row['padding'],
                'prefix': row['prefix'],
                'separator': separator,
            })
        #self.context.bika_setup.setIDFormatting(prefixes)
||
2080 | |||
2081 | |||
class Attachment_Types(WorksheetImporter):

    def Import(self):
        """Create one AttachmentType per spreadsheet row."""
        folder = self.context.bika_setup.bika_attachmenttypes
        for row in self.get_rows(3):
            obj = _createObjectByType("AttachmentType", folder, tmpID())
            obj.edit(title=row['title'],
                     description=row.get('description', ''))
            obj.unmarkCreationFlag()
            renameAfterCreation(obj)
            notify(ObjectInitializedEvent(obj))
||
2094 | |||
2095 | |||
class Reference_Samples(WorksheetImporter):
    """Imports reference samples with their results and analyses."""

    def load_reference_sample_results(self, sample):
        """Set the reference results on ``sample`` from the 'Reference
        Sample Results' sheet (rows matching the sample id).
        """
        sheetname = 'Reference Sample Results'
        # cache the worksheet across calls (one call per sample)
        if not hasattr(self, 'results_worksheet'):
            worksheet = self.workbook[sheetname]
            if not worksheet:
                return
            self.results_worksheet = worksheet
        results = []
        bsc = getToolByName(self.context, SETUP_CATALOG)
        for row in self.get_rows(3, worksheet=self.results_worksheet):
            if row['ReferenceSample_id'] != sample.getId():
                continue
            service = self.get_object(bsc, 'AnalysisService',
                                      row.get('AnalysisService_title'))
            if not service:
                # log the missing service's title (previously the sheet
                # name was passed, filling the %s placeholder incorrectly)
                warning = "Unable to load a reference sample result. Service %s not found."
                logger.warning(warning, row.get('AnalysisService_title'))
                continue
            results.append({
                'uid': service.UID(),
                'result': row['result'],
                'min': row['min'],
                'max': row['max']})
        sample.setReferenceResults(results)

    def load_reference_analyses(self, sample):
        """Create ReferenceAnalysis objects for ``sample`` from the
        'Reference Analyses' sheet (rows matching the sample id).
        """
        sheetname = 'Reference Analyses'
        if not hasattr(self, 'analyses_worksheet'):
            worksheet = self.workbook[sheetname]
            if not worksheet:
                return
            self.analyses_worksheet = worksheet
        bsc = getToolByName(self.context, SETUP_CATALOG)
        for row in self.get_rows(3, worksheet=self.analyses_worksheet):
            if row['ReferenceSample_id'] != sample.getId():
                continue
            service = self.get_object(bsc, 'AnalysisService',
                                      row.get('AnalysisService_title'))
            # Analyses are keyed/named by service keyword
            obj = _createObjectByType("ReferenceAnalysis", sample, row['id'])
            obj.edit(title=row['id'],
                     ReferenceType=row['ReferenceType'],
                     Result=row['Result'],
                     Analyst=row['Analyst'],
                     Instrument=row['Instrument'],
                     Retested=row['Retested']
                     )
            obj.setService(service)
            # obj.setCreators(row['creator'])
            # obj.setCreationDate(row['created'])
            # self.set_wf_history(obj, row['workflow_history'])
            obj.unmarkCreationFlag()

            self.load_reference_analysis_interims(obj)

    def load_reference_analysis_interims(self, analysis):
        """Set the interim fields on ``analysis`` from the 'Reference
        Analysis Interims' sheet (rows matching the analysis id).
        """
        sheetname = 'Reference Analysis Interims'
        if not hasattr(self, 'interim_worksheet'):
            worksheet = self.workbook[sheetname]
            if not worksheet:
                return
            self.interim_worksheet = worksheet
        interims = []
        for row in self.get_rows(3, worksheet=self.interim_worksheet):
            if row['ReferenceAnalysis_id'] != analysis.getId():
                continue
            interims.append({
                'keyword': row['keyword'],
                'title': row['title'],
                'value': row['value'],
                'unit': row['unit'],
                'hidden': row['hidden']})
        analysis.setInterimFields(interims)

    def Import(self):
        """Create one ReferenceSample per row, then load its results and
        analyses from the auxiliary sheets.
        """
        bsc = getToolByName(self.context, SETUP_CATALOG)
        for row in self.get_rows(3):
            if not row['id']:
                continue
            # NOTE(review): assumes the supplier exists — IndexError when
            # 'Supplier_title' does not resolve
            supplier = bsc(portal_type='Supplier',
                           getName=row.get('Supplier_title', ''))[0].getObject()
            obj = _createObjectByType("ReferenceSample", supplier, row['id'])
            ref_def = self.get_object(bsc, 'ReferenceDefinition',
                                      row.get('ReferenceDefinition_title'))
            ref_man = self.get_object(bsc, 'Manufacturer',
                                      row.get('Manufacturer_title'))
            obj.edit(title=row['id'],
                     description=row.get('description', ''),
                     Blank=self.to_bool(row['Blank']),
                     Hazardous=self.to_bool(row['Hazardous']),
                     CatalogueNumber=row['CatalogueNumber'],
                     LotNumber=row['LotNumber'],
                     Remarks=row['Remarks'],
                     ExpiryDate=row['ExpiryDate'],
                     DateSampled=row['DateSampled'],
                     DateReceived=row['DateReceived'],
                     DateOpened=row['DateOpened'],
                     DateExpired=row['DateExpired'],
                     DateDisposed=row['DateDisposed']
                     )
            obj.setReferenceDefinition(ref_def)
            obj.setManufacturer(ref_man)
            obj.unmarkCreationFlag()

            self.load_reference_sample_results(obj)
            self.load_reference_analyses(obj)
||
2204 | |||
class Analysis_Requests(WorksheetImporter):
    """Imports analysis requests (samples) with their analyses."""

    def load_analyses(self, sample):
        """Create analyses for each row of the 'Analyses' sheet and
        attach them to their AnalysisRequest.
        """
        sheetname = 'Analyses'
        # cache the worksheet across calls (one call per sample)
        if not hasattr(self, 'analyses_worksheet'):
            worksheet = self.workbook[sheetname]
            if not worksheet:
                return
            self.analyses_worksheet = worksheet
        bsc = getToolByName(self.context, SETUP_CATALOG)
        bc = getToolByName(self.context, SENAITE_CATALOG)
        for row in self.get_rows(3, worksheet=self.analyses_worksheet):
            # NOTE(review): assumes both lookups resolve — IndexError for
            # unknown service titles or request ids
            service = bsc(portal_type='AnalysisService',
                          title=row['AnalysisService_title'])[0].getObject()
            # analyses are keyed/named by keyword
            ar = bc(portal_type='AnalysisRequest', id=row['AnalysisRequest_id'])[0].getObject()
            obj = create_analysis(
                ar, service,
                Result=row['Result'],
                ResultCaptureDate=row['ResultCaptureDate'],
                Analyst=row['Analyst'],
                Instrument=row['Instrument'],
                Retested=self.to_bool(row['Retested']),
                MaxTimeAllowed={
                    'days': int(row.get('MaxTimeAllowed_days', 0)),
                    'hours': int(row.get('MaxTimeAllowed_hours', 0)),
                    'minutes': int(row.get('MaxTimeAllowed_minutes', 0)),
                },
            )

            analyses = ar.objectValues('Analyses')
            analyses = list(analyses)
            analyses.append(obj)
            ar.setAnalyses(analyses)
            obj.unmarkCreationFlag()

            self.load_analysis_interims(obj)

    def load_analysis_interims(self, analysis):
        """Set the interim fields on ``analysis``.

        NOTE(review): reads sheet 'Reference Analysis Interims' and key
        'ReferenceAnalysis_id' — this looks like copy-paste from the
        Reference_Samples importer; confirm against the spreadsheet
        layout before renaming anything.
        """
        sheetname = 'Reference Analysis Interims'
        if not hasattr(self, 'interim_worksheet'):
            worksheet = self.workbook[sheetname]
            if not worksheet:
                return
            self.interim_worksheet = worksheet
        interims = []
        for row in self.get_rows(3, worksheet=self.interim_worksheet):
            if row['ReferenceAnalysis_id'] != analysis.getId():
                continue
            interims.append({
                'keyword': row['keyword'],
                'title': row['title'],
                'value': row['value'],
                'unit': row['unit'],
                'hidden': row['hidden']})
        analysis.setInterimFields(interims)

    def Import(self):
        """Create one AnalysisRequest per row with an id, resolving the
        client, contacts, profile and template, then load its analyses.
        """
        client_cat = api.get_tool(CLIENT_CATALOG)
        contact_cat = api.get_tool(CONTACT_CATALOG)
        setup_cat = api.get_tool(SETUP_CATALOG)
        for row in self.get_rows(3):
            if not row['id']:
                continue
            # NOTE(review): assumes client/contact lookups resolve —
            # IndexError for unknown names
            client = client_cat(portal_type="Client",
                                getName=row['Client_title'])[0].getObject()
            obj = _createObjectByType("AnalysisRequest", client, row['id'])
            contact = contact_cat(portal_type="Contact",
                                  getFullname=row['Contact_Fullname'])[0].getObject()
            obj.edit(
                RequestID=row['id'],
                Contact=contact,
                CCEmails=row['CCEmails'],
                ClientOrderNumber=row['ClientOrderNumber'],
                InvoiceExclude=row['InvoiceExclude'],
                DateReceived=row['DateReceived'],
                DatePublished=row['DatePublished'],
                Remarks=row['Remarks']
            )
            if row['CCContact_Fullname']:
                contact = contact_cat(portal_type="Contact",
                                      getFullname=row['CCContact_Fullname'])[0].getObject()
                obj.setCCContact(contact)
            if row['AnalysisProfile_title']:
                profiles = setup_cat(portal_type="AnalysisProfile",
                                     title=row['AnalysisProfile_title'])[0].getObject()
                obj.setProfiles([profiles])
            if row['ARTemplate_title']:
                template = setup_cat(portal_type="ARTemplate",
                                     title=row['ARTemplate_title'])[0].getObject()
                obj.setTemplate(template)

            obj.unmarkCreationFlag()

            self.load_analyses(obj)
||
2300 | |||
2301 | |||
class Invoice_Batches(WorksheetImporter):
    """Imports invoice batches."""

    def Import(self):
        """Create one InvoiceBatch per row.

        Raises an exception when a row lacks a title, start date or end
        date. Validation now happens *before* the object is created, so
        a bad row no longer leaves a stray temporary object behind in
        the folder.
        """
        folder = self.context.invoices
        for row in self.get_rows(3):
            if not row['title']:
                message = _("InvoiceBatch has no Title")
                raise Exception(t(message))
            if not row['start']:
                message = _("InvoiceBatch has no Start Date")
                raise Exception(t(message))
            if not row['end']:
                message = _("InvoiceBatch has no End Date")
                raise Exception(t(message))
            obj = _createObjectByType("InvoiceBatch", folder, tmpID())
            obj.edit(
                title=row['title'],
                BatchStartDate=row['start'],
                BatchEndDate=row['end'],
            )
            renameAfterCreation(obj)
            notify(ObjectInitializedEvent(obj))
||
2324 |