Passed
Push — 2.x ( ac1fab...0dfdd1 )
by Ramon
06:57
created

bika.lims.api.safe_unicode()   B

Complexity

Conditions 6

Size

Total Lines 28
Code Lines 15

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 15
dl 0
loc 28
rs 8.6666
c 0
b 0
f 0
cc 6
nop 2
1
# -*- coding: utf-8 -*-
2
#
3
# This file is part of SENAITE.CORE.
4
#
5
# SENAITE.CORE is free software: you can redistribute it and/or modify it under
6
# the terms of the GNU General Public License as published by the Free Software
7
# Foundation, version 2.
8
#
9
# This program is distributed in the hope that it will be useful, but WITHOUT
10
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
11
# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
12
# details.
13
#
14
# You should have received a copy of the GNU General Public License along with
15
# this program; if not, write to the Free Software Foundation, Inc., 51
16
# Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
17
#
18
# Copyright 2018-2025 by it's authors.
19
# Some rights reserved, see README and LICENSE.
20
21
import copy
22
import json
23
import re
24
from collections import OrderedDict
25
from datetime import datetime
26
from datetime import timedelta
27
from itertools import groupby
28
29
import Missing
30
import six
31
from AccessControl.PermissionRole import rolesForPermissionOn
32
from AccessControl.Permissions import copy_or_move as CopyOrMove
33
from Acquisition import aq_base
34
from Acquisition import aq_inner
35
from Acquisition import aq_parent
36
from bika.lims import logger
37
from bika.lims.interfaces import IClient
38
from bika.lims.interfaces import ILabContact
39
from DateTime import DateTime
40
from OFS.event import ObjectWillBeMovedEvent
41
from plone import api as ploneapi
42
from plone.api.exc import InvalidParameterError
43
from plone.app.layout.viewlets.content import ContentHistoryView
44
from plone.behavior.interfaces import IBehaviorAssignable
45
from plone.behavior.registration import lookup_behavior_registration
46
from plone.dexterity.interfaces import IDexterityContent
47
from plone.dexterity.schema import SchemaInvalidatedEvent
48
from plone.dexterity.utils import addContentToContainer
49
from plone.dexterity.utils import createContent
50
from plone.dexterity.utils import resolveDottedName
51
from plone.i18n.normalizer.interfaces import IFileNameNormalizer
52
from plone.i18n.normalizer.interfaces import IIDNormalizer
53
from plone.memoize.volatile import DontCache
54
from Products.Archetypes.atapi import DisplayList
55
from Products.Archetypes.BaseObject import BaseObject
56
from Products.Archetypes.event import ObjectInitializedEvent
57
from Products.Archetypes.public import StringField
58
from Products.Archetypes.utils import mapply
59
from Products.CMFCore.interfaces import IFolderish
60
from Products.CMFCore.interfaces import ISiteRoot
61
from Products.CMFCore.permissions import DeleteObjects
62
from Products.CMFCore.permissions import ModifyPortalContent
63
from Products.CMFCore.permissions import View
64
from Products.CMFCore.utils import getToolByName
65
from Products.CMFCore.WorkflowCore import WorkflowException
66
from Products.CMFPlone.RegistrationTool import get_member_by_login_name
67
from Products.CMFPlone.utils import _createObjectByType
68
from Products.CMFPlone.utils import base_hasattr
69
from Products.PlonePAS.tools.memberdata import MemberData
70
from Products.ZCatalog.interfaces import ICatalogBrain
71
from senaite.core.interfaces import IContact
72
from senaite.core.interfaces import IContacts
73
from senaite.core.interfaces import ITemporaryObject
74
from z3c.form.validator import Data as ValidatorData
75
from zope import globalrequest
76
from zope.annotation.interfaces import IAttributeAnnotatable
77
from zope.component import getUtility
78
from zope.component import queryMultiAdapter
79
from zope.container.contained import notifyContainerModified
80
from zope.event import notify
81
from zope.i18n import translate
82
from zope.interface import Invalid
83
from zope.interface import alsoProvides
84
from zope.interface import directlyProvides
85
from zope.interface import noLongerProvides
86
from zope.lifecycleevent import ObjectMovedEvent
87
from zope.publisher.browser import TestRequest
88
from zope.schema import getFieldsInOrder
89
from zope.schema.interfaces import RequiredMissing
90
from zope.schema.interfaces import WrongType
91
from zope.security.interfaces import Unauthorized
92
93
"""SENAITE LIMS Framework API
94
95
Please see tests/doctests/API.rst for documentation.
96
97
Architectural Notes:
98
99
Please add only functions that do a single thing for a single object.
100
101
Good: `def get_foo(brain_or_object)`
102
Bad:  `def get_foos(list_of_brain_objects)`
103
104
Why?
105
106
Because it makes things more complex. You can always use a pattern like this to
107
achieve the same::
108
109
    >>> foos = map(get_foo, list_of_brain_objects)
110
111
Please add for all functions a descriptive test in tests/doctests/API.rst.
112
113
Thanks.
114
"""
115
116
# Unique sentinel used as default value, to tell "argument not passed" apart
# from an explicitly passed `None`
_marker = object()

# Pattern for 32 characters of [a-z0-9], anchored at the end of the string
UID_RX = re.compile("[a-z0-9]{32}$")

# Names of the catalog tools
UID_CATALOG = "uid_catalog"
PORTAL_CATALOG = "portal_catalog"

# Names of the fields that are not validated by the API
SKIP_VALIDATION_FIELDS = [
    "allow_discussion", "contributors", "creators",
    "effective", "exclude_from_nav", "expires",
    "language", "nextPreviousEnabled", "relatedItems",
    "rights", "subjects",
]
137
138
class APIError(Exception):
    """Exception type raised for bika.lims API errors

    This is the exception the `fail` function of this module raises.
    """
140
141
142
def get_portal():
    """Return the portal object

    The lookup is delegated to `ploneapi.portal.getSite`.

    :returns: Portal object
    """
    site = ploneapi.portal.getSite()
    return site
148
149
150
def get_setup():
    """Return the `bika_setup` folder of the portal

    :returns: the object stored as `bika_setup` in the portal, as returned
        by `portal.get`
    """
    return get_portal().get("bika_setup")
155
156
157
def get_bika_setup():
    """Return the `bika_setup` folder of the portal

    Alias of `get_setup`.
    """
    setup = get_setup()
    return setup
161
162
163
def get_senaite_setup():
    """Return the new DX `setup` folder of the portal

    :returns: the object stored as `setup` in the portal, as returned by
        `portal.get`
    """
    return get_portal().get("setup")
168
169
170
def create(container, portal_type, *args, **kwargs):
    """Create a new content object inside the container

    This code uses most of the parts from the TypesTool
    see: `Products.CMFCore.TypesTool._constructInstance`

    :param container: container
    :type container: ATContentType/DexterityContentType/CatalogBrain
    :param portal_type: The portal type to create, e.g. "Client"
    :type portal_type: string
    :param id: (keyword) The id for the new content object
    :type id: string
    :param title: (keyword) The title for the new content object
    :type title: string
    :returns: The new created object
    """
    from bika.lims.utils import tmpID

    temporary_id = tmpID()
    # `id` and `title` are handled separately from the other field values
    obj_id = kwargs.pop("id", "")
    obj_title = kwargs.pop("title", "")

    # look up the type information of the portal type
    fti = get_tool("portal_types").getTypeInfo(portal_type)

    if not fti.product:
        # no product set: create the content with the dexterity utilities
        content = createContent(portal_type, **kwargs)
        content.id = obj_id
        content.title = obj_title
        return addContentToContainer(container, content)

    # product set: create the AT object, with a temporary id if none given
    new_obj = _createObjectByType(
        portal_type, container, obj_id or temporary_id)
    # set the field values without checking the write permissions
    edit(new_obj, check_permissions=False, title=obj_title, **kwargs)
    # auto-id if required
    if new_obj._at_rename_after_creation:
        new_obj._renameAfterCreation(check_auto_id=True)
    # the object is no longer under creation
    new_obj.unmarkCreationFlag()
    # notify that the object was created
    notify(ObjectInitializedEvent(new_obj))
    return new_obj
213
214
215
def copy_object(source, container=None, portal_type=None, *args, **kwargs):
    """Create a copy of the source object

    The values of the source fields are read and handed over to `create` as
    keyword arguments. Field values sent as kwargs have priority over the
    field values from source.

    :param source: object from which create a copy
    :type source: ATContentType/DexterityContentType/CatalogBrain
    :param container: destination container. The parent of the source is
        used if not set
    :type container: ATContentType/DexterityContentType/CatalogBrain
    :param portal_type: destination portal type. The portal type of the
        source is used if not set
    :param skip: (keyword) additional field names or field types that must
        not be copied
    :returns: The new created object
    """
    # Imported here to prevent circular dependencies. The absolute path is
    # used (same as in `move_object`), because the implicit relative form
    # `from security import ...` is only resolved by Python 2.
    from bika.lims.api.security import check_permission

    # Use same container as source unless explicitly set
    source = get_object(source)
    if not container:
        container = get_parent(source)

    # Use same portal type as source unless explicitly set
    if not portal_type:
        portal_type = get_portal_type(source)

    # Extend the fields to skip with defaults. The set holds both field
    # names and field types
    skip = set(kwargs.pop("skip", []))
    skip.update([
        "Products.Archetypes.Field.ComputedField",
        "UID",
        "id",
        "allowDiscussion",
        "contributors",
        "creation_date",
        "creators",
        "effectiveDate",
        "expirationDate",
        "language",
        "location",
        "modification_date",
        "rights",
        "subject",
    ])

    # Update kwargs with the field values to copy from source
    fields = get_fields(source)
    for field_name, field in fields.items():
        # Prioritize field values passed as kwargs
        if field_name in kwargs:
            continue
        # Skip framework internal fields by name
        if field_name in skip:
            continue
        # Skip fields of non-suitable types
        if hasattr(field, "getType") and field.getType() in skip:
            continue
        # Skip readonly fields
        if getattr(field, "readonly", False):
            continue
        # Skip non-readable fields
        perm = getattr(field, "read_permission", View)
        if perm and not check_permission(perm, source):
            continue

        # Prefer the raw value, to not wake-up objects unnecessarily
        if hasattr(field, "getRaw"):
            field_value = field.getRaw(source)
        elif hasattr(field, "get_raw"):
            field_value = field.get_raw(source)
        elif hasattr(field, "getAccessor"):
            accessor = field.getAccessor(source)
            field_value = accessor()
        else:
            field_value = field.get(source)

        # Do a hard copy of the value if mutable type, so the copy does not
        # share it with the source
        if isinstance(field_value, (list, dict, set)):
            field_value = copy.deepcopy(field_value)
        kwargs[field_name] = field_value

    # Create a copy
    return create(container, portal_type, *args, **kwargs)
301
302
303
def edit(obj, check_permissions=True, **kwargs):
    """Update the values of the object fields with the values passed-in

    Keyword arguments that do not match any field of the object are
    ignored.

    :param obj: the object to update
    :param check_permissions: whether the write permission of each field
        must be checked
    :type check_permissions: bool
    :param kwargs: mapping of field name -> new value
    :raises ValueError: if a field to update is readonly
    :raises Unauthorized: if permissions are checked and the write
        permission of a field is not granted for the object
    """
    # Imported here to prevent circular dependencies. The absolute path is
    # used (same as in `move_object`), because the implicit relative form
    # `from security import ...` is only resolved by Python 2.
    from bika.lims.api.security import check_permission

    fields = get_fields(obj)
    for name, value in kwargs.items():
        field = fields.get(name, None)
        if not field:
            continue

        # cannot update readonly fields
        readonly = getattr(field, "readonly", False)
        if readonly:
            raise ValueError("Field '{}' is readonly".format(name))

        # check field writable permission
        if check_permissions:
            perm = getattr(field, "write_permission", ModifyPortalContent)
            if perm and not check_permission(perm, obj):
                raise Unauthorized("Field '{}' is not writeable".format(name))

        # Set the value: use the mutator if the field provides one, or the
        # setter of the field otherwise
        if hasattr(field, "getMutator"):
            mutator = field.getMutator(obj)
            mapply(mutator, value)
        else:
            field.set(obj, value)
331
332
333
def move_object(obj, destination, check_constraints=True):
    """Move the object to the destination folder

    This function has the same effect as:

        id = obj.getId()
        cp = origin.manage_cutObjects(id)
        destination.manage_pasteObjects(cp)

    but with slightly better performance. The code is mostly grabbed from
    OFS.CopySupport.CopyContainer_pasteObjects

    :param obj: object to move to destination
    :type obj: ATContentType/DexterityContentType/CatalogBrain/UID
    :param destination: destination container
    :type destination: ATContentType/DexterityContentType/CatalogBrain/UID
    :param check_constraints: constraints and permissions must be checked
    :type check_constraints: bool
    :returns: The moved object
    :raises ValueError: if object and destination are the same, or if the
        portal type of the object is not allowed in the destination
    :raises Unauthorized: if the CopyOrMove permission is not granted
    """
    # imported here to prevent circular dependencies
    from bika.lims.api.security import check_permission

    obj = get_object(obj)
    destination = get_object(destination)

    # an object cannot be moved into itself
    if obj == destination:
        raise ValueError("Cannot move object into itself: {}".format(obj))

    # nothing to do if the object lives in the destination already
    origin = get_parent(obj)
    if origin == destination:
        return obj

    if check_constraints:
        # the CopyOrMove permission is required for the object to move
        if not check_permission(CopyOrMove, obj):
            raise Unauthorized("Cannot move {}".format(obj))

        # the destination must allow the portal type of the object
        moved_type = get_portal_type(obj)
        type_info = get_tool("portal_types").getTypeInfo(destination)
        if not type_info.allowType(moved_type):
            raise ValueError("Disallowed subobject type: %s" % moved_type)

    obj_id = get_id(obj)

    # notify that the object will be copied to destination
    obj._notifyOfCopyTo(destination, op=1)  # noqa

    # notify that the object will be moved to destination
    notify(ObjectWillBeMovedEvent(obj, origin, obj_id, destination, obj_id))

    # effectively move the object from origin to destination
    delete(obj, check_permissions=check_constraints, suppress_events=True)
    unwrapped = aq_base(obj)
    destination._setObject(  # noqa
        obj_id, unwrapped, set_owner=0, suppress_events=True)
    moved = destination._getOb(obj_id)  # noqa

    # since we used "suppress_events=True", we need to manually notify that the
    # object was moved and containers modified. This also makes the objects to
    # be re-catalogued
    notify(ObjectMovedEvent(moved, origin, obj_id, destination, obj_id))
    notifyContainerModified(origin)
    notifyContainerModified(destination)

    # make ownership implicit if possible, so it acquires the permissions from
    # the container
    moved.manage_changeOwnershipType(explicit=0)

    return moved
407
408
409
def uncatalog_object(obj, recursive=False):
    """Remove the object from all catalogs

    :param obj: object to un-catalog
    :type obj: ATContentType/DexterityContentType
    :param recursive: recursively uncatalog all child objects
    :type recursive: bool
    """
    # un-catalog from registered catalogs
    obj.unindexObject()

    # explicitly un-catalog from uid_catalog. The uids of uid_catalog are
    # paths relative to the portal root
    # see Products.Archetypes.UIDCatalog.UIDResolver.catalog_object
    uid_catalog = get_tool("uid_catalog")
    relative_path = "/".join(obj.getPhysicalPath()[2:])
    uid_catalog.uncatalog_object(relative_path)

    if not recursive:
        return

    for child in obj.objectValues():
        uncatalog_object(child, recursive=recursive)
428
429
430
def catalog_object(obj, recursive=False):
    """Re-catalog the object

    :param obj: object to catalog
    :type obj: ATContentType/DexterityContentType
    :param recursive: recursively catalog all child objects
    :type recursive: bool
    """
    if is_at_content(obj):
        # explicitly re-catalog AT types at uid_catalog (DX types are
        # automatically reindexed in UID catalog on reindexObject). The uids
        # of uid_catalog are paths relative to the portal root
        # see Products.Archetypes.UIDCatalog.UIDResolver.catalog_object
        uid_catalog = get_tool("uid_catalog")
        relative_path = "/".join(obj.getPhysicalPath()[2:])
        uid_catalog.catalog_object(obj, relative_path)
    obj.reindexObject()

    if not recursive:
        return

    for child in obj.objectValues():
        catalog_object(child, recursive=recursive)
450
451
452
def delete(obj, check_permissions=True, suppress_events=False):
    """Delete the given object from its parent

    The object is removed from all catalogs before it is deleted.

    :param obj: object to delete
    :type obj: ATContentType/DexterityContentType
    :param check_permissions: whether delete permission must be checked
    :type check_permissions: bool
    :param suppress_events: passed to `_delObject` of the parent as
        `suppress_events`
    :type suppress_events: bool
    :raises Unauthorized: if permissions are checked and the DeleteObjects
        permission is not granted for the object
    """
    # Imported here to prevent circular dependencies. The absolute path is
    # used (same as in `move_object`), because the implicit relative form
    # `from security import ...` is only resolved by Python 2.
    from bika.lims.api.security import check_permission

    if check_permissions and not check_permission(DeleteObjects, obj):
        raise Unauthorized("Do not have permissions to remove this object")

    # un-catalog the object from all catalogs (uid_catalog included)
    uncatalog_object(obj)
    # delete the object
    parent = get_parent(obj)
    parent._delObject(obj.getId(), suppress_events=suppress_events)
469
470
471
def get_tool(name, context=None, default=_marker):
    """Look up a portal tool by its name

    If a context is given, the tool is looked up with `getToolByName` first.
    If this fails, or if no context was given, the tool is looked up with
    `plone.api`.

    :param name: The name of the tool, e.g. `senaite_setup_catalog`
    :type name: string
    :param context: A portal object
    :type context: ATContentType/DexterityContentType/CatalogBrain
    :param default: value to return if the tool was not found. A string is
        used as the name of a fallback tool to look up instead
    :returns: Portal Tool
    :raises APIError: (through `fail`) if the tool was not found and no
        default was given
    """
    # Try first with the context
    if context is not None:
        try:
            context = get_object(context)
            return getToolByName(context, name)
        except (APIError, AttributeError) as exc:
            # https://github.com/senaite/bika.lims/issues/396
            message = (
                "get_tool::getToolByName({}, '{}') failed: {} "
                "-> falling back to plone.api.portal.get_tool('{}')"
            ).format(repr(context), name, repr(exc), name)
            logger.warn(message)
            return get_tool(name, default=default)

    # Try with the plone api
    try:
        return ploneapi.portal.get_tool(name)
    except InvalidParameterError:
        if default is _marker:
            fail("No tool named '%s' found." % name)
        if isinstance(default, six.string_types):
            # the default is the name of the tool to use as fallback
            return get_tool(default)
        return default
502
503
504
def fail(msg=None):
    """Raise an API error

    :param msg: the reason of the failure. A generic text is used if None
    :raises APIError: always
    """
    reason = "Reason not given." if msg is None else msg
    raise APIError("{}".format(reason))
510
511
512
def is_object(brain_or_object):
    """Check if the passed in object is a supported portal content object

    Supported are the portal root, supermodels, AT contents, dexterity
    contents and catalog brains.

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: Portal Object
    :returns: True if the passed in object is a valid portal content
    :rtype: bool
    """
    # evaluated in order, stops at the first check that passes
    checks = (
        is_portal,
        is_supermodel,
        is_at_content,
        is_dexterity_content,
        is_brain,
    )
    return any(check(brain_or_object) for check in checks)
530
531
532
def get_object(brain_object_uid, default=_marker):
    """Return the full content object

    :param brain_object_uid: A catalog brain or content object or uid
    :type brain_object_uid: PortalObject/ATContentType/DexterityContentType
    /CatalogBrain/basestring
    :param default: value to return if the passed in value is not supported
    :returns: The full object
    :raises APIError: (through `fail`) if the passed in value is not
        supported and no default was given
    """
    # UIDs are resolved to their objects
    if is_uid(brain_object_uid):
        return get_object_by_uid(brain_object_uid, default=default)

    # supermodels hold the object as `instance`
    if is_supermodel(brain_object_uid):
        return brain_object_uid.instance

    if not is_object(brain_object_uid):
        if default is not _marker:
            return default
        fail("{} is not supported.".format(repr(brain_object_uid)))

    # brains are waken up, content objects are returned as they are
    if is_brain(brain_object_uid):
        return brain_object_uid.getObject()
    return brain_object_uid
551
552
553
def is_portal(brain_or_object):
    """Check if the passed in object is the portal root object

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :returns: True if the object provides `ISiteRoot`
    :rtype: bool
    """
    provided = ISiteRoot.providedBy(brain_or_object)
    return provided
562
563
564
def is_brain(brain_or_object):
    """Check if the passed in object is a portal catalog brain

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :returns: True if the object provides `ICatalogBrain`
    :rtype: bool
    """
    provided = ICatalogBrain.providedBy(brain_or_object)
    return provided
573
574
575
def is_supermodel(brain_or_object):
    """Check if the passed in object is a supermodel

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :returns: True if the object provides `ISuperModel`
    :rtype: bool
    """
    # imported here to avoid circular imports
    from senaite.app.supermodel.interfaces import ISuperModel

    provided = ISuperModel.providedBy(brain_or_object)
    return provided
586
587
588
def is_dexterity_content(brain_or_object):
    """Check if the passed in object is a dexterity content type

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :returns: True if the object provides `IDexterityContent`
    :rtype: bool
    """
    provided = IDexterityContent.providedBy(brain_or_object)
    return provided
597
598
599
def is_at_content(brain_or_object):
    """Check if the passed in object is an AT content type

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :returns: True if the object is an instance of the Archetypes
        `BaseObject`
    :rtype: bool
    """
    is_at = isinstance(brain_or_object, BaseObject)
    return is_at
608
609
610
def is_dx_type(portal_type):
    """Check if the portal type is DX based

    A portal type is considered DX based if its type information has no
    product set.

    :param portal_type: The portal type name to check
    :returns: True if the portal type is DX based
    :rtype: bool
    """
    fti = get_tool("portal_types").getTypeInfo(portal_type)
    return not fti.product
621
622
623
def is_at_type(portal_type):
    """Check if the portal type is AT based

    This is the negation of `is_dx_type`.

    :param portal_type: The portal type name to check
    :returns: True if the portal type is AT based
    :rtype: bool
    """
    dx_based = is_dx_type(portal_type)
    return not dx_based
630
631
632
def is_folderish(brain_or_object):
    """Check if the passed in object is folderish

    The `is_folderish` attribute of the passed in object is used if there
    is one. Otherwise, the full object is checked for `IFolderish`.

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :returns: True if the object is folderish
    :rtype: bool
    """
    if not hasattr(brain_or_object, "is_folderish"):
        return IFolderish.providedBy(get_object(brain_or_object))

    # the attribute can be either a method or a plain value
    folderish = brain_or_object.is_folderish
    if callable(folderish):
        return folderish()
    return folderish
645
646
647
def get_portal_type(brain_or_object):
    """Return the portal type of the object

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :returns: Portal type
    :rtype: string
    :raises APIError: (through `fail`) if the passed in object is not
        supported
    """
    supported = is_object(brain_or_object)
    if not supported:
        fail("{} is not supported.".format(repr(brain_or_object)))
    return brain_or_object.portal_type
658
659
660
def get_schema(brain_or_object):
    """Return the schema of the content

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :returns: Schema object
    :raises APIError: (through `fail`) for the portal root and for objects
        that are neither dexterity nor AT contents
    """
    content = get_object(brain_or_object)

    # the portal root is rejected explicitly
    if is_portal(content):
        fail("get_schema can't return schema of portal root")

    # DX: the schema is looked up from the type information
    if is_dexterity_content(content):
        types_tool = get_tool("portal_types")
        return types_tool.getTypeInfo(content.portal_type).lookupSchema()

    # AT: the schema is provided by the object itself
    if is_at_content(content):
        return content.Schema()

    fail("{} has no Schema.".format(brain_or_object))
677
678
679
def get_behaviors(portal_type):
    """Return the behaviors of the portal type

    :param portal_type: DX portal type name
    :returns: the `behaviors` of the type information
    :raises TypeError: if the type information has a product set (AT type)
    """
    fti = get_tool("portal_types").getTypeInfo(portal_type)
    if not fti.product:
        return fti.behaviors
    raise TypeError("Expected DX type, got AT type instead.")
689
690
691
def enable_behavior(portal_type, behavior_id):
    """Enable the behavior for the portal type

    Does nothing if the behavior is enabled already.

    :param portal_type: DX portal type name
    :param behavior_id: The behavior to enable
    :raises TypeError: if the type information has a product set (AT type)
    """
    fti = get_tool("portal_types").getTypeInfo(portal_type)
    if fti.product:
        raise TypeError("Expected DX type, got AT type instead.")

    # nothing to do if enabled already
    if behavior_id in fti.behaviors:
        return

    fti.behaviors += (behavior_id, )
    # invalidate schema cache
    notify(SchemaInvalidatedEvent(portal_type))
706
707
708
def disable_behavior(portal_type, behavior_id):
    """Disable the behavior for the portal type

    The schema cache is invalidated in any case, even if the behavior was
    not enabled.

    :param portal_type: DX portal type name
    :param behavior_id: The behavior to disable
    :raises TypeError: if the type information has a product set (AT type)
    """
    fti = get_tool("portal_types").getTypeInfo(portal_type)
    if fti.product:
        raise TypeError("Expected DX type, got AT type instead.")

    # keep all behaviors except the one to disable
    fti.behaviors = tuple(
        behavior for behavior in fti.behaviors if behavior != behavior_id)
    # invalidate schema cache
    notify(SchemaInvalidatedEvent(portal_type))
721
722
723
def get_fields(brain_or_object):
    """Return a name to field mapping of the object

    For dexterity contents, the fields of the schema interface are followed
    by the fields of the interfaces of the enumerated behaviors.

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :returns: Mapping of name -> field
    :rtype: OrderedDict
    """
    content = get_object(brain_or_object)
    schema = get_schema(content)

    if not is_dexterity_content(content):
        return OrderedDict(schema)

    # the fields directly provided by the schema interface come first
    name_field_pairs = getFieldsInOrder(schema)

    # the fields coming from behaviors are appended
    assignable = IBehaviorAssignable(content)
    if assignable:
        for behavior in assignable.enumerateBehaviors():
            name_field_pairs.extend(getFieldsInOrder(behavior.interface))

    return OrderedDict(name_field_pairs)
744
745
746
def get_id(brain_or_object):
    """Return the Plone ID of the object

    For brains, the value of the `getId` attribute is preferred, followed by
    `id`. The full object is only used if the brain has none of them.

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :returns: Plone ID
    :rtype: string
    """
    if is_brain(brain_or_object):
        for attr_name in ("getId", "id"):
            if base_hasattr(brain_or_object, attr_name):
                return getattr(brain_or_object, attr_name)

    content = get_object(brain_or_object)
    return content.getId()
760
761
762
def get_title(brain_or_object):
    """Return the title of the object

    For brains, the value of the `Title` attribute is used if there is one.

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :returns: Title
    :rtype: string
    """
    if not is_brain(brain_or_object):
        return get_object(brain_or_object).Title()
    if not base_hasattr(brain_or_object, "Title"):
        return get_object(brain_or_object).Title()
    return brain_or_object.Title
773
774
775
def get_description(brain_or_object):
    """Return the description of the object

    For brains, the value of the `Description` attribute is used if there
    is one.

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :returns: Description
    :rtype: string
    """
    if not is_brain(brain_or_object):
        return get_object(brain_or_object).Description()
    if not base_hasattr(brain_or_object, "Description"):
        return get_object(brain_or_object).Description()
    return brain_or_object.Description
787
788
789
def get_uid(brain_or_object):
    """Return the Plone UID of the object

    :param brain_or_object: A single catalog brain or content object or an UID
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :returns: Plone UID. UIDs are returned as they are and the UID of the
        portal root is '0'
    :rtype: string
    """
    # nothing to resolve if an UID was passed in
    if is_uid(brain_or_object):
        return brain_or_object

    # the portal root has a fixed UID
    if is_portal(brain_or_object):
        return '0'

    # use the UID stored in the brain if there is one
    brain_with_uid = (
        is_brain(brain_or_object)
        and base_hasattr(brain_or_object, "UID")
    )
    if brain_with_uid:
        return brain_or_object.UID

    return get_object(brain_or_object).UID()
804
805
806
def get_url(brain_or_object):
    """Return the absolute URL of the object

    For brains, `getURL` of the brain is used if there is one.

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :returns: Absolute URL
    :rtype: string
    """
    if not is_brain(brain_or_object):
        return get_object(brain_or_object).absolute_url()
    if not base_hasattr(brain_or_object, "getURL"):
        return get_object(brain_or_object).absolute_url()
    return brain_or_object.getURL()
817
818
819
def get_icon(thing, html_tag=True):
    """Return the icon of the content object or portal type

    :param thing: A single catalog brain, content object or portal_type
    :type thing: ATContentType/DexterityContentType/CatalogBrain/String
    :param html_tag: A value of 'True' returns the HTML tag, else the image url
    :type html_tag: bool
    :returns: HTML '<img>' tag if 'html_tag' is True else the image url. An
        empty string if the type has no icon
    :rtype: string
    :raises APIError: (through `fail`) if there is no type information
    """
    thing_is_object = is_object(thing)
    portal_type = get_portal_type(thing) if thing_is_object else thing

    # Manual approach, because `plone.app.layout.getIcon` does not reliable
    # work for Contents coming from other catalogs than the
    # `portal_catalog`
    fti = get_tool("portal_types").getTypeInfo(portal_type)
    if not fti:
        fail("No type info for {}".format(repr(thing)))

    icon = fti.getIcon()
    if not icon:
        return ""

    # the icon path is relative to the portal url
    url = "%s/%s" % (get_url(get_portal()), icon)
    if not html_tag:
        return url

    # build the img element, titled after the object or after the type
    # NOTE(review): the title is inserted into the markup as it is, without
    # HTML escaping - confirm that callers can rely on safe titles
    title = get_title(thing) if thing_is_object else fti.Title()
    tag = '<img width="16" height="16" src="{url}" title="{title}" />'
    return tag.format(url=url, title=title)
854
855
856
def get_object_by_uid(uid, default=_marker):
    """Look up the object that belongs to the given UID

    :param uid: The UID of the object to find
    :type uid: string
    :param default: Value returned when the UID is empty or no brain is
        found for it; `fail` is called instead when it is omitted
    :returns: Found object, the portal for UID '0', or `default`
    """
    has_default = default is not _marker

    # an empty UID can never be resolved
    if not uid:
        if has_default:
            return default
        fail("get_object_by_uid requires UID as first argument; got {} instead"
             .format(uid))

    # the portal object is addressed with the UID '0'
    if uid == '0':
        return get_portal()

    found = get_brain_by_uid(uid)
    if found is None:
        if has_default:
            return default
        fail("No object found for UID {}".format(uid))

    return get_object(found)
883
884
885
def get_brain_by_uid(uid, default=None):
    """Query the UID catalog for the brain of the given UID

    :param uid: The UID of the object to find
    :type uid: string
    :param default: Value returned when `uid` is not a UID or the catalog
        does not return exactly one brain
    :returns: ZCatalog brain or `default`
    """
    if is_uid(uid):
        matches = get_tool(UID_CATALOG)(UID=uid)
        # only an unambiguous hit counts as a result
        if len(matches) == 1:
            return matches[0]
    return default
903
904
905
def get_object_by_path(path, default=_marker):
    """Find an object by a given physical path or absolute_url

    If `default` is given, it is returned whenever the path is empty, lies
    outside of the portal or cannot be traversed. Without a `default`,
    `fail` is called in these cases.

    :param path: The physical path or absolute URL of the object to find
    :type path: string
    :param default: Value returned when no object can be found
    :returns: Found object or `default`
    """

    # nothing to do here
    if not path:
        if default is not _marker:
            return default
        fail("get_object_by_path first argument must be a path; {} received"
             .format(path))

    portal = get_portal()
    portal_path = get_path(portal)
    portal_url = get_url(portal)

    # ensure we have a physical path
    if path.startswith(portal_url):
        request = get_request()
        path = "/".join(request.physicalPathFromURL(path))

    if not path.startswith(portal_path):
        if default is not _marker:
            return default
        fail("Not a physical path inside the portal.")

    if path == portal_path:
        return portal

    # Traverse with the module's sentinel as fallback, so that a missing
    # object can be told apart from a real result. Handing the caller's
    # (possibly omitted) default straight to `restrictedTraverse` would
    # return the private sentinel object to the caller.
    obj = portal.restrictedTraverse(path, _marker)
    if obj is _marker:
        if default is not _marker:
            return default
        fail("No object found at path {}".format(repr(path)))
    return obj
938
939
940
def get_path(brain_or_object):
    """Return the physical path of the given brain or object

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :returns: Physical path of the object, joined with "/"
    :rtype: string
    """
    if not is_brain(brain_or_object):
        segments = get_object(brain_or_object).getPhysicalPath()
        return "/".join(segments)
    return brain_or_object.getPath()
951
952
953
def get_parent_path(brain_or_object):
    """Return the physical path of the parent of the given brain or object

    For the portal, the path of the portal itself is returned.

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :returns: Physical path of the parent object
    :rtype: string
    """
    if is_portal(brain_or_object):
        return get_path(get_portal())

    if not is_brain(brain_or_object):
        parent = get_object(brain_or_object).aq_parent
        return get_path(parent)

    # for brains, strip the last path segment from the brain's own path
    parent_path, _sep, _name = get_path(brain_or_object).rpartition("/")
    return parent_path
967
968
969
def get_parent(brain_or_object, **kw):
    """Return the parent object of the given brain or content object

    For the portal, the portal itself is returned.

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :param kw: No longer supported; a warning is logged if any keyword
        is passed
    :returns: parent object
    :rtype: ATContentType/DexterityContentType/PloneSite/CatalogBrain
    """
    if not is_portal(brain_or_object):
        # BBB: the `catalog_search` keyword was removed
        if kw:
            logger.warn(
                "API function `get_parent` no longer support keywords.")
        obj = get_object(brain_or_object)
        return obj.aq_parent

    return get_portal()
988
989
990
def search(query, catalog=_marker):
    """Run a catalog query and return the results

    When no catalog is passed, the first catalog registered for each
    queried portal type is used. The query is only executed if exactly
    one catalog remains after removing duplicates.

    :param query: A suitable search query.
    :type query: dict
    :param catalog: A single catalog id or a list of catalog ids
    :type catalog: str/list
    :returns: Search results
    :rtype: List of ZCatalog brains
    """
    if not isinstance(query, dict):
        fail("Catalog query needs to be a dictionary")

    # normalize the queried portal types to a list
    queried_types = query.get("portal_type", [])
    if not isinstance(queried_types, (tuple, list)):
        queried_types = [queried_types]

    if catalog is _marker:
        # no catalog given: take the first registered/default catalog of
        # every queried portal type
        candidates = [
            get_catalogs_for(portal_type, default=UID_CATALOG)[0]
            for portal_type in queried_types
        ]
    elif isinstance(catalog, (list, tuple)):
        candidates = [get_tool(catalog_id) for catalog_id in catalog]
    else:
        candidates = [get_tool(catalog)]

    # drop duplicates and fall back to the UID catalog if nothing is left
    unique = list(set(candidates))
    if not unique:
        unique = [get_uid_catalog()]

    # only **single** catalog queries are supported
    if len(unique) > 1:
        fail("Multi Catalog Queries are not supported!")

    return unique[0](query)
1036
1037
1038
def safe_getattr(brain_or_object, attr, default=_marker):
    """Read an attribute and call it if it is callable

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :param attr: Attribute name
    :type attr: str
    :param default: Value returned when the attribute is missing or the
        access is unauthorized; `fail` is called when it is omitted
    :returns: Attribute value, or the result of calling it
    :rtype: obj
    """
    has_default = default is not _marker
    try:
        found = getattr(brain_or_object, attr, _marker)
        if found is _marker:
            if has_default:
                return default
            fail("Attribute '{}' not found.".format(attr))
        # the call stays inside the `try`, so that an `Unauthorized`
        # raised by it is handled below as well
        return found() if callable(found) else found
    except Unauthorized:
        if has_default:
            return default
        fail("You are not authorized to access '{}' of '{}'.".format(
            attr, repr(brain_or_object)))
1062
1063
1064
def get_uid_catalog():
    """Return the catalog tool registered under `UID_CATALOG`

    :returns: UID Catalog Tool
    """
    uid_catalog = get_tool(UID_CATALOG)
    return uid_catalog
1070
1071
1072
def get_portal_catalog():
    """Return the catalog tool registered under `PORTAL_CATALOG`

    :returns: Portal Catalog Tool
    """
    portal_catalog = get_tool(PORTAL_CATALOG)
    return portal_catalog
1078
1079
1080
def get_review_history(brain_or_object, rev=True):
    """Return the workflow review history of the given brain or context

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :param rev: The history is reversed only if this is exactly `True`
    :type rev: bool
    :returns: Workflow history; an empty list if it cannot be retrieved
    :rtype: [{}, ...]
    """
    obj = get_object(brain_or_object)
    history = []
    try:
        wf_tool = get_tool("portal_workflow")
        history = wf_tool.getInfoFor(obj, 'review_history')
    except WorkflowException as err:
        logger.error("Cannot retrieve review_history on {}: {}".format(
            obj, str(err)))

    if isinstance(history, tuple):
        # Products.CMFDefault.DefaultWorkflow.getInfoFor always returns a
        # tuple when "review_history" is passed in:
        # https://github.com/zopefoundation/Products.CMFDefault/blob/master/Products/CMFDefault/DefaultWorkflow.py#L244
        #
        # Products.DCWorkflow.getInfoFor relies on
        # Expression.StateChangeInfo, which always returns a list, except
        # when no review_history is found:
        # https://github.com/zopefoundation/Products.DCWorkflow/blob/master/Products/DCWorkflow/DCWorkflow.py#L310
        # https://github.com/zopefoundation/Products.DCWorkflow/blob/master/Products/DCWorkflow/Expression.py#L94
        history = list(history)
    elif not isinstance(history, list):
        logger.error("get_review_history: expected list, received {}".format(
            history))
        history = []

    if rev is True:
        history.reverse()
    return history
1117
1118
1119
def get_revision_history(brain_or_object):
    """Return the full content history of the given brain or context

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :returns: Result of `ContentHistoryView.fullHistory` for the object
    :rtype: obj
    """
    obj = get_object(brain_or_object)
    # the view is created with the object's own request, or None if the
    # object has no `REQUEST`
    request = safe_getattr(obj, "REQUEST", None)
    history_view = ContentHistoryView(obj, request)
    return history_view.fullHistory()
1130
1131
1132
def get_workflows_for(brain_or_object):
    """Return the workflow chain assigned to the given brain or context

    Note: A portal_type string is accepted as well and is passed to the
    workflow tool as it is.

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :returns: Assigned Workflows
    :rtype: tuple
    """
    wf_tool = ploneapi.portal.get_tool("portal_workflow")
    target = brain_or_object
    if not isinstance(target, six.string_types):
        target = get_object(target)
    return wf_tool.getChainFor(target)
1147
1148
1149
def get_workflow_status_of(brain_or_object, state_var="review_state"):
    """Return the current workflow status of the given brain or context

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :param state_var: The name of the state variable
    :type state_var: string
    :returns: Status
    :rtype: str
    """
    # prefer the value stored in the catalog brain, if the brain's schema
    # contains the state variable
    stored_in_brain = is_brain(brain_or_object) \
        and state_var in brain_or_object.schema()
    if stored_in_brain:
        return brain_or_object[state_var]

    # otherwise ask the workflow tool for the state of the object
    wf_tool = get_tool("portal_workflow")
    return wf_tool.getInfoFor(
        ob=get_object(brain_or_object), name=state_var, default='')
1168
1169
1170
def get_previous_worfklow_status_of(brain_or_object, skip=None, default=None):
    """Return the workflow status the object had before the current one

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :param skip: Workflow states to skip
    :type skip: tuple/list
    :param default: Value returned when no previous status is found
    :returns: status
    :rtype: str
    """
    # anything that is not a non-empty list or tuple means "skip nothing"
    if isinstance(skip, (list, tuple)) and skip:
        states_to_skip = skip
    else:
        states_to_skip = []

    history = get_review_history(brain_or_object)

    # Collapse consecutive duplicates, because some transitions might
    # happen more than once in a row (e.g. publish)
    collapsed = [entry for entry, _repeats in groupby(history)]

    # the first entry is the current one, so start with the second
    for entry in collapsed[1:]:
        status = entry.get("review_state")
        if status not in states_to_skip:
            return status
    return default
1197
1198
1199
def get_creation_date(brain_or_object):
    """Return the creation date of the given brain or object

    The `created` attribute is read and called if it is callable. `fail`
    is called if the attribute is missing or None.

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :returns: Creation date
    :rtype: DateTime
    """
    value = getattr(brain_or_object, "created", None)
    if value is None:
        fail("Object {} has no creation date ".format(
             repr(brain_or_object)))
    return value() if callable(value) else value
1214
1215
1216
def get_modification_date(brain_or_object):
    """Return the modification date of the given brain or object

    The `modified` attribute is read and called if it is callable. `fail`
    is called if the attribute is missing or None.

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :returns: Modification date
    :rtype: DateTime
    """
    value = getattr(brain_or_object, "modified", None)
    if value is None:
        fail("Object {} has no modification date ".format(
             repr(brain_or_object)))
    return value() if callable(value) else value
1231
1232
1233
def get_review_status(brain_or_object):
    """Return the `review_state` of the given brain or object

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :returns: Value of the review_status variable
    :rtype: String
    """
    if is_brain(brain_or_object):
        # use the value carried by the brain, if there is one
        if base_hasattr(brain_or_object, "review_state"):
            return brain_or_object.review_state
    return get_workflow_status_of(brain_or_object, state_var="review_state")
1245
1246
1247
def is_active(brain_or_object):
    """Check that the object is neither 'inactive' nor 'cancelled'

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :returns: False if the object is in the state 'inactive' or 'cancelled'
    :rtype: bool
    """
    status = get_review_status(brain_or_object)
    return status not in ["cancelled", "inactive"]
1258
1259
1260
def get_fti(portal_type, default=None):
    """Look up the type information (FTI) of the given portal_type

    :param portal_type: The portal type to get the FTI for
    :param default: Value returned when `portal_type` is not a string or
        no type information is found
    :returns: FTI or default value
    """
    if not is_string(portal_type):
        return default
    type_info = get_tool("portal_types").getTypeInfo(portal_type)
    if type_info:
        return type_info
    return default
1271
1272
1273
def get_catalogs_for(brain_or_object, default=PORTAL_CATALOG):
    """Get all registered catalogs for the given portal_type, catalog brain or
    content object

    The catalogs are looked up in the static portal_type -> catalog
    mapping first. If the mapping has no entry, they are taken from the
    archetype_tool (types with an FTI `product`) or from the `_catalogs`
    attribute of the class named by the FTI. If no catalog can be
    resolved, e.g. because no type information exists for the portal
    type, a list with the `default` catalog is returned.

    NOTE: We pass in the `portal_catalog` as default in subsequent calls to
          work around the missing `uid_catalog` during snapshot creation when
          installing a fresh site!

    :param brain_or_object: The portal_type, a catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :param default: ID of the catalog to fall back to
    :type default: str
    :returns: List of supported catalogs
    :rtype: list
    :raises APIError: if the argument is neither a UID, an object nor a
        string
    """

    # only handle catalog lookups by portal_type internally
    if is_uid(brain_or_object) or is_object(brain_or_object):
        obj = get_object(brain_or_object)
        portal_type = get_portal_type(obj)
        # NOTE(review): the caller's `default` is not forwarded here;
        # confirm whether that is intended
        return get_catalogs_for(portal_type)

    if not is_string(brain_or_object):
        raise APIError("Expected a portal_type string, got <%s>"
                       % type(brain_or_object))

    # at this point the brain_or_object is a portal_type
    portal_type = brain_or_object

    # check static portal_type -> catalog mapping first
    from senaite.core.catalog import get_catalogs_by_type
    catalogs = get_catalogs_by_type(portal_type)

    # no catalogs in static mapping
    # => Lookup catalogs by FTI
    if len(catalogs) == 0:
        fti = get_fti(portal_type)
        if fti is None:
            # unknown portal type: nothing to look up, use the default
            # catalog below instead of failing on a missing FTI
            catalogs = []
        elif fti.product:
            # AT content type
            # => Lookup via archetype_tool
            archetype_tool = get_tool("archetype_tool")
            catalogs = archetype_tool.catalog_map.get(portal_type) or []
        else:
            # DX content type
            # => resolve the `_catalogs` attribute from the class
            klass = resolveDottedName(fti.klass)
            # XXX: Refactor multi-catalog behavior to not rely
            #      on this hidden `_catalogs` attribute!
            catalogs = getattr(klass, "_catalogs", [])

    # fetch the catalog objects and drop the IDs that cannot be resolved;
    # a real list is built, so that it can be measured on Python 2 and 3
    tools = [get_tool(catalog_id, None) for catalog_id in catalogs]
    tools = [tool for tool in tools if tool]

    if not tools:
        return [get_tool(default, default=PORTAL_CATALOG)]

    return tools
1330
1331
1332
def get_transitions_for(brain_or_object):
    """Collect the available transitions of all assigned workflows

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :returns: All possible available and allowed transitions
    :rtype: list[dict]
    """
    wf_tool = get_tool('portal_workflow')
    collected = []
    obj = get_object(brain_or_object)
    for workflow_id in get_workflows_for(brain_or_object):
        available = wf_tool[workflow_id].getTransitionsFor(obj)
        # only add transitions that earlier workflows did not provide
        fresh = [item for item in available if item not in collected]
        collected += fresh
    return collected
1348
1349
1350
def do_transition_for(brain_or_object, transition):
    """Perform the workflow transition on the given object

    `fail` is called if `transition` is not a string or if the transition
    is rejected with an `InvalidParameterError`.

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :param transition: ID of the transition to perform
    :type transition: str
    :returns: The object where the transition was performed
    """
    if not isinstance(transition, six.string_types):
        fail("Transition type needs to be string, got '%s'" % type(transition))

    target = get_object(brain_or_object)
    try:
        ploneapi.content.transition(target, transition)
    except ploneapi.exc.InvalidParameterError as err:
        message = "Failed to perform transition '{}' on {}: {}".format(
            transition, target, str(err))
        fail(message)
    return target
1366
1367
1368
def get_roles_for_permission(permission, brain_or_object):
    """Return the sorted roles granted for the permission on the object

    :param permission: The permission to get the roles for
    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :returns: Roles for the given Permission, duplicates removed
    :rtype: list
    """
    granted = rolesForPermissionOn(permission, get_object(brain_or_object))
    return sorted(set(granted))
1379
1380
1381
def is_versionable(brain_or_object, policy='at_edit_autoversion'):
    """Checks if the passed in object is versionable.

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :param policy: ID of the versioning policy the object has to support
    :type policy: str
    :returns: True if the object supports the policy and is versionable
    :rtype: bool
    """
    pr = get_tool("portal_repository")
    obj = get_object(brain_or_object)
    # check the requested policy instead of a hard-coded one, so that the
    # `policy` argument takes effect
    return pr.supportsPolicy(obj, policy) \
        and pr.isVersionable(obj)
1393
1394
1395
def get_version(brain_or_object):
    """Return the version of the given object

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :returns: The `version_id` of the object (0 if it has none), or None
        if the object is not versionable
    :rtype: int or None
    """
    obj = get_object(brain_or_object)
    if is_versionable(obj):
        # read from the acquisition-unwrapped object
        return getattr(aq_base(obj), "version_id", 0)
    return None
1407
1408
1409
def get_view(name, context=None, request=None, default=None):
    """Look up a view by name

    :param name: The name of the view
    :type name: str
    :param context: The context to query the view; the portal if omitted
    :type context: ATContentType/DexterityContentType/CatalogBrain
    :param request: The request to query the view; the global request if
        omitted
    :type request: HTTPRequest object
    :param default: Value returned when no view is found
    :returns: The view or `default`
    :rtype: Products.Five.metaclass View object
    """
    if not context:
        context = get_portal()
    if not request:
        request = get_request() or None

    adapted = (get_object(context), request)
    found = queryMultiAdapter(adapted, name=name)
    return default if found is None else found
1427
1428
1429
def get_request():
    """Return the request provided by `globalrequest.getRequest`

    :returns: HTTP Request
    :rtype: HTTPRequest object
    """
    current_request = globalrequest.getRequest()
    return current_request
1436
1437
1438
def get_test_request():
    """Create a new TestRequest that provides IAttributeAnnotatable

    :returns: TestRequest object
    """
    test_request = TestRequest()
    directlyProvides(test_request, IAttributeAnnotatable)
    return test_request
1444
1445
1446
def get_group(group_or_groupname):
    """Return the Plone group for the given group or group name

    :param group_or_groupname: Plone group or the name of the group
    :type group_or_groupname: GroupData/str
    :returns: Plone GroupData; None for an empty argument
    """
    if not group_or_groupname:
        return None
    # anything without `_getGroup` is treated as a group ID
    if not hasattr(group_or_groupname, "_getGroup"):
        groups_tool = get_tool("portal_groups")
        return groups_tool.getGroupById(group_or_groupname)
    return group_or_groupname
1460
1461
1462
def get_user(user_or_username):
    """Return the Plone user for the given user or user name

    :param user_or_username: Plone user or user id
    :returns: Plone MemberData; None if the argument is neither a
        MemberData object nor a string
    """
    if isinstance(user_or_username, six.string_types):
        # strings are resolved by login name
        return get_member_by_login_name(
            get_portal(), user_or_username, False)
    if isinstance(user_or_username, MemberData):
        return user_or_username
    return None
1474
1475
1476
def get_user_properties(user_or_username):
    """Return the properties of the given user

    The items of all property sheets of the user are merged into one
    dictionary.

    :param user_or_username: Plone user or user id
    :returns: Dictionary of user properties; empty if no user is found
    :rtype: dict
    """
    member = get_user(user_or_username)
    if member is None or not callable(member.getUser):
        return {}

    properties = {}
    plone_user = member.getUser()
    for sheet_id in plone_user.listPropertysheets():
        sheet = plone_user.getPropertysheet(sheet_id)
        properties.update(dict(sheet.propertyItems()))
    return properties
1493
1494
1495
def get_users_by_roles(roles=None):
    """Search the Plone users that have the given role(s)

    :param roles: Plone role name or list of roles
    :type roles:  list/str
    :returns: List of Plone users having the role(s)
    """
    # a single value is wrapped into a list
    role_list = roles if isinstance(roles, (tuple, list)) else [roles]
    membership_tool = get_tool("portal_membership")
    return membership_tool.searchForMembers(roles=role_list)
1506
1507
1508
def get_current_user():
    """Return the user provided by `ploneapi.user.get_current`

    :returns: Current User
    """
    current_user = ploneapi.user.get_current()
    return current_user
1514
1515
1516
def get_user_contact(user, contact_types=None):
    """Return the contact that is linked to the given Plone user

    None is returned if no user is passed, if the user has no contact or
    if more than one contact is found for the user. The latter case is
    logged as an error.

    :param user: Plone user
    :param contact_types: List with the contact portal types to search;
        "Contact" and "LabContact" if omitted
    :returns: Contact associated to the Plone user or None
    """
    if not user:
        return None

    from senaite.core.catalog import CONTACT_CATALOG  # Avoid circular import

    if contact_types is None:
        contact_types = ["Contact", "LabContact"]

    user_id = user.getId()
    matches = search(
        {"portal_type": contact_types, "getUsername": user_id},
        catalog=CONTACT_CATALOG)
    if not matches:
        return None

    if len(matches) == 1:
        return get_object(matches[0])

    # the user is bound to several contacts: report it and give up
    titles = ','.join([brain.Title for brain in matches])
    logger.error("User '{}' is bound to multiple Contacts '{}'".format(
        user_id, titles))
    return None
1548
1549
1550
def get_user_client(user_or_contact):
    """Return the client the given user or contact belongs to

    None is returned if nothing is passed, for lab contacts, for users
    without a client contact and for contacts whose parent is not a
    client.

    :param user_or_contact: Plone user or contact
    :returns: Client the contact of the Plone user belongs to
    """
    if not user_or_contact:
        return None

    # Lab contacts cannot belong to a client
    if is_lab_contact(user_or_contact):
        return None

    if is_contact(user_or_contact):
        # the client of a contact is its parent
        parent = get_parent(user_or_contact)
        if parent and IClient.providedBy(parent):
            return parent
        return None

    # a user: continue with the client contact linked to it, if any
    linked = get_user_contact(user_or_contact, contact_types=["Contact"])
    if not is_client_contact(linked):
        return None
    return get_user_client(linked)
1574
1575
1576
def get_user_fullname(user_or_contact):
    """Returns the fullname of the contact or Plone user.

    If the user has a linked contact, the fullname of the contact has priority
    over the value of the fullname property from the user

    :param user_or_contact: Plone user or contact
    :returns: Fullname of the contact or user; an empty string if no user
        is found
    """
    if is_contact(user_or_contact):
        return user_or_contact.getFullname()

    user = get_user(user_or_contact)
    if not user:
        return ""

    # contact's fullname has priority over user's
    contact = get_user_contact(user)
    if not contact:
        # pass a default, like `get_user_email` does, so that a missing
        # property yields an empty string
        return user.getProperty("fullname", default="")

    return contact.getFullname()
1598
1599
1600
def get_user_email(user_or_contact):
    """Return the email address of the contact or Plone user

    If the user has a linked contact, the email of the contact has priority
    over the value of the email property from the user

    :param user_or_contact: Plone user or contact
    :returns: Email of the contact or user; an empty string if no user
        is found
    """
    if is_contact(user_or_contact):
        return user_or_contact.getEmailAddress()

    member = get_user(user_or_contact)
    if not member:
        return ""

    # the email of a linked contact wins over the user property
    linked = get_user_contact(member)
    if linked:
        return linked.getEmailAddress()
    return member.getProperty("email", default="")
1620
1621
1622
def is_lab_contact(brain_or_object):
    """Check whether the brain or object is a Lab Contact

    :param brain_or_object: A single catalog brain or content object
    :returns: True if the brain or object is a Lab Contact, False otherwise
    """
    if is_object(brain_or_object):
        return ILabContact.providedBy(get_object(brain_or_object))
    return False
1631
1632
1633
def is_client_contact(brain_or_object):
    """Check whether the brain or object is a Client Contact

    A client contact provides `IContact` and is not a global contact.

    :param brain_or_object: A single catalog brain or content object
    :returns: True if the brain or object is a Client Contact, False otherwise
    """
    if is_object(brain_or_object):
        contact = get_object(brain_or_object)
        return IContact.providedBy(contact) and not is_global_contact(contact)
    return False
1642
1643
1644
def is_global_contact(brain_or_object):
    """Check whether the brain or object is a global contact

    A global contact provides `IContact` and its parent provides
    `IContacts`.

    :param brain_or_object: A single catalog brain or content object
    :returns: True if the brain or object is a global contact, False otherwise
    """
    if not is_object(brain_or_object):
        return False
    if not IContact.providedBy(get_object(brain_or_object)):
        return False
    container = get_parent(brain_or_object)
    return bool(IContacts.providedBy(container))
1658
1659
1660
def is_contact(brain_or_object):
    """Check whether the brain or object is any kind of contact

    The checks for client, lab and global contacts are tried in this
    order.

    :param brain_or_object: A single catalog brain or content object
    :returns: True if the brain or object is a Contact, False otherwise
    """
    checks = (is_client_contact, is_lab_contact, is_global_contact)
    return any(check(brain_or_object) for check in checks)
1672
1673
1674
def get_current_client():
    """Return the client of the current user, if any

    :returns: Client the current logged in user belongs to or None
    """
    current_user = get_current_user()
    return get_user_client(current_user)
1680
1681
1682
def get_cache_key(brain_or_object):
    """Build a cache key for the given brain or object

    The key joins portal type, ID, UID, URL and the `micros()` value of
    the modification date with "-".

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :returns: Cache Key
    :rtype: str
    """
    parts = [
        get_portal_type(brain_or_object),
        get_id(brain_or_object),
        get_uid(brain_or_object),
        # the URL is part of the key to handle different domains gracefully
        get_url(brain_or_object),
        # microseconds since the epoch in GMT
        get_modification_date(brain_or_object).micros(),
    ]
    return "-".join([str(part) for part in parts])
1700
1701
1702
def bika_cache_key_decorator(method, self, brain_or_object):
    """Cache key function for methods that take a brain or object

    `method` and `self` are accepted, but not used for the key.

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :returns: Cache Key
    :rtype: str
    :raises DontCache: if `brain_or_object` is None
    """
    if brain_or_object is not None:
        return get_cache_key(brain_or_object)
    raise DontCache
1713
1714
1715
def normalize_id(string):
    """Normalize the given string with the ID normalizer utility

    :param string: A string to normalize
    :type string: str
    :returns: Normalized ID
    :rtype: str
    """
    if not isinstance(string, six.string_types):
        fail("Type of argument must be string, found '{}'"
             .format(type(string)))
    # delegate to the `IIDNormalizer` utility
    id_normalizer = getUtility(IIDNormalizer)
    return id_normalizer.normalize(string)
1729
1730
1731
def normalize_filename(string):
    """Normalize the passed in string with the filename normalizer utility

    :param string: A string to normalize
    :type string: str
    :returns: Normalized filename
    :rtype: str
    """
    if not isinstance(string, six.string_types):
        message = "Type of argument must be string, found '{}'".format(
            type(string))
        fail(message)
    # delegate to the registered filename normalizer utility
    return getUtility(IFileNameNormalizer).normalize(string)
1745
1746
1747
def is_uid(uid, validate=False):
    """Checks if the passed in uid is a valid UID

    The special value '0' is always considered a valid UID. Any other value
    must be a string of exactly 32 characters that matches `UID_RX`.

    :param uid: The uid to check
    :param validate: If False, only the format of the uid is checked. If
    True, also verifies that a brain exists in the uid_catalog for the uid
    :type uid: string
    :return: True if a valid uid
    :rtype: bool
    """
    if not isinstance(uid, six.string_types):
        return False
    if uid == '0':
        return True
    if len(uid) != 32 or not UID_RX.match(uid):
        return False
    if not validate:
        return True

    # Check if a brain for this uid exists
    catalog = get_tool('uid_catalog')
    found = catalog(UID=uid)
    if found:
        # a UID is expected to resolve to a single brain
        assert (len(found) == 1)
    return len(found) > 0
1774
1775
1776
def is_date(date):
    """Checks if the passed in value is a date object

    Both Zope's DateTime and Python's datetime instances are accepted.
    Falsy values are never considered a date.

    :param date: The date to check
    :type date: DateTime
    :return: True if a valid date
    :rtype: bool
    """
    if date:
        return isinstance(date, (DateTime, datetime))
    return False
1787
1788
1789
def to_date(value, default=None):
    """Tries to convert the passed in value to Zope's DateTime

    Deprecated: emits a DeprecationWarning on each call and delegates the
    conversion to `senaite.core.api.dtime.to_DT`.

    :param value: The value to be converted to a valid DateTime
    :type value: str, DateTime or datetime
    :return: The DateTime representation of the value passed in or, if the
    conversion gives a falsy result, the conversion of default
    """
    # cannot use bika.lims.deprecated (circular dependencies)
    import warnings
    message = "Deprecated: use senaite.core.api.dtime.to_DT instead"
    # force the warning to be displayed, then restore the default behavior
    warnings.simplefilter("always", DeprecationWarning)
    warnings.warn(message, category=DeprecationWarning, stacklevel=2)
    warnings.simplefilter("default", DeprecationWarning)

    # imported here to prevent circular dependencies
    from senaite.core.api.dtime import to_DT
    return to_DT(value) or to_DT(default)
1810
1811
1812
def to_minutes(days=0, hours=0, minutes=0, seconds=0, milliseconds=0,
               round_to_int=True):
    """Returns the computed total number of minutes

    All time parts are converted with `float()` before they are summed up.

    :param round_to_int: If True, the total is rounded and returned as int,
    otherwise the float total is returned
    :returns: Total number of minutes
    """
    total = float(days)*24*60
    total = total + float(hours)*60
    total = total + float(minutes)
    total = total + float(seconds)/60
    total = total + float(milliseconds)/1000/60
    if round_to_int:
        return int(round(total))
    return total
1819
1820
1821
def to_dhm_format(days=0, hours=0, minutes=0, seconds=0, milliseconds=0):
    """Returns a representation of time in a string in xd yh zm format

    Parts that are zero are left out, except for the hours part, which is
    always displayed when both the days and the minutes parts are displayed.
    """
    total = to_minutes(days=days, hours=hours, minutes=minutes,
                       seconds=seconds, milliseconds=milliseconds)
    delta = timedelta(minutes=int(round(total)))
    num_hours, remainder = divmod(delta.seconds, 3600)
    num_minutes = remainder // 60

    minutes_part = "{}m ".format(str(num_minutes)) if num_minutes else ""
    days_part = "{}d ".format(str(delta.days)) if delta.days else ""

    # hours are kept (even if zero) when they sit between days and minutes
    show_hours = bool(num_hours) or bool(minutes_part and days_part)
    hours_part = "{}h ".format(str(num_hours)) if show_hours else ""

    return "".join([days_part, hours_part, minutes_part]).strip()
1837
1838
1839
def to_int(value, default=_marker):
    """Tries to convert the value to int.
    Truncates at the decimal point if the value is a float

    Floatable values (e.g. "1.9") are converted to float first and then
    truncated. If the value cannot be converted, the default is converted
    and returned instead.

    :param value: The value to be converted to an int
    :param default: Fallback value. If None, None is returned as is. If not
    set, the function fails when the value cannot be converted
    :return: The resulting int or default
    """
    if is_floatable(value):
        value = to_float(value)
    try:
        return int(value)
    except (TypeError, ValueError, OverflowError):
        # OverflowError is raised for infinite floats, e.g. int(float("inf"))
        if default is None:
            return default
        if default is not _marker:
            return to_int(default)
        fail("Value %s cannot be converted to int" % repr(value))
1856
1857
1858
def is_floatable(value):
    """Checks if the passed in value is a valid floatable number

    A value is floatable if `float(value)` succeeds. Values that are too
    large to be represented as a float (e.g. `10 ** 400`) are not floatable.

    :param value: The value to be evaluated as a float number
    :type value: str, float, int
    :returns: True if is a valid float number
    :rtype: bool"""
    try:
        float(value)
    except (TypeError, ValueError, OverflowError):
        # OverflowError: integer too large to be converted to a float
        return False
    return True
1870
1871
1872
def to_float(value, default=_marker):
    """Converts the passed in value to a float number

    If the value is not floatable, the default (if set) is converted and
    returned instead. Without a default, the function fails.

    :param value: The value to be converted to a floatable number
    :type value: str, float, int
    :param default: Fallback value to convert if value is not floatable
    :returns: The float number representation of the passed in value
    :rtype: float
    """
    if not is_floatable(value):
        if default is _marker:
            fail("Value %s is not floatable" % repr(value))
        else:
            return to_float(default)
    return float(value)
1885
1886
1887
def float_to_string(value, default=_marker):
    """Convert a float value to string without exponential notation

    This function preserves the whole fraction. Floatable string values are
    returned unchanged.

    :param value: The float value to be converted to a string
    :type value: str, float, int
    :param default: Value returned if value is not floatable. If not set,
    the function fails for non-floatable values
    :returns: String representation of the float w/o exponential notation
    :rtype: str
    """
    if not is_floatable(value):
        if default is _marker:
            fail("Value %s is not floatable" % repr(value))
        else:
            return default

    # Leave floatable string values unchanged
    if isinstance(value, six.string_types):
        return value

    number = float(value)
    text = str(number)

    # Part after the decimal point, e.g. "23e-26" for 1.23e-26, or the whole
    # text if there is no decimal point, e.g. "1e-07" for 0.0000001
    tail = text.split(".", 1)[-1]

    if "e-" in tail:
        fraction, zeros = tail.split("e-")
        # the precision has to cover the fraction and the leading zeros
        precision = len(fraction) + int(zeros)
        text = ("{:.%df}" % precision).format(number)
    elif "e+" in tail:
        # positive exponents, e.g. 1e+16, don't need a fractional part
        text = "{:.0f}".format(number)

    # cut off trailing zeros (and a trailing decimal point)
    if "." in text:
        text = text.rstrip("0").rstrip(".")

    return text
1931
1932
1933
def to_searchable_text_metadata(value):
    """Parse the given metadata value to searchable text

    Empty values, Missing.Value, UIDs and booleans give an empty string.
    Lists, tuples and dicts (their values) are converted item by item and
    joined with a single space. Dates are formatted as `%Y-%m-%d` and AT
    content objects are represented by their title.

    :param value: The raw value of the metadata column
    :returns: Searchable unicode value, or an empty string if there is
    nothing to search for
    """
    if not value:
        return u""
    if value is Missing.Value:
        return u""
    if is_uid(value):
        return u""
    if isinstance(value, bool):
        return u""
    if isinstance(value, (list, tuple)):
        texts = [to_searchable_text_metadata(item) for item in value]
        return " ".join([text for text in texts if text])
    if isinstance(value, dict):
        # dict.values() is a view (not a list) in Python 3: convert it to a
        # list so the values are processed item by item
        return to_searchable_text_metadata(list(value.values()))
    if is_date(value):
        # imported here to prevent circular dependencies
        from senaite.core.api.dtime import date_to_string
        return date_to_string(value, "%Y-%m-%d")
    if is_at_content(value):
        return to_searchable_text_metadata(get_title(value))
    if not isinstance(value, six.string_types):
        value = str(value)
    return safe_unicode(value)
1961
1962
1963
def get_registry_record(name, default=None):
    """Returns the value of a registry record

    Thin wrapper around `plone.api.portal.get_registry_record`.

    :param name: [required] name of the registry record
    :type name: str
    :param default: The value returned if the record is not found
    :type default: anything
    :returns: value of the registry record
    """
    lookup = ploneapi.portal.get_registry_record
    return lookup(name, default=default)
1973
1974
1975
def to_display_list(pairs, sort_by="key", allow_empty=True):
    """Create a Plone DisplayList from list items

    :param pairs: list of key, value pairs, a single key, value pair, or a
    single string that is used as both key and value
    :param sort_by: Sort the items either by "key" or by "value". Any other
    value leaves the items unsorted
    :param allow_empty: Allow to select an empty value
    :returns: Plone DisplayList
    """
    display_list = DisplayList()

    # a plain string is used as both the key and the value
    if isinstance(pairs, six.string_types):
        pairs = [pairs, pairs]

    for item in pairs:
        if isinstance(item, (tuple, list)):
            # pairs is a list of lists -> add each pair
            display_list.add(*item)
        elif isinstance(item, six.string_types):
            # pairs is just a single pair -> add it and stop
            display_list.add(*pairs)
            break

    # add the empty option
    if allow_empty:
        display_list.add("", "")

    # sort by key/value
    if sort_by == "key":
        return display_list.sortedByKey()
    if sort_by == "value":
        return display_list.sortedByValue()
    return display_list
2007
2008
2009
def text_to_html(text, wrap="p", encoding="utf8"):
    """Convert newline characters in the text to HTML `<br/>` tags

    NOTE: the text is not HTML-escaped by this function.

    :param text: Plain text to convert
    :param wrap: Name of the HTML tag to wrap the converted text in. Use a
    falsy value to not wrap the text at all
    :param encoding: Encoding used for the returned HTML
    :returns: HTML converted and encoded text, or an empty string if no text
    was passed in
    """
    if not text:
        return ""
    # handle the text internally as unicode and replace the newline
    # characters with HTML line breaks
    markup = safe_unicode(text).replace("\n", "<br/>")
    if wrap:
        markup = u"<{tag}>{html}</{tag}>".format(tag=wrap, html=markup)
    # return encoded html
    return markup.encode(encoding)
2027
2028
2029
def safe_unicode(value, default=u""):
    """Safely convert a value to a unicode string

    Unicode values are returned as they are and byte strings are decoded as
    UTF-8. Any other value (int, float, etc.) is converted to its string
    representation first.

    :param value: Value to be converted to unicode
    :param default: Default value if value is None or the conversion fails
    :returns: Unicode string
    """
    if value is None:
        return default

    # If already unicode, return as is
    if isinstance(value, six.text_type):
        return value

    try:
        # Byte strings are decoded directly. They must not go through str(),
        # because in Python 3 str(b"abc") gives the representation "b'abc'"
        # instead of the text.
        if not isinstance(value, six.binary_type):
            # Convert to str first (handles int, long, etc.)
            try:
                value = str(value)
            except UnicodeDecodeError:
                value = value.encode("utf8")

        # Convert to unicode
        if isinstance(value, six.binary_type):
            return value.decode("utf8")
        return six.text_type(value)
    except Exception:
        # best effort: never fail, return the default instead
        return default
2057
2058
2059
def to_utf8(string, default=_marker):
    """Encode string to UTF8

    :param string: String to be encoded to UTF8
    :param default: Value returned if the passed in value is not a string.
    If not set, the function fails for non-string values
    :returns: UTF8 encoded string
    """
    if isinstance(string, six.string_types):
        return safe_unicode(string).encode("utf8")
    if default is _marker:
        fail("Expected string type, got '%s'" % type(string))
    return default
2070
2071
2072
def is_temporary(obj):
    """Returns whether the given object is temporary or not

    An object is considered temporary if any of these conditions is met:

      - it provides ITemporaryObject
      - it has no id or its id matches `UID_RX`
      - it has no parent
      - its parent has no id or the parent id matches `UID_RX`
      - the meta type of its parent is "TempFolder"

    :param obj: the object to evaluate
    :returns: True if the object is temporary
    """
    if ITemporaryObject.providedBy(obj):
        return True

    def lacks_real_id(item):
        """Check if the item has no id or a UID-like id
        """
        item_id = getattr(aq_base(item), "id", None)
        return item_id is None or bool(UID_RX.match(item_id))

    if lacks_real_id(obj):
        return True

    parent = aq_parent(aq_inner(obj))
    if not parent:
        return True

    if lacks_real_id(parent):
        return True

    # Checks to see if we are created inside the portal_factory.
    # This might also happen for DX types in senaite.databox!
    return getattr(aq_base(parent), "meta_type", "") == "TempFolder"
2100
2101
2102
def mark_temporary(brain_or_object):
    """Mark the object as temporary

    The ITemporaryObject marker interface is applied to the object.

    :param brain_or_object: A single catalog brain or content object
    """
    alsoProvides(get_object(brain_or_object), ITemporaryObject)
2107
2108
2109
def unmark_temporary(brain_or_object):
    """Unmark the object as temporary

    The ITemporaryObject marker interface is removed from the object.

    :param brain_or_object: A single catalog brain or content object
    """
    noLongerProvides(get_object(brain_or_object), ITemporaryObject)
2114
2115
2116
def is_string(thing):
    """Checks if the passed in object is a string type

    :param thing: object to test
    :returns: True if the object is an instance of `six.string_types`
    """
    string_types = six.string_types
    return isinstance(thing, string_types)
2123
2124
2125
def is_list(thing):
    """Checks if the passed in object is a list type

    Tuples and sets are not considered lists by this function.

    :param thing: object to test
    :returns: True if the object is a list (or an instance of a subclass)
    """
    return isinstance(thing, (list,))
2132
2133
2134
def is_list_iterable(thing):
    """Checks if the passed in object can be iterated like a list

    Strings and dicts are iterable too, but they are deliberately not
    considered list-like by this function.

    :param thing: object to test
    :returns: True if the object is a list, tuple or set
    """
    list_like = (list, tuple, set)
    return isinstance(thing, list_like)
2141
2142
2143
def parse_json(thing, default=""):
    """Parse from JSON

    :param thing: thing to parse
    :param default: value to return if the thing cannot be parsed, either
    because it is not valid JSON or because of its type
    :returns: the object representing the JSON or default
    """
    try:
        parsed = json.loads(thing)
    except (TypeError, ValueError):
        return default
    return parsed
2154
2155
2156
def to_list(value):
    """Converts the value to a list

    Strings that contain a JSON list are converted to the list they
    represent. Lists, tuples and sets are converted to a new list. Any other
    value is returned as the only item of a new list.

    :param value: the value to be represented as a list
    :returns: a list that represents or contains the value
    """
    sequences = (list, tuple, set)
    if is_string(value):
        parsed = parse_json(value)
        if isinstance(parsed, sequences):
            return list(parsed)
    if isinstance(value, sequences):
        return list(value)
    return [value]
2169
2170
2171
def validate(obj):
    """Validates the full object

    AT content objects are validated by their own `validate` function. For
    Dexterity content objects, each field is validated with its current
    value and then the invariants of the schema and of the behaviors are
    validated.

    :param obj: the object to validate the data against
    :type obj: ATContentType/DexterityContentType
    :returns: a dict with field names as keys and errors as values. Errors
    from invariants are stored under the schema name or the behavior id
    :raises TypeError: if the object is neither AT nor Dexterity content
    """
    if is_at_content(obj):
        return obj.validate(data=True)

    if not is_dexterity_content(obj):
        raise TypeError("%r is not supported" % type(obj))

    errors = {}
    # values by field name, for later use with invariants
    data = {}

    def takes_native_string(field):
        """Check if the field is a string field
        """
        if isinstance(field, StringField):
            return True
        return getattr(field, "_type", None) in [str]

    def error_message(exc):
        """Return the translated message of the error or its type name
        """
        return translate(exc.message) or type(exc).__name__

    def invariant_error(schema):
        """Return the error from the invariants of the schema, if any
        """
        try:
            schema.validateInvariants(ValidatorData(schema, data, obj))
        except Invalid as exc:
            return error_message(exc)
        return None

    # iterate through object fields and validate each
    for name, field in get_fields(obj).items():

        # extract the field value
        value = getattr(obj, name, None)
        if callable(value):
            # Handle callable values, e.g. effective, expired etc.
            value = value()
        if isinstance(value, six.string_types):
            value = safe_unicode(value)
        if takes_native_string(field):
            # Provide UTF-8 encoded strings for StringField or NativeString.
            # This prevents a WrongType error when the value is unicode
            # instead of str. For example, the field used to store an
            # object's ID is a StringField, which only accepts str. However,
            # in Senaite, values are stored as unicode and returned as UTF-8
            # to maintain AT legacy behavior.
            # Note that this "trick" only applies to top-level fields: a
            # WrongContainedType error will be raised if the field is, e.g.,
            # a DataGridField whose subfield inherits from NativeString.
            fallback = getattr(field, "missing_value", None)
            value = to_utf8(value, default=None) or fallback

        # keep the value for later use with invariants
        data[name] = value

        if name in SKIP_VALIDATION_FIELDS:
            continue

        try:
            field.validate(value)
        except RequiredMissing:
            errors[name] = "required field"
        except WrongType:
            # ignore wrong type errors if the field is not required and unset
            if value is not None:
                errors[name] = "wrong type"
        except Invalid as exc:
            errors[name] = error_message(exc)

    # validate invariants from schema
    schema = get_schema(obj)
    message = invariant_error(schema)
    if message is not None:
        errors[schema.getName()] = message

    # validate invariants from behaviors
    for behavior_id in get_behaviors(obj):
        registration = lookup_behavior_registration(behavior_id)
        message = invariant_error(registration.interface)
        if message is not None:
            errors[behavior_id] = message

    return errors
2251
2252
2253
def get_portal_types():
    """Returns a list with the registered portal types

    The types are read from the `portal_types` tool.

    :returns: List of portal type names
    :rtype: list
    """
    return get_tool("portal_types").listContentTypes()
2261
2262
2263
def is_valid_id(thing, container=None):
    """Checks if is a valid ID candidate based on the following conditions:

      - Contains only letters, numbers, hyphens ('-'), or underscores ('_').
      - Starts with a letter or a number.
      - Ends with a letter or a number.
      - Has a minimum length of 3 characters.
      - Does not start with a reserved word ('aq_', 'manage' or 'request',
        case-insensitive) and is not a portal type name.

    If a container is provided, it also verifies the container does not have
    any attribute or function with same id.

    :param thing: the id to validate
    :type thing: str
    :param container: optional container to check the id against
    :returns: True if the id meets all the conditions, False otherwise.
    """
    id_rx = re.compile(r"^[a-z0-9][a-z0-9_\-]+[a-z0-9]$")
    illegal = re.compile(r"^(aq_|manage|request).*")

    if not is_string(thing):
        return False

    # convert to lower to simplify regex
    lower = thing.lower()

    # check length and characters
    if not id_rx.match(lower):
        return False

    # check for reserved/illegal word
    if illegal.match(lower):
        return False

    # check for portal type names
    if thing in get_portal_types():
        return False

    # check for container attributes and functions. Compare against None
    # explicitly: a container that defines `__len__` evaluates to False when
    # it is empty, which would skip this check
    if container is not None and hasattr(container, thing):
        return False

    return True
2306