Passed
Push — master ( b60410...062e8c )
by Jordi
05:05
created

bika.lims.exportimport.setupdata   F

Complexity

Total Complexity 443

Size/Duplication

Total Lines 2333
Duplicated Lines 9.6 %

Importance

Changes 0
Metric Value
wmc 443
eloc 1845
dl 224
loc 2333
rs 0.8
c 0
b 0
f 0

85 Methods

Rating   Name   Duplication   Size   Complexity  
A Attachment_Types.Import() 0 10 2
A Lab_Products.Import() 0 19 2
C Calculations.Import() 0 45 10
A Reference_Definitions.Import() 0 16 3
A Analysis_Services.write_bucket() 0 7 2
A Sub_Groups.Import() 0 11 4
A Batch_Labels.Import() 0 9 3
A AR_Templates.load_artemplate_partitions() 0 20 4
C Analysis_Services.get_relations() 0 23 9
A Sample_Conditions.Import() 12 12 3
B Supplier_Contacts.Import() 0 23 5
B Sample_Point_Sample_Types.Import() 0 20 6
A Lab_Information.Import() 0 34 4
A Worksheet_Templates.load_wst_services() 14 14 4
A Analysis_Specifications.resolve_service() 0 13 2
A Worksheet_Templates.load_wst_layouts() 0 18 4
B Reference_Samples.load_reference_analysis_interims() 19 19 5
A Setup.to_fixedpoint_value() 0 2 1
A Instrument_Types.Import() 0 10 2
F Analysis_Services.Import() 0 122 13
A WorksheetImporter.to_float() 0 10 3
A Analysis_Profiles.Import() 0 18 3
A Setup.get_field_value() 0 16 3
A Instrument_Documents.Import() 0 7 3
A Analysis_Services.get_instruments() 0 13 1
B Reference_Samples.load_reference_sample_results() 0 24 6
A WorksheetImporter.fill_contactfields() 0 24 4
A WorksheetImporter.__init__() 0 2 1
A WorksheetImporter.get_file_data() 0 12 3
C WorksheetImporter.fill_addressfields() 0 23 9
A AR_Templates.load_artemplate_analyses() 16 16 4
A Setup.to_boolean_value() 0 2 1
A Calculations.get_interim_fields() 0 19 4
A Reference_Samples.Import() 0 32 3
A Sample_Matrices.Import() 13 13 3
B Methods.Import() 11 39 8
B AR_Templates.Import() 0 41 5
A Analysis_Services.load_interim_fields() 0 18 4
B Setup.Import() 0 23 7
A Setup.to_duration_value() 0 7 2
A WorksheetImporter.__call__() 0 23 3
D WorksheetImporter.get_rows() 0 44 13
A Analysis_Requests.load_analyses() 0 35 4
A Clients.Import() 0 23 4
D Instrument_Certifications.Import() 12 48 12
C Reference_Definitions.load_reference_definition_results() 0 23 9
A SetupDataSetList.__call__() 0 2 1
B Analysis_Requests.load_analysis_interims() 19 19 5
B Instrument_Validations.Import() 29 29 7
A Analysis_Services.load_result_options() 0 15 4
B Sample_Points.Import() 0 37 8
A Storage_Locations.Import() 0 25 3
B Setup.to_string_vocab_value() 0 20 7
F Lab_Contacts.Import() 0 120 26
B Containers.Import() 0 24 7
A Suppliers.Import() 0 23 3
A Container_Types.Import() 11 11 3
B Analysis_Requests.Import() 0 38 6
B Instrument_Calibrations.Import() 29 29 7
A WorksheetImporter.to_int() 0 10 3
D Instruments.Import() 23 76 11
B Client_Contacts.Import() 0 62 7
B WorksheetImporter.get_object() 0 24 8
B Reference_Samples.load_reference_analyses() 0 29 5
A Setup.to_reference_value() 0 12 3
A WorksheetImporter.defer() 0 2 1
A WorksheetImporter.Import() 0 2 1
B Invoice_Batches.Import() 0 20 5
B Instrument_Maintenance_Tasks.Import() 0 30 7
A Setup.to_integer_value() 0 2 1
A Preservations.Import() 0 18 3
B Sample_Types.Import() 0 32 5
B Analysis_Profiles.load_analysis_profile_services() 0 19 7
B Analysis_Services.load_service_uncertainties() 0 30 7
B Lab_Departments.Import() 0 23 6
B Instrument_Schedule.Import() 0 30 6
A Setup.to_string_value() 0 4 2
A Manufacturers.Import() 0 13 3
A WorksheetImporter.to_bool() 0 21 5
A Sampling_Deviations.Import() 12 12 3
A Analysis_Services.get_methods() 0 13 1
A ID_Prefixes.Import() 0 14 3
B Analysis_Categories.Import() 0 24 7
F Analysis_Specifications.Import() 0 46 16
A Worksheet_Templates.Import() 0 15 3

5 Functions

Rating   Name   Duplication   Size   Complexity  
B addDocument() 0 43 7
A lookup() 0 6 1
A read_file() 0 11 4
A check_for_required_columns() 0 5 3
A Float() 0 6 2

How to fix   Duplicated Code    Complexity   

Duplicated Code

Duplicate code is one of the most pungent code smells. A rule that is often used is to re-structure code once it is duplicated in three or more places.

Common duplication problems, and corresponding solutions are:

Complexity

 Tip:   Before tackling complexity, make sure that you eliminate any duplication first. This often can reduce the size of classes significantly.

Complex classes like bika.lims.exportimport.setupdata often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
# -*- coding: utf-8 -*-
2
#
3
# This file is part of SENAITE.CORE.
4
#
5
# SENAITE.CORE is free software: you can redistribute it and/or modify it under
6
# the terms of the GNU General Public License as published by the Free Software
7
# Foundation, version 2.
8
#
9
# This program is distributed in the hope that it will be useful, but WITHOUT
10
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
11
# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
12
# details.
13
#
14
# You should have received a copy of the GNU General Public License along with
15
# this program; if not, write to the Free Software Foundation, Inc., 51
16
# Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
17
#
18
# Copyright 2018-2019 by it's authors.
19
# Some rights reserved, see README and LICENSE.
20
21
import datetime
22
import os.path
23
import re
24
25
from pkg_resources import resource_filename
26
27
import transaction
28
from bika.lims import api
29
from bika.lims import bikaMessageFactory as _
30
from bika.lims import logger
31
from bika.lims.exportimport.dataimport import SetupDataSetList as SDL
32
from bika.lims.idserver import renameAfterCreation
33
from bika.lims.interfaces import ISetupDataSetList
34
from bika.lims.utils import getFromString
35
from bika.lims.utils import t
36
from bika.lims.utils import tmpID
37
from bika.lims.utils import to_unicode
38
from bika.lims.utils.analysis import create_analysis
39
from Products.Archetypes.event import ObjectInitializedEvent
40
from Products.CMFCore.utils import getToolByName
41
from Products.CMFPlone.utils import _createObjectByType
42
from Products.CMFPlone.utils import safe_unicode
43
from zope.event import notify
44
from zope.interface import implements
45
46
47
def lookup(context, portal_type, **kwargs):
48
    at = getToolByName(context, 'archetype_tool')
49
    catalog = at.catalog_map.get(portal_type, [None])[0] or 'portal_catalog'
50
    catalog = getToolByName(context, catalog)
51
    kwargs['portal_type'] = portal_type
52
    return catalog(**kwargs)[0].getObject()
53
54
55
def check_for_required_columns(name, data, required):
56
    for column in required:
57
        if not data.get(column, None):
58
            message = _("%s has no '%s' column." % (name, column))
59
            raise Exception(t(message))
60
61
62
def Float(thing):
63
    try:
64
        f = float(thing)
65
    except ValueError:
66
        f = 0.0
67
    return f
68
69
70
def read_file(path):
71
    if os.path.isfile(path):
72
        return open(path, "rb").read()
73
    allowed_ext = ['pdf', 'jpg', 'jpeg', 'png', 'gif', 'ods', 'odt',
74
                   'xlsx', 'doc', 'docx', 'xls', 'csv', 'txt']
75
    allowed_ext += [e.upper() for e in allowed_ext]
76
    for e in allowed_ext:
77
        out = '%s.%s' % (path, e)
78
        if os.path.isfile(out):
79
            return open(out, "rb").read()
80
    raise IOError("File not found: %s. Allowed extensions: %s" % (path, ','.join(allowed_ext)))
81
82
83
class SetupDataSetList(SDL):
84
85
    implements(ISetupDataSetList)
86
87
    def __call__(self):
88
        return SDL.__call__(self, projectname="bika.lims")
89
90
91
class WorksheetImporter:
92
93
    """Use this as a base, for normal tabular data sheet imports.
94
    """
95
96
    def __init__(self, context):
97
        self.adapter_context = context
98
99
    def __call__(self, lsd, workbook, dataset_project, dataset_name):
100
        self.lsd = lsd
101
        self.context = lsd.context
102
        self.workbook = workbook
103
        self.sheetname = self.__class__.__name__.replace("_", " ")
104
        self.worksheet = workbook.get_sheet_by_name(self.sheetname)
105
        self.dataset_project = dataset_project
106
        self.dataset_name = dataset_name
107
        if self.worksheet:
108
            logger.info("Loading {0}.{1}: {2}".format(
109
                self.dataset_project, self.dataset_name, self.sheetname))
110
            try:
111
                self.Import()
112
            except IOError:
113
                # The importer must omit the files not found inside the server filesystem (bika/lims/setupdata/test/
114
                # if the file is loaded from 'select existing file' or bika/lims/setupdata/uploaded if it's loaded from
115
                # 'Load from file') and finishes the import without errors. https://jira.bikalabs.com/browse/LIMS-1624
116
                warning = "Error while loading attached file from %s. The file will not be uploaded into the system."
117
                logger.warning(warning, self.sheetname)
118
                self.context.plone_utils.addPortalMessage("Error while loading some attached files. "
119
                                                          "The files weren't uploaded into the system.")
120
        else:
121
            logger.info("No records found: '{0}'".format(self.sheetname))
122
123
    def get_rows(self, startrow=3, worksheet=None):
124
        """Returns a generator for all rows in a sheet.
125
           Each row contains a dictionary where the key is the value of the
126
           first row of the sheet for each column.
127
           The data values are returned in utf-8 format.
128
           Starts to consume data from startrow
129
        """
130
131
        headers = []
132
        row_nr = 0
133
        worksheet = worksheet if worksheet else self.worksheet
134
        for row in worksheet.rows:  # .iter_rows():
135
            row_nr += 1
136
            if row_nr == 1:
137
                # headers = [cell.internal_value for cell in row]
138
                headers = [cell.value for cell in row]
139
                continue
140
            if row_nr % 1000 == 0:
141
                transaction.savepoint()
142
            if row_nr <= startrow:
143
                continue
144
            # row = [_c(cell.internal_value).decode('utf-8') for cell in row]
145
            new_row = []
146
            for cell in row:
147
                value = cell.value
148
                if value is None:
149
                    value = ''
150
                if isinstance(value, unicode):
151
                    value = value.encode('utf-8')
152
                # Strip any space, \t, \n, or \r characters from the left-hand
153
                # side, right-hand side, or both sides of the string
154
                if isinstance(value, str):
155
                    value = value.strip(' \t\n\r')
156
                new_row.append(value)
157
            row = dict(zip(headers, new_row))
158
159
            # parse out addresses
160
            for add_type in ['Physical', 'Postal', 'Billing']:
161
                row[add_type] = {}
162
                if add_type + "_Address" in row:
163
                    for key in ['Address', 'City', 'State', 'District', 'Zip', 'Country']:
164
                        row[add_type][key] = str(row.get("%s_%s" % (add_type, key), ''))
165
166
            yield row
167
168
    def get_file_data(self, filename):
169
        if filename:
170
            try:
171
                path = resource_filename(
172
                    self.dataset_project,
173
                    "setupdata/%s/%s" % (self.dataset_name, filename))
174
                file_data = open(path, "rb").read()
175
            except:
176
                file_data = None
177
        else:
178
            file_data = None
179
        return file_data
180
181
    def to_bool(self, value):
182
        """ Converts a sheet string value to a boolean value.
183
            Needed because of utf-8 conversions
184
        """
185
186
        try:
187
            value = value.lower()
188
        except:
189
            pass
190
        try:
191
            value = value.encode('utf-8')
192
        except:
193
            pass
194
        try:
195
            value = int(value)
196
        except:
197
            pass
198
        if value in ('true', 1):
199
            return True
200
        else:
201
            return False
202
203
    def to_int(self, value, default=0):
204
        """ Converts a value o a int. Returns default if the conversion fails.
205
        """
206
        try:
207
            return int(value)
208
        except ValueError:
209
            try:
210
                return int(default)
211
            except:
212
                return 0
213
214
    def to_float(self, value, default=0):
215
        """ Converts a value o a float. Returns default if the conversion fails.
216
        """
217
        try:
218
            return float(value)
219
        except ValueError:
220
            try:
221
                return float(default)
222
            except:
223
                return 0.0
224
225
    def defer(self, **kwargs):
226
        self.lsd.deferred.append(kwargs)
227
228
    def Import(self):
229
        """ Override this.
230
        XXX Simple generic sheet importer
231
        """
232
233
    def fill_addressfields(self, row, obj):
234
        """ Fills the address fields for the specified object if allowed:
235
            PhysicalAddress, PostalAddress, CountryState, BillingAddress
236
        """
237
        addresses = {}
238
        for add_type in ['Physical', 'Postal', 'Billing', 'CountryState']:
239
            addresses[add_type] = {}
240
            for key in ['Address', 'City', 'State', 'District', 'Zip', 'Country']:
241
                addresses[add_type][key.lower()] = str(row.get("%s_%s" % (add_type, key), ''))
242
243
        if addresses['CountryState']['country'] == '' \
244
            and addresses['CountryState']['state'] == '':
245
            addresses['CountryState']['country'] = addresses['Physical']['country']
246
            addresses['CountryState']['state'] = addresses['Physical']['state']
247
248
        if hasattr(obj, 'setPhysicalAddress'):
249
            obj.setPhysicalAddress(addresses['Physical'])
250
        if hasattr(obj, 'setPostalAddress'):
251
            obj.setPostalAddress(addresses['Postal'])
252
        if hasattr(obj, 'setCountryState'):
253
            obj.setCountryState(addresses['CountryState'])
254
        if hasattr(obj, 'setBillingAddress'):
255
            obj.setBillingAddress(addresses['Billing'])
256
257
    def fill_contactfields(self, row, obj):
258
        """ Fills the contact fields for the specified object if allowed:
259
            EmailAddress, Phone, Fax, BusinessPhone, BusinessFax, HomePhone,
260
            MobilePhone
261
        """
262
        fieldnames = ['EmailAddress',
263
                      'Phone',
264
                      'Fax',
265
                      'BusinessPhone',
266
                      'BusinessFax',
267
                      'HomePhone',
268
                      'MobilePhone',
269
                      ]
270
        schema = obj.Schema()
271
        fields = dict([(field.getName(), field) for field in schema.fields()])
272
        for fieldname in fieldnames:
273
            try:
274
                field = fields[fieldname]
275
            except:
276
                if fieldname in row:
277
                    logger.info("Address field %s not found on %s"%(fieldname,obj))
278
                continue
279
            value = row.get(fieldname, '')
280
            field.set(obj, value)
281
282
    def get_object(self, catalog, portal_type, title=None, **kwargs):
283
        """This will return an object from the catalog.
284
        Logs a message and returns None if no object or multiple objects found.
285
        All keyword arguments are passed verbatim to the contentFilter
286
        """
287
        if not title and not kwargs:
288
            return None
289
        contentFilter = {"portal_type": portal_type}
290
        if title:
291
            contentFilter['title'] = to_unicode(title)
292
        contentFilter.update(kwargs)
293
        brains = catalog(contentFilter)
294
        if len(brains) > 1:
295
            logger.info("More than one object found for %s" % contentFilter)
296
            return None
297
        elif len(brains) == 0:
298
            if portal_type == 'AnalysisService':
299
                brains = catalog(portal_type=portal_type, getKeyword=title)
300
                if brains:
301
                    return brains[0].getObject()
302
            logger.info("No objects found for %s" % contentFilter)
303
            return None
304
        else:
305
            return brains[0].getObject()
306
307
308
class Sub_Groups(WorksheetImporter):
309
310
    def Import(self):
311
        folder = self.context.bika_setup.bika_subgroups
312
        for row in self.get_rows(3):
313
            if 'title' in row and row['title']:
314
                obj = _createObjectByType("SubGroup", folder, tmpID())
315
                obj.edit(title=row['title'],
316
                         description=row['description'],
317
                         SortKey=row['SortKey'])
318
                obj.unmarkCreationFlag()
319
                renameAfterCreation(obj)
320
                notify(ObjectInitializedEvent(obj))
321
322
323
class Lab_Information(WorksheetImporter):
324
325
    def Import(self):
326
        laboratory = self.context.bika_setup.laboratory
327
        values = {}
328
        for row in self.get_rows(3):
329
            values[row['Field']] = row['Value']
330
331
        if values['AccreditationBodyLogo']:
332
            path = resource_filename(
333
                self.dataset_project,
334
                "setupdata/%s/%s" % (self.dataset_name,
335
                                     values['AccreditationBodyLogo']))
336
            try:
337
                file_data = read_file(path)
338
            except Exception as msg:
339
                file_data = None
340
                logger.warning(msg[0] + " Error on sheet: " + self.sheetname)
341
        else:
342
            file_data = None
343
344
        laboratory.edit(
345
            Name=values['Name'],
346
            LabURL=values['LabURL'],
347
            Confidence=values['Confidence'],
348
            LaboratoryAccredited=self.to_bool(values['LaboratoryAccredited']),
349
            AccreditationBodyLong=values['AccreditationBodyLong'],
350
            AccreditationBody=values['AccreditationBody'],
351
            AccreditationBodyURL=values['AccreditationBodyURL'],
352
            Accreditation=values['Accreditation'],
353
            AccreditationReference=values['AccreditationReference'],
354
            AccreditationBodyLogo=file_data,
355
            TaxNumber=values['TaxNumber'],
356
        )
357
        self.fill_contactfields(values, laboratory)
358
        self.fill_addressfields(values, laboratory)
359
360
361
class Lab_Contacts(WorksheetImporter):
362
363
    def Import(self):
364
        folder = self.context.bika_setup.bika_labcontacts
365
        portal_groups = getToolByName(self.context, 'portal_groups')
366
        portal_registration = getToolByName(
367
            self.context, 'portal_registration')
368
        rownum = 2
369
        for row in self.get_rows(3):
370
            rownum+=1
371
            if not row.get('Firstname',None):
372
                continue
373
374
            # Username already exists?
375
            username = row.get('Username','')
376
            fullname = ('%s %s' % (row['Firstname'], row.get('Surname', ''))).strip()
377
            if username:
378
                username = safe_unicode(username).encode('utf-8')
379
                bsc = getToolByName(self.context, 'bika_setup_catalog')
380
                exists = [o.getObject() for o in bsc(portal_type="LabContact") if o.getObject().getUsername()==username]
381
                if exists:
382
                    error = "Lab Contact: username '{0}' in row {1} already exists. This contact will be omitted.".format(username, str(rownum))
383
                    logger.error(error)
384
                    continue
385
386
            # Is there a signature file defined? Try to get the file first.
387
            signature = None
388
            if row.get('Signature'):
389
                signature = self.get_file_data(row['Signature'])
390
                if not signature:
391
                    warning = "Lab Contact: Cannot load the signature file '{0}' for user '{1}'. The contact will be created, but without a signature image".format(row['Signature'], username)
392
                    logger.warning(warning)
393
394
            obj = _createObjectByType("LabContact", folder, tmpID())
395
            obj.edit(
396
                title=fullname,
397
                Salutation=row.get('Salutation', ''),
398
                Firstname=row['Firstname'],
399
                Surname=row.get('Surname', ''),
400
                JobTitle=row.get('JobTitle', ''),
401
                Username=row.get('Username', ''),
402
                Signature=signature
403
            )
404
            obj.unmarkCreationFlag()
405
            renameAfterCreation(obj)
406
            notify(ObjectInitializedEvent(obj))
407
            self.fill_contactfields(row, obj)
408
            self.fill_addressfields(row, obj)
409
410
            if row['Department_title']:
411
                self.defer(src_obj=obj,
412
                           src_field='Department',
413
                           dest_catalog='bika_setup_catalog',
414
                           dest_query={'portal_type': 'Department',
415
                                       'title': row['Department_title']}
416
                           )
417
418
            # Create Plone user
419
            if not row['Username']:
420
                warn = "Lab Contact: No username defined for user '{0}' in row {1}. Contact created, but without access credentials.".format(fullname, str(rownum))
421
                logger.warning(warn)
422
            if not row.get('EmailAddress', ''):
423
                warn = "Lab Contact: No Email defined for user '{0}' in row {1}. Contact created, but without access credentials.".format(fullname, str(rownum))
424
                logger.warning(warn)
425
426
            if(row['Username'] and row.get('EmailAddress','')):
427
                username = safe_unicode(row['Username']).encode('utf-8')
428
                passw = row['Password']
429
                if not passw:
430
                    warn = "Lab Contact: No password defined for user '{0}' in row {1}. Password established automatically to '{3}'".format(username, str(rownum), username)
431
                    logger.warning(warn)
432
                    passw = username
433
434
                try:
435
                    member = portal_registration.addMember(
436
                        username,
437
                        passw,
438
                        properties={
439
                            'username': username,
440
                            'email': row['EmailAddress'],
441
                            'fullname': fullname}
442
                    )
443
                except Exception as msg:
444
                    logger.error("Client Contact: Error adding user (%s): %s" % (msg, username))
445
                    continue
446
447
                groups = row.get('Groups', '')
448
                if not groups:
449
                    warn = "Lab Contact: No groups defined for user '{0}' in row {1}. Group established automatically to 'Analysts'".format(username, str(rownum))
450
                    logger.warning(warn)
451
                    groups = 'Analysts'
452
453
                group_ids = [g.strip() for g in groups.split(',')]
454
                # Add user to all specified groups
455
                for group_id in group_ids:
456
                    group = portal_groups.getGroupById(group_id)
457
                    if group:
458
                        group.addMember(username)
459
                roles = row.get('Roles', '')
460
                if roles:
461
                    role_ids = [r.strip() for r in roles.split(',')]
462
                    # Add user to all specified roles
463
                    for role_id in role_ids:
464
                        member._addRole(role_id)
465
                # If user is in LabManagers, add Owner local role on clients
466
                # folder
467
                if 'LabManager' in group_ids:
468
                    self.context.clients.manage_setLocalRoles(
469
                        username, ['Owner', ])
470
471
        # Now we have the lab contacts registered, try to assign the managers
472
        # to each department if required
473
        sheet = self.workbook.get_sheet_by_name("Lab Departments")
474
        bsc = getToolByName(self.context, 'bika_setup_catalog')
475
        for row in self.get_rows(3, sheet):
476
            if row['title'] and row['LabContact_Username']:
477
                dept = self.get_object(bsc, "Department", row.get('title'))
478
                if dept and not dept.getManager():
479
                    username = safe_unicode(row['LabContact_Username']).encode('utf-8')
480
                    exists = [o.getObject() for o in bsc(portal_type="LabContact") if o.getObject().getUsername()==username]
481
                    if exists:
482
                        dept.setManager(exists[0].UID())
483
484
class Lab_Departments(WorksheetImporter):
485
486
    def Import(self):
487
        folder = self.context.bika_setup.bika_departments
488
        bsc = getToolByName(self.context, 'bika_setup_catalog')
489
        lab_contacts = [o.getObject() for o in bsc(portal_type="LabContact")]
490
        for row in self.get_rows(3):
491
            if row['title']:
492
                obj = _createObjectByType("Department", folder, tmpID())
493
                obj.edit(title=row['title'],
494
                         description=row.get('description', ''))
495
                manager = None
496
                for contact in lab_contacts:
497
                    if contact.getUsername() == row['LabContact_Username']:
498
                        manager = contact
499
                        break
500
                if manager:
501
                    obj.setManager(manager.UID())
502
                else:
503
                    message = "Department: lookup of '%s' in LabContacts/Username failed." % row[
504
                        'LabContact_Username']
505
                    logger.info(message)
506
                obj.unmarkCreationFlag()
507
                renameAfterCreation(obj)
508
                notify(ObjectInitializedEvent(obj))
509
510
511
class Lab_Products(WorksheetImporter):
512
513
    def Import(self):
514
        context = self.context
515
        # Refer to the default folder
516
        folder = self.context.bika_setup.bika_labproducts
517
        # Iterate through the rows
518
        for row in self.get_rows(3):
519
            # Create the SRTemplate object
520
            obj = _createObjectByType('LabProduct', folder, tmpID())
521
            # Apply the row values
522
            obj.edit(
523
                title=row.get('title', 'Unknown'),
524
                description=row.get('description', ''),
525
                Volume=row.get('volume', 0),
526
                Unit=str(row.get('unit', 0)),
527
                Price=str(row.get('price', 0)),
528
            )
529
            # Rename the new object
530
            renameAfterCreation(obj)
531
            notify(ObjectInitializedEvent(obj))
532
533
534
class Clients(WorksheetImporter):
535
536
    def Import(self):
537
        folder = self.context.clients
538
        for row in self.get_rows(3):
539
            obj = _createObjectByType("Client", folder, tmpID())
540
            if not row['Name']:
541
                message = "Client %s has no Name"
542
                raise Exception(message)
543
            if not row['ClientID']:
544
                message = "Client %s has no Client ID"
545
                raise Exception(message)
546
            obj.edit(Name=row['Name'],
547
                     ClientID=row['ClientID'],
548
                     MemberDiscountApplies=row[
549
                         'MemberDiscountApplies'] and True or False,
550
                     BulkDiscount=row['BulkDiscount'] and True or False,
551
                     TaxNumber=row.get('TaxNumber', ''),
552
                     AccountNumber=row.get('AccountNumber', '')
553
                     )
554
            self.fill_contactfields(row, obj)
555
            self.fill_addressfields(row, obj)
556
            obj.unmarkCreationFlag()
557
            renameAfterCreation(obj)
558
            notify(ObjectInitializedEvent(obj))
559
560
561
class Client_Contacts(WorksheetImporter):
562
563
    def Import(self):
564
        portal_groups = getToolByName(self.context, 'portal_groups')
565
        pc = getToolByName(self.context, 'portal_catalog')
566
        for row in self.get_rows(3):
567
            client = pc(portal_type="Client",
568
                        getName=row['Client_title'])
569
            if len(client) == 0:
570
                client_contact = "%(Firstname)s %(Surname)s" % row
571
                error = "Client invalid: '%s'. The Client Contact %s will not be uploaded."
572
                logger.error(error, row['Client_title'], client_contact)
573
                continue
574
            client = client[0].getObject()
575
            contact = _createObjectByType("Contact", client, tmpID())
576
            fullname = "%(Firstname)s %(Surname)s" % row
577
            pub_pref = [x.strip() for x in
578
                        row.get('PublicationPreference', '').split(",")]
579
            contact.edit(
580
                Salutation=row.get('Salutation', ''),
581
                Firstname=row.get('Firstname', ''),
582
                Surname=row.get('Surname', ''),
583
                Username=row['Username'],
584
                JobTitle=row.get('JobTitle', ''),
585
                Department=row.get('Department', ''),
586
                PublicationPreference=pub_pref,
587
                AttachmentsPermitted=row[
588
                    'AttachmentsPermitted'] and True or False,
589
            )
590
            self.fill_contactfields(row, contact)
591
            self.fill_addressfields(row, contact)
592
            contact.unmarkCreationFlag()
593
            renameAfterCreation(contact)
594
            notify(ObjectInitializedEvent(contact))
595
            # CC Contacts
596
            if row['CCContacts']:
597
                names = [x.strip() for x in row['CCContacts'].split(",")]
598
                for _fullname in names:
599
                    self.defer(src_obj=contact,
600
                               src_field='CCContact',
601
                               dest_catalog='portal_catalog',
602
                               dest_query={'portal_type': 'Contact',
603
                                           'getFullname': _fullname}
604
                               )
605
            ## Create Plone user
606
            username = safe_unicode(row['Username']).encode('utf-8')
607
            password = safe_unicode(row['Password']).encode('utf-8')
608
            if(username):
609
                try:
610
                    member = self.context.portal_registration.addMember(
611
                        username,
612
                        password,
613
                        properties={
614
                            'username': username,
615
                            'email': row['EmailAddress'],
616
                            'fullname': fullname}
617
                        )
618
                except Exception as msg:
619
                    logger.info("Error adding user (%s): %s" % (msg, username))
620
                contact.aq_parent.manage_setLocalRoles(row['Username'], ['Owner', ])
621
                contact.reindexObject()
622
                # add user to Clients group
623
                group = portal_groups.getGroupById('Clients')
624
                group.addMember(username)
625
626
627
class Container_Types(WorksheetImporter):
628
629 View Code Duplication
    def Import(self):
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
630
        folder = self.context.bika_setup.bika_containertypes
631
        for row in self.get_rows(3):
632
            if not row['title']:
633
                continue
634
            obj = _createObjectByType("ContainerType", folder, tmpID())
635
            obj.edit(title=row['title'],
636
                     description=row.get('description', ''))
637
            obj.unmarkCreationFlag()
638
            renameAfterCreation(obj)
639
            notify(ObjectInitializedEvent(obj))
640
641
642
class Preservations(WorksheetImporter):
643
644
    def Import(self):
645
        folder = self.context.bika_setup.bika_preservations
646
        for row in self.get_rows(3):
647
            if not row['title']:
648
                continue
649
            obj = _createObjectByType("Preservation", folder, tmpID())
650
            RP = {
651
                'days': int(row['RetentionPeriod_days'] and row['RetentionPeriod_days'] or 0),
652
                'hours': int(row['RetentionPeriod_hours'] and row['RetentionPeriod_hours'] or 0),
653
                'minutes': int(row['RetentionPeriod_minutes'] and row['RetentionPeriod_minutes'] or 0),
654
            }
655
656
            obj.edit(title=row['title'],
657
                     description=row.get('description', ''),
658
                     RetentionPeriod=RP)
659
            obj.unmarkCreationFlag()
660
            renameAfterCreation(obj)
661
            notify(ObjectInitializedEvent(obj))
662
663
664
class Containers(WorksheetImporter):
665
666
    def Import(self):
667
        folder = self.context.bika_setup.bika_containers
668
        bsc = getToolByName(self.context, 'bika_setup_catalog')
669
        for row in self.get_rows(3):
670
            if not row['title']:
671
                continue
672
            obj = _createObjectByType("Container", folder, tmpID())
673
            obj.edit(
674
                title=row['title'],
675
                description=row.get('description', ''),
676
                Capacity=row.get('Capacity', 0),
677
                PrePreserved=self.to_bool(row['PrePreserved'])
678
            )
679
            if row['ContainerType_title']:
680
                ct = self.get_object(bsc, 'ContainerType', row.get('ContainerType_title',''))
681
                if ct:
682
                    obj.setContainerType(ct)
683
            if row['Preservation_title']:
684
                pres = self.get_object(bsc, 'Preservation',row.get('Preservation_title',''))
685
                if pres:
686
                    obj.setPreservation(pres)
687
            obj.unmarkCreationFlag()
688
            renameAfterCreation(obj)
689
            notify(ObjectInitializedEvent(obj))
690
691
692
class Suppliers(WorksheetImporter):
693
694
    def Import(self):
695
        folder = self.context.bika_setup.bika_suppliers
696
        for row in self.get_rows(3):
697
            obj = _createObjectByType("Supplier", folder, tmpID())
698
            if row['Name']:
699
                obj.edit(
700
                    Name=row.get('Name', ''),
701
                    TaxNumber=row.get('TaxNumber', ''),
702
                    AccountType=row.get('AccountType', {}),
703
                    AccountName=row.get('AccountName', {}),
704
                    AccountNumber=row.get('AccountNumber', ''),
705
                    BankName=row.get('BankName', ''),
706
                    BankBranch=row.get('BankBranch', ''),
707
                    SWIFTcode=row.get('SWIFTcode', ''),
708
                    IBN=row.get('IBN', ''),
709
                    NIB=row.get('NIB', ''),
710
                    Website=row.get('Website', ''),
711
                )
712
                self.fill_contactfields(row, obj)
713
                self.fill_addressfields(row, obj)
714
                obj.unmarkCreationFlag()
715
                renameAfterCreation(obj)
716
                notify(ObjectInitializedEvent(obj))
717
718
719
class Supplier_Contacts(WorksheetImporter):
720
721
    def Import(self):
722
        bsc = getToolByName(self.context, 'bika_setup_catalog')
723
        for row in self.get_rows(3):
724
            if not row['Supplier_Name']:
725
                continue
726
            if not row['Firstname']:
727
                continue
728
            folder = bsc(portal_type="Supplier",
729
                         Title=row['Supplier_Name'])
730
            if not folder:
731
                continue
732
            folder = folder[0].getObject()
733
            obj = _createObjectByType("SupplierContact", folder, tmpID())
734
            obj.edit(
735
                Firstname=row['Firstname'],
736
                Surname=row.get('Surname', ''),
737
                Username=row.get('Username')
738
            )
739
            self.fill_contactfields(row, obj)
740
            self.fill_addressfields(row, obj)
741
            obj.unmarkCreationFlag()
742
            renameAfterCreation(obj)
743
            notify(ObjectInitializedEvent(obj))
744
745
746
class Manufacturers(WorksheetImporter):
747
748
    def Import(self):
749
        folder = self.context.bika_setup.bika_manufacturers
750
        for row in self.get_rows(3):
751
            obj = _createObjectByType("Manufacturer", folder, tmpID())
752
            if row['title']:
753
                obj.edit(
754
                    title=row['title'],
755
                    description=row.get('description', '')
756
                )
757
                self.fill_addressfields(row, obj)
758
                obj.unmarkCreationFlag()
759
                renameAfterCreation(obj)
760
                notify(ObjectInitializedEvent(obj))
761
762
763
class Instrument_Types(WorksheetImporter):
764
765
    def Import(self):
766
        folder = self.context.bika_setup.bika_instrumenttypes
767
        for row in self.get_rows(3):
768
                obj = _createObjectByType("InstrumentType", folder, tmpID())
769
                obj.edit(
770
                    title=row['title'],
771
                    description=row.get('description', ''))
772
                obj.unmarkCreationFlag()
773
                renameAfterCreation(obj)
774
                notify(ObjectInitializedEvent(obj))
775
776
777
class Instruments(WorksheetImporter):
778
779
    def Import(self):
780
        folder = self.context.bika_setup.bika_instruments
781
        bsc = getToolByName(self.context, 'bika_setup_catalog')
782
        pc = getToolByName(self.context, 'portal_catalog')
783
        for row in self.get_rows(3):
784
            if ('Type' not in row
785
                or 'Supplier' not in row
786
                or 'Brand' not in row):
787
                logger.info("Unable to import '%s'. Missing supplier, manufacturer or type" % row.get('title',''))
788
                continue
789
790
            obj = _createObjectByType("Instrument", folder, tmpID())
791
792
            obj.edit(
793
                title=row.get('title', ''),
794
                AssetNumber=row.get('assetnumber', ''),
795
                description=row.get('description', ''),
796
                Type=row.get('Type', ''),
797
                Brand=row.get('Brand', ''),
798
                Model=row.get('Model', ''),
799
                SerialNo=row.get('SerialNo', ''),
800
                DataInterface=row.get('DataInterface', ''),
801
                Location=row.get('Location', ''),
802
                InstallationDate=row.get('Instalationdate', ''),
803
                UserManualID=row.get('UserManualID', ''),
804
            )
805
            instrumenttype = self.get_object(bsc, 'InstrumentType', title=row.get('Type'))
806
            manufacturer = self.get_object(bsc, 'Manufacturer', title=row.get('Brand'))
807
            supplier = self.get_object(bsc, 'Supplier', getName=row.get('Supplier', ''))
808
            method = self.get_object(pc, 'Method', title=row.get('Method'))
809
            obj.setInstrumentType(instrumenttype)
810
            obj.setManufacturer(manufacturer)
811
            obj.setSupplier(supplier)
812
            if method:
813
                obj.setMethods([method])
814
                obj.setMethod(method)
815
816
            # Attaching the instrument's photo
817 View Code Duplication
            if row.get('Photo', None):
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
818
                path = resource_filename(
819
                    self.dataset_project,
820
                    "setupdata/%s/%s" % (self.dataset_name,
821
                                         row['Photo'])
822
                )
823
                try:
824
                    file_data = read_file(path)
825
                    obj.setPhoto(file_data)
826
                except Exception as msg:
827
                    file_data = None
828
                    logger.warning(msg[0] + " Error on sheet: " + self.sheetname)
829
830
            # Attaching the Installation Certificate if exists
831 View Code Duplication
            if row.get('InstalationCertificate', None):
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
832
                path = resource_filename(
833
                    self.dataset_project,
834
                    "setupdata/%s/%s" % (self.dataset_name,
835
                                         row['InstalationCertificate'])
836
                )
837
                try:
838
                    file_data = read_file(path)
839
                    obj.setInstallationCertificate(file_data)
840
                except Exception as msg:
841
                    logger.warning(msg[0] + " Error on sheet: " + self.sheetname)
842
843
            # Attaching the Instrument's manual if exists
844
            if row.get('UserManualFile', None):
845
                row_dict = {'DocumentID': row.get('UserManualID', 'manual'),
846
                            'DocumentVersion': '',
847
                            'DocumentLocation': '',
848
                            'DocumentType': 'Manual',
849
                            'File': row.get('UserManualFile', None)
850
                            }
851
                addDocument(self, row_dict, obj)
852
            obj.unmarkCreationFlag()
853
            renameAfterCreation(obj)
854
            notify(ObjectInitializedEvent(obj))
855
856
857 View Code Duplication
class Instrument_Validations(WorksheetImporter):
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
858
859
    def Import(self):
860
        bsc = getToolByName(self.context, 'bika_setup_catalog')
861
        for row in self.get_rows(3):
862
            if not row.get('instrument', None) or not row.get('title', None):
863
                continue
864
865
            folder = self.get_object(bsc, 'Instrument', row.get('instrument'))
866
            if folder:
867
                obj = _createObjectByType("InstrumentValidation", folder, tmpID())
868
                obj.edit(
869
                    title=row['title'],
870
                    DownFrom=row.get('downfrom', ''),
871
                    DownTo=row.get('downto', ''),
872
                    Validator=row.get('validator', ''),
873
                    Considerations=row.get('considerations', ''),
874
                    WorkPerformed=row.get('workperformed', ''),
875
                    Remarks=row.get('remarks', ''),
876
                    DateIssued=row.get('DateIssued', ''),
877
                    ReportID=row.get('ReportID', '')
878
                )
879
                # Getting lab contacts
880
                bsc = getToolByName(self.context, 'bika_setup_catalog')
881
                lab_contacts = [o.getObject() for o in bsc(portal_type="LabContact", is_active=True)]
882
                for contact in lab_contacts:
883
                    if contact.getFullname() == row.get('Worker', ''):
884
                        obj.setWorker(contact.UID())
885
                obj.unmarkCreationFlag()
886
                renameAfterCreation(obj)
887
                notify(ObjectInitializedEvent(obj))
888
889
890 View Code Duplication
class Instrument_Calibrations(WorksheetImporter):
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
891
892
    def Import(self):
893
        bsc = getToolByName(self.context, 'bika_setup_catalog')
894
        for row in self.get_rows(3):
895
            if not row.get('instrument', None) or not row.get('title', None):
896
                continue
897
898
            folder = self.get_object(bsc, 'Instrument', row.get('instrument'))
899
            if folder:
900
                obj = _createObjectByType("InstrumentCalibration", folder, tmpID())
901
                obj.edit(
902
                    title=row['title'],
903
                    DownFrom=row.get('downfrom', ''),
904
                    DownTo=row.get('downto', ''),
905
                    Calibrator=row.get('calibrator', ''),
906
                    Considerations=row.get('considerations', ''),
907
                    WorkPerformed=row.get('workperformed', ''),
908
                    Remarks=row.get('remarks', ''),
909
                    DateIssued=row.get('DateIssued', ''),
910
                    ReportID=row.get('ReportID', '')
911
                )
912
                # Gettinginstrument lab contacts
913
                bsc = getToolByName(self.context, 'bika_setup_catalog')
914
                lab_contacts = [o.getObject() for o in bsc(portal_type="LabContact", nactive_state='active')]
915
                for contact in lab_contacts:
916
                    if contact.getFullname() == row.get('Worker', ''):
917
                        obj.setWorker(contact.UID())
918
                obj.unmarkCreationFlag()
919
                renameAfterCreation(obj)
920
                notify(ObjectInitializedEvent(obj))
921
922
923
class Instrument_Certifications(WorksheetImporter):
924
925
    def Import(self):
926
        bsc = getToolByName(self.context, 'bika_setup_catalog')
927
        for row in self.get_rows(3):
928
            if not row['instrument'] or not row['title']:
929
                continue
930
931
            folder = self.get_object(bsc, 'Instrument', row.get('instrument',''))
932
            if folder:
933
                obj = _createObjectByType("InstrumentCertification", folder, tmpID())
934
                today = datetime.date.today()
935
                certificate_expire_date = today.strftime('%d/%m') + '/' + str(today.year+1) \
936
                    if row.get('validto', '') == '' else row.get('validto')
937
                certificate_start_date = today.strftime('%d/%m/%Y') \
938
                    if row.get('validfrom', '') == '' else row.get('validfrom')
939
                obj.edit(
940
                    title=row['title'],
941
                    AssetNumber=row.get('assetnumber', ''),
942
                    Date=row.get('date', ''),
943
                    ValidFrom=certificate_start_date,
944
                    ValidTo=certificate_expire_date,
945
                    Agency=row.get('agency', ''),
946
                    Remarks=row.get('remarks', ''),
947
                )
948
                # Attaching the Report Certificate if exists
949 View Code Duplication
                if row.get('report', None):
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
950
                    path = resource_filename(
951
                        self.dataset_project,
952
                        "setupdata/%s/%s" % (self.dataset_name,
953
                                             row['report'])
954
                    )
955
                    try:
956
                        file_data = read_file(path)
957
                        obj.setDocument(file_data)
958
                    except Exception as msg:
959
                        file_data = None
960
                        logger.warning(msg[0] + " Error on sheet: " + self.sheetname)
961
962
                # Getting lab contacts
963
                bsc = getToolByName(self.context, 'bika_setup_catalog')
964
                lab_contacts = [o.getObject() for o in bsc(portal_type="LabContact", nactive_state='active')]
965
                for contact in lab_contacts:
966
                    if contact.getFullname() == row.get('preparedby', ''):
967
                        obj.setPreparator(contact.UID())
968
                    if contact.getFullname() == row.get('approvedby', ''):
969
                        obj.setValidator(contact.UID())
970
                obj.unmarkCreationFlag()
971
                renameAfterCreation(obj)
972
                notify(ObjectInitializedEvent(obj))
973
974
975
class Instrument_Documents(WorksheetImporter):
976
977
    def Import(self):
978
        bsc = getToolByName(self.context, 'bika_setup_catalog')
979
        for row in self.get_rows(3):
980
            if not row.get('instrument', ''):
981
                continue
982
            folder = self.get_object(bsc, 'Instrument', row.get('instrument', ''))
983
            addDocument(self, row, folder)
984
985
def addDocument(self, row_dict, folder):
986
    """
987
    This function adds a multifile object to the instrument folder
988
    :param row_dict: the dictionary which contains the document information
989
    :param folder: the instrument object
990
    """
991
    if folder:
992
        # This content type need a file
993
        if row_dict.get('File', None):
994
            path = resource_filename(
995
                self.dataset_project,
996
                "setupdata/%s/%s" % (self.dataset_name,
997
                                     row_dict['File'])
998
            )
999
            try:
1000
                file_data = read_file(path)
1001
            except Exception as msg:
1002
                file_data = None
1003
                logger.warning(msg[0] + " Error on sheet: " + self.sheetname)
1004
1005
            # Obtain all created instrument documents content type
1006
            catalog = getToolByName(self.context, 'bika_setup_catalog')
1007
            documents_brains = catalog.searchResults({'portal_type': 'Multifile'})
1008
            # If a the new document has the same DocumentID as a created document, this object won't be created.
1009
            idAlreadyInUse = False
1010
            for item in documents_brains:
1011
                if item.getObject().getDocumentID() == row_dict.get('DocumentID', ''):
1012
                    warning = "The ID '%s' used for this document is already in use on instrument '%s', consequently " \
1013
                              "the file hasn't been upload." % (row_dict.get('DocumentID', ''), row_dict.get('instrument', ''))
1014
                    self.context.plone_utils.addPortalMessage(warning)
1015
                    idAlreadyInUse = True
1016
            if not idAlreadyInUse:
1017
                obj = _createObjectByType("Multifile", folder, tmpID())
1018
                obj.edit(
1019
                    DocumentID=row_dict.get('DocumentID', ''),
1020
                    DocumentVersion=row_dict.get('DocumentVersion', ''),
1021
                    DocumentLocation=row_dict.get('DocumentLocation', ''),
1022
                    DocumentType=row_dict.get('DocumentType', ''),
1023
                    File=file_data
1024
                )
1025
                obj.unmarkCreationFlag()
1026
                renameAfterCreation(obj)
1027
                notify(ObjectInitializedEvent(obj))
1028
1029
1030
class Instrument_Maintenance_Tasks(WorksheetImporter):
1031
1032
    def Import(self):
1033
        bsc = getToolByName(self.context, 'bika_setup_catalog')
1034
        for row in self.get_rows(3):
1035
            if not row['instrument'] or not row['title'] or not row['type']:
1036
                continue
1037
1038
            folder = self.get_object(bsc, 'Instrument',row.get('instrument'))
1039
            if folder:
1040
                obj = _createObjectByType("InstrumentMaintenanceTask", folder, tmpID())
1041
                try:
1042
                    cost = "%.2f" % (row.get('cost', 0))
1043
                except:
1044
                    cost = row.get('cost', '0.0')
1045
1046
                obj.edit(
1047
                    title=row['title'],
1048
                    description=row['description'],
1049
                    Type=row['type'],
1050
                    DownFrom=row.get('downfrom', ''),
1051
                    DownTo=row.get('downto', ''),
1052
                    Maintainer=row.get('maintaner', ''),
1053
                    Considerations=row.get('considerations', ''),
1054
                    WorkPerformed=row.get('workperformed', ''),
1055
                    Remarks=row.get('remarks', ''),
1056
                    Cost=cost,
1057
                    Closed=self.to_bool(row.get('closed'))
1058
                )
1059
                obj.unmarkCreationFlag()
1060
                renameAfterCreation(obj)
1061
                notify(ObjectInitializedEvent(obj))
1062
1063
1064
class Instrument_Schedule(WorksheetImporter):
1065
1066
    def Import(self):
1067
        bsc = getToolByName(self.context, 'bika_setup_catalog')
1068
        for row in self.get_rows(3):
1069
            if not row['instrument'] or not row['title'] or not row['type']:
1070
                continue
1071
            folder = self.get_object(bsc, 'Instrument', row.get('instrument'))
1072
            if folder:
1073
                obj = _createObjectByType("InstrumentScheduledTask", folder, tmpID())
1074
                criteria = [
1075
                    {'fromenabled': row.get('date', None) is not None,
1076
                     'fromdate': row.get('date', ''),
1077
                     'repeatenabled': ((row['numrepeats'] and
1078
                                        row['numrepeats'] > 1) or
1079
                                       (row['repeatuntil'] and
1080
                                        len(row['repeatuntil']) > 0)),
1081
                     'repeatunit': row.get('numrepeats', ''),
1082
                     'repeatperiod': row.get('periodicity', ''),
1083
                     'repeatuntilenabled': (row['repeatuntil'] and
1084
                                            len(row['repeatuntil']) > 0),
1085
                     'repeatuntil': row.get('repeatuntil')}
1086
                ]
1087
                obj.edit(
1088
                    title=row['title'],
1089
                    Type=row['type'],
1090
                    ScheduleCriteria=criteria,
1091
                    Considerations=row.get('considerations', ''),
1092
                )
1093
                obj.unmarkCreationFlag()
1094
                renameAfterCreation(obj)
1095
                notify(ObjectInitializedEvent(obj))
1096
1097
1098
class Sample_Matrices(WorksheetImporter):
1099
1100 View Code Duplication
    def Import(self):
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
1101
        folder = self.context.bika_setup.bika_samplematrices
1102
        for row in self.get_rows(3):
1103
            if not row['title']:
1104
                continue
1105
            obj = _createObjectByType("SampleMatrix", folder, tmpID())
1106
            obj.edit(
1107
                title=row['title'],
1108
                description=row.get('description', '')
1109
            )
1110
            obj.unmarkCreationFlag()
1111
            renameAfterCreation(obj)
1112
            notify(ObjectInitializedEvent(obj))
1113
1114
1115
class Batch_Labels(WorksheetImporter):
1116
1117
    def Import(self):
1118
        folder = self.context.bika_setup.bika_batchlabels
1119
        for row in self.get_rows(3):
1120
            if row['title']:
1121
                obj = _createObjectByType("BatchLabel", folder, tmpID())
1122
                obj.edit(title=row['title'])
1123
                obj.unmarkCreationFlag()
1124
                renameAfterCreation(obj)
1125
                notify(ObjectInitializedEvent(obj))
1126
1127
1128
class Sample_Types(WorksheetImporter):
1129
1130
    def Import(self):
1131
        folder = self.context.bika_setup.bika_sampletypes
1132
        bsc = getToolByName(self.context, 'bika_setup_catalog')
1133
        for row in self.get_rows(3):
1134
            if not row['title']:
1135
                continue
1136
            obj = _createObjectByType("SampleType", folder, tmpID())
1137
            samplematrix = self.get_object(bsc, 'SampleMatrix',
1138
                                           row.get('SampleMatrix_title'))
1139
            containertype = self.get_object(bsc, 'ContainerType',
1140
                                            row.get('ContainerType_title'))
1141
            retentionperiod = {
1142
                'days': row['RetentionPeriod'] if row['RetentionPeriod'] else 0,
1143
                'hours': 0,
1144
                'minutes': 0}
1145
            obj.edit(
1146
                title=row['title'],
1147
                description=row.get('description', ''),
1148
                RetentionPeriod=retentionperiod,
1149
                Hazardous=self.to_bool(row['Hazardous']),
1150
                SampleMatrix=samplematrix,
1151
                Prefix=row['Prefix'],
1152
                MinimumVolume=row['MinimumVolume'],
1153
                ContainerType=containertype
1154
            )
1155
            samplepoint = self.get_object(bsc, 'SamplePoint',
1156
                                          row.get('SamplePoint_title'))
1157
            if samplepoint:
1158
                obj.setSamplePoints([samplepoint, ])
1159
            obj.unmarkCreationFlag()
1160
            renameAfterCreation(obj)
1161
            notify(ObjectInitializedEvent(obj))
1162
1163
1164
class Sample_Points(WorksheetImporter):
1165
1166
    def Import(self):
1167
        setup_folder = self.context.bika_setup.bika_samplepoints
1168
        bsc = getToolByName(self.context, 'bika_setup_catalog')
1169
        pc = getToolByName(self.context, 'portal_catalog')
1170
        for row in self.get_rows(3):
1171
            if not row['title']:
1172
                continue
1173
            if row['Client_title']:
1174
                client_title = row['Client_title']
1175
                client = pc(portal_type="Client", getName=client_title)
1176
                if len(client) == 0:
1177
                    error = "Sample Point %s: Client invalid: '%s'. The Sample point will not be uploaded."
1178
                    logger.error(error, row['title'], client_title)
1179
                    continue
1180
                folder = client[0].getObject()
1181
            else:
1182
                folder = setup_folder
1183
1184
            if row['Latitude']:
1185
                logger.log("Ignored SamplePoint Latitude", 'error')
1186
            if row['Longitude']:
1187
                logger.log("Ignored SamplePoint Longitude", 'error')
1188
1189
            obj = _createObjectByType("SamplePoint", folder, tmpID())
1190
            obj.edit(
1191
                title=row['title'],
1192
                description=row.get('description', ''),
1193
                Composite=self.to_bool(row['Composite']),
1194
                Elevation=row['Elevation'],
1195
            )
1196
            sampletype = self.get_object(bsc, 'SampleType',
1197
                                         row.get('SampleType_title'))
1198
            if sampletype:
1199
                obj.setSampleTypes([sampletype, ])
1200
            obj.unmarkCreationFlag()
1201
            renameAfterCreation(obj)
1202
            notify(ObjectInitializedEvent(obj))
1203
1204
1205
class Sample_Point_Sample_Types(WorksheetImporter):
1206
1207
    def Import(self):
1208
        bsc = getToolByName(self.context, 'bika_setup_catalog')
1209
        for row in self.get_rows(3):
1210
            sampletype = self.get_object(bsc,
1211
                                         'SampleType',
1212
                                         row.get('SampleType_title'))
1213
            samplepoint = self.get_object(bsc,
1214
                                          'SamplePoint',
1215
                                          row['SamplePoint_title'])
1216
            if samplepoint:
1217
                sampletypes = samplepoint.getSampleTypes()
1218
                if sampletype not in sampletypes:
1219
                    sampletypes.append(sampletype)
1220
                    samplepoint.setSampleTypes(sampletypes)
1221
1222
            if sampletype:
1223
                samplepoints = sampletype.getSamplePoints()
1224
                if samplepoint not in samplepoints:
1225
                    samplepoints.append(samplepoint)
1226
                    sampletype.setSamplePoints(samplepoints)
1227
1228
class Storage_Locations(WorksheetImporter):
1229
1230
    def Import(self):
1231
        setup_folder = self.context.bika_setup.bika_storagelocations
1232
        bsc = getToolByName(self.context, 'bika_setup_catalog')
1233
        pc = getToolByName(self.context, 'portal_catalog')
1234
        for row in self.get_rows(3):
1235
            if not row['Address']:
1236
                continue
1237
1238
            obj = _createObjectByType("StorageLocation", setup_folder, tmpID())
1239
            obj.edit(
1240
                title=row['Address'],
1241
                SiteTitle=row['SiteTitle'],
1242
                SiteCode=row['SiteCode'],
1243
                SiteDescription=row['SiteDescription'],
1244
                LocationTitle=row['LocationTitle'],
1245
                LocationCode=row['LocationCode'],
1246
                LocationDescription=row['LocationDescription'],
1247
                LocationType=row['LocationType'],
1248
                ShelfTitle=row['ShelfTitle'],
1249
                ShelfCode=row['ShelfCode'],
1250
                ShelfDescription=row['ShelfDescription'],
1251
            )
1252
            obj.unmarkCreationFlag()
1253
            renameAfterCreation(obj)
1254
            notify(ObjectInitializedEvent(obj))
1255
1256
1257
class Sample_Conditions(WorksheetImporter):
1258
1259 View Code Duplication
    def Import(self):
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
1260
        folder = self.context.bika_setup.bika_sampleconditions
1261
        for row in self.get_rows(3):
1262
            if row['title']:
1263
                obj = _createObjectByType("SampleCondition", folder, tmpID())
1264
                obj.edit(
1265
                    title=row['title'],
1266
                    description=row.get('description', '')
1267
                )
1268
                obj.unmarkCreationFlag()
1269
                renameAfterCreation(obj)
1270
                notify(ObjectInitializedEvent(obj))
1271
1272
1273
class Analysis_Categories(WorksheetImporter):
1274
1275
    def Import(self):
1276
        folder = self.context.bika_setup.bika_analysiscategories
1277
        bsc = getToolByName(self.context, 'bika_setup_catalog')
1278
        for row in self.get_rows(3):
1279
            department = None
1280
            if row.get('Department_title', None):
1281
                department = self.get_object(bsc, 'Department',
1282
                                             row.get('Department_title'))
1283
            if row.get('title', None) and department:
1284
                obj = _createObjectByType("AnalysisCategory", folder, tmpID())
1285
                obj.edit(
1286
                    title=row['title'],
1287
                    description=row.get('description', ''))
1288
                obj.setDepartment(department)
1289
                obj.unmarkCreationFlag()
1290
                renameAfterCreation(obj)
1291
                notify(ObjectInitializedEvent(obj))
1292
            elif not row.get('title', None):
1293
                logger.warning("Error in in " + self.sheetname + ". Missing Title field")
1294
            elif not row.get('Department_title', None):
1295
                logger.warning("Error in " + self.sheetname + ". Department field missing.")
1296
            else:
1297
                logger.warning("Error in " + self.sheetname + ". Department "
1298
                               + row.get('Department_title') + "is wrong.")
1299
1300
1301
class Methods(WorksheetImporter):
1302
1303
    def Import(self):
1304
        folder = self.context.methods
1305
        bsc = getToolByName(self.context, 'bika_setup_catalog')
1306
        for row in self.get_rows(3):
1307
            if row['title']:
1308
                calculation = self.get_object(bsc, 'Calculation', row.get('Calculation_title'))
1309
                obj = _createObjectByType("Method", folder, tmpID())
1310
                obj.edit(
1311
                    title=row['title'],
1312
                    description=row.get('description', ''),
1313
                    Instructions=row.get('Instructions', ''),
1314
                    ManualEntryOfResults=row.get('ManualEntryOfResults', True),
1315
                    Calculation=calculation,
1316
                    MethodID=row.get('MethodID', ''),
1317
                    Accredited=row.get('Accredited', True),
1318
                )
1319
                # Obtain all created methods
1320
                catalog = getToolByName(self.context, 'portal_catalog')
1321
                methods_brains = catalog.searchResults({'portal_type': 'Method'})
1322
                # If a the new method has the same MethodID as a created method, remove MethodID value.
1323
                for methods in methods_brains:
1324
                    if methods.getObject().get('MethodID', '') != '' and methods.getObject.get('MethodID', '') == obj['MethodID']:
1325
                        obj.edit(MethodID='')
1326
1327 View Code Duplication
                if row['MethodDocument']:
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
1328
                    path = resource_filename(
1329
                        self.dataset_project,
1330
                        "setupdata/%s/%s" % (self.dataset_name,
1331
                                             row['MethodDocument'])
1332
                    )
1333
                    try:
1334
                        file_data = read_file(path)
1335
                        obj.setMethodDocument(file_data)
1336
                    except Exception as msg:
1337
                        logger.warning(msg[0] + " Error on sheet: " + self.sheetname)
1338
1339
                obj.unmarkCreationFlag()
1340
                renameAfterCreation(obj)
1341
                notify(ObjectInitializedEvent(obj))
1342
1343
1344
class Sampling_Deviations(WorksheetImporter):
1345
1346 View Code Duplication
    def Import(self):
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
1347
        folder = self.context.bika_setup.bika_samplingdeviations
1348
        for row in self.get_rows(3):
1349
            if row['title']:
1350
                obj = _createObjectByType("SamplingDeviation", folder, tmpID())
1351
                obj.edit(
1352
                    title=row['title'],
1353
                    description=row.get('description', '')
1354
                )
1355
                obj.unmarkCreationFlag()
1356
                renameAfterCreation(obj)
1357
                notify(ObjectInitializedEvent(obj))
1358
1359
1360
class Calculations(WorksheetImporter):
1361
1362
    def get_interim_fields(self):
1363
        # preload Calculation Interim Fields sheet
1364
        sheetname = 'Calculation Interim Fields'
1365
        worksheet = self.workbook.get_sheet_by_name(sheetname)
1366
        if not worksheet:
1367
            return
1368
        self.interim_fields = {}
1369
        rows = self.get_rows(3, worksheet=worksheet)
1370
        for row in rows:
1371
            calc_title = row['Calculation_title']
1372
            if calc_title not in self.interim_fields.keys():
1373
                self.interim_fields[calc_title] = []
1374
            self.interim_fields[calc_title].append({
1375
                'keyword': row['keyword'],
1376
                'title': row.get('title', ''),
1377
                'type': 'int',
1378
                'hidden': ('hidden' in row and row['hidden']) and True or False,
1379
                'value': row['value'],
1380
                'unit': row['unit'] and row['unit'] or ''})
1381
1382
    def Import(self):
1383
        self.get_interim_fields()
1384
        folder = self.context.bika_setup.bika_calculations
1385
        for row in self.get_rows(3):
1386
            if not row['title']:
1387
                continue
1388
            calc_title = row['title']
1389
            calc_interims = self.interim_fields.get(calc_title, [])
1390
            formula = row['Formula']
1391
            # scan formula for dep services
1392
            keywords = re.compile(r"\[([^\.^\]]+)\]").findall(formula)
1393
            # remove interims from deps
1394
            interim_keys = [k['keyword'] for k in calc_interims]
1395
            dep_keywords = [k for k in keywords if k not in interim_keys]
1396
1397
            obj = _createObjectByType("Calculation", folder, tmpID())
1398
            obj.edit(
1399
                title=calc_title,
1400
                description=row.get('description', ''),
1401
                InterimFields=calc_interims,
1402
                Formula=str(row['Formula'])
1403
            )
1404
            for kw in dep_keywords:
1405
                self.defer(src_obj=obj,
1406
                           src_field='DependentServices',
1407
                           dest_catalog='bika_setup_catalog',
1408
                           dest_query={'portal_type': 'AnalysisService',
1409
                                       'getKeyword': kw}
1410
                           )
1411
            obj.unmarkCreationFlag()
1412
            renameAfterCreation(obj)
1413
            notify(ObjectInitializedEvent(obj))
1414
1415
        # Now we have the calculations registered, try to assign default calcs
1416
        # to methods
1417
        sheet = self.workbook.get_sheet_by_name("Methods")
1418
        bsc = getToolByName(self.context, 'bika_setup_catalog')
1419
        for row in self.get_rows(3, sheet):
1420
            if row.get('title', '') and row.get('Calculation_title', ''):
1421
                meth = self.get_object(bsc, "Method", row.get('title'))
1422
                if meth and not meth.getCalculation():
1423
                    calctit = safe_unicode(row['Calculation_title']).encode('utf-8')
1424
                    calc = self.get_object(bsc, "Calculation", calctit)
1425
                    if calc:
1426
                        meth.setCalculation(calc.UID())
1427
1428
1429
class Analysis_Services(WorksheetImporter):
1430
1431
    def load_interim_fields(self):
1432
        # preload AnalysisService InterimFields sheet
1433
        sheetname = 'AnalysisService InterimFields'
1434
        worksheet = self.workbook.get_sheet_by_name(sheetname)
1435
        if not worksheet:
1436
            return
1437
        self.service_interims = {}
1438
        rows = self.get_rows(3, worksheet=worksheet)
1439
        for row in rows:
1440
            service_title = row['Service_title']
1441
            if service_title not in self.service_interims.keys():
1442
                self.service_interims[service_title] = []
1443
            self.service_interims[service_title].append({
1444
                'keyword': row['keyword'],
1445
                'title': row.get('title', ''),
1446
                'type': 'int',
1447
                'value': row['value'],
1448
                'unit': row['unit'] and row['unit'] or ''})
1449
1450
    def load_result_options(self):
1451
        bsc = getToolByName(self.context, 'bika_setup_catalog')
1452
        sheetname = 'AnalysisService ResultOptions'
1453
        worksheet = self.workbook.get_sheet_by_name(sheetname)
1454
        if not worksheet:
1455
            return
1456
        for row in self.get_rows(3, worksheet=worksheet):
1457
            service = self.get_object(bsc, 'AnalysisService',
1458
                                      row.get('Service_title'))
1459
            if not service:
1460
                return
1461
            sro = service.getResultOptions()
1462
            sro.append({'ResultValue': row['ResultValue'],
1463
                        'ResultText': row['ResultText']})
1464
            service.setResultOptions(sro)
1465
1466
    def load_service_uncertainties(self):
1467
        bsc = getToolByName(self.context, 'bika_setup_catalog')
1468
        sheetname = 'AnalysisService Uncertainties'
1469
        worksheet = self.workbook.get_sheet_by_name(sheetname)
1470
        if not worksheet:
1471
            return
1472
1473
        bucket = {}
1474
        count = 0
1475
        for row in self.get_rows(3, worksheet=worksheet):
1476
            count += 1
1477
            service = self.get_object(bsc, 'AnalysisService',
1478
                                      row.get('Service_title'))
1479
            if not service:
1480
                warning = "Unable to load an Analysis Service uncertainty. Service '%s' not found." % row.get('Service_title')
1481
                logger.warning(warning)
1482
                continue
1483
            service_uid = service.UID()
1484
            if service_uid not in bucket:
1485
                bucket[service_uid] = []
1486
            bucket[service_uid].append(
1487
                {'intercept_min': row['Range Min'],
1488
                 'intercept_max': row['Range Max'],
1489
                 'errorvalue': row['Uncertainty Value']}
1490
            )
1491
            if count > 500:
1492
                self.write_bucket(bucket)
1493
                bucket = {}
1494
        if bucket:
1495
            self.write_bucket(bucket)
1496
1497
    def get_methods(self, service_title, default_method):
1498
        """ Return an array of objects of the type Method in accordance to the
1499
            methods listed in the 'AnalysisService Methods' sheet and service
1500
            set in the parameter service_title.
1501
            If default_method is set, it will be included in the returned
1502
            array.
1503
        """
1504
        return self.get_relations(service_title,
1505
                                default_method,
1506
                                'Method',
1507
                                'portal_catalog',
1508
                                'AnalysisService Methods',
1509
                                'Method_title')
1510
1511
    def get_instruments(self, service_title, default_instrument):
1512
        """ Return an array of objects of the type Instrument in accordance to
1513
            the instruments listed in the 'AnalysisService Instruments' sheet
1514
            and service set in the parameter 'service_title'.
1515
            If default_instrument is set, it will be included in the returned
1516
            array.
1517
        """
1518
        return self.get_relations(service_title,
1519
                                default_instrument,
1520
                                'Instrument',
1521
                                'bika_setup_catalog',
1522
                                'AnalysisService Instruments',
1523
                                'Instrument_title')
1524
1525
    def get_relations(self, service_title, default_obj, obj_type, catalog_name, sheet_name, column):
1526
        """ Return an array of objects of the specified type in accordance to
1527
            the object titles defined in the sheet specified in 'sheet_name' and
1528
            service set in the paramenter 'service_title'.
1529
            If a default_obj is set, it will be included in the returned array.
1530
        """
1531
        out_objects = [default_obj] if default_obj else []
1532
        cat = getToolByName(self.context, catalog_name)
1533
        worksheet = self.workbook.get_sheet_by_name(sheet_name)
1534
        if not worksheet:
1535
            return out_objects
1536
        for row in self.get_rows(3, worksheet=worksheet):
1537
            row_as_title = row.get('Service_title')
1538
            if not row_as_title:
1539
                return out_objects
1540
            elif row_as_title != service_title:
1541
                continue
1542
            obj = self.get_object(cat, obj_type, row.get(column))
1543
            if obj:
1544
                if default_obj and default_obj.UID() == obj.UID():
1545
                    continue
1546
                out_objects.append(obj)
1547
        return out_objects
1548
1549
    def write_bucket(self, bucket):
1550
        bsc = getToolByName(self.context, 'bika_setup_catalog')
1551
        for service_uid, uncertainties in bucket.items():
1552
            obj = bsc(UID=service_uid)[0].getObject()
1553
            _uncert = list(obj.getUncertainties())
1554
            _uncert.extend(uncertainties)
1555
            obj.setUncertainties(_uncert)
1556
1557
    def Import(self):
1558
        self.load_interim_fields()
1559
        folder = self.context.bika_setup.bika_analysisservices
1560
        bsc = getToolByName(self.context, 'bika_setup_catalog')
1561
        pc = getToolByName(self.context, 'portal_catalog')
1562
        for row in self.get_rows(3):
1563
            if not row['title']:
1564
                continue
1565
1566
            obj = _createObjectByType("AnalysisService", folder, tmpID())
1567
            MTA = {
1568
                'days': self.to_int(row.get('MaxTimeAllowed_days',0),0),
1569
                'hours': self.to_int(row.get('MaxTimeAllowed_hours',0),0),
1570
                'minutes': self.to_int(row.get('MaxTimeAllowed_minutes',0),0),
1571
            }
1572
            category = self.get_object(bsc, 'AnalysisCategory', row.get('AnalysisCategory_title'))
1573
            department = self.get_object(bsc, 'Department', row.get('Department_title'))
1574
            container = self.get_object(bsc, 'Container', row.get('Container_title'))
1575
            preservation = self.get_object(bsc, 'Preservation', row.get('Preservation_title'))
1576
1577
            # Analysis Service - Method considerations:
1578
            # One Analysis Service can have 0 or n Methods associated (field
1579
            # 'Methods' from the Schema).
1580
            # If the Analysis Service has at least one method associated, then
1581
            # one of those methods can be set as the defualt method (field
1582
            # '_Method' from the Schema).
1583
            #
1584
            # To make it easier, if a DefaultMethod is declared in the
1585
            # Analysis_Services spreadsheet, but the same AS has no method
1586
            # associated in the Analysis_Service_Methods spreadsheet, then make
1587
            # the assumption that the DefaultMethod set in the former has to be
1588
            # associated to the AS although the relation is missing.
1589
            defaultmethod = self.get_object(pc, 'Method', row.get('DefaultMethod_title'))
1590
            methods = self.get_methods(row['title'], defaultmethod)
1591
            if not defaultmethod and methods:
1592
                defaultmethod = methods[0]
1593
1594
            # Analysis Service - Instrument considerations:
1595
            # By default, an Analysis Services will be associated automatically
1596
            # with several Instruments due to the Analysis Service - Methods
1597
            # relation (an Instrument can be assigned to a Method and one Method
1598
            # can have zero or n Instruments associated). There is no need to
1599
            # set this assignment directly, the AnalysisService object will
1600
            # find those instruments.
1601
            # Besides this 'automatic' behavior, an Analysis Service can also
1602
            # have 0 or n Instruments manually associated ('Instruments' field).
1603
            # In this case, the attribute 'AllowInstrumentEntryOfResults' should
1604
            # be set to True.
1605
            #
1606
            # To make it easier, if a DefaultInstrument is declared in the
1607
            # Analysis_Services spreadsheet, but the same AS has no instrument
1608
            # associated in the AnalysisService_Instruments spreadsheet, then
1609
            # make the assumption the DefaultInstrument set in the former has
1610
            # to be associated to the AS although the relation is missing and
1611
            # the option AllowInstrumentEntryOfResults will be set to True.
1612
            defaultinstrument = self.get_object(bsc, 'Instrument', row.get('DefaultInstrument_title'))
1613
            instruments = self.get_instruments(row['title'], defaultinstrument)
1614
            allowinstrentry = True if instruments else False
1615
            if not defaultinstrument and instruments:
1616
                defaultinstrument = instruments[0]
1617
1618
            # The manual entry of results can only be set to false if the value
1619
            # for the attribute "InstrumentEntryOfResults" is False.
1620
            allowmanualentry = True if not allowinstrentry else row.get('ManualEntryOfResults', True)
1621
1622
            # Analysis Service - Calculation considerations:
1623
            # By default, the AnalysisService will use the Calculation associated
1624
            # to the Default Method (the field "UseDefaultCalculation"==True).
1625
            # If the Default Method for this AS doesn't have any Calculation
1626
            # associated and the field "UseDefaultCalculation" is True, no
1627
            # Calculation will be used for this AS ("_Calculation" field is
1628
            # reserved and should not be set directly).
1629
            #
1630
            # To make it easier, if a Calculation is set by default in the
1631
            # spreadsheet, then assume the UseDefaultCalculation has to be set
1632
            # to False.
1633
            deferredcalculation = self.get_object(bsc, 'Calculation', row.get('Calculation_title'))
1634
            usedefaultcalculation = False if deferredcalculation else True
1635
            _calculation = deferredcalculation if deferredcalculation else \
1636
                            (defaultmethod.getCalculation() if defaultmethod else None)
1637
1638
            obj.edit(
1639
                title=row['title'],
1640
                ShortTitle=row.get('ShortTitle', row['title']),
1641
                description=row.get('description', ''),
1642
                Keyword=row['Keyword'],
1643
                PointOfCapture=row['PointOfCapture'].lower(),
1644
                Category=category,
1645
                Department=department,
1646
                AttachmentOption=row.get('Attachment', '')[0].lower() if row.get('Attachment', '') else 'p',
1647
                Unit=row['Unit'] and row['Unit'] or None,
1648
                Precision=row['Precision'] and str(row['Precision']) or '0',
1649
                ExponentialFormatPrecision=str(self.to_int(row.get('ExponentialFormatPrecision',7),7)),
1650
                LowerDetectionLimit='%06f' % self.to_float(row.get('LowerDetectionLimit', '0.0'), 0),
1651
                UpperDetectionLimit='%06f' % self.to_float(row.get('UpperDetectionLimit', '1000000000.0'), 1000000000.0),
1652
                DetectionLimitSelector=self.to_bool(row.get('DetectionLimitSelector',0)),
1653
                MaxTimeAllowed=MTA,
1654
                Price="%02f" % Float(row['Price']),
1655
                BulkPrice="%02f" % Float(row['BulkPrice']),
1656
                VAT="%02f" % Float(row['VAT']),
1657
                _Method=defaultmethod,
1658
                Methods=methods,
1659
                ManualEntryOfResults=allowmanualentry,
1660
                InstrumentEntryOfResults=allowinstrentry,
1661
                Instruments=instruments,
1662
                Calculation=_calculation,
1663
                UseDefaultCalculation=usedefaultcalculation,
1664
                DuplicateVariation="%02f" % Float(row['DuplicateVariation']),
1665
                Accredited=self.to_bool(row['Accredited']),
1666
                InterimFields=hasattr(self, 'service_interims') and self.service_interims.get(
1667
                    row['title'], []) or [],
1668
                Separate=self.to_bool(row.get('Separate', False)),
1669
                Container=container,
1670
                Preservation=preservation,
1671
                CommercialID=row.get('CommercialID', ''),
1672
                ProtocolID=row.get('ProtocolID', '')
1673
            )
1674
            obj.unmarkCreationFlag()
1675
            renameAfterCreation(obj)
1676
            notify(ObjectInitializedEvent(obj))
1677
        self.load_result_options()
1678
        self.load_service_uncertainties()
1679
1680
1681
class Analysis_Specifications(WorksheetImporter):
1682
1683
    def resolve_service(self, row):
1684
        bsc = getToolByName(self.context, "bika_setup_catalog")
1685
        service = bsc(
1686
            portal_type="AnalysisService",
1687
            title=safe_unicode(row["service"])
1688
        )
1689
        if not service:
1690
            service = bsc(
1691
                portal_type="AnalysisService",
1692
                getKeyword=safe_unicode(row["service"])
1693
            )
1694
        service = service[0].getObject()
1695
        return service
1696
1697
    def Import(self):
1698
        s_t = ""
1699
        bucket = {}
1700
        pc = getToolByName(self.context, "portal_catalog")
1701
        bsc = getToolByName(self.context, "bika_setup_catalog")
1702
        # collect up all values into the bucket
1703
        for row in self.get_rows(3):
1704
            title = row.get("Title", False)
1705
            if not title:
1706
                title = row.get("title", False)
1707
                if not title:
1708
                    continue
1709
            parent = row["Client_title"] if row["Client_title"] else "lab"
1710
            st = row["SampleType_title"] if row["SampleType_title"] else ""
1711
            service = self.resolve_service(row)
1712
1713
            if parent not in bucket:
1714
                bucket[parent] = {}
1715
            if title not in bucket[parent]:
1716
                bucket[parent][title] = {"sampletype": st, "resultsrange": []}
1717
            bucket[parent][title]["resultsrange"].append({
1718
                "keyword": service.getKeyword(),
1719
                "min": row["min"] if row["min"] else "0",
1720
                "max": row["max"] if row["max"] else "0",
1721
                "error": row["error"] if row["error"] else "0"
1722
            })
1723
        # write objects.
1724
        for parent in bucket.keys():
1725
            for title in bucket[parent]:
1726
                if parent == "lab":
1727
                    folder = self.context.bika_setup.bika_analysisspecs
1728
                else:
1729
                    proxy = pc(portal_type="Client", getName=safe_unicode(parent))[0]
1730
                    folder = proxy.getObject()
1731
                st = bucket[parent][title]["sampletype"]
1732
                resultsrange = bucket[parent][title]["resultsrange"]
1733
                if st:
1734
                    st_uid = bsc(portal_type="SampleType", title=safe_unicode(st))[0].UID
1735
                obj = _createObjectByType("AnalysisSpec", folder, tmpID())
1736
                obj.edit(title=title)
1737
                obj.setResultsRange(resultsrange)
1738
                if st:
1739
                    obj.setSampleType(st_uid)
1740
                obj.unmarkCreationFlag()
1741
                renameAfterCreation(obj)
1742
                notify(ObjectInitializedEvent(obj))
1743
1744
1745
class Analysis_Profiles(WorksheetImporter):
1746
1747
    def load_analysis_profile_services(self):
1748
        sheetname = 'Analysis Profile Services'
1749
        worksheet = self.workbook.get_sheet_by_name(sheetname)
1750
        self.profile_services = {}
1751
        if not worksheet:
1752
            return
1753
        bsc = getToolByName(self.context, 'bika_setup_catalog')
1754
        for row in self.get_rows(3, worksheet=worksheet):
1755
            if not row.get('Profile','') or not row.get('Service',''):
1756
                continue
1757
            if row['Profile'] not in self.profile_services.keys():
1758
                self.profile_services[row['Profile']] = []
1759
            # Here we match againts Keyword or Title.
1760
            # XXX We need a utility for this kind of thing.
1761
            service = self.get_object(bsc, 'AnalysisService', row.get('Service'))
1762
            if not service:
1763
                service = bsc(portal_type='AnalysisService',
1764
                              getKeyword=row['Service'])[0].getObject()
1765
            self.profile_services[row['Profile']].append(service)
1766
1767
    def Import(self):
1768
        self.load_analysis_profile_services()
1769
        folder = self.context.bika_setup.bika_analysisprofiles
1770
        for row in self.get_rows(3):
1771
            if row['title']:
1772
                obj = _createObjectByType("AnalysisProfile", folder, tmpID())
1773
                obj.edit(title=row['title'],
1774
                         description=row.get('description', ''),
1775
                         ProfileKey=row['ProfileKey'],
1776
                         CommercialID=row.get('CommercialID', ''),
1777
                         AnalysisProfilePrice="%02f" % Float(row.get('AnalysisProfilePrice', '0.0')),
1778
                         AnalysisProfileVAT="%02f" % Float(row.get('AnalysisProfileVAT', '0.0')),
1779
                         UseAnalysisProfilePrice=row.get('UseAnalysisProfilePrice', False)
1780
                         )
1781
                obj.setService(self.profile_services[row['title']])
1782
                obj.unmarkCreationFlag()
1783
                renameAfterCreation(obj)
1784
                notify(ObjectInitializedEvent(obj))
1785
1786
1787
class AR_Templates(WorksheetImporter):
1788
1789 View Code Duplication
    def load_artemplate_analyses(self):
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
1790
        sheetname = 'AR Template Analyses'
1791
        worksheet = self.workbook.get_sheet_by_name(sheetname)
1792
        self.artemplate_analyses = {}
1793
        if not worksheet:
1794
            return
1795
        bsc = getToolByName(self.context, 'bika_setup_catalog')
1796
        for row in self.get_rows(3, worksheet=worksheet):
1797
            # XXX service_uid is not a uid
1798
            service = self.get_object(bsc, 'AnalysisService',
1799
                                      row.get('service_uid'))
1800
            if row['ARTemplate'] not in self.artemplate_analyses.keys():
1801
                self.artemplate_analyses[row['ARTemplate']] = []
1802
            self.artemplate_analyses[row['ARTemplate']].append(
1803
                {'service_uid': service.UID(),
1804
                 'partition': row['partition']
1805
                 }
1806
            )
1807
1808
    def load_artemplate_partitions(self):
1809
        sheetname = 'AR Template Partitions'
1810
        worksheet = self.workbook.get_sheet_by_name(sheetname)
1811
        self.artemplate_partitions = {}
1812
        bsc = getToolByName(self.context, 'bika_setup_catalog')
1813
        if not worksheet:
1814
            return
1815
        for row in self.get_rows(3, worksheet=worksheet):
1816
            if row['ARTemplate'] not in self.artemplate_partitions.keys():
1817
                self.artemplate_partitions[row['ARTemplate']] = []
1818
            container = self.get_object(bsc, 'Container',
1819
                                        row.get('container'))
1820
            preservation = self.get_object(bsc, 'Preservation',
1821
                                           row.get('preservation'))
1822
            self.artemplate_partitions[row['ARTemplate']].append({
1823
                'part_id': row['part_id'],
1824
                'Container': container.Title(),
1825
                'container_uid': container.UID(),
1826
                'Preservation': preservation.Title(),
1827
                'preservation_uid': preservation.UID()})
1828
1829
    def Import(self):
1830
        self.load_artemplate_analyses()
1831
        self.load_artemplate_partitions()
1832
        folder = self.context.bika_setup.bika_artemplates
1833
        bsc = getToolByName(self.context, 'bika_setup_catalog')
1834
        pc = getToolByName(self.context, 'portal_catalog')
1835
        for row in self.get_rows(3):
1836
            if not row['title']:
1837
                continue
1838
            analyses = self.artemplate_analyses[row['title']]
1839
            client_title = row['Client_title'] or 'lab'
1840
            if row['title'] in self.artemplate_partitions:
1841
                partitions = self.artemplate_partitions[row['title']]
1842
            else:
1843
                partitions = [{'part_id': 'part-1',
1844
                               'container': '',
1845
                               'preservation': ''}]
1846
1847
            if client_title == 'lab':
1848
                folder = self.context.bika_setup.bika_artemplates
1849
            else:
1850
                folder = pc(portal_type='Client',
1851
                            getName=client_title)[0].getObject()
1852
1853
            sampletype = self.get_object(bsc, 'SampleType',
1854
                                         row.get('SampleType_title'))
1855
            samplepoint = self.get_object(bsc, 'SamplePoint',
1856
                                         row.get('SamplePoint_title'))
1857
1858
            obj = _createObjectByType("ARTemplate", folder, tmpID())
1859
            obj.edit(
1860
                title=str(row['title']),
1861
                description=row.get('description', ''),
1862
                Remarks=row.get('Remarks', ''),)
1863
            obj.setSampleType(sampletype)
1864
            obj.setSamplePoint(samplepoint)
1865
            obj.setPartitions(partitions)
1866
            obj.setAnalyses(analyses)
1867
            obj.unmarkCreationFlag()
1868
            renameAfterCreation(obj)
1869
            notify(ObjectInitializedEvent(obj))
1870
1871
1872
class Reference_Definitions(WorksheetImporter):
1873
1874
    def load_reference_definition_results(self):
1875
        sheetname = 'Reference Definition Results'
1876
        worksheet = self.workbook.get_sheet_by_name(sheetname)
1877
        if not worksheet:
1878
            sheetname = 'Reference Definition Values'
1879
            worksheet = self.workbook.get_sheet_by_name(sheetname)
1880
            if not worksheet:
1881
                return
1882
        self.results = {}
1883
        if not worksheet:
1884
            return
1885
        bsc = getToolByName(self.context, 'bika_setup_catalog')
1886
        for row in self.get_rows(3, worksheet=worksheet):
1887
            if row['ReferenceDefinition_title'] not in self.results.keys():
1888
                self.results[row['ReferenceDefinition_title']] = []
1889
            service = self.get_object(bsc, 'AnalysisService',
1890
                                      row.get('service'))
1891
            self.results[
1892
                row['ReferenceDefinition_title']].append({
1893
                    'uid': service.UID(),
1894
                    'result': row['result'] if row['result'] else '0',
1895
                    'min': row['min'] if row['min'] else '0',
1896
                    'max': row['max'] if row['max'] else '0'})
1897
1898
    def Import(self):
1899
        self.load_reference_definition_results()
1900
        folder = self.context.bika_setup.bika_referencedefinitions
1901
        for row in self.get_rows(3):
1902
            if not row['title']:
1903
                continue
1904
            obj = _createObjectByType("ReferenceDefinition", folder, tmpID())
1905
            obj.edit(
1906
                title=row['title'],
1907
                description=row.get('description', ''),
1908
                Blank=self.to_bool(row['Blank']),
1909
                ReferenceResults=self.results.get(row['title'], []),
1910
                Hazardous=self.to_bool(row['Hazardous']))
1911
            obj.unmarkCreationFlag()
1912
            renameAfterCreation(obj)
1913
            notify(ObjectInitializedEvent(obj))
1914
1915
1916
class Worksheet_Templates(WorksheetImporter):
1917
1918
    def load_wst_layouts(self):
1919
        sheetname = 'Worksheet Template Layouts'
1920
        worksheet = self.workbook.get_sheet_by_name(sheetname)
1921
        self.wst_layouts = {}
1922
        if not worksheet:
1923
            return
1924
        for row in self.get_rows(3, worksheet=worksheet):
1925
            if row['WorksheetTemplate_title'] \
1926
               not in self.wst_layouts.keys():
1927
                self.wst_layouts[
1928
                    row['WorksheetTemplate_title']] = []
1929
            self.wst_layouts[
1930
                row['WorksheetTemplate_title']].append({
1931
                    'pos': row['pos'],
1932
                    'type': row['type'],
1933
                    'blank_ref': row['blank_ref'],
1934
                    'control_ref': row['control_ref'],
1935
                    'dup': row['dup']})
1936
1937 View Code Duplication
    def load_wst_services(self):
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
1938
        sheetname = 'Worksheet Template Services'
1939
        worksheet = self.workbook.get_sheet_by_name(sheetname)
1940
        self.wst_services = {}
1941
        if not worksheet:
1942
            return
1943
        bsc = getToolByName(self.context, 'bika_setup_catalog')
1944
        for row in self.get_rows(3, worksheet=worksheet):
1945
            service = self.get_object(bsc, 'AnalysisService',
1946
                                      row.get('service'))
1947
            if row['WorksheetTemplate_title'] not in self.wst_services.keys():
1948
                self.wst_services[row['WorksheetTemplate_title']] = []
1949
            self.wst_services[
1950
                row['WorksheetTemplate_title']].append(service.UID())
1951
1952
    def Import(self):
1953
        self.load_wst_services()
1954
        self.load_wst_layouts()
1955
        folder = self.context.bika_setup.bika_worksheettemplates
1956
        for row in self.get_rows(3):
1957
            if row['title']:
1958
                obj = _createObjectByType("WorksheetTemplate", folder, tmpID())
1959
                obj.edit(
1960
                    title=row['title'],
1961
                    description=row.get('description', ''),
1962
                    Layout=self.wst_layouts[row['title']])
1963
                obj.setService(self.wst_services[row['title']])
1964
                obj.unmarkCreationFlag()
1965
                renameAfterCreation(obj)
1966
                notify(ObjectInitializedEvent(obj))
1967
1968
1969
class Setup(WorksheetImporter):
1970
1971
1972
    def get_field_value(self, field, value):
1973
        if value is None:
1974
            return None
1975
        converters = {
1976
            "integer": self.to_integer_value,
1977
            "fixedpoint": self.to_fixedpoint_value,
1978
            "boolean": self.to_boolean_value,
1979
            "string": self.to_string_value,
1980
            "reference": self.to_reference_value,
1981
            "duration": self.to_duration_value
1982
        }
1983
        try:
1984
            return converters.get(field.type, None)(field, value)
1985
        except:
1986
            logger.error("No valid type for Setup.{} ({}): {}"
1987
                         .format(field.getName(), field.type, value))
1988
1989
    def to_integer_value(self, field, value):
1990
        return str(int(value))
1991
1992
    def to_fixedpoint_value(self, field, value):
1993
        return str(float(value))
1994
1995
    def to_boolean_value(self, field, value):
1996
        return self.to_bool(value)
1997
1998
    def to_string_value(self, field, value):
1999
        if field.vocabulary:
2000
            return self.to_string_vocab_value(field, value)
2001
        return value and str(value) or ""
2002
2003
    def to_reference_value(self, field, value):
2004
        if not value:
2005
            return None
2006
2007
        brains = api.search({"title": to_unicode(value)})
2008
        if brains:
2009
            return api.get_uid(brains[0])
2010
2011
        msg = "No object found for Setup.{0} ({1}): {2}"
2012
        msg = msg.format(field.getName(), field.type, value)
2013
        logger.error(msg)
2014
        raise ValueError(msg)
2015
2016
    def to_string_vocab_value(self, field, value):
2017
        vocabulary = field.vocabulary
2018
        if type(vocabulary) is str:
2019
            vocabulary = getFromString(api.get_setup(), vocabulary)
2020
        else:
2021
            vocabulary = vocabulary.items()
2022
2023
        if not vocabulary:
2024
            raise ValueError("Empty vocabulary for {}".format(field.getName()))
2025
2026
        if type(vocabulary) in (tuple, list):
2027
            vocabulary = {item[0]: item[1] for item in vocabulary}
2028
2029
        for key, val in vocabulary.items():
2030
            key_low = str(to_utf8(key)).lower()
2031
            val_low = str(to_utf8(val)).lower()
2032
            value_low = str(value).lower()
2033
            if key_low == value_low or val_low == value_low:
2034
                return key
2035
        raise ValueError("Vocabulary entry not found")
2036
2037
    def to_duration_value(self, field, values):
2038
        duration = ["days", "hours", "minutes"]
2039
        duration = map(lambda d: "{}_{}".format(field.getName(), d), duration)
2040
        return dict(
2041
            days=api.to_int(values.get(duration[0], 0), 0),
2042
            hours=api.to_int(values.get(duration[1], 0), 0),
2043
            minutes=api.to_int(values.get(duration[2], 0), 0))
2044
2045
    def Import(self):
2046
        values = {}
2047
        for row in self.get_rows(3):
2048
            values[row['Field']] = row['Value']
2049
2050
        bsetup = self.context.bika_setup
2051
        bschema = bsetup.Schema()
2052
        for field in bschema.fields():
2053
            value = None
2054
            field_name = field.getName()
2055
            if field_name in values:
2056
                value = self.get_field_value(field, values[field_name])
2057
            elif field.type == "duration":
2058
                value = self.get_field_value(field, values)
2059
2060
            if value is None:
2061
                continue
2062
            try:
2063
                obj_field = bsetup.getField(field_name)
2064
                obj_field.set(bsetup, str(value))
2065
            except:
2066
                logger.error("No valid type for Setup.{} ({}): {}"
2067
                             .format(field_name, field.type, value))
2068
2069
2070
class ID_Prefixes(WorksheetImporter):
2071
2072
    def Import(self):
2073
        prefixes = self.context.bika_setup.getIDFormatting()
2074
        for row in self.get_rows(3):
2075
            # remove existing prefix from list
2076
            prefixes = [p for p in prefixes
2077
                        if p['portal_type'] != row['portal_type']]
2078
            # The spreadsheet will contain 'none' for user's visual stuff, but it means 'no separator'
2079
            separator = row.get('separator', '-')
2080
            separator = '' if separator == 'none' else separator
2081
            # add new prefix to list
2082
            prefixes.append({'portal_type': row['portal_type'],
2083
                             'padding': row['padding'],
2084
                             'prefix': row['prefix'],
2085
                             'separator': separator})
2086
        #self.context.bika_setup.setIDFormatting(prefixes)
2087
2088
2089
class Attachment_Types(WorksheetImporter):
2090
2091
    def Import(self):
2092
        folder = self.context.bika_setup.bika_attachmenttypes
2093
        for row in self.get_rows(3):
2094
            obj = _createObjectByType("AttachmentType", folder, tmpID())
2095
            obj.edit(
2096
                title=row['title'],
2097
                description=row.get('description', ''))
2098
            obj.unmarkCreationFlag()
2099
            renameAfterCreation(obj)
2100
            notify(ObjectInitializedEvent(obj))
2101
2102
2103
class Reference_Samples(WorksheetImporter):
2104
2105
    def load_reference_sample_results(self, sample):
2106
        sheetname = 'Reference Sample Results'
2107
        if not hasattr(self, 'results_worksheet'):
2108
            worksheet = self.workbook.get_sheet_by_name(sheetname)
2109
            if not worksheet:
2110
                return
2111
            self.results_worksheet = worksheet
2112
        results = []
2113
        bsc = getToolByName(self.context, 'bika_setup_catalog')
2114
        for row in self.get_rows(3, worksheet=self.results_worksheet):
2115
            if row['ReferenceSample_id'] != sample.getId():
2116
                continue
2117
            service = self.get_object(bsc, 'AnalysisService',
2118
                                      row.get('AnalysisService_title'))
2119
            if not service:
2120
                warning = "Unable to load a reference sample result. Service %s not found."
2121
                logger.warning(warning, sheetname)
2122
                continue
2123
            results.append({
2124
                    'uid': service.UID(),
2125
                    'result': row['result'],
2126
                    'min': row['min'],
2127
                    'max': row['max']})
2128
        sample.setReferenceResults(results)
2129
2130
    def load_reference_analyses(self, sample):
2131
        sheetname = 'Reference Analyses'
2132
        if not hasattr(self, 'analyses_worksheet'):
2133
            worksheet = self.workbook.get_sheet_by_name(sheetname)
2134
            if not worksheet:
2135
                return
2136
            self.analyses_worksheet = worksheet
2137
        bsc = getToolByName(self.context, 'bika_setup_catalog')
2138
        for row in self.get_rows(3, worksheet=self.analyses_worksheet):
2139
            if row['ReferenceSample_id'] != sample.getId():
2140
                continue
2141
            service = self.get_object(bsc, 'AnalysisService',
2142
                                      row.get('AnalysisService_title'))
2143
            # Analyses are keyed/named by service keyword
2144
            obj = _createObjectByType("ReferenceAnalysis", sample, row['id'])
2145
            obj.edit(title=row['id'],
2146
                     ReferenceType=row['ReferenceType'],
2147
                     Result=row['Result'],
2148
                     Analyst=row['Analyst'],
2149
                     Instrument=row['Instrument'],
2150
                     Retested=row['Retested']
2151
                     )
2152
            obj.setService(service)
2153
            # obj.setCreators(row['creator'])
2154
            # obj.setCreationDate(row['created'])
2155
            # self.set_wf_history(obj, row['workflow_history'])
2156
            obj.unmarkCreationFlag()
2157
2158
            self.load_reference_analysis_interims(obj)
2159
2160 View Code Duplication
    def load_reference_analysis_interims(self, analysis):
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
2161
        sheetname = 'Reference Analysis Interims'
2162
        if not hasattr(self, 'interim_worksheet'):
2163
            worksheet = self.workbook.get_sheet_by_name(sheetname)
2164
            if not worksheet:
2165
                return
2166
            self.interim_worksheet = worksheet
2167
        bsc = getToolByName(self.context, 'bika_setup_catalog')
2168
        interims = []
2169
        for row in self.get_rows(3, worksheet=self.interim_worksheet):
2170
            if row['ReferenceAnalysis_id'] != analysis.getId():
2171
                continue
2172
            interims.append({
2173
                    'keyword': row['keyword'],
2174
                    'title': row['title'],
2175
                    'value': row['value'],
2176
                    'unit': row['unit'],
2177
                    'hidden': row['hidden']})
2178
        analysis.setInterimFields(interims)
2179
2180
    def Import(self):
2181
        bsc = getToolByName(self.context, 'bika_setup_catalog')
2182
        for row in self.get_rows(3):
2183
            if not row['id']:
2184
                continue
2185
            supplier = bsc(portal_type='Supplier',
2186
                           getName=row.get('Supplier_title', ''))[0].getObject()
2187
            obj = _createObjectByType("ReferenceSample", supplier, row['id'])
2188
            ref_def = self.get_object(bsc, 'ReferenceDefinition',
2189
                                      row.get('ReferenceDefinition_title'))
2190
            ref_man = self.get_object(bsc, 'Manufacturer',
2191
                                      row.get('Manufacturer_title'))
2192
            obj.edit(title=row['id'],
2193
                     description=row.get('description', ''),
2194
                     Blank=self.to_bool(row['Blank']),
2195
                     Hazardous=self.to_bool(row['Hazardous']),
2196
                     CatalogueNumber=row['CatalogueNumber'],
2197
                     LotNumber=row['LotNumber'],
2198
                     Remarks=row['Remarks'],
2199
                     ExpiryDate=row['ExpiryDate'],
2200
                     DateSampled=row['DateSampled'],
2201
                     DateReceived=row['DateReceived'],
2202
                     DateOpened=row['DateOpened'],
2203
                     DateExpired=row['DateExpired'],
2204
                     DateDisposed=row['DateDisposed']
2205
                     )
2206
            obj.setReferenceDefinition(ref_def)
2207
            obj.setManufacturer(ref_man)
2208
            obj.unmarkCreationFlag()
2209
2210
            self.load_reference_sample_results(obj)
2211
            self.load_reference_analyses(obj)
2212
2213
class Analysis_Requests(WorksheetImporter):
2214
2215
    def load_analyses(self, sample):
2216
        sheetname = 'Analyses'
2217
        if not hasattr(self, 'analyses_worksheet'):
2218
            worksheet = self.workbook.get_sheet_by_name(sheetname)
2219
            if not worksheet:
2220
                return
2221
            self.analyses_worksheet = worksheet
2222
        bsc = getToolByName(self.context, 'bika_setup_catalog')
2223
        bc = getToolByName(self.context, 'bika_catalog')
2224
        for row in self.get_rows(3, worksheet=self.analyses_worksheet):
2225
            service = bsc(portal_type='AnalysisService',
2226
                          title=row['AnalysisService_title'])[0].getObject()
2227
            # analyses are keyed/named by keyword
2228
            ar = bc(portal_type='AnalysisRequest', id=row['AnalysisRequest_id'])[0].getObject()
2229
            obj = create_analysis(
2230
                ar, service,
2231
                Result=row['Result'],
2232
                ResultCaptureDate=row['ResultCaptureDate'],
2233
                Analyst=row['Analyst'],
2234
                Instrument=row['Instrument'],
2235
                Retested=self.to_bool(row['Retested']),
2236
                MaxTimeAllowed={
2237
                    'days': int(row.get('MaxTimeAllowed_days', 0)),
2238
                    'hours': int(row.get('MaxTimeAllowed_hours', 0)),
2239
                    'minutes': int(row.get('MaxTimeAllowed_minutes', 0)),
2240
                },
2241
            )
2242
2243
            analyses = ar.objectValues('Analyses')
2244
            analyses = list(analyses)
2245
            analyses.append(obj)
2246
            ar.setAnalyses(analyses)
2247
            obj.unmarkCreationFlag()
2248
2249
            self.load_analysis_interims(obj)
2250
2251 View Code Duplication
    def load_analysis_interims(self, analysis):
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
2252
        sheetname = 'Reference Analysis Interims'
2253
        if not hasattr(self, 'interim_worksheet'):
2254
            worksheet = self.workbook.get_sheet_by_name(sheetname)
2255
            if not worksheet:
2256
                return
2257
            self.interim_worksheet = worksheet
2258
        bsc = getToolByName(self.context, 'bika_setup_catalog')
2259
        interims = []
2260
        for row in self.get_rows(3, worksheet=self.interim_worksheet):
2261
            if row['ReferenceAnalysis_id'] != analysis.getId():
2262
                continue
2263
            interims.append({
2264
                    'keyword': row['keyword'],
2265
                    'title': row['title'],
2266
                    'value': row['value'],
2267
                    'unit': row['unit'],
2268
                    'hidden': row['hidden']})
2269
        analysis.setInterimFields(interims)
2270
2271
    def Import(self):
2272
        bc = getToolByName(self.context, 'bika_catalog')
2273
        bsc = getToolByName(self.context, 'bika_setup_catalog')
2274
        pc = getToolByName(self.context, 'portal_catalog')
2275
        for row in self.get_rows(3):
2276
            if not row['id']:
2277
                continue
2278
            client = pc(portal_type="Client",
2279
                        getName=row['Client_title'])[0].getObject()
2280
            obj = _createObjectByType("AnalysisRequest", client, row['id'])
2281
            contact = pc(portal_type="Contact",
2282
                         getFullname=row['Contact_Fullname'])[0].getObject()
2283
            obj.edit(
2284
                RequestID=row['id'],
2285
                Contact=contact,
2286
                CCEmails=row['CCEmails'],
2287
                ClientOrderNumber=row['ClientOrderNumber'],
2288
                InvoiceExclude=row['InvoiceExclude'],
2289
                DateReceived=row['DateReceived'],
2290
                DatePublished=row['DatePublished'],
2291
                Remarks=row['Remarks']
2292
            )
2293
            if row['CCContact_Fullname']:
2294
                contact = pc(portal_type="Contact",
2295
                             getFullname=row['CCContact_Fullname'])[0].getObject()
2296
                obj.setCCContact(contact)
2297
            if row['AnalysisProfile_title']:
2298
                profile = pc(portal_type="AnalysisProfile",
2299
                             title=row['AnalysisProfile_title'].getObject())
2300
                obj.setProfile(profile)
2301
            if row['ARTemplate_title']:
2302
                template = pc(portal_type="ARTemplate",
2303
                             title=row['ARTemplate_title'])[0].getObject()
2304
                obj.setProfile(template)
2305
2306
            obj.unmarkCreationFlag()
2307
2308
            self.load_analyses(obj)
2309
2310
2311
class Invoice_Batches(WorksheetImporter):
2312
2313
    def Import(self):
2314
        folder = self.context.invoices
2315
        for row in self.get_rows(3):
2316
            obj = _createObjectByType("InvoiceBatch", folder, tmpID())
2317
            if not row['title']:
2318
                message = _("InvoiceBatch has no Title")
2319
                raise Exception(t(message))
2320
            if not row['start']:
2321
                message = _("InvoiceBatch has no Start Date")
2322
                raise Exception(t(message))
2323
            if not row['end']:
2324
                message = _("InvoiceBatch has no End Date")
2325
                raise Exception(t(message))
2326
            obj.edit(
2327
                title=row['title'],
2328
                BatchStartDate=row['start'],
2329
                BatchEndDate=row['end'],
2330
            )
2331
            renameAfterCreation(obj)
2332
            notify(ObjectInitializedEvent(obj))
2333