Passed
Push — 2.x ( 78af08...f8d2ad )
by Ramon
06:16
created

bika.lims.api.delete()   A

Complexity

Conditions 3

Size

Total Lines 17
Code Lines 7

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 7
dl 0
loc 17
rs 10
c 0
b 0
f 0
cc 3
nop 3
1
# -*- coding: utf-8 -*-
2
#
3
# This file is part of SENAITE.CORE.
4
#
5
# SENAITE.CORE is free software: you can redistribute it and/or modify it under
6
# the terms of the GNU General Public License as published by the Free Software
7
# Foundation, version 2.
8
#
9
# This program is distributed in the hope that it will be useful, but WITHOUT
10
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
11
# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
12
# details.
13
#
14
# You should have received a copy of the GNU General Public License along with
15
# this program; if not, write to the Free Software Foundation, Inc., 51
16
# Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
17
#
18
# Copyright 2018-2021 by it's authors.
19
# Some rights reserved, see README and LICENSE.
20
21
import copy
22
import json
23
import re
24
from collections import OrderedDict
25
from datetime import datetime
26
from datetime import timedelta
27
from itertools import groupby
28
29
import Missing
30
import six
31
from AccessControl.PermissionRole import rolesForPermissionOn
32
from Acquisition import aq_base
33
from Acquisition import aq_inner
34
from Acquisition import aq_parent
35
from bika.lims import logger
36
from bika.lims.interfaces import IClient
37
from bika.lims.interfaces import IContact
38
from bika.lims.interfaces import ILabContact
39
from DateTime import DateTime
40
from plone import api as ploneapi
41
from plone.api.exc import InvalidParameterError
42
from plone.app.layout.viewlets.content import ContentHistoryView
43
from plone.behavior.interfaces import IBehaviorAssignable
44
from plone.dexterity.interfaces import IDexterityContent
45
from plone.dexterity.schema import SchemaInvalidatedEvent
46
from plone.dexterity.utils import addContentToContainer
47
from plone.dexterity.utils import createContent
48
from plone.dexterity.utils import resolveDottedName
49
from plone.i18n.normalizer.interfaces import IFileNameNormalizer
50
from plone.i18n.normalizer.interfaces import IIDNormalizer
51
from plone.memoize.volatile import DontCache
52
from Products.Archetypes.atapi import DisplayList
53
from Products.Archetypes.BaseObject import BaseObject
54
from Products.Archetypes.event import ObjectInitializedEvent
55
from Products.Archetypes.utils import mapply
56
from Products.CMFCore.interfaces import IFolderish
57
from Products.CMFCore.interfaces import ISiteRoot
58
from Products.CMFCore.permissions import DeleteObjects
59
from Products.CMFCore.permissions import ModifyPortalContent
60
from Products.CMFCore.permissions import View
61
from Products.CMFCore.utils import getToolByName
62
from Products.CMFCore.WorkflowCore import WorkflowException
63
from Products.CMFPlone.RegistrationTool import get_member_by_login_name
64
from Products.CMFPlone.utils import _createObjectByType
65
from Products.CMFPlone.utils import base_hasattr
66
from Products.CMFPlone.utils import safe_unicode
67
from Products.PlonePAS.tools.memberdata import MemberData
68
from Products.ZCatalog.interfaces import ICatalogBrain
69
from senaite.core.interfaces import ITemporaryObject
70
from zope import globalrequest
71
from zope.annotation.interfaces import IAttributeAnnotatable
72
from zope.component import getUtility
73
from zope.component import queryMultiAdapter
74
from zope.event import notify
75
from zope.interface import alsoProvides
76
from zope.interface import directlyProvides
77
from zope.interface import noLongerProvides
78
from zope.publisher.browser import TestRequest
79
from zope.schema import getFieldsInOrder
80
from zope.security.interfaces import Unauthorized
81
82
"""SENAITE LIMS Framework API
83
84
Please see tests/doctests/API.rst for documentation.
85
86
Architecural Notes:
87
88
Please add only functions that do a single thing for a single object.
89
90
Good: `def get_foo(brain_or_object)`
91
Bad:  `def get_foos(list_of_brain_objects)`
92
93
Why?
94
95
Because it makes things more complex. You can always use a pattern like this to
96
achieve the same::
97
98
    >>> foos = map(get_foo, list_of_brain_objects)
99
100
Please add for all functions a descriptive test in tests/doctests/API.rst.
101
102
Thanks.
103
"""
104
105
_marker = object()
106
107
UID_RX = re.compile("[a-z0-9]{32}$")
108
109
UID_CATALOG = "uid_catalog"
110
PORTAL_CATALOG = "portal_catalog"
111
112
113
class APIError(Exception):
114
    """Base exception class for bika.lims errors."""
115
116
117
def get_portal():
118
    """Get the portal object
119
120
    :returns: Portal object
121
    """
122
    return ploneapi.portal.getSite()
123
124
125
def get_setup():
126
    """Fetch the `bika_setup` folder.
127
    """
128
    portal = get_portal()
129
    return portal.get("bika_setup")
130
131
132
def get_bika_setup():
133
    """Fetch the `bika_setup` folder.
134
    """
135
    return get_setup()
136
137
138
def get_senaite_setup():
139
    """Fetch the new DX `setup` folder.
140
    """
141
    portal = get_portal()
142
    return portal.get("setup")
143
144
145
def create(container, portal_type, *args, **kwargs):
146
    """Creates an object in Bika LIMS
147
148
    This code uses most of the parts from the TypesTool
149
    see: `Products.CMFCore.TypesTool._constructInstance`
150
151
    :param container: container
152
    :type container: ATContentType/DexterityContentType/CatalogBrain
153
    :param portal_type: The portal type to create, e.g. "Client"
154
    :type portal_type: string
155
    :param title: The title for the new content object
156
    :type title: string
157
    :returns: The new created object
158
    """
159
    from bika.lims.utils import tmpID
160
161
    tmp_id = tmpID()
162
    id = kwargs.pop("id", "")
163
    title = kwargs.pop("title", "")
164
165
    # get the fti
166
    types_tool = get_tool("portal_types")
167
    fti = types_tool.getTypeInfo(portal_type)
168
169
    if fti.product:
170
        # create the AT object
171
        obj = _createObjectByType(portal_type, container, id or tmp_id)
172
        # update the object with values
173
        edit(obj, check_permissions=False, title=title, **kwargs)
174
        # auto-id if required
175
        if obj._at_rename_after_creation:
176
            obj._renameAfterCreation(check_auto_id=True)
177
        # we are no longer under creation
178
        obj.unmarkCreationFlag()
179
        # notify that the object was created
180
        notify(ObjectInitializedEvent(obj))
181
    else:
182
        content = createContent(portal_type, **kwargs)
183
        content.id = id
184
        content.title = title
185
        obj = addContentToContainer(container, content)
186
187
    return obj
188
189
190
def copy_object(source, container=None, portal_type=None, *args, **kwargs):
191
    """Creates a copy of the source object. If container is None, creates the
192
    copy inside the same container as the source. If portal_type is specified,
193
    creates a new object of this type, and copies the values from source fields
194
    to the destination object. Field values sent as kwargs have priority over
195
    the field values from source.
196
197
    :param source: object from which create a copy
198
    :type source: ATContentType/DexterityContentType/CatalogBrain
199
    :param container: destination container
200
    :type container: ATContentType/DexterityContentType/CatalogBrain
201
    :param portal_type: destination portal type
202
    :returns: The new created object
203
    """
204
    # Prevent circular dependencies
205
    from security import check_permission
206
    # Use same container as source unless explicitly set
207
    source = get_object(source)
208
    if not container:
209
        container = get_parent(source)
210
211
    # Use same portal type as source unless explicitly set
212
    if not portal_type:
213
        portal_type = get_portal_type(source)
214
215
    # Extend the fields to skip with defaults
216
    skip = kwargs.pop("skip", [])
217
    skip = set(skip)
218
    skip.update([
219
        "Products.Archetypes.Field.ComputedField",
220
        "UID",
221
        "id",
222
        "allowDiscussion",
223
        "contributors",
224
        "creation_date",
225
        "creators",
226
        "effectiveDate",
227
        "expirationDate",
228
        "language",
229
        "location",
230
        "modification_date",
231
        "rights",
232
        "subject",
233
    ])
234
    # Build a dict for complexity reduction
235
    skip = dict([(item, True) for item in skip])
236
237
    # Update kwargs with the field values to copy from source
238
    fields = get_fields(source)
239
    for field_name, field in fields.items():
240
        # Prioritize field values passed as kwargs
241
        if field_name in kwargs:
242
            continue
243
        # Skip framework internal fields by name
244
        if skip.get(field_name, False):
245
            continue
246
        # Skip fields of non-suitable types
247
        if hasattr(field, "getType") and skip.get(field.getType(), False):
248
            continue
249
        # Skip readonly fields
250
        if getattr(field, "readonly", False):
251
            continue
252
        # Skip non-readable fields
253
        perm = getattr(field, "read_permission", View)
254
        if perm and not check_permission(perm, source):
255
            continue
256
257
        # do not wake-up objects unnecessarily
258
        if hasattr(field, "getRaw"):
259
            field_value = field.getRaw(source)
260
        elif hasattr(field, "get_raw"):
261
            field_value = field.get_raw(source)
262
        elif hasattr(field, "getAccessor"):
263
            accessor = field.getAccessor(source)
264
            field_value = accessor()
265
        else:
266
            field_value = field.get(source)
267
268
        # Do a hard copy of value if mutable type
269
        if isinstance(field_value, (list, dict, set)):
270
            field_value = copy.deepcopy(field_value)
271
        kwargs.update({field_name: field_value})
272
273
    # Create a copy
274
    return create(container, portal_type, *args, **kwargs)
275
276
277
def edit(obj, check_permissions=True, **kwargs):
278
    """Updates the values of object fields with the new values passed-in
279
    """
280
    # Prevent circular dependencies
281
    from security import check_permission
282
    fields = get_fields(obj)
283
    for name, value in kwargs.items():
284
        field = fields.get(name, None)
285
        if not field:
286
            continue
287
288
        # cannot update readonly fields
289
        readonly = getattr(field, "readonly", False)
290
        if readonly:
291
            raise ValueError("Field '{}' is readonly".format(name))
292
293
        # check field writable permission
294
        if check_permissions:
295
            perm = getattr(field, "write_permission", ModifyPortalContent)
296
            if perm and not check_permission(perm, obj):
297
                raise Unauthorized("Field '{}' is not writeable".format(name))
298
299
        # Set the value
300
        if hasattr(field, "getMutator"):
301
            mutator = field.getMutator(obj)
302
            mapply(mutator, value)
303
        else:
304
            field.set(obj, value)
305
306
307
def uncatalog_object(obj):
308
    """Un-catalog the object from all catalogs
309
310
    :param obj: object to un-catalog
311
    :type obj: ATContentType/DexterityContentType
312
    """
313
    # un-catalog from registered catalogs
314
    obj.unindexObject()
315
    # explicitly un-catalog from uid_catalog
316
    uid_catalog = get_tool("uid_catalog")
317
    # the uids of uid_catalog are relative paths to portal root
318
    # see Products.Archetypes.UIDCatalog.UIDResolver.catalog_object
319
    url = "/".join(obj.getPhysicalPath()[2:])
320
    uid_catalog.uncatalog_object(url)
321
322
323
def catalog_object(obj):
324
    """Re-catalog the object
325
326
    :param obj: object to un-catalog
327
    :type obj: ATContentType/DexterityContentType
328
    """
329
    if is_at_content(obj):
330
        # explicitly re-catalog AT types at uid_catalog (DX types are
331
        # automatically reindexed in UID catalog on reindexObject)
332
        uc = get_tool("uid_catalog")
333
        # the uids of uid_catalog are relative paths to portal root
334
        # see Products.Archetypes.UIDCatalog.UIDResolver.catalog_object
335
        url = "/".join(obj.getPhysicalPath()[2:])
336
        uc.catalog_object(obj, url)
337
    obj.reindexObject()
338
339
340
def delete(obj, check_permissions=True, suppress_events=False):
341
    """Deletes the given object
342
343
    :param obj: object to un-catalog
344
    :param check_permissions: whether delete permission must be checked
345
    :param suppress_events: whether ondelete events have to be fired
346
    :type obj: ATContentType/DexterityContentType
347
    """
348
    from security import check_permission
349
    if check_permissions and not check_permission(DeleteObjects, obj):
350
        raise Unauthorized("Do not have permissions to remove this object")
351
352
    # un-catalog the object from all catalogs (uid_catalog included)
353
    uncatalog_object(obj)
354
    # delete the object
355
    parent = get_parent(obj)
356
    parent._delObject(obj.getId(), suppress_events=suppress_events)
357
358
359
def get_tool(name, context=None, default=_marker):
360
    """Get a portal tool by name
361
362
    :param name: The name of the tool, e.g. `senaite_setup_catalog`
363
    :type name: string
364
    :param context: A portal object
365
    :type context: ATContentType/DexterityContentType/CatalogBrain
366
    :returns: Portal Tool
367
    """
368
369
    # Try first with the context
370
    if context is not None:
371
        try:
372
            context = get_object(context)
373
            return getToolByName(context, name)
374
        except (APIError, AttributeError) as e:
375
            # https://github.com/senaite/bika.lims/issues/396
376
            logger.warn("get_tool::getToolByName({}, '{}') failed: {} "
377
                        "-> falling back to plone.api.portal.get_tool('{}')"
378
                        .format(repr(context), name, repr(e), name))
379
            return get_tool(name, default=default)
380
381
    # Try with the plone api
382
    try:
383
        return ploneapi.portal.get_tool(name)
384
    except InvalidParameterError:
385
        if default is not _marker:
386
            if isinstance(default, six.string_types):
387
                return get_tool(default)
388
            return default
389
        fail("No tool named '%s' found." % name)
390
391
392
def fail(msg=None):
393
    """API LIMS Error
394
    """
395
    if msg is None:
396
        msg = "Reason not given."
397
    raise APIError("{}".format(msg))
398
399
400
def is_object(brain_or_object):
401
    """Check if the passed in object is a supported portal content object
402
403
    :param brain_or_object: A single catalog brain or content object
404
    :type brain_or_object: Portal Object
405
    :returns: True if the passed in object is a valid portal content
406
    """
407
    if is_portal(brain_or_object):
408
        return True
409
    if is_supermodel(brain_or_object):
410
        return True
411
    if is_at_content(brain_or_object):
412
        return True
413
    if is_dexterity_content(brain_or_object):
414
        return True
415
    if is_brain(brain_or_object):
416
        return True
417
    return False
418
419
420
def get_object(brain_object_uid, default=_marker):
421
    """Get the full content object
422
423
    :param brain_object_uid: A catalog brain or content object or uid
424
    :type brain_object_uid: PortalObject/ATContentType/DexterityContentType
425
    /CatalogBrain/basestring
426
    :returns: The full object
427
    """
428
    if is_uid(brain_object_uid):
429
        return get_object_by_uid(brain_object_uid, default=default)
430
    elif is_supermodel(brain_object_uid):
431
        return brain_object_uid.instance
432
    if not is_object(brain_object_uid):
433
        if default is _marker:
434
            fail("{} is not supported.".format(repr(brain_object_uid)))
435
        return default
436
    if is_brain(brain_object_uid):
437
        return brain_object_uid.getObject()
438
    return brain_object_uid
439
440
441
def is_portal(brain_or_object):
442
    """Checks if the passed in object is the portal root object
443
444
    :param brain_or_object: A single catalog brain or content object
445
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
446
    :returns: True if the object is the portal root object
447
    :rtype: bool
448
    """
449
    return ISiteRoot.providedBy(brain_or_object)
450
451
452
def is_brain(brain_or_object):
453
    """Checks if the passed in object is a portal catalog brain
454
455
    :param brain_or_object: A single catalog brain or content object
456
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
457
    :returns: True if the object is a catalog brain
458
    :rtype: bool
459
    """
460
    return ICatalogBrain.providedBy(brain_or_object)
461
462
463
def is_supermodel(brain_or_object):
464
    """Checks if the passed in object is a supermodel
465
466
    :param brain_or_object: A single catalog brain or content object
467
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
468
    :returns: True if the object is a catalog brain
469
    :rtype: bool
470
    """
471
    # avoid circular imports
472
    from senaite.app.supermodel.interfaces import ISuperModel
473
    return ISuperModel.providedBy(brain_or_object)
474
475
476
def is_dexterity_content(brain_or_object):
477
    """Checks if the passed in object is a dexterity content type
478
479
    :param brain_or_object: A single catalog brain or content object
480
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
481
    :returns: True if the object is a dexterity content type
482
    :rtype: bool
483
    """
484
    return IDexterityContent.providedBy(brain_or_object)
485
486
487
def is_at_content(brain_or_object):
488
    """Checks if the passed in object is an AT content type
489
490
    :param brain_or_object: A single catalog brain or content object
491
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
492
    :returns: True if the object is an AT content type
493
    :rtype: bool
494
    """
495
    return isinstance(brain_or_object, BaseObject)
496
497
498
def is_dx_type(portal_type):
499
    """Checks if the portal type is DX based
500
501
    :param portal_type: The portal type name to check
502
    :returns: True if the portal type is DX based
503
    """
504
    portal_types = get_tool("portal_types")
505
    fti = portal_types.getTypeInfo(portal_type)
506
    if fti.product:
507
        return False
508
    return True
509
510
511
def is_at_type(portal_type):
512
    """Checks if the portal type is AT based
513
514
    :param portal_type: The portal type name to check
515
    :returns: True if the portal type is AT based
516
    """
517
    return not is_dx_type(portal_type)
518
519
520
def is_folderish(brain_or_object):
521
    """Checks if the passed in object is folderish
522
523
    :param brain_or_object: A single catalog brain or content object
524
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
525
    :returns: True if the object is folderish
526
    :rtype: bool
527
    """
528
    if hasattr(brain_or_object, "is_folderish"):
529
        if callable(brain_or_object.is_folderish):
530
            return brain_or_object.is_folderish()
531
        return brain_or_object.is_folderish
532
    return IFolderish.providedBy(get_object(brain_or_object))
533
534
535
def get_portal_type(brain_or_object):
536
    """Get the portal type for this object
537
538
    :param brain_or_object: A single catalog brain or content object
539
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
540
    :returns: Portal type
541
    :rtype: string
542
    """
543
    if not is_object(brain_or_object):
544
        fail("{} is not supported.".format(repr(brain_or_object)))
545
    return brain_or_object.portal_type
546
547
548
def get_schema(brain_or_object):
549
    """Get the schema of the content
550
551
    :param brain_or_object: A single catalog brain or content object
552
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
553
    :returns: Schema object
554
    """
555
    obj = get_object(brain_or_object)
556
    if is_portal(obj):
557
        fail("get_schema can't return schema of portal root")
558
    if is_dexterity_content(obj):
559
        pt = get_tool("portal_types")
560
        fti = pt.getTypeInfo(obj.portal_type)
561
        return fti.lookupSchema()
562
    if is_at_content(obj):
563
        return obj.Schema()
564
    fail("{} has no Schema.".format(brain_or_object))
565
566
567
def get_behaviors(portal_type):
568
    """List all behaviors
569
570
    :param portal_type: DX portal type name
571
    """
572
    portal_types = get_tool("portal_types")
573
    fti = portal_types.getTypeInfo(portal_type)
574
    if fti.product:
575
        raise TypeError("Expected DX type, got AT type instead.")
576
    return fti.behaviors
577
578
579
def enable_behavior(portal_type, behavior_id):
580
    """Enable behavior
581
582
    :param portal_type: DX portal type name
583
    :param behavior_id: The behavior to enable
584
    """
585
    portal_types = get_tool("portal_types")
586
    fti = portal_types.getTypeInfo(portal_type)
587
    if fti.product:
588
        raise TypeError("Expected DX type, got AT type instead.")
589
590
    if behavior_id not in fti.behaviors:
591
        fti.behaviors += (behavior_id, )
592
        # invalidate schema cache
593
        notify(SchemaInvalidatedEvent(portal_type))
594
595
596
def disable_behavior(portal_type, behavior_id):
597
    """Disable behavior
598
599
    :param portal_type: DX portal type name
600
    :param behavior_id: The behavior to disable
601
    """
602
    portal_types = get_tool("portal_types")
603
    fti = portal_types.getTypeInfo(portal_type)
604
    if fti.product:
605
        raise TypeError("Expected DX type, got AT type instead.")
606
    fti.behaviors = tuple(filter(lambda b: b != behavior_id, fti.behaviors))
607
    # invalidate schema cache
608
    notify(SchemaInvalidatedEvent(portal_type))
609
610
611
def get_fields(brain_or_object):
612
    """Get a name to field mapping of the object
613
614
    :param brain_or_object: A single catalog brain or content object
615
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
616
    :returns: Mapping of name -> field
617
    :rtype: OrderedDict
618
    """
619
    obj = get_object(brain_or_object)
620
    schema = get_schema(obj)
621
    if is_dexterity_content(obj):
622
        # get the fields directly provided by the interface
623
        fields = getFieldsInOrder(schema)
624
        # append the fields coming from behaviors
625
        behavior_assignable = IBehaviorAssignable(obj)
626
        if behavior_assignable:
627
            behaviors = behavior_assignable.enumerateBehaviors()
628
            for behavior in behaviors:
629
                fields.extend(getFieldsInOrder(behavior.interface))
630
        return OrderedDict(fields)
631
    return OrderedDict(schema)
632
633
634
def get_id(brain_or_object):
635
    """Get the Plone ID for this object
636
637
    :param brain_or_object: A single catalog brain or content object
638
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
639
    :returns: Plone ID
640
    :rtype: string
641
    """
642
    if is_brain(brain_or_object):
643
        if base_hasattr(brain_or_object, "getId"):
644
            return brain_or_object.getId
645
        if base_hasattr(brain_or_object, "id"):
646
            return brain_or_object.id
647
    return get_object(brain_or_object).getId()
648
649
650
def get_title(brain_or_object):
651
    """Get the Title for this object
652
653
    :param brain_or_object: A single catalog brain or content object
654
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
655
    :returns: Title
656
    :rtype: string
657
    """
658
    if is_brain(brain_or_object) and base_hasattr(brain_or_object, "Title"):
659
        return brain_or_object.Title
660
    return get_object(brain_or_object).Title()
661
662
663
def get_description(brain_or_object):
664
    """Get the Title for this object
665
666
    :param brain_or_object: A single catalog brain or content object
667
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
668
    :returns: Title
669
    :rtype: string
670
    """
671
    if is_brain(brain_or_object) \
672
            and base_hasattr(brain_or_object, "Description"):
673
        return brain_or_object.Description
674
    return get_object(brain_or_object).Description()
675
676
677
def get_uid(brain_or_object):
678
    """Get the Plone UID for this object
679
680
    :param brain_or_object: A single catalog brain or content object or an UID
681
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
682
    :returns: Plone UID
683
    :rtype: string
684
    """
685
    if is_uid(brain_or_object):
686
        return brain_or_object
687
    if is_portal(brain_or_object):
688
        return '0'
689
    if is_brain(brain_or_object) and base_hasattr(brain_or_object, "UID"):
690
        return brain_or_object.UID
691
    return get_object(brain_or_object).UID()
692
693
694
def get_url(brain_or_object):
695
    """Get the absolute URL for this object
696
697
    :param brain_or_object: A single catalog brain or content object
698
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
699
    :returns: Absolute URL
700
    :rtype: string
701
    """
702
    if is_brain(brain_or_object) and base_hasattr(brain_or_object, "getURL"):
703
        return brain_or_object.getURL()
704
    return get_object(brain_or_object).absolute_url()
705
706
707
def get_icon(brain_or_object, html_tag=True):
708
    """Get the icon of the content object
709
710
    :param brain_or_object: A single catalog brain or content object
711
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
712
    :param html_tag: A value of 'True' returns the HTML tag, else the image url
713
    :type html_tag: bool
714
    :returns: HTML '<img>' tag if 'html_tag' is True else the image url
715
    :rtype: string
716
    """
717
    # Manual approach, because `plone.app.layout.getIcon` does not reliable
718
    # work for Contents coming from other catalogs than the
719
    # `portal_catalog`
720
    portal_types = get_tool("portal_types")
721
    fti = portal_types.getTypeInfo(brain_or_object.portal_type)
722
    icon = fti.getIcon()
723
    if not icon:
724
        return ""
725
    url = "%s/%s" % (get_url(get_portal()), icon)
726
    if not html_tag:
727
        return url
728
    tag = '<img width="16" height="16" src="{url}" title="{title}" />'.format(
729
        url=url, title=get_title(brain_or_object))
730
    return tag
731
732
733
def get_object_by_uid(uid, default=_marker):
734
    """Find an object by a given UID
735
736
    :param uid: The UID of the object to find
737
    :type uid: string
738
    :returns: Found Object or None
739
    """
740
741
    # nothing to do here
742
    if not uid:
743
        if default is not _marker:
744
            return default
745
        fail("get_object_by_uid requires UID as first argument; got {} instead"
746
             .format(uid))
747
748
    # we defined the portal object UID to be '0'::
749
    if uid == '0':
750
        return get_portal()
751
752
    brain = get_brain_by_uid(uid)
753
754
    if brain is None:
755
        if default is not _marker:
756
            return default
757
        fail("No object found for UID {}".format(uid))
758
759
    return get_object(brain)
760
761
762
def get_brain_by_uid(uid, default=None):
763
    """Query a brain by a given UID
764
765
    :param uid: The UID of the object to find
766
    :type uid: string
767
    :returns: ZCatalog brain or None
768
    """
769
    if not is_uid(uid):
770
        return default
771
772
    # we try to find the object with the UID catalog
773
    uc = get_tool(UID_CATALOG)
774
775
    # try to find the object with the reference catalog first
776
    brains = uc(UID=uid)
777
    if len(brains) != 1:
778
        return default
779
    return brains[0]
780
781
782
def get_object_by_path(path, default=_marker):
783
    """Find an object by a given physical path or absolute_url
784
785
    :param path: The physical path of the object to find
786
    :type path: string
787
    :returns: Found Object or None
788
    """
789
790
    # nothing to do here
791
    if not path:
792
        if default is not _marker:
793
            return default
794
        fail("get_object_by_path first argument must be a path; {} received"
795
             .format(path))
796
797
    portal = get_portal()
798
    portal_path = get_path(portal)
799
    portal_url = get_url(portal)
800
801
    # ensure we have a physical path
802
    if path.startswith(portal_url):
803
        request = get_request()
804
        path = "/".join(request.physicalPathFromURL(path))
805
806
    if not path.startswith(portal_path):
807
        if default is not _marker:
808
            return default
809
        fail("Not a physical path inside the portal.")
810
811
    if path == portal_path:
812
        return portal
813
814
    return portal.restrictedTraverse(path, default)
815
816
817
def get_path(brain_or_object):
818
    """Calculate the physical path of this object
819
820
    :param brain_or_object: A single catalog brain or content object
821
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
822
    :returns: Physical path of the object
823
    :rtype: string
824
    """
825
    if is_brain(brain_or_object):
826
        return brain_or_object.getPath()
827
    return "/".join(get_object(brain_or_object).getPhysicalPath())
828
829
830
def get_parent_path(brain_or_object):
831
    """Calculate the physical parent path of this object
832
833
    :param brain_or_object: A single catalog brain or content object
834
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
835
    :returns: Physical path of the parent object
836
    :rtype: string
837
    """
838
    if is_portal(brain_or_object):
839
        return get_path(get_portal())
840
    if is_brain(brain_or_object):
841
        path = get_path(brain_or_object)
842
        return path.rpartition("/")[0]
843
    return get_path(get_object(brain_or_object).aq_parent)
844
845
846
def get_parent(brain_or_object, **kw):
847
    """Locate the parent object of the content/catalog brain
848
849
    :param brain_or_object: A single catalog brain or content object
850
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
851
    :param catalog_search: Use a catalog query to find the parent object
852
    :type catalog_search: bool
853
    :returns: parent object
854
    :rtype: ATContentType/DexterityContentType/PloneSite/CatalogBrain
855
    """
856
857
    if is_portal(brain_or_object):
858
        return get_portal()
859
860
    # BBB: removed `catalog_search` keyword
861
    if kw:
862
        logger.warn("API function `get_parent` no longer support keywords.")
863
864
    return get_object(brain_or_object).aq_parent
865
866
867
def search(query, catalog=_marker):
868
    """Search for objects.
869
870
    :param query: A suitable search query.
871
    :type query: dict
872
    :param catalog: A single catalog id or a list of catalog ids
873
    :type catalog: str/list
874
    :returns: Search results
875
    :rtype: List of ZCatalog brains
876
    """
877
878
    # query needs to be a dictionary
879
    if not isinstance(query, dict):
880
        fail("Catalog query needs to be a dictionary")
881
882
    # Portal types to query
883
    portal_types = query.get("portal_type", [])
884
    # We want the portal_type as a list
885
    if not isinstance(portal_types, (tuple, list)):
886
        portal_types = [portal_types]
887
888
    # The catalogs used for the query
889
    catalogs = []
890
891
    # The user did **not** specify a catalog
892
    if catalog is _marker:
893
        # Find the registered catalogs for the queried portal types
894
        for portal_type in portal_types:
895
            # Just get the first registered/default catalog
896
            catalogs.append(get_catalogs_for(
897
                portal_type, default=UID_CATALOG)[0])
898
    else:
899
        # User defined catalogs
900
        if isinstance(catalog, (list, tuple)):
901
            catalogs.extend(map(get_tool, catalog))
902
        else:
903
            catalogs.append(get_tool(catalog))
904
905
    # Cleanup: Avoid duplicate catalogs
906
    catalogs = list(set(catalogs)) or [get_uid_catalog()]
907
908
    # We only support **single** catalog queries
909
    if len(catalogs) > 1:
910
        fail("Multi Catalog Queries are not supported!")
911
912
    return catalogs[0](query)
913
914
915
def safe_getattr(brain_or_object, attr, default=_marker):
916
    """Return the attribute value
917
918
    :param brain_or_object: A single catalog brain or content object
919
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
920
    :param attr: Attribute name
921
    :type attr: str
922
    :returns: Attribute value
923
    :rtype: obj
924
    """
925
    try:
926
        value = getattr(brain_or_object, attr, _marker)
927
        if value is _marker:
928
            if default is not _marker:
929
                return default
930
            fail("Attribute '{}' not found.".format(attr))
931
        if callable(value):
932
            return value()
933
        return value
934
    except Unauthorized:
935
        if default is not _marker:
936
            return default
937
        fail("You are not authorized to access '{}' of '{}'.".format(
938
            attr, repr(brain_or_object)))
939
940
941
def get_uid_catalog():
942
    """Get the UID catalog tool
943
944
    :returns: UID Catalog Tool
945
    """
946
    return get_tool(UID_CATALOG)
947
948
949
def get_portal_catalog():
950
    """Get the portal catalog tool
951
952
    :returns: Portal Catalog Tool
953
    """
954
    return get_tool(PORTAL_CATALOG)
955
956
957
def get_review_history(brain_or_object, rev=True):
958
    """Get the review history for the given brain or context.
959
960
    :param brain_or_object: A single catalog brain or content object
961
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
962
    :returns: Workflow history
963
    :rtype: [{}, ...]
964
    """
965
    obj = get_object(brain_or_object)
966
    review_history = []
967
    try:
968
        workflow = get_tool("portal_workflow")
969
        review_history = workflow.getInfoFor(obj, 'review_history')
970
    except WorkflowException as e:
971
        message = str(e)
972
        logger.error("Cannot retrieve review_history on {}: {}".format(
973
            obj, message))
974
    if not isinstance(review_history, (list, tuple)):
975
        logger.error("get_review_history: expected list, received {}".format(
976
            review_history))
977
        review_history = []
978
979
    if isinstance(review_history, tuple):
980
        # Products.CMFDefault.DefaultWorkflow.getInfoFor always returns a
981
        # tuple when "review_history" is passed in:
982
        # https://github.com/zopefoundation/Products.CMFDefault/blob/master/Products/CMFDefault/DefaultWorkflow.py#L244
983
        #
984
        # On the other hand, Products.DCWorkflow.getInfoFor relies on
985
        # Expression.StateChangeInfo, that always returns a list, except when no
986
        # review_history is found:
987
        # https://github.com/zopefoundation/Products.DCWorkflow/blob/master/Products/DCWorkflow/DCWorkflow.py#L310
988
        # https://github.com/zopefoundation/Products.DCWorkflow/blob/master/Products/DCWorkflow/Expression.py#L94
989
        review_history = list(review_history)
990
991
    if rev is True:
992
        review_history.reverse()
993
    return review_history
994
995
996
def get_revision_history(brain_or_object):
997
    """Get the revision history for the given brain or context.
998
999
    :param brain_or_object: A single catalog brain or content object
1000
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
1001
    :returns: Workflow history
1002
    :rtype: obj
1003
    """
1004
    obj = get_object(brain_or_object)
1005
    chv = ContentHistoryView(obj, safe_getattr(obj, "REQUEST", None))
1006
    return chv.fullHistory()
1007
1008
1009
def get_workflows_for(brain_or_object):
1010
    """Get the assigned workflows for the given brain or context.
1011
1012
    Note: This function supports also the portal_type as parameter.
1013
1014
    :param brain_or_object: A single catalog brain or content object
1015
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
1016
    :returns: Assigned Workflows
1017
    :rtype: tuple
1018
    """
1019
    workflow = ploneapi.portal.get_tool("portal_workflow")
1020
    if isinstance(brain_or_object, six.string_types):
1021
        return workflow.getChainFor(brain_or_object)
1022
    obj = get_object(brain_or_object)
1023
    return workflow.getChainFor(obj)
1024
1025
1026
def get_workflow_status_of(brain_or_object, state_var="review_state"):
1027
    """Get the current workflow status of the given brain or context.
1028
1029
    :param brain_or_object: A single catalog brain or content object
1030
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
1031
    :param state_var: The name of the state variable
1032
    :type state_var: string
1033
    :returns: Status
1034
    :rtype: str
1035
    """
1036
    # Try to get the state from the catalog brain first
1037
    if is_brain(brain_or_object):
1038
        if state_var in brain_or_object.schema():
1039
            return brain_or_object[state_var]
1040
1041
    # Retrieve the sate from the object
1042
    workflow = get_tool("portal_workflow")
1043
    obj = get_object(brain_or_object)
1044
    return workflow.getInfoFor(ob=obj, name=state_var, default='')
1045
1046
1047
def get_previous_worfklow_status_of(brain_or_object, skip=None, default=None):
1048
    """Get the previous workflow status of the object
1049
1050
    :param brain_or_object: A single catalog brain or content object
1051
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
1052
    :param skip: Workflow states to skip
1053
    :type skip: tuple/list
1054
    :returns: status
1055
    :rtype: str
1056
    """
1057
1058
    skip = isinstance(skip, (list, tuple)) and skip or []
1059
    history = get_review_history(brain_or_object)
1060
1061
    # Remove consecutive duplicates, some transitions might happen more than
1062
    # once consecutively (e.g. publish)
1063
    history = map(lambda i: i[0], groupby(history))
1064
1065
    for num, item in enumerate(history):
1066
        # skip the current history entry
1067
        if num == 0:
1068
            continue
1069
        status = item.get("review_state")
1070
        if status in skip:
1071
            continue
1072
        return status
1073
    return default
1074
1075
1076
def get_creation_date(brain_or_object):
1077
    """Get the creation date of the brain or object
1078
1079
    :param brain_or_object: A single catalog brain or content object
1080
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
1081
    :returns: Creation date
1082
    :rtype: DateTime
1083
    """
1084
    created = getattr(brain_or_object, "created", None)
1085
    if created is None:
1086
        fail("Object {} has no creation date ".format(
1087
             repr(brain_or_object)))
1088
    if callable(created):
1089
        return created()
1090
    return created
1091
1092
1093
def get_modification_date(brain_or_object):
1094
    """Get the modification date of the brain or object
1095
1096
    :param brain_or_object: A single catalog brain or content object
1097
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
1098
    :returns: Modification date
1099
    :rtype: DateTime
1100
    """
1101
    modified = getattr(brain_or_object, "modified", None)
1102
    if modified is None:
1103
        fail("Object {} has no modification date ".format(
1104
             repr(brain_or_object)))
1105
    if callable(modified):
1106
        return modified()
1107
    return modified
1108
1109
1110
def get_review_status(brain_or_object):
1111
    """Get the `review_state` of an object
1112
1113
    :param brain_or_object: A single catalog brain or content object
1114
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
1115
    :returns: Value of the review_status variable
1116
    :rtype: String
1117
    """
1118
    if is_brain(brain_or_object) \
1119
       and base_hasattr(brain_or_object, "review_state"):
1120
        return brain_or_object.review_state
1121
    return get_workflow_status_of(brain_or_object, state_var="review_state")
1122
1123
1124
def is_active(brain_or_object):
1125
    """Check if the workflow state of the object is 'inactive' or 'cancelled'.
1126
1127
    :param brain_or_object: A single catalog brain or content object
1128
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
1129
    :returns: False if the object is in the state 'inactive' or 'cancelled'
1130
    :rtype: bool
1131
    """
1132
    if get_review_status(brain_or_object) in ["cancelled", "inactive"]:
1133
        return False
1134
    return True
1135
1136
1137
def get_fti(portal_type, default=None):
1138
    """Lookup the Dynamic Filetype Information for the given portal_type
1139
1140
    :param portal_type: The portal type to get the FTI for
1141
    :returns: FTI or default value
1142
    """
1143
    if not is_string(portal_type):
1144
        return default
1145
    portal_types = get_tool("portal_types")
1146
    fti = portal_types.getTypeInfo(portal_type)
1147
    return fti or default
1148
1149
1150
def get_catalogs_for(brain_or_object, default=PORTAL_CATALOG):
1151
    """Get all registered catalogs for the given portal_type, catalog brain or
1152
    content object
1153
1154
    NOTE: We pass in the `portal_catalog` as default in subsequent calls to
1155
          work around the missing `uid_catalog` during snapshot creation when
1156
          installing a fresh site!
1157
1158
    :param brain_or_object: The portal_type, a catalog brain or content object
1159
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
1160
    :returns: List of supported catalogs
1161
    :rtype: list
1162
    """
1163
1164
    # only handle catalog lookups by portal_type internally
1165
    if is_uid(brain_or_object) or is_object(brain_or_object):
1166
        obj = get_object(brain_or_object)
1167
        portal_type = get_portal_type(obj)
1168
        return get_catalogs_for(portal_type)
1169
1170
    catalogs = []
1171
1172
    if not is_string(brain_or_object):
1173
        raise APIError("Expected a portal_type string, got <%s>"
1174
                       % type(brain_or_object))
1175
1176
    # at this point the brain_or_object is a portal_type
1177
    portal_type = brain_or_object
1178
1179
    # check static portal_type -> catalog mapping first
1180
    from senaite.core.catalog import get_catalogs_by_type
1181
    catalogs = get_catalogs_by_type(portal_type)
1182
1183
    # no catalogs in static mapping
1184
    # => Lookup catalogs by FTI
1185
    if len(catalogs) == 0:
1186
        fti = get_fti(portal_type)
1187
        if fti.product:
1188
            # AT content type
1189
            # => Looup via archetype_tool
1190
            archetype_tool = get_tool("archetype_tool")
1191
            catalogs = archetype_tool.catalog_map.get(portal_type) or []
1192
        else:
1193
            # DX content type
1194
            # => resolve the `_catalogs` attribute from the class
1195
            klass = resolveDottedName(fti.klass)
1196
            # XXX: Refactor multi-catalog behavior to not rely
1197
            #      on this hidden `_catalogs` attribute!
1198
            catalogs = getattr(klass, "_catalogs", [])
1199
1200
    # fetch the catalog objects
1201
    catalogs = filter(None, map(lambda cid: get_tool(cid, None), catalogs))
1202
1203
    if len(catalogs) == 0:
1204
        return [get_tool(default, default=PORTAL_CATALOG)]
1205
1206
    return list(catalogs)
1207
1208
1209
def get_transitions_for(brain_or_object):
1210
    """List available workflow transitions for all workflows
1211
1212
    :param brain_or_object: A single catalog brain or content object
1213
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
1214
    :returns: All possible available and allowed transitions
1215
    :rtype: list[dict]
1216
    """
1217
    workflow = get_tool('portal_workflow')
1218
    transitions = []
1219
    instance = get_object(brain_or_object)
1220
    for wfid in get_workflows_for(brain_or_object):
1221
        wf = workflow[wfid]
1222
        tlist = wf.getTransitionsFor(instance)
1223
        transitions.extend([t for t in tlist if t not in transitions])
1224
    return transitions
1225
1226
1227
def do_transition_for(brain_or_object, transition):
1228
    """Performs a workflow transition for the passed in object.
1229
1230
    :param brain_or_object: A single catalog brain or content object
1231
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
1232
    :returns: The object where the transtion was performed
1233
    """
1234
    if not isinstance(transition, six.string_types):
1235
        fail("Transition type needs to be string, got '%s'" % type(transition))
1236
    obj = get_object(brain_or_object)
1237
    try:
1238
        ploneapi.content.transition(obj, transition)
1239
    except ploneapi.exc.InvalidParameterError as e:
1240
        fail("Failed to perform transition '{}' on {}: {}".format(
1241
             transition, obj, str(e)))
1242
    return obj
1243
1244
1245
def get_roles_for_permission(permission, brain_or_object):
1246
    """Get a list of granted roles for the given permission on the object.
1247
1248
    :param brain_or_object: A single catalog brain or content object
1249
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
1250
    :returns: Roles for the given Permission
1251
    :rtype: list
1252
    """
1253
    obj = get_object(brain_or_object)
1254
    allowed = set(rolesForPermissionOn(permission, obj))
1255
    return sorted(allowed)
1256
1257
1258
def is_versionable(brain_or_object, policy='at_edit_autoversion'):
1259
    """Checks if the passed in object is versionable.
1260
1261
    :param brain_or_object: A single catalog brain or content object
1262
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
1263
    :returns: True if the object is versionable
1264
    :rtype: bool
1265
    """
1266
    pr = get_tool("portal_repository")
1267
    obj = get_object(brain_or_object)
1268
    return pr.supportsPolicy(obj, 'at_edit_autoversion') \
1269
        and pr.isVersionable(obj)
1270
1271
1272
def get_version(brain_or_object):
1273
    """Get the version of the current object
1274
1275
    :param brain_or_object: A single catalog brain or content object
1276
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
1277
    :returns: The current version of the object, or None if not available
1278
    :rtype: int or None
1279
    """
1280
    obj = get_object(brain_or_object)
1281
    if not is_versionable(obj):
1282
        return None
1283
    return getattr(aq_base(obj), "version_id", 0)
1284
1285
1286
def get_view(name, context=None, request=None, default=None):
1287
    """Get the view by name
1288
1289
    :param name: The name of the view
1290
    :type name: str
1291
    :param context: The context to query the view
1292
    :type context: ATContentType/DexterityContentType/CatalogBrain
1293
    :param request: The request to query the view
1294
    :type request: HTTPRequest object
1295
    :returns: HTTP Request
1296
    :rtype: Products.Five.metaclass View object
1297
    """
1298
    context = context or get_portal()
1299
    request = request or get_request() or None
1300
    view = queryMultiAdapter((get_object(context), request), name=name)
1301
    if view is None:
1302
        return default
1303
    return view
1304
1305
1306
def get_request():
1307
    """Get the global request object
1308
1309
    :returns: HTTP Request
1310
    :rtype: HTTPRequest object
1311
    """
1312
    return globalrequest.getRequest()
1313
1314
1315
def get_test_request():
1316
    """Get the TestRequest object
1317
    """
1318
    request = TestRequest()
1319
    directlyProvides(request, IAttributeAnnotatable)
1320
    return request
1321
1322
1323
def get_group(group_or_groupname):
1324
    """Return Plone Group
1325
1326
    :param group_or_groupname: Plone group or the name of the group
1327
    :type groupname:  GroupData/str
1328
    :returns: Plone GroupData
1329
    """
1330
    if not group_or_groupname:
1331
1332
        return None
1333
    if hasattr(group_or_groupname, "_getGroup"):
1334
        return group_or_groupname
1335
    gtool = get_tool("portal_groups")
1336
    return gtool.getGroupById(group_or_groupname)
1337
1338
1339
def get_user(user_or_username):
1340
    """Return Plone User
1341
1342
    :param user_or_username: Plone user or user id
1343
    :returns: Plone MemberData
1344
    """
1345
    user = None
1346
    if isinstance(user_or_username, MemberData):
1347
        user = user_or_username
1348
    if isinstance(user_or_username, six.string_types):
1349
        user = get_member_by_login_name(get_portal(), user_or_username, False)
1350
    return user
1351
1352
1353
def get_user_properties(user_or_username):
1354
    """Return User Properties
1355
1356
    :param user_or_username: Plone group identifier
1357
    :returns: Plone MemberData
1358
    """
1359
    user = get_user(user_or_username)
1360
    if user is None:
1361
        return {}
1362
    if not callable(user.getUser):
1363
        return {}
1364
    out = {}
1365
    plone_user = user.getUser()
1366
    for sheet in plone_user.listPropertysheets():
1367
        ps = plone_user.getPropertysheet(sheet)
1368
        out.update(dict(ps.propertyItems()))
1369
    return out
1370
1371
1372
def get_users_by_roles(roles=None):
1373
    """Search Plone users by their roles
1374
1375
    :param roles: Plone role name or list of roles
1376
    :type roles:  list/str
1377
    :returns: List of Plone users having the role(s)
1378
    """
1379
    if not isinstance(roles, (tuple, list)):
1380
        roles = [roles]
1381
    mtool = get_tool("portal_membership")
1382
    return mtool.searchForMembers(roles=roles)
1383
1384
1385
def get_current_user():
1386
    """Returns the current logged in user
1387
1388
    :returns: Current User
1389
    """
1390
    return ploneapi.user.get_current()
1391
1392
1393
def get_user_contact(user, contact_types=['Contact', 'LabContact']):
1394
    """Returns the associated contact of a Plone user
1395
1396
    If the user passed in has no contact associated, return None.
1397
    The `contact_types` parameter filter the portal types for the search.
1398
1399
    :param: Plone user
1400
    :contact_types: List with the contact portal types to search
1401
    :returns: Contact associated to the Plone user or None
1402
    """
1403
    if not user:
1404
        return None
1405
1406
    from senaite.core.catalog import CONTACT_CATALOG  # Avoid circular import
1407
    query = {"portal_type": contact_types, "getUsername": user.id}
1408
    brains = search(query, catalog=CONTACT_CATALOG)
1409
    if not brains:
1410
        return None
1411
1412
    if len(brains) > 1:
1413
        # Oops, the user has multiple contacts assigned, return None
1414
        contacts = map(lambda c: c.Title, brains)
1415
        err_msg = "User '{}' is bound to multiple Contacts '{}'"
1416
        err_msg = err_msg.format(user.id, ','.join(contacts))
1417
        logger.error(err_msg)
1418
        return None
1419
1420
    return get_object(brains[0])
1421
1422
1423
def get_user_client(user_or_contact):
1424
    """Returns the client of the contact of a Plone user
1425
1426
    If the user passed in has no contact or does not belong to any client,
1427
    returns None.
1428
1429
    :param: Plone user or contact
1430
    :returns: Client the contact of the Plone user belongs to
1431
    """
1432
    if not user_or_contact or ILabContact.providedBy(user_or_contact):
1433
        # Lab contacts cannot belong to a client
1434
        return None
1435
1436
    if not IContact.providedBy(user_or_contact):
1437
        contact = get_user_contact(user_or_contact, contact_types=['Contact'])
1438
        if IContact.providedBy(contact):
1439
            return get_user_client(contact)
1440
        return None
1441
1442
    client = get_parent(user_or_contact)
1443
    if client and IClient.providedBy(client):
1444
        return client
1445
1446
    return None
1447
1448
1449
def get_current_client():
1450
    """Returns the current client the current logged in user belongs to, if any
1451
1452
    :returns: Client the current logged in user belongs to or None
1453
    """
1454
    return get_user_client(get_current_user())
1455
1456
1457
def get_cache_key(brain_or_object):
1458
    """Generate a cache key for a common brain or object
1459
1460
    :param brain_or_object: A single catalog brain or content object
1461
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
1462
    :returns: Cache Key
1463
    :rtype: str
1464
    """
1465
    key = [
1466
        get_portal_type(brain_or_object),
1467
        get_id(brain_or_object),
1468
        get_uid(brain_or_object),
1469
        # handle different domains gracefully
1470
        get_url(brain_or_object),
1471
        # Return the microsecond since the epoch in GMT
1472
        get_modification_date(brain_or_object).micros(),
1473
    ]
1474
    return "-".join(map(lambda x: str(x), key))
1475
1476
1477
def bika_cache_key_decorator(method, self, brain_or_object):
1478
    """Bika cache key decorator usable for
1479
1480
    :param brain_or_object: A single catalog brain or content object
1481
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
1482
    :returns: Cache Key
1483
    :rtype: str
1484
    """
1485
    if brain_or_object is None:
1486
        raise DontCache
1487
    return get_cache_key(brain_or_object)
1488
1489
1490
def normalize_id(string):
1491
    """Normalize the id
1492
1493
    :param string: A string to normalize
1494
    :type string: str
1495
    :returns: Normalized ID
1496
    :rtype: str
1497
    """
1498
    if not isinstance(string, six.string_types):
1499
        fail("Type of argument must be string, found '{}'"
1500
             .format(type(string)))
1501
    # get the id nomalizer utility
1502
    normalizer = getUtility(IIDNormalizer).normalize
1503
    return normalizer(string)
1504
1505
1506
def normalize_filename(string):
1507
    """Normalize the filename
1508
1509
    :param string: A string to normalize
1510
    :type string: str
1511
    :returns: Normalized ID
1512
    :rtype: str
1513
    """
1514
    if not isinstance(string, six.string_types):
1515
        fail("Type of argument must be string, found '{}'"
1516
             .format(type(string)))
1517
    # get the file nomalizer utility
1518
    normalizer = getUtility(IFileNameNormalizer).normalize
1519
    return normalizer(string)
1520
1521
1522
def is_uid(uid, validate=False):
1523
    """Checks if the passed in uid is a valid UID
1524
1525
    :param uid: The uid to check
1526
    :param validate: If False, checks if uid is a valid 23 alphanumeric uid. If
1527
    True, also verifies if a brain exists for the uid passed in
1528
    :type uid: string
1529
    :return: True if a valid uid
1530
    :rtype: bool
1531
    """
1532
    if not isinstance(uid, six.string_types):
1533
        return False
1534
    if uid == '0':
1535
        return True
1536
    if len(uid) != 32:
1537
        return False
1538
    if not UID_RX.match(uid):
1539
        return False
1540
    if not validate:
1541
        return True
1542
1543
    # Check if a brain for this uid exists
1544
    uc = get_tool('uid_catalog')
1545
    brains = uc(UID=uid)
1546
    if brains:
1547
        assert (len(brains) == 1)
1548
    return len(brains) > 0
1549
1550
1551
def is_date(date):
1552
    """Checks if the passed in value is a valid Zope's DateTime
1553
1554
    :param date: The date to check
1555
    :type date: DateTime
1556
    :return: True if a valid date
1557
    :rtype: bool
1558
    """
1559
    if not date:
1560
        return False
1561
    return isinstance(date, (DateTime, datetime))
1562
1563
1564
def to_date(value, default=None):
1565
    """Tries to convert the passed in value to Zope's DateTime
1566
1567
    :param value: The value to be converted to a valid DateTime
1568
    :type value: str, DateTime or datetime
1569
    :return: The DateTime representation of the value passed in or default
1570
    """
1571
1572
    # cannot use bika.lims.deprecated (circular dependencies)
1573
    import warnings
1574
    warnings.simplefilter("always", DeprecationWarning)
1575
    warn = "Deprecated: use senaite.core.api.dtime.to_DT instead"
1576
    warnings.warn(warn, category=DeprecationWarning, stacklevel=2)
1577
    warnings.simplefilter("default", DeprecationWarning)
1578
1579
    # prevent circular dependencies
1580
    from senaite.core.api.dtime import to_DT
1581
    date = to_DT(value)
1582
    if not date:
1583
        return to_DT(default)
1584
    return date
1585
1586
1587
def to_minutes(days=0, hours=0, minutes=0, seconds=0, milliseconds=0,
1588
               round_to_int=True):
1589
    """Returns the computed total number of minutes
1590
    """
1591
    total = float(days)*24*60 + float(hours)*60 + float(minutes) + \
1592
        float(seconds)/60 + float(milliseconds)/1000/60
1593
    return int(round(total)) if round_to_int else total
1594
1595
1596
def to_dhm_format(days=0, hours=0, minutes=0, seconds=0, milliseconds=0):
1597
    """Returns a representation of time in a string in xd yh zm format
1598
    """
1599
    minutes = to_minutes(days=days, hours=hours, minutes=minutes,
1600
                         seconds=seconds, milliseconds=milliseconds)
1601
    delta = timedelta(minutes=int(round(minutes)))
1602
    d = delta.days
1603
    h = delta.seconds // 3600
1604
    m = (delta.seconds // 60) % 60
1605
    m = m and "{}m ".format(str(m)) or ""
1606
    d = d and "{}d ".format(str(d)) or ""
1607
    if m and d:
1608
        h = "{}h ".format(str(h))
1609
    else:
1610
        h = h and "{}h ".format(str(h)) or ""
1611
    return "".join([d, h, m]).strip()
1612
1613
1614
def to_int(value, default=_marker):
1615
    """Tries to convert the value to int.
1616
    Truncates at the decimal point if the value is a float
1617
1618
    :param value: The value to be converted to an int
1619
    :return: The resulting int or default
1620
    """
1621
    if is_floatable(value):
1622
        value = to_float(value)
1623
    try:
1624
        return int(value)
1625
    except (TypeError, ValueError):
1626
        if default is None:
1627
            return default
1628
        if default is not _marker:
1629
            return to_int(default)
1630
        fail("Value %s cannot be converted to int" % repr(value))
1631
1632
1633
def is_floatable(value):
1634
    """Checks if the passed in value is a valid floatable number
1635
1636
    :param value: The value to be evaluated as a float number
1637
    :type value: str, float, int
1638
    :returns: True if is a valid float number
1639
    :rtype: bool"""
1640
    try:
1641
        float(value)
1642
        return True
1643
    except (TypeError, ValueError):
1644
        return False
1645
1646
1647
def to_float(value, default=_marker):
1648
    """Converts the passed in value to a float number
1649
1650
    :param value: The value to be converted to a floatable number
1651
    :type value: str, float, int
1652
    :returns: The float number representation of the passed in value
1653
    :rtype: float
1654
    """
1655
    if not is_floatable(value):
1656
        if default is not _marker:
1657
            return to_float(default)
1658
        fail("Value %s is not floatable" % repr(value))
1659
    return float(value)
1660
1661
1662
def float_to_string(value, default=_marker):
1663
    """Convert a float value to string without exponential notation
1664
1665
    This function preserves the whole fraction
1666
1667
    :param value: The float value to be converted to a string
1668
    :type value: str, float, int
1669
    :returns: String representation of the float w/o exponential notation
1670
    :rtype: str
1671
    """
1672
    if not is_floatable(value):
1673
        if default is not _marker:
1674
            return default
1675
        fail("Value %s is not floatable" % repr(value))
1676
1677
    # Leave floatable string values unchanged
1678
    if isinstance(value, six.string_types):
1679
        return value
1680
1681
    value = float(value)
1682
    str_value = str(value)
1683
1684
    if "." in str_value:
1685
        # might be something like 1.23e-26
1686
        front, back = str_value.split(".")
1687
    else:
1688
        # or 1e-07 for 0.0000001
1689
        back = str_value
1690
1691
    if "e-" in back:
1692
        fraction, zeros = back.split("e-")
1693
        # we want to cover the faction and the zeros
1694
        precision = len(fraction) + int(zeros)
1695
        template = "{:.%df}" % precision
1696
        str_value = template.format(value)
1697
    elif "e+" in back:
1698
        # positive numbers, e.g. 1e+16 don't need a fractional part
1699
        str_value = "{:.0f}".format(value)
1700
1701
    # cut off trailing zeros
1702
    if "." in str_value:
1703
        str_value = str_value.rstrip("0").rstrip(".")
1704
1705
    return str_value
1706
1707
1708
def to_searchable_text_metadata(value):
1709
    """Parse the given metadata value to searchable text
1710
1711
    :param value: The raw value of the metadata column
1712
    :returns: Searchable and translated unicode value or None
1713
    """
1714
    if not value:
1715
        return u""
1716
    if value is Missing.Value:
1717
        return u""
1718
    if is_uid(value):
1719
        return u""
1720
    if isinstance(value, (bool)):
1721
        return u""
1722
    if isinstance(value, (list, tuple)):
1723
        values = map(to_searchable_text_metadata, value)
1724
        values = filter(None, values)
1725
        return " ".join(values)
1726
    if isinstance(value, dict):
1727
        return to_searchable_text_metadata(value.values())
1728
    if is_date(value):
1729
        from senaite.core.api.dtime import date_to_string
1730
        return date_to_string(value, "%Y-%m-%d")
1731
    if is_at_content(value):
1732
        return to_searchable_text_metadata(get_title(value))
1733
    if not isinstance(value, six.string_types):
1734
        value = str(value)
1735
    return safe_unicode(value)
1736
1737
1738
def get_registry_record(name, default=None):
1739
    """Returns the value of a registry record
1740
1741
    :param name: [required] name of the registry record
1742
    :type name: str
1743
    :param default: The value returned if the record is not found
1744
    :type default: anything
1745
    :returns: value of the registry record
1746
    """
1747
    return ploneapi.portal.get_registry_record(name, default=default)
1748
1749
1750
def to_display_list(pairs, sort_by="key", allow_empty=True):
1751
    """Create a Plone DisplayList from list items
1752
1753
    :param pairs: list of key, value pairs
1754
    :param sort_by: Sort the items either by key or value
1755
    :param allow_empty: Allow to select an empty value
1756
    :returns: Plone DisplayList
1757
    """
1758
    dl = DisplayList()
1759
1760
    if isinstance(pairs, six.string_types):
1761
        pairs = [pairs, pairs]
1762
    for pair in pairs:
1763
        # pairs is a list of lists -> add each pair
1764
        if isinstance(pair, (tuple, list)):
1765
            dl.add(*pair)
1766
        # pairs is just a single pair -> add it and stop
1767
        if isinstance(pair, six.string_types):
1768
            dl.add(*pairs)
1769
            break
1770
1771
    # add the empty option
1772
    if allow_empty:
1773
        dl.add("", "")
1774
1775
    # sort by key/value
1776
    if sort_by == "key":
1777
        dl = dl.sortedByKey()
1778
    elif sort_by == "value":
1779
        dl = dl.sortedByValue()
1780
1781
    return dl
1782
1783
1784
def text_to_html(text, wrap="p", encoding="utf8"):
1785
    """Convert `\n` sequences in the text to HTML `\n`
1786
1787
    :param text: Plain text to convert
1788
    :param wrap: Toggle to wrap the text in a
1789
    :returns: HTML converted and encoded text
1790
    """
1791
    if not text:
1792
        return ""
1793
    # handle text internally as unicode
1794
    text = safe_unicode(text)
1795
    # replace newline characters with HTML entities
1796
    html = text.replace("\n", "<br/>")
1797
    if wrap:
1798
        html = u"<{tag}>{html}</{tag}>".format(
1799
            tag=wrap, html=html)
1800
    # return encoded html
1801
    return html.encode(encoding)
1802
1803
1804
def to_utf8(string, default=_marker):
1805
    """Encode string to UTF8
1806
1807
    :param string: String to be encoded to UTF8
1808
    :returns: UTF8 encoded string
1809
    """
1810
    if not isinstance(string, six.string_types):
1811
        if default is _marker:
1812
            fail("Expected string type, got '%s'" % type(string))
1813
        return default
1814
    return safe_unicode(string).encode("utf8")
1815
1816
1817
def is_temporary(obj):
1818
    """Returns whether the given object is temporary or not
1819
1820
    :param obj: the object to evaluate
1821
    :returns: True if the object is temporary
1822
    """
1823
    if ITemporaryObject.providedBy(obj):
1824
        return True
1825
1826
    obj_id = getattr(aq_base(obj), "id", None)
1827
    if obj_id is None or UID_RX.match(obj_id):
1828
        return True
1829
1830
    parent = aq_parent(aq_inner(obj))
1831
    if not parent:
1832
        return True
1833
1834
    parent_id = getattr(aq_base(parent), "id", None)
1835
    if parent_id is None or UID_RX.match(parent_id):
1836
        return True
1837
1838
    # Checks to see if we are created inside the portal_factory.
1839
    # This might also happen for DX types in senaite.databox!
1840
    meta_type = getattr(aq_base(parent), "meta_type", "")
1841
    if meta_type == "TempFolder":
1842
        return True
1843
1844
    return False
1845
1846
1847
def mark_temporary(brain_or_object):
1848
    """Mark the object as temporary
1849
    """
1850
    obj = get_object(brain_or_object)
1851
    alsoProvides(obj, ITemporaryObject)
1852
1853
1854
def unmark_temporary(brain_or_object):
1855
    """Unmark the object as temporary
1856
    """
1857
    obj = get_object(brain_or_object)
1858
    noLongerProvides(obj, ITemporaryObject)
1859
1860
1861
def is_string(thing):
1862
    """Checks if the passed in object is a string type
1863
1864
    :param thing: object to test
1865
    :returns: True if the object is a string
1866
    """
1867
    return isinstance(thing, six.string_types)
1868
1869
1870
def parse_json(thing, default=""):
1871
    """Parse from JSON
1872
1873
    :param thing: thing to parse
1874
    :param default: value to return if cannot parse
1875
    :returns: the object representing the JSON or default
1876
    """
1877
    try:
1878
        return json.loads(thing)
1879
    except (TypeError, ValueError):
1880
        return default
1881
1882
1883
def to_list(value):
1884
    """Converts the value to a list
1885
1886
    :param value: the value to be represented as a list
1887
    :returns: a list that represents or contains the value
1888
    """
1889
    if is_string(value):
1890
        val = parse_json(value)
1891
        if isinstance(val, (list, tuple, set)):
1892
            value = val
1893
    if not isinstance(value, (list, tuple, set)):
1894
        value = [value]
1895
    return list(value)
1896