Passed
Push — 2.x ( 2a1b20...d626d0 )
by Jordi
07:27
created

bika.lims.api.is_valid_id()   B

Complexity

Conditions 7

Size

Total Lines 43
Code Lines 16

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 16
dl 0
loc 43
rs 8
c 0
b 0
f 0
cc 7
nop 2
1
# -*- coding: utf-8 -*-
2
#
3
# This file is part of SENAITE.CORE.
4
#
5
# SENAITE.CORE is free software: you can redistribute it and/or modify it under
6
# the terms of the GNU General Public License as published by the Free Software
7
# Foundation, version 2.
8
#
9
# This program is distributed in the hope that it will be useful, but WITHOUT
10
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
11
# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
12
# details.
13
#
14
# You should have received a copy of the GNU General Public License along with
15
# this program; if not, write to the Free Software Foundation, Inc., 51
16
# Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
17
#
18
# Copyright 2018-2025 by it's authors.
19
# Some rights reserved, see README and LICENSE.
20
21
import copy
22
import json
23
import re
24
from collections import OrderedDict
25
from datetime import datetime
26
from datetime import timedelta
27
from itertools import groupby
28
29
import Missing
30
import six
31
from AccessControl.PermissionRole import rolesForPermissionOn
32
from AccessControl.Permissions import copy_or_move as CopyOrMove
33
from Acquisition import aq_base
34
from Acquisition import aq_inner
35
from Acquisition import aq_parent
36
from bika.lims import logger
37
from bika.lims.interfaces import IClient
38
from bika.lims.interfaces import ILabContact
39
from DateTime import DateTime
40
from OFS.event import ObjectWillBeMovedEvent
41
from plone import api as ploneapi
42
from plone.api.exc import InvalidParameterError
43
from plone.app.layout.viewlets.content import ContentHistoryView
44
from plone.behavior.interfaces import IBehaviorAssignable
45
from plone.behavior.registration import lookup_behavior_registration
46
from plone.dexterity.interfaces import IDexterityContent
47
from plone.dexterity.schema import SchemaInvalidatedEvent
48
from plone.dexterity.utils import addContentToContainer
49
from plone.dexterity.utils import createContent
50
from plone.dexterity.utils import resolveDottedName
51
from plone.i18n.normalizer.interfaces import IFileNameNormalizer
52
from plone.i18n.normalizer.interfaces import IIDNormalizer
53
from plone.memoize.volatile import DontCache
54
from Products.Archetypes.atapi import DisplayList
55
from Products.Archetypes.BaseObject import BaseObject
56
from Products.Archetypes.event import ObjectInitializedEvent
57
from Products.Archetypes.public import StringField
58
from Products.Archetypes.utils import mapply
59
from Products.CMFCore.interfaces import IFolderish
60
from Products.CMFCore.interfaces import ISiteRoot
61
from Products.CMFCore.permissions import DeleteObjects
62
from Products.CMFCore.permissions import ModifyPortalContent
63
from Products.CMFCore.permissions import View
64
from Products.CMFCore.utils import getToolByName
65
from Products.CMFCore.WorkflowCore import WorkflowException
66
from Products.CMFPlone.RegistrationTool import get_member_by_login_name
67
from Products.CMFPlone.utils import _createObjectByType
68
from Products.CMFPlone.utils import base_hasattr
69
from Products.CMFPlone.utils import safe_unicode
70
from Products.PlonePAS.tools.memberdata import MemberData
71
from Products.ZCatalog.interfaces import ICatalogBrain
72
from senaite.core.interfaces import IContact
73
from senaite.core.interfaces import IContacts
74
from senaite.core.interfaces import ITemporaryObject
75
from z3c.form.validator import Data as ValidatorData
76
from zope import globalrequest
77
from zope.annotation.interfaces import IAttributeAnnotatable
78
from zope.component import getUtility
79
from zope.component import queryMultiAdapter
80
from zope.container.contained import notifyContainerModified
81
from zope.event import notify
82
from zope.i18n import translate
83
from zope.interface import Invalid
84
from zope.interface import alsoProvides
85
from zope.interface import directlyProvides
86
from zope.interface import noLongerProvides
87
from zope.lifecycleevent import ObjectMovedEvent
88
from zope.publisher.browser import TestRequest
89
from zope.schema import getFieldsInOrder
90
from zope.schema.interfaces import RequiredMissing
91
from zope.schema.interfaces import WrongType
92
from zope.security.interfaces import Unauthorized
93
94
"""SENAITE LIMS Framework API
95
96
Please see tests/doctests/API.rst for documentation.
97
98
Architectural Notes:
99
100
Please add only functions that do a single thing for a single object.
101
102
Good: `def get_foo(brain_or_object)`
103
Bad:  `def get_foos(list_of_brain_objects)`
104
105
Why?
106
107
Because it makes things more complex. You can always use a pattern like this to
108
achieve the same::
109
110
    >>> foos = map(get_foo, list_of_brain_objects)
111
112
Please add for all functions a descriptive test in tests/doctests/API.rst.
113
114
Thanks.
115
"""
116
117
# Sentinel used as the "no default given" value of `default` parameters, so
# that an explicitly passed `None` can be told apart from a missing default
_marker = object()
118
119
# Pattern: 32 lowercase alphanumeric characters followed by the end of string
UID_RX = re.compile("[a-z0-9]{32}$")
120
121
# Catalog tool ids; UID_CATALOG is the tool queried by `get_brain_by_uid`
UID_CATALOG = "uid_catalog"
122
PORTAL_CATALOG = "portal_catalog"
123
124
# fields that are not validated by the API
125
SKIP_VALIDATION_FIELDS = [
126
    "allow_discussion",
127
    "contributors",
128
    "creators",
129
    "effective",
130
    "exclude_from_nav",
131
    "expires",
132
    "language",
133
    "nextPreviousEnabled",
134
    "relatedItems",
135
    "rights",
136
    "subjects",
137
]
138
139
class APIError(Exception):
    """Root exception type for errors raised by the bika.lims API.

    Raised by `fail` whenever an API call cannot be fulfilled.
    """
141
142
143
def get_portal():
    """Return the portal (site root) object

    :returns: Portal object as resolved by `plone.api.portal.getSite`
    """
    portal = ploneapi.portal.getSite()
    return portal
149
150
151
def get_setup():
    """Return the `bika_setup` folder of the portal

    :returns: Whatever `portal.get("bika_setup")` yields
    """
    return get_portal().get("bika_setup")
156
157
158
def get_bika_setup():
    """Return the `bika_setup` folder of the portal

    Alias of `get_setup`.
    """
    setup = get_setup()
    return setup
162
163
164
def get_senaite_setup():
    """Return the new DX `setup` folder of the portal

    :returns: Whatever `portal.get("setup")` yields
    """
    return get_portal().get("setup")
169
170
171
def create(container, portal_type, *args, **kwargs):
    """Create a new content object of the given type inside the container

    Types whose FTI has a `product` set are created as AT contents, all
    other types are created as Dexterity contents.

    This code uses most of the parts from the TypesTool
    see: `Products.CMFCore.TypesTool._constructInstance`

    :param container: container
    :type container: ATContentType/DexterityContentType/CatalogBrain
    :param portal_type: The portal type to create, e.g. "Client"
    :type portal_type: string
    :param id: (keyword) The id for the new content object
    :param title: (keyword) The title for the new content object
    :type title: string
    :returns: The new created object
    """
    from bika.lims.utils import tmpID

    temporary_id = tmpID()
    obj_id = kwargs.pop("id", "")
    obj_title = kwargs.pop("title", "")

    # lookup the type information of the portal type to create
    fti = get_tool("portal_types").getTypeInfo(portal_type)

    if not fti.product:
        # Dexterity based type
        content = createContent(portal_type, **kwargs)
        content.id = obj_id
        content.title = obj_title
        return addContentToContainer(container, content)

    # Archetypes based type: fall back to the temporary id if none was given
    new_obj = _createObjectByType(
        portal_type, container, obj_id or temporary_id)
    # set the field values
    edit(new_obj, check_permissions=False, title=obj_title, **kwargs)
    # generate the final id if the type asks for it
    if new_obj._at_rename_after_creation:
        new_obj._renameAfterCreation(check_auto_id=True)
    # the object is fully created now
    new_obj.unmarkCreationFlag()
    # tell subscribers about the new object
    notify(ObjectInitializedEvent(new_obj))
    return new_obj
214
215
216
def copy_object(source, container=None, portal_type=None, *args, **kwargs):
    """Creates a copy of the source object. If container is None, creates the
    copy inside the same container as the source. If portal_type is specified,
    creates a new object of this type, and copies the values from source fields
    to the destination object. Field values sent as kwargs have priority over
    the field values from source.

    :param source: object from which create a copy
    :type source: ATContentType/DexterityContentType/CatalogBrain
    :param container: destination container
    :type container: ATContentType/DexterityContentType/CatalogBrain
    :param portal_type: destination portal type
    :param skip: (keyword) additional field names or field types that must
        not be copied from source
    :returns: The new created object
    """
    # Prevent circular dependencies. The absolute module path (same as used
    # by `move_object`) does not rely on Python 2 implicit relative imports
    from bika.lims.api.security import check_permission

    # Use same container as source unless explicitly set
    source = get_object(source)
    if not container:
        container = get_parent(source)

    # Use same portal type as source unless explicitly set
    if not portal_type:
        portal_type = get_portal_type(source)

    # Extend the fields to skip with defaults. A set gives constant-time
    # membership checks for both field names and field types
    skip = set(kwargs.pop("skip", []))
    skip.update([
        "Products.Archetypes.Field.ComputedField",
        "UID",
        "id",
        "allowDiscussion",
        "contributors",
        "creation_date",
        "creators",
        "effectiveDate",
        "expirationDate",
        "language",
        "location",
        "modification_date",
        "rights",
        "subject",
    ])

    # Update kwargs with the field values to copy from source
    fields = get_fields(source)
    for field_name, field in fields.items():
        # Prioritize field values passed as kwargs
        if field_name in kwargs:
            continue
        # Skip framework internal fields by name
        if field_name in skip:
            continue
        # Skip fields of non-suitable types
        if hasattr(field, "getType") and field.getType() in skip:
            continue
        # Skip readonly fields
        if getattr(field, "readonly", False):
            continue
        # Skip non-readable fields
        perm = getattr(field, "read_permission", View)
        if perm and not check_permission(perm, source):
            continue

        # do not wake-up objects unnecessarily
        if hasattr(field, "getRaw"):
            field_value = field.getRaw(source)
        elif hasattr(field, "get_raw"):
            field_value = field.get_raw(source)
        elif hasattr(field, "getAccessor"):
            accessor = field.getAccessor(source)
            field_value = accessor()
        else:
            field_value = field.get(source)

        # Do a hard copy of value if mutable type, so the copy does not
        # share state with the source object
        if isinstance(field_value, (list, dict, set)):
            field_value = copy.deepcopy(field_value)
        kwargs[field_name] = field_value

    # Create a copy
    return create(container, portal_type, *args, **kwargs)
302
303
304
def edit(obj, check_permissions=True, **kwargs):
    """Updates the values of object fields with the new values passed-in

    Keyword arguments that do not match a field of the object are ignored.

    :param obj: the object to update
    :param check_permissions: whether the write permission of each field
        must be checked before the value is set
    :raises ValueError: if a field to update is readonly
    :raises Unauthorized: if permissions are checked and the field is not
        writeable
    """
    # Prevent circular dependencies. The absolute module path (same as used
    # by `move_object`) does not rely on Python 2 implicit relative imports
    from bika.lims.api.security import check_permission

    fields = get_fields(obj)
    for name, value in kwargs.items():
        field = fields.get(name, None)
        if not field:
            continue

        # cannot update readonly fields
        readonly = getattr(field, "readonly", False)
        if readonly:
            raise ValueError("Field '{}' is readonly".format(name))

        # check field writable permission
        if check_permissions:
            perm = getattr(field, "write_permission", ModifyPortalContent)
            if perm and not check_permission(perm, obj):
                raise Unauthorized("Field '{}' is not writeable".format(name))

        # Set the value, through the mutator when the field provides one
        if hasattr(field, "getMutator"):
            mutator = field.getMutator(obj)
            mapply(mutator, value)
        else:
            field.set(obj, value)
332
333
334
def move_object(obj, destination, check_constraints=True):
    """Move the object into the destination folder

    The result is the same as cutting and pasting the object:

        id = obj.getId()
        cp = origin.manage_cutObjects(id)
        destination.manage_pasteObjects(cp)

    but with slightly better performance. The code is mostly grabbed from
    OFS.CopySupport.CopyContainer_pasteObjects

    :param obj: object to move to destination
    :type obj: ATContentType/DexterityContentType/CatalogBrain/UID
    :param destination: destination container
    :type destination: ATContentType/DexterityContentType/CatalogBrain/UID
    :param check_constraints: constraints and permissions must be checked
    :type check_constraints: bool
    :returns: The moved object
    :raises ValueError: if the object is moved into itself or its portal
        type is not allowed in destination
    :raises Unauthorized: if the object cannot be moved
    """
    # prevent circular dependencies
    from bika.lims.api.security import check_permission

    obj = get_object(obj)
    destination = get_object(destination)

    # an object cannot become its own container
    if obj == destination:
        raise ValueError("Cannot move object into itself: {}".format(obj))

    # the object already lives in destination: nothing to do
    origin = get_parent(obj)
    if origin == destination:
        return obj

    if check_constraints:
        # the CopyOrMove permission is required on the object to move
        if not check_permission(CopyOrMove, obj):
            raise Unauthorized("Cannot move {}".format(obj))

        # the destination must allow objects of this portal type
        portal_type = get_portal_type(obj)
        types_tool = get_tool("portal_types")
        type_info = types_tool.getTypeInfo(destination)
        if not type_info.allowType(portal_type):
            raise ValueError("Disallowed subobject type: %s" % portal_type)

    obj_id = get_id(obj)

    # notify that the object will be copied to destination
    obj._notifyOfCopyTo(destination, op=1)  # noqa

    # notify that the object will be moved to destination
    notify(ObjectWillBeMovedEvent(obj, origin, obj_id, destination, obj_id))

    # remove the object from origin and store it, without acquisition
    # wrapper, in destination
    delete(obj, check_permissions=check_constraints, suppress_events=True)
    unwrapped = aq_base(obj)
    destination._setObject(  # noqa
        obj_id, unwrapped, set_owner=0, suppress_events=True)
    moved = destination._getOb(obj_id)  # noqa

    # events were suppressed above, so notify manually that the object was
    # moved and the containers modified. This also makes the objects to be
    # re-catalogued
    notify(ObjectMovedEvent(moved, origin, obj_id, destination, obj_id))
    notifyContainerModified(origin)
    notifyContainerModified(destination)

    # make ownership implicit if possible, so it acquires the permissions
    # from the container
    moved.manage_changeOwnershipType(explicit=0)

    return moved
408
409
410
def uncatalog_object(obj, recursive=False):
    """Remove the object from all catalogs

    :param obj: object to un-catalog
    :type obj: ATContentType/DexterityContentType
    :param recursive: recursively uncatalog all child objects
    """
    # remove from the catalogs the object is registered for
    obj.unindexObject()

    # remove from uid_catalog explicitly. The uids of uid_catalog are
    # relative paths to portal root
    # see Products.Archetypes.UIDCatalog.UIDResolver.catalog_object
    uc = get_tool("uid_catalog")
    relative_path = "/".join(obj.getPhysicalPath()[2:])
    uc.uncatalog_object(relative_path)

    if not recursive:
        return

    for contained in obj.objectValues():
        uncatalog_object(contained, recursive=recursive)
429
430
431
def catalog_object(obj, recursive=False):
    """Catalog the object again

    :param obj: object to catalog
    :type obj: ATContentType/DexterityContentType
    :param recursive: recursively catalog all child objects
    """
    if is_at_content(obj):
        # AT types have to be explicitly re-cataloged at uid_catalog (DX
        # types are automatically reindexed in UID catalog on
        # reindexObject). The uids of uid_catalog are relative paths to
        # portal root
        # see Products.Archetypes.UIDCatalog.UIDResolver.catalog_object
        uid_catalog = get_tool("uid_catalog")
        relative_path = "/".join(obj.getPhysicalPath()[2:])
        uid_catalog.catalog_object(obj, relative_path)
    obj.reindexObject()

    if not recursive:
        return

    for contained in obj.objectValues():
        catalog_object(contained, recursive=recursive)
451
452
453
def delete(obj, check_permissions=True, suppress_events=False):
    """Deletes the given object

    :param obj: object to delete
    :type obj: ATContentType/DexterityContentType
    :param check_permissions: whether delete permission must be checked
    :param suppress_events: value passed through to `_delObject` of the
        parent; the default (False) does not suppress the delete events
    :raises Unauthorized: if permissions are checked and the DeleteObjects
        permission is not granted on the object
    """
    # Prevent circular dependencies. The absolute module path (same as used
    # by `move_object`) does not rely on Python 2 implicit relative imports
    from bika.lims.api.security import check_permission

    if check_permissions and not check_permission(DeleteObjects, obj):
        raise Unauthorized("Do not have permissions to remove this object")

    # un-catalog the object from all catalogs (uid_catalog included)
    uncatalog_object(obj)
    # delete the object
    parent = get_parent(obj)
    parent._delObject(obj.getId(), suppress_events=suppress_events)
470
471
472
def get_tool(name, context=None, default=_marker):
    """Lookup a portal tool by its name

    :param name: The name of the tool, e.g. `senaite_setup_catalog`
    :type name: string
    :param context: A portal object
    :type context: ATContentType/DexterityContentType/CatalogBrain
    :param default: Value to return if the tool is not found. If a string
        is passed, it is used as the name of a fallback tool to look up
    :returns: Portal Tool
    """
    if context is not None:
        # A context was given: acquire the tool from there first
        try:
            context = get_object(context)
            return getToolByName(context, name)
        except (APIError, AttributeError) as exc:
            # https://github.com/senaite/bika.lims/issues/396
            template = ("get_tool::getToolByName({}, '{}') failed: {} "
                        "-> falling back to plone.api.portal.get_tool('{}')")
            logger.warn(template.format(repr(context), name, repr(exc), name))
            return get_tool(name, default=default)

    # No context: rely on the plone api
    try:
        return ploneapi.portal.get_tool(name)
    except InvalidParameterError:
        if default is _marker:
            fail("No tool named '%s' found." % name)
        if isinstance(default, six.string_types):
            return get_tool(default)
        return default
503
504
505
def fail(msg=None):
    """Raise an APIError with the given message

    :param msg: The reason of the failure. A generic text is used if None
    :raises APIError: always
    """
    reason = "Reason not given." if msg is None else msg
    raise APIError("{}".format(reason))
511
512
513
def is_object(brain_or_object):
    """Tell whether the passed in object is a supported portal content

    Supported are the portal root, supermodels, AT contents, Dexterity
    contents and catalog brains.

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: Portal Object
    :returns: True if the passed in object is a valid portal content
    """
    checkers = (
        is_portal,
        is_supermodel,
        is_at_content,
        is_dexterity_content,
        is_brain,
    )
    return any(check(brain_or_object) for check in checkers)
531
532
533
def get_object(brain_object_uid, default=_marker):
    """Resolve the full content object

    :param brain_object_uid: A catalog brain or content object or uid
    :type brain_object_uid: PortalObject/ATContentType/DexterityContentType
    /CatalogBrain/basestring
    :param default: Value to return if the object cannot be resolved. If
        omitted, an unsupported value makes this function fail
    :returns: The full object
    """
    if is_uid(brain_object_uid):
        return get_object_by_uid(brain_object_uid, default=default)

    if is_supermodel(brain_object_uid):
        return brain_object_uid.instance

    if is_object(brain_object_uid):
        # wake up the object if we got a brain
        if is_brain(brain_object_uid):
            return brain_object_uid.getObject()
        return brain_object_uid

    # not a supported type
    if default is _marker:
        fail("{} is not supported.".format(repr(brain_object_uid)))
    return default
552
553
554
def is_portal(brain_or_object):
    """Tell whether the passed in object is the portal root object

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :returns: True if the object provides `ISiteRoot`
    :rtype: bool
    """
    provides_site_root = ISiteRoot.providedBy(brain_or_object)
    return provides_site_root
563
564
565
def is_brain(brain_or_object):
    """Tell whether the passed in object is a portal catalog brain

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :returns: True if the object provides `ICatalogBrain`
    :rtype: bool
    """
    provides_brain = ICatalogBrain.providedBy(brain_or_object)
    return provides_brain
574
575
576
def is_supermodel(brain_or_object):
    """Tell whether the passed in object is a supermodel

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :returns: True if the object provides `ISuperModel`
    :rtype: bool
    """
    # imported here to avoid circular imports
    from senaite.app.supermodel.interfaces import ISuperModel

    provides_supermodel = ISuperModel.providedBy(brain_or_object)
    return provides_supermodel
587
588
589
def is_dexterity_content(brain_or_object):
    """Tell whether the passed in object is a dexterity content type

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :returns: True if the object provides `IDexterityContent`
    :rtype: bool
    """
    provides_dexterity = IDexterityContent.providedBy(brain_or_object)
    return provides_dexterity
598
599
600
def is_at_content(brain_or_object):
    """Tell whether the passed in object is an AT content type

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :returns: True if the object is an instance of the Archetypes
        `BaseObject`
    :rtype: bool
    """
    is_archetype = isinstance(brain_or_object, BaseObject)
    return is_archetype
609
610
611
def is_dx_type(portal_type):
    """Tell whether the portal type is DX based

    A type is considered DX based if its FTI has no `product` set.

    :param portal_type: The portal type name to check
    :returns: True if the portal type is DX based
    """
    fti = get_tool("portal_types").getTypeInfo(portal_type)
    return not fti.product
622
623
624
def is_at_type(portal_type):
    """Tell whether the portal type is AT based

    Every type that is not DX based is considered AT based.

    :param portal_type: The portal type name to check
    :returns: True if the portal type is AT based
    """
    dx_based = is_dx_type(portal_type)
    return not dx_based
631
632
633
def is_folderish(brain_or_object):
    """Tell whether the passed in object is folderish

    An `is_folderish` attribute of the passed in object wins (it is called
    if callable). Otherwise, the full object is checked for `IFolderish`.

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :returns: True if the object is folderish
    :rtype: bool
    """
    if not hasattr(brain_or_object, "is_folderish"):
        return IFolderish.providedBy(get_object(brain_or_object))

    if callable(brain_or_object.is_folderish):
        return brain_or_object.is_folderish()
    return brain_or_object.is_folderish
646
647
648
def get_portal_type(brain_or_object):
    """Return the portal type of the object

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :returns: Portal type
    :rtype: string
    """
    supported = is_object(brain_or_object)
    if not supported:
        fail("{} is not supported.".format(repr(brain_or_object)))
    return brain_or_object.portal_type
659
660
661
def get_schema(brain_or_object):
    """Return the schema of the content

    Fails for the portal root and for objects that are neither Dexterity
    nor AT contents.

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :returns: Schema object
    """
    content = get_object(brain_or_object)

    if is_portal(content):
        fail("get_schema can't return schema of portal root")

    if is_dexterity_content(content):
        # the schema of DX types is looked up through the FTI
        types_tool = get_tool("portal_types")
        return types_tool.getTypeInfo(content.portal_type).lookupSchema()

    if is_at_content(content):
        return content.Schema()

    fail("{} has no Schema.".format(brain_or_object))
678
679
680
def get_behaviors(portal_type):
    """Return the behaviors assigned to the portal type

    :param portal_type: DX portal type name
    :returns: The `behaviors` of the type information
    :raises TypeError: if the FTI of the type has a `product` set (AT type)
    """
    fti = get_tool("portal_types").getTypeInfo(portal_type)
    if fti.product:
        raise TypeError("Expected DX type, got AT type instead.")
    return fti.behaviors
690
691
692
def enable_behavior(portal_type, behavior_id):
    """Add the behavior to the portal type

    Nothing is done if the behavior is already enabled for the type.

    :param portal_type: DX portal type name
    :param behavior_id: The behavior to enable
    :raises TypeError: if the FTI of the type has a `product` set (AT type)
    """
    fti = get_tool("portal_types").getTypeInfo(portal_type)
    if fti.product:
        raise TypeError("Expected DX type, got AT type instead.")

    if behavior_id in fti.behaviors:
        return

    fti.behaviors += (behavior_id, )
    # the cached schema is outdated now
    notify(SchemaInvalidatedEvent(portal_type))
707
708
709
def disable_behavior(portal_type, behavior_id):
    """Remove the behavior from the portal type

    The schema cache is invalidated regardless of whether the behavior was
    enabled for the type or not.

    :param portal_type: DX portal type name
    :param behavior_id: The behavior to disable
    :raises TypeError: if the FTI of the type has a `product` set (AT type)
    """
    fti = get_tool("portal_types").getTypeInfo(portal_type)
    if fti.product:
        raise TypeError("Expected DX type, got AT type instead.")

    remaining = tuple(b for b in fti.behaviors if b != behavior_id)
    fti.behaviors = remaining
    # the cached schema is outdated now
    notify(SchemaInvalidatedEvent(portal_type))
722
723
724
def get_fields(brain_or_object):
    """Return a name to field mapping of the object

    For Dexterity contents, the fields of the schema come first, followed
    by the fields of the interfaces of the enumerated behaviors.

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :returns: Mapping of name -> field
    :rtype: OrderedDict
    """
    content = get_object(brain_or_object)
    schema = get_schema(content)

    if not is_dexterity_content(content):
        return OrderedDict(schema)

    # fields directly provided by the schema interface
    name_field_pairs = getFieldsInOrder(schema)

    # fields contributed by the behaviors
    assignable = IBehaviorAssignable(content)
    if assignable:
        for behavior in assignable.enumerateBehaviors():
            name_field_pairs.extend(getFieldsInOrder(behavior.interface))

    return OrderedDict(name_field_pairs)
745
746
747
def get_id(brain_or_object):
    """Return the Plone ID of the object

    For brains, the `getId` or `id` attribute is used if available, so
    the full object is not needed.

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :returns: Plone ID
    :rtype: string
    """
    if is_brain(brain_or_object):
        for attr_name in ("getId", "id"):
            if base_hasattr(brain_or_object, attr_name):
                return getattr(brain_or_object, attr_name)
    obj = get_object(brain_or_object)
    return obj.getId()
761
762
763
def get_title(brain_or_object):
    """Return the Title of the object

    For brains, the `Title` attribute is used if available, so the full
    object is not needed.

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :returns: Title
    :rtype: string
    """
    if is_brain(brain_or_object):
        if base_hasattr(brain_or_object, "Title"):
            return brain_or_object.Title
    obj = get_object(brain_or_object)
    return obj.Title()
774
775
776
def get_description(brain_or_object):
    """Return the Description of the object

    For brains, the `Description` attribute is used if available, so the
    full object is not needed.

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :returns: Description
    :rtype: string
    """
    if is_brain(brain_or_object):
        if base_hasattr(brain_or_object, "Description"):
            return brain_or_object.Description
    obj = get_object(brain_or_object)
    return obj.Description()
788
789
790
def get_uid(brain_or_object):
    """Return the Plone UID of the object

    A passed in UID is returned as-is. The UID of the portal root is
    defined to be '0'.

    :param brain_or_object: A single catalog brain or content object or an UID
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :returns: Plone UID
    :rtype: string
    """
    if is_uid(brain_or_object):
        return brain_or_object

    if is_portal(brain_or_object):
        return "0"

    if is_brain(brain_or_object):
        if base_hasattr(brain_or_object, "UID"):
            return brain_or_object.UID

    obj = get_object(brain_or_object)
    return obj.UID()
805
806
807
def get_url(brain_or_object):
    """Return the absolute URL of the object

    For brains, `getURL` is called if available, so the full object is
    not needed.

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :returns: Absolute URL
    :rtype: string
    """
    if is_brain(brain_or_object):
        if base_hasattr(brain_or_object, "getURL"):
            return brain_or_object.getURL()
    obj = get_object(brain_or_object)
    return obj.absolute_url()
818
819
820
def get_icon(thing, html_tag=True):
    """Get the icon of the content object

    :param thing: A single catalog brain, content object or portal_type
    :type thing: ATContentType/DexterityContentType/CatalogBrain/String
    :param html_tag: A value of 'True' returns the HTML tag, else the image url
    :type html_tag: bool
    :returns: HTML '<img>' tag if 'html_tag' is True else the image url.
        An empty string is returned if the type has no icon
    :rtype: string
    """
    # local import: only needed to build the HTML tag
    from xml.sax.saxutils import escape

    portal_type = thing
    if is_object(thing):
        portal_type = get_portal_type(thing)

    # Manual approach, because `plone.app.layout.getIcon` does not reliable
    # work for Contents coming from other catalogs than the
    # `portal_catalog`
    portal_types = get_tool("portal_types")
    fti = portal_types.getTypeInfo(portal_type)
    if not fti:
        fail("No type info for {}".format(repr(thing)))
    icon = fti.getIcon()
    if not icon:
        return ""
    url = "%s/%s" % (get_url(get_portal()), icon)
    if not html_tag:
        return url

    # build the img element
    if is_object(thing):
        title = get_title(thing)
    else:
        title = fti.Title()

    # The title is free text: escape it (double quotes included), so it
    # can neither break out of the attribute nor inject markup
    if isinstance(title, six.string_types):
        title = escape(title, {'"': "&quot;"})

    tag = '<img width="16" height="16" src="{url}" title="{title}" />'
    return tag.format(url=url, title=title)
855
856
857
def get_object_by_uid(uid, default=_marker):
    """Look up the object that belongs to the given UID

    :param uid: The UID of the object to find
    :type uid: string
    :param default: Value returned instead of failing when the UID is empty
        or no brain is found for it
    :returns: Found object, the portal for UID '0', or `default` if given
    """
    has_default = default is not _marker

    # an empty UID cannot be resolved
    if not uid:
        if has_default:
            return default
        fail("get_object_by_uid requires UID as first argument; got {} instead"
             .format(uid))

    # the UID '0' is reserved for the portal object
    if uid == '0':
        return get_portal()

    brain = get_brain_by_uid(uid)

    if brain is None:
        if has_default:
            return default
        fail("No object found for UID {}".format(uid))

    return get_object(brain)
884
885
886
def get_brain_by_uid(uid, default=None):
    """Look up the catalog brain that belongs to the given UID

    :param uid: The UID of the object to find
    :type uid: string
    :param default: Value returned if the UID is invalid or not exactly one
        brain is found
    :returns: ZCatalog brain or the default value
    """
    if not is_uid(uid):
        return default

    # query the UID catalog
    results = get_tool(UID_CATALOG)(UID=uid)

    # anything but exactly one hit is treated as "not found"
    if len(results) == 1:
        return results[0]
    return default
904
905
906
def get_object_by_path(path, default=_marker):
    """Find an object by a given physical path or absolute_url

    :param path: The physical path or the absolute URL of the object to find
    :type path: string
    :param default: Value returned instead of failing when the path is empty,
        lies outside the portal or cannot be traversed
    :returns: Found object, or `default` if one was passed in
    """
    has_default = default is not _marker

    # nothing to do here
    if not path:
        if has_default:
            return default
        fail("get_object_by_path first argument must be a path; {} received"
             .format(path))

    portal = get_portal()
    portal_path = get_path(portal)
    portal_url = get_url(portal)

    # ensure we have a physical path
    if path.startswith(portal_url):
        request = get_request()
        path = "/".join(request.physicalPathFromURL(path))

    if not path.startswith(portal_path):
        if has_default:
            return default
        fail("Not a physical path inside the portal.")

    if path == portal_path:
        return portal

    if has_default:
        return portal.restrictedTraverse(path, default)

    # No default was given: do not hand the module-private `_marker` to the
    # traversal, because it would be returned to the caller as if it were the
    # found object. Fail explicitly instead, like the other lookups do.
    obj = portal.restrictedTraverse(path, None)
    if obj is None:
        fail("No object found at path {}".format(path))
    return obj
939
940
941
def get_path(brain_or_object):
    """Return the physical path of the passed in brain or object

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :returns: Physical path of the object
    :rtype: string
    """
    if not is_brain(brain_or_object):
        # join the path segments of the object to a single string
        segments = get_object(brain_or_object).getPhysicalPath()
        return "/".join(segments)
    return brain_or_object.getPath()
952
953
954
def get_parent_path(brain_or_object):
    """Return the physical path of the parent of the brain or object

    For the portal itself, the path of the portal is returned.

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :returns: Physical path of the parent object
    :rtype: string
    """
    if is_portal(brain_or_object):
        return get_path(get_portal())
    if not is_brain(brain_or_object):
        parent = get_object(brain_or_object).aq_parent
        return get_path(parent)
    # brains: cut off the last segment of the brain's own path
    head, _sep, _tail = get_path(brain_or_object).rpartition("/")
    return head
968
969
970
def get_parent(brain_or_object, **kw):
    """Return the parent object of the passed in brain or object

    For the portal itself, the portal is returned.

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :param kw: No longer supported; a warning is logged if any is passed
    :returns: parent object
    """
    if is_portal(brain_or_object):
        return get_portal()

    # BBB: the `catalog_search` keyword was removed
    if kw:
        logger.warn("API function `get_parent` no longer support keywords.")

    obj = get_object(brain_or_object)
    return obj.aq_parent
989
990
991
def search(query, catalog=_marker):
    """Run a query against a single catalog

    :param query: A suitable search query.
    :type query: dict
    :param catalog: A single catalog id or a list of catalog ids. If omitted,
        the catalog is looked up by the queried portal types
    :type catalog: str/list
    :returns: Search results
    :rtype: List of ZCatalog brains
    """
    if not isinstance(query, dict):
        fail("Catalog query needs to be a dictionary")

    # normalize the queried portal types to a list
    portal_types = query.get("portal_type", [])
    if not isinstance(portal_types, (tuple, list)):
        portal_types = [portal_types]

    if catalog is _marker:
        # no catalog given: take the first registered/default catalog of
        # each queried portal type
        catalogs = [get_catalogs_for(portal_type, default=UID_CATALOG)[0]
                    for portal_type in portal_types]
    elif isinstance(catalog, (list, tuple)):
        catalogs = [get_tool(catalog_id) for catalog_id in catalog]
    else:
        catalogs = [get_tool(catalog)]

    # remove duplicates and fall back to the UID catalog
    catalogs = list(set(catalogs))
    if not catalogs:
        catalogs = [get_uid_catalog()]

    # only **single** catalog queries are supported
    if len(catalogs) > 1:
        fail("Multi Catalog Queries are not supported!")

    return catalogs[0](query)
1037
1038
1039
def safe_getattr(brain_or_object, attr, default=_marker):
    """Return the value of the attribute; callables are called first

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :param attr: Attribute name
    :type attr: str
    :param default: Value returned instead of failing if the attribute is
        missing or the access is not authorized
    :returns: Attribute value
    :rtype: obj
    """
    has_default = default is not _marker
    try:
        value = getattr(brain_or_object, attr, _marker)
        if value is _marker:
            if has_default:
                return default
            fail("Attribute '{}' not found.".format(attr))
        return value() if callable(value) else value
    except Unauthorized:
        if has_default:
            return default
        fail("You are not authorized to access '{}' of '{}'.".format(
            attr, repr(brain_or_object)))
1063
1064
1065
def get_uid_catalog():
    """Return the tool registered under the `UID_CATALOG` id

    :returns: UID Catalog Tool
    """
    uid_catalog = get_tool(UID_CATALOG)
    return uid_catalog
1071
1072
1073
def get_portal_catalog():
    """Return the tool registered under the `PORTAL_CATALOG` id

    :returns: Portal Catalog Tool
    """
    portal_catalog = get_tool(PORTAL_CATALOG)
    return portal_catalog
1079
1080
1081
def get_review_history(brain_or_object, rev=True):
    """Return the review history of the passed in brain or object

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :param rev: If True, the history is returned in reversed order
    :type rev: bool
    :returns: Workflow history
    :rtype: [{}, ...]
    """
    obj = get_object(brain_or_object)
    history = []
    try:
        wf_tool = get_tool("portal_workflow")
        history = wf_tool.getInfoFor(obj, 'review_history')
    except WorkflowException as e:
        logger.error("Cannot retrieve review_history on {}: {}".format(
            obj, str(e)))

    if isinstance(history, tuple):
        # Products.CMFDefault.DefaultWorkflow.getInfoFor always returns a
        # tuple when "review_history" is passed in:
        # https://github.com/zopefoundation/Products.CMFDefault/blob/master/Products/CMFDefault/DefaultWorkflow.py#L244
        #
        # Products.DCWorkflow.getInfoFor relies on Expression.StateChangeInfo,
        # that always returns a list, except when no review_history is found:
        # https://github.com/zopefoundation/Products.DCWorkflow/blob/master/Products/DCWorkflow/DCWorkflow.py#L310
        # https://github.com/zopefoundation/Products.DCWorkflow/blob/master/Products/DCWorkflow/Expression.py#L94
        history = list(history)
    elif not isinstance(history, list):
        # neither a list nor a tuple: log and continue with an empty history
        logger.error("get_review_history: expected list, received {}".format(
            history))
        history = []

    if rev is True:
        history.reverse()
    return history
1118
1119
1120
def get_revision_history(brain_or_object):
    """Return the full content history of the passed in brain or object

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :returns: Result of `ContentHistoryView.fullHistory()` for the object
    :rtype: obj
    """
    obj = get_object(brain_or_object)
    # the view is instantiated with the request of the object, if any
    request = safe_getattr(obj, "REQUEST", None)
    history_view = ContentHistoryView(obj, request)
    return history_view.fullHistory()
1131
1132
1133
def get_workflows_for(brain_or_object):
    """Return the workflow chain assigned to the brain, object or type

    Note: A portal_type (string) is accepted as well and is handed to the
    workflow tool as it is.

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :returns: Assigned Workflows
    :rtype: tuple
    """
    wf_tool = ploneapi.portal.get_tool("portal_workflow")
    target = brain_or_object
    if not isinstance(target, six.string_types):
        # not a portal_type: resolve the object first
        target = get_object(target)
    return wf_tool.getChainFor(target)
1148
1149
1150
def get_workflow_status_of(brain_or_object, state_var="review_state"):
    """Return the current value of a workflow state variable

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :param state_var: The name of the state variable
    :type state_var: string
    :returns: Status; an empty string is the workflow tool's default
    :rtype: str
    """
    # prefer the value stored in the brain, if the brain has such a column
    brain_has_state = is_brain(brain_or_object) \
        and state_var in brain_or_object.schema()
    if brain_has_state:
        return brain_or_object[state_var]

    # otherwise ask the workflow tool for the state of the object
    wf_tool = get_tool("portal_workflow")
    obj = get_object(brain_or_object)
    return wf_tool.getInfoFor(ob=obj, name=state_var, default='')
1169
1170
1171
def get_previous_worfklow_status_of(brain_or_object, skip=None, default=None):
    """Return the workflow status the object had before the current one

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :param skip: Workflow states to skip
    :type skip: tuple/list
    :param default: Value returned if no previous status is found
    :returns: status
    :rtype: str
    """
    # anything but a list/tuple means "skip nothing"
    if not isinstance(skip, (list, tuple)):
        skip = []

    history = get_review_history(brain_or_object)

    # Collapse consecutive duplicates, some transitions might happen more
    # than once consecutively (e.g. publish)
    entries = [entry for entry, _group in groupby(history)]

    # the first entry is the current one and therefore left out
    for entry in entries[1:]:
        status = entry.get("review_state")
        if status not in skip:
            return status
    return default
1198
1199
1200
def get_creation_date(brain_or_object):
    """Return the value of `created` of the brain or object

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :returns: Creation date
    :rtype: DateTime
    """
    created = getattr(brain_or_object, "created", None)
    if created is None:
        fail("Object {} has no creation date ".format(
             repr(brain_or_object)))
    # `created` may either be a plain value or an accessor method
    return created() if callable(created) else created
1215
1216
1217
def get_modification_date(brain_or_object):
    """Return the value of `modified` of the brain or object

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :returns: Modification date
    :rtype: DateTime
    """
    modified = getattr(brain_or_object, "modified", None)
    if modified is None:
        fail("Object {} has no modification date ".format(
             repr(brain_or_object)))
    # `modified` may either be a plain value or an accessor method
    return modified() if callable(modified) else modified
1232
1233
1234
def get_review_status(brain_or_object):
    """Return the `review_state` of the brain or object

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :returns: Value of the review_status variable
    :rtype: String
    """
    # brains carrying the `review_state` attribute are read directly
    brain_has_state = is_brain(brain_or_object) and base_hasattr(
        brain_or_object, "review_state")
    if brain_has_state:
        return brain_or_object.review_state
    return get_workflow_status_of(brain_or_object, state_var="review_state")
1246
1247
1248
def is_active(brain_or_object):
    """Check that the review state is neither 'inactive' nor 'cancelled'

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :returns: False if the object is in the state 'inactive' or 'cancelled'
    :rtype: bool
    """
    inactive_states = ["cancelled", "inactive"]
    return get_review_status(brain_or_object) not in inactive_states
1259
1260
1261
def get_fti(portal_type, default=None):
    """Look up the type information (FTI) for the given portal_type

    :param portal_type: The portal type to get the FTI for
    :param default: Value returned if `portal_type` is not a string or no
        type information is found
    :returns: FTI or default value
    """
    if is_string(portal_type):
        type_info = get_tool("portal_types").getTypeInfo(portal_type)
        return type_info or default
    return default
1272
1273
1274
def get_catalogs_for(brain_or_object, default=PORTAL_CATALOG):
    """Get all registered catalogs for the given portal_type, catalog brain or
    content object

    NOTE: We pass in the `portal_catalog` as default in subsequent calls to
          work around the missing `uid_catalog` during snapshot creation when
          installing a fresh site!

    :param brain_or_object: The portal_type, a catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :param default: Id of the catalog to fall back to if no catalog is found
        for the portal type
    :type default: str
    :returns: List of supported catalogs
    :rtype: list
    :raises APIError: if the passed in value is neither an UID, an object nor
        a portal_type string
    """

    # only handle catalog lookups by portal_type internally
    if is_uid(brain_or_object) or is_object(brain_or_object):
        obj = get_object(brain_or_object)
        portal_type = get_portal_type(obj)
        return get_catalogs_for(portal_type)

    if not is_string(brain_or_object):
        raise APIError("Expected a portal_type string, got <%s>"
                       % type(brain_or_object))

    # at this point the brain_or_object is a portal_type
    portal_type = brain_or_object

    # check static portal_type -> catalog mapping first
    from senaite.core.catalog import get_catalogs_by_type
    catalog_ids = get_catalogs_by_type(portal_type)

    # no catalogs in static mapping
    # => Lookup catalogs by FTI
    if len(catalog_ids) == 0:
        fti = get_fti(portal_type)
        if fti is None:
            # unknown portal type: `get_fti` returned its default (None),
            # continue without catalogs to end up with the default catalog
            catalog_ids = []
        elif fti.product:
            # AT content type
            # => Lookup via archetype_tool
            archetype_tool = get_tool("archetype_tool")
            catalog_ids = archetype_tool.catalog_map.get(portal_type) or []
        else:
            # DX content type
            # => resolve the `_catalogs` attribute from the class
            klass = resolveDottedName(fti.klass)
            # XXX: Refactor multi-catalog behavior to not rely
            #      on this hidden `_catalogs` attribute!
            catalog_ids = getattr(klass, "_catalogs", [])

    # fetch the catalog objects and drop the falsy ones. A real list is built
    # here, because `filter`/`map` return iterators without `len` on Python 3
    tools = [get_tool(cid, None) for cid in catalog_ids]
    catalogs = [tool for tool in tools if tool]

    if not catalogs:
        return [get_tool(default, default=PORTAL_CATALOG)]

    return catalogs
1331
1332
1333
def get_transitions_for(brain_or_object):
    """Collect the transitions offered by all workflows of the object

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :returns: All possible available and allowed transitions
    :rtype: list[dict]
    """
    wf_tool = get_tool('portal_workflow')
    collected = []
    instance = get_object(brain_or_object)
    for workflow_id in get_workflows_for(brain_or_object):
        candidates = wf_tool[workflow_id].getTransitionsFor(instance)
        # only add transitions not yet collected from a previous workflow
        fresh = [t for t in candidates if t not in collected]
        collected.extend(fresh)
    return collected
1349
1350
1351
def do_transition_for(brain_or_object, transition):
    """Perform the workflow transition on the passed in brain or object

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :param transition: The id of the transition to perform
    :type transition: str
    :returns: The object where the transition was performed
    """
    if not isinstance(transition, six.string_types):
        fail("Transition type needs to be string, got '%s'" % type(transition))
    obj = get_object(brain_or_object)
    try:
        ploneapi.content.transition(obj, transition)
    except ploneapi.exc.InvalidParameterError as error:
        # report invalid transitions through the API's own failure handling
        message = "Failed to perform transition '{}' on {}: {}".format(
            transition, obj, str(error))
        fail(message)
    return obj
1367
1368
1369
def get_roles_for_permission(permission, brain_or_object):
    """Return the sorted roles that are granted the permission on the object

    :param permission: The permission to look up the roles for
    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :returns: Roles for the given Permission
    :rtype: list
    """
    obj = get_object(brain_or_object)
    roles = rolesForPermissionOn(permission, obj)
    # remove duplicates before sorting
    return sorted(set(roles))
1380
1381
1382
def is_versionable(brain_or_object, policy='at_edit_autoversion'):
    """Checks if the passed in object is versionable.

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :param policy: Id of the versioning policy the object has to support
    :type policy: str
    :returns: True if the object supports the policy and is versionable
    :rtype: bool
    """
    pr = get_tool("portal_repository")
    obj = get_object(brain_or_object)
    # honor the `policy` passed in by the caller. The default value keeps the
    # previous behavior, where 'at_edit_autoversion' was always checked
    return pr.supportsPolicy(obj, policy) \
        and pr.isVersionable(obj)
1394
1395
1396
def get_version(brain_or_object):
    """Return the `version_id` of the passed in brain or object

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :returns: The current version of the object (0 if no `version_id` is
        set), or None if the object is not versionable
    :rtype: int or None
    """
    obj = get_object(brain_or_object)
    if is_versionable(obj):
        # read from the unwrapped object to bypass acquisition
        return getattr(aq_base(obj), "version_id", 0)
    return None
1408
1409
1410
def get_view(name, context=None, request=None, default=None):
    """Look up the view with the given name

    :param name: The name of the view
    :type name: str
    :param context: The context to query the view; defaults to the portal
    :type context: ATContentType/DexterityContentType/CatalogBrain
    :param request: The request to query the view; defaults to the request
        returned by `get_request`
    :type request: HTTPRequest object
    :param default: Value returned if no view is found
    :returns: The view or the default value
    """
    context = context or get_portal()
    request = request or get_request() or None
    obj = get_object(context)
    view = queryMultiAdapter((obj, request), name=name)
    return default if view is None else view
1428
1429
1430
def get_request():
    """Return the request provided by `globalrequest.getRequest`

    :returns: HTTP Request
    :rtype: HTTPRequest object
    """
    request = globalrequest.getRequest()
    return request
1437
1438
1439
def get_test_request():
    """Return a new TestRequest marked as IAttributeAnnotatable

    :returns: TestRequest object
    """
    test_request = TestRequest()
    directlyProvides(test_request, IAttributeAnnotatable)
    return test_request
1445
1446
1447
def get_group(group_or_groupname):
    """Return the Plone group for the passed in group or group name

    :param group_or_groupname: Plone group or the name of the group
    :type group_or_groupname: GroupData/str
    :returns: Plone GroupData, or None if nothing was passed in
    """
    if not group_or_groupname:
        return None
    if not hasattr(group_or_groupname, "_getGroup"):
        # not a group object: look it up by id
        return get_tool("portal_groups").getGroupById(group_or_groupname)
    return group_or_groupname
1461
1462
1463
def get_user(user_or_username):
    """Return the Plone user for the passed in user or login name

    :param user_or_username: Plone user or user id
    :returns: Plone MemberData, or None for any other kind of value
    """
    if isinstance(user_or_username, six.string_types):
        # look up the member by its login name
        portal = get_portal()
        return get_member_by_login_name(portal, user_or_username, False)
    if isinstance(user_or_username, MemberData):
        return user_or_username
    return None
1475
1476
1477
def get_user_properties(user_or_username):
    """Return the properties of all property sheets of the user

    :param user_or_username: Plone user or user id
    :returns: Dictionary of the user properties; empty if the user is not
        found
    :rtype: dict
    """
    user = get_user(user_or_username)
    if user is None or not callable(user.getUser):
        return {}
    plone_user = user.getUser()
    properties = {}
    # merge the items of all property sheets, later sheets win
    for sheet_id in plone_user.listPropertysheets():
        sheet = plone_user.getPropertysheet(sheet_id)
        properties.update(dict(sheet.propertyItems()))
    return properties
1494
1495
1496
def get_users_by_roles(roles=None):
    """Search the Plone users having the given role(s)

    :param roles: Plone role name or list of roles
    :type roles:  list/str
    :returns: List of Plone users having the role(s)
    """
    # a single value is wrapped into a list
    role_list = roles if isinstance(roles, (tuple, list)) else [roles]
    membership_tool = get_tool("portal_membership")
    return membership_tool.searchForMembers(roles=role_list)
1507
1508
1509
def get_current_user():
    """Return the user provided by `plone.api.user.get_current`

    :returns: Current User
    """
    current_user = ploneapi.user.get_current()
    return current_user
1515
1516
1517
def get_user_contact(user, contact_types=None):
    """Return the contact that is linked to the Plone user

    None is returned if the user has no contact or more than one contact
    associated. The `contact_types` parameter filters the portal types for
    the search.

    :param user: Plone user
    :param contact_types: List with the contact portal types to search;
        defaults to "Contact" and "LabContact"
    :returns: Contact associated to the Plone user or None
    """
    if not user:
        return None

    from senaite.core.catalog import CONTACT_CATALOG  # Avoid circular import

    if contact_types is None:
        contact_types = ["Contact", "LabContact"]

    query = {"portal_type": contact_types, "getUsername": user.getId()}
    brains = search(query, catalog=CONTACT_CATALOG)
    if not brains:
        return None

    if len(brains) == 1:
        return get_object(brains[0])

    # Oops, the user has multiple contacts assigned, return None
    titles = [brain.Title for brain in brains]
    err_msg = "User '{}' is bound to multiple Contacts '{}'".format(
        user.getId(), ','.join(titles))
    logger.error(err_msg)
    return None
1549
1550
1551
def get_user_client(user_or_contact):
    """Return the client the contact of a Plone user belongs to

    None is returned if the user passed in has no contact or does not belong
    to any client.

    :param user_or_contact: Plone user or contact
    :returns: Client the contact of the Plone user belongs to
    """
    if not user_or_contact or is_lab_contact(user_or_contact):
        # Lab contacts cannot belong to a client
        return None

    if is_contact(user_or_contact):
        # the client is expected to be the parent of the contact
        client = get_parent(user_or_contact)
        if client and IClient.providedBy(client):
            return client
        return None

    # a user was passed in: continue with its client contact, if any
    contact = get_user_contact(user_or_contact, contact_types=["Contact"])
    if not is_client_contact(contact):
        return None
    return get_user_client(contact)
1575
1576
1577
def get_user_fullname(user_or_contact):
    """Return the fullname of the contact or Plone user

    If the user has a linked contact, the fullname of the contact has priority
    over the value of the fullname property from the user.

    :param user_or_contact: Plone user or contact
    :returns: Fullname of the contact or user; an empty string if no user is
        found
    """
    if is_contact(user_or_contact):
        return user_or_contact.getFullname()

    user = get_user(user_or_contact)
    if not user:
        return ""

    # the fullname of a linked contact wins over the user property
    contact = get_user_contact(user)
    if contact:
        return contact.getFullname()
    return user.getProperty("fullname")
1599
1600
1601
def get_user_email(user_or_contact):
    """Return the email of the contact or Plone user

    If the user has a linked contact, the email of the contact has priority
    over the value of the email property from the user.

    :param user_or_contact: Plone user or contact
    :returns: Email of the contact or user; an empty string if no user is
        found
    """
    if is_contact(user_or_contact):
        return user_or_contact.getEmailAddress()

    user = get_user(user_or_contact)
    if not user:
        return ""

    # the email of a linked contact wins over the user property
    contact = get_user_contact(user)
    if contact:
        return contact.getEmailAddress()
    return user.getProperty("email", default="")
1621
1622
1623
def is_lab_contact(brain_or_object):
    """Check if the brain or object provides ILabContact

    :returns: True if the brain or object is a Lab Contact, False otherwise
    """
    if is_object(brain_or_object):
        return ILabContact.providedBy(get_object(brain_or_object))
    return False
1632
1633
1634
def is_client_contact(brain_or_object):
    """Check if the brain or object is a contact, but not a global one

    :returns: True if the brain or object is a Client Contact, False otherwise
    """
    if is_object(brain_or_object):
        contact = get_object(brain_or_object)
        return IContact.providedBy(contact) and not is_global_contact(contact)
    return False
1643
1644
1645
def is_global_contact(brain_or_object):
    """Check if the brain or object is a contact located in an IContacts
    container

    :returns: True if the brain or object is a global contact, False otherwise
    """
    if not is_object(brain_or_object):
        return False
    if not IContact.providedBy(get_object(brain_or_object)):
        return False
    # global contacts live in a container providing IContacts
    parent = get_parent(brain_or_object)
    return bool(IContacts.providedBy(parent))
1659
1660
1661
def is_contact(brain_or_object):
    """Check if the brain or object is a client, lab or global contact

    :returns: True if the brain or object is a Contact, False otherwise
    """
    # checked in this order, the first match wins
    checks = (is_client_contact, is_lab_contact, is_global_contact)
    return any(check(brain_or_object) for check in checks)
1673
1674
1675
def get_current_client():
    """Return the client of the current user, if any

    :returns: Client the current logged in user belongs to or None
    """
    current_user = get_current_user()
    return get_user_client(current_user)
1681
1682
1683
def get_cache_key(brain_or_object):
    """Build a cache key out of type, id, UID, URL and modification date

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :returns: Cache Key
    :rtype: str
    """
    parts = (
        get_portal_type(brain_or_object),
        get_id(brain_or_object),
        get_uid(brain_or_object),
        # handle different domains gracefully
        get_url(brain_or_object),
        # Return the microsecond since the epoch in GMT
        get_modification_date(brain_or_object).micros(),
    )
    return "-".join(str(part) for part in parts)
1701
1702
1703
def bika_cache_key_decorator(method, self, brain_or_object):
    """Cache key function for methods taking a brain or object

    :param method: Not used for the key
    :param self: Not used for the key
    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :returns: Cache Key
    :rtype: str
    :raises DontCache: if None is passed in as brain or object
    """
    if brain_or_object is not None:
        return get_cache_key(brain_or_object)
    raise DontCache
1714
1715
1716
def normalize_id(string):
    """Normalize the string with the IIDNormalizer utility

    :param string: A string to normalize
    :type string: str
    :returns: Normalized ID
    :rtype: str
    """
    if not isinstance(string, six.string_types):
        fail("Type of argument must be string, found '{}'"
             .format(type(string)))
    # look up the id normalizer utility
    id_normalizer = getUtility(IIDNormalizer)
    return id_normalizer.normalize(string)
1730
1731
1732
def normalize_filename(string):
    """Normalize the passed in string with the filename normalizer utility

    Calls `fail` if the passed in value is not a string.

    :param string: A string to normalize
    :type string: str
    :returns: Normalized filename
    :rtype: str
    """
    if not isinstance(string, six.string_types):
        message = "Type of argument must be string, found '{}'".format(
            type(string))
        fail(message)
    # look up the filename normalizer utility
    filename_normalizer = getUtility(IFileNameNormalizer)
    return filename_normalizer.normalize(string)
1746
1747
1748
def is_uid(uid, validate=False):
    """Checks if the passed in value is a valid UID

    The string "0" is always accepted. Any other value has to be a string of
    32 characters that matches `UID_RX`.

    :param uid: The uid to check
    :type uid: string
    :param validate: If False, only the format of the uid is checked. If
        True, also verifies that a brain exists in `uid_catalog` for the uid
    :return: True if a valid uid
    :rtype: bool
    """
    if not isinstance(uid, six.string_types):
        return False
    if uid == '0':
        return True
    if len(uid) != 32 or not UID_RX.match(uid):
        return False

    if validate:
        # the uid is only valid if a brain exists for it
        catalog = get_tool('uid_catalog')
        found = catalog(UID=uid)
        if found:
            assert (len(found) == 1)
        return len(found) > 0

    return True
1775
1776
1777
def is_date(date):
    """Checks if the passed in value is a date object

    :param date: The value to check
    :type date: DateTime or datetime
    :return: True if the value is truthy and either a Zope's DateTime or a
        python datetime instance
    :rtype: bool
    """
    date_types = (DateTime, datetime)
    return bool(date) and isinstance(date, date_types)
1788
1789
1790
def to_date(value, default=None):
    """Deprecated: tries to convert the passed in value to Zope's DateTime

    Emits a DeprecationWarning and delegates the conversion to
    `senaite.core.api.dtime.to_DT`.

    :param value: The value to be converted to a valid DateTime
    :type value: str, DateTime or datetime
    :param default: Value converted and returned instead if the conversion
        of `value` gives a falsy result
    :return: The DateTime representation of the value passed in or default
    """
    # bika.lims.deprecated cannot be used here (circular dependencies)
    import warnings
    message = "Deprecated: use senaite.core.api.dtime.to_DT instead"
    warnings.simplefilter("always", DeprecationWarning)
    warnings.warn(message, category=DeprecationWarning, stacklevel=2)
    warnings.simplefilter("default", DeprecationWarning)

    # imported here to prevent circular dependencies
    from senaite.core.api.dtime import to_DT
    converted = to_DT(value)
    return converted if converted else to_DT(default)
1811
1812
1813
def to_minutes(days=0, hours=0, minutes=0, seconds=0, milliseconds=0,
               round_to_int=True):
    """Returns the computed total number of minutes

    All time parts are converted to float before they are summed up.

    :param round_to_int: If True, the total is rounded and returned as int.
        Otherwise, the total is returned as float
    :returns: The total number of minutes
    """
    from_days = float(days)*24*60
    from_hours = float(hours)*60
    from_minutes = float(minutes)
    from_seconds = float(seconds)/60
    from_millis = float(milliseconds)/1000/60
    total = from_days + from_hours + from_minutes + from_seconds + from_millis
    if round_to_int:
        return int(round(total))
    return total
1820
1821
1822
def to_dhm_format(days=0, hours=0, minutes=0, seconds=0, milliseconds=0):
    """Returns a representation of time in a string in xd yh zm format

    Parts with a value of zero are left out, except for the hours when both
    the days and the minutes are displayed.
    """
    total = to_minutes(days=days, hours=hours, minutes=minutes,
                       seconds=seconds, milliseconds=milliseconds)
    delta = timedelta(minutes=int(round(total)))
    day_count = delta.days
    hour_count = delta.seconds // 3600
    minute_count = (delta.seconds // 60) % 60

    minute_part = "{}m ".format(str(minute_count)) if minute_count else ""
    day_part = "{}d ".format(str(day_count)) if day_count else ""

    if minute_part and day_part:
        # always display the hours between days and minutes
        hour_part = "{}h ".format(str(hour_count))
    elif hour_count:
        hour_part = "{}h ".format(str(hour_count))
    else:
        hour_part = ""

    return "".join([day_part, hour_part, minute_part]).strip()
1838
1839
1840
def to_int(value, default=_marker):
    """Tries to convert the value to int.
    Truncates at the decimal point if the value is a float

    Calls `fail` if the value cannot be converted and no default was given.

    :param value: The value to be converted to an int
    :param default: Fallback for values that cannot be converted. None is
        returned as it is, any other default is converted to int as well
    :return: The resulting int or default
    """
    if is_floatable(value):
        value = to_float(value)
    try:
        return int(value)
    except (TypeError, ValueError, OverflowError):
        # OverflowError is raised for infinite floats (e.g. "inf"), which
        # are floatable but cannot be represented as int
        if default is None:
            return default
        if default is not _marker:
            return to_int(default)
        fail("Value %s cannot be converted to int" % repr(value))
1857
1858
1859
def is_floatable(value):
    """Checks if the passed in value can be converted to a float number

    :param value: The value to be evaluated as a float number
    :type value: str, float, int
    :returns: True if is a valid float number
    :rtype: bool
    """
    try:
        float(value)
    except (TypeError, ValueError):
        return False
    return True
1871
1872
1873
def to_float(value, default=_marker):
    """Converts the passed in value to a float number

    Calls `fail` if the value is not floatable and no default was given.

    :param value: The value to be converted to a floatable number
    :type value: str, float, int
    :param default: Fallback value that is converted to float instead if
        the value is not floatable
    :returns: The float number representation of the passed in value
    :rtype: float
    """
    if is_floatable(value):
        return float(value)
    if default is not _marker:
        return to_float(default)
    fail("Value %s is not floatable" % repr(value))
    # only reached if `fail` returns
    return float(value)
1886
1887
1888
def float_to_string(value, default=_marker):
    """Convert a float value to string without exponential notation

    Values that are already strings are returned unchanged. Calls `fail` if
    the value is not floatable and no default was given.

    :param value: The float value to be converted to a string
    :type value: str, float, int
    :param default: Value returned as it is if the value is not floatable
    :returns: String representation of the float w/o exponential notation
    :rtype: str
    """
    if not is_floatable(value):
        if default is not _marker:
            return default
        fail("Value %s is not floatable" % repr(value))

    # floatable strings are returned as they are
    if isinstance(value, six.string_types):
        return value

    number = float(value)
    text = str(number)

    # The exponent is either found behind the decimal point, e.g. 1.23e-26,
    # or in the whole text if there is no decimal point, e.g. 1e-07
    _, point, behind = text.partition(".")
    back = behind if point else text

    if "e-" in back:
        digits, exponent = back.split("e-")
        # the precision has to cover the fraction digits and the zeros
        precision = len(digits) + int(exponent)
        text = ("{:.%df}" % precision).format(number)
    elif "e+" in back:
        # big numbers, e.g. 1e+16, don't need a fractional part
        text = "{:.0f}".format(number)

    # remove trailing zeros (and the decimal point if nothing follows)
    if "." in text:
        text = text.rstrip("0").rstrip(".")

    return text
1932
1933
1934
def to_searchable_text_metadata(value):
    """Parse the given metadata value to searchable text

    Empty values, Missing.Value, UIDs and booleans give an empty string.
    Lists, tuples and dicts (their values) are parsed recursively and joined
    with spaces. Dates are formatted as "%Y-%m-%d" and AT content objects
    are represented by their title.

    :param value: The raw value of the metadata column
    :returns: Searchable text for the value, empty if there is none
    """
    if not value:
        return u""
    if value is Missing.Value:
        return u""
    if is_uid(value):
        return u""
    if isinstance(value, bool):
        return u""
    if isinstance(value, (list, tuple)):
        texts = [to_searchable_text_metadata(item) for item in value]
        return " ".join([text for text in texts if text])
    if isinstance(value, dict):
        # dict.values() is a view and not a list in Python 3. Convert it
        # explicitly, so the values are parsed one by one instead of being
        # converted to a string like "dict_values([...])"
        return to_searchable_text_metadata(list(value.values()))
    if is_date(value):
        from senaite.core.api.dtime import date_to_string
        return date_to_string(value, "%Y-%m-%d")
    if is_at_content(value):
        return to_searchable_text_metadata(get_title(value))
    if not isinstance(value, six.string_types):
        value = str(value)
    return safe_unicode(value)
1962
1963
1964
def get_registry_record(name, default=None):
    """Get the value of a registry record via the plone api

    :param name: [required] name of the registry record
    :type name: str
    :param default: The value returned if the record is not found
    :type default: anything
    :returns: value of the registry record
    """
    lookup = ploneapi.portal.get_registry_record
    return lookup(name, default=default)
1974
1975
1976
def to_display_list(pairs, sort_by="key", allow_empty=True):
    """Create a Plone DisplayList from list items

    :param pairs: list of key, value pairs, a single key, value pair or a
        single string that is used for both the key and the value
    :param sort_by: Sort the items either by "key" or by "value". Any other
        value leaves the items unsorted
    :param allow_empty: Allow to select an empty value
    :returns: Plone DisplayList
    """
    display_list = DisplayList()

    # a single string is used as key and as value
    if isinstance(pairs, six.string_types):
        pairs = [pairs, pairs]

    for item in pairs:
        if isinstance(item, (tuple, list)):
            # list of pairs -> add each of them
            display_list.add(*item)
        elif isinstance(item, six.string_types):
            # just a single pair -> add it and stop
            display_list.add(*pairs)
            break

    # the empty option
    if allow_empty:
        display_list.add("", "")

    if sort_by == "key":
        return display_list.sortedByKey()
    if sort_by == "value":
        return display_list.sortedByValue()
    return display_list
2008
2009
2010
def text_to_html(text, wrap="p", encoding="utf8"):
    """Convert newline characters in the text to HTML line breaks

    :param text: Plain text to convert
    :param wrap: Name of the HTML tag to wrap the converted text in. The
        text is not wrapped if empty
    :param encoding: Encoding of the returned HTML
    :returns: HTML converted and encoded text, or an empty string if no
        text was passed in
    """
    if not text:
        return ""
    # the text is handled as unicode internally
    # NOTE(review): the text is not HTML-escaped here
    markup = safe_unicode(text).replace("\n", "<br/>")
    if wrap:
        markup = u"<{tag}>{html}</{tag}>".format(tag=wrap, html=markup)
    return markup.encode(encoding)
2028
2029
2030
def to_utf8(string, default=_marker):
    """Encode string to UTF8

    Calls `fail` if the passed in value is not a string and no default was
    given.

    :param string: String to be encoded to UTF8
    :param default: Value returned if the passed in value is not a string
    :returns: UTF8 encoded string
    """
    if isinstance(string, six.string_types):
        return safe_unicode(string).encode("utf8")
    if default is _marker:
        fail("Expected string type, got '%s'" % type(string))
    return default
2041
2042
2043
def is_temporary(obj):
    """Returns whether the given object is temporary or not

    The object is considered temporary if any of these applies:

      - it provides ITemporaryObject
      - it has no id or its id matches `UID_RX`
      - it has no parent
      - its parent has no id or the id of the parent matches `UID_RX`
      - the meta type of its parent is "TempFolder"

    :param obj: the object to evaluate
    :returns: True if the object is temporary
    """
    def has_temporary_id(item):
        """Check if the id of the item is missing or matches UID_RX
        """
        item_id = getattr(aq_base(item), "id", None)
        return item_id is None or UID_RX.match(item_id)

    if ITemporaryObject.providedBy(obj):
        return True

    if has_temporary_id(obj):
        return True

    parent = aq_parent(aq_inner(obj))
    if not parent:
        return True

    if has_temporary_id(parent):
        return True

    # Checks to see if we are created inside the portal_factory.
    # This might also happen for DX types in senaite.databox!
    meta_type = getattr(aq_base(parent), "meta_type", "")
    return meta_type == "TempFolder"
2071
2072
2073
def mark_temporary(brain_or_object):
    """Flag the object as temporary by providing ITemporaryObject

    :param brain_or_object: A single catalog brain or content object
    """
    alsoProvides(get_object(brain_or_object), ITemporaryObject)
2078
2079
2080
def unmark_temporary(brain_or_object):
    """Remove the temporary flag (ITemporaryObject) from the object

    :param brain_or_object: A single catalog brain or content object
    """
    noLongerProvides(get_object(brain_or_object), ITemporaryObject)
2085
2086
2087
def is_string(thing):
    """Checks if the passed in object is an instance of a string type

    :param thing: object to test
    :returns: True if the object is a string
    """
    string_types = six.string_types
    return isinstance(thing, string_types)
2094
2095
2096
def is_list(thing):
    """Checks if the passed in object is an instance of list

    :param thing: object to test
    :returns: True if the object is a list
    """
    matches = isinstance(thing, list)
    return matches
2103
2104
2105
def is_list_iterable(thing):
    """Checks if the passed in object can be iterated like a list

    :param thing: object to test
    :returns: True if the object is a list, tuple or set
    """
    list_like_types = (list, tuple, set)
    return isinstance(thing, list_like_types)
2112
2113
2114
def parse_json(thing, default=""):
    """Load the passed in thing as JSON

    :param thing: thing to parse
    :param default: value to return if cannot parse
    :returns: the object representing the JSON or default
    """
    try:
        parsed = json.loads(thing)
    except (TypeError, ValueError):
        return default
    return parsed
2125
2126
2127
def to_list(value):
    """Converts the value to a list

    Strings are parsed as JSON first and replaced by the result if it is a
    list, tuple or set. Any value that is not a list, tuple or set becomes
    the only item of the returned list.

    :param value: the value to be represented as a list
    :returns: a list that represents or contains the value
    """
    list_types = (list, tuple, set)
    if is_string(value):
        parsed = parse_json(value)
        if isinstance(parsed, list_types):
            value = parsed
    if isinstance(value, list_types):
        return list(value)
    return [value]
2140
2141
2142
def validate(obj):
    """Validates the full object

    AT content is validated by its own `validate` method. For Dexterity
    content, the value of each field is validated first, followed by the
    invariants of the schema and the invariants of the behaviors.

    :param obj: the object to validate the data against
    :type obj: ATContentType/DexterityContentType
    :returns: a dict with field names as keys and errors as values
    :raises TypeError: if the object is neither AT nor Dexterity content
    """
    if is_at_content(obj):
        return obj.validate(data=True)

    if not is_dexterity_content(obj):
        raise TypeError("%r is not supported" % type(obj))

    def is_string_field(field):
        """Check if the field is a string field
        """
        if isinstance(field, StringField):
            return True
        return getattr(field, "_type", None) in (str,)

    def get_error_message(exception):
        """Get the translated message or the class name of the exception
        """
        return translate(exception.message) or type(exception).__name__

    errors = {}
    data = {}

    # validate the value of each field of the object
    for name, field in get_fields(obj).items():

        value = getattr(obj, name, None)
        if callable(value):
            # e.g. effective, expired etc.
            value = value()
        if isinstance(value, six.string_types):
            value = safe_unicode(value)
        if is_string_field(field):
            # StringField and NativeString only accept str and raise a
            # WrongType error for unicode values. This is the case e.g. for
            # the field that stores the ID of the object. Since Senaite
            # stores values as unicode and returns them as UTF-8 to keep the
            # AT legacy behavior, provide UTF-8 encoded strings here.
            # Note this only works for top-level fields: a
            # WrongContainedType error is still raised if the field is,
            # e.g., a DataGridField with a subfield that inherits from
            # NativeString.
            missing_value = getattr(field, "missing_value", None)
            value = to_utf8(value, default=None) or missing_value

        # keep the value for the validation of the invariants
        data[name] = value

        if name in SKIP_VALIDATION_FIELDS:
            continue

        try:
            field.validate(value)
        except RequiredMissing:
            errors[name] = "required field"
        except WrongType:
            # wrong type is fine if the field is not required and unset
            if value is not None:
                errors[name] = "wrong type"
        except Invalid as ex:
            errors[name] = get_error_message(ex)

    # invariants of the schema
    schema = get_schema(obj)
    try:
        schema.validateInvariants(ValidatorData(schema, data, obj))
    except Invalid as ex:
        errors[schema.getName()] = get_error_message(ex)

    # invariants of the behaviors
    for behavior_id in get_behaviors(obj):
        registration = lookup_behavior_registration(behavior_id)
        interface = registration.interface
        try:
            interface.validateInvariants(ValidatorData(interface, data, obj))
        except Invalid as ex:
            errors[behavior_id] = get_error_message(ex)

    return errors
2222
2223
2224
def get_portal_types():
    """Get the content types listed by the portal_types tool

    :returns: List of portal type names
    :rtype: list
    """
    return get_tool("portal_types").listContentTypes()
2232
2233
2234
def is_valid_id(thing, container=None):
    """Checks if is a valid ID candidate based on the following conditions:

      - Contains only letters, numbers, hyphens ('-'), or underscores ('_').
      - Starts with a letter or a number.
      - Ends with a letter or a number.
      - Has a minimum length of 3 characters.
      - Does not start with a reserved prefix ('aq_', 'manage' or 'request',
        case-insensitive).
      - Is not the name of a portal type.

    If a container is provided, it also verifies the container does not have
    any attribute or function with same id.

    :param thing: the id to validate
    :type thing: str
    :param container: optional object that must not have an attribute named
        like the id
    :returns: True if the id meets all the conditions, False otherwise.
    """
    # Anchor with \Z instead of $: the latter also matches right before a
    # trailing newline and would accept ids like "abc" followed by a newline
    id_rx = re.compile(r"^[a-z0-9][a-z0-9_\-]+[a-z0-9]\Z")
    illegal = re.compile(r"^(aq_|manage|request).*")

    if not is_string(thing):
        return False

    # convert to lower to simplify regex
    lower = thing.lower()

    # check length and characters
    if not id_rx.match(lower):
        return False

    # check for reserved/illegal word
    if illegal.match(lower):
        return False

    # check for portal type names (case-sensitive)
    portal_types = get_portal_types()
    if thing in portal_types:
        return False

    # check for container attributes and functions
    if container and hasattr(container, thing):
        return False

    return True
2277