Passed
Push — 2.x ( 903acc...1f72ad )
by Jordi
09:59 queued 03:30
created

bika.lims.api.get_fti()   A

Complexity

Conditions 2

Size

Total Lines 11
Code Lines 6

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 6
dl 0
loc 11
rs 10
c 0
b 0
f 0
cc 2
nop 2
1
# -*- coding: utf-8 -*-
2
#
3
# This file is part of SENAITE.CORE.
4
#
5
# SENAITE.CORE is free software: you can redistribute it and/or modify it under
6
# the terms of the GNU General Public License as published by the Free Software
7
# Foundation, version 2.
8
#
9
# This program is distributed in the hope that it will be useful, but WITHOUT
10
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
11
# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
12
# details.
13
#
14
# You should have received a copy of the GNU General Public License along with
15
# this program; if not, write to the Free Software Foundation, Inc., 51
16
# Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
17
#
18
# Copyright 2018-2021 by it's authors.
19
# Some rights reserved, see README and LICENSE.
20
21
import copy
22
import json
23
import re
24
from collections import OrderedDict
25
from datetime import datetime
26
from datetime import timedelta
27
from itertools import groupby
28
29
import Missing
30
import six
31
from AccessControl.PermissionRole import rolesForPermissionOn
32
from Acquisition import aq_base
33
from Acquisition import aq_inner
34
from Acquisition import aq_parent
35
from bika.lims import logger
36
from bika.lims.interfaces import IClient
37
from bika.lims.interfaces import IContact
38
from bika.lims.interfaces import ILabContact
39
from DateTime import DateTime
40
from plone import api as ploneapi
41
from plone.api.exc import InvalidParameterError
42
from plone.app.layout.viewlets.content import ContentHistoryView
43
from plone.behavior.interfaces import IBehaviorAssignable
44
from plone.dexterity.interfaces import IDexterityContent
45
from plone.dexterity.schema import SchemaInvalidatedEvent
46
from plone.dexterity.utils import addContentToContainer
47
from plone.dexterity.utils import createContent
48
from plone.dexterity.utils import resolveDottedName
49
from plone.i18n.normalizer.interfaces import IFileNameNormalizer
50
from plone.i18n.normalizer.interfaces import IIDNormalizer
51
from plone.memoize.volatile import DontCache
52
from Products.Archetypes.atapi import DisplayList
53
from Products.Archetypes.BaseObject import BaseObject
54
from Products.Archetypes.event import ObjectInitializedEvent
55
from Products.Archetypes.utils import mapply
56
from Products.CMFCore.interfaces import IFolderish
57
from Products.CMFCore.interfaces import ISiteRoot
58
from Products.CMFCore.permissions import ModifyPortalContent
59
from Products.CMFCore.permissions import View
60
from Products.CMFCore.utils import getToolByName
61
from Products.CMFCore.WorkflowCore import WorkflowException
62
from Products.CMFPlone.RegistrationTool import get_member_by_login_name
63
from Products.CMFPlone.utils import _createObjectByType
64
from Products.CMFPlone.utils import base_hasattr
65
from Products.CMFPlone.utils import safe_unicode
66
from Products.PlonePAS.tools.memberdata import MemberData
67
from Products.ZCatalog.interfaces import ICatalogBrain
68
from senaite.core.interfaces import ITemporaryObject
69
from zope import globalrequest
70
from zope.annotation.interfaces import IAttributeAnnotatable
71
from zope.component import getUtility
72
from zope.component import queryMultiAdapter
73
from zope.event import notify
74
from zope.interface import alsoProvides
75
from zope.interface import directlyProvides
76
from zope.interface import noLongerProvides
77
from zope.publisher.browser import TestRequest
78
from zope.schema import getFieldsInOrder
79
from zope.security.interfaces import Unauthorized
80
81
"""SENAITE LIMS Framework API
82
83
Please see tests/doctests/API.rst for documentation.
84
85
Architecural Notes:
86
87
Please add only functions that do a single thing for a single object.
88
89
Good: `def get_foo(brain_or_object)`
90
Bad:  `def get_foos(list_of_brain_objects)`
91
92
Why?
93
94
Because it makes things more complex. You can always use a pattern like this to
95
achieve the same::
96
97
    >>> foos = map(get_foo, list_of_brain_objects)
98
99
Please add for all functions a descriptive test in tests/doctests/API.rst.
100
101
Thanks.
102
"""
103
104
_marker = object()
105
106
UID_RX = re.compile("[a-z0-9]{32}$")
107
108
UID_CATALOG = "uid_catalog"
109
PORTAL_CATALOG = "portal_catalog"
110
111
112
class APIError(Exception):
113
    """Base exception class for bika.lims errors."""
114
115
116
def get_portal():
117
    """Get the portal object
118
119
    :returns: Portal object
120
    """
121
    return ploneapi.portal.getSite()
122
123
124
def get_setup():
125
    """Fetch the `bika_setup` folder.
126
    """
127
    portal = get_portal()
128
    return portal.get("bika_setup")
129
130
131
def get_bika_setup():
132
    """Fetch the `bika_setup` folder.
133
    """
134
    return get_setup()
135
136
137
def get_senaite_setup():
138
    """Fetch the new DX `setup` folder.
139
    """
140
    portal = get_portal()
141
    return portal.get("setup")
142
143
144
def create(container, portal_type, *args, **kwargs):
145
    """Creates an object in Bika LIMS
146
147
    This code uses most of the parts from the TypesTool
148
    see: `Products.CMFCore.TypesTool._constructInstance`
149
150
    :param container: container
151
    :type container: ATContentType/DexterityContentType/CatalogBrain
152
    :param portal_type: The portal type to create, e.g. "Client"
153
    :type portal_type: string
154
    :param title: The title for the new content object
155
    :type title: string
156
    :returns: The new created object
157
    """
158
    from bika.lims.utils import tmpID
159
160
    tmp_id = tmpID()
161
    id = kwargs.pop("id", "")
162
    title = kwargs.pop("title", "")
163
164
    # get the fti
165
    types_tool = get_tool("portal_types")
166
    fti = types_tool.getTypeInfo(portal_type)
167
168
    if fti.product:
169
        # create the AT object
170
        obj = _createObjectByType(portal_type, container, id or tmp_id)
171
        # update the object with values
172
        edit(obj, check_permissions=False, title=title, **kwargs)
173
        # auto-id if required
174
        if obj._at_rename_after_creation:
175
            obj._renameAfterCreation(check_auto_id=True)
176
        # we are no longer under creation
177
        obj.unmarkCreationFlag()
178
        # notify that the object was created
179
        notify(ObjectInitializedEvent(obj))
180
    else:
181
        content = createContent(portal_type, **kwargs)
182
        content.id = id
183
        content.title = title
184
        obj = addContentToContainer(container, content)
185
186
    return obj
187
188
189
def copy_object(source, container=None, portal_type=None, *args, **kwargs):
190
    """Creates a copy of the source object. If container is None, creates the
191
    copy inside the same container as the source. If portal_type is specified,
192
    creates a new object of this type, and copies the values from source fields
193
    to the destination object. Field values sent as kwargs have priority over
194
    the field values from source.
195
196
    :param source: object from which create a copy
197
    :type source: ATContentType/DexterityContentType/CatalogBrain
198
    :param container: destination container
199
    :type container: ATContentType/DexterityContentType/CatalogBrain
200
    :param portal_type: destination portal type
201
    :returns: The new created object
202
    """
203
    # Prevent circular dependencies
204
    from security import check_permission
205
    # Use same container as source unless explicitly set
206
    source = get_object(source)
207
    if not container:
208
        container = get_parent(source)
209
210
    # Use same portal type as source unless explicitly set
211
    if not portal_type:
212
        portal_type = get_portal_type(source)
213
214
    # Extend the fields to skip with defaults
215
    skip = kwargs.pop("skip", [])
216
    skip = set(skip)
217
    skip.update([
218
        "Products.Archetypes.Field.ComputedField",
219
        "UID",
220
        "id",
221
        "allowDiscussion",
222
        "contributors",
223
        "creation_date",
224
        "creators",
225
        "effectiveDate",
226
        "expirationDate",
227
        "language",
228
        "location",
229
        "modification_date",
230
        "rights",
231
        "subject",
232
    ])
233
    # Build a dict for complexity reduction
234
    skip = dict([(item, True) for item in skip])
235
236
    # Update kwargs with the field values to copy from source
237
    fields = get_fields(source)
238
    for field_name, field in fields.items():
239
        # Prioritize field values passed as kwargs
240
        if field_name in kwargs:
241
            continue
242
        # Skip framework internal fields by name
243
        if skip.get(field_name, False):
244
            continue
245
        # Skip fields of non-suitable types
246
        if hasattr(field, "getType") and skip.get(field.getType(), False):
247
            continue
248
        # Skip readonly fields
249
        if getattr(field, "readonly", False):
250
            continue
251
        # Skip non-readable fields
252
        perm = getattr(field, "read_permission", View)
253
        if perm and not check_permission(perm, source):
254
            continue
255
256
        # do not wake-up objects unnecessarily
257
        if hasattr(field, "getRaw"):
258
            field_value = field.getRaw(source)
259
        elif hasattr(field, "get_raw"):
260
            field_value = field.get_raw(source)
261
        elif hasattr(field, "getAccessor"):
262
            accessor = field.getAccessor(source)
263
            field_value = accessor()
264
        else:
265
            field_value = field.get(source)
266
267
        # Do a hard copy of value if mutable type
268
        if isinstance(field_value, (list, dict, set)):
269
            field_value = copy.deepcopy(field_value)
270
        kwargs.update({field_name: field_value})
271
272
    # Create a copy
273
    return create(container, portal_type, *args, **kwargs)
274
275
276
def edit(obj, check_permissions=True, **kwargs):
277
    """Updates the values of object fields with the new values passed-in
278
    """
279
    # Prevent circular dependencies
280
    from security import check_permission
281
    fields = get_fields(obj)
282
    for name, value in kwargs.items():
283
        field = fields.get(name, None)
284
        if not field:
285
            continue
286
287
        # cannot update readonly fields
288
        readonly = getattr(field, "readonly", False)
289
        if readonly:
290
            raise ValueError("Field '{}' is readonly".format(name))
291
292
        # check field writable permission
293
        if check_permissions:
294
            perm = getattr(field, "write_permission", ModifyPortalContent)
295
            if perm and not check_permission(perm, obj):
296
                raise Unauthorized("Field '{}' is not writeable".format(name))
297
298
        # Set the value
299
        if hasattr(field, "getMutator"):
300
            mutator = field.getMutator(obj)
301
            mapply(mutator, value)
302
        else:
303
            field.set(obj, value)
304
305
306
def get_tool(name, context=None, default=_marker):
307
    """Get a portal tool by name
308
309
    :param name: The name of the tool, e.g. `senaite_setup_catalog`
310
    :type name: string
311
    :param context: A portal object
312
    :type context: ATContentType/DexterityContentType/CatalogBrain
313
    :returns: Portal Tool
314
    """
315
316
    # Try first with the context
317
    if context is not None:
318
        try:
319
            context = get_object(context)
320
            return getToolByName(context, name)
321
        except (APIError, AttributeError) as e:
322
            # https://github.com/senaite/bika.lims/issues/396
323
            logger.warn("get_tool::getToolByName({}, '{}') failed: {} "
324
                        "-> falling back to plone.api.portal.get_tool('{}')"
325
                        .format(repr(context), name, repr(e), name))
326
            return get_tool(name, default=default)
327
328
    # Try with the plone api
329
    try:
330
        return ploneapi.portal.get_tool(name)
331
    except InvalidParameterError:
332
        if default is not _marker:
333
            if isinstance(default, six.string_types):
334
                return get_tool(default)
335
            return default
336
        fail("No tool named '%s' found." % name)
337
338
339
def fail(msg=None):
340
    """API LIMS Error
341
    """
342
    if msg is None:
343
        msg = "Reason not given."
344
    raise APIError("{}".format(msg))
345
346
347
def is_object(brain_or_object):
348
    """Check if the passed in object is a supported portal content object
349
350
    :param brain_or_object: A single catalog brain or content object
351
    :type brain_or_object: Portal Object
352
    :returns: True if the passed in object is a valid portal content
353
    """
354
    if is_portal(brain_or_object):
355
        return True
356
    if is_supermodel(brain_or_object):
357
        return True
358
    if is_at_content(brain_or_object):
359
        return True
360
    if is_dexterity_content(brain_or_object):
361
        return True
362
    if is_brain(brain_or_object):
363
        return True
364
    return False
365
366
367
def get_object(brain_object_uid, default=_marker):
368
    """Get the full content object
369
370
    :param brain_object_uid: A catalog brain or content object or uid
371
    :type brain_object_uid: PortalObject/ATContentType/DexterityContentType
372
    /CatalogBrain/basestring
373
    :returns: The full object
374
    """
375
    if is_uid(brain_object_uid):
376
        return get_object_by_uid(brain_object_uid, default=default)
377
    elif is_supermodel(brain_object_uid):
378
        return brain_object_uid.instance
379
    if not is_object(brain_object_uid):
380
        if default is _marker:
381
            fail("{} is not supported.".format(repr(brain_object_uid)))
382
        return default
383
    if is_brain(brain_object_uid):
384
        return brain_object_uid.getObject()
385
    return brain_object_uid
386
387
388
def is_portal(brain_or_object):
389
    """Checks if the passed in object is the portal root object
390
391
    :param brain_or_object: A single catalog brain or content object
392
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
393
    :returns: True if the object is the portal root object
394
    :rtype: bool
395
    """
396
    return ISiteRoot.providedBy(brain_or_object)
397
398
399
def is_brain(brain_or_object):
400
    """Checks if the passed in object is a portal catalog brain
401
402
    :param brain_or_object: A single catalog brain or content object
403
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
404
    :returns: True if the object is a catalog brain
405
    :rtype: bool
406
    """
407
    return ICatalogBrain.providedBy(brain_or_object)
408
409
410
def is_supermodel(brain_or_object):
411
    """Checks if the passed in object is a supermodel
412
413
    :param brain_or_object: A single catalog brain or content object
414
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
415
    :returns: True if the object is a catalog brain
416
    :rtype: bool
417
    """
418
    # avoid circular imports
419
    from senaite.app.supermodel.interfaces import ISuperModel
420
    return ISuperModel.providedBy(brain_or_object)
421
422
423
def is_dexterity_content(brain_or_object):
424
    """Checks if the passed in object is a dexterity content type
425
426
    :param brain_or_object: A single catalog brain or content object
427
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
428
    :returns: True if the object is a dexterity content type
429
    :rtype: bool
430
    """
431
    return IDexterityContent.providedBy(brain_or_object)
432
433
434
def is_at_content(brain_or_object):
435
    """Checks if the passed in object is an AT content type
436
437
    :param brain_or_object: A single catalog brain or content object
438
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
439
    :returns: True if the object is an AT content type
440
    :rtype: bool
441
    """
442
    return isinstance(brain_or_object, BaseObject)
443
444
445
def is_dx_type(portal_type):
446
    """Checks if the portal type is DX based
447
448
    :param portal_type: The portal type name to check
449
    :returns: True if the portal type is DX based
450
    """
451
    portal_types = get_tool("portal_types")
452
    fti = portal_types.getTypeInfo(portal_type)
453
    if fti.product:
454
        return False
455
    return True
456
457
458
def is_at_type(portal_type):
459
    """Checks if the portal type is AT based
460
461
    :param portal_type: The portal type name to check
462
    :returns: True if the portal type is AT based
463
    """
464
    return not is_dx_type(portal_type)
465
466
467
def is_folderish(brain_or_object):
468
    """Checks if the passed in object is folderish
469
470
    :param brain_or_object: A single catalog brain or content object
471
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
472
    :returns: True if the object is folderish
473
    :rtype: bool
474
    """
475
    if hasattr(brain_or_object, "is_folderish"):
476
        if callable(brain_or_object.is_folderish):
477
            return brain_or_object.is_folderish()
478
        return brain_or_object.is_folderish
479
    return IFolderish.providedBy(get_object(brain_or_object))
480
481
482
def get_portal_type(brain_or_object):
483
    """Get the portal type for this object
484
485
    :param brain_or_object: A single catalog brain or content object
486
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
487
    :returns: Portal type
488
    :rtype: string
489
    """
490
    if not is_object(brain_or_object):
491
        fail("{} is not supported.".format(repr(brain_or_object)))
492
    return brain_or_object.portal_type
493
494
495
def get_schema(brain_or_object):
496
    """Get the schema of the content
497
498
    :param brain_or_object: A single catalog brain or content object
499
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
500
    :returns: Schema object
501
    """
502
    obj = get_object(brain_or_object)
503
    if is_portal(obj):
504
        fail("get_schema can't return schema of portal root")
505
    if is_dexterity_content(obj):
506
        pt = get_tool("portal_types")
507
        fti = pt.getTypeInfo(obj.portal_type)
508
        return fti.lookupSchema()
509
    if is_at_content(obj):
510
        return obj.Schema()
511
    fail("{} has no Schema.".format(brain_or_object))
512
513
514
def get_behaviors(portal_type):
515
    """List all behaviors
516
517
    :param portal_type: DX portal type name
518
    """
519
    portal_types = get_tool("portal_types")
520
    fti = portal_types.getTypeInfo(portal_type)
521
    if fti.product:
522
        raise TypeError("Expected DX type, got AT type instead.")
523
    return fti.behaviors
524
525
526
def enable_behavior(portal_type, behavior_id):
527
    """Enable behavior
528
529
    :param portal_type: DX portal type name
530
    :param behavior_id: The behavior to enable
531
    """
532
    portal_types = get_tool("portal_types")
533
    fti = portal_types.getTypeInfo(portal_type)
534
    if fti.product:
535
        raise TypeError("Expected DX type, got AT type instead.")
536
537
    if behavior_id not in fti.behaviors:
538
        fti.behaviors += (behavior_id, )
539
        # invalidate schema cache
540
        notify(SchemaInvalidatedEvent(portal_type))
541
542
543
def disable_behavior(portal_type, behavior_id):
544
    """Disable behavior
545
546
    :param portal_type: DX portal type name
547
    :param behavior_id: The behavior to disable
548
    """
549
    portal_types = get_tool("portal_types")
550
    fti = portal_types.getTypeInfo(portal_type)
551
    if fti.product:
552
        raise TypeError("Expected DX type, got AT type instead.")
553
    fti.behaviors = tuple(filter(lambda b: b != behavior_id, fti.behaviors))
554
    # invalidate schema cache
555
    notify(SchemaInvalidatedEvent(portal_type))
556
557
558
def get_fields(brain_or_object):
559
    """Get a name to field mapping of the object
560
561
    :param brain_or_object: A single catalog brain or content object
562
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
563
    :returns: Mapping of name -> field
564
    :rtype: OrderedDict
565
    """
566
    obj = get_object(brain_or_object)
567
    schema = get_schema(obj)
568
    if is_dexterity_content(obj):
569
        # get the fields directly provided by the interface
570
        fields = getFieldsInOrder(schema)
571
        # append the fields coming from behaviors
572
        behavior_assignable = IBehaviorAssignable(obj)
573
        if behavior_assignable:
574
            behaviors = behavior_assignable.enumerateBehaviors()
575
            for behavior in behaviors:
576
                fields.extend(getFieldsInOrder(behavior.interface))
577
        return OrderedDict(fields)
578
    return OrderedDict(schema)
579
580
581
def get_id(brain_or_object):
582
    """Get the Plone ID for this object
583
584
    :param brain_or_object: A single catalog brain or content object
585
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
586
    :returns: Plone ID
587
    :rtype: string
588
    """
589
    if is_brain(brain_or_object):
590
        if base_hasattr(brain_or_object, "getId"):
591
            return brain_or_object.getId
592
        if base_hasattr(brain_or_object, "id"):
593
            return brain_or_object.id
594
    return get_object(brain_or_object).getId()
595
596
597
def get_title(brain_or_object):
598
    """Get the Title for this object
599
600
    :param brain_or_object: A single catalog brain or content object
601
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
602
    :returns: Title
603
    :rtype: string
604
    """
605
    if is_brain(brain_or_object) and base_hasattr(brain_or_object, "Title"):
606
        return brain_or_object.Title
607
    return get_object(brain_or_object).Title()
608
609
610
def get_description(brain_or_object):
611
    """Get the Title for this object
612
613
    :param brain_or_object: A single catalog brain or content object
614
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
615
    :returns: Title
616
    :rtype: string
617
    """
618
    if is_brain(brain_or_object) \
619
            and base_hasattr(brain_or_object, "Description"):
620
        return brain_or_object.Description
621
    return get_object(brain_or_object).Description()
622
623
624
def get_uid(brain_or_object):
625
    """Get the Plone UID for this object
626
627
    :param brain_or_object: A single catalog brain or content object or an UID
628
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
629
    :returns: Plone UID
630
    :rtype: string
631
    """
632
    if is_uid(brain_or_object):
633
        return brain_or_object
634
    if is_portal(brain_or_object):
635
        return '0'
636
    if is_brain(brain_or_object) and base_hasattr(brain_or_object, "UID"):
637
        return brain_or_object.UID
638
    return get_object(brain_or_object).UID()
639
640
641
def get_url(brain_or_object):
642
    """Get the absolute URL for this object
643
644
    :param brain_or_object: A single catalog brain or content object
645
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
646
    :returns: Absolute URL
647
    :rtype: string
648
    """
649
    if is_brain(brain_or_object) and base_hasattr(brain_or_object, "getURL"):
650
        return brain_or_object.getURL()
651
    return get_object(brain_or_object).absolute_url()
652
653
654
def get_icon(brain_or_object, html_tag=True):
655
    """Get the icon of the content object
656
657
    :param brain_or_object: A single catalog brain or content object
658
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
659
    :param html_tag: A value of 'True' returns the HTML tag, else the image url
660
    :type html_tag: bool
661
    :returns: HTML '<img>' tag if 'html_tag' is True else the image url
662
    :rtype: string
663
    """
664
    # Manual approach, because `plone.app.layout.getIcon` does not reliable
665
    # work for Contents coming from other catalogs than the
666
    # `portal_catalog`
667
    portal_types = get_tool("portal_types")
668
    fti = portal_types.getTypeInfo(brain_or_object.portal_type)
669
    icon = fti.getIcon()
670
    if not icon:
671
        return ""
672
    url = "%s/%s" % (get_url(get_portal()), icon)
673
    if not html_tag:
674
        return url
675
    tag = '<img width="16" height="16" src="{url}" title="{title}" />'.format(
676
        url=url, title=get_title(brain_or_object))
677
    return tag
678
679
680
def get_object_by_uid(uid, default=_marker):
681
    """Find an object by a given UID
682
683
    :param uid: The UID of the object to find
684
    :type uid: string
685
    :returns: Found Object or None
686
    """
687
688
    # nothing to do here
689
    if not uid:
690
        if default is not _marker:
691
            return default
692
        fail("get_object_by_uid requires UID as first argument; got {} instead"
693
             .format(uid))
694
695
    # we defined the portal object UID to be '0'::
696
    if uid == '0':
697
        return get_portal()
698
699
    brain = get_brain_by_uid(uid)
700
701
    if brain is None:
702
        if default is not _marker:
703
            return default
704
        fail("No object found for UID {}".format(uid))
705
706
    return get_object(brain)
707
708
709
def get_brain_by_uid(uid, default=None):
710
    """Query a brain by a given UID
711
712
    :param uid: The UID of the object to find
713
    :type uid: string
714
    :returns: ZCatalog brain or None
715
    """
716
    if not is_uid(uid):
717
        return default
718
719
    # we try to find the object with the UID catalog
720
    uc = get_tool(UID_CATALOG)
721
722
    # try to find the object with the reference catalog first
723
    brains = uc(UID=uid)
724
    if len(brains) != 1:
725
        return default
726
    return brains[0]
727
728
729
def get_object_by_path(path, default=_marker):
730
    """Find an object by a given physical path or absolute_url
731
732
    :param path: The physical path of the object to find
733
    :type path: string
734
    :returns: Found Object or None
735
    """
736
737
    # nothing to do here
738
    if not path:
739
        if default is not _marker:
740
            return default
741
        fail("get_object_by_path first argument must be a path; {} received"
742
             .format(path))
743
744
    portal = get_portal()
745
    portal_path = get_path(portal)
746
    portal_url = get_url(portal)
747
748
    # ensure we have a physical path
749
    if path.startswith(portal_url):
750
        request = get_request()
751
        path = "/".join(request.physicalPathFromURL(path))
752
753
    if not path.startswith(portal_path):
754
        if default is not _marker:
755
            return default
756
        fail("Not a physical path inside the portal.")
757
758
    if path == portal_path:
759
        return portal
760
761
    return portal.restrictedTraverse(path, default)
762
763
764
def get_path(brain_or_object):
765
    """Calculate the physical path of this object
766
767
    :param brain_or_object: A single catalog brain or content object
768
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
769
    :returns: Physical path of the object
770
    :rtype: string
771
    """
772
    if is_brain(brain_or_object):
773
        return brain_or_object.getPath()
774
    return "/".join(get_object(brain_or_object).getPhysicalPath())
775
776
777
def get_parent_path(brain_or_object):
778
    """Calculate the physical parent path of this object
779
780
    :param brain_or_object: A single catalog brain or content object
781
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
782
    :returns: Physical path of the parent object
783
    :rtype: string
784
    """
785
    if is_portal(brain_or_object):
786
        return get_path(get_portal())
787
    if is_brain(brain_or_object):
788
        path = get_path(brain_or_object)
789
        return path.rpartition("/")[0]
790
    return get_path(get_object(brain_or_object).aq_parent)
791
792
793
def get_parent(brain_or_object, **kw):
794
    """Locate the parent object of the content/catalog brain
795
796
    :param brain_or_object: A single catalog brain or content object
797
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
798
    :param catalog_search: Use a catalog query to find the parent object
799
    :type catalog_search: bool
800
    :returns: parent object
801
    :rtype: ATContentType/DexterityContentType/PloneSite/CatalogBrain
802
    """
803
804
    if is_portal(brain_or_object):
805
        return get_portal()
806
807
    # BBB: removed `catalog_search` keyword
808
    if kw:
809
        logger.warn("API function `get_parent` no longer support keywords.")
810
811
    return get_object(brain_or_object).aq_parent
812
813
814
def search(query, catalog=_marker):
815
    """Search for objects.
816
817
    :param query: A suitable search query.
818
    :type query: dict
819
    :param catalog: A single catalog id or a list of catalog ids
820
    :type catalog: str/list
821
    :returns: Search results
822
    :rtype: List of ZCatalog brains
823
    """
824
825
    # query needs to be a dictionary
826
    if not isinstance(query, dict):
827
        fail("Catalog query needs to be a dictionary")
828
829
    # Portal types to query
830
    portal_types = query.get("portal_type", [])
831
    # We want the portal_type as a list
832
    if not isinstance(portal_types, (tuple, list)):
833
        portal_types = [portal_types]
834
835
    # The catalogs used for the query
836
    catalogs = []
837
838
    # The user did **not** specify a catalog
839
    if catalog is _marker:
840
        # Find the registered catalogs for the queried portal types
841
        for portal_type in portal_types:
842
            # Just get the first registered/default catalog
843
            catalogs.append(get_catalogs_for(
844
                portal_type, default=UID_CATALOG)[0])
845
    else:
846
        # User defined catalogs
847
        if isinstance(catalog, (list, tuple)):
848
            catalogs.extend(map(get_tool, catalog))
849
        else:
850
            catalogs.append(get_tool(catalog))
851
852
    # Cleanup: Avoid duplicate catalogs
853
    catalogs = list(set(catalogs)) or [get_uid_catalog()]
854
855
    # We only support **single** catalog queries
856
    if len(catalogs) > 1:
857
        fail("Multi Catalog Queries are not supported!")
858
859
    return catalogs[0](query)
860
861
862
def safe_getattr(brain_or_object, attr, default=_marker):
863
    """Return the attribute value
864
865
    :param brain_or_object: A single catalog brain or content object
866
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
867
    :param attr: Attribute name
868
    :type attr: str
869
    :returns: Attribute value
870
    :rtype: obj
871
    """
872
    try:
873
        value = getattr(brain_or_object, attr, _marker)
874
        if value is _marker:
875
            if default is not _marker:
876
                return default
877
            fail("Attribute '{}' not found.".format(attr))
878
        if callable(value):
879
            return value()
880
        return value
881
    except Unauthorized:
882
        if default is not _marker:
883
            return default
884
        fail("You are not authorized to access '{}' of '{}'.".format(
885
            attr, repr(brain_or_object)))
886
887
888
def get_uid_catalog():
889
    """Get the UID catalog tool
890
891
    :returns: UID Catalog Tool
892
    """
893
    return get_tool(UID_CATALOG)
894
895
896
def get_portal_catalog():
897
    """Get the portal catalog tool
898
899
    :returns: Portal Catalog Tool
900
    """
901
    return get_tool(PORTAL_CATALOG)
902
903
904
def get_review_history(brain_or_object, rev=True):
905
    """Get the review history for the given brain or context.
906
907
    :param brain_or_object: A single catalog brain or content object
908
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
909
    :returns: Workflow history
910
    :rtype: [{}, ...]
911
    """
912
    obj = get_object(brain_or_object)
913
    review_history = []
914
    try:
915
        workflow = get_tool("portal_workflow")
916
        review_history = workflow.getInfoFor(obj, 'review_history')
917
    except WorkflowException as e:
918
        message = str(e)
919
        logger.error("Cannot retrieve review_history on {}: {}".format(
920
            obj, message))
921
    if not isinstance(review_history, (list, tuple)):
922
        logger.error("get_review_history: expected list, received {}".format(
923
            review_history))
924
        review_history = []
925
926
    if isinstance(review_history, tuple):
927
        # Products.CMFDefault.DefaultWorkflow.getInfoFor always returns a
928
        # tuple when "review_history" is passed in:
929
        # https://github.com/zopefoundation/Products.CMFDefault/blob/master/Products/CMFDefault/DefaultWorkflow.py#L244
930
        #
931
        # On the other hand, Products.DCWorkflow.getInfoFor relies on
932
        # Expression.StateChangeInfo, that always returns a list, except when no
933
        # review_history is found:
934
        # https://github.com/zopefoundation/Products.DCWorkflow/blob/master/Products/DCWorkflow/DCWorkflow.py#L310
935
        # https://github.com/zopefoundation/Products.DCWorkflow/blob/master/Products/DCWorkflow/Expression.py#L94
936
        review_history = list(review_history)
937
938
    if rev is True:
939
        review_history.reverse()
940
    return review_history
941
942
943
def get_revision_history(brain_or_object):
944
    """Get the revision history for the given brain or context.
945
946
    :param brain_or_object: A single catalog brain or content object
947
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
948
    :returns: Workflow history
949
    :rtype: obj
950
    """
951
    obj = get_object(brain_or_object)
952
    chv = ContentHistoryView(obj, safe_getattr(obj, "REQUEST", None))
953
    return chv.fullHistory()
954
955
956
def get_workflows_for(brain_or_object):
957
    """Get the assigned workflows for the given brain or context.
958
959
    Note: This function supports also the portal_type as parameter.
960
961
    :param brain_or_object: A single catalog brain or content object
962
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
963
    :returns: Assigned Workflows
964
    :rtype: tuple
965
    """
966
    workflow = ploneapi.portal.get_tool("portal_workflow")
967
    if isinstance(brain_or_object, six.string_types):
968
        return workflow.getChainFor(brain_or_object)
969
    obj = get_object(brain_or_object)
970
    return workflow.getChainFor(obj)
971
972
973
def get_workflow_status_of(brain_or_object, state_var="review_state"):
974
    """Get the current workflow status of the given brain or context.
975
976
    :param brain_or_object: A single catalog brain or content object
977
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
978
    :param state_var: The name of the state variable
979
    :type state_var: string
980
    :returns: Status
981
    :rtype: str
982
    """
983
    # Try to get the state from the catalog brain first
984
    if is_brain(brain_or_object):
985
        if state_var in brain_or_object.schema():
986
            return brain_or_object[state_var]
987
988
    # Retrieve the sate from the object
989
    workflow = get_tool("portal_workflow")
990
    obj = get_object(brain_or_object)
991
    return workflow.getInfoFor(ob=obj, name=state_var, default='')
992
993
994
def get_previous_worfklow_status_of(brain_or_object, skip=None, default=None):
995
    """Get the previous workflow status of the object
996
997
    :param brain_or_object: A single catalog brain or content object
998
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
999
    :param skip: Workflow states to skip
1000
    :type skip: tuple/list
1001
    :returns: status
1002
    :rtype: str
1003
    """
1004
1005
    skip = isinstance(skip, (list, tuple)) and skip or []
1006
    history = get_review_history(brain_or_object)
1007
1008
    # Remove consecutive duplicates, some transitions might happen more than
1009
    # once consecutively (e.g. publish)
1010
    history = map(lambda i: i[0], groupby(history))
1011
1012
    for num, item in enumerate(history):
1013
        # skip the current history entry
1014
        if num == 0:
1015
            continue
1016
        status = item.get("review_state")
1017
        if status in skip:
1018
            continue
1019
        return status
1020
    return default
1021
1022
1023
def get_creation_date(brain_or_object):
1024
    """Get the creation date of the brain or object
1025
1026
    :param brain_or_object: A single catalog brain or content object
1027
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
1028
    :returns: Creation date
1029
    :rtype: DateTime
1030
    """
1031
    created = getattr(brain_or_object, "created", None)
1032
    if created is None:
1033
        fail("Object {} has no creation date ".format(
1034
             repr(brain_or_object)))
1035
    if callable(created):
1036
        return created()
1037
    return created
1038
1039
1040
def get_modification_date(brain_or_object):
1041
    """Get the modification date of the brain or object
1042
1043
    :param brain_or_object: A single catalog brain or content object
1044
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
1045
    :returns: Modification date
1046
    :rtype: DateTime
1047
    """
1048
    modified = getattr(brain_or_object, "modified", None)
1049
    if modified is None:
1050
        fail("Object {} has no modification date ".format(
1051
             repr(brain_or_object)))
1052
    if callable(modified):
1053
        return modified()
1054
    return modified
1055
1056
1057
def get_review_status(brain_or_object):
1058
    """Get the `review_state` of an object
1059
1060
    :param brain_or_object: A single catalog brain or content object
1061
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
1062
    :returns: Value of the review_status variable
1063
    :rtype: String
1064
    """
1065
    if is_brain(brain_or_object) \
1066
       and base_hasattr(brain_or_object, "review_state"):
1067
        return brain_or_object.review_state
1068
    return get_workflow_status_of(brain_or_object, state_var="review_state")
1069
1070
1071
def is_active(brain_or_object):
1072
    """Check if the workflow state of the object is 'inactive' or 'cancelled'.
1073
1074
    :param brain_or_object: A single catalog brain or content object
1075
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
1076
    :returns: False if the object is in the state 'inactive' or 'cancelled'
1077
    :rtype: bool
1078
    """
1079
    if get_review_status(brain_or_object) in ["cancelled", "inactive"]:
1080
        return False
1081
    return True
1082
1083
1084
def get_fti(portal_type, default=None):
1085
    """Lookup the Dynamic Filetype Information for the given portal_type
1086
1087
    :param portal_type: The portal type to get the FTI for
1088
    :returns: FTI or default value
1089
    """
1090
    if not is_string(portal_type):
1091
        return default
1092
    portal_types = get_tool("portal_types")
1093
    fti = portal_types.getTypeInfo(portal_type)
1094
    return fti or default
1095
1096
1097
def get_catalogs_for(brain_or_object, default=PORTAL_CATALOG):
1098
    """Get all registered catalogs for the given portal_type, catalog brain or
1099
    content object
1100
1101
    NOTE: We pass in the `portal_catalog` as default in subsequent calls to
1102
          work around the missing `uid_catalog` during snapshot creation when
1103
          installing a fresh site!
1104
1105
    :param brain_or_object: The portal_type, a catalog brain or content object
1106
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
1107
    :returns: List of supported catalogs
1108
    :rtype: list
1109
    """
1110
1111
    # only handle catalog lookups by portal_type internally
1112
    if is_uid(brain_or_object) or is_object(brain_or_object):
1113
        obj = get_object(brain_or_object)
1114
        portal_type = get_portal_type(obj)
1115
        return get_catalogs_for(portal_type)
1116
1117
    catalogs = []
1118
1119
    if not is_string(brain_or_object):
1120
        raise APIError("Expected a portal_type string, got <%s>"
1121
                       % type(brain_or_object))
1122
1123
    # at this point the brain_or_object is a portal_type
1124
    portal_type = brain_or_object
1125
1126
    # check static portal_type -> catalog mapping first
1127
    from senaite.core.catalog import get_catalogs_by_type
1128
    catalogs = get_catalogs_by_type(portal_type)
1129
1130
    # no catalogs in static mapping
1131
    # => Lookup catalogs by FTI
1132
    if len(catalogs) == 0:
1133
        fti = get_fti(portal_type)
1134
        if fti.product:
1135
            # AT content type
1136
            # => Looup via archetype_tool
1137
            archetype_tool = get_tool("archetype_tool")
1138
            catalogs = archetype_tool.catalog_map.get(portal_type)
1139
        else:
1140
            # DX content type
1141
            # => resolve the `_catalogs` attribute from the class
1142
            klass = resolveDottedName(fti.klass)
1143
            # XXX: Refactor multi-catalog behavior to not rely
1144
            #      on this hidden `_catalogs` attribute!
1145
            catalogs = getattr(klass, "_catalogs", [])
1146
1147
    # fetch the catalog objects
1148
    catalogs = filter(None, map(lambda cid: get_tool(cid, None), catalogs))
1149
1150
    if len(catalogs) == 0:
1151
        return [get_tool(default, default=PORTAL_CATALOG)]
1152
1153
    return list(catalogs)
1154
1155
1156
def get_transitions_for(brain_or_object):
1157
    """List available workflow transitions for all workflows
1158
1159
    :param brain_or_object: A single catalog brain or content object
1160
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
1161
    :returns: All possible available and allowed transitions
1162
    :rtype: list[dict]
1163
    """
1164
    workflow = get_tool('portal_workflow')
1165
    transitions = []
1166
    instance = get_object(brain_or_object)
1167
    for wfid in get_workflows_for(brain_or_object):
1168
        wf = workflow[wfid]
1169
        tlist = wf.getTransitionsFor(instance)
1170
        transitions.extend([t for t in tlist if t not in transitions])
1171
    return transitions
1172
1173
1174
def do_transition_for(brain_or_object, transition):
1175
    """Performs a workflow transition for the passed in object.
1176
1177
    :param brain_or_object: A single catalog brain or content object
1178
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
1179
    :returns: The object where the transtion was performed
1180
    """
1181
    if not isinstance(transition, six.string_types):
1182
        fail("Transition type needs to be string, got '%s'" % type(transition))
1183
    obj = get_object(brain_or_object)
1184
    try:
1185
        ploneapi.content.transition(obj, transition)
1186
    except ploneapi.exc.InvalidParameterError as e:
1187
        fail("Failed to perform transition '{}' on {}: {}".format(
1188
             transition, obj, str(e)))
1189
    return obj
1190
1191
1192
def get_roles_for_permission(permission, brain_or_object):
1193
    """Get a list of granted roles for the given permission on the object.
1194
1195
    :param brain_or_object: A single catalog brain or content object
1196
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
1197
    :returns: Roles for the given Permission
1198
    :rtype: list
1199
    """
1200
    obj = get_object(brain_or_object)
1201
    allowed = set(rolesForPermissionOn(permission, obj))
1202
    return sorted(allowed)
1203
1204
1205
def is_versionable(brain_or_object, policy='at_edit_autoversion'):
1206
    """Checks if the passed in object is versionable.
1207
1208
    :param brain_or_object: A single catalog brain or content object
1209
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
1210
    :returns: True if the object is versionable
1211
    :rtype: bool
1212
    """
1213
    pr = get_tool("portal_repository")
1214
    obj = get_object(brain_or_object)
1215
    return pr.supportsPolicy(obj, 'at_edit_autoversion') \
1216
        and pr.isVersionable(obj)
1217
1218
1219
def get_version(brain_or_object):
1220
    """Get the version of the current object
1221
1222
    :param brain_or_object: A single catalog brain or content object
1223
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
1224
    :returns: The current version of the object, or None if not available
1225
    :rtype: int or None
1226
    """
1227
    obj = get_object(brain_or_object)
1228
    if not is_versionable(obj):
1229
        return None
1230
    return getattr(aq_base(obj), "version_id", 0)
1231
1232
1233
def get_view(name, context=None, request=None, default=None):
1234
    """Get the view by name
1235
1236
    :param name: The name of the view
1237
    :type name: str
1238
    :param context: The context to query the view
1239
    :type context: ATContentType/DexterityContentType/CatalogBrain
1240
    :param request: The request to query the view
1241
    :type request: HTTPRequest object
1242
    :returns: HTTP Request
1243
    :rtype: Products.Five.metaclass View object
1244
    """
1245
    context = context or get_portal()
1246
    request = request or get_request() or None
1247
    view = queryMultiAdapter((get_object(context), request), name=name)
1248
    if view is None:
1249
        return default
1250
    return view
1251
1252
1253
def get_request():
1254
    """Get the global request object
1255
1256
    :returns: HTTP Request
1257
    :rtype: HTTPRequest object
1258
    """
1259
    return globalrequest.getRequest()
1260
1261
1262
def get_test_request():
1263
    """Get the TestRequest object
1264
    """
1265
    request = TestRequest()
1266
    directlyProvides(request, IAttributeAnnotatable)
1267
    return request
1268
1269
1270
def get_group(group_or_groupname):
1271
    """Return Plone Group
1272
1273
    :param group_or_groupname: Plone group or the name of the group
1274
    :type groupname:  GroupData/str
1275
    :returns: Plone GroupData
1276
    """
1277
    if not group_or_groupname:
1278
1279
        return None
1280
    if hasattr(group_or_groupname, "_getGroup"):
1281
        return group_or_groupname
1282
    gtool = get_tool("portal_groups")
1283
    return gtool.getGroupById(group_or_groupname)
1284
1285
1286
def get_user(user_or_username):
1287
    """Return Plone User
1288
1289
    :param user_or_username: Plone user or user id
1290
    :returns: Plone MemberData
1291
    """
1292
    user = None
1293
    if isinstance(user_or_username, MemberData):
1294
        user = user_or_username
1295
    if isinstance(user_or_username, six.string_types):
1296
        user = get_member_by_login_name(get_portal(), user_or_username, False)
1297
    return user
1298
1299
1300
def get_user_properties(user_or_username):
1301
    """Return User Properties
1302
1303
    :param user_or_username: Plone group identifier
1304
    :returns: Plone MemberData
1305
    """
1306
    user = get_user(user_or_username)
1307
    if user is None:
1308
        return {}
1309
    if not callable(user.getUser):
1310
        return {}
1311
    out = {}
1312
    plone_user = user.getUser()
1313
    for sheet in plone_user.listPropertysheets():
1314
        ps = plone_user.getPropertysheet(sheet)
1315
        out.update(dict(ps.propertyItems()))
1316
    return out
1317
1318
1319
def get_users_by_roles(roles=None):
1320
    """Search Plone users by their roles
1321
1322
    :param roles: Plone role name or list of roles
1323
    :type roles:  list/str
1324
    :returns: List of Plone users having the role(s)
1325
    """
1326
    if not isinstance(roles, (tuple, list)):
1327
        roles = [roles]
1328
    mtool = get_tool("portal_membership")
1329
    return mtool.searchForMembers(roles=roles)
1330
1331
1332
def get_current_user():
1333
    """Returns the current logged in user
1334
1335
    :returns: Current User
1336
    """
1337
    return ploneapi.user.get_current()
1338
1339
1340
def get_user_contact(user, contact_types=['Contact', 'LabContact']):
1341
    """Returns the associated contact of a Plone user
1342
1343
    If the user passed in has no contact associated, return None.
1344
    The `contact_types` parameter filter the portal types for the search.
1345
1346
    :param: Plone user
1347
    :contact_types: List with the contact portal types to search
1348
    :returns: Contact associated to the Plone user or None
1349
    """
1350
    if not user:
1351
        return None
1352
1353
    from senaite.core.catalog import CONTACT_CATALOG  # Avoid circular import
1354
    query = {"portal_type": contact_types, "getUsername": user.id}
1355
    brains = search(query, catalog=CONTACT_CATALOG)
1356
    if not brains:
1357
        return None
1358
1359
    if len(brains) > 1:
1360
        # Oops, the user has multiple contacts assigned, return None
1361
        contacts = map(lambda c: c.Title, brains)
1362
        err_msg = "User '{}' is bound to multiple Contacts '{}'"
1363
        err_msg = err_msg.format(user.id, ','.join(contacts))
1364
        logger.error(err_msg)
1365
        return None
1366
1367
    return get_object(brains[0])
1368
1369
1370
def get_user_client(user_or_contact):
1371
    """Returns the client of the contact of a Plone user
1372
1373
    If the user passed in has no contact or does not belong to any client,
1374
    returns None.
1375
1376
    :param: Plone user or contact
1377
    :returns: Client the contact of the Plone user belongs to
1378
    """
1379
    if not user_or_contact or ILabContact.providedBy(user_or_contact):
1380
        # Lab contacts cannot belong to a client
1381
        return None
1382
1383
    if not IContact.providedBy(user_or_contact):
1384
        contact = get_user_contact(user_or_contact, contact_types=['Contact'])
1385
        if IContact.providedBy(contact):
1386
            return get_user_client(contact)
1387
        return None
1388
1389
    client = get_parent(user_or_contact)
1390
    if client and IClient.providedBy(client):
1391
        return client
1392
1393
    return None
1394
1395
1396
def get_current_client():
1397
    """Returns the current client the current logged in user belongs to, if any
1398
1399
    :returns: Client the current logged in user belongs to or None
1400
    """
1401
    return get_user_client(get_current_user())
1402
1403
1404
def get_cache_key(brain_or_object):
1405
    """Generate a cache key for a common brain or object
1406
1407
    :param brain_or_object: A single catalog brain or content object
1408
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
1409
    :returns: Cache Key
1410
    :rtype: str
1411
    """
1412
    key = [
1413
        get_portal_type(brain_or_object),
1414
        get_id(brain_or_object),
1415
        get_uid(brain_or_object),
1416
        # handle different domains gracefully
1417
        get_url(brain_or_object),
1418
        # Return the microsecond since the epoch in GMT
1419
        get_modification_date(brain_or_object).micros(),
1420
    ]
1421
    return "-".join(map(lambda x: str(x), key))
1422
1423
1424
def bika_cache_key_decorator(method, self, brain_or_object):
1425
    """Bika cache key decorator usable for
1426
1427
    :param brain_or_object: A single catalog brain or content object
1428
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
1429
    :returns: Cache Key
1430
    :rtype: str
1431
    """
1432
    if brain_or_object is None:
1433
        raise DontCache
1434
    return get_cache_key(brain_or_object)
1435
1436
1437
def normalize_id(string):
1438
    """Normalize the id
1439
1440
    :param string: A string to normalize
1441
    :type string: str
1442
    :returns: Normalized ID
1443
    :rtype: str
1444
    """
1445
    if not isinstance(string, six.string_types):
1446
        fail("Type of argument must be string, found '{}'"
1447
             .format(type(string)))
1448
    # get the id nomalizer utility
1449
    normalizer = getUtility(IIDNormalizer).normalize
1450
    return normalizer(string)
1451
1452
1453
def normalize_filename(string):
1454
    """Normalize the filename
1455
1456
    :param string: A string to normalize
1457
    :type string: str
1458
    :returns: Normalized ID
1459
    :rtype: str
1460
    """
1461
    if not isinstance(string, six.string_types):
1462
        fail("Type of argument must be string, found '{}'"
1463
             .format(type(string)))
1464
    # get the file nomalizer utility
1465
    normalizer = getUtility(IFileNameNormalizer).normalize
1466
    return normalizer(string)
1467
1468
1469
def is_uid(uid, validate=False):
1470
    """Checks if the passed in uid is a valid UID
1471
1472
    :param uid: The uid to check
1473
    :param validate: If False, checks if uid is a valid 23 alphanumeric uid. If
1474
    True, also verifies if a brain exists for the uid passed in
1475
    :type uid: string
1476
    :return: True if a valid uid
1477
    :rtype: bool
1478
    """
1479
    if not isinstance(uid, six.string_types):
1480
        return False
1481
    if uid == '0':
1482
        return True
1483
    if len(uid) != 32:
1484
        return False
1485
    if not UID_RX.match(uid):
1486
        return False
1487
    if not validate:
1488
        return True
1489
1490
    # Check if a brain for this uid exists
1491
    uc = get_tool('uid_catalog')
1492
    brains = uc(UID=uid)
1493
    if brains:
1494
        assert (len(brains) == 1)
1495
    return len(brains) > 0
1496
1497
1498
def is_date(date):
1499
    """Checks if the passed in value is a valid Zope's DateTime
1500
1501
    :param date: The date to check
1502
    :type date: DateTime
1503
    :return: True if a valid date
1504
    :rtype: bool
1505
    """
1506
    if not date:
1507
        return False
1508
    return isinstance(date, (DateTime, datetime))
1509
1510
1511
def to_date(value, default=None):
1512
    """Tries to convert the passed in value to Zope's DateTime
1513
1514
    :param value: The value to be converted to a valid DateTime
1515
    :type value: str, DateTime or datetime
1516
    :return: The DateTime representation of the value passed in or default
1517
    """
1518
1519
    # cannot use bika.lims.deprecated (circular dependencies)
1520
    import warnings
1521
    warnings.simplefilter("always", DeprecationWarning)
1522
    warn = "Deprecated: use senaite.core.api.dtime.to_DT instead"
1523
    warnings.warn(warn, category=DeprecationWarning, stacklevel=2)
1524
    warnings.simplefilter("default", DeprecationWarning)
1525
1526
    # prevent circular dependencies
1527
    from senaite.core.api.dtime import to_DT
1528
    date = to_DT(value)
1529
    if not date:
1530
        return to_DT(default)
1531
    return date
1532
1533
1534
def to_minutes(days=0, hours=0, minutes=0, seconds=0, milliseconds=0,
1535
               round_to_int=True):
1536
    """Returns the computed total number of minutes
1537
    """
1538
    total = float(days)*24*60 + float(hours)*60 + float(minutes) + \
1539
        float(seconds)/60 + float(milliseconds)/1000/60
1540
    return int(round(total)) if round_to_int else total
1541
1542
1543
def to_dhm_format(days=0, hours=0, minutes=0, seconds=0, milliseconds=0):
1544
    """Returns a representation of time in a string in xd yh zm format
1545
    """
1546
    minutes = to_minutes(days=days, hours=hours, minutes=minutes,
1547
                         seconds=seconds, milliseconds=milliseconds)
1548
    delta = timedelta(minutes=int(round(minutes)))
1549
    d = delta.days
1550
    h = delta.seconds // 3600
1551
    m = (delta.seconds // 60) % 60
1552
    m = m and "{}m ".format(str(m)) or ""
1553
    d = d and "{}d ".format(str(d)) or ""
1554
    if m and d:
1555
        h = "{}h ".format(str(h))
1556
    else:
1557
        h = h and "{}h ".format(str(h)) or ""
1558
    return "".join([d, h, m]).strip()
1559
1560
1561
def to_int(value, default=_marker):
1562
    """Tries to convert the value to int.
1563
    Truncates at the decimal point if the value is a float
1564
1565
    :param value: The value to be converted to an int
1566
    :return: The resulting int or default
1567
    """
1568
    if is_floatable(value):
1569
        value = to_float(value)
1570
    try:
1571
        return int(value)
1572
    except (TypeError, ValueError):
1573
        if default is None:
1574
            return default
1575
        if default is not _marker:
1576
            return to_int(default)
1577
        fail("Value %s cannot be converted to int" % repr(value))
1578
1579
1580
def is_floatable(value):
1581
    """Checks if the passed in value is a valid floatable number
1582
1583
    :param value: The value to be evaluated as a float number
1584
    :type value: str, float, int
1585
    :returns: True if is a valid float number
1586
    :rtype: bool"""
1587
    try:
1588
        float(value)
1589
        return True
1590
    except (TypeError, ValueError):
1591
        return False
1592
1593
1594
def to_float(value, default=_marker):
1595
    """Converts the passed in value to a float number
1596
1597
    :param value: The value to be converted to a floatable number
1598
    :type value: str, float, int
1599
    :returns: The float number representation of the passed in value
1600
    :rtype: float
1601
    """
1602
    if not is_floatable(value):
1603
        if default is not _marker:
1604
            return to_float(default)
1605
        fail("Value %s is not floatable" % repr(value))
1606
    return float(value)
1607
1608
1609
def float_to_string(value, default=_marker):
1610
    """Convert a float value to string without exponential notation
1611
1612
    This function preserves the whole fraction
1613
1614
    :param value: The float value to be converted to a string
1615
    :type value: str, float, int
1616
    :returns: String representation of the float w/o exponential notation
1617
    :rtype: str
1618
    """
1619
    if not is_floatable(value):
1620
        if default is not _marker:
1621
            return default
1622
        fail("Value %s is not floatable" % repr(value))
1623
1624
    # Leave floatable string values unchanged
1625
    if isinstance(value, six.string_types):
1626
        return value
1627
1628
    value = float(value)
1629
    str_value = str(value)
1630
1631
    if "." in str_value:
1632
        # might be something like 1.23e-26
1633
        front, back = str_value.split(".")
1634
    else:
1635
        # or 1e-07 for 0.0000001
1636
        back = str_value
1637
1638
    if "e-" in back:
1639
        fraction, zeros = back.split("e-")
1640
        # we want to cover the faction and the zeros
1641
        precision = len(fraction) + int(zeros)
1642
        template = "{:.%df}" % precision
1643
        str_value = template.format(value)
1644
    elif "e+" in back:
1645
        # positive numbers, e.g. 1e+16 don't need a fractional part
1646
        str_value = "{:.0f}".format(value)
1647
1648
    # cut off trailing zeros
1649
    if "." in str_value:
1650
        str_value = str_value.rstrip("0").rstrip(".")
1651
1652
    return str_value
1653
1654
1655
def to_searchable_text_metadata(value):
1656
    """Parse the given metadata value to searchable text
1657
1658
    :param value: The raw value of the metadata column
1659
    :returns: Searchable and translated unicode value or None
1660
    """
1661
    if not value:
1662
        return u""
1663
    if value is Missing.Value:
1664
        return u""
1665
    if is_uid(value):
1666
        return u""
1667
    if isinstance(value, (bool)):
1668
        return u""
1669
    if isinstance(value, (list, tuple)):
1670
        values = map(to_searchable_text_metadata, value)
1671
        values = filter(None, values)
1672
        return " ".join(values)
1673
    if isinstance(value, dict):
1674
        return to_searchable_text_metadata(value.values())
1675
    if is_date(value):
1676
        from senaite.core.api.dtime import date_to_string
1677
        return date_to_string(value, "%Y-%m-%d")
1678
    if is_at_content(value):
1679
        return to_searchable_text_metadata(get_title(value))
1680
    if not isinstance(value, six.string_types):
1681
        value = str(value)
1682
    return safe_unicode(value)
1683
1684
1685
def get_registry_record(name, default=None):
1686
    """Returns the value of a registry record
1687
1688
    :param name: [required] name of the registry record
1689
    :type name: str
1690
    :param default: The value returned if the record is not found
1691
    :type default: anything
1692
    :returns: value of the registry record
1693
    """
1694
    return ploneapi.portal.get_registry_record(name, default=default)
1695
1696
1697
def to_display_list(pairs, sort_by="key", allow_empty=True):
1698
    """Create a Plone DisplayList from list items
1699
1700
    :param pairs: list of key, value pairs
1701
    :param sort_by: Sort the items either by key or value
1702
    :param allow_empty: Allow to select an empty value
1703
    :returns: Plone DisplayList
1704
    """
1705
    dl = DisplayList()
1706
1707
    if isinstance(pairs, six.string_types):
1708
        pairs = [pairs, pairs]
1709
    for pair in pairs:
1710
        # pairs is a list of lists -> add each pair
1711
        if isinstance(pair, (tuple, list)):
1712
            dl.add(*pair)
1713
        # pairs is just a single pair -> add it and stop
1714
        if isinstance(pair, six.string_types):
1715
            dl.add(*pairs)
1716
            break
1717
1718
    # add the empty option
1719
    if allow_empty:
1720
        dl.add("", "")
1721
1722
    # sort by key/value
1723
    if sort_by == "key":
1724
        dl = dl.sortedByKey()
1725
    elif sort_by == "value":
1726
        dl = dl.sortedByValue()
1727
1728
    return dl
1729
1730
1731
def text_to_html(text, wrap="p", encoding="utf8"):
1732
    """Convert `\n` sequences in the text to HTML `\n`
1733
1734
    :param text: Plain text to convert
1735
    :param wrap: Toggle to wrap the text in a
1736
    :returns: HTML converted and encoded text
1737
    """
1738
    if not text:
1739
        return ""
1740
    # handle text internally as unicode
1741
    text = safe_unicode(text)
1742
    # replace newline characters with HTML entities
1743
    html = text.replace("\n", "<br/>")
1744
    if wrap:
1745
        html = u"<{tag}>{html}</{tag}>".format(
1746
            tag=wrap, html=html)
1747
    # return encoded html
1748
    return html.encode(encoding)
1749
1750
1751
def to_utf8(string, default=_marker):
1752
    """Encode string to UTF8
1753
1754
    :param string: String to be encoded to UTF8
1755
    :returns: UTF8 encoded string
1756
    """
1757
    if not isinstance(string, six.string_types):
1758
        if default is _marker:
1759
            fail("Expected string type, got '%s'" % type(string))
1760
        return default
1761
    return safe_unicode(string).encode("utf8")
1762
1763
1764
def is_temporary(obj):
1765
    """Returns whether the given object is temporary or not
1766
1767
    :param obj: the object to evaluate
1768
    :returns: True if the object is temporary
1769
    """
1770
    if ITemporaryObject.providedBy(obj):
1771
        return True
1772
1773
    obj_id = getattr(aq_base(obj), "id", None)
1774
    if obj_id is None or UID_RX.match(obj_id):
1775
        return True
1776
1777
    parent = aq_parent(aq_inner(obj))
1778
    if not parent:
1779
        return True
1780
1781
    parent_id = getattr(aq_base(parent), "id", None)
1782
    if parent_id is None or UID_RX.match(parent_id):
1783
        return True
1784
1785
    # Checks to see if we are created inside the portal_factory.
1786
    # This might also happen for DX types in senaite.databox!
1787
    meta_type = getattr(aq_base(parent), "meta_type", "")
1788
    if meta_type == "TempFolder":
1789
        return True
1790
1791
    return False
1792
1793
1794
def mark_temporary(brain_or_object):
1795
    """Mark the object as temporary
1796
    """
1797
    obj = get_object(brain_or_object)
1798
    alsoProvides(obj, ITemporaryObject)
1799
1800
1801
def unmark_temporary(brain_or_object):
1802
    """Unmark the object as temporary
1803
    """
1804
    obj = get_object(brain_or_object)
1805
    noLongerProvides(obj, ITemporaryObject)
1806
1807
1808
def is_string(thing):
1809
    """Checks if the passed in object is a string type
1810
1811
    :param thing: object to test
1812
    :returns: True if the object is a string
1813
    """
1814
    return isinstance(thing, six.string_types)
1815
1816
1817
def parse_json(thing, default=""):
1818
    """Parse from JSON
1819
1820
    :param thing: thing to parse
1821
    :param default: value to return if cannot parse
1822
    :returns: the object representing the JSON or default
1823
    """
1824
    try:
1825
        return json.loads(thing)
1826
    except (TypeError, ValueError):
1827
        return default
1828
1829
1830
def to_list(value):
1831
    """Converts the value to a list
1832
1833
    :param value: the value to be represented as a list
1834
    :returns: a list that represents or contains the value
1835
    """
1836
    if is_string(value):
1837
        val = parse_json(value)
1838
        if isinstance(val, (list, tuple, set)):
1839
            value = val
1840
    if not isinstance(value, (list, tuple, set)):
1841
        value = [value]
1842
    return list(value)
1843