Passed
Push — 2.x ( b7a2dd...83455a )
by Ramon
07:43
created

bika.lims.api.parse_json()   A

Complexity

Conditions 2

Size

Total Lines 11
Code Lines 5

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 5
dl 0
loc 11
rs 10
c 0
b 0
f 0
cc 2
nop 2
1
# -*- coding: utf-8 -*-
2
#
3
# This file is part of SENAITE.CORE.
4
#
5
# SENAITE.CORE is free software: you can redistribute it and/or modify it under
6
# the terms of the GNU General Public License as published by the Free Software
7
# Foundation, version 2.
8
#
9
# This program is distributed in the hope that it will be useful, but WITHOUT
10
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
11
# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
12
# details.
13
#
14
# You should have received a copy of the GNU General Public License along with
15
# this program; if not, write to the Free Software Foundation, Inc., 51
16
# Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
17
#
18
# Copyright 2018-2021 by it's authors.
19
# Some rights reserved, see README and LICENSE.
20
21
import copy
22
import json
23
import re
24
from collections import OrderedDict
25
from datetime import datetime
26
from datetime import timedelta
27
from itertools import groupby
28
29
import Missing
30
import six
31
from AccessControl.PermissionRole import rolesForPermissionOn
32
from Acquisition import aq_base
33
from Acquisition import aq_inner
34
from Acquisition import aq_parent
35
from bika.lims import logger
36
from bika.lims.interfaces import IClient
37
from bika.lims.interfaces import IContact
38
from bika.lims.interfaces import ILabContact
39
from DateTime import DateTime
40
from DateTime.interfaces import DateTimeError
41
from plone import api as ploneapi
42
from plone.api.exc import InvalidParameterError
43
from plone.app.layout.viewlets.content import ContentHistoryView
44
from plone.behavior.interfaces import IBehaviorAssignable
45
from plone.dexterity.interfaces import IDexterityContent
46
from plone.dexterity.schema import SchemaInvalidatedEvent
47
from plone.i18n.normalizer.interfaces import IFileNameNormalizer
48
from plone.i18n.normalizer.interfaces import IIDNormalizer
49
from plone.memoize.volatile import DontCache
50
from Products.Archetypes.atapi import DisplayList
51
from Products.Archetypes.BaseObject import BaseObject
52
from Products.Archetypes.event import ObjectInitializedEvent
53
from Products.Archetypes.utils import mapply
54
from Products.CMFCore.interfaces import IFolderish
55
from Products.CMFCore.interfaces import ISiteRoot
56
from Products.CMFCore.permissions import ModifyPortalContent
57
from Products.CMFCore.permissions import View
58
from Products.CMFCore.utils import getToolByName
59
from Products.CMFCore.WorkflowCore import WorkflowException
60
from Products.CMFPlone.RegistrationTool import get_member_by_login_name
61
from Products.CMFPlone.utils import _createObjectByType
62
from Products.CMFPlone.utils import base_hasattr
63
from Products.CMFPlone.utils import safe_unicode
64
from Products.PlonePAS.tools.memberdata import MemberData
65
from Products.ZCatalog.interfaces import ICatalogBrain
66
from senaite.core.interfaces import ITemporaryObject
67
from zope import globalrequest
68
from zope.annotation.interfaces import IAttributeAnnotatable
69
from zope.component import getUtility
70
from zope.component import queryMultiAdapter
71
from zope.component.interfaces import IFactory
72
from zope.event import notify
73
from zope.interface import alsoProvides
74
from zope.interface import directlyProvides
75
from zope.interface import noLongerProvides
76
from zope.lifecycleevent import ObjectCreatedEvent
77
from zope.publisher.browser import TestRequest
78
from zope.schema import getFieldsInOrder
79
from zope.security.interfaces import Unauthorized
80
81
"""SENAITE LIMS Framework API
82
83
Please see bika.lims/docs/API.rst for documentation.
84
85
Architecural Notes:
86
87
Please add only functions that do a single thing for a single object.
88
89
Good: `def get_foo(brain_or_object)`
90
Bad:  `def get_foos(list_of_brain_objects)`
91
92
Why?
93
94
Because it makes things more complex. You can always use a pattern like this to
95
achieve the same::
96
97
    >>> foos = map(get_foo, list_of_brain_objects)
98
99
Please add for all of your functions a descriptive test in docs/API.rst.
100
101
Thanks.
102
"""
103
104
_marker = object()
105
106
UID_RX = re.compile("[a-z0-9]{32}$")
107
108
109
class APIError(Exception):
110
    """Base exception class for bika.lims errors."""
111
112
113
def get_portal():
114
    """Get the portal object
115
116
    :returns: Portal object
117
    """
118
    return ploneapi.portal.getSite()
119
120
121
def get_setup():
122
    """Fetch the `bika_setup` folder.
123
    """
124
    portal = get_portal()
125
    return portal.get("bika_setup")
126
127
128
def get_bika_setup():
129
    """Fetch the `bika_setup` folder.
130
    """
131
    return get_setup()
132
133
134
def get_senaite_setup():
135
    """Fetch the new DX `setup` folder.
136
    """
137
    portal = get_portal()
138
    return portal.get("setup")
139
140
141
def create(container, portal_type, *args, **kwargs):
142
    """Creates an object in Bika LIMS
143
144
    This code uses most of the parts from the TypesTool
145
    see: `Products.CMFCore.TypesTool._constructInstance`
146
147
    :param container: container
148
    :type container: ATContentType/DexterityContentType/CatalogBrain
149
    :param portal_type: The portal type to create, e.g. "Client"
150
    :type portal_type: string
151
    :param title: The title for the new content object
152
    :type title: string
153
    :returns: The new created object
154
    """
155
    from bika.lims.utils import tmpID
156
157
    id = kwargs.pop("id", tmpID())
158
    title = kwargs.pop("title", "New {}".format(portal_type))
159
160
    # get the fti
161
    types_tool = get_tool("portal_types")
162
    fti = types_tool.getTypeInfo(portal_type)
163
164
    if fti.product:
165
        # create the AT object
166
        obj = _createObjectByType(portal_type, container, id)
167
        # update the object with values
168
        edit(obj, check_permissions=False, title=title, **kwargs)
169
        # auto-id if required
170
        if obj._at_rename_after_creation:
171
            obj._renameAfterCreation(check_auto_id=True)
172
        # we are no longer under creation
173
        obj.unmarkCreationFlag()
174
        # notify that the object was created
175
        notify(ObjectInitializedEvent(obj))
176
    else:
177
        # newstyle factory
178
        factory = getUtility(IFactory, fti.factory)
179
        obj = factory(id, *args, **kwargs)
180
        if hasattr(obj, '_setPortalTypeName'):
181
            obj._setPortalTypeName(fti.getId())
182
        # set the title
183
        obj.title = safe_unicode(title)
184
        # notify that the object was created
185
        notify(ObjectCreatedEvent(obj))
186
        # notifies ObjectWillBeAddedEvent, ObjectAddedEvent and
187
        # ContainerModifiedEvent
188
        container._setObject(id, obj)
189
        # we get the object here with the current object id, as it might be
190
        # renamed already by an event handler
191
        obj = container._getOb(obj.getId())
192
193
    return obj
194
195
196
def copy_object(source, container=None, portal_type=None, *args, **kwargs):
197
    """Creates a copy of the source object. If container is None, creates the
198
    copy inside the same container as the source. If portal_type is specified,
199
    creates a new object of this type, and copies the values from source fields
200
    to the destination object. Field values sent as kwargs have priority over
201
    the field values from source.
202
203
    :param source: object from which create a copy
204
    :type source: ATContentType/DexterityContentType/CatalogBrain
205
    :param container: destination container
206
    :type container: ATContentType/DexterityContentType/CatalogBrain
207
    :param portal_type: destination portal type
208
    :returns: The new created object
209
    """
210
    # Prevent circular dependencies
211
    from security import check_permission
212
    # Use same container as source unless explicitly set
213
    source = get_object(source)
214
    if not container:
215
        container = get_parent(source)
216
217
    # Use same portal type as source unless explicitly set
218
    if not portal_type:
219
        portal_type = get_portal_type(source)
220
221
    # Extend the fields to skip with defaults
222
    skip = kwargs.pop("skip", [])
223
    skip = set(skip)
224
    skip.update([
225
        "Products.Archetypes.Field.ComputedField",
226
        "UID",
227
        "id",
228
        "allowDiscussion",
229
        "contributors",
230
        "creation_date",
231
        "creators",
232
        "effectiveDate",
233
        "expirationDate",
234
        "language",
235
        "location",
236
        "modification_date",
237
        "rights",
238
        "subject",
239
    ])
240
    # Build a dict for complexity reduction
241
    skip = dict([(item, True) for item in skip])
242
243
    # Update kwargs with the field values to copy from source
244
    fields = get_fields(source)
245
    for field_name, field in fields.items():
246
        # Prioritize field values passed as kwargs
247
        if field_name in kwargs:
248
            continue
249
        # Skip framework internal fields by name
250
        if skip.get(field_name, False):
251
            continue
252
        # Skip fields of non-suitable types
253
        if hasattr(field, "getType") and skip.get(field.getType(), False):
254
            continue
255
        # Skip readonly fields
256
        if getattr(field, "readonly", False):
257
            continue
258
        # Skip non-readable fields
259
        perm = getattr(field, "read_permission", View)
260
        if perm and not check_permission(perm, source):
261
            continue
262
263
        # do not wake-up objects unnecessarily
264
        if hasattr(field, "getRaw"):
265
            field_value = field.getRaw(source)
266
        elif hasattr(field, "get_raw"):
267
            field_value = field.get_raw(source)
268
        elif hasattr(field, "getAccessor"):
269
            accessor = field.getAccessor(source)
270
            field_value = accessor()
271
        else:
272
            field_value = field.get(source)
273
274
        # Do a hard copy of value if mutable type
275
        if isinstance(field_value, (list, dict, set)):
276
            field_value = copy.deepcopy(field_value)
277
        kwargs.update({field_name: field_value})
278
279
    # Create a copy
280
    return create(container, portal_type, *args, **kwargs)
281
282
283
def edit(obj, check_permissions=True, **kwargs):
284
    """Updates the values of object fields with the new values passed-in
285
    """
286
    # Prevent circular dependencies
287
    from security import check_permission
288
    fields = get_fields(obj)
289
    for name, value in kwargs.items():
290
        field = fields.get(name, None)
291
        if not field:
292
            continue
293
294
        # cannot update readonly fields
295
        readonly = getattr(field, "readonly", False)
296
        if readonly:
297
            raise ValueError("Field '{}' is readonly".format(name))
298
299
        # check field writable permission
300
        if check_permissions:
301
            perm = getattr(field, "write_permission", ModifyPortalContent)
302
            if perm and not check_permission(perm, obj):
303
                raise Unauthorized("Field '{}' is not writeable".format(name))
304
305
        # Set the value
306
        if hasattr(field, "getMutator"):
307
            mutator = field.getMutator(obj)
308
            mapply(mutator, value)
309
        else:
310
            field.set(obj, value)
311
312
313
def get_tool(name, context=None, default=_marker):
314
    """Get a portal tool by name
315
316
    :param name: The name of the tool, e.g. `portal_catalog`
317
    :type name: string
318
    :param context: A portal object
319
    :type context: ATContentType/DexterityContentType/CatalogBrain
320
    :returns: Portal Tool
321
    """
322
323
    # Try first with the context
324
    if context is not None:
325
        try:
326
            context = get_object(context)
327
            return getToolByName(context, name)
328
        except (APIError, AttributeError) as e:
329
            # https://github.com/senaite/bika.lims/issues/396
330
            logger.warn("get_tool::getToolByName({}, '{}') failed: {} "
331
                        "-> falling back to plone.api.portal.get_tool('{}')"
332
                        .format(repr(context), name, repr(e), name))
333
            return get_tool(name, default=default)
334
335
    # Try with the plone api
336
    try:
337
        return ploneapi.portal.get_tool(name)
338
    except InvalidParameterError:
339
        if default is not _marker:
340
            if isinstance(default, six.string_types):
341
                return get_tool(default)
342
            return default
343
        fail("No tool named '%s' found." % name)
344
345
346
def fail(msg=None):
347
    """API LIMS Error
348
    """
349
    if msg is None:
350
        msg = "Reason not given."
351
    raise APIError("{}".format(msg))
352
353
354
def is_object(brain_or_object):
355
    """Check if the passed in object is a supported portal content object
356
357
    :param brain_or_object: A single catalog brain or content object
358
    :type brain_or_object: Portal Object
359
    :returns: True if the passed in object is a valid portal content
360
    """
361
    if is_portal(brain_or_object):
362
        return True
363
    if is_supermodel(brain_or_object):
364
        return True
365
    if is_at_content(brain_or_object):
366
        return True
367
    if is_dexterity_content(brain_or_object):
368
        return True
369
    if is_brain(brain_or_object):
370
        return True
371
    return False
372
373
374
def get_object(brain_object_uid, default=_marker):
375
    """Get the full content object
376
377
    :param brain_object_uid: A catalog brain or content object or uid
378
    :type brain_object_uid: PortalObject/ATContentType/DexterityContentType
379
    /CatalogBrain/basestring
380
    :returns: The full object
381
    """
382
    if is_uid(brain_object_uid):
383
        return get_object_by_uid(brain_object_uid)
384
    elif is_supermodel(brain_object_uid):
385
        return brain_object_uid.instance
386
    if not is_object(brain_object_uid):
387
        if default is _marker:
388
            fail("{} is not supported.".format(repr(brain_object_uid)))
389
        return default
390
    if is_brain(brain_object_uid):
391
        return brain_object_uid.getObject()
392
    return brain_object_uid
393
394
395
def is_portal(brain_or_object):
396
    """Checks if the passed in object is the portal root object
397
398
    :param brain_or_object: A single catalog brain or content object
399
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
400
    :returns: True if the object is the portal root object
401
    :rtype: bool
402
    """
403
    return ISiteRoot.providedBy(brain_or_object)
404
405
406
def is_brain(brain_or_object):
407
    """Checks if the passed in object is a portal catalog brain
408
409
    :param brain_or_object: A single catalog brain or content object
410
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
411
    :returns: True if the object is a catalog brain
412
    :rtype: bool
413
    """
414
    return ICatalogBrain.providedBy(brain_or_object)
415
416
417
def is_supermodel(brain_or_object):
418
    """Checks if the passed in object is a supermodel
419
420
    :param brain_or_object: A single catalog brain or content object
421
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
422
    :returns: True if the object is a catalog brain
423
    :rtype: bool
424
    """
425
    # avoid circular imports
426
    from senaite.app.supermodel.interfaces import ISuperModel
427
    return ISuperModel.providedBy(brain_or_object)
428
429
430
def is_dexterity_content(brain_or_object):
431
    """Checks if the passed in object is a dexterity content type
432
433
    :param brain_or_object: A single catalog brain or content object
434
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
435
    :returns: True if the object is a dexterity content type
436
    :rtype: bool
437
    """
438
    return IDexterityContent.providedBy(brain_or_object)
439
440
441
def is_at_content(brain_or_object):
442
    """Checks if the passed in object is an AT content type
443
444
    :param brain_or_object: A single catalog brain or content object
445
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
446
    :returns: True if the object is an AT content type
447
    :rtype: bool
448
    """
449
    return isinstance(brain_or_object, BaseObject)
450
451
452
def is_dx_type(portal_type):
453
    """Checks if the portal type is DX based
454
455
    :param portal_type: The portal type name to check
456
    :returns: True if the portal type is DX based
457
    """
458
    portal_types = get_tool("portal_types")
459
    fti = portal_types.getTypeInfo(portal_type)
460
    if fti.product:
461
        return False
462
    return True
463
464
465
def is_at_type(portal_type):
466
    """Checks if the portal type is AT based
467
468
    :param portal_type: The portal type name to check
469
    :returns: True if the portal type is AT based
470
    """
471
    return not is_dx_type(portal_type)
472
473
474
def is_folderish(brain_or_object):
475
    """Checks if the passed in object is folderish
476
477
    :param brain_or_object: A single catalog brain or content object
478
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
479
    :returns: True if the object is folderish
480
    :rtype: bool
481
    """
482
    if hasattr(brain_or_object, "is_folderish"):
483
        if callable(brain_or_object.is_folderish):
484
            return brain_or_object.is_folderish()
485
        return brain_or_object.is_folderish
486
    return IFolderish.providedBy(get_object(brain_or_object))
487
488
489
def get_portal_type(brain_or_object):
490
    """Get the portal type for this object
491
492
    :param brain_or_object: A single catalog brain or content object
493
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
494
    :returns: Portal type
495
    :rtype: string
496
    """
497
    if not is_object(brain_or_object):
498
        fail("{} is not supported.".format(repr(brain_or_object)))
499
    return brain_or_object.portal_type
500
501
502
def get_schema(brain_or_object):
503
    """Get the schema of the content
504
505
    :param brain_or_object: A single catalog brain or content object
506
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
507
    :returns: Schema object
508
    """
509
    obj = get_object(brain_or_object)
510
    if is_portal(obj):
511
        fail("get_schema can't return schema of portal root")
512
    if is_dexterity_content(obj):
513
        pt = get_tool("portal_types")
514
        fti = pt.getTypeInfo(obj.portal_type)
515
        return fti.lookupSchema()
516
    if is_at_content(obj):
517
        return obj.Schema()
518
    fail("{} has no Schema.".format(brain_or_object))
519
520
521
def get_behaviors(portal_type):
522
    """List all behaviors
523
524
    :param portal_type: DX portal type name
525
    """
526
    portal_types = get_tool("portal_types")
527
    fti = portal_types.getTypeInfo(portal_type)
528
    if fti.product:
529
        raise TypeError("Expected DX type, got AT type instead.")
530
    return fti.behaviors
531
532
533
def enable_behavior(portal_type, behavior_id):
534
    """Enable behavior
535
536
    :param portal_type: DX portal type name
537
    :param behavior_id: The behavior to enable
538
    """
539
    portal_types = get_tool("portal_types")
540
    fti = portal_types.getTypeInfo(portal_type)
541
    if fti.product:
542
        raise TypeError("Expected DX type, got AT type instead.")
543
544
    if behavior_id not in fti.behaviors:
545
        fti.behaviors += (behavior_id, )
546
        # invalidate schema cache
547
        notify(SchemaInvalidatedEvent(portal_type))
548
549
550
def disable_behavior(portal_type, behavior_id):
551
    """Disable behavior
552
553
    :param portal_type: DX portal type name
554
    :param behavior_id: The behavior to disable
555
    """
556
    portal_types = get_tool("portal_types")
557
    fti = portal_types.getTypeInfo(portal_type)
558
    if fti.product:
559
        raise TypeError("Expected DX type, got AT type instead.")
560
    fti.behaviors = tuple(filter(lambda b: b != behavior_id, fti.behaviors))
561
    # invalidate schema cache
562
    notify(SchemaInvalidatedEvent(portal_type))
563
564
565
def get_fields(brain_or_object):
566
    """Get a name to field mapping of the object
567
568
    :param brain_or_object: A single catalog brain or content object
569
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
570
    :returns: Mapping of name -> field
571
    :rtype: OrderedDict
572
    """
573
    obj = get_object(brain_or_object)
574
    schema = get_schema(obj)
575
    if is_dexterity_content(obj):
576
        # get the fields directly provided by the interface
577
        fields = getFieldsInOrder(schema)
578
        # append the fields coming from behaviors
579
        behavior_assignable = IBehaviorAssignable(obj)
580
        if behavior_assignable:
581
            behaviors = behavior_assignable.enumerateBehaviors()
582
            for behavior in behaviors:
583
                fields.extend(getFieldsInOrder(behavior.interface))
584
        return OrderedDict(fields)
585
    return OrderedDict(schema)
586
587
588
def get_id(brain_or_object):
589
    """Get the Plone ID for this object
590
591
    :param brain_or_object: A single catalog brain or content object
592
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
593
    :returns: Plone ID
594
    :rtype: string
595
    """
596
    if is_brain(brain_or_object):
597
        if base_hasattr(brain_or_object, "getId"):
598
            return brain_or_object.getId
599
        if base_hasattr(brain_or_object, "id"):
600
            return brain_or_object.id
601
    return get_object(brain_or_object).getId()
602
603
604
def get_title(brain_or_object):
605
    """Get the Title for this object
606
607
    :param brain_or_object: A single catalog brain or content object
608
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
609
    :returns: Title
610
    :rtype: string
611
    """
612
    if is_brain(brain_or_object) and base_hasattr(brain_or_object, "Title"):
613
        return brain_or_object.Title
614
    return get_object(brain_or_object).Title()
615
616
617
def get_description(brain_or_object):
618
    """Get the Title for this object
619
620
    :param brain_or_object: A single catalog brain or content object
621
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
622
    :returns: Title
623
    :rtype: string
624
    """
625
    if is_brain(brain_or_object) \
626
            and base_hasattr(brain_or_object, "Description"):
627
        return brain_or_object.Description
628
    return get_object(brain_or_object).Description()
629
630
631
def get_uid(brain_or_object):
632
    """Get the Plone UID for this object
633
634
    :param brain_or_object: A single catalog brain or content object or an UID
635
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
636
    :returns: Plone UID
637
    :rtype: string
638
    """
639
    if is_uid(brain_or_object):
640
        return brain_or_object
641
    if is_portal(brain_or_object):
642
        return '0'
643
    if is_brain(brain_or_object) and base_hasattr(brain_or_object, "UID"):
644
        return brain_or_object.UID
645
    return get_object(brain_or_object).UID()
646
647
648
def get_url(brain_or_object):
649
    """Get the absolute URL for this object
650
651
    :param brain_or_object: A single catalog brain or content object
652
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
653
    :returns: Absolute URL
654
    :rtype: string
655
    """
656
    if is_brain(brain_or_object) and base_hasattr(brain_or_object, "getURL"):
657
        return brain_or_object.getURL()
658
    return get_object(brain_or_object).absolute_url()
659
660
661
def get_icon(brain_or_object, html_tag=True):
662
    """Get the icon of the content object
663
664
    :param brain_or_object: A single catalog brain or content object
665
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
666
    :param html_tag: A value of 'True' returns the HTML tag, else the image url
667
    :type html_tag: bool
668
    :returns: HTML '<img>' tag if 'html_tag' is True else the image url
669
    :rtype: string
670
    """
671
    # Manual approach, because `plone.app.layout.getIcon` does not reliable
672
    # work for Contents coming from other catalogs than the
673
    # `portal_catalog`
674
    portal_types = get_tool("portal_types")
675
    fti = portal_types.getTypeInfo(brain_or_object.portal_type)
676
    icon = fti.getIcon()
677
    if not icon:
678
        return ""
679
    url = "%s/%s" % (get_url(get_portal()), icon)
680
    if not html_tag:
681
        return url
682
    tag = '<img width="16" height="16" src="{url}" title="{title}" />'.format(
683
        url=url, title=get_title(brain_or_object))
684
    return tag
685
686
687
def get_object_by_uid(uid, default=_marker):
688
    """Find an object by a given UID
689
690
    :param uid: The UID of the object to find
691
    :type uid: string
692
    :returns: Found Object or None
693
    """
694
695
    # nothing to do here
696
    if not uid:
697
        if default is not _marker:
698
            return default
699
        fail("get_object_by_uid requires UID as first argument; got {} instead"
700
             .format(uid))
701
702
    # we defined the portal object UID to be '0'::
703
    if uid == '0':
704
        return get_portal()
705
706
    brain = get_brain_by_uid(uid)
707
708
    if brain is None:
709
        if default is not _marker:
710
            return default
711
        fail("No object found for UID {}".format(uid))
712
713
    return get_object(brain)
714
715
716
def get_brain_by_uid(uid, default=None):
717
    """Query a brain by a given UID
718
719
    :param uid: The UID of the object to find
720
    :type uid: string
721
    :returns: ZCatalog brain or None
722
    """
723
    if not is_uid(uid):
724
        return default
725
726
    # we try to find the object with the UID catalog
727
    uc = get_tool("uid_catalog")
728
729
    # try to find the object with the reference catalog first
730
    brains = uc(UID=uid)
731
    if len(brains) != 1:
732
        return default
733
    return brains[0]
734
735
736
def get_object_by_path(path, default=_marker):
737
    """Find an object by a given physical path or absolute_url
738
739
    :param path: The physical path of the object to find
740
    :type path: string
741
    :returns: Found Object or None
742
    """
743
744
    # nothing to do here
745
    if not path:
746
        if default is not _marker:
747
            return default
748
        fail("get_object_by_path first argument must be a path; {} received"
749
             .format(path))
750
751
    portal = get_portal()
752
    portal_path = get_path(portal)
753
    portal_url = get_url(portal)
754
755
    # ensure we have a physical path
756
    if path.startswith(portal_url):
757
        request = get_request()
758
        path = "/".join(request.physicalPathFromURL(path))
759
760
    if not path.startswith(portal_path):
761
        if default is not _marker:
762
            return default
763
        fail("Not a physical path inside the portal.")
764
765
    if path == portal_path:
766
        return portal
767
768
    return portal.restrictedTraverse(path, default)
769
770
771
def get_path(brain_or_object):
772
    """Calculate the physical path of this object
773
774
    :param brain_or_object: A single catalog brain or content object
775
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
776
    :returns: Physical path of the object
777
    :rtype: string
778
    """
779
    if is_brain(brain_or_object):
780
        return brain_or_object.getPath()
781
    return "/".join(get_object(brain_or_object).getPhysicalPath())
782
783
784
def get_parent_path(brain_or_object):
785
    """Calculate the physical parent path of this object
786
787
    :param brain_or_object: A single catalog brain or content object
788
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
789
    :returns: Physical path of the parent object
790
    :rtype: string
791
    """
792
    if is_portal(brain_or_object):
793
        return get_path(get_portal())
794
    if is_brain(brain_or_object):
795
        path = get_path(brain_or_object)
796
        return path.rpartition("/")[0]
797
    return get_path(get_object(brain_or_object).aq_parent)
798
799
800
def get_parent(brain_or_object, catalog_search=False):
801
    """Locate the parent object of the content/catalog brain
802
803
    The `catalog_search` switch uses the `portal_catalog` to do a search return
804
    a brain instead of the full parent object. However, if the search returned
805
    no results, it falls back to return the full parent object.
806
807
    :param brain_or_object: A single catalog brain or content object
808
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
809
    :param catalog_search: Use a catalog query to find the parent object
810
    :type catalog_search: bool
811
    :returns: parent object
812
    :rtype: ATContentType/DexterityContentType/PloneSite/CatalogBrain
813
    """
814
815
    if is_portal(brain_or_object):
816
        return get_portal()
817
818
    # Do a catalog search and return the brain
819
    if catalog_search:
820
        parent_path = get_parent_path(brain_or_object)
821
822
        # parent is the portal object
823
        if parent_path == get_path(get_portal()):
824
            return get_portal()
825
826
        # get the catalog tool
827
        pc = get_portal_catalog()
828
829
        # query for the parent path
830
        results = pc(path={
831
            "query": parent_path,
832
            "depth": 0})
833
834
        # No results fallback: return the parent object
835
        if not results:
836
            return get_object(brain_or_object).aq_parent
837
838
        # return the brain
839
        return results[0]
840
841
    return get_object(brain_or_object).aq_parent
842
843
844
def search(query, catalog=_marker):
845
    """Search for objects.
846
847
    :param query: A suitable search query.
848
    :type query: dict
849
    :param catalog: A single catalog id or a list of catalog ids
850
    :type catalog: str/list
851
    :returns: Search results
852
    :rtype: List of ZCatalog brains
853
    """
854
855
    # query needs to be a dictionary
856
    if not isinstance(query, dict):
857
        fail("Catalog query needs to be a dictionary")
858
859
    # Portal types to query
860
    portal_types = query.get("portal_type", [])
861
    # We want the portal_type as a list
862
    if not isinstance(portal_types, (tuple, list)):
863
        portal_types = [portal_types]
864
865
    # The catalogs used for the query
866
    catalogs = []
867
868
    # The user did **not** specify a catalog
869
    if catalog is _marker:
870
        # Find the registered catalogs for the queried portal types
871
        for portal_type in portal_types:
872
            # Just get the first registered/default catalog
873
            catalogs.append(get_catalogs_for(
874
                portal_type, default="portal_catalog")[0])
875
    else:
876
        # User defined catalogs
877
        if isinstance(catalog, (list, tuple)):
878
            catalogs.extend(map(get_tool, catalog))
879
        else:
880
            catalogs.append(get_tool(catalog))
881
882
    # Cleanup: Avoid duplicate catalogs
883
    catalogs = list(set(catalogs)) or [get_portal_catalog()]
884
885
    # We only support **single** catalog queries
886
    if len(catalogs) > 1:
887
        fail("Multi Catalog Queries are not supported!")
888
889
    return catalogs[0](query)
890
891
892
def safe_getattr(brain_or_object, attr, default=_marker):
893
    """Return the attribute value
894
895
    :param brain_or_object: A single catalog brain or content object
896
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
897
    :param attr: Attribute name
898
    :type attr: str
899
    :returns: Attribute value
900
    :rtype: obj
901
    """
902
    try:
903
        value = getattr(brain_or_object, attr, _marker)
904
        if value is _marker:
905
            if default is not _marker:
906
                return default
907
            fail("Attribute '{}' not found.".format(attr))
908
        if callable(value):
909
            return value()
910
        return value
911
    except Unauthorized:
912
        if default is not _marker:
913
            return default
914
        fail("You are not authorized to access '{}' of '{}'.".format(
915
            attr, repr(brain_or_object)))
916
917
918
def get_portal_catalog():
919
    """Get the portal catalog tool
920
921
    :returns: Portal Catalog Tool
922
    """
923
    return get_tool("portal_catalog")
924
925
926
def get_review_history(brain_or_object, rev=True):
927
    """Get the review history for the given brain or context.
928
929
    :param brain_or_object: A single catalog brain or content object
930
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
931
    :returns: Workflow history
932
    :rtype: [{}, ...]
933
    """
934
    obj = get_object(brain_or_object)
935
    review_history = []
936
    try:
937
        workflow = get_tool("portal_workflow")
938
        review_history = workflow.getInfoFor(obj, 'review_history')
939
    except WorkflowException as e:
940
        message = str(e)
941
        logger.error("Cannot retrieve review_history on {}: {}".format(
942
            obj, message))
943
    if not isinstance(review_history, (list, tuple)):
944
        logger.error("get_review_history: expected list, received {}".format(
945
            review_history))
946
        review_history = []
947
948
    if isinstance(review_history, tuple):
949
        # Products.CMFDefault.DefaultWorkflow.getInfoFor always returns a
950
        # tuple when "review_history" is passed in:
951
        # https://github.com/zopefoundation/Products.CMFDefault/blob/master/Products/CMFDefault/DefaultWorkflow.py#L244
952
        #
953
        # On the other hand, Products.DCWorkflow.getInfoFor relies on
954
        # Expression.StateChangeInfo, that always returns a list, except when no
955
        # review_history is found:
956
        # https://github.com/zopefoundation/Products.DCWorkflow/blob/master/Products/DCWorkflow/DCWorkflow.py#L310
957
        # https://github.com/zopefoundation/Products.DCWorkflow/blob/master/Products/DCWorkflow/Expression.py#L94
958
        review_history = list(review_history)
959
960
    if rev is True:
961
        review_history.reverse()
962
    return review_history
963
964
965
def get_revision_history(brain_or_object):
966
    """Get the revision history for the given brain or context.
967
968
    :param brain_or_object: A single catalog brain or content object
969
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
970
    :returns: Workflow history
971
    :rtype: obj
972
    """
973
    obj = get_object(brain_or_object)
974
    chv = ContentHistoryView(obj, safe_getattr(obj, "REQUEST", None))
975
    return chv.fullHistory()
976
977
978
def get_workflows_for(brain_or_object):
979
    """Get the assigned workflows for the given brain or context.
980
981
    Note: This function supports also the portal_type as parameter.
982
983
    :param brain_or_object: A single catalog brain or content object
984
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
985
    :returns: Assigned Workflows
986
    :rtype: tuple
987
    """
988
    workflow = ploneapi.portal.get_tool("portal_workflow")
989
    if isinstance(brain_or_object, six.string_types):
990
        return workflow.getChainFor(brain_or_object)
991
    obj = get_object(brain_or_object)
992
    return workflow.getChainFor(obj)
993
994
995
def get_workflow_status_of(brain_or_object, state_var="review_state"):
996
    """Get the current workflow status of the given brain or context.
997
998
    :param brain_or_object: A single catalog brain or content object
999
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
1000
    :param state_var: The name of the state variable
1001
    :type state_var: string
1002
    :returns: Status
1003
    :rtype: str
1004
    """
1005
    # Try to get the state from the catalog brain first
1006
    if is_brain(brain_or_object):
1007
        if state_var in brain_or_object.schema():
1008
            return brain_or_object[state_var]
1009
1010
    # Retrieve the sate from the object
1011
    workflow = get_tool("portal_workflow")
1012
    obj = get_object(brain_or_object)
1013
    return workflow.getInfoFor(ob=obj, name=state_var, default='')
1014
1015
1016
def get_previous_worfklow_status_of(brain_or_object, skip=None, default=None):
1017
    """Get the previous workflow status of the object
1018
1019
    :param brain_or_object: A single catalog brain or content object
1020
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
1021
    :param skip: Workflow states to skip
1022
    :type skip: tuple/list
1023
    :returns: status
1024
    :rtype: str
1025
    """
1026
1027
    skip = isinstance(skip, (list, tuple)) and skip or []
1028
    history = get_review_history(brain_or_object)
1029
1030
    # Remove consecutive duplicates, some transitions might happen more than
1031
    # once consecutively (e.g. publish)
1032
    history = map(lambda i: i[0], groupby(history))
1033
1034
    for num, item in enumerate(history):
1035
        # skip the current history entry
1036
        if num == 0:
1037
            continue
1038
        status = item.get("review_state")
1039
        if status in skip:
1040
            continue
1041
        return status
1042
    return default
1043
1044
1045
def get_creation_date(brain_or_object):
1046
    """Get the creation date of the brain or object
1047
1048
    :param brain_or_object: A single catalog brain or content object
1049
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
1050
    :returns: Creation date
1051
    :rtype: DateTime
1052
    """
1053
    created = getattr(brain_or_object, "created", None)
1054
    if created is None:
1055
        fail("Object {} has no creation date ".format(
1056
             repr(brain_or_object)))
1057
    if callable(created):
1058
        return created()
1059
    return created
1060
1061
1062
def get_modification_date(brain_or_object):
1063
    """Get the modification date of the brain or object
1064
1065
    :param brain_or_object: A single catalog brain or content object
1066
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
1067
    :returns: Modification date
1068
    :rtype: DateTime
1069
    """
1070
    modified = getattr(brain_or_object, "modified", None)
1071
    if modified is None:
1072
        fail("Object {} has no modification date ".format(
1073
             repr(brain_or_object)))
1074
    if callable(modified):
1075
        return modified()
1076
    return modified
1077
1078
1079
def get_review_status(brain_or_object):
1080
    """Get the `review_state` of an object
1081
1082
    :param brain_or_object: A single catalog brain or content object
1083
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
1084
    :returns: Value of the review_status variable
1085
    :rtype: String
1086
    """
1087
    if is_brain(brain_or_object):
1088
        return brain_or_object.review_state
1089
    return get_workflow_status_of(brain_or_object, state_var="review_state")
1090
1091
1092
def is_active(brain_or_object):
1093
    """Check if the workflow state of the object is 'inactive' or 'cancelled'.
1094
1095
    :param brain_or_object: A single catalog brain or content object
1096
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
1097
    :returns: False if the object is in the state 'inactive' or 'cancelled'
1098
    :rtype: bool
1099
    """
1100
    if get_review_status(brain_or_object) in ["cancelled", "inactive"]:
1101
        return False
1102
    return True
1103
1104
1105
def get_catalogs_for(brain_or_object, default="portal_catalog"):
1106
    """Get all registered catalogs for the given portal_type, catalog brain or
1107
    content object
1108
1109
    :param brain_or_object: The portal_type, a catalog brain or content object
1110
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
1111
    :returns: List of supported catalogs
1112
    :rtype: list
1113
    """
1114
    archetype_tool = get_tool("archetype_tool", default=None)
1115
    if archetype_tool is None:
1116
        # return the default catalog
1117
        return [get_tool(default, default="portal_catalog")]
1118
1119
    catalogs = []
1120
1121
    # get the registered catalogs for portal_type
1122
    if is_object(brain_or_object):
1123
        catalogs = archetype_tool.getCatalogsByType(
1124
            get_portal_type(brain_or_object))
1125
    if isinstance(brain_or_object, six.string_types):
1126
        catalogs = archetype_tool.getCatalogsByType(brain_or_object)
1127
1128
    if not catalogs:
1129
        return [get_tool(default)]
1130
    return catalogs
1131
1132
1133
def get_transitions_for(brain_or_object):
1134
    """List available workflow transitions for all workflows
1135
1136
    :param brain_or_object: A single catalog brain or content object
1137
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
1138
    :returns: All possible available and allowed transitions
1139
    :rtype: list[dict]
1140
    """
1141
    workflow = get_tool('portal_workflow')
1142
    transitions = []
1143
    instance = get_object(brain_or_object)
1144
    for wfid in get_workflows_for(brain_or_object):
1145
        wf = workflow[wfid]
1146
        tlist = wf.getTransitionsFor(instance)
1147
        transitions.extend([t for t in tlist if t not in transitions])
1148
    return transitions
1149
1150
1151
def do_transition_for(brain_or_object, transition):
1152
    """Performs a workflow transition for the passed in object.
1153
1154
    :param brain_or_object: A single catalog brain or content object
1155
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
1156
    :returns: The object where the transtion was performed
1157
    """
1158
    if not isinstance(transition, six.string_types):
1159
        fail("Transition type needs to be string, got '%s'" % type(transition))
1160
    obj = get_object(brain_or_object)
1161
    try:
1162
        ploneapi.content.transition(obj, transition)
1163
    except ploneapi.exc.InvalidParameterError as e:
1164
        fail("Failed to perform transition '{}' on {}: {}".format(
1165
             transition, obj, str(e)))
1166
    return obj
1167
1168
1169
def get_roles_for_permission(permission, brain_or_object):
1170
    """Get a list of granted roles for the given permission on the object.
1171
1172
    :param brain_or_object: A single catalog brain or content object
1173
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
1174
    :returns: Roles for the given Permission
1175
    :rtype: list
1176
    """
1177
    obj = get_object(brain_or_object)
1178
    allowed = set(rolesForPermissionOn(permission, obj))
1179
    return sorted(allowed)
1180
1181
1182
def is_versionable(brain_or_object, policy='at_edit_autoversion'):
1183
    """Checks if the passed in object is versionable.
1184
1185
    :param brain_or_object: A single catalog brain or content object
1186
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
1187
    :returns: True if the object is versionable
1188
    :rtype: bool
1189
    """
1190
    pr = get_tool("portal_repository")
1191
    obj = get_object(brain_or_object)
1192
    return pr.supportsPolicy(obj, 'at_edit_autoversion') \
1193
        and pr.isVersionable(obj)
1194
1195
1196
def get_version(brain_or_object):
1197
    """Get the version of the current object
1198
1199
    :param brain_or_object: A single catalog brain or content object
1200
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
1201
    :returns: The current version of the object, or None if not available
1202
    :rtype: int or None
1203
    """
1204
    obj = get_object(brain_or_object)
1205
    if not is_versionable(obj):
1206
        return None
1207
    return getattr(aq_base(obj), "version_id", 0)
1208
1209
1210
def get_view(name, context=None, request=None, default=None):
1211
    """Get the view by name
1212
1213
    :param name: The name of the view
1214
    :type name: str
1215
    :param context: The context to query the view
1216
    :type context: ATContentType/DexterityContentType/CatalogBrain
1217
    :param request: The request to query the view
1218
    :type request: HTTPRequest object
1219
    :returns: HTTP Request
1220
    :rtype: Products.Five.metaclass View object
1221
    """
1222
    context = context or get_portal()
1223
    request = request or get_request() or None
1224
    view = queryMultiAdapter((get_object(context), request), name=name)
1225
    if view is None:
1226
        return default
1227
    return view
1228
1229
1230
def get_request():
1231
    """Get the global request object
1232
1233
    :returns: HTTP Request
1234
    :rtype: HTTPRequest object
1235
    """
1236
    return globalrequest.getRequest()
1237
1238
1239
def get_test_request():
1240
    """Get the TestRequest object
1241
    """
1242
    request = TestRequest()
1243
    directlyProvides(request, IAttributeAnnotatable)
1244
    return request
1245
1246
1247
def get_group(group_or_groupname):
1248
    """Return Plone Group
1249
1250
    :param group_or_groupname: Plone group or the name of the group
1251
    :type groupname:  GroupData/str
1252
    :returns: Plone GroupData
1253
    """
1254
    if not group_or_groupname:
1255
1256
        return None
1257
    if hasattr(group_or_groupname, "_getGroup"):
1258
        return group_or_groupname
1259
    gtool = get_tool("portal_groups")
1260
    return gtool.getGroupById(group_or_groupname)
1261
1262
1263
def get_user(user_or_username):
1264
    """Return Plone User
1265
1266
    :param user_or_username: Plone user or user id
1267
    :returns: Plone MemberData
1268
    """
1269
    user = None
1270
    if isinstance(user_or_username, MemberData):
1271
        user = user_or_username
1272
    if isinstance(user_or_username, six.string_types):
1273
        user = get_member_by_login_name(get_portal(), user_or_username, False)
1274
    return user
1275
1276
1277
def get_user_properties(user_or_username):
1278
    """Return User Properties
1279
1280
    :param user_or_username: Plone group identifier
1281
    :returns: Plone MemberData
1282
    """
1283
    user = get_user(user_or_username)
1284
    if user is None:
1285
        return {}
1286
    if not callable(user.getUser):
1287
        return {}
1288
    out = {}
1289
    plone_user = user.getUser()
1290
    for sheet in plone_user.listPropertysheets():
1291
        ps = plone_user.getPropertysheet(sheet)
1292
        out.update(dict(ps.propertyItems()))
1293
    return out
1294
1295
1296
def get_users_by_roles(roles=None):
1297
    """Search Plone users by their roles
1298
1299
    :param roles: Plone role name or list of roles
1300
    :type roles:  list/str
1301
    :returns: List of Plone users having the role(s)
1302
    """
1303
    if not isinstance(roles, (tuple, list)):
1304
        roles = [roles]
1305
    mtool = get_tool("portal_membership")
1306
    return mtool.searchForMembers(roles=roles)
1307
1308
1309
def get_current_user():
1310
    """Returns the current logged in user
1311
1312
    :returns: Current User
1313
    """
1314
    return ploneapi.user.get_current()
1315
1316
1317
def get_user_contact(user, contact_types=['Contact', 'LabContact']):
1318
    """Returns the associated contact of a Plone user
1319
1320
    If the user passed in has no contact associated, return None.
1321
    The `contact_types` parameter filter the portal types for the search.
1322
1323
    :param: Plone user
1324
    :contact_types: List with the contact portal types to search
1325
    :returns: Contact associated to the Plone user or None
1326
    """
1327
    if not user:
1328
        return None
1329
1330
    query = {'portal_type': contact_types, 'getUsername': user.id}
1331
    brains = search(query, catalog='portal_catalog')
1332
    if not brains:
1333
        return None
1334
1335
    if len(brains) > 1:
1336
        # Oops, the user has multiple contacts assigned, return None
1337
        contacts = map(lambda c: c.Title, brains)
1338
        err_msg = "User '{}' is bound to multiple Contacts '{}'"
1339
        err_msg = err_msg.format(user.id, ','.join(contacts))
1340
        logger.error(err_msg)
1341
        return None
1342
1343
    return get_object(brains[0])
1344
1345
1346
def get_user_client(user_or_contact):
1347
    """Returns the client of the contact of a Plone user
1348
1349
    If the user passed in has no contact or does not belong to any client,
1350
    returns None.
1351
1352
    :param: Plone user or contact
1353
    :returns: Client the contact of the Plone user belongs to
1354
    """
1355
    if not user_or_contact or ILabContact.providedBy(user_or_contact):
1356
        # Lab contacts cannot belong to a client
1357
        return None
1358
1359
    if not IContact.providedBy(user_or_contact):
1360
        contact = get_user_contact(user_or_contact, contact_types=['Contact'])
1361
        if IContact.providedBy(contact):
1362
            return get_user_client(contact)
1363
        return None
1364
1365
    client = get_parent(user_or_contact)
1366
    if client and IClient.providedBy(client):
1367
        return client
1368
1369
    return None
1370
1371
1372
def get_current_client():
1373
    """Returns the current client the current logged in user belongs to, if any
1374
1375
    :returns: Client the current logged in user belongs to or None
1376
    """
1377
    return get_user_client(get_current_user())
1378
1379
1380
def get_cache_key(brain_or_object):
1381
    """Generate a cache key for a common brain or object
1382
1383
    :param brain_or_object: A single catalog brain or content object
1384
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
1385
    :returns: Cache Key
1386
    :rtype: str
1387
    """
1388
    key = [
1389
        get_portal_type(brain_or_object),
1390
        get_id(brain_or_object),
1391
        get_uid(brain_or_object),
1392
        # handle different domains gracefully
1393
        get_url(brain_or_object),
1394
        # Return the microsecond since the epoch in GMT
1395
        get_modification_date(brain_or_object).micros(),
1396
    ]
1397
    return "-".join(map(lambda x: str(x), key))
1398
1399
1400
def bika_cache_key_decorator(method, self, brain_or_object):
1401
    """Bika cache key decorator usable for
1402
1403
    :param brain_or_object: A single catalog brain or content object
1404
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
1405
    :returns: Cache Key
1406
    :rtype: str
1407
    """
1408
    if brain_or_object is None:
1409
        raise DontCache
1410
    return get_cache_key(brain_or_object)
1411
1412
1413
def normalize_id(string):
1414
    """Normalize the id
1415
1416
    :param string: A string to normalize
1417
    :type string: str
1418
    :returns: Normalized ID
1419
    :rtype: str
1420
    """
1421
    if not isinstance(string, six.string_types):
1422
        fail("Type of argument must be string, found '{}'"
1423
             .format(type(string)))
1424
    # get the id nomalizer utility
1425
    normalizer = getUtility(IIDNormalizer).normalize
1426
    return normalizer(string)
1427
1428
1429
def normalize_filename(string):
1430
    """Normalize the filename
1431
1432
    :param string: A string to normalize
1433
    :type string: str
1434
    :returns: Normalized ID
1435
    :rtype: str
1436
    """
1437
    if not isinstance(string, six.string_types):
1438
        fail("Type of argument must be string, found '{}'"
1439
             .format(type(string)))
1440
    # get the file nomalizer utility
1441
    normalizer = getUtility(IFileNameNormalizer).normalize
1442
    return normalizer(string)
1443
1444
1445
def is_uid(uid, validate=False):
1446
    """Checks if the passed in uid is a valid UID
1447
1448
    :param uid: The uid to check
1449
    :param validate: If False, checks if uid is a valid 23 alphanumeric uid. If
1450
    True, also verifies if a brain exists for the uid passed in
1451
    :type uid: string
1452
    :return: True if a valid uid
1453
    :rtype: bool
1454
    """
1455
    if not isinstance(uid, six.string_types):
1456
        return False
1457
    if uid == '0':
1458
        return True
1459
    if len(uid) != 32:
1460
        return False
1461
    if not UID_RX.match(uid):
1462
        return False
1463
    if not validate:
1464
        return True
1465
1466
    # Check if a brain for this uid exists
1467
    uc = get_tool('uid_catalog')
1468
    brains = uc(UID=uid)
1469
    if brains:
1470
        assert (len(brains) == 1)
1471
    return len(brains) > 0
1472
1473
1474
def is_date(date):
1475
    """Checks if the passed in value is a valid Zope's DateTime
1476
1477
    :param date: The date to check
1478
    :type date: DateTime
1479
    :return: True if a valid date
1480
    :rtype: bool
1481
    """
1482
    if not date:
1483
        return False
1484
    return isinstance(date, (DateTime, datetime))
1485
1486
1487
def to_date(value, default=None):
1488
    """Tries to convert the passed in value to Zope's DateTime
1489
1490
    :param value: The value to be converted to a valid DateTime
1491
    :type value: str, DateTime or datetime
1492
    :return: The DateTime representation of the value passed in or default
1493
    """
1494
    if isinstance(value, DateTime):
1495
        return value
1496
    if not value:
1497
        if default is None:
1498
            return None
1499
        return to_date(default)
1500
    try:
1501
        if isinstance(value, str) and '.' in value:
1502
            # https://docs.plone.org/develop/plone/misc/datetime.html#datetime-problems-and-pitfalls
1503
            return DateTime(value, datefmt='international')
1504
        return DateTime(value)
1505
    except (TypeError, ValueError, DateTimeError):
1506
        return to_date(default)
1507
1508
1509
def to_minutes(days=0, hours=0, minutes=0, seconds=0, milliseconds=0,
1510
               round_to_int=True):
1511
    """Returns the computed total number of minutes
1512
    """
1513
    total = float(days)*24*60 + float(hours)*60 + float(minutes) + \
1514
        float(seconds)/60 + float(milliseconds)/1000/60
1515
    return int(round(total)) if round_to_int else total
1516
1517
1518
def to_dhm_format(days=0, hours=0, minutes=0, seconds=0, milliseconds=0):
1519
    """Returns a representation of time in a string in xd yh zm format
1520
    """
1521
    minutes = to_minutes(days=days, hours=hours, minutes=minutes,
1522
                         seconds=seconds, milliseconds=milliseconds)
1523
    delta = timedelta(minutes=int(round(minutes)))
1524
    d = delta.days
1525
    h = delta.seconds // 3600
1526
    m = (delta.seconds // 60) % 60
1527
    m = m and "{}m ".format(str(m)) or ""
1528
    d = d and "{}d ".format(str(d)) or ""
1529
    if m and d:
1530
        h = "{}h ".format(str(h))
1531
    else:
1532
        h = h and "{}h ".format(str(h)) or ""
1533
    return "".join([d, h, m]).strip()
1534
1535
1536
def to_int(value, default=_marker):
1537
    """Tries to convert the value to int.
1538
    Truncates at the decimal point if the value is a float
1539
1540
    :param value: The value to be converted to an int
1541
    :return: The resulting int or default
1542
    """
1543
    if is_floatable(value):
1544
        value = to_float(value)
1545
    try:
1546
        return int(value)
1547
    except (TypeError, ValueError):
1548
        if default is None:
1549
            return default
1550
        if default is not _marker:
1551
            return to_int(default)
1552
        fail("Value %s cannot be converted to int" % repr(value))
1553
1554
1555
def is_floatable(value):
1556
    """Checks if the passed in value is a valid floatable number
1557
1558
    :param value: The value to be evaluated as a float number
1559
    :type value: str, float, int
1560
    :returns: True if is a valid float number
1561
    :rtype: bool"""
1562
    try:
1563
        float(value)
1564
        return True
1565
    except (TypeError, ValueError):
1566
        return False
1567
1568
1569
def to_float(value, default=_marker):
1570
    """Converts the passed in value to a float number
1571
1572
    :param value: The value to be converted to a floatable number
1573
    :type value: str, float, int
1574
    :returns: The float number representation of the passed in value
1575
    :rtype: float
1576
    """
1577
    if not is_floatable(value):
1578
        if default is not _marker:
1579
            return to_float(default)
1580
        fail("Value %s is not floatable" % repr(value))
1581
    return float(value)
1582
1583
1584
def float_to_string(value, default=_marker):
1585
    """Convert a float value to string without exponential notation
1586
1587
    This function preserves the whole fraction
1588
1589
    :param value: The float value to be converted to a string
1590
    :type value: str, float, int
1591
    :returns: String representation of the float w/o exponential notation
1592
    :rtype: str
1593
    """
1594
    if not is_floatable(value):
1595
        if default is not _marker:
1596
            return default
1597
        fail("Value %s is not floatable" % repr(value))
1598
1599
    # Leave floatable string values unchanged
1600
    if isinstance(value, six.string_types):
1601
        return value
1602
1603
    value = float(value)
1604
    str_value = str(value)
1605
1606
    if "." in str_value:
1607
        # might be something like 1.23e-26
1608
        front, back = str_value.split(".")
1609
    else:
1610
        # or 1e-07 for 0.0000001
1611
        back = str_value
1612
1613
    if "e-" in back:
1614
        fraction, zeros = back.split("e-")
1615
        # we want to cover the faction and the zeros
1616
        precision = len(fraction) + int(zeros)
1617
        template = "{:.%df}" % precision
1618
        str_value = template.format(value)
1619
    elif "e+" in back:
1620
        # positive numbers, e.g. 1e+16 don't need a fractional part
1621
        str_value = "{:.0f}".format(value)
1622
1623
    # cut off trailing zeros
1624
    if "." in str_value:
1625
        str_value = str_value.rstrip("0").rstrip(".")
1626
1627
    return str_value
1628
1629
1630
def to_searchable_text_metadata(value):
1631
    """Parse the given metadata value to searchable text
1632
1633
    :param value: The raw value of the metadata column
1634
    :returns: Searchable and translated unicode value or None
1635
    """
1636
    if not value:
1637
        return u""
1638
    if value is Missing.Value:
1639
        return u""
1640
    if is_uid(value):
1641
        return u""
1642
    if isinstance(value, (bool)):
1643
        return u""
1644
    if isinstance(value, (list, tuple)):
1645
        values = map(to_searchable_text_metadata, value)
1646
        values = filter(None, values)
1647
        return " ".join(values)
1648
    if isinstance(value, dict):
1649
        return to_searchable_text_metadata(value.values())
1650
    if is_date(value):
1651
        from senaite.core.api.dtime import date_to_string
1652
        return date_to_string(value, "%Y-%m-%d")
1653
    if is_at_content(value):
1654
        return to_searchable_text_metadata(get_title(value))
1655
    if not isinstance(value, six.string_types):
1656
        value = str(value)
1657
    return safe_unicode(value)
1658
1659
1660
def get_registry_record(name, default=None):
1661
    """Returns the value of a registry record
1662
1663
    :param name: [required] name of the registry record
1664
    :type name: str
1665
    :param default: The value returned if the record is not found
1666
    :type default: anything
1667
    :returns: value of the registry record
1668
    """
1669
    return ploneapi.portal.get_registry_record(name, default=default)
1670
1671
1672
def to_display_list(pairs, sort_by="key", allow_empty=True):
1673
    """Create a Plone DisplayList from list items
1674
1675
    :param pairs: list of key, value pairs
1676
    :param sort_by: Sort the items either by key or value
1677
    :param allow_empty: Allow to select an empty value
1678
    :returns: Plone DisplayList
1679
    """
1680
    dl = DisplayList()
1681
1682
    if isinstance(pairs, six.string_types):
1683
        pairs = [pairs, pairs]
1684
    for pair in pairs:
1685
        # pairs is a list of lists -> add each pair
1686
        if isinstance(pair, (tuple, list)):
1687
            dl.add(*pair)
1688
        # pairs is just a single pair -> add it and stop
1689
        if isinstance(pair, six.string_types):
1690
            dl.add(*pairs)
1691
            break
1692
1693
    # add the empty option
1694
    if allow_empty:
1695
        dl.add("", "")
1696
1697
    # sort by key/value
1698
    if sort_by == "key":
1699
        dl = dl.sortedByKey()
1700
    elif sort_by == "value":
1701
        dl = dl.sortedByValue()
1702
1703
    return dl
1704
1705
1706
def text_to_html(text, wrap="p", encoding="utf8"):
1707
    """Convert `\n` sequences in the text to HTML `\n`
1708
1709
    :param text: Plain text to convert
1710
    :param wrap: Toggle to wrap the text in a
1711
    :returns: HTML converted and encoded text
1712
    """
1713
    if not text:
1714
        return ""
1715
    # handle text internally as unicode
1716
    text = safe_unicode(text)
1717
    # replace newline characters with HTML entities
1718
    html = text.replace("\n", "<br/>")
1719
    if wrap:
1720
        html = u"<{tag}>{html}</{tag}>".format(
1721
            tag=wrap, html=html)
1722
    # return encoded html
1723
    return html.encode(encoding)
1724
1725
1726
def to_utf8(string, default=_marker):
1727
    """Encode string to UTF8
1728
1729
    :param string: String to be encoded to UTF8
1730
    :returns: UTF8 encoded string
1731
    """
1732
    if not isinstance(string, six.string_types):
1733
        if default is _marker:
1734
            fail("Expected string type, got '%s'" % type(string))
1735
        return default
1736
    return safe_unicode(string).encode("utf8")
1737
1738
1739
def is_temporary(obj):
1740
    """Returns whether the given object is temporary or not
1741
1742
    :param obj: the object to evaluate
1743
    :returns: True if the object is temporary
1744
    """
1745
    if ITemporaryObject.providedBy(obj):
1746
        return True
1747
1748
    obj_id = getattr(aq_base(obj), "id", None)
1749
    if obj_id is None or UID_RX.match(obj_id):
1750
        return True
1751
1752
    parent = aq_parent(aq_inner(obj))
1753
    if not parent:
1754
        return True
1755
1756
    parent_id = getattr(aq_base(parent), "id", None)
1757
    if parent_id is None or UID_RX.match(parent_id):
1758
        return True
1759
1760
    if is_at_content(obj):
1761
1762
        # Checks to see if we are created inside the portal_factory. We don't
1763
        # rely here on AT's isFactoryContained because the function is patched
1764
        meta_type = getattr(aq_base(parent), "meta_type", "")
1765
        return meta_type == "TempFolder"
1766
1767
    return False
1768
1769
1770
def mark_temporary(brain_or_object):
1771
    """Mark the object as temporary
1772
    """
1773
    obj = get_object(brain_or_object)
1774
    alsoProvides(obj, ITemporaryObject)
1775
1776
1777
def unmark_temporary(brain_or_object):
1778
    """Unmark the object as temporary
1779
    """
1780
    obj = get_object(brain_or_object)
1781
    noLongerProvides(obj, ITemporaryObject)
1782
1783
1784
def is_string(thing):
1785
    """Checks if the passed in object is a string type
1786
1787
    :param thing: object to test
1788
    :returns: True if the object is a string
1789
    """
1790
    return isinstance(thing, six.string_types)
1791
1792
1793
def parse_json(thing, default=""):
1794
    """Parse from JSON
1795
1796
    :param thing: thing to parse
1797
    :param default: value to return if cannot parse
1798
    :returns: the object representing the JSON or default
1799
    """
1800
    try:
1801
        return json.loads(thing)
1802
    except (TypeError, ValueError):
1803
        return default
1804
1805
1806
def to_list(value):
1807
    """Converts the value to a list
1808
1809
    :param value: the value to be represented as a list
1810
    :returns: a list that represents or contains the value
1811
    """
1812
    if is_string(value):
1813
        val = parse_json(value)
1814
        if isinstance(val, (list, tuple, set)):
1815
            value = val
1816
    if not isinstance(value, (list, tuple, set)):
1817
        value = [value]
1818
    return list(value)
1819