Passed
Push — 2.x ( bb8554...7f0140 )
by Jordi
06:37
created

bika.lims.api.mark_temporary()   A

Complexity

Conditions 1

Size

Total Lines 5
Code Lines 3

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 3
dl 0
loc 5
rs 10
c 0
b 0
f 0
cc 1
nop 1
1
# -*- coding: utf-8 -*-
2
#
3
# This file is part of SENAITE.CORE.
4
#
5
# SENAITE.CORE is free software: you can redistribute it and/or modify it under
6
# the terms of the GNU General Public License as published by the Free Software
7
# Foundation, version 2.
8
#
9
# This program is distributed in the hope that it will be useful, but WITHOUT
10
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
11
# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
12
# details.
13
#
14
# You should have received a copy of the GNU General Public License along with
15
# this program; if not, write to the Free Software Foundation, Inc., 51
16
# Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
17
#
18
# Copyright 2018-2021 by its authors.
19
# Some rights reserved, see README and LICENSE.
20
21
import copy
22
import re
23
from collections import OrderedDict
24
from datetime import datetime
25
from datetime import timedelta
26
from itertools import groupby
27
28
import Missing
29
import six
30
from AccessControl.PermissionRole import rolesForPermissionOn
31
from Acquisition import aq_base
32
from Acquisition import aq_inner
33
from Acquisition import aq_parent
34
from bika.lims import logger
35
from bika.lims.interfaces import IClient
36
from bika.lims.interfaces import IContact
37
from bika.lims.interfaces import ILabContact
38
from DateTime import DateTime
39
from DateTime.interfaces import DateTimeError
40
from plone import api as ploneapi
41
from plone.api.exc import InvalidParameterError
42
from plone.app.layout.viewlets.content import ContentHistoryView
43
from plone.behavior.interfaces import IBehaviorAssignable
44
from plone.dexterity.interfaces import IDexterityContent
45
from plone.dexterity.schema import SchemaInvalidatedEvent
46
from plone.i18n.normalizer.interfaces import IFileNameNormalizer
47
from plone.i18n.normalizer.interfaces import IIDNormalizer
48
from plone.memoize.volatile import DontCache
49
from Products.Archetypes.atapi import DisplayList
50
from Products.Archetypes.BaseObject import BaseObject
51
from Products.Archetypes.event import ObjectInitializedEvent
52
from Products.Archetypes.utils import mapply
53
from Products.CMFCore.interfaces import IFolderish
54
from Products.CMFCore.interfaces import ISiteRoot
55
from Products.CMFCore.permissions import ModifyPortalContent
56
from Products.CMFCore.permissions import View
57
from Products.CMFCore.utils import getToolByName
58
from Products.CMFCore.WorkflowCore import WorkflowException
59
from Products.CMFPlone.RegistrationTool import get_member_by_login_name
60
from Products.CMFPlone.utils import _createObjectByType
61
from Products.CMFPlone.utils import base_hasattr
62
from Products.CMFPlone.utils import safe_unicode
63
from Products.PlonePAS.tools.memberdata import MemberData
64
from Products.ZCatalog.interfaces import ICatalogBrain
65
from senaite.core.interfaces import ITemporaryObject
66
from zope import globalrequest
67
from zope.annotation.interfaces import IAttributeAnnotatable
68
from zope.component import getUtility
69
from zope.component import queryMultiAdapter
70
from zope.component.interfaces import IFactory
71
from zope.event import notify
72
from zope.interface import alsoProvides
73
from zope.interface import directlyProvides
74
from zope.interface import noLongerProvides
75
from zope.lifecycleevent import ObjectCreatedEvent
76
from zope.publisher.browser import TestRequest
77
from zope.schema import getFieldsInOrder
78
from zope.security.interfaces import Unauthorized
79
80
"""SENAITE LIMS Framework API
81
82
Please see bika.lims/docs/API.rst for documentation.
83
84
Architectural Notes:
85
86
Please add only functions that do a single thing for a single object.
87
88
Good: `def get_foo(brain_or_object)`
89
Bad:  `def get_foos(list_of_brain_objects)`
90
91
Why?
92
93
Because it makes things more complex. You can always use a pattern like this to
94
achieve the same::
95
96
    >>> foos = map(get_foo, list_of_brain_objects)
97
98
Please add for all of your functions a descriptive test in docs/API.rst.
99
100
Thanks.
101
"""
102
103
# Unique sentinel used as the default of optional `default` arguments in this
# module, so that `None` can still be passed as an explicit default value.
_marker = object()
104
105
# Pattern for 32 lowercase alphanumeric characters ([a-z0-9]) followed by the
# end of the string. There is no explicit start anchor in the pattern itself.
# NOTE(review): presumably used with `match` to validate UIDs -- confirm at the
# call site.
UID_RX = re.compile("[a-z0-9]{32}$")
106
107
108
class APIError(Exception):
    """Exception raised by the API functions of this module.

    Instances are created by `fail`, which is called whenever an API
    function cannot fulfil a request.
    """
110
111
112
def get_portal():
    """Return the portal (site root) object

    :returns: The site as returned by `plone.api.portal.getSite`
    """
    site = ploneapi.portal.getSite()
    return site
118
119
120
def get_setup():
    """Return the `bika_setup` folder of the portal

    :returns: The object stored under the id `bika_setup` in the portal, or
        `None` if the portal has no such item
    """
    return get_portal().get("bika_setup")
125
126
127
def get_bika_setup():
    """Return the `bika_setup` folder of the portal

    Alias of `get_setup`.
    """
    setup = get_setup()
    return setup
131
132
133
def get_senaite_setup():
    """Return the new DX `setup` folder of the portal

    :returns: The object stored under the id `setup` in the portal, or `None`
        if the portal has no such item
    """
    return get_portal().get("setup")
138
139
140
def create(container, portal_type, *args, **kwargs):
    """Create a new content object inside the given container

    The implementation follows most of
    `Products.CMFCore.TypesTool._constructInstance`.

    :param container: container that receives the new object
    :type container: ATContentType/DexterityContentType/CatalogBrain
    :param portal_type: The portal type to create, e.g. "Client"
    :type portal_type: string
    :param id: (keyword) id for the new object; a temporary id is generated
        when omitted
    :type id: string
    :param title: (keyword) The title for the new content object; defaults to
        "New <portal_type>"
    :type title: string
    :returns: The new created object
    """
    from bika.lims.utils import tmpID

    # `id` and `title` are consumed here, the remaining keyword arguments are
    # handed over to the object (field values / factory arguments)
    obj_id = kwargs.pop("id", tmpID())
    obj_title = kwargs.pop("title", "New {}".format(portal_type))

    # type information of the requested portal type
    fti = get_tool("portal_types").getTypeInfo(portal_type)

    if not fti.product:
        # no product set on the FTI: use the newstyle factory utility
        factory = getUtility(IFactory, fti.factory)
        obj = factory(obj_id, *args, **kwargs)
        if hasattr(obj, "_setPortalTypeName"):
            obj._setPortalTypeName(fti.getId())
        obj.title = safe_unicode(obj_title)
        # the object exists now, but is not yet part of the container
        notify(ObjectCreatedEvent(obj))
        # triggers ObjectWillBeAddedEvent, ObjectAddedEvent and
        # ContainerModifiedEvent
        container._setObject(obj_id, obj)
        # an event handler might have renamed the object already, so fetch
        # it again by its current id
        return container._getOb(obj.getId())

    # product set on the FTI: create the AT object
    obj = _createObjectByType(portal_type, container, obj_id)
    # apply the passed-in values without checking field permissions
    edit(obj, check_permissions=False, title=obj_title, **kwargs)
    # generate the final id if the type asks for it
    if obj._at_rename_after_creation:
        obj._renameAfterCreation(check_auto_id=True)
    # creation is finished
    obj.unmarkCreationFlag()
    notify(ObjectInitializedEvent(obj))
    return obj
193
194
195
def copy_object(source, container=None, portal_type=None, *args, **kwargs):
    """Create a copy of the source object

    The copy is created inside `container`, or inside the parent of the
    source when no container is given. When `portal_type` is given, an object
    of that type is created instead of the type of the source. Field values of
    the source are passed to `create`; values passed as keyword arguments take
    precedence over the values read from the source.

    :param source: object from which create a copy
    :type source: ATContentType/DexterityContentType/CatalogBrain
    :param container: destination container
    :type container: ATContentType/DexterityContentType/CatalogBrain
    :param portal_type: destination portal type
    :param skip: (keyword) additional field names or field types that must
        not be copied
    :returns: The new created object
    """
    # Prevent circular dependencies
    from security import check_permission

    source = get_object(source)

    # Destination defaults to the container of the source
    if not container:
        container = get_parent(source)

    # Type defaults to the type of the source
    if not portal_type:
        portal_type = get_portal_type(source)

    # Field names and field types that are never copied, extended by the
    # names/types the caller asked to skip
    ignored = set(kwargs.pop("skip", []))
    ignored.update([
        "Products.Archetypes.Field.ComputedField",
        "UID",
        "id",
        "allowDiscussion",
        "contributors",
        "creation_date",
        "creators",
        "effectiveDate",
        "expirationDate",
        "language",
        "location",
        "modification_date",
        "rights",
        "subject",
    ])

    # Collect the field values of the source into kwargs
    for field_name, field in get_fields(source).items():
        # explicitly passed values win over the values of the source
        if field_name in kwargs:
            continue
        # ignored by field name
        if field_name in ignored:
            continue
        # ignored by field type
        if hasattr(field, "getType") and field.getType() in ignored:
            continue
        # readonly fields cannot be set on the copy
        if getattr(field, "readonly", False):
            continue
        # fields the current user is not allowed to read
        read_perm = getattr(field, "read_permission", View)
        if read_perm and not check_permission(read_perm, source):
            continue

        # prefer the raw value to avoid waking up objects unnecessarily
        if hasattr(field, "getRaw"):
            raw = field.getRaw(source)
        elif hasattr(field, "get_raw"):
            raw = field.get_raw(source)
        elif hasattr(field, "getAccessor"):
            raw = field.getAccessor(source)()
        else:
            raw = field.get(source)

        # mutable containers must not be shared between source and copy
        if isinstance(raw, (list, dict, set)):
            raw = copy.deepcopy(raw)
        kwargs[field_name] = raw

    return create(container, portal_type, *args, **kwargs)
280
281
282
def edit(obj, check_permissions=True, **kwargs):
    """Set the passed-in values on the matching fields of the object

    Keyword arguments that do not match a field of the object are ignored.

    :param obj: The object to update
    :param check_permissions: When true, the write permission of each field
        is checked before the value is set
    :type check_permissions: bool
    :raises ValueError: if a matching field is readonly
    :raises Unauthorized: if the write permission check of a field fails
    """
    # Prevent circular dependencies
    from security import check_permission

    schema_fields = get_fields(obj)
    for field_name, new_value in kwargs.items():
        field = schema_fields.get(field_name, None)
        if not field:
            # not a field of this object
            continue

        # readonly fields must not be changed
        if getattr(field, "readonly", False):
            raise ValueError("Field '{}' is readonly".format(field_name))

        # the field must be writable for the current user
        if check_permissions:
            write_perm = getattr(
                field, "write_permission", ModifyPortalContent)
            if write_perm and not check_permission(write_perm, obj):
                raise Unauthorized(
                    "Field '{}' is not writeable".format(field_name))

        # fields with a mutator are set through it, all others directly
        if hasattr(field, "getMutator"):
            mapply(field.getMutator(obj), new_value)
        else:
            field.set(obj, new_value)
310
311
312
def get_tool(name, context=None, default=_marker):
    """Look up a portal tool by name

    :param name: The name of the tool, e.g. `portal_catalog`
    :type name: string
    :param context: Optional object used for the lookup. If the lookup via
        the context fails, the tool is looked up without context.
    :type context: ATContentType/DexterityContentType/CatalogBrain
    :param default: Value returned when the tool cannot be found. A string is
        interpreted as the name of another tool to look up instead.
    :returns: Portal Tool
    :raises APIError: if the tool is not found and no default was given
    """
    if context is None:
        # Lookup through the plone api
        try:
            return ploneapi.portal.get_tool(name)
        except InvalidParameterError:
            if default is _marker:
                fail("No tool named '%s' found." % name)
            if isinstance(default, six.string_types):
                # the default names a fallback tool
                return get_tool(default)
            return default

    # Lookup through the given context
    try:
        context = get_object(context)
        return getToolByName(context, name)
    except (APIError, AttributeError) as exc:
        # https://github.com/senaite/bika.lims/issues/396
        logger.warn("get_tool::getToolByName({}, '{}') failed: {} "
                    "-> falling back to plone.api.portal.get_tool('{}')"
                    .format(repr(context), name, repr(exc), name))
        return get_tool(name, default=default)
343
344
345
def fail(msg=None):
    """Raise an `APIError` with the given message

    :param msg: Reason of the failure; "Reason not given." is used if omitted
    :raises APIError: always
    """
    reason = "Reason not given." if msg is None else msg
    raise APIError("{}".format(reason))
351
352
353
def is_object(brain_or_object):
    """Check if the passed in object is a supported portal content object

    Supported are the portal root, supermodels, AT contents, DX contents and
    catalog brains.

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: Portal Object
    :returns: True if the passed in object is a valid portal content
    """
    # checked in this order; the first positive check wins
    supported = (
        is_portal,
        is_supermodel,
        is_at_content,
        is_dexterity_content,
        is_brain,
    )
    return any(check(brain_or_object) for check in supported)
371
372
373
def get_object(brain_object_uid, default=_marker):
    """Resolve a brain, supermodel, content object or UID to the full object

    :param brain_object_uid: A catalog brain or content object or uid
    :type brain_object_uid: PortalObject/ATContentType/DexterityContentType
    /CatalogBrain/basestring
    :param default: Value returned if the passed in value is not supported
    :returns: The full object
    :raises APIError: if the value is not supported and no default was given
    """
    # UIDs are resolved through the UID lookup
    if is_uid(brain_object_uid):
        return get_object_by_uid(brain_object_uid)

    # supermodels carry the wrapped object as `instance`
    if is_supermodel(brain_object_uid):
        return brain_object_uid.instance

    if not is_object(brain_object_uid):
        if default is _marker:
            fail("{} is not supported.".format(repr(brain_object_uid)))
        return default

    # brains need to be woken up, everything else is the object already
    if is_brain(brain_object_uid):
        return brain_object_uid.getObject()
    return brain_object_uid
392
393
394
def is_portal(brain_or_object):
    """Check if the passed in object is the portal root object

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :returns: True if the object provides `ISiteRoot`
    :rtype: bool
    """
    provides_site_root = ISiteRoot.providedBy(brain_or_object)
    return provides_site_root
403
404
405
def is_brain(brain_or_object):
    """Check if the passed in object is a catalog brain

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :returns: True if the object provides `ICatalogBrain`
    :rtype: bool
    """
    provides_brain = ICatalogBrain.providedBy(brain_or_object)
    return provides_brain
414
415
416
def is_supermodel(brain_or_object):
    """Check if the passed in object is a supermodel

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :returns: True if the object provides `ISuperModel`
    :rtype: bool
    """
    # imported locally to avoid circular imports
    from senaite.app.supermodel.interfaces import ISuperModel

    provides_supermodel = ISuperModel.providedBy(brain_or_object)
    return provides_supermodel
427
428
429
def is_dexterity_content(brain_or_object):
    """Check if the passed in object is a dexterity content type

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :returns: True if the object provides `IDexterityContent`
    :rtype: bool
    """
    provides_dx = IDexterityContent.providedBy(brain_or_object)
    return provides_dx
438
439
440
def is_at_content(brain_or_object):
    """Check if the passed in object is an AT content type

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :returns: True if the object is an instance of the Archetypes
        `BaseObject`
    :rtype: bool
    """
    is_at = isinstance(brain_or_object, BaseObject)
    return is_at
449
450
451
def is_dx_type(portal_type):
    """Check if the portal type is DX based

    A type is considered DX based if its type information has no `product`
    set.

    :param portal_type: The portal type name to check
    :returns: True if the portal type is DX based
    """
    fti = get_tool("portal_types").getTypeInfo(portal_type)
    return not fti.product
462
463
464
def is_at_type(portal_type):
    """Check if the portal type is AT based

    This is the negation of `is_dx_type`.

    :param portal_type: The portal type name to check
    :returns: True if the portal type is AT based
    """
    dx_based = is_dx_type(portal_type)
    return not dx_based
471
472
473
def is_folderish(brain_or_object):
    """Check if the passed in object is folderish

    If the passed in value has an `is_folderish` attribute, its value (or the
    result of calling it) is returned. Otherwise the full object is checked
    for `IFolderish`.

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :returns: True if the object is folderish
    :rtype: bool
    """
    if not hasattr(brain_or_object, "is_folderish"):
        return IFolderish.providedBy(get_object(brain_or_object))

    # can be either a plain value or a method
    folderish = brain_or_object.is_folderish
    return folderish() if callable(folderish) else folderish
486
487
488
def get_portal_type(brain_or_object):
    """Return the portal type of the passed in object

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :returns: Portal type
    :rtype: string
    :raises APIError: if the passed in value is not a supported object
    """
    supported = is_object(brain_or_object)
    if not supported:
        fail("{} is not supported.".format(repr(brain_or_object)))
    # read directly from the passed in value; no object is woken up
    return brain_or_object.portal_type
499
500
501
def get_schema(brain_or_object):
    """Return the schema of the content

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :returns: Schema object
    :raises APIError: for the portal root and for objects that are neither
        DX nor AT contents
    """
    content = get_object(brain_or_object)

    if is_portal(content):
        fail("get_schema can't return schema of portal root")

    if is_dexterity_content(content):
        # DX: the schema is looked up through the type information
        types_tool = get_tool("portal_types")
        return types_tool.getTypeInfo(content.portal_type).lookupSchema()

    if is_at_content(content):
        # AT: the object knows its own schema
        return content.Schema()

    fail("{} has no Schema.".format(brain_or_object))
518
519
520
def get_behaviors(portal_type):
    """Return the behaviors registered on the type information

    :param portal_type: DX portal type name
    :returns: The `behaviors` of the type information
    :raises TypeError: if the type information has a `product` set (AT type)
    """
    fti = get_tool("portal_types").getTypeInfo(portal_type)
    if not fti.product:
        return fti.behaviors
    raise TypeError("Expected DX type, got AT type instead.")
530
531
532
def enable_behavior(portal_type, behavior_id):
    """Enable a behavior for the portal type

    Does nothing if the behavior is enabled already.

    :param portal_type: DX portal type name
    :param behavior_id: The behavior to enable
    :raises TypeError: if the type information has a `product` set (AT type)
    """
    fti = get_tool("portal_types").getTypeInfo(portal_type)
    if fti.product:
        raise TypeError("Expected DX type, got AT type instead.")

    if behavior_id in fti.behaviors:
        return

    fti.behaviors += (behavior_id, )
    # the schema cache needs to be invalidated after the change
    notify(SchemaInvalidatedEvent(portal_type))
547
548
549
def disable_behavior(portal_type, behavior_id):
    """Disable a behavior for the portal type

    The schema invalidation event is sent in any case, i.e. also when the
    behavior was not enabled.

    :param portal_type: DX portal type name
    :param behavior_id: The behavior to disable
    :raises TypeError: if the type information has a `product` set (AT type)
    """
    fti = get_tool("portal_types").getTypeInfo(portal_type)
    if fti.product:
        raise TypeError("Expected DX type, got AT type instead.")

    # keep all behaviors except the one to disable
    fti.behaviors = tuple(
        behavior for behavior in fti.behaviors if behavior != behavior_id)
    # the schema cache needs to be invalidated after the change
    notify(SchemaInvalidatedEvent(portal_type))
562
563
564
def get_fields(brain_or_object):
    """Return a name to field mapping of the object

    For DX contents, the fields of the schema interface are followed by the
    fields of the interfaces of all enumerated behaviors.

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :returns: Mapping of name -> field
    :rtype: OrderedDict
    """
    content = get_object(brain_or_object)
    schema = get_schema(content)

    if not is_dexterity_content(content):
        return OrderedDict(schema)

    # fields directly provided by the schema interface
    name_field_pairs = getFieldsInOrder(schema)

    # fields contributed by behaviors
    assignable = IBehaviorAssignable(content)
    if assignable:
        for behavior in assignable.enumerateBehaviors():
            name_field_pairs.extend(getFieldsInOrder(behavior.interface))

    return OrderedDict(name_field_pairs)
585
586
587
def get_id(brain_or_object):
    """Return the Plone ID of the object

    For brains, the `getId` or `id` attribute of the brain is used if
    available, so the object is not woken up.

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :returns: Plone ID
    :rtype: string
    """
    if is_brain(brain_or_object):
        # `getId` has priority over `id`
        for attr in ("getId", "id"):
            if base_hasattr(brain_or_object, attr):
                return getattr(brain_or_object, attr)
    return get_object(brain_or_object).getId()
601
602
603
def get_title(brain_or_object):
    """Return the title of the object

    For brains, the `Title` attribute of the brain is used if available, so
    the object is not woken up.

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :returns: Title
    :rtype: string
    """
    if is_brain(brain_or_object):
        if base_hasattr(brain_or_object, "Title"):
            return brain_or_object.Title
    content = get_object(brain_or_object)
    return content.Title()
614
615
616
def get_description(brain_or_object):
    """Return the description of the object

    For brains, the `Description` attribute of the brain is used if
    available, so the object is not woken up.

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :returns: Description
    :rtype: string
    """
    if is_brain(brain_or_object):
        if base_hasattr(brain_or_object, "Description"):
            return brain_or_object.Description
    content = get_object(brain_or_object)
    return content.Description()
628
629
630
def get_uid(brain_or_object):
    """Return the Plone UID of the object

    A UID is returned unchanged and the portal root has the UID '0'. For
    brains, the `UID` attribute of the brain is used if available.

    :param brain_or_object: A single catalog brain or content object or an UID
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :returns: Plone UID
    :rtype: string
    """
    # already a UID
    if is_uid(brain_or_object):
        return brain_or_object

    # the portal root is identified by '0'
    if is_portal(brain_or_object):
        return '0'

    if is_brain(brain_or_object):
        if base_hasattr(brain_or_object, "UID"):
            return brain_or_object.UID

    content = get_object(brain_or_object)
    return content.UID()
645
646
647
def get_url(brain_or_object):
    """Return the absolute URL of the object

    For brains, `getURL` of the brain is used if available, so the object is
    not woken up.

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :returns: Absolute URL
    :rtype: string
    """
    if is_brain(brain_or_object):
        if base_hasattr(brain_or_object, "getURL"):
            return brain_or_object.getURL()
    content = get_object(brain_or_object)
    return content.absolute_url()
658
659
660
def get_icon(brain_or_object, html_tag=True):
    """Return the icon of the content object

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :param html_tag: A value of 'True' returns the HTML tag, else the image url
    :type html_tag: bool
    :returns: HTML '<img>' tag if 'html_tag' is True else the image url; an
        empty string if the type has no icon
    :rtype: string
    """
    # The icon is resolved manually through the type information, because
    # `plone.app.layout.getIcon` does not work reliably for contents coming
    # from other catalogs than the `portal_catalog`
    fti = get_tool("portal_types").getTypeInfo(brain_or_object.portal_type)
    icon = fti.getIcon()
    if not icon:
        return ""

    # the icon is addressed relative to the portal URL
    icon_url = "%s/%s" % (get_url(get_portal()), icon)
    if html_tag:
        # NOTE(review): url and title are interpolated without HTML
        # escaping -- confirm callers never render untrusted titles.
        return (
            '<img width="16" height="16" src="{url}" title="{title}" />'
            .format(url=icon_url, title=get_title(brain_or_object)))
    return icon_url
684
685
686
def get_object_by_uid(uid, default=_marker):
    """Find an object by a given UID

    :param uid: The UID of the object to find
    :type uid: string
    :param default: Value returned if the UID is empty or no object is found
    :returns: Found object, or the default if one was given
    :raises APIError: if nothing is found and no default was given
    """
    has_default = default is not _marker

    # an empty UID cannot be resolved
    if not uid:
        if has_default:
            return default
        fail("get_object_by_uid requires UID as first argument; got {} instead"
             .format(uid))

    # the UID of the portal object is defined to be '0'
    if uid == '0':
        return get_portal()

    brain = get_brain_by_uid(uid)
    if brain is not None:
        return get_object(brain)

    if has_default:
        return default
    fail("No object found for UID {}".format(uid))
713
714
715
def get_brain_by_uid(uid, default=None):
    """Query a brain by a given UID

    :param uid: The UID of the object to find
    :type uid: string
    :param default: Value returned if the passed in value is not a UID or
        the query does not return exactly one result
    :returns: ZCatalog brain or the default
    """
    if not is_uid(uid):
        return default

    # query the UID catalog
    uid_catalog = get_tool("uid_catalog")
    found = uid_catalog(UID=uid)

    # only an unambiguous match is accepted
    return found[0] if len(found) == 1 else default
733
734
735
def get_object_by_path(path, default=_marker):
    """Find an object by a given physical path or absolute_url

    :param path: The physical path or the absolute URL of the object to find
    :type path: string
    :param default: Value returned if the path is empty, lies outside the
        portal or cannot be traversed
    :returns: Found Object; if the path cannot be traversed, the default or
        `None` when no default was given
    :raises APIError: if the path is empty or not inside the portal and no
        default was given
    """
    has_default = default is not _marker

    # nothing to do here
    if not path:
        if has_default:
            return default
        fail("get_object_by_path first argument must be a path; {} received"
             .format(path))

    portal = get_portal()
    portal_path = get_path(portal)
    portal_url = get_url(portal)

    # ensure we have a physical path
    if path.startswith(portal_url):
        request = get_request()
        path = "/".join(request.physicalPathFromURL(path))

    if not path.startswith(portal_path):
        if has_default:
            return default
        fail("Not a physical path inside the portal.")

    if path == portal_path:
        return portal

    # Never hand the module-private sentinel to `restrictedTraverse`: it is
    # not the sentinel the traversal machinery checks for, so a failed
    # traversal would return the bare `_marker` object to the caller. Fall
    # back to `None` instead, as documented.
    fallback = default if has_default else None
    return portal.restrictedTraverse(path, fallback)
768
769
770
def get_path(brain_or_object):
    """Return the physical path of the object as string

    For brains, the path stored on the brain is returned, so the object is
    not woken up.

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :returns: Physical path of the object
    :rtype: string
    """
    if not is_brain(brain_or_object):
        segments = get_object(brain_or_object).getPhysicalPath()
        return "/".join(segments)
    return brain_or_object.getPath()
781
782
783
def get_parent_path(brain_or_object):
    """Return the physical path of the parent of the object

    The parent path of the portal root is the path of the portal itself.

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :returns: Physical path of the parent object
    :rtype: string
    """
    if is_portal(brain_or_object):
        return get_path(get_portal())

    if is_brain(brain_or_object):
        # strip the last path segment; the object is not woken up
        own_path = get_path(brain_or_object)
        parent_path, _sep, _own_id = own_path.rpartition("/")
        return parent_path

    parent = get_object(brain_or_object).aq_parent
    return get_path(parent)
797
798
799
def get_parent(brain_or_object, catalog_search=False):
    """Locate the parent object of the content/catalog brain

    With `catalog_search` enabled, the parent is searched by its path in the
    `portal_catalog` and returned as a brain. If the search returns no
    results, the full parent object is returned instead. The parent of the
    portal root is the portal itself.

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :param catalog_search: Use a catalog query to find the parent object
    :type catalog_search: bool
    :returns: parent object
    :rtype: ATContentType/DexterityContentType/PloneSite/CatalogBrain
    """
    if is_portal(brain_or_object):
        return get_portal()

    # Without catalog search, the parent is taken from the full object
    if not catalog_search:
        return get_object(brain_or_object).aq_parent

    parent_path = get_parent_path(brain_or_object)

    # the parent is the portal object, which is returned directly
    if parent_path == get_path(get_portal()):
        return get_portal()

    # search for exactly the parent path
    catalog = get_portal_catalog()
    path_query = {"query": parent_path, "depth": 0}
    brains = catalog(path=path_query)

    if brains:
        return brains[0]

    # nothing found in the catalog: fall back to the full parent object
    return get_object(brain_or_object).aq_parent
841
842
843
def search(query, catalog=_marker):
    """Search for objects.

    If no catalog is given, the first catalog registered for each queried
    portal type is used, with `portal_catalog` as fallback. Only queries that
    resolve to a single catalog are supported.

    :param query: A suitable search query.
    :type query: dict
    :param catalog: A single catalog id or a list of catalog ids
    :type catalog: str/list
    :returns: Search results
    :rtype: List of ZCatalog brains
    :raises APIError: if the query is not a dictionary or more than one
        catalog would have to be queried
    """
    if not isinstance(query, dict):
        fail("Catalog query needs to be a dictionary")

    # Queried portal types, always handled as a sequence
    queried_types = query.get("portal_type", [])
    if not isinstance(queried_types, (tuple, list)):
        queried_types = [queried_types]

    if catalog is _marker:
        # No catalog given: take the first registered/default catalog of
        # each queried portal type
        candidates = [
            get_catalogs_for(portal_type, default="portal_catalog")[0]
            for portal_type in queried_types]
    elif isinstance(catalog, (list, tuple)):
        # Catalogs given as a list of ids
        candidates = [get_tool(catalog_id) for catalog_id in catalog]
    else:
        # Single catalog id
        candidates = [get_tool(catalog)]

    # Drop duplicates; use the portal catalog if nothing is left
    candidates = list(set(candidates)) or [get_portal_catalog()]

    # Only **single** catalog queries are supported
    if len(candidates) > 1:
        fail("Multi Catalog Queries are not supported!")

    target_catalog = candidates[0]
    return target_catalog(query)
889
890
891
def safe_getattr(brain_or_object, attr, default=_marker):
    """Return the value of an attribute, calling it if it is callable.

    If the attribute is missing or access is unauthorized, `default` is
    returned when one was given; otherwise the problem is reported through
    `fail`.

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :param attr: Attribute name
    :type attr: str
    :param default: Value returned when the attribute cannot be read
    :returns: Attribute value
    :rtype: obj
    """
    has_default = default is not _marker
    try:
        value = getattr(brain_or_object, attr, _marker)
        if value is _marker:
            if has_default:
                return default
            fail("Attribute '{}' not found.".format(attr))
        # Methods/accessors are called to obtain the effective value
        return value() if callable(value) else value
    except Unauthorized:
        if has_default:
            return default
        fail("You are not authorized to access '{}' of '{}'.".format(
            attr, repr(brain_or_object)))
915
916
917
def get_portal_catalog():
    """Look up the tool registered as "portal_catalog"

    :returns: Portal Catalog Tool
    """
    catalog = get_tool("portal_catalog")
    return catalog
923
924
925
def get_review_history(brain_or_object, rev=True):
    """Return the workflow review history of the given brain or context.

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :param rev: Reverse the order delivered by the workflow tool when this
        is exactly `True`
    :type rev: bool
    :returns: Workflow history
    :rtype: [{}, ...]
    """
    obj = get_object(brain_or_object)
    history = []
    try:
        wf_tool = get_tool("portal_workflow")
        history = wf_tool.getInfoFor(obj, 'review_history')
    except WorkflowException as exc:
        logger.error("Cannot retrieve review_history on {}: {}".format(
            obj, str(exc)))

    if isinstance(history, tuple):
        # Products.CMFDefault.DefaultWorkflow.getInfoFor always returns a
        # tuple when "review_history" is passed in:
        # https://github.com/zopefoundation/Products.CMFDefault/blob/master/Products/CMFDefault/DefaultWorkflow.py#L244
        #
        # On the other hand, Products.DCWorkflow.getInfoFor relies on
        # Expression.StateChangeInfo, that always returns a list, except when
        # no review_history is found:
        # https://github.com/zopefoundation/Products.DCWorkflow/blob/master/Products/DCWorkflow/DCWorkflow.py#L310
        # https://github.com/zopefoundation/Products.DCWorkflow/blob/master/Products/DCWorkflow/Expression.py#L94
        history = list(history)
    elif not isinstance(history, list):
        # Anything that is neither a list nor a tuple is discarded
        logger.error("get_review_history: expected list, received {}".format(
            history))
        history = []

    if rev is True:
        history.reverse()
    return history
962
963
964
def get_revision_history(brain_or_object):
    """Return the full content history of the given brain or context.

    The history is obtained from Plone's `ContentHistoryView`, built with
    the object and its `REQUEST` attribute (`None` if unavailable).

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :returns: Result of `ContentHistoryView.fullHistory()`
    :rtype: obj
    """
    obj = get_object(brain_or_object)
    request = safe_getattr(obj, "REQUEST", None)
    history_view = ContentHistoryView(obj, request)
    return history_view.fullHistory()
975
976
977
def get_workflows_for(brain_or_object):
    """Return the workflow chain assigned to the given brain or context.

    Note: A portal_type name (string) is accepted as well and is handed to
    the workflow tool unchanged.

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :returns: Assigned Workflows
    :rtype: tuple
    """
    wf_tool = ploneapi.portal.get_tool("portal_workflow")
    target = brain_or_object
    if not isinstance(target, six.string_types):
        # brains need to be resolved to their content object first
        target = get_object(target)
    return wf_tool.getChainFor(target)
992
993
994
def get_workflow_status_of(brain_or_object, state_var="review_state"):
    """Return the current workflow status of the given brain or context.

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :param state_var: The name of the state variable
    :type state_var: string
    :returns: Status
    :rtype: str
    """
    # Shortcut: read the state from the brain if its schema provides it
    if is_brain(brain_or_object) and state_var in brain_or_object.schema():
        return brain_or_object[state_var]

    # Otherwise retrieve the state from the object via the workflow tool
    wf_tool = get_tool("portal_workflow")
    obj = get_object(brain_or_object)
    return wf_tool.getInfoFor(ob=obj, name=state_var, default='')
1013
1014
1015
def get_previous_worfklow_status_of(brain_or_object, skip=None, default=None):
    """Return the workflow status the object had before the current one.

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :param skip: Workflow states to skip
    :type skip: tuple/list
    :param default: Value returned if no previous status is found
    :returns: status
    :rtype: str
    """
    if not isinstance(skip, (list, tuple)):
        skip = []

    history = get_review_history(brain_or_object)

    # Collapse consecutive duplicates, since some transitions might happen
    # more than once in a row (e.g. publish)
    entries = [entry for entry, _group in groupby(history)]

    # The first entry is the current one and is therefore left out
    for entry in entries[1:]:
        status = entry.get("review_state")
        if status not in skip:
            return status
    return default
1042
1043
1044
def get_creation_date(brain_or_object):
    """Return the value of `created` of the brain or object.

    The attribute is called when it is callable. A missing (or `None`)
    attribute is reported through `fail`.

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :returns: Creation date
    :rtype: DateTime
    """
    created = getattr(brain_or_object, "created", None)
    if created is None:
        fail("Object {} has no creation date ".format(
             repr(brain_or_object)))
    return created() if callable(created) else created
1059
1060
1061
def get_modification_date(brain_or_object):
    """Return the value of `modified` of the brain or object.

    The attribute is called when it is callable. A missing (or `None`)
    attribute is reported through `fail`.

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :returns: Modification date
    :rtype: DateTime
    """
    modified = getattr(brain_or_object, "modified", None)
    if modified is None:
        fail("Object {} has no modification date ".format(
             repr(brain_or_object)))
    return modified() if callable(modified) else modified
1076
1077
1078
def get_review_status(brain_or_object):
    """Return the `review_state` of a brain or object.

    Brains deliver the value from their `review_state` attribute, everything
    else is resolved by `get_workflow_status_of`.

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :returns: Value of the review_status variable
    :rtype: String
    """
    if not is_brain(brain_or_object):
        return get_workflow_status_of(
            brain_or_object, state_var="review_state")
    return brain_or_object.review_state
1089
1090
1091
def is_active(brain_or_object):
    """Tell whether the object is neither 'inactive' nor 'cancelled'.

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :returns: False if the object is in the state 'inactive' or 'cancelled'
    :rtype: bool
    """
    status = get_review_status(brain_or_object)
    return status not in ["cancelled", "inactive"]
1102
1103
1104
def get_catalogs_for(brain_or_object, default="portal_catalog"):
    """Return the catalogs registered in the archetype tool for the given
    portal_type, catalog brain or content object

    :param brain_or_object: The portal_type, a catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :param default: Id of the catalog used when no catalog is registered
    :type default: str
    :returns: List of supported catalogs
    :rtype: list
    """
    at_tool = get_tool("archetype_tool", default=None)
    if at_tool is None:
        # Without archetype tool only the default catalog can be returned
        return [get_tool(default, default="portal_catalog")]

    registered = []

    # brains/objects are looked up by their portal type ...
    if is_object(brain_or_object):
        portal_type = get_portal_type(brain_or_object)
        registered = at_tool.getCatalogsByType(portal_type)
    # ... while strings are taken as the portal type itself
    if isinstance(brain_or_object, six.string_types):
        registered = at_tool.getCatalogsByType(brain_or_object)

    return registered or [get_tool(default)]
1130
1131
1132
def get_transitions_for(brain_or_object):
    """Collect the available transitions of all assigned workflows

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :returns: All possible available and allowed transitions
    :rtype: list[dict]
    """
    wf_tool = get_tool('portal_workflow')
    collected = []
    instance = get_object(brain_or_object)
    for workflow_id in get_workflows_for(brain_or_object):
        available = wf_tool[workflow_id].getTransitionsFor(instance)
        # only add transitions not already provided by a previous workflow
        fresh = [item for item in available if item not in collected]
        collected += fresh
    return collected
1148
1149
1150
def do_transition_for(brain_or_object, transition):
    """Perform a workflow transition on the passed in object.

    An `InvalidParameterError` coming from plone.api is reported through
    `fail`.

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :param transition: Id of the transition to perform
    :type transition: str
    :returns: The object where the transition was performed
    """
    if not isinstance(transition, six.string_types):
        fail("Transition type needs to be string, got '%s'" % type(transition))
    obj = get_object(brain_or_object)
    try:
        ploneapi.content.transition(obj, transition)
    except ploneapi.exc.InvalidParameterError as exc:
        message = "Failed to perform transition '{}' on {}: {}".format(
            transition, obj, str(exc))
        fail(message)
    return obj
1166
1167
1168
def get_roles_for_permission(permission, brain_or_object):
    """Return the sorted, de-duplicated roles granted the permission on the
    object.

    :param permission: The permission to look up
    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :returns: Roles for the given Permission
    :rtype: list
    """
    obj = get_object(brain_or_object)
    roles = rolesForPermissionOn(permission, obj)
    return sorted(set(roles))
1179
1180
1181
def is_versionable(brain_or_object, policy='at_edit_autoversion'):
    """Checks if the passed in object is versionable.

    The object is considered versionable when the repository tool supports
    the given versioning policy for it and reports it as versionable.

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :param policy: Id of the versioning policy the object has to support
    :type policy: str
    :returns: True if the object is versionable
    :rtype: bool
    """
    pr = get_tool("portal_repository")
    obj = get_object(brain_or_object)
    # Honour the requested policy. Previously the default policy id was
    # hard-coded here and the `policy` argument was silently ignored.
    return pr.supportsPolicy(obj, policy) and pr.isVersionable(obj)
1193
1194
1195
def get_version(brain_or_object):
    """Return the `version_id` of the object

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :returns: The current version of the object (0 if the object carries no
        `version_id`), or None if the object is not versionable
    :rtype: int or None
    """
    obj = get_object(brain_or_object)
    if is_versionable(obj):
        # read from the unwrapped object to avoid acquired values
        return getattr(aq_base(obj), "version_id", 0)
    return None
1207
1208
1209
def get_view(name, context=None, request=None, default=None):
    """Look up a view by name

    :param name: The name of the view
    :type name: str
    :param context: The context to query the view (defaults to the portal)
    :type context: ATContentType/DexterityContentType/CatalogBrain
    :param request: The request to query the view (defaults to the global
        request)
    :type request: HTTPRequest object
    :param default: Value returned if no view was found
    :returns: The view registered under the given name or `default`
    :rtype: Products.Five.metaclass View object
    """
    if not context:
        context = get_portal()
    if not request:
        request = get_request() or None
    adapted = (get_object(context), request)
    view = queryMultiAdapter(adapted, name=name)
    return default if view is None else view
1227
1228
1229
def get_request():
    """Return whatever `globalrequest.getRequest()` delivers

    :returns: HTTP Request
    :rtype: HTTPRequest object
    """
    request = globalrequest.getRequest()
    return request
1236
1237
1238
def get_test_request():
    """Create a new TestRequest that directly provides IAttributeAnnotatable

    :returns: The freshly created TestRequest object
    """
    test_request = TestRequest()
    directlyProvides(test_request, IAttributeAnnotatable)
    return test_request
1244
1245
1246
def get_group(group_or_groupname):
    """Return the Plone Group for a group object or group name

    :param group_or_groupname: Plone group or the name of the group
    :type group_or_groupname:  GroupData/str
    :returns: Plone GroupData, or None for a falsy argument
    """
    group = group_or_groupname
    if not group:
        return None
    if not hasattr(group, "_getGroup"):
        # not a group object: resolve it by id via the groups tool
        group = get_tool("portal_groups").getGroupById(group)
    return group
1260
1261
1262
def get_user(user_or_username):
    """Return the Plone User for a member object or a login name

    :param user_or_username: Plone user or user id
    :returns: Plone MemberData, or None if the argument is neither a
        MemberData instance nor a string
    """
    if isinstance(user_or_username, six.string_types):
        # strings are resolved by login name
        return get_member_by_login_name(
            get_portal(), user_or_username, False)
    if isinstance(user_or_username, MemberData):
        return user_or_username
    return None
1274
1275
1276
def get_user_properties(user_or_username):
    """Return the merged properties of all property sheets of a user

    :param user_or_username: Plone user or user id
    :returns: Dictionary of the user properties; empty if the user cannot be
        resolved or provides no callable `getUser`
    :rtype: dict
    """
    user = get_user(user_or_username)
    if user is None or not callable(user.getUser):
        return {}
    plone_user = user.getUser()
    properties = {}
    # later sheets override equally named properties of earlier ones
    for sheet_id in plone_user.listPropertysheets():
        sheet = plone_user.getPropertysheet(sheet_id)
        properties.update(dict(sheet.propertyItems()))
    return properties
1293
1294
1295
def get_users_by_roles(roles=None):
    """Search Plone users by their roles

    :param roles: Plone role name or list of roles
    :type roles:  list/str
    :returns: List of Plone users having the role(s)
    """
    # a single value is wrapped into a list
    wanted = roles if isinstance(roles, (tuple, list)) else [roles]
    membership = get_tool("portal_membership")
    return membership.searchForMembers(roles=wanted)
1306
1307
1308
def get_current_user():
    """Return the user delivered by `plone.api.user.get_current()`

    :returns: Current User
    """
    current = ploneapi.user.get_current()
    return current
1314
1315
1316
def get_user_contact(user, contact_types=['Contact', 'LabContact']):
    """Return the contact linked to a Plone user

    The portal catalog is searched for objects of the given portal types
    whose `getUsername` matches the id of the user. None is returned if no
    contact or more than one contact is found.

    :param user: Plone user
    :param contact_types: List with the contact portal types to search
    :returns: Contact associated to the Plone user or None
    """
    if not user:
        return None

    brains = search(
        {'portal_type': contact_types, 'getUsername': user.id},
        catalog='portal_catalog')
    if not brains:
        return None

    if len(brains) > 1:
        # Ambiguous: the user is bound to several contacts -> log and give up
        titles = [brain.Title for brain in brains]
        logger.error("User '{}' is bound to multiple Contacts '{}'".format(
            user.id, ','.join(titles)))
        return None

    return get_object(brains[0])
1343
1344
1345
def get_user_client(user_or_contact):
    """Return the client of a contact or of the contact of a Plone user

    None is returned if the user passed in has no contact or if the contact
    does not belong to any client.

    :param user_or_contact: Plone user or contact
    :returns: Client the contact of the Plone user belongs to
    """
    if not user_or_contact or ILabContact.providedBy(user_or_contact):
        # Lab contacts cannot belong to a client
        return None

    if IContact.providedBy(user_or_contact):
        # The client is expected to be the container of the contact
        parent = get_parent(user_or_contact)
        if parent and IClient.providedBy(parent):
            return parent
        return None

    # A user was passed in: resolve the linked contact and start over
    contact = get_user_contact(user_or_contact, contact_types=['Contact'])
    if IContact.providedBy(contact):
        return get_user_client(contact)
    return None
1369
1370
1371
def get_current_client():
    """Return the client the current logged in user belongs to, if any

    :returns: Client the current logged in user belongs to or None
    """
    user = get_current_user()
    return get_user_client(user)
1377
1378
1379
def get_cache_key(brain_or_object):
    """Build a cache key for a common brain or object

    The key joins portal type, id, UID, URL and the modification date in
    microseconds with dashes.

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :returns: Cache Key
    :rtype: str
    """
    parts = (
        get_portal_type(brain_or_object),
        get_id(brain_or_object),
        get_uid(brain_or_object),
        # the URL handles different domains gracefully
        get_url(brain_or_object),
        # microseconds since the epoch in GMT
        get_modification_date(brain_or_object).micros(),
    )
    return "-".join([str(part) for part in parts])
1397
1398
1399
def bika_cache_key_decorator(method, self, brain_or_object):
    """Cache key function for methods taking a brain or object

    :param method: The decorated method (unused)
    :param self: The instance the method is bound to (unused)
    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :returns: Cache Key
    :rtype: str
    :raises DontCache: if `brain_or_object` is None
    """
    if brain_or_object is not None:
        return get_cache_key(brain_or_object)
    raise DontCache
1410
1411
1412
def normalize_id(string):
    """Normalize the string with the registered `IIDNormalizer` utility

    :param string: A string to normalize
    :type string: str
    :returns: Normalized ID
    :rtype: str
    """
    if not isinstance(string, six.string_types):
        message = "Type of argument must be string, found '{}'".format(
            type(string))
        fail(message)
    # delegate to the id normalizer utility
    return getUtility(IIDNormalizer).normalize(string)
1426
1427
1428
def normalize_filename(string):
    """Normalize the string with the registered `IFileNameNormalizer` utility

    :param string: A string to normalize
    :type string: str
    :returns: Normalized filename
    :rtype: str
    """
    if not isinstance(string, six.string_types):
        message = "Type of argument must be string, found '{}'".format(
            type(string))
        fail(message)
    # delegate to the file name normalizer utility
    return getUtility(IFileNameNormalizer).normalize(string)
1442
1443
1444
def is_uid(uid, validate=False):
    """Checks if the passed in uid is a valid UID

    The string '0' is always accepted. Any other value must be a string of
    32 characters that matches `UID_RX`.

    :param uid: The uid to check
    :param validate: If False, only the format of the uid is checked. If
    True, also verifies if a brain exists for the uid passed in
    :type uid: string
    :return: True if a valid uid
    :rtype: bool
    """
    if not isinstance(uid, six.string_types):
        return False
    if uid == '0':
        return True
    if len(uid) != 32 or not UID_RX.match(uid):
        return False
    if not validate:
        return True

    # Check if a brain for this uid exists
    brains = get_tool('uid_catalog')(UID=uid)
    if brains:
        # a UID is expected to be catalogued only once
        assert (len(brains) == 1)
    return len(brains) > 0
1471
1472
1473
def is_date(date):
    """Checks if the passed in value is a Zope DateTime or a Python datetime

    :param date: The date to check
    :type date: DateTime
    :return: True if a valid date, False for falsy or other values
    :rtype: bool
    """
    return bool(date) and isinstance(date, (DateTime, datetime))
1484
1485
1486
def to_date(value, default=None):
    """Tries to convert the passed in value to Zope's DateTime

    Falsy or non-convertible values are replaced by the conversion of
    `default`. If `default` is None or cannot be converted either, None is
    returned.

    :param value: The value to be converted to a valid DateTime
    :type value: str, DateTime or datetime
    :param default: Fallback value, converted with the same rules
    :return: The DateTime representation of the value passed in or default
    """
    if isinstance(value, DateTime):
        return value
    if not value:
        if default is None:
            return None
        return to_date(default)
    try:
        # Check against all string types, so that unicode values containing
        # dots get the same international parsing as byte strings
        if isinstance(value, six.string_types) and '.' in value:
            # https://docs.plone.org/develop/plone/misc/datetime.html#datetime-problems-and-pitfalls
            return DateTime(value, datefmt='international')
        return DateTime(value)
    except (TypeError, ValueError, DateTimeError):
        return to_date(default)
1506
1507
1508
def to_minutes(days=0, hours=0, minutes=0, seconds=0, milliseconds=0,
               round_to_int=True):
    """Returns the computed total number of minutes

    All arguments are converted with `float()` before they are summed up.

    :param round_to_int: Round the total and return it as int if True,
        otherwise return the float total
    :returns: Total number of minutes
    :rtype: int or float
    """
    total = float(days) * 24 * 60
    total += float(hours) * 60
    total += float(minutes)
    total += float(seconds) / 60
    total += float(milliseconds) / 1000 / 60
    if round_to_int:
        return int(round(total))
    return total
1515
1516
1517
def to_dhm_format(days=0, hours=0, minutes=0, seconds=0, milliseconds=0):
    """Returns a representation of time in a string in xd yh zm format

    Parts with a value of zero are omitted, except for the hours when both
    days and minutes are displayed.

    :returns: Time representation, e.g. "1d 0h 5m"
    :rtype: str
    """
    total = to_minutes(days=days, hours=hours, minutes=minutes,
                       seconds=seconds, milliseconds=milliseconds)
    delta = timedelta(minutes=int(round(total)))

    hour_count = delta.seconds // 3600
    minute_count = (delta.seconds // 60) % 60

    days_part = "{}d ".format(str(delta.days)) if delta.days else ""
    minutes_part = "{}m ".format(str(minute_count)) if minute_count else ""

    # hours are always displayed between days and minutes
    if (days_part and minutes_part) or hour_count:
        hours_part = "{}h ".format(str(hour_count))
    else:
        hours_part = ""

    return (days_part + hours_part + minutes_part).strip()
1533
1534
1535
def to_int(value, default=_marker):
    """Tries to convert the value to int.
    Truncates at the decimal point if the value is a float

    Values that cannot be converted (including infinite floats such as
    "inf") are replaced by `default`. Without a default, the problem is
    reported through `fail`.

    :param value: The value to be converted to an int
    :param default: Fallback; `None` is returned as-is, any other value is
        itself converted to int
    :return: The resulting int or default
    """
    if is_floatable(value):
        value = to_float(value)
    try:
        return int(value)
    except (TypeError, ValueError, OverflowError):
        # OverflowError: int() cannot convert an infinite float
        if default is None:
            return default
        if default is not _marker:
            return to_int(default)
        fail("Value %s cannot be converted to int" % repr(value))
1552
1553
1554
def is_floatable(value):
    """Checks if the passed in value can be converted with `float()`

    :param value: The value to be evaluated as a float number
    :type value: str, float, int
    :returns: True if is a valid float number
    :rtype: bool
    """
    try:
        float(value)
    except (TypeError, ValueError):
        return False
    else:
        return True
1566
1567
1568
def to_float(value, default=_marker):
    """Converts the passed in value to a float number

    :param value: The value to be converted to a floatable number
    :type value: str, float, int
    :param default: Fallback that is converted instead if the value is not
        floatable
    :returns: The float number representation of the passed in value
    :rtype: float
    """
    if is_floatable(value):
        return float(value)
    if default is not _marker:
        return to_float(default)
    fail("Value %s is not floatable" % repr(value))
    return float(value)
1581
1582
1583
def float_to_string(value, default=_marker):
    """Convert a float value to string without exponential notation

    Floatable strings are returned unchanged. Trailing zeros of the
    fraction are removed.

    :param value: The float value to be converted to a string
    :type value: str, float, int
    :param default: Value returned if the value is not floatable
    :returns: String representation of the float w/o exponential notation
    :rtype: str
    """
    if not is_floatable(value):
        if default is not _marker:
            return default
        fail("Value %s is not floatable" % repr(value))

    # Leave floatable string values unchanged
    if isinstance(value, six.string_types):
        return value

    value = float(value)
    str_value = str(value)

    # Part after the decimal point, e.g. "23e-26" for 1.23e-26, or the whole
    # string if there is no decimal point, e.g. "1e-07" for 0.0000001
    back = str_value.rpartition(".")[2]

    if "e-" in back:
        fraction, zeros = back.split("e-")
        # the precision has to cover the fraction and the zeros
        precision = len(fraction) + int(zeros)
        str_value = "{:.{prec}f}".format(value, prec=precision)
    elif "e+" in back:
        # positive exponents, e.g. 1e+16 don't need a fractional part
        str_value = "{:.0f}".format(value)

    # cut off trailing zeros
    if "." in str_value:
        str_value = str_value.rstrip("0").rstrip(".")

    return str_value
1627
1628
1629
def to_searchable_text_metadata(value):
    """Parse the given metadata value to searchable text

    Falsy values, missing values, UIDs and booleans yield an empty string.
    Sequences and dictionaries are converted item by item and joined with
    spaces.

    :param value: The raw value of the metadata column
    :returns: Searchable text for the value
    """
    if (not value
            or value is Missing.Value
            or is_uid(value)
            or isinstance(value, bool)):
        return u""
    if isinstance(value, (list, tuple)):
        converted = [to_searchable_text_metadata(item) for item in value]
        return " ".join([text for text in converted if text])
    if isinstance(value, dict):
        return to_searchable_text_metadata(value.values())
    if is_date(value):
        # imported here and not at module level
        from senaite.core.api.dtime import date_to_string
        return date_to_string(value, "%Y-%m-%d")
    if is_at_content(value):
        return to_searchable_text_metadata(get_title(value))
    if not isinstance(value, six.string_types):
        value = str(value)
    return safe_unicode(value)
1657
1658
1659
def get_registry_record(name, default=None):
    """Return the value of a registry record via plone.api

    :param name: [required] name of the registry record
    :type name: str
    :param default: The value returned if the record is not found
    :type default: anything
    :returns: value of the registry record
    """
    record = ploneapi.portal.get_registry_record(name, default=default)
    return record
1669
1670
1671
def to_display_list(pairs, sort_by="key", allow_empty=True):
    """Create a Plone DisplayList from list items

    :param pairs: list of key, value pairs, a single key, value pair or a
        single string (used as key and value)
    :param sort_by: Sort the items either by "key" or "value"; any other
        value leaves the list unsorted
    :param allow_empty: Allow to select an empty value
    :returns: Plone DisplayList
    """
    display_list = DisplayList()

    # a plain string is used for both, key and value
    if isinstance(pairs, six.string_types):
        pairs = [pairs, pairs]

    for item in pairs:
        if isinstance(item, (tuple, list)):
            # pairs is a list of lists -> add each pair
            display_list.add(*item)
        elif isinstance(item, six.string_types):
            # pairs is just a single pair -> add it and stop
            display_list.add(*pairs)
            break

    # add the empty option
    if allow_empty:
        display_list.add("", "")

    # sort by key/value
    if sort_by == "key":
        return display_list.sortedByKey()
    if sort_by == "value":
        return display_list.sortedByValue()
    return display_list
1703
1704
1705
def text_to_html(text, wrap="p", encoding="utf8"):
    """Convert newline characters in the text to HTML line breaks

    :param text: Plain text to convert
    :param wrap: Name of the HTML tag to wrap the text in. A falsy value
        disables the wrapping
    :param encoding: Encoding used for the returned HTML
    :returns: HTML converted and encoded text
    """
    if not text:
        return ""
    # the text is handled internally as unicode
    html = safe_unicode(text).replace("\n", "<br/>")
    if wrap:
        html = u"<{tag}>{html}</{tag}>".format(tag=wrap, html=html)
    # return the encoded html
    return html.encode(encoding)
1723
1724
1725
def to_utf8(string, default=_marker):
    """Encode string to UTF8

    :param string: String to be encoded to UTF8
    :param default: Value returned if `string` is not a string type
    :returns: UTF8 encoded string
    """
    if isinstance(string, six.string_types):
        return safe_unicode(string).encode("utf8")
    if default is _marker:
        fail("Expected string type, got '%s'" % type(string))
    return default
1736
1737
1738
def is_temporary(obj):
    """Returns whether the given object is temporary or not

    An object is considered temporary if it is marked with
    `ITemporaryObject`, if it or its parent has no id or an id matching
    `UID_RX`, if it has no parent, or (AT content only) if the parent is a
    "TempFolder".

    :param obj: the object to evaluate
    :returns: True if the object is temporary
    """
    if ITemporaryObject.providedBy(obj):
        return True

    # the ids are read from the unwrapped objects to avoid acquired values
    own_id = getattr(aq_base(obj), "id", None)
    if own_id is None or UID_RX.match(own_id):
        return True

    container = aq_parent(aq_inner(obj))
    if not container:
        return True

    container_id = getattr(aq_base(container), "id", None)
    if container_id is None or UID_RX.match(container_id):
        return True

    if not is_at_content(obj):
        return False

    # Checks to see if we are created inside the portal_factory. We don't
    # rely here on AT's isFactoryContained because the function is patched
    return getattr(aq_base(container), "meta_type", "") == "TempFolder"
1767
1768
1769
def mark_temporary(brain_or_object):
    """Mark the object as temporary

    The resolved object is flagged with the `ITemporaryObject` interface.

    :param brain_or_object: A single catalog brain or content object
    """
    alsoProvides(get_object(brain_or_object), ITemporaryObject)
1774
1775
1776
def unmark_temporary(brain_or_object):
    """Unmark the object as temporary

    The `ITemporaryObject` interface is removed from the resolved object.

    :param brain_or_object: A single catalog brain or content object
    """
    noLongerProvides(get_object(brain_or_object), ITemporaryObject)
1781
1782
1783
def is_string(thing):
    """Checks if the passed in object is one of `six.string_types`

    :param thing: object to test
    :returns: True if the object is a string
    """
    string_types = six.string_types
    return isinstance(thing, string_types)
1790