Passed
Push — master ( b124da...78fa88 )
by Jordi
04:16
created

bika.lims.vocabularies   B

Complexity

Total Complexity 49

Size/Duplication

Total Lines 511
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
wmc 49
eloc 234
dl 0
loc 511
rs 8.48
c 0
b 0
f 0

18 Methods

Rating   Name   Duplication   Size   Complexity  
A BikaContentVocabulary.__init__() 0 5 1
A CatalogVocabulary.__init__() 0 6 4
C CatalogVocabulary.__call__() 0 41 11
A BikaContentVocabulary.__call__() 0 18 5
A UserVocabulary.__call__() 0 9 2
A AnalysisProfileVocabulary.__init__() 0 4 1
A UserVocabulary.__init__() 0 2 2
A SamplePointVocabulary.__init__() 0 4 1
A ClientContactVocabulary.__call__() 0 13 3
A StorageLocationVocabulary.__init__() 0 4 1
A AnalystVocabulary.__init__() 0 2 1
A AnalysisServiceVocabulary.__init__() 0 4 1
A AnalysisCategoryVocabulary.__init__() 0 4 1
A ClientVocabulary.__init__() 0 4 1
A BikaCatalogTypesVocabulary.__call__() 0 12 1
A AnalysisRequestWorkflowStateVocabulary.__call__() 0 19 3
A SampleTypeVocabulary.__init__() 0 4 1
A StickerTemplatesVocabulary.__call__() 0 4 1

2 Functions

Rating   Name   Duplication   Size   Complexity  
B getTemplates() 0 59 6
A getStickerTemplates() 0 39 2

How to fix   Complexity   

Complexity

Complex classes like bika.lims.vocabularies often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
# -*- coding: utf-8 -*-
2
#
3
# This file is part of SENAITE.CORE
4
#
5
# Copyright 2018 by its authors.
6
# Some rights reserved. See LICENSE.rst, CONTRIBUTORS.rst.
7
8
from Acquisition import aq_get
9
from bika.lims import bikaMessageFactory as _
10
from bika.lims.api import is_active
11
from bika.lims.utils import t
12
from bika.lims.interfaces import IDisplayListVocabulary
13
from bika.lims.utils import to_utf8
14
from Products.Archetypes.public import DisplayList
15
from Products.CMFCore.utils import getToolByName
16
from zope.interface import implements
17
from pkg_resources import resource_filename
18
from plone.resource.utils import iterDirectoriesOfType
19
from zope.schema.interfaces import IVocabularyFactory
20
from zope.schema.vocabulary import SimpleTerm
21
from zope.schema.vocabulary import SimpleVocabulary
22
from zope.component import getAdapters
23
from zope.site.hooks import getSite
24
25
import os
26
import glob
27
28
29
class CatalogVocabulary(object):
    """Make a DisplayList vocabulary from the results of a catalog query.

    The class attributes are defaults which can be overridden per instance
    through the constructor arguments of the same name:

    - ``catalog``: id of the catalog tool that is queried
    - ``contentFilter``: base query passed to the catalog
    - ``key``: name looked up on each result to get the item key
    - ``value``: name looked up on each result to get the item label
    """
    implements(IDisplayListVocabulary)

    catalog = 'portal_catalog'
    contentFilter = {}
    key = 'UID'
    value = 'Title'

    def __init__(self, context, key=None, value=None, contentFilter=None):
        self.context = context
        self.key = key if key else self.key
        self.value = value if value else self.value
        # Keep a private copy so that neither the class-level default nor
        # the dictionary handed in by the caller is shared with this
        # instance.
        self.contentFilter = dict(
            contentFilter if contentFilter else self.contentFilter)

    def __call__(self, **kwargs):
        """Run the catalog query and return the results as a DisplayList.

        :param allow_blank: when true, an empty ('', '') item is placed
            first in the returned list
        :param kwargs: every other keyword is added to the catalog query
            for this call only
        :returns: DisplayList of (key, translated value) items
        """
        site = getSite()
        catalog = getToolByName(site, self.catalog)
        allow_blank = kwargs.pop('allow_blank', False)

        # Build the query for this call without touching the stored filter,
        # otherwise keywords would leak into later calls (and, through the
        # class-level default, into other instances).
        query = dict(self.contentFilter)
        query.update(kwargs)

        # If a secondary deactivation/cancellation workflow is enabled,
        # be sure and select only active objects, unless other instructions
        # are explicitly specified.
        # NOTE(review): a 'bika_inactive_workflow' key in the query was the
        # only opt-out recognised before; it is still honoured here.
        wf = getToolByName(site, 'portal_workflow')
        if 'portal_type' in query and 'bika_inactive_workflow' not in query:
            wf_ids = [x.id for x in wf.getWorkflowsFor(query['portal_type'])]
            if 'bika_inactive_workflow' in wf_ids:
                query.setdefault('inactive_state', 'active')
            elif 'bika_cancellation_workflow' in wf_ids:
                query.setdefault('cancellation_state', 'active')

        brains = catalog(query)

        items = [('', '')] if allow_blank else []
        for brain in brains:
            if self.key in brain and self.value in brain:
                key = getattr(brain, self.key)
                value = getattr(brain, self.value)
            else:
                # The brain does not carry both names: fall back to the
                # full object and call the looked-up values if needed.
                obj = brain.getObject()
                key = obj[self.key]
                if callable(key):
                    key = key()
                value = obj[self.value]
                if callable(value):
                    value = value()
            items.append((key, t(value)))

        return DisplayList(items)
88
89
90
class BikaContentVocabulary(object):
    """Vocabulary factory for Bika Setup objects.  We find them by listing
    folder contents directly.

    :param folders: a path, or a list/tuple of paths, traversed from the
        site to reach the container(s) to list
    :param portal_types: a portal type name, or a list/tuple of names,
        passed to ``objectValues`` of each container
    """
    implements(IVocabularyFactory)

    def __init__(self, folders, portal_types):
        # A conditional expression (instead of ``x and y or z``) keeps an
        # empty list as it is rather than wrapping it into ``[[]]``.
        self.folders = folders \
            if isinstance(folders, (tuple, list)) else [folders, ]
        self.portal_types = portal_types \
            if isinstance(portal_types, (tuple, list)) else [portal_types, ]

    def __call__(self, context):
        """Return a SimpleVocabulary with one term per active object.

        Objects are listed folder by folder and type by type; inside each
        group they are ordered by lower-cased title. The plain title is
        used as term value and token, the result of ``t(title)`` as the
        term title.
        """
        site = getSite()
        items = []
        for folder in self.folders:
            folder = site.restrictedTraverse(folder)
            for portal_type in self.portal_types:
                objects = [obj for obj in folder.objectValues(portal_type)
                           if is_active(obj)]
                objects.sort(key=lambda obj: obj.Title().lower())
                for obj in objects:
                    title = obj.Title()
                    items.append(SimpleTerm(title, title, t(title)))
        return SimpleVocabulary(items)
120
121
122
class BikaCatalogTypesVocabulary(object):
    """Vocabulary factory for really user friendly portal types,
    filtered to return only types listed as indexed by bika_catalog
    """
    implements(IVocabularyFactory)

    def __call__(self, context):
        """Return a SimpleVocabulary of portal types with translated titles.

        The portal type name is used as term value and token; the title is
        translated through ``context.translate``.
        """
        translate = context.translate
        labels = (
            ('AnalysisRequest', _('Sample')),
            ('Batch', _('Batch')),
            # TODO Remove in >v1.3.0
            ('Sample', _('Sample')),
            ('ReferenceSample', _('Reference Sample')),
            ('Worksheet', _('Worksheet')),
        )
        terms = []
        for portal_type, label in labels:
            title = translate(to_utf8(label))
            terms.append(SimpleTerm(portal_type, portal_type, title))
        return SimpleVocabulary(terms)


BikaCatalogTypesVocabularyFactory = BikaCatalogTypesVocabulary()
143
144
145
class AnalysisCategoryVocabulary(BikaContentVocabulary):
    """Vocabulary of the AnalysisCategory objects found in
    bika_setup/bika_analysiscategories.

    >>> portal = layer['portal']

    >>> from plone.app.testing import TEST_USER_NAME
    >>> from plone.app.testing import TEST_USER_ID
    >>> from plone.app.testing import setRoles
    >>> from plone.app.testing import login
    >>> login(portal, TEST_USER_NAME)
    >>> setRoles(portal, TEST_USER_ID, ['Manager',])

    >>> from zope.component import queryUtility
    >>> name = 'bika.lims.vocabularies.AnalysisCategories'
    >>> util = queryUtility(IVocabularyFactory, name)
    >>> folder = portal.bika_setup.bika_analysiscategories
    >>> objects = folder.objectValues()
    >>> len(objects)
    3

    >>> source = util(portal)
    >>> source
    <zope.schema.vocabulary.SimpleVocabulary object at ...>

    >>> 'Water Chemistry' in source.by_token
    True
    """

    def __init__(self):
        super(AnalysisCategoryVocabulary, self).__init__(
            folders=['bika_setup/bika_analysiscategories', ],
            portal_types=['AnalysisCategory', ])


AnalysisCategoryVocabularyFactory = AnalysisCategoryVocabulary()
180
181
182
class AnalysisProfileVocabulary(BikaContentVocabulary):
    """Vocabulary of the AnalysisProfile objects found in
    bika_setup/bika_analysisprofiles.
    """

    def __init__(self):
        super(AnalysisProfileVocabulary, self).__init__(
            folders=['bika_setup/bika_analysisprofiles', ],
            portal_types=['AnalysisProfile', ])


AnalysisProfileVocabularyFactory = AnalysisProfileVocabulary()
190
191
192
class StorageLocationVocabulary(BikaContentVocabulary):
    """Vocabulary of the StorageLocation objects found in
    bika_setup/bika_storagelocations.
    """

    def __init__(self):
        super(StorageLocationVocabulary, self).__init__(
            folders=['bika_setup/bika_storagelocations', ],
            portal_types=['StorageLocation', ])


StorageLocationVocabularyFactory = StorageLocationVocabulary()
200
201
202
class SamplePointVocabulary(BikaContentVocabulary):
    """Vocabulary of the SamplePoint objects found in
    bika_setup/bika_samplepoints.
    """

    def __init__(self):
        super(SamplePointVocabulary, self).__init__(
            folders=['bika_setup/bika_samplepoints', ],
            portal_types=['SamplePoint', ])


SamplePointVocabularyFactory = SamplePointVocabulary()
210
211
212
class SampleTypeVocabulary(BikaContentVocabulary):
    """Vocabulary of the SampleType objects found in
    bika_setup/bika_sampletypes.
    """

    def __init__(self):
        super(SampleTypeVocabulary, self).__init__(
            folders=['bika_setup/bika_sampletypes', ],
            portal_types=['SampleType', ])


SampleTypeVocabularyFactory = SampleTypeVocabulary()
220
221
222
class AnalysisServiceVocabulary(BikaContentVocabulary):
    """Vocabulary of the AnalysisService objects found in
    bika_setup/bika_analysisservices.
    """

    def __init__(self):
        super(AnalysisServiceVocabulary, self).__init__(
            folders=['bika_setup/bika_analysisservices', ],
            portal_types=['AnalysisService', ])


AnalysisServiceVocabularyFactory = AnalysisServiceVocabulary()
230
231
232
class ClientVocabulary(BikaContentVocabulary):
    """Vocabulary of the Client objects found in the clients folder."""

    def __init__(self):
        super(ClientVocabulary, self).__init__(
            folders=['clients', ],
            portal_types=['Client', ])


ClientVocabularyFactory = ClientVocabulary()
240
241
242
class UserVocabulary(object):
    """ Present a vocabulary containing users in the specified
    list of roles

    >>> from zope.component import queryUtility

    >>> portal = layer['portal']
    >>> name = 'bika.lims.vocabularies.Users'
    >>> util = queryUtility(IVocabularyFactory, name)

    >>> tool = portal.portal_registration
    >>> tool.addMember('user1', 'user1',
    ...     properties = {
    ...         'username': 'user1',
    ...         'email': 'user1@example.com',
    ...         'fullname': 'user1'}
    ... )
    <MemberData at /plone/portal_memberdata/user1 used for /plone/acl_users>

    >>> source = util(portal)
    >>> source
    <zope.schema.vocabulary.SimpleVocabulary object at ...>

    >>> 'test_user_1_' in source.by_value
    True
    >>> 'user1' in source.by_value
    True
    """
    implements(IVocabularyFactory)

    def __init__(self, roles=None):
        """
        :param roles: a role name, or a list/tuple of role names, passed
            to the membership search; defaults to an empty list
        """
        # A fresh list per instance: a mutable default argument would be
        # shared by every vocabulary created without explicit roles.
        if roles is None:
            roles = []
        self.roles = roles if isinstance(roles, (tuple, list)) else [roles, ]

    def __call__(self, context):
        """Return a SimpleVocabulary of the members found for the roles.

        The member id is used as term value and token, the ``fullname``
        property as term title; terms are ordered by lower-cased fullname.
        """
        site = getSite()
        mtool = getToolByName(site, 'portal_membership')
        users = mtool.searchForMembers(roles=self.roles)
        items = [(item.getProperty('fullname'), item.getId())
                 for item in users]
        # ``or ''`` keeps the sort from failing on a member whose fullname
        # property is None.
        items.sort(key=lambda item: (item[0] or '').lower())
        terms = [SimpleTerm(userid, userid, fullname)
                 for fullname, userid in items]
        return SimpleVocabulary(terms)


UserVocabularyFactory = UserVocabulary()
289
290
291
class ClientContactVocabulary(object):
    """ Present Client Contacts

    >>> from zope.component import queryUtility

    >>> portal = layer['portal']
    >>> name = 'bika.lims.vocabularies.ClientContacts'
    >>> util = queryUtility(IVocabularyFactory, name)

    >>> from plone.app.testing import TEST_USER_NAME
    >>> from plone.app.testing import TEST_USER_ID
    >>> from plone.app.testing import setRoles
    >>> from plone.app.testing import login
    >>> login(portal, TEST_USER_NAME)
    >>> setRoles(portal, TEST_USER_ID, ['Manager',])

    >>> portal.clients.invokeFactory('Client', id='client1')
    'client1'
    >>> client1 = portal.clients.client1
    >>> client1.processForm()
    >>> client1.invokeFactory('Contact', id='contact1')
    'contact1'
    >>> contact1 = client1.contact1
    >>> contact1.processForm()
    >>> contact1.edit(Firstname='Contact', Surname='One')
    >>> contact1.reindexObject()

    >>> source = util(portal)
    >>> source
    <zope.schema.vocabulary.SimpleVocabulary object at ...>

    >>> 'Contact One' in source.by_value
    True
    """
    implements(IVocabularyFactory)

    def __call__(self, context):
        """Return a SimpleVocabulary with one term per client contact.

        Contacts are listed client by client and, within each client,
        ordered by lower-cased full name. The full name is used as term
        value and token, its ``to_utf8`` conversion as term title.
        """
        site = getSite()
        items = []
        for client in site.clients.objectValues('Client'):
            contacts = list(client.objectValues('Contact'))
            contacts.sort(key=lambda contact: contact.getFullname().lower())
            for contact in contacts:
                fullname = contact.getFullname()
                items.append(
                    SimpleTerm(fullname, fullname, to_utf8(fullname)))
        return SimpleVocabulary(items)


ClientContactVocabularyFactory = ClientContactVocabulary()
343
344
345
class AnalystVocabulary(UserVocabulary):
    """User vocabulary restricted to the 'Analyst' role."""

    def __init__(self):
        super(AnalystVocabulary, self).__init__(roles=['Analyst', ])


AnalystVocabularyFactory = AnalystVocabulary()
351
352
353
class AnalysisRequestWorkflowStateVocabulary(object):
    """Vocabulary factory for workflow states.

        >>> from zope.component import queryUtility

        >>> portal = layer['portal']

        >>> name = 'bika.lims.vocabularies.AnalysisRequestWorkflowStates'
        >>> util = queryUtility(IVocabularyFactory, name)

        >>> tool = getToolByName(portal, "portal_workflow")

        >>> states = util(portal)
        >>> states
        <zope.schema.vocabulary.SimpleVocabulary object at ...>

        >>> pub = states.by_token['published']
        >>> pub.title, pub.token, pub.value
        (u'Published', 'published', 'published')
    """
    implements(IVocabularyFactory)

    def __call__(self, context):
        """Return a SimpleVocabulary of workflow states ordered by title.

        The states come from ``listWFStatesByTitle(filter_similar=True)``
        of the workflow tool. An empty vocabulary is returned when the
        site has no ``portal_workflow`` tool.
        """
        portal = getSite()
        wftool = getToolByName(portal, 'portal_workflow', None)
        if wftool is None:
            return SimpleVocabulary([])

        states = wftool.listWFStatesByTitle(filter_similar=True)
        # Key by the second element of each entry so that it appears only
        # once in the vocabulary; the first element is the title.
        titles = dict((state[1], t(state[0])) for state in states)
        ordered = sorted(titles.items(), key=lambda item: item[1])
        terms = [SimpleTerm(k, title=u'%s' % v) for k, v in ordered]
        return SimpleVocabulary(terms)


AnalysisRequestWorkflowStateVocabularyFactory = \
    AnalysisRequestWorkflowStateVocabulary()
398
399
400
def getTemplates(bikalims_path, restype, filter_by_type=False):
    """ Returns a list with the templates (*.pt files) found in the given
        path of the bika.lims package, plus those found in the resource
        directories of type `restype` registered by other add-ons.

        Each item of the list is a dictionary with this structure:
            {'id': <template_id>,
             'title': <template_title>}

        For a template that lives outside the bika.lims add-on, both the
        id and the title are prefixed with the add-on identifier. The
        title is the id without the file extension, with underscores
        replaced by whitespaces and a whitespace after the colon.

        As an example, a file "My_cool_report.pt" found in the <restype>
        resource directory of the my.product add-on is returned as:
            {'id': 'my.product:My_cool_report.pt',
             'title': 'my.product: My cool report'}

        :param bikalims_path: path, relative to the bika.lims package,
            where the templates are looked up
        :type bikalims_path: string
        :param restype: the resource directory type to search for inside
            an add-on
        :type restype: string
        :param filter_by_type: name of the sub-folder to look into inside
            each add-on resource directory; no sub-folder if false
        :type filter_by_type: string/boolean
        :returns: list of dictionaries, sorted by template id
    """
    # Templates shipped with the bika.lims add-on
    own_dir = resource_filename("bika.lims", bikalims_path)
    pattern = os.path.join(own_dir, '*.pt')
    names = [os.path.basename(path) for path in glob.glob(pattern)]

    # Templates provided by other add-ons
    for resource in iterDirectoriesOfType(restype):
        addon = resource.__name__
        if addon == 'bika.lims':
            continue
        folder = resource.directory
        # Narrow the lookup to the sub-folder asked in 'filter_by_type'
        if filter_by_type:
            folder = folder + '/' + filter_by_type
        if not os.path.isdir(folder):
            continue
        for filename in os.listdir(folder):
            if filename.endswith('.pt'):
                names.append('{0}:{1}'.format(addon, filename))

    names.sort()
    result = []
    for name in names:
        # Drop the '.pt' extension and make the remainder readable
        title = name[:-3].replace('_', ' ').replace(':', ': ')
        result.append({'id': name, 'title': title})
    return result
459
460
461
def getStickerTemplates(filter_by_type=False):
    """ Returns a list with the sticker templates available, as provided
        by getTemplates for the browser/templates/stickers folder and the
        'stickers' resource directory type.

        Each item of the list is a dictionary with this structure:
            {'id': <template_id>,
             'title': <template_title>}

        For a template that lives outside the bika.lims add-on, both the
        id and the title are prefixed with the add-on identifier. As an
        example, a file "EAN128_default_small.pt" provided by the
        my.product add-on is returned as:
            {'id': 'my.product:EAN128_default_small.pt',
             'title': 'my.product: EAN128 default small'}

        If filter_by_type is given, only the templates located in the
        sub-folder with that name are returned. Example: with
        filter_by_type == 'worksheet', only the *.pt files under a folder
        named 'worksheet' are listed.

        :param filter_by_type: name of the sub-folder to restrict the
            lookup to; no restriction if false
        :type filter_by_type: string/bool.
        :returns: a list with the sticker templates available
    """
    # 'stickers' is both the folder name inside bika.lims and the resource
    # directory type looked up in other add-ons.
    resdirname = 'stickers'
    segments = ["browser", "templates", resdirname]
    if filter_by_type:
        segments.append(filter_by_type)
    bikalims_path = os.path.join(*segments)
    return getTemplates(bikalims_path, resdirname, filter_by_type)
500
501
502
class StickerTemplatesVocabulary(object):
    """ Locate all sticker templates
    """
    implements(IVocabularyFactory)

    def __call__(self, context, filter_by_type=False):
        """Return a SimpleVocabulary built from getStickerTemplates.

        The template id is used as term value and token, the template
        title as term title.
        """
        templates = getStickerTemplates(filter_by_type=filter_by_type)
        terms = []
        for template in templates:
            template_id = template['id']
            terms.append(
                SimpleTerm(template_id, template_id, template['title']))
        return SimpleVocabulary(terms)
511