Passed
Push — 2.x ( 0ba30f...7ab1ba )
by Jordi
08:49
created

AR_Templates.Import()   B

Complexity

Conditions 5

Size

Total Lines 41
Code Lines 36

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 5
eloc 36
nop 1
dl 0
loc 41
rs 8.5493
c 0
b 0
f 0
1
# -*- coding: utf-8 -*-
2
#
3
# This file is part of SENAITE.CORE.
4
#
5
# SENAITE.CORE is free software: you can redistribute it and/or modify it under
6
# the terms of the GNU General Public License as published by the Free Software
7
# Foundation, version 2.
8
#
9
# This program is distributed in the hope that it will be useful, but WITHOUT
10
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
11
# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
12
# details.
13
#
14
# You should have received a copy of the GNU General Public License along with
15
# this program; if not, write to the Free Software Foundation, Inc., 51
16
# Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
17
#
18
# Copyright 2018-2024 by its authors.
19
# Some rights reserved, see README and LICENSE.
20
21
import datetime
22
import os.path
23
import re
24
25
import transaction
26
from bika.lims import api
27
from bika.lims import bikaMessageFactory as _
28
from bika.lims import logger
29
from senaite.core.idserver import renameAfterCreation
30
from bika.lims.interfaces import ISetupDataSetList
31
from bika.lims.utils import getFromString
32
from senaite.core.i18n import translate as t
33
from bika.lims.utils import tmpID
34
from bika.lims.utils import to_unicode
35
from bika.lims.utils import to_utf8
36
from bika.lims.utils.analysis import create_analysis
37
from pkg_resources import resource_filename
38
from Products.Archetypes.event import ObjectInitializedEvent
39
from Products.CMFCore.utils import getToolByName
40
from Products.CMFPlone.utils import _createObjectByType
41
from Products.CMFPlone.utils import safe_unicode
42
from senaite.core.catalog import CLIENT_CATALOG
43
from senaite.core.catalog import CONTACT_CATALOG
44
from senaite.core.catalog import SENAITE_CATALOG
45
from senaite.core.catalog import SETUP_CATALOG
46
from senaite.core.exportimport.dataimport import SetupDataSetList as SDL
47
from zope.event import notify
48
from zope.interface import implements
49
50
# Fallback catalog id used by lookup() when the archetype tool maps no
# catalog to the requested portal type
UID_CATALOG = "uid_catalog"
51
52
53
def lookup(context, portal_type, **kwargs):
    """Return the first object of `portal_type` that matches the query.

    The catalog searched is the first one the archetype tool maps to the
    portal type; the UID catalog is used when there is no such mapping.
    All keyword arguments are passed to the catalog as query parameters.
    The first result is returned without checking that a result exists.
    """
    archetype_tool = getToolByName(context, 'archetype_tool')
    mapped_catalogs = archetype_tool.catalog_map.get(portal_type, [None])
    catalog_id = mapped_catalogs[0] or UID_CATALOG
    kwargs['portal_type'] = portal_type
    brains = getToolByName(context, catalog_id)(**kwargs)
    return brains[0].getObject()
59
60
61
def check_for_required_columns(name, data, required):
    """Ensure that every required column has a value in `data`.

    :param name: name used in the error message
    :param data: mapping of column name -> value
    :param required: iterable with the names of the columns to check
    :raises Exception: for the first required column that is missing or
        has an empty value
    """
    for column in required:
        if data.get(column):
            continue
        # Keep the msgid static and pass the values as mapping: a msgid that
        # is interpolated beforehand can never match a translation
        message = _("${name} has no '${column}' column.",
                    mapping={"name": safe_unicode(name),
                             "column": safe_unicode(column)})
        raise Exception(t(message))
66
67
68
def Float(thing):
    """Convert `thing` to a float, returning 0.0 if it cannot be converted.
    """
    try:
        return float(thing)
    except (TypeError, ValueError):
        # TypeError covers None and other non-numeric types
        return 0.0
74
75
76
def read_file(path):
    """Return the binary contents of the file found for `path`.

    If `path` itself is not a file, each allowed extension (lower case
    first, then upper case) is appended to it in turn and the first
    existing file is read.

    :param path: path of the file, with or without extension
    :returns: the contents of the file, read in binary mode
    :raises IOError: if neither `path` nor any extended path is a file
    """
    allowed_ext = ['pdf', 'jpg', 'jpeg', 'png', 'gif', 'ods', 'odt',
                   'xlsx', 'doc', 'docx', 'xls', 'csv', 'txt']
    allowed_ext += [e.upper() for e in allowed_ext]
    candidates = [path] + ['%s.%s' % (path, e) for e in allowed_ext]
    for candidate in candidates:
        if os.path.isfile(candidate):
            # The context manager ensures the file handle gets closed
            with open(candidate, "rb") as f:
                return f.read()
    raise IOError("File not found: %s. Allowed extensions: %s" % (path, ','.join(allowed_ext)))
87
88
89
class SetupDataSetList(SDL):
    """Setup data set list bound to the "bika.lims" project
    """

    implements(ISetupDataSetList)

    def __call__(self):
        # Delegate to the base implementation with a fixed project name
        project = "bika.lims"
        return SDL.__call__(self, projectname=project)
95
96
97
class WorksheetImporter:
    """Use this as a base, for normal tabular data sheet imports.

    The sheet to import is looked up in the workbook by the name of the
    class, with underscores replaced by spaces. Subclasses override
    `Import`.
    """

    def __init__(self, context):
        self.adapter_context = context

    def __call__(self, lsd, workbook, dataset_project, dataset_name):
        """Look up the worksheet of this importer and import it, if found

        :param lsd: object providing `context` and the `deferred` list
        :param workbook: workbook, subscriptable by sheet name
        :param dataset_project: project name the dataset belongs to
        :param dataset_name: name of the dataset
        """
        self.lsd = lsd
        self.context = lsd.context
        self.workbook = workbook
        self.sheetname = self.__class__.__name__.replace("_", " ")
        try:
            self.worksheet = workbook[self.sheetname]
        except KeyError:
            self.worksheet = None
        self.dataset_project = dataset_project
        self.dataset_name = dataset_name

        if not self.worksheet:
            logger.info("No records found: '{0}'".format(self.sheetname))
            return

        logger.info("Loading {0}.{1}: {2}".format(
            self.dataset_project, self.dataset_name, self.sheetname))
        try:
            self.Import()
        except IOError:
            # The importer must omit the files not found inside the server
            # filesystem (bika/lims/setupdata/test/ if the file is loaded
            # from 'select existing file' or bika/lims/setupdata/uploaded if
            # it's loaded from 'Load from file') and finish the import
            # without errors. https://jira.bikalabs.com/browse/LIMS-1624
            warning = "Error while loading attached file from %s. The file will not be uploaded into the system."
            logger.warning(warning, self.sheetname)
            self.context.plone_utils.addPortalMessage(
                "Error while loading some attached files. "
                "The files weren't uploaded into the system.")

    def get_rows(self, startrow=3, worksheet=None):
        """Returns a generator for all rows in a sheet.
           Each row contains a dictionary where the key is the value of the
           first row of the sheet for each column.
           The data values are returned in utf-8 format.
           Rows up to and including `startrow` are skipped.
           Each row additionally gets the keys 'Physical', 'Postal' and
           'Billing', holding a dict with the address parts if the sheet
           has the matching '<type>_Address' column, else an empty dict.
        """
        headers = []
        worksheet = worksheet if worksheet else self.worksheet
        for row_nr, cells in enumerate(worksheet.rows, 1):
            if row_nr == 1:
                # The first row provides the keys of the row dictionaries
                headers = [cell.value for cell in cells]
                continue
            if row_nr % 1000 == 0:
                transaction.savepoint()
            if row_nr <= startrow:
                continue

            values = []
            for cell in cells:
                value = cell.value
                if value is None:
                    value = ''
                # Python 2: convert unicode values to utf-8 encoded strings
                if isinstance(value, unicode):
                    value = value.encode('utf-8')
                # Strip any space, \t, \n, or \r characters from the left-hand
                # side, right-hand side, or both sides of the string
                if isinstance(value, str):
                    value = value.strip(' \t\n\r')
                values.append(value)
            row = dict(zip(headers, values))

            # parse out addresses
            for add_type in ['Physical', 'Postal', 'Billing']:
                row[add_type] = {}
                if add_type + "_Address" in row:
                    for key in ['Address', 'City', 'State', 'District', 'Zip', 'Country']:
                        row[add_type][key] = str(row.get("%s_%s" % (add_type, key), ''))

            yield row

    def get_file_data(self, filename):
        """Returns the binary contents of a file located in the setupdata
        folder of the current dataset, or None if no filename is given or
        the file cannot be read
        """
        if not filename:
            return None
        try:
            path = resource_filename(
                self.dataset_project,
                "setupdata/%s/%s" % (self.dataset_name, filename))
            # The context manager ensures the file handle gets closed
            with open(path, "rb") as f:
                return f.read()
        except Exception:
            # Best effort: callers handle a missing file themselves
            return None

    def to_bool(self, value):
        """ Converts a sheet value to a boolean value.
            Returns True for the text 'true' (in any case) and for values
            that convert to the integer 1, otherwise False
        """
        try:
            value = value.lower()
        except AttributeError:
            # not a string
            pass
        try:
            value = int(value)
        except (TypeError, ValueError, OverflowError):
            pass
        return value in ('true', 1)

    def to_int(self, value, default=0):
        """ Converts a value to an int. Returns default if the conversion
            fails, or 0 if the default cannot be converted either.
        """
        try:
            return int(value)
        except (TypeError, ValueError):
            # TypeError covers None and other non-numeric types
            try:
                return int(default)
            except Exception:
                return 0

    def to_float(self, value, default=0):
        """ Converts a value to a float. Returns default if the conversion
            fails, or 0.0 if the default cannot be converted either.
        """
        try:
            return float(value)
        except (TypeError, ValueError):
            # TypeError covers None and other non-numeric types
            try:
                return float(default)
            except Exception:
                return 0.0

    def defer(self, **kwargs):
        """Appends the keyword arguments to the list of deferred items
        """
        self.lsd.deferred.append(kwargs)

    def Import(self):
        """ Override this.
        XXX Simple generic sheet importer
        """

    def fill_addressfields(self, row, obj):
        """ Fills the address fields for the specified object if allowed:
            PhysicalAddress, PostalAddress, CountryState, BillingAddress
        """
        addresses = {}
        for add_type in ['Physical', 'Postal', 'Billing', 'CountryState']:
            addresses[add_type] = {}
            for key in ['Address', 'City', 'State', 'District', 'Zip', 'Country']:
                addresses[add_type][key.lower()] = str(row.get("%s_%s" % (add_type, key), ''))

        # Fall back to the physical address if no country/state is given
        country_state = addresses['CountryState']
        if country_state['country'] == '' and country_state['state'] == '':
            country_state['country'] = addresses['Physical']['country']
            country_state['state'] = addresses['Physical']['state']

        # Only call the setters the object provides
        for add_type in ['Physical', 'Postal', 'CountryState', 'Billing']:
            setter = 'set%s' % add_type
            if add_type != 'CountryState':
                setter += 'Address'
            if hasattr(obj, setter):
                getattr(obj, setter)(addresses[add_type])

    def fill_contactfields(self, row, obj):
        """ Fills the contact fields for the specified object if allowed:
            EmailAddress, Phone, Fax, BusinessPhone, BusinessFax, HomePhone,
            MobilePhone
        """
        fieldnames = ['EmailAddress',
                      'Phone',
                      'Fax',
                      'BusinessPhone',
                      'BusinessFax',
                      'HomePhone',
                      'MobilePhone',
                      ]
        schema = obj.Schema()
        fields = {field.getName(): field for field in schema.fields()}
        for fieldname in fieldnames:
            field = fields.get(fieldname)
            if field is None:
                # Only report fields the sheet provides a value for
                if fieldname in row:
                    logger.info("Contact field %s not found on %s" % (fieldname, obj))
                continue
            field.set(obj, row.get(fieldname, ''))

    def get_object(self, catalog, portal_type, title=None, **kwargs):
        """This will return an object from the catalog.
        Logs a message and returns None if no object or multiple objects found.
        All keyword arguments are passed verbatim to the contentFilter
        """
        if not title and not kwargs:
            return None
        contentFilter = {"portal_type": portal_type}
        if title:
            contentFilter['title'] = to_unicode(title)
        contentFilter.update(kwargs)
        brains = catalog(contentFilter)
        if len(brains) == 1:
            return brains[0].getObject()
        if len(brains) > 1:
            logger.info("More than one object found for %s" % contentFilter)
            return None
        # Nothing found: for analysis services retry with the title as keyword
        if portal_type == 'AnalysisService':
            brains = catalog(portal_type=portal_type, getKeyword=title)
            if brains:
                return brains[0].getObject()
        logger.info("No objects found for %s" % contentFilter)
        return None
315
316
317
class Sub_Groups(WorksheetImporter):
    """Creates a SubGroup object for each sheet row that has a title
    """

    def Import(self):
        container = self.context.bika_setup.bika_subgroups
        for row in self.get_rows(3):
            # Rows without a title are skipped
            if not row.get('title'):
                continue
            subgroup = _createObjectByType("SubGroup", container, tmpID())
            values = dict(
                title=row['title'],
                description=row['description'],
                SortKey=row['SortKey'],
            )
            subgroup.edit(**values)
            subgroup.unmarkCreationFlag()
            renameAfterCreation(subgroup)
            notify(ObjectInitializedEvent(subgroup))
330
331
332
class Lab_Information(WorksheetImporter):
    """Updates the laboratory object with the values of the sheet.

    Each sheet row provides the name of a setting in the column 'Field'
    and its value in the column 'Value'.
    """

    def Import(self):
        laboratory = self.context.bika_setup.laboratory
        values = {}
        for row in self.get_rows(3):
            values[row['Field']] = row['Value']

        # The logo is optional: import goes on without it if it is not set
        # or if the file cannot be read
        file_data = None
        logo = values.get('AccreditationBodyLogo')
        if logo:
            path = resource_filename(
                self.dataset_project,
                "setupdata/%s/%s" % (self.dataset_name, logo))
            try:
                file_data = read_file(path)
            except Exception as msg:
                # Let the logger format the exception: indexing it (msg[0])
                # only works in Python 2 and yields a non-string for
                # errno-style IOErrors
                logger.warning("%s Error on sheet: %s", msg, self.sheetname)

        laboratory.edit(
            Name=values['Name'],
            LabURL=values['LabURL'],
            Confidence=values['Confidence'],
            LaboratoryAccredited=self.to_bool(values['LaboratoryAccredited']),
            AccreditationBodyLong=values['AccreditationBodyLong'],
            AccreditationBody=values['AccreditationBody'],
            AccreditationBodyURL=values['AccreditationBodyURL'],
            Accreditation=values['Accreditation'],
            AccreditationReference=values['AccreditationReference'],
            AccreditationBodyLogo=file_data,
            TaxNumber=values['TaxNumber'],
        )
        self.fill_contactfields(values, laboratory)
        self.fill_addressfields(values, laboratory)
368
369
370
class Lab_Contacts(WorksheetImporter):
    """Creates a LabContact for each sheet row that has a Firstname.

    A user is registered for the contacts that have both a username and an
    email address. Afterwards, the contacts are assigned as managers to the
    departments of the sheet "Lab Departments" that have no manager yet.
    """

    def _get_contacts_by_username(self, catalog, username):
        """Returns the LabContact objects from the catalog with this username
        """
        contacts = []
        for brain in catalog(portal_type="LabContact"):
            # Wake up each object only once
            contact = brain.getObject()
            if contact.getUsername() == username:
                contacts.append(contact)
        return contacts

    def Import(self):
        folder = self.context.bika_setup.bika_labcontacts
        portal_groups = getToolByName(self.context, 'portal_groups')
        portal_registration = getToolByName(
            self.context, 'portal_registration')
        bsc = getToolByName(self.context, SETUP_CATALOG)

        # The row number reported in the log messages starts at 3
        for rownum, row in enumerate(self.get_rows(3), 3):
            if not row.get('Firstname'):
                continue

            fullname = ('%s %s' % (row['Firstname'], row.get('Surname', ''))).strip()

            # Username already exists?
            username = row.get('Username', '')
            if username:
                username = safe_unicode(username).encode('utf-8')
                if self._get_contacts_by_username(bsc, username):
                    error = "Lab Contact: username '{0}' in row {1} already exists. This contact will be omitted.".format(username, rownum)
                    logger.error(error)
                    continue

            # Is there a signature file defined? Try to get the file first.
            signature = None
            if row.get('Signature'):
                signature = self.get_file_data(row['Signature'])
                if not signature:
                    warning = "Lab Contact: Cannot load the signature file '{0}' for user '{1}'. The contact will be created, but without a signature image".format(row['Signature'], username)
                    logger.warning(warning)

            obj = _createObjectByType("LabContact", folder, tmpID())
            obj.edit(
                title=fullname,
                Salutation=row.get('Salutation', ''),
                Firstname=row['Firstname'],
                Surname=row.get('Surname', ''),
                JobTitle=row.get('JobTitle', ''),
                Username=row.get('Username', ''),
                Signature=signature
            )
            obj.unmarkCreationFlag()
            renameAfterCreation(obj)
            notify(ObjectInitializedEvent(obj))
            self.fill_contactfields(row, obj)
            self.fill_addressfields(row, obj)

            # The department column is optional
            if row.get('Department_title'):
                self.defer(src_obj=obj,
                           src_field='Department',
                           dest_catalog=SETUP_CATALOG,
                           dest_query={'portal_type': 'Department',
                                       'title': row['Department_title']}
                           )

            # Create Plone user
            email = row.get('EmailAddress', '')
            if not username:
                warn = "Lab Contact: No username defined for user '{0}' in row {1}. Contact created, but without access credentials.".format(fullname, rownum)
                logger.warning(warn)
            if not email:
                warn = "Lab Contact: No Email defined for user '{0}' in row {1}. Contact created, but without access credentials.".format(fullname, rownum)
                logger.warning(warn)
            if not (username and email):
                continue

            passw = row.get('Password')
            if not passw:
                passw = username
                warn = ("Lab Contact: No password defined for user '{0}' in row {1}."
                        " Password established automatically to '{2}'").format(username, rownum, passw)
                logger.warning(warn)

            try:
                member = portal_registration.addMember(
                    username,
                    passw,
                    properties={
                        'username': username,
                        'email': email,
                        'fullname': fullname}
                )
            except Exception as msg:
                logger.error("Lab Contact: Error adding user (%s): %s", msg, username)
                continue

            groups = row.get('Groups', '')
            if not groups:
                warn = "Lab Contact: No groups defined for user '{0}' in row {1}. Group established automatically to 'Analysts'".format(username, rownum)
                logger.warning(warn)
                groups = 'Analysts'

            group_ids = [g.strip() for g in groups.split(',')]
            # Add user to all specified groups
            for group_id in group_ids:
                group = portal_groups.getGroupById(group_id)
                if group:
                    group.addMember(username)

            roles = row.get('Roles', '')
            if roles:
                # Add user to all specified roles
                for role_id in [r.strip() for r in roles.split(',')]:
                    member._addRole(role_id)

            # If user is in LabManagers, add Owner local role on clients
            # folder
            # NOTE(review): the id checked is 'LabManager' (singular), while
            # the comment above names the group 'LabManagers' - confirm
            if 'LabManager' in group_ids:
                self.context.clients.manage_setLocalRoles(
                    username, ['Owner', ])

        # Now we have the lab contacts registered, try to assign the managers
        # to each department if required
        try:
            sheet = self.workbook["Lab Departments"]
        except KeyError:
            # No departments sheet in this workbook: nothing to assign
            return
        for row in self.get_rows(3, sheet):
            if not (row['title'] and row['LabContact_Username']):
                continue
            dept = self.get_object(bsc, "Department", row.get('title'))
            if not dept or dept.getManager():
                continue
            username = safe_unicode(row['LabContact_Username']).encode('utf-8')
            contacts = self._get_contacts_by_username(bsc, username)
            if contacts:
                dept.setManager(contacts[0].UID())
493
494
495
class Lab_Departments(WorksheetImporter):
    """Creates a Department for each sheet row that has a title and sets
    the lab contact with the username of the row as its manager
    """
    def Import(self):
        departments = api.get_senaite_setup().departments
        contact_catalog = getToolByName(self.context, CONTACT_CATALOG)
        lab_contacts = [brain.getObject() for brain
                        in contact_catalog(portal_type="LabContact")]
        for row in self.get_rows(3):
            title = row.get("title")
            if not title:
                continue
            username = row.get("LabContact_Username")

            department = api.create(departments,
                                    "Department",
                                    title=title,
                                    description=row.get("description"))

            # The first lab contact with a matching username is the manager
            manager = next(
                (contact for contact in lab_contacts
                 if contact.getUsername() == username), None)
            if manager:
                department.setManager(manager.UID())
                continue

            logger.info("Department: lookup of '%s' in LabContacts"
                        "/Username failed." % username)
527
528
529
class Lab_Products(WorksheetImporter):
    """Creates a LabProduct object for each row of the sheet
    """

    def Import(self):
        products = self.context.bika_setup.bika_labproducts
        for row in self.get_rows(3):
            product = _createObjectByType('LabProduct', products, tmpID())
            # Unit and price are stored as strings
            values = {
                'title': row.get('title', 'Unknown'),
                'description': row.get('description', ''),
                'Volume': row.get('volume', 0),
                'Unit': str(row.get('unit', 0)),
                'Price': str(row.get('price', 0)),
            }
            product.edit(**values)
            # Give the object its final id and announce it
            renameAfterCreation(product)
            notify(ObjectInitializedEvent(product))
549
550
551
class Clients(WorksheetImporter):
    """Creates a Client object for each row of the sheet.

    Raises an Exception for the first row without Name or without ClientID.
    """

    def Import(self):
        folder = self.context.clients
        for row in self.get_rows(3):
            # Validate before creating, so that an invalid row does not leave
            # a temporary object behind
            if not row['Name']:
                message = "Client %s has no Name" % row.get('ClientID', '')
                raise Exception(message)
            if not row['ClientID']:
                message = "Client %s has no Client ID" % row['Name']
                raise Exception(message)

            obj = _createObjectByType("Client", folder, tmpID())
            obj.edit(Name=row['Name'],
                     ClientID=row['ClientID'],
                     MemberDiscountApplies=bool(row['MemberDiscountApplies']),
                     BulkDiscount=bool(row['BulkDiscount']),
                     TaxNumber=row.get('TaxNumber', ''),
                     AccountNumber=row.get('AccountNumber', '')
                     )
            self.fill_contactfields(row, obj)
            self.fill_addressfields(row, obj)
            obj.unmarkCreationFlag()
            renameAfterCreation(obj)
            notify(ObjectInitializedEvent(obj))
576
577
578
class Client_Contacts(WorksheetImporter):
    """Creates a Contact inside the client named in each row of the sheet.

    Rows that refer to a client that cannot be found are skipped. For rows
    with a username, a user is registered as well, gets the Owner local
    role on the client and is added to the 'Clients' group.
    """

    def Import(self):
        portal_groups = getToolByName(self.context, 'portal_groups')
        cat = api.get_tool(CLIENT_CATALOG)
        for row in self.get_rows(3):
            brains = cat(portal_type="Client",
                         getName=row['Client_title'])
            fullname = "%(Firstname)s %(Surname)s" % row
            if not brains:
                error = "Client invalid: '%s'. The Client Contact %s will not be uploaded."
                logger.error(error, row['Client_title'], fullname)
                continue

            client = brains[0].getObject()
            contact = _createObjectByType("Contact", client, tmpID())
            pub_pref = [x.strip() for x in
                        row.get('PublicationPreference', '').split(",")]
            contact.edit(
                Salutation=row.get('Salutation', ''),
                Firstname=row.get('Firstname', ''),
                Surname=row.get('Surname', ''),
                Username=row['Username'],
                JobTitle=row.get('JobTitle', ''),
                Department=row.get('Department', ''),
                PublicationPreference=pub_pref,
            )
            self.fill_contactfields(row, contact)
            self.fill_addressfields(row, contact)
            contact.unmarkCreationFlag()
            renameAfterCreation(contact)
            notify(ObjectInitializedEvent(contact))

            # CC Contacts are resolved later, by their full name
            if row['CCContacts']:
                names = [x.strip() for x in row['CCContacts'].split(",")]
                for _fullname in names:
                    self.defer(src_obj=contact,
                               src_field='CCContact',
                               dest_catalog=CONTACT_CATALOG,
                               dest_query={'portal_type': 'Contact',
                                           'getFullname': _fullname}
                               )

            # Create Plone user
            username = safe_unicode(row['Username']).encode('utf-8')
            password = safe_unicode(row['Password']).encode('utf-8')
            if not username:
                continue
            try:
                self.context.portal_registration.addMember(
                    username,
                    password,
                    properties={
                        'username': username,
                        'email': row['EmailAddress'],
                        'fullname': fullname}
                    )
            except Exception as msg:
                logger.info("Error adding user (%s): %s" % (msg, username))
            contact.aq_parent.manage_setLocalRoles(row['Username'], ['Owner', ])
            contact.reindexObject()

            # add user to Clients group, if the group exists
            group = portal_groups.getGroupById('Clients')
            if group:
                group.addMember(username)
            else:
                logger.warning(
                    "Client Contact: group 'Clients' not found. "
                    "User '%s' was not added to it.", username)
640
641
642
class Container_Types(WorksheetImporter):
    """Creates a ContainerType object for each sheet row that has a title
    """

    def Import(self):
        container = self.context.bika_setup.bika_containertypes
        for row in self.get_rows(3):
            title = row['title']
            if not title:
                # Rows without a title are skipped
                continue
            container_type = _createObjectByType(
                "ContainerType", container, tmpID())
            description = row.get('description', '')
            container_type.edit(title=title, description=description)
            container_type.unmarkCreationFlag()
            renameAfterCreation(container_type)
            notify(ObjectInitializedEvent(container_type))
655
656
657
class Preservations(WorksheetImporter):
    """Creates a SamplePreservation for each sheet row that has a title
    """

    def Import(self):
        preservations = self.context.setup.samplepreservations
        for row in self.get_rows(3):
            title = row.get("title")
            # Rows without a title are skipped
            if title:
                description = row.get("description")
                api.create(preservations, "SamplePreservation",
                           title=title, description=description)
668
669
670
class Containers(WorksheetImporter):
    """Creates a SampleContainer for each sheet row that has a title and
    links it to the container type and the preservation named in the row
    """

    def Import(self):
        containers = self.context.bika_setup.sample_containers
        setup_catalog = getToolByName(self.context, SETUP_CATALOG)
        # (column with the title, portal type to search for, setter name)
        references = (
            ("ContainerType_title", "ContainerType", "setContainerType"),
            ("Preservation_title", "SamplePreservation", "setPreservation"),
        )
        for row in self.get_rows(3):
            if not row["title"]:
                continue
            container = api.create(containers, "SampleContainer")
            container.setTitle(row["title"])
            container.setDescription(row.get("description", ""))
            container.setCapacity(row.get("Capacity", 0))
            container.setPrePreserved(self.to_bool(row["PrePreserved"]))
            for column, portal_type, setter in references:
                ref_title = row[column]
                if not ref_title:
                    continue
                target = self.get_object(setup_catalog, portal_type, ref_title)
                if target:
                    getattr(container, setter)(target)
693
694
695
class Suppliers(WorksheetImporter):
    """Creates a Supplier object for each sheet row that has a Name
    """

    def Import(self):
        folder = self.context.bika_setup.bika_suppliers
        for row in self.get_rows(3):
            # Check the name before creating the object: rows without a name
            # must not leave a temporary object behind in the folder
            if not row['Name']:
                continue
            obj = _createObjectByType("Supplier", folder, tmpID())
            obj.edit(
                Name=row.get('Name', ''),
                TaxNumber=row.get('TaxNumber', ''),
                AccountType=row.get('AccountType', {}),
                AccountName=row.get('AccountName', {}),
                AccountNumber=row.get('AccountNumber', ''),
                BankName=row.get('BankName', ''),
                BankBranch=row.get('BankBranch', ''),
                SWIFTcode=row.get('SWIFTcode', ''),
                IBN=row.get('IBN', ''),
                NIB=row.get('NIB', ''),
                Website=row.get('Website', ''),
            )
            self.fill_contactfields(row, obj)
            self.fill_addressfields(row, obj)
            obj.unmarkCreationFlag()
            renameAfterCreation(obj)
            notify(ObjectInitializedEvent(obj))
720
721
722
class Supplier_Contacts(WorksheetImporter):
    """Importer that creates SupplierContact objects inside suppliers."""

    def Import(self):
        """Create a contact for every row naming a known supplier.

        Rows lacking a supplier name or a first name, and rows whose
        supplier cannot be found in the setup catalog, are skipped.
        """
        catalog = getToolByName(self.context, SETUP_CATALOG)
        for row in self.get_rows(3):
            # Supplier name and first name are both mandatory
            if not (row['Supplier_Name'] and row['Firstname']):
                continue

            brains = catalog(portal_type="Supplier",
                             Title=row['Supplier_Name'])
            if not brains:
                continue

            # The first matching supplier becomes the container
            supplier = brains[0].getObject()
            contact = _createObjectByType(
                "SupplierContact", supplier, tmpID())
            contact.edit(
                Firstname=row['Firstname'],
                Surname=row.get('Surname', ''),
                Username=row.get('Username')
            )
            self.fill_contactfields(row, contact)
            self.fill_addressfields(row, contact)
            contact.unmarkCreationFlag()
            renameAfterCreation(contact)
            notify(ObjectInitializedEvent(contact))
747
748
749
class Manufacturers(WorksheetImporter):
    """Importer that creates Manufacturer objects from the worksheet rows."""

    def Import(self):
        """Create one Manufacturer per row that provides a ``title``.

        The title is checked before the object is created, so rows without
        a title no longer leave an uninitialized object with a temporary
        ID in the manufacturers folder.
        """
        folder = self.context.bika_setup.bika_manufacturers
        for row in self.get_rows(3):
            if not row['title']:
                continue
            obj = _createObjectByType("Manufacturer", folder, tmpID())
            obj.edit(
                title=row['title'],
                description=row.get('description', '')
            )
            self.fill_addressfields(row, obj)
            obj.unmarkCreationFlag()
            renameAfterCreation(obj)
            notify(ObjectInitializedEvent(obj))
764
765
766
class Instrument_Types(WorksheetImporter):
    """Importer that creates InstrumentType objects from the worksheet."""

    def Import(self):
        """Create one InstrumentType per row that provides a ``title``.

        Rows without a title are skipped, in line with the other importers
        in this module, instead of producing untitled instrument types.
        """
        folder = self.context.bika_setup.bika_instrumenttypes
        for row in self.get_rows(3):
            if not row['title']:
                continue
            obj = _createObjectByType("InstrumentType", folder, tmpID())
            obj.edit(
                title=row['title'],
                description=row.get('description', ''))
            obj.unmarkCreationFlag()
            renameAfterCreation(obj)
            notify(ObjectInitializedEvent(obj))
778
779
780
class Instruments(WorksheetImporter):
    """Importer that creates Instrument objects and their attachments."""

    def _attach_file(self, row, column, setter):
        """Read the dataset file named in ``row[column]`` and hand it over.

        :param row: the worksheet row being imported
        :param column: name of the column that holds the file name
        :param setter: callable receiving the data returned by ``read_file``

        Nothing happens when the column is empty. Any error raised while
        reading or storing the file is logged as a warning, not propagated.
        """
        filename = row.get(column, None)
        if not filename:
            return
        path = resource_filename(
            self.dataset_project,
            "setupdata/%s/%s" % (self.dataset_name, filename)
        )
        try:
            setter(read_file(path))
        except Exception as exc:
            # Format the exception itself: indexing it (``exc[0]``) is not
            # supported on Python 3 and may yield a non-string value
            logger.warning("%s Error on sheet: %s", exc, self.sheetname)

    def Import(self):
        """Create one Instrument per row and link its related objects.

        Rows that do not carry the ``Type``, ``Supplier`` and ``Brand``
        columns are reported and skipped. The instrument type, manufacturer,
        supplier and (optional) method are looked up by title in the setup
        catalog. Photo, installation certificate and user manual files are
        attached when the corresponding columns are filled in.
        """
        folder = self.context.bika_setup.bika_instruments
        bsc = getToolByName(self.context, SETUP_CATALOG)
        for row in self.get_rows(3):
            if ('Type' not in row
                    or 'Supplier' not in row
                    or 'Brand' not in row):
                logger.info(
                    "Unable to import '%s'. Missing supplier, manufacturer "
                    "or type", row.get('title', ''))
                continue

            obj = _createObjectByType("Instrument", folder, tmpID())

            obj.edit(
                title=row.get('title', ''),
                AssetNumber=row.get('assetnumber', ''),
                description=row.get('description', ''),
                Type=row.get('Type', ''),
                Brand=row.get('Brand', ''),
                Model=row.get('Model', ''),
                SerialNo=row.get('SerialNo', ''),
                DataInterface=row.get('DataInterface', ''),
                Location=row.get('Location', ''),
                InstallationDate=row.get('Instalationdate', ''),
                UserManualID=row.get('UserManualID', ''),
            )
            instrumenttype = self.get_object(
                bsc, 'InstrumentType', title=row.get('Type'))
            manufacturer = self.get_object(
                bsc, 'Manufacturer', title=row.get('Brand'))
            supplier = self.get_object(
                bsc, 'Supplier', title=row.get('Supplier', ''))
            method = self.get_object(bsc, 'Method', title=row.get('Method'))
            obj.setInstrumentType(instrumenttype)
            obj.setManufacturer(manufacturer)
            obj.setSupplier(supplier)
            if method:
                obj.setMethods([method])
                obj.setMethod(method)

            # Attaching the instrument's photo
            self._attach_file(row, 'Photo', obj.setPhoto)

            # Attaching the Installation Certificate if exists
            self._attach_file(row, 'InstalationCertificate',
                              obj.setInstallationCertificate)

            # Attaching the Instrument's manual if exists
            if row.get('UserManualFile', None):
                row_dict = {'DocumentID': row.get('UserManualID', 'manual'),
                            'DocumentVersion': '',
                            'DocumentLocation': '',
                            'DocumentType': 'Manual',
                            'File': row.get('UserManualFile', None)
                            }
                addDocument(self, row_dict, obj)
            obj.unmarkCreationFlag()
            renameAfterCreation(obj)
            notify(ObjectInitializedEvent(obj))
857
858
859 View Code Duplication
class Instrument_Validations(WorksheetImporter):
    """Importer that creates InstrumentValidation objects in instruments."""

    def Import(self):
        """Create a validation for each row naming an instrument and title.

        Rows without instrument or title, and rows whose instrument is not
        found in the setup catalog, are skipped. The worker is set to every
        active lab contact whose full name equals the ``Worker`` column.
        """
        bsc = getToolByName(self.context, SETUP_CATALOG)
        for row in self.get_rows(3):
            if not (row.get('instrument', None) and row.get('title', None)):
                continue

            instrument = self.get_object(
                bsc, 'Instrument', row.get('instrument'))
            if not instrument:
                continue

            validation = _createObjectByType(
                "InstrumentValidation", instrument, tmpID())
            validation.edit(
                title=row['title'],
                DownFrom=row.get('downfrom', ''),
                DownTo=row.get('downto', ''),
                Validator=row.get('validator', ''),
                Considerations=row.get('considerations', ''),
                WorkPerformed=row.get('workperformed', ''),
                Remarks=row.get('remarks', ''),
                DateIssued=row.get('DateIssued', ''),
                ReportID=row.get('ReportID', '')
            )

            # Match the worker against the active lab contacts by full name
            bsc = getToolByName(self.context, SETUP_CATALOG)
            contact_brains = bsc(portal_type="LabContact", is_active=True)
            contacts = [brain.getObject() for brain in contact_brains]
            worker_name = row.get('Worker', '')
            for contact in contacts:
                if contact.getFullname() == worker_name:
                    validation.setWorker(contact.UID())

            validation.unmarkCreationFlag()
            renameAfterCreation(validation)
            notify(ObjectInitializedEvent(validation))
890
891
892 View Code Duplication
class Instrument_Calibrations(WorksheetImporter):
    """Importer that creates InstrumentCalibration objects in instruments."""

    def Import(self):
        """Create a calibration for each row naming an instrument and title.

        Rows without instrument or title, and rows whose instrument is not
        found in the setup catalog, are skipped. The worker is set to every
        active lab contact whose full name equals the ``Worker`` column.
        """
        bsc = getToolByName(self.context, SETUP_CATALOG)
        for row in self.get_rows(3):
            if not row.get('instrument', None) or not row.get('title', None):
                continue

            folder = self.get_object(bsc, 'Instrument', row.get('instrument'))
            if not folder:
                continue

            obj = _createObjectByType(
                "InstrumentCalibration", folder, tmpID())
            obj.edit(
                title=row['title'],
                DownFrom=row.get('downfrom', ''),
                DownTo=row.get('downto', ''),
                Calibrator=row.get('calibrator', ''),
                Considerations=row.get('considerations', ''),
                WorkPerformed=row.get('workperformed', ''),
                Remarks=row.get('remarks', ''),
                DateIssued=row.get('DateIssued', ''),
                ReportID=row.get('ReportID', '')
            )
            # Getting the active lab contacts. The filter uses the same
            # ``is_active`` key as the Instrument_Validations importer; the
            # previous ``nactive_state`` key was a typo.
            worker_name = row.get('Worker', '')
            for brain in bsc(portal_type="LabContact", is_active=True):
                contact = brain.getObject()
                if contact.getFullname() == worker_name:
                    obj.setWorker(contact.UID())
            obj.unmarkCreationFlag()
            renameAfterCreation(obj)
            notify(ObjectInitializedEvent(obj))
923
924
925
class Instrument_Certifications(WorksheetImporter):
    """Importer that creates InstrumentCertification objects."""

    def Import(self):
        """Create a certification for each row naming instrument and title.

        When ``validfrom`` is empty the certificate starts today; when
        ``validto`` is empty it expires one year from today (both formatted
        as ``dd/mm/YYYY``). The report file is attached when given, and the
        preparator/validator are set to the active lab contacts whose full
        name equals the ``preparedby``/``approvedby`` columns.
        """
        bsc = getToolByName(self.context, SETUP_CATALOG)

        today = datetime.date.today()
        try:
            next_year = today.replace(year=today.year + 1)
        except ValueError:
            # 29 February has no counterpart in the following year
            next_year = today.replace(year=today.year + 1, day=28)
        default_start_date = today.strftime('%d/%m/%Y')
        default_expire_date = next_year.strftime('%d/%m/%Y')

        for row in self.get_rows(3):
            if not row['instrument'] or not row['title']:
                continue

            folder = self.get_object(
                bsc, 'Instrument', row.get('instrument', ''))
            if not folder:
                continue

            obj = _createObjectByType(
                "InstrumentCertification", folder, tmpID())
            # ``or`` also covers empty cells delivered as None
            certificate_expire_date = \
                row.get('validto') or default_expire_date
            certificate_start_date = \
                row.get('validfrom') or default_start_date
            obj.edit(
                title=row['title'],
                AssetNumber=row.get('assetnumber', ''),
                Date=row.get('date', ''),
                ValidFrom=certificate_start_date,
                ValidTo=certificate_expire_date,
                Agency=row.get('agency', ''),
                Remarks=row.get('remarks', ''),
            )
            # Attaching the Report Certificate if exists
            if row.get('report', None):
                path = resource_filename(
                    self.dataset_project,
                    "setupdata/%s/%s" % (self.dataset_name,
                                         row['report'])
                )
                try:
                    obj.setDocument(read_file(path))
                except Exception as exc:
                    # Format the exception itself instead of indexing it
                    logger.warning(
                        "%s Error on sheet: %s", exc, self.sheetname)

            # Getting the active lab contacts. The filter uses the same
            # ``is_active`` key as the Instrument_Validations importer; the
            # previous ``nactive_state`` key was a typo.
            prepared_by = row.get('preparedby', '')
            approved_by = row.get('approvedby', '')
            for brain in bsc(portal_type="LabContact", is_active=True):
                contact = brain.getObject()
                fullname = contact.getFullname()
                if fullname == prepared_by:
                    obj.setPreparator(contact.UID())
                if fullname == approved_by:
                    obj.setValidator(contact.UID())
            obj.unmarkCreationFlag()
            renameAfterCreation(obj)
            notify(ObjectInitializedEvent(obj))
975
976
977
class Instrument_Documents(WorksheetImporter):
    """Importer that attaches documents to existing instruments."""

    def Import(self):
        """Delegate every row that names an instrument to ``addDocument``.

        The instrument is looked up by the ``instrument`` column in the
        setup catalog and passed on as the target folder.
        """
        catalog = getToolByName(self.context, SETUP_CATALOG)
        for row in self.get_rows(3):
            instrument_title = row.get('instrument', '')
            if instrument_title:
                instrument = self.get_object(
                    catalog, 'Instrument', instrument_title)
                addDocument(self, row, instrument)
986
987
def addDocument(self, row_dict, folder):
    """
    This function adds a multifile object to the instrument folder

    Nothing is done when there is no folder or the row names no file. If
    the file cannot be read, a warning is logged and the document is still
    created without file data. If another Multifile already uses the same
    DocumentID, a portal message is added and no document is created.

    :param self: the importer instance (provides context, dataset and sheet)
    :param row_dict: the dictionary which contains the document information
    :param folder: the instrument object
    """
    # This content type needs a container and a file
    if not folder or not row_dict.get('File', None):
        return

    path = resource_filename(
        self.dataset_project,
        "setupdata/%s/%s" % (self.dataset_name,
                             row_dict['File'])
    )
    try:
        file_data = read_file(path)
    except Exception as exc:
        file_data = None
        # Format the exception itself: indexing it (``exc[0]``) is not
        # supported on Python 3 and may yield a non-string value
        logger.warning("%s Error on sheet: %s", exc, self.sheetname)

    document_id = row_dict.get('DocumentID', '')

    # If the new document has the same DocumentID as an already created
    # document, this object won't be created.
    catalog = getToolByName(self.context, SETUP_CATALOG)
    for item in catalog.searchResults({'portal_type': 'Multifile'}):
        if item.getObject().getDocumentID() == document_id:
            warning = "The ID '%s' used for this document is already in use on instrument '%s', consequently " \
                      "the file hasn't been upload." % (document_id, row_dict.get('instrument', ''))
            self.context.plone_utils.addPortalMessage(warning)
            # One message is enough; stop at the first clash
            return

    obj = _createObjectByType("Multifile", folder, tmpID())
    obj.edit(
        DocumentID=document_id,
        DocumentVersion=row_dict.get('DocumentVersion', ''),
        DocumentLocation=row_dict.get('DocumentLocation', ''),
        DocumentType=row_dict.get('DocumentType', ''),
        File=file_data
    )
    obj.unmarkCreationFlag()
    renameAfterCreation(obj)
    notify(ObjectInitializedEvent(obj))
1030
1031
1032
class Instrument_Maintenance_Tasks(WorksheetImporter):
    """Importer that creates InstrumentMaintenanceTask objects."""

    def Import(self):
        """Create a maintenance task per row with instrument, title, type.

        Rows missing one of these three values, and rows whose instrument
        is not found in the setup catalog, are skipped. The cost is
        formatted with two decimals when possible, otherwise the raw
        column value is stored.
        """
        catalog = getToolByName(self.context, SETUP_CATALOG)
        for row in self.get_rows(3):
            if not (row['instrument'] and row['title'] and row['type']):
                continue

            instrument = self.get_object(
                catalog, 'Instrument', row.get('instrument'))
            if not instrument:
                continue

            task = _createObjectByType(
                "InstrumentMaintenanceTask", instrument, tmpID())

            # Fall back to the raw value if it cannot be formatted
            try:
                cost = "%.2f" % (row.get('cost', 0))
            except Exception:
                cost = row.get('cost', '0.0')

            fields = {
                'title': row['title'],
                'description': row['description'],
                'Type': row['type'],
                'DownFrom': row.get('downfrom', ''),
                'DownTo': row.get('downto', ''),
                'Maintainer': row.get('maintaner', ''),
                'Considerations': row.get('considerations', ''),
                'WorkPerformed': row.get('workperformed', ''),
                'Remarks': row.get('remarks', ''),
                'Cost': cost,
                'Closed': self.to_bool(row.get('closed')),
            }
            task.edit(**fields)
            task.unmarkCreationFlag()
            renameAfterCreation(task)
            notify(ObjectInitializedEvent(task))
1064
1065
1066
class Instrument_Schedule(WorksheetImporter):
    """Importer that creates InstrumentScheduledTask objects."""

    def Import(self):
        """Create a scheduled task per row with instrument, title and type.

        Rows missing one of these three values, and rows whose instrument
        is not found in the setup catalog, are skipped. The schedule
        criteria are built from the ``date``, ``numrepeats``,
        ``periodicity`` and ``repeatuntil`` columns.
        """
        catalog = getToolByName(self.context, SETUP_CATALOG)
        for row in self.get_rows(3):
            if not (row['instrument'] and row['title'] and row['type']):
                continue
            instrument = self.get_object(
                catalog, 'Instrument', row.get('instrument'))
            if not instrument:
                continue

            task = _createObjectByType(
                "InstrumentScheduledTask", instrument, tmpID())

            num_repeats = row['numrepeats']
            repeat_until = row['repeatuntil']
            # Enabled when an end date has been filled in
            until_enabled = (repeat_until and
                             len(repeat_until) > 0)
            repeat_enabled = ((num_repeats and
                               num_repeats > 1) or
                              until_enabled)

            schedule = {
                'fromenabled': row.get('date', None) is not None,
                'fromdate': row.get('date', ''),
                'repeatenabled': repeat_enabled,
                'repeatunit': row.get('numrepeats', ''),
                'repeatperiod': row.get('periodicity', ''),
                'repeatuntilenabled': until_enabled,
                'repeatuntil': row.get('repeatuntil'),
            }
            task.edit(
                title=row['title'],
                Type=row['type'],
                ScheduleCriteria=[schedule],
                Considerations=row.get('considerations', ''),
            )
            task.unmarkCreationFlag()
            renameAfterCreation(task)
            notify(ObjectInitializedEvent(task))
1098
1099
1100
class Sample_Matrices(WorksheetImporter):
    """Importer that creates SampleMatrix objects from the worksheet."""

    def Import(self):
        """Create a SampleMatrix for every row that has a title."""
        matrices = self.context.setup.samplematrices
        for row in self.get_rows(3):
            matrix_title = row.get("title")
            if matrix_title:
                api.create(
                    matrices,
                    "SampleMatrix",
                    title=matrix_title,
                    description=row.get("description"),
                )
1110
1111
1112
class Batch_Labels(WorksheetImporter):
    """Importer that creates BatchLabel objects from the worksheet."""

    def Import(self):
        """Create a BatchLabel for every row that has a title."""
        labels = self.context.bika_setup.bika_batchlabels
        for row in self.get_rows(3):
            # Untitled rows are ignored
            if not row['title']:
                continue
            label = _createObjectByType("BatchLabel", labels, tmpID())
            label.edit(title=row['title'])
            label.unmarkCreationFlag()
            renameAfterCreation(label)
            notify(ObjectInitializedEvent(label))
1123
1124
1125
class Sample_Types(WorksheetImporter):
    """Importer that creates SampleType objects from the worksheet."""

    def Import(self):
        """Create one SampleType per titled row and link it to its point.

        Sample matrix, container type and sample point are looked up by
        title in the setup catalog. When the sample point exists, the new
        sample type is added to the sample types already assigned to it.
        """
        folder = self.context.bika_setup.bika_sampletypes
        bsc = getToolByName(self.context, SETUP_CATALOG)
        for row in self.get_rows(3):
            if not row['title']:
                continue
            obj = _createObjectByType("SampleType", folder, tmpID())
            samplematrix = self.get_object(bsc, 'SampleMatrix',
                                           row.get('SampleMatrix_title'))
            containertype = self.get_object(bsc, 'ContainerType',
                                            row.get('ContainerType_title'))
            retentionperiod = {
                'days': row['RetentionPeriod'] or 0,
                'hours': 0,
                'minutes': 0}
            obj.edit(
                title=row['title'],
                description=row.get('description', ''),
                RetentionPeriod=retentionperiod,
                Hazardous=self.to_bool(row['Hazardous']),
                SampleMatrix=samplematrix,
                Prefix=row['Prefix'],
                MinimumVolume=row['MinimumVolume'],
                ContainerType=containertype
            )
            samplepoint = self.get_object(bsc, 'SamplePoint',
                                          row.get('SamplePoint_title'))
            if samplepoint:
                # Use the plural accessors, as the other sample point
                # importers in this module do, and keep the sample types
                # that are already assigned to the point.
                # NOTE(review): confirm SamplePoint has no singular
                # ``setSampleType`` mutator that callers rely on.
                sampletypes = list(samplepoint.getSampleTypes())
                sampletypes.append(obj)
                samplepoint.setSampleTypes(sampletypes)
            obj.unmarkCreationFlag()
            renameAfterCreation(obj)
            notify(ObjectInitializedEvent(obj))
1159
1160
1161
class Sample_Points(WorksheetImporter):
    """Importer that creates SamplePoint objects from the worksheet."""

    def Import(self):
        """Create one SamplePoint per titled row.

        When ``Client_title`` is given, the sample point is created inside
        that client (rows naming an unknown client are reported and
        skipped); otherwise it is created in the setup sample points
        folder. Latitude and Longitude values are not imported; a message
        is logged when they are present.
        """
        setup_folder = self.context.bika_setup.bika_samplepoints
        bsc = getToolByName(self.context, SETUP_CATALOG)
        cat = api.get_tool(CLIENT_CATALOG)
        for row in self.get_rows(3):
            if not row['title']:
                continue
            if row['Client_title']:
                client_title = row['Client_title']
                client = cat(portal_type="Client", getName=client_title)
                if len(client) == 0:
                    error = "Sample Point %s: Client invalid: '%s'. The Sample point will not be uploaded."
                    logger.error(error, row['title'], client_title)
                    continue
                folder = client[0].getObject()
            else:
                folder = setup_folder

            # ``Logger.log`` expects a numeric level as first argument, so
            # the former ``logger.log(message, 'error')`` calls were
            # invalid; log at error level explicitly instead.
            if row['Latitude']:
                logger.error("Ignored SamplePoint Latitude")
            if row['Longitude']:
                logger.error("Ignored SamplePoint Longitude")

            obj = _createObjectByType("SamplePoint", folder, tmpID())
            obj.edit(
                title=row['title'],
                description=row.get('description', ''),
                Composite=self.to_bool(row['Composite']),
                Elevation=row['Elevation'],
            )
            sampletype = self.get_object(bsc, 'SampleType',
                                         row.get('SampleType_title'))
            if sampletype:
                obj.setSampleTypes([sampletype, ])
            obj.unmarkCreationFlag()
            renameAfterCreation(obj)
            notify(ObjectInitializedEvent(obj))
1200
1201
1202
class Sample_Point_Sample_Types(WorksheetImporter):
    """Importer that links existing sample types to existing sample points."""

    def Import(self):
        """Add the row's sample type to the row's sample point.

        Both objects are looked up by title in the setup catalog. Rows
        where either of them cannot be resolved are skipped, so that an
        unresolved sample type (``None``) is never stored on the point.
        """
        bsc = getToolByName(self.context, SETUP_CATALOG)
        for row in self.get_rows(3):
            sampletype = self.get_object(bsc,
                                         'SampleType',
                                         row.get('SampleType_title'))
            samplepoint = self.get_object(bsc,
                                          'SamplePoint',
                                          row['SamplePoint_title'])
            if not samplepoint or not sampletype:
                continue
            sampletypes = list(samplepoint.getSampleTypes())
            if sampletype not in sampletypes:
                sampletypes.append(sampletype)
                samplepoint.setSampleTypes(sampletypes)
1218
1219
1220
class Storage_Locations(WorksheetImporter):
    """Importer that creates StorageLocation objects from the worksheet."""

    def Import(self):
        """Create a StorageLocation for every row that has an ``Address``.

        The address is used as the title; the site, location and shelf
        columns are copied to the fields of the same name.
        """
        locations = self.context.bika_setup.bika_storagelocations
        # Columns whose value is copied to the equally named field
        columns = (
            'SiteTitle', 'SiteCode', 'SiteDescription',
            'LocationTitle', 'LocationCode', 'LocationDescription',
            'LocationType',
            'ShelfTitle', 'ShelfCode', 'ShelfDescription',
        )
        for row in self.get_rows(3):
            address = row['Address']
            if not address:
                continue

            location = _createObjectByType(
                "StorageLocation", locations, tmpID())
            values = dict((name, row[name]) for name in columns)
            location.edit(title=address, **values)
            location.unmarkCreationFlag()
            renameAfterCreation(location)
            notify(ObjectInitializedEvent(location))
1245
1246
1247
class Sample_Conditions(WorksheetImporter):
    """Importer that creates SampleCondition objects from the worksheet."""

    def Import(self):
        """Create a SampleCondition for every row that has a title."""
        conditions = self.context.setup.sampleconditions
        for row in self.get_rows(3):
            condition_title = row.get("title")
            if condition_title:
                api.create(
                    conditions,
                    "SampleCondition",
                    title=condition_title,
                    description=row.get("description"),
                )
1260
1261
1262
class Analysis_Categories(WorksheetImporter):
    """Importer that creates AnalysisCategory objects from the worksheet."""

    def Import(self):
        """Create one AnalysisCategory per row with title and department.

        The department is looked up by ``Department_title`` in the setup
        catalog. Rows with a missing title, a missing department title or
        an unknown department are reported with a warning and skipped.
        """
        folder = self.context.bika_setup.bika_analysiscategories
        bsc = getToolByName(self.context, SETUP_CATALOG)
        for row in self.get_rows(3):
            title = row.get('title', None)
            department_title = row.get('Department_title', None)

            if not title:
                logger.warning(
                    "Error in %s. Missing Title field", self.sheetname)
                continue
            if not department_title:
                logger.warning(
                    "Error in %s. Department field missing.", self.sheetname)
                continue

            department = self.get_object(bsc, 'Department', department_title)
            if not department:
                logger.warning(
                    "Error in %s. Department %s is wrong.",
                    self.sheetname, department_title)
                continue

            obj = _createObjectByType("AnalysisCategory", folder, tmpID())
            obj.edit(
                title=title,
                description=row.get('description', ''))
            obj.setDepartment(department)
            obj.unmarkCreationFlag()
            renameAfterCreation(obj)
            notify(ObjectInitializedEvent(obj))
1288
1289
1290
class Methods(WorksheetImporter):
    """Importer that creates Method objects from the worksheet."""

    def Import(self):
        """Create one Method per titled row.

        The calculation is looked up by ``Calculation_title`` in the setup
        catalog. If another method already uses the same, non-empty
        MethodID, the MethodID of the new method is cleared. The method
        document is attached when the ``MethodDocument`` column is filled
        in; failures to read or store it are logged as warnings.
        """
        folder = self.context.methods
        bsc = getToolByName(self.context, SETUP_CATALOG)
        for row in self.get_rows(3):
            if not row['title']:
                continue

            calculation = self.get_object(
                bsc, 'Calculation', row.get('Calculation_title'))
            obj = _createObjectByType("Method", folder, tmpID())
            obj.edit(
                title=row['title'],
                description=row.get('description', ''),
                Instructions=row.get('Instructions', ''),
                ManualEntryOfResults=row.get('ManualEntryOfResults', True),
                Calculation=calculation,
                MethodID=row.get('MethodID', ''),
                Accredited=row.get('Accredited', True),
            )
            # If the new method has the same MethodID as an already
            # created method, remove the MethodID value.
            for brain in bsc.searchResults({'portal_type': 'Method'}):
                # The brain must be woken up with ``getObject()``; the
                # former code accessed ``.get`` on the unbound method
                other = brain.getObject()
                if other.UID() == obj.UID():
                    # The new method is not a duplicate of itself
                    continue
                other_id = other.get('MethodID', '')
                if other_id != '' and other_id == obj['MethodID']:
                    obj.edit(MethodID='')
                    break

            if row['MethodDocument']:
                path = resource_filename(
                    self.dataset_project,
                    "setupdata/%s/%s" % (self.dataset_name,
                                         row['MethodDocument'])
                )
                try:
                    obj.setMethodDocument(read_file(path))
                except Exception as exc:
                    # Format the exception itself instead of indexing it
                    logger.warning(
                        "%s Error on sheet: %s", exc, self.sheetname)

            obj.unmarkCreationFlag()
            renameAfterCreation(obj)
            notify(ObjectInitializedEvent(obj))
1330
1331
1332
class Sampling_Deviations(WorksheetImporter):
    """Importer that creates SamplingDeviation objects from the worksheet."""

    def Import(self):
        """Create a SamplingDeviation for every row that has a title."""
        deviations = self.context.bika_setup.bika_samplingdeviations
        for row in self.get_rows(3):
            # Untitled rows are ignored
            if not row['title']:
                continue
            deviation = _createObjectByType(
                "SamplingDeviation", deviations, tmpID())
            deviation.edit(
                title=row['title'],
                description=row.get('description', '')
            )
            deviation.unmarkCreationFlag()
            renameAfterCreation(deviation)
            notify(ObjectInitializedEvent(deviation))
1346
1347
1348
class Calculations(WorksheetImporter):
    """Creates Calculation objects from the rows of this importer.

    Interim fields are preloaded from the 'Calculation Interim Fields' sheet.
    Once all calculations have been created, they are assigned to the methods
    listed in the 'Methods' sheet that do not have a calculation yet.
    """

    def get_interim_fields(self):
        """Preload the 'Calculation Interim Fields' sheet.

        Fills ``self.interim_fields`` with a mapping of calculation title to
        the list of interim field records declared for that calculation. The
        mapping is empty when the sheet is not available.
        """
        # Initialise before the early return, so Import() can always read the
        # mapping, even when the optional sheet is missing.
        self.interim_fields = {}
        sheetname = 'Calculation Interim Fields'
        worksheet = self.workbook[sheetname]
        if not worksheet:
            return
        for row in self.get_rows(3, worksheet=worksheet):
            fields = self.interim_fields.setdefault(
                row['Calculation_title'], [])
            fields.append({
                'keyword': row['keyword'],
                'title': row.get('title', ''),
                'type': 'int',
                'hidden': bool('hidden' in row and row['hidden']),
                'value': row['value'],
                'unit': row['unit'] or ''})

    def Import(self):
        """Create the calculations and link them to services and methods.

        For every row with a title, a Calculation is created in
        ``bika_setup.bika_calculations``. Keywords found in the formula that
        are not interim fields are registered through ``self.defer`` as
        'DependentServices' lookups against AnalysisService keywords.
        """
        self.get_interim_fields()
        folder = self.context.bika_setup.bika_calculations
        # Compiled once for all rows. Captures the text between square
        # brackets, provided it contains no '.', '^' or ']' character.
        keyword_regex = re.compile(r"\[([^\.^\]]+)\]")
        for row in self.get_rows(3):
            if not row['title']:
                continue
            calc_title = row['title']
            calc_interims = self.interim_fields.get(calc_title, [])
            formula = row['Formula']
            # scan formula for dep services
            keywords = keyword_regex.findall(formula)
            # remove interims from deps
            interim_keys = [k['keyword'] for k in calc_interims]
            dep_keywords = [k for k in keywords if k not in interim_keys]

            obj = _createObjectByType("Calculation", folder, tmpID())
            obj.edit(
                title=calc_title,
                description=row.get('description', ''),
                InterimFields=calc_interims,
                Formula=str(row['Formula'])
            )
            for kw in dep_keywords:
                self.defer(src_obj=obj,
                           src_field='DependentServices',
                           dest_catalog=SETUP_CATALOG,
                           dest_query={'portal_type': 'AnalysisService',
                                       'getKeyword': kw}
                           )
            obj.unmarkCreationFlag()
            renameAfterCreation(obj)
            notify(ObjectInitializedEvent(obj))

        # Now we have the calculations registered, try to assign default calcs
        # to methods
        sheet = self.workbook["Methods"]
        bsc = getToolByName(self.context, SETUP_CATALOG)
        for row in self.get_rows(3, sheet):
            if row.get('title', '') and row.get('Calculation_title', ''):
                meth = self.get_object(bsc, "Method", row.get('title'))
                # Never override a calculation already set on the method
                if meth and not meth.getCalculation():
                    calctit = safe_unicode(
                        row['Calculation_title']).encode('utf-8')
                    calc = self.get_object(bsc, "Calculation", calctit)
                    if calc:
                        meth.setCalculation(calc.UID())
1415
1416
1417
class Analysis_Services(WorksheetImporter):
    """Creates AnalysisService objects from the rows of this importer.

    Related sheets ('AnalysisService InterimFields', 'AnalysisService
    ResultOptions', 'Analysis Service Uncertainties', 'AnalysisService
    Methods' and 'AnalysisService Instruments') are read to complete the
    services.
    """

    def load_interim_fields(self):
        """Preload the 'AnalysisService InterimFields' sheet.

        Fills ``self.service_interims`` with a mapping of service title to
        the list of interim field records declared for that service. The
        attribute is left unset when the sheet is not available.
        """
        # preload AnalysisService InterimFields sheet
        sheetname = 'AnalysisService InterimFields'
        worksheet = self.workbook[sheetname]
        if not worksheet:
            return
        self.service_interims = {}
        rows = self.get_rows(3, worksheet=worksheet)
        for row in rows:
            service_title = row['Service_title']
            if service_title not in self.service_interims.keys():
                self.service_interims[service_title] = []
            self.service_interims[service_title].append({
                'keyword': row['keyword'],
                'title': row.get('title', ''),
                'type': 'int',
                'value': row['value'],
                'unit': row['unit'] and row['unit'] or ''})

    def load_result_options(self):
        """Append the rows of 'AnalysisService ResultOptions' to the services.

        Each row adds a ResultValue/ResultText pair to the result options of
        the service named in 'Service_title'. Rows whose service cannot be
        found are skipped.
        """
        bsc = getToolByName(self.context, SETUP_CATALOG)
        sheetname = 'AnalysisService ResultOptions'
        worksheet = self.workbook[sheetname]
        if not worksheet:
            return
        for row in self.get_rows(3, worksheet=worksheet):
            service_title = row.get('Service_title')
            service = self.get_object(bsc, 'AnalysisService', service_title)
            if not service:
                # Skip this row only. Returning here would silently discard
                # the result options of every row that follows.
                if service_title:
                    logger.warning(
                        "Unable to load an Analysis Service result option. "
                        "Service '%s' not found." % service_title)
                continue
            # Work on a copy, same as write_bucket does for uncertainties
            sro = list(service.getResultOptions())
            sro.append({'ResultValue': row['ResultValue'],
                        'ResultText': row['ResultText']})
            service.setResultOptions(sro)

    def load_service_uncertainties(self):
        """Load the rows of 'Analysis Service Uncertainties' into the services.

        Rows are grouped by service UID and handed to ``write_bucket`` in
        batches. Rows whose service cannot be found are logged and skipped.
        """
        bsc = getToolByName(self.context, SETUP_CATALOG)
        sheetname = 'Analysis Service Uncertainties'
        worksheet = self.workbook[sheetname]
        if not worksheet:
            return

        bucket = {}
        count = 0
        for row in self.get_rows(3, worksheet=worksheet):
            count += 1
            service = self.get_object(bsc, 'AnalysisService',
                                      row.get('Service_title'))
            if not service:
                warning = "Unable to load an Analysis Service uncertainty. Service '%s' not found." % row.get('Service_title')
                logger.warning(warning)
                continue
            service_uid = service.UID()
            if service_uid not in bucket:
                bucket[service_uid] = []
            bucket[service_uid].append(
                {'intercept_min': row['Range Min'],
                 'intercept_max': row['Range Max'],
                 'errorvalue': row['Uncertainty Value']}
            )
            if count > 500:
                self.write_bucket(bucket)
                bucket = {}
                # Restart the batch counter; otherwise every row after the
                # first flush would trigger a write of its own.
                count = 0
        if bucket:
            self.write_bucket(bucket)

    def get_methods(self, service_title, default_method):
        """ Return an array of objects of the type Method in accordance to the
            methods listed in the 'AnalysisService Methods' sheet and service
            set in the parameter service_title.
            If default_method is set, it will be included in the returned
            array.
        """
        return self.get_relations(service_title,
                                  default_method,
                                  'Method',
                                  SETUP_CATALOG,
                                  'AnalysisService Methods',
                                  'Method_title')

    def get_instruments(self, service_title, default_instrument):
        """ Return an array of objects of the type Instrument in accordance to
            the instruments listed in the 'AnalysisService Instruments' sheet
            and service set in the parameter 'service_title'.
            If default_instrument is set, it will be included in the returned
            array.
        """
        return self.get_relations(service_title,
                                  default_instrument,
                                  'Instrument',
                                  SETUP_CATALOG,
                                  'AnalysisService Instruments',
                                  'Instrument_title')

    def get_relations(self, service_title, default_obj, obj_type, catalog_name, sheet_name, column):
        """ Return an array of objects of the specified type in accordance to
            the object titles defined in the sheet specified in 'sheet_name' and
            service set in the parameter 'service_title'.
            If a default_obj is set, it will be included in the returned array.
            Reading stops at the first row without a 'Service_title'.
        """
        out_objects = [default_obj] if default_obj else []
        cat = getToolByName(self.context, catalog_name)
        worksheet = self.workbook[sheet_name]
        if not worksheet:
            return out_objects
        for row in self.get_rows(3, worksheet=worksheet):
            row_as_title = row.get('Service_title')
            if not row_as_title:
                return out_objects
            elif row_as_title != service_title:
                continue
            obj = self.get_object(cat, obj_type, row.get(column))
            if obj:
                # The default object is already the first item of the list
                if default_obj and default_obj.UID() == obj.UID():
                    continue
                out_objects.append(obj)
        return out_objects

    def write_bucket(self, bucket):
        """Extend the uncertainties of each service found in ``bucket``.

        :param bucket: mapping of service UID to a list of uncertainty records
        """
        bsc = getToolByName(self.context, SETUP_CATALOG)
        for service_uid, uncertainties in bucket.items():
            obj = bsc(UID=service_uid)[0].getObject()
            _uncert = list(obj.getUncertainties())
            _uncert.extend(uncertainties)
            obj.setUncertainties(_uncert)

    def Import(self):
        """Create one AnalysisService for every row that has a title.

        Result options and uncertainties are loaded afterwards, once all the
        services exist.
        """
        self.load_interim_fields()
        folder = self.context.bika_setup.bika_analysisservices
        bsc = getToolByName(self.context, SETUP_CATALOG)
        for row in self.get_rows(3):
            if not row['title']:
                continue

            obj = _createObjectByType("AnalysisService", folder, tmpID())
            MTA = {
                'days': self.to_int(row.get('MaxTimeAllowed_days', 0), 0),
                'hours': self.to_int(row.get('MaxTimeAllowed_hours', 0), 0),
                'minutes': self.to_int(row.get('MaxTimeAllowed_minutes', 0), 0),
            }
            category = self.get_object(bsc, 'AnalysisCategory', row.get('AnalysisCategory_title'))
            department = self.get_object(bsc, 'Department', row.get('Department_title'))
            container = self.get_object(bsc, 'SampleContainer', row.get('Container_title'))
            preservation = self.get_object(bsc, 'SamplePreservation', row.get('Preservation_title'))

            # Analysis Service - Method considerations:
            # One Analysis Service can have 0 or n Methods associated (field
            # 'Methods' from the Schema).
            # If the Analysis Service has at least one method associated, then
            # one of those methods can be set as the default method (field
            # '_Method' from the Schema).
            #
            # To make it easier, if a DefaultMethod is declared in the
            # Analysis_Services spreadsheet, but the same AS has no method
            # associated in the Analysis_Service_Methods spreadsheet, then make
            # the assumption that the DefaultMethod set in the former has to be
            # associated to the AS although the relation is missing.
            defaultmethod = self.get_object(bsc, 'Method', row.get('DefaultMethod_title'))
            methods = self.get_methods(row['title'], defaultmethod)
            if not defaultmethod and methods:
                defaultmethod = methods[0]

            # Analysis Service - Instrument considerations:
            # By default, an Analysis Services will be associated automatically
            # with several Instruments due to the Analysis Service - Methods
            # relation (an Instrument can be assigned to a Method and one Method
            # can have zero or n Instruments associated). There is no need to
            # set this assignment directly, the AnalysisService object will
            # find those instruments.
            # Besides this 'automatic' behavior, an Analysis Service can also
            # have 0 or n Instruments manually associated ('Instruments' field).
            # In this case, the attribute 'AllowInstrumentEntryOfResults' should
            # be set to True.
            #
            # To make it easier, if a DefaultInstrument is declared in the
            # Analysis_Services spreadsheet, but the same AS has no instrument
            # associated in the AnalysisService_Instruments spreadsheet, then
            # make the assumption the DefaultInstrument set in the former has
            # to be associated to the AS although the relation is missing and
            # the option AllowInstrumentEntryOfResults will be set to True.
            defaultinstrument = self.get_object(bsc, 'Instrument', row.get('DefaultInstrument_title'))
            instruments = self.get_instruments(row['title'], defaultinstrument)
            allowinstrentry = True if instruments else False
            if not defaultinstrument and instruments:
                defaultinstrument = instruments[0]

            # The manual entry of results can only be set to false if the value
            # for the attribute "InstrumentEntryOfResults" is False.
            allowmanualentry = True if not allowinstrentry else row.get('ManualEntryOfResults', True)

            # Analysis Service - Calculation considerations:
            # By default, the AnalysisService will use the Calculation associated
            # to the Default Method (the field "UseDefaultCalculation"==True).
            # If the Default Method for this AS doesn't have any Calculation
            # associated and the field "UseDefaultCalculation" is True, no
            # Calculation will be used for this AS ("_Calculation" field is
            # reserved and should not be set directly).
            #
            # To make it easier, if a Calculation is set by default in the
            # spreadsheet, then assume the UseDefaultCalculation has to be set
            # to False.
            deferredcalculation = self.get_object(bsc, 'Calculation', row.get('Calculation_title'))
            usedefaultcalculation = False if deferredcalculation else True
            _calculation = deferredcalculation if deferredcalculation else \
                            (defaultmethod.getCalculation() if defaultmethod else None)

            obj.edit(
                title=row['title'],
                ShortTitle=row.get('ShortTitle', row['title']),
                description=row.get('description', ''),
                Keyword=row['Keyword'],
                PointOfCapture=row['PointOfCapture'].lower(),
                Category=category,
                Department=department,
                Unit=row['Unit'] and row['Unit'] or None,
                Precision=row['Precision'] and str(row['Precision']) or '0',
                ExponentialFormatPrecision=str(self.to_int(row.get('ExponentialFormatPrecision', 7), 7)),
                LowerDetectionLimit='%06f' % self.to_float(row.get('LowerDetectionLimit', '0.0'), 0),
                UpperDetectionLimit='%06f' % self.to_float(row.get('UpperDetectionLimit', '1000000000.0'), 1000000000.0),
                DetectionLimitSelector=self.to_bool(row.get('DetectionLimitSelector', 0)),
                MaxTimeAllowed=MTA,
                Price="%02f" % Float(row['Price']),
                BulkPrice="%02f" % Float(row['BulkPrice']),
                VAT="%02f" % Float(row['VAT']),
                _Method=defaultmethod,
                Methods=methods,
                ManualEntryOfResults=allowmanualentry,
                InstrumentEntryOfResults=allowinstrentry,
                Instruments=instruments,
                Calculation=_calculation,
                UseDefaultCalculation=usedefaultcalculation,
                DuplicateVariation="%02f" % Float(row['DuplicateVariation']),
                Accredited=self.to_bool(row['Accredited']),
                # service_interims only exists when the interims sheet was
                # available, hence the hasattr guard
                InterimFields=hasattr(self, 'service_interims') and self.service_interims.get(
                    row['title'], []) or [],
                Separate=self.to_bool(row.get('Separate', False)),
                Container=container,
                Preservation=preservation,
                CommercialID=row.get('CommercialID', ''),
                ProtocolID=row.get('ProtocolID', '')
            )
            obj.unmarkCreationFlag()
            renameAfterCreation(obj)
            notify(ObjectInitializedEvent(obj))
        self.load_result_options()
        self.load_service_uncertainties()
1665
1666
1667
class Analysis_Specifications(WorksheetImporter):
    """Creates AnalysisSpec objects, grouped by client and specification title.
    """

    def resolve_service(self, row):
        """Return the AnalysisService object referenced by ``row["service"]``.

        The value is searched as the service title first and, when that query
        returns nothing, as the service keyword. The first result is used;
        there is no handling for the case where both queries are empty.
        """
        catalog = getToolByName(self.context, SETUP_CATALOG)
        needle = safe_unicode(row["service"])
        brains = catalog(portal_type="AnalysisService", title=needle)
        if not brains:
            brains = catalog(portal_type="AnalysisService", getKeyword=needle)
        return brains[0].getObject()

    def Import(self):
        """Collect the result ranges per specification and create the objects.

        Rows are grouped first by 'Client_title' ("lab" when empty) and then
        by specification title. One AnalysisSpec is created per group, either
        in ``bika_setup.bika_analysisspecs`` or inside the client found by
        name.
        """
        client_catalog = getToolByName(self.context, CLIENT_CATALOG)
        setup_catalog = getToolByName(self.context, SETUP_CATALOG)

        # First pass: gather every row into {parent: {title: spec_info}}
        grouped = {}
        for record in self.get_rows(3):
            # Accept both "Title" and "title" as the column name
            spec_title = record.get("Title", False) or record.get("title", False)
            if not spec_title:
                continue
            owner = record["Client_title"] or "lab"
            sampletype_title = record["SampleType_title"] or ""
            analysis_service = self.resolve_service(record)

            owner_specs = grouped.setdefault(owner, {})
            if spec_title not in owner_specs:
                owner_specs[spec_title] = {
                    "sampletype": sampletype_title,
                    "resultsrange": [],
                }
            owner_specs[spec_title]["resultsrange"].append({
                "keyword": analysis_service.getKeyword(),
                "min": record["min"] or "0",
                "max": record["max"] or "0",
            })

        # Second pass: write one object per collected specification
        for owner, owner_specs in grouped.items():
            for spec_title, spec_info in owner_specs.items():
                if owner != "lab":
                    client_brain = client_catalog(
                        portal_type="Client", getName=safe_unicode(owner))[0]
                    target = client_brain.getObject()
                else:
                    target = self.context.bika_setup.bika_analysisspecs
                sampletype_title = spec_info["sampletype"]
                ranges = spec_info["resultsrange"]
                # Resolve the sample type before the object gets created
                sampletype_uid = None
                if sampletype_title:
                    sampletype_uid = setup_catalog(
                        portal_type="SampleType",
                        title=safe_unicode(sampletype_title))[0].UID
                spec = _createObjectByType("AnalysisSpec", target, tmpID())
                spec.edit(title=spec_title)
                spec.setResultsRange(ranges)
                if sampletype_title:
                    spec.setSampleType(sampletype_uid)
                spec.unmarkCreationFlag()
                renameAfterCreation(spec)
                notify(ObjectInitializedEvent(spec))
1729
1730
1731
class Analysis_Profiles(WorksheetImporter):
    """Creates AnalysisProfile objects and assigns their services."""

    def load_analysis_profile_services(self):
        """Preload the 'Analysis Profile Services' sheet.

        Fills ``self.profile_services`` with a mapping of profile name to the
        list of AnalysisService objects of that profile. The mapping is empty
        when the sheet is not available.
        """
        sheetname = 'Analysis Profile Services'
        worksheet = self.workbook[sheetname]
        self.profile_services = {}
        if not worksheet:
            return
        bsc = getToolByName(self.context, SETUP_CATALOG)
        for row in self.get_rows(3, worksheet=worksheet):
            if not row.get('Profile', '') or not row.get('Service', ''):
                continue
            services = self.profile_services.setdefault(row['Profile'], [])
            # Here we match against Keyword or Title.
            # XXX We need a utility for this kind of thing.
            service = self.get_object(bsc, 'AnalysisService', row.get('Service'))
            if not service:
                service = bsc(portal_type='AnalysisService',
                              getKeyword=row['Service'])[0].getObject()
            services.append(service)

    def Import(self):
        """Create one AnalysisProfile for every row that has a title.

        The services preloaded for the profile title are assigned to the new
        object; a profile without any preloaded service gets an empty list.
        """
        self.load_analysis_profile_services()
        folder = self.context.setup.analysisprofiles
        for row in self.get_rows(3):
            title = row.get("title", "")
            description = row.get("description", "")
            profile_key = row.get("ProfileKey", "")
            commercial_id = row.get("CommercialID", "")
            analysis_profile_price = row.get("AnalysisProfilePrice")
            analysis_profile_vat = row.get("AnalysisProfileVAT")
            use_analysis_profile_price = row.get("UseAnalysisProfilePrice")
            if title:
                obj = api.create(folder, "AnalysisProfile")
                api.edit(obj,
                         title=api.safe_unicode(title),
                         description=api.safe_unicode(description),
                         profile_key=api.safe_unicode(profile_key),
                         commercial_id=api.safe_unicode(commercial_id),
                         analysis_profile_price=api.to_float(
                             analysis_profile_price, 0.0),
                         analysis_profile_vat=api.to_float(
                             analysis_profile_vat, 0.0),
                         use_analysis_profile_price=bool(
                             use_analysis_profile_price))
                # set the services. A profile that has no rows in the
                # services sheet must not abort the import with a KeyError.
                obj.setServices(self.profile_services.get(title, []))
1779
1780
1781
class Sample_Templates(WorksheetImporter):
    """Creates SampleTemplate objects with their services and partitions."""

    def load_sampletemplate_services(self):
        """Preload the "Sample Template Services" sheet.

        Fills ``self.services`` with a mapping of template title to a list of
        ``{"uid": ..., "part_id": ...}`` records. The mapping is empty when
        the sheet is not available.
        """
        # Initialise before the early return, so Import() can always read the
        # mapping, even when the optional sheet is missing.
        self.services = {}
        sheetname = "Sample Template Services"
        worksheet = self.workbook[sheetname]
        if not worksheet:
            return
        sc = api.get_tool(SETUP_CATALOG)
        for row in self.get_rows(3, worksheet=worksheet):
            keyword = row.get("keyword")
            service = self.get_object(sc, "AnalysisService", keyword)
            part_id = row.get("part_id", "")
            title = row.get("SampleTemplate")
            self.services.setdefault(title, []).append({
                "uid": api.get_uid(service),
                "part_id": part_id,
            })

    def load_sampletemplate_partitions(self):
        """Preload the "Sample Template Partitions" sheet.

        Fills ``self.partitions`` with a mapping of template title to a list
        of partition records. Container, preservation and sample type are
        stored as UIDs, or as an empty string when the object is not found.
        The mapping is empty when the sheet is not available.
        """
        # Same reasoning as in load_sampletemplate_services
        self.partitions = {}
        sheetname = "Sample Template Partitions"
        worksheet = self.workbook[sheetname]
        if not worksheet:
            return

        sc = api.get_tool(SETUP_CATALOG)
        for row in self.get_rows(3, worksheet=worksheet):
            title = row.get("SampleTemplate")
            container = row.get("container")
            preservation = row.get("preservation")
            sampletype = row.get("sampletype")
            part_id = row.get("part_id")
            if title not in self.partitions:
                self.partitions[title] = []
            container = self.get_object(sc, "SampleContainer", container)
            preservation = self.get_object(sc, "SamplePreservation", preservation)
            sampletype = self.get_object(sc, "SampleType", sampletype)
            self.partitions[title].append({
                "part_id": part_id,
                "container": api.get_uid(container) if container else "",
                "preservation": api.get_uid(preservation) if preservation else "",
                "sampletype": api.get_uid(sampletype) if sampletype else "",
            })

    def Import(self):
        """Create one SampleTemplate for every row that has a title.

        Templates are created in the setup's ``sampletemplates`` folder
        unless 'Client_title' names exactly one existing client, in which
        case they are created inside that client.
        """
        self.load_sampletemplate_services()
        self.load_sampletemplate_partitions()

        setup = api.get_senaite_setup()
        sc = api.get_tool(SETUP_CATALOG)

        for row in self.get_rows(3):
            title = row.get("title")
            if not title:
                continue
            services = self.services.get(title, [])
            client_title = row.get("Client_title") or "lab"
            partitions = self.partitions.get(title, [])

            # Pick the folder afresh for every row. Otherwise a row whose
            # client cannot be resolved would silently land in whatever
            # folder the previous row used.
            folder = setup.sampletemplates
            if client_title != "lab":
                client = api.search({
                    "portal_type": "Client",
                    "getName": client_title
                }, CLIENT_CATALOG)
                if len(client) == 1:
                    folder = api.get_object(client[0])
                else:
                    logger.warning(
                        "Client '%s' of Sample Template '%s' not found or "
                        "ambiguous. Creating the template in the lab folder."
                        % (client_title, title))

            sampletype = self.get_object(
                sc, 'SampleType', row.get('SampleType_title'))
            samplepoint = self.get_object(
                sc, 'SamplePoint', row.get('SamplePoint_title'))

            obj = api.create(folder, "SampleTemplate", title=title)
            obj.setSampleType(sampletype)
            obj.setSamplePoint(samplepoint)
            obj.setPartitions(partitions)
            obj.setServices(services)
1863
1864
1865
class Reference_Definitions(WorksheetImporter):
    """Creates ReferenceDefinition objects with their reference results."""

    def load_reference_definition_results(self):
        """Preload the reference results per reference definition.

        Reads the 'Reference Definition Results' sheet, falling back to
        'Reference Definition Values'. Fills ``self.results`` with a mapping
        of reference definition title to a list of result records. The
        mapping is empty when neither sheet is available.
        """
        # Initialise before any early return, so Import() can always read the
        # mapping, even when both optional sheets are missing.
        self.results = {}
        worksheet = self.workbook['Reference Definition Results']
        if not worksheet:
            worksheet = self.workbook['Reference Definition Values']
        if not worksheet:
            return
        bsc = getToolByName(self.context, SETUP_CATALOG)
        for row in self.get_rows(3, worksheet=worksheet):
            definition_title = row['ReferenceDefinition_title']
            results = self.results.setdefault(definition_title, [])
            service = self.get_object(bsc, 'AnalysisService',
                                      row.get('service'))
            results.append({
                'uid': service.UID(),
                'result': row['result'] if row['result'] else '0',
                'min': row['min'] if row['min'] else '0',
                'max': row['max'] if row['max'] else '0'})

    def Import(self):
        """Create one ReferenceDefinition for every row that has a title."""
        self.load_reference_definition_results()
        folder = self.context.bika_setup.bika_referencedefinitions
        for row in self.get_rows(3):
            if not row['title']:
                continue
            obj = _createObjectByType("ReferenceDefinition", folder, tmpID())
            obj.edit(
                title=row['title'],
                description=row.get('description', ''),
                Blank=self.to_bool(row['Blank']),
                ReferenceResults=self.results.get(row['title'], []),
                Hazardous=self.to_bool(row['Hazardous']))
            obj.unmarkCreationFlag()
            renameAfterCreation(obj)
            notify(ObjectInitializedEvent(obj))
1907
1908
1909
class Worksheet_Templates(WorksheetImporter):
    """Creates WorksheetTemplate objects with their layouts and services."""

    def load_wst_layouts(self):
        """Preload the 'Worksheet Template Layouts' sheet.

        Fills ``self.wst_layouts`` with a mapping of worksheet template title
        to the list of layout records of that template. The mapping is empty
        when the sheet is not available.
        """
        sheetname = 'Worksheet Template Layouts'
        worksheet = self.workbook[sheetname]
        self.wst_layouts = {}
        if not worksheet:
            return
        for row in self.get_rows(3, worksheet=worksheet):
            layout = self.wst_layouts.setdefault(
                row['WorksheetTemplate_title'], [])
            layout.append({
                'pos': row['pos'],
                'type': row['type'],
                'blank_ref': row['blank_ref'],
                'control_ref': row['control_ref'],
                'dup': row['dup']})

    def load_wst_services(self):
        """Preload the 'Worksheet Template Services' sheet.

        Fills ``self.wst_services`` with a mapping of worksheet template
        title to the list of service UIDs of that template. The mapping is
        empty when the sheet is not available.
        """
        sheetname = 'Worksheet Template Services'
        worksheet = self.workbook[sheetname]
        self.wst_services = {}
        if not worksheet:
            return
        bsc = getToolByName(self.context, SETUP_CATALOG)
        for row in self.get_rows(3, worksheet=worksheet):
            service = self.get_object(bsc, 'AnalysisService',
                                      row.get('service'))
            uids = self.wst_services.setdefault(
                row['WorksheetTemplate_title'], [])
            uids.append(service.UID())

    def Import(self):
        """Create one WorksheetTemplate for every row that has a title.

        A template without rows in the layouts or services sheets gets an
        empty layout or an empty list of services.
        """
        self.load_wst_services()
        self.load_wst_layouts()
        folder = self.context.bika_setup.bika_worksheettemplates
        for row in self.get_rows(3):
            title = row['title']
            if not title:
                continue
            obj = _createObjectByType("WorksheetTemplate", folder, tmpID())
            # Use .get(): templates missing from the related sheets must not
            # abort the whole import with a KeyError.
            obj.edit(
                title=title,
                description=row.get('description', ''),
                Layout=self.wst_layouts.get(title, []))
            obj.setService(self.wst_services.get(title, []))
            obj.unmarkCreationFlag()
            renameAfterCreation(obj)
            notify(ObjectInitializedEvent(obj))
1960
1961
1962
class Setup(WorksheetImporter):
    """Imports the 'Field'/'Value' rows of the sheet into ``bika_setup``.

    Each value is converted according to the type of the schema field with
    the same name before it is stored.
    """

    def get_field_value(self, field, value):
        """Convert a raw sheet value for the given schema field.

        :param field: schema field; its ``type`` selects the converter
        :param value: raw value from the sheet (for "duration" fields the
            dict with all sheet values, see ``to_duration_value``)
        :returns: the converted value, or None if the value is None, the
            field type has no converter, or the conversion failed
        """
        if value is None:
            return None
        converters = {
            "integer": self.to_integer_value,
            "fixedpoint": self.to_fixedpoint_value,
            "boolean": self.to_boolean_value,
            "string": self.to_string_value,
            "reference": self.to_reference_value,
            "duration": self.to_duration_value
        }
        converter = converters.get(field.type)
        if converter is None:
            # Unsupported field type: report it explicitly rather than
            # relying on the TypeError raised by calling None
            logger.error("No valid type for Setup.{} ({}): {}"
                         .format(field.getName(), field.type, value))
            return None
        try:
            return converter(field, value)
        except Exception:
            # Best-effort import: a value that cannot be converted is
            # reported and skipped, the remaining fields are still imported
            logger.error("Unable to convert value for Setup.{} ({}): {}"
                         .format(field.getName(), field.type, value))
            return None

    def to_integer_value(self, field, value):
        """Return the value as the string form of an integer."""
        return str(int(value))

    def to_fixedpoint_value(self, field, value):
        """Return the value as the string form of a float."""
        return str(float(value))

    def to_boolean_value(self, field, value):
        """Return the value converted with ``self.to_bool``."""
        return self.to_bool(value)

    def to_string_value(self, field, value):
        """Return the value as string, resolved against the vocabulary of
        the field if it has one. Falsy values become an empty string.
        """
        if field.vocabulary:
            return self.to_string_vocab_value(field, value)
        return str(value) if value else ""

    def to_reference_value(self, field, value):
        """Return the UID of the first object found with the given title.

        :returns: None for a falsy value, otherwise the UID
        :raises ValueError: if no object with that title is found
        """
        if not value:
            return None

        brains = api.search({"title": to_unicode(value)})
        if brains:
            return api.get_uid(brains[0])

        msg = "No object found for Setup.{0} ({1}): {2}"
        msg = msg.format(field.getName(), field.type, value)
        logger.error(msg)
        raise ValueError(msg)

    def to_string_vocab_value(self, field, value):
        """Return the vocabulary key that matches the value.

        The value is compared case-insensitively against both the keys and
        the displayed values of the field's vocabulary.

        :raises ValueError: if the vocabulary is empty or has no match
        """
        vocabulary = field.vocabulary
        if isinstance(vocabulary, str):
            # The vocabulary is given by name; resolve it on the setup
            vocabulary = getFromString(api.get_setup(), vocabulary)
        else:
            vocabulary = vocabulary.items()

        if not vocabulary:
            raise ValueError("Empty vocabulary for {}".format(field.getName()))

        if isinstance(vocabulary, (tuple, list)):
            vocabulary = {item[0]: item[1] for item in vocabulary}

        # The searched value does not change while iterating
        value_low = str(value).lower()
        for key, val in vocabulary.items():
            key_low = str(to_utf8(key)).lower()
            val_low = str(to_utf8(val)).lower()
            if key_low == value_low or val_low == value_low:
                return key
        raise ValueError("Vocabulary entry not found")

    def to_duration_value(self, field, values):
        """Build a days/hours/minutes dict for a duration field.

        :param values: dict of all sheet values; the parts are read from
            the keys "<fieldname>_days", "<fieldname>_hours" and
            "<fieldname>_minutes", each defaulting to 0
        """
        name = field.getName()
        # Build a real list: the keys are accessed by index below, which
        # does not work on a lazy ``map`` object
        keys = ["{}_{}".format(name, part)
                for part in ("days", "hours", "minutes")]
        return dict(
            days=api.to_int(values.get(keys[0], 0), 0),
            hours=api.to_int(values.get(keys[1], 0), 0),
            minutes=api.to_int(values.get(keys[2], 0), 0))

    def Import(self):
        """Store the converted sheet values in the matching setup fields.

        Fields without a usable value are skipped; a failure while setting
        a field is logged and does not stop the import.
        """
        values = {}
        for row in self.get_rows(3):
            values[row['Field']] = row['Value']

        bsetup = self.context.bika_setup
        bschema = bsetup.Schema()
        for field in bschema.fields():
            value = None
            field_name = field.getName()
            if field_name in values:
                value = self.get_field_value(field, values[field_name])
            elif field.type == "duration":
                # Duration parts live in separate "<name>_<part>" rows, so
                # the converter gets the whole mapping
                value = self.get_field_value(field, values)

            if value is None:
                continue
            try:
                obj_field = bsetup.getField(field_name)
                # NOTE(review): every value, including booleans and the
                # duration dict, is stringified before being set; confirm
                # the field setters expect that
                obj_field.set(bsetup, str(value))
            except Exception:
                logger.error("No valid type for Setup.{} ({}): {}"
                             .format(field_name, field.type, value))
2061
2062
2063
class ID_Prefixes(WorksheetImporter):
    """Builds the ID formatting entries described by the sheet rows."""

    def Import(self):
        """Merge one entry per row into the current ID formatting list.

        NOTE: the resulting list is not written back to the setup; the
        call to ``setIDFormatting`` is disabled (see end of the method).
        """
        formatting = self.context.bika_setup.getIDFormatting()
        for row in self.get_rows(3):
            # Drop the entry currently registered for this portal type
            formatting = [
                entry for entry in formatting
                if entry['portal_type'] != row['portal_type']
            ]
            # 'none' is only a visual aid in the spreadsheet and stands
            # for 'no separator'
            sep = row.get('separator', '-')
            if sep == 'none':
                sep = ''
            # Register the entry described by the row
            new_entry = {
                'portal_type': row['portal_type'],
                'padding': row['padding'],
                'prefix': row['prefix'],
                'separator': sep,
            }
            formatting.append(new_entry)
        # self.context.bika_setup.setIDFormatting(formatting)
2080
2081
2082
class Attachment_Types(WorksheetImporter):
    """Creates one AttachmentType per data row of the sheet."""

    def Import(self):
        """Create, rename and announce an AttachmentType for each row."""
        container = self.context.bika_setup.bika_attachmenttypes
        for row in self.get_rows(3):
            attachment_type = _createObjectByType(
                "AttachmentType", container, tmpID())
            values = {
                'title': row['title'],
                'description': row.get('description', ''),
            }
            attachment_type.edit(**values)
            attachment_type.unmarkCreationFlag()
            # Replace the temporary id before notifying subscribers
            renameAfterCreation(attachment_type)
            notify(ObjectInitializedEvent(attachment_type))
2094
2095
2096
class Reference_Samples(WorksheetImporter):
    """Imports reference samples together with their expected results,
    reference analyses and the interim fields of those analyses.
    """

    def load_reference_sample_results(self, sample):
        """Set the reference results of the sample.

        Uses the rows of the 'Reference Sample Results' sheet whose
        'ReferenceSample_id' equals the id of the sample. Rows whose
        service cannot be found are skipped with a warning.
        """
        sheetname = 'Reference Sample Results'
        # The sheet is looked up once and kept on the importer
        if not hasattr(self, 'results_worksheet'):
            worksheet = self.workbook[sheetname]
            if not worksheet:
                return
            self.results_worksheet = worksheet
        results = []
        bsc = getToolByName(self.context, SETUP_CATALOG)
        for row in self.get_rows(3, worksheet=self.results_worksheet):
            if row['ReferenceSample_id'] != sample.getId():
                continue
            service_title = row.get('AnalysisService_title')
            service = self.get_object(bsc, 'AnalysisService', service_title)
            if not service:
                warning = "Unable to load a reference sample result. Service %s not found."
                # Report the service that is missing, not the sheet name
                logger.warning(warning, service_title)
                continue
            results.append({
                'uid': service.UID(),
                'result': row['result'],
                'min': row['min'],
                'max': row['max']})
        sample.setReferenceResults(results)

    def load_reference_analyses(self, sample):
        """Create the reference analyses of the sample.

        Uses the rows of the 'Reference Analyses' sheet whose
        'ReferenceSample_id' equals the id of the sample, and loads the
        interim fields of every analysis created.
        """
        sheetname = 'Reference Analyses'
        # The sheet is looked up once and kept on the importer
        if not hasattr(self, 'analyses_worksheet'):
            worksheet = self.workbook[sheetname]
            if not worksheet:
                return
            self.analyses_worksheet = worksheet
        bsc = getToolByName(self.context, SETUP_CATALOG)
        for row in self.get_rows(3, worksheet=self.analyses_worksheet):
            if row['ReferenceSample_id'] != sample.getId():
                continue
            service = self.get_object(bsc, 'AnalysisService',
                                      row.get('AnalysisService_title'))
            # Both the id and the title come from the 'id' column
            obj = _createObjectByType("ReferenceAnalysis", sample, row['id'])
            obj.edit(title=row['id'],
                     ReferenceType=row['ReferenceType'],
                     Result=row['Result'],
                     Analyst=row['Analyst'],
                     Instrument=row['Instrument'],
                     Retested=row['Retested']
                     )
            obj.setService(service)
            # Currently disabled:
            # obj.setCreators(row['creator'])
            # obj.setCreationDate(row['created'])
            # self.set_wf_history(obj, row['workflow_history'])
            obj.unmarkCreationFlag()

            self.load_reference_analysis_interims(obj)

    def load_reference_analysis_interims(self, analysis):
        """Set the interim fields of the reference analysis.

        Uses the rows of the 'Reference Analysis Interims' sheet whose
        'ReferenceAnalysis_id' equals the id of the analysis.
        """
        sheetname = 'Reference Analysis Interims'
        # The sheet is looked up once and kept on the importer
        if not hasattr(self, 'interim_worksheet'):
            worksheet = self.workbook[sheetname]
            if not worksheet:
                return
            self.interim_worksheet = worksheet
        interims = []
        for row in self.get_rows(3, worksheet=self.interim_worksheet):
            if row['ReferenceAnalysis_id'] != analysis.getId():
                continue
            interims.append({
                'keyword': row['keyword'],
                'title': row['title'],
                'value': row['value'],
                'unit': row['unit'],
                'hidden': row['hidden']})
        analysis.setInterimFields(interims)

    def Import(self):
        """Create a ReferenceSample inside its supplier for every row that
        has an id, then load its results and reference analyses.
        """
        bsc = getToolByName(self.context, SETUP_CATALOG)
        for row in self.get_rows(3):
            if not row['id']:
                continue
            # The sample is created inside the supplier found by name
            supplier = bsc(portal_type='Supplier',
                           getName=row.get('Supplier_title', ''))[0].getObject()
            obj = _createObjectByType("ReferenceSample", supplier, row['id'])
            ref_def = self.get_object(bsc, 'ReferenceDefinition',
                                      row.get('ReferenceDefinition_title'))
            ref_man = self.get_object(bsc, 'Manufacturer',
                                      row.get('Manufacturer_title'))
            obj.edit(title=row['id'],
                     description=row.get('description', ''),
                     Blank=self.to_bool(row['Blank']),
                     Hazardous=self.to_bool(row['Hazardous']),
                     CatalogueNumber=row['CatalogueNumber'],
                     LotNumber=row['LotNumber'],
                     Remarks=row['Remarks'],
                     ExpiryDate=row['ExpiryDate'],
                     DateSampled=row['DateSampled'],
                     DateReceived=row['DateReceived'],
                     DateOpened=row['DateOpened'],
                     DateExpired=row['DateExpired'],
                     DateDisposed=row['DateDisposed']
                     )
            obj.setReferenceDefinition(ref_def)
            obj.setManufacturer(ref_man)
            obj.unmarkCreationFlag()

            self.load_reference_sample_results(obj)
            self.load_reference_analyses(obj)
2204
2205
class Analysis_Requests(WorksheetImporter):
    """Imports analysis requests together with their analyses and the
    interim fields of those analyses.
    """

    def load_analyses(self, sample):
        """Create the analyses of the given analysis request.

        Uses the rows of the 'Analyses' sheet whose 'AnalysisRequest_id'
        equals the id of the sample. Rows of other requests are skipped, so
        that each analysis is created once and only inside a request that
        already exists.
        """
        sheetname = 'Analyses'
        # The sheet is looked up once and kept on the importer
        if not hasattr(self, 'analyses_worksheet'):
            worksheet = self.workbook[sheetname]
            if not worksheet:
                return
            self.analyses_worksheet = worksheet
        bsc = getToolByName(self.context, SETUP_CATALOG)
        for row in self.get_rows(3, worksheet=self.analyses_worksheet):
            if row['AnalysisRequest_id'] != sample.getId():
                continue
            service = bsc(portal_type='AnalysisService',
                          title=row['AnalysisService_title'])[0].getObject()
            obj = create_analysis(
                sample, service,
                Result=row['Result'],
                ResultCaptureDate=row['ResultCaptureDate'],
                Analyst=row['Analyst'],
                Instrument=row['Instrument'],
                Retested=self.to_bool(row['Retested']),
                # Empty or non-numeric cells count as 0
                MaxTimeAllowed={
                    'days': api.to_int(row.get('MaxTimeAllowed_days', 0), 0),
                    'hours': api.to_int(row.get('MaxTimeAllowed_hours', 0), 0),
                    'minutes': api.to_int(
                        row.get('MaxTimeAllowed_minutes', 0), 0),
                },
            )

            # NOTE(review): 'Analyses' is passed as the filter to
            # objectValues; confirm this matches the type of the contained
            # analysis objects
            analyses = list(sample.objectValues('Analyses'))
            analyses.append(obj)
            sample.setAnalyses(analyses)
            obj.unmarkCreationFlag()

            self.load_analysis_interims(obj)

    def load_analysis_interims(self, analysis):
        """Set the interim fields of the analysis.

        NOTE(review): this reads the 'Reference Analysis Interims' sheet and
        matches on 'ReferenceAnalysis_id', exactly like the loader of
        Reference_Samples; confirm this is the intended sheet and column for
        routine analyses.
        """
        sheetname = 'Reference Analysis Interims'
        # The sheet is looked up once and kept on the importer
        if not hasattr(self, 'interim_worksheet'):
            worksheet = self.workbook[sheetname]
            if not worksheet:
                return
            self.interim_worksheet = worksheet
        interims = []
        for row in self.get_rows(3, worksheet=self.interim_worksheet):
            if row['ReferenceAnalysis_id'] != analysis.getId():
                continue
            interims.append({
                'keyword': row['keyword'],
                'title': row['title'],
                'value': row['value'],
                'unit': row['unit'],
                'hidden': row['hidden']})
        analysis.setInterimFields(interims)

    def Import(self):
        """Create an AnalysisRequest inside its client for every row that
        has an id, assign the contacts, profile and template named in the
        row and load its analyses.
        """
        client_cat = api.get_tool(CLIENT_CATALOG)
        contact_cat = api.get_tool(CONTACT_CATALOG)
        setup_cat = api.get_tool(SETUP_CATALOG)
        for row in self.get_rows(3):
            if not row['id']:
                continue
            client = client_cat(portal_type="Client",
                                getName=row['Client_title'])[0].getObject()
            obj = _createObjectByType("AnalysisRequest", client, row['id'])
            contact = contact_cat(
                portal_type="Contact",
                getFullname=row['Contact_Fullname'])[0].getObject()
            obj.edit(
                RequestID=row['id'],
                Contact=contact,
                CCEmails=row['CCEmails'],
                ClientOrderNumber=row['ClientOrderNumber'],
                InvoiceExclude=row['InvoiceExclude'],
                DateReceived=row['DateReceived'],
                DatePublished=row['DatePublished'],
                Remarks=row['Remarks']
            )
            if row['CCContact_Fullname']:
                cc_contact = contact_cat(
                    portal_type="Contact",
                    getFullname=row['CCContact_Fullname'])[0].getObject()
                obj.setCCContact(cc_contact)
            if row['AnalysisProfile_title']:
                profile = setup_cat(
                    portal_type="AnalysisProfile",
                    title=row['AnalysisProfile_title'])[0].getObject()
                obj.setProfiles([profile])
            if row['ARTemplate_title']:
                template = setup_cat(
                    portal_type="ARTemplate",
                    title=row['ARTemplate_title'])[0].getObject()
                obj.setTemplate(template)

            obj.unmarkCreationFlag()

            self.load_analyses(obj)
2300
2301
2302
class Invoice_Batches(WorksheetImporter):
    """Creates one InvoiceBatch per data row of the sheet."""

    def Import(self):
        """Create an InvoiceBatch for every row.

        Each row needs a 'title', a 'start' and an 'end' value. The row is
        validated before the object is created, so that an invalid row does
        not leave a temporary object behind in the invoices folder.

        :raises Exception: with a translated message if one of the
            required values is missing
        """
        folder = self.context.invoices
        for row in self.get_rows(3):
            if not row['title']:
                message = _("InvoiceBatch has no Title")
                raise Exception(t(message))
            if not row['start']:
                message = _("InvoiceBatch has no Start Date")
                raise Exception(t(message))
            if not row['end']:
                message = _("InvoiceBatch has no End Date")
                raise Exception(t(message))
            obj = _createObjectByType("InvoiceBatch", folder, tmpID())
            obj.edit(
                title=row['title'],
                BatchStartDate=row['start'],
                BatchEndDate=row['end'],
            )
            renameAfterCreation(obj)
            notify(ObjectInitializedEvent(obj))
2324