Passed
Push — master ( 61fb2b...e9b448 )
by Ramon
05:21
created

bika.lims.api.get_user_client()   B

Complexity

Conditions 7

Size

Total Lines 24
Code Lines 12

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 12
dl 0
loc 24
rs 8
c 0
b 0
f 0
cc 7
nop 1
1
# -*- coding: utf-8 -*-
2
#
3
# This file is part of SENAITE.CORE
4
#
5
# Copyright 2018 by it's authors.
6
# Some rights reserved. See LICENSE.rst, CONTRIBUTORS.rst.
7
8
import Missing
9
import re
10
11
from Acquisition import aq_base
12
from AccessControl.PermissionRole import rolesForPermissionOn
13
14
from datetime import datetime
15
from DateTime import DateTime
16
17
from Products.CMFPlone.utils import base_hasattr, safe_unicode
18
from Products.CMFCore.interfaces import ISiteRoot
19
from Products.CMFCore.interfaces import IFolderish
20
from Products.Archetypes.BaseObject import BaseObject
21
from Products.ZCatalog.interfaces import ICatalogBrain
22
from Products.CMFPlone.utils import _createObjectByType
23
from Products.CMFCore.WorkflowCore import WorkflowException
24
from Products.CMFCore.utils import getToolByName
25
from bika.lims.interfaces import IClient, IContact, ILabContact
26
27
from zope import globalrequest
28
from zope.event import notify
29
from zope.interface import implements
30
from zope.component import getUtility
31
from zope.component import getMultiAdapter
32
from zope.component.interfaces import IFactory
33
from zope.component.interfaces import ObjectEvent
34
from zope.component.interfaces import IObjectEvent
35
from zope.lifecycleevent import modified
36
from zope.lifecycleevent import ObjectCreatedEvent
37
from zope.security.interfaces import Unauthorized
38
39
from plone import api as ploneapi
40
from plone.memoize.volatile import DontCache
41
from plone.api.exc import InvalidParameterError
42
from plone.dexterity.interfaces import IDexterityContent
43
from plone.app.layout.viewlets.content import ContentHistoryView
44
from plone.i18n.normalizer.interfaces import IFileNameNormalizer
45
from plone.i18n.normalizer.interfaces import IIDNormalizer
46
47
from bika.lims import logger
48
49
"""Bika LIMS Framework API
50
51
Please see bika.lims/docs/API.rst for documentation.
52
53
Architecural Notes:
54
55
Please add only functions that do a single thing for a single object.
56
57
Good: `def get_foo(brain_or_object)`
58
Bad:  `def get_foos(list_of_brain_objects)`
59
60
Why?
61
62
Because it makes things more complex. You can always use a pattern like this to
63
achieve the same::
64
65
    >>> foos = map(get_foo, list_of_brain_objects)
66
67
Please add for all of your functions a descriptive test in docs/API.rst.
68
69
Thanks.
70
"""
71
72
_marker = object()
73
74
UID_RX = re.compile("[a-z0-9]{32}$")
75
76
77
class BikaLIMSError(Exception):
78
    """Base exception class for bika.lims errors."""
79
80
81
class IBikaTransitionEvent(IObjectEvent):
82
    """Bika WF transition event interface"""
83
84
85
class IBikaBeforeTransitionEvent(IBikaTransitionEvent):
86
    """Fired before the transition is invoked"""
87
88
89
class IBikaAfterTransitionEvent(IBikaTransitionEvent):
90
    """Fired after the transition done"""
91
92
93
class IBikaTransitionFailedEvent(IBikaTransitionEvent):
94
    """Fired if the transition failed"""
95
96
97
class BikaTransitionEvent(ObjectEvent):
98
    """Bika WF transition event"""
99
    def __init__(self, obj, transition, exception=None):
100
        ObjectEvent.__init__(self, obj)
101
        self.obj = obj
102
        self.transition = transition
103
        self.exception = exception
104
105
106
class BikaBeforeTransitionEvent(BikaTransitionEvent):
107
    implements(IBikaBeforeTransitionEvent)
108
109
110
class BikaAfterTransitionEvent(BikaTransitionEvent):
111
    implements(IBikaAfterTransitionEvent)
112
113
114
class BikaTransitionFailedEvent(BikaTransitionEvent):
115
    implements(IBikaTransitionFailedEvent)
116
117
118
def get_portal():
119
    """Get the portal object
120
121
    :returns: Portal object
122
    """
123
    return ploneapi.portal.getSite()
124
125
126
def get_bika_setup():
127
    """Fetch the `bika_setup` folder.
128
    """
129
    portal = get_portal()
130
    return portal.get("bika_setup")
131
132
133
def create(container, portal_type, *args, **kwargs):
134
    """Creates an object in Bika LIMS
135
136
    This code uses most of the parts from the TypesTool
137
    see: `Products.CMFCore.TypesTool._constructInstance`
138
139
    :param container: container
140
    :type container: ATContentType/DexterityContentType/CatalogBrain
141
    :param portal_type: The portal type to create, e.g. "Client"
142
    :type portal_type: string
143
    :param title: The title for the new content object
144
    :type title: string
145
    :returns: The new created object
146
    """
147
    from bika.lims.utils import tmpID
148
    if kwargs.get("title") is None:
149
        kwargs["title"] = "New {}".format(portal_type)
150
151
    # generate a temporary ID
152
    tmp_id = tmpID()
153
154
    # get the fti
155
    types_tool = get_tool("portal_types")
156
    fti = types_tool.getTypeInfo(portal_type)
157
158
    if fti.product:
159
        obj = _createObjectByType(portal_type, container, tmp_id)
160
    else:
161
        # newstyle factory
162
        factory = getUtility(IFactory, fti.factory)
163
        obj = factory(tmp_id, *args, **kwargs)
164
        if hasattr(obj, '_setPortalTypeName'):
165
            obj._setPortalTypeName(fti.getId())
166
        notify(ObjectCreatedEvent(obj))
167
        # notifies ObjectWillBeAddedEvent, ObjectAddedEvent and ContainerModifiedEvent
168
        container._setObject(tmp_id, obj)
169
        # we get the object here with the current object id, as it might be renamed
170
        # already by an event handler
171
        obj = container._getOb(obj.getId())
172
173
    # handle AT Content
174
    if is_at_content(obj):
175
        obj.processForm()
176
177
    # Edit after processForm; processForm does AT unmarkCreationFlag.
178
    obj.edit(**kwargs)
179
180
    # explicit notification
181
    modified(obj)
182
    return obj
183
184
185
def get_tool(name, context=None, default=_marker):
186
    """Get a portal tool by name
187
188
    :param name: The name of the tool, e.g. `portal_catalog`
189
    :type name: string
190
    :param context: A portal object
191
    :type context: ATContentType/DexterityContentType/CatalogBrain
192
    :returns: Portal Tool
193
    """
194
195
    # Try first with the context
196
    if context is not None:
197
        try:
198
            context = get_object(context)
199
            return getToolByName(context, name)
200
        except (BikaLIMSError, AttributeError) as e:
201
            # https://github.com/senaite/bika.lims/issues/396
202
            logger.warn("get_tool::getToolByName({}, '{}') failed: {} "
203
                        "-> falling back to plone.api.portal.get_tool('{}')"
204
                        .format(repr(context), name, repr(e), name))
205
            return get_tool(name, default=default)
206
207
    # Try with the plone api
208
    try:
209
        return ploneapi.portal.get_tool(name)
210
    except InvalidParameterError:
211
        if default is not _marker:
212
            return default
213
        fail("No tool named '%s' found." % name)
214
215
216
def fail(msg=None):
217
    """Bika LIMS Error
218
    """
219
    if msg is None:
220
        msg = "Reason not given."
221
    raise BikaLIMSError("{}".format(msg))
222
223
224
def is_object(brain_or_object):
225
    """Check if the passed in object is a supported portal content object
226
227
    :param brain_or_object: A single catalog brain or content object
228
    :type brain_or_object: Portal Object
229
    :returns: True if the passed in object is a valid portal content
230
    """
231
    if is_portal(brain_or_object):
232
        return True
233
    if is_at_content(brain_or_object):
234
        return True
235
    if is_dexterity_content(brain_or_object):
236
        return True
237
    if is_brain(brain_or_object):
238
        return True
239
    return False
240
241
242
def get_object(brain_or_object):
243
    """Get the full content object
244
245
    :param brain_or_object: A single catalog brain or content object
246
    :type brain_or_object: PortalObject/ATContentType/DexterityContentType
247
    /CatalogBrain
248
    :returns: The full object
249
    """
250
    if not is_object(brain_or_object):
251
        fail("{} is not supported.".format(repr(brain_or_object)))
252
    if is_brain(brain_or_object):
253
        return brain_or_object.getObject()
254
    return brain_or_object
255
256
257
def is_portal(brain_or_object):
258
    """Checks if the passed in object is the portal root object
259
260
    :param brain_or_object: A single catalog brain or content object
261
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
262
    :returns: True if the object is the portal root object
263
    :rtype: bool
264
    """
265
    return ISiteRoot.providedBy(brain_or_object)
266
267
268
def is_brain(brain_or_object):
269
    """Checks if the passed in object is a portal catalog brain
270
271
    :param brain_or_object: A single catalog brain or content object
272
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
273
    :returns: True if the object is a catalog brain
274
    :rtype: bool
275
    """
276
    return ICatalogBrain.providedBy(brain_or_object)
277
278
279
def is_dexterity_content(brain_or_object):
280
    """Checks if the passed in object is a dexterity content type
281
282
    :param brain_or_object: A single catalog brain or content object
283
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
284
    :returns: True if the object is a dexterity content type
285
    :rtype: bool
286
    """
287
    return IDexterityContent.providedBy(brain_or_object)
288
289
290
def is_at_content(brain_or_object):
291
    """Checks if the passed in object is an AT content type
292
293
    :param brain_or_object: A single catalog brain or content object
294
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
295
    :returns: True if the object is an AT content type
296
    :rtype: bool
297
    """
298
    return isinstance(brain_or_object, BaseObject)
299
300
301
def is_folderish(brain_or_object):
302
    """Checks if the passed in object is folderish
303
304
    :param brain_or_object: A single catalog brain or content object
305
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
306
    :returns: True if the object is folderish
307
    :rtype: bool
308
    """
309
    if hasattr(brain_or_object, "is_folderish"):
310
        if callable(brain_or_object.is_folderish):
311
            return brain_or_object.is_folderish()
312
        return brain_or_object.is_folderish
313
    return IFolderish.providedBy(get_object(brain_or_object))
314
315
316
def get_portal_type(brain_or_object):
317
    """Get the portal type for this object
318
319
    :param brain_or_object: A single catalog brain or content object
320
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
321
    :returns: Portal type
322
    :rtype: string
323
    """
324
    if not is_object(brain_or_object):
325
        fail("{} is not supported.".format(repr(brain_or_object)))
326
    return brain_or_object.portal_type
327
328
329
def get_schema(brain_or_object):
330
    """Get the schema of the content
331
332
    :param brain_or_object: A single catalog brain or content object
333
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
334
    :returns: Schema object
335
    """
336
    obj = get_object(brain_or_object)
337
    if is_portal(obj):
338
        fail("get_schema can't return schema of portal root")
339
    if is_dexterity_content(obj):
340
        pt = get_tool("portal_types")
341
        fti = pt.getTypeInfo(obj.portal_type)
342
        return fti.lookupSchema()
343
    if is_at_content(obj):
344
        return obj.Schema()
345
    fail("{} has no Schema.".format(brain_or_object))
346
347
348
def get_fields(brain_or_object):
349
    """Get the list of fields from the object
350
351
    :param brain_or_object: A single catalog brain or content object
352
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
353
    :returns: List of fields
354
    :rtype: list
355
    """
356
    obj = get_object(brain_or_object)
357
    schema = get_schema(obj)
358
    if is_dexterity_content(obj):
359
        names = schema.names()
360
        fields = map(lambda name: schema.get(name), names)
361
        return dict(zip(names, fields))
362
    return dict(zip(schema.keys(), schema.fields()))
363
364
365
def get_id(brain_or_object):
366
    """Get the Plone ID for this object
367
368
    :param brain_or_object: A single catalog brain or content object
369
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
370
    :returns: Plone ID
371
    :rtype: string
372
    """
373
    if is_brain(brain_or_object) and base_hasattr(brain_or_object, "getId"):
374
        return brain_or_object.getId
375
    return get_object(brain_or_object).getId()
376
377
378
def get_title(brain_or_object):
379
    """Get the Title for this object
380
381
    :param brain_or_object: A single catalog brain or content object
382
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
383
    :returns: Title
384
    :rtype: string
385
    """
386
    if is_brain(brain_or_object) and base_hasattr(brain_or_object, "Title"):
387
        return brain_or_object.Title
388
    return get_object(brain_or_object).Title()
389
390
391
def get_description(brain_or_object):
392
    """Get the Title for this object
393
394
    :param brain_or_object: A single catalog brain or content object
395
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
396
    :returns: Title
397
    :rtype: string
398
    """
399
    if is_brain(brain_or_object) \
400
            and base_hasattr(brain_or_object, "Description"):
401
        return brain_or_object.Description
402
    return get_object(brain_or_object).Description()
403
404
405
def get_uid(brain_or_object):
406
    """Get the Plone UID for this object
407
408
    :param brain_or_object: A single catalog brain or content object
409
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
410
    :returns: Plone UID
411
    :rtype: string
412
    """
413
    if is_portal(brain_or_object):
414
        return '0'
415
    if is_brain(brain_or_object) and base_hasattr(brain_or_object, "UID"):
416
        return brain_or_object.UID
417
    return get_object(brain_or_object).UID()
418
419
420
def get_url(brain_or_object):
421
    """Get the absolute URL for this object
422
423
    :param brain_or_object: A single catalog brain or content object
424
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
425
    :returns: Absolute URL
426
    :rtype: string
427
    """
428
    if is_brain(brain_or_object) and base_hasattr(brain_or_object, "getURL"):
429
        return brain_or_object.getURL()
430
    return get_object(brain_or_object).absolute_url()
431
432
433
def get_icon(brain_or_object, html_tag=True):
434
    """Get the icon of the content object
435
436
    :param brain_or_object: A single catalog brain or content object
437
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
438
    :param html_tag: A value of 'True' returns the HTML tag, else the image url
439
    :type html_tag: bool
440
    :returns: HTML '<img>' tag if 'html_tag' is True else the image url
441
    :rtype: string
442
    """
443
    # Manual approach, because `plone.app.layout.getIcon` does not reliable
444
    # work for Bika Contents coming from other catalogs than the
445
    # `portal_catalog`
446
    portal_types = get_tool("portal_types")
447
    fti = portal_types.getTypeInfo(brain_or_object.portal_type)
448
    icon = fti.getIcon()
449
    if not icon:
450
        return ""
451
    url = "%s/%s" % (get_url(get_portal()), icon)
452
    if not html_tag:
453
        return url
454
    tag = '<img width="16" height="16" src="{url}" title="{title}" />'.format(
455
        url=url, title=get_title(brain_or_object))
456
    return tag
457
458
459
def get_object_by_uid(uid, default=_marker):
460
    """Find an object by a given UID
461
462
    :param uid: The UID of the object to find
463
    :type uid: string
464
    :returns: Found Object or None
465
    """
466
467
    # nothing to do here
468
    if not uid:
469
        if default is not _marker:
470
            return default
471
        fail("get_object_by_uid requires UID as first argument; got {} instead"
472
             .format(uid))
473
474
    # we defined the portal object UID to be '0'::
475
    if uid == '0':
476
        return get_portal()
477
478
    # we try to find the object with both catalogs
479
    pc = get_portal_catalog()
480
    uc = get_tool("uid_catalog")
481
482
    # try to find the object with the reference catalog first
483
    brains = uc(UID=uid)
484
    if brains:
485
        return brains[0].getObject()
486
487
    # try to find the object with the portal catalog
488
    res = pc(UID=uid)
489
    if not res:
490
        if default is not _marker:
491
            return default
492
        fail("No object found for UID {}".format(uid))
493
494
    return get_object(res[0])
495
496
497
def get_object_by_path(path, default=_marker):
498
    """Find an object by a given physical path or absolute_url
499
500
    :param path: The physical path of the object to find
501
    :type path: string
502
    :returns: Found Object or None
503
    """
504
505
    # nothing to do here
506
    if not path:
507
        if default is not _marker:
508
            return default
509
        fail("get_object_by_path first argument must be a path; {} received"
510
             .format(path))
511
512
    pc = get_portal_catalog()
513
    portal = get_portal()
514
    portal_path = get_path(portal)
515
    portal_url = get_url(portal)
516
517
    # ensure we have a physical path
518
    if path.startswith(portal_url):
519
        request = get_request()
520
        path = "/".join(request.physicalPathFromURL(path))
521
522
    if not path.startswith(portal_path):
523
        if default is not _marker:
524
            return default
525
        fail("Not a physical path inside the portal.")
526
527
    if path == portal_path:
528
        return portal
529
530
    res = pc(path=dict(query=path, depth=0))
531
    if not res:
532
        if default is not _marker:
533
            return default
534
        fail("Object at path '{}' not found".format(path))
535
    return get_object(res[0])
536
537
538
def get_path(brain_or_object):
539
    """Calculate the physical path of this object
540
541
    :param brain_or_object: A single catalog brain or content object
542
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
543
    :returns: Physical path of the object
544
    :rtype: string
545
    """
546
    if is_brain(brain_or_object):
547
        return brain_or_object.getPath()
548
    return "/".join(get_object(brain_or_object).getPhysicalPath())
549
550
551
def get_parent_path(brain_or_object):
552
    """Calculate the physical parent path of this object
553
554
    :param brain_or_object: A single catalog brain or content object
555
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
556
    :returns: Physical path of the parent object
557
    :rtype: string
558
    """
559
    if is_portal(brain_or_object):
560
        return get_path(get_portal())
561
    if is_brain(brain_or_object):
562
        path = get_path(brain_or_object)
563
        return path.rpartition("/")[0]
564
    return get_path(get_object(brain_or_object).aq_parent)
565
566
567
def get_parent(brain_or_object, catalog_search=False):
568
    """Locate the parent object of the content/catalog brain
569
570
    The `catalog_search` switch uses the `portal_catalog` to do a search return
571
    a brain instead of the full parent object. However, if the search returned
572
    no results, it falls back to return the full parent object.
573
574
    :param brain_or_object: A single catalog brain or content object
575
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
576
    :param catalog_search: Use a catalog query to find the parent object
577
    :type catalog_search: bool
578
    :returns: parent object
579
    :rtype: ATContentType/DexterityContentType/PloneSite/CatalogBrain
580
    """
581
582
    if is_portal(brain_or_object):
583
        return get_portal()
584
585
    # Do a catalog search and return the brain
586
    if catalog_search:
587
        parent_path = get_parent_path(brain_or_object)
588
589
        # parent is the portal object
590
        if parent_path == get_path(get_portal()):
591
            return get_portal()
592
593
        # get the catalog tool
594
        pc = get_portal_catalog()
595
596
        # query for the parent path
597
        results = pc(path={
598
            "query": parent_path,
599
            "depth": 0})
600
601
        # No results fallback: return the parent object
602
        if not results:
603
            return get_object(brain_or_object).aq_parent
604
605
        # return the brain
606
        return results[0]
607
608
    return get_object(brain_or_object).aq_parent
609
610
611
def search(query, catalog=_marker):
612
    """Search for objects.
613
614
    :param query: A suitable search query.
615
    :type query: dict
616
    :param catalog: A single catalog id or a list of catalog ids
617
    :type catalog: str/list
618
    :returns: Search results
619
    :rtype: List of ZCatalog brains
620
    """
621
622
    # query needs to be a dictionary
623
    if not isinstance(query, dict):
624
        fail("Catalog query needs to be a dictionary")
625
626
    # Portal types to query
627
    portal_types = query.get("portal_type", [])
628
    # We want the portal_type as a list
629
    if not isinstance(portal_types, (tuple, list)):
630
        portal_types = [portal_types]
631
632
    # The catalogs used for the query
633
    catalogs = []
634
635
    # The user did **not** specify a catalog
636
    if catalog is _marker:
637
        # Find the registered catalogs for the queried portal types
638
        for portal_type in portal_types:
639
            # Just get the first registered/default catalog
640
            catalogs.append(get_catalogs_for(
641
                portal_type, default="portal_catalog")[0])
642
    else:
643
        # User defined catalogs
644
        if isinstance(catalog, (list, tuple)):
645
            catalogs.extend(map(get_tool, catalog))
646
        else:
647
            catalogs.append(get_tool(catalog))
648
649
    # Cleanup: Avoid duplicate catalogs
650
    catalogs = list(set(catalogs)) or [get_portal_catalog()]
651
652
    # We only support **single** catalog queries
653
    if len(catalogs) > 1:
654
        fail("Multi Catalog Queries are not supported, please specify a catalog.")
655
656
    return catalogs[0](query)
657
658
659
def safe_getattr(brain_or_object, attr, default=_marker):
660
    """Return the attribute value
661
662
    :param brain_or_object: A single catalog brain or content object
663
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
664
    :param attr: Attribute name
665
    :type attr: str
666
    :returns: Attribute value
667
    :rtype: obj
668
    """
669
    try:
670
        value = getattr(brain_or_object, attr, _marker)
671
        if value is _marker:
672
            if default is not _marker:
673
                return default
674
            fail("Attribute '{}' not found.".format(attr))
675
        if callable(value):
676
            return value()
677
        return value
678
    except Unauthorized:
679
        if default is not _marker:
680
            return default
681
        fail("You are not authorized to access '{}' of '{}'.".format(
682
            attr, repr(brain_or_object)))
683
684
685
def get_portal_catalog():
686
    """Get the portal catalog tool
687
688
    :returns: Portal Catalog Tool
689
    """
690
    return get_tool("portal_catalog")
691
692
693
def get_review_history(brain_or_object, rev=True):
694
    """Get the review history for the given brain or context.
695
696
    :param brain_or_object: A single catalog brain or content object
697
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
698
    :returns: Workflow history
699
    :rtype: [{}, ...]
700
    """
701
    obj = get_object(brain_or_object)
702
    review_history = []
703
    try:
704
        workflow = get_tool("portal_workflow")
705
        review_history = workflow.getInfoFor(obj, 'review_history')
706
    except WorkflowException as e:
707
        message = str(e)
708
        logger.error("Cannot retrieve review_history on {}: {}".format(
709
            obj, message))
710
    if not isinstance(review_history, (list, tuple)):
711
        logger.error("get_review_history: expected list, recieved {}".format(
712
            review_history))
713
        review_history = []
714
    if rev is True:
715
        review_history.reverse()
716
    return review_history
717
718
719
def get_revision_history(brain_or_object):
720
    """Get the revision history for the given brain or context.
721
722
    :param brain_or_object: A single catalog brain or content object
723
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
724
    :returns: Workflow history
725
    :rtype: obj
726
    """
727
    obj = get_object(brain_or_object)
728
    chv = ContentHistoryView(obj, safe_getattr(obj, "REQUEST", None))
729
    return chv.fullHistory()
730
731
732
def get_workflows_for(brain_or_object):
733
    """Get the assigned workflows for the given brain or context.
734
735
    Note: This function supports also the portal_type as parameter.
736
737
    :param brain_or_object: A single catalog brain or content object
738
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
739
    :returns: Assigned Workflows
740
    :rtype: tuple
741
    """
742
    workflow = ploneapi.portal.get_tool("portal_workflow")
743
    if isinstance(brain_or_object, basestring):
744
        return workflow.getChainFor(brain_or_object)
745
    obj = get_object(brain_or_object)
746
    return workflow.getChainFor(obj)
747
748
749
def get_workflow_status_of(brain_or_object, state_var="review_state"):
750
    """Get the current workflow status of the given brain or context.
751
752
    :param brain_or_object: A single catalog brain or content object
753
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
754
    :param state_var: The name of the state variable
755
    :type state_var: string
756
    :returns: Status
757
    :rtype: str
758
    """
759
    workflow = get_tool("portal_workflow")
760
    obj = get_object(brain_or_object)
761
    return workflow.getInfoFor(ob=obj, name=state_var, default='')
762
763
764
def get_creation_date(brain_or_object):
765
    """Get the creation date of the brain or object
766
767
    :param brain_or_object: A single catalog brain or content object
768
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
769
    :returns: Creation date
770
    :rtype: DateTime
771
    """
772
    created = getattr(brain_or_object, "created", None)
773
    if created is None:
774
        fail("Object {} has no creation date ".format(
775
             repr(brain_or_object)))
776
    if callable(created):
777
        return created()
778
    return created
779
780
781
def get_modification_date(brain_or_object):
782
    """Get the modification date of the brain or object
783
784
    :param brain_or_object: A single catalog brain or content object
785
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
786
    :returns: Modification date
787
    :rtype: DateTime
788
    """
789
    modified = getattr(brain_or_object, "modified", None)
790
    if modified is None:
791
        fail("Object {} has no modification date ".format(
792
             repr(brain_or_object)))
793
    if callable(modified):
794
        return modified()
795
    return modified
796
797
798
def get_review_status(brain_or_object):
799
    """Get the `review_state` of an object
800
801
    :param brain_or_object: A single catalog brain or content object
802
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
803
    :returns: Value of the review_status variable
804
    :rtype: String
805
    """
806
    if is_brain(brain_or_object):
807
        return brain_or_object.review_state
808
    return get_workflow_status_of(brain_or_object, state_var="review_state")
809
810
811 View Code Duplication
def get_cancellation_status(brain_or_object, default="active"):
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
812
    """Get the `cancellation_state` of an object
813
814
    :param brain_or_object: A single catalog brain or content object
815
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
816
    :returns: Value of the review_status variable
817
    :rtype: String
818
    """
819
820
    if is_brain(brain_or_object):
821
        state = getattr(brain_or_object, "cancellation_state", None)
822
        if state is not None:
823
            return state
824
825
    workflows = get_workflows_for(brain_or_object)
826
    if "bika_cancellation_workflow" in workflows:
827
        return get_workflow_status_of(brain_or_object, "cancellation_state")
828
829
    state = get_workflow_status_of(brain_or_object)
830
    if state not in ("active", "inactive"):
831
        return default
832
    return state
833
834
835 View Code Duplication
def get_inactive_status(brain_or_object, default="active"):
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
836
    """Get the `cancellation_state` of an objct
837
838
    :param brain_or_object: A single catalog brain or content object
839
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
840
    :returns: Value of the review_status variable
841
    :rtype: String
842
    """
843
844
    if is_brain(brain_or_object):
845
        state = getattr(brain_or_object, "inactive_state", None)
846
        if state is not None:
847
            return state
848
849
    workflows = get_workflows_for(brain_or_object)
850
    if "bika_inactive_workflow" in workflows:
851
        return get_workflow_status_of(brain_or_object, "inactive_state")
852
853
    state = get_workflow_status_of(brain_or_object)
854
    if state not in ("active", "inactive"):
855
        return default
856
    return state
857
858
859
def is_active(brain_or_object):
860
    """Check if the workflow state of the object is 'inactive' or 'cancelled'.
861
862
    :param brain_or_object: A single catalog brain or content object
863
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
864
    :returns: False if the object is in the state 'inactive' or 'cancelled'
865
    :rtype: bool
866
    """
867
    if get_inactive_status(brain_or_object) == "inactive":
868
        return False
869
    if get_cancellation_status(brain_or_object) == "cancelled":
870
        return False
871
    return True
872
873
874
def get_catalogs_for(brain_or_object, default="portal_catalog"):
875
    """Get all registered catalogs for the given portal_type, catalog brain or
876
    content object
877
878
    :param brain_or_object: The portal_type, a catalog brain or content object
879
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
880
    :returns: List of supported catalogs
881
    :rtype: list
882
    """
883
    archetype_tool = get_tool("archetype_tool", None)
884
    if not archetype_tool:
885
        # return the default catalog
886
        return [get_tool(default)]
887
888
    catalogs = []
889
890
    # get the registered catalogs for portal_type
891
    if is_object(brain_or_object):
892
        catalogs = archetype_tool.getCatalogsByType(
893
            get_portal_type(brain_or_object))
894
    if isinstance(brain_or_object, basestring):
895
        catalogs = archetype_tool.getCatalogsByType(brain_or_object)
896
897
    if not catalogs:
898
        return [get_tool(default)]
899
    return catalogs
900
901
902
def get_transitions_for(brain_or_object):
903
    """List available workflow transitions for all workflows
904
905
    :param brain_or_object: A single catalog brain or content object
906
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
907
    :returns: All possible available and allowed transitions
908
    :rtype: list[dict]
909
    """
910
    workflow = get_tool('portal_workflow')
911
    transitions = []
912
    instance = get_object(brain_or_object)
913
    for wfid in get_workflows_for(brain_or_object):
914
        wf = workflow[wfid]
915
        tlist = wf.getTransitionsFor(instance)
916
        transitions.extend([t for t in tlist if t not in transitions])
917
    return transitions
918
919
920
def do_transition_for(brain_or_object, transition):
921
    """Performs a workflow transition for the passed in object.
922
923
    :param brain_or_object: A single catalog brain or content object
924
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
925
    :returns: The object where the transtion was performed
926
    """
927
    if not isinstance(transition, basestring):
928
        fail("Transition type needs to be string, got '%s'" % type(transition))
929
    obj = get_object(brain_or_object)
930
    # notify the BeforeTransitionEvent
931
    notify(BikaBeforeTransitionEvent(obj, transition))
932
    try:
933
        ploneapi.content.transition(obj, transition)
934
    except ploneapi.exc.InvalidParameterError as e:
935
        # notify the TransitionFailedEvent
936
        notify(BikaTransitionFailedEvent(obj, transition, exception=e))
937
        fail("Failed to perform transition '{}' on {}: {}".format(
938
             transition, obj, str(e)))
939
    # notify the AfterTransitionEvent
940
    notify(BikaAfterTransitionEvent(obj, transition))
941
    return obj
942
943
944
def get_roles_for_permission(permission, brain_or_object):
945
    """Get a list of granted roles for the given permission on the object.
946
947
    :param brain_or_object: A single catalog brain or content object
948
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
949
    :returns: Roles for the given Permission
950
    :rtype: list
951
    """
952
    obj = get_object(brain_or_object)
953
    allowed = set(rolesForPermissionOn(permission, obj))
954
    return sorted(allowed)
955
956
957
def is_versionable(brain_or_object, policy='at_edit_autoversion'):
958
    """Checks if the passed in object is versionable.
959
960
    :param brain_or_object: A single catalog brain or content object
961
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
962
    :returns: True if the object is versionable
963
    :rtype: bool
964
    """
965
    pr = get_tool("portal_repository")
966
    obj = get_object(brain_or_object)
967
    return pr.supportsPolicy(obj, 'at_edit_autoversion') \
968
        and pr.isVersionable(obj)
969
970
971
def get_version(brain_or_object):
972
    """Get the version of the current object
973
974
    :param brain_or_object: A single catalog brain or content object
975
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
976
    :returns: The current version of the object, or None if not available
977
    :rtype: int or None
978
    """
979
    obj = get_object(brain_or_object)
980
    if not is_versionable(obj):
981
        return None
982
    return getattr(aq_base(obj), "version_id", 0)
983
984
985
def get_view(name, context=None, request=None):
986
    """Get the view by name
987
988
    :param name: The name of the view
989
    :type name: str
990
    :param context: The context to query the view
991
    :type context: ATContentType/DexterityContentType/CatalogBrain
992
    :param request: The request to query the view
993
    :type request: HTTPRequest object
994
    :returns: HTTP Request
995
    :rtype: Products.Five.metaclass View object
996
    """
997
    context = context or get_portal()
998
    request = request or get_request() or None
999
    return getMultiAdapter((get_object(context), request), name=name)
1000
1001
1002
def get_request():
1003
    """Get the global request object
1004
1005
    :returns: HTTP Request
1006
    :rtype: HTTPRequest object
1007
    """
1008
    return globalrequest.getRequest()
1009
1010
1011
def get_group(group_or_groupname):
1012
    """Return Plone Group
1013
1014
    :param group_or_groupname: Plone group or the name of the group
1015
    :type groupname:  GroupData/str
1016
    :returns: Plone GroupData
1017
    """
1018
    if not group_or_groupname:
1019
1020
        return None
1021
    if hasattr(group_or_groupname, "_getGroup"):
1022
        return group_or_groupname
1023
    gtool = get_tool("portal_groups")
1024
    return gtool.getGroupById(group_or_groupname)
1025
1026
1027
def get_user(user_or_username):
1028
    """Return Plone User
1029
1030
    :param user_or_username: Plone user or user id
1031
    :returns: Plone MemberData
1032
    """
1033
    if not user_or_username:
1034
        return None
1035
    if hasattr(user_or_username, "getUserId"):
1036
        return ploneapi.user.get(user_or_username.getUserId())
1037
    return ploneapi.user.get(userid=user_or_username)
1038
1039
1040
def get_user_properties(user_or_username):
1041
    """Return User Properties
1042
1043
    :param user_or_username: Plone group identifier
1044
    :returns: Plone MemberData
1045
    """
1046
    user = get_user(user_or_username)
1047
    if user is None:
1048
        return {}
1049
    if not callable(user.getUser):
1050
        return {}
1051
    out = {}
1052
    plone_user = user.getUser()
1053
    for sheet in plone_user.listPropertysheets():
1054
        ps = plone_user.getPropertysheet(sheet)
1055
        out.update(dict(ps.propertyItems()))
1056
    return out
1057
1058
1059
def get_users_by_roles(roles=None):
1060
    """Search Plone users by their roles
1061
1062
    :param roles: Plone role name or list of roles
1063
    :type roles:  list/str
1064
    :returns: List of Plone users having the role(s)
1065
    """
1066
    if not isinstance(roles, (tuple, list)):
1067
        roles = [roles]
1068
    mtool = get_tool("portal_membership")
1069
    return mtool.searchForMembers(roles=roles)
1070
1071
1072
def get_current_user():
1073
    """Returns the current logged in user
1074
1075
    :returns: Current User
1076
    """
1077
    return ploneapi.user.get_current()
1078
1079
1080
def get_user_contact(user, contact_types=['Contact', 'LabContact']):
1081
    """Returns the associated contact of a Plone user
1082
1083
    If the user passed in has no contact associated, return None.
1084
    The `contact_types` parameter filter the portal types for the search.
1085
1086
    :param: Plone user
1087
    :contact_types: List with the contact portal types to search
1088
    :returns: Contact associated to the Plone user or None
1089
    """
1090
    if not user:
1091
        return None
1092
1093
    query = {'portal_type': contact_types, 'getUsername': user.id}
1094
    brains = search(query, catalog='portal_catalog')
1095
    if not brains:
1096
        return None
1097
1098
    if len(brains) > 1:
1099
        # Oops, the user has multiple contacts assigned, return None
1100
        contacts = map(lambda c: c.Title, brains)
1101
        err_msg = "User '{}' is bound to multiple Contacts '{}'"
1102
        err_msg = err_msg.format(user.id, ','.join(contacts))
1103
        logger.error(err_msg)
1104
        return None
1105
1106
    return get_object(brains[0])
1107
1108
1109
def get_user_client(user_or_contact):
1110
    """Returns the client of the contact of a Plone user
1111
1112
    If the user passed in has no contact or does not belong to any client,
1113
    returns None.
1114
1115
    :param: Plone user or contact
1116
    :returns: Client the contact of the Plone user belongs to
1117
    """
1118
    if not user_or_contact or ILabContact.providedBy(user_or_contact):
1119
        # Lab contacts cannot belong to a client
1120
        return None
1121
1122
    if not IContact.providedBy(user_or_contact):
1123
        contact = get_user_contact(user_or_contact, contact_types=['Contact'])
1124
        if IContact.providedBy(contact):
1125
            return get_user_client(contact)
1126
        return None
1127
1128
    client = get_parent(user_or_contact)
1129
    if client and IClient.providedBy(client):
1130
        return client
1131
1132
    return None
1133
1134
1135
def get_current_client():
1136
    """Returns the current client the current logged in user belongs to, if any
1137
1138
    :returns: Client the current logged in user belongs to or None
1139
    """
1140
    return get_user_client(get_current_user())
1141
1142
1143
def get_cache_key(brain_or_object):
1144
    """Generate a cache key for a common brain or object
1145
1146
    :param brain_or_object: A single catalog brain or content object
1147
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
1148
    :returns: Cache Key
1149
    :rtype: str
1150
    """
1151
    key = [
1152
        get_portal_type(brain_or_object),
1153
        get_id(brain_or_object),
1154
        get_uid(brain_or_object),
1155
        # handle different domains gracefully
1156
        get_url(brain_or_object),
1157
        # Return the microsecond since the epoch in GMT
1158
        get_modification_date(brain_or_object).micros(),
1159
    ]
1160
    return "-".join(map(lambda x: str(x), key))
1161
1162
1163
def bika_cache_key_decorator(method, self, brain_or_object):
1164
    """Bika cache key decorator usable for
1165
1166
    :param brain_or_object: A single catalog brain or content object
1167
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
1168
    :returns: Cache Key
1169
    :rtype: str
1170
    """
1171
    if brain_or_object is None:
1172
        raise DontCache
1173
    return get_cache_key(brain_or_object)
1174
1175
1176
def normalize_id(string):
1177
    """Normalize the id
1178
1179
    :param string: A string to normalize
1180
    :type string: str
1181
    :returns: Normalized ID
1182
    :rtype: str
1183
    """
1184
    if not isinstance(string, basestring):
1185
        fail("Type of argument must be string, found '{}'".format(type(string)))
1186
    # get the id nomalizer utility
1187
    normalizer = getUtility(IIDNormalizer).normalize
1188
    return normalizer(string)
1189
1190
1191
def normalize_filename(string):
1192
    """Normalize the filename
1193
1194
    :param string: A string to normalize
1195
    :type string: str
1196
    :returns: Normalized ID
1197
    :rtype: str
1198
    """
1199
    if not isinstance(string, basestring):
1200
        fail("Type of argument must be string, found '{}'".format(type(string)))
1201
    # get the file nomalizer utility
1202
    normalizer = getUtility(IFileNameNormalizer).normalize
1203
    return normalizer(string)
1204
1205
1206
def is_uid(uid, validate=False):
1207
    """Checks if the passed in uid is a valid UID
1208
1209
    :param uid: The uid to check
1210
    :param validate: If False, checks if uid is a valid 23 alphanumeric uid. If
1211
    True, also verifies if a brain exists for the uid passed in
1212
    :type uid: string
1213
    :return: True if a valid uid
1214
    :rtype: bool
1215
    """
1216
    if not isinstance(uid, basestring):
1217
        return False
1218
    if len(uid) != 32:
1219
        return False
1220
    if not UID_RX.match(uid):
1221
        return False
1222
    if not validate:
1223
        return True
1224
1225
    # Check if a brain for this uid exists
1226
    uc = get_tool('uid_catalog')
1227
    brains = uc(UID=uid)
1228
    if brains:
1229
        assert (len(brains) == 1)
1230
    return len(brains) > 0
1231
1232
1233
def is_date(date):
1234
    """Checks if the passed in value is a valid Zope's DateTime
1235
1236
    :param date: The date to check
1237
    :type date: DateTime
1238
    :return: True if a valid date
1239
    :rtype: bool
1240
    """
1241
    if not date:
1242
        return False
1243
    return isinstance(date, (DateTime, datetime))
1244
1245
1246
def to_date(value, default=None):
1247
    """Tries to convert the passed in value to Zope's DateTime
1248
1249
    :param value: The value to be converted to a valid DateTime
1250
    :type value: str, DateTime or datetime
1251
    :return: The DateTime representation of the value passed in or default
1252
    """
1253
    if isinstance(value, DateTime):
1254
        return value
1255
    if not value:
1256
        if default is None:
1257
            return None
1258
        return to_date(default)
1259
    try:
1260
        if isinstance(value, str) and '.' in value:
1261
            # https://docs.plone.org/develop/plone/misc/datetime.html#datetime-problems-and-pitfalls
1262
            return DateTime(value, datefmt='international')
1263
        return DateTime(value)
1264
    except:
1265
        return to_date(default)
1266
1267
1268
def to_int(value, default=_marker):
1269
    """Tries to convert the value to int.
1270
    Truncates at the decimal point if the value is a float
1271
1272
    :param value: The value to be converted to an int
1273
    :return: The resulting int or default
1274
    """
1275
    if is_floatable(value):
1276
        value = to_float(value)
1277
    try:
1278
        return int(value)
1279
    except (TypeError, ValueError):
1280
        if default is None:
1281
            return default
1282
        if default is not _marker:
1283
            return to_int(default)
1284
        fail("Value %s cannot be converted to int" % repr(value))
1285
1286
1287
def is_floatable(value):
1288
    """Checks if the passed in value is a valid floatable number
1289
1290
    :param value: The value to be evaluated as a float number
1291
    :type value: str, float, int
1292
    :returns: True if is a valid float number
1293
    :rtype: bool"""
1294
    try:
1295
        float(value)
1296
        return True
1297
    except (TypeError, ValueError):
1298
        return False
1299
1300
1301
def to_float(value, default=_marker):
1302
    """Converts the passed in value to a float number
1303
1304
    :param value: The value to be converted to a floatable number
1305
    :type value: str, float, int
1306
    :returns: The float number representation of the passed in value
1307
    :rtype: float
1308
    """
1309
    if not is_floatable(value):
1310
        if default is not _marker:
1311
            return to_float(default)
1312
        fail("Value %s is not floatable" % repr(value))
1313
    return float(value)
1314
1315
1316
def to_searchable_text_metadata(value):
1317
    """Parse the given metadata value to searchable text
1318
1319
    :param value: The raw value of the metadata column
1320
    :returns: Searchable and translated unicode value or None
1321
    """
1322
    if not value:
1323
        return u""
1324
    if value is Missing.Value:
1325
        return u""
1326
    if is_uid(value):
1327
        return u""
1328
    if isinstance(value, (bool)):
1329
        return u""
1330
    if isinstance(value, (list, tuple)):
1331
        for v in value:
1332
            return to_searchable_text_metadata(v)
1333
    if isinstance(value, (dict)):
1334
        for k, v in value.items():
1335
            return to_searchable_text_metadata(v)
1336
    if is_date(value):
1337
        return value.strftime("%Y-%m-%d")
1338
    if not isinstance(value, basestring):
1339
        value = str(value)
1340
    return safe_unicode(value)
1341