Passed
Push — 2.x ( b7a35b...e2bf9c )
by Ramon
06:06
created

bika.lims.api.get_icon()   B

Complexity

Conditions 6

Size

Total Lines 35
Code Lines 19

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 19
dl 0
loc 35
rs 8.5166
c 0
b 0
f 0
cc 6
nop 2
1
# -*- coding: utf-8 -*-
2
#
3
# This file is part of SENAITE.CORE.
4
#
5
# SENAITE.CORE is free software: you can redistribute it and/or modify it under
6
# the terms of the GNU General Public License as published by the Free Software
7
# Foundation, version 2.
8
#
9
# This program is distributed in the hope that it will be useful, but WITHOUT
10
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
11
# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
12
# details.
13
#
14
# You should have received a copy of the GNU General Public License along with
15
# this program; if not, write to the Free Software Foundation, Inc., 51
16
# Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
17
#
18
# Copyright 2018-2024 by it's authors.
19
# Some rights reserved, see README and LICENSE.
20
21
import copy
22
import json
23
import re
24
from collections import OrderedDict
25
from datetime import datetime
26
from datetime import timedelta
27
from itertools import groupby
28
29
import Missing
30
import six
31
from AccessControl.PermissionRole import rolesForPermissionOn
32
from Acquisition import aq_base
33
from Acquisition import aq_inner
34
from Acquisition import aq_parent
35
from bika.lims import logger
36
from bika.lims.interfaces import IClient
37
from bika.lims.interfaces import IContact
38
from bika.lims.interfaces import ILabContact
39
from DateTime import DateTime
40
from plone import api as ploneapi
41
from plone.api.exc import InvalidParameterError
42
from plone.app.layout.viewlets.content import ContentHistoryView
43
from plone.behavior.interfaces import IBehaviorAssignable
44
from plone.dexterity.interfaces import IDexterityContent
45
from plone.dexterity.schema import SchemaInvalidatedEvent
46
from plone.dexterity.utils import addContentToContainer
47
from plone.dexterity.utils import createContent
48
from plone.dexterity.utils import resolveDottedName
49
from plone.i18n.normalizer.interfaces import IFileNameNormalizer
50
from plone.i18n.normalizer.interfaces import IIDNormalizer
51
from plone.memoize.volatile import DontCache
52
from Products.Archetypes.atapi import DisplayList
53
from Products.Archetypes.BaseObject import BaseObject
54
from Products.Archetypes.event import ObjectInitializedEvent
55
from Products.Archetypes.utils import mapply
56
from Products.CMFCore.interfaces import IFolderish
57
from Products.CMFCore.interfaces import ISiteRoot
58
from Products.CMFCore.permissions import DeleteObjects
59
from Products.CMFCore.permissions import ModifyPortalContent
60
from Products.CMFCore.permissions import View
61
from Products.CMFCore.utils import getToolByName
62
from Products.CMFCore.WorkflowCore import WorkflowException
63
from Products.CMFPlone.RegistrationTool import get_member_by_login_name
64
from Products.CMFPlone.utils import _createObjectByType
65
from Products.CMFPlone.utils import base_hasattr
66
from Products.CMFPlone.utils import safe_unicode
67
from Products.PlonePAS.tools.memberdata import MemberData
68
from Products.ZCatalog.interfaces import ICatalogBrain
69
from senaite.core.interfaces import ITemporaryObject
70
from zope import globalrequest
71
from zope.annotation.interfaces import IAttributeAnnotatable
72
from zope.component import getUtility
73
from zope.component import queryMultiAdapter
74
from zope.event import notify
75
from zope.interface import alsoProvides
76
from zope.interface import directlyProvides
77
from zope.interface import noLongerProvides
78
from zope.publisher.browser import TestRequest
79
from zope.schema import getFieldsInOrder
80
from zope.security.interfaces import Unauthorized
81
82
"""SENAITE LIMS Framework API
83
84
Please see tests/doctests/API.rst for documentation.
85
86
Architecural Notes:
87
88
Please add only functions that do a single thing for a single object.
89
90
Good: `def get_foo(brain_or_object)`
91
Bad:  `def get_foos(list_of_brain_objects)`
92
93
Why?
94
95
Because it makes things more complex. You can always use a pattern like this to
96
achieve the same::
97
98
    >>> foos = map(get_foo, list_of_brain_objects)
99
100
Please add for all functions a descriptive test in tests/doctests/API.rst.
101
102
Thanks.
103
"""
104
105
_marker = object()
106
107
UID_RX = re.compile("[a-z0-9]{32}$")
108
109
UID_CATALOG = "uid_catalog"
110
PORTAL_CATALOG = "portal_catalog"
111
112
113
class APIError(Exception):
114
    """Base exception class for bika.lims errors."""
115
116
117
def get_portal():
118
    """Get the portal object
119
120
    :returns: Portal object
121
    """
122
    return ploneapi.portal.getSite()
123
124
125
def get_setup():
126
    """Fetch the `bika_setup` folder.
127
    """
128
    portal = get_portal()
129
    return portal.get("bika_setup")
130
131
132
def get_bika_setup():
133
    """Fetch the `bika_setup` folder.
134
    """
135
    return get_setup()
136
137
138
def get_senaite_setup():
139
    """Fetch the new DX `setup` folder.
140
    """
141
    portal = get_portal()
142
    return portal.get("setup")
143
144
145
def create(container, portal_type, *args, **kwargs):
146
    """Creates an object in Bika LIMS
147
148
    This code uses most of the parts from the TypesTool
149
    see: `Products.CMFCore.TypesTool._constructInstance`
150
151
    :param container: container
152
    :type container: ATContentType/DexterityContentType/CatalogBrain
153
    :param portal_type: The portal type to create, e.g. "Client"
154
    :type portal_type: string
155
    :param title: The title for the new content object
156
    :type title: string
157
    :returns: The new created object
158
    """
159
    from bika.lims.utils import tmpID
160
161
    tmp_id = tmpID()
162
    id = kwargs.pop("id", "")
163
    title = kwargs.pop("title", "")
164
165
    # get the fti
166
    types_tool = get_tool("portal_types")
167
    fti = types_tool.getTypeInfo(portal_type)
168
169
    if fti.product:
170
        # create the AT object
171
        obj = _createObjectByType(portal_type, container, id or tmp_id)
172
        # update the object with values
173
        edit(obj, check_permissions=False, title=title, **kwargs)
174
        # auto-id if required
175
        if obj._at_rename_after_creation:
176
            obj._renameAfterCreation(check_auto_id=True)
177
        # we are no longer under creation
178
        obj.unmarkCreationFlag()
179
        # notify that the object was created
180
        notify(ObjectInitializedEvent(obj))
181
    else:
182
        content = createContent(portal_type, **kwargs)
183
        content.id = id
184
        content.title = title
185
        obj = addContentToContainer(container, content)
186
187
    return obj
188
189
190
def copy_object(source, container=None, portal_type=None, *args, **kwargs):
191
    """Creates a copy of the source object. If container is None, creates the
192
    copy inside the same container as the source. If portal_type is specified,
193
    creates a new object of this type, and copies the values from source fields
194
    to the destination object. Field values sent as kwargs have priority over
195
    the field values from source.
196
197
    :param source: object from which create a copy
198
    :type source: ATContentType/DexterityContentType/CatalogBrain
199
    :param container: destination container
200
    :type container: ATContentType/DexterityContentType/CatalogBrain
201
    :param portal_type: destination portal type
202
    :returns: The new created object
203
    """
204
    # Prevent circular dependencies
205
    from security import check_permission
206
    # Use same container as source unless explicitly set
207
    source = get_object(source)
208
    if not container:
209
        container = get_parent(source)
210
211
    # Use same portal type as source unless explicitly set
212
    if not portal_type:
213
        portal_type = get_portal_type(source)
214
215
    # Extend the fields to skip with defaults
216
    skip = kwargs.pop("skip", [])
217
    skip = set(skip)
218
    skip.update([
219
        "Products.Archetypes.Field.ComputedField",
220
        "UID",
221
        "id",
222
        "allowDiscussion",
223
        "contributors",
224
        "creation_date",
225
        "creators",
226
        "effectiveDate",
227
        "expirationDate",
228
        "language",
229
        "location",
230
        "modification_date",
231
        "rights",
232
        "subject",
233
    ])
234
    # Build a dict for complexity reduction
235
    skip = dict([(item, True) for item in skip])
236
237
    # Update kwargs with the field values to copy from source
238
    fields = get_fields(source)
239
    for field_name, field in fields.items():
240
        # Prioritize field values passed as kwargs
241
        if field_name in kwargs:
242
            continue
243
        # Skip framework internal fields by name
244
        if skip.get(field_name, False):
245
            continue
246
        # Skip fields of non-suitable types
247
        if hasattr(field, "getType") and skip.get(field.getType(), False):
248
            continue
249
        # Skip readonly fields
250
        if getattr(field, "readonly", False):
251
            continue
252
        # Skip non-readable fields
253
        perm = getattr(field, "read_permission", View)
254
        if perm and not check_permission(perm, source):
255
            continue
256
257
        # do not wake-up objects unnecessarily
258
        if hasattr(field, "getRaw"):
259
            field_value = field.getRaw(source)
260
        elif hasattr(field, "get_raw"):
261
            field_value = field.get_raw(source)
262
        elif hasattr(field, "getAccessor"):
263
            accessor = field.getAccessor(source)
264
            field_value = accessor()
265
        else:
266
            field_value = field.get(source)
267
268
        # Do a hard copy of value if mutable type
269
        if isinstance(field_value, (list, dict, set)):
270
            field_value = copy.deepcopy(field_value)
271
        kwargs.update({field_name: field_value})
272
273
    # Create a copy
274
    return create(container, portal_type, *args, **kwargs)
275
276
277
def edit(obj, check_permissions=True, **kwargs):
278
    """Updates the values of object fields with the new values passed-in
279
    """
280
    # Prevent circular dependencies
281
    from security import check_permission
282
    fields = get_fields(obj)
283
    for name, value in kwargs.items():
284
        field = fields.get(name, None)
285
        if not field:
286
            continue
287
288
        # cannot update readonly fields
289
        readonly = getattr(field, "readonly", False)
290
        if readonly:
291
            raise ValueError("Field '{}' is readonly".format(name))
292
293
        # check field writable permission
294
        if check_permissions:
295
            perm = getattr(field, "write_permission", ModifyPortalContent)
296
            if perm and not check_permission(perm, obj):
297
                raise Unauthorized("Field '{}' is not writeable".format(name))
298
299
        # Set the value
300
        if hasattr(field, "getMutator"):
301
            mutator = field.getMutator(obj)
302
            mapply(mutator, value)
303
        else:
304
            field.set(obj, value)
305
306
307
def uncatalog_object(obj):
308
    """Un-catalog the object from all catalogs
309
310
    :param obj: object to un-catalog
311
    :type obj: ATContentType/DexterityContentType
312
    """
313
    # un-catalog from registered catalogs
314
    obj.unindexObject()
315
    # explicitly un-catalog from uid_catalog
316
    uid_catalog = get_tool("uid_catalog")
317
    # the uids of uid_catalog are relative paths to portal root
318
    # see Products.Archetypes.UIDCatalog.UIDResolver.catalog_object
319
    url = "/".join(obj.getPhysicalPath()[2:])
320
    uid_catalog.uncatalog_object(url)
321
322
323
def catalog_object(obj):
324
    """Re-catalog the object
325
326
    :param obj: object to un-catalog
327
    :type obj: ATContentType/DexterityContentType
328
    """
329
    if is_at_content(obj):
330
        # explicitly re-catalog AT types at uid_catalog (DX types are
331
        # automatically reindexed in UID catalog on reindexObject)
332
        uc = get_tool("uid_catalog")
333
        # the uids of uid_catalog are relative paths to portal root
334
        # see Products.Archetypes.UIDCatalog.UIDResolver.catalog_object
335
        url = "/".join(obj.getPhysicalPath()[2:])
336
        uc.catalog_object(obj, url)
337
    obj.reindexObject()
338
339
340
def delete(obj, check_permissions=True, suppress_events=False):
341
    """Deletes the given object
342
343
    :param obj: object to un-catalog
344
    :param check_permissions: whether delete permission must be checked
345
    :param suppress_events: whether ondelete events have to be fired
346
    :type obj: ATContentType/DexterityContentType
347
    """
348
    from security import check_permission
349
    if check_permissions and not check_permission(DeleteObjects, obj):
350
        raise Unauthorized("Do not have permissions to remove this object")
351
352
    # un-catalog the object from all catalogs (uid_catalog included)
353
    uncatalog_object(obj)
354
    # delete the object
355
    parent = get_parent(obj)
356
    parent._delObject(obj.getId(), suppress_events=suppress_events)
357
358
359
def get_tool(name, context=None, default=_marker):
360
    """Get a portal tool by name
361
362
    :param name: The name of the tool, e.g. `senaite_setup_catalog`
363
    :type name: string
364
    :param context: A portal object
365
    :type context: ATContentType/DexterityContentType/CatalogBrain
366
    :returns: Portal Tool
367
    """
368
369
    # Try first with the context
370
    if context is not None:
371
        try:
372
            context = get_object(context)
373
            return getToolByName(context, name)
374
        except (APIError, AttributeError) as e:
375
            # https://github.com/senaite/bika.lims/issues/396
376
            logger.warn("get_tool::getToolByName({}, '{}') failed: {} "
377
                        "-> falling back to plone.api.portal.get_tool('{}')"
378
                        .format(repr(context), name, repr(e), name))
379
            return get_tool(name, default=default)
380
381
    # Try with the plone api
382
    try:
383
        return ploneapi.portal.get_tool(name)
384
    except InvalidParameterError:
385
        if default is not _marker:
386
            if isinstance(default, six.string_types):
387
                return get_tool(default)
388
            return default
389
        fail("No tool named '%s' found." % name)
390
391
392
def fail(msg=None):
393
    """API LIMS Error
394
    """
395
    if msg is None:
396
        msg = "Reason not given."
397
    raise APIError("{}".format(msg))
398
399
400
def is_object(brain_or_object):
401
    """Check if the passed in object is a supported portal content object
402
403
    :param brain_or_object: A single catalog brain or content object
404
    :type brain_or_object: Portal Object
405
    :returns: True if the passed in object is a valid portal content
406
    """
407
    if is_portal(brain_or_object):
408
        return True
409
    if is_supermodel(brain_or_object):
410
        return True
411
    if is_at_content(brain_or_object):
412
        return True
413
    if is_dexterity_content(brain_or_object):
414
        return True
415
    if is_brain(brain_or_object):
416
        return True
417
    return False
418
419
420
def get_object(brain_object_uid, default=_marker):
421
    """Get the full content object
422
423
    :param brain_object_uid: A catalog brain or content object or uid
424
    :type brain_object_uid: PortalObject/ATContentType/DexterityContentType
425
    /CatalogBrain/basestring
426
    :returns: The full object
427
    """
428
    if is_uid(brain_object_uid):
429
        return get_object_by_uid(brain_object_uid, default=default)
430
    elif is_supermodel(brain_object_uid):
431
        return brain_object_uid.instance
432
    if not is_object(brain_object_uid):
433
        if default is _marker:
434
            fail("{} is not supported.".format(repr(brain_object_uid)))
435
        return default
436
    if is_brain(brain_object_uid):
437
        return brain_object_uid.getObject()
438
    return brain_object_uid
439
440
441
def is_portal(brain_or_object):
442
    """Checks if the passed in object is the portal root object
443
444
    :param brain_or_object: A single catalog brain or content object
445
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
446
    :returns: True if the object is the portal root object
447
    :rtype: bool
448
    """
449
    return ISiteRoot.providedBy(brain_or_object)
450
451
452
def is_brain(brain_or_object):
453
    """Checks if the passed in object is a portal catalog brain
454
455
    :param brain_or_object: A single catalog brain or content object
456
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
457
    :returns: True if the object is a catalog brain
458
    :rtype: bool
459
    """
460
    return ICatalogBrain.providedBy(brain_or_object)
461
462
463
def is_supermodel(brain_or_object):
464
    """Checks if the passed in object is a supermodel
465
466
    :param brain_or_object: A single catalog brain or content object
467
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
468
    :returns: True if the object is a catalog brain
469
    :rtype: bool
470
    """
471
    # avoid circular imports
472
    from senaite.app.supermodel.interfaces import ISuperModel
473
    return ISuperModel.providedBy(brain_or_object)
474
475
476
def is_dexterity_content(brain_or_object):
477
    """Checks if the passed in object is a dexterity content type
478
479
    :param brain_or_object: A single catalog brain or content object
480
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
481
    :returns: True if the object is a dexterity content type
482
    :rtype: bool
483
    """
484
    return IDexterityContent.providedBy(brain_or_object)
485
486
487
def is_at_content(brain_or_object):
488
    """Checks if the passed in object is an AT content type
489
490
    :param brain_or_object: A single catalog brain or content object
491
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
492
    :returns: True if the object is an AT content type
493
    :rtype: bool
494
    """
495
    return isinstance(brain_or_object, BaseObject)
496
497
498
def is_dx_type(portal_type):
499
    """Checks if the portal type is DX based
500
501
    :param portal_type: The portal type name to check
502
    :returns: True if the portal type is DX based
503
    """
504
    portal_types = get_tool("portal_types")
505
    fti = portal_types.getTypeInfo(portal_type)
506
    if fti.product:
507
        return False
508
    return True
509
510
511
def is_at_type(portal_type):
512
    """Checks if the portal type is AT based
513
514
    :param portal_type: The portal type name to check
515
    :returns: True if the portal type is AT based
516
    """
517
    return not is_dx_type(portal_type)
518
519
520
def is_folderish(brain_or_object):
521
    """Checks if the passed in object is folderish
522
523
    :param brain_or_object: A single catalog brain or content object
524
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
525
    :returns: True if the object is folderish
526
    :rtype: bool
527
    """
528
    if hasattr(brain_or_object, "is_folderish"):
529
        if callable(brain_or_object.is_folderish):
530
            return brain_or_object.is_folderish()
531
        return brain_or_object.is_folderish
532
    return IFolderish.providedBy(get_object(brain_or_object))
533
534
535
def get_portal_type(brain_or_object):
536
    """Get the portal type for this object
537
538
    :param brain_or_object: A single catalog brain or content object
539
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
540
    :returns: Portal type
541
    :rtype: string
542
    """
543
    if not is_object(brain_or_object):
544
        fail("{} is not supported.".format(repr(brain_or_object)))
545
    return brain_or_object.portal_type
546
547
548
def get_schema(brain_or_object):
549
    """Get the schema of the content
550
551
    :param brain_or_object: A single catalog brain or content object
552
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
553
    :returns: Schema object
554
    """
555
    obj = get_object(brain_or_object)
556
    if is_portal(obj):
557
        fail("get_schema can't return schema of portal root")
558
    if is_dexterity_content(obj):
559
        pt = get_tool("portal_types")
560
        fti = pt.getTypeInfo(obj.portal_type)
561
        return fti.lookupSchema()
562
    if is_at_content(obj):
563
        return obj.Schema()
564
    fail("{} has no Schema.".format(brain_or_object))
565
566
567
def get_behaviors(portal_type):
568
    """List all behaviors
569
570
    :param portal_type: DX portal type name
571
    """
572
    portal_types = get_tool("portal_types")
573
    fti = portal_types.getTypeInfo(portal_type)
574
    if fti.product:
575
        raise TypeError("Expected DX type, got AT type instead.")
576
    return fti.behaviors
577
578
579
def enable_behavior(portal_type, behavior_id):
580
    """Enable behavior
581
582
    :param portal_type: DX portal type name
583
    :param behavior_id: The behavior to enable
584
    """
585
    portal_types = get_tool("portal_types")
586
    fti = portal_types.getTypeInfo(portal_type)
587
    if fti.product:
588
        raise TypeError("Expected DX type, got AT type instead.")
589
590
    if behavior_id not in fti.behaviors:
591
        fti.behaviors += (behavior_id, )
592
        # invalidate schema cache
593
        notify(SchemaInvalidatedEvent(portal_type))
594
595
596
def disable_behavior(portal_type, behavior_id):
597
    """Disable behavior
598
599
    :param portal_type: DX portal type name
600
    :param behavior_id: The behavior to disable
601
    """
602
    portal_types = get_tool("portal_types")
603
    fti = portal_types.getTypeInfo(portal_type)
604
    if fti.product:
605
        raise TypeError("Expected DX type, got AT type instead.")
606
    fti.behaviors = tuple(filter(lambda b: b != behavior_id, fti.behaviors))
607
    # invalidate schema cache
608
    notify(SchemaInvalidatedEvent(portal_type))
609
610
611
def get_fields(brain_or_object):
612
    """Get a name to field mapping of the object
613
614
    :param brain_or_object: A single catalog brain or content object
615
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
616
    :returns: Mapping of name -> field
617
    :rtype: OrderedDict
618
    """
619
    obj = get_object(brain_or_object)
620
    schema = get_schema(obj)
621
    if is_dexterity_content(obj):
622
        # get the fields directly provided by the interface
623
        fields = getFieldsInOrder(schema)
624
        # append the fields coming from behaviors
625
        behavior_assignable = IBehaviorAssignable(obj)
626
        if behavior_assignable:
627
            behaviors = behavior_assignable.enumerateBehaviors()
628
            for behavior in behaviors:
629
                fields.extend(getFieldsInOrder(behavior.interface))
630
        return OrderedDict(fields)
631
    return OrderedDict(schema)
632
633
634
def get_id(brain_or_object):
635
    """Get the Plone ID for this object
636
637
    :param brain_or_object: A single catalog brain or content object
638
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
639
    :returns: Plone ID
640
    :rtype: string
641
    """
642
    if is_brain(brain_or_object):
643
        if base_hasattr(brain_or_object, "getId"):
644
            return brain_or_object.getId
645
        if base_hasattr(brain_or_object, "id"):
646
            return brain_or_object.id
647
    return get_object(brain_or_object).getId()
648
649
650
def get_title(brain_or_object):
651
    """Get the Title for this object
652
653
    :param brain_or_object: A single catalog brain or content object
654
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
655
    :returns: Title
656
    :rtype: string
657
    """
658
    if is_brain(brain_or_object) and base_hasattr(brain_or_object, "Title"):
659
        return brain_or_object.Title
660
    return get_object(brain_or_object).Title()
661
662
663
def get_description(brain_or_object):
664
    """Get the Title for this object
665
666
    :param brain_or_object: A single catalog brain or content object
667
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
668
    :returns: Title
669
    :rtype: string
670
    """
671
    if is_brain(brain_or_object) \
672
            and base_hasattr(brain_or_object, "Description"):
673
        return brain_or_object.Description
674
    return get_object(brain_or_object).Description()
675
676
677
def get_uid(brain_or_object):
678
    """Get the Plone UID for this object
679
680
    :param brain_or_object: A single catalog brain or content object or an UID
681
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
682
    :returns: Plone UID
683
    :rtype: string
684
    """
685
    if is_uid(brain_or_object):
686
        return brain_or_object
687
    if is_portal(brain_or_object):
688
        return '0'
689
    if is_brain(brain_or_object) and base_hasattr(brain_or_object, "UID"):
690
        return brain_or_object.UID
691
    return get_object(brain_or_object).UID()
692
693
694
def get_url(brain_or_object):
695
    """Get the absolute URL for this object
696
697
    :param brain_or_object: A single catalog brain or content object
698
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
699
    :returns: Absolute URL
700
    :rtype: string
701
    """
702
    if is_brain(brain_or_object) and base_hasattr(brain_or_object, "getURL"):
703
        return brain_or_object.getURL()
704
    return get_object(brain_or_object).absolute_url()
705
706
707
def get_icon(thing, html_tag=True):
708
    """Get the icon of the content object
709
710
    :param thing: A single catalog brain, content object or portal_type
711
    :type thing: ATContentType/DexterityContentType/CatalogBrain/String
712
    :param html_tag: A value of 'True' returns the HTML tag, else the image url
713
    :type html_tag: bool
714
    :returns: HTML '<img>' tag if 'html_tag' is True else the image url
715
    :rtype: string
716
    """
717
    portal_type = thing
718
    if is_object(thing):
719
        portal_type = get_portal_type(thing)
720
721
    # Manual approach, because `plone.app.layout.getIcon` does not reliable
722
    # work for Contents coming from other catalogs than the
723
    # `portal_catalog`
724
    portal_types = get_tool("portal_types")
725
    fti = portal_types.getTypeInfo(portal_type)
726
    if not fti:
727
        fail("No type info for {}".format(repr(thing)))
728
    icon = fti.getIcon()
729
    if not icon:
730
        return ""
731
    url = "%s/%s" % (get_url(get_portal()), icon)
732
    if not html_tag:
733
        return url
734
735
    # build the img element
736
    if is_object(thing):
737
        title = get_title(thing)
738
    else:
739
        title = fti.Title()
740
    tag = '<img width="16" height="16" src="{url}" title="{title}" />'
741
    return tag.format(url=url, title=title)
742
743
744
def get_object_by_uid(uid, default=_marker):
745
    """Find an object by a given UID
746
747
    :param uid: The UID of the object to find
748
    :type uid: string
749
    :returns: Found Object or None
750
    """
751
752
    # nothing to do here
753
    if not uid:
754
        if default is not _marker:
755
            return default
756
        fail("get_object_by_uid requires UID as first argument; got {} instead"
757
             .format(uid))
758
759
    # we defined the portal object UID to be '0'::
760
    if uid == '0':
761
        return get_portal()
762
763
    brain = get_brain_by_uid(uid)
764
765
    if brain is None:
766
        if default is not _marker:
767
            return default
768
        fail("No object found for UID {}".format(uid))
769
770
    return get_object(brain)
771
772
773
def get_brain_by_uid(uid, default=None):
774
    """Query a brain by a given UID
775
776
    :param uid: The UID of the object to find
777
    :type uid: string
778
    :returns: ZCatalog brain or None
779
    """
780
    if not is_uid(uid):
781
        return default
782
783
    # we try to find the object with the UID catalog
784
    uc = get_tool(UID_CATALOG)
785
786
    # try to find the object with the reference catalog first
787
    brains = uc(UID=uid)
788
    if len(brains) != 1:
789
        return default
790
    return brains[0]
791
792
793
def get_object_by_path(path, default=_marker):
794
    """Find an object by a given physical path or absolute_url
795
796
    :param path: The physical path of the object to find
797
    :type path: string
798
    :returns: Found Object or None
799
    """
800
801
    # nothing to do here
802
    if not path:
803
        if default is not _marker:
804
            return default
805
        fail("get_object_by_path first argument must be a path; {} received"
806
             .format(path))
807
808
    portal = get_portal()
809
    portal_path = get_path(portal)
810
    portal_url = get_url(portal)
811
812
    # ensure we have a physical path
813
    if path.startswith(portal_url):
814
        request = get_request()
815
        path = "/".join(request.physicalPathFromURL(path))
816
817
    if not path.startswith(portal_path):
818
        if default is not _marker:
819
            return default
820
        fail("Not a physical path inside the portal.")
821
822
    if path == portal_path:
823
        return portal
824
825
    return portal.restrictedTraverse(path, default)
826
827
828
def get_path(brain_or_object):
829
    """Calculate the physical path of this object
830
831
    :param brain_or_object: A single catalog brain or content object
832
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
833
    :returns: Physical path of the object
834
    :rtype: string
835
    """
836
    if is_brain(brain_or_object):
837
        return brain_or_object.getPath()
838
    return "/".join(get_object(brain_or_object).getPhysicalPath())
839
840
841
def get_parent_path(brain_or_object):
842
    """Calculate the physical parent path of this object
843
844
    :param brain_or_object: A single catalog brain or content object
845
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
846
    :returns: Physical path of the parent object
847
    :rtype: string
848
    """
849
    if is_portal(brain_or_object):
850
        return get_path(get_portal())
851
    if is_brain(brain_or_object):
852
        path = get_path(brain_or_object)
853
        return path.rpartition("/")[0]
854
    return get_path(get_object(brain_or_object).aq_parent)
855
856
857
def get_parent(brain_or_object, **kw):
858
    """Locate the parent object of the content/catalog brain
859
860
    :param brain_or_object: A single catalog brain or content object
861
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
862
    :param catalog_search: Use a catalog query to find the parent object
863
    :type catalog_search: bool
864
    :returns: parent object
865
    :rtype: ATContentType/DexterityContentType/PloneSite/CatalogBrain
866
    """
867
868
    if is_portal(brain_or_object):
869
        return get_portal()
870
871
    # BBB: removed `catalog_search` keyword
872
    if kw:
873
        logger.warn("API function `get_parent` no longer support keywords.")
874
875
    return get_object(brain_or_object).aq_parent
876
877
878
def search(query, catalog=_marker):
879
    """Search for objects.
880
881
    :param query: A suitable search query.
882
    :type query: dict
883
    :param catalog: A single catalog id or a list of catalog ids
884
    :type catalog: str/list
885
    :returns: Search results
886
    :rtype: List of ZCatalog brains
887
    """
888
889
    # query needs to be a dictionary
890
    if not isinstance(query, dict):
891
        fail("Catalog query needs to be a dictionary")
892
893
    # Portal types to query
894
    portal_types = query.get("portal_type", [])
895
    # We want the portal_type as a list
896
    if not isinstance(portal_types, (tuple, list)):
897
        portal_types = [portal_types]
898
899
    # The catalogs used for the query
900
    catalogs = []
901
902
    # The user did **not** specify a catalog
903
    if catalog is _marker:
904
        # Find the registered catalogs for the queried portal types
905
        for portal_type in portal_types:
906
            # Just get the first registered/default catalog
907
            catalogs.append(get_catalogs_for(
908
                portal_type, default=UID_CATALOG)[0])
909
    else:
910
        # User defined catalogs
911
        if isinstance(catalog, (list, tuple)):
912
            catalogs.extend(map(get_tool, catalog))
913
        else:
914
            catalogs.append(get_tool(catalog))
915
916
    # Cleanup: Avoid duplicate catalogs
917
    catalogs = list(set(catalogs)) or [get_uid_catalog()]
918
919
    # We only support **single** catalog queries
920
    if len(catalogs) > 1:
921
        fail("Multi Catalog Queries are not supported!")
922
923
    return catalogs[0](query)
924
925
926
def safe_getattr(brain_or_object, attr, default=_marker):
927
    """Return the attribute value
928
929
    :param brain_or_object: A single catalog brain or content object
930
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
931
    :param attr: Attribute name
932
    :type attr: str
933
    :returns: Attribute value
934
    :rtype: obj
935
    """
936
    try:
937
        value = getattr(brain_or_object, attr, _marker)
938
        if value is _marker:
939
            if default is not _marker:
940
                return default
941
            fail("Attribute '{}' not found.".format(attr))
942
        if callable(value):
943
            return value()
944
        return value
945
    except Unauthorized:
946
        if default is not _marker:
947
            return default
948
        fail("You are not authorized to access '{}' of '{}'.".format(
949
            attr, repr(brain_or_object)))
950
951
952
def get_uid_catalog():
953
    """Get the UID catalog tool
954
955
    :returns: UID Catalog Tool
956
    """
957
    return get_tool(UID_CATALOG)
958
959
960
def get_portal_catalog():
961
    """Get the portal catalog tool
962
963
    :returns: Portal Catalog Tool
964
    """
965
    return get_tool(PORTAL_CATALOG)
966
967
968
def get_review_history(brain_or_object, rev=True):
969
    """Get the review history for the given brain or context.
970
971
    :param brain_or_object: A single catalog brain or content object
972
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
973
    :returns: Workflow history
974
    :rtype: [{}, ...]
975
    """
976
    obj = get_object(brain_or_object)
977
    review_history = []
978
    try:
979
        workflow = get_tool("portal_workflow")
980
        review_history = workflow.getInfoFor(obj, 'review_history')
981
    except WorkflowException as e:
982
        message = str(e)
983
        logger.error("Cannot retrieve review_history on {}: {}".format(
984
            obj, message))
985
    if not isinstance(review_history, (list, tuple)):
986
        logger.error("get_review_history: expected list, received {}".format(
987
            review_history))
988
        review_history = []
989
990
    if isinstance(review_history, tuple):
991
        # Products.CMFDefault.DefaultWorkflow.getInfoFor always returns a
992
        # tuple when "review_history" is passed in:
993
        # https://github.com/zopefoundation/Products.CMFDefault/blob/master/Products/CMFDefault/DefaultWorkflow.py#L244
994
        #
995
        # On the other hand, Products.DCWorkflow.getInfoFor relies on
996
        # Expression.StateChangeInfo, that always returns a list, except when no
997
        # review_history is found:
998
        # https://github.com/zopefoundation/Products.DCWorkflow/blob/master/Products/DCWorkflow/DCWorkflow.py#L310
999
        # https://github.com/zopefoundation/Products.DCWorkflow/blob/master/Products/DCWorkflow/Expression.py#L94
1000
        review_history = list(review_history)
1001
1002
    if rev is True:
1003
        review_history.reverse()
1004
    return review_history
1005
1006
1007
def get_revision_history(brain_or_object):
1008
    """Get the revision history for the given brain or context.
1009
1010
    :param brain_or_object: A single catalog brain or content object
1011
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
1012
    :returns: Workflow history
1013
    :rtype: obj
1014
    """
1015
    obj = get_object(brain_or_object)
1016
    chv = ContentHistoryView(obj, safe_getattr(obj, "REQUEST", None))
1017
    return chv.fullHistory()
1018
1019
1020
def get_workflows_for(brain_or_object):
1021
    """Get the assigned workflows for the given brain or context.
1022
1023
    Note: This function supports also the portal_type as parameter.
1024
1025
    :param brain_or_object: A single catalog brain or content object
1026
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
1027
    :returns: Assigned Workflows
1028
    :rtype: tuple
1029
    """
1030
    workflow = ploneapi.portal.get_tool("portal_workflow")
1031
    if isinstance(brain_or_object, six.string_types):
1032
        return workflow.getChainFor(brain_or_object)
1033
    obj = get_object(brain_or_object)
1034
    return workflow.getChainFor(obj)
1035
1036
1037
def get_workflow_status_of(brain_or_object, state_var="review_state"):
1038
    """Get the current workflow status of the given brain or context.
1039
1040
    :param brain_or_object: A single catalog brain or content object
1041
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
1042
    :param state_var: The name of the state variable
1043
    :type state_var: string
1044
    :returns: Status
1045
    :rtype: str
1046
    """
1047
    # Try to get the state from the catalog brain first
1048
    if is_brain(brain_or_object):
1049
        if state_var in brain_or_object.schema():
1050
            return brain_or_object[state_var]
1051
1052
    # Retrieve the sate from the object
1053
    workflow = get_tool("portal_workflow")
1054
    obj = get_object(brain_or_object)
1055
    return workflow.getInfoFor(ob=obj, name=state_var, default='')
1056
1057
1058
def get_previous_worfklow_status_of(brain_or_object, skip=None, default=None):
1059
    """Get the previous workflow status of the object
1060
1061
    :param brain_or_object: A single catalog brain or content object
1062
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
1063
    :param skip: Workflow states to skip
1064
    :type skip: tuple/list
1065
    :returns: status
1066
    :rtype: str
1067
    """
1068
1069
    skip = isinstance(skip, (list, tuple)) and skip or []
1070
    history = get_review_history(brain_or_object)
1071
1072
    # Remove consecutive duplicates, some transitions might happen more than
1073
    # once consecutively (e.g. publish)
1074
    history = map(lambda i: i[0], groupby(history))
1075
1076
    for num, item in enumerate(history):
1077
        # skip the current history entry
1078
        if num == 0:
1079
            continue
1080
        status = item.get("review_state")
1081
        if status in skip:
1082
            continue
1083
        return status
1084
    return default
1085
1086
1087
def get_creation_date(brain_or_object):
1088
    """Get the creation date of the brain or object
1089
1090
    :param brain_or_object: A single catalog brain or content object
1091
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
1092
    :returns: Creation date
1093
    :rtype: DateTime
1094
    """
1095
    created = getattr(brain_or_object, "created", None)
1096
    if created is None:
1097
        fail("Object {} has no creation date ".format(
1098
             repr(brain_or_object)))
1099
    if callable(created):
1100
        return created()
1101
    return created
1102
1103
1104
def get_modification_date(brain_or_object):
1105
    """Get the modification date of the brain or object
1106
1107
    :param brain_or_object: A single catalog brain or content object
1108
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
1109
    :returns: Modification date
1110
    :rtype: DateTime
1111
    """
1112
    modified = getattr(brain_or_object, "modified", None)
1113
    if modified is None:
1114
        fail("Object {} has no modification date ".format(
1115
             repr(brain_or_object)))
1116
    if callable(modified):
1117
        return modified()
1118
    return modified
1119
1120
1121
def get_review_status(brain_or_object):
1122
    """Get the `review_state` of an object
1123
1124
    :param brain_or_object: A single catalog brain or content object
1125
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
1126
    :returns: Value of the review_status variable
1127
    :rtype: String
1128
    """
1129
    if is_brain(brain_or_object) \
1130
       and base_hasattr(brain_or_object, "review_state"):
1131
        return brain_or_object.review_state
1132
    return get_workflow_status_of(brain_or_object, state_var="review_state")
1133
1134
1135
def is_active(brain_or_object):
1136
    """Check if the workflow state of the object is 'inactive' or 'cancelled'.
1137
1138
    :param brain_or_object: A single catalog brain or content object
1139
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
1140
    :returns: False if the object is in the state 'inactive' or 'cancelled'
1141
    :rtype: bool
1142
    """
1143
    if get_review_status(brain_or_object) in ["cancelled", "inactive"]:
1144
        return False
1145
    return True
1146
1147
1148
def get_fti(portal_type, default=None):
1149
    """Lookup the Dynamic Filetype Information for the given portal_type
1150
1151
    :param portal_type: The portal type to get the FTI for
1152
    :returns: FTI or default value
1153
    """
1154
    if not is_string(portal_type):
1155
        return default
1156
    portal_types = get_tool("portal_types")
1157
    fti = portal_types.getTypeInfo(portal_type)
1158
    return fti or default
1159
1160
1161
def get_catalogs_for(brain_or_object, default=PORTAL_CATALOG):
1162
    """Get all registered catalogs for the given portal_type, catalog brain or
1163
    content object
1164
1165
    NOTE: We pass in the `portal_catalog` as default in subsequent calls to
1166
          work around the missing `uid_catalog` during snapshot creation when
1167
          installing a fresh site!
1168
1169
    :param brain_or_object: The portal_type, a catalog brain or content object
1170
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
1171
    :returns: List of supported catalogs
1172
    :rtype: list
1173
    """
1174
1175
    # only handle catalog lookups by portal_type internally
1176
    if is_uid(brain_or_object) or is_object(brain_or_object):
1177
        obj = get_object(brain_or_object)
1178
        portal_type = get_portal_type(obj)
1179
        return get_catalogs_for(portal_type)
1180
1181
    catalogs = []
1182
1183
    if not is_string(brain_or_object):
1184
        raise APIError("Expected a portal_type string, got <%s>"
1185
                       % type(brain_or_object))
1186
1187
    # at this point the brain_or_object is a portal_type
1188
    portal_type = brain_or_object
1189
1190
    # check static portal_type -> catalog mapping first
1191
    from senaite.core.catalog import get_catalogs_by_type
1192
    catalogs = get_catalogs_by_type(portal_type)
1193
1194
    # no catalogs in static mapping
1195
    # => Lookup catalogs by FTI
1196
    if len(catalogs) == 0:
1197
        fti = get_fti(portal_type)
1198
        if fti.product:
1199
            # AT content type
1200
            # => Looup via archetype_tool
1201
            archetype_tool = get_tool("archetype_tool")
1202
            catalogs = archetype_tool.catalog_map.get(portal_type) or []
1203
        else:
1204
            # DX content type
1205
            # => resolve the `_catalogs` attribute from the class
1206
            klass = resolveDottedName(fti.klass)
1207
            # XXX: Refactor multi-catalog behavior to not rely
1208
            #      on this hidden `_catalogs` attribute!
1209
            catalogs = getattr(klass, "_catalogs", [])
1210
1211
    # fetch the catalog objects
1212
    catalogs = filter(None, map(lambda cid: get_tool(cid, None), catalogs))
1213
1214
    if len(catalogs) == 0:
1215
        return [get_tool(default, default=PORTAL_CATALOG)]
1216
1217
    return list(catalogs)
1218
1219
1220
def get_transitions_for(brain_or_object):
1221
    """List available workflow transitions for all workflows
1222
1223
    :param brain_or_object: A single catalog brain or content object
1224
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
1225
    :returns: All possible available and allowed transitions
1226
    :rtype: list[dict]
1227
    """
1228
    workflow = get_tool('portal_workflow')
1229
    transitions = []
1230
    instance = get_object(brain_or_object)
1231
    for wfid in get_workflows_for(brain_or_object):
1232
        wf = workflow[wfid]
1233
        tlist = wf.getTransitionsFor(instance)
1234
        transitions.extend([t for t in tlist if t not in transitions])
1235
    return transitions
1236
1237
1238
def do_transition_for(brain_or_object, transition):
1239
    """Performs a workflow transition for the passed in object.
1240
1241
    :param brain_or_object: A single catalog brain or content object
1242
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
1243
    :returns: The object where the transtion was performed
1244
    """
1245
    if not isinstance(transition, six.string_types):
1246
        fail("Transition type needs to be string, got '%s'" % type(transition))
1247
    obj = get_object(brain_or_object)
1248
    try:
1249
        ploneapi.content.transition(obj, transition)
1250
    except ploneapi.exc.InvalidParameterError as e:
1251
        fail("Failed to perform transition '{}' on {}: {}".format(
1252
             transition, obj, str(e)))
1253
    return obj
1254
1255
1256
def get_roles_for_permission(permission, brain_or_object):
1257
    """Get a list of granted roles for the given permission on the object.
1258
1259
    :param brain_or_object: A single catalog brain or content object
1260
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
1261
    :returns: Roles for the given Permission
1262
    :rtype: list
1263
    """
1264
    obj = get_object(brain_or_object)
1265
    allowed = set(rolesForPermissionOn(permission, obj))
1266
    return sorted(allowed)
1267
1268
1269
def is_versionable(brain_or_object, policy='at_edit_autoversion'):
1270
    """Checks if the passed in object is versionable.
1271
1272
    :param brain_or_object: A single catalog brain or content object
1273
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
1274
    :returns: True if the object is versionable
1275
    :rtype: bool
1276
    """
1277
    pr = get_tool("portal_repository")
1278
    obj = get_object(brain_or_object)
1279
    return pr.supportsPolicy(obj, 'at_edit_autoversion') \
1280
        and pr.isVersionable(obj)
1281
1282
1283
def get_version(brain_or_object):
1284
    """Get the version of the current object
1285
1286
    :param brain_or_object: A single catalog brain or content object
1287
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
1288
    :returns: The current version of the object, or None if not available
1289
    :rtype: int or None
1290
    """
1291
    obj = get_object(brain_or_object)
1292
    if not is_versionable(obj):
1293
        return None
1294
    return getattr(aq_base(obj), "version_id", 0)
1295
1296
1297
def get_view(name, context=None, request=None, default=None):
1298
    """Get the view by name
1299
1300
    :param name: The name of the view
1301
    :type name: str
1302
    :param context: The context to query the view
1303
    :type context: ATContentType/DexterityContentType/CatalogBrain
1304
    :param request: The request to query the view
1305
    :type request: HTTPRequest object
1306
    :returns: HTTP Request
1307
    :rtype: Products.Five.metaclass View object
1308
    """
1309
    context = context or get_portal()
1310
    request = request or get_request() or None
1311
    view = queryMultiAdapter((get_object(context), request), name=name)
1312
    if view is None:
1313
        return default
1314
    return view
1315
1316
1317
def get_request():
1318
    """Get the global request object
1319
1320
    :returns: HTTP Request
1321
    :rtype: HTTPRequest object
1322
    """
1323
    return globalrequest.getRequest()
1324
1325
1326
def get_test_request():
1327
    """Get the TestRequest object
1328
    """
1329
    request = TestRequest()
1330
    directlyProvides(request, IAttributeAnnotatable)
1331
    return request
1332
1333
1334
def get_group(group_or_groupname):
1335
    """Return Plone Group
1336
1337
    :param group_or_groupname: Plone group or the name of the group
1338
    :type groupname:  GroupData/str
1339
    :returns: Plone GroupData
1340
    """
1341
    if not group_or_groupname:
1342
1343
        return None
1344
    if hasattr(group_or_groupname, "_getGroup"):
1345
        return group_or_groupname
1346
    gtool = get_tool("portal_groups")
1347
    return gtool.getGroupById(group_or_groupname)
1348
1349
1350
def get_user(user_or_username):
1351
    """Return Plone User
1352
1353
    :param user_or_username: Plone user or user id
1354
    :returns: Plone MemberData
1355
    """
1356
    user = None
1357
    if isinstance(user_or_username, MemberData):
1358
        user = user_or_username
1359
    if isinstance(user_or_username, six.string_types):
1360
        user = get_member_by_login_name(get_portal(), user_or_username, False)
1361
    return user
1362
1363
1364
def get_user_properties(user_or_username):
1365
    """Return User Properties
1366
1367
    :param user_or_username: Plone group identifier
1368
    :returns: Plone MemberData
1369
    """
1370
    user = get_user(user_or_username)
1371
    if user is None:
1372
        return {}
1373
    if not callable(user.getUser):
1374
        return {}
1375
    out = {}
1376
    plone_user = user.getUser()
1377
    for sheet in plone_user.listPropertysheets():
1378
        ps = plone_user.getPropertysheet(sheet)
1379
        out.update(dict(ps.propertyItems()))
1380
    return out
1381
1382
1383
def get_users_by_roles(roles=None):
1384
    """Search Plone users by their roles
1385
1386
    :param roles: Plone role name or list of roles
1387
    :type roles:  list/str
1388
    :returns: List of Plone users having the role(s)
1389
    """
1390
    if not isinstance(roles, (tuple, list)):
1391
        roles = [roles]
1392
    mtool = get_tool("portal_membership")
1393
    return mtool.searchForMembers(roles=roles)
1394
1395
1396
def get_current_user():
1397
    """Returns the current logged in user
1398
1399
    :returns: Current User
1400
    """
1401
    return ploneapi.user.get_current()
1402
1403
1404
def get_user_contact(user, contact_types=['Contact', 'LabContact']):
1405
    """Returns the associated contact of a Plone user
1406
1407
    If the user passed in has no contact associated, return None.
1408
    The `contact_types` parameter filter the portal types for the search.
1409
1410
    :param: Plone user
1411
    :contact_types: List with the contact portal types to search
1412
    :returns: Contact associated to the Plone user or None
1413
    """
1414
    if not user:
1415
        return None
1416
1417
    from senaite.core.catalog import CONTACT_CATALOG  # Avoid circular import
1418
    query = {"portal_type": contact_types, "getUsername": user.id}
1419
    brains = search(query, catalog=CONTACT_CATALOG)
1420
    if not brains:
1421
        return None
1422
1423
    if len(brains) > 1:
1424
        # Oops, the user has multiple contacts assigned, return None
1425
        contacts = map(lambda c: c.Title, brains)
1426
        err_msg = "User '{}' is bound to multiple Contacts '{}'"
1427
        err_msg = err_msg.format(user.id, ','.join(contacts))
1428
        logger.error(err_msg)
1429
        return None
1430
1431
    return get_object(brains[0])
1432
1433
1434
def get_user_client(user_or_contact):
1435
    """Returns the client of the contact of a Plone user
1436
1437
    If the user passed in has no contact or does not belong to any client,
1438
    returns None.
1439
1440
    :param: Plone user or contact
1441
    :returns: Client the contact of the Plone user belongs to
1442
    """
1443
    if not user_or_contact or ILabContact.providedBy(user_or_contact):
1444
        # Lab contacts cannot belong to a client
1445
        return None
1446
1447
    if not IContact.providedBy(user_or_contact):
1448
        contact = get_user_contact(user_or_contact, contact_types=['Contact'])
1449
        if IContact.providedBy(contact):
1450
            return get_user_client(contact)
1451
        return None
1452
1453
    client = get_parent(user_or_contact)
1454
    if client and IClient.providedBy(client):
1455
        return client
1456
1457
    return None
1458
1459
1460
def get_current_client():
1461
    """Returns the current client the current logged in user belongs to, if any
1462
1463
    :returns: Client the current logged in user belongs to or None
1464
    """
1465
    return get_user_client(get_current_user())
1466
1467
1468
def get_cache_key(brain_or_object):
1469
    """Generate a cache key for a common brain or object
1470
1471
    :param brain_or_object: A single catalog brain or content object
1472
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
1473
    :returns: Cache Key
1474
    :rtype: str
1475
    """
1476
    key = [
1477
        get_portal_type(brain_or_object),
1478
        get_id(brain_or_object),
1479
        get_uid(brain_or_object),
1480
        # handle different domains gracefully
1481
        get_url(brain_or_object),
1482
        # Return the microsecond since the epoch in GMT
1483
        get_modification_date(brain_or_object).micros(),
1484
    ]
1485
    return "-".join(map(lambda x: str(x), key))
1486
1487
1488
def bika_cache_key_decorator(method, self, brain_or_object):
1489
    """Bika cache key decorator usable for
1490
1491
    :param brain_or_object: A single catalog brain or content object
1492
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
1493
    :returns: Cache Key
1494
    :rtype: str
1495
    """
1496
    if brain_or_object is None:
1497
        raise DontCache
1498
    return get_cache_key(brain_or_object)
1499
1500
1501
def normalize_id(string):
1502
    """Normalize the id
1503
1504
    :param string: A string to normalize
1505
    :type string: str
1506
    :returns: Normalized ID
1507
    :rtype: str
1508
    """
1509
    if not isinstance(string, six.string_types):
1510
        fail("Type of argument must be string, found '{}'"
1511
             .format(type(string)))
1512
    # get the id nomalizer utility
1513
    normalizer = getUtility(IIDNormalizer).normalize
1514
    return normalizer(string)
1515
1516
1517
def normalize_filename(string):
1518
    """Normalize the filename
1519
1520
    :param string: A string to normalize
1521
    :type string: str
1522
    :returns: Normalized ID
1523
    :rtype: str
1524
    """
1525
    if not isinstance(string, six.string_types):
1526
        fail("Type of argument must be string, found '{}'"
1527
             .format(type(string)))
1528
    # get the file nomalizer utility
1529
    normalizer = getUtility(IFileNameNormalizer).normalize
1530
    return normalizer(string)
1531
1532
1533
def is_uid(uid, validate=False):
1534
    """Checks if the passed in uid is a valid UID
1535
1536
    :param uid: The uid to check
1537
    :param validate: If False, checks if uid is a valid 23 alphanumeric uid. If
1538
    True, also verifies if a brain exists for the uid passed in
1539
    :type uid: string
1540
    :return: True if a valid uid
1541
    :rtype: bool
1542
    """
1543
    if not isinstance(uid, six.string_types):
1544
        return False
1545
    if uid == '0':
1546
        return True
1547
    if len(uid) != 32:
1548
        return False
1549
    if not UID_RX.match(uid):
1550
        return False
1551
    if not validate:
1552
        return True
1553
1554
    # Check if a brain for this uid exists
1555
    uc = get_tool('uid_catalog')
1556
    brains = uc(UID=uid)
1557
    if brains:
1558
        assert (len(brains) == 1)
1559
    return len(brains) > 0
1560
1561
1562
def is_date(date):
1563
    """Checks if the passed in value is a valid Zope's DateTime
1564
1565
    :param date: The date to check
1566
    :type date: DateTime
1567
    :return: True if a valid date
1568
    :rtype: bool
1569
    """
1570
    if not date:
1571
        return False
1572
    return isinstance(date, (DateTime, datetime))
1573
1574
1575
def to_date(value, default=None):
1576
    """Tries to convert the passed in value to Zope's DateTime
1577
1578
    :param value: The value to be converted to a valid DateTime
1579
    :type value: str, DateTime or datetime
1580
    :return: The DateTime representation of the value passed in or default
1581
    """
1582
1583
    # cannot use bika.lims.deprecated (circular dependencies)
1584
    import warnings
1585
    warnings.simplefilter("always", DeprecationWarning)
1586
    warn = "Deprecated: use senaite.core.api.dtime.to_DT instead"
1587
    warnings.warn(warn, category=DeprecationWarning, stacklevel=2)
1588
    warnings.simplefilter("default", DeprecationWarning)
1589
1590
    # prevent circular dependencies
1591
    from senaite.core.api.dtime import to_DT
1592
    date = to_DT(value)
1593
    if not date:
1594
        return to_DT(default)
1595
    return date
1596
1597
1598
def to_minutes(days=0, hours=0, minutes=0, seconds=0, milliseconds=0,
1599
               round_to_int=True):
1600
    """Returns the computed total number of minutes
1601
    """
1602
    total = float(days)*24*60 + float(hours)*60 + float(minutes) + \
1603
        float(seconds)/60 + float(milliseconds)/1000/60
1604
    return int(round(total)) if round_to_int else total
1605
1606
1607
def to_dhm_format(days=0, hours=0, minutes=0, seconds=0, milliseconds=0):
1608
    """Returns a representation of time in a string in xd yh zm format
1609
    """
1610
    minutes = to_minutes(days=days, hours=hours, minutes=minutes,
1611
                         seconds=seconds, milliseconds=milliseconds)
1612
    delta = timedelta(minutes=int(round(minutes)))
1613
    d = delta.days
1614
    h = delta.seconds // 3600
1615
    m = (delta.seconds // 60) % 60
1616
    m = m and "{}m ".format(str(m)) or ""
1617
    d = d and "{}d ".format(str(d)) or ""
1618
    if m and d:
1619
        h = "{}h ".format(str(h))
1620
    else:
1621
        h = h and "{}h ".format(str(h)) or ""
1622
    return "".join([d, h, m]).strip()
1623
1624
1625
def to_int(value, default=_marker):
1626
    """Tries to convert the value to int.
1627
    Truncates at the decimal point if the value is a float
1628
1629
    :param value: The value to be converted to an int
1630
    :return: The resulting int or default
1631
    """
1632
    if is_floatable(value):
1633
        value = to_float(value)
1634
    try:
1635
        return int(value)
1636
    except (TypeError, ValueError):
1637
        if default is None:
1638
            return default
1639
        if default is not _marker:
1640
            return to_int(default)
1641
        fail("Value %s cannot be converted to int" % repr(value))
1642
1643
1644
def is_floatable(value):
1645
    """Checks if the passed in value is a valid floatable number
1646
1647
    :param value: The value to be evaluated as a float number
1648
    :type value: str, float, int
1649
    :returns: True if is a valid float number
1650
    :rtype: bool"""
1651
    try:
1652
        float(value)
1653
        return True
1654
    except (TypeError, ValueError):
1655
        return False
1656
1657
1658
def to_float(value, default=_marker):
1659
    """Converts the passed in value to a float number
1660
1661
    :param value: The value to be converted to a floatable number
1662
    :type value: str, float, int
1663
    :returns: The float number representation of the passed in value
1664
    :rtype: float
1665
    """
1666
    if not is_floatable(value):
1667
        if default is not _marker:
1668
            return to_float(default)
1669
        fail("Value %s is not floatable" % repr(value))
1670
    return float(value)
1671
1672
1673
def float_to_string(value, default=_marker):
1674
    """Convert a float value to string without exponential notation
1675
1676
    This function preserves the whole fraction
1677
1678
    :param value: The float value to be converted to a string
1679
    :type value: str, float, int
1680
    :returns: String representation of the float w/o exponential notation
1681
    :rtype: str
1682
    """
1683
    if not is_floatable(value):
1684
        if default is not _marker:
1685
            return default
1686
        fail("Value %s is not floatable" % repr(value))
1687
1688
    # Leave floatable string values unchanged
1689
    if isinstance(value, six.string_types):
1690
        return value
1691
1692
    value = float(value)
1693
    str_value = str(value)
1694
1695
    if "." in str_value:
1696
        # might be something like 1.23e-26
1697
        front, back = str_value.split(".")
1698
    else:
1699
        # or 1e-07 for 0.0000001
1700
        back = str_value
1701
1702
    if "e-" in back:
1703
        fraction, zeros = back.split("e-")
1704
        # we want to cover the faction and the zeros
1705
        precision = len(fraction) + int(zeros)
1706
        template = "{:.%df}" % precision
1707
        str_value = template.format(value)
1708
    elif "e+" in back:
1709
        # positive numbers, e.g. 1e+16 don't need a fractional part
1710
        str_value = "{:.0f}".format(value)
1711
1712
    # cut off trailing zeros
1713
    if "." in str_value:
1714
        str_value = str_value.rstrip("0").rstrip(".")
1715
1716
    return str_value
1717
1718
1719
def to_searchable_text_metadata(value):
1720
    """Parse the given metadata value to searchable text
1721
1722
    :param value: The raw value of the metadata column
1723
    :returns: Searchable and translated unicode value or None
1724
    """
1725
    if not value:
1726
        return u""
1727
    if value is Missing.Value:
1728
        return u""
1729
    if is_uid(value):
1730
        return u""
1731
    if isinstance(value, (bool)):
1732
        return u""
1733
    if isinstance(value, (list, tuple)):
1734
        values = map(to_searchable_text_metadata, value)
1735
        values = filter(None, values)
1736
        return " ".join(values)
1737
    if isinstance(value, dict):
1738
        return to_searchable_text_metadata(value.values())
1739
    if is_date(value):
1740
        from senaite.core.api.dtime import date_to_string
1741
        return date_to_string(value, "%Y-%m-%d")
1742
    if is_at_content(value):
1743
        return to_searchable_text_metadata(get_title(value))
1744
    if not isinstance(value, six.string_types):
1745
        value = str(value)
1746
    return safe_unicode(value)
1747
1748
1749
def get_registry_record(name, default=None):
1750
    """Returns the value of a registry record
1751
1752
    :param name: [required] name of the registry record
1753
    :type name: str
1754
    :param default: The value returned if the record is not found
1755
    :type default: anything
1756
    :returns: value of the registry record
1757
    """
1758
    return ploneapi.portal.get_registry_record(name, default=default)
1759
1760
1761
def to_display_list(pairs, sort_by="key", allow_empty=True):
1762
    """Create a Plone DisplayList from list items
1763
1764
    :param pairs: list of key, value pairs
1765
    :param sort_by: Sort the items either by key or value
1766
    :param allow_empty: Allow to select an empty value
1767
    :returns: Plone DisplayList
1768
    """
1769
    dl = DisplayList()
1770
1771
    if isinstance(pairs, six.string_types):
1772
        pairs = [pairs, pairs]
1773
    for pair in pairs:
1774
        # pairs is a list of lists -> add each pair
1775
        if isinstance(pair, (tuple, list)):
1776
            dl.add(*pair)
1777
        # pairs is just a single pair -> add it and stop
1778
        if isinstance(pair, six.string_types):
1779
            dl.add(*pairs)
1780
            break
1781
1782
    # add the empty option
1783
    if allow_empty:
1784
        dl.add("", "")
1785
1786
    # sort by key/value
1787
    if sort_by == "key":
1788
        dl = dl.sortedByKey()
1789
    elif sort_by == "value":
1790
        dl = dl.sortedByValue()
1791
1792
    return dl
1793
1794
1795
def text_to_html(text, wrap="p", encoding="utf8"):
1796
    """Convert `\n` sequences in the text to HTML `\n`
1797
1798
    :param text: Plain text to convert
1799
    :param wrap: Toggle to wrap the text in a
1800
    :returns: HTML converted and encoded text
1801
    """
1802
    if not text:
1803
        return ""
1804
    # handle text internally as unicode
1805
    text = safe_unicode(text)
1806
    # replace newline characters with HTML entities
1807
    html = text.replace("\n", "<br/>")
1808
    if wrap:
1809
        html = u"<{tag}>{html}</{tag}>".format(
1810
            tag=wrap, html=html)
1811
    # return encoded html
1812
    return html.encode(encoding)
1813
1814
1815
def to_utf8(string, default=_marker):
1816
    """Encode string to UTF8
1817
1818
    :param string: String to be encoded to UTF8
1819
    :returns: UTF8 encoded string
1820
    """
1821
    if not isinstance(string, six.string_types):
1822
        if default is _marker:
1823
            fail("Expected string type, got '%s'" % type(string))
1824
        return default
1825
    return safe_unicode(string).encode("utf8")
1826
1827
1828
def is_temporary(obj):
1829
    """Returns whether the given object is temporary or not
1830
1831
    :param obj: the object to evaluate
1832
    :returns: True if the object is temporary
1833
    """
1834
    if ITemporaryObject.providedBy(obj):
1835
        return True
1836
1837
    obj_id = getattr(aq_base(obj), "id", None)
1838
    if obj_id is None or UID_RX.match(obj_id):
1839
        return True
1840
1841
    parent = aq_parent(aq_inner(obj))
1842
    if not parent:
1843
        return True
1844
1845
    parent_id = getattr(aq_base(parent), "id", None)
1846
    if parent_id is None or UID_RX.match(parent_id):
1847
        return True
1848
1849
    # Checks to see if we are created inside the portal_factory.
1850
    # This might also happen for DX types in senaite.databox!
1851
    meta_type = getattr(aq_base(parent), "meta_type", "")
1852
    if meta_type == "TempFolder":
1853
        return True
1854
1855
    return False
1856
1857
1858
def mark_temporary(brain_or_object):
1859
    """Mark the object as temporary
1860
    """
1861
    obj = get_object(brain_or_object)
1862
    alsoProvides(obj, ITemporaryObject)
1863
1864
1865
def unmark_temporary(brain_or_object):
1866
    """Unmark the object as temporary
1867
    """
1868
    obj = get_object(brain_or_object)
1869
    noLongerProvides(obj, ITemporaryObject)
1870
1871
1872
def is_string(thing):
1873
    """Checks if the passed in object is a string type
1874
1875
    :param thing: object to test
1876
    :returns: True if the object is a string
1877
    """
1878
    return isinstance(thing, six.string_types)
1879
1880
1881
def parse_json(thing, default=""):
1882
    """Parse from JSON
1883
1884
    :param thing: thing to parse
1885
    :param default: value to return if cannot parse
1886
    :returns: the object representing the JSON or default
1887
    """
1888
    try:
1889
        return json.loads(thing)
1890
    except (TypeError, ValueError):
1891
        return default
1892
1893
1894
def to_list(value):
1895
    """Converts the value to a list
1896
1897
    :param value: the value to be represented as a list
1898
    :returns: a list that represents or contains the value
1899
    """
1900
    if is_string(value):
1901
        val = parse_json(value)
1902
        if isinstance(val, (list, tuple, set)):
1903
            value = val
1904
    if not isinstance(value, (list, tuple, set)):
1905
        value = [value]
1906
    return list(value)
1907