Passed
Push — 2.x ( d59516...b5e2f5 )
by Jordi
06:37
created

bika.lims.api.get_review_history()   B

Complexity

Conditions 5

Size

Total Lines 37
Code Lines 19

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 19
dl 0
loc 37
rs 8.9833
c 0
b 0
f 0
cc 5
nop 2
1
# -*- coding: utf-8 -*-
2
#
3
# This file is part of SENAITE.CORE.
4
#
5
# SENAITE.CORE is free software: you can redistribute it and/or modify it under
6
# the terms of the GNU General Public License as published by the Free Software
7
# Foundation, version 2.
8
#
9
# This program is distributed in the hope that it will be useful, but WITHOUT
10
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
11
# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
12
# details.
13
#
14
# You should have received a copy of the GNU General Public License along with
15
# this program; if not, write to the Free Software Foundation, Inc., 51
16
# Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
17
#
18
# Copyright 2018-2024 by it's authors.
19
# Some rights reserved, see README and LICENSE.
20
21
import copy
22
import json
23
import re
24
from collections import OrderedDict
25
from datetime import datetime
26
from datetime import timedelta
27
from itertools import groupby
28
29
import Missing
30
import six
31
from AccessControl.PermissionRole import rolesForPermissionOn
32
from AccessControl.Permissions import copy_or_move as CopyOrMove
33
from Acquisition import aq_base
34
from Acquisition import aq_inner
35
from Acquisition import aq_parent
36
from bika.lims import logger
37
from bika.lims.interfaces import IClient
38
from bika.lims.interfaces import IContact
39
from bika.lims.interfaces import ILabContact
40
from DateTime import DateTime
41
from OFS.event import ObjectWillBeMovedEvent
42
from plone import api as ploneapi
43
from plone.api.exc import InvalidParameterError
44
from plone.app.layout.viewlets.content import ContentHistoryView
45
from plone.behavior.interfaces import IBehaviorAssignable
46
from plone.dexterity.interfaces import IDexterityContent
47
from plone.dexterity.schema import SchemaInvalidatedEvent
48
from plone.dexterity.utils import addContentToContainer
49
from plone.dexterity.utils import createContent
50
from plone.dexterity.utils import resolveDottedName
51
from plone.i18n.normalizer.interfaces import IFileNameNormalizer
52
from plone.i18n.normalizer.interfaces import IIDNormalizer
53
from plone.memoize.volatile import DontCache
54
from Products.Archetypes.atapi import DisplayList
55
from Products.Archetypes.BaseObject import BaseObject
56
from Products.Archetypes.event import ObjectInitializedEvent
57
from Products.Archetypes.utils import mapply
58
from Products.CMFCore.interfaces import IFolderish
59
from Products.CMFCore.interfaces import ISiteRoot
60
from Products.CMFCore.permissions import DeleteObjects
61
from Products.CMFCore.permissions import ModifyPortalContent
62
from Products.CMFCore.permissions import View
63
from Products.CMFCore.utils import getToolByName
64
from Products.CMFCore.WorkflowCore import WorkflowException
65
from Products.CMFPlone.RegistrationTool import get_member_by_login_name
66
from Products.CMFPlone.utils import _createObjectByType
67
from Products.CMFPlone.utils import base_hasattr
68
from Products.CMFPlone.utils import safe_unicode
69
from Products.PlonePAS.tools.memberdata import MemberData
70
from Products.ZCatalog.interfaces import ICatalogBrain
71
from senaite.core.interfaces import ITemporaryObject
72
from zope import globalrequest
73
from zope.annotation.interfaces import IAttributeAnnotatable
74
from zope.component import getUtility
75
from zope.component import queryMultiAdapter
76
from zope.container.contained import notifyContainerModified
77
from zope.event import notify
78
from zope.interface import alsoProvides
79
from zope.interface import directlyProvides
80
from zope.interface import noLongerProvides
81
from zope.lifecycleevent import ObjectMovedEvent
82
from zope.publisher.browser import TestRequest
83
from zope.schema import getFieldsInOrder
84
from zope.security.interfaces import Unauthorized
85
86
"""SENAITE LIMS Framework API
87
88
Please see tests/doctests/API.rst for documentation.
89
90
Architecural Notes:
91
92
Please add only functions that do a single thing for a single object.
93
94
Good: `def get_foo(brain_or_object)`
95
Bad:  `def get_foos(list_of_brain_objects)`
96
97
Why?
98
99
Because it makes things more complex. You can always use a pattern like this to
100
achieve the same::
101
102
    >>> foos = map(get_foo, list_of_brain_objects)
103
104
Please add for all functions a descriptive test in tests/doctests/API.rst.
105
106
Thanks.
107
"""
108
109
_marker = object()
110
111
UID_RX = re.compile("[a-z0-9]{32}$")
112
113
UID_CATALOG = "uid_catalog"
114
PORTAL_CATALOG = "portal_catalog"
115
116
117
class APIError(Exception):
118
    """Base exception class for bika.lims errors."""
119
120
121
def get_portal():
122
    """Get the portal object
123
124
    :returns: Portal object
125
    """
126
    return ploneapi.portal.getSite()
127
128
129
def get_setup():
130
    """Fetch the `bika_setup` folder.
131
    """
132
    portal = get_portal()
133
    return portal.get("bika_setup")
134
135
136
def get_bika_setup():
137
    """Fetch the `bika_setup` folder.
138
    """
139
    return get_setup()
140
141
142
def get_senaite_setup():
143
    """Fetch the new DX `setup` folder.
144
    """
145
    portal = get_portal()
146
    return portal.get("setup")
147
148
149
def create(container, portal_type, *args, **kwargs):
150
    """Creates an object in Bika LIMS
151
152
    This code uses most of the parts from the TypesTool
153
    see: `Products.CMFCore.TypesTool._constructInstance`
154
155
    :param container: container
156
    :type container: ATContentType/DexterityContentType/CatalogBrain
157
    :param portal_type: The portal type to create, e.g. "Client"
158
    :type portal_type: string
159
    :param title: The title for the new content object
160
    :type title: string
161
    :returns: The new created object
162
    """
163
    from bika.lims.utils import tmpID
164
165
    tmp_id = tmpID()
166
    id = kwargs.pop("id", "")
167
    title = kwargs.pop("title", "")
168
169
    # get the fti
170
    types_tool = get_tool("portal_types")
171
    fti = types_tool.getTypeInfo(portal_type)
172
173
    if fti.product:
174
        # create the AT object
175
        obj = _createObjectByType(portal_type, container, id or tmp_id)
176
        # update the object with values
177
        edit(obj, check_permissions=False, title=title, **kwargs)
178
        # auto-id if required
179
        if obj._at_rename_after_creation:
180
            obj._renameAfterCreation(check_auto_id=True)
181
        # we are no longer under creation
182
        obj.unmarkCreationFlag()
183
        # notify that the object was created
184
        notify(ObjectInitializedEvent(obj))
185
    else:
186
        content = createContent(portal_type, **kwargs)
187
        content.id = id
188
        content.title = title
189
        obj = addContentToContainer(container, content)
190
191
    return obj
192
193
194
def copy_object(source, container=None, portal_type=None, *args, **kwargs):
195
    """Creates a copy of the source object. If container is None, creates the
196
    copy inside the same container as the source. If portal_type is specified,
197
    creates a new object of this type, and copies the values from source fields
198
    to the destination object. Field values sent as kwargs have priority over
199
    the field values from source.
200
201
    :param source: object from which create a copy
202
    :type source: ATContentType/DexterityContentType/CatalogBrain
203
    :param container: destination container
204
    :type container: ATContentType/DexterityContentType/CatalogBrain
205
    :param portal_type: destination portal type
206
    :returns: The new created object
207
    """
208
    # Prevent circular dependencies
209
    from security import check_permission
210
    # Use same container as source unless explicitly set
211
    source = get_object(source)
212
    if not container:
213
        container = get_parent(source)
214
215
    # Use same portal type as source unless explicitly set
216
    if not portal_type:
217
        portal_type = get_portal_type(source)
218
219
    # Extend the fields to skip with defaults
220
    skip = kwargs.pop("skip", [])
221
    skip = set(skip)
222
    skip.update([
223
        "Products.Archetypes.Field.ComputedField",
224
        "UID",
225
        "id",
226
        "allowDiscussion",
227
        "contributors",
228
        "creation_date",
229
        "creators",
230
        "effectiveDate",
231
        "expirationDate",
232
        "language",
233
        "location",
234
        "modification_date",
235
        "rights",
236
        "subject",
237
    ])
238
    # Build a dict for complexity reduction
239
    skip = dict([(item, True) for item in skip])
240
241
    # Update kwargs with the field values to copy from source
242
    fields = get_fields(source)
243
    for field_name, field in fields.items():
244
        # Prioritize field values passed as kwargs
245
        if field_name in kwargs:
246
            continue
247
        # Skip framework internal fields by name
248
        if skip.get(field_name, False):
249
            continue
250
        # Skip fields of non-suitable types
251
        if hasattr(field, "getType") and skip.get(field.getType(), False):
252
            continue
253
        # Skip readonly fields
254
        if getattr(field, "readonly", False):
255
            continue
256
        # Skip non-readable fields
257
        perm = getattr(field, "read_permission", View)
258
        if perm and not check_permission(perm, source):
259
            continue
260
261
        # do not wake-up objects unnecessarily
262
        if hasattr(field, "getRaw"):
263
            field_value = field.getRaw(source)
264
        elif hasattr(field, "get_raw"):
265
            field_value = field.get_raw(source)
266
        elif hasattr(field, "getAccessor"):
267
            accessor = field.getAccessor(source)
268
            field_value = accessor()
269
        else:
270
            field_value = field.get(source)
271
272
        # Do a hard copy of value if mutable type
273
        if isinstance(field_value, (list, dict, set)):
274
            field_value = copy.deepcopy(field_value)
275
        kwargs.update({field_name: field_value})
276
277
    # Create a copy
278
    return create(container, portal_type, *args, **kwargs)
279
280
281
def edit(obj, check_permissions=True, **kwargs):
282
    """Updates the values of object fields with the new values passed-in
283
    """
284
    # Prevent circular dependencies
285
    from security import check_permission
286
    fields = get_fields(obj)
287
    for name, value in kwargs.items():
288
        field = fields.get(name, None)
289
        if not field:
290
            continue
291
292
        # cannot update readonly fields
293
        readonly = getattr(field, "readonly", False)
294
        if readonly:
295
            raise ValueError("Field '{}' is readonly".format(name))
296
297
        # check field writable permission
298
        if check_permissions:
299
            perm = getattr(field, "write_permission", ModifyPortalContent)
300
            if perm and not check_permission(perm, obj):
301
                raise Unauthorized("Field '{}' is not writeable".format(name))
302
303
        # Set the value
304
        if hasattr(field, "getMutator"):
305
            mutator = field.getMutator(obj)
306
            mapply(mutator, value)
307
        else:
308
            field.set(obj, value)
309
310
311
def move_object(obj, destination, check_constraints=True):
312
    """Moves the object to the destination folder
313
314
    This function has the same effect as:
315
316
        id = obj.getId()
317
        cp = origin.manage_cutObjects(id)
318
        destination.manage_pasteObjects(cp)
319
320
    but with slightly better performance. The code is mostly grabbed from
321
    OFS.CopySupport.CopyContainer_pasteObjects
322
323
    :param obj: object to move to destination
324
    :type obj: ATContentType/DexterityContentType/CatalogBrain/UID
325
    :param destination: destination container
326
    :type destination: ATContentType/DexterityContentType/CatalogBrain/UID
327
    :param check_constraints: constraints and permissions must be checked
328
    :type check_constraints: bool
329
    :returns: The moved object
330
    """
331
    # prevent circular dependencies
332
    from bika.lims.api.security import check_permission
333
334
    obj = get_object(obj)
335
    destination = get_object(destination)
336
337
    # make sure the object is not moved into itself
338
    if obj == destination:
339
        raise ValueError("Cannot move object into itself: {}".format(obj))
340
341
    # do nothing if destination is the same as origin
342
    origin = get_parent(obj)
343
    if origin == destination:
344
        return obj
345
346
    if check_constraints:
347
348
        # check origin object has CopyOrMove permission
349
        if not check_permission(CopyOrMove, obj):
350
            raise Unauthorized("Cannot move {}".format(obj))
351
352
        # check if portal type is allowed in destination object
353
        portal_type = get_portal_type(obj)
354
        pt = get_tool("portal_types")
355
        ti = pt.getTypeInfo(destination)
356
        if not ti.allowType(portal_type):
357
            raise ValueError("Disallowed subobject type: %s" % portal_type)
358
359
    id = get_id(obj)
360
361
    # notify that the object will be copied to destination
362
    obj._notifyOfCopyTo(destination, op=1)  # noqa
363
364
    # notify that the object will be moved to destination
365
    notify(ObjectWillBeMovedEvent(obj, origin, id, destination, id))
366
367
    # effectively move the object from origin to destination
368
    delete(obj, check_permissions=check_constraints, suppress_events=True)
369
    obj = aq_base(obj)
370
    destination._setObject(id, obj, set_owner=0, suppress_events=True)  # noqa
371
    obj = destination._getOb(id)  # noqa
372
373
    # since we used "suppress_events=True", we need to manually notify that the
374
    # object was moved and containers modified. This also makes the objects to
375
    # be re-catalogued
376
    notify(ObjectMovedEvent(obj, origin, id, destination, id))
377
    notifyContainerModified(origin)
378
    notifyContainerModified(destination)
379
380
    # make ownership implicit if possible, so it acquires the permissions from
381
    # the container
382
    obj.manage_changeOwnershipType(explicit=0)
383
384
    return obj
385
386
387
def uncatalog_object(obj):
388
    """Un-catalog the object from all catalogs
389
390
    :param obj: object to un-catalog
391
    :type obj: ATContentType/DexterityContentType
392
    """
393
    # un-catalog from registered catalogs
394
    obj.unindexObject()
395
    # explicitly un-catalog from uid_catalog
396
    uid_catalog = get_tool("uid_catalog")
397
    # the uids of uid_catalog are relative paths to portal root
398
    # see Products.Archetypes.UIDCatalog.UIDResolver.catalog_object
399
    url = "/".join(obj.getPhysicalPath()[2:])
400
    uid_catalog.uncatalog_object(url)
401
402
403
def catalog_object(obj):
404
    """Re-catalog the object
405
406
    :param obj: object to un-catalog
407
    :type obj: ATContentType/DexterityContentType
408
    """
409
    if is_at_content(obj):
410
        # explicitly re-catalog AT types at uid_catalog (DX types are
411
        # automatically reindexed in UID catalog on reindexObject)
412
        uc = get_tool("uid_catalog")
413
        # the uids of uid_catalog are relative paths to portal root
414
        # see Products.Archetypes.UIDCatalog.UIDResolver.catalog_object
415
        url = "/".join(obj.getPhysicalPath()[2:])
416
        uc.catalog_object(obj, url)
417
    obj.reindexObject()
418
419
420
def delete(obj, check_permissions=True, suppress_events=False):
421
    """Deletes the given object
422
423
    :param obj: object to un-catalog
424
    :param check_permissions: whether delete permission must be checked
425
    :param suppress_events: whether ondelete events have to be fired
426
    :type obj: ATContentType/DexterityContentType
427
    """
428
    from security import check_permission
429
    if check_permissions and not check_permission(DeleteObjects, obj):
430
        raise Unauthorized("Do not have permissions to remove this object")
431
432
    # un-catalog the object from all catalogs (uid_catalog included)
433
    uncatalog_object(obj)
434
    # delete the object
435
    parent = get_parent(obj)
436
    parent._delObject(obj.getId(), suppress_events=suppress_events)
437
438
439
def get_tool(name, context=None, default=_marker):
440
    """Get a portal tool by name
441
442
    :param name: The name of the tool, e.g. `senaite_setup_catalog`
443
    :type name: string
444
    :param context: A portal object
445
    :type context: ATContentType/DexterityContentType/CatalogBrain
446
    :returns: Portal Tool
447
    """
448
449
    # Try first with the context
450
    if context is not None:
451
        try:
452
            context = get_object(context)
453
            return getToolByName(context, name)
454
        except (APIError, AttributeError) as e:
455
            # https://github.com/senaite/bika.lims/issues/396
456
            logger.warn("get_tool::getToolByName({}, '{}') failed: {} "
457
                        "-> falling back to plone.api.portal.get_tool('{}')"
458
                        .format(repr(context), name, repr(e), name))
459
            return get_tool(name, default=default)
460
461
    # Try with the plone api
462
    try:
463
        return ploneapi.portal.get_tool(name)
464
    except InvalidParameterError:
465
        if default is not _marker:
466
            if isinstance(default, six.string_types):
467
                return get_tool(default)
468
            return default
469
        fail("No tool named '%s' found." % name)
470
471
472
def fail(msg=None):
473
    """API LIMS Error
474
    """
475
    if msg is None:
476
        msg = "Reason not given."
477
    raise APIError("{}".format(msg))
478
479
480
def is_object(brain_or_object):
481
    """Check if the passed in object is a supported portal content object
482
483
    :param brain_or_object: A single catalog brain or content object
484
    :type brain_or_object: Portal Object
485
    :returns: True if the passed in object is a valid portal content
486
    """
487
    if is_portal(brain_or_object):
488
        return True
489
    if is_supermodel(brain_or_object):
490
        return True
491
    if is_at_content(brain_or_object):
492
        return True
493
    if is_dexterity_content(brain_or_object):
494
        return True
495
    if is_brain(brain_or_object):
496
        return True
497
    return False
498
499
500
def get_object(brain_object_uid, default=_marker):
501
    """Get the full content object
502
503
    :param brain_object_uid: A catalog brain or content object or uid
504
    :type brain_object_uid: PortalObject/ATContentType/DexterityContentType
505
    /CatalogBrain/basestring
506
    :returns: The full object
507
    """
508
    if is_uid(brain_object_uid):
509
        return get_object_by_uid(brain_object_uid, default=default)
510
    elif is_supermodel(brain_object_uid):
511
        return brain_object_uid.instance
512
    if not is_object(brain_object_uid):
513
        if default is _marker:
514
            fail("{} is not supported.".format(repr(brain_object_uid)))
515
        return default
516
    if is_brain(brain_object_uid):
517
        return brain_object_uid.getObject()
518
    return brain_object_uid
519
520
521
def is_portal(brain_or_object):
522
    """Checks if the passed in object is the portal root object
523
524
    :param brain_or_object: A single catalog brain or content object
525
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
526
    :returns: True if the object is the portal root object
527
    :rtype: bool
528
    """
529
    return ISiteRoot.providedBy(brain_or_object)
530
531
532
def is_brain(brain_or_object):
533
    """Checks if the passed in object is a portal catalog brain
534
535
    :param brain_or_object: A single catalog brain or content object
536
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
537
    :returns: True if the object is a catalog brain
538
    :rtype: bool
539
    """
540
    return ICatalogBrain.providedBy(brain_or_object)
541
542
543
def is_supermodel(brain_or_object):
544
    """Checks if the passed in object is a supermodel
545
546
    :param brain_or_object: A single catalog brain or content object
547
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
548
    :returns: True if the object is a catalog brain
549
    :rtype: bool
550
    """
551
    # avoid circular imports
552
    from senaite.app.supermodel.interfaces import ISuperModel
553
    return ISuperModel.providedBy(brain_or_object)
554
555
556
def is_dexterity_content(brain_or_object):
557
    """Checks if the passed in object is a dexterity content type
558
559
    :param brain_or_object: A single catalog brain or content object
560
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
561
    :returns: True if the object is a dexterity content type
562
    :rtype: bool
563
    """
564
    return IDexterityContent.providedBy(brain_or_object)
565
566
567
def is_at_content(brain_or_object):
568
    """Checks if the passed in object is an AT content type
569
570
    :param brain_or_object: A single catalog brain or content object
571
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
572
    :returns: True if the object is an AT content type
573
    :rtype: bool
574
    """
575
    return isinstance(brain_or_object, BaseObject)
576
577
578
def is_dx_type(portal_type):
579
    """Checks if the portal type is DX based
580
581
    :param portal_type: The portal type name to check
582
    :returns: True if the portal type is DX based
583
    """
584
    portal_types = get_tool("portal_types")
585
    fti = portal_types.getTypeInfo(portal_type)
586
    if fti.product:
587
        return False
588
    return True
589
590
591
def is_at_type(portal_type):
592
    """Checks if the portal type is AT based
593
594
    :param portal_type: The portal type name to check
595
    :returns: True if the portal type is AT based
596
    """
597
    return not is_dx_type(portal_type)
598
599
600
def is_folderish(brain_or_object):
601
    """Checks if the passed in object is folderish
602
603
    :param brain_or_object: A single catalog brain or content object
604
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
605
    :returns: True if the object is folderish
606
    :rtype: bool
607
    """
608
    if hasattr(brain_or_object, "is_folderish"):
609
        if callable(brain_or_object.is_folderish):
610
            return brain_or_object.is_folderish()
611
        return brain_or_object.is_folderish
612
    return IFolderish.providedBy(get_object(brain_or_object))
613
614
615
def get_portal_type(brain_or_object):
616
    """Get the portal type for this object
617
618
    :param brain_or_object: A single catalog brain or content object
619
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
620
    :returns: Portal type
621
    :rtype: string
622
    """
623
    if not is_object(brain_or_object):
624
        fail("{} is not supported.".format(repr(brain_or_object)))
625
    return brain_or_object.portal_type
626
627
628
def get_schema(brain_or_object):
629
    """Get the schema of the content
630
631
    :param brain_or_object: A single catalog brain or content object
632
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
633
    :returns: Schema object
634
    """
635
    obj = get_object(brain_or_object)
636
    if is_portal(obj):
637
        fail("get_schema can't return schema of portal root")
638
    if is_dexterity_content(obj):
639
        pt = get_tool("portal_types")
640
        fti = pt.getTypeInfo(obj.portal_type)
641
        return fti.lookupSchema()
642
    if is_at_content(obj):
643
        return obj.Schema()
644
    fail("{} has no Schema.".format(brain_or_object))
645
646
647
def get_behaviors(portal_type):
648
    """List all behaviors
649
650
    :param portal_type: DX portal type name
651
    """
652
    portal_types = get_tool("portal_types")
653
    fti = portal_types.getTypeInfo(portal_type)
654
    if fti.product:
655
        raise TypeError("Expected DX type, got AT type instead.")
656
    return fti.behaviors
657
658
659
def enable_behavior(portal_type, behavior_id):
660
    """Enable behavior
661
662
    :param portal_type: DX portal type name
663
    :param behavior_id: The behavior to enable
664
    """
665
    portal_types = get_tool("portal_types")
666
    fti = portal_types.getTypeInfo(portal_type)
667
    if fti.product:
668
        raise TypeError("Expected DX type, got AT type instead.")
669
670
    if behavior_id not in fti.behaviors:
671
        fti.behaviors += (behavior_id, )
672
        # invalidate schema cache
673
        notify(SchemaInvalidatedEvent(portal_type))
674
675
676
def disable_behavior(portal_type, behavior_id):
677
    """Disable behavior
678
679
    :param portal_type: DX portal type name
680
    :param behavior_id: The behavior to disable
681
    """
682
    portal_types = get_tool("portal_types")
683
    fti = portal_types.getTypeInfo(portal_type)
684
    if fti.product:
685
        raise TypeError("Expected DX type, got AT type instead.")
686
    fti.behaviors = tuple(filter(lambda b: b != behavior_id, fti.behaviors))
687
    # invalidate schema cache
688
    notify(SchemaInvalidatedEvent(portal_type))
689
690
691
def get_fields(brain_or_object):
692
    """Get a name to field mapping of the object
693
694
    :param brain_or_object: A single catalog brain or content object
695
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
696
    :returns: Mapping of name -> field
697
    :rtype: OrderedDict
698
    """
699
    obj = get_object(brain_or_object)
700
    schema = get_schema(obj)
701
    if is_dexterity_content(obj):
702
        # get the fields directly provided by the interface
703
        fields = getFieldsInOrder(schema)
704
        # append the fields coming from behaviors
705
        behavior_assignable = IBehaviorAssignable(obj)
706
        if behavior_assignable:
707
            behaviors = behavior_assignable.enumerateBehaviors()
708
            for behavior in behaviors:
709
                fields.extend(getFieldsInOrder(behavior.interface))
710
        return OrderedDict(fields)
711
    return OrderedDict(schema)
712
713
714
def get_id(brain_or_object):
715
    """Get the Plone ID for this object
716
717
    :param brain_or_object: A single catalog brain or content object
718
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
719
    :returns: Plone ID
720
    :rtype: string
721
    """
722
    if is_brain(brain_or_object):
723
        if base_hasattr(brain_or_object, "getId"):
724
            return brain_or_object.getId
725
        if base_hasattr(brain_or_object, "id"):
726
            return brain_or_object.id
727
    return get_object(brain_or_object).getId()
728
729
730
def get_title(brain_or_object):
731
    """Get the Title for this object
732
733
    :param brain_or_object: A single catalog brain or content object
734
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
735
    :returns: Title
736
    :rtype: string
737
    """
738
    if is_brain(brain_or_object) and base_hasattr(brain_or_object, "Title"):
739
        return brain_or_object.Title
740
    return get_object(brain_or_object).Title()
741
742
743
def get_description(brain_or_object):
744
    """Get the Title for this object
745
746
    :param brain_or_object: A single catalog brain or content object
747
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
748
    :returns: Title
749
    :rtype: string
750
    """
751
    if is_brain(brain_or_object) \
752
            and base_hasattr(brain_or_object, "Description"):
753
        return brain_or_object.Description
754
    return get_object(brain_or_object).Description()
755
756
757
def get_uid(brain_or_object):
758
    """Get the Plone UID for this object
759
760
    :param brain_or_object: A single catalog brain or content object or an UID
761
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
762
    :returns: Plone UID
763
    :rtype: string
764
    """
765
    if is_uid(brain_or_object):
766
        return brain_or_object
767
    if is_portal(brain_or_object):
768
        return '0'
769
    if is_brain(brain_or_object) and base_hasattr(brain_or_object, "UID"):
770
        return brain_or_object.UID
771
    return get_object(brain_or_object).UID()
772
773
774
def get_url(brain_or_object):
775
    """Get the absolute URL for this object
776
777
    :param brain_or_object: A single catalog brain or content object
778
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
779
    :returns: Absolute URL
780
    :rtype: string
781
    """
782
    if is_brain(brain_or_object) and base_hasattr(brain_or_object, "getURL"):
783
        return brain_or_object.getURL()
784
    return get_object(brain_or_object).absolute_url()
785
786
787
def get_icon(thing, html_tag=True):
788
    """Get the icon of the content object
789
790
    :param thing: A single catalog brain, content object or portal_type
791
    :type thing: ATContentType/DexterityContentType/CatalogBrain/String
792
    :param html_tag: A value of 'True' returns the HTML tag, else the image url
793
    :type html_tag: bool
794
    :returns: HTML '<img>' tag if 'html_tag' is True else the image url
795
    :rtype: string
796
    """
797
    portal_type = thing
798
    if is_object(thing):
799
        portal_type = get_portal_type(thing)
800
801
    # Manual approach, because `plone.app.layout.getIcon` does not reliable
802
    # work for Contents coming from other catalogs than the
803
    # `portal_catalog`
804
    portal_types = get_tool("portal_types")
805
    fti = portal_types.getTypeInfo(portal_type)
806
    if not fti:
807
        fail("No type info for {}".format(repr(thing)))
808
    icon = fti.getIcon()
809
    if not icon:
810
        return ""
811
    url = "%s/%s" % (get_url(get_portal()), icon)
812
    if not html_tag:
813
        return url
814
815
    # build the img element
816
    if is_object(thing):
817
        title = get_title(thing)
818
    else:
819
        title = fti.Title()
820
    tag = '<img width="16" height="16" src="{url}" title="{title}" />'
821
    return tag.format(url=url, title=title)
822
823
824
def get_object_by_uid(uid, default=_marker):
825
    """Find an object by a given UID
826
827
    :param uid: The UID of the object to find
828
    :type uid: string
829
    :returns: Found Object or None
830
    """
831
832
    # nothing to do here
833
    if not uid:
834
        if default is not _marker:
835
            return default
836
        fail("get_object_by_uid requires UID as first argument; got {} instead"
837
             .format(uid))
838
839
    # we defined the portal object UID to be '0'::
840
    if uid == '0':
841
        return get_portal()
842
843
    brain = get_brain_by_uid(uid)
844
845
    if brain is None:
846
        if default is not _marker:
847
            return default
848
        fail("No object found for UID {}".format(uid))
849
850
    return get_object(brain)
851
852
853
def get_brain_by_uid(uid, default=None):
854
    """Query a brain by a given UID
855
856
    :param uid: The UID of the object to find
857
    :type uid: string
858
    :returns: ZCatalog brain or None
859
    """
860
    if not is_uid(uid):
861
        return default
862
863
    # we try to find the object with the UID catalog
864
    uc = get_tool(UID_CATALOG)
865
866
    # try to find the object with the reference catalog first
867
    brains = uc(UID=uid)
868
    if len(brains) != 1:
869
        return default
870
    return brains[0]
871
872
873
def get_object_by_path(path, default=_marker):
874
    """Find an object by a given physical path or absolute_url
875
876
    :param path: The physical path of the object to find
877
    :type path: string
878
    :returns: Found Object or None
879
    """
880
881
    # nothing to do here
882
    if not path:
883
        if default is not _marker:
884
            return default
885
        fail("get_object_by_path first argument must be a path; {} received"
886
             .format(path))
887
888
    portal = get_portal()
889
    portal_path = get_path(portal)
890
    portal_url = get_url(portal)
891
892
    # ensure we have a physical path
893
    if path.startswith(portal_url):
894
        request = get_request()
895
        path = "/".join(request.physicalPathFromURL(path))
896
897
    if not path.startswith(portal_path):
898
        if default is not _marker:
899
            return default
900
        fail("Not a physical path inside the portal.")
901
902
    if path == portal_path:
903
        return portal
904
905
    return portal.restrictedTraverse(path, default)
906
907
908
def get_path(brain_or_object):
909
    """Calculate the physical path of this object
910
911
    :param brain_or_object: A single catalog brain or content object
912
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
913
    :returns: Physical path of the object
914
    :rtype: string
915
    """
916
    if is_brain(brain_or_object):
917
        return brain_or_object.getPath()
918
    return "/".join(get_object(brain_or_object).getPhysicalPath())
919
920
921
def get_parent_path(brain_or_object):
922
    """Calculate the physical parent path of this object
923
924
    :param brain_or_object: A single catalog brain or content object
925
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
926
    :returns: Physical path of the parent object
927
    :rtype: string
928
    """
929
    if is_portal(brain_or_object):
930
        return get_path(get_portal())
931
    if is_brain(brain_or_object):
932
        path = get_path(brain_or_object)
933
        return path.rpartition("/")[0]
934
    return get_path(get_object(brain_or_object).aq_parent)
935
936
937
def get_parent(brain_or_object, **kw):
938
    """Locate the parent object of the content/catalog brain
939
940
    :param brain_or_object: A single catalog brain or content object
941
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
942
    :param catalog_search: Use a catalog query to find the parent object
943
    :type catalog_search: bool
944
    :returns: parent object
945
    :rtype: ATContentType/DexterityContentType/PloneSite/CatalogBrain
946
    """
947
948
    if is_portal(brain_or_object):
949
        return get_portal()
950
951
    # BBB: removed `catalog_search` keyword
952
    if kw:
953
        logger.warn("API function `get_parent` no longer support keywords.")
954
955
    return get_object(brain_or_object).aq_parent
956
957
958
def search(query, catalog=_marker):
959
    """Search for objects.
960
961
    :param query: A suitable search query.
962
    :type query: dict
963
    :param catalog: A single catalog id or a list of catalog ids
964
    :type catalog: str/list
965
    :returns: Search results
966
    :rtype: List of ZCatalog brains
967
    """
968
969
    # query needs to be a dictionary
970
    if not isinstance(query, dict):
971
        fail("Catalog query needs to be a dictionary")
972
973
    # Portal types to query
974
    portal_types = query.get("portal_type", [])
975
    # We want the portal_type as a list
976
    if not isinstance(portal_types, (tuple, list)):
977
        portal_types = [portal_types]
978
979
    # The catalogs used for the query
980
    catalogs = []
981
982
    # The user did **not** specify a catalog
983
    if catalog is _marker:
984
        # Find the registered catalogs for the queried portal types
985
        for portal_type in portal_types:
986
            # Just get the first registered/default catalog
987
            catalogs.append(get_catalogs_for(
988
                portal_type, default=UID_CATALOG)[0])
989
    else:
990
        # User defined catalogs
991
        if isinstance(catalog, (list, tuple)):
992
            catalogs.extend(map(get_tool, catalog))
993
        else:
994
            catalogs.append(get_tool(catalog))
995
996
    # Cleanup: Avoid duplicate catalogs
997
    catalogs = list(set(catalogs)) or [get_uid_catalog()]
998
999
    # We only support **single** catalog queries
1000
    if len(catalogs) > 1:
1001
        fail("Multi Catalog Queries are not supported!")
1002
1003
    return catalogs[0](query)
1004
1005
1006
def safe_getattr(brain_or_object, attr, default=_marker):
1007
    """Return the attribute value
1008
1009
    :param brain_or_object: A single catalog brain or content object
1010
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
1011
    :param attr: Attribute name
1012
    :type attr: str
1013
    :returns: Attribute value
1014
    :rtype: obj
1015
    """
1016
    try:
1017
        value = getattr(brain_or_object, attr, _marker)
1018
        if value is _marker:
1019
            if default is not _marker:
1020
                return default
1021
            fail("Attribute '{}' not found.".format(attr))
1022
        if callable(value):
1023
            return value()
1024
        return value
1025
    except Unauthorized:
1026
        if default is not _marker:
1027
            return default
1028
        fail("You are not authorized to access '{}' of '{}'.".format(
1029
            attr, repr(brain_or_object)))
1030
1031
1032
def get_uid_catalog():
1033
    """Get the UID catalog tool
1034
1035
    :returns: UID Catalog Tool
1036
    """
1037
    return get_tool(UID_CATALOG)
1038
1039
1040
def get_portal_catalog():
1041
    """Get the portal catalog tool
1042
1043
    :returns: Portal Catalog Tool
1044
    """
1045
    return get_tool(PORTAL_CATALOG)
1046
1047
1048
def get_review_history(brain_or_object, rev=True):
1049
    """Get the review history for the given brain or context.
1050
1051
    :param brain_or_object: A single catalog brain or content object
1052
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
1053
    :returns: Workflow history
1054
    :rtype: [{}, ...]
1055
    """
1056
    obj = get_object(brain_or_object)
1057
    review_history = []
1058
    try:
1059
        workflow = get_tool("portal_workflow")
1060
        review_history = workflow.getInfoFor(obj, 'review_history')
1061
    except WorkflowException as e:
1062
        message = str(e)
1063
        logger.error("Cannot retrieve review_history on {}: {}".format(
1064
            obj, message))
1065
    if not isinstance(review_history, (list, tuple)):
1066
        logger.error("get_review_history: expected list, received {}".format(
1067
            review_history))
1068
        review_history = []
1069
1070
    if isinstance(review_history, tuple):
1071
        # Products.CMFDefault.DefaultWorkflow.getInfoFor always returns a
1072
        # tuple when "review_history" is passed in:
1073
        # https://github.com/zopefoundation/Products.CMFDefault/blob/master/Products/CMFDefault/DefaultWorkflow.py#L244
1074
        #
1075
        # On the other hand, Products.DCWorkflow.getInfoFor relies on
1076
        # Expression.StateChangeInfo, that always returns a list, except when no
1077
        # review_history is found:
1078
        # https://github.com/zopefoundation/Products.DCWorkflow/blob/master/Products/DCWorkflow/DCWorkflow.py#L310
1079
        # https://github.com/zopefoundation/Products.DCWorkflow/blob/master/Products/DCWorkflow/Expression.py#L94
1080
        review_history = list(review_history)
1081
1082
    if rev is True:
1083
        review_history.reverse()
1084
    return review_history
1085
1086
1087
def get_revision_history(brain_or_object):
1088
    """Get the revision history for the given brain or context.
1089
1090
    :param brain_or_object: A single catalog brain or content object
1091
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
1092
    :returns: Workflow history
1093
    :rtype: obj
1094
    """
1095
    obj = get_object(brain_or_object)
1096
    chv = ContentHistoryView(obj, safe_getattr(obj, "REQUEST", None))
1097
    return chv.fullHistory()
1098
1099
1100
def get_workflows_for(brain_or_object):
1101
    """Get the assigned workflows for the given brain or context.
1102
1103
    Note: This function supports also the portal_type as parameter.
1104
1105
    :param brain_or_object: A single catalog brain or content object
1106
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
1107
    :returns: Assigned Workflows
1108
    :rtype: tuple
1109
    """
1110
    workflow = ploneapi.portal.get_tool("portal_workflow")
1111
    if isinstance(brain_or_object, six.string_types):
1112
        return workflow.getChainFor(brain_or_object)
1113
    obj = get_object(brain_or_object)
1114
    return workflow.getChainFor(obj)
1115
1116
1117
def get_workflow_status_of(brain_or_object, state_var="review_state"):
1118
    """Get the current workflow status of the given brain or context.
1119
1120
    :param brain_or_object: A single catalog brain or content object
1121
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
1122
    :param state_var: The name of the state variable
1123
    :type state_var: string
1124
    :returns: Status
1125
    :rtype: str
1126
    """
1127
    # Try to get the state from the catalog brain first
1128
    if is_brain(brain_or_object):
1129
        if state_var in brain_or_object.schema():
1130
            return brain_or_object[state_var]
1131
1132
    # Retrieve the sate from the object
1133
    workflow = get_tool("portal_workflow")
1134
    obj = get_object(brain_or_object)
1135
    return workflow.getInfoFor(ob=obj, name=state_var, default='')
1136
1137
1138
def get_previous_worfklow_status_of(brain_or_object, skip=None, default=None):
1139
    """Get the previous workflow status of the object
1140
1141
    :param brain_or_object: A single catalog brain or content object
1142
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
1143
    :param skip: Workflow states to skip
1144
    :type skip: tuple/list
1145
    :returns: status
1146
    :rtype: str
1147
    """
1148
1149
    skip = isinstance(skip, (list, tuple)) and skip or []
1150
    history = get_review_history(brain_or_object)
1151
1152
    # Remove consecutive duplicates, some transitions might happen more than
1153
    # once consecutively (e.g. publish)
1154
    history = map(lambda i: i[0], groupby(history))
1155
1156
    for num, item in enumerate(history):
1157
        # skip the current history entry
1158
        if num == 0:
1159
            continue
1160
        status = item.get("review_state")
1161
        if status in skip:
1162
            continue
1163
        return status
1164
    return default
1165
1166
1167
def get_creation_date(brain_or_object):
1168
    """Get the creation date of the brain or object
1169
1170
    :param brain_or_object: A single catalog brain or content object
1171
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
1172
    :returns: Creation date
1173
    :rtype: DateTime
1174
    """
1175
    created = getattr(brain_or_object, "created", None)
1176
    if created is None:
1177
        fail("Object {} has no creation date ".format(
1178
             repr(brain_or_object)))
1179
    if callable(created):
1180
        return created()
1181
    return created
1182
1183
1184
def get_modification_date(brain_or_object):
1185
    """Get the modification date of the brain or object
1186
1187
    :param brain_or_object: A single catalog brain or content object
1188
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
1189
    :returns: Modification date
1190
    :rtype: DateTime
1191
    """
1192
    modified = getattr(brain_or_object, "modified", None)
1193
    if modified is None:
1194
        fail("Object {} has no modification date ".format(
1195
             repr(brain_or_object)))
1196
    if callable(modified):
1197
        return modified()
1198
    return modified
1199
1200
1201
def get_review_status(brain_or_object):
1202
    """Get the `review_state` of an object
1203
1204
    :param brain_or_object: A single catalog brain or content object
1205
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
1206
    :returns: Value of the review_status variable
1207
    :rtype: String
1208
    """
1209
    if is_brain(brain_or_object) \
1210
       and base_hasattr(brain_or_object, "review_state"):
1211
        return brain_or_object.review_state
1212
    return get_workflow_status_of(brain_or_object, state_var="review_state")
1213
1214
1215
def is_active(brain_or_object):
1216
    """Check if the workflow state of the object is 'inactive' or 'cancelled'.
1217
1218
    :param brain_or_object: A single catalog brain or content object
1219
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
1220
    :returns: False if the object is in the state 'inactive' or 'cancelled'
1221
    :rtype: bool
1222
    """
1223
    if get_review_status(brain_or_object) in ["cancelled", "inactive"]:
1224
        return False
1225
    return True
1226
1227
1228
def get_fti(portal_type, default=None):
1229
    """Lookup the Dynamic Filetype Information for the given portal_type
1230
1231
    :param portal_type: The portal type to get the FTI for
1232
    :returns: FTI or default value
1233
    """
1234
    if not is_string(portal_type):
1235
        return default
1236
    portal_types = get_tool("portal_types")
1237
    fti = portal_types.getTypeInfo(portal_type)
1238
    return fti or default
1239
1240
1241
def get_catalogs_for(brain_or_object, default=PORTAL_CATALOG):
1242
    """Get all registered catalogs for the given portal_type, catalog brain or
1243
    content object
1244
1245
    NOTE: We pass in the `portal_catalog` as default in subsequent calls to
1246
          work around the missing `uid_catalog` during snapshot creation when
1247
          installing a fresh site!
1248
1249
    :param brain_or_object: The portal_type, a catalog brain or content object
1250
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
1251
    :returns: List of supported catalogs
1252
    :rtype: list
1253
    """
1254
1255
    # only handle catalog lookups by portal_type internally
1256
    if is_uid(brain_or_object) or is_object(brain_or_object):
1257
        obj = get_object(brain_or_object)
1258
        portal_type = get_portal_type(obj)
1259
        return get_catalogs_for(portal_type)
1260
1261
    catalogs = []
1262
1263
    if not is_string(brain_or_object):
1264
        raise APIError("Expected a portal_type string, got <%s>"
1265
                       % type(brain_or_object))
1266
1267
    # at this point the brain_or_object is a portal_type
1268
    portal_type = brain_or_object
1269
1270
    # check static portal_type -> catalog mapping first
1271
    from senaite.core.catalog import get_catalogs_by_type
1272
    catalogs = get_catalogs_by_type(portal_type)
1273
1274
    # no catalogs in static mapping
1275
    # => Lookup catalogs by FTI
1276
    if len(catalogs) == 0:
1277
        fti = get_fti(portal_type)
1278
        if fti.product:
1279
            # AT content type
1280
            # => Looup via archetype_tool
1281
            archetype_tool = get_tool("archetype_tool")
1282
            catalogs = archetype_tool.catalog_map.get(portal_type) or []
1283
        else:
1284
            # DX content type
1285
            # => resolve the `_catalogs` attribute from the class
1286
            klass = resolveDottedName(fti.klass)
1287
            # XXX: Refactor multi-catalog behavior to not rely
1288
            #      on this hidden `_catalogs` attribute!
1289
            catalogs = getattr(klass, "_catalogs", [])
1290
1291
    # fetch the catalog objects
1292
    catalogs = filter(None, map(lambda cid: get_tool(cid, None), catalogs))
1293
1294
    if len(catalogs) == 0:
1295
        return [get_tool(default, default=PORTAL_CATALOG)]
1296
1297
    return list(catalogs)
1298
1299
1300
def get_transitions_for(brain_or_object):
1301
    """List available workflow transitions for all workflows
1302
1303
    :param brain_or_object: A single catalog brain or content object
1304
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
1305
    :returns: All possible available and allowed transitions
1306
    :rtype: list[dict]
1307
    """
1308
    workflow = get_tool('portal_workflow')
1309
    transitions = []
1310
    instance = get_object(brain_or_object)
1311
    for wfid in get_workflows_for(brain_or_object):
1312
        wf = workflow[wfid]
1313
        tlist = wf.getTransitionsFor(instance)
1314
        transitions.extend([t for t in tlist if t not in transitions])
1315
    return transitions
1316
1317
1318
def do_transition_for(brain_or_object, transition):
1319
    """Performs a workflow transition for the passed in object.
1320
1321
    :param brain_or_object: A single catalog brain or content object
1322
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
1323
    :returns: The object where the transtion was performed
1324
    """
1325
    if not isinstance(transition, six.string_types):
1326
        fail("Transition type needs to be string, got '%s'" % type(transition))
1327
    obj = get_object(brain_or_object)
1328
    try:
1329
        ploneapi.content.transition(obj, transition)
1330
    except ploneapi.exc.InvalidParameterError as e:
1331
        fail("Failed to perform transition '{}' on {}: {}".format(
1332
             transition, obj, str(e)))
1333
    return obj
1334
1335
1336
def get_roles_for_permission(permission, brain_or_object):
1337
    """Get a list of granted roles for the given permission on the object.
1338
1339
    :param brain_or_object: A single catalog brain or content object
1340
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
1341
    :returns: Roles for the given Permission
1342
    :rtype: list
1343
    """
1344
    obj = get_object(brain_or_object)
1345
    allowed = set(rolesForPermissionOn(permission, obj))
1346
    return sorted(allowed)
1347
1348
1349
def is_versionable(brain_or_object, policy='at_edit_autoversion'):
1350
    """Checks if the passed in object is versionable.
1351
1352
    :param brain_or_object: A single catalog brain or content object
1353
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
1354
    :returns: True if the object is versionable
1355
    :rtype: bool
1356
    """
1357
    pr = get_tool("portal_repository")
1358
    obj = get_object(brain_or_object)
1359
    return pr.supportsPolicy(obj, 'at_edit_autoversion') \
1360
        and pr.isVersionable(obj)
1361
1362
1363
def get_version(brain_or_object):
1364
    """Get the version of the current object
1365
1366
    :param brain_or_object: A single catalog brain or content object
1367
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
1368
    :returns: The current version of the object, or None if not available
1369
    :rtype: int or None
1370
    """
1371
    obj = get_object(brain_or_object)
1372
    if not is_versionable(obj):
1373
        return None
1374
    return getattr(aq_base(obj), "version_id", 0)
1375
1376
1377
def get_view(name, context=None, request=None, default=None):
1378
    """Get the view by name
1379
1380
    :param name: The name of the view
1381
    :type name: str
1382
    :param context: The context to query the view
1383
    :type context: ATContentType/DexterityContentType/CatalogBrain
1384
    :param request: The request to query the view
1385
    :type request: HTTPRequest object
1386
    :returns: HTTP Request
1387
    :rtype: Products.Five.metaclass View object
1388
    """
1389
    context = context or get_portal()
1390
    request = request or get_request() or None
1391
    view = queryMultiAdapter((get_object(context), request), name=name)
1392
    if view is None:
1393
        return default
1394
    return view
1395
1396
1397
def get_request():
1398
    """Get the global request object
1399
1400
    :returns: HTTP Request
1401
    :rtype: HTTPRequest object
1402
    """
1403
    return globalrequest.getRequest()
1404
1405
1406
def get_test_request():
1407
    """Get the TestRequest object
1408
    """
1409
    request = TestRequest()
1410
    directlyProvides(request, IAttributeAnnotatable)
1411
    return request
1412
1413
1414
def get_group(group_or_groupname):
1415
    """Return Plone Group
1416
1417
    :param group_or_groupname: Plone group or the name of the group
1418
    :type groupname:  GroupData/str
1419
    :returns: Plone GroupData
1420
    """
1421
    if not group_or_groupname:
1422
1423
        return None
1424
    if hasattr(group_or_groupname, "_getGroup"):
1425
        return group_or_groupname
1426
    gtool = get_tool("portal_groups")
1427
    return gtool.getGroupById(group_or_groupname)
1428
1429
1430
def get_user(user_or_username):
1431
    """Return Plone User
1432
1433
    :param user_or_username: Plone user or user id
1434
    :returns: Plone MemberData
1435
    """
1436
    user = None
1437
    if isinstance(user_or_username, MemberData):
1438
        user = user_or_username
1439
    if isinstance(user_or_username, six.string_types):
1440
        user = get_member_by_login_name(get_portal(), user_or_username, False)
1441
    return user
1442
1443
1444
def get_user_properties(user_or_username):
1445
    """Return User Properties
1446
1447
    :param user_or_username: Plone group identifier
1448
    :returns: Plone MemberData
1449
    """
1450
    user = get_user(user_or_username)
1451
    if user is None:
1452
        return {}
1453
    if not callable(user.getUser):
1454
        return {}
1455
    out = {}
1456
    plone_user = user.getUser()
1457
    for sheet in plone_user.listPropertysheets():
1458
        ps = plone_user.getPropertysheet(sheet)
1459
        out.update(dict(ps.propertyItems()))
1460
    return out
1461
1462
1463
def get_users_by_roles(roles=None):
1464
    """Search Plone users by their roles
1465
1466
    :param roles: Plone role name or list of roles
1467
    :type roles:  list/str
1468
    :returns: List of Plone users having the role(s)
1469
    """
1470
    if not isinstance(roles, (tuple, list)):
1471
        roles = [roles]
1472
    mtool = get_tool("portal_membership")
1473
    return mtool.searchForMembers(roles=roles)
1474
1475
1476
def get_current_user():
1477
    """Returns the current logged in user
1478
1479
    :returns: Current User
1480
    """
1481
    return ploneapi.user.get_current()
1482
1483
1484
def get_user_contact(user, contact_types=['Contact', 'LabContact']):
1485
    """Returns the associated contact of a Plone user
1486
1487
    If the user passed in has no contact associated, return None.
1488
    The `contact_types` parameter filter the portal types for the search.
1489
1490
    :param: Plone user
1491
    :contact_types: List with the contact portal types to search
1492
    :returns: Contact associated to the Plone user or None
1493
    """
1494
    if not user:
1495
        return None
1496
1497
    from senaite.core.catalog import CONTACT_CATALOG  # Avoid circular import
1498
    query = {"portal_type": contact_types, "getUsername": user.getId()}
1499
    brains = search(query, catalog=CONTACT_CATALOG)
1500
    if not brains:
1501
        return None
1502
1503
    if len(brains) > 1:
1504
        # Oops, the user has multiple contacts assigned, return None
1505
        contacts = map(lambda c: c.Title, brains)
1506
        err_msg = "User '{}' is bound to multiple Contacts '{}'"
1507
        err_msg = err_msg.format(user.getId(), ','.join(contacts))
1508
        logger.error(err_msg)
1509
        return None
1510
1511
    return get_object(brains[0])
1512
1513
1514
def get_user_client(user_or_contact):
1515
    """Returns the client of the contact of a Plone user
1516
1517
    If the user passed in has no contact or does not belong to any client,
1518
    returns None.
1519
1520
    :param: Plone user or contact
1521
    :returns: Client the contact of the Plone user belongs to
1522
    """
1523
    if not user_or_contact or ILabContact.providedBy(user_or_contact):
1524
        # Lab contacts cannot belong to a client
1525
        return None
1526
1527
    if not IContact.providedBy(user_or_contact):
1528
        contact = get_user_contact(user_or_contact, contact_types=['Contact'])
1529
        if IContact.providedBy(contact):
1530
            return get_user_client(contact)
1531
        return None
1532
1533
    client = get_parent(user_or_contact)
1534
    if client and IClient.providedBy(client):
1535
        return client
1536
1537
    return None
1538
1539
1540
def get_user_fullname(user_or_contact):
1541
    """Returns the fullname of the contact or Plone user.
1542
1543
    If the user has a linked contact, the fullname of the contact has priority
1544
    over the value of the fullname property from the user
1545
1546
    :param: Plone user or contact
1547
    :returns: Fullname of the contact or user
1548
    """
1549
    if IContact.providedBy(user_or_contact):
1550
        return user_or_contact.getFullname()
1551
1552
    user = get_user(user_or_contact)
1553
    if not user:
1554
        return ""
1555
1556
    # contact's fullname has priority over user's
1557
    contact = get_user_contact(user)
1558
    if not contact:
1559
        return user.getProperty("fullname")
1560
1561
    return contact.getFullname()
1562
1563
1564
def get_user_email(user_or_contact):
1565
    """Returns the email of the contact or Plone user.
1566
    If the user has a linked contact, the email of the contact has priority
1567
    over the value of the email property from the user
1568
    :param: Plone user or contact
1569
    :returns: Fullname of the contact or user
1570
    """
1571
    if IContact.providedBy(user_or_contact):
1572
        return user_or_contact.getEmailAddress()
1573
1574
    user = get_user(user_or_contact)
1575
    if not user:
1576
        return ""
1577
1578
    # contact's email has priority over user's
1579
    contact = get_user_contact(user)
1580
    if not contact:
1581
        return user.getProperty("email", default="")
1582
1583
    return contact.getEmailAddress()
1584
1585
1586
def get_current_client():
1587
    """Returns the current client the current logged in user belongs to, if any
1588
1589
    :returns: Client the current logged in user belongs to or None
1590
    """
1591
    return get_user_client(get_current_user())
1592
1593
1594
def get_cache_key(brain_or_object):
1595
    """Generate a cache key for a common brain or object
1596
1597
    :param brain_or_object: A single catalog brain or content object
1598
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
1599
    :returns: Cache Key
1600
    :rtype: str
1601
    """
1602
    key = [
1603
        get_portal_type(brain_or_object),
1604
        get_id(brain_or_object),
1605
        get_uid(brain_or_object),
1606
        # handle different domains gracefully
1607
        get_url(brain_or_object),
1608
        # Return the microsecond since the epoch in GMT
1609
        get_modification_date(brain_or_object).micros(),
1610
    ]
1611
    return "-".join(map(lambda x: str(x), key))
1612
1613
1614
def bika_cache_key_decorator(method, self, brain_or_object):
1615
    """Bika cache key decorator usable for
1616
1617
    :param brain_or_object: A single catalog brain or content object
1618
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
1619
    :returns: Cache Key
1620
    :rtype: str
1621
    """
1622
    if brain_or_object is None:
1623
        raise DontCache
1624
    return get_cache_key(brain_or_object)
1625
1626
1627
def normalize_id(string):
1628
    """Normalize the id
1629
1630
    :param string: A string to normalize
1631
    :type string: str
1632
    :returns: Normalized ID
1633
    :rtype: str
1634
    """
1635
    if not isinstance(string, six.string_types):
1636
        fail("Type of argument must be string, found '{}'"
1637
             .format(type(string)))
1638
    # get the id nomalizer utility
1639
    normalizer = getUtility(IIDNormalizer).normalize
1640
    return normalizer(string)
1641
1642
1643
def normalize_filename(string):
1644
    """Normalize the filename
1645
1646
    :param string: A string to normalize
1647
    :type string: str
1648
    :returns: Normalized ID
1649
    :rtype: str
1650
    """
1651
    if not isinstance(string, six.string_types):
1652
        fail("Type of argument must be string, found '{}'"
1653
             .format(type(string)))
1654
    # get the file nomalizer utility
1655
    normalizer = getUtility(IFileNameNormalizer).normalize
1656
    return normalizer(string)
1657
1658
1659
def is_uid(uid, validate=False):
1660
    """Checks if the passed in uid is a valid UID
1661
1662
    :param uid: The uid to check
1663
    :param validate: If False, checks if uid is a valid 23 alphanumeric uid. If
1664
    True, also verifies if a brain exists for the uid passed in
1665
    :type uid: string
1666
    :return: True if a valid uid
1667
    :rtype: bool
1668
    """
1669
    if not isinstance(uid, six.string_types):
1670
        return False
1671
    if uid == '0':
1672
        return True
1673
    if len(uid) != 32:
1674
        return False
1675
    if not UID_RX.match(uid):
1676
        return False
1677
    if not validate:
1678
        return True
1679
1680
    # Check if a brain for this uid exists
1681
    uc = get_tool('uid_catalog')
1682
    brains = uc(UID=uid)
1683
    if brains:
1684
        assert (len(brains) == 1)
1685
    return len(brains) > 0
1686
1687
1688
def is_date(date):
1689
    """Checks if the passed in value is a valid Zope's DateTime
1690
1691
    :param date: The date to check
1692
    :type date: DateTime
1693
    :return: True if a valid date
1694
    :rtype: bool
1695
    """
1696
    if not date:
1697
        return False
1698
    return isinstance(date, (DateTime, datetime))
1699
1700
1701
def to_date(value, default=None):
1702
    """Tries to convert the passed in value to Zope's DateTime
1703
1704
    :param value: The value to be converted to a valid DateTime
1705
    :type value: str, DateTime or datetime
1706
    :return: The DateTime representation of the value passed in or default
1707
    """
1708
1709
    # cannot use bika.lims.deprecated (circular dependencies)
1710
    import warnings
1711
    warnings.simplefilter("always", DeprecationWarning)
1712
    warn = "Deprecated: use senaite.core.api.dtime.to_DT instead"
1713
    warnings.warn(warn, category=DeprecationWarning, stacklevel=2)
1714
    warnings.simplefilter("default", DeprecationWarning)
1715
1716
    # prevent circular dependencies
1717
    from senaite.core.api.dtime import to_DT
1718
    date = to_DT(value)
1719
    if not date:
1720
        return to_DT(default)
1721
    return date
1722
1723
1724
def to_minutes(days=0, hours=0, minutes=0, seconds=0, milliseconds=0,
1725
               round_to_int=True):
1726
    """Returns the computed total number of minutes
1727
    """
1728
    total = float(days)*24*60 + float(hours)*60 + float(minutes) + \
1729
        float(seconds)/60 + float(milliseconds)/1000/60
1730
    return int(round(total)) if round_to_int else total
1731
1732
1733
def to_dhm_format(days=0, hours=0, minutes=0, seconds=0, milliseconds=0):
1734
    """Returns a representation of time in a string in xd yh zm format
1735
    """
1736
    minutes = to_minutes(days=days, hours=hours, minutes=minutes,
1737
                         seconds=seconds, milliseconds=milliseconds)
1738
    delta = timedelta(minutes=int(round(minutes)))
1739
    d = delta.days
1740
    h = delta.seconds // 3600
1741
    m = (delta.seconds // 60) % 60
1742
    m = m and "{}m ".format(str(m)) or ""
1743
    d = d and "{}d ".format(str(d)) or ""
1744
    if m and d:
1745
        h = "{}h ".format(str(h))
1746
    else:
1747
        h = h and "{}h ".format(str(h)) or ""
1748
    return "".join([d, h, m]).strip()
1749
1750
1751
def to_int(value, default=_marker):
1752
    """Tries to convert the value to int.
1753
    Truncates at the decimal point if the value is a float
1754
1755
    :param value: The value to be converted to an int
1756
    :return: The resulting int or default
1757
    """
1758
    if is_floatable(value):
1759
        value = to_float(value)
1760
    try:
1761
        return int(value)
1762
    except (TypeError, ValueError):
1763
        if default is None:
1764
            return default
1765
        if default is not _marker:
1766
            return to_int(default)
1767
        fail("Value %s cannot be converted to int" % repr(value))
1768
1769
1770
def is_floatable(value):
1771
    """Checks if the passed in value is a valid floatable number
1772
1773
    :param value: The value to be evaluated as a float number
1774
    :type value: str, float, int
1775
    :returns: True if is a valid float number
1776
    :rtype: bool"""
1777
    try:
1778
        float(value)
1779
        return True
1780
    except (TypeError, ValueError):
1781
        return False
1782
1783
1784
def to_float(value, default=_marker):
1785
    """Converts the passed in value to a float number
1786
1787
    :param value: The value to be converted to a floatable number
1788
    :type value: str, float, int
1789
    :returns: The float number representation of the passed in value
1790
    :rtype: float
1791
    """
1792
    if not is_floatable(value):
1793
        if default is not _marker:
1794
            return to_float(default)
1795
        fail("Value %s is not floatable" % repr(value))
1796
    return float(value)
1797
1798
1799
def float_to_string(value, default=_marker):
1800
    """Convert a float value to string without exponential notation
1801
1802
    This function preserves the whole fraction
1803
1804
    :param value: The float value to be converted to a string
1805
    :type value: str, float, int
1806
    :returns: String representation of the float w/o exponential notation
1807
    :rtype: str
1808
    """
1809
    if not is_floatable(value):
1810
        if default is not _marker:
1811
            return default
1812
        fail("Value %s is not floatable" % repr(value))
1813
1814
    # Leave floatable string values unchanged
1815
    if isinstance(value, six.string_types):
1816
        return value
1817
1818
    value = float(value)
1819
    str_value = str(value)
1820
1821
    if "." in str_value:
1822
        # might be something like 1.23e-26
1823
        front, back = str_value.split(".")
1824
    else:
1825
        # or 1e-07 for 0.0000001
1826
        back = str_value
1827
1828
    if "e-" in back:
1829
        fraction, zeros = back.split("e-")
1830
        # we want to cover the faction and the zeros
1831
        precision = len(fraction) + int(zeros)
1832
        template = "{:.%df}" % precision
1833
        str_value = template.format(value)
1834
    elif "e+" in back:
1835
        # positive numbers, e.g. 1e+16 don't need a fractional part
1836
        str_value = "{:.0f}".format(value)
1837
1838
    # cut off trailing zeros
1839
    if "." in str_value:
1840
        str_value = str_value.rstrip("0").rstrip(".")
1841
1842
    return str_value
1843
1844
1845
def to_searchable_text_metadata(value):
1846
    """Parse the given metadata value to searchable text
1847
1848
    :param value: The raw value of the metadata column
1849
    :returns: Searchable and translated unicode value or None
1850
    """
1851
    if not value:
1852
        return u""
1853
    if value is Missing.Value:
1854
        return u""
1855
    if is_uid(value):
1856
        return u""
1857
    if isinstance(value, (bool)):
1858
        return u""
1859
    if isinstance(value, (list, tuple)):
1860
        values = map(to_searchable_text_metadata, value)
1861
        values = filter(None, values)
1862
        return " ".join(values)
1863
    if isinstance(value, dict):
1864
        return to_searchable_text_metadata(value.values())
1865
    if is_date(value):
1866
        from senaite.core.api.dtime import date_to_string
1867
        return date_to_string(value, "%Y-%m-%d")
1868
    if is_at_content(value):
1869
        return to_searchable_text_metadata(get_title(value))
1870
    if not isinstance(value, six.string_types):
1871
        value = str(value)
1872
    return safe_unicode(value)
1873
1874
1875
def get_registry_record(name, default=None):
1876
    """Returns the value of a registry record
1877
1878
    :param name: [required] name of the registry record
1879
    :type name: str
1880
    :param default: The value returned if the record is not found
1881
    :type default: anything
1882
    :returns: value of the registry record
1883
    """
1884
    return ploneapi.portal.get_registry_record(name, default=default)
1885
1886
1887
def to_display_list(pairs, sort_by="key", allow_empty=True):
1888
    """Create a Plone DisplayList from list items
1889
1890
    :param pairs: list of key, value pairs
1891
    :param sort_by: Sort the items either by key or value
1892
    :param allow_empty: Allow to select an empty value
1893
    :returns: Plone DisplayList
1894
    """
1895
    dl = DisplayList()
1896
1897
    if isinstance(pairs, six.string_types):
1898
        pairs = [pairs, pairs]
1899
    for pair in pairs:
1900
        # pairs is a list of lists -> add each pair
1901
        if isinstance(pair, (tuple, list)):
1902
            dl.add(*pair)
1903
        # pairs is just a single pair -> add it and stop
1904
        if isinstance(pair, six.string_types):
1905
            dl.add(*pairs)
1906
            break
1907
1908
    # add the empty option
1909
    if allow_empty:
1910
        dl.add("", "")
1911
1912
    # sort by key/value
1913
    if sort_by == "key":
1914
        dl = dl.sortedByKey()
1915
    elif sort_by == "value":
1916
        dl = dl.sortedByValue()
1917
1918
    return dl
1919
1920
1921
def text_to_html(text, wrap="p", encoding="utf8"):
1922
    """Convert `\n` sequences in the text to HTML `\n`
1923
1924
    :param text: Plain text to convert
1925
    :param wrap: Toggle to wrap the text in a
1926
    :returns: HTML converted and encoded text
1927
    """
1928
    if not text:
1929
        return ""
1930
    # handle text internally as unicode
1931
    text = safe_unicode(text)
1932
    # replace newline characters with HTML entities
1933
    html = text.replace("\n", "<br/>")
1934
    if wrap:
1935
        html = u"<{tag}>{html}</{tag}>".format(
1936
            tag=wrap, html=html)
1937
    # return encoded html
1938
    return html.encode(encoding)
1939
1940
1941
def to_utf8(string, default=_marker):
1942
    """Encode string to UTF8
1943
1944
    :param string: String to be encoded to UTF8
1945
    :returns: UTF8 encoded string
1946
    """
1947
    if not isinstance(string, six.string_types):
1948
        if default is _marker:
1949
            fail("Expected string type, got '%s'" % type(string))
1950
        return default
1951
    return safe_unicode(string).encode("utf8")
1952
1953
1954
def is_temporary(obj):
1955
    """Returns whether the given object is temporary or not
1956
1957
    :param obj: the object to evaluate
1958
    :returns: True if the object is temporary
1959
    """
1960
    if ITemporaryObject.providedBy(obj):
1961
        return True
1962
1963
    obj_id = getattr(aq_base(obj), "id", None)
1964
    if obj_id is None or UID_RX.match(obj_id):
1965
        return True
1966
1967
    parent = aq_parent(aq_inner(obj))
1968
    if not parent:
1969
        return True
1970
1971
    parent_id = getattr(aq_base(parent), "id", None)
1972
    if parent_id is None or UID_RX.match(parent_id):
1973
        return True
1974
1975
    # Checks to see if we are created inside the portal_factory.
1976
    # This might also happen for DX types in senaite.databox!
1977
    meta_type = getattr(aq_base(parent), "meta_type", "")
1978
    if meta_type == "TempFolder":
1979
        return True
1980
1981
    return False
1982
1983
1984
def mark_temporary(brain_or_object):
1985
    """Mark the object as temporary
1986
    """
1987
    obj = get_object(brain_or_object)
1988
    alsoProvides(obj, ITemporaryObject)
1989
1990
1991
def unmark_temporary(brain_or_object):
1992
    """Unmark the object as temporary
1993
    """
1994
    obj = get_object(brain_or_object)
1995
    noLongerProvides(obj, ITemporaryObject)
1996
1997
1998
def is_string(thing):
1999
    """Checks if the passed in object is a string type
2000
2001
    :param thing: object to test
2002
    :returns: True if the object is a string
2003
    """
2004
    return isinstance(thing, six.string_types)
2005
2006
2007
def is_list(thing):
2008
    """Checks if the passed in object is a list type
2009
2010
    :param thing: object to test
2011
    :returns: True if the object is a list
2012
    """
2013
    return isinstance(thing, list)
2014
2015
2016
def is_list_iterable(thing):
2017
    """Checks if the passed in object can be iterated like a list
2018
2019
    :param thing: object to test
2020
    :returns: True if the object is a list, tuple or set
2021
    """
2022
    return isinstance(thing, (list, tuple, set))
2023
2024
2025
def parse_json(thing, default=""):
2026
    """Parse from JSON
2027
2028
    :param thing: thing to parse
2029
    :param default: value to return if cannot parse
2030
    :returns: the object representing the JSON or default
2031
    """
2032
    try:
2033
        return json.loads(thing)
2034
    except (TypeError, ValueError):
2035
        return default
2036
2037
2038
def to_list(value):
2039
    """Converts the value to a list
2040
2041
    :param value: the value to be represented as a list
2042
    :returns: a list that represents or contains the value
2043
    """
2044
    if is_string(value):
2045
        val = parse_json(value)
2046
        if isinstance(val, (list, tuple, set)):
2047
            value = val
2048
    if not isinstance(value, (list, tuple, set)):
2049
        value = [value]
2050
    return list(value)
2051