Preservations.Import()   A
last analyzed

Complexity

Conditions 3

Size

Total Lines 9
Code Lines 8

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 8
dl 0
loc 9
rs 10
c 0
b 0
f 0
cc 3
nop 1
1
# -*- coding: utf-8 -*-
2
#
3
# This file is part of SENAITE.CORE.
4
#
5
# SENAITE.CORE is free software: you can redistribute it and/or modify it under
6
# the terms of the GNU General Public License as published by the Free Software
7
# Foundation, version 2.
8
#
9
# This program is distributed in the hope that it will be useful, but WITHOUT
10
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
11
# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
12
# details.
13
#
14
# You should have received a copy of the GNU General Public License along with
15
# this program; if not, write to the Free Software Foundation, Inc., 51
16
# Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
17
#
18
# Copyright 2018-2025 by its authors.
19
# Some rights reserved, see README and LICENSE.
20
21
import datetime
22
import os.path
23
import re
24
25
import transaction
26
from bika.lims import api
27
from bika.lims import bikaMessageFactory as _
28
from bika.lims import logger
29
from senaite.core.idserver import renameAfterCreation
30
from bika.lims.interfaces import ISetupDataSetList
31
from bika.lims.utils import getFromString
32
from senaite.core.i18n import translate as t
33
from bika.lims.utils import tmpID
34
from bika.lims.utils import to_unicode
35
from bika.lims.utils import to_utf8
36
from bika.lims.utils.analysis import create_analysis
37
from pkg_resources import resource_filename
38
from Products.Archetypes.event import ObjectInitializedEvent
39
from Products.CMFCore.utils import getToolByName
40
from Products.CMFPlone.utils import _createObjectByType
41
from Products.CMFPlone.utils import safe_unicode
42
from senaite.core.catalog import CLIENT_CATALOG
43
from senaite.core.catalog import CONTACT_CATALOG
44
from senaite.core.catalog import SENAITE_CATALOG
45
from senaite.core.catalog import SETUP_CATALOG
46
from senaite.core.exportimport.dataimport import SetupDataSetList as SDL
47
from senaite.core.schema.addressfield import BILLING_ADDRESS
48
from senaite.core.schema.addressfield import PHYSICAL_ADDRESS
49
from senaite.core.schema.addressfield import POSTAL_ADDRESS
50
from zope.event import notify
51
from zope.interface import implements
52
53
UID_CATALOG = "uid_catalog"
54
55
56
def get_addresses_from_row(row):
    """Build the list of address records found in a worksheet row.

    For each address type (PHYSICAL_ADDRESS, POSTAL_ADDRESS and
    BILLING_ADDRESS) the columns ``<Type>_Address``, ``<Type>_City``,
    ``<Type>_Zip`` and ``<Type>_Country`` are read from the row, where
    ``<Type>`` is the capitalized address type.

    :param row: mapping of column name -> cell value
    :returns: list of dicts, one for every address type that has at least
        one non-empty value. Each dict holds the key "type" plus the
        lower-cased keys ("address", "city", "zip", "country") that were
        filled in the row.
    """
    types = [PHYSICAL_ADDRESS, POSTAL_ADDRESS, BILLING_ADDRESS]
    keys = ["Address", "City", "Zip", "Country"]

    address_list = []
    for address_type in types:
        address_item = {"type": address_type}
        prefix = address_type.capitalize()
        for key in keys:
            value = row.get("%s_%s" % (prefix, key))
            # An empty cell may come through as None. Converting it with
            # str() would store the literal text "None" as address value.
            if value is None:
                continue
            value = str(value)
            if value:
                address_item[key.lower()] = value
        # Only keep the record if something besides "type" was set
        if len(address_item) > 1:
            address_list.append(address_item)
    return address_list
74
75
76
def lookup(context, portal_type, **kwargs):
    """Return the first object of `portal_type` matching the query.

    The catalog is the first one registered for the portal type in the
    archetype tool's `catalog_map`, falling back to UID_CATALOG.

    :param context: object used to acquire the tools
    :param portal_type: portal type to search for
    :param kwargs: additional catalog query parameters
    :returns: the object of the first catalog result
    :raises IndexError: if the query yields no results
    """
    archetype_tool = getToolByName(context, 'archetype_tool')
    registered = archetype_tool.catalog_map.get(portal_type, [None])
    catalog_id = registered[0] or UID_CATALOG
    search = getToolByName(context, catalog_id)
    kwargs['portal_type'] = portal_type
    results = search(**kwargs)
    return results[0].getObject()
82
83
84
def check_for_required_columns(name, data, required):
    """Ensure that every required column has a truthy value in `data`.

    :param name: name used in the error message
    :param data: mapping of column name -> value
    :param required: iterable of column names that must be set
    :raises Exception: for the first required column that is missing or
        has a falsy value
    """
    for column in required:
        if data.get(column, None):
            continue
        message = _("%s has no '%s' column." % (name, column))
        raise Exception(t(message))
89
90
91
def Float(thing):
    """Convert `thing` to a float, falling back to 0.0.

    :param thing: any value accepted by `float()`
    :returns: the float value, or 0.0 if `thing` cannot be converted.
        This includes values of an unsupported type such as None, which
        raise TypeError rather than ValueError.
    """
    try:
        return float(thing)
    except (TypeError, ValueError):
        return 0.0
97
98
99
def read_file(path):
    """Return the binary contents of the file located at `path`.

    If `path` itself is not a file, the same path is retried with each of
    the allowed extensions appended (lower-case first, then upper-case).

    :param path: file system path, with or without extension
    :returns: the raw file contents
    :raises IOError: if neither `path` nor any of the extended paths is
        an existing file
    """
    allowed_ext = ['pdf', 'jpg', 'jpeg', 'png', 'gif', 'ods', 'odt',
                   'xlsx', 'doc', 'docx', 'xls', 'csv', 'txt']
    allowed_ext += [e.upper() for e in allowed_ext]

    candidates = [path] + ['%s.%s' % (path, e) for e in allowed_ext]
    for candidate in candidates:
        if os.path.isfile(candidate):
            # Context manager, so the handle is closed right after reading
            with open(candidate, "rb") as handle:
                return handle.read()

    raise IOError("File not found: %s. Allowed extensions: %s" %
                  (path, ','.join(allowed_ext)))
111
112
113
class SetupDataSetList(SDL):
    """ISetupDataSetList implementation for the "bika.lims" project.
    """

    implements(ISetupDataSetList)

    def __call__(self):
        # Delegate to the base implementation with a fixed project name
        datasets = SDL.__call__(self, projectname="bika.lims")
        return datasets
119
120
121
class WorksheetImporter(object):
    """Use this as a base, for normal tabular data sheet imports.

    The sheet handled by a sub-class is derived from the class name, with
    underscores replaced by spaces. Sub-classes override `Import`.
    """

    def __init__(self, context):
        self.adapter_context = context

    def __call__(self, lsd, workbook, dataset_project, dataset_name):
        """Run the import for this importer's sheet, if the workbook has it.

        :param lsd: object providing `context` and the `deferred` list
        :param workbook: mapping of sheet name -> worksheet
        :param dataset_project: name of the package holding the setup data
        :param dataset_name: name of the dataset inside "setupdata/"
        """
        self.lsd = lsd
        self.context = lsd.context
        self.workbook = workbook
        self.sheetname = self.__class__.__name__.replace("_", " ")
        try:
            self.worksheet = workbook[self.sheetname]
        except KeyError:
            self.worksheet = None
        self.dataset_project = dataset_project
        self.dataset_name = dataset_name
        if self.worksheet:
            logger.info("Loading {0}.{1}: {2}".format(
                self.dataset_project, self.dataset_name, self.sheetname))
            try:
                self.Import()
            except IOError:
                # The importer must omit the files not found inside the
                # server filesystem (bika/lims/setupdata/test/ if the file
                # is loaded from 'select existing file' or
                # bika/lims/setupdata/uploaded if it's loaded from
                # 'Load from file') and finish the import without errors.
                # https://jira.bikalabs.com/browse/LIMS-1624
                warning = ("Error while loading attached file from %s. "
                           "The file will not be uploaded into the system.")
                logger.warning(warning, self.sheetname)
                self.context.plone_utils.addPortalMessage(
                    "Error while loading some attached files. "
                    "The files weren't uploaded into the system.")
        else:
            logger.info("No records found: '{0}'".format(self.sheetname))

    def get_rows(self, startrow=3, worksheet=None):
        """Returns a generator for all rows in a sheet.

        Each row is a dictionary keyed by the values of the first row of
        the sheet (the headers). Empty cells are returned as '' and
        strings are stripped of surrounding whitespace. On Python 2,
        unicode values are returned utf-8 encoded.

        For each of 'Physical', 'Postal' and 'Billing' the row additionally
        holds a dict under that key. It is filled with the address parts
        (as strings) when the '<Type>_Address' column exists in the sheet,
        and is empty otherwise.

        :param startrow: rows up to and including this 1-based row number
            are skipped
        :param worksheet: sheet to read; defaults to this importer's sheet
        """
        headers = []
        worksheet = worksheet if worksheet else self.worksheet
        for row_nr, row in enumerate(worksheet.rows, 1):
            if row_nr == 1:
                headers = [cell.value for cell in row]
                continue
            # Keep memory usage bounded on very large sheets
            if row_nr % 1000 == 0:
                transaction.savepoint()
            if row_nr <= startrow:
                continue
            new_row = []
            for cell in row:
                value = cell.value
                if value is None:
                    value = ''
                # `unicode` only exists on Python 2, where `bytes is str`.
                # The short-circuit keeps the name from being evaluated
                # (and raising NameError) on Python 3.
                if bytes is str and isinstance(value, unicode):  # noqa: F821
                    value = value.encode('utf-8')
                # Strip any space, \t, \n, or \r characters from the
                # left-hand side, right-hand side, or both sides
                if isinstance(value, str):
                    value = value.strip(' \t\n\r')
                new_row.append(value)
            row = dict(zip(headers, new_row))

            # parse out addresses
            for add_type in ['Physical', 'Postal', 'Billing']:
                row[add_type] = {}
                if add_type + "_Address" in row:
                    for key in ['Address', 'City', 'State', 'District',
                                'Zip', 'Country']:
                        row[add_type][key] = str(
                            row.get("%s_%s" % (add_type, key), ''))

            yield row

    def get_file_data(self, filename):
        """Return the contents of a file shipped with the dataset.

        :param filename: file name relative to the dataset folder
        :returns: the raw file contents, or None if no filename was given
            or the file could not be read
        """
        if not filename:
            return None
        try:
            path = resource_filename(
                self.dataset_project,
                "setupdata/%s/%s" % (self.dataset_name, filename))
            with open(path, "rb") as handle:
                return handle.read()
        except Exception:
            # Deliberately best effort: callers treat None as "no file"
            return None

    def to_bool(self, value):
        """Converts a sheet value to a boolean value.

        :returns: True for the text 'true' (any casing) and for anything
            that converts to the integer 1; False otherwise
        """
        try:
            value = value.lower()
        except AttributeError:
            pass
        try:
            value = int(value)
        except (TypeError, ValueError, OverflowError):
            pass
        return value in ('true', 1)

    def to_int(self, value, default=0):
        """Converts a value to an int.

        :returns: `default` (as int) if the conversion fails, or 0 if the
            default cannot be converted either
        """
        try:
            return int(value)
        except (TypeError, ValueError, OverflowError):
            try:
                return int(default)
            except Exception:
                return 0

    def to_float(self, value, default=0):
        """Converts a value to a float.

        :returns: `default` (as float) if the conversion fails, or 0.0 if
            the default cannot be converted either
        """
        try:
            return float(value)
        except (TypeError, ValueError, OverflowError):
            try:
                return float(default)
            except Exception:
                return 0.0

    def defer(self, **kwargs):
        """Queue the keyword arguments in the `deferred` list of `lsd`."""
        self.lsd.deferred.append(kwargs)

    def Import(self):
        """ Override this.
        XXX Simple generic sheet importer
        """

    def fill_addressfields(self, row, obj):
        """ Fills the address fields for the specified object if allowed:
            PhysicalAddress, PostalAddress, CountryState, BillingAddress

        Values are read from the '<Type>_<Part>' columns of the row. If
        neither CountryState country nor state is given, both are copied
        from the physical address. A setter is only called if `obj` has it.
        """
        addresses = {}
        for add_type in ['Physical', 'Postal', 'Billing', 'CountryState']:
            addresses[add_type] = {}
            for key in ['Address', 'City', 'State', 'District', 'Zip',
                        'Country']:
                addresses[add_type][key.lower()] = str(
                    row.get("%s_%s" % (add_type, key), ''))

        country_state = addresses['CountryState']
        if country_state['country'] == '' and country_state['state'] == '':
            country_state['country'] = addresses['Physical']['country']
            country_state['state'] = addresses['Physical']['state']

        if hasattr(obj, 'setPhysicalAddress'):
            obj.setPhysicalAddress(addresses['Physical'])
        if hasattr(obj, 'setPostalAddress'):
            obj.setPostalAddress(addresses['Postal'])
        if hasattr(obj, 'setCountryState'):
            obj.setCountryState(addresses['CountryState'])
        if hasattr(obj, 'setBillingAddress'):
            obj.setBillingAddress(addresses['Billing'])

    def fill_contactfields(self, row, obj):
        """ Fills the contact fields for the specified object if allowed:
            EmailAddress, Phone, Fax, BusinessPhone, BusinessFax, HomePhone,
            MobilePhone

        Fields missing from the schema of `obj` are skipped; a message is
        logged if the row nevertheless has a column for them.
        """
        fieldnames = ['EmailAddress',
                      'Phone',
                      'Fax',
                      'BusinessPhone',
                      'BusinessFax',
                      'HomePhone',
                      'MobilePhone',
                      ]
        schema = obj.Schema()
        fields = {field.getName(): field for field in schema.fields()}
        for fieldname in fieldnames:
            field = fields.get(fieldname)
            if field is None:
                if fieldname in row:
                    logger.info("Address field %s not found on %s" %
                                (fieldname, obj))
                continue
            field.set(obj, row.get(fieldname, ''))

    def get_object(self, catalog, portal_type, title=None, **kwargs):
        """This will return an object from the catalog.
        Logs a message and returns None if no object or multiple objects found.
        All keyword arguments are passed verbatim to the contentFilter

        For the portal type 'AnalysisService', a second search by keyword
        (getKeyword=title) is done when the first one yields nothing.
        """
        if not title and not kwargs:
            return None
        contentFilter = {"portal_type": portal_type}
        if title:
            contentFilter['title'] = to_unicode(title)
        contentFilter.update(kwargs)
        brains = catalog(contentFilter)
        if len(brains) == 1:
            return brains[0].getObject()
        if len(brains) > 1:
            logger.info("More than one object found for %s" % contentFilter)
            return None
        if portal_type == 'AnalysisService':
            brains = catalog(portal_type=portal_type, getKeyword=title)
            if brains:
                return brains[0].getObject()
        logger.info("No objects found for %s" % contentFilter)
        return None
342
343
344
class Sub_Groups(WorksheetImporter):
    """Creates a SubGroup in the setup folder for each row with a title
    """

    def Import(self):
        subgroups = self.context.setup.subgroups
        for record in self.get_rows(3):
            name = record.get("title")
            # Rows without a title are ignored
            if name:
                api.create(subgroups, "SubGroup",
                           title=name,
                           description=record.get("description"),
                           SortKey=record.get("SortKey"))
356
357
358
class Lab_Information(WorksheetImporter):
    """Updates the laboratory object with the values from the sheet.

    The sheet is read as a key/value table: column "Field" holds the name
    and column "Value" the value to set.
    """

    def Import(self):
        laboratory = self.context.bika_setup.laboratory
        values = {}
        for row in self.get_rows(3):
            values[row['Field']] = row['Value']

        # The logo is optional: no row or an empty value means "no logo"
        file_data = None
        logo = values.get('AccreditationBodyLogo')
        if logo:
            path = resource_filename(
                self.dataset_project,
                "setupdata/%s/%s" % (self.dataset_name, logo))
            try:
                file_data = read_file(path)
            except Exception as msg:
                # Best effort: import the lab without the logo. The
                # exception is passed as a log argument because indexing
                # it (msg[0]) may yield an errno integer, or fail if the
                # exception has no arguments.
                logger.warning("%s Error on sheet: %s", msg, self.sheetname)

        laboratory.edit(
            Name=values['Name'],
            LabURL=values['LabURL'],
            Confidence=values['Confidence'],
            LaboratoryAccredited=self.to_bool(values['LaboratoryAccredited']),
            AccreditationBodyLong=values['AccreditationBodyLong'],
            AccreditationBody=values['AccreditationBody'],
            AccreditationBodyURL=values['AccreditationBodyURL'],
            Accreditation=values['Accreditation'],
            AccreditationReference=values['AccreditationReference'],
            AccreditationBodyLogo=file_data,
            TaxNumber=values['TaxNumber'],
        )
        self.fill_contactfields(values, laboratory)
        self.fill_addressfields(values, laboratory)
394
395
396
class Lab_Contacts(WorksheetImporter):
    """Creates the lab contacts and, where possible, their Plone users.

    A Plone user is only registered when both a username and an email
    address are given. Once all contacts exist, departments from the
    "Lab Departments" sheet that have no manager yet get the matching
    lab contact assigned.
    """

    def _get_contacts_by_username(self, catalog, username):
        """Returns the LabContact objects from the catalog whose username
        equals the given one
        """
        contacts = (brain.getObject()
                    for brain in catalog(portal_type="LabContact"))
        return [c for c in contacts if c.getUsername() == username]

    def Import(self):
        folder = self.context.bika_setup.bika_labcontacts
        portal_groups = getToolByName(self.context, 'portal_groups')
        portal_registration = getToolByName(
            self.context, 'portal_registration')
        bsc = getToolByName(self.context, SETUP_CATALOG)

        # get_rows(3) skips the first three sheet rows, so the first row
        # it yields is sheet row number 4
        for rownum, row in enumerate(self.get_rows(3), 4):
            if not row.get('Firstname', None):
                continue

            # Username already exists?
            username = row.get('Username', '')
            fullname = ('%s %s' %
                        (row['Firstname'], row.get('Surname', ''))).strip()
            if username:
                username = safe_unicode(username).encode('utf-8')
                if self._get_contacts_by_username(bsc, username):
                    error = ("Lab Contact: username '{0}' in row {1} "
                             "already exists. This contact will be "
                             "omitted.").format(username, str(rownum))
                    logger.error(error)
                    continue

            # Is there a signature file defined? Try to get the file first.
            signature = None
            if row.get('Signature'):
                signature = self.get_file_data(row['Signature'])
                if not signature:
                    warning = ("Lab Contact: Cannot load the signature file "
                               "'{0}' for user '{1}'. The contact will be "
                               "created, but without a signature "
                               "image").format(row['Signature'], username)
                    logger.warning(warning)

            obj = _createObjectByType("LabContact", folder, tmpID())
            obj.edit(
                title=fullname,
                Salutation=row.get('Salutation', ''),
                Firstname=row['Firstname'],
                Surname=row.get('Surname', ''),
                JobTitle=row.get('JobTitle', ''),
                Username=row.get('Username', ''),
                Signature=signature
            )
            obj.unmarkCreationFlag()
            renameAfterCreation(obj)
            notify(ObjectInitializedEvent(obj))
            self.fill_contactfields(row, obj)
            self.fill_addressfields(row, obj)

            # The department may not exist yet: resolve it later
            if row.get('Department_title'):
                self.defer(src_obj=obj,
                           src_field='Department',
                           dest_catalog=SETUP_CATALOG,
                           dest_query={'portal_type': 'Department',
                                       'title': row['Department_title']}
                           )

            # Create Plone user
            if not row.get('Username'):
                warn = ("Lab Contact: No username defined for user '{0}' in "
                        "row {1}. Contact created, but without access "
                        "credentials.").format(fullname, str(rownum))
                logger.warning(warn)
            if not row.get('EmailAddress', ''):
                warn = ("Lab Contact: No Email defined for user '{0}' in "
                        "row {1}. Contact created, but without access "
                        "credentials.").format(fullname, str(rownum))
                logger.warning(warn)

            if not (row.get('Username') and row.get('EmailAddress', '')):
                continue

            username = safe_unicode(row['Username']).encode('utf-8')
            passw = row.get('Password')
            if not passw:
                passw = username
                warn = ("Lab Contact: No password defined for user '{0}' in row {1}."
                        " Password established automatically to '{2}'").format(username, str(rownum), passw)
                logger.warning(warn)

            try:
                member = portal_registration.addMember(
                    username,
                    passw,
                    properties={
                        'username': username,
                        'email': row['EmailAddress'],
                        'fullname': fullname}
                )
            except Exception as msg:
                logger.error(
                    "Lab Contact: Error adding user (%s): %s" % (msg, username))
                continue

            groups = row.get('Groups', '')
            if not groups:
                warn = ("Lab Contact: No groups defined for user '{0}' in "
                        "row {1}. Group established automatically to "
                        "'Analysts'").format(username, str(rownum))
                logger.warning(warn)
                groups = 'Analysts'

            group_ids = [g.strip() for g in groups.split(',')]
            # Add user to all specified groups
            for group_id in group_ids:
                group = portal_groups.getGroupById(group_id)
                if group:
                    group.addMember(username)
            roles = row.get('Roles', '')
            if roles:
                role_ids = [r.strip() for r in roles.split(',')]
                # Add user to all specified roles
                for role_id in role_ids:
                    member._addRole(role_id)
            # If user is in LabManagers, add Owner local role on clients
            # folder
            # NOTE(review): the check below looks for 'LabManager' while the
            # comment above talks about 'LabManagers' - confirm which group
            # id is intended before changing it, as it grants permissions
            if 'LabManager' in group_ids:
                self.context.clients.manage_setLocalRoles(
                    username, ['Owner', ])

        # Now we have the lab contacts registered, try to assign the managers
        # to each department if required
        try:
            sheet = self.workbook["Lab Departments"]
        except KeyError:
            # No departments sheet in this workbook: nothing to assign
            return
        for row in self.get_rows(3, sheet):
            if not (row.get('title') and row.get('LabContact_Username')):
                continue
            dept = self.get_object(bsc, "Department", row.get('title'))
            if dept and not dept.getManager():
                username = safe_unicode(
                    row['LabContact_Username']).encode('utf-8')
                exists = self._get_contacts_by_username(bsc, username)
                if exists:
                    dept.setManager(exists[0].UID())
529
530
531
class Lab_Departments(WorksheetImporter):
    """Import Lab Departments

    Creates a Department for each row with a title and sets as manager the
    first lab contact whose username equals "LabContact_Username".
    """

    def Import(self):
        departments = api.get_senaite_setup().departments
        catalog = getToolByName(self.context, CONTACT_CATALOG)
        contacts = [brain.getObject()
                    for brain in catalog(portal_type="LabContact")]
        for record in self.get_rows(3):
            title = record.get("title")
            if not title:
                continue

            username = record.get("LabContact_Username")
            department = api.create(departments,
                                    "Department",
                                    title=title,
                                    description=record.get("description"))

            # First contact with a matching username, if any
            manager = next(
                (c for c in contacts if c.getUsername() == username), None)
            if manager:
                department.setManager(manager.UID())
                continue
            logger.info(
                "Department: lookup of '%s' in LabContacts/Username failed."
                % username)
564
565
566
class Lab_Products(WorksheetImporter):
    """Creates a LabProduct in the setup folder for each row with a title
    """

    def Import(self):
        products = self.context.setup.labproducts
        for record in self.get_rows(3):
            name = record.get("title")
            # Rows without a title are ignored
            if name:
                api.create(products, "LabProduct",
                           title=name,
                           description=record.get("description"))
578
579
580
class Clients(WorksheetImporter):
    """Creates a Client in the clients folder for each row of the sheet.

    Both "Name" and "ClientID" are mandatory: the import is aborted with an
    exception on the first row that lacks one of them.
    """

    def Import(self):
        folder = self.context.clients
        for row in self.get_rows(3):
            # Validate before creating anything, so that an invalid row
            # does not leave a temporary Client object behind
            if not row['Name']:
                message = "Client %s has no Name" % row.get('ClientID', '')
                raise Exception(message)
            if not row['ClientID']:
                message = "Client %s has no Client ID" % row['Name']
                raise Exception(message)

            obj = _createObjectByType("Client", folder, tmpID())
            obj.edit(Name=row['Name'],
                     ClientID=row['ClientID'],
                     MemberDiscountApplies=bool(row['MemberDiscountApplies']),
                     BulkDiscount=bool(row['BulkDiscount']),
                     TaxNumber=row.get('TaxNumber', ''),
                     AccountNumber=row.get('AccountNumber', '')
                     )
            self.fill_contactfields(row, obj)
            self.fill_addressfields(row, obj)
            obj.unmarkCreationFlag()
            renameAfterCreation(obj)
            notify(ObjectInitializedEvent(obj))
605
606
607
class Client_Contacts(WorksheetImporter):
    """Creates the contacts of the clients and, if a username is given,
    registers a Plone user for them.

    Rows whose "Client_title" does not match any client are logged and
    skipped. CC contacts are deferred, as they may not exist yet.
    """

    def Import(self):
        portal_groups = getToolByName(self.context, 'portal_groups')
        cat = api.get_tool(CLIENT_CATALOG)
        for row in self.get_rows(3):
            fullname = "%(Firstname)s %(Surname)s" % row
            client = cat(portal_type="Client",
                         getName=row['Client_title'])
            if len(client) == 0:
                error = "Client invalid: '%s'. The Client Contact %s will not be uploaded."
                logger.error(error, row['Client_title'], fullname)
                continue
            client = client[0].getObject()
            contact = _createObjectByType("Contact", client, tmpID())
            pub_pref = [x.strip() for x in
                        row.get('PublicationPreference', '').split(",")]
            contact.edit(
                Salutation=row.get('Salutation', ''),
                Firstname=row.get('Firstname', ''),
                Surname=row.get('Surname', ''),
                Username=row['Username'],
                JobTitle=row.get('JobTitle', ''),
                Department=row.get('Department', ''),
                PublicationPreference=pub_pref,
            )
            self.fill_contactfields(row, contact)
            self.fill_addressfields(row, contact)
            contact.unmarkCreationFlag()
            renameAfterCreation(contact)
            notify(ObjectInitializedEvent(contact))

            # CC Contacts (optional column)
            cc_contacts = row.get('CCContacts')
            if cc_contacts:
                names = [x.strip() for x in cc_contacts.split(",")]
                for _fullname in names:
                    self.defer(src_obj=contact,
                               src_field='CCContact',
                               dest_catalog=CONTACT_CATALOG,
                               dest_query={'portal_type': 'Contact',
                                           'getFullname': _fullname}
                               )

            # Create Plone user
            username = safe_unicode(row['Username']).encode('utf-8')
            password = safe_unicode(row.get('Password', '')).encode('utf-8')
            if not username:
                continue
            try:
                self.context.portal_registration.addMember(
                    username,
                    password,
                    properties={
                        'username': username,
                        'email': row['EmailAddress'],
                        'fullname': fullname}
                )
            except Exception as msg:
                logger.info("Error adding user (%s): %s" % (msg, username))
            contact.aq_parent.manage_setLocalRoles(
                row['Username'], ['Owner', ])
            contact.reindexObject()
            # add user to Clients group; the lookup yields a falsy value
            # when the group does not exist, same guard as in Lab_Contacts
            group = portal_groups.getGroupById('Clients')
            if group:
                group.addMember(username)
670
671
672
class Container_Types(WorksheetImporter):
    """Creates a ContainerType in the setup folder for each row with a title
    """

    def Import(self):
        folder = self.context.setup.containertypes
        for record in self.get_rows(3):
            name = record.get("title")
            # Rows without a title are ignored
            if name:
                api.create(folder, "ContainerType",
                           title=name,
                           description=record.get("description"))
683
684
685
class Preservations(WorksheetImporter):
    """Creates a SamplePreservation in the setup folder for each row with a
    title
    """

    def Import(self):
        folder = self.context.setup.samplepreservations
        for record in self.get_rows(3):
            name = record.get("title")
            # Rows without a title are ignored
            if name:
                api.create(folder, "SamplePreservation",
                           title=name,
                           description=record.get("description"))
696
697
698
class Containers(WorksheetImporter):
    """Creates a SampleContainer in the setup folder for each row with a
    title, linked to the container type named in "ContainerType_title"
    """

    def Import(self):
        setup_catalog = getToolByName(self.context, SETUP_CATALOG)
        folder = self.context.setup.samplecontainers
        for record in self.get_rows(3):
            name = record.get("title")
            if not name:
                continue

            values = {
                "title": name,
                "description": record.get("description", ""),
                "capacity": record.get("Capacity", 0),
                "pre_preserved": self.to_bool(record["PrePreserved"]),
                "containertype": None,
            }

            # Resolve the container type only if the row names one
            type_title = record.get("ContainerType_title", "")
            if type_title:
                values["containertype"] = self.get_object(
                    setup_catalog, "ContainerType", type_title)

            api.create(folder, "SampleContainer", **values)
724
725
726
class Suppliers(WorksheetImporter):
    """Creates a Supplier in the setup folder for each row with a "Name"
    """

    def Import(self):
        folder = self.context.setup.suppliers
        for record in self.get_rows(3):
            name = record.get("Name")
            if not name:
                continue

            # Supplier field name -> value read from the sheet row
            values = {
                "title": name,
                "description": record.get("description"),
                "tax_number": record.get("TaxNumber"),
                "phone": record.get("Phone", ""),
                "fax": record.get("Fax", ""),
                "email": record.get("EmailAddress", ""),
                "account_type": record.get("AccountType", {}),
                "account_name": record.get("AccountName", {}),
                "account_number": record.get("AccountNumber", ''),
                "bank_name": record.get("BankName", ""),
                "bank_branch": record.get("BankBranch", ""),
                "swift_code": record.get("SWIFTcode", ""),
                "iban": record.get("IBN", ""),
                "nib": record.get("NIB", ""),
                "website": record.get("Website", ""),
                "address": get_addresses_from_row(record),
            }
            api.create(folder, "Supplier", **values)
752
753
754
class Supplier_Contacts(WorksheetImporter):
    """Importer that adds SupplierContact objects to existing suppliers"""

    def Import(self):
        """Create a SupplierContact inside the supplier named by each row

        Rows with an empty supplier name or first name are skipped, as are
        rows whose supplier is not found in the setup catalog.
        """
        catalog = getToolByName(self.context, SETUP_CATALOG)
        for row in self.get_rows(3):
            supplier_name = row['Supplier_Name']
            if not supplier_name:
                continue
            firstname = row['Firstname']
            if not firstname:
                continue

            brains = catalog(portal_type="Supplier", Title=supplier_name)
            if not brains:
                continue

            # The first catalog match is used as the contact's container
            supplier = brains[0].getObject()
            contact = _createObjectByType(
                "SupplierContact", supplier, tmpID())
            contact.edit(Firstname=firstname,
                         Surname=row.get('Surname', ''),
                         Username=row.get('Username'))
            self.fill_contactfields(row, contact)
            self.fill_addressfields(row, contact)
            contact.unmarkCreationFlag()
            renameAfterCreation(contact)
            notify(ObjectInitializedEvent(contact))
779
780
781
class Manufacturers(WorksheetImporter):
    """Importer that creates Manufacturer objects from worksheet rows"""

    def Import(self):
        """Create a Manufacturer for every row that carries a title"""
        target = self.context.setup.manufacturers
        for row in self.get_rows(3):
            name = row.get("title")
            if name:
                api.create(target, "Manufacturer",
                           title=name,
                           description=row.get("description"))
791
792
793
class Instrument_Types(WorksheetImporter):
    """Importer that creates InstrumentType objects from worksheet rows"""

    def Import(self):
        """Create an InstrumentType for every row that carries a title"""
        target = self.context.setup.instrumenttypes
        for row in self.get_rows(3):
            name = row.get("title")
            if name:
                api.create(target, "InstrumentType",
                           title=name,
                           description=row.get("description"))
803
804
805
class Instruments(WorksheetImporter):
    """Importer that creates Instrument objects from worksheet rows"""

    def _attach_file(self, row, column, setter):
        """Read the dataset file named in a row column and store it

        :param row: the worksheet row being imported
        :param column: name of the column holding the file name
        :param setter: callable that receives the data returned by read_file
        """
        filename = row.get(column, None)
        if not filename:
            return
        path = resource_filename(
            self.dataset_project,
            "setupdata/%s/%s" % (self.dataset_name, filename)
        )
        try:
            setter(read_file(path))
        except Exception as exc:
            # Best effort: a failing attachment must not abort the import.
            # The exception is formatted as a whole because its first
            # argument is not guaranteed to be a string (e.g. an errno).
            logger.warning("%s Error on sheet: %s", exc, self.sheetname)

    def Import(self):
        """Create an Instrument for each row and link its related objects

        Rows lacking the 'Type', 'Supplier' or 'Brand' key are logged and
        skipped. The instrument type, manufacturer, supplier and method are
        looked up by title in the setup catalog. A photo, an installation
        certificate and a user manual are attached when the row names them.
        """
        folder = self.context.bika_setup.bika_instruments
        bsc = getToolByName(self.context, SETUP_CATALOG)
        for row in self.get_rows(3):
            if ('Type' not in row
                    or 'Supplier' not in row
                    or 'Brand' not in row):
                logger.info(
                    "Unable to import '%s'. Missing supplier, manufacturer "
                    "or type" % row.get('title', ''))
                continue

            obj = _createObjectByType("Instrument", folder, tmpID())

            obj.edit(
                title=row.get('title', ''),
                AssetNumber=row.get('assetnumber', ''),
                description=row.get('description', ''),
                Type=row.get('Type', ''),
                Brand=row.get('Brand', ''),
                Model=row.get('Model', ''),
                SerialNo=row.get('SerialNo', ''),
                DataInterface=row.get('DataInterface', ''),
                Location=row.get('Location', ''),
                InstallationDate=row.get('Instalationdate', ''),
                UserManualID=row.get('UserManualID', ''),
            )
            instrumenttype = self.get_object(
                bsc, 'InstrumentType', title=row.get('Type'))
            manufacturer = self.get_object(
                bsc, 'Manufacturer', title=row.get('Brand'))
            supplier = self.get_object(
                bsc, 'Supplier', title=row.get('Supplier', ''))
            method = self.get_object(bsc, 'Method', title=row.get('Method'))
            obj.setInstrumentType(instrumenttype)
            obj.setManufacturer(manufacturer)
            obj.setSupplier(supplier)
            if method:
                obj.setMethods([method])
                obj.setMethod(method)

            # Attaching the instrument's photo
            self._attach_file(row, 'Photo', obj.setPhoto)

            # Attaching the Installation Certificate if exists
            self._attach_file(row, 'InstalationCertificate',
                              obj.setInstallationCertificate)

            # Attaching the Instrument's manual if exists
            if row.get('UserManualFile', None):
                row_dict = {
                    'DocumentID': row.get('UserManualID', 'manual'),
                    'DocumentVersion': '',
                    'DocumentLocation': '',
                    'DocumentType': 'Manual',
                    'File': row.get('UserManualFile', None),
                }
                addDocument(self, row_dict, obj)
            obj.unmarkCreationFlag()
            renameAfterCreation(obj)
            notify(ObjectInitializedEvent(obj))
888
889
890 View Code Duplication
class Instrument_Validations(WorksheetImporter):
    """Importer that creates InstrumentValidation objects"""

    def Import(self):
        """Add a validation to the instrument referenced by each row

        Rows without an instrument or a title are skipped, as are rows whose
        instrument is not found in the setup catalog. The worker is set to
        the active lab contact whose full name equals the 'Worker' value.
        """
        bsc = getToolByName(self.context, SETUP_CATALOG)
        # Active lab contacts, loaded once on first use instead of being
        # queried and woken up again for every single row
        lab_contacts = None
        for row in self.get_rows(3):
            if not row.get('instrument', None) or not row.get('title', None):
                continue

            folder = self.get_object(bsc, 'Instrument', row.get('instrument'))
            if not folder:
                continue

            obj = _createObjectByType(
                "InstrumentValidation", folder, tmpID())
            obj.edit(
                title=row['title'],
                DownFrom=row.get('downfrom', ''),
                DownTo=row.get('downto', ''),
                Validator=row.get('validator', ''),
                Considerations=row.get('considerations', ''),
                WorkPerformed=row.get('workperformed', ''),
                Remarks=row.get('remarks', ''),
                DateIssued=row.get('DateIssued', ''),
                ReportID=row.get('ReportID', '')
            )
            # Getting lab contacts
            if lab_contacts is None:
                lab_contacts = [brain.getObject() for brain in bsc(
                    portal_type="LabContact", is_active=True)]
            worker = row.get('Worker', '')
            for contact in lab_contacts:
                if contact.getFullname() == worker:
                    obj.setWorker(contact.UID())
            obj.unmarkCreationFlag()
            renameAfterCreation(obj)
            notify(ObjectInitializedEvent(obj))
923
924
925 View Code Duplication
class Instrument_Calibrations(WorksheetImporter):
    """Importer that creates InstrumentCalibration objects"""

    def Import(self):
        """Add a calibration to the instrument referenced by each row

        Rows without an instrument or a title are skipped, as are rows whose
        instrument is not found in the setup catalog. The worker is set to
        the active lab contact whose full name equals the 'Worker' value.
        """
        bsc = getToolByName(self.context, SETUP_CATALOG)
        # Active lab contacts, loaded once on first use instead of being
        # queried and woken up again for every single row
        lab_contacts = None
        for row in self.get_rows(3):
            if not row.get('instrument', None) or not row.get('title', None):
                continue

            folder = self.get_object(bsc, 'Instrument', row.get('instrument'))
            if not folder:
                continue

            obj = _createObjectByType(
                "InstrumentCalibration", folder, tmpID())
            obj.edit(
                title=row['title'],
                DownFrom=row.get('downfrom', ''),
                DownTo=row.get('downto', ''),
                Calibrator=row.get('calibrator', ''),
                Considerations=row.get('considerations', ''),
                WorkPerformed=row.get('workperformed', ''),
                Remarks=row.get('remarks', ''),
                DateIssued=row.get('DateIssued', ''),
                ReportID=row.get('ReportID', '')
            )
            # Getting lab contacts. The query filters on `is_active`, the
            # same key the validations importer uses; the former
            # `nactive_state` key was a typo.
            if lab_contacts is None:
                lab_contacts = [brain.getObject() for brain in bsc(
                    portal_type="LabContact", is_active=True)]
            worker = row.get('Worker', '')
            for contact in lab_contacts:
                if contact.getFullname() == worker:
                    obj.setWorker(contact.UID())
            obj.unmarkCreationFlag()
            renameAfterCreation(obj)
            notify(ObjectInitializedEvent(obj))
958
959
960
class Instrument_Certifications(WorksheetImporter):
    """Importer that creates InstrumentCertification objects"""

    def Import(self):
        """Add a certification to the instrument referenced by each row

        Rows without an instrument or a title are skipped, as are rows whose
        instrument is not found in the setup catalog. An empty 'validfrom'
        defaults to today and an empty 'validto' to the same day next year,
        both formatted as dd/mm/YYYY. The report file is attached when the
        row names one, and the preparator and validator are set to the
        active lab contacts whose full names equal the 'preparedby' and
        'approvedby' values.
        """
        bsc = getToolByName(self.context, SETUP_CATALOG)
        # Active lab contacts, loaded once on first use instead of being
        # queried and woken up again for every single row
        lab_contacts = None
        for row in self.get_rows(3):
            if not row.get('instrument') or not row.get('title'):
                continue

            folder = self.get_object(
                bsc, 'Instrument', row.get('instrument', ''))
            if not folder:
                continue

            obj = _createObjectByType(
                "InstrumentCertification", folder, tmpID())

            today = datetime.date.today()
            certificate_start_date = row.get('validfrom', '')
            if certificate_start_date == '':
                certificate_start_date = today.strftime('%d/%m/%Y')
            certificate_expire_date = row.get('validto', '')
            if certificate_expire_date == '':
                try:
                    expires = today.replace(year=today.year + 1)
                except ValueError:
                    # Today is the 29th of February and next year has no
                    # such day: fall back to the 28th
                    expires = today.replace(year=today.year + 1, day=28)
                certificate_expire_date = expires.strftime('%d/%m/%Y')

            obj.edit(
                title=row['title'],
                AssetNumber=row.get('assetnumber', ''),
                Date=row.get('date', ''),
                ValidFrom=certificate_start_date,
                ValidTo=certificate_expire_date,
                Agency=row.get('agency', ''),
                Remarks=row.get('remarks', ''),
            )

            # Attaching the Report Certificate if exists
            if row.get('report', None):
                path = resource_filename(
                    self.dataset_project,
                    "setupdata/%s/%s" % (self.dataset_name, row['report'])
                )
                try:
                    obj.setDocument(read_file(path))
                except Exception as exc:
                    # Best effort: log and carry on. The exception is
                    # formatted as a whole because its first argument is
                    # not guaranteed to be a string.
                    logger.warning(
                        "%s Error on sheet: %s", exc, self.sheetname)

            # Getting lab contacts. The query filters on `is_active`, the
            # same key the validations importer uses; the former
            # `nactive_state` key was a typo.
            if lab_contacts is None:
                lab_contacts = [brain.getObject() for brain in bsc(
                    portal_type="LabContact", is_active=True)]
            prepared_by = row.get('preparedby', '')
            approved_by = row.get('approvedby', '')
            for contact in lab_contacts:
                fullname = contact.getFullname()
                if fullname == prepared_by:
                    obj.setPreparator(contact.UID())
                if fullname == approved_by:
                    obj.setValidator(contact.UID())
            obj.unmarkCreationFlag()
            renameAfterCreation(obj)
            notify(ObjectInitializedEvent(obj))
1014
1015
1016
class Instrument_Documents(WorksheetImporter):
    """Importer that attaches documents to instruments"""

    def Import(self):
        """Hand every row that names an instrument over to addDocument"""
        catalog = getToolByName(self.context, SETUP_CATALOG)
        for row in self.get_rows(3):
            instrument_title = row.get('instrument', '')
            if instrument_title:
                instrument = self.get_object(
                    catalog, 'Instrument', instrument_title)
                addDocument(self, row, instrument)
1026
1027
1028
def addDocument(self, row_dict, folder):
    """
    This function adds a multifile object to the instrument folder

    Nothing is created when no folder is given, when the row names no file,
    or when an existing Multifile already uses the row's DocumentID; in the
    last case a portal message is added instead.

    :param self: the importer calling this function; its context, dataset
        settings and sheet name are used
    :param row_dict: the dictionary which contains the document information
    :param folder: the instrument object
    """
    if not folder:
        return

    # This content type needs a file
    filename = row_dict.get('File', None)
    if not filename:
        return

    path = resource_filename(
        self.dataset_project,
        "setupdata/%s/%s" % (self.dataset_name, filename)
    )
    try:
        file_data = read_file(path)
    except Exception as exc:
        # Best effort: the document is still created, without file data.
        # The exception is formatted as a whole because its first argument
        # is not guaranteed to be a string (e.g. an errno).
        file_data = None
        logger.warning("%s Error on sheet: %s", exc, self.sheetname)

    document_id = row_dict.get('DocumentID', '')

    # Obtain all created instrument documents content type
    catalog = getToolByName(self.context, SETUP_CATALOG)
    documents_brains = catalog.searchResults({'portal_type': 'Multifile'})
    # If the new document has the same DocumentID as a created document,
    # this object won't be created. The scan stops at the first match, so
    # the message is added only once.
    for item in documents_brains:
        if item.getObject().getDocumentID() == document_id:
            warning = "The ID '%s' used for this document is already in use on instrument '%s', consequently " \
                      "the file hasn't been upload." % (
                          document_id, row_dict.get('instrument', ''))
            self.context.plone_utils.addPortalMessage(warning)
            return

    obj = _createObjectByType("Multifile", folder, tmpID())
    obj.edit(
        DocumentID=document_id,
        DocumentVersion=row_dict.get('DocumentVersion', ''),
        DocumentLocation=row_dict.get('DocumentLocation', ''),
        DocumentType=row_dict.get('DocumentType', ''),
        File=file_data
    )
    obj.unmarkCreationFlag()
    renameAfterCreation(obj)
    notify(ObjectInitializedEvent(obj))
1073
1074
1075
class Instrument_Maintenance_Tasks(WorksheetImporter):
    """Importer that creates InstrumentMaintenanceTask objects"""

    def Import(self):
        """Add a maintenance task to the instrument referenced by each row

        Rows with an empty instrument, title or type are skipped, as are
        rows whose instrument is not found in the setup catalog. The cost
        is formatted with two decimals when possible; otherwise the raw
        cell value is used.
        """
        catalog = getToolByName(self.context, SETUP_CATALOG)
        for row in self.get_rows(3):
            if not (row['instrument'] and row['title'] and row['type']):
                continue

            instrument = self.get_object(
                catalog, 'Instrument', row.get('instrument'))
            if not instrument:
                continue

            task = _createObjectByType(
                "InstrumentMaintenanceTask", instrument, tmpID())

            try:
                cost = "%.2f" % row.get('cost', 0)
            except Exception:
                # The value cannot be formatted as a float: keep it as is
                cost = row.get('cost', '0.0')

            values = dict(
                title=row['title'],
                description=row['description'],
                Type=row['type'],
                DownFrom=row.get('downfrom', ''),
                DownTo=row.get('downto', ''),
                Maintainer=row.get('maintaner', ''),
                Considerations=row.get('considerations', ''),
                WorkPerformed=row.get('workperformed', ''),
                Remarks=row.get('remarks', ''),
                Cost=cost,
                Closed=self.to_bool(row.get('closed')),
            )
            task.edit(**values)
            task.unmarkCreationFlag()
            renameAfterCreation(task)
            notify(ObjectInitializedEvent(task))
1108
1109
1110
class Instrument_Schedule(WorksheetImporter):
    """Importer that creates InstrumentScheduledTask objects"""

    def Import(self):
        """Add a scheduled task to the instrument referenced by each row

        Rows with an empty instrument, title or type are skipped, as are
        rows whose instrument is not found in the setup catalog. The
        schedule criteria are a one-element list built from the date,
        numrepeats, periodicity and repeatuntil columns.
        """
        catalog = getToolByName(self.context, SETUP_CATALOG)
        for row in self.get_rows(3):
            if not (row['instrument'] and row['title'] and row['type']):
                continue
            instrument = self.get_object(
                catalog, 'Instrument', row.get('instrument'))
            if not instrument:
                continue

            task = _createObjectByType(
                "InstrumentScheduledTask", instrument, tmpID())

            repeats = row['numrepeats']
            until = row['repeatuntil']
            # Keeps the raw falsy cell value when there is no end date
            until_enabled = until and len(until) > 0
            criterion = {
                'fromenabled': row.get('date', None) is not None,
                'fromdate': row.get('date', ''),
                'repeatenabled': (repeats and repeats > 1) or until_enabled,
                'repeatunit': row.get('numrepeats', ''),
                'repeatperiod': row.get('periodicity', ''),
                'repeatuntilenabled': until_enabled,
                'repeatuntil': row.get('repeatuntil'),
            }
            task.edit(
                title=row['title'],
                Type=row['type'],
                ScheduleCriteria=[criterion],
                Considerations=row.get('considerations', ''),
            )
            task.unmarkCreationFlag()
            renameAfterCreation(task)
            notify(ObjectInitializedEvent(task))
1143
1144
1145
class Sample_Matrices(WorksheetImporter):
    """Importer that creates SampleMatrix objects from worksheet rows"""

    def Import(self):
        """Create a SampleMatrix for every row that carries a title"""
        target = self.context.setup.samplematrices
        for row in self.get_rows(3):
            name = row.get("title")
            if name:
                api.create(target, "SampleMatrix",
                           title=name,
                           description=row.get("description"))
1155
1156
1157
class Batch_Labels(WorksheetImporter):
    """Importer that creates BatchLabel objects from worksheet rows"""

    def Import(self):
        """Create a BatchLabel for every row that carries a title"""
        target = self.context.setup.batchlabels
        for row in self.get_rows(3):
            name = row.get("title")
            if name:
                api.create(target, "BatchLabel", title=name)
1166
1167
1168
class Sample_Types(WorksheetImporter):
    """Importer that creates SampleType objects from worksheet rows"""

    def Import(self):
        """Create a SampleType for each titled row and link related objects

        The sample matrix and container type are looked up by title in the
        setup catalog and set when found. When the row names an existing
        sample point, the new sample type is added to that sample point.
        """
        container = self.context.setup.sampletypes
        sc = api.get_tool(SETUP_CATALOG)

        for row in self.get_rows(3):
            title = row.get("title")
            if not title:
                continue

            obj = api.create(container, "SampleType", title=title,
                             description=row.get("description"))

            samplematrix = self.get_object(
                sc, 'SampleMatrix', row.get('SampleMatrix_title'))
            containertype = self.get_object(
                sc, 'ContainerType', row.get('ContainerType_title'))

            if samplematrix:
                obj.setSampleMatrix(samplematrix)
            if containertype:
                obj.setContainerType(containertype)

            obj.setHazardous(self.to_bool(row['Hazardous']))
            obj.setPrefix(row['Prefix'])
            obj.setMinimumVolume(row['MinimumVolume'])
            obj.setRetentionPeriod({
                'days': row['RetentionPeriod'] or 0,
                'hours': 0,
                'minutes': 0})

            samplepoint = self.get_object(sc, 'SamplePoint',
                                          row.get('SamplePoint_title'))
            if samplepoint:
                # Extend the sample point's sample types rather than
                # replacing them, so links made by earlier rows survive.
                # Mirrors the Sample_Point_Sample_Types importer.
                sampletypes = list(samplepoint.getSampleTypes())
                if obj not in sampletypes:
                    sampletypes.append(obj)
                    samplepoint.setSampleTypes(sampletypes)
            obj.reindexObject()
1205
1206
1207
class Sample_Points(WorksheetImporter):
    """Importer that creates SamplePoint objects from worksheet rows"""

    def Import(self):
        """Create a SamplePoint for each titled row

        The sample point is created inside the client named in
        'Client_title', or in the setup's sample points folder when no
        client is given. Rows naming a client that is not found are logged
        and skipped. Latitude and Longitude values are not imported; a log
        entry is written when they are present.
        """
        setup_folder = self.context.setup.samplepoints
        bsc = getToolByName(self.context, SETUP_CATALOG)
        cat = api.get_tool(CLIENT_CATALOG)
        for row in self.get_rows(3):
            if not row['title']:
                continue

            client_title = row['Client_title']
            if client_title:
                client = cat(portal_type="Client", getName=client_title)
                if not client:
                    error = "Sample Point %s: Client invalid: '%s'. The Sample point will not be uploaded."
                    logger.error(error, row['title'], client_title)
                    continue
                folder = client[0].getObject()
            else:
                folder = setup_folder

            # Logger.log() takes the level as first argument, so the former
            # logger.log(message, 'error') calls raised a TypeError
            if row['Latitude']:
                logger.error("Ignored SamplePoint Latitude")
            if row['Longitude']:
                logger.error("Ignored SamplePoint Longitude")

            obj = api.create(folder, "SamplePoint", title=row['title'],
                             description=row.get('description', ''))
            obj.setComposite(self.to_bool(row["Composite"]))
            obj.setElevation(row["Elevation"])
            sampletype = self.get_object(bsc, 'SampleType',
                                         row.get('SampleType_title'))
            if sampletype:
                obj.setSampleTypes([sampletype, ])
                obj.reindexObject()
1241
1242
1243
class Sample_Point_Sample_Types(WorksheetImporter):
    """Importer that links existing sample types to existing sample points"""

    def Import(self):
        """Add the row's sample type to the row's sample point

        Both objects are looked up by title in the setup catalog. Rows for
        which either lookup fails are skipped, so that no empty reference
        is appended to the sample point.
        """
        bsc = getToolByName(self.context, SETUP_CATALOG)
        for row in self.get_rows(3):
            sampletype = self.get_object(bsc,
                                         'SampleType',
                                         row.get('SampleType_title'))
            samplepoint = self.get_object(bsc,
                                          'SamplePoint',
                                          row['SamplePoint_title'])
            if not samplepoint or not sampletype:
                continue

            # Work on a copy so the value returned by the getter is not
            # modified in place
            sampletypes = list(samplepoint.getSampleTypes())
            if sampletype not in sampletypes:
                sampletypes.append(sampletype)
                samplepoint.setSampleTypes(sampletypes)
1259
1260
1261
class Storage_Locations(WorksheetImporter):
    """Importer that creates StorageLocation objects from worksheet rows"""

    def Import(self):
        """Create a StorageLocation for every row that has an address

        The 'Address' value is used as title; the site, location and shelf
        columns are passed on under their own column names.
        """
        columns = (
            'SiteTitle', 'SiteCode', 'SiteDescription',
            'LocationTitle', 'LocationCode', 'LocationDescription',
            'LocationType',
            'ShelfTitle', 'ShelfCode', 'ShelfDescription',
        )
        target = self.context.setup.storagelocations
        for row in self.get_rows(3):
            address = row.get('Address')
            if not address:
                continue

            values = dict((column, row.get(column)) for column in columns)
            api.create(target, "StorageLocation", title=address, **values)
1282
1283
1284
class Sample_Conditions(WorksheetImporter):
    """Importer that creates SampleCondition objects from worksheet rows"""

    def Import(self):
        """Create a SampleCondition for every row that carries a title"""
        target = self.context.setup.sampleconditions
        for row in self.get_rows(3):
            name = row.get("title")
            if name:
                api.create(target, "SampleCondition",
                           title=name,
                           description=row.get("description"))
1297
1298
1299
class Analysis_Categories(WorksheetImporter):
    """Importer that creates AnalysisCategory objects from worksheet rows"""

    def Import(self):
        """Create an AnalysisCategory for each valid row

        A row is logged and skipped when it has no title, when it names no
        department, or when the department is not found in the setup
        catalog.
        """
        container = self.context.setup.analysiscategories
        setup_tool = getToolByName(self.context, SETUP_CATALOG)
        for row in self.get_rows(3):
            title = row.get("title")
            if not title:
                logger.warning("Error in {}. Missing Title field."
                               .format(self.sheetname))
                continue

            department_title = row.get("Department_title", None)
            if not department_title:
                logger.warning("Error in {}. Department field missing."
                               .format(self.sheetname))
                continue

            department = self.get_object(setup_tool, "Department",
                                         title=department_title)
            if not department:
                logger.warning("Error in {}. Department '{}' is wrong."
                               .format(self.sheetname, department_title))
                continue

            description = row.get("description", "")
            comments = row.get("comments", "")
            api.create(container, "AnalysisCategory",
                       title=title,
                       description=description,
                       comments=comments,
                       department=department)
1331
1332
1333
class Methods(WorksheetImporter):
    """Importer that creates Method objects from worksheet rows"""

    def Import(self):
        """Create a Method for every row that carries a title

        The calculation is looked up by title in the setup catalog. When
        the row's MethodID is already used by another method, the new
        method is created with an empty MethodID. The method document is
        attached when the row names one.
        """
        folder = self.context.methods
        bsc = getToolByName(self.context, SETUP_CATALOG)
        # MethodIDs already taken. Collected once, on first use and before
        # the new object exists, so a method is never compared with itself.
        # NOTE(review): relies on the `getMethodID` accessor of the Method
        # type - confirm
        used_method_ids = None
        for row in self.get_rows(3):
            if not row['title']:
                continue

            calculation = self.get_object(
                bsc, 'Calculation', row.get('Calculation_title'))

            # If the new method has the same MethodID as a created method,
            # remove the MethodID value
            method_id = row.get('MethodID', '')
            if method_id:
                if used_method_ids is None:
                    brains = bsc.searchResults({'portal_type': 'Method'})
                    used_method_ids = set(
                        brain.getObject().getMethodID() for brain in brains)
                if method_id in used_method_ids:
                    method_id = ''
                else:
                    used_method_ids.add(method_id)

            obj = _createObjectByType("Method", folder, tmpID())
            obj.edit(
                title=row['title'],
                description=row.get('description', ''),
                Instructions=row.get('Instructions', ''),
                ManualEntryOfResults=row.get('ManualEntryOfResults', True),
                Calculation=calculation,
                MethodID=method_id,
                Accredited=row.get('Accredited', True),
            )

            document = row.get('MethodDocument')
            if document:
                path = resource_filename(
                    self.dataset_project,
                    "setupdata/%s/%s" % (self.dataset_name, document)
                )
                try:
                    obj.setMethodDocument(read_file(path))
                except Exception as exc:
                    # Best effort: log and carry on. The exception is
                    # formatted as a whole because its first argument is
                    # not guaranteed to be a string.
                    logger.warning(
                        "%s Error on sheet: %s", exc, self.sheetname)

            obj.unmarkCreationFlag()
            renameAfterCreation(obj)
            notify(ObjectInitializedEvent(obj))
1375
1376
1377
class Sampling_Deviations(WorksheetImporter):
    """Importer that creates SamplingDeviation objects from worksheet rows"""

    def Import(self):
        """Create a SamplingDeviation for every row that carries a title"""
        target = self.context.setup.samplingdeviations
        for row in self.get_rows(3):
            name = row.get("title")
            if name:
                api.create(target, "SamplingDeviation",
                           title=name,
                           description=row.get("description"))
1387
1388
1389
class Calculations(WorksheetImporter):
    """Importer that creates Calculation objects from worksheet rows"""

    def get_interim_fields(self):
        """Preload the 'Calculation Interim Fields' sheet

        Fills self.interim_fields, a dict that maps a calculation title to
        the list of interim field dicts declared for it. The dict stays
        empty when the sheet lookup yields nothing.
        """
        # Always initialise the mapping: Import() reads it unconditionally
        self.interim_fields = {}
        sheetname = 'Calculation Interim Fields'
        worksheet = self.workbook[sheetname]
        if not worksheet:
            return
        for row in self.get_rows(3, worksheet=worksheet):
            calc_title = row['Calculation_title']
            self.interim_fields.setdefault(calc_title, []).append({
                'keyword': row['keyword'],
                'title': row.get('title', ''),
                'type': 'int',
                'hidden': bool('hidden' in row and row['hidden']),
                'value': row['value'],
                'unit': row['unit'] or ''})

    def Import(self):
        """Create a Calculation for each titled row

        Keywords written in square brackets in the formula that are not
        interim fields of the calculation are deferred as dependent
        services. Afterwards, methods from the 'Methods' sheet that have no
        calculation yet get the calculation named in their row.
        """
        self.get_interim_fields()
        folder = self.context.bika_setup.bika_calculations
        # Compiled once for all rows
        keyword_pattern = re.compile(r"\[([^\.^\]]+)\]")
        for row in self.get_rows(3):
            if not row['title']:
                continue
            calc_title = row['title']
            calc_interims = self.interim_fields.get(calc_title, [])
            formula = row['Formula']
            # scan formula for dep services
            keywords = keyword_pattern.findall(formula)
            # remove interims from deps
            interim_keys = [k['keyword'] for k in calc_interims]
            dep_keywords = [k for k in keywords if k not in interim_keys]

            obj = _createObjectByType("Calculation", folder, tmpID())
            obj.edit(
                title=calc_title,
                description=row.get('description', ''),
                InterimFields=calc_interims,
                Formula=str(row['Formula'])
            )
            for kw in dep_keywords:
                self.defer(src_obj=obj,
                           src_field='DependentServices',
                           dest_catalog=SETUP_CATALOG,
                           dest_query={'portal_type': 'AnalysisService',
                                       'getKeyword': kw}
                           )
            obj.unmarkCreationFlag()
            renameAfterCreation(obj)
            notify(ObjectInitializedEvent(obj))

        # Now we have the calculations registered, try to assign default calcs
        # to methods
        sheet = self.workbook["Methods"]
        bsc = getToolByName(self.context, SETUP_CATALOG)
        for row in self.get_rows(3, sheet):
            if row.get('title', '') and row.get('Calculation_title', ''):
                meth = self.get_object(bsc, "Method", row.get('title'))
                if meth and not meth.getCalculation():
                    calctit = safe_unicode(
                        row['Calculation_title']).encode('utf-8')
                    calc = self.get_object(bsc, "Calculation", calctit)
                    if calc:
                        meth.setCalculation(calc.UID())
1457
1458
1459
class Analysis_Services(WorksheetImporter):
1460
1461
    def load_interim_fields(self):
        """Preload the 'AnalysisService InterimFields' sheet.

        Stores in self.service_interims a mapping of service title to the
        list of interim field records declared for that service. The
        attribute is left untouched when the sheet lookup yields a falsy
        value.
        """
        worksheet = self.workbook['AnalysisService InterimFields']
        if not worksheet:
            return
        interims = {}
        self.service_interims = interims
        for row in self.get_rows(3, worksheet=worksheet):
            records = interims.setdefault(row['Service_title'], [])
            records.append({
                'keyword': row['keyword'],
                'title': row.get('title', ''),
                # every interim is registered with the fixed type 'int'
                'type': 'int',
                'value': row['value'],
                'unit': row['unit'] or '',
            })
1479
1480
    def load_result_options(self):
        """Attach the rows of 'AnalysisService ResultOptions' to services.

        Every row adds one {'ResultValue', 'ResultText'} entry to the result
        options of the service named in its 'Service_title' column.

        A row whose service cannot be resolved is logged and skipped, so a
        single unknown title no longer aborts the remaining rows.
        """
        bsc = getToolByName(self.context, SETUP_CATALOG)
        worksheet = self.workbook['AnalysisService ResultOptions']
        if not worksheet:
            return
        for row in self.get_rows(3, worksheet=worksheet):
            service_title = row.get('Service_title')
            service = self.get_object(bsc, 'AnalysisService', service_title)
            if not service:
                logger.warning(
                    "Unable to load an Analysis Service result option. "
                    "Service '%s' not found." % service_title)
                continue
            # work on a copy so the stored value is only replaced via the
            # setter
            options = list(service.getResultOptions() or [])
            options.append({'ResultValue': row['ResultValue'],
                            'ResultText': row['ResultText']})
            service.setResultOptions(options)
1495
1496
    def load_service_uncertainties(self):
        """Load the 'Analysis Service Uncertainties' sheet in batches.

        Rows are grouped by service UID and handed to write_bucket() each
        time more than 500 rows have been collected, plus once at the end
        for the remainder. Rows whose service cannot be resolved are logged
        and skipped.

        The row counter is reset after every flush; previously it kept
        growing, so every row past the 500th triggered its own write.
        """
        bsc = getToolByName(self.context, SETUP_CATALOG)
        worksheet = self.workbook['Analysis Service Uncertainties']
        if not worksheet:
            return

        batch_size = 500
        bucket = {}
        pending = 0
        for row in self.get_rows(3, worksheet=worksheet):
            service_title = row.get('Service_title')
            service = self.get_object(bsc, 'AnalysisService', service_title)
            if not service:
                logger.warning(
                    "Unable to load an Analysis Service uncertainty. "
                    "Service '%s' not found." % service_title)
                continue
            bucket.setdefault(service.UID(), []).append({
                'intercept_min': row['Range Min'],
                'intercept_max': row['Range Max'],
                'errorvalue': row['Uncertainty Value'],
            })
            # only rows that actually landed in the bucket count
            pending += 1
            if pending > batch_size:
                self.write_bucket(bucket)
                bucket = {}
                pending = 0
        if bucket:
            self.write_bucket(bucket)
1527
1528
    def get_methods(self, service_title, default_method):
        """Return the Method objects related to a service.

        Delegates to get_relations() using the 'AnalysisService Methods'
        sheet and its 'Method_title' column. When default_method is set it
        is part of the returned list.
        """
        sheet_name = 'AnalysisService Methods'
        column = 'Method_title'
        return self.get_relations(
            service_title, default_method, 'Method', SETUP_CATALOG,
            sheet_name, column)
1541
1542
    def get_instruments(self, service_title, default_instrument):
        """Return the Instrument objects related to a service.

        Delegates to get_relations() using the 'AnalysisService Instruments'
        sheet and its 'Instrument_title' column. When default_instrument is
        set it is part of the returned list.
        """
        sheet_name = 'AnalysisService Instruments'
        column = 'Instrument_title'
        return self.get_relations(
            service_title, default_instrument, 'Instrument', SETUP_CATALOG,
            sheet_name, column)
1555
1556
    def get_relations(self, service_title, default_obj, obj_type,
                      catalog_name, sheet_name, column):
        """Return the objects of obj_type that a sheet relates to a service.

        Scans the sheet named sheet_name for rows whose 'Service_title'
        equals service_title and resolves the value found in `column`
        through the catalog named catalog_name.

        When default_obj is set it is the first item of the result and is
        not added a second time. The scan ends at the first row without a
        'Service_title'.
        """
        related = []
        if default_obj:
            related.append(default_obj)
        catalog = getToolByName(self.context, catalog_name)
        worksheet = self.workbook[sheet_name]
        if not worksheet:
            return related
        for row in self.get_rows(3, worksheet=worksheet):
            owner = row.get('Service_title')
            if not owner:
                # a row without service title stops the scan
                break
            if owner != service_title:
                continue
            found = self.get_object(catalog, obj_type, row.get(column))
            if not found:
                continue
            is_default = default_obj and default_obj.UID() == found.UID()
            if not is_default:
                related.append(found)
        return related
1579
1580
    def write_bucket(self, bucket):
        """Append the collected uncertainties to their services.

        bucket maps a service UID to the list of uncertainty records to add
        to that service's existing uncertainties.
        """
        catalog = getToolByName(self.context, SETUP_CATALOG)
        for uid, new_entries in bucket.items():
            service = catalog(UID=uid)[0].getObject()
            merged = list(service.getUncertainties())
            merged += new_entries
            service.setUncertainties(merged)
1587
1588
    def Import(self):
        """Create an AnalysisService for every sheet row that has a title.

        Interim fields are preloaded before the rows are processed; result
        options and uncertainties are loaded once all rows are done.
        """
        self.load_interim_fields()
        folder = self.context.bika_setup.bika_analysisservices
        bsc = getToolByName(self.context, SETUP_CATALOG)
        for row in self.get_rows(3):
            title = row['title']
            if not title:
                continue

            obj = _createObjectByType("AnalysisService", folder, tmpID())
            max_time_allowed = {
                'days': self.to_int(row.get('MaxTimeAllowed_days', 0), 0),
                'hours': self.to_int(row.get('MaxTimeAllowed_hours', 0), 0),
                'minutes': self.to_int(
                    row.get('MaxTimeAllowed_minutes', 0), 0),
            }
            category = self.get_object(
                bsc, 'AnalysisCategory', row.get('AnalysisCategory_title'))
            department = self.get_object(
                bsc, 'Department', row.get('Department_title'))
            container = self.get_object(
                bsc, 'SampleContainer', row.get('Container_title'))
            preservation = self.get_object(
                bsc, 'SamplePreservation', row.get('Preservation_title'))

            # Methods
            # A service can be linked to any number of Methods (schema field
            # 'Methods'). When it has at least one, one of them can be the
            # default method (schema field '_Method').
            # A DefaultMethod declared in this sheet is linked to the
            # service even if the 'AnalysisService Methods' sheet holds no
            # relation for it. Without a declared default, the first related
            # method becomes the default.
            default_method = self.get_object(
                bsc, 'Method', row.get('DefaultMethod_title'))
            methods = self.get_methods(title, default_method)
            if methods and not default_method:
                default_method = methods[0]

            # Instruments
            # A service is implicitly associated with the instruments of its
            # methods; that association needs no explicit assignment here.
            # On top of that, instruments can be linked manually through the
            # 'Instruments' field, in which case instrument entry of results
            # is enabled.
            # A DefaultInstrument declared in this sheet is linked to the
            # service even if the 'AnalysisService Instruments' sheet holds
            # no relation for it, and instrument entry is enabled as well.
            default_instrument = self.get_object(
                bsc, 'Instrument', row.get('DefaultInstrument_title'))
            instruments = self.get_instruments(title, default_instrument)
            instrument_entry = bool(instruments)
            if instruments and not default_instrument:
                # NOTE(review): the resolved default instrument is not
                # handed to obj.edit() below - confirm whether intended
                default_instrument = instruments[0]

            # Manual entry of results can only be switched off when
            # instrument entry is enabled
            if instrument_entry:
                manual_entry = row.get('ManualEntryOfResults', True)
            else:
                manual_entry = True

            # Calculation
            # By default the service uses the calculation of its default
            # method ("UseDefaultCalculation" is True); if that method has
            # no calculation, none is used.
            # A calculation given explicitly in the sheet takes precedence
            # and turns "UseDefaultCalculation" off.
            explicit_calculation = self.get_object(
                bsc, 'Calculation', row.get('Calculation_title'))
            use_default_calculation = not explicit_calculation
            if explicit_calculation:
                calculation = explicit_calculation
            elif default_method:
                calculation = default_method.getCalculation()
            else:
                calculation = None

            fields = {
                'title': title,
                'ShortTitle': row.get('ShortTitle', title),
                'description': row.get('description', ''),
                'Keyword': row['Keyword'],
                'PointOfCapture': row['PointOfCapture'].lower(),
                'Category': category,
                'Department': department,
                'Unit': row['Unit'] or None,
                'Precision': (
                    str(row['Precision']) if row['Precision'] else '0'),
                'ExponentialFormatPrecision': str(self.to_int(
                    row.get('ExponentialFormatPrecision', 7), 7)),
                'LowerDetectionLimit': '%06f' % self.to_float(
                    row.get('LowerDetectionLimit', '0.0'), 0),
                'UpperDetectionLimit': '%06f' % self.to_float(
                    row.get('UpperDetectionLimit', '1000000000.0'),
                    1000000000.0),
                'DetectionLimitSelector': self.to_bool(
                    row.get('DetectionLimitSelector', 0)),
                'MaxTimeAllowed': max_time_allowed,
                'Price': "%02f" % Float(row['Price']),
                'BulkPrice': "%02f" % Float(row['BulkPrice']),
                'VAT': "%02f" % Float(row['VAT']),
                '_Method': default_method,
                'Methods': methods,
                'ManualEntryOfResults': manual_entry,
                'InstrumentEntryOfResults': instrument_entry,
                'Instruments': instruments,
                'Calculation': calculation,
                'UseDefaultCalculation': use_default_calculation,
                'DuplicateVariation': (
                    "%02f" % Float(row['DuplicateVariation'])),
                'Accredited': self.to_bool(row['Accredited']),
                # the attribute is missing when no interims were preloaded
                'InterimFields': getattr(
                    self, 'service_interims', {}).get(title, []) or [],
                'Separate': self.to_bool(row.get('Separate', False)),
                'Container': container,
                'Preservation': preservation,
                'CommercialID': row.get('CommercialID', ''),
                'ProtocolID': row.get('ProtocolID', ''),
            }
            obj.edit(**fields)
            obj.unmarkCreationFlag()
            renameAfterCreation(obj)
            notify(ObjectInitializedEvent(obj))
        self.load_result_options()
        self.load_service_uncertainties()
1720
1721
1722
class Analysis_Specifications(WorksheetImporter):
1723
1724
    def resolve_service(self, row):
        """Return the AnalysisService referenced by the row's 'service' cell.

        The cell is matched against the service title first and against the
        service keyword when no title matches. The first hit is returned.
        """
        bsc = getToolByName(self.context, SETUP_CATALOG)
        needle = safe_unicode(row["service"])
        brains = bsc(portal_type="AnalysisService", title=needle)
        if not brains:
            brains = bsc(portal_type="AnalysisService", getKeyword=needle)
        return brains[0].getObject()
1737
1738
    def Import(self):
        """Create AnalysisSpec objects from the rows of the sheet.

        Rows are first grouped per parent (client title, or "lab" when the
        row names no client) and specification title; every group then
        becomes one AnalysisSpec holding the collected results ranges. The
        sample type of a specification is taken from the first row seen
        for it.
        """
        client_catalog = getToolByName(self.context, CLIENT_CATALOG)
        setup_catalog = getToolByName(self.context, SETUP_CATALOG)

        # collect up all values into the bucket
        bucket = {}
        for row in self.get_rows(3):
            title = row.get("Title", False) or row.get("title", False)
            if not title:
                continue
            parent = row["Client_title"] or "lab"
            st = row["SampleType_title"] or ""
            service = self.resolve_service(row)

            specs = bucket.setdefault(parent, {})
            if title not in specs:
                specs[title] = {"sampletype": st, "resultsrange": []}
            specs[title]["resultsrange"].append({
                "keyword": service.getKeyword(),
                "min": row["min"] or "0",
                "max": row["max"] or "0",
            })

        # write objects.
        for parent, specs in bucket.items():
            for title, spec in specs.items():
                if parent == "lab":
                    folder = self.context.bika_setup.bika_analysisspecs
                else:
                    proxy = client_catalog(
                        portal_type="Client", getName=safe_unicode(parent))[0]
                    folder = proxy.getObject()

                st = spec["sampletype"]
                st_uid = None
                if st:
                    st_uid = setup_catalog(
                        portal_type="SampleType", title=safe_unicode(st))[0].UID

                obj = _createObjectByType("AnalysisSpec", folder, tmpID())
                obj.edit(title=title)
                obj.setResultsRange(spec["resultsrange"])
                if st:
                    obj.setSampleType(st_uid)
                obj.unmarkCreationFlag()
                renameAfterCreation(obj)
                notify(ObjectInitializedEvent(obj))
1784
1785
1786
class Analysis_Profiles(WorksheetImporter):
1787
1788
    def load_analysis_profile_services(self):
        """Preload the 'Analysis Profile Services' sheet.

        Fills self.profile_services with a mapping of profile name to the
        list of AnalysisService objects assigned to it. Rows lacking either
        the 'Profile' or the 'Service' value are ignored.
        """
        worksheet = self.workbook['Analysis Profile Services']
        self.profile_services = {}
        if not worksheet:
            return
        bsc = getToolByName(self.context, SETUP_CATALOG)
        for row in self.get_rows(3, worksheet=worksheet):
            if not (row.get('Profile', '') and row.get('Service', '')):
                continue
            services = self.profile_services.setdefault(row['Profile'], [])
            # The 'Service' cell is tried through get_object() first and as
            # a service keyword afterwards.
            # XXX We need a utility for this kind of thing.
            service = self.get_object(
                bsc, 'AnalysisService', row.get('Service'))
            if not service:
                brains = bsc(portal_type='AnalysisService',
                             getKeyword=row['Service'])
                service = brains[0].getObject()
            services.append(service)
1808
1809
    def Import(self):
        """Create an AnalysisProfile for every sheet row that has a title.

        The services of each profile come from the preloaded
        'Analysis Profile Services' sheet. A profile without any service
        rows (or a workbook without that sheet) gets an empty service list
        instead of raising a KeyError.
        """
        self.load_analysis_profile_services()
        folder = self.context.setup.analysisprofiles
        for row in self.get_rows(3):
            title = row.get("title", "")
            if not title:
                continue
            description = row.get("description", "")
            profile_key = row.get("ProfileKey", "")
            commercial_id = row.get("CommercialID", "")
            analysis_profile_price = row.get("AnalysisProfilePrice")
            analysis_profile_vat = row.get("AnalysisProfileVAT")
            use_analysis_profile_price = row.get("UseAnalysisProfilePrice")

            obj = api.create(folder, "AnalysisProfile")
            api.edit(obj,
                     title=api.safe_unicode(title),
                     description=api.safe_unicode(description),
                     profile_key=api.safe_unicode(profile_key),
                     commercial_id=api.safe_unicode(commercial_id),
                     analysis_profile_price=api.to_float(
                         analysis_profile_price, 0.0),
                     analysis_profile_vat=api.to_float(
                         analysis_profile_vat, 0.0),
                     use_analysis_profile_price=bool(
                         use_analysis_profile_price))
            # set the services; profiles absent from the services sheet
            # simply have none
            obj.setServices(self.profile_services.get(title, []))
1835
1836
1837
class Sample_Templates(WorksheetImporter):
1838
1839
    def load_sampletemplate_services(self):
        """Preload the 'Sample Template Services' sheet.

        Fills self.services with a mapping of sample template title to a
        list of {"uid", "part_id"} records, one per assigned service.

        self.services is always initialised, also when the sheet lookup
        yields a falsy value, so that Import() can rely on the attribute.
        Rows whose service cannot be resolved are logged and skipped.
        """
        self.services = {}
        worksheet = self.workbook["Sample Template Services"]
        if not worksheet:
            return
        sc = api.get_tool(SETUP_CATALOG)
        for row in self.get_rows(3, worksheet=worksheet):
            keyword = row.get("keyword")
            title = row.get("SampleTemplate")
            service = self.get_object(sc, "AnalysisService", keyword)
            if not service:
                logger.warning(
                    "Unable to load a Sample Template service. "
                    "Service '%s' not found." % keyword)
                continue
            self.services.setdefault(title, []).append({
                "uid": api.get_uid(service),
                "part_id": row.get("part_id", ""),
            })
1857
1858
    def load_sampletemplate_partitions(self):
        """Preload the 'Sample Template Partitions' sheet.

        Fills self.partitions with a mapping of sample template title to a
        list of partition records. Container, preservation and sample type
        are stored as UIDs, or as an empty string when they cannot be
        resolved.

        self.partitions is always initialised, also when the sheet lookup
        yields a falsy value, so that Import() can rely on the attribute.
        """
        self.partitions = {}
        worksheet = self.workbook["Sample Template Partitions"]
        if not worksheet:
            return

        sc = api.get_tool(SETUP_CATALOG)

        def uid_of(portal_type, value):
            # UID of the object get_object() finds, "" when there is none
            found = self.get_object(sc, portal_type, value)
            return api.get_uid(found) if found else ""

        for row in self.get_rows(3, worksheet=worksheet):
            title = row.get("SampleTemplate")
            self.partitions.setdefault(title, []).append({
                "part_id": row.get("part_id"),
                "container": uid_of("SampleContainer", row.get("container")),
                "preservation": uid_of(
                    "SamplePreservation", row.get("preservation")),
                "sampletype": uid_of("SampleType", row.get("sampletype")),
            })
1884
1885
    def Import(self):
        """Create a SampleTemplate for every sheet row that has a title.

        Templates are created inside the client named in 'Client_title', or
        in the setup's sample templates folder when no client is given.

        The target folder is resolved anew for every row. When the client
        lookup does not yield exactly one result, a warning is logged and
        the template falls back to the setup folder; previously the folder
        of the preceding row was silently reused. Templates without
        preloaded services or partitions get empty lists.
        """
        self.load_sampletemplate_services()
        self.load_sampletemplate_partitions()

        setup = api.get_senaite_setup()
        sc = api.get_tool(SETUP_CATALOG)

        # the loaders may not have set these attributes
        all_services = getattr(self, "services", None) or {}
        all_partitions = getattr(self, "partitions", None) or {}

        for row in self.get_rows(3):
            title = row.get("title")
            if not title:
                continue

            client_title = row.get("Client_title") or "lab"
            folder = setup.sampletemplates
            if client_title != "lab":
                clients = api.search({
                    "portal_type": "Client",
                    "getName": client_title
                }, CLIENT_CATALOG)
                if len(clients) == 1:
                    folder = api.get_object(clients[0])
                else:
                    logger.warning(
                        "Client '{}' of Sample Template '{}' is not unique "
                        "or was not found ({} matches). Using the setup "
                        "folder instead.".format(
                            client_title, title, len(clients)))

            sampletype = self.get_object(
                sc, 'SampleType', row.get('SampleType_title'))
            samplepoint = self.get_object(
                sc, 'SamplePoint', row.get('SamplePoint_title'))

            obj = api.create(folder, "SampleTemplate", title=title)
            obj.setSampleType(sampletype)
            obj.setSamplePoint(samplepoint)
            obj.setPartitions(all_partitions.get(title, []))
            obj.setServices(all_services.get(title, []))
1920
1921
1922
class Reference_Definitions(WorksheetImporter):
1923
1924
    def load_reference_definition_results(self):
        """Preload the reference results of the reference definitions.

        Reads the 'Reference Definition Results' sheet, falling back to
        'Reference Definition Values', and fills self.results with a
        mapping of reference definition title to a list of
        {'uid', 'result', 'min', 'max'} records.

        self.results is always initialised, also when neither sheet lookup
        yields a worksheet, so that Import() can rely on the attribute.
        Rows whose service cannot be resolved are logged and skipped.
        """
        self.results = {}
        worksheet = (self.workbook['Reference Definition Results'] or
                     self.workbook['Reference Definition Values'])
        if not worksheet:
            return
        bsc = getToolByName(self.context, SETUP_CATALOG)
        for row in self.get_rows(3, worksheet=worksheet):
            definition_title = row['ReferenceDefinition_title']
            records = self.results.setdefault(definition_title, [])
            service = self.get_object(bsc, 'AnalysisService',
                                      row.get('service'))
            if not service:
                logger.warning(
                    "Unable to load a Reference Definition result. "
                    "Service '%s' not found." % row.get('service'))
                continue
            records.append({
                'uid': service.UID(),
                # empty cells default to '0'
                'result': row['result'] if row['result'] else '0',
                'min': row['min'] if row['min'] else '0',
                'max': row['max'] if row['max'] else '0'})
1947
1948
    def Import(self):
        """Create a ReferenceDefinition for every sheet row with a title.

        The reference results of each definition are taken from the
        preloaded results, keyed by the definition title.
        """
        self.load_reference_definition_results()
        folder = self.context.bika_setup.bika_referencedefinitions
        for row in self.get_rows(3):
            title = row['title']
            if not title:
                continue
            obj = _createObjectByType("ReferenceDefinition", folder, tmpID())
            fields = {
                'title': title,
                'description': row.get('description', ''),
                'Blank': self.to_bool(row['Blank']),
                'ReferenceResults': self.results.get(title, []),
                'Hazardous': self.to_bool(row['Hazardous']),
            }
            obj.edit(**fields)
            obj.unmarkCreationFlag()
            renameAfterCreation(obj)
            notify(ObjectInitializedEvent(obj))
1964
1965
1966
class Worksheet_Templates(WorksheetImporter):
1967
1968
    def __init__(self, context):
        """Initialise the importer with empty layout and service caches."""
        super(Worksheet_Templates, self).__init__(context)
        # both caches are keyed by worksheet template title
        self.wst_layouts, self.wst_services = {}, {}
1972
1973
    def load_definitions(self):
        """Return a mapping of title to UID of the active
        ReferenceDefinitions found in the setup catalog.
        """
        query = {"portal_type": "ReferenceDefinition", "is_active": True}
        found = api.search(query, SETUP_CATALOG)
        return {api.get_title(brain): api.get_uid(brain) for brain in found}
1983
1984
    def load_wst_layouts(self):
        """Preload the 'Worksheet Template Layouts' sheet.

        Fills self.wst_layouts with a mapping of worksheet template title
        to the list of its layout slots. Blank and control references can
        be given as a UID or as the title of an active ReferenceDefinition.

        The control reference of a slot is stored whenever a control UID
        was resolved; it used to depend on the blank UID and was therefore
        always dropped. An empty 'type' cell falls back to "a".
        """
        definitions = self.load_definitions()
        worksheet = self.workbook["Worksheet Template Layouts"]
        if not worksheet:
            return

        def to_definition_uid(reference):
            # keep the value when it already is a UID, otherwise resolve it
            # as a definition title (None when unknown)
            if api.is_uid(reference):
                return reference
            return definitions.get(reference, None)

        for row in self.get_rows(3, worksheet=worksheet):
            wst_title = row.get("WorksheetTemplate_title")
            slots = self.wst_layouts.setdefault(wst_title, [])

            dup = row.get("dup", None)
            dup = int(dup) if dup else None
            analysis_type = row.get("type") or "a"
            blank_uid = None
            control_uid = None
            ref_proxy = None

            # the type is accepted as single letter or as full word
            if analysis_type in ["b", "Blank"]:
                blank_uid = to_definition_uid(row.get("blank_ref", ""))
                ref_proxy = blank_uid or None
            elif analysis_type in ["c", "Control"]:
                control_uid = to_definition_uid(row.get("control_ref", ""))
                ref_proxy = control_uid or None

            # the 'dup' value is only kept for duplicate slots
            if analysis_type not in ["d", "Duplicate"]:
                dup = None

            slots.append({
                "pos": int(row["pos"]),
                # first letter only, in case 'type' is a full word
                "type": analysis_type[0].lower(),
                "blank_ref": [blank_uid] if blank_uid else [],
                "control_ref": [control_uid] if control_uid else [],
                "reference_proxy": ref_proxy,
                "dup": dup,
                "dup_proxy": dup,
            })
2032
2033
    def load_wst_services(self):
        """Preload the 'Worksheet Template Services' sheet.

        Fills self.wst_services with a mapping of worksheet template title
        to the list of UIDs of its services. Services that cannot be
        resolved are left out, but the template still gets its (possibly
        empty) list.
        """
        worksheet = self.workbook["Worksheet Template Services"]
        if not worksheet:
            return
        bsc = getToolByName(self.context, SETUP_CATALOG)
        for row in self.get_rows(3, worksheet=worksheet):
            template = row.get("WorksheetTemplate_title")
            uids = self.wst_services.setdefault(template, [])
            service = self.get_object(bsc, "AnalysisService",
                                      row.get("service"))
            if not service:
                continue
            uids.append(service.UID())
2047
2048
    def Import(self):
        """Create a WorksheetTemplate for every sheet row that has a title.

        The layout and the services of a template are only set when the
        preloaded sheets hold an entry for its title.
        """
        self.load_wst_services()
        self.load_wst_layouts()
        folder = self.context.setup.worksheettemplates
        for row in self.get_rows(3):
            title = row.get("title")
            if not title:
                continue

            description = row.get("description", "")
            obj = api.create(
                folder, "WorksheetTemplate",
                title=title, description=description)
            if title in self.wst_layouts:
                obj.setTemplateLayout(self.wst_layouts[title])
            if title in self.wst_services:
                obj.setServices(self.wst_services[title])
2064
2065
2066
class Setup(WorksheetImporter):
    """Importer that writes the Field/Value rows of its sheet to bika_setup
    """

    def _log_invalid_value(self, field, value):
        """Log that the value could not be handled for the given field
        """
        logger.error("No valid type for Setup.{} ({}): {}"
                     .format(field.getName(), field.type, value))

    def get_field_value(self, field, value):
        """Convert a raw sheet value to the value used for the setup field

        :param field: schema field of the setup object
        :param value: raw value read from the sheet. For duration fields this
            is the whole Field/Value mapping (see `to_duration_value`)
        :returns: the converted value, or None if the value is None, the
            field type is not supported or the conversion failed
        """
        if value is None:
            return None
        converters = {
            "integer": self.to_integer_value,
            "fixedpoint": self.to_fixedpoint_value,
            "boolean": self.to_boolean_value,
            "string": self.to_string_value,
            "reference": self.to_reference_value,
            "duration": self.to_duration_value
        }
        converter = converters.get(field.type)
        if converter is None:
            # Unsupported field type: nothing to convert with
            self._log_invalid_value(field, value)
            return None
        try:
            return converter(field, value)
        except Exception:
            # Best effort: a value that cannot be converted is reported and
            # skipped, it must not abort the import of the remaining fields
            self._log_invalid_value(field, value)
            return None

    def to_integer_value(self, field, value):
        """Return the value as the string representation of an integer
        """
        return str(int(value))

    def to_fixedpoint_value(self, field, value):
        """Return the value as the string representation of a float
        """
        return str(float(value))

    def to_boolean_value(self, field, value):
        """Return the value converted with the importer's `to_bool`
        """
        return self.to_bool(value)

    def to_string_value(self, field, value):
        """Return the value as string, or the matching vocabulary key

        Falsy values are converted to an empty string.
        """
        if field.vocabulary:
            return self.to_string_vocab_value(field, value)
        return str(value) if value else ""

    def to_reference_value(self, field, value):
        """Return the UID of the first object found with the given title

        :returns: None for empty values, otherwise the UID
        :raises ValueError: if no object with this title was found
        """
        if not value:
            return None

        brains = api.search({"title": to_unicode(value)})
        if brains:
            return api.get_uid(brains[0])

        msg = "No object found for Setup.{0} ({1}): {2}"
        msg = msg.format(field.getName(), field.type, value)
        logger.error(msg)
        raise ValueError(msg)

    def to_string_vocab_value(self, field, value):
        """Return the vocabulary key that matches the value

        The value is compared case-insensitively against both the keys and
        the displayed values of the field's vocabulary.

        :raises ValueError: if the vocabulary is empty or has no entry that
            matches the value
        """
        vocabulary = field.vocabulary
        if isinstance(vocabulary, str):
            # The vocabulary is given by name and resolved on the setup
            vocabulary = getFromString(api.get_setup(), vocabulary)
        else:
            vocabulary = vocabulary.items()

        if not vocabulary:
            raise ValueError("Empty vocabulary for {}".format(field.getName()))

        if isinstance(vocabulary, (tuple, list)):
            vocabulary = {item[0]: item[1] for item in vocabulary}

        # Loop invariant: lower-case the searched value only once
        value_low = str(value).lower()
        for key, val in vocabulary.items():
            key_low = str(to_utf8(key)).lower()
            val_low = str(to_utf8(val)).lower()
            if value_low in (key_low, val_low):
                return key
        raise ValueError("Vocabulary entry not found")

    def to_duration_value(self, field, values):
        """Build the duration dict of the field from the sheet values

        :param values: mapping that holds the parts of the duration under the
            keys `<field name>_days`, `<field name>_hours` and
            `<field name>_minutes`. Missing or non-numeric parts count as 0
        :returns: dict with the integer items `days`, `hours` and `minutes`
        """
        name = field.getName()
        # NOTE: built without indexing the result of `map`, which is only
        # possible in Python 2 where `map` returns a list
        return dict(
            (unit, api.to_int(values.get("{}_{}".format(name, unit), 0), 0))
            for unit in ("days", "hours", "minutes"))

    def Import(self):
        """Set the fields of bika_setup with the values from the sheet

        Fields are looked up by name in the Field/Value rows. Duration fields
        that are not listed by name are built from their days/hours/minutes
        entries instead. Fields without a resulting value are left untouched.
        """
        values = {}
        for row in self.get_rows(3):
            values[row['Field']] = row['Value']

        bsetup = self.context.bika_setup
        bschema = bsetup.Schema()
        for field in bschema.fields():
            value = None
            field_name = field.getName()
            if field_name in values:
                value = self.get_field_value(field, values[field_name])
            elif field.type == "duration":
                value = self.get_field_value(field, values)

            if value is None:
                continue
            try:
                obj_field = bsetup.getField(field_name)
                # NOTE(review): the value is always passed as string, also
                # for the dict returned for duration fields -- confirm that
                # the field setters cope with that
                obj_field.set(bsetup, str(value))
            except Exception:
                # Best effort: report the field and go on with the next one
                self._log_invalid_value(field, value)
2164
2165
2166
class ID_Prefixes(WorksheetImporter):
    """Importer for the ID formatting records of bika_setup
    """

    def Import(self):
        """Merge the rows of the sheet into the current ID formatting records

        A row replaces the existing record of the same portal type.

        NOTE: the merged records are only built, not stored, because the call
        that would store them is commented out (see the end of this method).
        """
        formatting = self.context.bika_setup.getIDFormatting()
        for record in self.get_rows(3):
            # Drop the record that exists for this portal type, if any
            formatting = [entry for entry in formatting
                          if entry['portal_type'] != record['portal_type']]
            # 'none' is what the spreadsheet shows to the user, but it stands
            # for 'no separator'
            sep = record.get('separator', '-')
            if sep == 'none':
                sep = ''
            # Add the record built from the row
            formatting.append({
                'portal_type': record['portal_type'],
                'padding': record['padding'],
                'prefix': record['prefix'],
                'separator': sep,
            })
        # TODO(review): storing the records is disabled -- confirm whether
        # this call has to be restored or the importer can be removed
        # self.context.bika_setup.setIDFormatting(formatting)
2183
2184
2185
class Attachment_Types(WorksheetImporter):
    """Importer for attachment types
    """

    def Import(self):
        """Create an AttachmentType for every row that has a title
        """
        folder = self.context.setup.attachmenttypes
        for record in self.get_rows(3):
            name = record.get("title")
            if name:
                api.create(
                    folder,
                    "AttachmentType",
                    title=name,
                    description=record.get("description"),
                )
2196
2197
2198
class Reference_Samples(WorksheetImporter):
    """Importer for reference samples, including their reference results,
    reference analyses and the interim fields of these analyses
    """

    def load_reference_sample_results(self, sample):
        """Set the reference results of the sample

        Only the rows of the 'Reference Sample Results' sheet whose
        ReferenceSample_id is the id of the sample are considered. Rows whose
        analysis service is not found are skipped with a warning.

        :param sample: the reference sample to set the results to
        """
        sheetname = 'Reference Sample Results'
        if not hasattr(self, 'results_worksheet'):
            worksheet = self.workbook[sheetname]
            if not worksheet:
                return
            # Keep the sheet, this method is called once per sample
            self.results_worksheet = worksheet
        results = []
        bsc = getToolByName(self.context, SETUP_CATALOG)
        for row in self.get_rows(3, worksheet=self.results_worksheet):
            if row['ReferenceSample_id'] != sample.getId():
                continue
            service_title = row.get('AnalysisService_title')
            service = self.get_object(bsc, 'AnalysisService', service_title)
            if not service:
                warning = ("Unable to load a reference sample result. "
                           "Service %s not found.")
                # Report the service that is missing, not the sheet name
                logger.warning(warning, service_title)
                continue
            results.append({
                'uid': service.UID(),
                'result': row['result'],
                'min': row['min'],
                'max': row['max']})
        sample.setReferenceResults(results)

    def load_reference_analyses(self, sample):
        """Create the reference analyses of the sample

        Only the rows of the 'Reference Analyses' sheet whose
        ReferenceSample_id is the id of the sample are considered. The
        interim fields of each created analysis are loaded as well.

        :param sample: the reference sample that contains the analyses
        """
        sheetname = 'Reference Analyses'
        if not hasattr(self, 'analyses_worksheet'):
            worksheet = self.workbook[sheetname]
            if not worksheet:
                return
            # Keep the sheet, this method is called once per sample
            self.analyses_worksheet = worksheet
        bsc = getToolByName(self.context, SETUP_CATALOG)
        for row in self.get_rows(3, worksheet=self.analyses_worksheet):
            if row['ReferenceSample_id'] != sample.getId():
                continue
            service = self.get_object(bsc, 'AnalysisService',
                                      row.get('AnalysisService_title'))
            # Analyses are keyed/named by service keyword
            obj = _createObjectByType("ReferenceAnalysis", sample, row['id'])
            obj.edit(title=row['id'],
                     ReferenceType=row['ReferenceType'],
                     Result=row['Result'],
                     Analyst=row['Analyst'],
                     Instrument=row['Instrument'],
                     Retested=row['Retested']
                     )
            obj.setService(service)
            # obj.setCreators(row['creator'])
            # obj.setCreationDate(row['created'])
            # self.set_wf_history(obj, row['workflow_history'])
            obj.unmarkCreationFlag()

            self.load_reference_analysis_interims(obj)

    def load_reference_analysis_interims(self, analysis):
        """Set the interim fields of the reference analysis

        Only the rows of the 'Reference Analysis Interims' sheet whose
        ReferenceAnalysis_id is the id of the analysis are considered.

        :param analysis: the reference analysis to set the interims to
        """
        sheetname = 'Reference Analysis Interims'
        if not hasattr(self, 'interim_worksheet'):
            worksheet = self.workbook[sheetname]
            if not worksheet:
                return
            # Keep the sheet, this method is called once per analysis
            self.interim_worksheet = worksheet
        interims = []
        for row in self.get_rows(3, worksheet=self.interim_worksheet):
            if row['ReferenceAnalysis_id'] != analysis.getId():
                continue
            interims.append({
                'keyword': row['keyword'],
                'title': row['title'],
                'value': row['value'],
                'unit': row['unit'],
                'hidden': row['hidden']})
        analysis.setInterimFields(interims)

    def Import(self):
        """Create a ReferenceSample for every row that has an id

        The sample is created inside the supplier found by Supplier_title.
        Its reference results and reference analyses are loaded afterwards.
        """
        bsc = getToolByName(self.context, SETUP_CATALOG)
        for row in self.get_rows(3):
            if not row['id']:
                continue
            # NOTE: fails with an IndexError if the supplier is not found
            supplier = bsc(portal_type='Supplier',
                           getName=row.get('Supplier_title', ''))[0].getObject()
            obj = _createObjectByType("ReferenceSample", supplier, row['id'])
            ref_def = self.get_object(bsc, 'ReferenceDefinition',
                                      row.get('ReferenceDefinition_title'))
            ref_man = self.get_object(bsc, 'Manufacturer',
                                      row.get('Manufacturer_title'))
            obj.edit(title=row['id'],
                     description=row.get('description', ''),
                     Blank=self.to_bool(row['Blank']),
                     Hazardous=self.to_bool(row['Hazardous']),
                     CatalogueNumber=row['CatalogueNumber'],
                     LotNumber=row['LotNumber'],
                     Remarks=row['Remarks'],
                     ExpiryDate=row['ExpiryDate'],
                     DateSampled=row['DateSampled'],
                     DateReceived=row['DateReceived'],
                     DateOpened=row['DateOpened'],
                     DateExpired=row['DateExpired'],
                     DateDisposed=row['DateDisposed']
                     )
            obj.setReferenceDefinition(ref_def)
            obj.setManufacturer(ref_man)
            obj.unmarkCreationFlag()

            self.load_reference_sample_results(obj)
            self.load_reference_analyses(obj)
2306
2307
2308
class Analysis_Requests(WorksheetImporter):
    """Importer for analysis requests (samples), including their analyses and
    the interim fields of these analyses
    """

    def load_analyses(self, sample):
        """Create the analyses of the sample

        Only the rows of the 'Analyses' sheet whose AnalysisRequest_id is the
        id of the sample are considered. Without this filter, every call
        would create the analyses of all samples of the sheet again. The
        interim fields of each created analysis are loaded as well.

        :param sample: the analysis request that contains the analyses
        """
        sheetname = 'Analyses'
        if not hasattr(self, 'analyses_worksheet'):
            worksheet = self.workbook[sheetname]
            if not worksheet:
                return
            # Keep the sheet, this method is called once per sample
            self.analyses_worksheet = worksheet
        bsc = getToolByName(self.context, SETUP_CATALOG)
        sample_id = sample.getId()
        for row in self.get_rows(3, worksheet=self.analyses_worksheet):
            if row['AnalysisRequest_id'] != sample_id:
                continue
            # NOTE: fails with an IndexError if the service is not found
            service = bsc(portal_type='AnalysisService',
                          title=row['AnalysisService_title'])[0].getObject()
            # analyses are keyed/named by keyword
            obj = create_analysis(
                sample, service,
                Result=row['Result'],
                ResultCaptureDate=row['ResultCaptureDate'],
                Analyst=row['Analyst'],
                Instrument=row['Instrument'],
                Retested=self.to_bool(row['Retested']),
                # Empty or non-numeric cells count as 0
                MaxTimeAllowed={
                    'days': api.to_int(
                        row.get('MaxTimeAllowed_days', 0), 0),
                    'hours': api.to_int(
                        row.get('MaxTimeAllowed_hours', 0), 0),
                    'minutes': api.to_int(
                        row.get('MaxTimeAllowed_minutes', 0), 0),
                },
            )

            # NOTE(review): 'Analyses' is used as the type filter here --
            # confirm that it matches the analyses contained in the sample
            analyses = sample.objectValues('Analyses')
            analyses = list(analyses)
            analyses.append(obj)
            sample.setAnalyses(analyses)
            obj.unmarkCreationFlag()

            self.load_analysis_interims(obj)

    def load_analysis_interims(self, analysis):
        """Set the interim fields of the analysis

        Only the rows whose ReferenceAnalysis_id is the id of the analysis
        are considered.

        NOTE(review): sheet name and id column are the same ones the importer
        of reference samples uses -- confirm that this is intended.

        :param analysis: the analysis to set the interims to
        """
        sheetname = 'Reference Analysis Interims'
        if not hasattr(self, 'interim_worksheet'):
            worksheet = self.workbook[sheetname]
            if not worksheet:
                return
            # Keep the sheet, this method is called once per analysis
            self.interim_worksheet = worksheet
        interims = []
        for row in self.get_rows(3, worksheet=self.interim_worksheet):
            if row['ReferenceAnalysis_id'] != analysis.getId():
                continue
            interims.append({
                'keyword': row['keyword'],
                'title': row['title'],
                'value': row['value'],
                'unit': row['unit'],
                'hidden': row['hidden']})
        analysis.setInterimFields(interims)

    def Import(self):
        """Create an AnalysisRequest for every row that has an id

        The object is created inside the client found by Client_title. The
        contact, CC contact, profile and template are looked up by the names
        given in the row. The analyses are loaded afterwards.

        NOTE: each of these lookups fails with an IndexError if no object is
        found.
        """
        client_cat = api.get_tool(CLIENT_CATALOG)
        contact_cat = api.get_tool(CONTACT_CATALOG)
        setup_cat = api.get_tool(SETUP_CATALOG)
        for row in self.get_rows(3):
            if not row['id']:
                continue
            client = client_cat(portal_type="Client",
                                getName=row['Client_title'])[0].getObject()
            obj = _createObjectByType("AnalysisRequest", client, row['id'])
            contact = contact_cat(
                portal_type="Contact",
                getFullname=row['Contact_Fullname'])[0].getObject()
            obj.edit(
                RequestID=row['id'],
                Contact=contact,
                CCEmails=row['CCEmails'],
                ClientOrderNumber=row['ClientOrderNumber'],
                InvoiceExclude=row['InvoiceExclude'],
                DateReceived=row['DateReceived'],
                DatePublished=row['DatePublished'],
                Remarks=row['Remarks']
            )
            if row['CCContact_Fullname']:
                contact = contact_cat(
                    portal_type="Contact",
                    getFullname=row['CCContact_Fullname'])[0].getObject()
                obj.setCCContact(contact)
            if row['AnalysisProfile_title']:
                profiles = setup_cat(
                    portal_type="AnalysisProfile",
                    title=row['AnalysisProfile_title'])[0].getObject()
                obj.setProfiles([profiles])
            if row['ARTemplate_title']:
                template = setup_cat(
                    portal_type="ARTemplate",
                    title=row['ARTemplate_title'])[0].getObject()
                obj.setTemplate(template)

            obj.unmarkCreationFlag()

            self.load_analyses(obj)
2404
2405
2406
class Invoice_Batches(WorksheetImporter):
    """Importer for invoice batches
    """

    def Import(self):
        """Create an InvoiceBatch for every row of the sheet

        :raises Exception: if a row has no title, start date or end date
        """
        folder = self.context.invoices
        for row in self.get_rows(3):
            # Validate the row before the object is created, so that no
            # temporary object is left in the folder when the row is invalid
            if not row['title']:
                message = _("InvoiceBatch has no Title")
                raise Exception(t(message))
            if not row['start']:
                message = _("InvoiceBatch has no Start Date")
                raise Exception(t(message))
            if not row['end']:
                message = _("InvoiceBatch has no End Date")
                raise Exception(t(message))
            obj = _createObjectByType("InvoiceBatch", folder, tmpID())
            obj.edit(
                title=row['title'],
                BatchStartDate=row['start'],
                BatchEndDate=row['end'],
            )
            # The object was created with a temporary id
            renameAfterCreation(obj)
            notify(ObjectInitializedEvent(obj))
2428