Passed
Push — master ( caa188...9d0ad3 )
by Ramon
11:25 queued 07:12
created

bika.lims.api.to_dhm_format()   A

Complexity

Conditions 3

Size

Total Lines 16
Code Lines 13

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 13
dl 0
loc 16
rs 9.75
c 0
b 0
f 0
cc 3
nop 5
1
# -*- coding: utf-8 -*-
2
#
3
# This file is part of SENAITE.CORE
4
#
5
# Copyright 2018 by its authors.
6
# Some rights reserved. See LICENSE.rst, CONTRIBUTORS.rst.
7
8
import Missing
9
import re
10
11
from Acquisition import aq_base
12
from AccessControl.PermissionRole import rolesForPermissionOn
13
14
from datetime import datetime
15
from datetime import timedelta
16
from DateTime import DateTime
17
18
from Products.CMFPlone.utils import base_hasattr, safe_unicode
19
from Products.CMFCore.interfaces import ISiteRoot
20
from Products.CMFCore.interfaces import IFolderish
21
from Products.Archetypes.BaseObject import BaseObject
22
from Products.ZCatalog.interfaces import ICatalogBrain
23
from Products.CMFPlone.utils import _createObjectByType
24
from Products.CMFCore.WorkflowCore import WorkflowException
25
from Products.CMFCore.utils import getToolByName
26
from bika.lims.interfaces import IClient, IContact, ILabContact
27
28
from zope import globalrequest
29
from zope.event import notify
30
from zope.interface import implements
31
from zope.component import getUtility
32
from zope.component import getMultiAdapter
33
from zope.component.interfaces import IFactory
34
from zope.component.interfaces import ObjectEvent
35
from zope.component.interfaces import IObjectEvent
36
from zope.lifecycleevent import modified
37
from zope.lifecycleevent import ObjectCreatedEvent
38
from zope.security.interfaces import Unauthorized
39
40
from plone import api as ploneapi
41
from plone.memoize.volatile import DontCache
42
from plone.api.exc import InvalidParameterError
43
from plone.dexterity.interfaces import IDexterityContent
44
from plone.app.layout.viewlets.content import ContentHistoryView
45
from plone.i18n.normalizer.interfaces import IFileNameNormalizer
46
from plone.i18n.normalizer.interfaces import IIDNormalizer
47
48
from bika.lims import logger
49
50
"""Bika LIMS Framework API
51
52
Please see bika.lims/docs/API.rst for documentation.
53
54
Architectural Notes:
55
56
Please add only functions that do a single thing for a single object.
57
58
Good: `def get_foo(brain_or_object)`
59
Bad:  `def get_foos(list_of_brain_objects)`
60
61
Why?
62
63
Because it makes things more complex. You can always use a pattern like this to
64
achieve the same::
65
66
    >>> foos = map(get_foo, list_of_brain_objects)
67
68
Please add for all of your functions a descriptive test in docs/API.rst.
69
70
Thanks.
71
"""
72
73
# Sentinel used as `default` value to detect that no default was passed in
_marker = object()
74
75
# Pattern for 32 lowercase letters/digits followed by the end of the string
UID_RX = re.compile("[a-z0-9]{32}$")
76
77
78
class BikaLIMSError(Exception):
    """Common base class for all bika.lims specific errors"""
80
81
82
class IBikaTransitionEvent(IObjectEvent):
    """Base interface of all Bika workflow transition events"""
84
85
86
class IBikaBeforeTransitionEvent(IBikaTransitionEvent):
    """Event fired before a transition is invoked"""
88
89
90
class IBikaAfterTransitionEvent(IBikaTransitionEvent):
    """Event fired after a transition is done"""
92
93
94
class IBikaTransitionFailedEvent(IBikaTransitionEvent):
    """Event fired when a transition failed"""
96
97
98
class BikaTransitionEvent(ObjectEvent):
    """Object event carrying the data of a Bika workflow transition

    :param obj: The object the event is about
    :param transition: The transition the event refers to
    :param exception: Optional exception attached to the event
        (presumably the reason of a failed transition -- confirm against
        the code firing the event)
    """

    def __init__(self, obj, transition, exception=None):
        # let the base event remember the object first
        ObjectEvent.__init__(self, obj)
        # keep all passed in values available on the event
        self.obj = obj
        self.transition = transition
        self.exception = exception
105
106
107
class BikaBeforeTransitionEvent(BikaTransitionEvent):
    """Transition event providing `IBikaBeforeTransitionEvent`"""
    implements(IBikaBeforeTransitionEvent)
109
110
111
class BikaAfterTransitionEvent(BikaTransitionEvent):
    """Transition event providing `IBikaAfterTransitionEvent`"""
    implements(IBikaAfterTransitionEvent)
113
114
115
class BikaTransitionFailedEvent(BikaTransitionEvent):
    """Transition event providing `IBikaTransitionFailedEvent`"""
    implements(IBikaTransitionFailedEvent)
117
118
119
def get_portal():
    """Return the portal object

    :returns: The object returned by `plone.api.portal.getSite`
    """
    portal = ploneapi.portal.getSite()
    return portal
125
126
127
def get_bika_setup():
    """Look up the `bika_setup` folder in the portal

    :returns: The result of `portal.get("bika_setup")`
    """
    return get_portal().get("bika_setup")
132
133
134
def create(container, portal_type, *args, **kwargs):
    """Create a new content object inside the given container

    The construction steps are mostly taken from the TypesTool,
    see: `Products.CMFCore.TypesTool._constructInstance`

    :param container: container
    :type container: ATContentType/DexterityContentType/CatalogBrain
    :param portal_type: The portal type to create, e.g. "Client"
    :type portal_type: string
    :param title: Title of the new object (keyword argument); if missing or
        None, "New <portal_type>" is used
    :type title: string
    :returns: The new created object
    """
    from bika.lims.utils import tmpID

    # provide a generic title if none was given
    if kwargs.get("title") is None:
        kwargs["title"] = "New {}".format(portal_type)

    # the object is created with a temporary ID
    temporary_id = tmpID()

    # factory type information of the requested portal type
    fti = get_tool("portal_types").getTypeInfo(portal_type)

    if not fti.product:
        # newstyle factory
        factory = getUtility(IFactory, fti.factory)
        obj = factory(temporary_id, *args, **kwargs)
        if hasattr(obj, '_setPortalTypeName'):
            obj._setPortalTypeName(fti.getId())
        notify(ObjectCreatedEvent(obj))
        # notifies ObjectWillBeAddedEvent, ObjectAddedEvent and
        # ContainerModifiedEvent
        container._setObject(temporary_id, obj)
        # fetch the object by its current id, because an event handler might
        # have renamed it already
        obj = container._getOb(obj.getId())
    else:
        obj = _createObjectByType(portal_type, container, temporary_id)

    # AT content needs its form processed
    if is_at_content(obj):
        obj.processForm()

    # Edit after processForm; processForm does AT unmarkCreationFlag.
    obj.edit(**kwargs)

    # notify explicitly about the modification
    modified(obj)
    return obj
184
185
186
def get_tool(name, context=None, default=_marker):
    """Look up a portal tool by its name

    :param name: The name of the tool, e.g. `portal_catalog`
    :type name: string
    :param context: Optional brain or object used for the lookup
    :type context: ATContentType/DexterityContentType/CatalogBrain
    :param default: Value returned if plone.api can not find the tool
    :returns: Portal Tool
    :raises BikaLIMSError: if the tool is not found and no `default` is given
    """
    if context is not None:
        # A context was passed in: use it for the lookup first
        try:
            context = get_object(context)
            return getToolByName(context, name)
        except (BikaLIMSError, AttributeError) as exc:
            # https://github.com/senaite/bika.lims/issues/396
            logger.warn("get_tool::getToolByName({}, '{}') failed: {} "
                        "-> falling back to plone.api.portal.get_tool('{}')"
                        .format(repr(context), name, repr(exc), name))
            return get_tool(name, default=default)

    # Lookup without context via the plone api
    try:
        return ploneapi.portal.get_tool(name)
    except InvalidParameterError:
        if default is _marker:
            fail("No tool named '%s' found." % name)
        return default
215
216
217
def fail(msg=None):
    """Raise a `BikaLIMSError`

    :param msg: Reason of the failure; "Reason not given." is used for None
    :raises BikaLIMSError: always
    """
    reason = "Reason not given." if msg is None else msg
    raise BikaLIMSError("{}".format(reason))
223
224
225
def is_object(brain_or_object):
    """Tell whether the passed in value is a supported portal content object

    Supported are the portal itself, AT contents, Dexterity contents and
    catalog brains.

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: Portal Object
    :returns: True if the passed in object is a valid portal content
    """
    # checked in this order; the first positive check wins
    checks = (is_portal, is_at_content, is_dexterity_content, is_brain)
    return any(check(brain_or_object) for check in checks)
241
242
243
def get_object(brain_or_object):
    """Return the full content object

    Catalog brains are woken up with `getObject`, other supported objects
    are returned unchanged.

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: PortalObject/ATContentType/DexterityContentType
    /CatalogBrain
    :returns: The full object
    :raises BikaLIMSError: if the passed in value is not supported
    """
    if not is_object(brain_or_object):
        fail("{} is not supported.".format(repr(brain_or_object)))
    if not is_brain(brain_or_object):
        return brain_or_object
    return brain_or_object.getObject()
256
257
258
def is_portal(brain_or_object):
    """Tell whether the passed in object provides `ISiteRoot`

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :returns: True if the object is the portal root object
    :rtype: bool
    """
    provides_site_root = ISiteRoot.providedBy(brain_or_object)
    return provides_site_root
267
268
269
def is_brain(brain_or_object):
    """Tell whether the passed in object provides `ICatalogBrain`

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :returns: True if the object is a catalog brain
    :rtype: bool
    """
    provides_brain = ICatalogBrain.providedBy(brain_or_object)
    return provides_brain
278
279
280
def is_dexterity_content(brain_or_object):
    """Tell whether the passed in object provides `IDexterityContent`

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :returns: True if the object is a dexterity content type
    :rtype: bool
    """
    provides_dexterity = IDexterityContent.providedBy(brain_or_object)
    return provides_dexterity
289
290
291
def is_at_content(brain_or_object):
    """Tell whether the passed in object is an Archetypes `BaseObject`

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :returns: True if the object is an AT content type
    :rtype: bool
    """
    derives_from_at = isinstance(brain_or_object, BaseObject)
    return derives_from_at
300
301
302
def is_folderish(brain_or_object):
    """Tell whether the passed in object is folderish

    An `is_folderish` attribute of the passed in value is preferred (it gets
    called if it is callable). Without such an attribute, the full object is
    checked for `IFolderish`.

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :returns: True if the object is folderish
    :rtype: bool
    """
    if not hasattr(brain_or_object, "is_folderish"):
        return IFolderish.providedBy(get_object(brain_or_object))
    folderish = brain_or_object.is_folderish
    if callable(folderish):
        return folderish()
    return folderish
315
316
317
def get_portal_type(brain_or_object):
    """Return the `portal_type` attribute of the brain or object

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :returns: Portal type
    :rtype: string
    :raises BikaLIMSError: if the passed in value is not supported
    """
    if is_object(brain_or_object):
        return brain_or_object.portal_type
    fail("{} is not supported.".format(repr(brain_or_object)))
328
329
330
def get_schema(brain_or_object):
    """Return the schema of the content object

    Dexterity schemas are looked up through the type information of the
    `portal_types` tool, AT schemas are taken from the object itself.

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :returns: Schema object
    :raises BikaLIMSError: for the portal root and for objects which are
        neither Dexterity nor AT contents
    """
    obj = get_object(brain_or_object)
    if is_portal(obj):
        fail("get_schema can't return schema of portal root")
    if is_dexterity_content(obj):
        fti = get_tool("portal_types").getTypeInfo(obj.portal_type)
        return fti.lookupSchema()
    if not is_at_content(obj):
        fail("{} has no Schema.".format(brain_or_object))
    return obj.Schema()
347
348
349
def get_fields(brain_or_object):
    """Return the schema fields of the object keyed by their name

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :returns: Mapping of field name -> field
    :rtype: dict
    """
    obj = get_object(brain_or_object)
    schema = get_schema(obj)
    if not is_dexterity_content(obj):
        # non-Dexterity schema: pair the keys with the fields
        return dict(zip(schema.keys(), schema.fields()))
    # Dexterity schema: fetch each field by its name
    return {name: schema.get(name) for name in schema.names()}
364
365
366
def get_id(brain_or_object):
    """Return the Plone ID of the brain or object

    The `getId` value stored on a brain is used if available, otherwise the
    `getId` method of the full object is called.

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :returns: Plone ID
    :rtype: string
    """
    from_brain = is_brain(brain_or_object) \
        and base_hasattr(brain_or_object, "getId")
    if not from_brain:
        return get_object(brain_or_object).getId()
    return brain_or_object.getId
377
378
379
def get_title(brain_or_object):
    """Return the Title of the brain or object

    The `Title` value stored on a brain is used if available, otherwise the
    `Title` method of the full object is called.

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :returns: Title
    :rtype: string
    """
    from_brain = is_brain(brain_or_object) \
        and base_hasattr(brain_or_object, "Title")
    if not from_brain:
        return get_object(brain_or_object).Title()
    return brain_or_object.Title
390
391
392
def get_description(brain_or_object):
    """Return the Description of the brain or object

    The `Description` value stored on a brain is used if available, otherwise
    the `Description` method of the full object is called.

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :returns: Description
    :rtype: string
    """
    from_brain = is_brain(brain_or_object) \
        and base_hasattr(brain_or_object, "Description")
    if not from_brain:
        return get_object(brain_or_object).Description()
    return brain_or_object.Description
404
405
406
def get_uid(brain_or_object):
    """Return the Plone UID of the brain or object

    The portal object always has the UID '0'. For brains, the stored `UID`
    value is used if available, otherwise the `UID` method of the full
    object is called.

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :returns: Plone UID
    :rtype: string
    """
    if is_portal(brain_or_object):
        return '0'
    from_brain = is_brain(brain_or_object) \
        and base_hasattr(brain_or_object, "UID")
    if not from_brain:
        return get_object(brain_or_object).UID()
    return brain_or_object.UID
419
420
421
def get_url(brain_or_object):
    """Return the absolute URL of the brain or object

    Brains having a `getURL` attribute are asked directly, otherwise
    `absolute_url` of the full object is called.

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :returns: Absolute URL
    :rtype: string
    """
    from_brain = is_brain(brain_or_object) \
        and base_hasattr(brain_or_object, "getURL")
    if not from_brain:
        return get_object(brain_or_object).absolute_url()
    return brain_or_object.getURL()
432
433
434
def get_icon(brain_or_object, html_tag=True):
    """Return the icon registered for the portal type of the content

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :param html_tag: A value of 'True' returns the HTML tag, else the image url
    :type html_tag: bool
    :returns: HTML '<img>' tag if 'html_tag' is True else the image url; an
        empty string if the type has no icon
    :rtype: string
    """
    # The icon is resolved manually through the type information, because
    # `plone.app.layout.getIcon` does not work reliably for Bika Contents
    # coming from other catalogs than the `portal_catalog`
    fti = get_tool("portal_types").getTypeInfo(brain_or_object.portal_type)
    icon = fti.getIcon()
    if not icon:
        return ""
    # the icon is addressed relative to the portal URL
    icon_url = "%s/%s" % (get_url(get_portal()), icon)
    if html_tag:
        template = '<img width="16" height="16" src="{url}" title="{title}" />'
        return template.format(url=icon_url, title=get_title(brain_or_object))
    return icon_url
458
459
460
def get_object_by_uid(uid, default=_marker):
    """Look up an object by its UID

    :param uid: The UID of the object to find
    :type uid: string
    :param default: Returned instead of failing when nothing is found
    :returns: The found object or `default`
    :raises BikaLIMSError: if nothing is found and no `default` was passed
    """
    has_default = default is not _marker

    # an empty UID can not be resolved
    if not uid:
        if has_default:
            return default
        fail("get_object_by_uid requires UID as first argument; got {} instead"
             .format(uid))

    # the UID '0' is defined to be the portal object
    if uid == '0':
        return get_portal()

    # both catalogs are used for the lookup
    portal_catalog = get_portal_catalog()
    uid_catalog = get_tool("uid_catalog")

    # the uid catalog is asked first
    uid_brains = uid_catalog(UID=uid)
    if uid_brains:
        return uid_brains[0].getObject()

    # the portal catalog is the second try
    pc_brains = portal_catalog(UID=uid)
    if pc_brains:
        return get_object(pc_brains[0])

    if has_default:
        return default
    fail("No object found for UID {}".format(uid))

    # only reached if `fail` returns, which it does not
    return get_object(pc_brains[0])
496
497
498
def get_object_by_path(path, default=_marker):
    """Look up an object by its physical path or absolute URL

    :param path: The physical path (or absolute URL) of the object to find
    :type path: string
    :param default: Returned instead of failing when nothing is found
    :returns: The found object (the portal for the portal path) or `default`
    :raises BikaLIMSError: if nothing is found and no `default` was passed
    """
    has_default = default is not _marker

    # an empty path can not be resolved
    if not path:
        if has_default:
            return default
        fail("get_object_by_path first argument must be a path; {} received"
             .format(path))

    catalog = get_portal_catalog()
    portal = get_portal()
    portal_path = get_path(portal)
    portal_url = get_url(portal)

    # an absolute URL is converted to a physical path
    if path.startswith(portal_url):
        url_parts = get_request().physicalPathFromURL(path)
        path = "/".join(url_parts)

    # only paths below the portal path are handled
    if not path.startswith(portal_path):
        if has_default:
            return default
        fail("Not a physical path inside the portal.")

    # the portal itself is not searched in the catalog
    if path == portal_path:
        return portal

    brains = catalog(path=dict(query=path, depth=0))
    if not brains:
        if has_default:
            return default
        fail("Object at path '{}' not found".format(path))
    return get_object(brains[0])
537
538
539
def get_path(brain_or_object):
    """Return the physical path of the brain or object

    Brains are asked with `getPath`; for other objects, the parts returned
    by `getPhysicalPath` are joined with "/".

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :returns: Physical path of the object
    :rtype: string
    """
    if not is_brain(brain_or_object):
        path_parts = get_object(brain_or_object).getPhysicalPath()
        return "/".join(path_parts)
    return brain_or_object.getPath()
550
551
552
def get_parent_path(brain_or_object):
    """Return the physical path of the parent of the brain or object

    For the portal, the path of the portal itself is returned. The parent
    path of a brain is derived from the brain path without waking up the
    object.

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :returns: Physical path of the parent object
    :rtype: string
    """
    if is_portal(brain_or_object):
        return get_path(get_portal())
    if not is_brain(brain_or_object):
        return get_path(get_object(brain_or_object).aq_parent)
    # cut off everything from the last "/" on
    own_path = get_path(brain_or_object)
    return own_path.rpartition("/")[0]
566
567
568
def get_parent(brain_or_object, catalog_search=False):
    """Return the parent of the content object or catalog brain

    With `catalog_search` enabled, the parent is searched in the
    `portal_catalog` and returned as a brain. If this search has no results,
    the full parent object is returned instead. The parent of the portal is
    the portal itself.

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :param catalog_search: Use a catalog query to find the parent object
    :type catalog_search: bool
    :returns: parent object
    :rtype: ATContentType/DexterityContentType/PloneSite/CatalogBrain
    """
    if is_portal(brain_or_object):
        return get_portal()

    # Without catalog search, the parent comes from the acquisition chain
    if not catalog_search:
        return get_object(brain_or_object).aq_parent

    parent_path = get_parent_path(brain_or_object)

    # the portal is returned directly
    if parent_path == get_path(get_portal()):
        return get_portal()

    # search the catalog for exactly the parent path
    catalog = get_portal_catalog()
    brains = catalog(path={
        "query": parent_path,
        "depth": 0})

    if brains:
        # the first result is the brain of the parent
        return brains[0]

    # No results fallback: return the parent object
    return get_object(brain_or_object).aq_parent
610
611
612
def search(query, catalog=_marker):
    """Run a query against a single catalog

    If no catalog is given, the first catalog registered for each queried
    `portal_type` is used; without any portal type, the portal catalog is
    searched.

    :param query: A suitable search query.
    :type query: dict
    :param catalog: A single catalog id or a list of catalog ids
    :type catalog: str/list
    :returns: Search results
    :rtype: List of ZCatalog brains
    :raises BikaLIMSError: if the query is no dictionary or if more than one
        catalog would have to be searched
    """
    # only dictionary queries are supported
    if not isinstance(query, dict):
        fail("Catalog query needs to be a dictionary")

    # the queried portal types, always handled as a sequence
    portal_types = query.get("portal_type", [])
    if not isinstance(portal_types, (tuple, list)):
        portal_types = [portal_types]

    if catalog is not _marker:
        # catalogs explicitly specified by the caller
        if isinstance(catalog, (list, tuple)):
            catalogs = [get_tool(name) for name in catalog]
        else:
            catalogs = [get_tool(catalog)]
    else:
        # first registered/default catalog of each queried portal type
        catalogs = [
            get_catalogs_for(portal_type, default="portal_catalog")[0]
            for portal_type in portal_types]

    # remove duplicates and fall back to the portal catalog
    catalogs = list(set(catalogs)) or [get_portal_catalog()]

    # queries spanning more than one catalog are not supported
    if len(catalogs) > 1:
        fail("Multi Catalog Queries are not supported, please specify a catalog.")

    return catalogs[0](query)
658
659
660
def safe_getattr(brain_or_object, attr, default=_marker):
    """Return the (called) value of an attribute

    Callable attribute values are called and their result is returned.

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :param attr: Attribute name
    :type attr: str
    :param default: Returned if the attribute is missing or not accessible
    :returns: Attribute value
    :rtype: obj
    :raises BikaLIMSError: if the attribute is missing or the access is
        unauthorized and no `default` was passed
    """
    has_default = default is not _marker
    try:
        value = getattr(brain_or_object, attr, _marker)
        if value is _marker:
            if has_default:
                return default
            fail("Attribute '{}' not found.".format(attr))
        return value() if callable(value) else value
    except Unauthorized:
        if has_default:
            return default
        fail("You are not authorized to access '{}' of '{}'.".format(
            attr, repr(brain_or_object)))
684
685
686
def get_portal_catalog():
    """Return the `portal_catalog` tool

    :returns: Portal Catalog Tool
    """
    catalog = get_tool("portal_catalog")
    return catalog
692
693
694
def get_review_history(brain_or_object, rev=True):
    """Get the review history for the given brain or context.

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :param rev: If True, the history is returned in reversed order
    :type rev: bool
    :returns: Workflow history as a new list; empty if the history could not
        be retrieved
    :rtype: [{}, ...]
    """
    obj = get_object(brain_or_object)
    review_history = []
    try:
        workflow = get_tool("portal_workflow")
        review_history = workflow.getInfoFor(obj, 'review_history')
    except WorkflowException as e:
        # best effort: log the problem and continue with an empty history
        message = str(e)
        logger.error("Cannot retrieve review_history on {}: {}".format(
            obj, message))
    if not isinstance(review_history, (list, tuple)):
        logger.error("get_review_history: expected list, recieved {}".format(
            review_history))
        review_history = []
    # Always work on a list copy: a tuple has no `reverse` method, and
    # reversing in place would modify the value handed out by the workflow
    # tool.
    review_history = list(review_history)
    if rev is True:
        review_history.reverse()
    return review_history
718
719
720
def get_revision_history(brain_or_object):
    """Return the full history reported by the `ContentHistoryView`

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :returns: Result of `ContentHistoryView.fullHistory`
    :rtype: obj
    """
    obj = get_object(brain_or_object)
    # the view is created with the request of the object, or None
    request = safe_getattr(obj, "REQUEST", None)
    history_view = ContentHistoryView(obj, request)
    return history_view.fullHistory()
731
732
733
def get_workflows_for(brain_or_object):
    """Return the workflow chain assigned to the given brain or context

    Note: A portal_type (string) is accepted as parameter as well.

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :returns: Assigned Workflows
    :rtype: tuple
    """
    workflow_tool = ploneapi.portal.get_tool("portal_workflow")
    # strings are passed on as they are, everything else as full object
    if not isinstance(brain_or_object, basestring):
        return workflow_tool.getChainFor(get_object(brain_or_object))
    return workflow_tool.getChainFor(brain_or_object)
748
749
750
def get_workflow_status_of(brain_or_object, state_var="review_state"):
    """Return the value of a workflow state variable of the brain or context

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :param state_var: The name of the state variable
    :type state_var: string
    :returns: Status; an empty string is passed as default to the workflow
        tool
    :rtype: str
    """
    workflow_tool = get_tool("portal_workflow")
    content = get_object(brain_or_object)
    status = workflow_tool.getInfoFor(ob=content, name=state_var, default='')
    return status
763
764
765
def get_creation_date(brain_or_object):
    """Return the creation date of the brain or object

    The `created` attribute is read; it gets called if it is callable.

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :returns: Creation date
    :rtype: DateTime
    :raises BikaLIMSError: if there is no `created` attribute (or it is None)
    """
    creation_date = getattr(brain_or_object, "created", None)
    if creation_date is None:
        fail("Object {} has no creation date ".format(
             repr(brain_or_object)))
    return creation_date() if callable(creation_date) else creation_date
780
781
782
def get_modification_date(brain_or_object):
    """Return the modification date of the brain or object

    The `modified` attribute is read; it gets called if it is callable.

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :returns: Modification date
    :rtype: DateTime
    :raises BikaLIMSError: if there is no `modified` attribute (or it is None)
    """
    modification_date = getattr(brain_or_object, "modified", None)
    if modification_date is None:
        fail("Object {} has no modification date ".format(
             repr(brain_or_object)))
    if callable(modification_date):
        return modification_date()
    return modification_date
797
798
799
def get_review_status(brain_or_object):
    """Return the `review_state` of the brain or object

    Brains are asked for their `review_state` attribute, other objects are
    looked up with the workflow tool.

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :returns: Value of the review_status variable
    :rtype: String
    """
    if not is_brain(brain_or_object):
        return get_workflow_status_of(
            brain_or_object, state_var="review_state")
    return brain_or_object.review_state
810
811
812 View Code Duplication
def get_cancellation_status(brain_or_object, default="active"):
    """Return the `cancellation_state` of the brain or object

    Lookup order:

    1. the `cancellation_state` attribute of a brain, if it is not None
    2. the `cancellation_state` workflow variable, if the
       `bika_cancellation_workflow` is assigned
    3. the `review_state`, if it is "active" or "inactive", else `default`

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :param default: Returned if no state could be determined
    :returns: Value of the cancellation state
    :rtype: String
    """
    if is_brain(brain_or_object):
        brain_state = getattr(brain_or_object, "cancellation_state", None)
        if brain_state is not None:
            return brain_state

    if "bika_cancellation_workflow" in get_workflows_for(brain_or_object):
        return get_workflow_status_of(brain_or_object, "cancellation_state")

    # NOTE(review): the accepted states are the same as in
    # `get_inactive_status`; "cancelled" is not among them -- confirm that
    # this is intended.
    review_state = get_workflow_status_of(brain_or_object)
    if review_state in ("active", "inactive"):
        return review_state
    return default
834
835
836 View Code Duplication
def get_inactive_status(brain_or_object, default="active"):
    """Return the `inactive_state` of the brain or object

    Lookup order:

    1. the `inactive_state` attribute of a brain, if it is not None
    2. the `inactive_state` workflow variable, if the
       `bika_inactive_workflow` is assigned
    3. the `review_state`, if it is "active" or "inactive", else `default`

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :param default: Returned if no state could be determined
    :returns: Value of the inactive state
    :rtype: String
    """
    if is_brain(brain_or_object):
        brain_state = getattr(brain_or_object, "inactive_state", None)
        if brain_state is not None:
            return brain_state

    if "bika_inactive_workflow" in get_workflows_for(brain_or_object):
        return get_workflow_status_of(brain_or_object, "inactive_state")

    review_state = get_workflow_status_of(brain_or_object)
    if review_state in ("active", "inactive"):
        return review_state
    return default
858
859
860
def is_active(brain_or_object):
    """Tell whether the object is neither 'inactive' nor 'cancelled'

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :returns: False if the object is in the state 'inactive' or 'cancelled'
    :rtype: bool
    """
    # the inactive state is checked first
    if get_inactive_status(brain_or_object) == "inactive":
        return False
    return get_cancellation_status(brain_or_object) != "cancelled"
873
874
875
def get_catalogs_for(brain_or_object, default="portal_catalog"):
    """Get all registered catalogs for the given portal_type, catalog brain or
    content object

    :param brain_or_object: The portal_type, a catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :param default: Id of the catalog tool used when nothing is registered
    :type default: str
    :returns: List of supported catalogs
    :rtype: list
    """
    at_tool = get_tool("archetype_tool", None)
    if not at_tool:
        # without the archetype tool only the default catalog is available
        return [get_tool(default)]

    registered = []

    # resolve the portal type of an object/brain, or use the passed in name
    if is_object(brain_or_object):
        portal_type = get_portal_type(brain_or_object)
        registered = at_tool.getCatalogsByType(portal_type)
    if isinstance(brain_or_object, basestring):
        registered = at_tool.getCatalogsByType(brain_or_object)

    # fall back to the default catalog when no catalog is registered
    return registered or [get_tool(default)]
901
902
903
def get_transitions_for(brain_or_object):
    """List available workflow transitions for all workflows

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :returns: All possible available and allowed transitions
    :rtype: list[dict]
    """
    wf_tool = get_tool('portal_workflow')
    collected = []
    obj = get_object(brain_or_object)
    for workflow_id in get_workflows_for(brain_or_object):
        candidates = wf_tool[workflow_id].getTransitionsFor(obj)
        # only add transitions not already collected from a previous workflow
        fresh = [t for t in candidates if t not in collected]
        collected.extend(fresh)
    return collected
919
920
921
def do_transition_for(brain_or_object, transition):
    """Performs a workflow transition for the passed in object.

    A `BikaBeforeTransitionEvent` is notified before the transition is
    attempted and a `BikaAfterTransitionEvent` afterwards. If the transition
    is rejected with an `InvalidParameterError`, a
    `BikaTransitionFailedEvent` is notified and the failure is reported
    through `fail`.

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :param transition: The id of the transition to perform
    :type transition: str
    :returns: The object where the transition was performed
    """
    if not isinstance(transition, basestring):
        fail("Transition type needs to be string, got '%s'" % type(transition))
    target = get_object(brain_or_object)
    # notify the BeforeTransitionEvent
    notify(BikaBeforeTransitionEvent(target, transition))
    try:
        ploneapi.content.transition(target, transition)
    except ploneapi.exc.InvalidParameterError as error:
        # notify the TransitionFailedEvent
        failed_event = BikaTransitionFailedEvent(
            target, transition, exception=error)
        notify(failed_event)
        message = "Failed to perform transition '{}' on {}: {}".format(
            transition, target, str(error))
        fail(message)
    # notify the AfterTransitionEvent
    notify(BikaAfterTransitionEvent(target, transition))
    return target
943
944
945
def get_roles_for_permission(permission, brain_or_object):
    """Get a list of granted roles for the given permission on the object.

    :param permission: The permission to look up the roles for
    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :returns: Sorted, duplicate-free roles for the given Permission
    :rtype: list
    """
    roles = rolesForPermissionOn(permission, get_object(brain_or_object))
    # de-duplicate before sorting
    return sorted(set(roles))
956
957
958
def is_versionable(brain_or_object, policy='at_edit_autoversion'):
    """Checks if the passed in object is versionable.

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :param policy: Id of the versioning policy the object has to support
    :type policy: str
    :returns: True if the object supports the policy and is versionable
    :rtype: bool
    """
    pr = get_tool("portal_repository")
    obj = get_object(brain_or_object)
    # honour the `policy` argument instead of a hard-coded policy id; the
    # default keeps the previous behavior for existing callers
    return pr.supportsPolicy(obj, policy) and pr.isVersionable(obj)
970
971
972
def get_version(brain_or_object):
    """Get the version of the current object

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :returns: The current version of the object, or None if not available
    :rtype: int or None
    """
    obj = get_object(brain_or_object)
    if is_versionable(obj):
        # read from the unwrapped object; 0 if no version was stored yet
        return getattr(aq_base(obj), "version_id", 0)
    return None
984
985
986
def get_view(name, context=None, request=None):
    """Get the view by name

    :param name: The name of the view
    :type name: str
    :param context: The context to query the view; the portal if omitted
    :type context: ATContentType/DexterityContentType/CatalogBrain
    :param request: The request to query the view; the global request if
        omitted
    :type request: HTTPRequest object
    :returns: The multi adapter registered under `name` for the context
        object and the request
    :rtype: Products.Five.metaclass View object
    """
    if not context:
        context = get_portal()
    if not request:
        request = get_request() or None
    target = get_object(context)
    return getMultiAdapter((target, request), name=name)
1001
1002
1003
def get_request():
    """Get the global request object

    :returns: The request provided by `zope.globalrequest`
    :rtype: HTTPRequest object
    """
    request = globalrequest.getRequest()
    return request
1010
1011
1012
def get_group(group_or_groupname):
    """Return Plone Group

    :param group_or_groupname: Plone group or the name of the group
    :type group_or_groupname: GroupData/str
    :returns: Plone GroupData, or None for a falsy argument
    """
    if not group_or_groupname:
        return None
    # anything exposing `_getGroup` is treated as a group object already
    if hasattr(group_or_groupname, "_getGroup"):
        return group_or_groupname
    return get_tool("portal_groups").getGroupById(group_or_groupname)
1026
1027
1028
def get_user(user_or_username):
    """Return Plone User

    :param user_or_username: Plone user or user id
    :returns: Plone MemberData, or None for a falsy argument
    """
    if not user_or_username:
        return None
    if hasattr(user_or_username, "getUserId"):
        # user-like object: look the user up again by its id
        userid = user_or_username.getUserId()
        return ploneapi.user.get(userid)
    return ploneapi.user.get(userid=user_or_username)
1039
1040
1041
def get_user_properties(user_or_username):
    """Return User Properties

    :param user_or_username: Plone user or user id
    :returns: Properties merged from all property sheets of the user; an
        empty dict if the user cannot be resolved
    :rtype: dict
    """
    member = get_user(user_or_username)
    if member is None or not callable(member.getUser):
        return {}
    plone_user = member.getUser()
    properties = {}
    # sheets are merged in listing order, later sheets win on key clashes
    for sheet_id in plone_user.listPropertysheets():
        sheet = plone_user.getPropertysheet(sheet_id)
        properties.update(dict(sheet.propertyItems()))
    return properties
1058
1059
1060
def get_users_by_roles(roles=None):
    """Search Plone users by their roles

    :param roles: Plone role name or list of roles
    :type roles:  list/str
    :returns: List of Plone users having the role(s)
    """
    # a single value is wrapped so the search always receives a sequence
    role_list = roles if isinstance(roles, (tuple, list)) else [roles]
    return get_tool("portal_membership").searchForMembers(roles=role_list)
1071
1072
1073
def get_current_user():
    """Returns the current logged in user

    :returns: Current User as reported by `plone.api`
    """
    current = ploneapi.user.get_current()
    return current
1079
1080
1081
def get_user_contact(user, contact_types=['Contact', 'LabContact']):
    """Returns the associated contact of a Plone user

    If the user passed in has no contact associated, return None. None is
    also returned (and an error logged) if more than one contact is bound
    to the user.
    The `contact_types` parameter filters the portal types for the search.

    :param user: Plone user
    :param contact_types: List with the contact portal types to search
    :returns: Contact associated to the Plone user or None
    """
    if not user:
        return None

    brains = search(
        {'portal_type': contact_types, 'getUsername': user.id},
        catalog='portal_catalog')
    if not brains:
        return None

    if len(brains) == 1:
        return get_object(brains[0])

    # Oops, the user has multiple contacts assigned, return None
    titles = ','.join([brain.Title for brain in brains])
    logger.error(
        "User '{}' is bound to multiple Contacts '{}'".format(user.id, titles))
    return None
1108
1109
1110
def get_user_client(user_or_contact):
    """Returns the client of the contact of a Plone user

    If the user passed in has no contact or does not belong to any client,
    returns None.

    :param user_or_contact: Plone user or contact
    :returns: Client the contact of the Plone user belongs to
    """
    if not user_or_contact or ILabContact.providedBy(user_or_contact):
        # Lab contacts cannot belong to a client
        return None

    if IContact.providedBy(user_or_contact):
        # the client, if any, is the parent of the contact
        parent = get_parent(user_or_contact)
        if parent and IClient.providedBy(parent):
            return parent
        return None

    # not a contact: resolve the client contact of the user first
    contact = get_user_contact(user_or_contact, contact_types=['Contact'])
    if IContact.providedBy(contact):
        return get_user_client(contact)
    return None
1134
1135
1136
def get_current_client():
    """Returns the current client the current logged in user belongs to, if any

    :returns: Client the current logged in user belongs to or None
    """
    current_user = get_current_user()
    return get_user_client(current_user)
1142
1143
1144
def get_cache_key(brain_or_object):
    """Generate a cache key for a common brain or object

    The key joins portal type, id, UID, URL and modification time with "-".

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :returns: Cache Key
    :rtype: str
    """
    parts = (
        get_portal_type(brain_or_object),
        get_id(brain_or_object),
        get_uid(brain_or_object),
        # handle different domains gracefully
        get_url(brain_or_object),
        # Return the microsecond since the epoch in GMT
        get_modification_date(brain_or_object).micros(),
    )
    return "-".join([str(part) for part in parts])
1162
1163
1164
def bika_cache_key_decorator(method, self, brain_or_object):
    """Bika cache key function for methods taking a brain or object

    NOTE(review): presumably meant to be passed as the key function of a
    `plone.memoize` cache decorator -- confirm against the callers.

    :param method: The decorated method (not used for the key)
    :param self: The instance the method is bound to (not used for the key)
    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :returns: Cache Key
    :rtype: str
    :raises DontCache: if `brain_or_object` is None
    """
    if brain_or_object is not None:
        return get_cache_key(brain_or_object)
    raise DontCache
1175
1176
1177
def normalize_id(string):
    """Normalize the id

    :param string: A string to normalize
    :type string: str
    :returns: Normalized ID
    :rtype: str
    """
    if not isinstance(string, basestring):
        fail("Type of argument must be string, found '{}'".format(type(string)))
    # delegate to the registered id normalizer utility
    return getUtility(IIDNormalizer).normalize(string)
1190
1191
1192
def normalize_filename(string):
    """Normalize the filename

    :param string: A string to normalize
    :type string: str
    :returns: Normalized filename
    :rtype: str
    """
    if not isinstance(string, basestring):
        fail("Type of argument must be string, found '{}'".format(type(string)))
    # delegate to the registered file name normalizer utility
    return getUtility(IFileNameNormalizer).normalize(string)
1205
1206
1207
def is_uid(uid, validate=False):
    """Checks if the passed in uid is a valid UID

    :param uid: The uid to check
    :param validate: If False, only checks that uid is a string of 32
        characters matching `UID_RX`. If True, also verifies that a brain
        exists in the `uid_catalog` for the uid passed in
    :type uid: string
    :return: True if a valid uid
    :rtype: bool
    """
    # cheap format checks first; evaluation stops at the first failure
    well_formed = (
        isinstance(uid, basestring) and
        len(uid) == 32 and
        UID_RX.match(uid)
    )
    if not well_formed:
        return False
    if not validate:
        return True

    # Check if a brain for this uid exists
    brains = get_tool('uid_catalog')(UID=uid)
    if brains:
        assert (len(brains) == 1)
    return len(brains) > 0
1232
1233
1234
def is_date(date):
    """Checks if the passed in value is a Zope DateTime or a Python datetime

    :param date: The date to check
    :type date: DateTime or datetime
    :return: True if a valid date, False for falsy or other values
    :rtype: bool
    """
    # falsy values are rejected before the type check
    return bool(date) and isinstance(date, (DateTime, datetime))
1245
1246
1247
def to_date(value, default=None):
    """Tries to convert the passed in value to Zope's DateTime

    :param value: The value to be converted to a valid DateTime
    :type value: str, DateTime or datetime
    :param default: Value converted and returned instead if `value` is
        falsy or cannot be converted
    :return: The DateTime representation of the value passed in or default
    """
    if isinstance(value, DateTime):
        return value
    if not value:
        if default is None:
            return None
        return to_date(default)
    try:
        if isinstance(value, str) and '.' in value:
            # https://docs.plone.org/develop/plone/misc/datetime.html#datetime-problems-and-pitfalls
            return DateTime(value, datefmt='international')
        return DateTime(value)
    except Exception:
        # Conversion failed: fall back to the default. A bare `except:`
        # would also swallow KeyboardInterrupt and SystemExit.
        return to_date(default)
1267
1268
1269
def to_minutes(days=0, hours=0, minutes=0, seconds=0, milliseconds=0,
               round_to_int=True):
    """Returns the computed total number of minutes

    Every argument is converted with `float` before being summed up.

    :param round_to_int: If True, the total is rounded and returned as int,
        otherwise the float total is returned
    :returns: Total number of minutes
    :rtype: int or float
    """
    from_days = float(days) * 24 * 60
    from_hours = float(hours) * 60
    from_minutes = float(minutes)
    from_seconds = float(seconds) / 60
    from_millis = float(milliseconds) / 1000 / 60
    total = from_days + from_hours + from_minutes + from_seconds + from_millis
    if round_to_int:
        return int(round(total))
    return total
1276
1277
1278
def to_dhm_format(days=0, hours=0, minutes=0, seconds=0, milliseconds=0):
    """Returns a representation of time in a string in xd yh zm format

    Parts that are zero are omitted, except for the hours when both days
    and minutes are present (e.g. "1d 0h 5m").
    """
    total = to_minutes(days=days, hours=hours, minutes=minutes,
                       seconds=seconds, milliseconds=milliseconds)
    delta = timedelta(minutes=int(round(total)))
    day_count = delta.days
    hour_count, remainder = divmod(delta.seconds, 3600)
    minute_count = remainder // 60

    day_part = "{}d ".format(str(day_count)) if day_count else ""
    minute_part = "{}m ".format(str(minute_count)) if minute_count else ""
    # keep a zero hour as separator when days and minutes are both shown
    if (day_part and minute_part) or hour_count:
        hour_part = "{}h ".format(str(hour_count))
    else:
        hour_part = ""
    return "".join([day_part, hour_part, minute_part]).strip()
1294
1295
1296
def to_int(value, default=_marker):
    """Tries to convert the value to int.
    Truncates at the decimal point if the value is a float

    :param value: The value to be converted to an int
    :param default: Fallback used when the conversion fails. None is
        returned as-is, any other value is converted with `to_int` itself.
        If omitted, the failure is reported through `fail`
    :return: The resulting int or default
    """
    if is_floatable(value):
        value = to_float(value)
    try:
        return int(value)
    except (TypeError, ValueError, OverflowError):
        # OverflowError: floatable values such as "inf" cannot be
        # represented as int and have to take the default path as well
        if default is None:
            return default
        if default is not _marker:
            return to_int(default)
        fail("Value %s cannot be converted to int" % repr(value))
1313
1314
1315
def is_floatable(value):
    """Checks if the passed in value is a valid floatable number

    :param value: The value to be evaluated as a float number
    :type value: str, float, int
    :returns: True if `float(value)` succeeds, False if it raises a
        TypeError or ValueError
    :rtype: bool
    """
    try:
        float(value)
    except (TypeError, ValueError):
        return False
    return True
1327
1328
1329
def to_float(value, default=_marker):
    """Converts the passed in value to a float number

    :param value: The value to be converted to a floatable number
    :type value: str, float, int
    :param default: Fallback converted with `to_float` when `value` is not
        floatable. If omitted, the failure is reported through `fail`
    :returns: The float number representation of the passed in value
    :rtype: float
    """
    if is_floatable(value):
        return float(value)
    if default is not _marker:
        return to_float(default)
    fail("Value %s is not floatable" % repr(value))
    # only reached if `fail` returns instead of raising
    return float(value)
1342
1343
1344
def to_searchable_text_metadata(value):
    """Parse the given metadata value to searchable text

    Falsy values, `Missing.Value`, UIDs and booleans yield an empty string.
    Lists and tuples are converted element by element and joined with a
    space; dictionaries are handled through their values. Dates are
    formatted as YYYY-MM-DD, everything else is converted to unicode.

    :param value: The raw value of the metadata column
    :returns: Searchable unicode value, empty if nothing is searchable
    """
    if not value:
        return u""
    if value is Missing.Value:
        return u""
    if is_uid(value):
        return u""
    if isinstance(value, (bool)):
        return u""
    if isinstance(value, (list, tuple)):
        # convert every element, not only the first one, and skip those
        # without searchable text
        texts = [to_searchable_text_metadata(item) for item in value]
        return u" ".join([text for text in texts if text])
    if isinstance(value, (dict)):
        # only the values of a dictionary are considered searchable
        return to_searchable_text_metadata(list(value.values()))
    if is_date(value):
        return value.strftime("%Y-%m-%d")
    if not isinstance(value, basestring):
        value = str(value)
    return safe_unicode(value)
1369